Hazelcast - IMap



java.util.concurrent.Map 提供了一个接口,支持在单个 JVM 中存储键值对。而 java.util.concurrent.ConcurrentMap 扩展了它,以支持在单个 JVM 中使用多个线程时的线程安全。

类似地,IMap 扩展了 ConcurrentHashMap 并提供了一个接口,使映射在多个 JVM 之间线程安全。它提供类似的功能:put、get 等。

IMap 支持同步备份和异步备份。同步备份确保即使保存队列的 JVM 宕机,所有元素都将被保留并可从备份中获取。

让我们来看一个有用函数的例子。

创建 & 读/写

添加元素和读取元素。让我们在两个 JVM 上执行以下代码。一个 JVM 上运行生产者代码,另一个 JVM 上运行消费者代码。

示例

第一部分是生产者代码,它创建了一个映射并将项目添加到其中。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // create a map
   IMap<String, String> hzStock = hazelcast.getMap("stock");
   hzStock.put("Mango", "4");
   hzStock.put("Apple", "1");
   hzStock.put("Banana", "7");
   hzStock.put("Watermelon", "10");
   Thread.sleep(5000);
   System.exit(0);
}

第二部分是消费者代码,它读取元素。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // create a map
   IMap<String, String> hzStock = hazelcast.getMap("stock");
   for(Map.Entry<String, String> entry: hzStock.entrySet()){
      System.out.println(entry.getKey() + ":" + entry.getValue());
   }
   Thread.sleep(5000);
   System.exit(0);
}

输出

消费者代码的输出 -

Mango:4
Apple:1
Banana:7
Watermelon:10

有用方法

序号 函数名称 & 描述
1

put(K key, V value)

向映射中添加一个元素

2

remove(K key)

从映射中删除一个元素

3

keySet()

返回映射中所有键的副本

4

localKeySet()

返回本地分区中存在的所有键的副本

5

values()

返回映射中所有值的副本

6

size()

返回映射中元素的数量

7

containsKey(K key)

如果键存在则返回 true

8

executeOnEnteries(EntryProcessor processor)

对映射的所有键应用处理器并返回此应用的输出。我们将在下一节中查看一个示例。

9

addEntryListener(EntryListener listener, value)

在映射中添加/删除/修改元素时通知订阅者。

10

addLocalEntryListener(EntryListener listener, value)

在本地分区中添加/删除/修改元素时通知订阅者

逐出

默认情况下,Hazelcast 中的键会无限期地保留在 IMap 中。如果我们有一组非常大的键,那么我们需要确保与不常使用的键相比,经常使用的键存储在 IMap 中,以获得更好的性能和有效的内存使用。

为此,可以通过 remove()/evict() 函数手动删除不常使用的键。但是,Hazelcast 还提供基于各种逐出算法的键自动逐出。

可以通过 XML 或以编程方式设置此策略。让我们来看一个例子 -

<map name="stock">
   <max-size policy="FREE_HEAP_PERCENTAGE">30</max-size>
   <eviction-policy>LFU</eviction-policy>
</map>

上述配置中有两个属性。

  • Max-size - 用于告知 Hazelcast 映射“stock”达到最大大小的限制的策略。

  • Eviction-policy - 一旦达到上述 max-size 策略,使用什么算法来删除/逐出键。

以下是一些有用的 max_size 策略。

序号 最大大小策略 & 描述
1

PER_NODE

每个 JVM 映射的最大条目数,这是默认策略。

2

FREE_HEAP

在 JVM 中保留的最小空闲堆内存(以 MB 为单位)

3

FREE_HEAP_PERCENTAGE

在 JVM 中保留的最小空闲堆内存(以百分比表示)

4

take()

返回队列的头或等待元素可用

5

USED_HEAP

JVM 中允许使用的最大堆内存(以 MB 为单位)

6

USED_HEAP_PERCENTAGE

JVM 中允许使用的最大堆内存(以百分比表示)

以下是一些有用的逐出策略 -

序号 逐出策略 & 描述
1

