- RabbitMQ 教程
- RabbitMQ - 首页
- RabbitMQ - 概述
- RabbitMQ - 环境搭建
- RabbitMQ - 特性
- RabbitMQ - 安装
- 基于队列的示例
- RabbitMQ - 生产者应用
- RabbitMQ - 消费者应用
- RabbitMQ - 测试应用
- 基于主题的示例
- RabbitMQ - 发布者应用
- RabbitMQ - 订阅者应用
- RabbitMQ - 测试应用
- RabbitMQ 有用资源
- RabbitMQ 快速指南
- RabbitMQ - 有用资源
- RabbitMQ - 讨论
RabbitMQ 快速指南
RabbitMQ - 概述
什么是 RabbitMQ?
RabbitMQ 是一个用 Java 编写的开源消息代理。它完全符合 JMS 1.1 标准。它由 Apache 软件基金会开发和维护,并根据 Apache 许可证授权。它为企业级消息应用程序提供高可用性、可扩展性、可靠性、性能和安全性。
JMS 是一种允许开发基于消息的系统的规范。RabbitMQ 充当消息代理,位于应用程序之间,允许它们以异步和可靠的方式进行通信。
消息类型
为了更好地理解,下面解释了两种消息选项。
点对点
在这种类型的通信中,代理仅向一个消费者发送消息,而其他消费者将等待直到他们从代理接收消息。没有哪个消费者会收到相同的消息。
如果没有消费者,代理将保留消息,直到收到消费者。这种类型的通信也称为**基于队列的通信**,其中生产者将消息发送到队列,并且只有一个消费者从队列中获取一条消息。如果有多个消费者,他们可能会收到下一条消息,但他们不会收到与其他消费者相同的 消息。
发布/订阅
在这种类型的通信中,代理将相同的消息副本发送给所有活动的消费者。这种类型的通信也称为**基于主题的通信**,其中代理将相同的消息发送给订阅了特定主题的所有活动消费者。此模型支持单向通信,不需要验证已传输的消息。
RabbitMQ - 环境搭建
本章将指导您如何准备开发环境以开始使用 RabbitMQ。它还将教您如何在设置 RabbitMQ 之前在您的机器上设置 JDK、Maven 和 Eclipse:
设置 Java 开发工具包 (JDK)
您可以从 Oracle 的 Java 网站下载最新版本的 SDK:Java SE 下载。 您将在下载的文件中找到安装 JDK 的说明,请按照给定的说明安装和配置设置。最后设置 PATH 和 JAVA_HOME 环境变量以指向包含 java 和 javac 的目录,通常分别为 java_install_dir/bin 和 java_install_dir。
如果您运行的是 Windows 并已将 JDK 安装在 C:\jdk-11.0.11 中,则必须将以下行添加到 C:\autoexec.bat 文件中。
set PATH=C:\jdk-11.0.11;%PATH% set JAVA_HOME=C:\jdk-11.0.11
或者,在 Windows NT/2000/XP 上,您必须右键单击“我的电脑”,选择“属性”→“高级”→“环境变量”。然后,您必须更新 PATH 值并单击“确定”按钮。
在 Unix(Solaris、Linux 等)上,如果 SDK 安装在 /usr/local/jdk-11.0.11 中并且您使用的是 C shell,则必须将以下内容添加到您的 .cshrc 文件中。
setenv PATH /usr/local/jdk-11.0.11/bin:$PATH setenv JAVA_HOME /usr/local/jdk-11.0.11
或者,如果您使用的是集成开发环境 (IDE),例如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio,则必须编译并运行一个简单的程序以确认 IDE 知道您已安装 Java 的位置。否则,您必须按照 IDE 文档中给出的说明进行正确的设置。
设置 Eclipse IDE
本教程中的所有示例均使用 Eclipse IDE 编写。因此,我们建议您应该在您的机器上安装最新版本的 Eclipse。
要安装 Eclipse IDE,请从www.eclipse.org/downloads下载最新的 Eclipse 二进制文件。下载安装后,将二进制分发版解压缩到方便的位置。例如,在 Windows 上为 C:\eclipse,或在 Linux/Unix 上为 /usr/local/eclipse,最后适当地设置 PATH 变量。
可以通过在 Windows 机器上执行以下命令启动 Eclipse,或者您可以简单地双击 eclipse.exe。
%C:\eclipse\eclipse.exe
可以通过在 Unix(Solaris、Linux 等)机器上执行以下命令启动 Eclipse:
$/usr/local/eclipse/eclipse
成功启动后,如果一切正常,则应显示以下结果:
设置 Maven
在本教程中,我们使用 maven 来运行和构建基于 Spring 的示例以运行基于 RabbitMQ 的应用程序。请遵循Maven - 环境搭建来安装 maven。
RabbitMQ - 特性
RabbitMQ 是最流行的开源消息代理之一。它旨在为企业级消息应用程序提供高可用性、可扩展性、可靠性、性能和安全性。以下是 RabbitMQ 的一些主要特性。
**轻量级** - RabbitMQ 轻量级,易于在本地和云端安装。
**连接选项** - RabbitMQ 支持多种消息协议,并且可以部署在分布式/联合配置中,以满足高可用性和可扩展性要求。
**可插拔架构** - RabbitMQ 允许选择持久性机制,并提供根据应用程序需求自定义安全性的选项(身份验证和授权)。
**多平台** - RabbitMQ 为许多流行的语言提供客户端 API,例如 Java、Python、JavaScript、Ruby 等。
**代理集群** - RabbitMQ 可以作为集群部署以实现高可用性和吞吐量。它可以跨多个可用区和区域联合。
**功能丰富** - RabbitMQ 为代理和客户端提供了许多高级功能。
**简单的管理界面** - RabbitMQ 管理控制台易于使用,但仍然提供许多强大的管理功能。
**企业级和云就绪** - RabbitMQ 支持可插拔的身份验证和授权。它支持 LDAP 和 TLS。它可以轻松部署在公共云和私有云中。
**功能丰富** - RabbitMQ 为代理和客户端提供了许多高级功能。它提供插件以支持持续集成、运营指标以及与其他企业系统的集成等。
**管理** - RabbitMQ 提供 HTTP API、命令行工具和 UI 来管理和监控 RabbitMQ。
RabbitMQ - 安装
RabbitMQ 基于 Erlang 运行时构建,因此在安装 RabbitMQ 之前,我们需要下载 Erlang 并安装它。确保您使用管理员权限安装 Erlang 和 RabbitMQ。
Erlang
Erlang 是一种通用编程语言和运行时环境。您可以从其主页下载最新版本的 Erlang:下载 Erlang/OTP。 我们正在 Windows 上安装 Erlang 并下载了适用于 Windows 的**Erlang/OTP 24.2.2** 安装程序 - otp_win64_24.2.2.exe。
现在通过双击安装程序安装 Erlang,并遵循默认选择完成设置。
RabbitMQ 安装
从其官方下载页面下载最新的 RabbitMQ 二进制文件。我们已下载 3.9.13 版本为 rabbitmq-server-3.9.13.exe (适用于Windows)。
现在通过双击安装程序安装 RabbitMQ,并遵循默认选择完成设置。
默认情况下,RabbitMQ 作为 Windows 服务运行。要启用基于 Web 的管理 UI,需要执行以下步骤。
转到 RabbitMQ 安装目录并键入如下命令:
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins.bat enable rabbitmq_management Enabling plugins on node rabbit@DESKTOP-86KD9FC: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@DESKTOP-86KD9FC... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch started 3 plugins. C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management Enabling plugins on node rabbit@DESKTOP-86KD9FC: rabbitmq_shovel rabbitmq_shovel_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_shovel rabbitmq_shovel_management rabbitmq_web_dispatch Applying plugin configuration to rabbit@DESKTOP-86KD9FC... The following plugins have been enabled: rabbitmq_shovel rabbitmq_shovel_management started 2 plugins. C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.13\sbin>
使用管理员权限编辑 C:\Windows\System32\drivers\etc\hosts 文件,并在其中添加以下行:
127.0.0.1 rabbitmq
验证安装
现在打开**http://rabbitmq:15672/** 以打开管理控制台。使用 guest/guest 登录。
RabbitMQ - 生产者应用
现在让我们创建一个生产者应用程序,它将消息发送到 RabbitMQ 队列。
创建项目
使用 Eclipse,选择**文件** → **新建** → **Maven 项目**。勾选**创建简单的项目(跳过原型选择)**并单击“下一步”。
输入如下所示的详细信息:
**groupId** - com.tutorialspoint
**artifactId** - producer
**version** - 0.0.1-SNAPSHOT
**name** - RabbitMQ 生产者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQ Producer</name> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.26</version> </dependency> </dependencies> </project>
现在创建一个生产者类,它将消息发送到 RabbitMQ 队列。
package com.tutorialspoint.rabbitmq; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static String QUEUE = "MyFirstQueue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE, false, false, false, null); Scanner input = new Scanner(System.in); String message; do { System.out.println("Enter message: "); message = input.nextLine(); channel.basicPublish("", QUEUE, null, message.getBytes()); } while (!message.equalsIgnoreCase("Quit")); } } }
生产者类创建一个连接,创建一个通道,连接到一个队列。如果用户输入 quit,则应用程序终止,否则它将使用 basicPublish 方法将消息发送到队列。
我们将在RabbitMQ - 测试应用章节中运行此应用程序。
RabbitMQ - 消费者应用
现在让我们创建一个消费者应用程序,它将从 RabbitMQ 队列接收消息。
创建项目
使用 Eclipse,选择**文件** → **新建** → **Maven 项目**。勾选**创建简单的项目(跳过原型选择)**并单击“下一步”。
输入如下所示的详细信息
**groupId** - com.tutorialspoint
**artifactId** - consumer
**version** - 0.0.1-SNAPSHOT
**name** - RabbitMQ 消费者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQ Consumer</name> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.26</version> </dependency> </dependencies> </project>
现在创建一个消费者类,它将从 RabbitMQ 队列接收消息。
package com.tutorialspoint.rabbitmq; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Consumer { private static String QUEUE = "MyFirstQueue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); System.out.println("Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received '" + message + "'"); }; channel.basicConsume(QUEUE, true, deliverCallback, consumerTag -> { }); } }
消费者类创建一个连接,创建一个通道,如果不存在则创建一个队列,然后如果有任何消息则从队列接收消息,并且它将继续轮询队列以获取消息。一旦消息被传递,它就会由 basicConsume() 方法使用 deliverCallback 处理。
我们将在RabbitMQ - 测试应用章节中运行此应用程序。
RabbitMQ - 测试应用
启动生产者应用程序
在 Eclipse 中,右键单击 Producer.java 源代码,然后选择“运行方式”→“Java 应用程序”。生产者应用程序将开始运行,您将看到如下所示的输出:
Enter message:
启动消费者应用程序
在 Eclipse 中,右键单击 Consumer.java 源代码,然后选择“运行方式”→“Java 应用程序”。消费者应用程序将开始运行,您将看到如下所示的输出:
Waiting for messages. To exit press CTRL+C
发送消息
在生产者控制台窗口中,键入 Hi 并按 Enter 键发送消息。
Enter message: Hi
接收消息
在消费者控制台窗口中验证是否已收到消息。
Waiting for messages. To exit press CTRL+C Received = Hi
发送 Quit 作为消息以终止生产者窗口会话并终止客户端窗口会话。
验证
现在在您的浏览器中打开**http://rabbitmq:15672/**。它将要求输入凭据。使用 guest/guest 作为用户名/密码,它将加载 RabbitMQ 管理控制台,您可以在其中检查队列以检查状态。它将显示排队和已交付的消息。
RabbitMQ - 发布者应用
现在让我们创建一个发布者应用程序,它将消息发送到 RabbitMQ 交换机。此交换机将消息传递到与交换机绑定的队列。
创建项目
使用 Eclipse,选择**文件** → **新建** → **Maven 项目**。勾选**创建简单的项目(跳过原型选择)**并单击“下一步”。
输入如下所示的详细信息:
**groupId** - com.tutorialspoint
**artifactId** - publisher
**version** - 0.0.1-SNAPSHOT
**name** - RabbitMQ 发布者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>publisher</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQ Publisher</name> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.26</version> </dependency> </dependencies> </project>
现在创建一个发布者类,它将消息发送到 RabbitMQ 主题以将其广播给所有订阅者。
package com.tutorialspoint.rabbitmq; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Publisher { private static final String EXCHANGE = "MyExchange"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE, "fanout"); Scanner input = new Scanner(System.in); String message; do { System.out.println("Enter message: "); message = input.nextLine(); channel.basicPublish(EXCHANGE, "", null, message.getBytes()); } while (!message.equalsIgnoreCase("Quit")); } } }
生产者类创建一个连接,创建一个通道,声明一个交换机,然后要求用户输入消息。消息被发送到交换机,并且因为我们没有传递队列名称,所以所有绑定到此交换机的队列都将收到消息。如果用户输入 quit,则应用程序终止,否则它将消息发送到主题。
我们将在RabbitMQ - 测试应用章节中运行此应用程序。
RabbitMQ - 订阅者应用
现在让我们创建一个订阅者应用程序,它将接收来自RabbitMQ主题的消息。
创建项目
使用 Eclipse,选择**文件** → **新建** → **Maven 项目**。勾选**创建简单的项目(跳过原型选择)**并单击“下一步”。
输入如下所示的详细信息:
**groupId** - com.tutorialspoint
artifactId − subscriber
**version** - 0.0.1-SNAPSHOT
name − RabbitMQ 订阅者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 RabbitMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>subscriber</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitMQ Subscriber</name> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.14.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.26</version> </dependency> </dependencies> </project>
现在创建一个Subscriber类,它将接收来自RabbitMQ队列的消息。
package com.tutorialspoint.rabbitmq; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Subscriber { private static String EXCHANGE = "MyExchange"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE, ""); System.out.println("Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
Subscriber类创建连接,创建通道,声明交换机,创建一个随机队列并将其与交换机绑定,然后接收主题中的消息(如果有)。按Ctrl + C终止,否则它将继续轮询队列以查找消息。
我们将在RabbitMQ - 测试应用程序章节中多次运行此应用程序以创建多个订阅者。
RabbitMQ - 测试应用程序主题
启动发布者应用程序
在eclipse中,右键单击Publisher.java源代码,然后选择“Run As”→“Java Application”。发布者应用程序将开始运行,您将看到如下输出:
Enter message:
启动订阅者应用程序
在eclipse中,右键单击Subscriber.java源代码,然后选择“Run As”→“Java Application”。订阅者应用程序将开始运行,您将看到如下输出:
Waiting for messages. To exit press CTRL+C
启动另一个订阅者应用程序
在eclipse中,再次右键单击Subscriber.java源代码,然后选择“Run As”→“Java Application”。另一个订阅者应用程序将开始运行,您将看到如下输出:
Waiting for messages. To exit press CTRL+C
发送消息
在发布者控制台窗口中,键入Hi并按Enter键发送消息。
Enter message: Hi
接收消息
在订阅者控制台窗口中验证每个窗口是否都收到了消息。
Received = Hi
发送Quit作为消息以终止所有发布者和订阅者控制台窗口会话。
验证
现在在浏览器中打开http://rabbitmq:15672/。它将要求输入凭据。使用guest/guest作为用户名/密码,它将加载RabbitMQ管理控制台,您可以在其中检查队列和交换机以检查已传递消息和绑定的状态。