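Custom Functions
SQL statements often use functions to further process the data returned by a query. When the built-in functions are not enough, Spark SQL lets you register your own functions to implement custom behavior.
UDF: User-Defined Functions
A custom function can be registered through spark.udf.register, which takes the name to expose in SQL and the Scala function that implements it.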
    spark.udf.register("prefix", (name: String, prefix: String) => {
      prefix + ":" + name
    })
    spark.sql("select prefix(name, 'name') from user").show()
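The body of a UDF is an ordinary Scala function, so it can be written as a named value and checked without a Spark session before it is registered. The sketch below also handles one case the inline version does not: Spark passes null for a NULL string column, and plain concatenation would then produce the text "name:null". The object name PrefixFn and the choice to keep NULL as NULL are illustrative assumptions, not part of the original example.

```scala
object PrefixFn {
  // Same logic as the inline lambda, but a null name stays null
  // instead of being concatenated as the string "null".
  val prefixFn: (String, String) => String =
    (name, prefix) => Option(name).map(n => prefix + ":" + n).orNull

  def main(args: Array[String]): Unit = {
    println(prefixFn("alice", "name")) // name:alice
    println(prefixFn(null, "name"))    // null
  }
}
```

With a SparkSession in scope, the value can be registered in the same way as the lambda: spark.udf.register("prefix", PrefixFn.prefixFn).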
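UDAF: User-Defined Aggregate Functions
A UDAF is an aggregate function. Spark SQL already provides built-in aggregates such as count(), avg(), and max(). Beyond these, you can define your own by extending Aggregator[IN, BUF, OUT], where IN is the input type, BUF is the intermediate buffer type, and OUT is the result type.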
    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    // Aggregation buffer: running sum of ages and number of rows seen
    case class Buf(var ageTotal: Long, var count: Long)

    // Input is an age (Long), buffer is Buf, output is the average (Double)
    class AgeAvg1 extends Aggregator[Long, Buf, Double] {
      // Initial, empty buffer
      override def zero: Buf = Buf(0L, 0L)

      // Fold one input value into the buffer
      override def reduce(b: Buf, a: Long): Buf = {
        b.ageTotal += a
        b.count += 1
        b
      }

      // Combine two partial buffers
      override def merge(b1: Buf, b2: Buf): Buf = {
        b1.ageTotal += b2.ageTotal
        b1.count += b2.count
        b1
      }

      // Turn the final buffer into the result
      override def finish(reduction: Buf): Double =
        reduction.ageTotal.toDouble / reduction.count

      // Encoders for the buffer and the output value
      override def bufferEncoder: Encoder[Buf] = Encoders.product
      override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
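To see how the four methods fit together, it can help to run the same logic outside Spark. The sketch below is plain Scala and only imitates the flow: each inner Seq stands in for a partition, reduce folds the rows of one partition starting from zero, merge combines the partial buffers, and finish produces the result. The object name, the immutable buffer, and the sample ages are assumptions made for illustration; how Spark actually schedules these calls is not shown here.

```scala
object AggregatorLifecycleSketch {
  // Immutable stand-in for the Buf buffer used by the aggregator
  final case class Buf(ageTotal: Long, count: Long)

  val zero: Buf = Buf(0L, 0L)

  // Fold one age into a buffer
  def reduce(b: Buf, a: Long): Buf = Buf(b.ageTotal + a, b.count + 1)

  // Combine two partial buffers
  def merge(b1: Buf, b2: Buf): Buf =
    Buf(b1.ageTotal + b2.ageTotal, b1.count + b2.count)

  // Compute the average from the final buffer
  def finish(b: Buf): Double = b.ageTotal.toDouble / b.count

  // Each inner Seq plays the role of one partition
  def average(partitions: Seq[Seq[Long]]): Double = {
    val partials = partitions.map(_.foldLeft(zero)(reduce))
    finish(partials.foldLeft(zero)(merge))
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(20L, 30L), Seq(40L), Seq.empty[Long])
    println(average(partitions)) // 30.0
  }
}
```

An empty partition contributes the zero buffer, which is why zero must be a neutral element for merge. Note that with no input at all, finish divides 0.0 by 0 and yields NaN, whereas the built-in avg() returns NULL. If that matters, finish is the place to handle it.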
Using the custom function
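    import org.apache.spark.sql.functions

    dataFrame.createOrReplaceTempView("user")
    // functions.udaf (available since Spark 3.0) wraps the Aggregator so it can be called from SQL
    spark.udf.register("ageAvg", functions.udaf(new AgeAvg1))
    spark.sql("select ageAvg(age) from user").show()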
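To use the custom function through the typed Dataset API instead, the aggregator's input type can no longer be the age value itself. It has to be the element type of the Dataset, User.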
    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    // User is not defined in this section; it is assumed to be a case class
    // with at least an age: Long field, e.g. case class User(name: String, age: Long)

    case class Buf(var ageTotal: Long, var count: Long)

    // Input is a whole User row, buffer is Buf, output is the average age (Double)
    class AgeAvg extends Aggregator[User, Buf, Double] {
      override def zero: Buf = Buf(0L, 0L)

      // Read the age from the User and fold it into the buffer
      override def reduce(b: Buf, a: User): Buf = {
        b.ageTotal += a.age
        b.count += 1
        b
      }

      override def merge(b1: Buf, b2: Buf): Buf = {
        b1.ageTotal += b2.ageTotal
        b1.count += b2.count
        b1
      }

      override def finish(reduction: Buf): Double =
        reduction.ageTotal.toDouble / reduction.count

      override def bufferEncoder: Encoder[Buf] = Encoders.product
      override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
    }
Calling the custom function on a Dataset
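    // Brings the implicit Encoder[User] into scope for as[User]
    import spark.implicits._

    val dataSet = dataFrame.as[User]
    val ageAvg = new AgeAvg
    // Turn the Aggregator into a typed column that select can use
    val avgColumn = ageAvg.toColumn
    dataSet.select(avgColumn).show()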