HCatalog - 输入输出格式



HCatInputFormatHCatOutputFormat 接口用于从 HDFS 读取数据,并在处理后使用 MapReduce 作业将结果数据写入 HDFS。让我们详细说明输入和输出格式接口。

HCatInputFormat

HCatInputFormat 用于 MapReduce 作业,从 HCatalog 管理的表中读取数据。HCatInputFormat 提供了一个 Hadoop 0.20 MapReduce API,用于读取数据,就像数据已发布到表中一样。

序号 方法名称和描述
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName) throws IOException

设置作业使用的输入。它使用给定的输入规范查询元数据存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

设置作业使用的输入。它使用给定的输入规范查询元数据存储,并将匹配的分区序列化到 MapReduce 任务的作业配置中。

3

public HCatInputFormat setFilter(String filter) throws IOException

在输入表上设置过滤器。

4

public HCatInputFormat setProperties(Properties properties) throws IOException

设置输入格式的属性。

HCatInputFormat API 包含以下方法:

  • setInput
  • setOutputSchema
  • getTableSchema

要使用 HCatInputFormat 读取数据,首先使用被读取表的必要信息实例化一个 InputJobInfo,然后使用 InputJobInfo 调用 setInput

您可以使用 setOutputSchema 方法包含一个 投影模式,以指定输出字段。如果没有指定模式,则将返回表中的所有列。您可以使用 getTableSchema 方法确定指定输入表的表模式。

HCatOutputFormat

HCatOutputFormat 用于 MapReduce 作业,将数据写入 HCatalog 管理的表。HCatOutputFormat 提供了一个 Hadoop 0.20 MapReduce API,用于将数据写入表。当 MapReduce 作业使用 HCatOutputFormat 写入输出时,将使用为表配置的默认 OutputFormat,并且作业完成后,新分区将发布到表中。

序号 方法名称和描述
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

设置作业写入输出的信息。它查询元数据服务器以查找要用于表的 StorageHandler。如果分区已发布,则会抛出错误。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

设置写入分区的数据模式。如果没有调用此方法,则默认使用表模式。

3

public RecordWriter , HCatRecord > getRecordWriter (TaskAttemptContext context) throws IOException, InterruptedException

获取作业的记录写入器。它使用 StorageHandler 的默认 OutputFormat 获取记录写入器。

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

获取此输出格式的输出提交器。它确保正确提交输出。

HCatOutputFormat API 包含以下方法:

  • setOutput
  • setSchema
  • getTableSchema

对 HCatOutputFormat 的第一次调用必须是 setOutput;任何其他调用都会抛出异常,指出输出格式未初始化。

写入数据的模式由 setSchema 方法指定。必须调用此方法,提供正在写入的数据模式。如果您的数据与表模式具有相同的模式,则可以使用 HCatOutputFormat.getTableSchema() 获取表模式,然后将其传递给 setSchema()

示例

以下 MapReduce 程序从一个表读取数据(假设该表在第二列“column 1”中有一个整数),并计算找到的每个不同值的实例数。也就是说,它执行等效于“select col1, count(*) from $table group by col1;”的操作。

例如,如果第二列中的值为 {1, 1, 1, 3, 3, 5},则程序将生成以下值和计数输出:

1, 3
3, 2
5, 1

现在让我们来看一下程序代码:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

在编译上述程序之前,您必须下载一些jar包并将它们添加到此应用程序的类路径中。您需要下载所有 Hive jar 包和 HCatalog jar 包(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

使用以下命令将这些jar文件从本地复制到HDFS,并将它们添加到类路径

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

使用以下命令编译并执行给定的程序。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

现在,检查您的输出目录(hdfs: user/tmp/hive)以查看输出(part_0000,part_0001)。

广告
© . All rights reserved.