ZeroMQ - 通信模式



ZeroMQ 是一个高性能的异步消息库,它提供了一套用于生成分布式应用程序的通信模式。它提供了一种简单而结构化的方式来在各种应用程序中实现消息传递模式。以下是 ZeroMQ 的一些基本核心概念:

  • Socket 作为通信端点:ZeroMQ Socket 支持高级消息传递模式,例如 PUB-SUB 和 PUSH-PULL。
  • 异步消息传递:ZeroMQ Socket 异步运行,允许应用程序发送和接收消息而不会阻塞。
  • 可扩展性和性能:ZeroMQ 旨在水平扩展,允许多对多的配置,同时保持高吞吐量和低延迟。

通信模式

以下是通信模式列表:

  • 请求-回复 (REQ-REP)
  • 发布-订阅 (PUB-SUB)
  • 推送-拉取 (PUSH-PULL)
  • 配对 (PAIR)
  • 处理程序-路由器 (DEALER-ROUTER)
  • 路由器-处理程序 (ROUTER-DEALER)
  • XPUB-XSUB

这些通信模式可以以各种方式组合起来创建复杂的分布式系统。ZeroMQ 提供了一个灵活且可扩展的消息传递框架,允许开发者实现从简单的客户端-服务器配置到复杂的分布式架构的任何内容。

请求-回复

这是一种远程服务调用和任务分配通信模式,它将一组客户端连接到一组服务。这意味着它是一个简单的客户端-服务器通信,客户端向服务器发送请求,服务器回复响应。

例如,考虑一个响应客户端HTTP请求的 Web 服务器。

Req-Rep-Pattern

示例

以下是 REQ/REP 的示例。首先,我们连接到服务器并从客户端获取响应。

package com.zeromq.mavenProject;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class Practice {
   public static void main(String[] args) {
      try (ZContext context = new ZContext()) {
         System.out.println("Connecting to TP server");

         //  Socket to talk to server
         ZMQ.Socket socket = context.createSocket(SocketType.REQ);
         socket.connect("tcp://127.0.0.1:5555");

         for (int requestNbr = 0; requestNbr != 5; requestNbr++) {
             String request = "Tutorialspoint";
             System.out.println("Sending TP " + requestNbr);
             socket.send(request.getBytes(ZMQ.CHARSET), 0);

             byte[] reply = socket.recv(0);
             System.out.println(
                "Received " + new String(reply, ZMQ.CHARSET) + " " +
                requestNbr
             );
         }
      }
   }
}

输出

Connecting to TP server
Sending TP 0
Received world 0
Sending TP 1
Received world 1
Sending TP 2
Received world 2
Sending TP 3
Received world 3
Sending TP 4
Received world 4

发布-订阅

这是一种数据分发通信模式,它将一组发布者连接到一组订阅者。它用于将消息广播到多个接收者。

例如,考虑一个向多个订阅者广播新闻文章的新闻提要系统。

Pub-Sub-Pattern

示例

在这个示例中,发布者创建一个 PUB socket,绑定到一个端口,并发送消息。订阅者创建一个 SUB socket,连接到发布者,接收消息,并打印收到的消息。

// Publisher (PUB) class
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

// After running the PUB comment it runs SUB.

public class PubSubPublisher {
   public static void main(String[] args) {
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PUB);
      socket.bind("tcp://*:5555");

      while (true) {
         socket.send("Hello, subscribers!".getBytes(), 0);
      }
   }
}

// Subscriber (SUB) class
public class PubSubSubscriber {
   public static void main(String[] args) {
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.SUB);
      socket.connect("tcp://127.0.0.1:5555");
      socket.subscribe("".getBytes()); // Subscribe to all messages

      while (true) {
         byte[] message = socket.recv(0);
         System.out.println("Received message: " + new String(message));
      }
   }
}

输出

Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!
Received message: Hello, subscribers!

推送-拉取

此模式可用于负载均衡和任务分配。推送 socket 将消息发送到拉取 socket,拉取 socket 接收然后处理消息。

例如,考虑一个将任务分配给多个工作节点的负载均衡器。

Push-Pull-Pattern

示例

在这个示例中,推送器创建一个 PUSH socket 并连接到拉取器。拉取器创建一个拉取 socket,将其绑定到端口,从推送器接收工作项,并打印它们。

package com.zeromq.mavenProject;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

// After running the PUSH comment it runs PULL.

public class PushPullPusher {
   public static void main(String[] args) {
      // Create a PUSH socket and connect to the puller
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PUSH);
      socket.connect("tcp://127.0.0.1:5500");

      // Send work items to the puller
      while (true) {
         socket.send("Work item".getBytes(), 0);
      }
   }
}

public class PushPullPuller {
   public static void main(String[] args) {
      // Create a PULL socket and bind it to a port
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PULL);
      socket.bind("tcp://*:5500");

      // Receive work items from the pusher
      while (true) {
         byte[] message = socket.recv(0);
         System.out.println("Received work item: " + new String(message));
      }
   }
}

输出


配对

此模式可用于简单的点对点交互。一对 socket 以点对点方式相互连接,以便两个节点可以双向通信。

例如,考虑一个允许两个用户相互通信的聊天应用程序。

Pair-Pattern

示例

在这个示例中,我们通过创建两个可以相互发送和接收消息的节点来演示 PAIR 模式。我们正在检查连接是双向的还是异步的。如果是,则每个节点都在发送下一条消息之前等待另一个节点响应。

package com.zeromq.mavenProject;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

// After running the PairNode1 comment it runs PairNode2.

public class PairNode1 {
   public static void main(String[] args) {
      // Create a PAIR socket and bind it to a port
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PAIR);
      socket.bind("tcp://*:5000");

      // Receive messages from Node 2 and send responses
      while (true) {
         byte[] message = socket.recv(0);
         System.out.println("Received message: " + new String(message));
         socket.send("Hello, Node 2!".getBytes(), 0);
      }
   }
}

public class PairNode2 {
   public static void main(String[] args) {
      // Create a PAIR socket and connect to Node 1
      Context context = ZMQ.context(1);
      Socket socket = context.socket(ZMQ.PAIR);
      socket.connect("tcp://127.0.0.1:5000");

      // Send messages to Node 1 and receive responses
      while (true) {
         socket.send("Hello, Node 1!".getBytes(), 0);
         byte[] message = socket.recv(0);
         System.out.println("Received response: " + new String(message));
      }
   }
}

输出

Received message: Hello, Node 1!
Received message: Hello, Node 1!
Received message: Hello, Node 1!
Received message: Hello, Node 1!
Received message: Hello, Node 1!

处理程序-路由器

此模式用于实现复杂的分布式系统。处理程序 socket 将消息发送到路由器 socket,路由器 socket 将它们分派给一个或多个已连接的节点。

例如,考虑一个将查询路由到多个节点的分布式数据库系统。

路由器-处理程序

此模式有助于实现复杂的分布式系统。路由器 socket 将消息从一个或多个已连接的处理程序 socket 路由,每个处理程序 socket 将其发送到其各自的节点。

例如,考虑一个将路由到多个边缘服务器的内容分发网络。

XPUB-XSUB

此通信模式可用于创建动态发布-订阅系统。XPUB 是一个特殊的发布者,允许订阅者动态连接和断开连接。XSUB 是一个特殊的订阅者,可以连接到多个发布者。

例如,考虑一个实时分析系统,其中多个发布者将数据发送到多个订阅者。

广告