Apache Pig 快速指南



Apache Pig - 概述

什么是 Apache Pig?

Apache Pig 是 MapReduce 之上的抽象层。它是一个用于分析大型数据集并将其表示为数据流的工具/平台。Pig 通常与 **Hadoop** 一起使用;我们可以使用 Apache Pig 在 Hadoop 中执行所有数据操作。

为了编写数据分析程序,Pig 提供了一种名为 **Pig Latin** 的高级语言。这种语言提供了各种操作符,程序员可以使用这些操作符开发自己的函数来读取、写入和处理数据。

为了使用 **Apache Pig** 分析数据,程序员需要使用 Pig Latin 语言编写脚本。所有这些脚本在内部都转换为 Map 和 Reduce 任务。Apache Pig 有一个称为 **Pig Engine** 的组件,它接受 Pig Latin 脚本作为输入并将这些脚本转换为 MapReduce 作业。

为什么我们需要 Apache Pig?

不太擅长 Java 的程序员通常在使用 Hadoop 时会遇到困难,尤其是在执行任何 MapReduce 任务时。Apache Pig 对所有这些程序员来说都是一种福音。

  • 使用 **Pig Latin**,程序员可以轻松地执行 MapReduce 任务,而无需在 Java 中键入复杂的代码。

  • Apache Pig 使用 **多查询方法**,从而减少了代码长度。例如,在 Java 中需要键入 200 行代码 (LoC) 的操作,在 Apache Pig 中只需键入 10 行代码即可轻松完成。最终,Apache Pig 将开发时间缩短了近 16 倍。

  • Pig Latin 是一种 **类似 SQL 的语言**,如果您熟悉 SQL,那么学习 Apache Pig 就很容易。

  • Apache Pig 提供了许多内置操作符来支持数据操作,如连接、过滤、排序等。此外,它还提供了 MapReduce 中缺少的嵌套数据类型,如元组、包和映射。

Pig 的特性

Apache Pig 具有以下特性:

  • **丰富的操作符集** - 它提供了许多操作符来执行连接、排序、过滤等操作。

  • **易于编程** - Pig Latin 类似于 SQL,如果您擅长 SQL,那么编写 Pig 脚本就很容易。

  • **优化机会** - Apache Pig 中的任务会自动优化其执行,因此程序员只需要关注语言的语义。

  • **可扩展性** - 用户可以使用现有的操作符开发自己的函数来读取、处理和写入数据。

  • **UDF** - Pig 提供了在其他编程语言(如 Java)中创建 **用户定义函数** 并在 Pig 脚本中调用或嵌入它们的工具。

  • **处理所有类型的数据** - Apache Pig 分析所有类型的数据,包括结构化数据和非结构化数据。它将结果存储在 HDFS 中。

Apache Pig 与 MapReduce 的比较

以下是 Apache Pig 和 MapReduce 的主要区别。

Apache Pig MapReduce
Apache Pig 是一种数据流语言。 MapReduce 是一种数据处理范式。
它是一种高级语言。 MapReduce 是低级且僵化的。
在 Apache Pig 中执行连接操作非常简单。 在 MapReduce 中,在数据集之间执行连接操作非常困难。
任何具有 SQL 基础知识的新手程序员都可以方便地使用 Apache Pig。 必须具备 Java 经验才能使用 MapReduce。
Apache Pig 使用多查询方法,从而在很大程度上减少了代码长度。 MapReduce 执行相同任务所需的代码行数几乎是其 20 倍。
无需编译。在执行时,每个 Apache Pig 操作符都会在内部转换为一个 MapReduce 作业。 MapReduce 作业的编译过程很长。

Apache Pig 与 SQL 的比较

以下是 Apache Pig 和 SQL 的主要区别。

Pig SQL
Pig Latin 是一种 **过程式** 语言。 SQL 是一种 **声明式** 语言。
在 Apache Pig 中,**模式** 是可选的。我们可以存储数据而不设计模式(值存储为 $01、$02 等)。 SQL 中模式是必须的。
Apache Pig 中的数据模型是 **嵌套关系型** 的。 SQL 中使用的数据模型是 **扁平关系型** 的。
Apache Pig 提供了有限的 **查询优化** 机会。 SQL 中有更多查询优化的机会。

除了上述差异外,Apache Pig Latin:

  • 允许在管道中进行拆分。
  • 允许开发人员将数据存储在管道中的任何位置。
  • 声明执行计划。
  • 提供操作符来执行 ETL(提取、转换和加载)函数。

Apache Pig 与 Hive 的比较

Apache Pig 和 Hive 都用于创建 MapReduce 作业。在某些情况下,Hive 以类似于 Apache Pig 的方式在 HDFS 上运行。在下表中,我们列出了一些将 Apache Pig 与 Hive 区分开来的重要要点。

Apache Pig Hive
Apache Pig 使用一种名为 **Pig Latin** 的语言。它最初是在 **雅虎** 开发的。 Hive 使用一种名为 **HiveQL** 的语言。它最初是在 **Facebook** 开发的。
Pig Latin 是一种数据流语言。 HiveQL 是一种查询处理语言。
Pig Latin 是一种过程式语言,它适合管道范式。 HiveQL 是一种声明式语言。
Apache Pig 可以处理结构化、非结构化和半结构化数据。 Hive 主要用于结构化数据。

Apache Pig 的应用

Apache Pig 通常由数据科学家用于执行涉及临时处理和快速原型设计的任务。Apache Pig 用于:

  • 处理大型数据源,如 Web 日志。
  • 执行搜索平台的数据处理。
  • 处理时间敏感的数据加载。

Apache Pig - 历史

在 **2006** 年,Apache Pig 作为雅虎的一个研究项目开发,特别是为了在每个数据集上创建和执行 MapReduce 作业。在 **2007** 年,Apache Pig 通过 Apache incubator 开源。在 **2008** 年,Apache Pig 发布了第一个版本。在 **2010** 年,Apache Pig 毕业成为 Apache 一级项目。

Apache Pig - 架构

在 Hadoop 中使用 Pig 分析数据时使用的语言称为 **Pig Latin**。它是一种高级数据处理语言,提供丰富的 数据类型和操作符来对数据执行各种操作。

为了执行特定任务,使用 Pig 的程序员需要使用 Pig Latin 语言编写 Pig 脚本,并使用任何执行机制(Grunt Shell、UDF、嵌入式)执行它们。执行后,这些脚本将经过 Pig 框架应用的一系列转换,以产生所需的输出。

在内部,Apache Pig 将这些脚本转换为一系列 MapReduce 作业,从而简化了程序员的工作。Apache Pig 的架构如下所示。

Apache Pig Architecture

Apache Pig 组件

如图所示,Apache Pig 框架中有多个组件。让我们看一下主要组件。

解析器

最初,Pig 脚本由解析器处理。它检查脚本的语法,执行类型检查以及其他杂项检查。解析器的输出将是一个 DAG(有向无环图),它表示 Pig Latin 语句和逻辑操作符。

在 DAG 中,脚本的逻辑操作符表示为节点,数据流表示为边。

优化器

逻辑计划(DAG)传递给逻辑优化器,后者执行逻辑优化,如投影和下推。

编译器

编译器将优化的逻辑计划编译成一系列 MapReduce 作业。

执行引擎

最后,MapReduce 作业按排序顺序提交到 Hadoop。最后,这些 MapReduce 作业在 Hadoop 上执行,生成所需的结果。

Pig Latin 数据模型

Pig Latin 的数据模型是完全嵌套的,它允许复杂非原子数据类型,如 **map** 和 **tuple**。以下是 Pig Latin 数据模型的图形表示。

Data Model

原子

Pig Latin 中的任何单个值,无论其数据类型如何,都称为 **原子**。它存储为字符串,可用作字符串和数字。int、long、float、double、chararray 和 bytearray 是 Pig 的原子值。数据片段或简单的原子值称为 **字段**。

**示例** - ‘raja’ 或 ‘30’

元组

由一组有序字段形成的记录称为元组,字段可以是任何类型。元组类似于 RDBMS 表中的一行。

**示例** - (Raja, 30)

包是一个无序的元组集合。换句话说,元组的集合(非唯一)称为包。每个元组可以包含任意数量的字段(灵活的模式)。包由‘{}’表示。它类似于关系数据库管理系统(RDBMS)中的表,但与RDBMS中的表不同,它不需要每个元组都包含相同数量的字段,也不需要相同位置(列)的字段具有相同的类型。

示例 − {(Raja, 30), (Mohammad, 45)}

包可以作为关系中的一个字段;在这种情况下,它被称为内部包

示例 − {Raja, 30, {9848022338, [email protected],}}

映射

映射(或数据映射)是一组键值对。需要是chararray类型,并且应该是唯一的。可以是任何类型。它由‘[]’表示。

