某个线程或模块的代码负责生产数据(工厂),而生产出来的数据却不得不交给另一模块(消费者)来对其进行处理,在这之间使用了队列、栈等类似超市的东西来存储数据(超市),这就抽象除了我们的生产者/消费者模型。
其中,产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者;生产者和消费者之间的中介就叫做缓冲区。
为什么要使用生产者-消费者模型
生产者消费者模型通过一个容器解决生产者和消费者的强耦合(强度相互依赖)问题。生产者消费者彼此间不直接通讯,而通过阻塞队列进行通讯,即生产者生产完数据,不用等待消费者消费数据,直接扔给阻塞队列,消费者不找生产者要数据,而是从阻塞队列里取,阻塞队列相当于一个缓冲区,平衡生产者和消费者的处理能力。这个阻塞队列就是用给生产者和消费者解耦的。
生产者-消费者模型的优点
1 解耦:降低生产者和消费之间的依赖关系
如果不使用邮筒(缓冲区)需要把信件交给邮递员,但是前提是你得认识快递员(相当于生产者消费者的强耦合),万一邮递员换人了你还得重新认识一下(相当于消费者变化导致修改生产者代码)。而对邮筒来说比较固定,你依赖它的成本比较低(相当于和缓冲区之间的弱耦合)。
2 支持并发
即生产者和消费者是两个可以独立的并发主体,互不干扰的运行,从寄信的例子看,如果没有邮筒就需要在路口等待邮递员过来收(相当于生产者阻塞);又或者邮递员挨家挨户的询问谁要寄信(相当于消费者轮询)。不管是那种方法效率都比较低下。
3 支持忙闲不均
如果生产数据的速度时快时慢,缓冲区可以对其进行适当缓冲。当生产的数据太块时,消费者来不及处理,未处理的数据可以暂时存在缓冲区。等生产者的生产速度慢下来,消费者再慢慢处理掉。
例如寄信的例子,假设邮递员一次只能带1000封信,万一某次碰到了中秋节送贺卡,需要邮递的信封数量超过1000封,这个时候邮筒(缓冲区)就派上用场了,邮递员吧来不及带走的信封暂存在邮筒中,等下次再过来拿。
实现方式
1、使用synchronized(wait()和notify())
public class ProducerConsumer {
public static void main(String[] args) {
Resource resource = new Resource();
Producer p1 = new Producer(resource);
Producer p2 = new Producer(resource);
Producer p3 = new Producer(resource);
Consumer c1 = new Consumer(resource);
p1.start();
p2.start();
p3.start();
c1.start();
}
}
//公共资源类
class Resource {
private int number = 0;
private int size = 10;
/**
* 取资源
*/
public synchronized void remove() {
if (number > 0) {
number--;
System.out.println("消费者" + Thread.currentThread().getName() + ":" + number);
notifyAll();
} else {
try {
wait();
System.out.println("消费者" + Thread.currentThread().getName() + "进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 添加资源
*/
public synchronized void add() {
if (number < size) {
number++;
System.out.println("生产者" + Thread.currentThread().getName() + ":" + number);
notifyAll();
} else {
try {
wait();
System.out.println("生产者" + Thread.currentThread().getName() + "进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//生产者
class Producer extends Thread {
private Resource resource;
public Producer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}
}
//消费者
class Consumer extends Thread {
private Resource resource;
public Consumer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
2、使用Lock实现(await()和signal())
public class ProducerConsumer {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition producerCondition = lock.newCondition();
Condition consumerCondition = lock.newCondition();
Resource resource = new Resource(lock,producerCondition,consumerCondition);
Producer p1 = new Producer(resource);
Consumer c1 = new Consumer(resource);
Consumer c2 = new Consumer(resource);
Consumer c3 = new Consumer(resource);
p1.start();;
c1.start();
c2.start();
c3.start();
}
}
class Resource{
private int number = 0;
private int size = 10;
private Lock lock;
private Condition producerCondition;
private Condition consumerCondition;
public Resource(Lock lock, Condition producerCondition, Condition consumerCondition) {
this.lock = lock;
this.producerCondition = producerCondition;
this.consumerCondition = consumerCondition;
}
public void add(){
lock.lock();
try {
if (number < size){
number++;
System.out.println(Thread.currentThread().getName()+":"+number);
//唤醒等待的消费者
consumerCondition.signalAll();
}else {
try {
producerCondition.await();
System.out.println(Thread.currentThread().getName()+"线程进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
}
//消费者取资源
public void remove(){
lock.lock();
try {
if (number > 0){
number--;
System.out.println(Thread.currentThread().getName()+":"+number);
//唤醒等待的生产者
producerCondition.signalAll();
}else {
try {
consumerCondition.await();
System.out.println(Thread.currentThread().getName()+"线程进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
}
}
class Producer extends Thread{
private Resource resource;
public Producer(Resource resource){
this.resource = resource;
setName("生产者");
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}
}
class Consumer extends Thread{
private Resource resource;
public Consumer(Resource resource){
this.resource = resource;
setName("消费者");
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
3、阻塞队列实现
/**
* 使用阻塞队列实现
*/
public class ProducerConsumer {
public static void main(String[] args) {
Resource resource = new Resource();
Producer p1 = new Producer(resource);
Producer p2 = new Producer(resource);
Consumer c1 = new Consumer(resource);
p1.start();
p2.start();
c1.start();
}
}
class Resource{
BlockingQueue queue = new LinkedBlockingQueue(10);
//添加资源
public void add() {
try {
queue.put(1);
System.out.println("生产者"+Thread.currentThread().getName()+":"+queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费资源
public void remove(){
try {
queue.take();
System.out.println("消费者"+Thread.currentThread().getName()+":"+queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Producer extends Thread{
private Resource resource;
public Producer(Resource resource){
this.resource = resource;
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.add();
}
}
}
class Consumer extends Thread{
private Resource resource;
public Consumer(Resource resource){
this.resource = resource;
}
public void run(){
while (true){
try {
Thread.sleep((long) (1000+Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource.remove();
}
}
}
参考: