Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
2.3.0, 2.3.1
Description
In https://github.com/apache/spark/pull/19080, we simplified the distribution/partitioning framework and made all the join-like operators require HashClusteredDistribution from their children. Unfortunately, the streaming join operator was missed.
This can cause wrong results. Consider:
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]
val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
The physical plan is
*(3) Project [a#5, b#6, c#7, c#14]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#12, b#13], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 5a1ab77a-ed5c-4f0b-8bcb-fc5637152b97, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6, (value#1 * 3) AS c#7]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#13, 5)
      +- *(2) Project [value#3 AS a#12, (value#3 * 3) AS b#13, (value#3 * 4) AS c#14]
         +- StreamingRelation MemoryStream[value#3], [value#3]
The left table is hash partitioned by a, b, while the right table is hash partitioned by b only. This means two records with equal join keys may end up in different partitions. The join only compares records within the same partition, so such a pair should be in the output but is not.
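The mismatch can be illustrated without Spark. The following is a minimal sketch, not Spark's implementation: `toyHash` is a hypothetical stand-in for the Murmur3-based hash that Spark's hash partitioning applies to the key expressions, and the object and method names are made up for illustration. It partitions the left side by (a, b) and the right side by b alone, using 5 partitions as in the plan above, and lists the join keys whose two sides land in different partitions.

```scala
// Illustrative sketch only. toyHash is a hypothetical stand-in for the
// hash Spark applies to partitioning keys; it is NOT Spark's Murmur3 hash.
object PartitionMismatchSketch {
  val numPartitions = 5

  // Simple deterministic hash over a sequence of integer key columns.
  def toyHash(keys: Seq[Int]): Int =
    keys.foldLeft(17)((h, k) => 31 * h + k)

  // floorMod keeps the partition id in [0, numPartitions) even for negative hashes.
  def partitionOf(keys: Seq[Int]): Int =
    Math.floorMod(toyHash(keys), numPartitions)

  // Left side: hash partitioned by (a, b), as produced for the join keys.
  def leftPartition(a: Int, b: Int): Int = partitionOf(Seq(a, b))

  // Right side: hash partitioned by b only, as after repartition('b).
  def rightPartition(a: Int, b: Int): Int = partitionOf(Seq(b))

  // Join keys (a, b) = (v, v * 2) whose matching rows sit in different partitions.
  def misplaced(values: Seq[Int]): Seq[(Int, Int)] =
    values.map(v => (v, v * 2)).filter { case (a, b) =>
      leftPartition(a, b) != rightPartition(a, b)
    }

  def main(args: Array[String]): Unit = {
    val rows = misplaced(1 to 20)
    println(s"${rows.size} of 20 matching keys land in different partitions")
    rows.take(3).foreach { case (a, b) =>
      println(s"a=$a b=$b left=${leftPartition(a, b)} right=${rightPartition(a, b)}")
    }
  }
}
```

With this toy hash, only keys where the value is a multiple of 5 happen to be co-located. A per-partition join would silently drop every other match, which is why both children need to be clustered on the same full set of join keys.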