Status: Resolved
Resolution: Fixed
2.0.0, 2.0.1
It was brought to my attention that the following code produces incorrect results:
val data = List(TestData("A", 1, 7))
val frame = session.sqlContext.createDataFrame(session.sparkContext.parallelize(data))

frame.createCassandraTable(
  keySpaceName,
  table,
  partitionKeyColumns = Some(Seq("id")))

frame
  .write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("table" -> table, "keyspace" -> keySpaceName))
  .save()

val loaded = sparkSession.sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> table, "keyspace" -> ks))
  .load()
  .select("id", "col1", "col2")

val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
min1.union(min2).show()

/* prints:
+---+---+
| id|min|
+---+---+
|  A|  1|
|  A|  1|
+---+---+

Should be
|  A|  1|
|  A|  7|
*/
I looked at the explain plan and saw:
Union
:- *HashAggregate(keys=[id#93], functions=[min(col1#94)])
:  +- Exchange hashpartitioning(id#93, 200)
:     +- *HashAggregate(keys=[id#93], functions=[partial_min(col1#94)])
:        +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@7ec20844 [id#93,col1#94]
+- *HashAggregate(keys=[id#93], functions=[min(col2#95)])
   +- ReusedExchange [id#93, min#153], Exchange hashpartitioning(id#93, 200)
This differs from the plan I get when a parallelized collection backs the DataFrame. So I tested the same code with a Parquet-backed DataFrame and saw the same incorrect results.
frame.write.parquet("garbagetest")
val parquet = sparkSession.read.parquet("garbagetest").select("id", "col1", "col2")
println("PDF")
// parquetmin1 and parquetmin2 are built from `parquet` the same way as min1 and min2 above
parquetmin1.union(parquetmin2).explain()
parquetmin1.union(parquetmin2).show()

/* prints:
+---+---+
| id|min|
+---+---+
|  A|  1|
|  A|  1|
+---+---+
*/
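This leads me to believe that something is wrong with the reused exchange: in the plan above, the col2 aggregate reads from a ReusedExchange that points back at the exchange built for the col1 aggregate, instead of scanning and shuffling col2 itself.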
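One way to test that suspicion is to rerun the Parquet variant with exchange reuse switched off and compare the plan and output. The following is only a sketch: it assumes the `spark.sql.exchange.reuse` setting controls whether the planner inserts ReusedExchange, and the `TestData` case class, the `ReusedExchangeCheck` object, and the temp-directory path are hypothetical stand-ins rather than the code from the original test.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.min

// Hypothetical stand-in for the TestData case class used in the report.
case class TestData(id: String, col1: Int, col2: Int)

object ReusedExchangeCheck {
  // Union of the per-id minimum of col1 and the per-id minimum of col2,
  // built the same way as min1 and min2 in the report.
  def unionOfMins(loaded: DataFrame): DataFrame = {
    val min1 = loaded.groupBy("id").agg(min("col1").as("min"))
    val min2 = loaded.groupBy("id").agg(min("col2").as("min"))
    min1.union(min2)
  }

  // Returns the executed plan as text and the sorted "min" values.
  def run(): (String, Seq[Int]) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("reused-exchange-check")
      // Assumption: with this set to false the planner does not reuse exchanges.
      .config("spark.sql.exchange.reuse", "false")
      .getOrCreate()
    import spark.implicits._

    try {
      // Write the single test row to a fresh Parquet directory.
      val path = Files.createTempDirectory("reuse-check").resolve("data").toString
      Seq(TestData("A", 1, 7)).toDF().write.parquet(path)

      val loaded = spark.read.parquet(path).select("id", "col1", "col2")
      val result = unionOfMins(loaded)
      val mins = result.collect().map(_.getInt(1)).sorted.toSeq
      (result.queryExecution.executedPlan.toString, mins)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (plan, mins) = run()
    println(plan)
    println(mins.mkString(", "))
  }
}
```

If the reused exchange is the culprit, I would expect the plan printed here to contain two separate Exchange nodes and no ReusedExchange, and the second row to show the col2 minimum. If the output is still wrong with reuse disabled, the problem lies elsewhere.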