Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- Affects Version/s: 2.4.5, 3.0.0
- Labels: None
Description
The `ClosureCleaner` only supports Scala functions, and it uses the following check to catch closures:
```scala
// Check whether a class represents a Scala closure
private def isClosure(cls: Class[_]): Boolean = {
  cls.getName.contains("$anonfun$")
}
```
This no longer works in 3.0: with the upgrade to Scala 2.12, most Scala functions are compiled to Java lambdas, whose runtime class names do not contain `$anonfun$`.
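For illustration, a Scala 2.12 function value is instantiated through `LambdaMetafactory`, so its runtime class has a name like `...$$Lambda$N/...` and the name-based check returns false. A minimal standalone sketch (the object name `LambdaNameCheck` is made up for this example):

```scala
object LambdaNameCheck {
  // Same name-based test as the ClosureCleaner check above, reproduced for illustration.
  private def isClosure(cls: Class[_]): Boolean =
    cls.getName.contains("$anonfun$")

  def main(args: Array[String]): Unit = {
    val f: Int => Int = x => x + 1
    // On Scala 2.11 this prints an anonymous class name containing "$anonfun$";
    // on Scala 2.12 it prints something like "LambdaNameCheck$$Lambda$1/...",
    // so isClosure returns false and the closure is never cleaned.
    println(f.getClass.getName)
    println(isClosure(f.getClass))
  }
}
```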
As an example, the following code works in the Spark 2.4 shell:
```
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit
case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.functions.lit
defined class Foo
col: org.apache.spark.sql.Column = 123
df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
```
But it fails in 3.0:
```
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit
case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.map(RDD.scala:421)
  ... 39 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
	- object not serializable (class: org.apache.spark.sql.Column, value: 123)
	- field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
	- object (class $iw, $iw@2d87ac2b)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 47 more
```
*Apache Spark 2.4.5 with Scala 2.12*
```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit
case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:393)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
  at org.apache.spark.rdd.RDD.map(RDD.scala:392)
  ... 45 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
	- object not serializable (class: org.apache.spark.sql.Column, value: 123)
	- field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
	- object (class $iw, $iw@73534675)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$1952/356563238, $Lambda$1952/356563238@6ca95b1e)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 53 more
```
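The `capturedArgs` entry in the serialization stacks above is the key symptom: the Scala 2.12 lambda captures the REPL's `$iw` wrapper, which drags the non-serializable `Column` along, and because the name-based check never recognizes the lambda as a closure, it is never cleaned. For background, here is a hedged sketch (not Spark code; `LambdaInspector` and its helper are made up) of how such a serializable lambda can be inspected via the `writeReplace`/`SerializedLambda` protocol that the stack traces refer to:

```scala
import java.lang.invoke.SerializedLambda

object LambdaInspector {
  // Scala 2.12 lambdas implementing Scala's FunctionN are serializable, so the
  // generated class carries a synthetic writeReplace method that returns a
  // java.lang.invoke.SerializedLambda. This helper extracts it reflectively.
  def serializedLambdaOf(closure: AnyRef): Option[SerializedLambda] =
    scala.util.Try {
      val m = closure.getClass.getDeclaredMethod("writeReplace")
      m.setAccessible(true)
      m.invoke(closure).asInstanceOf[SerializedLambda]
    }.toOption

  def main(args: Array[String]): Unit = {
    val captured = new Object // stand-in for the REPL's $iw wrapper
    val f: Int => String = x => s"$x ${captured.hashCode()}"

    serializedLambdaOf(f).foreach { sl =>
      // capturingClass / implMethodName identify where the lambda body lives;
      // capturedArgs are the enclosing values dragged along for serialization,
      // which is how the REPL's $iw (and its non-serializable Column field)
      // ends up in the task payload shown in the stack traces above.
      println(sl.getCapturingClass)
      println(sl.getImplMethodName)
      (0 until sl.getCapturedArgCount).foreach(i => println(sl.getCapturedArg(i)))
    }
  }
}
```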