- Apache ActiveMQ 教程
- Apache ActiveMQ - 首页
- Apache ActiveMQ - 概述
- Apache ActiveMQ - 环境设置
- Apache ActiveMQ - 功能特性
- Apache ActiveMQ - 运行 Broker 服务器
- Apache ActiveMQ - 管理控制台
- Apache ActiveMQ 基于队列的示例
- Apache ActiveMQ - 生产者应用程序
- Apache ActiveMQ - 消费者应用程序
- Apache ActiveMQ - 测试应用程序
- Apache ActiveMQ 基于主题的示例
- Apache ActiveMQ - 发布者应用程序
- Apache ActiveMQ - 订阅者应用程序
- Apache ActiveMQ - 测试应用程序
- Apache ActiveMQ 有用资源
- Apache ActiveMQ 快速指南
- Apache ActiveMQ - 有用资源
- Apache ActiveMQ - 讨论
Apache ActiveMQ 快速指南
Apache ActiveMQ - 概述
什么是 ActiveMQ?
ActiveMQ 是一个用 Java 编写的开源消息代理。它完全符合 JMS 1.1 标准。它由 Apache 软件基金会开发和维护,并根据 Apache 许可证授权。它为企业级消息应用程序提供高可用性、可扩展性、可靠性、性能和安全性。
JMS 是一种允许开发基于消息的系统的规范。ActiveMQ 充当消息代理,位于应用程序之间,允许它们以异步和可靠的方式进行通信。
消息类型
为了更好地理解,下面解释了两种消息选项。
点对点
在这种类型的通信中,代理仅向一个消费者发送消息,而其他消费者将等待直到他们从代理接收消息。没有消费者会收到相同的消息。
如果没有消费者,代理将保存消息,直到获得消费者。这种类型的通信也称为**基于队列的通信**,其中生产者将消息发送到队列,并且只有一个消费者从队列中获取一条消息。如果有多个消费者,他们可能会收到下一条消息,但他们不会收到与其他消费者相同的 消息。
发布/订阅
在这种类型的通信中,代理向所有活动消费者发送相同的消息副本。这种类型的通信也称为**基于主题的通信**,其中代理向所有订阅特定主题的活动消费者发送相同的消息。此模型支持单向通信,其中不需要验证已传输的消息。
Apache ActiveMQ - 环境设置
本章将指导您如何准备开发环境以开始使用 ActiveMQ。它还将教您如何在设置 ActiveMQ 之前在您的机器上设置 JDK、Maven 和 Eclipse -
设置 Java 开发工具包 (JDK)
您可以从 Oracle 的 Java 网站下载最新版本的 SDK - Java SE 下载。 您将在下载的文件中找到安装 JDK 的说明,请按照给定的说明安装和配置设置。最后设置 PATH 和 JAVA_HOME 环境变量以引用包含 java 和 javac 的目录,通常分别为 java_install_dir/bin 和 java_install_dir。
如果您运行的是 Windows 并已将 JDK 安装在 C:\jdk-11.0.11 中,则必须将以下行添加到您的 C:\autoexec.bat 文件中。
set PATH=C:\jdk-11.0.11;%PATH% set JAVA_HOME=C:\jdk-11.0.11
或者,在 Windows NT/2000/XP 上,您必须右键单击“我的电脑”,选择“属性”→“高级”→“环境变量”。然后,您必须更新 PATH 值并单击“确定”按钮。
在 Unix(Solaris、Linux 等)上,如果 SDK 安装在 /usr/local/jdk-11.0.11 中并且您使用的是 C shell,则必须将以下内容添加到您的 .cshrc 文件中。
setenv PATH /usr/local/jdk-11.0.11/bin:$PATH setenv JAVA_HOME /usr/local/jdk-11.0.11
或者,如果您使用的是集成开发环境 (IDE),例如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio,则必须编译并运行一个简单的程序以确认 IDE 知道您安装 Java 的位置。否则,您必须按照 IDE 文档中给出的说明进行正确的设置。
设置 Eclipse IDE
本教程中的所有示例都是使用 Eclipse IDE 编写的。因此,我们建议您应该在您的机器上安装最新版本的 Eclipse。
要安装 Eclipse IDE,请从 www.eclipse.org/downloads/ 下载最新的 Eclipse 二进制文件。下载安装后,将二进制分发版解压缩到方便的位置。例如,在 Windows 上为 C:\eclipse,或在 Linux/Unix 上为 /usr/local/eclipse,最后适当地设置 PATH 变量。
可以通过在 Windows 机器上执行以下命令启动 Eclipse,或者您可以简单地双击 eclipse.exe
%C:\eclipse\eclipse.exe
可以通过在 Unix(Solaris、Linux 等)机器上执行以下命令启动 Eclipse -
$/usr/local/eclipse/eclipse
成功启动后,如果一切正常,则应显示以下结果 -
设置 Maven
在本教程中,我们使用 maven 来运行和构建基于 spring 的示例以运行基于 ActiveMQ 的应用程序。请按照 Maven - 环境设置 来安装 maven。
下载 ActiveMQ
您可以从其官方页面下载最新稳定版本的 ActiveMQ。请按照 下载 ActivMQ 下载 ActiveMQ。我们使用了 2022 年 2 月 15 日发布的 5.13.4 版本。将存档内容解压缩到您选择的文件夹中。我们已解压缩到 **F:/ → Apache → apache-activemq-5.16.4**。
Apache ActiveMQ - 功能特性
ActiveMQ 旨在为企业级消息应用程序提供高可用性、可扩展性、可靠性、性能和安全性。以下是 ActiveMQ 的一些主要功能。
**JMS 兼容** - ActiveMQ 完全符合 JMS 1.1 标准。JMS 规范提供了一种用于同步或异步消息传递、一次且仅一次消息传递、订阅者的消息持久性等的标准机制。
**连接选项** - ActiveMQ 支持 HTTP/S、多播、SSL、Stomp、TCP、UDP、XMPP,从而提供广泛的连接选项,并允许各种系统使用它们选择的协议进行通信。
**可插拔架构** - ActiveMQ 允许选择持久性机制,并提供根据应用程序需求自定义安全性的选项,以进行身份验证和授权。
**多平台** - ActiveMQ 为许多流行的语言(如 Java、C、C++、.NET、Perl、PHP、Python、Ruby 等)提供客户端 API。ActiveMQ Broker 将在 JVM 中运行,但客户端可以使用任何支持的语言编写。
**Broker 集群** - ActiveMQ 允许准备一个用于扩展的 Broker 网络,并且可以支持不同类型的拓扑。
**功能丰富** - ActiveMQ 为 Broker 和客户端提供许多高级功能,并支持 Apache Camel。
**简单的管理界面** - ActiveMQ 管理控制台易于使用,但仍然提供许多强大的管理功能。
Apache ActiveMQ - 运行 Broker 服务器
我们已下载 ActiveMQ 的最新版本,如 ActiveMQ - 环境设置 中所述。现在转到文件夹 **F:/ → Apache → apache-activemq-5.16.4/bin** 并键入以下命令。
示例
F:\Apache\apache-activemq-5.16.4\bin>activemq start
输出
您将看到类似的输出,ActiveMQ 将开始运行。
Java Runtime: Oracle Corporation 11.0.11 C:\Program Files\Java\jdk-11.0.11 Heap sizes: current=1048576k free=1041918k max=1048576k JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=F:\Apache\apache-activemq-5.16.4\bin\..\conf\login.config -Dactivemq.classpath=F:\Apache\apache-activemq-5.16.4\bin\..\conf;F:\Apache\apache-activemq-5.16.4\bin\../conf;F:\Apache\apache-activemq-5.16.4\bin\../conf; -Dactivemq.home=F:\Apache\apache-activemq-5.16.4\bin\.. -Dactivemq.base=F:\Apache\apache-activemq-5.16.4\bin\.. -Dactivemq.conf=F:\Apache\apache-activemq-5.16.4\bin\..\conf -Dactivemq.data=F:\Apache\apache-activemq-5.16.4\bin\..\data -Djava.io.tmpdir=F:\Apache\apache-activemq-5.16.4\bin\..\data\tmp Extensions classpath: [F:\Apache\apache-activemq-5.16.4\bin\..\lib,F:\Apache\apache-activemq-5.16.4\bin\..\lib\camel,F:\Apache\apache-activemq-5.16.4\bin\..\lib\optional,F:\Apache\apache-activemq-5.16.4\bin\..\lib\web,F:\Apache\apache-activemq-5.16.4\bin\..\lib\extra] ACTIVEMQ_HOME: F:\Apache\apache-activemq-5.16.4\bin\.. ACTIVEMQ_BASE: F:\Apache\apache-activemq-5.16.4\bin\.. ACTIVEMQ_CONF: F:\Apache\apache-activemq-5.16.4\bin\..\conf ACTIVEMQ_DATA: F:\Apache\apache-activemq-5.16.4\bin\..\data Loading message broker from: xbean:activemq.xml INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@53fe15ff: startup date [Sat Feb 26 12:50:18 IST 2022]; root of context hierarchy INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[F:\Apache\apache-activemq-5.16.4\bin\..\data\kahadb] INFO | PListStore:[F:\Apache\apache-activemq-5.16.4\bin\..\data\localhost\tmp_storage] started INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) is starting INFO | Listening for connections at: tcp://DESKTOP-86KD9FC:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector openwire started INFO | Listening for connections at: amqp://DESKTOP-86KD9FC:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector amqp started INFO | Listening for connections at: stomp://DESKTOP-86KD9FC:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector stomp started INFO | Listening for connections at: mqtt://DESKTOP-86KD9FC:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector mqtt started INFO | Starting Jetty server INFO | Creating Jetty connector WARN | [email protected]@4f966719{/,null,STARTING} has uncovered http methods for path: / INFO | Listening for connections at ws://DESKTOP-86KD9FC:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 INFO | Connector ws started INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started INFO | For help or more information please see: https://activemq.apache.org INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/ INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/
验证
现在在浏览器中打开 **http://127.0.0.1:8161/admin/**。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列、主题、连接等。
Apache ActiveMQ - 管理控制台
一旦 ActiveMQ 服务器启动并运行。您可以使用管理控制台来管理队列、主题、订阅者、连接、网络等。
在浏览器中打开 **http://127.0.0.1:8161/admin/**。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列、主题、连接等。
队列
单击“队列”选项卡,输入队列名称为 testQueue 并单击“创建”按钮。现在您可以在列表中看到该队列。
主题
同样,您可以创建主题并在“主题”选项卡中检查主题。
其他
同样,您可以浏览订阅者、连接、网络桥、调度程序详细信息。
发送
“发送”选项卡允许通过指定目标和其他详细信息将 JMS 消息发送到特定队列或主题。
Apache ActiveMQ - 生产者应用程序
现在让我们创建一个生产者应用程序,它将消息发送到 ActiveMQ 队列。
创建项目
使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。
输入详细信息,如下所示 -
**groupId** - com.tutorialspoint
**artifactId** - producer
**version** - 0.0.1-SNAPSHOT
**name** - ActiveMQ 生产者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>producer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Producer</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个生产者类,它将消息发送到 ActiveMQ 队列。
package com.tutorialspoint.activemq; import java.io.Console; import java.util.Scanner; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue Destination destination = session.createQueue("MyFirstQueue"); // Create a producer specific to queue MessageProducer producer = session.createProducer(destination); Scanner input = new Scanner(System.in); String response; do { System.out.println("Enter message: "); response = input.nextLine(); // Create a message object TextMessage msg = session.createTextMessage(response); // Send the message to the queue producer.send(msg); } while (!response.equalsIgnoreCase("Quit")); input.close(); // Close the connection connection.close(); } }
生产者类创建连接,启动会话,创建生产者,然后要求用户输入消息。如果用户输入 quit,则应用程序终止,否则它将消息发送到队列。
我们将在 ActiveMQ - 测试应用程序 一章中运行此应用程序。
Apache ActiveMQ - 消费者应用程序
现在让我们创建一个消费者应用程序,它将从 ActiveMQ 队列接收消息。
创建项目
使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。
输入详细信息,如下所示 -
**groupId** - com.tutorialspoint
**artifactId** - consumer
**version** - 0.0.1-SNAPSHOT
**name** - ActiveMQ 消费者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Consumer</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个消费者类,它将从 ActiveMQ 队列接收消息。
package com.tutorialspoint.activemq; import java.io.Console; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Consumer { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a queue Destination destination = session.createQueue("MyFirstQueue"); // Create a consumer specific to queue MessageConsumer consumer = session.createConsumer(destination); Console c = System.console(); String response; do { // Receive the message Message msg = consumer.receive(); response = ((TextMessage) msg).getText(); System.out.println("Received = "+response); } while (!response.equalsIgnoreCase("Quit")); // Close the connection connection.close(); } }
消费者类创建连接,启动会话,创建消费者,然后从队列接收消息(如果有)。如果队列包含 quit 作为消息,则应用程序终止,否则它将继续轮询队列以获取消息。
我们将在 ActiveMQ - 测试应用程序 一章中运行此应用程序。
Apache ActiveMQ - 测试应用程序
启动 ActiveMQ 服务器
现在让我们启动 ActiveMQ 服务器。转到文件夹 **F:/ → Apache → apache-activemq-5.16.4/bin** 并键入以下命令。
示例
F:\Apache\apache-activemq-5.16.4\bin>activemq start
输出
您将看到类似的输出,ActiveMQ 将开始运行。
... INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started INFO | For help or more information please see: https://activemq.apache.org INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/ INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/
启动生产者应用程序
在 eclipse 中,右键单击 Producer.java 源代码,然后选择“以 Java 应用程序运行”。生产者应用程序将开始运行,您将看到如下输出 -
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message:
启动消费者应用程序
在 eclipse 中,右键单击 Consumer.java 源代码,然后选择“以 Java 应用程序运行”。消费者应用程序将开始运行,您将看到如下输出 -
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
发送消息
在生产者控制台窗口中,键入 Hi 并按 Enter 键发送消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message: Hi
接收消息
在消费者控制台窗口中验证是否已收到消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Received = Hi
发送 quit 作为消息以终止生产者和消费者控制台窗口会话。
验证
现在在浏览器中打开 **http://127.0.0.1:8161/admin/**。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列以检查状态。它将显示已入队和已传递的 2 条消息。
Apache ActiveMQ - 发布者应用程序
现在让我们创建一个发布者应用程序,它将消息发送到 ActiveMQ 队列。
创建项目
使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。
输入详细信息,如下所示 -
**groupId** - com.tutorialspoint
**artifactId** - publisher
**version** - 0.0.1-SNAPSHOT
**name** - ActiveMQ 发布者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>publisher</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Publisher</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个发布者类,它将消息发送到 ActiveMQ 主题以将其广播给所有订阅者。
package com.tutorialspoint.activemq; import java.io.Console; import java.util.Scanner; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Publisher { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a topic Destination destination = session.createTopic("MyFirstTopic"); // Create a publisher specific to topic MessageProducer publisher = session.createProducer(destination); Scanner input = new Scanner(System.in); String response; do { System.out.println("Enter message: "); response = input.nextLine(); // Create a message object TextMessage msg = session.createTextMessage(response); // Send the message to the topic publisher.send(msg); } while (!response.equalsIgnoreCase("Quit")); input.close(); // Close the connection connection.close(); } }
生产者类创建连接,启动会话,创建生产者,然后要求用户输入消息。如果用户输入 quit,则应用程序终止,否则它将消息发送到主题。
我们将在 ActiveMQ - 测试应用程序 一章中运行此应用程序。
Apache ActiveMQ - 订阅者应用程序
现在让我们创建一个订阅者应用程序,它将从 ActiveMQ 主题接收消息。
创建项目
使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。
输入详细信息,如下所示 -
**groupId** - com.tutorialspoint
**artifactId** - subscriber
**version** - 0.0.1-SNAPSHOT
**name** - ActiveMQ 订阅者
单击“完成”按钮,将创建一个新项目。
pom.xml
现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tutorialspoint.activemq</groupId> <artifactId>subscriber</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQ Subscriber</name> <dependencies> <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.40.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.1</version> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> <plugin> <groupId>org.fusesource.mvnplugins</groupId> <artifactId>maven-uberize-plugin</artifactId> <version>1.14</version> <executions> <execution> <phase>package</phase> <goals><goal>uberize</goal></goals> </execution> </executions> </plugin> </plugins> </build> </project>
现在创建一个 Subscriber 类,它将接收来自 ActiveMQ 队列的消息。
package com.tutorialspoint.activemq; import java.io.Console; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnectionFactory; public class Subscriber { public static void main(String[] args) throws Exception { // Create a connection to ActiveMQ JMS broker using AMQP protocol JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672"); Connection connection = factory.createConnection("admin", "password"); connection.start(); // Create a session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create a topic Destination destination = session.createTopic("MyFirstTopic"); // Create a subscriber specific to topic MessageConsumer subscriber = session.createConsumer(destination); Console c = System.console(); String response; do { // Receive the message Message msg = subscriber.receive(); response = ((TextMessage) msg).getText(); System.out.println("Received = "+response); } while (!response.equalsIgnoreCase("Quit")); // Close the connection connection.close(); } }
Subscriber 类创建一个连接,启动会话,创建一个消费者,然后接收主题中的消息(如果有)。如果主题包含 quit 作为消息,则应用程序终止,否则它将继续轮询队列以获取消息。
我们将多次运行此应用程序,以便在ActiveMQ - 测试应用程序章节中创建多个订阅者。
Apache ActiveMQ - 测试应用程序主题
启动 ActiveMQ 服务器
现在让我们启动 ActiveMQ 服务器。转到文件夹 **F:/ → Apache → apache-activemq-5.16.4/bin** 并键入以下命令。
示例
F:\Apache\apache-activemq-5.16.4\bin>activemq start
输出
您将看到类似的输出,ActiveMQ 将开始运行。
... INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started INFO | For help or more information please see: https://activemq.apache.org INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/ INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/
启动发布者应用程序
在 Eclipse 中,右键单击 Publisher.java 源文件,然后选择“Run As”→“Java Application”。发布者应用程序将开始运行,您将看到如下输出:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message:
启动订阅者应用程序
在 Eclipse 中,右键单击 Subscriber.java 源文件,然后选择“Run As”→“Java Application”。订阅者应用程序将开始运行,您将看到如下输出:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
启动另一个订阅者应用程序
在 Eclipse 中,再次右键单击 Subscriber.java 源文件,然后选择“Run As”→“Java Application”。另一个订阅者应用程序将开始运行,您将看到如下输出:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
发送消息
在发布者控制台窗口中,键入 Hi 并按 Enter 键发送消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Enter message: Hi
接收消息
在订阅者控制台窗口中验证每个窗口是否都收到了消息。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Received = Hi
发送 Quit 作为消息以终止所有发布者和订阅者控制台窗口会话。
验证
现在在浏览器中打开http://127.0.0.1:8161/admin/。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查主题以检查状态。它将显示排队和已传送的多个消息。