- MapReduce 教程
- MapReduce - 首页
- MapReduce - 简介
- MapReduce - 算法
- MapReduce - 安装
- MapReduce - API
- MapReduce - Hadoop 实现
- MapReduce - 分区器
- MapReduce - 合并器
- MapReduce - Hadoop 管理
- MapReduce 资源
- MapReduce - 快速指南
- MapReduce - 有用资源
- MapReduce - 讨论
MapReduce - 分区器
分区器在处理输入数据集时就像一个条件。分区阶段发生在 Map 阶段之后,Reduce 阶段之前。
分区器的数量等于 Reducer 的数量。这意味着分区器将根据 Reducer 的数量划分数据。因此,来自单个分区器的数据由单个 Reducer 处理。
分区器
分区器对中间 Map 输出的键值对进行分区。它使用用户定义的条件对数据进行分区,该条件就像一个哈希函数。分区的总数与作业的 Reducer 任务数相同。让我们举一个例子来了解分区器是如何工作的。
MapReduce 分区器实现
为了方便起见,让我们假设我们有一个名为 Employee 的小表,其中包含以下数据。我们将使用此示例数据作为我们的输入数据集来演示分区器的工作原理。
| ID | 姓名 | 年龄 | 性别 | 薪资 |
|---|---|---|---|---|
| 1201 | gopal | 45 | 男 | 50,000 |
| 1202 | manisha | 40 | 女 | 50,000 |
| 1203 | khalil | 34 | 男 | 30,000 |
| 1204 | prasanth | 30 | 男 | 30,000 |
| 1205 | kiran | 20 | 男 | 40,000 |
| 1206 | laxmi | 25 | 女 | 35,000 |
| 1207 | bhavya | 20 | 女 | 15,000 |
| 1208 | reshma | 19 | 女 | 15,000 |
| 1209 | kranthi | 22 | 男 | 22,000 |
| 1210 | Satish | 24 | 男 | 25,000 |
| 1211 | Krishna | 25 | 男 | 25,000 |
| 1212 | Arshad | 28 | 男 | 20,000 |
| 1213 | lavanya | 18 | 女 | 8,000 |
我们必须编写一个应用程序来处理输入数据集,以查找不同年龄组(例如,20岁以下,21到30岁之间,30岁以上)中按性别划分的最高薪资员工。
输入数据
以上数据保存在“/home/hadoop/hadoopPartitioner”目录下的input.txt文件中,并作为输入提供。
| 1201 | gopal | 45 | 男 | 50000 |
| 1202 | manisha | 40 | 女 | 51000 |
| 1203 | khaleel | 34 | 男 | 30000 |
| 1204 | prasanth | 30 | 男 | 31000 |
| 1205 | kiran | 20 | 男 | 40000 |
| 1206 | laxmi | 25 | 女 | 35000 |
| 1207 | bhavya | 20 | 女 | 15000 |
| 1208 | reshma | 19 | 女 | 14000 |
| 1209 | kranthi | 22 | 男 | 22000 |
| 1210 | Satish | 24 | 男 | 25000 |
| 1211 | Krishna | 25 | 男 | 26000 |
| 1212 | Arshad | 28 | 男 | 20000 |
| 1213 | lavanya | 18 | 女 | 8000 |
根据给定的输入,以下是程序的算法解释。
Map 任务
Map 任务接受键值对作为输入,而我们有文本文件中的文本数据。此 Map 任务的输入如下:
输入 - 键将是诸如“任何特殊键 + 文件名 + 行号”之类的模式(例如:key = @input1),而值将是该行中的数据(例如:value = 1201 \t gopal \t 45 \t Male \t 50000)。
方法 - 此 Map 任务的操作如下:
读取值(记录数据),它作为输入值从参数列表中的字符串中获取。
使用 split 函数,分离性别并存储在字符串变量中。
String[] str = value.toString().split("\t", -3);
String gender=str[3];
将性别信息和记录数据值作为输出键值对从 Map 任务发送到分区任务。
context.write(new Text(gender), new Text(value));
对文本文件中的所有记录重复以上所有步骤。
输出 - 您将获得性别数据和记录数据值作为键值对。
分区器任务
分区器任务接受来自 Map 任务的键值对作为其输入。分区意味着将数据划分为段。根据给定的分区条件标准,输入的键值对数据可以根据年龄标准分为三个部分。
输入 - 键值对集合中的所有数据。
key = 记录中性别的字段值。
value = 该性别的整个记录数据值。
方法 - 分区逻辑的过程如下。
- 从输入键值对中读取年龄字段值。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
根据以下条件检查年龄值。
- 年龄小于或等于 20
- 年龄大于 20 且小于或等于 30。
- 年龄大于 30。
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
输出 - 键值对的所有数据被分割成三个键值对集合。Reducer 在每个集合上单独工作。
Reduce 任务
分区器任务的数量等于 Reducer 任务的数量。这里我们有三个分区器任务,因此我们有三个 Reducer 任务要执行。
输入 - Reducer 将使用不同的键值对集合执行三次。
key = 记录中性别的字段值。
value = 该性别的整个记录数据。
方法 - 以下逻辑将应用于每个集合。
- 读取每个记录的薪资字段值。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
将薪资与 max 变量进行比较。如果 str[4] 是最大薪资,则将 str[4] 分配给 max,否则跳过此步骤。
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
对每个键集合重复步骤 1 和 2(Male & Female 是键集合)。执行完这三个步骤后,您将找到 Male 键集合中的一个最大薪资和 Female 键集合中的一个最大薪资。
context.write(new Text(key), new IntWritable(max));
输出 - 最终,您将获得一组键值对数据,它们包含在三个不同年龄组的集合中。它分别包含每个年龄组中来自 Male 集合的最大薪资和来自 Female 集合的最大薪资。
执行 Map、分区器和 Reduce 任务后,键值对数据的三个集合将存储在三个不同的文件中作为输出。
所有这三个任务都被视为 MapReduce 作业。这些作业的以下要求和规范应在配置中指定:
- 作业名称
- 键和值的输入和输出格式
- Map、Reduce 和分区器任务的各个类
Configuration conf = getConf(); //Create Job Job job = new Job(conf, "topsal"); job.setJarByClass(PartitionerExample.class); // File Input and Output paths FileInputFormat.setInputPaths(job, new Path(arg[0])); FileOutputFormat.setOutputPath(job,new Path(arg[1])); //Set Mapper class and Output format for key-value pair. job.setMapperClass(MapClass.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //set partitioner statement job.setPartitionerClass(CaderPartitioner.class); //Set Reducer class and Input/Output format for key-value pair. job.setReducerClass(ReduceClass.class); //Number of Reducer tasks. job.setNumReduceTasks(3); //Input and Output format for data job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
示例程序
以下程序展示了如何在 MapReduce 程序中为给定条件实现分区器。
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
将上述代码保存为“/home/hadoop/hadoopPartitioner”中的PartitionerExample.java。下面给出程序的编译和执行。
编译和执行
让我们假设我们在 Hadoop 用户的主目录中(例如,/home/hadoop)。
按照以下步骤编译和执行上述程序。
步骤 1 - 下载 Hadoop-core-1.2.1.jar,用于编译和执行 MapReduce 程序。您可以从mvnrepository.com下载该 jar 文件。
让我们假设下载的文件夹是“/home/hadoop/hadoopPartitioner”
步骤 2 - 以下命令用于编译程序PartitionerExample.java并为程序创建 jar 文件。
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .
步骤 3 - 使用以下命令在 HDFS 中创建输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤 4 - 使用以下命令将名为input.txt的输入文件复制到 HDFS 的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
步骤 5 - 使用以下命令验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤 6 - 使用以下命令运行 Top salary 应用程序,并从输入目录中获取输入文件。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
等待一段时间,直到文件执行完毕。执行后,输出包含许多输入拆分、Map 任务和 Reducer 任务。
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully 15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=467 FILE: Number of bytes written=426777 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=480 HDFS: Number of bytes written=72 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=6 Job Counters Launched map tasks=1 Launched reduce tasks=3 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=8212 Total time spent by all reduces in occupied slots (ms)=59858 Total time spent by all map tasks (ms)=8212 Total time spent by all reduce tasks (ms)=59858 Total vcore-seconds taken by all map tasks=8212 Total vcore-seconds taken by all reduce tasks=59858 Total megabyte-seconds taken by all map tasks=8409088 Total megabyte-seconds taken by all reduce tasks=61294592 Map-Reduce Framework Map input records=13 Map output records=13 Map output bytes=423 Map output materialized bytes=467 Input split bytes=119 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=467 Reduce input records=13 Reduce output records=6 Spilled Records=26 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=224 CPU time spent (ms)=3690 Physical memory (bytes) snapshot=553816064 Virtual memory (bytes) snapshot=3441266688 Total committed heap usage (bytes)=334102528 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=361 File Output Format Counters Bytes Written=72
步骤 7 - 使用以下命令验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
您将在三个文件中找到输出,因为您在程序中使用了三个分区器和三个 Reducer。
步骤 8 - 使用以下命令查看Part-00000文件中的输出。此文件由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Part-00000 中的输出
Female 15000 Male 40000
使用以下命令查看Part-00001文件中的输出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Part-00001 中的输出
Female 35000 Male 31000
使用以下命令查看Part-00002文件中的输出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Part-00002 中的输出
Female 51000 Male 50000