Apache Storm 快速指南



Apache Storm - 简介

什么是 Apache Storm?

Apache Storm 是一个分布式实时大数据处理系统。Storm 旨在以容错且横向扩展的方式处理海量数据。它是一个流数据框架,具有最高的摄取率能力。尽管 Storm 是无状态的,但它通过 Apache ZooKeeper 管理分布式环境和集群状态。它很简单,您可以在并行中对实时数据执行各种操作。

Apache Storm 继续成为实时数据分析的领导者。Storm 易于设置和操作,并保证每个消息至少会被拓扑处理一次。

Apache Storm 与 Hadoop

基本上,Hadoop 和 Storm 框架用于分析大数据。它们彼此互补,但在某些方面有所不同。Apache Storm 执行所有操作(持久性除外),而 Hadoop 擅长所有操作,但在实时计算方面却有所欠缺。下表比较了 Storm 和 Hadoop 的属性。

Storm Hadoop
实时流处理 批处理
无状态 有状态
基于 ZooKeeper 的协调的主/从架构。主节点称为 **nimbus**,从节点称为 **supervisor**。 带或不带基于 ZooKeeper 的协调的主从架构。主节点是 **job tracker**,从节点是 **task tracker**。
Storm 流处理可以在集群上每秒访问数万条消息。 Hadoop 分布式文件系统 (HDFS) 使用 MapReduce 框架处理需要数分钟或数小时才能完成的海量数据。
Storm 拓扑一直运行,直到用户关闭或发生意外的不可恢复的故障。 MapReduce 作业按顺序执行并最终完成。
两者都是分布式且容错的
如果 nimbus/supervisor 死亡,重新启动使其从停止的地方继续,因此不会受到任何影响。 如果 JobTracker 死亡,所有正在运行的作业都会丢失。

Apache Storm 的用例

Apache Storm 因其实时大数据流处理而闻名。因此,大多数公司都将 Storm 作为其系统不可或缺的一部分。一些值得注意的例子如下:

**Twitter** - Twitter 使用 Apache Storm 构建其一系列“发布者分析产品”。“发布者分析产品”处理 Twitter 平台上的每一条推文和点击。Apache Storm 与 Twitter 基础架构深度集成。

**NaviSite** - NaviSite 使用 Storm 构建事件日志监控/审计系统。系统中生成的每个日志都会通过 Storm。Storm 会根据配置的正则表达式集检查消息,如果匹配,则将该特定消息保存到数据库中。

**Wego** - Wego 是一家位于新加坡的旅游元搜索引擎。旅游相关数据来自世界各地的许多来源,时间也不同。Storm 帮助 Wego 搜索实时数据,解决并发问题并为最终用户找到最佳匹配。

Apache Storm 的优势

以下是 Apache Storm 提供的优势列表:

  • Storm 是开源的、健壮的且用户友好的。它可以被小型公司和大型企业使用。

  • Storm 具有容错性、灵活性和可靠性,并支持任何编程语言。

  • 允许实时流处理。

  • Storm 非常快,因为它具有强大的数据处理能力。

  • Storm 通过线性增加资源,即使在负载增加的情况下也能保持性能。它具有高度的可扩展性。

  • Storm 在几秒或几分钟内完成数据刷新和端到端交付响应,具体取决于问题。它的延迟非常低。

  • Storm 具有运营智能。

  • 即使集群中的任何连接节点死亡或消息丢失,Storm 也可以保证数据处理。

Apache Storm - 核心概念

Apache Storm 从一端读取原始的实时数据流,并将其通过一系列小的处理单元,并在另一端输出处理后的/有用的信息。

下图描述了 Apache Storm 的核心概念。

Core Concept

现在让我们仔细看看 Apache Storm 的组件:

组件 描述
Tuple Tuple 是 Storm 中的主要数据结构。它是有序元素的列表。默认情况下,Tuple 支持所有数据类型。通常,它被建模为一组逗号分隔的值,并传递到 Storm 集群。
Stream Stream 是 Tuple 的无序序列。
Spouts 流的来源。通常,Storm 从原始数据源(如 Twitter Streaming API、Apache Kafka 队列、Kestrel 队列等)接受输入数据。或者,您可以编写 spout 从数据源读取数据。“ISpout”是实现 spout 的核心接口。一些具体的接口是 IRichSpout、BaseRichSpout、KafkaSpout 等。
Bolts Bolt 是逻辑处理单元。Spout 将数据传递给 bolt,bolt 处理并生成新的输出流。Bolt 可以执行过滤、聚合、连接、与数据源和数据库交互等操作。Bolt 接收数据并向一个或多个 bolt 发射数据。“IBolt”是实现 bolt 的核心接口。一些常见的接口是 IRichBolt、IBasicBolt 等。

让我们以“Twitter 分析”的实时示例为例,看看它如何在 Apache Storm 中建模。下图描述了结构。

Twitter Analysis

“Twitter 分析”的输入来自 Twitter Streaming API。Spout 将使用 Twitter Streaming API 读取用户的推文,并将其输出为 Tuple 流。来自 spout 的单个 Tuple 将包含 Twitter 用户名和一条推文作为逗号分隔的值。然后,此 Tuple 流将转发到 Bolt,Bolt 将推文拆分为单个单词,计算单词计数,并将信息持久化到配置的数据源。现在,我们可以通过查询数据源轻松获取结果。

