Apache ActiveMQ 快速指南



Apache ActiveMQ - 概述

什么是 ActiveMQ?

ActiveMQ 是一个用 Java 编写的开源消息代理。它完全符合 JMS 1.1 标准。它由 Apache 软件基金会开发和维护,并根据 Apache 许可证授权。它为企业级消息应用程序提供高可用性、可扩展性、可靠性、性能和安全性。

JMS 是一种允许开发基于消息的系统的规范。ActiveMQ 充当消息代理,位于应用程序之间,允许它们以异步和可靠的方式进行通信。

AMQ

消息类型

为了更好地理解,下面解释了两种消息选项。

点对点

在这种类型的通信中,代理仅向一个消费者发送消息,而其他消费者将等待直到他们从代理接收消息。没有消费者会收到相同的消息。

如果没有消费者,代理将保存消息,直到获得消费者。这种类型的通信也称为**基于队列的通信**,其中生产者将消息发送到队列,并且只有一个消费者从队列中获取一条消息。如果有多个消费者,他们可能会收到下一条消息,但他们不会收到与其他消费者相同的 消息。

Point to Point Messaging

发布/订阅

在这种类型的通信中,代理向所有活动消费者发送相同的消息副本。这种类型的通信也称为**基于主题的通信**,其中代理向所有订阅特定主题的活动消费者发送相同的消息。此模型支持单向通信,其中不需要验证已传输的消息。

Publish/Subscribe Messaging

Apache ActiveMQ - 环境设置

本章将指导您如何准备开发环境以开始使用 ActiveMQ。它还将教您如何在设置 ActiveMQ 之前在您的机器上设置 JDK、Maven 和 Eclipse -

设置 Java 开发工具包 (JDK)

您可以从 Oracle 的 Java 网站下载最新版本的 SDK - Java SE 下载。 您将在下载的文件中找到安装 JDK 的说明,请按照给定的说明安装和配置设置。最后设置 PATH 和 JAVA_HOME 环境变量以引用包含 java 和 javac 的目录,通常分别为 java_install_dir/bin 和 java_install_dir。

如果您运行的是 Windows 并已将 JDK 安装在 C:\jdk-11.0.11 中,则必须将以下行添加到您的 C:\autoexec.bat 文件中。

set PATH=C:\jdk-11.0.11;%PATH% 
set JAVA_HOME=C:\jdk-11.0.11 

或者,在 Windows NT/2000/XP 上,您必须右键单击“我的电脑”,选择“属性”→“高级”→“环境变量”。然后,您必须更新 PATH 值并单击“确定”按钮。

在 Unix(Solaris、Linux 等)上,如果 SDK 安装在 /usr/local/jdk-11.0.11 中并且您使用的是 C shell,则必须将以下内容添加到您的 .cshrc 文件中。

setenv PATH /usr/local/jdk-11.0.11/bin:$PATH 
setenv JAVA_HOME /usr/local/jdk-11.0.11

或者,如果您使用的是集成开发环境 (IDE),例如 Borland JBuilder、Eclipse、IntelliJ IDEA 或 Sun ONE Studio,则必须编译并运行一个简单的程序以确认 IDE 知道您安装 Java 的位置。否则,您必须按照 IDE 文档中给出的说明进行正确的设置。

设置 Eclipse IDE

本教程中的所有示例都是使用 Eclipse IDE 编写的。因此,我们建议您应该在您的机器上安装最新版本的 Eclipse。

要安装 Eclipse IDE,请从 www.eclipse.org/downloads/ 下载最新的 Eclipse 二进制文件。下载安装后,将二进制分发版解压缩到方便的位置。例如,在 Windows 上为 C:\eclipse,或在 Linux/Unix 上为 /usr/local/eclipse,最后适当地设置 PATH 变量。

可以通过在 Windows 机器上执行以下命令启动 Eclipse,或者您可以简单地双击 eclipse.exe

%C:\eclipse\eclipse.exe 

可以通过在 Unix(Solaris、Linux 等)机器上执行以下命令启动 Eclipse -

$/usr/local/eclipse/eclipse

