SparkSQL自定义函数UDF、UDAF聚合函数以及开窗函数的使用

更新时间:2023-07-06 12:35:56 阅读: 评论:0

SparkSQL⾃定义函数UDF、UDAF聚合函数以及开窗函数的
使⽤
⼀、UDF的使⽤
1、Spark SQL⾃定义函数就是可以通过scala写⼀个类,然后在SparkSession上注册⼀个函数并对应这个类,然后在SQL语句中就可以使⽤该函数了,⾸先定义UDF函数,那么创建⼀个SqlUdf类,并且继承UDF1或UDF2等等,UDF后边的数字表⽰了当调⽤函数时会传⼊进来有⼏个参数,最后⼀个R则表⽰返回的数据类型,如下图所⽰:
2、这⾥选择继承UDF2,如下代码所⽰:
package com.udf
import org.apache.spark.sql.api.java.UDF2
class SqlUDF extends UDF2[String,Integer,String] {
override def call(t1: String, t2: Integer): String = {
t1+"_udf_test_"+t2
}
}
3、然后在SparkSession⽣成的对象上通过ister进⾏注册,如下代码所⽰:
val conf=new SparkConf().tAppName("AppUdf").tMaster("local")
val sparkSession=SparkSession.builder().config(conf).getOrCreate()
//指定函数名为:splicing_t1_t2 此函数名只有通过ister注册过之后才能够被使⽤,第⼆个参数是继承与UDF的类
predator//第三个参数是返回类型
ister("splicing_t1_t2",new SqlUDF,DataTypes.StringType)
nuclear waste
4、⽣成模拟数据,并注册⼀个临时表,如下代码所⽰:
var rows=Seq[Row]()
val random=new Random()
for(i <- 0 until 10){
warehou是什么意思val name="name"+i
val Int(30)%15+15
val row=Row(name,age)
rows +:=row
}
val rowsRDD=sparkSession.sparkContext.parallelize(rows)
val ateStructType(Array[StructField](
加勒比海盗 剧情
)
val ateDataFrame(rowsRDD,schema)
英文请假条
df.show()
输出 结果如下图所⽰:
5、在sql语句中使⽤⾃定义函数splicing_t1_t2,然后将函数的返回结果定义⼀个别名name_age,如下代码所⽰:
val sql="SELECT name,age,splicing_t1_t2(name,age) name_age FROM person"
sparkSession.sql(sql).show()
输出结果如下:
6、由此可以看到在⾃定义的UDF类中,想如何操作都可以了,完整代码如下;
package com.udf
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.pes.{DataTypes, StructField}
import scala.util.Random
object AppUdf {
def main(args:Array[String]):Unit={
val conf=new SparkConf().tAppName("AppUdf").tMaster("local")
val sparkSession=SparkSession.builder().config(conf).getOrCreate()
//指定函数名为:splicing_t1_t2 此函数名只有通过ister注册过之后才能够被使⽤,第⼆个参数是继承与UDF的类    //第三个参数是返回类型
ister("splicing_t1_t2",new SqlUDF,DataTypes.StringType)
var rows=Seq[Row]()
val random=new Random()
for(i <- 0 until 10){
val name="name"+i
val Int(30)%15+15
val row=Row(name,age)
rows +:=row
}
val rowsRDD=sparkSession.sparkContext.parallelize(rows)
val ateStructType(Array[StructField](
)
val ateDataFrame(rowsRDD,schema)
三月英语缩写
val sql="SELECT name,age,splicing_t1_t2(name,age) name_age FROM person"
sparkSession.sql(sql).show()
sparkSession.clo()
}
}
⼆、⽆类型的⽤户⾃定于聚合函数:UrDefinedAggregateFunction
1、它是⼀个接⼝,需要实现的⽅法有:
class AvgAge extends UrDefinedAggregateFunction {
//设置输⼊数据的类型,指定输⼊数据的字段与类型,它与在⽣成表时创建字段时的⽅法相同
override def inputSchema: StructType =
//指定缓冲数据的字段与类型
override def bufferSchema: StructType =
//指定数据的返回类型
override def dataType: DataType =
//指定是否是确定性,对输⼊数据进⾏⼀致性检验,是⼀个布尔值,当为true时,表⽰对于同样的输⼊会得到同样的输出  override def deterministic: Boolean =
//initialize⽤户初始化缓存数据
override def initialize(buffer: MutableAggregationBuffer): Unit =
//当有新的输⼊数据时,update就会更新缓存变量
override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
/
/将更新的缓存变量进⾏合并,有可能每个缓存变量的值都不在⼀个节点上,最终是要将所有节点的值进⾏合并才⾏
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
//⼀个计算⽅法,⽤于计算我们的最终结果
override def evaluate(buffer: Row): Any =
}
这是⼀个计算平均年龄的⾃定义聚合函数,实现代码如下所⽰:
package com.udf
import java.math.BigDecimal
import org.apache.spark.sql.Row
import org.apache.pressions.{MutableAggregationBuffer, UrDefinedAggregateFunction}
import org.apache.pes.{DataType, DataTypes, StructField, StructType}
/**
* ⽤于计算平均年龄的聚合函数
*/
class AvgAge extends UrDefinedAggregateFunction {
/**
陕西经济师报名* 设置输⼊数据的类型,指定输⼊数据的字段与类型,它与在⽣成表时创建字段时的⽅法相同
* ⽐如计算平均年龄,输⼊的是age这⼀列的数据,注意此处的age名称可以随意命名
* @return
*/
override def inputSchema: StructType = ateStructType(Array[StructField](ateStructField("age",DataTypes.IntegerType,true)))
/**
* 指定缓冲数据的字段与类型,相当于中间变量
* 由于要计算平均值,⾸先要计算出总和与个数才能计算平均值,因此需要进来⼀个值就要累加并计数才能计算出平均值
* 所以要定义两个变量作为累加和以及计数的变量
* @return
*/
override def bufferSchema: StructType = ateStructType(Array[StructField](
))
//指定数据的返回类型,由于平均值是double类型,因此定义DoubleType
override def dataType: DataType = DataTypes.DoubleType
/**
* 设置该函数是否为幂等函数
* 幂等函数:即只要输⼊的数据相同,结果⼀定相同
* true表⽰是幂等函数,fal表⽰不是
* @return
*/
override def deterministic: Boolean = true
/**
* initialize⽤于初始化缓存变量的值,也就是初始化bufferSchema函数中定义的两个变量的值sum,count
* 其中buffer(0)就表⽰sum值,buffer(1)就表⽰count的值,如果还有第3个,则使⽤buffer(3)表⽰
* @param buffer
*/
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0,0.0) //或使⽤buffer(0)=0.0
buffer.update(1,0) //或使⽤buffer(1)=0
}
/**
* 当有⼀⾏数据进来时就会调⽤update⼀次,有多少⾏就会调⽤多少次,input就表⽰在调⽤⾃定义函数中有多少个参数,最终会将
* 这些参数⽣成⼀个Row对象,在使⽤时可以通过String或Long等⽅式获得对应的值
* 缓冲中的变量sum,count使⽤buffer(0)或Double(0)的⽅式获取到
* @param buffer
* @param input
*/
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val Double(0)
val Int(1)
buffer.update(0,Int(0).toDouble)
缺欠
buffer.update(1,count+1)
}
/**
* 将更新的缓存变量进⾏合并,有可能每个缓存变量的值都不在⼀个节点上,最终是要将所有节点的值进⾏合并才⾏
* 其中buffer1是本节点上的缓存变量,⽽buffer2是从其他节点上过来的缓存变量然后转换为⼀个Row对象,然后将buffer2  * 中的数据合并到buffer1中去即可
* @param buffer1
* @param buffer2
*/
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val Double(0)
val Int(1)
val Double(0)
val Int(1)
buffer1.update(0,sum1+sum2)
buffer1.update(1,count1+count2)
}
/**
* ⼀个计算⽅法,⽤于计算我们的最终结果,也就相当于返回值
* @param buffer
* @return
*/
override def evaluate(buffer: Row): Any = {
val bd = new Double(0)/Int(1).toDouble)
bd.tScale(2, BigDecimal.ROUND_HALF_UP).doubleValue//保留两位⼩数
}
soil}
2、注册该类,并指定到⼀个⾃定义函数中,如下图所⽰:
六级分数分布情况3、在表中加⼀列字段id,通过GROUP BY进⾏分组计算,如
4、在sql语句中使⽤group_age_avg,如下图所⽰:
输出结果如下图所⽰:

本文发布于:2023-07-06 12:35:56,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/168877.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:函数   数据   类型
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图