Topology

Spout 和 bolt 连接在一起,形成一个拓扑。实时应用程序逻辑在 Storm 拓扑中指定。简单来说,拓扑是一个有向图,其中顶点是计算,边是数据流。

一个简单的拓扑以 spout 开始。Spout 将数据发射到一个或多个 bolt。Bolt 代表拓扑中具有最小处理逻辑的节点,bolt 的输出可以作为输入发射到另一个 bolt。

Storm 使拓扑始终运行,直到您杀死拓扑。Apache Storm 的主要工作是运行拓扑,并在给定时间运行任意数量的拓扑。

Tasks

现在您对 spout 和 bolt 有了基本的了解。它们是拓扑的最小逻辑单元,拓扑是使用单个 spout 和一系列 bolt 构建的。为了使拓扑成功运行,它们应该按特定顺序正确执行。Storm 执行每个 spout 和 bolt 的过程称为“Tasks”。简单来说,task 或者是 spout 的执行,或者是 bolt 的执行。在给定时间,每个 spout 和 bolt 都可以有多个实例在多个独立线程中运行。

Workers

拓扑以分布式方式在多个工作节点上运行。Storm 将任务均匀地分布在所有工作节点上。工作节点的作用是侦听作业并在有新作业到达时启动或停止进程。

Stream Grouping

数据流从 spout 流向 bolt 或从一个 bolt 流向另一个 bolt。Stream Grouping 控制 Tuple 如何在拓扑中路由,并帮助我们了解 Tuple 在拓扑中的流动。有四个内置的分组,如下所述。

Shuffle Grouping

在 shuffle grouping 中,相同数量的 Tuple 会随机分布到执行 bolt 的所有工作节点。下图描述了结构。

Shuffle Grouping

Field Grouping

将 Tuple 中具有相同值的字段组合在一起,并将其余 Tuple 保留在外部。然后,将具有相同字段值的 Tuple 发送到执行 bolt 的同一工作节点。例如,如果流按字段“word”分组,则具有相同字符串“Hello”的 Tuple 将移动到同一工作节点。下图显示了 Field Grouping 的工作原理。

Field Grouping

Global Grouping

所有流都可以分组并转发到一个 bolt。此分组将所有源实例生成的 Tuple 发送到单个目标实例(具体来说,选择 ID 最低的 worker)。

Global Grouping

All Grouping

All Grouping 将每个 Tuple 的单个副本发送到接收 bolt 的所有实例。这种分组用于向 bolt 发送信号。All Grouping 对于连接操作很有用。

All Grouping

Apache Storm - 集群架构

Apache Storm 的主要亮点之一是它是一个容错、快速且没有“单点故障”(SPOF)的分布式应用程序。我们可以根据需要在尽可能多的系统中安装 Apache Storm 以提高应用程序的容量。

让我们看看 Apache Storm 集群是如何设计的以及其内部架构。下图描述了集群设计。

Zookeeper Framework

Apache Storm 有两种类型的节点,**Nimbus**(主节点)和 **Supervisor**(工作节点)。Nimbus 是 Apache Storm 的核心组件。Nimbus 的主要工作是运行 Storm 拓扑。Nimbus 分析拓扑并收集要执行的任务。然后,它将任务分发到可用的 supervisor。

supervisor 将拥有一个或多个工作进程。supervisor 将任务委托给工作进程。工作进程将根据需要生成执行器并运行任务。Apache Storm 使用内部分布式消息系统来实现 nimbus 和 supervisor 之间的通信。

组件 描述
Nimbus Nimbus 是 Storm 集群的主节点。集群中的所有其他节点都称为**工作节点**。主节点负责在所有工作节点之间分发数据,将任务分配给工作节点并监控故障。
Supervisor 遵循 nimbus 给出的指令的节点称为 Supervisor。**Supervisor** 具有多个工作进程,它管理工作进程以完成 nimbus 分配的任务。
Worker process 工作进程将执行与特定拓扑相关的任务。工作进程本身不会运行任务,而是创建**执行器**并要求它们执行特定任务。工作进程将有多个执行器。
Executor 执行器不过是工作进程生成的单个线程。执行器运行一个或多个任务,但仅针对特定的 spout 或 bolt。
Task 任务执行实际的数据处理。因此,它要么是 spout,要么是 bolt。
ZooKeeper 框架

Apache ZooKeeper 是一项由集群(节点组)用于彼此之间协调并使用强大的同步技术维护共享数据的服务。Nimbus 是无状态的,因此它依赖于 ZooKeeper 来监视工作节点状态。

ZooKeeper 帮助主管与 Nimbus 进行交互。它负责维护 Nimbus 和主管的状态。

Storm 本质上是无状态的。尽管无状态特性有其自身的缺点,但它实际上帮助 Storm 以最佳和最快的方式处理实时数据。

不过,Storm 并非完全无状态。它将其状态存储在 Apache ZooKeeper 中。由于状态在 Apache ZooKeeper 中可用,因此可以重新启动失败的 Nimbus 并使其从中断的地方继续工作。通常,服务监控工具(如 **monit**)会监视 Nimbus,并在发生任何故障时重新启动它。

