博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hive UDAF开发入门和运行过程详解(转)
阅读量:5939 次
发布时间:2019-06-19

本文共 6365 字,大约阅读时间需要 21 分钟。

介绍

hive的用户自定义聚合函数(UDAF)是一个很好的功能,集成了先进的数据处理。hive有两种UDAF:简单和通用。顾名思义,简单的UDAF,写的相当简单的,但因为使用Java反射导致性能损失,而且有些特性不能使用,如可变长度参数列表。通用UDAF可以使用​​所有功能,但是UDAF就写的比较复杂,不直观。

本文只介绍通用UDAF。

UDAF是需要在hive的sql语句和group by联合使用,hive的group by对于每个分组,只能返回一条记录,这点和mysql不一样,切记。

 

UDAF开发概览

开发通用UDAF有两个步骤,第一个是编写resolver类,第二个是编写evaluator类。resolver负责类型检查,操作符重载。evaluator真正实现UDAF的逻辑。通常来说,顶层UDAF类继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,里面编写嵌套类evaluator 实现UDAF的逻辑。

 本文以Hive的内置UDAF sum函数的源代码作为示例讲解。

 

实现 resolver

resolver通常继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是我们更建议继承AbstractGenericUDAFResolver,隔离将来hive接口的变化。

GenericUDAFResolver和GenericUDAFResolver2接口的区别是,后面的允许evaluator实现可以访问更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)。

