java并发锁机制-ReentrantLock Condtion准备篇

根据网上的说法,jdk并发包中的 Condition await 与 signal 实现了 Object.wait notify 语义。以下总结,是基于Condition await,singal方法的实现原理总结出来的:

  • monitorObject.wait,该方法调用必须在临界区中(锁保护的代码段)被调用,线程如果在临界区中调用监视器的wait方法,然后线程会释放占有监视器monitorObject的锁,然后阻塞(等待条件的发生,该线程会保存在monitorObject的条件等待队列,当该线程收到信号或中断被唤醒后,首先需要尝试获取监视器的锁,然后继续执行操作,如果是被中断,需要在获取锁后,才会被中断。)
  • monitorObject.notify,该方法调用同样只能在临界区中被调用,锁的释放,在执行完临界区后,才会释放。根据Condition.singal实现机制,首先唤醒,是先将线程从条件等待队列放入到同步阻塞队列,然后执行完临界区代码后,释放锁,其他线程竞争锁。

为了对Condition await 与 signal 方法有一个直接的了解,现给出一个简单的生产者、消费者测试示例:

package persistent.prestige.study.concurent.bread;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestMain {
    public static void main(String[] args) {
        BreadContainerByObject container = new BreadContainerByObject();
        for (int i = 0; i < 5; i++) {
            new Thread(new Producers(container)).start();
        }
        for (int i = 0; i < 3; i++) {
            new Thread(new Customer(container)).start();
        }
    }
}
interface BreadContainer extends Serializable {
    public void put(Bread b) throws InterruptedException;
    public Bread poll() throws InterruptedException;
}
/**
 * 基于 Reentrant Condition实现
 * @author dingwei2
 *
 */
@SuppressWarnings("serial")
class BreadContainerByCondition implements BreadContainer {
    private Lock lock = new ReentrantLock();
    private Condition NotFull = lock.newCondition();
    private Condition NotEmpty = lock.newCondition();
    // 面包容器
    private List<Bread> breads = new ArrayList<Bread>();
    private static final int MAX = 20;
    private volatile int num = 0;
    @Override
    public void put(Bread b) throws InterruptedException {
        // TODO Auto-generated method stub
        try {
            lock.lock();
            while(breads.size() >= MAX ) { //已经满了
                NotFull.await();
            }
            b.setId(num ++);
            breads.add(b);
            //放入一个元素后,NotEmpty
            NotEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Bread poll() throws InterruptedException{
        try {
            lock.lock();
            while(breads.isEmpty()) {//如果为空
                NotEmpty.await();
            }
            Bread b = breads.remove(breads.size() -1);
            NotFull.signalAll();
            return b;    
        } finally {
            lock.unlock();
        }
    }
}
/**
 * 基于 Object.notify Object.wait
 * @author dingwei2
 *
 */
@SuppressWarnings("serial")
class BreadContainerByObject implements BreadContainer{
    // 面包容器
    private List<Bread> breads = new ArrayList<Bread>();
    private static final int MAX = 20;
    private volatile int num = 0;
    public void put(Bread b) {
        synchronized (breads) {
            while(breads.size() >= MAX) {
                try {
                    breads.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();//这里不应该 将 InterruptedExcepiton 吞掉
                }
            }
            b.setId(num ++);
            breads.add(b);
            breads.notifyAll();
        }
    }
    public Bread poll() {
        Bread b = null;
        synchronized (breads) {
            while(breads.size() < 1) {
                try {
                    breads.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();//这里不应该 将 InterruptedExcepiton 吞掉
                }
            }
            b = breads.remove(breads.size() -1);
            breads.notifyAll();
        }
        return b;
    }
}

/**
 * 生产者
 * 
 * @author dingwei2
 *
 */
class Producers implements Runnable {

    private BreadContainerByObject container;

    public Producers(BreadContainerByObject container) {
        this.container = container;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        for (int i = 0; i < 5; i++) {
            Bread b = new Bread();
            b.setFactoryName(Thread.currentThread().getName());
            container.put(b);
        }

    }

}

/**
 * 消费者
 * @author dingwei2
 *
 */
class Customer implements Runnable {

    public BreadContainerByObject container;

    public Customer(BreadContainerByObject container) {
        this.container = container;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        for(int i = 0; i < 5; i ++ ) {
            Bread b = container.poll();
            System.out.println(Thread.currentThread().getName() + "消费了" + b.toString());
        }
        
    }
}



@SuppressWarnings("serial")
class Bread implements Serializable {
    
    private Integer id;
    
    private String factoryName;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "面包:" + (id == null ? 0 : id.intValue()) + ";生产工厂:" + getFactoryName();
    }

    public String getFactoryName() {
        return factoryName;
    }

    public void setFactoryName(String factoryName) {
        this.factoryName = factoryName;
    }
}

使用Reentrant Condition ,细化了消息通知的粒度,比如使用了当队列中产品时,通过 NotEmpty 条件来唤醒消费者,当队列还有可用的空间存放产品时,使用 NotFull 条件来唤醒生产者,使用两个条件队列,确保被唤醒的线程的准确性,加入到同步队列的节点,在该节点获取到锁后,确实是满足条件的(特别在临界情况的时候)。而Object.wait, Object.notify , 生产者,消费者在同一个条件队列中排队。

版权信息:本文由中间件兴趣圈创作

禁止非授权转载,违着依法追究相关法律责任

如需转载,请联系 codingw@126.com