Apache Storm 还具有称为 **Trident Topology** 的高级拓扑,它具有状态维护功能,并且还提供类似 Pig 的高级 API。我们将在接下来的章节中讨论所有这些功能。

Apache Storm - 工作流程

一个工作的 Storm 集群应该有一个 Nimbus 和一个或多个主管。另一个重要的节点是 Apache ZooKeeper,它将用于 Nimbus 和主管之间的协调。

现在让我们仔细看看 Apache Storm 的工作流程 -

  • 最初,Nimbus 将等待“Storm Topology”提交给它。

  • 一旦提交了拓扑,它将处理拓扑并收集所有要执行的任务以及任务执行的顺序。

  • 然后,Nimbus 将任务均匀地分配给所有可用的主管。

  • 在特定时间间隔内,所有主管都将向 Nimbus 发送心跳以通知它们仍然处于活动状态。

  • 当主管死亡并且没有向 Nimbus 发送心跳时,Nimbus 将任务分配给另一个主管。

  • 当 Nimbus 本身死亡时,主管将毫无问题地继续执行已分配的任务。

  • 一旦所有任务都完成了,主管将等待新的任务进来。

  • 同时,死掉的 Nimbus 将由服务监控工具自动重新启动。

  • 重新启动的 Nimbus 将从停止的地方继续。同样,死掉的主管也可以自动重新启动。由于 Nimbus 和主管都可以自动重新启动,并且两者都将像以前一样继续,因此可以保证 Storm 至少处理一次所有任务。

  • 一旦所有拓扑都处理完毕,Nimbus 将等待新的拓扑到达,类似地,主管将等待新的任务。

默认情况下,Storm 集群中有两种模式 -

  • **本地模式** - 此模式用于开发、测试和调试,因为它是查看所有拓扑组件协同工作最简单的方法。在此模式下,我们可以调整参数,使我们能够查看拓扑在不同的 Storm 配置环境中如何运行。在本地模式下,Storm 拓扑在单个 JVM 中的本地机器上运行。

  • **生产模式** - 在此模式下,我们将拓扑提交到正在工作的 Storm 集群,该集群由许多进程组成,通常运行在不同的机器上。如 Storm 工作流程中所述,正在工作的集群将无限期运行,直到关闭。

Storm - 分布式消息系统

Apache Storm 处理实时数据,输入通常来自消息队列系统。外部分布式消息系统将提供实时计算所需的输入。Spout 将从消息系统读取数据并将其转换为元组,并输入到 Apache Storm 中。有趣的事实是,Apache Storm 在内部使用自己的分布式消息系统来实现 Nimbus 和主管之间的通信。

什么是分布式消息系统?

分布式消息基于可靠消息队列的概念。消息在客户端应用程序和消息系统之间异步排队。分布式消息系统提供了可靠性、可扩展性和持久性的优势。

大多数消息模式遵循 **发布-订阅** 模型(简称为 **Pub-Sub**),其中消息的发送者称为 **发布者**,而希望接收消息的接收者称为 **订阅者**。

一旦发送者发布了消息,订阅者就可以在过滤选项的帮助下接收选定的消息。通常我们有两种类型的过滤,一种是 **基于主题的过滤**,另一种是 **基于内容的过滤**。

请注意,Pub-Sub 模型只能通过消息进行通信。它是一种非常松散耦合的架构;即使发送者也不知道他们的订阅者是谁。许多消息模式使消息代理能够交换发布的消息,以便许多订阅者及时访问。一个现实生活中的例子是 Dish TV,它发布不同的频道,如体育、电影、音乐等,任何人都可以订阅他们自己的一组频道,并在其订阅的频道可用时获得它们。

Messaging System

下表描述了一些流行的高吞吐量消息系统 -

分布式消息系统 描述
Apache Kafka Kafka 是在 LinkedIn 公司开发的,后来成为 Apache 的一个子项目。Apache Kafka 基于代理启用、持久、分布式发布-订阅模型。Kafka 快速、可扩展且高效。
RabbitMQ RabbitMQ 是一种开源的分布式健壮消息应用程序。它易于使用,并且可以在所有平台上运行。
JMS(Java 消息服务) JMS 是一种开源 API,支持在应用程序之间创建、读取和发送消息。它提供保证的消息传递并遵循发布-订阅模型。
ActiveMQ ActiveMQ 消息系统是 JMS 的开源 API。
ZeroMQ ZeroMQ 是无代理的对等消息处理。它提供推拉、路由器-经销商消息模式。
Kestrel Kestrel 是一种快速、可靠且简单的分布式消息队列。

Thrift 协议

Thrift 是 Facebook 为跨语言服务开发和远程过程调用 (RPC) 而构建的。后来,它成为了一个开源的 Apache 项目。Apache Thrift 是一种 **接口定义语言**,并允许以简单的方式在定义的数据类型之上定义新的数据类型和服务实现。

Apache Thrift 也是一个通信框架,支持嵌入式系统、移动应用程序、Web 应用程序和许多其他编程语言。与 Apache Thrift 相关的一些关键功能包括其模块化、灵活性以及高性能。此外,它可以在分布式应用程序中执行流、消息传递和 RPC。

