Description
Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to
queryStagePreparationRules, the place where OptimizeSkewedJoin is applied has moved from newQueryStage() to reOptimize(). As a result, the rule is now applied to the whole Spark plan instead of only to the plan of the new stage that is about to be submitted.
When the skewed join is not in the last stage, OptimizeSkewedJoin may not take effect, because the number of shuffle stages it collects from the whole plan is more than 2.
The following test demonstrates the problem:
```scala
// Intended to run inside AdaptiveQueryExecSuite, which provides
// runAdaptiveAndVerifyResult and findTopLevelSortMergeJoin.
test("OptimizeSkewJoin may not work") {
  withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    withTempView("skewData1", "skewData2", "skewData3") {
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 3 as key1", "id % 3 as value1")
        .createOrReplaceTempView("skewData1")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key2", "id as value2")
        .createOrReplaceTempView("skewData2")
      spark
        .range(0, 1000, 1, 10)
        .selectExpr("id % 1 as key3", "id as value3")
        .createOrReplaceTempView("skewData3")

      // The query has two skewed joins in two consecutive stages.
      val (_, adaptive1) = runAdaptiveAndVerifyResult(
        """
          |SELECT key1 FROM skewData1 s1
          |JOIN skewData2 s2
          |ON s1.key1 = s2.key2
          |JOIN skewData3
          |ON s1.value1 = value3
          |""".stripMargin)
      val shuffles1 = collect(adaptive1) {
        case s: ShuffleExchangeExec => s
      }
      assert(shuffles1.size == 4)
      val smj1 = findTopLevelSortMergeJoin(adaptive1)
      // Both joins are expected to be handled as skew joins.
      assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
    }
  }
}
```
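To illustrate why the count exceeds two for this query, here is a minimal Scala sketch built on a hypothetical, simplified plan model (Plan, ShuffleStage, Exchange, SortMergeJoin and SkewRuleScope are illustrative names, not Spark's classes). It assumes, as the description implies, that the rule only acts when it finds exactly two materialized shuffle stages, and that collection stops at a stage boundary. Once the shuffles of skewData1, skewData2 and skewData3 have materialized, the plan of the first join still contains two stages, but the whole plan contains three, so the rule would skip it.

```scala
// Hypothetical, simplified model of an adaptive plan; not Spark's classes.
sealed trait Plan { def children: Seq[Plan] }

// A shuffle whose map stage has already been materialized.
final case class ShuffleStage(name: String) extends Plan {
  val children: Seq[Plan] = Nil
}

// A shuffle exchange that has not been turned into a query stage yet.
final case class Exchange(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

final case class SortMergeJoin(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object SkewRuleScope {
  // Collect materialized shuffle stages without descending into a stage.
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case s: ShuffleStage => Seq(s)
    case other           => other.children.flatMap(collectShuffleStages)
  }

  // Assumed gating condition: the rule only fires when exactly two stages are visible.
  def skewRuleWouldFire(plan: Plan): Boolean =
    collectShuffleStages(plan).size == 2

  def main(args: Array[String]): Unit = {
    // First join: skewData1 JOIN skewData2 ON key1 = key2.
    val join1 = SortMergeJoin(ShuffleStage("s1"), ShuffleStage("s2"))
    // Second join: (join1) JOIN skewData3 ON value1 = value3.
    // join1's output still has to be shuffled before the second join.
    val wholePlan = SortMergeJoin(Exchange(join1), ShuffleStage("s3"))

    // Rule applied to the new stage's plan only (the newQueryStage() placement).
    println(skewRuleWouldFire(join1))
    // Rule applied to the whole plan (the reOptimize() placement).
    println(collectShuffleStages(wholePlan).map(_.name).mkString(", "))
    println(skewRuleWouldFire(wholePlan))
  }
}
```

Under these assumptions, `main` prints `true`, then `s1, s2, s3`, then `false`: the same join that qualifies when the rule sees only its own stage is skipped when the rule sees the whole plan.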
I'll open a PR shortly to fix this issue.
Issue Links
- is caused by: SPARK-33832 Add an option in AQE to mitigate skew even if it causes an new shuffle (Resolved)