NONE

不会进行任何逐出,这是默认策略

2

LFU

最不常使用将被逐出

3

LRU

最近最少使用的键将被逐出

另一个有用的逐出参数是生存时间(秒),即 TTL。有了它,我们可以要求 Hazelcast 删除任何超过 X 秒的键。这确保了我们在达到最大大小策略之前主动删除旧键。

分区数据和高可用性

关于 IMap 需要注意的一点是,与其他集合不同,数据在多个 JVM 之间进行分区。所有数据都不需要存储/存在于单个 JVM 上。所有 JVM 仍然可以访问完整的数据。这使 Hazelcast 能够在线性扩展到可用的 JVM,并且不受单个 JVM 内存的限制。

IMap 实例被分成多个分区。默认情况下,映射被分成 271 个分区。这些分区分布在可用的 Hazelcast 成员中。添加到映射中的每个条目都存储在一个分区中。

让我们在两个 JVM 上执行此代码。

public static void main(String... args) throws IOException, InterruptedException {
   //initialize hazelcast instance
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   // create a map
   IMap<String, String> hzStock = hazelcast.getMap("stock");
   hzStock.put("Mango", "4");
   hzStock.put("Apple", "1");
   hzStock.put("Banana", "7");
   hzStock.put("Watermelon", "10");
   Thread.sleep(5000);
   // print the keys which are local to these instance
   hzStock.localKeySet().forEach(System.out::println);
   System.exit(0);
}

输出

如以下输出所示,消费者 1 打印它自己的包含 2 个键的分区 -

Mango
Watermelon

消费者 2 拥有包含其他 2 个键的分区 -

Banana
Apple

默认情况下,IMap 有一个同步备份,这意味着即使一个节点/成员宕机,数据也不会丢失。备份有两种类型。

  • 同步 - 在键也在另一个节点/成员上备份之前,map.put(key, value) 不会成功。同步备份是阻塞的,因此会影响 put 调用的性能。

  • 异步 - 存储键的备份最终会执行。异步备份是非阻塞且快速的,但如果成员宕机,它们不能保证数据的持久性。

可以使用 XML 配置来配置值。例如,让我们为我们的 stock 映射执行此操作 -

<map name="stock">
   <backup-count>1</backup-count>
   <async-backup-count>1<async-backup-count>
</map>

哈希码和相等

示例

在基于 Java 的 HashMap 中,键比较通过检查 hashCode() 和 equals() 方法的相等性来进行。例如,车辆可能有序列号和型号以使其简单。

public class Vehicle implements Serializable{
   private static final long serialVersionUID = 1L;
   private int serialId;
   private String model;

   public Vehicle(int serialId, String model) {
      super();
      this.serialId = serialId;
      this.model = model;
   }
   public int getId() {
      return serialId;
   }
   public String getModel() {
      return model;
   }
   @Override
   public int hashCode() {
      final int prime = 31;
      int result = 1;
      result = prime * result + serialId;
      return result;
   }
   @Override
   public boolean equals(Object obj) {
      if (this == obj)
         return true;
      if (obj == null)
         return false;
      if (getClass() != obj.getClass())
         return false;
      Vehicle other = (Vehicle) obj;
      if (serialId != other.serialId)
         return false;
         return true;
   }
}

当我们尝试将上述类用作 HashMap 和 IMap 的键时,我们会看到比较中的差异。

public static void main(String... args) throws IOException, InterruptedException {
   // create a Java based hash map
   Map<Vehicle, String> vehicleOwner = new HashMap<>();
   Vehicle v1 = new Vehicle(123, "Honda");
   vehicleOwner.put(v1, "John");
        
   Vehicle v2 = new Vehicle(123, null);
   System.out.println(vehicleOwner.containsKey(v2));
   
   // create a hazelcast map
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   IMap<Vehicle, String> hzVehicleOwner = hazelcast.getMap("owner");
   hzVehicleOwner.put(v1, "John");
   System.out.println(hzVehicleOwner.containsKey(v2));
   System.exit(0);
}