成功启动后,如果一切正常,则应显示以下结果 -

Eclipse Home Page

设置 Maven

在本教程中,我们使用 maven 来运行和构建基于 spring 的示例以运行基于 ActiveMQ 的应用程序。请按照 Maven - 环境设置 来安装 maven。

下载 ActiveMQ

您可以从其官方页面下载最新稳定版本的 ActiveMQ。请按照 下载 ActivMQ 下载 ActiveMQ。我们使用了 2022 年 2 月 15 日发布的 5.13.4 版本。将存档内容解压缩到您选择的文件夹中。我们已解压缩到 **F:/ → Apache → apache-activemq-5.16.4**。

Apache ActiveMQ - 功能特性

ActiveMQ 旨在为企业级消息应用程序提供高可用性、可扩展性、可靠性、性能和安全性。以下是 ActiveMQ 的一些主要功能。

  • **JMS 兼容** - ActiveMQ 完全符合 JMS 1.1 标准。JMS 规范提供了一种用于同步或异步消息传递、一次且仅一次消息传递、订阅者的消息持久性等的标准机制。

  • **连接选项** - ActiveMQ 支持 HTTP/S、多播、SSL、Stomp、TCP、UDP、XMPP,从而提供广泛的连接选项,并允许各种系统使用它们选择的协议进行通信。

  • **可插拔架构** - ActiveMQ 允许选择持久性机制,并提供根据应用程序需求自定义安全性的选项,以进行身份验证和授权。

  • **多平台** - ActiveMQ 为许多流行的语言(如 Java、C、C++、.NET、Perl、PHP、Python、Ruby 等)提供客户端 API。ActiveMQ Broker 将在 JVM 中运行,但客户端可以使用任何支持的语言编写。

  • **Broker 集群** - ActiveMQ 允许准备一个用于扩展的 Broker 网络,并且可以支持不同类型的拓扑。

  • **功能丰富** - ActiveMQ 为 Broker 和客户端提供许多高级功能,并支持 Apache Camel。

  • **简单的管理界面** - ActiveMQ 管理控制台易于使用,但仍然提供许多强大的管理功能。

Apache ActiveMQ - 运行 Broker 服务器

我们已下载 ActiveMQ 的最新版本,如 ActiveMQ - 环境设置 中所述。现在转到文件夹 **F:/ → Apache → apache-activemq-5.16.4/bin** 并键入以下命令。

示例

F:\Apache\apache-activemq-5.16.4\bin>activemq start

输出

您将看到类似的输出,ActiveMQ 将开始运行。

Java Runtime: Oracle Corporation 11.0.11 C:\Program Files\Java\jdk-11.0.11
   Heap sizes: current=1048576k  free=1041918k  max=1048576k
    JVM args: -Dcom.sun.management.jmxremote -Xms1G -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=F:\Apache\apache-activemq-5.16.4\bin\..\conf\login.config -Dactivemq.classpath=F:\Apache\apache-activemq-5.16.4\bin\..\conf;F:\Apache\apache-activemq-5.16.4\bin\../conf;F:\Apache\apache-activemq-5.16.4\bin\../conf; -Dactivemq.home=F:\Apache\apache-activemq-5.16.4\bin\.. -Dactivemq.base=F:\Apache\apache-activemq-5.16.4\bin\.. -Dactivemq.conf=F:\Apache\apache-activemq-5.16.4\bin\..\conf -Dactivemq.data=F:\Apache\apache-activemq-5.16.4\bin\..\data -Djava.io.tmpdir=F:\Apache\apache-activemq-5.16.4\bin\..\data\tmp
Extensions classpath:
   [F:\Apache\apache-activemq-5.16.4\bin\..\lib,F:\Apache\apache-activemq-5.16.4\bin\..\lib\camel,F:\Apache\apache-activemq-5.16.4\bin\..\lib\optional,F:\Apache\apache-activemq-5.16.4\bin\..\lib\web,F:\Apache\apache-activemq-5.16.4\bin\..\lib\extra]
