背景
本文基于 Spark 3.3, 最近在用 Spark Api 做 DataSet[Row]
转换的时候遇到了一些 Spark内部转换的问题, 在此记录一下。
杂谈
我们知道在Spark中分为 InternalRow
和Row
, 前者是 Spark 内部的使用的一行数据的表示,后者是给Spark开发者使用的行数据表示。
在Spark中如果涉及到 InternalRow
和Row
转换的时候,这个时候就会用到 ExpressionEncoder[Row]
来做序列化和反序列化,而获取 ExpressionEncoder[Row]
的方式一般就是调用RowEncoder.apply(StructType)
方法。比如在delta 1.0.0
用到的 Row转换:
val joinedRowEncoder = RowEncoder(joinedPlan.schema)
val outputRowEncoder = RowEncoder(deltaTxn.metadata.schema).resolveAndBind()
val processor = new JoinedRowProcessor(
targetRowHasNoMatch = resolveOnJoinedPlan(Seq(col(SOURCE_ROW_PRESENT_COL).isNull.expr)).head,
sourceRowHasNoMatch = resolveOnJoinedPlan(Seq(col(TARGET_ROW_PRESENT_COL).isNull.expr)).head,
matchedConditions = matchedClauses.map(clauseCondition),
matchedOutputs = matchedClauses.map(matchedClauseOutput),
notMatchedConditions = notMatchedClauses.map(clauseCondition),
notMatchedOutputs = notMatchedClauses.map(notMatchedClauseOutput),
noopCopyOutput =
resolveOnJoinedPlan(targetOutputCols :+ Literal.FalseLiteral :+ incrNoopCountExpr),
deleteRowOutput =
resolveOnJoinedPlan(targetOutputCols :+ Literal.TrueLiteral :+ Literal.TrueLiteral),
joinedAttributes = joinedPlan.output,
joinedRowEncoder = joinedRowEncoder,
outputRowEncoder = outputRowEncoder)
val outputDF =
Dataset.ofRows(spark, joinedPlan).mapPartitions(processor.processPartition)(outputRowEncoder)
logDebug("writeAllChanges: join output plan:
" + outputDF.queryExecution)
这里会涉及到两个ROW的转换,两个ROW的 Schema 是不一致的,如果要涉及到两个ROW之间的转换的时候,而且spark.implicits._
也没对应的隐式参数的时候,就得自己构造ExpressionEncoder[Row]
,其实 说到底 spark序列化和反序列化用的都是Expression表达式
,下面就来分析一下这里的序列化和反序列化是怎么做的。
分析
直接上代码:
object RowEncoder {
def apply(schema: StructType, lenient: Boolean): ExpressionEncoder[Row] = {
val cls = classOf[Row]
val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
val serializer = serializerFor(inputObject, schema, lenient)
val deserializer = deserializerFor(GetColumnByOrdinal(0, serializer.dataType), schema)
new ExpressionEncoder[Row](
serializer,
deserializer,
ClassTag(cls))
}
...
}
经过serializerFor
方法以后,返回 CreateNamedStruct(Seq(GetExternalRowField(BoundReference(0, ObjectType(cls), nullable = true),index,name)))
,注意如果,存在String类型的话,在序列化的时候会调用 StaticInvoke(classOf[UTF8String],"fromString")
进行反射调用序列化。
而经过deserializerFor
方法以后,返回CreateExternalRow(Seq(GetStructField(GetColumnByOrdinal(0, serializer.dataType))))
,注意对于 String类型的,在反序列化的时候会调用Invoke("toString")
反射调用反序列化。
而真正在进行行处理的时候,会调用ExpressionEncoder[Row].createSerializer
和ExpressionEncoder[Row].createDeserializer
。对于ExpressionEncoder[Row].createDeserializer
调用之前,还得调用resolveAndBind
进行参数的绑定。
对于序列化
主要是如下方法:
def createSerializer(): Serializer[T] = new Serializer[T](optimizedSerializer)
class Serializer[T](private val expressions: Seq[Expression])
extends (T => InternalRow) with Serializable {
@transient
private[this] var inputRow: GenericInternalRow = _
@transient
private[this] var extractProjection: UnsafeProjection = _
override def apply(t: T): InternalRow = try {
if (extractProjection == null) {
inputRow = new GenericInternalRow(1)
extractProjection = GenerateUnsafeProjection.generate(expressions)
}
inputRow(0) = t
extractProjection(inputRow)
} catch {
case e: Exception =>
throw QueryExecutionErrors.expressionEncodingError(e, expressions)
}
}
可以看到在apply的方法中会进行如下操作:
// 新建一个只有一列数据的ROW,并赋值为输入的值。
inputRow = new GenericInternalRow(1)
inputRow(0) = t
这里就和序列化的表达式BoundReference(0, ObjectType(cls), nullable = true)
吻合了: 取行数据中第一列的值.extractProjection 最终会根据 表达式计算出结果并返回 UnsafeRow
对于反序列化
在反序列化的时候,得先调用resolveAndBind
方法,进行Schema的绑定,便于从一样数据中取对应的数据。
def resolveAndBind(
attrs: Seq[Attribute] = schema.toAttributes,
analyzer: Analyzer = SimpleAnalyzer): ExpressionEncoder[T] = {
val dummyPlan = CatalystSerde.deserialize(LocalRelation(attrs))(this)
val analyzedPlan = analyzer.execute(dummyPlan)
analyzer.checkAnalysis(analyzedPlan)
val resolved = SimplifyCasts(analyzedPlan).asInstanceOf[DeserializeToObject].deserializer
val bound = BindReferences.bindReference(resolved, attrs)
copy(objDeserializer = bound)
}
这个CatalystSerde.deserialize
方法获取deserializer
变量:
val deserializer: Expression = {
if (isSerializedAsStructForTopLevel) {
// We serialized this kind of objects to root-level row. The input of general deserializer
// is a `GetColumnByOrdinal(0)` expression to extract first column of a row. We need to
// transform attributes accessors.
objDeserializer.transform {
case UnresolvedExtractValue(GetColumnByOrdinal(0, _),
Literal(part: UTF8String, StringType)) =>
UnresolvedAttribute.quoted(part.toString)
case GetStructField(GetColumnByOrdinal(0, dt), ordinal, _) =>
GetColumnByOrdinal(ordinal, dt)
case If(IsNull(GetColumnByOrdinal(0, _)), _, n: NewInstance) => n
case If(IsNull(GetColumnByOrdinal(0, _)), _, i: InitializeJavaBean) => i
}
} else {
// For other input objects like primitive, array, map, etc., we deserialize the first column
// of a row to the object.
objDeserializer
}
}
这里会把表达式变成CreateExternalRow(Seq(GetColumnByOrdinal(index)))
,最终会
得到DeserializeToObject(UnresolvedDeserializer(CreateExternalRow(Seq(GetColumnByOrdinal(index)))),LocalRelation(attrs))
计划。
该计划经过ResolveDeserializer 规则解析, 会把 GetColumnByOrdinal(index)
变成对应的属性值。
最终 BindReferences.bindReference(resolved, attrs)
转换成Seq(BoundReference(ordinal, a.dataType, input(ordinal).nullable))
可执行表达式,最最终绑定到表的特定属性上,从而获取对应的值。
真正时机进行操作的时候,调用的是createDeserializer
方法:
def createDeserializer(): Deserializer[T] = new Deserializer[T](optimizedDeserializer)
class Deserializer[T](private val expressions: Seq[Expression])
extends (InternalRow => T) with Serializable {
@transient
private[this] var constructProjection: Projection = _
override def apply(row: InternalRow): T = try {
if (constructProjection == null) {
constructProjection = SafeProjection.create(expressions)
}
constructProjection(row).get(0, anyObjectType).asInstanceOf[T]
} catch {
case e: Exception =>
throw QueryExecutionErrors.expressionDecodingError(e, expressions)
}
}
可以看到 最终的表达式CreateExternalRow(Seq(BoundReference(ordinal, a.dataType, input(ordinal).nullable)))
会生成 GenericRowWithSchema
类型的ROW,
constructProjection = SafeProjection.create(expressions)
constructProjection(row).get(0, anyObjectType).asInstanceOf[T]
其中 constructProjection 返回的是 SpecificInternalRow
类型的ROW。
所以constructProjection(row)
返回的是SpecificInternalRow(GenericRowWithSchema)
的值,所以get(0)是 GenericRowWithSchema类型的ROW,也就是ROW类型。
额外的话
对于BoundReference(ordinal, a.dataType, input(ordinal).nullable)
该方法,该方法是用来把表示涉及的属性,给映射到对应的计划的属性值上,这样我们计算的时候,就可以获取到对应的值,一般是调用BindReferences.bindReference
方法,这也是为什么表达式能获取到对应的属性值的原因。