Storm 广泛使用 Thrift 协议进行其内部通信和数据定义。Storm 拓扑只是 **Thrift 结构**。在 Apache Storm 中运行拓扑的 Storm Nimbus 是一个 **Thrift 服务**。

Apache Storm - 安装

现在让我们看看如何在您的机器上安装 Apache Storm 框架。这里有三个主要步骤 -

  • 如果您的系统上还没有安装 Java,请安装它。
  • 安装 ZooKeeper 框架。
  • 安装 Apache Storm 框架。

步骤 1 - 验证 Java 安装

使用以下命令检查您的系统上是否已安装 Java。

$ java -version

如果 Java 已存在,则您将看到其版本号。否则,下载最新版本的 JDK。

步骤 1.1 - 下载 JDK

使用以下链接下载最新版本的 JDK - www.oracle.com

最新版本是 JDK 8u 60,文件是 **“jdk-8u60-linux-x64.tar.gz”**。将文件下载到您的机器上。

步骤 1.2 - 解压缩文件

通常文件会下载到 **downloads** 文件夹中。使用以下命令解压缩 tar 设置。

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.gz

步骤 1.3 - 移动到 opt 目录

为了使所有用户都能使用 Java,请将解压缩的 Java 内容移动到“/usr/local/java”文件夹。

$ su
password: (type password of root user)
$ mkdir /opt/jdk
$ mv jdk-1.8.0_60 /opt/jdk/

步骤 1.4 - 设置路径

要设置路径和 JAVA_HOME 变量,请将以下命令添加到 ~/.bashrc 文件中。

export JAVA_HOME =/usr/jdk/jdk-1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

现在将所有更改应用到当前正在运行的系统中。

$ source ~/.bashrc

步骤 1.5 - Java 备选方案

使用以下命令更改 Java 备选方案。

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

步骤 1.6

现在使用步骤 1 中说明的验证命令 **(java -version)** 验证 Java 安装。

步骤 2 - ZooKeeper 框架安装

步骤 2.1 - 下载 ZooKeeper

要在您的机器上安装 ZooKeeper 框架,请访问以下链接并下载最新版本的 ZooKeeper https://zookeeper.net.cn/releases.html

截至目前,ZooKeeper 的最新版本是 3.4.6 (ZooKeeper-3.4.6.tar.gz)。

步骤 2.2 - 解压缩 tar 文件

使用以下命令解压缩 tar 文件 -

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

步骤 2.3 - 创建配置文件

使用命令“vi conf/zoo.cfg”打开名为“conf/zoo.cfg”的配置文件,并将所有以下参数设置为起点。

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

成功保存配置文件后,您可以启动 ZooKeeper 服务器。

步骤 2.4 - 启动 ZooKeeper 服务器

使用以下命令启动 ZooKeeper 服务器。

$ bin/zkServer.sh start

执行此命令后,您将获得如下响应 -

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

步骤 2.5 - 启动 CLI

使用以下命令启动 CLI。

$ bin/zkCli.sh

执行上述命令后,您将连接到 ZooKeeper 服务器并获得以下响应。

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

步骤 2.6 - 停止 ZooKeeper 服务器

连接服务器并执行所有操作后,您可以使用以下命令停止 ZooKeeper 服务器。

bin/zkServer.sh stop

您已成功在您的机器上安装了 Java 和 ZooKeeper。现在让我们看看安装 Apache Storm 框架的步骤。

步骤 3 - Apache Storm 框架安装

步骤 3.1 下载 Storm

要在您的机器上安装 Storm 框架,请访问以下链接并下载最新版本的 Storm http://storm.apache.org/downloads.html

截至目前,Storm 的最新版本是“apache-storm-0.9.5.tar.gz”。

步骤 3.2 - 解压缩 tar 文件

使用以下命令解压缩 tar 文件 -

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data

步骤 3.3 - 打开配置文件

Storm 的当前版本包含一个位于“conf/storm.yaml”的文件,该文件配置 Storm 守护程序。将以下信息添加到该文件中。

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: “/path/to/storm/data(any path)”
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

应用所有更改后,保存并返回终端。

步骤 3.4 - 启动 Nimbus

$ bin/storm nimbus

步骤 3.5 - 启动主管

$ bin/storm supervisor

步骤 3.6 启动 UI

$ bin/storm ui

启动 Storm 用户界面应用程序后,在您喜欢的浏览器中键入 URL **https://127.0.0.1:8080**,您就可以看到 Storm 集群信息及其正在运行的拓扑。该页面应类似于以下屏幕截图。

Strom UI

Apache Storm - 工作示例

我们已经了解了 Apache Storm 的核心技术细节,现在是时候编写一些简单的场景了。

场景 – 移动通话记录分析器

移动通话及其时长将作为输入提供给 Apache Storm,Storm 将处理并对同一呼叫者和接收者之间的呼叫进行分组,以及他们的总呼叫次数。

Spout 创建

