Apache Flume 快速指南



Apache Flume - 简介

什么是 Flume?

Apache Flume 是一种用于收集、聚合和传输大量流式数据的工具/服务/数据摄取机制,例如日志文件、事件(等等)从各种来源到一个集中的数据存储。

Flume 是一种高度可靠、分布式且可配置的工具。它主要设计用于将流式数据(日志数据)从各种 Web 服务器复制到 HDFS。

Apache Flume

Flume 的应用

假设一个电子商务 Web 应用程序想要分析某个特定区域的客户行为。为此,他们需要将可用的日志数据移动到 Hadoop 中进行分析。在这里,Apache Flume 就派上用场了。

Flume 用于以更高的速度将应用程序服务器生成的日志数据移动到 HDFS 中。

Flume 的优势

以下是使用 Flume 的优势:

  • 使用 Apache Flume,我们可以将数据存储到任何集中的存储中(HBase、HDFS)。

  • 当传入数据的速率超过写入目标数据的速率时,Flume 充当数据生产者和集中式存储之间的中介,并在它们之间提供稳定的数据流。

  • Flume 提供了**上下文路由**的功能。

  • Flume 中的事务是基于通道的,其中为每条消息维护两个事务(一个发送者和一个接收者)。它保证可靠的消息传递。

  • Flume 可靠、容错、可扩展、可管理且可定制。

Flume 的特性

Flume 的一些显著特性如下:

  • Flume 有效地将来自多个 Web 服务器的日志数据摄取到集中式存储(HDFS、HBase)中。

  • 使用 Flume,我们可以立即将来自多个服务器的数据获取到 Hadoop 中。

  • 除了日志文件外,Flume 还用于导入社交网络网站(如 Facebook 和 Twitter)以及电子商务网站(如 Amazon 和 Flipkart)生成的巨量事件数据。

  • Flume 支持大量源和目标类型。

  • Flume 支持多跳流、扇入扇出流、上下文路由等。

  • Flume 可以水平扩展。

Apache Flume - Hadoop 中的数据传输

众所周知,**大数据**是指无法使用传统计算技术处理的大型数据集的集合。分析大数据可以得出有价值的结果。**Hadoop** 是一个开源框架,它允许使用简单的编程模型在计算机集群的分布式环境中存储和处理大数据。

流式/日志数据

通常,大部分要分析的数据将由各种数据源生成,例如应用程序服务器、社交网络站点、云服务器和企业服务器。这些数据将以**日志文件**和**事件**的形式存在。

**日志文件** - 通常,日志文件是一个列出操作系统中发生的事件/操作的文件。例如,Web 服务器在日志文件中列出对服务器发出的每个请求。

在收集此类日志数据后,我们可以获取有关以下方面的信息:

  • 应用程序性能并定位各种软件和硬件故障。
  • 用户行为并获得更好的业务洞察。

将数据传输到 HDFS 系统的传统方法是使用**put**命令。让我们看看如何使用**put**命令。

HDFS put 命令

处理日志数据的主要挑战在于将这些由多个服务器生成的日志移动到 Hadoop 环境中。

Hadoop**文件系统 Shell**提供命令将数据插入 Hadoop 并从中读取。您可以使用如下所示的**put**命令将数据插入 Hadoop。

$ Hadoop fs –put /path of the required file  /path in HDFS where to save the file 

put 命令的问题

我们可以使用 Hadoop 的**put**命令将数据从这些源传输到 HDFS。但是,它存在以下缺点:

  • 使用**put**命令,我们**一次只能传输一个文件**,而数据生成器生成数据的速度要快得多。由于对旧数据的分析准确性较低,因此我们需要找到一种解决方案来实时传输数据。

  • 如果我们使用**put**命令,则需要打包数据并准备好上传。由于 Web 服务器持续生成数据,因此这是一项非常困难的任务。

我们需要的是一种解决方案,它可以克服**put**命令的缺点,并将“流式数据”从数据生成器传输到集中式存储(尤其是 HDFS)中,延迟更小。

HDFS 的问题

