Java生产者消费者模式实战解析

张开发
2026/4/14 12:30:37 15 分钟阅读

分享文章

Java生产者消费者模式实战解析
Java生产者消费者模式实战解析异步模式传统版改进版阻塞队列异步模式传统版异步模式之生产者/消费者classShareData{privateintnumber0;privateLocklocknewReentrantLock();privateConditionconditionlock.newCondition();publicvoidincrement()throwsException{// 同步代码块加锁lock.lock();try{// 判断 防止虚假唤醒while(number!0){// 等待不能生产condition.await();}// 干活number;System.out.println(Thread.currentThread().getName()\t number);// 通知 唤醒condition.signalAll();}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}publicvoiddecrement()throwsException{// 同步代码块加锁lock.lock();try{// 判断 防止虚假唤醒while(number0){// 等待不能消费condition.await();}// 干活number--;System.out.println(Thread.currentThread().getName()\t number);// 通知 唤醒condition.signalAll();}catch(Exceptione){e.printStackTrace();}finally{lock.unlock();}}}publicclassTraditionalProducerConsumer{publicstaticvoidmain(String[]args){ShareDatashareDatanewShareData();// t1线程生产newThread(()-{for(inti0;i5;i){shareData.increment();}},t1).start();// t2线程消费newThread(()-{for(inti0;i5;i){shareData.decrement();}},t2).start();}}改进版异步模式之生产者/消费者消费队列可以用来平衡生产和消费的线程资源不需要产生结果和消费结果的线程一一对应生产者仅负责产生结果数据不关心数据该如何处理而消费者专心处理结果数据消息队列是有容量限制的满时不会再加入数据空时不会再消耗数据JDK 中各种阻塞队列采用的就是这种模式publicclassdemo{publicstaticvoidmain(String[]args){MessageQueuequeuenewMessageQueue(2);for(inti0;i3;i){intidi;newThread(()-{queue.put(newMessage(id,值id));},生产者i).start();}newThread(()-{while(true){try{Thread.sleep(1000);Messagemessagequeue.take();}catch(InterruptedExceptione){e.printStackTrace();}}},消费者).start();}}//消息队列类Java间线程之间通信classMessageQueue{privateLinkedListMessagelistnewLinkedList();//消息的队列集合privateintcapacity;//队列容量publicMessageQueue(intcapacity){this.capacitycapacity;}//获取消息publicMessagetake(){//检查队列是否为空synchronized(list){while(list.isEmpty()){try{sout(Thread.currentThread().getName():队列为空消费者线程等待);list.wait();}catch(InterruptedExceptione){e.printStackTrace();}}//从队列的头部获取消息返回Messagemessagelist.removeFirst();sout(Thread.currentThread().getName()已消费消息--message);list.notifyAll();returnmessage;}}//存入消息publicvoidput(Messagemessage){synchronized(list){//检查队列是否满while(list.size()capacity){try{sout(Thread.currentThread().getName():队列为已满生产者线程等待);list.wait();}catch(InterruptedExceptione){e.printStackTrace();}}//将消息加入队列尾部list.addLast(message);sout(Thread.currentThread().getName():已生产消息--message);list.notifyAll();}}}finalclassMessage{privateintid;privateObjectvalue;//get set}阻塞队列publicstaticvoidmain(String[]args){ExecutorServiceconsumerExecutors.newFixedThreadPool(1);ExecutorServiceproducerExecutors.newFixedThreadPool(1);BlockingQueueIntegerqueuenewSynchronousQueue();producer.submit(()-{try{System.out.println(生产...);Thread.sleep(1000);queue.put(10);}catch(InterruptedExceptione){e.printStackTrace();}});consumer.submit(()-{try{System.out.println(等待消费...);Integerresultqueue.take();System.out.println(结果为:result);}catch(InterruptedExceptione){e.printStackTrace();}});}

更多文章