Spout 是用于数据生成的组件。基本上,Spout 将实现 IRichSpout 接口。“IRichSpout”接口具有以下重要方法 -

  • **open** - 为 Spout 提供执行环境。执行程序将运行此方法来初始化 Spout。

  • **nextTuple** - 通过收集器发出生成的数据。

  • **close** - 当 Spout 将要关闭时调用此方法。

  • **declareOutputFields** - 声明元组的输出模式。

  • **ack** - 确认已处理特定元组

  • **fail** - 指定未处理特定元组,并且不应重新处理。

打开

**open** 方法的签名如下 -

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf − 提供此 spout 的 Storm 配置。

  • context − 提供有关 spout 在拓扑中的位置、其任务 ID、输入和输出信息的完整信息。

  • collector − 使我们能够发出将由 bolt 处理的元组。

nextTuple

nextTuple 方法的签名如下所示:

nextTuple()

nextTuple() 从与 ack() 和 fail() 方法相同的循环中定期调用。当没有工作要做时,它必须释放线程的控制权,以便其他方法有机会被调用。因此,nextTuple 的第一行检查处理是否已完成。如果是,它应该休眠至少 1 毫秒以减少处理器负载,然后再返回。

close

close 方法的签名如下所示:

close()

declareOutputFields

declareOutputFields 方法的签名如下所示:

declareOutputFields(OutputFieldsDeclarer declarer)

declarer − 用于声明输出流 ID、输出字段等。

此方法用于指定元组的输出模式。

ack

ack 方法的签名如下所示:

ack(Object msgId)

此方法确认已处理特定元组。

fail

nextTuple 方法的签名如下所示:

ack(Object msgId)

此方法通知特定元组未完全处理。Storm 将重新处理该特定元组。

FakeCallLogReaderSpout

在我们的场景中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含。

  • 主叫号码
  • 被叫号码
  • 持续时间

由于我们没有呼叫日志的实时信息,我们将生成虚假的呼叫日志。虚假信息将使用 Random 类创建。完整的程序代码如下所示。

编码 - FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt 创建

Bolt 是一种组件,它以元组作为输入,处理元组,并产生新的元组作为输出。Bolt 将实现IRichBolt接口。在此程序中,使用两个 bolt 类CallLogCreatorBoltCallLogCounterBolt来执行操作。

IRichBolt 接口具有以下方法:

  • prepare − 为 bolt 提供执行环境。执行器将运行此方法来初始化 spout。

  • execute − 处理单个输入元组。

  • cleanup − 当 bolt 将要关闭时调用。

  • **declareOutputFields** - 声明元组的输出模式。

Prepare

prepare 方法的签名如下所示:

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf − 提供此 bolt 的 Storm 配置。

  • context − 提供有关 bolt 在拓扑中的位置、其任务 ID、输入和输出信息等的完整信息。

  • collector − 使我们能够发出已处理的元组。

execute

execute 方法的签名如下所示:

execute(Tuple tuple)

这里tuple是要处理的输入元组。

execute方法一次处理一个元组。可以通过 Tuple 类的 getValue 方法访问元组数据。不需要立即处理输入元组。可以处理多个元组并将其输出为单个输出元组。可以使用 OutputCollector 类发出已处理的元组。

cleanup

cleanup 方法的签名如下所示:

cleanup()

declareOutputFields

declareOutputFields 方法的签名如下所示:

declareOutputFields(OutputFieldsDeclarer declarer)

这里参数declarer用于声明输出流 ID、输出字段等。

此方法用于指定元组的输出模式

呼叫日志创建器 Bolt

呼叫日志创建器 bolt 接收呼叫日志元组。呼叫日志元组包含主叫号码、被叫号码和呼叫持续时间。此 bolt 仅通过组合主叫号码和被叫号码来创建一个新值。新值的格式为“主叫号码 - 被叫号码”,并将其命名为新字段“call”。完整的代码如下所示。

编码 - CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

呼叫日志计数器 Bolt

呼叫日志计数器 bolt 接收呼叫及其持续时间作为元组。此 bolt 在 prepare 方法中初始化一个字典 (Map) 对象。在execute方法中,它检查元组并为元组中的每个新的“call”值在字典对象中创建一个新条目,并在字典对象中设置值 1。对于字典中已有的条目,它只是将其值递增。简单来说,此 bolt 将呼叫及其计数保存在字典对象中。除了将呼叫及其计数保存在字典中,我们还可以将其保存到数据源中。完整的程序代码如下所示:

编码 - CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

创建拓扑

Storm 拓扑基本上是一个 Thrift 结构。TopologyBuilder 类提供简单易用的方法来创建复杂的拓扑。TopologyBuilder 类具有设置 spout (setSpout) 和设置 bolt (setBolt) 的方法。最后,TopologyBuilder 具有 createTopology 以创建拓扑。使用以下代码片段创建拓扑:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping 方法有助于为 spout 和 bolt 设置流分组。

本地集群

出于开发目的,我们可以使用“LocalCluster”对象创建本地集群,然后使用“LocalCluster”类的“submitTopology”方法提交拓扑。 “submitTopology”的参数之一是“Config”类的实例。“Config”类用于在提交拓扑之前设置配置选项。此配置选项将在运行时与集群配置合并,并通过 prepare 方法发送到所有任务(spout 和 bolt)。拓扑提交到集群后,我们将等待 10 秒钟,让集群计算提交的拓扑,然后使用“LocalCluster”的“shutdown”方法关闭集群。完整的程序代码如下所示:

