`
canofy
  • 浏览: 820346 次
  • 性别: Icon_minigender_1
  • 来自: 北京、四川
社区版块
存档分类
最新评论

生产者/消费者模式(阻塞队列)

    博客分类:
  • j2EE
阅读更多
生产消费者模式

貌似也是阻塞的问题

花了一些时间终于弄明白这个鸟东东,以前还以为是不复杂的一个东西的,以前一直以为和观察者模式差不多(其实也是差不多的,呵呵),生产消费者模式应该是可以通过观察者模式来实现的,对于在什么环境下使用现在想的还不是特别清楚,主要是在实际中还没使用过这个。

需要使用到同步,以及线程,属于多并发行列,和观察者模式的差异也就在于此吧,所以实现起来也主要在这里的差异。

参考地址:http://blog.csdn.net/program_think/


在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
 
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据


◇解耦
  假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
  
◇支持并发(concurrency)
  生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
  使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。
  
◇支持忙闲不均
  缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
  

用了两种方式实现了一下这个模式,主要参考了网上的一些例子才弄明白,这里对队列的实现有很多种方法,需要和具体的应用相结合吧,队列缓冲区很简单,现在已有大量的实现,缺点是在性能上面(内存分配的开销和同步/互斥的开销),下面的实现都是这种方式;环形缓冲区(减少了内存分配的开销),双缓冲区(减少了同步/互斥的开销)。
第一个例子是使用的信号量的东东,没有执行具体的东西,只是实现了这个例子,要做复杂的业务逻辑的话需要自己在某些方法内去具体实现
代码如下:

消费者:
public class TestConsumer implements Runnable {
	
	TestQueue obj;
	
	public TestConsumer(TestQueue tq){
		this.obj=tq;
	}

	public void run() {				
		try {
			for(int i=0;i<10;i++){
				obj.consumer();
			}			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

生产者:
public class TestProduct implements Runnable {
	
	TestQueue obj;
	
	public TestProduct(TestQueue tq){
		this.obj=tq;
	}
	
	public void run() {
		for(int i=0;i<10;i++){
			try {
				obj.product("test"+i);
			} catch (Exception e) {				
				e.printStackTrace();
			}
		}
	}

}


队列(使用了信号量,采用synchronized进行同步,采用lock进行同步会出错,或许是还不知道实现的方法):
public static Object signal=new Object();
	boolean bFull=false; 
	private List thingsList=new ArrayList(); 
	private final ReentrantLock lock = new ReentrantLock(true);
	BlockingQueue q = new ArrayBlockingQueue(10);
	/**
	 * 生产
	 * @param thing
	 * @throws Exception
	 */
	public void product(String thing) throws Exception{	
		synchronized(signal){
			if(!bFull){
				bFull=true;
				  //产生一些东西,放到 thingsList 共享资源中
				System.out.println("product");
				System.out.println("仓库已满,正等待消费..."); 
				thingsList.add(thing); 
			    signal.notify(); //然后通知消费者
			}	       
		    signal.wait(); // 然后自己进入signal待召队列
			
		}
		
	}
	
	/**
	 * 消费
	 * @return
	 * @throws Exception
	 */
	public String consumer()throws Exception{			
		synchronized(signal){
			if(!bFull)  {  
					 signal.wait(); // 进入signal待召队列,等待生产者的通知
			}
			bFull=false; 
			// 读取buf 共享资源里面的东西
			System.out.println("consume");
			System.out.println("仓库已空,正等待生产..."); 
			signal.notify(); // 然后通知生产者
		}
		String result="";
		if(thingsList.size()>0){
			result=thingsList.get(thingsList.size()-1).toString();
			thingsList.remove(thingsList.size()-1);
		}
		return result;
	}

测试代码:
public class TestMain {
	public static void main(String[] args) throws Exception{
		TestQueue tq=new TestQueue();
		TestProduct tp=new TestProduct(tq);
		TestConsumer tc=new TestConsumer(tq);
		Thread t1=new Thread(tp);
		Thread t2=new Thread(tc);
		t1.start();
		t2.start();
	}
}


运行结果:
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...

第二种发放使用java.util.concurrent.BlockingQueue类来重写的队列那个类,使用这个方法比较简单,并且性能上也没有什么问题。
这是jdk里面的例子
* class Producer implements Runnable {
 *   private final BlockingQueue queue;
 *   Producer(BlockingQueue q) { queue = q; }
 *   public void run() {
 *     try {
 *       while(true) { queue.put(produce()); }
 *     } catch (InterruptedException ex) { ... handle ...}
 *   }
 *   Object produce() { ... }
 * }
 *
 * class Consumer implements Runnable {
 *   private final BlockingQueue queue;
 *   Consumer(BlockingQueue q) { queue = q; }
 *   public void run() {
 *     try {
 *       while(true) { consume(queue.take()); }
 *     } catch (InterruptedException ex) { ... handle ...}
 *   }
 *   void consume(Object x) { ... }
 * }
 *
 * class Setup {
 *   void main() {
 *     BlockingQueue q = new SomeQueueImplementation();
 *     Producer p = new Producer(q);
 *     Consumer c1 = new Consumer(q);
 *     Consumer c2 = new Consumer(q);
 *     new Thread(p).start();
 *     new Thread(c1).start();
 *     new Thread(c2).start();
 *   }
 * }


jdk1.5以上的一个实现,使用了Lock以及条件变量等东西
 class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }
分享到:
评论
1 楼 only_xxp 2010-09-05  
学习了!!!

相关推荐

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    NULL 博文链接:https://xiongjiajia.iteye.com/blog/2325943

    生产者与消费者 进程的同步与互斥模拟

    实验题目: 生产者与消费者(综合性实验) 实验环境: C语言编译器 实验内容: ① 由用户指定要产生的进程及其类别,存入进入就绪队列。   ② 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共4页.pdf.zip

    生产者与消费者问题

    假设M个生产者和N个消费者共享一个具有K(K大于1)个缓冲区的循环缓冲结构BUFFER(提示:可以用一个循环队列或一个整型数组来表示),并设置两个指针IN和OUT,其中IN指向生产者线程当前可用的空缓冲区的在BUFFER中的...

    操作系统实验:生产者与消费者.cpp

    通过实验模拟生产者与消费者之间的关系,了解并掌握他们之间的关系及其原理。由此增加对进程同步的问题的了解。 实验要求: 每个进程有一个进程控制块(PCB)表示。进程控制块可以包含如下信息:进程

    生产者与消费者 进程调度模拟(c++)

    实验题目: 生产者与消费者(综合性实验) 实验环境: C语言编译器 实验内容: ① 由用户指定要产生的进程及其类别,存入进入就绪队列。   ② 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则...

    java模拟阻塞队列

    实现java模拟阻塞队列的例子,该代码包括,阻塞队列实现生产者,消费者。和模拟阻塞队列实现生产者及消费者模式,帮助你更好的理解java多线程

    基于Java实现生产者与消费者算法模拟【100010232】

    本次课程设计选到的题目为生产者消费者算法模拟,通过需求分析和资料搜寻,掌握到生产者/消费者的模式原理和优点,同时也了解到了几种可以实现生产者消费者的方式,如信号量方式,管程方式,阻塞队列方式等。

    JAVA多线程之生产者消费者模型.docx

    那么在这个过程中,生产者和消费者是不直接接触的,所谓的‘货架’其实就是一个阻塞队列,生产者生产的产品不直接给消费者消费,而是仍给阻塞队列,这个阻塞队列就是来解决生产者消费者的强耦合的。就是生产者消费者...

    【Java】Queue、BlockingQueue和队列实现生产者消费者模式

    源码:BlockingQueue实现生产者消费者模式→ 输出结果截图 1. Queue接口 – 队列 public interface Queue extends Collection Collection的子接口,表示队列FIFO(First In First Out) 常用方法: (1)抛出异常...

    使用VC++6.0实现的“操作系统”课程中的生产者-消费者问题

    一、原理 生产者线程: while (true) ...可以选择菜单项“开启线程-&gt;加快(减慢)生产,减慢(加快)消费”来随机调整生产和消费的时间,以观察生产者或消费者线程阻塞的状况。 程序在VC++6.0下编译通过。

    消息分发框架(基于JAVA阻塞队列实现、 生产者消费者模型)

    消息分发框架,基于java阻塞队列实现,生产者消费者模型 可用于任务分发,服务器消息消息,以及网络IO 性能优化,多线程

    操作系统实验 生产者消费者 PV操作

    熟练应用生产者消费者PV操作的实验, 实验内容 1. 由用户指定要产生的进程及其类别,存入进入就绪队列。  2. 调度程序从就绪队列中提取一个就绪进程运行。如果申请的资源被阻塞则进入相应的等待队列,调度程序调度...

    计算机操作系统课程设计报告《生产者---消费者问题》.doc

    生产者和消 费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不 用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队 列里取,阻塞队列就相当于一个缓冲区...

    操作系统生产者与消费者(综合性实验)北林

    实验内容: ...一个就绪队列(ready),两个等待队列:生产者等待队列(producer);消费者等待队列(consumer)。一个链表(over),用于收集已经运行结束的进程 本程序通过函数模拟信号量的原子操作。

    不加锁、非阻塞模式的环形队列

    环形队列,不加锁的生产者消费者模式,使用前提:1,缓冲区设置足够大,2,消费保证足够快

    【每日爬虫】:生产者与消费者模式爬取王者荣耀壁纸

    生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不直接找生产者要数据,而是从阻塞队列里取,阻塞队列就相当于一

    Java并发编程:阻塞队列

     使用非阻塞队列的时候有一个很大问题是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,必须额外地实现同步策略以及线程间唤醒策略,这个实现起来非常麻烦。但是有了阻塞队列不一样了,它会

    concurrentqueue:C ++ 11的快速多生产者,多消费者,无锁的并发队列

    注意:如果您需要的只是一个单一生产者,单一消费者队列,那么我也可以选择。 特征 击倒你的。 单头实现。 只需将其放入您的项目中即可。 完全线程安全的无锁队列。 从任何数量的线程同时使用。 C ++ 11实现-尽...

    深入多线程之:深入生产者、消费者队列分析

    上次我们使用AutoResetEvent实现了一个生产/消费者队列。这一次我们要使用Wait和Pulse方法来实现一个更强大的版本,它允许多个消费者,每一个消费者都在自己的线程中运行。 我们使用数组来跟踪线程。 Thread[] _...

Global site tag (gtag.js) - Google Analytics