hive 自定义函数UDF,UDAF


自定义函数

在hive中,有时候一些内置的函数,和普通的查询操作已经满足不了我们要查询的要求,这时候可以自己写一些自定义函数来处理。自定义函数(user defined function =UDF)

由于hive本身是用java语言开发,所以udf必须用java来写才可以。

Hive中有三种UDF

1.      普通udf(UDF)
操作单个数据行,且产生一个数据作为输出。例如(数学函数,字符串函数)

2.      聚合udf (UDAF)
接受多个数据行,并产生一个数据行作为输出。例如(COUNT,MAX函数等)

3.      表生成UDF(UDTF)
接受一个数据行,然后返回产生多个数据行(一个表作为输出)。比如lateral view(据说是一个将行转成列的函数)。

编写UDF

编写UDF必须满足一下:

1.      必须是org.apache.hadoop.hive.ql.exec.UDF的子类

2.      必须实现evaluate函数。


 

 

 

1.      strip UDF

java code

package com.hcr.hadoop.hive;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class Strip extends UDF {
public Text evaluate(String str) {
return str == null ? null : new Text(StringUtils.strip(str));
}
public Text evaluate(String str,String chrStr) {
return str == null ? null : new Text(StringUtils.strip(str,chrStr));
}
}


写完代码打成jar包hcr.jar

 

hive> add jar/root/hcr/tmp/hcr.jar; 
Added /root/hcr/tmp/hcr.jar to class path
Added resource: /root/hcr/tmp/hcr.jar


 

声明strip函数

create temporary function  strip as 'com.hcr.hadoop.hive.Strip';
select year,strip(name,'ruishen') from records2;

原数据

1990    ruishenh0
1992 ruishenh2
1991 ruishenh1
1993 ruishenh3
1994 ruishenh4
1995 ruishenh5
1996 ruishenh6
1997 ruishenh7
1998 ruishenh8

执行完后显示


1990    0
1992 2
1991 1
1993 3
1994 4
1995 5
1996 6
1997 7
1998 8
 

编写UDAF

编写UDAF必须满足一下:

1.      必须是org.apache.hadoop.hive.ql.exec.UDAF的子类

2.      且包含一个或多个嵌套的,实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类

3.      如果是计算函数必须包含如下5个函数

a)        Init()
初始化计算函数和内部数据结构状态等。

b)        Iterate()
每一个新值调用聚集计算时都会调用这个函数。计算函数要聚集计算的结果更新其内部状态,iterate函数接受的参数和hive中被调用函数的参数是对应的。

c)        terminatePartial()
这个就是取计算到当前(局部)的时候的数据对象函数。(比如1-10。计算5的时候要调用一下这个函数查看一下当前的内部结构对象也就是1-5的聚合结果)

d)        merge()
在hive决定要合并一个部分聚集值和另一个部分聚集值是会调用merge()方法,该方法接受一个对象输入,这个对象的类型必须和terminatePartial()返回的一致。

e)        terminate()
hive需要最终聚集结果时会调用terminate方法,计算函数需要把状态作为一个值返回。

 

实例UDAF(一)

package com.hcr.hadoop.hive;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;

public class Maxinum extends UDAF {

public static class MaxinumIntUDAFEvaluator implements UDAFEvaluator {

private IntWritable result;

@Override
public void init() {
result = null;
}

public boolean iterate(IntWritable value) {
if (value == null) {
return true;
}
if (result == null) {
result = new IntWritable(value.get());
} else {
System.out.println("result:"+result+",value:"+value);
result.set(Math.max(result.get(), value.get()));
}
return true;
}

public IntWritable terminatePartial() {
return result;
}

public boolean merge(IntWritable value) {
System.out.println("merge-result:"+result);
return iterate(value);
}

public IntWritable terminate() {
System.out.println("terminate-result:"+result);
return result;
}
}

}
 

hive> add jar /root/hcr/tmp/hcr.jar;
Added /root/hcr/tmp/hcr.jar to class path
Added resource: /root/hcr/tmp/hcr.jar
hive> create temporary function maxinum as 'com.hcr.hadoop.hive.Maxinum';
OK
Time taken: 0.192 seconds
hive> select maxinum(temperature) fromrecords;
58

实例UDAF(二)

 

package com.hcr.hadoop.hive;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;

public class Mean extends UDAF {

public static class MeanDoubleUDAFEvaluator implements UDAFEvaluator {

public static class PartialResult {
double sum;
long count;
}

private PartialResult partial;

@Override
public void init() {
partial = null;
}

public boolean iterate(DoubleWritable value) {
if (value == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
}
partial.sum += value.get();
partial.count++;
return true;
}

public PartialResult terminatePartial() {
return partial;
}

public boolean merge(PartialResult value) {
if (value == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
}
partial.sum += value.sum;
partial.count += value.count;
return true;
}

public DoubleWritable terminate() {
if (partial == null)
return null;
return new DoubleWritable(partial.sum / partial.count);
}
}

}


 

hive> add jar /root/hcr/tmp/hcr.jar;                               
Added /root/hcr/tmp/hcr.jar to class path
Added resource: /root/hcr/tmp/hcr.jar
hive> create temporary function mean as'com.hcr.hadoop.hive.Mean';
OK
Time taken: 0.198 seconds
hive>ALTER TABLE records CHANGE COLUMN temperature temperature double;
hive> select mean(temperature) from records;
OK
43.916666666666664
Time taken: 20.769 seconds
hive>

 

关于很多系统内置的UDAF函数可以在下边这个类中查看注册的UDF和UDAF等

org.apache.hadoop.hive.ql.exec.FunctionRegistry

参照hive内部的写法

先写一个resolver类,然后便写evaluator(GenericUDAFEvaluator的子类)类

参靠:

http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

https://cwiki.apache.org/confluence/display/Hive/GenericUDAFCaseStudy#GenericUDAFCaseStudy-Preliminaries


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
© 2014-2019 ITdaan.com 粤ICP备14056181号