java并发锁机制-ReentrantLock Condtion准备篇
根据网上的说法,jdk并发包中的 Condition await 与 signal 实现了 Object.wait notify 语义。以下总结,是基于Condition await,singal方法的实现原理总结出来的:
- monitorObject.wait,该方法调用必须在临界区中(锁保护的代码段)被调用,线程如果在临界区中调用监视器的wait方法,然后线程会释放占有监视器monitorObject的锁,然后阻塞(等待条件的发生,该线程会保存在monitorObject的条件等待队列,当该线程收到信号或中断被唤醒后,首先需要尝试获取监视器的锁,然后继续执行操作,如果是被中断,需要在获取锁后,才会被中断。)
- monitorObject.notify,该方法调用同样只能在临界区中被调用,锁的释放,在执行完临界区后,才会释放。根据Condition.singal实现机制,首先唤醒,是先将线程从条件等待队列放入到同步阻塞队列,然后执行完临界区代码后,释放锁,其他线程竞争锁。
为了对Condition await 与 signal 方法有一个直接的了解,现给出一个简单的生产者、消费者测试示例:
package persistent.prestige.study.concurent.bread;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestMain {
public static void main(String[] args) {
BreadContainerByObject container = new BreadContainerByObject();
for (int i = 0; i < 5; i++) {
new Thread(new Producers(container)).start();
}
for (int i = 0; i < 3; i++) {
new Thread(new Customer(container)).start();
}
}
}
interface BreadContainer extends Serializable {
public void put(Bread b) throws InterruptedException;
public Bread poll() throws InterruptedException;
}
/**
* 基于 Reentrant Condition实现
* @author dingwei2
*
*/
@SuppressWarnings("serial")
class BreadContainerByCondition implements BreadContainer {
private Lock lock = new ReentrantLock();
private Condition NotFull = lock.newCondition();
private Condition NotEmpty = lock.newCondition();
// 面包容器
private List<Bread> breads = new ArrayList<Bread>();
private static final int MAX = 20;
private volatile int num = 0;
@Override
public void put(Bread b) throws InterruptedException {
// TODO Auto-generated method stub
try {
lock.lock();
while(breads.size() >= MAX ) { //已经满了
NotFull.await();
}
b.setId(num ++);
breads.add(b);
//放入一个元素后,NotEmpty
NotEmpty.signalAll();
} finally {
lock.unlock();
}
}
@Override
public Bread poll() throws InterruptedException{
try {
lock.lock();
while(breads.isEmpty()) {//如果为空
NotEmpty.await();
}
Bread b = breads.remove(breads.size() -1);
NotFull.signalAll();
return b;
} finally {
lock.unlock();
}
}
}
/**
* 基于 Object.notify Object.wait
* @author dingwei2
*
*/
@SuppressWarnings("serial")
class BreadContainerByObject implements BreadContainer{
// 面包容器
private List<Bread> breads = new ArrayList<Bread>();
private static final int MAX = 20;
private volatile int num = 0;
public void put(Bread b) {
synchronized (breads) {
while(breads.size() >= MAX) {
try {
breads.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();//这里不应该 将 InterruptedExcepiton 吞掉
}
}
b.setId(num ++);
breads.add(b);
breads.notifyAll();
}
}
public Bread poll() {
Bread b = null;
synchronized (breads) {
while(breads.size() < 1) {
try {
breads.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();//这里不应该 将 InterruptedExcepiton 吞掉
}
}
b = breads.remove(breads.size() -1);
breads.notifyAll();
}
return b;
}
}
/**
* 生产者
*
* @author dingwei2
*
*/
class Producers implements Runnable {
private BreadContainerByObject container;
public Producers(BreadContainerByObject container) {
this.container = container;
}
@Override
public void run() {
// TODO Auto-generated method stub
for (int i = 0; i < 5; i++) {
Bread b = new Bread();
b.setFactoryName(Thread.currentThread().getName());
container.put(b);
}
}
}
/**
* 消费者
* @author dingwei2
*
*/
class Customer implements Runnable {
public BreadContainerByObject container;
public Customer(BreadContainerByObject container) {
this.container = container;
}
@Override
public void run() {
// TODO Auto-generated method stub
for(int i = 0; i < 5; i ++ ) {
Bread b = container.poll();
System.out.println(Thread.currentThread().getName() + "消费了" + b.toString());
}
}
}
@SuppressWarnings("serial")
class Bread implements Serializable {
private Integer id;
private String factoryName;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
@Override
public String toString() {
return "面包:" + (id == null ? 0 : id.intValue()) + ";生产工厂:" + getFactoryName();
}
public String getFactoryName() {
return factoryName;
}
public void setFactoryName(String factoryName) {
this.factoryName = factoryName;
}
}
使用Reentrant Condition ,细化了消息通知的粒度,比如使用了当队列中产品时,通过 NotEmpty 条件来唤醒消费者,当队列还有可用的空间存放产品时,使用 NotFull 条件来唤醒生产者,使用两个条件队列,确保被唤醒的线程的准确性,加入到同步队列的节点,在该节点获取到锁后,确实是满足条件的(特别在临界情况的时候)。而Object.wait, Object.notify , 生产者,消费者在同一个条件队列中排队。