Apache Pig - 用户自定义函数



除了内置函数外,Apache Pig 还广泛支持**用户自定义函数**(UDF)。使用这些 UDF,我们可以定义自己的函数并使用它们。UDF 支持以六种编程语言提供,即 Java、Jython、Python、JavaScript、Ruby 和 Groovy。

对于编写 UDF,Java 提供了完整支持,其他所有语言都提供了有限的支持。使用 Java,您可以编写涉及所有处理部分的 UDF,例如数据加载/存储、列转换和聚合。由于 Apache Pig 是用 Java 编写的,因此与其他语言相比,使用 Java 语言编写的 UDF 效率更高。

在 Apache Pig 中,我们还有一个名为**Piggybank**的 Java UDF 存储库。使用 Piggybank,我们可以访问其他用户编写的 Java UDF,并贡献我们自己的 UDF。

Java 中的 UDF 类型

使用 Java 编写 UDF 时,我们可以创建和使用以下三种类型的函数:

  • **过滤函数** - 过滤函数用作 filter 语句中的条件。这些函数接受 Pig 值作为输入并返回布尔值。

  • **Eval 函数** - Eval 函数用于 FOREACH-GENERATE 语句中。这些函数接受 Pig 值作为输入并返回 Pig 结果。

  • **代数函数** - 代数函数作用于 FOREACHGENERATE 语句中的内部包。这些函数用于对内部包执行完整的 MapReduce 操作。

使用 Java 编写 UDF

要使用 Java 编写 UDF,我们必须集成 jar 文件**Pig-0.15.0.jar**。在本节中,我们将讨论如何使用 Eclipse 编写一个示例 UDF。在继续操作之前,请确保您已在系统中安装了 Eclipse 和 Maven。

按照以下步骤编写 UDF 函数:

  • 打开 Eclipse 并创建一个新项目(例如**myproject**)。

  • 将新创建的项目转换为 Maven 项目。

  • 将以下内容复制到 pom.xml 中。此文件包含 Apache Pig 和 Hadoop-core jar 文件的 Maven 依赖项。

<project xmlns = "http://maven.apache.org/POM/4.0.0"
   xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0http://maven.apache .org/xsd/maven-4.0.0.xsd"> 
	
   <modelVersion>4.0.0</modelVersion> 
   <groupId>Pig_Udf</groupId> 
   <artifactId>Pig_Udf</artifactId> 
   <version>0.0.1-SNAPSHOT</version>
	
   <build>    
      <sourceDirectory>src</sourceDirectory>    
      <plugins>      
         <plugin>        
            <artifactId>maven-compiler-plugin</artifactId>        
            <version>3.3</version>        
            <configuration>          
               <source>1.7</source>          
               <target>1.7</target>        
            </configuration>      
         </plugin>    
      </plugins>  
   </build>
	
   <dependencies> 
	
      <dependency>            
         <groupId>org.apache.pig</groupId>            
         <artifactId>pig</artifactId>            
         <version>0.15.0</version>     
      </dependency> 
		
      <dependency>        
         <groupId>org.apache.hadoop</groupId>            
         <artifactId>hadoop-core</artifactId>            
         <version>0.20.2</version>     
      </dependency> 
      
   </dependencies>  
	
</project>
  • 保存文件并刷新它。在**Maven 依赖项**部分,您可以找到下载的 jar 文件。

  • 创建一个名为**Sample_Eval**的新类文件,并将以下内容复制到其中。

import java.io.IOException; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.data.Tuple; 
 
import java.io.IOException; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.data.Tuple;

public class Sample_Eval extends EvalFunc<String>{ 

   public String exec(Tuple input) throws IOException {   
      if (input == null || input.size() == 0)      
      return null;      
      String str = (String)input.get(0);      
      return str.toUpperCase();  
   } 
}

在编写 UDF 时,必须继承 EvalFunc 类并为**exec()**函数提供实现。在此函数中,编写了 UDF 所需的代码。在上面的示例中,我们返回了将给定列的内容转换为大写的代码。

  • 在没有错误地编译类后,右键单击 Sample_Eval.java 文件。它会为您提供一个菜单。选择**导出**,如下面的屏幕截图所示。

Select export
  • 单击**导出**后,您将看到以下窗口。单击**JAR 文件**。

Click on Export
  • 通过单击**下一步>**按钮继续。您将看到另一个窗口,您需要在本地文件系统中输入路径,您需要在其中存储 jar 文件。

jar export
  • 最后单击**完成**按钮。在指定的文件夹中,将创建一个 Jar 文件**sample_udf.jar**。此 jar 文件包含用 Java 编写的 UDF。

使用 UDF

编写 UDF 并生成 Jar 文件后,请按照以下步骤操作:

步骤 1:注册 Jar 文件

在编写 UDF(在 Java 中)后,我们必须使用 Register 操作符注册包含 UDF 的 Jar 文件。通过注册 Jar 文件,用户可以将 UDF 的位置告知 Apache Pig。

语法

以下是 Register 操作符的语法。

REGISTER path; 

示例

例如,让我们注册本章前面创建的 sample_udf.jar。

以本地模式启动 Apache Pig 并注册 jar 文件 sample_udf.jar,如下所示。

$cd PIG_HOME/bin 
$./pig –x local 

REGISTER '/$PIG_HOME/sample_udf.jar'

**注意** - 假设 Jar 文件位于路径 - /$PIG_HOME/sample_udf.jar

步骤 2:定义别名

注册 UDF 后,我们可以使用**Define**操作符为其定义别名。

语法

以下是 Define 操作符的语法。

DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] }; 

示例

为 sample_eval 定义别名,如下所示。

DEFINE sample_eval sample_eval();

步骤 3:使用 UDF

定义别名后,您可以像使用内置函数一样使用 UDF。假设 HDFS ** /Pig_Data/**目录中有一个名为 emp_data 的文件,其内容如下所示。

001,Robin,22,newyork
002,BOB,23,Kolkata
003,Maya,23,Tokyo
004,Sara,25,London 
005,David,23,Bhuwaneshwar 
006,Maggy,22,Chennai
007,Robert,22,newyork
008,Syam,23,Kolkata
009,Mary,25,Tokyo
010,Saran,25,London 
011,Stacy,25,Bhuwaneshwar 
012,Kelly,22,Chennai

并假设我们已将此文件加载到 Pig 中,如下所示。

grunt> emp_data = LOAD 'hdfs://127.0.0.1:9000/pig_data/emp1.txt' USING PigStorage(',')
   as (id:int, name:chararray, age:int, city:chararray);

现在让我们使用 UDF **sample_eval**将员工姓名转换为大写。

grunt> Upper_case = FOREACH emp_data GENERATE sample_eval(name);

验证关系**Upper_case**的内容,如下所示。

grunt> Dump Upper_case;
  
(ROBIN)
(BOB)
(MAYA)
(SARA)
(DAVID)
(MAGGY)
(ROBERT)
(SYAM)
(MARY)
(SARAN)
(STACY)
(KELLY)
广告