- MapReduce 教程
- MapReduce - 首页
- MapReduce - 简介
- MapReduce - 算法
- MapReduce - 安装
- MapReduce - API
- MapReduce - Hadoop 实现
- MapReduce - 分区器
- MapReduce - 组合器
- MapReduce - Hadoop 管理
- MapReduce 资源
- MapReduce - 快速指南
- MapReduce - 有用资源
- MapReduce - 讨论
MapReduce - 组合器
组合器,也称为**半归约器**,是一个可选的类,它通过接受来自 Map 类的输入,然后将输出键值对传递给 Reducer 类来运行。
组合器的主要功能是汇总具有相同键的 map 输出记录。组合器的输出(键值集合)将作为输入通过网络发送到实际的 Reducer 任务。
组合器
组合器类用于 Map 类和 Reduce 类之间,以减少 Map 和 Reduce 之间的数据传输量。通常,map 任务的输出很大,传输到 reduce 任务的数据量很大。
下面的 MapReduce 任务图显示了组合器阶段。
组合器如何工作?
以下是关于 MapReduce 组合器如何工作的简要总结:
组合器没有预定义的接口,它必须实现 Reducer 接口的 reduce() 方法。
组合器对每个 map 输出键进行操作。它必须与 Reducer 类具有相同的输出键值类型。
组合器可以从大型数据集中生成摘要信息,因为它替换了原始的 Map 输出。
虽然组合器是可选的,但它有助于将数据分成多个组以用于 Reduce 阶段,这使得处理更容易。
MapReduce 组合器实现
以下示例提供了关于组合器的理论概念。让我们假设我们有以下名为**input.txt** 的输入文本文件用于 MapReduce。
What do you mean by Object What do you know about Java What is Java Virtual Machine How Java enabled High Performance
下面讨论了带有组合器的 MapReduce 程序的重要阶段。
记录读取器
这是 MapReduce 的第一阶段,其中记录读取器逐行读取输入文本文件中的文本,并产生键值对作为输出。
**输入** - 来自输入文件的逐行文本。
**输出** - 形成键值对。以下是预期的键值对集。
<1, What do you mean by Object> <2, What do you know about Java> <3, What is Java Virtual Machine> <4, How Java enabled High Performance>
Map 阶段
Map 阶段从记录读取器接收输入,对其进行处理,并产生另一组键值对作为输出。
**输入** - 来自记录读取器的以下键值对是输入。
<1, What do you mean by Object> <2, What do you know about Java> <3, What is Java Virtual Machine> <4, How Java enabled High Performance>
Map 阶段读取每个键值对,使用 StringTokenizer 将值中的每个单词分开,将每个单词视为键,并将该单词的计数作为值。以下代码片段显示了 Mapper 类和 map 函数。
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
**输出** - 预期输出如下:
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1> <What,1> <do,1> <you,1> <know,1> <about,1> <Java,1> <What,1> <is,1> <Java,1> <Virtual,1> <Machine,1> <How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
组合器阶段
组合器阶段接收来自 Map 阶段的每个键值对,对其进行处理,并产生**键值集合**对作为输出。
**输入** - 来自 Map 阶段的以下键值对是输入。
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1> <What,1> <do,1> <you,1> <know,1> <about,1> <Java,1> <What,1> <is,1> <Java,1> <Virtual,1> <Machine,1> <How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
组合器阶段读取每个键值对,将公共单词组合为键,将值组合为集合。通常,组合器的代码和操作与 Reducer 的代码和操作类似。以下是 Mapper、Combiner 和 Reducer 类声明的代码片段。
job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class);
**输出** - 预期输出如下:
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1> <know,1> <about,1> <Java,1,1,1> <is,1> <Virtual,1> <Machine,1> <How,1> <enabled,1> <High,1> <Performance,1>
Reducer 阶段
Reducer 阶段接收来自组合器阶段的每个键值集合对,对其进行处理,并将输出作为键值对传递。请注意,组合器的功能与 Reducer 相同。
**输入** - 来自组合器阶段的以下键值对是输入。
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1> <know,1> <about,1> <Java,1,1,1> <is,1> <Virtual,1> <Machine,1> <How,1> <enabled,1> <High,1> <Performance,1>
Reducer 阶段读取每个键值对。以下是组合器的代码片段。
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
**输出** - Reducer 阶段的预期输出如下:
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1> <know,1> <about,1> <Java,3> <is,1> <Virtual,1> <Machine,1> <How,1> <enabled,1> <High,1> <Performance,1>
记录写入器
这是 MapReduce 的最后阶段,其中记录写入器写入 Reducer 阶段的每个键值对,并将输出作为文本发送。
**输入** - 来自 Reducer 阶段的每个键值对以及输出格式。
**输出** - 它以文本格式提供键值对。以下是预期输出。
What 3 do 2 you 2 mean 1 by 1 Object 1 know 1 about 1 Java 3 is 1 Virtual 1 Machine 1 How 1 enabled 1 High 1 Performance 1
示例程序
以下代码块计算程序中的单词数。
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
将上述程序保存为**WordCount.java**。程序的编译和执行如下所示。
编译和执行
让我们假设我们位于 Hadoop 用户的主目录(例如,/home/hadoop)。
按照以下步骤编译和执行上述程序。
**步骤 1** - 使用以下命令创建一个目录来存储已编译的 Java 类。
$ mkdir units
**步骤 2** - 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从mvnrepository.com下载该 jar 包。
让我们假设下载的文件夹是 /home/hadoop/。
**步骤 3** - 使用以下命令编译**WordCount.java**程序并为程序创建一个 jar 包。
$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java $ jar -cvf units.jar -C units/ .
**步骤 4** - 使用以下命令在 HDFS 中创建一个输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
**步骤 5** - 使用以下命令将名为**input.txt** 的输入文件复制到 HDFS 的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir
**步骤 6** - 使用以下命令验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
**步骤 7** - 使用以下命令运行 Word count 应用程序,并从输入目录中获取输入文件。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段时间直到文件执行完毕。执行后,输出包含许多输入拆分、Map 任务和 Reducer 任务。
**步骤 8** - 使用以下命令验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
**步骤 9** - 使用以下命令查看**Part-00000**文件中的输出。此文件由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是 MapReduce 程序生成的输出。
What 3 do 2 you 2 mean 1 by 1 Object 1 know 1 about 1 Java 3 is 1 Virtual 1 Machine 1 How 1 enabled 1 High 1 Performance 1