在 HDFS 中,文件以目录条目形式存在,并且在关闭文件之前,文件的长度将被视为零。例如,如果源正在将数据写入 HDFS 并且操作过程中网络中断(在未关闭文件的情况下),则写入文件中的数据将丢失。

因此,我们需要一个可靠、可配置且可维护的系统来将日志数据传输到 HDFS 中。

**注意** - 在 POSIX 文件系统中,每当我们访问文件(例如执行写操作)时,其他程序仍然可以读取此文件(至少可以读取文件的已保存部分)。这是因为文件在关闭之前就存在于磁盘上。

可用的解决方案

为了将流式数据(日志文件、事件等)从各种来源发送到 HDFS,我们可以使用以下工具:

Facebook 的 Scribe

Scribe 是一款非常流行的工具,用于聚合和流式传输日志数据。它旨在扩展到大量节点,并对网络和节点故障具有鲁棒性。

Apache Kafka

Kafka 由 Apache 软件基金会开发。它是一个开源消息代理。使用 Kafka,我们可以处理高吞吐量和低延迟的馈送。

Apache Flume

Apache Flume 是一种用于收集、聚合和传输大量流式数据的工具/服务/数据摄取机制,例如日志数据、事件(等等)从各种 Web 服务器到一个集中的数据存储。

它是一个高度可靠、分布式且可配置的工具,主要设计用于将流式数据从各种来源传输到 HDFS。

在本教程中,我们将详细讨论如何使用 Flume 以及一些示例。

Apache Flume - 架构

下图描述了 Flume 的基本架构。如图所示,**数据生成器**(例如 Facebook、Twitter)生成数据,这些数据由在其上运行的各个 Flume**代理**收集。此后,**数据收集器**(它也是一个代理)从代理收集数据,这些数据会被聚合并推送到集中式存储(例如 HDFS 或 HBase)中。

Flume Architecture

Flume 事件

**事件**是**Flume**内部传输的数据的基本单位。它包含一个要从源传输到目标的字节数组有效负载,并附带可选的标头。典型的 Flume 事件将具有以下结构:

Flume Event

Flume 代理

**代理**是 Flume 中一个独立的守护进程(JVM)。它接收来自客户端或其他代理的数据(事件),并将其转发到其下一个目标(接收器或代理)。Flume 可能有多个代理。下图表示一个**Flume 代理**

Flume Agent

如图所示,Flume 代理包含三个主要组件,即**源**、**通道**和**接收器**。

**源**是代理的一个组件,它接收来自数据生成器的数据,并以 Flume 事件的形式将其传输到一个或多个通道。

Apache Flume 支持多种类型的源,每个源都接收来自指定数据生成器的事件。

**示例** - Avro 源、Thrift 源、Twitter 1% 源等。

通道

**通道**是一个瞬态存储,它接收来自源的事件,并在接收器使用它们之前缓冲它们。它充当源和接收器之间的桥梁。

这些通道是完全事务性的,它们可以与任意数量的源和接收器一起工作。

**示例** - JDBC 通道、文件系统通道、内存通道等。

接收器

**接收器**将数据存储到集中式存储(如 HBase 和 HDFS)中。它使用通道中的数据(事件),并将其传递到目标。接收器的目标可能是另一个代理或中央存储。

**示例** - HDFS 接收器

**注意** - Flume 代理可以有多个源、接收器和通道。我们在本教程的 Flume 配置章节中列出了所有支持的源、接收器和通道。

Flume 代理的其他组件

我们上面讨论的是代理的基本组件。除此之外,我们还有几个组件在将事件从数据生成器传输到集中式存储中发挥着至关重要的作用。

拦截器

拦截器用于更改/检查在源和通道之间传输的 Flume 事件。

通道选择器

这些用于确定在有多个通道的情况下选择哪个通道来传输数据。通道选择器有两种类型:

  • **默认通道选择器** - 这些也称为复制通道选择器,它们会复制每个通道中的所有事件。

  • **多路复用通道选择器** - 这些根据该事件标头中的地址决定要发送事件的通道。

接收器处理器

这些用于从选定的接收器组中调用特定接收器。这些用于为接收器创建故障转移路径或跨多个接收器从通道中负载均衡事件。

Apache Flume - 数据流