编码 - LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

构建和运行应用程序

完整的应用程序有四个 Java 代码。它们是:

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

可以使用以下命令构建应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令运行应用程序:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

输出

应用程序启动后,它将输出有关集群启动过程、spout 和 bolt 处理以及最终集群关闭过程的完整详细信息。在“CallLogCounterBolt”中,我们打印了呼叫及其计数详细信息。此信息将显示在控制台上,如下所示:

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非 JVM 语言

Storm 拓扑由 Thrift 接口实现,这使得在任何语言中提交拓扑变得容易。Storm 支持 Ruby、Python 和许多其他语言。让我们看看 python 绑定。

Python 绑定

Python 是一种通用的解释型、交互式、面向对象的高级编程语言。Storm 支持 Python 来实现其拓扑。Python 支持发出、锚定、确认和记录操作。

如您所知,bolt 可以用任何语言定义。用另一种语言编写的 bolt 作为子进程执行,Storm 通过 stdin/stdout 上的 JSON 消息与这些子进程通信。首先获取一个支持 python 绑定的示例 bolt WordCount。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

这里类WordCount实现IRichBolt接口并使用指定的 python 实现运行超级方法参数“splitword.py”。现在创建一个名为“splitword.py”的 python 实现。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

这是 Python 的示例实现,用于计算给定句子中的单词数。类似地,您也可以绑定其他支持的语言。

Apache Storm - Trident

Trident 是 Storm 的扩展。与 Storm 一样,Trident 也由 Twitter 开发。开发 Trident 的主要原因是在 Storm 之上提供高级抽象,以及有状态流处理和低延迟分布式查询。

Trident 使用 spout 和 bolt,但这些低级组件在执行之前由 Trident 自动生成。Trident 具有函数、过滤器、联接、分组和聚合。

Trident 将流处理为一系列批次,这些批次称为事务。通常,这些小批次的大小将以数千或数百万个元组为单位,具体取决于输入流。这样,Trident 就不同于 Storm,后者执行逐个元组的处理。

批处理概念与数据库事务非常相似。每个事务都分配一个事务 ID。一旦所有处理完成,则认为事务成功。但是,处理事务的其中一个元组失败将导致整个事务被重新传输。对于每个批次,Trident 将在事务开始时调用 beginCommit,并在事务结束时调用 commit。

Trident 拓扑

Trident API 公开了一个简单的选项,可以使用“TridentTopology”类创建 Trident 拓扑。基本上,Trident 拓扑从 spout 接收输入流,并在流上执行有序的序列操作(过滤器、聚合、分组等)。Storm 元组被 Trident 元组替换,Bolt 被操作替换。可以如下创建简单的 Trident 拓扑:

TridentTopology topology = new TridentTopology();

Trident 元组

Trident 元组是命名的值列表。TridentTuple 接口是 Trident 拓扑的数据模型。TridentTuple 接口是 Trident 拓扑可以处理的基本数据单元。

Trident Spout

Trident spout 类似于 Storm spout,但增加了使用 Trident 功能的选项。实际上,我们仍然可以使用我们在 Storm 拓扑中使用的 IRichSpout,但它本质上是非事务性的,我们无法使用 Trident 提供的优势。

具有使用 Trident 功能的所有功能的基本 spout 是“ITridentSpout”。它支持事务性和不透明事务性语义。其他 spout 是 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。

除了这些通用 spout 之外,Trident 还有许多 trident spout 的示例实现。其中之一是 FeederBatchSpout spout,我们可以使用它轻松发送命名的 trident 元组列表,而无需担心批处理、并行性等。

FeederBatchSpout 创建和数据馈送可以如下所示完成:

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident 操作

Trident 依赖于“Trident 操作”来处理 trident 元组的输入流。Trident API 有许多内置操作来处理从简单到复杂的流处理。这些操作范围从简单的验证到 trident 元组的复杂分组和聚合。让我们了解一下最重要和最常用的操作。

过滤器

过滤器是一个用于执行输入验证任务的对象。Trident 过滤器获取 trident 元组字段的子集作为输入,并根据是否满足某些条件返回 true 或 false。如果返回 true,则元组保留在输出流中;否则,元组将从流中删除。过滤器基本上将继承自BaseFilter类并实现isKeep方法。以下是对过滤器操作的示例实现:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

可以在拓扑中使用“each”方法调用过滤器函数。“Fields”类可用于指定输入(trident 元组的子集)。示例代码如下所示:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

函数

函数是一个用于对单个 trident 元组执行简单操作的对象。它获取 trident 元组字段的子集并发出零个或多个新的 trident 元组字段。

函数基本上继承自BaseFunction类并实现execute方法。下面给出了一个示例实现:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

就像 Filter 操作一样,可以在拓扑中使用each方法调用 Function 操作。示例代码如下所示:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

聚合是一个用于对输入批次或分区或流执行聚合操作的对象。Trident 有三种类型的聚合。它们如下所示:

  • aggregate − 对每个 Trident 元组批次独立进行聚合。在聚合过程中,元组最初使用全局分组重新分区,将同一批次的所有分区合并到单个分区中。

  • partitionAggregate − 对每个分区进行聚合,而不是对整个 Trident 元组批次进行聚合。分区聚合的输出完全替换输入元组。分区聚合的输出包含单个字段元组。

  • persistentaggregate − 对所有批次中的所有 Trident 元组进行聚合,并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

