Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 1.6.0
- Fix Version/s: None
Description
Spark 1.6.0 crashes when using nested User Defined Types in a DataFrame.
In version 1.5.2 the code below worked just fine:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list: Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType =
    StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))

  override def userClass: Class[A] = classOf[A]

  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1)
      row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
    }
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(text: Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType =
    StructType(Seq(StructField("num", IntegerType, nullable = false)))

  override def userClass: Class[B] = classOf[B]

  override def serialize(obj: Any): Any = obj match {
    case B(text) =>
      val row = new GenericMutableRow(1)
      row.setInt(0, text)
      row
  }

  override def deserialize(datum: Any): B = {
    datum match {
      case row: InternalRow => new B(row.getInt(0))
    }
  }
}

object BUDT extends BUDT

object Test {
  def main(args: Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))), new A(Seq(new B(3), new B(4))))

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(1 to 2 zip col).toDF("id", "b")
    df.select("b").show()
    df.collect().foreach(println)
  }
}
In the new version (1.6.0) I needed to include the following import:
`import org.apache.spark.sql.catalyst.expressions.GenericMutableRow`
However, Spark crashes at runtime:
16/01/18 14:36:22 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be cast to org.apache.spark.sql.catalyst.InternalRow
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:51)
    at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:248)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
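A possible workaround, sketched here but not verified against 1.6.0, is to convert the nested B values to their Catalyst representation explicitly inside AUDT.serialize (by calling BUDT.serialize on each element) instead of storing the raw B objects in the array. This assumes the generated UnsafeProjection expects each array slot to already hold an InternalRow matching BUDT's sqlType:

// Hypothetical replacement for AUDT.serialize from the code above; unverified sketch.
override def serialize(obj: Any): Any = obj match {
  case A(list) =>
    val row = new GenericMutableRow(1)
    // Store each element as the row produced by BUDT.serialize rather than the
    // raw B object, so getStruct finds an InternalRow where it expects one.
    row.update(0, new GenericArrayData(list.map(b => BUDT.serialize(b)).toArray))
    row
}

If that still fails, the other fallback is to avoid nesting UDTs altogether and embed B's fields directly in AUDT's sqlType.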
Issue Links
- blocks
  - SPARK-24465 LSHModel should support Structured Streaming for transform (Resolved)
- is duplicated by
  - SPARK-10637 DataFrames: saving with nested User Data Types (Resolved)
- relates to
  - SPARK-23848 Structured Streaming fails with nested UDTs (Resolved)