示例 − [name#Raja, age#30]

关系

关系是元组的包。Pig Latin中的关系是无序的(不能保证元组以任何特定顺序处理)。

Apache Pig - 安装

本章介绍如何在您的系统中下载、安装和设置Apache Pig

先决条件

在使用Apache Pig之前,您必须在系统上安装Hadoop和Java。因此,在安装Apache Pig之前,请按照以下链接中给出的步骤安装Hadoop和Java:

https://tutorialspoint.com/hadoop/hadoop_enviornment_setup.htm

下载Apache Pig

首先,从以下网站下载最新版本的Apache Pig:

步骤 1

打开Apache Pig网站的主页。在新闻部分下,点击发布页面链接,如下面的快照所示。

Home Page

步骤 2

点击指定的链接后,您将被重定向到Apache Pig 发布页面。在此页面上,在下载部分下,您将有两个链接,即Pig 0.8 及更高版本Pig 0.7 及更早版本。点击Pig 0.8 及更高版本链接,然后您将被重定向到包含一组镜像的页面。

Apache Pig Releases

步骤 3

选择并点击以下所示的任意一个镜像。

Click Mirrors

步骤 4

这些镜像将带您到Pig 发布页面。此页面包含 Apache Pig 的各种版本。点击其中最新的版本。

Pig Release

步骤 5

在这些文件夹中,您将拥有 Apache Pig 在各种发行版中的源代码和二进制文件。下载 Apache Pig 0.15 的源代码和二进制文件的 tar 文件,pig0.15.0-src.tar.gzpig-0.15.0.tar.gz。

Index

安装Apache Pig

下载 Apache Pig 软件后,请按照以下步骤在 Linux 环境中安装它。

步骤 1

在安装Hadoop、Java和其他软件的安装目录所在的同一目录中创建一个名为 Pig 的目录。(在本教程中,我们在名为 Hadoop 的用户中创建了 Pig 目录)。

$ mkdir Pig

步骤 2

解压下载的 tar 文件,如下所示。

$ cd Downloads/ 
$ tar zxvf pig-0.15.0-src.tar.gz 
$ tar zxvf pig-0.15.0.tar.gz 

步骤 3

pig-0.15.0-src.tar.gz文件的内容移动到之前创建的Pig目录中,如下所示。

$ mv pig-0.15.0-src.tar.gz/* /home/Hadoop/Pig/

配置Apache Pig

安装 Apache Pig 后,我们需要对其进行配置。要进行配置,我们需要编辑两个文件:bashrc 和 pig.properties

.bashrc 文件

.bashrc文件中,设置以下变量:

  • PIG_HOME 文件夹到 Apache Pig 的安装文件夹,

  • PATH 环境变量到 bin 文件夹,以及

  • PIG_CLASSPATH 环境变量到 Hadoop 安装的 etc(配置)文件夹(包含 core-site.xml、hdfs-site.xml 和 mapred-site.xml 文件的目录)。

export PIG_HOME = /home/Hadoop/Pig
export PATH  = $PATH:/home/Hadoop/pig/bin
export PIG_CLASSPATH = $HADOOP_HOME/conf

pig.properties 文件

在 Pig 的conf文件夹中,我们有一个名为pig.properties的文件。在 pig.properties 文件中,您可以设置如下所示的各种参数。

pig -h properties 

支持以下属性:

Logging: verbose = true|false; default is false. This property is the same as -v
       switch brief=true|false; default is false. This property is the same 
       as -b switch debug=OFF|ERROR|WARN|INFO|DEBUG; default is INFO.             
       This property is the same as -d switch aggregate.warning = true|false; default is true. 
       If true, prints count of warnings of each type rather than logging each warning.		 
		 
Performance tuning: pig.cachedbag.memusage=<mem fraction>; default is 0.2 (20% of all memory).
       Note that this memory is shared across all large bags used by the application.         
       pig.skewedjoin.reduce.memusagea=<mem fraction>; default is 0.3 (30% of all memory).
       Specifies the fraction of heap available for the reducer to perform the join.
       pig.exec.nocombiner = true|false; default is false.
           Only disable combiner as a temporary workaround for problems.         
       opt.multiquery = true|false; multiquery is on by default.
           Only disable multiquery as a temporary workaround for problems.
       opt.fetch=true|false; fetch is on by default.
           Scripts containing Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs.         
       pig.tmpfilecompression = true|false; compression is off by default.             
           Determines whether output of intermediate jobs is compressed.         
       pig.tmpfilecompression.codec = lzo|gzip; default is gzip.
           Used in conjunction with pig.tmpfilecompression. Defines compression type.         
       pig.noSplitCombination = true|false. Split combination is on by default.
           Determines if multiple small files are combined into a single map.         
			  
       pig.exec.mapPartAgg = true|false. Default is false.             
           Determines if partial aggregation is done within map phase, before records are sent to combiner.         
       pig.exec.mapPartAgg.minReduction=<min aggregation factor>. Default is 10.             
           If the in-map partial aggregation does not reduce the output num records by this factor, it gets disabled.
			  
Miscellaneous: exectype = mapreduce|tez|local; default is mapreduce. This property is the same as -x switch
       pig.additional.jars.uris=<comma seperated list of jars>. Used in place of register command.
       udf.import.list=<comma seperated list of imports>. Used to avoid package names in UDF.
       stop.on.failure = true|false; default is false. Set to true to terminate on the first error.         
       pig.datetime.default.tz=<UTC time offset>. e.g. +08:00. Default is the default timezone of the host.
           Determines the timezone used to handle datetime datatype and UDFs.
Additionally, any Hadoop property can be specified.

验证安装

通过键入 version 命令来验证 Apache Pig 的安装。如果安装成功,您将获得如下所示的 Apache Pig 版本。

$ pig –version 
 
Apache Pig version 0.15.0 (r1682971)  
compiled Jun 01 2015, 11:44:35

Apache Pig - 执行

在上一章中,我们介绍了如何安装 Apache Pig。在本章中,我们将讨论如何执行 Apache Pig。

Apache Pig 执行模式

您可以以两种模式运行 Apache Pig,即本地模式HDFS 模式

本地模式

在此模式下,所有文件都安装并在本地主机和本地文件系统中运行。不需要 Hadoop 或 HDFS。此模式通常用于测试目的。

MapReduce 模式

MapReduce 模式是指我们使用 Apache Pig 加载或处理 Hadoop 文件系统 (HDFS) 中存在的数据。在此模式下,每当我们执行 Pig Latin 语句来处理数据时,都会在后端调用 MapReduce 作业以对 HDFS 中存在的数据执行特定操作。

Apache Pig 执行机制

Apache Pig 脚本可以通过三种方式执行,即交互模式、批处理模式和嵌入模式。

  • 交互模式(Grunt shell)− 您可以使用 Grunt shell 以交互模式运行 Apache Pig。在此 shell 中,您可以输入 Pig Latin 语句并获取输出(使用 Dump 运算符)。

  • 批处理模式(脚本)− 您可以通过将 Pig Latin 脚本写入扩展名为.pig的单个文件中来以批处理模式运行 Apache Pig。

  • 嵌入模式(UDF)− Apache Pig 提供了在 Java 等编程语言中定义我们自己的函数(User Defined Functions)并在我们的脚本中使用它们的机制。

调用 Grunt Shell

您可以使用-x选项以所需的模式(本地/MapReduce)调用 Grunt shell,如下所示。

本地模式 MapReduce 模式

命令 −

$ ./pig –x local

命令 −

$ ./pig -x mapreduce

输出

Local Mode Output

输出

MapReduce Mode Output

这两个命令中的任何一个都会为您提供如下所示的 Grunt shell 提示符。

grunt>

您可以使用‘ctrl + d’退出 Grunt shell。

调用 Grunt shell 后,您可以通过直接在其中输入 Pig Latin 语句来执行 Pig 脚本。

grunt> customers = LOAD 'customers.txt' USING PigStorage(',');

以批处理模式执行 Apache Pig

您可以将整个 Pig Latin 脚本写入一个文件中,并使用–x 命令执行它。假设我们在一个名为sample_script.pig的文件中有一个 Pig 脚本,如下所示。

Sample_script.pig

student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student.txt' USING
   PigStorage(',') as (id:int,name:chararray,city:chararray);
  
Dump student;

现在,您可以如下所示执行上述文件中的脚本。

本地模式 MapReduce 模式
$ pig -x local Sample_script.pig $ pig -x mapreduce Sample_script.pig

注意 − 我们将在后续章节中详细讨论如何在批处理模式嵌入模式下运行 Pig 脚本。

Apache Pig - Grunt Shell

调用 Grunt shell 后,您可以在 shell 中运行 Pig 脚本。除此之外,Grunt shell 还提供了一些有用的 shell 和实用程序命令。本章介绍 Grunt shell 提供的 shell 和实用程序命令。

注意 − 在本章的某些部分,使用了LoadStore等命令。请参阅相应的章节以获取有关它们的详细信息。

Shell 命令

Apache Pig 的 Grunt shell 主要用于编写 Pig Latin 脚本。在此之前,我们可以使用shfs调用任何 shell 命令。

sh 命令

使用sh命令,我们可以从 Grunt shell 调用任何 shell 命令。从 Grunt shell 使用sh命令,我们无法执行作为 shell 环境一部分的命令(例如- cd)。

语法

以下是sh命令的语法。

grunt> sh shell command parameters

示例

我们可以使用sh选项从 Grunt shell 调用 Linux shell 的ls命令,如下所示。在此示例中,它列出了/pig/bin/目录中的文件。

grunt> sh ls
   
pig 
pig_1444799121955.log 
pig.cmd 
pig.py

fs 命令

使用fs命令,我们可以从 Grunt shell 调用任何 FsShell 命令。

语法

以下是fs命令的语法。

grunt> sh File System command parameters

示例

我们可以使用 fs 命令从 Grunt shell 调用 HDFS 的 ls 命令。在以下示例中,它列出了 HDFS 根目录中的文件。

grunt> fs –ls
  
Found 3 items
drwxrwxrwx   - Hadoop supergroup          0 2015-09-08 14:13 Hbase
drwxr-xr-x   - Hadoop supergroup          0 2015-09-09 14:52 seqgen_data
drwxr-xr-x   - Hadoop supergroup          0 2015-09-08 11:30 twitter_data

同样,我们可以使用fs命令从 Grunt shell 调用所有其他文件系统 shell 命令。

实用程序命令

Grunt shell 提供了一组实用程序命令。这些包括clear、help、history、quitset等实用程序命令;以及exec、killrun等命令,用于从 Grunt shell 控制 Pig。以下是 Grunt shell 提供的实用程序命令的描述。

clear 命令

clear命令用于清除 Grunt shell 的屏幕。

语法

您可以使用clear命令清除 grunt shell 的屏幕,如下所示。

grunt> clear

help 命令

help命令为您提供 Pig 命令或 Pig 属性的列表。

用法

您可以使用help命令获取 Pig 命令的列表,如下所示。

grunt> help

Commands: <pig latin statement>; - See the PigLatin manual for details:
https://hadoop.apache.org/pig
  
File system commands:fs <fs arguments> - Equivalent to Hadoop dfs  command:
https://hadoop.apache.org/common/docs/current/hdfs_shell.html
	 
Diagnostic Commands:describe <alias>[::<alias] - Show the schema for the alias.
Inner aliases can be described as A::B.
    explain [-script <pigscript>] [-out <path>] [-brief] [-dot|-xml] 
       [-param <param_name>=<pCram_value>]
       [-param_file <file_name>] [<alias>] - 
       Show the execution plan to compute the alias or for entire script.
       -script - Explain the entire script.
       -out - Store the output into directory rather than print to stdout.
       -brief - Don't expand nested plans (presenting a smaller graph for overview).
       -dot - Generate the output in .dot format. Default is text format.
       -xml - Generate the output in .xml format. Default is text format.
       -param <param_name - See parameter substitution for details.
       -param_file <file_name> - See parameter substitution for details.
       alias - Alias to explain.
       dump <alias> - Compute the alias and writes the results to stdout.

Utility Commands: exec [-param <param_name>=param_value] [-param_file <file_name>] <script> -
       Execute the script with access to grunt environment including aliases.
       -param <param_name - See parameter substitution for details.
       -param_file <file_name> - See parameter substitution for details.
       script - Script to be executed.
    run [-param <param_name>=param_value] [-param_file <file_name>] <script> -
       Execute the script with access to grunt environment.
		 -param <param_name - See parameter substitution for details.         
       -param_file <file_name> - See parameter substitution for details.
       script - Script to be executed.
    sh  <shell command> - Invoke a shell command.
    kill <job_id> - Kill the hadoop job specified by the hadoop job id.
    set <key> <value> - Provide execution parameters to Pig. Keys and values are case sensitive.
       The following keys are supported:
       default_parallel - Script-level reduce parallelism. Basic input size heuristics used 
       by default.
       debug - Set debug on or off. Default is off.
       job.name - Single-quoted name for jobs. Default is PigLatin:<script name>     
       job.priority - Priority for jobs. Values: very_low, low, normal, high, very_high.
       Default is normal stream.skippath - String that contains the path.
       This is used by streaming any hadoop property.
    help - Display this message.
    history [-n] - Display the list statements in cache.
       -n Hide line numbers.
    quit - Quit the grunt shell. 

history 命令

此命令显示自调用 Grunt shell 以来已执行/使用的语句列表。

用法

假设自打开 Grunt shell 以来我们已执行了三个语句。

grunt> customers = LOAD 'hdfs://127.0.0.1:9000/pig_data/customers.txt' USING PigStorage(',');
 
grunt> orders = LOAD 'hdfs://127.0.0.1:9000/pig_data/orders.txt' USING PigStorage(',');
 
grunt> student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student.txt' USING PigStorage(',');
 

然后,使用history命令将产生以下输出。

grunt> history

customers = LOAD 'hdfs://127.0.0.1:9000/pig_data/customers.txt' USING PigStorage(','); 
  
orders = LOAD 'hdfs://127.0.0.1:9000/pig_data/orders.txt' USING PigStorage(',');
   
student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student.txt' USING PigStorage(',');
 

set 命令

set命令用于显示/分配 Pig 中使用的键的值。

用法

使用此命令,您可以为以下键设置值。

描述和值
default_parallel 您可以通过将任何整数作为值传递给此键来设置 map 作业的 reducer 数量。
debug 您可以通过将 on/off 传递给此键来关闭或打开 Pig 中的调试功能。
job.name 您可以通过将字符串值传递给此键来将作业名称设置为所需的作业。
job.priority

您可以通过将以下值之一传递给此键来设置作业的优先级:

  • very_low
  • low
  • normal
  • high
  • very_high
stream.skippath 对于流式传输,您可以设置数据不应从中传输的路径,方法是将所需的路径以字符串的形式传递给此键。

quit 命令

您可以使用此命令退出 Grunt shell。

用法

如下所示退出 Grunt shell。

grunt> quit

现在让我们看看可以使用哪些命令从 Grunt shell 控制 Apache Pig。

exec 命令

使用exec命令,我们可以从 Grunt shell 执行 Pig 脚本。

语法

以下是实用程序命令exec的语法。

grunt> exec [–param param_name = param_value] [–param_file file_name] [script]

示例

假设在 HDFS 的/pig_data/目录中有一个名为student.txt的文件,其内容如下。

Student.txt

001,Rajiv,Hyderabad
002,siddarth,Kolkata
003,Rajesh,Delhi

并且,假设我们在 HDFS 的/pig_data/目录中有一个名为sample_script.pig的脚本文件,其内容如下。

Sample_script.pig

student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student.txt' USING PigStorage(',') 
   as (id:int,name:chararray,city:chararray);
  
Dump student;

现在,让我们使用exec命令从 Grunt shell 执行上述脚本,如下所示。

grunt> exec /sample_script.pig

输出

exec 命令执行sample_script.pig中的脚本。按照脚本中的指示,它将student.txt文件加载到 Pig 中,并显示以下内容的 Dump 运算符结果。

(1,Rajiv,Hyderabad)
(2,siddarth,Kolkata)
(3,Rajesh,Delhi) 

kill 命令

您可以使用此命令从 Grunt shell 中终止作业。

语法

以下是kill命令的语法。

grunt> kill JobId

示例

假设有一个正在运行的 Pig 作业,其 ID 为Id_0055,您可以使用以下所示的kill命令从 Grunt shell 中终止它。

grunt> kill Id_0055

run 命令

您可以使用run命令从 Grunt shell 中运行 Pig 脚本。

语法

以下是run命令的语法。

grunt> run [–param param_name = param_value] [–param_file file_name] script

示例

假设在 HDFS 的/pig_data/目录中有一个名为student.txt的文件,其内容如下。

Student.txt

001,Rajiv,Hyderabad
002,siddarth,Kolkata
003,Rajesh,Delhi

并且,假设我们在本地文件系统中有一个名为sample_script.pig的脚本文件,其内容如下。

Sample_script.pig

student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student.txt' USING
   PigStorage(',') as (id:int,name:chararray,city:chararray);

现在,让我们使用以下所示的 run 命令从 Grunt shell 中运行上述脚本。

grunt> run /sample_script.pig

您可以使用以下所示的Dump 运算符查看脚本的输出。

grunt> Dump;

(1,Rajiv,Hyderabad)
(2,siddarth,Kolkata)
(3,Rajesh,Delhi)

注意 - execrun命令的区别在于,如果我们使用run,则脚本中的语句将在命令历史记录中可用。

Pig Latin – 基础知识

Pig Latin 是用于使用 Apache Pig 分析 Hadoop 中数据的语言。在本章中,我们将讨论 Pig Latin 的基础知识,例如 Pig Latin 语句、数据类型、通用和关系运算符以及 Pig Latin UDF。

Pig Latin – 数据模型

如前几章所述,Pig 的数据模型是完全嵌套的。关系是 Pig Latin 数据模型的最外层结构。它是一个,其中 -

  • 包是元组的集合。
  • 元组是一组有序的字段。
  • 字段是一段数据。

Pig Latin – 语句

在使用 Pig Latin 处理数据时,语句是基本结构。

  • 这些语句与关系一起使用。它们包括表达式模式

  • 每个语句以分号 (;) 结尾。

  • 我们将通过语句使用 Pig Latin 提供的运算符执行各种操作。

  • 除了 LOAD 和 STORE 之外,在执行所有其他操作时,Pig Latin 语句将关系作为输入,并生成另一个关系作为输出。

  • 在 Grunt shell 中输入Load语句后,将立即对其进行语义检查。要查看模式的内容,您需要使用Dump运算符。只有在执行dump操作后,才会执行将数据加载到文件系统中的 MapReduce 作业。

示例

以下是一个 Pig Latin 语句,它将数据加载到 Apache Pig 中。

grunt> Student_data = LOAD 'student_data.txt' USING PigStorage(',')as 
   ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

Pig Latin – 数据类型

下表描述了 Pig Latin 数据类型。

序号 数据类型 描述和示例
1 int

表示有符号的 32 位整数。

示例 : 8

2 long

表示有符号的 64 位整数。

示例:5L

3 float

表示有符号的 32 位浮点数。

示例:5.5F

4 double

表示 64 位浮点数。

示例 : 10.5

5 chararray

以 Unicode UTF-8 格式表示字符数组(字符串)。

示例:'tutorials point'

6 Bytearray

表示字节数组(blob)。

7 Boolean

表示布尔值。

示例:true/false。

8 Datetime

表示日期时间。

示例:1970-01-01T00:00:00.000+00:00

9 Biginteger

表示 Java BigInteger。

示例 : 60708090709

10 Bigdecimal

表示 Java BigDecimal

示例 : 185.98376256272893883

复杂类型
11 元组

元组是一组有序的字段。

示例:(raja, 30)

12

包是元组的集合。

示例:{(raju,30),(Mohhammad,45)}

13 映射

Map 是一组键值对。

示例:[‘name’#’Raju’, ‘age’#30]

空值

以上所有数据类型的值都可以为 NULL。Apache Pig 对空值采用与 SQL 相同的方式处理。

空值可以是未知值或不存在的值。它用作可选值的占位符。这些空值可以自然出现,也可以是操作的结果。

Pig Latin – 算术运算符

下表描述了 Pig Latin 的算术运算符。假设 a = 10 且 b = 20。

运算符 描述 示例
+

加法 - 将运算符两侧的值相加

a + b 将得到 30
-

减法 - 从左操作数中减去右操作数

a − b 将得到 -10
*

乘法 - 将运算符两侧的值相乘

a * b 将得到 200
/

除法 - 将左操作数除以右操作数

b / a 将得到 2
%

模数 - 将左操作数除以右操作数并返回余数

b % a 将得到 0
? :

Bincond - 评估布尔运算符。它有三个操作数,如下所示。

变量x = (表达式) ? value1 如果为真 : value2 如果为假

b = (a == 1)? 20: 30;

如果 a=1,则 b 的值为 20。

如果 a!=1,则 b 的值为 30。

CASE

WHEN

THEN

ELSE END

Case - case 运算符等效于嵌套的 bincond 运算符。

CASE f2 % 2

WHEN 0 THEN 'even'

WHEN 1 THEN 'odd'

END

Pig Latin – 比较运算符

下表描述了 Pig Latin 的比较运算符。

运算符 描述 示例
==

等于 - 检查两个操作数的值是否相等;如果是,则条件变为真。

(a = b) 为假
!=

不等于 - 检查两个操作数的值是否相等。如果值不相等,则条件变为真。

(a != b) 为真。
>

大于 - 检查左操作数的值是否大于右操作数的值。如果是,则条件变为真。

(a > b) 为假。
<

小于 - 检查左操作数的值是否小于右操作数的值。如果是,则条件变为真。

(a < b) 为真。
>=

大于或等于 - 检查左操作数的值是否大于或等于右操作数的值。如果是,则条件变为真。

(a >= b) 为假。
<=

小于或等于 - 检查左操作数的值是否小于或等于右操作数的值。如果是,则条件变为真。

(a <= b) 为真。
matches

模式匹配 - 检查左侧的字符串是否与右侧的常量匹配。

f1 matches '.*tutorial.*'

Pig Latin – 类型构造运算符

下表描述了 Pig Latin 的类型构造运算符。

运算符 描述 示例
()

元组构造运算符 - 此运算符用于构造元组。

(Raju, 30)
{}

包构造运算符 - 此运算符用于构造包。

{(Raju, 30), (Mohammad, 45)}
[]

Map 构造运算符 - 此运算符用于构造元组。

[name#Raja, age#30]

Pig Latin – 关系运算

下表描述了 Pig Latin 的关系运算符。

运算符 描述
加载和存储
LOAD 将数据从文件系统(本地/HDFS)加载到关系中。
STORE 将关系保存到文件系统(本地/HDFS)。
过滤
FILTER 从关系中删除不需要的行。
DISTINCT 从关系中删除重复的行。
FOREACH, GENERATE 根据数据列生成数据转换。
STREAM 使用外部程序转换关系。
分组和连接
JOIN 连接两个或多个关系。
COGROUP 对两个或多个关系中的数据进行分组。
GROUP 对单个关系中的数据进行分组。
CROSS 创建两个或多个关系的笛卡尔积。
排序
ORDER 根据一个或多个字段(升序或降序)对关系进行排序。
LIMIT 从关系中获取有限数量的元组。
组合和拆分
UNION 将两个或多个关系组合成一个关系。
SPLIT 将一个关系拆分为两个或多个关系。
诊断操作符
DUMP 在控制台上打印关系的内容。
DESCRIBE 描述关系的模式。
EXPLAIN 查看计算关系的逻辑、物理或 MapReduce 执行计划。
ILLUSTRATE 查看一系列语句的分步执行。

Apache Pig - 读取数据

通常,Apache Pig 运行在 Hadoop 之上。它是一个分析工具,用于分析存在于Hadoop File System 中的大型数据集。要使用 Apache Pig 分析数据,我们必须首先将数据加载到 Apache Pig 中。本章说明如何从 HDFS 将数据加载到 Apache Pig 中。

准备 HDFS

在 MapReduce 模式下,Pig 从 HDFS 读取(加载)数据并将结果存储回 HDFS。因此,让我们启动 HDFS 并在 HDFS 中创建以下示例数据。

学生 ID 名字 姓氏 电话 城市
001 Rajiv Reddy 9848022337 海德拉巴
002 siddarth Battacharya 9848022338 加尔各答
003 Rajesh Khanna 9848022339 德里
004 Preethi Agarwal 9848022330 浦那
005 Trupthi Mohanthy 9848022336 布巴内斯瓦尔
006 Archana Mishra 9848022335 钦奈

以上数据集包含六名学生的个人详细信息,如 ID、名字、姓氏、电话号码和城市。

步骤 1:验证 Hadoop

首先,使用以下所示的 Hadoop 版本命令验证安装。

$ hadoop version

如果您的系统包含 Hadoop,并且您已设置 PATH 变量,则您将获得以下输出 -

Hadoop 2.6.0 
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
Compiled by jenkins on 2014-11-13T21:10Z 
Compiled with protoc 2.5.0 
From source with checksum 18e43357c8f927c0695f1e9522859d6a 
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop
common-2.6.0.jar

步骤 2:启动 HDFS

浏览 Hadoop 的sbin目录并启动yarn和 Hadoop dfs(分布式文件系统),如下所示。

cd /$Hadoop_Home/sbin/ 
$ start-dfs.sh 
localhost: starting namenode, logging to /home/Hadoop/hadoop/logs/hadoopHadoop-namenode-localhost.localdomain.out 
localhost: starting datanode, logging to /home/Hadoop/hadoop/logs/hadoopHadoop-datanode-localhost.localdomain.out 
Starting secondary namenodes [0.0.0.0] 
starting secondarynamenode, logging to /home/Hadoop/hadoop/logs/hadoop-Hadoopsecondarynamenode-localhost.localdomain.out
 
$ start-yarn.sh 
starting yarn daemons 
starting resourcemanager, logging to /home/Hadoop/hadoop/logs/yarn-Hadoopresourcemanager-localhost.localdomain.out 
localhost: starting nodemanager, logging to /home/Hadoop/hadoop/logs/yarnHadoop-nodemanager-localhost.localdomain.out

步骤 3:在 HDFS 中创建目录

在 Hadoop DFS 中,您可以使用mkdir命令创建目录。在所需路径中使用以下所示的命令在 HDFS 中创建一个名为Pig_Data的新目录。

$cd /$Hadoop_Home/bin/ 
$ hdfs dfs -mkdir hdfs://127.0.0.1:9000/Pig_Data 

步骤 4:将数据放入 HDFS

Pig 的输入文件将每个元组/记录存储在单独的行中。记录中的实体由分隔符分隔(在我们的示例中,我们使用了“,”)。

在本地文件系统中,创建一个名为student_data.txt的输入文件,内容如下所示。

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.

现在,使用put命令将文件从本地文件系统移动到 HDFS,如下所示。(您也可以使用copyFromLocal命令)。

$ cd $HADOOP_HOME/bin 
$ hdfs dfs -put /home/Hadoop/Pig/Pig_Data/student_data.txt dfs://127.0.0.1:9000/pig_data/

验证文件

您可以使用cat命令验证文件是否已移动到 HDFS,如下所示。

$ cd $HADOOP_HOME/bin
$ hdfs dfs -cat hdfs://127.0.0.1:9000/pig_data/student_data.txt

输出

您可以看到文件的内容,如下所示。

15/10/01 12:16:55 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
  
001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai

Load 运算符

您可以使用Pig LatinLOAD运算符从文件系统(HDFS/本地)加载数据到 Apache Pig 中。

语法

load 语句由“=”运算符分隔的两部分组成。在左侧,我们需要提及要存储数据的关系名称,在右侧,我们需要定义如何存储数据。以下是Load运算符的语法。

Relation_name = LOAD 'Input file path' USING function as schema;

其中,

  • relation_name - 我们必须提及要存储数据的关系

  • 输入文件路径 - 我们必须提及存储文件的 HDFS 目录。(在 MapReduce 模式下)

  • function - 我们必须从 Apache Pig 提供的加载函数集中选择一个函数(BinStorage, JsonLoader, PigStorage, TextLoader)。

  • Schema - 我们必须定义数据的模式。我们可以按如下方式定义所需的模式:

(column1 : data type, column2 : data type, column3 : data type);

注意 - 我们在不指定模式的情况下加载数据。在这种情况下,列将被寻址为 $01, $02 等…(检查)。

示例

例如,让我们使用LOAD命令将student_data.txt中的数据加载到 Pig 中,并将其命名为Student模式。

启动 Pig Grunt Shell

首先,打开 Linux 终端。在 MapReduce 模式下启动 Pig Grunt shell,如下所示。

$ Pig –x mapreduce

它将启动 Pig Grunt shell,如下所示。

15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL
15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE
15/10/01 12:33:37 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType

2015-10-01 12:33:38,080 [main] INFO  org.apache.pig.Main - Apache Pig version 0.15.0 (r1682971) compiled Jun 01 2015, 11:44:35
2015-10-01 12:33:38,080 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/Hadoop/pig_1443683018078.log
2015-10-01 12:33:38,242 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/Hadoop/.pigbootup not found
  
2015-10-01 12:33:39,630 [main]
INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://127.0.0.1:9000
 
grunt>

执行 Load 语句

现在,通过在 Grunt shell 中执行以下 Pig Latin 语句,将student_data.txt文件中的数据加载到 Pig 中。

grunt> student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data.txt' 
   USING PigStorage(',')
   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, 
   city:chararray );

以下是上述语句的描述。

关系名称 我们将数据存储在student模式中。
输入文件路径 我们正在读取student_data.txt文件中的数据,该文件位于 HDFS 的 /pig_data/ 目录中。
存储函数 我们使用了PigStorage()函数。它将数据加载并存储为结构化文本文件。它将元组的每个实体分隔使用的分隔符作为参数。默认情况下,它将‘\t’作为参数。
模式

我们使用以下模式存储数据。

id firstname lastname phone city
数据类型 int 字符数组 字符数组 字符数组 字符数组

注意 - load语句只会将数据加载到 Pig 中指定的关系中。要验证Load语句的执行,您必须使用诊断运算符,这些运算符将在后面的章节中讨论。

Apache Pig - 存储数据

在上一章中,我们学习了如何将数据加载到 Apache Pig 中。您可以使用store运算符将加载的数据存储到文件系统中。本章解释如何使用Store运算符在 Apache Pig 中存储数据。

语法

以下是 Store 语句的语法。

STORE Relation_name INTO ' required_directory_path ' [USING function];

示例

假设我们在 HDFS 中有一个名为student_data.txt的文件,内容如下。

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.

并且我们使用 LOAD 运算符将其读取到student关系中,如下所示。

grunt> student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data.txt' 
   USING PigStorage(',')
   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, 
   city:chararray );

现在,让我们将关系存储到 HDFS 目录“/pig_Output/”中,如下所示。

grunt> STORE student INTO ' hdfs://127.0.0.1:9000/pig_Output/ ' USING PigStorage (',');

输出

执行store语句后,您将获得以下输出。将创建一个具有指定名称的目录,并将数据存储在其中。

2015-10-05 13:05:05,429 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.
MapReduceLau ncher - 100% complete
2015-10-05 13:05:05,429 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - 
Script Statistics:
   
HadoopVersion    PigVersion    UserId    StartedAt             FinishedAt             Features 
2.6.0            0.15.0        Hadoop    2015-10-0 13:03:03    2015-10-05 13:05:05    UNKNOWN  
Success!  
Job Stats (time in seconds): 
JobId          Maps    Reduces    MaxMapTime    MinMapTime    AvgMapTime    MedianMapTime    
job_14459_06    1        0           n/a           n/a           n/a           n/a
MaxReduceTime    MinReduceTime    AvgReduceTime    MedianReducetime    Alias    Feature   
     0                 0                0                0             student  MAP_ONLY 
OutPut folder
hdfs://127.0.0.1:9000/pig_Output/ 
 
Input(s): Successfully read 0 records from: "hdfs://127.0.0.1:9000/pig_data/student_data.txt"  
Output(s): Successfully stored 0 records in: "hdfs://127.0.0.1:9000/pig_Output"  
Counters:
Total records written : 0
Total bytes written : 0
Spillable Memory Manager spill count : 0 
Total bags proactively spilled: 0
Total records proactively spilled: 0
  
Job DAG: job_1443519499159_0006
  
2015-10-05 13:06:06,192 [main] INFO  org.apache.pig.backend.hadoop.executionengine
.mapReduceLayer.MapReduceLau ncher - Success!

验证

您可以验证存储的数据,如下所示。

步骤 1

首先,使用ls命令列出名为pig_output目录中的文件,如下所示。

hdfs dfs -ls 'hdfs://127.0.0.1:9000/pig_Output/'
Found 2 items
rw-r--r-   1 Hadoop supergroup          0 2015-10-05 13:03 hdfs://127.0.0.1:9000/pig_Output/_SUCCESS
rw-r--r-   1 Hadoop supergroup        224 2015-10-05 13:03 hdfs://127.0.0.1:9000/pig_Output/part-m-00000

您可以观察到,执行store语句后创建了两个文件。

步骤 2

使用cat命令列出名为part-m-00000文件的内容,如下所示。

$ hdfs dfs -cat 'hdfs://127.0.0.1:9000/pig_Output/part-m-00000' 
1,Rajiv,Reddy,9848022337,Hyderabad
2,siddarth,Battacharya,9848022338,Kolkata
3,Rajesh,Khanna,9848022339,Delhi
4,Preethi,Agarwal,9848022330,Pune
5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
6,Archana,Mishra,9848022335,Chennai 

Apache Pig - 诊断运算符

load语句只会将数据加载到 Apache Pig 中指定的关系中。要验证Load语句的执行,您必须使用诊断运算符。Pig Latin 提供了四种不同类型的诊断运算符:

  • Dump 运算符
  • Describe 运算符
  • Explain 运算符
  • Illustrate 运算符

在本章中,我们将讨论 Pig Latin 的 Dump 运算符。

Dump 运算符

Dump运算符用于运行 Pig Latin 语句并在屏幕上显示结果。它通常用于调试目的。

语法

以下是Dump运算符的语法。

grunt> Dump Relation_Name

示例

假设我们在 HDFS 中有一个名为student_data.txt的文件,内容如下。

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.

并且我们使用 LOAD 运算符将其读取到student关系中,如下所示。

grunt> student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data.txt' 
   USING PigStorage(',')
   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, 
   city:chararray );

现在,让我们使用Dump 运算符打印关系的内容,如下所示。

grunt> Dump student

执行上述Pig Latin语句后,它将启动一个 MapReduce 作业以从 HDFS 读取数据。它将产生以下输出。

2015-10-01 15:05:27,642 [main]
INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 
100% complete
2015-10-01 15:05:27,652 [main]
INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:   
HadoopVersion  PigVersion  UserId    StartedAt             FinishedAt       Features             
2.6.0          0.15.0      Hadoop  2015-10-01 15:03:11  2015-10-01 05:27     UNKNOWN
                                                
Success!  
Job Stats (time in seconds):
  
JobId           job_14459_0004
Maps                 1  
Reduces              0  
MaxMapTime          n/a    
MinMapTime          n/a
AvgMapTime          n/a 
MedianMapTime       n/a
MaxReduceTime        0
MinReduceTime        0  
AvgReduceTime        0
MedianReducetime     0
Alias             student 
Feature           MAP_ONLY        
Outputs           hdfs://127.0.0.1:9000/tmp/temp580182027/tmp757878456,

Input(s): Successfully read 0 records from: "hdfs://127.0.0.1:9000/pig_data/
student_data.txt"
  
Output(s): Successfully stored 0 records in: "hdfs://127.0.0.1:9000/tmp/temp580182027/
tmp757878456"  

Counters: Total records written : 0 Total bytes written : 0 Spillable Memory Manager 
spill count : 0Total bags proactively spilled: 0 Total records proactively spilled: 0  

Job DAG: job_1443519499159_0004
  
2015-10-01 15:06:28,403 [main]
INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLau ncher - Success!
2015-10-01 15:06:28,441 [main] INFO  org.apache.pig.data.SchemaTupleBackend - 
Key [pig.schematuple] was not set... will not generate code.
2015-10-01 15:06:28,485 [main]
INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths 
to process : 1
2015-10-01 15:06:28,485 [main]
INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths
to process : 1

(1,Rajiv,Reddy,9848022337,Hyderabad)
(2,siddarth,Battacharya,9848022338,Kolkata)
(3,Rajesh,Khanna,9848022339,Delhi)
(4,Preethi,Agarwal,9848022330,Pune)
(5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)
(6,Archana,Mishra,9848022335,Chennai)

Apache Pig - Describe 操作符

describe运算符用于查看关系的模式。

语法

describe运算符的语法如下:

grunt> Describe Relation_name

示例

假设我们在 HDFS 中有一个名为student_data.txt的文件,内容如下。

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.

并且我们使用 LOAD 运算符将其读取到student关系中,如下所示。

grunt> student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data.txt' USING PigStorage(',')
   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们描述名为student的关系并验证模式,如下所示。

grunt> describe student;

输出

执行上述Pig Latin语句后,它将产生以下输出。

grunt> student: { id: int,firstname: chararray,lastname: chararray,phone: chararray,city: chararray }

Apache Pig - Explain 操作符

explain运算符用于显示关系的逻辑、物理和 MapReduce 执行计划。

语法

以下是explain运算符的语法。

grunt> explain Relation_name;

示例

假设我们在 HDFS 中有一个名为student_data.txt的文件,内容如下。

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.

并且我们使用 LOAD 运算符将其读取到student关系中,如下所示。

grunt> student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data.txt' USING PigStorage(',')
   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们使用explain运算符解释名为 student 的关系,如下所示。

grunt> explain student;

输出

它将产生以下输出。

$ explain student;

2015-10-05 11:32:43,660 [main]
2015-10-05 11:32:43,660 [main] INFO  org.apache.pig.newplan.logical.optimizer
.LogicalPlanOptimizer -
{RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator,
GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, 
MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer,
PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}  
#-----------------------------------------------
# New Logical Plan: 
#-----------------------------------------------
student: (Name: LOStore Schema:
id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#
35:chararray)
| 
|---student: (Name: LOForEach Schema:
id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#
35:chararray)
    |   |
    |   (Name: LOGenerate[false,false,false,false,false] Schema:
id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#
35:chararray)ColumnPrune:InputUids=[34, 35, 32, 33,
31]ColumnPrune:OutputUids=[34, 35, 32, 33, 31]
    |   |   | 
    |   |   (Name: Cast Type: int Uid: 31) 
    |   |   |     |   |   |---id:(Name: Project Type: bytearray Uid: 31 Input: 0 Column: (*))
    |   |   |     
    |   |   (Name: Cast Type: chararray Uid: 32)
    |   |   | 
    |   |   |---firstname:(Name: Project Type: bytearray Uid: 32 Input: 1
Column: (*))
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 33)
    |   |   |
    |   |   |---lastname:(Name: Project Type: bytearray Uid: 33 Input: 2
	 Column: (*))
    |   |   | 
    |   |   (Name: Cast Type: chararray Uid: 34)
    |   |   |  
    |   |   |---phone:(Name: Project Type: bytearray Uid: 34 Input: 3 Column:
(*))
    |   |   | 
    |   |   (Name: Cast Type: chararray Uid: 35)
    |   |   |  
    |   |   |---city:(Name: Project Type: bytearray Uid: 35 Input: 4 Column:
(*))
    |   | 
    |   |---(Name: LOInnerLoad[0] Schema: id#31:bytearray)
    |   |  
    |   |---(Name: LOInnerLoad[1] Schema: firstname#32:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[2] Schema: lastname#33:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[3] Schema: phone#34:bytearray)
    |   | 
    |   |---(Name: LOInnerLoad[4] Schema: city#35:bytearray)
    |
    |---student: (Name: LOLoad Schema: 
id#31:bytearray,firstname#32:bytearray,lastname#33:bytearray,phone#34:bytearray
,city#35:bytearray)RequiredFields:null 
#-----------------------------------------------
# Physical Plan: #-----------------------------------------------
student: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
| 
|---student: New For Each(false,false,false,false,false)[bag] - scope-35
    |   |
    |   Cast[int] - scope-21
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |  
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |   | 
    |   Cast[chararray] - scope-27
    |   |  
    |   |---Project[bytearray][2] - scope-26 
    |   |  
    |   Cast[chararray] - scope-30 
    |   |  
    |   |---Project[bytearray][3] - scope-29
    |   |
    |   Cast[chararray] - scope-33
    |   | 
    |   |---Project[bytearray][4] - scope-32
    | 
    |---student: Load(hdfs://127.0.0.1:9000/pig_data/student_data.txt:PigStorage(',')) - scope19
2015-10-05 11:32:43,682 [main]
INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - 
File concatenation threshold: 100 optimistic? false
2015-10-05 11:32:43,684 [main]
INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOp timizer - 
MR plan size before optimization: 1 2015-10-05 11:32:43,685 [main]
INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.
MultiQueryOp timizer - MR plan size after optimization: 1 
#--------------------------------------------------
# Map Reduce Plan                                   
#--------------------------------------------------
MapReduce node scope-37
Map Plan
student: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
|
|---student: New For Each(false,false,false,false,false)[bag] - scope-35
    |   |
    |   Cast[int] - scope-21 
    |   |
    |   |---Project[bytearray][0] - scope-20
    |   |
    |   Cast[chararray] - scope-24
    |   |
    |   |---Project[bytearray][1] - scope-23
    |   |
    |   Cast[chararray] - scope-27
    |   | 
    |   |---Project[bytearray][2] - scope-26 
    |   | 
    |   Cast[chararray] - scope-30 
    |   |  
    |   |---Project[bytearray][3] - scope-29 
    |   | 
    |   Cast[chararray] - scope-33
    |   | 
    |   |---Project[bytearray][4] - scope-32 
    |  
    |---student:
Load(hdfs://127.0.0.1:9000/pig_data/student_data.txt:PigStorage(',')) - scope
19-------- Global sort: false
 ---------------- 

Apache Pig - Illustrate 操作符

illustrate运算符提供了一系列语句的逐步执行过程。

语法

以下是illustrate运算符的语法。

grunt> illustrate Relation_name;

示例

假设我们在 HDFS 中有一个名为student_data.txt的文件,内容如下。

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata 
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune 
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.

并且我们使用 LOAD 运算符将其读取到student关系中,如下所示。

grunt> student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data.txt' USING PigStorage(',')
   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们说明名为 student 的关系,如下所示。

grunt> illustrate student;

输出

执行上述语句后,您将获得以下输出。

grunt> illustrate student;

INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$M ap - Aliases
being processed per job phase (AliasName[line,offset]): M: student[1,10] C:  R:
---------------------------------------------------------------------------------------------
|student | id:int | firstname:chararray | lastname:chararray | phone:chararray | city:chararray |
--------------------------------------------------------------------------------------------- 
|        | 002    | siddarth            | Battacharya        | 9848022338      | Kolkata        |
---------------------------------------------------------------------------------------------

Apache Pig - Group 操作符

GROUP运算符用于对一个或多个关系中的数据进行分组。它收集具有相同键的数据。

语法

以下是group运算符的语法。

grunt> Group_data = GROUP Relation_name BY age;

示例

假设我们在 HDFS 目录/pig_data/中有一个名为student_details.txt的文件,内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi
004,Preethi,Agarwal,21,9848022330,Pune
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar
006,Archana,Mishra,23,9848022335,Chennai
007,Komal,Nayak,24,9848022334,trivendram
008,Bharathi,Nambiayar,24,9848022333,Chennai

并且我们已将此文件加载到 Apache Pig 中,关系名为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);

现在,让我们按年龄对关系中的记录/元组进行分组,如下所示。

grunt> group_data = GROUP student_details by age;

验证

使用DUMP运算符验证group_data关系,如下所示。

grunt> Dump group_data;

输出

然后您将获得显示名为group_data的关系内容的输出,如下所示。在这里您可以观察到结果模式有两列:

  • 一个是age,我们根据它对关系进行了分组。

  • 另一个是bag,它包含元组组,即具有相应年龄的学生记录。

(21,{(4,Preethi,Agarwal,21,9848022330,Pune),(1,Rajiv,Reddy,21,9848022337,Hydera bad)})
(22,{(3,Rajesh,Khanna,22,9848022339,Delhi),(2,siddarth,Battacharya,22,984802233 8,Kolkata)})
(23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336 ,Bhuwaneshwar)})
(24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334, trivendram)})

您可以使用describe命令查看分组数据后的表模式,如下所示。

grunt> Describe group_data;
  
group_data: {group: int,student_details: {(id: int,firstname: chararray,
               lastname: chararray,age: int,phone: chararray,city: chararray)}}

同样,您可以使用illustrate命令获取模式的示例说明,如下所示。

$ Illustrate group_data;

它将产生以下输出:

------------------------------------------------------------------------------------------------- 
|group_data|  group:int | student_details:bag{:tuple(id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray)}|
------------------------------------------------------------------------------------------------- 
|          |     21     | { 4, Preethi, Agarwal, 21, 9848022330, Pune), (1, Rajiv, Reddy, 21, 9848022337, Hyderabad)}| 
|          |     2      | {(2,siddarth,Battacharya,22,9848022338,Kolkata),(003,Rajesh,Khanna,22,9848022339,Delhi)}| 
-------------------------------------------------------------------------------------------------

按多列分组

让我们按年龄和城市对关系进行分组,如下所示。

grunt> group_multiple = GROUP student_details by (age, city);

您可以使用 Dump 运算符验证名为group_multiple的关系内容,如下所示。

grunt> Dump group_multiple; 
  
((21,Pune),{(4,Preethi,Agarwal,21,9848022330,Pune)})
((21,Hyderabad),{(1,Rajiv,Reddy,21,9848022337,Hyderabad)})
((22,Delhi),{(3,Rajesh,Khanna,22,9848022339,Delhi)})
((22,Kolkata),{(2,siddarth,Battacharya,22,9848022338,Kolkata)})
((23,Chennai),{(6,Archana,Mishra,23,9848022335,Chennai)})
((23,Bhuwaneshwar),{(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)})
((24,Chennai),{(8,Bharathi,Nambiayar,24,9848022333,Chennai)})
(24,trivendram),{(7,Komal,Nayak,24,9848022334,trivendram)})

全部分组

您可以按所有列对关系进行分组,如下所示。

grunt> group_all = GROUP student_details All;

现在,验证group_all关系的内容,如下所示。

grunt> Dump group_all;  
  
(all,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334 ,trivendram), 
(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336,Bhuw aneshwar), 
(4,Preethi,Agarwal,21,9848022330,Pune),(3,Rajesh,Khanna,22,9848022339,Delhi), 
(2,siddarth,Battacharya,22,9848022338,Kolkata),(1,Rajiv,Reddy,21,9848022337,Hyd erabad)})

Apache Pig - Cogroup 操作符

COGROUP运算符的工作方式与GROUP运算符大致相同。这两个运算符之间的唯一区别在于,group运算符通常用于一个关系,而cogroup运算符用于涉及两个或多个关系的语句。

使用 Cogroup 对两个关系进行分组

假设我们在 HDFS 目录/pig_data/中分别有两个文件,名为student_details.txtemployee_details.txt,内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi
004,Preethi,Agarwal,21,9848022330,Pune
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar
006,Archana,Mishra,23,9848022335,Chennai
007,Komal,Nayak,24,9848022334,trivendram
008,Bharathi,Nambiayar,24,9848022333,Chennai

employee_details.txt

001,Robin,22,newyork 
002,BOB,23,Kolkata 
003,Maya,23,Tokyo 
004,Sara,25,London 
005,David,23,Bhuwaneshwar 
006,Maggy,22,Chennai

并且我们已将这些文件加载到 Pig 中,关系名称分别为student_detailsemployee_details,如下所示。

grunt> student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray); 
  
grunt> employee_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/employee_details.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, city:chararray);

现在,让我们按键 age 对student_detailsemployee_details关系的记录/元组进行分组,如下所示。

grunt> cogroup_data = COGROUP student_details by age, employee_details by age;

验证

使用DUMP运算符验证cogroup_data关系,如下所示。

grunt> Dump cogroup_data;

输出

它将产生以下输出,显示名为cogroup_data的关系内容,如下所示。

(21,{(4,Preethi,Agarwal,21,9848022330,Pune), (1,Rajiv,Reddy,21,9848022337,Hyderabad)}, 
   {    })  
(22,{ (3,Rajesh,Khanna,22,9848022339,Delhi), (2,siddarth,Battacharya,22,9848022338,Kolkata) },  
   { (6,Maggy,22,Chennai),(1,Robin,22,newyork) })  
(23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336 ,Bhuwaneshwar)}, 
   {(5,David,23,Bhuwaneshwar),(3,Maya,23,Tokyo),(2,BOB,23,Kolkata)}) 
(24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334, trivendram)}, 
   { })  
(25,{   }, 
   {(4,Sara,25,London)})

cogroup运算符根据年龄对每个关系中的元组进行分组,其中每个组表示一个特定的年龄值。

例如,如果我们考虑结果的第一个元组,它按年龄 21 分组。并且它包含两个包:

  • 第一个包包含第一个关系(在本例中为student_details)中所有年龄为 21 的元组,以及

  • 第二个包包含第二个关系(在本例中为employee_details)中所有年龄为 21 的元组。

如果某个关系没有年龄值为 21 的元组,则它将返回一个空包。

Apache Pig - Join 操作符

JOIN运算符用于组合两个或多个关系中的记录。在执行联接操作时,我们声明每个关系中的一个(或一组)元组作为键。当这些键匹配时,这两个特定元组将匹配,否则记录将被丢弃。联接可以是以下类型:

  • 自联接
  • 内联接
  • 外联接 - 左联接、右联接和全联接

本章将通过示例解释如何在 Pig Latin 中使用联接运算符。假设我们在 HDFS 的/pig_data/目录中分别有两个文件,名为customers.txtorders.txt,内容如下。

customers.txt

1,Ramesh,32,Ahmedabad,2000.00
2,Khilan,25,Delhi,1500.00
3,kaushik,23,Kota,2000.00
4,Chaitali,25,Mumbai,6500.00 
5,Hardik,27,Bhopal,8500.00
6,Komal,22,MP,4500.00
7,Muffy,24,Indore,10000.00

orders.txt

102,2009-10-08 00:00:00,3,3000
100,2009-10-08 00:00:00,3,1500
101,2009-11-20 00:00:00,2,1560
103,2008-05-20 00:00:00,4,2060

并且我们已将这两个文件加载到 Pig 中,关系分别为customersorders,如下所示。

grunt> customers = LOAD 'hdfs://127.0.0.1:9000/pig_data/customers.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, address:chararray, salary:int);
  
grunt> orders = LOAD 'hdfs://127.0.0.1:9000/pig_data/orders.txt' USING PigStorage(',')
   as (oid:int, date:chararray, customer_id:int, amount:int);

现在,让我们对这两个关系执行各种联接操作。

自联接

自联接用于将表与自身联接,就好像该表是两个关系一样,暂时重命名至少一个关系。

通常,在 Apache Pig 中,要执行自联接,我们将多次加载相同的数据,并使用不同的别名(名称)。因此,让我们将customers.txt文件的内容加载为两个表,如下所示。

grunt> customers1 = LOAD 'hdfs://127.0.0.1:9000/pig_data/customers.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, address:chararray, salary:int);
  
grunt> customers2 = LOAD 'hdfs://127.0.0.1:9000/pig_data/customers.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, address:chararray, salary:int); 

语法

以下是使用JOIN运算符执行自联接操作的语法。

grunt> Relation3_name = JOIN Relation1_name BY key, Relation2_name BY key ;

示例

让我们通过联接customers1customers2这两个关系,对customers关系执行自联接操作,如下所示。

grunt> customers3 = JOIN customers1 BY id, customers2 BY id;

验证

使用DUMP运算符验证customers3关系,如下所示。

grunt> Dump customers3;

输出

它将产生以下输出,显示customers关系的内容。

(1,Ramesh,32,Ahmedabad,2000,1,Ramesh,32,Ahmedabad,2000)
(2,Khilan,25,Delhi,1500,2,Khilan,25,Delhi,1500)
(3,kaushik,23,Kota,2000,3,kaushik,23,Kota,2000)
(4,Chaitali,25,Mumbai,6500,4,Chaitali,25,Mumbai,6500)
(5,Hardik,27,Bhopal,8500,5,Hardik,27,Bhopal,8500)
(6,Komal,22,MP,4500,6,Komal,22,MP,4500)
(7,Muffy,24,Indore,10000,7,Muffy,24,Indore,10000)

内联接

内联接使用频率很高;它也称为等值联接。当两个表中都有匹配项时,内联接将返回行。

它通过根据联接谓词组合两个关系(例如 A 和 B)的列值来创建一个新关系。查询将 A 的每一行与 B 的每一行进行比较,以查找满足联接谓词的所有行对。当满足联接谓词时,A 和 B 的每一对匹配行的列值将组合到结果行中。

语法

以下是使用JOIN运算符执行内联接操作的语法。

grunt> result = JOIN relation1 BY columnname, relation2 BY columnname;

示例

让我们对customersorders这两个关系执行内联接操作,如下所示。

grunt> coustomer_orders = JOIN customers BY id, orders BY customer_id;

验证

使用DUMP运算符验证coustomer_orders关系,如下所示。

grunt> Dump coustomer_orders;

输出

您将获得以下输出,其中包含名为coustomer_orders的关系的内容。

(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

注意

外部连接:与内部连接不同,外部连接返回至少一个关系中的所有行。外部连接操作以三种方式执行 -

  • 左外部连接
  • 右外部连接
  • 全外部连接

左外部连接

左外部连接操作返回左表中的所有行,即使右关系中没有匹配项。

语法

下面是使用JOIN运算符执行左外部连接操作的语法。

grunt> Relation3_name = JOIN Relation1_name BY id LEFT OUTER, Relation2_name BY customer_id;

示例

让我们在两个关系customers和orders上执行左外部连接操作,如下所示。

grunt> outer_left = JOIN customers BY id LEFT OUTER, orders BY customer_id;

验证

使用DUMP运算符验证关系outer_left,如下所示。

grunt> Dump outer_left;

输出

它将生成以下输出,显示关系outer_left的内容。

(1,Ramesh,32,Ahmedabad,2000,,,,)
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)
(5,Hardik,27,Bhopal,8500,,,,)
(6,Komal,22,MP,4500,,,,)
(7,Muffy,24,Indore,10000,,,,) 

右外部连接

右外部连接操作返回右表中的所有行,即使左表中没有匹配项。

语法

下面是使用JOIN运算符执行右外部连接操作的语法。

grunt> outer_right = JOIN customers BY id RIGHT, orders BY customer_id;

示例

让我们在两个关系customersorders上执行右外部连接操作,如下所示。

grunt> outer_right = JOIN customers BY id RIGHT, orders BY customer_id;

验证

使用DUMP运算符验证关系outer_right,如下所示。

grunt> Dump outer_right

输出

它将生成以下输出,显示关系outer_right的内容。

(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

全外部连接

全外部连接操作在其中一个关系中存在匹配时返回行。

语法

下面是使用JOIN运算符执行全外部连接的语法。

grunt> outer_full = JOIN customers BY id FULL OUTER, orders BY customer_id;

示例

让我们在两个关系customersorders上执行全外部连接操作,如下所示。

grunt> outer_full = JOIN customers BY id FULL OUTER, orders BY customer_id;

验证

使用DUMP运算符验证关系outer_full,如下所示。

grun> Dump outer_full; 

输出

它将生成以下输出,显示关系outer_full的内容。

(1,Ramesh,32,Ahmedabad,2000,,,,)
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)
(5,Hardik,27,Bhopal,8500,,,,)
(6,Komal,22,MP,4500,,,,)
(7,Muffy,24,Indore,10000,,,,)

使用多个键

我们可以使用多个键执行JOIN操作。

语法

以下是如何使用多个键在两个表上执行JOIN操作。

grunt> Relation3_name = JOIN Relation2_name BY (key1, key2), Relation3_name BY (key1, key2);

假设我们在HDFS的/pig_data/目录中具有两个文件,名为employee.txtemployee_contact.txt,如下所示。

employee.txt

001,Rajiv,Reddy,21,programmer,003
002,siddarth,Battacharya,22,programmer,003
003,Rajesh,Khanna,22,programmer,003
004,Preethi,Agarwal,21,programmer,003
005,Trupthi,Mohanthy,23,programmer,003
006,Archana,Mishra,23,programmer,003
007,Komal,Nayak,24,teamlead,002
008,Bharathi,Nambiayar,24,manager,001

employee_contact.txt

001,9848022337,[email protected],Hyderabad,003
002,9848022338,[email protected],Kolkata,003
003,9848022339,[email protected],Delhi,003
004,9848022330,[email protected],Pune,003
005,9848022336,[email protected],Bhuwaneshwar,003
006,9848022335,[email protected],Chennai,003
007,9848022334,[email protected],trivendram,002
008,9848022333,[email protected],Chennai,001

并且我们已将这两个文件加载到Pig中,关系为employeeemployee_contact,如下所示。

grunt> employee = LOAD 'hdfs://127.0.0.1:9000/pig_data/employee.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray, age:int, designation:chararray, jobid:int);
  
grunt> employee_contact = LOAD 'hdfs://127.0.0.1:9000/pig_data/employee_contact.txt' USING PigStorage(',') 
   as (id:int, phone:chararray, email:chararray, city:chararray, jobid:int);

现在,让我们使用JOIN运算符连接这两个关系的内容,如下所示。

grunt> emp = JOIN employee BY (id,jobid), employee_contact BY (id,jobid);

验证

使用DUMP运算符验证关系emp,如下所示。

grunt> Dump emp; 

输出

它将生成以下输出,显示名为emp的关系的内容,如下所示。

(1,Rajiv,Reddy,21,programmer,113,1,9848022337,[email protected],Hyderabad,113)
(2,siddarth,Battacharya,22,programmer,113,2,9848022338,[email protected],Kolka ta,113)  
(3,Rajesh,Khanna,22,programmer,113,3,9848022339,[email protected],Delhi,113)  
(4,Preethi,Agarwal,21,programmer,113,4,9848022330,[email protected],Pune,113)  
(5,Trupthi,Mohanthy,23,programmer,113,5,9848022336,[email protected],Bhuwaneshw ar,113)  
(6,Archana,Mishra,23,programmer,113,6,9848022335,[email protected],Chennai,113)  
(7,Komal,Nayak,24,teamlead,112,7,9848022334,[email protected],trivendram,112)  
(8,Bharathi,Nambiayar,24,manager,111,8,9848022333,[email protected],Chennai,111)

Apache Pig - Cross 操作符

CROSS运算符计算两个或多个关系的笛卡尔积。本章将通过示例说明如何在Pig Latin中使用cross运算符。

语法

下面是CROSS运算符的语法。

grunt> Relation3_name = CROSS Relation1_name, Relation2_name;

示例

假设我们在HDFS的/pig_data/目录中具有两个文件,名为customers.txtorders.txt,如下所示。

customers.txt

1,Ramesh,32,Ahmedabad,2000.00
2,Khilan,25,Delhi,1500.00
3,kaushik,23,Kota,2000.00
4,Chaitali,25,Mumbai,6500.00
5,Hardik,27,Bhopal,8500.00
6,Komal,22,MP,4500.00
7,Muffy,24,Indore,10000.00

orders.txt

102,2009-10-08 00:00:00,3,3000
100,2009-10-08 00:00:00,3,1500
101,2009-11-20 00:00:00,2,1560
103,2008-05-20 00:00:00,4,2060

并且我们已将这两个文件加载到 Pig 中,关系分别为customersorders,如下所示。

grunt> customers = LOAD 'hdfs://127.0.0.1:9000/pig_data/customers.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, address:chararray, salary:int);
  
grunt> orders = LOAD 'hdfs://127.0.0.1:9000/pig_data/orders.txt' USING PigStorage(',')
   as (oid:int, date:chararray, customer_id:int, amount:int);

现在,让我们使用这两个关系上的cross运算符获取这两个关系的笛卡尔积,如下所示。

grunt> cross_data = CROSS customers, orders;

验证

使用DUMP运算符验证关系cross_data,如下所示。

grunt> Dump cross_data;

输出

它将生成以下输出,显示关系cross_data的内容。

(7,Muffy,24,Indore,10000,103,2008-05-20 00:00:00,4,2060) 
(7,Muffy,24,Indore,10000,101,2009-11-20 00:00:00,2,1560) 
(7,Muffy,24,Indore,10000,100,2009-10-08 00:00:00,3,1500) 
(7,Muffy,24,Indore,10000,102,2009-10-08 00:00:00,3,3000) 
(6,Komal,22,MP,4500,103,2008-05-20 00:00:00,4,2060) 
(6,Komal,22,MP,4500,101,2009-11-20 00:00:00,2,1560) 
(6,Komal,22,MP,4500,100,2009-10-08 00:00:00,3,1500) 
(6,Komal,22,MP,4500,102,2009-10-08 00:00:00,3,3000) 
(5,Hardik,27,Bhopal,8500,103,2008-05-20 00:00:00,4,2060) 
(5,Hardik,27,Bhopal,8500,101,2009-11-20 00:00:00,2,1560) 
(5,Hardik,27,Bhopal,8500,100,2009-10-08 00:00:00,3,1500) 
(5,Hardik,27,Bhopal,8500,102,2009-10-08 00:00:00,3,3000) 
(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060) 
(4,Chaitali,25,Mumbai,6500,101,2009-20 00:00:00,4,2060) 
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560) 
(2,Khilan,25,Delhi,1500,100,2009-10-08 00:00:00,3,1500) 
(2,Khilan,25,Delhi,1500,102,2009-10-08 00:00:00,3,3000) 
(1,Ramesh,32,Ahmedabad,2000,103,2008-05-20 00:00:00,4,2060) 
(1,Ramesh,32,Ahmedabad,2000,101,2009-11-20 00:00:00,2,1560) 
(1,Ramesh,32,Ahmedabad,2000,100,2009-10-08 00:00:00,3,1500) 
(1,Ramesh,32,Ahmedabad,2000,102,2009-10-08 00:00:00,3,3000)-11-20 00:00:00,2,1560) 
(4,Chaitali,25,Mumbai,6500,100,2009-10-08 00:00:00,3,1500) 
(4,Chaitali,25,Mumbai,6500,102,2009-10-08 00:00:00,3,3000) 
(3,kaushik,23,Kota,2000,103,2008-05-20 00:00:00,4,2060) 
(3,kaushik,23,Kota,2000,101,2009-11-20 00:00:00,2,1560) 
(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500) 
(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000) 
(2,Khilan,25,Delhi,1500,103,2008-05-20 00:00:00,4,2060) 
(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560) 
(2,Khilan,25,Delhi,1500,100,2009-10-08 00:00:00,3,1500)
(2,Khilan,25,Delhi,1500,102,2009-10-08 00:00:00,3,3000) 
(1,Ramesh,32,Ahmedabad,2000,103,2008-05-20 00:00:00,4,2060) 
(1,Ramesh,32,Ahmedabad,2000,101,2009-11-20 00:00:00,2,1560) 
(1,Ramesh,32,Ahmedabad,2000,100,2009-10-08 00:00:00,3,1500) 
(1,Ramesh,32,Ahmedabad,2000,102,2009-10-08 00:00:00,3,3000)  

Apache Pig - Union 操作符

Pig Latin的UNION运算符用于合并两个关系的内容。要在两个关系上执行UNION操作,它们的列和域必须相同。

语法

下面是UNION运算符的语法。

grunt> Relation_name3 = UNION Relation_name1, Relation_name2;

示例

假设我们在HDFS的/pig_data/目录中具有两个文件,名为student_data1.txtstudent_data2.txt,如下所示。

Student_data1.txt

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata
003,Rajesh,Khanna,9848022339,Delhi
004,Preethi,Agarwal,9848022330,Pune
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai.

Student_data2.txt

7,Komal,Nayak,9848022334,trivendram.
8,Bharathi,Nambiayar,9848022333,Chennai.

并且我们已将这两个文件加载到Pig中,关系为student1student2,如下所示。

grunt> student1 = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data1.txt' USING PigStorage(',') 
   as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray); 
 
grunt> student2 = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_data2.txt' USING PigStorage(',') 
   as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);

现在,让我们使用UNION运算符合并这两个关系的内容,如下所示。

grunt> student = UNION student1, student2;

验证

使用DUMP运算符验证关系student,如下所示。

grunt> Dump student; 

输出

它将显示以下输出,显示关系student的内容。

(1,Rajiv,Reddy,9848022337,Hyderabad) (2,siddarth,Battacharya,9848022338,Kolkata)
(3,Rajesh,Khanna,9848022339,Delhi)
(4,Preethi,Agarwal,9848022330,Pune) 
(5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)
(6,Archana,Mishra,9848022335,Chennai) 
(7,Komal,Nayak,9848022334,trivendram) 
(8,Bharathi,Nambiayar,9848022333,Chennai)

Apache Pig - Split 操作符

SPLIT运算符用于将关系拆分为两个或多个关系。

语法

下面是SPLIT运算符的语法。

grunt> SPLIT Relation1_name INTO Relation2_name IF (condition1), Relation2_name (condition2),

示例

假设我们在 HDFS 目录/pig_data/中有一个名为student_details.txt的文件,内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi 
004,Preethi,Agarwal,21,9848022330,Pune 
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 
006,Archana,Mishra,23,9848022335,Chennai 
007,Komal,Nayak,24,9848022334,trivendram 
008,Bharathi,Nambiayar,24,9848022333,Chennai

并且我们已将此文件加载到Pig中,关系名为student_details,如下所示。

student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray); 

现在,让我们将关系拆分为两个,一个列出年龄小于23的员工,另一个列出年龄在22到25之间的员工。

SPLIT student_details into student_details1 if age<23, student_details2 if (22<age and age>25);

验证

使用DUMP运算符验证关系student_details1student_details2,如下所示。

grunt> Dump student_details1;  

grunt> Dump student_details2; 

输出

它将生成以下输出,分别显示关系student_details1student_details2的内容。

grunt> Dump student_details1; 
(1,Rajiv,Reddy,21,9848022337,Hyderabad) 
(2,siddarth,Battacharya,22,9848022338,Kolkata)
(3,Rajesh,Khanna,22,9848022339,Delhi) 
(4,Preethi,Agarwal,21,9848022330,Pune)
  
grunt> Dump student_details2; 
(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar) 
(6,Archana,Mishra,23,9848022335,Chennai) 
(7,Komal,Nayak,24,9848022334,trivendram) 
(8,Bharathi,Nambiayar,24,9848022333,Chennai)

Apache Pig - Filter 操作符

FILTER运算符用于根据条件从关系中选择所需的元组。

语法

下面是FILTER运算符的语法。

grunt> Relation2_name = FILTER Relation1_name BY (condition);

示例

假设我们在 HDFS 目录/pig_data/中有一个名为student_details.txt的文件,内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi 
004,Preethi,Agarwal,21,9848022330,Pune 
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 
006,Archana,Mishra,23,9848022335,Chennai 
007,Komal,Nayak,24,9848022334,trivendram 
008,Bharathi,Nambiayar,24,9848022333,Chennai

并且我们已将此文件加载到Pig中,关系名为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);

现在,让我们使用Filter运算符获取属于Chennai市的学生的详细信息。

filter_data = FILTER student_details BY city == 'Chennai';

验证

使用DUMP运算符验证关系filter_data,如下所示。

grunt> Dump filter_data;

输出

它将生成以下输出,显示关系filter_data的内容,如下所示。

(6,Archana,Mishra,23,9848022335,Chennai)
(8,Bharathi,Nambiayar,24,9848022333,Chennai)

Apache Pig - Distinct 操作符

DISTINCT运算符用于从关系中删除冗余(重复)元组。

语法

下面是DISTINCT运算符的语法。

grunt> Relation_name2 = DISTINCT Relatin_name1;

示例

假设我们在 HDFS 目录/pig_data/中有一个名为student_details.txt的文件,内容如下。

student_details.txt

001,Rajiv,Reddy,9848022337,Hyderabad
002,siddarth,Battacharya,9848022338,Kolkata 
002,siddarth,Battacharya,9848022338,Kolkata 
003,Rajesh,Khanna,9848022339,Delhi 
003,Rajesh,Khanna,9848022339,Delhi 
004,Preethi,Agarwal,9848022330,Pune 
005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar
006,Archana,Mishra,9848022335,Chennai 
006,Archana,Mishra,9848022335,Chennai

并且我们已将此文件加载到Pig中,关系名为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',') 
   as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);

现在,让我们使用DISTINCT运算符从名为student_details的关系中删除冗余(重复)元组,并将其存储为另一个名为distinct_data的关系,如下所示。

grunt> distinct_data = DISTINCT student_details;

验证

使用DUMP运算符验证关系distinct_data,如下所示。

grunt> Dump distinct_data;

输出

它将生成以下输出,显示关系distinct_data的内容,如下所示。

(1,Rajiv,Reddy,9848022337,Hyderabad)
(2,siddarth,Battacharya,9848022338,Kolkata) 
(3,Rajesh,Khanna,9848022339,Delhi) 
(4,Preethi,Agarwal,9848022330,Pune) 
(5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)
(6,Archana,Mishra,9848022335,Chennai)

Apache Pig - Foreach 操作符

FOREACH运算符用于根据列数据生成指定的 데이터 변환。

语法

下面是FOREACH运算符的语法。

grunt> Relation_name2 = FOREACH Relatin_name1 GENERATE (required data);

示例

假设我们在 HDFS 目录/pig_data/中有一个名为student_details.txt的文件,内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi 
004,Preethi,Agarwal,21,9848022330,Pune 
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 
006,Archana,Mishra,23,9848022335,Chennai 
007,Komal,Nayak,24,9848022334,trivendram 
008,Bharathi,Nambiayar,24,9848022333,Chennai

并且我们已将此文件加载到Pig中,关系名为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在,让我们从关系student_details中获取每个学生的id、age和city值,并使用foreach运算符将其存储到另一个名为foreach_data的关系中,如下所示。

grunt> foreach_data = FOREACH student_details GENERATE id,age,city;

验证

使用DUMP运算符验证关系foreach_data,如下所示。

grunt> Dump foreach_data;

输出

它将生成以下输出,显示关系foreach_data的内容。

(1,21,Hyderabad)
(2,22,Kolkata)
(3,22,Delhi)
(4,21,Pune) 
(5,23,Bhuwaneshwar)
(6,23,Chennai) 
(7,24,trivendram)
(8,24,Chennai) 

Apache Pig - Order By

ORDER BY运算符用于根据一个或多个字段以排序的顺序显示关系的内容。

语法

下面是ORDER BY运算符的语法。

grunt> Relation_name2 = ORDER Relatin_name1 BY (ASC|DESC);

示例

假设我们在 HDFS 目录/pig_data/中有一个名为student_details.txt的文件,内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi 
004,Preethi,Agarwal,21,9848022330,Pune 
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 
006,Archana,Mishra,23,9848022335,Chennai 
007,Komal,Nayak,24,9848022334,trivendram 
008,Bharathi,Nambiayar,24,9848022333,Chennai

并且我们已将此文件加载到Pig中,关系名为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在,让我们根据学生的年龄以降序对关系进行排序,并使用ORDER BY运算符将其存储到另一个名为order_by_data的关系中,如下所示。

grunt> order_by_data = ORDER student_details BY age DESC;

验证

使用DUMP运算符验证关系order_by_data,如下所示。

grunt> Dump order_by_data; 

输出

它将生成以下输出,显示关系order_by_data的内容。

(8,Bharathi,Nambiayar,24,9848022333,Chennai)
(7,Komal,Nayak,24,9848022334,trivendram)
(6,Archana,Mishra,23,9848022335,Chennai) 
(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)
(3,Rajesh,Khanna,22,9848022339,Delhi) 
(2,siddarth,Battacharya,22,9848022338,Kolkata)
(4,Preethi,Agarwal,21,9848022330,Pune) 
(1,Rajiv,Reddy,21,9848022337,Hyderabad)

Apache Pig - Limit 操作符

LIMIT运算符用于从关系中获取有限数量的元组。

语法

下面是LIMIT运算符的语法。

grunt> Result = LIMIT Relation_name required number of tuples;

示例

假设我们在 HDFS 目录/pig_data/中有一个名为student_details.txt的文件,内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi 
004,Preethi,Agarwal,21,9848022330,Pune 
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 
006,Archana,Mishra,23,9848022335,Chennai 
007,Komal,Nayak,24,9848022334,trivendram 
008,Bharathi,Nambiayar,24,9848022333,Chennai

并且我们已将此文件加载到Pig中,关系名为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在,让我们根据学生的年龄以降序对关系进行排序,并使用ORDER BY运算符将其存储到另一个名为limit_data的关系中,如下所示。

grunt> limit_data = LIMIT student_details 4; 

验证

使用DUMP运算符验证关系limit_data,如下所示。

grunt> Dump limit_data; 

输出

它将生成以下输出,显示关系limit_data的内容,如下所示。

(1,Rajiv,Reddy,21,9848022337,Hyderabad) 
(2,siddarth,Battacharya,22,9848022338,Kolkata) 
(3,Rajesh,Khanna,22,9848022339,Delhi) 
(4,Preethi,Agarwal,21,9848022330,Pune) 

Apache Pig - Eval 函数

Apache Pig提供了各种内置函数,即eval、load、store、math、string、bagtuple函数。

Eval函数

下面是Apache Pig提供的eval函数列表。

序号 函数和描述
1 AVG()

计算包中数值的平均值。

2 BagToString()

将包的元素连接成一个字符串。连接时,可以在这些值之间放置分隔符(可选)。

3 CONCAT()

连接两个或多个相同类型的表达式。

4 COUNT()

获取包中元素的数量,同时计算包中元组的数量。

5 COUNT_STAR()

它类似于COUNT()函数。它用于获取包中元素的数量。

6 DIFF()

比较元组中的两个包(字段)。

7 IsEmpty()

检查包或映射是否为空。

8 MAX()

计算单列包中列(数值或chararrays)的最高值。

9 MIN()

获取单列包中特定列的最小(最低)值(数值或chararray)。

10 PluckTuple()

使用Pig Latin PluckTuple()函数,我们可以定义一个字符串前缀并过滤关系中以给定前缀开头的列。

11 SIZE()

根据任何Pig数据类型计算元素的数量。

12 SUBTRACT()

减去两个包。它以两个包作为输入,并返回一个包,其中包含第一个包中不在第二个包中的元组。

13 SUM()

获取单列包中列的数值总和。

14 TOKENIZE()

将字符串(包含一组单词)拆分为单个元组,并返回一个包含拆分操作输出的包。

Apache Pig - 加载和存储函数

Apache Pig中的LoadStore函数用于确定数据如何进入和退出Pig。这些函数与load和store运算符一起使用。下面是Pig中可用的load和store函数列表。

序号 函数和描述
1 PigStorage()

加载和存储结构化文件。

2 TextLoader()

将非结构化数据加载到Pig中。

3 BinStorage()

使用机器可读格式将数据加载和存储到Pig中。

4 处理压缩

在Pig Latin中,我们可以加载和存储压缩数据。

Apache Pig - Bag & Tuple 函数

下面是Bag和Tuple函数的列表。

序号 函数和描述
1 TOBAG()

将两个或多个表达式转换为包。

2 TOP()

获取关系的前N个元组。

3 TOTUPLE()

将一个或多个表达式转换为元组。

4 TOMAP()

将键值对转换为映射。

Apache Pig - 字符串函数

在Apache Pig中,我们有以下字符串函数。

序号 函数和描述
1 ENDSWITH(string, testAgainst)

验证给定字符串是否以特定子字符串结尾。

2 STARTSWITH(string, substring)

接受两个字符串参数,并验证第一个字符串是否以第二个字符串开头。

3 SUBSTRING(string, startIndex, stopIndex)

从给定字符串返回子字符串。

4 EqualsIgnoreCase(string1, string2)

比较两个字符串,忽略大小写。

5 INDEXOF(string, ‘character’, startIndex)

返回字符串中字符的第一次出现,从起始索引向前搜索。

6 LAST_INDEX_OF(expression)

从起始索引向后搜索,返回字符在字符串中最后一次出现的索引。

7 LCFIRST(expression)

将字符串中的第一个字符转换为小写。

8 UCFIRST(expression)

返回一个字符串,其中第一个字符转换为大写。

9 UPPER(expression)

UPPER(expression) 返回转换为大写的字符串。

10 LOWER(expression)

将字符串中的所有字符转换为小写。

11 REPLACE(string, ‘oldChar’, ‘newChar’);

用新字符替换字符串中现有的字符。

12 STRSPLIT(string, regex, limit)

根据给定的正则表达式拆分字符串。

13 STRSPLITTOBAG(string, regex, limit)

类似于STRSPLIT()函数,它根据给定的分隔符拆分字符串,并将结果返回到一个包中。

14 TRIM(expression)

返回字符串的副本,其中删除了前导和尾随空格。

15 LTRIM(expression)

返回字符串的副本,其中删除了前导空格。

16 RTRIM(expression)

返回字符串的副本,其中删除了尾随空格。

Apache Pig - 日期时间函数

Apache Pig 提供以下日期和时间函数:

序号 函数和描述
1 ToDate(milliseconds)

此函数根据给定的参数返回一个日期时间对象。此函数的其他替代方法为 ToDate(iosstring)、ToDate(userstring, format)、ToDate(userstring, format, timezone)

2 CurrentTime()

返回当前时间的日期时间对象。

3 GetDay(datetime)

从日期时间对象返回一个月的日期。

4 GetHour(datetime)

从日期时间对象返回一天的小时。

5 GetMilliSecond(datetime)

从日期时间对象返回一秒的毫秒数。

6 GetMinute(datetime)

从日期时间对象返回一小时的分钟数。

7 GetMonth(datetime)

从日期时间对象返回一年中的月份。

8 GetSecond(datetime)

从日期时间对象返回一分钟的秒数。

9 GetWeek(datetime)

从日期时间对象返回一年中的周数。

10 GetWeekYear(datetime)

从日期时间对象返回周年的年份。

11 GetYear(datetime)

从日期时间对象返回年份。

12 AddDuration(datetime, duration)

返回日期时间对象与持续时间对象的组合结果。

13 SubtractDuration(datetime, duration)

从日期时间对象中减去持续时间对象并返回结果。

14 DaysBetween(datetime1, datetime2)

返回两个日期时间对象之间的天数。

15 HoursBetween(datetime1, datetime2)

返回两个日期时间对象之间的小时数。

16 MilliSecondsBetween(datetime1, datetime2)

返回两个日期时间对象之间的毫秒数。

17 MinutesBetween(datetime1, datetime2)

返回两个日期时间对象之间的分钟数。

18 MonthsBetween(datetime1, datetime2)

返回两个日期时间对象之间的月数。

19 SecondsBetween(datetime1, datetime2)

返回两个日期时间对象之间的秒数。

20 WeeksBetween(datetime1, datetime2)

返回两个日期时间对象之间的周数。

21 YearsBetween(datetime1, datetime2)

返回两个日期时间对象之间的年数。

Apache Pig - 数学函数

Apache Pig 中有以下数学函数:

序号 函数和描述
1 ABS(expression)

获取表达式的绝对值。

2 ACOS(expression)

获取表达式的反余弦。

3 ASIN(expression)

获取表达式的反正弦。

4 ATAN(expression)

此函数用于获取表达式的反正切。

5 CBRT(expression)

此函数用于获取表达式的立方根。

6 CEIL(expression)

此函数用于获取表达式四舍五入到最接近的整数的值。

7 COS(expression)

此函数用于获取表达式的三角余弦。

8 COSH(expression)

此函数用于获取表达式的双曲余弦。

9 EXP(expression)

此函数用于获取欧拉数 e 的 x 次幂。

10 FLOOR(expression)

获取表达式四舍五入到最接近的整数的值。

11 LOG(expression)

获取表达式的自然对数(以 e 为底)。

12 LOG10(expression)

获取表达式的以 10 为底的对数。

13 RANDOM( )

获取大于或等于 0.0 且小于 1.0 的伪随机数(类型为 double)。

14 ROUND(expression)

获取表达式四舍五入到整数(如果结果类型为 float)或四舍五入到长整数(如果结果类型为 double)的值。

15 SIN(expression)

获取表达式的正弦。

16 SINH(expression)

获取表达式的双曲正弦。

17 SQRT(expression)

获取表达式的正平方根。

18 TAN(expression)

获取角度的三角正切。

19 TANH(expression)

获取表达式的双曲正切。

Apache Pig - 用户自定义函数

除了内置函数外,Apache Pig 还广泛支持用户自定义函数 (UDF)。使用这些 UDF,我们可以定义自己的函数并使用它们。UDF 支持以六种编程语言提供,即 Java、Jython、Python、JavaScript、Ruby 和 Groovy。

在编写 UDF 时,Java 提供了完整的支持,而在所有其他语言中则提供了有限的支持。使用 Java,您可以编写涉及所有处理部分的 UDF,例如数据加载/存储、列转换和聚合。由于 Apache Pig 是用 Java 编写的,因此与其他语言相比,使用 Java 语言编写的 UDF 工作效率更高。

在 Apache Pig 中,我们还有一个名为Piggybank的 Java UDF 存储库。使用 Piggybank,我们可以访问其他用户编写的 Java UDF,并贡献我们自己的 UDF。

Java 中的 UDF 类型

在使用 Java 编写 UDF 时,我们可以创建和使用以下三种类型的函数:

  • 过滤器函数 - 过滤器函数用作过滤器语句中的条件。这些函数接受 Pig 值作为输入并返回布尔值。

  • Eval 函数 - Eval 函数用于 FOREACH-GENERATE 语句中。这些函数接受 Pig 值作为输入并返回 Pig 结果。

  • 代数函数 - 代数函数作用于 FOREACHGENERATE 语句中的内部包。这些函数用于对内部包执行完整的 MapReduce 操作。

使用 Java 编写 UDF

要使用 Java 编写 UDF,我们必须集成 jar 文件Pig-0.15.0.jar。在本节中,我们将讨论如何使用 Eclipse 编写一个示例 UDF。在继续之前,请确保您已在系统中安装了 Eclipse 和 Maven。

按照以下步骤编写 UDF 函数:

  • 打开 Eclipse 并创建一个新项目(例如myproject)。

  • 将新创建的项目转换为 Maven 项目。

  • 将以下内容复制到 pom.xml 中。此文件包含 Apache Pig 和 Hadoop-core jar 文件的 Maven 依赖项。

<project xmlns = "http://maven.apache.org/POM/4.0.0"
   xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0http://maven.apache .org/xsd/maven-4.0.0.xsd"> 
	
   <modelVersion>4.0.0</modelVersion> 
   <groupId>Pig_Udf</groupId> 
   <artifactId>Pig_Udf</artifactId> 
   <version>0.0.1-SNAPSHOT</version>
	
   <build>    
      <sourceDirectory>src</sourceDirectory>    
      <plugins>      
         <plugin>        
            <artifactId>maven-compiler-plugin</artifactId>        
            <version>3.3</version>        
            <configuration>          
               <source>1.7</source>          
               <target>1.7</target>        
            </configuration>      
         </plugin>    
      </plugins>  
   </build>
	
   <dependencies> 
	
      <dependency>            
         <groupId>org.apache.pig</groupId>            
         <artifactId>pig</artifactId>            
         <version>0.15.0</version>     
      </dependency> 
		
      <dependency>        
         <groupId>org.apache.hadoop</groupId>            
         <artifactId>hadoop-core</artifactId>            
         <version>0.20.2</version>     
      </dependency> 
      
   </dependencies>  
	
</project>
  • 保存文件并刷新它。在Maven 依赖项部分,您可以找到下载的 jar 文件。

  • 创建一个名为Sample_Eval的新类文件,并将以下内容复制到其中。

import java.io.IOException; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.data.Tuple; 
 
import java.io.IOException; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.data.Tuple;

public class Sample_Eval extends EvalFunc<String>{ 

   public String exec(Tuple input) throws IOException {   
      if (input == null || input.size() == 0)      
      return null;      
      String str = (String)input.get(0);      
      return str.toUpperCase();  
   } 
}

在编写 UDF 时,必须继承 EvalFunc 类并为exec()函数提供实现。在此函数中,编写了 UDF 所需的代码。在上面的示例中,我们返回了将给定列的内容转换为大写的代码。

  • 在编译类没有错误后,右键单击 Sample_Eval.java 文件。它会为您提供一个菜单。选择导出,如下面的屏幕截图所示。

Select export
  • 单击导出后,您将看到以下窗口。单击JAR 文件

Click on Export
  • 通过单击下一步>按钮继续。您将看到另一个窗口,您需要在本地文件系统中输入路径,您需要将 jar 文件存储在该路径中。

jar export
  • 最后单击完成按钮。在指定的文件夹中,将创建一个 Jar 文件sample_udf.jar。此 jar 文件包含用 Java 编写的 UDF。

使用 UDF

编写 UDF 并生成 Jar 文件后,请按照以下步骤操作:

步骤 1:注册 Jar 文件

在编写 UDF(在 Java 中)后,我们必须使用 Register 运算符注册包含 UDF 的 Jar 文件。通过注册 Jar 文件,用户可以将 UDF 的位置通知 Apache Pig。

语法

以下是 Register 运算符的语法。

REGISTER path; 

示例

例如,让我们注册本章前面创建的 sample_udf.jar。

在本地模式下启动 Apache Pig 并注册 jar 文件 sample_udf.jar,如下所示。

$cd PIG_HOME/bin 
$./pig –x local 

REGISTER '/$PIG_HOME/sample_udf.jar'

注意 - 假设 Jar 文件位于路径 - /$PIG_HOME/sample_udf.jar 中

步骤 2:定义别名

注册 UDF 后,我们可以使用Define运算符为其定义别名。

语法

以下是 Define 运算符的语法。

DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] }; 

示例

为 sample_eval 定义别名,如下所示。

DEFINE sample_eval sample_eval();

步骤 3:使用 UDF

定义别名后,您可以像使用内置函数一样使用 UDF。假设 HDFS /Pig_Data/目录中有一个名为 emp_data 的文件,其内容如下。

001,Robin,22,newyork
002,BOB,23,Kolkata
003,Maya,23,Tokyo
004,Sara,25,London 
005,David,23,Bhuwaneshwar 
006,Maggy,22,Chennai
007,Robert,22,newyork
008,Syam,23,Kolkata
009,Mary,25,Tokyo
010,Saran,25,London 
011,Stacy,25,Bhuwaneshwar 
012,Kelly,22,Chennai

并假设我们已将此文件加载到 Pig 中,如下所示。

grunt> emp_data = LOAD 'hdfs://127.0.0.1:9000/pig_data/emp1.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, city:chararray);

现在让我们使用 UDF sample_eval将员工姓名转换为大写。

grunt> Upper_case = FOREACH emp_data GENERATE sample_eval(name);

验证关系Upper_case的内容,如下所示。

grunt> Dump Upper_case;
  
(ROBIN)
(BOB)
(MAYA)
(SARA)
(DAVID)
(MAGGY)
(ROBERT)
(SYAM)
(MARY)
(SARAN)
(STACY)
(KELLY)

Apache Pig - 运行脚本

在本节中,我们将了解如何在批处理模式下运行 Apache Pig 脚本。

Pig 脚本中的注释

在文件中编写脚本时,我们可以像下面这样在其中包含注释。

多行注释

我们将以 '/*' 开头多行注释,以 '*/' 结束它们。

/* These are the multi-line comments 
  In the pig script */ 

单行注释

我们将以 '--' 开头单行注释。

--we can write single line comments like this.

在批处理模式下执行 Pig 脚本

在批处理模式下执行 Apache Pig 语句时,请按照以下步骤操作。

步骤 1

将所有必需的 Pig Latin 语句写入单个文件。我们可以在单个文件中编写所有 Pig Latin 语句和命令,并将其保存为.pig文件。

步骤 2

执行 Apache Pig 脚本。您可以从 shell(Linux)执行 Pig 脚本,如下所示。

本地模式 MapReduce 模式
$ pig -x local Sample_script.pig $ pig -x mapreduce Sample_script.pig

您也可以使用 exec 命令从 Grunt shell 执行它,如下所示。

grunt> exec /sample_script.pig

从 HDFS 执行 Pig 脚本

我们还可以执行驻留在 HDFS 中的 Pig 脚本。假设 HDFS 目录/pig_data/中有一个名为Sample_script.pig的 Pig 脚本。我们可以像下面这样执行它。

$ pig -x mapreduce hdfs://127.0.0.1:9000/pig_data/Sample_script.pig 

示例

假设我们在 HDFS 中有一个名为student_details.txt的文件,其内容如下。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad 
002,siddarth,Battacharya,22,9848022338,Kolkata
003,Rajesh,Khanna,22,9848022339,Delhi 
004,Preethi,Agarwal,21,9848022330,Pune 
005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 
006,Archana,Mishra,23,9848022335,Chennai 
007,Komal,Nayak,24,9848022334,trivendram 
008,Bharathi,Nambiayar,24,9848022333,Chennai

我们还在同一个 HDFS 目录中有一个名为sample_script.pig的示例脚本。此文件包含对student关系执行操作和转换的语句,如下所示。

student = LOAD 'hdfs://127.0.0.1:9000/pig_data/student_details.txt' USING PigStorage(',')
   as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);
	
student_order = ORDER student BY age DESC;
  
student_limit = LIMIT student_order 4;
  
Dump student_limit;
  • 脚本的第一条语句将名为student_details.txt的文件中的数据加载为名为student的关系。

  • 脚本的第二条语句将根据年龄按降序排列关系的元组,并将其存储为student_order

  • 脚本的第三条语句将student_order的前 4 个元组存储为student_limit

  • 最后,第四条语句将转储关系student_limit的内容。

现在让我们执行sample_script.pig,如下所示。

$./pig -x mapreduce hdfs://127.0.0.1:9000/pig_data/sample_script.pig

Apache Pig 将执行并为您提供以下内容的输出。

(7,Komal,Nayak,24,9848022334,trivendram)
(8,Bharathi,Nambiayar,24,9848022333,Chennai) 
(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar) 
(6,Archana,Mishra,23,9848022335,Chennai)
2015-10-19 10:31:27,446 [main] INFO  org.apache.pig.Main - Pig script completed in 12
minutes, 32 seconds and 751 milliseconds (752751 ms)
广告