Flume是一个用于将日志数据移动到HDFS的框架。通常,日志事件和日志数据由日志服务器生成,这些服务器上运行着Flume代理。这些代理从数据生成器接收数据。

这些代理中的数据将由一个称为**收集器**(Collector)的中间节点收集。与代理类似,Flume中可以有多个收集器。

最后,来自所有这些收集器的数据将被聚合并推送到一个集中式存储中,例如HBase或HDFS。下图解释了Flume中的数据流。

Flume DataFlow

多跳流

在Flume中,可以有多个代理,并且在到达最终目的地之前,事件可能会经过多个代理。这被称为**多跳流**(multi-hop flow)。

扇出流

从一个源到多个通道的数据流称为**扇出流**(fan-out flow)。它分为两种类型:

  • **复制**(Replicating) - 数据流将数据复制到所有配置的通道中。

  • **多路复用**(Multiplexing) - 数据流将数据发送到事件头中指定的选定通道。

扇入流

数据流从多个源传输到一个通道称为**扇入流**(fan-in flow)。

故障处理

在Flume中,对于每个事件,都会发生两个事务:一个在发送方,一个在接收方。发送方将事件发送到接收方。接收方在收到数据后立即提交其自身的事务,并向发送方发送“已收到”信号。发送方在收到信号后提交其事务。(发送方在收到接收方的信号之前不会提交其事务。)

Apache Flume - 环境

我们在上一章中已经讨论了Flume的架构。在本章中,让我们看看如何下载和设置Apache Flume。

在继续操作之前,您需要在系统中拥有Java环境。因此,首先,请确保您的系统中已安装Java。在本教程中的一些示例中,我们使用了Hadoop HDFS(作为接收器)。因此,我们建议您安装Hadoop以及Java。要收集更多信息,请访问以下链接:https://tutorialspoint.com/hadoop/hadoop_enviornment_setup.htm

安装Flume

首先,从网站 https://flume.apache.org/下载最新版本的Apache Flume软件。

步骤1

打开网站。点击主页左侧的**下载**链接。它将带您到Apache Flume的下载页面。

Installing Flume

步骤2

在下载页面中,您可以看到Apache Flume的二进制文件和源文件的链接。点击链接apache-flume-1.6.0-bin.tar.gz

您将被重定向到一个镜像列表,您可以在其中点击任意一个镜像开始下载。同样,您可以通过点击apache-flume-1.6.0-src.tar.gz下载Apache Flume的源代码。

步骤3

在与HadoopHBase和其他软件的安装目录相同的目录中创建一个名为Flume的目录(如果您已安装任何软件),如下所示。

$ mkdir Flume 

步骤4

解压缩下载的tar文件,如下所示。

$ cd Downloads/ 
$ tar zxvf apache-flume-1.6.0-bin.tar.gz  
$ tar zxvf apache-flume-1.6.0-src.tar.gz

步骤5

将apache-flume-1.6.0-bin.tar文件的內容移动到之前创建的Flume目录中,如下所示。(假设我们在名为Hadoop的本地用户中创建了Flume目录。)