现在,为什么 Hazelcast 会给出 false 作为答案?

Hazelcast 序列化键并将其以二进制格式存储为字节数组。由于这些键被序列化,因此无法基于 equals() 和 hashcode() 进行比较。

在 Hazelcast 的情况下需要序列化和反序列化,因为 get()、containsKey() 等函数可能在不拥有键的节点上调用,因此需要远程调用。

序列化和反序列化是昂贵的操作,因此,Hazelcast 不使用 equals() 方法,而是比较字节数组。

这意味着 Vehicle 类的所有属性都必须匹配,而不仅仅是 id。因此,让我们执行以下代码 -

示例

public static void main(String... args) throws IOException, InterruptedException {
   Vehicle v1 = new Vehicle(123, "Honda");
   // create a hazelcast map
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   IMap<Vehicle, String> hzVehicleOwner = hazelcast.getMap("owner");
   Vehicle v3 = new Vehicle(123, "Honda");
   System.out.println(hzVehicleOwner.containsKey(v3));
   System.exit(0);
}

输出

上述代码的输出为 -

true

此输出表示 Vehicle 的所有属性都必须匹配才能相等。

EntryProcessor

EntryProcessor 是一种构造,支持将代码发送到数据而不是将数据带到代码。它支持序列化、传输和在拥有 IMap 键的节点上执行函数,而不是将数据带到启动函数执行的节点。

示例

让我们用一个例子来理解这一点。假设我们创建了一个 Vehicle -> Owner 的 IMap。现在,我们想存储所有者的“小写”。那么我们该如何做到这一点呢?

public static void main(String... args) throws IOException, InterruptedException {
   // create a hazelcast map
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   IMap<Vehicle, String> hzVehicleOwner = hazelcast.getMap("owner");
   hzVehicleOwner.put(new Vehicle(123, "Honda"), "John");
   hzVehicleOwner.put(new Vehicle(23, "Hyundai"), "Betty");
   hzVehicleOwner.put(new Vehicle(103, "Mercedes"), "Jane");
   for(Map.Entry<Vehicle, String> entry:  hzVehicleOwner.entrySet())
   hzVehicleOwner.put(entry.getKey(), entry.getValue().toLowerCase());
   for(Map.Entry<Vehicle, String> entry: hzVehicleOwner.entrySet())
   System.out.println(entry.getValue());
   System.exit(0);
}

输出

上述代码的输出为 -

john
jane
betty

虽然此代码看起来很简单,但如果键的数量很大,它在可扩展性方面存在主要缺点 -

  • 处理将在单个/调用者节点上发生,而不是分布在各个节点上。

  • 需要更多时间和内存才能在调用者节点上获取键信息。

这就是 EntryProcessor 发挥作用的地方。我们将转换为小写的函数发送到每个拥有键的节点。这使得处理并行化并控制内存需求。

示例

public static void main(String... args) throws IOException, InterruptedException {
   // create a hazelcast map
   HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();
   IMap<Vehicle, String> hzVehicleOwner = hazelcast.getMap("owner");
   hzVehicleOwner.put(new Vehicle(123, "Honda"), "John");
   hzVehicleOwner.put(new Vehicle(23, "Hyundai"), "Betty");
   hzVehicleOwner.put(new Vehicle(103, "Mercedes"), "Jane");
   hzVehicleOwner.executeOnEntries(new OwnerToLowerCaseEntryProcessor());
   for(Map.Entry<Vehicle, String> entry: hzVehicleOwner.entrySet())
   System.out.println(entry.getValue());
   System.exit(0);
}
static class OwnerToLowerCaseEntryProcessor extends
AbstractEntryProcessor<Vehicle, String> {
   @Override
   public Object process(Map.Entry<Vehicle, String> entry) {
      String ownerName = entry.getValue();
      entry.setValue(ownerName.toLowerCase());
      return null;
   }
}

输出

上述代码的输出为 -

john
jane
betty
hazelcast_data_structures.htm
广告