Hadoop - MapReduce



MapReduce是一个框架,使用它可以编写应用程序,以可靠的方式在大型商用硬件集群上并行处理海量数据。

什么是 MapReduce?

MapReduce 是一种基于 Java 的分布式计算处理技术和程序模型。MapReduce 算法包含两个重要的任务:Map 和 Reduce。Map 接收一组数据并将其转换为另一组数据,其中单个元素被分解成元组(键/值对)。其次是 Reduce 任务,它接收 Map 的输出作为输入,并将这些数据元组组合成更小的一组元组。正如 MapReduce 名称的顺序所暗示的那样,Reduce 任务总是在 Map 作业之后执行。

MapReduce 的主要优势在于它易于将数据处理扩展到多个计算节点。在 MapReduce 模型下,数据处理原语称为 Mapper 和 Reducer。将数据处理应用程序分解为MapperReducer有时并非易事。但是,一旦我们以 MapReduce 的形式编写了一个应用程序,将应用程序扩展到在集群中的数百、数千甚至数万台机器上运行仅仅是配置更改的问题。这种简单的可扩展性吸引了许多程序员使用 MapReduce 模型。

算法

  • 通常,MapReduce 范式基于将计算发送到数据所在的位置!

  • MapReduce 程序分三个阶段执行:Map 阶段、Shuffle 阶段和 Reduce 阶段。

    • Map 阶段 - Map 或 Mapper 的工作是处理输入数据。通常,输入数据以文件或目录的形式存在,并存储在 Hadoop 文件系统 (HDFS) 中。输入文件逐行传递给 Mapper 函数。Mapper 处理数据并创建多个小的数据块。

    • Reduce 阶段 - 此阶段是Shuffle阶段和Reduce阶段的组合。Reducer 的工作是处理来自 Mapper 的数据。处理后,它会生成一组新的输出,并将存储在 HDFS 中。

  • 在 MapReduce 作业期间,Hadoop 将 Map 和 Reduce 任务发送到集群中的相应服务器。

  • 框架管理所有数据传递细节,例如发出任务、验证任务完成以及在节点之间复制数据。

  • 大部分计算发生在具有本地磁盘数据的节点上,这减少了网络流量。

  • 给定任务完成后,集群收集并减少数据以形成适当的结果,并将其发送回 Hadoop 服务器。

MapReduce Algorithm

输入和输出(Java 视角)

MapReduce 框架操作的是<键,值>对,也就是说,框架将作业的输入视为一组<键,值>对,并生成一组<键,值>对作为作业的输出,这些对可能是不同类型的。

键和值类应以序列化方式由框架处理,因此需要实现 Writable 接口。此外,键类必须实现 Writable-Comparable 接口,以方便框架进行排序。MapReduce 作业的输入和输出类型 - (输入) <k1, v1> → map → <k2, v2> → reduce → <k3, v3>(输出)。

输入 输出
Map <k1, v1> 列表 (<k2, v2>)
Reduce <k2, 列表(v2)> 列表 (<k3, v3>)

术语

  • 有效载荷 - 应用程序实现 Map 和 Reduce 函数,并构成作业的核心。

  • Mapper - Mapper 将输入键/值对映射到一组中间键/值对。

  • NameNode - 管理 Hadoop 分布式文件系统 (HDFS) 的节点。

  • DataNode - 在进行任何处理之前,数据预先存在于其中的节点。

  • MasterNode - JobTracker 运行的节点,接受来自客户端的作业请求。

  • SlaveNode - Map 和 Reduce 程序运行的节点。

  • JobTracker - 调度作业并将分配的作业跟踪到 Task Tracker。

  • Task Tracker - 跟踪任务并将状态报告给 JobTracker。

  • 作业 - 程序是在数据集上执行 Mapper 和 Reducer 的过程。

  • 任务 - 在数据片上执行 Mapper 或 Reducer 的过程。

  • 任务尝试 - 在 SlaveNode 上执行任务的特定尝试实例。

示例场景

以下是关于某个组织电力消耗的数据。它包含各个年份的月度电力消耗和年度平均值。

一月 二月 三月 四月 五月 六月 七月 八月 九月 十月 十一月 十二月 平均值
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

如果上述数据作为输入给出,我们必须编写应用程序来处理它并生成结果,例如查找使用量最大的年份、使用量最小的年份等等。对于具有有限记录数量的程序员来说,这是一个简单的任务。他们只需编写逻辑来生成所需的输出,并将数据传递给编写的应用程序。

但是,想想代表某个州自成立以来所有大型工业的电力消耗的数据。

当我们编写应用程序来处理此类海量数据时,

  • 它们将花费大量时间来执行。

  • 当我们将数据从源移动到网络服务器等等时,将会有大量的网络流量。

为了解决这些问题,我们有 MapReduce 框架。