ACTIVEMQ_HOME: F:\Apache\apache-activemq-5.16.4\bin\..
ACTIVEMQ_BASE: F:\Apache\apache-activemq-5.16.4\bin\..
ACTIVEMQ_CONF: F:\Apache\apache-activemq-5.16.4\bin\..\conf
ACTIVEMQ_DATA: F:\Apache\apache-activemq-5.16.4\bin\..\data
Loading message broker from: xbean:activemq.xml
 INFO | Refreshing org.apache.activemq.xbean.XBeanBrokerFactory$1@53fe15ff: startup date [Sat Feb 26 12:50:18 IST 2022]; root of context hierarchy
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[F:\Apache\apache-activemq-5.16.4\bin\..\data\kahadb]
 INFO | PListStore:[F:\Apache\apache-activemq-5.16.4\bin\..\data\localhost\tmp_storage] started
 INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) is starting
 INFO | Listening for connections at: tcp://DESKTOP-86KD9FC:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector openwire started
 INFO | Listening for connections at: amqp://DESKTOP-86KD9FC:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector amqp started
 INFO | Listening for connections at: stomp://DESKTOP-86KD9FC:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector stomp started
 INFO | Listening for connections at: mqtt://DESKTOP-86KD9FC:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector mqtt started
 INFO | Starting Jetty server
 INFO | Creating Jetty connector
 WARN | [email protected]@4f966719{/,null,STARTING} has uncovered http methods for path: /
 INFO | Listening for connections at ws://DESKTOP-86KD9FC:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
 INFO | Connector ws started
 INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started
 INFO | For help or more information please see: https://activemq.apache.org
 INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/
 INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/

验证

现在在浏览器中打开 **http://127.0.0.1:8161/admin/**。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列、主题、连接等。

admin console

Apache ActiveMQ - 管理控制台

一旦 ActiveMQ 服务器启动并运行。您可以使用管理控制台来管理队列、主题、订阅者、连接、网络等。

在浏览器中打开 **http://127.0.0.1:8161/admin/**。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列、主题、连接等。

Admin Console

队列

单击“队列”选项卡,输入队列名称为 testQueue 并单击“创建”按钮。现在您可以在列表中看到该队列。

Queues in Admin Console

主题

同样,您可以创建主题并在“主题”选项卡中检查主题。

Topics in Admin Console

其他

同样,您可以浏览订阅者、连接、网络桥、调度程序详细信息。

发送

“发送”选项卡允许通过指定目标和其他详细信息将 JMS 消息发送到特定队列或主题。

Send Message in Admin Console

Apache ActiveMQ - 生产者应用程序

现在让我们创建一个生产者应用程序,它将消息发送到 ActiveMQ 队列。

创建项目

使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。

输入详细信息,如下所示 -

  • **groupId** - com.tutorialspoint

  • **artifactId** - producer

  • **version** - 0.0.1-SNAPSHOT

  • **name** - ActiveMQ 生产者

单击“完成”按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
   https://maven.apache.org/xsd/maven-4.0.0.xsd">
   
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>producer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>ActiveMQ Producer</name>
   <dependencies>
      <dependency>
         <groupId>org.apache.geronimo.specs</groupId>
         <artifactId>geronimo-jms_1.1_spec</artifactId>
         <version>1.1</version>
      </dependency>
      <dependency>
         <groupId>org.apache.qpid</groupId>
         <artifactId>qpid-jms-client</artifactId>
         <version>0.40.0</version>
      </dependency>
   </dependencies>
   <build>
      <plugins>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.1</version>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
            </configuration>
         </plugin>
         <plugin>
            <groupId>org.fusesource.mvnplugins</groupId>
            <artifactId>maven-uberize-plugin</artifactId>
            <version>1.14</version>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals><goal>uberize</goal></goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>  
</project>

现在创建一个生产者类,它将消息发送到 ActiveMQ 队列。

package com.tutorialspoint.activemq;

import java.io.Console;
import java.util.Scanner;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnectionFactory;

