使用 Java 线程中的 BlockingQueue 实现生产者消费者解决方案


生产者消费者是 Java 并发和多线程中最常见的问题之一。它出现在帮助管理多个线程尝试访问共享资源的同步过程中。本文将帮助我们找到使用 Java 线程中的 BlockingQueue 实现生产者消费者解决方案。

生产者消费者问题和 BlockingQueue

理解生产者消费者问题

生产者和消费者是两个不同的实体或进程,它们使用一个共享队列。此队列是一个固定大小的缓冲区。生产者生成信息片段并将它们存储在队列中。消费者使用给定的信息并将其从队列中移除。

实际问题发生在

  • 当生产者即使缓冲区已满也继续生成数据时。

  • 当消费者尝试移除数据时,缓冲区为空时。

  • 生产者或消费者的速度较慢。

  • 两者都试图同时更新缓冲区。

解决方案

  • 当缓冲区已满时,生产者必须停止数据生成。

  • 当缓冲区为空时,消费者必须停止从缓冲区移除信息。

  • 只有当缓冲区既不为空也不满时,生产者和消费者才能工作。

BlockingQueue

Java 在 'java.util.concurrent' 包中提供了 BlockingQueue 接口。使用此队列的主要优点是,在检索和删除项目时,它会等待队列变为非空。此外,在添加项目时,它会等待可用空间。此功能使其成为生产者消费者解决方案的完美选择。

语法

BlockingQueue< Type > nameOfObject = new LinkedBlockingQueue<>();

这里,LinkedBlockingQueue 是一个实现了 BlockingQueue 接口的类。

使用 Java 中的 BlockingQueue 实现生产者消费者解决方案

方法

  • 创建两个类及其相应的构造函数。这两个类都将扩展 'Thread' 类。第一个类用于生产者,第二个类用于消费者。

  • 在两个类中,定义类型为 'Integer' 的 BlockingQueue 并将其作为参数传递给构造函数。这里,'Integer' 是一个包装类。

  • 在 Producer 类中,我们覆盖内置方法 'run()' 以从 Producer 端生成数据。现在,使用 for 循环迭代 5 次,并使用 'put()' 方法将数据存储到 BlockingQueue 中,间隔为 1 秒。

  • 在 Consumer 类中,再次覆盖内置方法 'run()' 以使用名为 'take()' 的内置方法从 Consumer 端使用数据。

  • 在 main() 方法中,定义 BlockingQueue 的对象并将它们作为参数传递给生产者和消费者类的构造函数。

示例

Open Compiler
import java.util.concurrent.*; class Producr extends Thread { protected BlockingQueue<Integer> blcque; Producr(BlockingQueue<Integer> blcque) { // constructor this.blcque = blcque; } public void run() { // overriding run method while (true) { for(int i = 1; i <= 5; i++) { try { System.out.println("Producer is running " + i); blcque.put(i); // to produce data // produce data with an interval of 1 sec Thread.sleep(1000); } // to handle exception catch (InterruptedException exp) { System.out.println("An interruption occurred at Producer"); } } } } } class Consumr extends Thread { protected BlockingQueue<Integer> blcque; Consumr(BlockingQueue<Integer> blcque) { // constructor this.blcque = blcque; } public void run() { // overriding run method try { while (true) { Integer elem = blcque.take(); // to consume data System.out.println("Consumer is running " + elem); } } // to handle exception catch (InterruptedException exp) { System.out.println("An interruption occurred at Producer"); } } } public class Solution { public static void main(String[] args) throws InterruptedException { // create an object of BlockingQueue BlockingQueue<Integer> bufrShr = new LinkedBlockingQueue<>(); // passing object of BlockingQueue as arguments Producr threadProd = new Producr(bufrShr); Consumr threadCon = new Consumr(bufrShr); // to start the process threadProd.start(); threadCon.start(); // to exit the process after 5 sec Thread.sleep(5000); System.exit(0); } }

输出

Producer is running 1
Consumer is running 1
Producer is running 2
Consumer is running 2
Producer is running 3
Consumer is running 3
Producer is running 4
Consumer is running 4
Producer is running 5
Consumer is running 5

Learn Java in-depth with real-world projects through our Java certification course. Enroll and become a certified expert to boost your career.

结论

我们从定义生产者消费者问题开始本文,在下一节中,我们通过引入 BlockingQueue 接口提出了解决此问题的可能解决方案。最后,我们讨论了一个 Java 程序,该程序向我们展示了如何实际使用 BlockingQueue 来解决给定的问题。

更新于: 2023年7月20日

1K+ 次浏览

开启你的 职业生涯

通过完成课程获得认证

开始学习
广告