Hazelcast - IQueue



java.util.concurrent.BlockingQueue 提供了一个接口,支持 JVM 中的线程以不同的速率生成和使用消息。生产者会根据可用容量阻塞,消费者会阻塞等待队列中出现可用元素。

类似地,IQueue 扩展了 BlockingQueue,使其成为分布式版本。它提供类似的功能:put、take 等。

关于 IQueue 需要注意的一点是,与其他集合不同,数据不会被分区。所有数据都存储/存在于单个 JVM 上。所有 JVM 仍然可以访问数据,但队列无法扩展到单个机器/JVM 之外。如果元素数量超过可用内存,则会抛出 OutOfMemoryException。

队列支持同步备份和异步备份。同步备份确保即使持有队列的 JVM 宕机,所有元素都将被保留并可从备份中访问。

让我们来看一个有用功能的示例。

添加元素和读取元素

让我们在 3 个 JVM 上执行以下代码。在一个 JVM 上运行生产者代码,在另外两个 JVM 上运行消费者代码。

示例

第一部分是生产者代码,它创建一个队列并将项目添加到其中。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // create a queue
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   String[] fruits = {"Mango", "Apple", "Banana", "Watermelon"};
   for (String fruit : fruits) {
      System.out.println("Producing: " + fruit);
      Thread.sleep(1000);
   }
   System.exit(0);
}

第二部分是消费者代码,它读取元素。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   IQueue<String> hzFruits = hazelcast.getQueue("fruits");
   while(!hzFruits.isEmpty()) {
   System.out.println("Consuming: " + hzFruits.take());
      Thread.sleep(2000);
   }
   System.exit(0);
}

输出

生产者代码的输出显示它无法添加现有元素。

Producing Mango
Producing Apple
Producing Banana
Producing Watermelon

第一个消费者的代码输出显示它消耗了部分数据。

Consuming Mango
Consuming Banana

第二个消费者的代码输出显示它消耗了剩余的数据:

Consuming Apple
Consuming Watermelon

有用方法

序号 函数名称和描述
1

add(Type element)

向列表中添加元素

2

remove(Type element)

从列表中删除元素

3

poll()

返回队列的头部,如果队列为空则返回 NULL

4

take()

返回队列的头部,或等到元素可用

5

size()

返回列表中元素的数量

6

contains(Type element)

返回元素是否存在

7

getPartitionKey()

返回保存列表的分区键

6

addItemListener(ItemListener<Type>listener, value)

通知订阅者列表中添加/删除/修改了元素。

hazelcast_data_structures.htm
广告