public class Producer {
   public static void main(String[] args) throws Exception {

      // Create a connection to ActiveMQ JMS broker using AMQP protocol
      JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
      Connection connection = factory.createConnection("admin", "password");
      connection.start();

      // Create a session
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // Create a queue
      Destination destination = session.createQueue("MyFirstQueue");

      // Create a producer specific to queue
      MessageProducer producer = session.createProducer(destination);

      Scanner input = new Scanner(System.in);
      String response;
      do {
         System.out.println("Enter message: ");
         response = input.nextLine();
         // Create a message object
         TextMessage msg = session.createTextMessage(response);

         // Send the message to the queue
         producer.send(msg);

      } while (!response.equalsIgnoreCase("Quit"));
      input.close();

      // Close the connection
      connection.close();
   }
}

生产者类创建连接,启动会话,创建生产者,然后要求用户输入消息。如果用户输入 quit,则应用程序终止,否则它将消息发送到队列。

我们将在 ActiveMQ - 测试应用程序 一章中运行此应用程序。

Apache ActiveMQ - 消费者应用程序

现在让我们创建一个消费者应用程序,它将从 ActiveMQ 队列接收消息。

创建项目

使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。

输入详细信息,如下所示 -

  • **groupId** - com.tutorialspoint

  • **artifactId** - consumer

  • **version** - 0.0.1-SNAPSHOT

  • **name** - ActiveMQ 消费者

单击“完成”按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
   https://maven.apache.org/xsd/maven-4.0.0.xsd">
   
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>consumer</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>ActiveMQ Consumer</name>
   <dependencies>
      <dependency>
         <groupId>org.apache.geronimo.specs</groupId>
         <artifactId>geronimo-jms_1.1_spec</artifactId>
         <version>1.1</version>
      </dependency>
      <dependency>
         <groupId>org.apache.qpid</groupId>
         <artifactId>qpid-jms-client</artifactId>
         <version>0.40.0</version>
      </dependency>
   </dependencies>
   <build>
      <plugins>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.1</version>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
            </configuration>
         </plugin>
         <plugin>
            <groupId>org.fusesource.mvnplugins</groupId>
            <artifactId>maven-uberize-plugin</artifactId>
            <version>1.14</version>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals><goal>uberize</goal></goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>  
</project>

现在创建一个消费者类,它将从 ActiveMQ 队列接收消息。

package com.tutorialspoint.activemq;

import java.io.Console;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnectionFactory;

public class Consumer {
   public static void main(String[] args) throws Exception {
      // Create a connection to ActiveMQ JMS broker using AMQP protocol
      JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
      Connection connection = factory.createConnection("admin", "password");
      connection.start();

      // Create a session
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // Create a queue
      Destination destination = session.createQueue("MyFirstQueue");

      // Create a consumer specific to queue
      MessageConsumer consumer = session.createConsumer(destination);

      Console c = System.console();
      String response;
      do {      	
         // Receive the message
         Message msg = consumer.receive();
         response = ((TextMessage) msg).getText();

         System.out.println("Received = "+response);

      } while (!response.equalsIgnoreCase("Quit"));

      // Close the connection
      connection.close();
   }
}

消费者类创建连接,启动会话,创建消费者,然后从队列接收消息(如果有)。如果队列包含 quit 作为消息,则应用程序终止,否则它将继续轮询队列以获取消息。

我们将在 ActiveMQ - 测试应用程序 一章中运行此应用程序。

Apache ActiveMQ - 测试应用程序

启动 ActiveMQ 服务器

现在让我们启动 ActiveMQ 服务器。转到文件夹 **F:/ → Apache → apache-activemq-5.16.4/bin** 并键入以下命令。

示例

F:\Apache\apache-activemq-5.16.4\bin>activemq start

输出

您将看到类似的输出,ActiveMQ 将开始运行。

...
INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started
INFO | For help or more information please see: https://activemq.apache.org
INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/
INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/

启动生产者应用程序

在 eclipse 中,右键单击 Producer.java 源代码,然后选择“以 Java 应用程序运行”。生产者应用程序将开始运行,您将看到如下输出 -

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Enter message:

启动消费者应用程序

