使用 Java 线程中的 BlockingQueue 实现生产者消费者解决方案
生产者消费者是 Java 并发和多线程中最常见的问题之一。它出现在帮助管理多个线程尝试访问共享资源的同步过程中。本文将帮助我们找到使用 Java 线程中的 BlockingQueue 实现生产者消费者解决方案。
生产者消费者问题和 BlockingQueue
理解生产者消费者问题
生产者和消费者是两个不同的实体或进程,它们使用一个共享队列。此队列是一个固定大小的缓冲区。生产者生成信息片段并将它们存储在队列中。消费者使用给定的信息并将其从队列中移除。
实际问题发生在
当生产者即使缓冲区已满也继续生成数据时。
当消费者尝试移除数据时,缓冲区为空时。
生产者或消费者的速度较慢。
两者都试图同时更新缓冲区。
解决方案
当缓冲区已满时,生产者必须停止数据生成。
当缓冲区为空时,消费者必须停止从缓冲区移除信息。
只有当缓冲区既不为空也不满时,生产者和消费者才能工作。
BlockingQueue
Java 在 'java.util.concurrent' 包中提供了 BlockingQueue 接口。使用此队列的主要优点是,在检索和删除项目时,它会等待队列变为非空。此外,在添加项目时,它会等待可用空间。此功能使其成为生产者消费者解决方案的完美选择。
语法
BlockingQueue< Type > nameOfObject = new LinkedBlockingQueue<>();
这里,LinkedBlockingQueue 是一个实现了 BlockingQueue 接口的类。
使用 Java 中的 BlockingQueue 实现生产者消费者解决方案
方法
创建两个类及其相应的构造函数。这两个类都将扩展 'Thread' 类。第一个类用于生产者,第二个类用于消费者。
在两个类中,定义类型为 'Integer' 的 BlockingQueue 并将其作为参数传递给构造函数。这里,'Integer' 是一个包装类。
在 Producer 类中,我们覆盖内置方法 'run()' 以从 Producer 端生成数据。现在,使用 for 循环迭代 5 次,并使用 'put()' 方法将数据存储到 BlockingQueue 中,间隔为 1 秒。
在 Consumer 类中,再次覆盖内置方法 'run()' 以使用名为 'take()' 的内置方法从 Consumer 端使用数据。
在 main() 方法中,定义 BlockingQueue 的对象并将它们作为参数传递给生产者和消费者类的构造函数。
示例
import java.util.concurrent.*; class Producr extends Thread { protected BlockingQueue<Integer> blcque; Producr(BlockingQueue<Integer> blcque) { // constructor this.blcque = blcque; } public void run() { // overriding run method while (true) { for(int i = 1; i <= 5; i++) { try { System.out.println("Producer is running " + i); blcque.put(i); // to produce data // produce data with an interval of 1 sec Thread.sleep(1000); } // to handle exception catch (InterruptedException exp) { System.out.println("An interruption occurred at Producer"); } } } } } class Consumr extends Thread { protected BlockingQueue<Integer> blcque; Consumr(BlockingQueue<Integer> blcque) { // constructor this.blcque = blcque; } public void run() { // overriding run method try { while (true) { Integer elem = blcque.take(); // to consume data System.out.println("Consumer is running " + elem); } } // to handle exception catch (InterruptedException exp) { System.out.println("An interruption occurred at Producer"); } } } public class Solution { public static void main(String[] args) throws InterruptedException { // create an object of BlockingQueue BlockingQueue<Integer> bufrShr = new LinkedBlockingQueue<>(); // passing object of BlockingQueue as arguments Producr threadProd = new Producr(bufrShr); Consumr threadCon = new Consumr(bufrShr); // to start the process threadProd.start(); threadCon.start(); // to exit the process after 5 sec Thread.sleep(5000); System.exit(0); } }
输出
Producer is running 1 Consumer is running 1 Producer is running 2 Consumer is running 2 Producer is running 3 Consumer is running 3 Producer is running 4 Consumer is running 4 Producer is running 5 Consumer is running 5
Learn Java in-depth with real-world projects through our Java certification course. Enroll and become a certified expert to boost your career.
结论
我们从定义生产者消费者问题开始本文,在下一节中,我们通过引入 BlockingQueue 接口提出了解决此问题的可能解决方案。最后,我们讨论了一个 Java 程序,该程序向我们展示了如何实际使用 BlockingQueue 来解决给定的问题。