  1. Spark
  2. SPARK-37063 SQL Adaptive Query Execution QA: Phase 2
  3. SPARK-37328

SPARK-33832 introduced a bug: OptimizeSkewedJoin may not work because it is applied to the whole plan instead of the new stage plan


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.0
    • Component/s: SQL
    • Labels: None

    Description

      Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to queryStagePreparationRules, the place where it is applied has moved from newQueryStage() to reOptimize(). As a result, the plan it is applied to has changed from the plan of the new stage that is about to be submitted to the whole Spark plan.

      When the skewed join is not in the last stage, OptimizeSkewedJoin may not take effect, because the number of shuffleStages collected from the whole plan is more than 2.
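
To make the stage-counting argument concrete, here is a minimal toy sketch. It does not use Spark's real classes: `Plan`, `ShuffleStage`, `Exchange`, `Sort`, `SortMergeJoin` and `ruleWouldRun` are hypothetical stand-ins, and the "exactly two shuffle stages" guard is an assumption taken from the description above, not a copy of the rule's code. It only shows how the number of collected stages differs between the new stage plan and the whole plan for a two-join query.

```scala
// Toy model only: these classes are hypothetical stand-ins, not Spark's physical operators.
sealed trait Plan { def children: Seq[Plan] }

// A shuffle whose map stage has already been materialized.
final case class ShuffleStage(id: Int) extends Plan {
  def children: Seq[Plan] = Nil
}

// A shuffle exchange that has not been turned into a stage yet.
final case class Exchange(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

final case class Sort(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}

final case class SortMergeJoin(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object SkewJoinScopeSketch {
  // Gather every materialized shuffle stage found anywhere under `plan`.
  def collectShuffleStages(plan: Plan): Seq[ShuffleStage] = plan match {
    case stage: ShuffleStage => Seq(stage)
    case other => other.children.flatMap(collectShuffleStages)
  }

  // Assumed guard: the rule only proceeds when it sees exactly two shuffle stages.
  def ruleWouldRun(plan: Plan): Boolean = collectShuffleStages(plan).size == 2

  // First join: both inputs are materialized shuffle stages.
  val firstJoin: Plan = SortMergeJoin(Sort(ShuffleStage(0)), Sort(ShuffleStage(1)))

  // Plan of the new stage about to be submitted: the exchange above the first join.
  val newStagePlan: Plan = Exchange(firstJoin)

  // Whole plan: the second join consumes that exchange plus a third stage.
  val wholePlan: Plan = SortMergeJoin(Sort(newStagePlan), Sort(ShuffleStage(2)))

  def main(args: Array[String]): Unit = {
    println(s"new stage plan: ${collectShuffleStages(newStagePlan).size} stages, " +
      s"rule runs = ${ruleWouldRun(newStagePlan)}")
    println(s"whole plan: ${collectShuffleStages(wholePlan).size} stages, " +
      s"rule runs = ${ruleWouldRun(wholePlan)}")
  }
}
```

Under these assumptions, the new stage plan holds two shuffle stages, so the guard passes. The whole plan holds three, so the rule is skipped even though the first join is skewed.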

      The following test reproduces the problem:


      test("OptimizeSkewJoin may not work") {
        withSQLConf(
          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
          SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
          SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
          SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
          SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
          withTempView("skewData1", "skewData2", "skewData3") {
            spark
              .range(0, 1000, 1, 10)
              .selectExpr("id % 3 as key1", "id % 3 as value1")
              .createOrReplaceTempView("skewData1")
            spark
              .range(0, 1000, 1, 10)
              .selectExpr("id % 1 as key2", "id as value2")
              .createOrReplaceTempView("skewData2")
            spark
              .range(0, 1000, 1, 10)
              .selectExpr("id % 1 as key3", "id as value3")
              .createOrReplaceTempView("skewData3")
      
            // The query contains two skewed joins in two consecutive stages.
            val (_, adaptive1) =
              runAdaptiveAndVerifyResult(
                """
                  |SELECT key1 FROM skewData1 s1
                  |JOIN skewData2 s2
                  |ON s1.key1 = s2.key2
                  |JOIN skewData3
                  |ON s1.value1 = value3
                  |""".stripMargin)
            val shuffles1 = collect(adaptive1) {
              case s: ShuffleExchangeExec => s
            }
            assert(shuffles1.size == 4)
            val smj1 = findTopLevelSortMergeJoin(adaptive1)
            assert(smj1.size == 2 && smj1.forall(_.isSkewJoin))
          }
        }
      } 

      I'll open a PR shortly to fix this issue.

       

            People

              cloud_fan Wenchen Fan
              Thomas Liu Lietong Liu