在 eclipse 中,右键单击 Consumer.java 源代码,然后选择“以 Java 应用程序运行”。消费者应用程序将开始运行,您将看到如下输出 -

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

发送消息

在生产者控制台窗口中,键入 Hi 并按 Enter 键发送消息。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Enter message:
Hi

接收消息

在消费者控制台窗口中验证是否已收到消息。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received = Hi

发送 quit 作为消息以终止生产者和消费者控制台窗口会话。

验证

现在在浏览器中打开 **http://127.0.0.1:8161/admin/**。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查队列以检查状态。它将显示已入队和已传递的 2 条消息。

Queue

Apache ActiveMQ - 发布者应用程序

现在让我们创建一个发布者应用程序,它将消息发送到 ActiveMQ 队列。

创建项目

使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。

输入详细信息,如下所示 -

  • **groupId** - com.tutorialspoint

  • **artifactId** - publisher

  • **version** - 0.0.1-SNAPSHOT

  • **name** - ActiveMQ 发布者

单击“完成”按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
   https://maven.apache.org/xsd/maven-4.0.0.xsd">
   
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>publisher</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>ActiveMQ Publisher</name>
   <dependencies>
      <dependency>
         <groupId>org.apache.geronimo.specs</groupId>
         <artifactId>geronimo-jms_1.1_spec</artifactId>
         <version>1.1</version>
      </dependency>
      <dependency>
         <groupId>org.apache.qpid</groupId>
         <artifactId>qpid-jms-client</artifactId>
         <version>0.40.0</version>
      </dependency>
   </dependencies>
   <build>
      <plugins>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.1</version>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
            </configuration>
         </plugin>
         <plugin>
            <groupId>org.fusesource.mvnplugins</groupId>
            <artifactId>maven-uberize-plugin</artifactId>
            <version>1.14</version>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals><goal>uberize</goal></goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>  
</project>

现在创建一个发布者类,它将消息发送到 ActiveMQ 主题以将其广播给所有订阅者。

package com.tutorialspoint.activemq;

import java.io.Console;
import java.util.Scanner;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnectionFactory;

public class Publisher {
   public static void main(String[] args) throws Exception {
      // Create a connection to ActiveMQ JMS broker using AMQP protocol
      JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
      Connection connection = factory.createConnection("admin", "password");
      connection.start();

      // Create a session
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // Create a topic
      Destination destination = session.createTopic("MyFirstTopic");

      // Create a publisher specific to topic
      MessageProducer publisher = session.createProducer(destination);

      Scanner input = new Scanner(System.in);
      String response;
      do {
         System.out.println("Enter message: ");
         response = input.nextLine();
         // Create a message object
         TextMessage msg = session.createTextMessage(response);

         // Send the message to the topic
         publisher.send(msg);

      } while (!response.equalsIgnoreCase("Quit"));
      input.close();

      // Close the connection
      connection.close();
   }
}

生产者类创建连接,启动会话,创建生产者,然后要求用户输入消息。如果用户输入 quit,则应用程序终止,否则它将消息发送到主题。

我们将在 ActiveMQ - 测试应用程序 一章中运行此应用程序。

Apache ActiveMQ - 订阅者应用程序

现在让我们创建一个订阅者应用程序,它将从 ActiveMQ 主题接收消息。

创建项目

使用 eclipse,选择 **文件** → **新建** → **Maven 项目**。勾选 **创建简单的项目(跳过原型选择)** 并单击“下一步”。

输入详细信息,如下所示 -

  • **groupId** - com.tutorialspoint

  • **artifactId** - subscriber

  • **version** - 0.0.1-SNAPSHOT

  • **name** - ActiveMQ 订阅者

单击“完成”按钮,将创建一个新项目。

pom.xml

