Java高并发24-使用自定义锁生成一个消费模型
一、使用自定义锁实现生成--消费模型
下面我们使用上节自定义的锁实现一个简单的生产--消费模型,代码如下:
package com.ruigege.LockSourceAnalysis6;
import java.util.Queue;import java.util.concurrent.locks.Condition;
public class Test { final static NonReentrantLock lock = new NonReentrantLock(); final static Condition notFull = lock.newCondition(); final static Condition notEmpty = lock.newCondition();
final static Queue<String> queue = new LinkedBlockingQueue<String>(); final static int queueSize = 10;
public static void main(String[] args) { Thread producer = new Thread(new Runnable() { public void run() { // 获取独占锁 lock.lock(); try { // (1)如果队列满了,则等待 while(queue.size() == queueSize) { notEmpty.await(); } // (2)添加元素到队列 queue.add("ele");
// (3)唤醒消费线程 notFull.signalAll(); }catch(Exception e) { e.printStackTrace(); }finally { // 释放锁 lock.unlock(); } } });
Thread consumer = new Thread(new Runnable() { public void run() { // 获取独占锁 lock.lock(); try { // 队列空,则等待 while(0 == queue.size()) { notFull.await(); } // 消费一个元素 String ele = queue.poll(); // 唤醒生产线程 notEmpty.signalAll(); }catch(Exception e) { e.printStackTrace(); }finally { // 释放锁 lock.unlock(); } } }); // 启动线程 producer.start(); consumer.start(); }
}
如上代码首先创建了一个NonReentrantLock的一个对象lock,然后调用lock.newCondition创建了两个条件变量,用来进行生产者和消费者线程之间的同步。 在main函数中,首先创建了producer生产线程,在线程内部首先调用lock.lock()获取独占锁,然后判断当前队列是否已经满了,如果满了则调用notEmpty.await()阻塞挂起当前线程,需要注意的是,这里使用了while而不是if是为了避免虚假唤醒,如果队列不满则直接向队列里面添加元素,然后调用notFull.signalAll()唤醒所有因为消费元素而被i阻塞的消费线程,最后释放获取的锁。
二.使用自定义锁实现一个消费模型
package com.ruigege.LockSourceAnalysis6;
import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;
public class NonReentrantLockME implements Lock,java.io.Serializable{ // 内部帮助类 private static class Sync extends AbstractQueueSynchronizer { // 是否锁已经被持有 protected boolean isHeldExclusively() { return getState() == 1; }
// 如果state为0,则尝试获取锁 public boolean tryAcquire(int acquires) { assert acquires == 1; if(compareAndSetState(0,1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
// 尝试释放锁,设置state为0 protected boolean tryRelease(int release) { assert releases == 1; if(getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; }
// 提供条件变量接口 Condition newConditon() { return new ConditionObject(); } }
// 创建一个Sync来做具体的工作 private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1);
} public Condition newCondition() { return sync.newConditon(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(timeout)); }}
使用NonReentrantLock创建一个实例,然后调用newCondition方法来生成两个条件变量来进行生产者和消费者线程之间的同步。 在main函数中,首先创建了producer生产线程,在线程内部先获取了独占锁,然后看一下队列是否满了,如果满了,那就阻塞当前线程,如果没有满直接在队列中加入队列中,这里使用的while循环而不是使用if语句,这是为了避免虚假唤醒。然后调用notFull.sinalAll()唤醒所有因为消费元素而被阻塞的消费线程,最后释放了锁。 在main函数中创建了consumer线程,先获取独占锁,先判断队列有没有元素,如果没有元素,那么就先挂起当前线程,这里使用了while是为了避免虚假唤醒,如果队列中不为空,那么就拿出一个元素,然后唤醒因为队列满而被阻塞的生产线程,最后释放获取的锁。
赞 (0)