Details
Type: Sub-task
Status: Resolved
Priority: Blocker
Resolution: Won't Fix
Affects Version/s: 3.2.0, 3.3.0
Fix Version/s: None
Description
A stage whose tasks have all finished but whose shuffle merge finalization is still in progress can cause a job to hang. The problem is that such a stage is treated as a "missing" stage (see https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721), which breaks the original assumption that a "missing" stage always has tasks to run.
Normally, if stage A is the parent of (result) stage B and all tasks in stage A have finished, stage A is skipped when stage B is submitted. With this bug, however, stage A is submitted anyway. Submitting a stage that has no tasks to run cannot add its child stage to the waiting stage list, so the job ends up hanging.
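The interaction described above can be illustrated with a small, self-contained toy model. This is only a sketch: `ToyStage`, `ToyScheduler` and `HangDemo` are hypothetical names, not Spark classes, and the ordering it assumes (parents are submitted before the child is parked, and a stage with no tasks checks for waiting children immediately) is a simplification made for illustration, not a copy of `DAGScheduler`.

```scala
import scala.collection.mutable

// Toy model only: ToyStage, ToyScheduler and HangDemo are hypothetical names,
// not Spark classes.
final class ToyStage(
    val name: String,
    val parents: List[ToyStage],
    var pendingTasks: Int,        // tasks that still have to run
    var mergeFinalized: Boolean)  // stands in for shuffle merge finalization

final class ToyScheduler {
  private val waiting = mutable.LinkedHashSet.empty[ToyStage]
  private val running = mutable.Queue.empty[ToyStage] // stages whose tasks finish later
  val finished = mutable.ListBuffer.empty[String]

  // A parent counts as "missing" if it has tasks left OR is not finalized yet.
  private def missingParents(s: ToyStage): List[ToyStage] =
    s.parents.filter(p => p.pendingTasks > 0 || !p.mergeFinalized)

  def submit(s: ToyStage): Unit = {
    val missing = missingParents(s)
    if (missing.isEmpty) launch(s)
    else {
      missing.foreach(submit) // parents are submitted first ...
      waiting += s            // ... and only then is the child parked
    }
  }

  private def launch(s: ToyStage): Unit =
    if (s.pendingTasks > 0) running.enqueue(s) // completes later, in drain()
    else wakeChildren(s)                       // nothing to run: children are checked right now

  private def wakeChildren(parent: ToyStage): Unit = {
    val ready = waiting.filter(_.parents.contains(parent)).toList
    waiting --= ready
    ready.foreach(submit)
  }

  // Simulates task completion events arriving after submission.
  def drain(): Unit =
    while (running.nonEmpty) {
      val s = running.dequeue()
      s.pendingTasks = 0
      s.mergeFinalized = true
      finished += s.name
      wakeChildren(s)
    }

  // Stages that were parked and never resubmitted.
  def stuck: List[String] = waiting.toList.map(_.name)
}

object HangDemo {
  // Builds parent A and child B, submits B, and processes all completions.
  def run(parentTasks: Int, parentFinalized: Boolean): ToyScheduler = {
    val a = new ToyStage("A", Nil, parentTasks, parentFinalized)
    val b = new ToyStage("B", List(a), pendingTasks = 1, mergeFinalized = false)
    val scheduler = new ToyScheduler
    scheduler.submit(b)
    scheduler.drain()
    scheduler
  }

  def main(args: Array[String]): Unit = {
    println(run(parentTasks = 0, parentFinalized = false).stuck)
    println(run(parentTasks = 0, parentFinalized = true).stuck)
  }
}
```

In this model, a parent with no tasks left and finalization still pending is the only combination that leaves B parked forever: A is submitted, finds no waiting children because B has not been parked yet, and no later completion event arrives to wake B. A parent that still has tasks, or one that is already finalized, lets B run.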
To reproduce:
First, change `MyRDD` to allow it to compute:
override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
  Iterator.single((1, 1))
}
Then run this test:
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
    sc,
    sc.dagScheduler.taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env) {
    override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
      // By this, we can mimic a stage with all tasks finished
      // but finalization is incomplete.
      latch.countDown()
    }
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)

  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // scalastyle:off
  println("=========after wait==========")
  // set _shuffleMergedFinalized to true can avoid the hang.
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}