0%

自定义函数

自定义函数

有时候在使用sql语句的时候会用到函数来对查出来的数据进行再次的操作,可以使用自定义函数来实现自定义的功能

UDF自定义函数

可以使用spark.udf来进行注册添加自定义函数

1
2
3
4
5
6
// 自定义函数名为prefix
spark.udf.register("prefix",(name:String,prefix:String) =>{
prefix+":"+name
})
// 在使用该函数的时候也要使用注册的函数名来进行使用
spark.sql("select prefix(name,'name') from user").show()

UDAF自定义函数

UDAF是聚合函数,本身就提供了一些聚合函数,如count(),avg(),max()等,除此之外还可以进行自定义聚合函数,通过继承Aggregator来实现自定义的聚合函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Aggregator[-IN, BUF, OUT]()
// 三个泛型分别为输入类型,Buf类型,输出类型
class AgeAvg1 extends Aggregator[Long,Buf,Double]{
// 缓冲区的初始值
override def zero: Buf = {
Buf(0,0)

}

// 聚合到缓冲区
override def reduce(b: Buf, a: Long): Buf = {
b.ageTotal += a
b.count += 1
b
}

// 缓冲区merge
override def merge(b1: Buf, b2: Buf): Buf = {
b1.ageTotal += b2.ageTotal
b1.count += b2.count
b1

}

// 最终结果
override def finish(reduction: Buf): Double = {
reduction.ageTotal.toDouble / reduction.count

}

// buffer的编码
override def bufferEncoder: Encoder[Buf] = {
Encoders.product
}

// 输出的编码
override def outputEncoder: Encoder[Double] = {
Encoders.scalaDouble
}
}

case class Buf(var ageTotal : Long,var count:Long)

使用自定义的函数

1
2
3
4
5
// 使用dataFrame的方式
dataFrame.createOrReplaceTempView("user")

spark.udf.register("ageAvg",functions.udaf(new AgeAvg1))
spark.sql("select ageAvg(age) from user").show()

如果想要使用dataSet的方式来使用自定义函数的话,此时该自定义函数的入参就不能是age了,而需要改成User

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
case class Buf(var ageTotal : Long,var count:Long)


class AgeAvg extends Aggregator[User,Buf,Double]{
// 缓冲区的初始值
override def zero: Buf = {
Buf(0,0)

}

// 聚合到缓冲区
override def reduce(b: Buf, a: User): Buf = {
b.ageTotal += a.age
b.count += 1
b
}

// 缓冲区merge
override def merge(b1: Buf, b2: Buf): Buf = {
b1.ageTotal += b2.ageTotal
b1.count += b2.count
b1

}

// 最终结果
override def finish(reduction: Buf): Double = {
reduction.ageTotal.toDouble / reduction.count

}

// buffer的编码
override def bufferEncoder: Encoder[Buf] = {
Encoders.product
}

// 输出的编码
override def outputEncoder: Encoder[Double] = {
Encoders.scalaDouble
}
}

使用DataSet来调用自定义函数

1
2
3
4
5
6
7
8
9
// 使用dataSet的方式
val dataSet = dataFrame.as[User]

// 构造聚合函数
val ageAvg = new AgeAvg
// 将聚合函数转为可查询的列
val avgColumn = ageAvg.toColumn

dataSet.select(avgColumn).show()