现在更新 pom.xml 的内容以包含 ActiveMQ 的依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
   https://maven.apache.org/xsd/maven-4.0.0.xsd">
   
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.tutorialspoint.activemq</groupId>
   <artifactId>subscriber</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>ActiveMQ Subscriber</name>
   <dependencies>
      <dependency>
         <groupId>org.apache.geronimo.specs</groupId>
         <artifactId>geronimo-jms_1.1_spec</artifactId>
         <version>1.1</version>
      </dependency>
      <dependency>
         <groupId>org.apache.qpid</groupId>
         <artifactId>qpid-jms-client</artifactId>
         <version>0.40.0</version>
      </dependency>
   </dependencies>
   <build>
      <plugins>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.1</version>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
            </configuration>
         </plugin>
         <plugin>
            <groupId>org.fusesource.mvnplugins</groupId>
            <artifactId>maven-uberize-plugin</artifactId>
            <version>1.14</version>
            <executions>
               <execution>
                  <phase>package</phase>
                  <goals><goal>uberize</goal></goals>
               </execution>
            </executions>
         </plugin>
      </plugins>
   </build>  
</project>

现在创建一个 Subscriber 类,它将接收来自 ActiveMQ 队列的消息。

package com.tutorialspoint.activemq;

import java.io.Console;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnectionFactory;

public class Subscriber {
   public static void main(String[] args) throws Exception {
      // Create a connection to ActiveMQ JMS broker using AMQP protocol
      JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
      Connection connection = factory.createConnection("admin", "password");
      connection.start();

      // Create a session
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // Create a topic
      Destination destination = session.createTopic("MyFirstTopic");

      // Create a subscriber specific to topic
      MessageConsumer subscriber = session.createConsumer(destination);

      Console c = System.console();
      String response;
      do {      	
         // Receive the message
         Message msg = subscriber.receive();
         response = ((TextMessage) msg).getText();

         System.out.println("Received = "+response);

      } while (!response.equalsIgnoreCase("Quit"));

      // Close the connection
      connection.close();
   }
}

Subscriber 类创建一个连接,启动会话,创建一个消费者,然后接收主题中的消息(如果有)。如果主题包含 quit 作为消息,则应用程序终止,否则它将继续轮询队列以获取消息。

我们将多次运行此应用程序,以便在ActiveMQ - 测试应用程序章节中创建多个订阅者。

Apache ActiveMQ - 测试应用程序主题

启动 ActiveMQ 服务器

现在让我们启动 ActiveMQ 服务器。转到文件夹 **F:/ → Apache → apache-activemq-5.16.4/bin** 并键入以下命令。

示例

F:\Apache\apache-activemq-5.16.4\bin>activemq start

输出

您将看到类似的输出,ActiveMQ 将开始运行。

...
INFO | Apache ActiveMQ 5.16.4 (localhost, ID:DESKTOP-86KD9FC-52669-1645860020983-0:1) started
INFO | For help or more information please see: https://activemq.apache.org
INFO | ActiveMQ WebConsole available at http://127.0.0.1:8161/
INFO | ActiveMQ Jolokia REST API available at http://127.0.0.1:8161/api/jolokia/

启动发布者应用程序

在 Eclipse 中,右键单击 Publisher.java 源文件,然后选择“Run As”→“Java Application”。发布者应用程序将开始运行,您将看到如下输出:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Enter message:

启动订阅者应用程序

在 Eclipse 中,右键单击 Subscriber.java 源文件,然后选择“Run As”→“Java Application”。订阅者应用程序将开始运行,您将看到如下输出:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

启动另一个订阅者应用程序

在 Eclipse 中,再次右键单击 Subscriber.java 源文件,然后选择“Run As”→“Java Application”。另一个订阅者应用程序将开始运行,您将看到如下输出:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

发送消息

在发布者控制台窗口中,键入 Hi 并按 Enter 键发送消息。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Enter message:
Hi

接收消息

在订阅者控制台窗口中验证每个窗口是否都收到了消息。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received = Hi

发送 Quit 作为消息以终止所有发布者和订阅者控制台窗口会话。

验证

现在在浏览器中打开http://127.0.0.1:8161/admin/。它将要求输入凭据。使用 admin/admin 作为用户名/密码,它将加载 ActiveMQ 管理控制台,您可以在其中检查主题以检查状态。它将显示排队和已传送的多个消息。

Topics
广告