$ mv apache-flume-1.6.0-bin.tar/* /home/Hadoop/Flume/

配置Flume

要配置Flume,我们必须修改三个文件,即flume-env.sh、flumeconf.propertiesbash.rc

设置路径/类路径

.bashrc文件中,设置Flume的主文件夹、路径和类路径,如下所示。

setting the path

conf文件夹

如果您打开Apache Flume的conf文件夹,您将看到以下四个文件:

  • flume-conf.properties.template,
  • flume-env.sh.template,
  • flume-env.ps1.template, 和
  • log4j.properties.
conf Folder

现在重命名

  • flume-conf.properties.template文件为flume-conf.properties,以及

  • flume-env.sh.templateflume-env.sh

flume-env.sh

打开flume-env.sh文件并将JAVA_Home设置为Java在系统中安装的文件夹。

flume-env.sh

验证安装

通过浏览bin文件夹并键入以下命令来验证Apache Flume的安装。

$ ./flume-ng 

如果成功安装了Flume,您将获得Flume的帮助提示,如下所示。

Verifying the Installation

Apache Flume - 配置

安装Flume后,我们需要使用配置文件对其进行配置,该配置文件是一个包含**键值对**的Java属性文件。我们需要将值传递给文件中的键。

在Flume配置文件中,我们需要:

  • 命名当前代理的组件。
  • 描述/配置源。
  • 描述/配置接收器。
  • 描述/配置通道。
  • 将源和接收器绑定到通道。

通常,我们可以在Flume中有多个代理。我们可以使用唯一的名称来区分每个代理。并使用此名称,我们必须配置每个代理。

命名组件

首先,您需要命名/列出代理的组件,例如源、接收器和通道,如下所示。

agent_name.sources = source_name 
agent_name.sinks = sink_name 
agent_name.channels = channel_name 

Flume支持各种源、接收器和通道。它们列在下表中。

通道 接收器
  • Avro源
  • Thrift源
  • Exec源
  • JMS源
  • 暂存目录源
  • Twitter 1% firehose源
  • Kafka源
  • NetCat源
  • 序列生成器源
  • Syslog源
  • Syslog TCP源
  • 多端口Syslog TCP源
  • Syslog UDP源
  • HTTP源
  • 压力源
  • 旧版源
  • Thrift旧版源
  • 自定义源
  • Scribe源
  • 内存通道
  • JDBC通道
  • Kafka通道
  • 文件通道
  • 可溢出内存通道
  • 伪事务通道
  • HDFS接收器
  • Hive接收器
  • 日志接收器
  • Avro接收器
  • Thrift接收器
  • IRC接收器
  • 文件滚动接收器
  • 空接收器
  • HBase接收器
  • 异步HBase接收器
  • MorphlineSolr接收器
  • ElasticSearch接收器
  • Kite数据集接收器
  • Kafka接收器

您可以使用其中任何一个。例如,如果您使用Twitter源通过内存通道将Twitter数据传输到HDFS接收器,并且代理名称为TwitterAgent,则

TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS 

列出代理的组件后,您必须通过为其属性提供值来描述源、接收器和通道。

描述源

每个源将具有单独的属性列表。名为“type”的属性对每个源都是通用的,它用于指定我们正在使用的源的类型。

除了属性“type”之外,还需要提供特定源所有**必需**属性的值来对其进行配置,如下所示。

agent_name.sources. source_name.type = value 
agent_name.sources. source_name.property2 = value 
agent_name.sources. source_name.property3 = value 

例如,如果我们考虑twitter源,以下是我们必须提供值才能对其进行配置的属性。

TwitterAgent.sources.Twitter.type = Twitter (type name) 
TwitterAgent.sources.Twitter.consumerKey =  
TwitterAgent.sources.Twitter.consumerSecret = 
TwitterAgent.sources.Twitter.accessToken =   
TwitterAgent.sources.Twitter.accessTokenSecret = 

描述接收器

与源一样,每个接收器将具有单独的属性列表。名为“type”的属性对每个接收器都是通用的,它用于指定我们正在使用的接收器的类型。除了属性“type”之外,还需要提供特定接收器所有**必需**属性的值来对其进行配置,如下所示。

agent_name.sinks. sink_name.type = value 
agent_name.sinks. sink_name.property2 = value 
agent_name.sinks. sink_name.property3 = value

例如,如果我们考虑HDFS接收器,以下是我们必须提供值才能对其进行配置的属性。

TwitterAgent.sinks.HDFS.type = hdfs (type name)  
TwitterAgent.sinks.HDFS.hdfs.path = HDFS directory’s Path to store the data

描述通道

Flume提供各种通道来在源和接收器之间传输数据。因此,除了源和通道之外,还需要描述代理中使用的通道。

要描述每个通道,您需要设置所需的属性,如下所示。

agent_name.channels.channel_name.type = value 
agent_name.channels.channel_name. property2 = value 
agent_name.channels.channel_name. property3 = value 

例如,如果我们考虑内存通道,以下是我们必须提供值才能对其进行配置的属性。

TwitterAgent.channels.MemChannel.type = memory (type name)

将源和接收器绑定到通道

由于通道连接源和接收器,因此需要将两者都绑定到通道,如下所示。

agent_name.sources.source_name.channels = channel_name 
agent_name.sinks.sink_name.channels = channel_name 

以下示例显示了如何将源和接收器绑定到通道。这里,我们考虑twitter源、内存通道HDFS接收器

TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channels = MemChannel 

启动Flume代理

配置完成后,我们必须启动Flume代理。操作如下:

$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent 

其中:

  • agent - 启动Flume代理的命令

  • --conf ,-c<conf> - 使用conf目录中的配置文件

  • -f<file> - 指定配置文件路径,如果缺少

  • --name, -n <name> - twitter代理的名称

  • -D property =value - 设置Java系统属性值。

Apache Flume - 获取 Twitter 数据

使用Flume,我们可以从各种服务中获取数据并将其传输到集中式存储(HDFS和HBase)。本章说明如何使用Apache Flume从Twitter服务获取数据并将其存储在HDFS中。

如Flume架构中所述,Web服务器生成日志数据,并且Flume中的代理收集此数据。通道将此数据缓冲到接收器,接收器最终将其推送到集中式存储。

在本例中提供的示例中,我们将创建一个应用程序并使用Apache Flume提供的实验性Twitter源获取其中的推文。我们将使用内存通道缓冲这些推文,并使用HDFS接收器将这些推文推送到HDFS中。

Fetch Data

要获取Twitter数据,我们将必须按照以下步骤操作:

  • 创建Twitter应用程序
  • 安装/启动HDFS
  • 配置Flume

创建Twitter应用程序

为了从Twitter获取推文,需要创建一个Twitter应用程序。按照以下步骤创建Twitter应用程序。

步骤1

要创建一个 Twitter 应用程序,请点击以下链接 https://apps.twitter.com/。登录您的 Twitter 账户。您将看到一个 Twitter 应用程序管理窗口,您可以在其中创建、删除和管理 Twitter 应用程序。

Application Management window

步骤2

点击创建新的应用按钮。您将被重定向到一个窗口,其中包含一个应用程序表单,您需要填写您的详细信息以创建应用程序。填写网站地址时,请提供完整的 URL 模式,例如,http://example.com。

Create an Application

步骤3

填写详细信息,完成后接受开发者协议,点击页面底部的创建您的 Twitter 应用程序按钮。如果一切顺利,将根据提供的详细信息创建一个应用程序,如下所示。

Application created

步骤4

在页面底部的密钥和访问令牌选项卡下,您可以看到一个名为创建我的访问令牌的按钮。点击它以生成访问令牌。

Key Access Tokens

步骤5

最后,点击页面右上角的测试 OAuth按钮。这将导致跳转到一个页面,该页面显示您的消费者密钥、消费者密钥秘密、访问令牌访问令牌密钥。复制这些详细信息。这些信息用于在 Flume 中配置代理。

OAuth Tool

启动 HDFS

由于我们将数据存储在 HDFS 中,因此我们需要安装/验证 Hadoop。启动 Hadoop 并在其中创建一个文件夹来存储 Flume 数据。在配置 Flume 之前,请按照以下步骤操作。

步骤 1:安装/验证 Hadoop

安装 Hadoop。如果您的系统中已经安装了 Hadoop,请使用 Hadoop 版本命令验证安装,如下所示。

$ hadoop version 

如果您的系统包含 Hadoop,并且您已设置路径变量,则您将获得以下输出:

Hadoop 2.6.0 
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
Compiled by jenkins on 2014-11-13T21:10Z 
Compiled with protoc 2.5.0 
From source with checksum 18e43357c8f927c0695f1e9522859d6a 
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

步骤 2:启动 Hadoop

浏览 Hadoop 的sbin目录并启动 yarn 和 Hadoop dfs(分布式文件系统),如下所示。

cd /$Hadoop_Home/sbin/ 
$ start-dfs.sh 
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out 
localhost: starting datanode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out 
Starting secondary namenodes [0.0.0.0] 
starting secondarynamenode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
  
$ start-yarn.sh 
starting yarn daemons 
starting resourcemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out 
localhost: starting nodemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out 

步骤 3:在 HDFS 中创建目录

在 Hadoop DFS 中,您可以使用mkdir命令创建目录。浏览它并在所需的路径中创建一个名为twitter_data的目录,如下所示。

$cd /$Hadoop_Home/bin/ 
$ hdfs dfs -mkdir hdfs://127.0.0.1:9000/user/Hadoop/twitter_data 

配置Flume

我们必须使用conf文件夹中的配置文件配置源、通道和接收器。本章提供的示例使用 Apache Flume 提供的一个实验性源,名为Twitter 1% Firehose内存通道和 HDFS 接收器。

Twitter 1% Firehose 源

此源处于高度实验阶段。它使用流式 API 连接到 Twitter 1% 样本 Firehose,并持续下载推文,将其转换为 Avro 格式,并将 Avro 事件发送到下游 Flume 接收器。

安装 Flume 后,我们将默认获得此源。与该源对应的jar文件位于lib文件夹中,如下所示。

Twitter Jar Files

设置类路径

classpath变量设置为Flume-env.sh文件中的 Flume 的lib文件夹,如下所示。

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/* 

此源需要 Twitter 应用程序的详细信息,例如消费者密钥、消费者密钥秘密、访问令牌访问令牌密钥。配置此源时,您必须为以下属性提供值:

  • 通道

  • 源类型:org.apache.flume.source.twitter.TwitterSource

  • consumerKey - OAuth 消费者密钥

  • consumerSecret - OAuth 消费者密钥秘密

  • accessToken - OAuth 访问令牌

  • accessTokenSecret - OAuth 令牌密钥

  • maxBatchSize - Twitter 批处理中应包含的最大 Twitter 消息数。默认值为 1000(可选)。

  • maxBatchDurationMillis - 关闭批处理之前要等待的最大毫秒数。默认值为 1000(可选)。

通道

我们正在使用内存通道。要配置内存通道,您必须为通道的类型提供值。

  • type - 它保存通道的类型。在我们的示例中,类型为MemChannel

  • Capacity - 它是通道中存储的事件的最大数量。其默认值为 100(可选)。

  • TransactionCapacity - 它是通道接受或发送的事件的最大数量。其默认值为 100(可选)。

HDFS接收器

此接收器将数据写入 HDFS。要配置此接收器,您必须提供以下详细信息。

  • 通道

  • type - hdfs

  • hdfs.path - HDFS 中要存储数据的目录的路径。

并且我们可以根据场景提供一些可选值。以下是我们在应用程序中配置的 HDFS 接收器的可选属性。

  • fileType - 这是我们 HDFS 文件所需的格式。SequenceFile、DataStreamCompressedStream是此流可用的三种类型。在我们的示例中,我们使用DataStream

  • writeFormat - 可以是文本或可写。

  • batchSize - 它是写入文件中的事件数,然后将其刷新到 HDFS 中。其默认值为 100。

  • rollsize - 它是触发滚动操作的文件大小。其默认值为 100。

  • rollCount - 它是写入文件中的事件数,然后将其滚动。其默认值为 10。

示例 - 配置文件

以下是配置文件的一个示例。复制此内容并将其保存为 Flume 的 conf 文件夹中的twitter.conf

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS
  
# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret 
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token 
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret 
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
  
# Describing/Configuring the sink 

TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://127.0.0.1:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
 
# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 100
  
# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel 

执行

浏览 Flume 主目录并执行应用程序,如下所示。

$ cd $FLUME_HOME 
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent

如果一切顺利,推文流式传输到 HDFS 将开始。以下是在获取推文时命令提示符窗口的快照。

Fetching Tweets

验证 HDFS

您可以使用以下 URL 访问 Hadoop 管理 Web UI。

https://127.0.0.1:50070/ 

点击页面右侧名为实用程序的下拉菜单。您可以看到两个选项,如下面的快照所示。

Verifying HDFS

点击浏览文件系统并输入您存储推文的 HDFS 目录的路径。在我们的示例中,路径将为/user/Hadoop/twitter_data/。然后,您可以看到存储在 HDFS 中的 Twitter 日志文件列表,如下所示。

Browse the file system

Apache Flume - 序列生成器源

在上一章中,我们已经了解了如何将数据从 Twitter 源获取到 HDFS。本章介绍如何从序列生成器获取数据。

先决条件

要运行本章提供的示例,您需要安装HDFS以及Flume。因此,在继续之前,请验证 Hadoop 安装并启动 HDFS。(请参阅上一章以了解如何启动 HDFS)。

配置Flume

我们必须使用conf文件夹中的配置文件配置源、通道和接收器。本章提供的示例使用序列生成器源、内存通道HDFS 接收器

序列生成器源

它是持续生成事件的源。它维护一个从 0 开始并递增 1 的计数器。它用于测试目的。配置此源时,您必须为以下属性提供值:

  • 通道

  • 源类型 - seq

通道

我们正在使用内存通道。要配置内存通道,您必须为通道的类型提供值。以下是配置内存通道时需要提供的属性列表:

  • type - 它保存通道的类型。在我们的示例中,类型为 MemChannel。

  • Capacity - 它是通道中存储的事件的最大数量。其默认值为 100。(可选)

  • TransactionCapacity - 它是通道接受或发送的事件的最大数量。其默认值为 100。(可选)

HDFS接收器

此接收器将数据写入 HDFS。要配置此接收器,您必须提供以下详细信息。

  • 通道

  • type - hdfs

  • hdfs.path - HDFS 中要存储数据的目录的路径。

并且我们可以根据场景提供一些可选值。以下是我们在应用程序中配置的 HDFS 接收器的可选属性。

  • fileType - 这是我们 HDFS 文件所需的格式。SequenceFile、DataStreamCompressedStream是此流可用的三种类型。在我们的示例中,我们使用DataStream

  • writeFormat - 可以是文本或可写。

  • batchSize - 它是写入文件中的事件数,然后将其刷新到 HDFS 中。其默认值为 100。

  • rollsize - 它是触发滚动操作的文件大小。其默认值为 100。

  • rollCount - 它是写入文件中的事件数,然后将其滚动。其默认值为 10。

示例 - 配置文件

以下是配置文件的一个示例。复制此内容并将其保存为 Flume 的 conf 文件夹中的seq_gen .conf

# Naming the components on the current agent 

SeqGenAgent.sources = SeqSource   
SeqGenAgent.channels = MemChannel 
SeqGenAgent.sinks = HDFS 
 
# Describing/Configuring the source 
SeqGenAgent.sources.SeqSource.type = seq
  
# Describing/Configuring the sink
SeqGenAgent.sinks.HDFS.type = hdfs 
SeqGenAgent.sinks.HDFS.hdfs.path = hdfs://127.0.0.1:9000/user/Hadoop/seqgen_data/
SeqGenAgent.sinks.HDFS.hdfs.filePrefix = log 
SeqGenAgent.sinks.HDFS.hdfs.rollInterval = 0
SeqGenAgent.sinks.HDFS.hdfs.rollCount = 10000
SeqGenAgent.sinks.HDFS.hdfs.fileType = DataStream 
 
# Describing/Configuring the channel 
SeqGenAgent.channels.MemChannel.type = memory 
SeqGenAgent.channels.MemChannel.capacity = 1000 
SeqGenAgent.channels.MemChannel.transactionCapacity = 100 
 
# Binding the source and sink to the channel 
SeqGenAgent.sources.SeqSource.channels = MemChannel
SeqGenAgent.sinks.HDFS.channel = MemChannel 

执行

浏览 Flume 主目录并执行应用程序,如下所示。

$ cd $FLUME_HOME 
$./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/seq_gen.conf 
   --name SeqGenAgent 

如果一切顺利,源将开始生成序列号,这些序列号将以日志文件形式推送到 HDFS 中。

以下是命令提示符窗口的快照,该窗口将序列生成器生成的数据获取到 HDFS 中。

Data Generated

验证 HDFS

您可以使用以下 URL 访问 Hadoop 管理 Web UI:

https://127.0.0.1:50070/

点击页面右侧名为实用程序的下拉菜单。您可以看到两个选项,如下面的图表所示。

Verifying the HDFS

点击浏览文件系统并输入您存储序列生成器生成数据的 HDFS 目录的路径。

在我们的示例中,路径将为/user/Hadoop/ seqgen_data /。然后,您可以看到序列生成器生成的日志文件列表,存储在 HDFS 中,如下所示。

Browse the file system

验证文件内容

所有这些日志文件都包含按顺序排列的数字。您可以使用cat命令在文件系统中验证这些文件的内容,如下所示。

Verifying the Contents of the File

Apache Flume - NetCat 源

本章以一个示例来说明如何生成事件并随后将其记录到控制台。为此,我们使用NetCat源和logger接收器。

先决条件

要运行本章提供的示例,您需要安装Flume

配置Flume

我们必须使用conf文件夹中的配置文件配置源、通道和接收器。本章提供的示例使用NetCat 源、内存通道logger 接收器

NetCat源

配置 NetCat 源时,我们必须在配置源时指定端口。现在源(NetCat 源)侦听给定端口,并将我们在该端口中输入的每一行作为单个事件接收,并通过指定的通道将其传输到接收器。

配置此源时,您必须为以下属性提供值:

  • 通道

  • 源类型 - netcat

  • bind - 要绑定的主机名或 IP 地址。

  • port - 我们希望源侦听的端口号。

通道

我们正在使用内存通道。要配置内存通道,您必须为通道的类型提供值。以下是配置内存通道时需要提供的属性列表:

  • type - 它保存通道的类型。在我们的示例中,类型为MemChannel

  • Capacity - 它是通道中存储的事件的最大数量。其默认值为 100。(可选)

  • TransactionCapacity − 它表示通道接受或发送事件的最大数量。其默认值为 100。(可选)。

日志接收器

此接收器会记录传递给它的所有事件。通常,它用于测试或调试目的。要配置此接收器,您必须提供以下详细信息。

  • 通道

  • type − logger

示例配置文件

下面给出一个配置文件示例。复制此内容并将其另存为 Flume 的 conf 文件夹中的 netcat.conf

# Naming the components on the current agent
NetcatAgent.sources = Netcat   
NetcatAgent.channels = MemChannel 
NetcatAgent.sinks = LoggerSink  

# Describing/Configuring the source 
NetcatAgent.sources.Netcat.type = netcat 
NetcatAgent.sources.Netcat.bind = localhost
NetcatAgent.sources.Netcat.port = 56565  

# Describing/Configuring the sink 
NetcatAgent.sinks.LoggerSink.type = logger  

# Describing/Configuring the channel 
NetcatAgent.channels.MemChannel.type = memory 
NetcatAgent.channels.MemChannel.capacity = 1000 
NetcatAgent.channels.MemChannel.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
NetcatAgent.sources.Netcat.channels = MemChannel
NetcatAgent.sinks. LoggerSink.channel = MemChannel

执行

浏览 Flume 主目录并执行应用程序,如下所示。

$ cd $FLUME_HOME
$ ./bin/flume-ng agent --conf $FLUME_CONF --conf-file $FLUME_CONF/netcat.conf 
   --name NetcatAgent -Dflume.root.logger=INFO,console

如果一切正常,源将开始监听给定的端口。在本例中,它是 56565。下面是已启动并监听端口 56565 的 NetCat 源的命令提示符窗口的快照。

Execution

将数据传递到源

要将数据传递到 NetCat 源,您必须打开配置文件中给出的端口。打开一个单独的终端并使用 curl 命令连接到源 (56565)。连接成功后,您将收到一条“connected”消息,如下所示。

$ curl telnet://127.0.0.1:56565 
connected 

现在您可以逐行输入您的数据(每行之后,您必须按 Enter 键)。NetCat 源将每一行作为单个事件接收,您将收到一条“OK”的消息。

完成数据传递后,您可以按 (Ctrl+C) 退出控制台。下面是使用 curl 命令连接到源的控制台快照。

Passing Data

在上述控制台中输入的每一行都将被源作为单个事件接收。由于我们使用了 Logger 接收器,因此这些事件将通过指定的通道(在本例中为内存通道)记录到控制台(源控制台)。

以下快照显示了记录事件的 NetCat 控制台。

NetCat console
广告