  1. Spark
  2. SPARK-30602 SPIP: Support push-based shuffle to improve shuffle efficiency
  3. SPARK-36558

A stage with all tasks finished but finalization still ongoing can cause a job hang

Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Won't Fix
    • Affects Version/s: 3.2.0, 3.3.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

       

      A stage whose tasks have all finished but whose shuffle merge finalization is still ongoing can cause the job to hang. The problem is that such a stage is considered a "missing" stage (see https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721), which breaks the original assumption that a "missing" stage must have tasks to run.

      Normally, if stage A is the parent of (result) stage B and all tasks in stage A have finished, stage A is skipped when stage B is submitted. With this bug, however, stage A is submitted anyway. Submitting a stage that has no tasks to run cannot add its child stage to the waiting stage list, so the job eventually hangs.
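
The ordering problem described above can be sketched with a toy model. This is not Spark code: `ToyStage`, `ToyScheduler`, and `HangDemo` are hypothetical names, and the "missing" rule and the submit/park ordering are assumptions made for illustration, loosely modelled on how the description says the scheduler behaves.

```scala
import scala.collection.mutable

// Toy model only. Names echo DAGScheduler concepts, but none of this is Spark code.
final case class ToyStage(
    id: Int,
    parents: List[ToyStage],
    pendingTasks: Int,
    mergeFinalized: Boolean) {
  // Assumed rule: a stage counts as "missing" until its tasks are done
  // AND its shuffle merge is finalized.
  def isMissing: Boolean = pendingTasks > 0 || !mergeFinalized
}

final class ToyScheduler {
  val waiting = mutable.LinkedHashSet.empty[ToyStage]
  // Ids of stages handed to submitMissingTasks, in order.
  val started = mutable.ListBuffer.empty[Int]

  def submitStage(stage: ToyStage): Unit = {
    val missing = stage.parents.filter(_.isMissing)
    if (missing.isEmpty) {
      submitMissingTasks(stage)
    } else {
      missing.foreach(submitStage)
      // The child is parked only AFTER its parents have been submitted.
      waiting += stage
    }
  }

  private def submitMissingTasks(stage: ToyStage): Unit = {
    started += stage.id
    // No tasks to run: the stage completes on the spot and wakes only
    // those children that are already in the waiting set.
    if (stage.pendingTasks == 0) submitWaitingChildren(stage)
    // Otherwise tasks would be launched and completion would arrive later
    // (not modelled here).
  }

  private def submitWaitingChildren(parent: ToyStage): Unit = {
    val children = waiting.filter(_.parents.contains(parent)).toList
    waiting --= children
    children.foreach(submitStage)
  }
}

object HangDemo {
  def main(args: Array[String]): Unit = {
    // Stage A: all tasks finished, finalization still ongoing.
    val a = ToyStage(1, Nil, pendingTasks = 0, mergeFinalized = false)
    // Stage B: the child (result) stage.
    val b = ToyStage(2, List(a), pendingTasks = 20, mergeFinalized = false)
    val scheduler = new ToyScheduler
    scheduler.submitStage(b)
    println(s"started=${scheduler.started.toList}, " +
      s"waiting=${scheduler.waiting.map(_.id).toList}")
  }
}
```

In this model, stage A is submitted and completes immediately while the waiting set is still empty, and stage B is parked afterwards with nothing left to wake it. If A is created with `mergeFinalized = true`, B has no missing parents and is started directly.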

       

      The example to reproduce:

      First, change `MyRDD` to allow it to compute:

      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
        Iterator.single((1, 1))
      }

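      Then add the following test to `DAGSchedulerSuite` (it also needs `import java.util.concurrent.CountDownLatch`) and run it: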

      test("Job hang") {
        initPushBasedShuffleConfs(conf)
        conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
        DAGSchedulerSuite.clearMergerLocs
        DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
        val latch = new CountDownLatch(1)
        val myDAGScheduler = new MyDAGScheduler(
          sc,
          sc.dagScheduler.taskScheduler,
          sc.listenerBus,
          sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
          sc.env.blockManager.master,
          sc.env) {
          override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
            // Skip the real finalization to mimic a stage whose tasks have all
            // finished but whose finalization is still incomplete.
            latch.countDown()
          }
        }
        sc.dagScheduler = myDAGScheduler
        sc.taskScheduler.setDAGScheduler(myDAGScheduler)
        val parts = 20
        val shuffleMapRdd = new MyRDD(sc, parts, Nil)
        val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
        val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
        reduceRdd1.countAsync()
        latch.await()
        // scalastyle:off println
        println("=========after wait==========")
        // scalastyle:on println
        // Setting _shuffleMergedFinalized to true avoids the hang:
        // shuffleDep._shuffleMergedFinalized = true
        val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
        reduceRdd2.count()
      }
      

       

          People

            Assignee: Unassigned
            Reporter: wuyi (Ngone51)