输入数据

上述数据保存为sample.txt并作为输入给出。输入文件如下所示。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

示例程序

以下是使用 MapReduce 框架对示例数据进行处理的程序。

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
} 

将上述程序保存为ProcessUnits.java。程序的编译和执行解释如下。

Process Units 程序的编译和执行

让我们假设我们在 Hadoop 用户的主目录中(例如 /home/hadoop)。

按照以下步骤编译和执行上述程序。

步骤 1

以下命令用于创建目录以存储编译后的 Java 类。

$ mkdir units 

步骤 2

下载Hadoop-core-1.2.1.jar,它用于编译和执行 MapReduce 程序。访问以下链接mvnrepository.com下载 jar 包。让我们假设下载的文件夹是/home/hadoop/

步骤 3

以下命令用于编译ProcessUnits.java程序并为程序创建一个 jar 包。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ . 

步骤 4

以下命令用于在 HDFS 中创建一个输入目录。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

步骤 5

以下命令用于将名为sample.txt的输入文件复制到 HDFS 的输入目录中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

步骤 6

以下命令用于验证输入目录中的文件。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

步骤 7

以下命令用于通过从输入目录获取输入文件来运行 Eleunit_max 应用程序。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

等待一段时间直到文件执行完毕。执行后,如下所示,输出将包含输入拆分的数量、Map 任务的数量、Reducer 任务的数量等。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40 

步骤 8

以下命令用于验证输出文件夹中的结果文件。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

步骤 9

以下命令用于查看Part-00000文件中的输出。此文件由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

以下是 MapReduce 程序生成的输出。

1981    34 
1984    40 
1985    45 

步骤 10

以下命令用于将输出文件夹从 HDFS 复制到本地文件系统进行分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

重要命令

所有 Hadoop 命令都由$HADOOP_HOME/bin/hadoop命令调用。运行没有任何参数的 Hadoop 脚本将打印所有命令的描述。

用法 - hadoop [--config confdir] COMMAND

下表列出了可用的选项及其描述。

序号 选项和描述
1

namenode -format

格式化 DFS 文件系统。

2

secondarynamenode

运行 DFS 次要 NameNode。

3

namenode

运行 DFS NameNode。

4

datanode

运行 DFS DataNode。

5

dfsadmin

运行 DFS 管理员客户端。

6

mradmin

运行 Map-Reduce 管理员客户端。

7

fsck

运行 DFS 文件系统检查实用程序。

8

fs

运行通用文件系统用户客户端。

9

balancer

运行集群平衡实用程序。

10

oiv

将离线 fsimage 查看器应用于 fsimage。

11

fetchdt

从 NameNode 获取委托令牌。

12

jobtracker

运行 MapReduce 作业跟踪器节点。

13

pipes

运行 Pipes 作业。

14

tasktracker

运行 MapReduce 任务跟踪器节点。

15

historyserver

作为独立守护进程运行作业历史服务器。

16

job

操作 MapReduce 作业。

17

queue

获取有关 JobQueue 的信息。

18

version

打印版本。

19

jar <jar>

运行 jar 文件。

20

distcp <srcurl> <desturl>

递归复制文件或目录。

21

distcp2 <srcurl> <desturl>

DistCp 版本 2。

22

archive -archiveName NAME -p <父路径> <源>* <目标>

创建 Hadoop 归档。

23

classpath

打印获取 Hadoop jar 和所需库所需的类路径。

24

daemonlog

获取/设置每个守护程序的日志级别

如何与 MapReduce 作业交互

用法 - hadoop job [GENERIC_OPTIONS]

以下是 Hadoop 作业中可用的通用选项。

序号 GENERIC_OPTION 和描述
1

-submit <job-file>

提交作业。

2

-status <job-id>

打印 Map 和 Reduce 完成百分比以及所有作业计数器。

3

-counter <job-id> <group-name> <countername>

打印计数器值。

4

-kill <job-id>

杀死作业。

5

-events <job-id> <fromevent-#> <#-of-events>

打印作业跟踪器接收的给定范围内的事件详细信息。

6

-history [all] <jobOutputDir> - history < jobOutputDir>

打印作业详细信息、失败和被杀死的提示详细信息。可以通过指定 [all] 选项来查看有关作业的更多详细信息,例如为每个任务完成的成功任务和任务尝试。

7

-list[all]

显示所有作业。-list 仅显示尚未完成的作业。

8

-kill-task <task-id>

杀死任务。被杀死的任务不会计入失败的尝试。

9

-fail-task <task-id>

使任务失败。失败的任务计入失败的尝试。

10

-set-priority <job-id> <priority>

更改作业的优先级。允许的优先级值为 VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW

查看作业状态

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

查看作业输出目录的历史记录

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

杀死作业

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004 
广告