Apache Storm - Trident



Trident 是 Storm 的扩展。与 Storm 一样,Trident 也是由 Twitter 开发的。开发 Trident 的主要原因是在 Storm 之上提供高级抽象,以及提供有状态流处理和低延迟分布式查询。

Trident 使用 spout 和 bolt,但这些低级组件在执行之前由 Trident 自动生成。Trident 具有函数、过滤器、联接、分组和聚合功能。

Trident 将流处理为一系列批次,这些批次称为事务。通常,这些小批次的大小将以数千或数百万个元组为单位,具体取决于输入流。这样,Trident 就不同于 Storm,Storm 执行逐个元组的处理。

批处理的概念与数据库事务非常相似。每个事务都分配一个事务 ID。一旦所有处理完成,则认为事务成功。但是,处理其中一个事务的元组失败将导致整个事务重新传输。对于每个批次,Trident 将在事务开始时调用 beginCommit,并在事务结束时调用 commit。

Trident Topology

Trident API 提供了一个简单的选项,可以使用“TridentTopology”类创建 Trident topology。基本上,Trident topology 从 spout 接收输入流,并在流上执行有序的操作序列(过滤器、聚合、分组等)。Storm Tuple 被 Trident Tuple 替换,Bolt 被操作替换。可以如下创建一个简单的 Trident topology:

TridentTopology topology = new TridentTopology();

Trident 元组

Trident 元组是命名值的列表。TridentTuple 接口是 Trident topology 的数据模型。TridentTuple 接口是 Trident topology 可以处理的基本数据单元。

Trident Spout

Trident spout 类似于 Storm spout,但增加了使用 Trident 功能的选项。实际上,我们仍然可以使用我们在 Storm topology 中使用的 IRichSpout,但它本质上是非事务性的,我们将无法使用 Trident 提供的优势。

具有使用 Trident 功能的所有功能的基本 spout 是“ITridentSpout”。它支持事务性和不透明事务性语义。其他 spout 包括 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。

除了这些通用 spout 之外,Trident 还有许多 trident spout 的示例实现。其中之一是 FeederBatchSpout spout,我们可以使用它轻松地发送命名列表的 trident 元组,而无需担心批处理、并行性等。

FeederBatchSpout 的创建和数据馈送可以如下所示:

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident 操作

Trident 依赖于“Trident 操作”来处理 trident 元组的输入流。Trident API 有许多内置操作来处理简单到复杂的流处理。这些操作范围从简单的验证到 trident 元组的复杂分组和聚合。让我们了解一下最重要和最常用的操作。

过滤器

过滤器是一个用于执行输入验证任务的对象。Trident 过滤器获取 trident 元组字段的子集作为输入,并根据是否满足某些条件返回 true 或 false。如果返回 true,则元组保留在输出流中;否则,元组将从流中删除。过滤器将基本上继承自BaseFilter类并实现isKeep方法。以下是过滤器操作的示例实现:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

过滤器函数可以使用“each”方法在 topology 中调用。“Fields”类可用于指定输入(trident 元组的子集)。示例代码如下:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

函数

函数是一个用于对单个 trident 元组执行简单操作的对象。它获取 trident 元组字段的子集并发出零个或多个新的 trident 元组字段。

函数基本上继承自BaseFunction类并实现execute方法。下面给出了一个示例实现:

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

就像过滤器操作一样,函数操作可以使用each方法在 topology 中调用。示例代码如下:

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

聚合是一个用于对输入批次、分区或流执行聚合操作的对象。Trident 有三种类型的聚合。它们如下:

  • aggregate - 独立地聚合每个 trident 元组批次。在聚合过程中,元组最初使用全局分组重新分区,以将同一批次的所有分区合并到单个分区中。

  • partitionAggregate - 聚合每个分区而不是整个 trident 元组批次。分区聚合的输出完全替换输入元组。分区聚合的输出包含一个单字段元组。

  • persistentaggregate - 聚合所有批次中的所有 trident 元组并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