public class GenericUDAFSum extends AbstractGenericUDAFResolver {  static final Log LOG = LogFactory.getLog(GenericUDAFSum.class.getName());  @Override  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)    throws SemanticException {    // Type-checking goes here!    return new GenericUDAFSumLong();   } public static class GenericUDAFSumLong extends GenericUDAFEvaluator {    // UDAF logic goes here!  } }

这个就是UDAF的代码骨架,第一行创建LOG对象,用来写入警告和错误到hive的log。GenericUDAFResolver只需要重写一个方法:getEvaluator,它根据SQL传入的参数类型,返回正确的evaluator。这里最主要是实现操作符的重载。

getEvaluator的完整代码如下:

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)    throws SemanticException {    if (parameters.length != 1) {      throw new UDFArgumentTypeException(parameters.length - 1,          "Exactly one argument is expected.");    }    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {      throw new UDFArgumentTypeException(0,          "Only primitive type arguments are accepted but "          + parameters[0].getTypeName() + " is passed.");    }    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {    case BYTE:    case SHORT:    case INT:    case LONG:    case TIMESTAMP:      return new GenericUDAFSumLong();    case FLOAT:    case DOUBLE:    case STRING:      return new GenericUDAFSumDouble();    case BOOLEAN:    default:      throw new UDFArgumentTypeException(0,          "Only numeric or string type arguments are accepted but "          + parameters[0].getTypeName() + " is passed.");    }

这里做了类型检查,如果不是原生类型(即符合类型,array,map此类),则抛出异常,还实现了操作符重载,对于整数类型,使用GenericUDAFSumLong实现UDAF的逻辑,对于浮点类型,使用GenericUDAFSumDouble实现UDAF的逻辑。

 

实现evaluator

所有evaluators必须继承抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子类必须实现它的一些抽象方法,实现UDAF的逻辑。

GenericUDAFEvaluator有一个嵌套类Mode,这个类很重要,它表示了udaf在mapreduce的各个阶段,理解Mode的含义,就可以理解了hive的UDAF的运行流程。

public static enum Mode {    /**     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合     * 将会调用iterate()和terminatePartial()     */    PARTIAL1,        /**     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:     * 将会调用merge() 和 terminatePartial()      */    PARTIAL2,        /**     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合      * 将会调用merge()和terminate()     */    FINAL,        /**     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合      * 将会调用 iterate()和terminate()     */    COMPLETE  };

一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

下面以GenericUDAFSumLong的evaluator实现讲解

public static class GenericUDAFSumLong extends GenericUDAFEvaluator {private PrimitiveObjectInspector inputOI;    private LongWritable result;   //这个方法返回了UDAF的返回类型,这里确定了sum自定义函数的返回类型是Long类型    @Override    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {      assert (parameters.length == 1);      super.init(m, parameters);      result = new LongWritable(0);      inputOI = (PrimitiveObjectInspector) parameters[0];      return PrimitiveObjectInspectorFactory.writableLongObjectInspector;    }    /** 存储sum的值的类 */    static class SumLongAgg implements AggregationBuffer {      boolean empty;      long sum;    }    //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。    @Override    public AggregationBuffer getNewAggregationBuffer() throws HiveException {      SumLongAgg result = new SumLongAgg();      reset(result);      return result;    }        //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。    @Override    public void reset(AggregationBuffer agg) throws HiveException {      SumLongAgg myagg = (SumLongAgg) agg;      myagg.empty = true;      myagg.sum = 0;    }    private boolean warned = false;      //map阶段调用,只要把保存当前和的对象agg,再加上输入的参数,就可以了。    @Override    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {      assert (parameters.length == 1);      try {        merge(agg, parameters[0]);      } catch (NumberFormatException e) {        if (!warned) {          warned = true;          LOG.warn(getClass().getSimpleName() + " "              + StringUtils.stringifyException(e));        }      }    }   //mapper结束要返回的结果,还有combiner结束返回的结果    @Override    public Object terminatePartial(AggregationBuffer agg) throws HiveException {      return terminate(agg);    }        //combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。    @Override    public void merge(AggregationBuffer agg, Object partial) throws HiveException {      if (partial != null) {        SumLongAgg myagg = (SumLongAgg) agg;        myagg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);        myagg.empty = false;      }    }         //reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。    @Override    public Object terminate(AggregationBuffer agg) throws HiveException {      SumLongAgg myagg = (SumLongAgg) agg;      if (myagg.empty) {        return null;      }      result.set(myagg.sum);      return result;    }  }

除了GenericUDAFSumLong,还有重载的GenericUDAFSumDouble,以上代码都在hive的源码:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum。

 

注意

terminate()返回的数据类型要跟输入时的数据类型保持一致,不然会报错!

修改方法注册

修改 ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件,加入编写的UDAF类,并注册名字。

FunctionRegistry类包含了hive的所有内置自定义函数。想要更好学习hive的UDAF,建议多看看里面的UDAF。

 

总结

本文的目的是为初学者入门学习udaf,所以介绍了udaf的概览,尤其是udaf的运行过程,这对初学者是比较大的槛。

考虑入门,本文简单介绍了sum的UDAF实现,但是如果想要更好理解UDAF的运行过程,建议再看看avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage。avg UDAF对hive的运行流程要控制的更加精细,并判断当前运行的Mode做一定的逻辑处理。

 

参考 

转自 

转载于:https://www.cnblogs.com/juefan/p/3584095.html

你可能感兴趣的文章
【转】Principles of training multi-layer neural network using backpropagation
查看>>
并查集hdu1232
查看>>
改动Androidproject的名称(非Eclipse重命名)
查看>>
tomcat work目录的作用就是编译每个项目里的jsp文件为java文件如果项目没有jsp页面则这个项目文件夹为空...
查看>>
dedecms后台左侧菜单500错误怎么处理
查看>>
Maven配置将war包部署到Tomcat(tomcat7-maven-plugin)
查看>>
Spring MVC学习-------------訪问到静态的文件
查看>>
Unity应用架构设计(11)——一个网络层的构建
查看>>
运行自己的shell脚本
查看>>
内存错误的类别
查看>>
Authentication 方案优化探索(JWT, Session, Refresh Token, etc.)
查看>>
Struts2 关于返回type="chain"的用法.
查看>>
Maven私服安装及配置——(十二)
查看>>
设计模式 - 迭代器模式(iterator pattern) 具体解释
查看>>
Codeforces554B:Ohana Cleans Up
查看>>
【java】jvm查看当前虚拟机堆大小限制
查看>>
python写入excel(xlswriter)--生成图表
查看>>
Sublime Text 2 和 Verilog HDL
查看>>
NetworkStream.write只能使用一次,后面再使用无效
查看>>
Android Studio离线打包5+SDK
查看>>