Spark SQL (8) - Aggregation in Spark SQL
Earlier posts briefly summarized the whole flow Spark takes from a SQL statement to a physical plan; this post summarizes how Spark SQL handles aggregation.
Generating the physical plan for aggregation
Let's start from a SQL statement:
SELECT NAME, COUNT(*) FROM PEOPLE GROUP BY NAME
After this statement is parsed by ANTLR4, the resulting tree has an extra aggregation child node under querySpecification. Here we only look at the aggregation-related handling. During the analysis phase, aggregation is resolved in the AstBuilder.withQuerySpecification method:
private def withQuerySpecification(
    ctx: QuerySpecificationContext,
    relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
  import ctx._
  // ... some unrelated handling removed ...

  // Add where.
  val withFilter = withLateralView.optionalMap(where)(filter)

  // Add aggregation or a project.
  val namedExpressions = expressions.map {
    case e: NamedExpression => e
    case e: Expression => UnresolvedAlias(e)
  }
  val withProject = if (aggregation != null) {
    withAggregation(aggregation, namedExpressions, withFilter)
  } else if (namedExpressions.nonEmpty) {
    Project(namedExpressions, withFilter)
  } else {
    withFilter
  }

  // Having
  val withHaving = withProject.optional(having) {
    // Note that we add a cast to non-predicate expressions. If the expression itself is
    // already boolean, the optimizer will get rid of the unnecessary cast.
    val predicate = expression(having) match {
      case p: Predicate => p
      case e => Cast(e, BooleanType)
    }
    Filter(predicate, withProject)
  }

  // Distinct
  val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
    Distinct(withHaving)
  } else {
    withHaving
  }

  // Window / Hint handling omitted ...
}
The withAggregation method looks like this:
private def withAggregation(
    ctx: AggregationContext,
    selectExpressions: Seq[NamedExpression],
    query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
  val groupByExpressions = expressionList(ctx.groupingExpressions)

  if (ctx.GROUPING != null) {
    // GROUP BY .... GROUPING SETS (...)
    val selectedGroupByExprs =
      ctx.groupingSet.asScala.map(_.expression.asScala.map(e => expression(e)))
    GroupingSets(selectedGroupByExprs, groupByExpressions, query, selectExpressions)
  } else {
    // GROUP BY .... (WITH CUBE | WITH ROLLUP)?
    val mappedGroupByExpressions = if (ctx.CUBE != null) {
      Seq(Cube(groupByExpressions))
    } else if (ctx.ROLLUP != null) {
      Seq(Rollup(groupByExpressions))
    } else {
      groupByExpressions
    }
    Aggregate(mappedGroupByExpressions, selectExpressions, query)
  }
}
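Before moving on, you can check the effect on the analyzed logical plan directly. The snippet below is a small sketch (it assumes a SparkSession named spark and registers a made-up people view so that the example query can run); queryExecution.analyzed should print a logical plan with an Aggregate node sitting above the relation:

// Register a tiny view so the example query from the beginning can run.
spark.range(0, 100)
  .selectExpr("CAST(id % 3 AS STRING) AS name")
  .createOrReplaceTempView("people")

// The analyzed logical plan should contain an Aggregate node for the GROUP BY.
println(spark.sql("SELECT NAME, COUNT(*) FROM PEOPLE GROUP BY NAME").queryExecution.analyzed)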
As you can see, an Aggregate node is eventually added to the tree. Skipping the optimizer for now, the next stage is physical planning, and the strategy we care about for aggregation is:
object Aggregation extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalAggregation(
        groupingExpressions, aggregateExpressions, resultExpressions, child) =>
      val (functionsWithDistinct, functionsWithoutDistinct) =
        aggregateExpressions.partition(_.isDistinct)
      if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) {
        // This is a sanity check. We should not reach here when we have multiple distinct
        // column sets. Our `RewriteDistinctAggregates` should take care this case.
        sys.error("You hit a query analyzer bug. Please report your query to " +
          "Spark user mailing list.")
      }

      val aggregateOperator =
        if (functionsWithDistinct.isEmpty) {
          aggregate.AggUtils.planAggregateWithoutDistinct(
            groupingExpressions,
            aggregateExpressions,
            resultExpressions,
            planLater(child))
        } else {
          aggregate.AggUtils.planAggregateWithOneDistinct(
            groupingExpressions,
            functionsWithDistinct,
            functionsWithoutDistinct,
            resultExpressions,
            planLater(child))
        }

      aggregateOperator

    case _ => Nil
  }
}
As the logic above shows, depending on whether any aggregate function contains a DISTINCT, either planAggregateWithoutDistinct or planAggregateWithOneDistinct is called to build the physical plan. After that, once the preparation phase has run, physical plan generation for the aggregation is complete.
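To see the physical plan this produces, explain(true) prints the parsed, analyzed, optimized and physical plans in one go (again a sketch, reusing the people view registered above). For this kind of GROUP BY count, the physical plan normally shows two HashAggregate nodes, a partial one and a final one, separated by an Exchange:

// The physical plan should read roughly HashAggregate(partial) -> Exchange -> HashAggregate(final).
spark.sql("SELECT NAME, COUNT(*) FROM PEOPLE GROUP BY NAME").explain(true)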
Next we will look at how planAggregateWithoutDistinct and planAggregateWithOneDistinct differ, and at the ways Spark SQL actually implements aggregation. Before that, let's first go over the aggregate modes (AggregateMode).
Aggregate modes (AggregateMode) and aggregate functions
Partial: partial aggregation; the input rows are folded into the aggregation buffer and the buffer contents are returned (see the sketch after this list);
Final: merges aggregation buffers and returns the final result;
Complete: cannot be partially aggregated; the final result is computed and returned directly;
PartialMerge: merges aggregation buffers and still returns buffer contents; it is mainly used for DISTINCT statements.
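To make Partial and Final a bit more concrete, here is a minimal plain-Scala sketch (a toy model, not Spark code) of a grouped count: each partition folds its rows into its own buffer (Partial), and the buffers are then merged into the final result (Final):

object ModeSketch {
  def main(args: Array[String]): Unit = {
    // Two "partitions" of input rows.
    val partitions = Seq(Seq("a", "a", "b"), Seq("a", "b", "b"))
    // Partial: build an aggregation buffer (a map of counts) per partition.
    val partialBuffers = partitions.map(_.groupBy(identity).map { case (k, v) => k -> v.size.toLong })
    // Final: merge the per-partition buffers into the final result.
    val finalResult = partialBuffers.flatten.groupBy(_._1).map { case (k, v) => k -> v.map(_._2).sum }
    println(finalResult) // a -> 3, b -> 3
  }
}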
Next, a quick look at how aggregate functions are classified:
1. DeclarativeAggregate: declarative aggregate functions.
2. ImperativeAggregate: imperative aggregate functions.
3. TypedImperativeAggregate: a subclass of ImperativeAggregate that can keep an arbitrary Java object as its in-memory aggregation buffer.
Declarative and imperative aggregate functions differ mainly in how update and merge are expressed: a DeclarativeAggregate provides them by overriding expressions (updateExpressions / mergeExpressions), while an ImperativeAggregate has to override the corresponding methods.
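The internal DeclarativeAggregate and ImperativeAggregate classes are not meant to be written by end users, but the public Aggregator API follows the same "override the methods" contract as the imperative side, so it gives a feel for what update and merge look like there (a sketch; LongSum is a made-up name):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// Imperative style: update (reduce) and merge are plain methods over a buffer value.
// A declarative function such as Count instead overrides updateExpressions / mergeExpressions
// with Catalyst expression trees that can be code-generated.
object LongSum extends Aggregator[Long, Long, Long] {
  def zero: Long = 0L                                            // initial buffer
  def reduce(buffer: Long, value: Long): Long = buffer + value   // "update"
  def merge(b1: Long, b2: Long): Long = b1 + b2                  // "merge"
  def finish(reduction: Long): Long = reduction                  // final result
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}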
Now for the difference between planAggregateWithoutDistinct and planAggregateWithOneDistinct.
About planAggregateWithoutDistinct:
def planAggregateWithoutDistinct(
    groupingExpressions: Seq[NamedExpression],
    aggregateExpressions: Seq[AggregateExpression],
    resultExpressions: Seq[NamedExpression],
    child: SparkPlan): Seq[SparkPlan] = {
  // Check if we can use HashAggregate.

  // 1. Create an Aggregate Operator for partial aggregations.
  val groupingAttributes = groupingExpressions.map(_.toAttribute)
  val partialAggregateExpressions = aggregateExpressions.map(_.copy(mode = Partial))
  val partialAggregateAttributes =
    partialAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
  val partialResultExpressions =
    groupingAttributes ++
      partialAggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)

  val partialAggregate = createAggregate(
    requiredChildDistributionExpressions = None,
    groupingExpressions = groupingExpressions,
    aggregateExpressions = partialAggregateExpressions,
    aggregateAttributes = partialAggregateAttributes,
    initialInputBufferOffset = 0,
    resultExpressions = partialResultExpressions,
    child = child)

  // 2. Create an Aggregate Operator for final aggregations.
  val finalAggregateExpressions = aggregateExpressions.map(_.copy(mode = Final))
  // The attributes of the final aggregation buffer, which is presented as input to the result
  // projection:
  val finalAggregateAttributes = finalAggregateExpressions.map(_.resultAttribute)

  val finalAggregate = createAggregate(
    requiredChildDistributionExpressions = Some(groupingAttributes),
    groupingExpressions = groupingAttributes,
    aggregateExpressions = finalAggregateExpressions,
    aggregateAttributes = finalAggregateAttributes,
    initialInputBufferOffset = groupingExpressions.length,
    resultExpressions = resultExpressions,
    child = partialAggregate)

  finalAggregate :: Nil
}
The method above boils down to two steps: first create an aggregate operator for the partial (map-side) aggregation, then create a final aggregate operator on top of it.
About planAggregateWithOneDistinct:
It is very similar to planAggregateWithoutDistinct, except that it takes four steps instead of two (an example query that goes through this path follows the list):
1. Create an aggregate operator for the partial aggregation;
2. Create a partialMerge operator;
3. Create another partial operator; this step handles the distinct;
4. Create a final operator.
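An example of a query that takes this path (assuming the people table also has an AGE column, which is made up here) is one with a single DISTINCT aggregate, such as:

SELECT NAME, COUNT(DISTINCT AGE) FROM PEOPLE GROUP BY NAME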
Both methods rely on createAggregate, which is where the concrete physical implementation of the aggregation is chosen:
private def createAggregate(
    requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
    groupingExpressions: Seq[NamedExpression] = Nil,
    aggregateExpressions: Seq[AggregateExpression] = Nil,
    aggregateAttributes: Seq[Attribute] = Nil,
    initialInputBufferOffset: Int = 0,
    resultExpressions: Seq[NamedExpression] = Nil,
    child: SparkPlan): SparkPlan = {
  val useHash = HashAggregateExec.supportsAggregate(
    aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
  if (useHash) {
    HashAggregateExec(
      requiredChildDistributionExpressions = requiredChildDistributionExpressions,
      groupingExpressions = groupingExpressions,
      aggregateExpressions = aggregateExpressions,
      aggregateAttributes = aggregateAttributes,
      initialInputBufferOffset = initialInputBufferOffset,
      resultExpressions = resultExpressions,
      child = child)
  } else {
    val objectHashEnabled = child.sqlContext.conf.useObjectHashAggregation
    val useObjectHash = ObjectHashAggregateExec.supportsAggregate(aggregateExpressions)
    if (objectHashEnabled && useObjectHash) {
      ObjectHashAggregateExec(
        requiredChildDistributionExpressions = requiredChildDistributionExpressions,
        groupingExpressions = groupingExpressions,
        aggregateExpressions = aggregateExpressions,
        aggregateAttributes = aggregateAttributes,
        initialInputBufferOffset = initialInputBufferOffset,
        resultExpressions = resultExpressions,
        child = child)
    } else {
      SortAggregateExec(
        requiredChildDistributionExpressions = requiredChildDistributionExpressions,
        groupingExpressions = groupingExpressions,
        aggregateExpressions = aggregateExpressions,
        aggregateAttributes = aggregateAttributes,
        initialInputBufferOffset = initialInputBufferOffset,
        resultExpressions = resultExpressions,
        child = child)
    }
  }
}
The logic above shows that if hash aggregation can be used, HashAggregateExec is chosen. The concrete condition is that every field type of the aggregation buffer schema is among the mutable field types below (from UnsafeRow):
static {
mutableFieldTypes = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(new DataType[] {
NullType,
BooleanType,
ByteType,
ShortType,
IntegerType,
LongType,
FloatType,
DoubleType,
DateType,
TimestampType
})));
}
After that, if the objectHash switch is turned on and the aggregate function expressions are TypedImperativeAggregate, ObjectHashAggregateExec is chosen; if neither of the two previous conditions holds, SortAggregateExec is used. The three aggregation implementations are described below.
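A quick way to watch this selection happen is to toggle the object-hash switch and compare plans (a sketch; the flag name spark.sql.execution.useObjectHashAggregateExec and the fact that collect_list is a TypedImperativeAggregate are based on Spark 2.x and may differ in other versions):

// collect_list keeps its buffer as a Java object, so HashAggregateExec cannot be used.
// With the flag on the plan should pick ObjectHashAggregateExec, with it off SortAggregateExec.
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "true")
spark.sql("SELECT NAME, collect_list(NAME) FROM PEOPLE GROUP BY NAME").explain()

spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")
spark.sql("SELECT NAME, collect_list(NAME) FROM PEOPLE GROUP BY NAME").explain()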
HashAggregateExec
The idea behind hash aggregation is to build a hash map keyed by the grouping key, keep the aggregation buffers as its values and do all the aggregate updates in that map. The map is held in memory; when memory runs short, its contents are spilled, and the hash aggregation falls back to sort-based aggregation.
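As a rough mental model of this "hash first, fall back to sort" behaviour, here is a self-contained plain-Scala sketch (a toy, not Spark's implementation): keep updating a hash map; when the budget is exceeded, set aside the current contents sorted by key, reset the map, and at the end combine everything per key:

import scala.collection.mutable

object HashAggWithSpillSketch {
  def main(args: Array[String]): Unit = {
    val input = Iterator("a", "b", "a", "c", "b", "a", "d", "c")
    val maxEntries = 2                                   // tiny "memory budget" to force spills
    val current = mutable.HashMap.empty[String, Long]
    val spilled = mutable.ArrayBuffer.empty[Seq[(String, Long)]]

    for (key <- input) {
      if (!current.contains(key) && current.size >= maxEntries) {
        spilled += current.toSeq.sortBy(_._1)            // "spill" a run sorted by grouping key
        current.clear()                                  // reset the map and keep aggregating
      }
      current(key) = current.getOrElse(key, 0L) + 1L     // update the aggregation buffer
    }
    spilled += current.toSeq.sortBy(_._1)

    // Sort-based finish: combine the spilled runs per grouping key.
    val result = spilled.flatten.groupBy(_._1).map { case (k, v) => k -> v.map(_._2).sum }
    println(result.toSeq.sortBy(_._1))                   // (a,3), (b,2), (c,2), (d,1)
  }
}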
In the doExecute method a TungstenAggregationIterator is instantiated, and that class implements the actual aggregation:
1. hashMap = new UnsafeFixedWidthAggregationMap: this map stores the grouping keys together with their aggregation buffers. Inside UnsafeFixedWidthAggregationMap the important member is map = BytesToBytesMap, which is where the data actually lives.
2. The main logic is in the processInputs method:
private def processInputs(fallbackStartsAt: (Int, Int)): Unit = {
  if (groupingExpressions.isEmpty) {
    // If there is no grouping expressions, we can just reuse the same buffer over and over again.
    // Note that it would be better to eliminate the hash map entirely in the future.
    val groupingKey = groupingProjection.apply(null)
    val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
    while (inputIter.hasNext) {
      val newInput = inputIter.next()
      processRow(buffer, newInput)
    }
  } else {
    var i = 0
    while (inputIter.hasNext) {
      val newInput = inputIter.next()
      val groupingKey = groupingProjection.apply(newInput)
      var buffer: UnsafeRow = null
      if (i < fallbackStartsAt._2) {
        buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
      }
      if (buffer == null) {
        val sorter = hashMap.destructAndCreateExternalSorter()
        if (externalSorter == null) {
          externalSorter = sorter
        } else {
          externalSorter.merge(sorter)
        }
        i = 0
        buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
        if (buffer == null) {
          // failed to allocate the first page
          throw new SparkOutOfMemoryError("No enough memory for aggregation")
        }
      }
      processRow(buffer, newInput)
      i += 1
    }
    if (externalSorter != null) {
      val sorter = hashMap.destructAndCreateExternalSorter()
      externalSorter.merge(sorter)
      hashMap.free()
      switchToSortBasedAggregation()
    }
  }
}
The gist of this method: pull rows from inputIter and insert or update them against the aggregation buffers. When hashMap.getAggregationBufferFromUnsafeRow(groupingKey) returns null, there is not enough memory and a spill is performed:
The spill creates a new UnsafeKVExternalSorter and keeps it in externalSorter (assigned directly the first time, merged into the existing sorter afterwards). The hash map's underlying BytesToBytesMap is handed over, a UnsafeInMemorySorter is created and the BytesToBytesMap records are moved into it; createWithExistingInMemorySorter is then called to sort and spill the data. After the spill the BytesToBytesMap is reset, and the hash map keeps requesting memory and aggregating, spilling again whenever memory runs out, until inputIter has no more elements.
Afterwards, whether externalSorter is null determines whether to switch to sort-based aggregation.
If it does not switch, aggregationBufferMapIterator and mapIteratorHasNext are initialized;
if it does switch, switchToSortBasedAggregation is called to initialize the sort-based state, which is later used in the next and hasNext methods:
The state needed by sort-based aggregation includes:
externalSorter = UnsafeKVExternalSorter; inside switchToSortBasedAggregation, externalSorter.sortedIterator() is called first to obtain an iterator over the sorted records, and then its next() is called; the returned value becomes sortedInputHasNewGroup, which indicates whether there is more input (this only happens once here, effectively initializing that variable).
override final def hasNext: Boolean = {
  (sortBased && sortedInputHasNewGroup) || (!sortBased && mapIteratorHasNext)
}

override final def next(): UnsafeRow = {
  if (hasNext) {