聚合操作可以使用 CombinerAggregator、ReducerAggregator 或通用 Aggregator 接口创建。上面示例中使用的“count”聚合器是内置聚合器之一。它是使用“CombinerAggregator”实现的。实现如下:

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

分组

分组操作是内置操作,可以通过groupBy方法调用。groupBy 方法通过对指定字段执行 partitionBy 来重新分区流,然后在每个分区内,它将组字段相等的元组组合在一起。通常,我们将“groupBy”与“persistentAggregate”一起使用以获取分组聚合。示例代码如下:

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并和联接

合并和联接可以通过分别使用“merge”和“join”方法来完成。合并组合一个或多个流。联接类似于合并,除了联接使用来自双方的 trident 元组字段来检查和联接两个流。此外,联接仅在批次级别工作。示例代码如下:

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

状态维护

Trident 提供了一种状态维护机制。状态信息可以存储在 topology 本身中,或者也可以将其存储在单独的数据库中。原因是维护一个状态,如果任何元组在处理过程中失败,则重试失败的元组。这在更新状态时会产生问题,因为您不确定此元组的状态是否已以前更新过。如果元组在更新状态之前失败,则重试元组将使状态稳定。但是,如果元组在更新状态后失败,则重试相同的元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保消息仅处理一次:

  • 以小批次处理元组。

  • 为每个批次分配一个唯一的 ID。如果批次被重试,则为其分配相同的唯一 ID。

  • 状态更新在批次之间是有序的。例如,第二个批次的状态更新在第一个批次的状态更新完成之前是不可能的。

分布式 RPC

分布式 RPC 用于查询和检索 Trident topology 的结果。Storm 有一个内置的分布式 RPC 服务器。分布式 RPC 服务器接收来自客户端的 RPC 请求并将其传递给 topology。topology 处理请求并将结果发送到分布式 RPC 服务器,然后由分布式 RPC 服务器重定向到客户端。Trident 的分布式 RPC 查询像普通的 RPC 查询一样执行,只是这些查询是并行运行的。

何时使用 Trident?

在许多用例中,如果需求是仅处理一次查询,我们可以通过在 Trident 中编写 topology 来实现它。另一方面,在 Storm 的情况下,很难实现恰好一次处理。因此,Trident 将对那些需要恰好一次处理的用例有用。Trident 并非适用于所有用例,尤其是高性能用例,因为它增加了 Storm 的复杂性并管理状态。

Trident 的工作示例

我们将把上一节中完成的呼叫日志分析器应用程序转换为 Trident 框架。由于其高级 API,Trident 应用程序将相对容易,与普通 storm 相比。Storm 将基本上需要在 Trident 中执行任何一个函数、过滤器、聚合、GroupBy、联接和合并操作。最后,我们将使用LocalDRPC类启动 DRPC 服务器,并使用LocalDRPC类的execute方法搜索某些关键字。

格式化呼叫信息

FormatCall 类的目的是格式化包含“呼叫者号码”和“接收者号码”的呼叫信息。完整的程序代码如下:

编码:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit 类的目的是根据“逗号 (,)”分割输入字符串并发出字符串中的每个单词。此函数用于解析分布式查询的输入参数。完整代码如下:

编码:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

日志分析器

这是主应用程序。最初,应用程序将初始化 TridentTopology 并使用FeederBatchSpout提供呼叫者信息。可以使用 TridentTopology 类的newStream方法创建 Trident topology 流。类似地,可以使用 TridentTopology 类的newDRCPStream方法创建 Trident topology DRPC 流。可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。LocalDRPC具有 execute 方法来搜索某些关键字。完整代码如下所示。

编码:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

构建和运行应用程序

完整的应用程序包含三个 Java 代码。它们如下:

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

可以使用以下命令构建应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令运行应用程序:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

输出

应用程序启动后,应用程序将输出有关集群启动过程、操作处理、DRPC 服务器和客户端信息以及最终集群关闭过程的完整详细信息。此输出将显示在控制台上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends
广告