使用 Java 线程中的 BlockingQueue 实现生产者消费者解决方案
生产者消费者是 Java 并发和多线程中最常见的问题之一。它出现在帮助管理多个线程尝试访问共享资源的同步过程中。本文将帮助我们找到使用 Java 线程中的 BlockingQueue 实现生产者消费者解决方案。
生产者消费者问题和 BlockingQueue
理解生产者消费者问题
生产者和消费者是两个不同的实体或进程,它们使用一个共享队列。此队列是一个固定大小的缓冲区。生产者生成信息片段并将它们存储在队列中。消费者使用给定的信息并将其从队列中移除。
实际问题发生在
当生产者即使缓冲区已满也继续生成数据时。
当消费者尝试移除数据时,缓冲区为空时。
生产者或消费者的速度较慢。
两者都试图同时更新缓冲区。
解决方案
当缓冲区已满时,生产者必须停止数据生成。
当缓冲区为空时,消费者必须停止从缓冲区移除信息。
只有当缓冲区既不为空也不满时,生产者和消费者才能工作。
BlockingQueue
Java 在 'java.util.concurrent' 包中提供了 BlockingQueue 接口。使用此队列的主要优点是,在检索和删除项目时,它会等待队列变为非空。此外,在添加项目时,它会等待可用空间。此功能使其成为生产者消费者解决方案的完美选择。
语法
BlockingQueue< Type > nameOfObject = new LinkedBlockingQueue<>();
这里,LinkedBlockingQueue 是一个实现了 BlockingQueue 接口的类。
使用 Java 中的 BlockingQueue 实现生产者消费者解决方案
方法
创建两个类及其相应的构造函数。这两个类都将扩展 'Thread' 类。第一个类用于生产者,第二个类用于消费者。
在两个类中,定义类型为 'Integer' 的 BlockingQueue 并将其作为参数传递给构造函数。这里,'Integer' 是一个包装类。
在 Producer 类中,我们覆盖内置方法 'run()' 以从 Producer 端生成数据。现在,使用 for 循环迭代 5 次,并使用 'put()' 方法将数据存储到 BlockingQueue 中,间隔为 1 秒。
在 Consumer 类中,再次覆盖内置方法 'run()' 以使用名为 'take()' 的内置方法从 Consumer 端使用数据。
在 main() 方法中,定义 BlockingQueue 的对象并将它们作为参数传递给生产者和消费者类的构造函数。
示例
import java.util.concurrent.*;
class Producr extends Thread {
protected BlockingQueue<Integer> blcque;
Producr(BlockingQueue<Integer> blcque) { // constructor
this.blcque = blcque;
}
public void run() { // overriding run method
while (true) {
for(int i = 1; i <= 5; i++) {
try {
System.out.println("Producer is running " + i);
blcque.put(i); // to produce data
// produce data with an interval of 1 sec
Thread.sleep(1000);
}
// to handle exception
catch (InterruptedException exp) {
System.out.println("An interruption occurred at Producer");
}
}
}
}
}
class Consumr extends Thread {
protected BlockingQueue<Integer> blcque;
Consumr(BlockingQueue<Integer> blcque) { // constructor
this.blcque = blcque;
}
public void run() { // overriding run method
try {
while (true) {
Integer elem = blcque.take(); // to consume data
System.out.println("Consumer is running " + elem);
}
}
// to handle exception
catch (InterruptedException exp) {
System.out.println("An interruption occurred at Producer");
}
}
}
public class Solution {
public static void main(String[] args) throws InterruptedException {
// create an object of BlockingQueue
BlockingQueue<Integer> bufrShr = new LinkedBlockingQueue<>();
// passing object of BlockingQueue as arguments
Producr threadProd = new Producr(bufrShr);
Consumr threadCon = new Consumr(bufrShr);
// to start the process
threadProd.start();
threadCon.start();
// to exit the process after 5 sec
Thread.sleep(5000);
System.exit(0);
}
}
输出
Producer is running 1 Consumer is running 1 Producer is running 2 Consumer is running 2 Producer is running 3 Consumer is running 3 Producer is running 4 Consumer is running 4 Producer is running 5 Consumer is running 5
结论
我们从定义生产者消费者问题开始本文,在下一节中,我们通过引入 BlockingQueue 接口提出了解决此问题的可能解决方案。最后,我们讨论了一个 Java 程序,该程序向我们展示了如何实际使用 BlockingQueue 来解决给定的问题。
数据结构
网络
RDBMS
操作系统
Java
iOS
HTML
CSS
Android
Python
C 编程
C++
C#
MongoDB
MySQL
Javascript
PHP