Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Not A Problem
- Affects Version/s: 2.3.0
- Fix Version/s: None
Description
While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel with Structured Streaming (for prediction in a streaming job), I ran into a bug that seems to indicate that nested UDTs (here, an array of Vector) don't work with streaming.
Here's a simplified version of the code:
package org.apache.spark.ml.feature

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest

class MinHashLSHSuite extends StreamTest {

  @transient var dataset: Dataset[_] = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val data = {
      for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, 1.0)))
    }
    dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
  }

  test("a test") {
    val localSpark = spark
    import localSpark.implicits._

    val df = Seq[(Int, Array[Vector])](
      (1, Array(Vectors.dense(1.0, 2.0))),
      (2, Array(Vectors.dense(1.1, 2.1)))
    ).toDF("a", "b")

    df.show()                      // THIS SUCCEEDS
    df.collect().foreach(println)  // THIS SUCCEEDS

    testTransformerOnStreamData[(Int, Array[Vector])](df) { rows =>  // THIS FAILS
      rows.foreach {
        case Row(a: Int, b: Array[_]) =>
      }
    }
  }

  def testTransformerOnStreamData[A : Encoder](
      dataframe: DataFrame)
      (globalCheckFunction: Seq[Row] => Unit): Unit = {
    val stream = MemoryStream[A]
    val streamDF = stream.toDS().toDF("a", "b")
    val data = dataframe.as[A].collect()
    val streamOutput = streamDF.select("a", "b")
    testStream(streamOutput)(
      AddData(stream, data: _*),
      CheckAnswer(globalCheckFunction)
    )
  }
}
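A side note (my own observation, not part of the original repro): df.show() and df.collect() above succeed only because they never pattern match on the array column. When a Row is collected, Spark exposes an ArrayType column as a scala.collection.Seq (concretely a WrappedArray), not as a JVM Array. A minimal batch-only sketch illustrating this, assuming a local SparkSession (the object name and builder settings are illustrative):

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession

// Hypothetical standalone driver, not part of the test suite above.
object RowArrayRepr {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("row-array-repr").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, Array(Vectors.dense(1.0, 2.0))),
      (2, Array(Vectors.dense(1.1, 2.1)))
    ).toDF("a", "b")

    // ArrayType columns in a collected Row come back as Seq (WrappedArray), not Array.
    df.collect().foreach { row =>
      val b = row.getSeq[Vector](1)
      println(s"a=${row.getInt(0)}, b=$b") // prints e.g. a=1, b=WrappedArray([1.0,2.0])
    }

    spark.stop()
  }
}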
The streaming test fails with the following stack trace:
[info] - a test *** FAILED *** (2 seconds, 325 milliseconds)
[info]   scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
[info]
[info]   == Progress ==
[info]      AddData to MemoryStream[_1#24,_2#25]: (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba)
[info]   => CheckAnswerByFunc
[info]
[info]   == Stream ==
[info]   Output Mode: Append
[info]   Stream state: {MemoryStream[_1#24,_2#25]: 0}
[info]   Thread state: alive
[info]   Thread stack trace: java.lang.Thread.sleep(Native Method)
[info]   org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
[info]   org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
[info]   org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
[info]   org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
[info]   org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
[info]
[info]
[info]   == Sink ==
[info]   0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])]
[info]
[info]
[info]   == Plan ==
[info]   == Parsed Logical Plan ==
[info]   Project [a#27, b#28]
[info]   +- Project [_1#24 AS a#27, _2#25 AS b#28]
[info]      +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
[info]         +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info]   == Analyzed Logical Plan ==
[info]   a: int, b: array<vector>
[info]   Project [a#27, b#28]
[info]   +- Project [_1#24 AS a#27, _2#25 AS b#28]
[info]      +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
[info]         +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info]   == Optimized Logical Plan ==
[info]   Project [_1#36 AS a#27, _2#37 AS b#28]
[info]   +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
[info]
[info]   == Physical Plan ==
[info]   *(1) Project [_1#36 AS a#27, _2#37 AS b#28]
[info]   +- *(1) ScanV2 MemoryStreamDataSource[_1#36, _2#37] (StreamTest.scala:430)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
[info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
[info]   at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:430)
[info]   at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:683)
[info]   at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:704)
[info]   at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:693)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:693)
[info]   at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:692)
[info]   at org.apache.spark.ml.feature.MinHashLSHSuite.testStream(MinHashLSHSuite.scala:28)
[info]   at org.apache.spark.ml.feature.MinHashLSHSuite.testTransformerOnStreamData(MinHashLSHSuite.scala:201)
[info]   at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply$mcV$sp(MinHashLSHSuite.scala:184)
[info]   at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
[info]   at org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info]   at org.apache.spark.ml.feature.MinHashLSHSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(MinHashLSHSuite.scala:28)
[info]   at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
[info]   at org.apache.spark.ml.feature.MinHashLSHSuite.runTest(MinHashLSHSuite.scala:28)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info]   at scala.collection.immutable.List.foreach(List.scala:381)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]   at java.lang.Thread.run(Thread.java:745)
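The MatchError above points at the check function rather than at UDT handling in streaming: the sink delivers the b column as a WrappedArray (a Seq), which case Row(a: Int, b: Array[_]) can never match. That reading is consistent with the Not A Problem resolution. A possible fix for the check, sketched against the same testTransformerOnStreamData harness from the repro above (an illustrative adjustment, not a change taken from this ticket):

// Sketch only: identical harness, but the pattern matches Seq[_], which is the
// runtime representation of an ArrayType column inside a Row.
testTransformerOnStreamData[(Int, Array[Vector])](df) { rows =>
  rows.foreach {
    case Row(a: Int, b: Seq[_]) =>
      assert(b.forall(_.isInstanceOf[Vector])) // elements are still Vectors; the nested VectorUDT round-trips
  }
}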
Issue Links
- is related to:
  - SPARK-12878 Dataframe fails with nested User Defined Types (Resolved)
  - SPARK-15989 PySpark SQL python-only UDTs don't support nested types (Resolved)