聚合操作可以使用 CombinerAggregator、ReducerAggregator 或通用 Aggregator 接口创建。“count” 聚合器在以上示例中使用,是内置聚合器之一。它使用“CombinerAggregator”实现。实现如下:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

分组

分组操作是内置操作,可以通过groupBy方法调用。groupBy 方法通过对指定字段执行 partitionBy 来重新分区流,然后在每个分区内,将组字段相等的元组组合在一起。通常,我们会将“groupBy”与“persistentAggregate”一起使用以获取分组聚合。示例代码如下:

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并和连接

合并和连接可以通过分别使用“merge”和“join”方法来完成。合并组合一个或多个流。连接类似于合并,但连接使用来自两侧的 Trident 元组字段来检查和连接两个流。此外,连接仅在批次级别工作。示例代码如下:

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

状态维护

Trident 提供了一种状态维护机制。状态信息可以存储在拓扑本身中,也可以存储在单独的数据库中。原因是为了维护一个状态,如果任何元组在处理过程中失败,则重试失败的元组。这在更新状态时会产生问题,因为您不确定此元组的状态是否已更新过。如果元组在更新状态之前失败,则重试元组将使状态稳定。但是,如果元组在更新状态后失败,则重试相同的元组将再次增加数据库中的计数,使状态不稳定。需要执行以下步骤以确保消息仅处理一次:

  • 将元组分成小批次处理。

  • 为每个批次分配一个唯一的 ID。如果批次重试,则赋予相同的唯一 ID。

  • 状态更新在批次之间是有序的。例如,第二个批次的状态更新只有在第一个批次的状态更新完成后才可能进行。

分布式 RPC

分布式 RPC 用于查询和检索 Trident 拓扑中的结果。Storm 有一个内置的分布式 RPC 服务器。分布式 RPC 服务器接收来自客户端的 RPC 请求,并将其传递给拓扑。拓扑处理请求并将结果发送到分布式 RPC 服务器,然后由分布式 RPC 服务器重定向到客户端。Trident 的分布式 RPC 查询执行方式类似于普通的 RPC 查询,但这些查询是并行运行的。

何时使用 Trident?

在许多用例中,如果要求仅处理一次查询,我们可以通过在 Trident 中编写拓扑来实现。另一方面,在 Storm 的情况下,很难实现完全一次处理。因此,对于需要完全一次处理的用例,Trident 将很有用。Trident 并非适用于所有用例,特别是高性能用例,因为它增加了 Storm 的复杂性并管理状态。

Trident 的工作示例

我们将把上一节中完成的呼叫日志分析器应用程序转换为 Trident 框架。由于 Trident 的高级 API,Trident 应用程序将比普通的 Storm 应用程序相对容易。Storm 将基本上需要在 Trident 中执行任何一个 Function、Filter、Aggregate、GroupBy、Join 和 Merge 操作。最后,我们将使用LocalDRPC类启动 DRPC 服务器,并使用LocalDRPC类的execute方法搜索一些关键字。

格式化呼叫信息

FormatCall 类的目的是格式化包含“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下:

编码:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit 类的目的是根据“逗号 (,)”分割输入字符串,并发出字符串中的每个单词。此函数用于解析分布式查询的输入参数。完整的代码如下:

编码:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

日志分析器

这是主应用程序。最初,应用程序将初始化 TridentTopology 并使用FeederBatchSpout提供呼叫者信息。可以使用 TridentTopology 类的newStream方法创建 Trident 拓扑流。类似地,可以使用 TridentTopology 类的newDRCPStream方法创建 Trident 拓扑 DRPC 流。可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。LocalDRPC具有 execute 方法来搜索一些关键字。完整的代码如下所示。

编码:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

构建和运行应用程序

完整的应用程序包含三个 Java 代码。它们如下:

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

可以使用以下命令构建应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令运行应用程序:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

输出

应用程序启动后,将输出有关集群启动过程、操作处理、DRPC 服务器和客户端信息以及最终集群关闭过程的完整详细信息。此输出将显示在控制台上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

Twitter 中的 Apache Storm

在本章中,我们将讨论 Apache Storm 的一个实时应用程序。我们将了解 Storm 如何在 Twitter 中使用。

Twitter

Twitter 是一种在线社交网络服务,提供了一个发送和接收用户推文的平台。注册用户可以阅读和发布推文,但未注册用户只能阅读推文。主题标签用于通过在相关关键字前添加 # 来按关键字对推文进行分类。现在让我们以查找每个主题使用最多的主题标签的实时场景为例。

Spout 创建

Spout 的目的是尽快获取人们提交的推文。Twitter 提供“Twitter 流式 API”,这是一个基于 Web 服务的工具,用于实时检索人们提交的推文。Twitter 流式 API 可以通过任何编程语言访问。

