Description
LimitPushDown pushes LocalLimit to one side of a FullOuterJoin, but this may generate wrong results:
Assume we call limit(1) and the LocalLimit is pushed to the left side, selecting the row with id=999. If the right side has 100K rows whose ids include 999, the join result will be
- one row (999, 999)
- the remaining rows (null, xxx)
Any (null, xxx) row whose xxx also exists on the left side is not part of the true full outer join result at all. And once you call show(), the row (999, 999) has only a 1 in 100,000 chance of being picked by CollectLimit.
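To make the "wrong result" concrete, one can compare the limited query against the unlimited full outer join, using the dl and dr data frames from the reproduction below (a small sketch; except is resolved positionally, so the duplicate "id" column names are not a problem):

// Rows returned by the limited query that do not occur in the true
// full outer join result; with the buggy pushdown this is usually
// non-empty, because (null, xxx) rows are fabricated by the rewrite.
val full    = dl.join(dr, dl("id") === dr("id"), "outer")
val limited = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
println(limited.except(full).count())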
The actual optimization might be to
- push down the limit,
- but convert the join to a broadcast LeftOuterJoin or RightOuterJoin, as sketched below.
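A minimal DataFrame-level sketch of what the rewritten query would compute, assuming the limit is pushed to the left side. This hand-writes the rewrite rather than implementing an optimizer rule, and the left_outer choice and the final limit are illustrative assumptions; how the join is made a broadcast join physically is left out:

// Push the limit into the left side by hand.
val leftLimited = dl.limit(1)

// Join the now-tiny left side with a LeftOuterJoin instead of a
// FullOuterJoin: every output row is then a genuine row of the original
// full outer join (either a match or a left row padded with nulls).
// The proposal additionally suggests planning this as a broadcast join,
// which is not spelled out in this sketch.
val rewritten = leftLimited
  .join(dr, leftLimited("id") === dr("id"), "left_outer")
  .limit(1)   // keep the global limit: duplicate ids on the right could
              // otherwise produce more than one output row

rewritten.show(false)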
Here is my notebook:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
import scala.util.Random._

val dl = shuffle(1 to 100000).toDF("id")
val dr = shuffle(1 to 100000).toDF("id")

println("data frame dl:")
dl.explain
println("data frame dr:")
dr.explain

val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
j.explain
j.show(false)
data frame dl:
== Physical Plan ==
LocalTableScan [id#10]

data frame dr:
== Physical Plan ==
LocalTableScan [id#16]

== Physical Plan ==
CollectLimit 1
+- SortMergeJoin [id#10], [id#16], FullOuter
   :- *Sort [id#10 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#10, 200)
   :     +- *LocalLimit 1
   :        +- LocalTableScan [id#10]
   +- *Sort [id#16 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#16, 200)
         +- LocalTableScan [id#16]

import scala.util.Random._
dl: org.apache.spark.sql.DataFrame = [id: int]
dr: org.apache.spark.sql.DataFrame = [id: int]
j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]

+----+---+
|id  |id |
+----+---+
|null|148|
+----+---+
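As a quick way to confirm that LimitPushDown is the rule responsible, it can be disabled and the query re-run. This is a sketch that assumes a Spark version where spark.sql.optimizer.excludedRules is available (2.4+) and that the rule is registered under the class name used below:

// Exclude the LimitPushDown rule; LocalLimit should then no longer be
// pushed below only one side of the FullOuter join.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.LimitPushDown")

val j2 = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
j2.explain
j2.show(false)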