Description
The new user-defined aggregator feature (SPARK-27296), based on calling 'functions.udaf(aggregator)', works fine when the aggregator input type is atomic, e.g. 'Aggregator[Double, _, _]'. However, if the input type is an array, like 'Aggregator[Array[Double], _, _]', it trips over the following:
/**
 * When constructing [[MapObjects]], the element type must be given, which may not be available
 * before analysis. This class acts like a placeholder for [[MapObjects]], and will be replaced by
 * [[MapObjects]] during analysis after the input data is resolved.
 * Note that, ideally we should not serialize and send unresolved expressions to executors, but
 * users may accidentally do this(e.g. mistakenly reference an encoder instance when implementing
 * Aggregator). Here we mark `function` as transient because it may reference scala Type, which is
 * not serializable. Then even users mistakenly reference unresolved expression and serialize it,
 * it's just a performance issue(more network traffic), and will not fail.
 */
case class UnresolvedMapObjects(
    @transient function: Expression => Expression,
    child: Expression,
    customCollectionCls: Option[Class[_]] = None) extends UnaryExpression with Unevaluable {
  override lazy val resolved = false

  override def dataType: DataType = customCollectionCls.map(ObjectType.apply).getOrElse {
    throw new UnsupportedOperationException("not resolved")
  }
}
The '@transient' annotation causes 'function' to be deserialized as 'null' on the executors, which then leads to a NullPointerException here, when 'MapObjects.apply' evaluates 'function(loopVar)':
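To illustrate the mechanism outside Spark, here is a minimal sketch, assuming the expression reaches the executor through plain Java serialization. 'Holder' and 'TransientDemo' are hypothetical names, with 'Holder' standing in for 'UnresolvedMapObjects'; no Spark classes are involved. After a serialize/deserialize round trip the ordinary field survives, while the '@transient' function field comes back as null.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for UnresolvedMapObjects. scala.transient is a @field annotation,
// so the function value is skipped by Java serialization.
case class Holder(@transient function: Int => Int, child: Int)

object TransientDemo {
  // Writes the object with Java serialization and reads it back.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val restored = roundTrip(Holder(_ + 1, 41))
    println(restored.child)            // 41: regular fields survive
    println(restored.function == null) // true: the @transient field is lost
    // Calling restored.function(1) here would throw a NullPointerException,
    // analogous to function(loopVar) in MapObjects.apply.
  }
}
```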
object MapObjects {
  def apply(
      function: Expression => Expression,
      inputData: Expression,
      elementType: DataType,
      elementNullable: Boolean = true,
      customCollectionCls: Option[Class[_]] = None): MapObjects = {
    val loopVar = LambdaVariable("MapObject", elementType, elementNullable)
    MapObjects(loopVar, function(loopVar), inputData, customCollectionCls)
  }
}
I believe it may be possible to simply use 'loopVar' instead of 'function(loopVar)' whenever 'function' is null, but I would like a second opinion from Catalyst developers on what a robust fix should be.
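The proposed fallback could look roughly like the sketch below. It uses hypothetical, simplified stand-ins ('Expr', 'LambdaVar', 'Converted', 'MapObjs', 'FallbackSketch') instead of the real Catalyst classes, so it only illustrates the control flow, not an actual patch: when 'function' is null, the loop variable itself becomes the lambda, i.e. elements pass through unchanged.

```scala
object FallbackSketch {
  // Hypothetical, simplified stand-ins for Catalyst's Expression, LambdaVariable and MapObjects.
  sealed trait Expr
  final case class LambdaVar(name: String) extends Expr
  final case class Converted(inner: Expr) extends Expr
  final case class MapObjs(loopVar: LambdaVar, lambda: Expr, input: String)

  // Mirrors the shape of MapObjects.apply, with the proposed null check added.
  def mapObjects(function: Expr => Expr, input: String): MapObjs = {
    val loopVar = LambdaVar("MapObject")
    // Proposed fallback: if the @transient function arrived as null after deserialization,
    // use the loop variable itself, i.e. an identity mapping over the elements.
    val lambda: Expr = if (function == null) loopVar else function(loopVar)
    MapObjs(loopVar, lambda, input)
  }

  def main(args: Array[String]): Unit = {
    println(mapObjects(e => Converted(e), "xs")) // MapObjs(LambdaVar(MapObject),Converted(LambdaVar(MapObject)),xs)
    println(mapObjects(null, "xs"))              // MapObjs(LambdaVar(MapObject),LambdaVar(MapObject),xs)
  }
}
```

The sketch makes the trade-off visible: the null check avoids the NullPointerException, but it silently skips whatever per-element conversion 'function' would have built, so it is only equivalent when the element needs no conversion.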