twitter4j 是一个开源的非官方 Java 库,它提供了一个基于 Java 的模块来轻松访问 Twitter 流式 API。twitter4j 提供了一个基于监听器的框架来访问推文。要访问 Twitter 流式 API,我们需要注册 Twitter 开发者帐户,并获取以下 OAuth 身份验证详细信息。

  • 客户密钥
  • 客户密钥秘钥
  • 访问令牌
  • 访问令牌秘钥

Storm 在其入门工具包中提供了一个 Twitter Spout,TwitterSampleSpout。我们将使用它来检索推文。Spout 需要 OAuth 身份验证详细信息和至少一个关键字。Spout 将根据关键字发出实时推文。完整的程序代码如下所示。

编码:TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;
		
   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;
		
   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }
		
   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }
		
   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }
					
            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
            @Override
            public void onTrackLimitationNotice(int i) {}
					
            @Override
            public void onScrubGeo(long l, long l1) {}
					
            @Override
            public void onException(Exception ex) {}
					
            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };
				
         ConfigurationBuilder cb = new ConfigurationBuilder();
				
         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);
					
         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);
				
         if (keyWords.length == 0) {
            _twitterStream.sample();
         }else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }
			
   @Override
   public void nextTuple() {
      Status ret = queue.poll();
				
      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }
			
   @Override
   public void close() {
      _twitterStream.shutdown();
   }
			
   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }
			
   @Override
   public void ack(Object id) {}
			
   @Override
   public void fail(Object id) {}
			
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

主题标签读取 Bolt

Spout 发出的推文将转发到HashtagReaderBolt,它将处理推文并发出所有可用的主题标签。HashtagReaderBolt 使用 twitter4j 提供的getHashTagEntities方法。getHashTagEntities 读取推文并返回主题标签列表。完整的程序代码如下:

编码:HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

主题标签计数 Bolt

发出的主题标签将转发到HashtagCounterBolt。此 Bolt 将处理所有主题标签,并使用 Java Map 对象将每个主题标签及其计数保存在内存中。完整的程序代码如下所示。

编码:HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

提交拓扑

提交拓扑是主应用程序。Twitter 拓扑包含TwitterSampleSpoutHashtagReaderBoltHashtagCounterBolt。以下程序代码显示了如何提交拓扑。

编码:TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];
		
      String accessToken = args[2];
      String accessTokenSecret = args[3];
		
      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

构建和运行应用程序

完整的应用程序包含四个 Java 代码。它们如下:

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

您可以使用以下命令编译应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

使用以下命令执行应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

输出

应用程序将打印当前可用的主题标签及其计数。输出应类似于以下内容:

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

雅虎财经中的 Apache Storm

雅虎财经是互联网领先的商业新闻和金融数据网站。它是雅虎的一部分,提供有关金融新闻、市场统计、国际市场数据以及任何人都可以访问的其他金融资源信息。

如果您是注册的雅虎用户,则可以自定义雅虎财经以利用其某些产品。雅虎财经 API 用于从雅虎查询财务数据。

此 API 显示的数据比实时数据延迟 15 分钟,并且每 1 分钟更新一次其数据库,以访问当前的股票相关信息。现在让我们以一家公司的实时场景为例,看看如何在股票价值跌破 100 时发出警报。

Spout 创建

Spout 的目的是获取公司详细信息并将价格发出到 Bolt。您可以使用以下程序代码创建 Spout。

编码:YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;
	
   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}
	
   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

Bolt 创建

此处,Bolt 的目的是在价格跌破 100 时处理给定公司的价格。它使用 Java Map 对象在股票价格跌破 100 时将截止价格限制警报设置为true;否则为 false。完整的程序代码如下:

编码:PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;
	
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap <String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

提交拓扑

这是主应用程序,其中 YahooFinanceSpout.java 和 PriceCutOffBolt.java 连接在一起并生成拓扑。以下程序代码显示了如何提交拓扑。

编码:YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);
		
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

构建和运行应用程序

完整的应用程序包含三个 Java 代码。它们如下:

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

可以使用以下命令构建应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

可以使用以下命令运行应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

输出

输出将类似于以下内容:

GOOGL : false
AAPL : false
INTC : true

Apache Storm - 应用

Apache Storm 框架支持当今许多最佳的工业应用。在本章中,我们将简要概述 Storm 的一些最著名的应用。

Klout

Klout 是一款使用社交媒体分析根据在线社交影响力对用户进行排名的应用程序,通过Klout 分数,这是一个介于 1 和 100 之间的数值。Klout 使用 Apache Storm 的内置 Trident 抽象来创建流式传输数据的复杂拓扑。

天气频道

天气频道使用 Storm 拓扑来摄取天气数据。它已与 Twitter 合作,以便在 Twitter 和移动应用程序上启用天气信息广告。OpenSignal 是一家专门从事无线覆盖范围测绘的公司。StormTagWeatherSignal 是 OpenSignal 创建的天气相关项目。StormTag 是一款蓝牙气象站,可连接到钥匙扣上。设备收集的天气数据会发送到 WeatherSignal 应用程序和 OpenSignal 服务器。

电信行业

电信提供商每秒处理数百万个电话呼叫。他们对掉话和声音质量差进行取证分析。呼叫详细信息记录以每秒数百万的速度流入,Apache Storm 实时处理这些记录并识别任何令人担忧的模式。Storm 分析可用于持续改进通话质量。

广告