Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 2.3.0
Fix Version/s: None
Description
TL;DR
When speculative tasks fail or get killed, they are still considered pending and count toward the calculation of the number of needed executors.
Symptom
In one of our production jobs (running 4 tasks per executor), we found that it was holding 6 executors at the end with only 2 tasks running (1 of them speculative). With more logging enabled, we found the job printed:
pendingTasks is 0 pendingSpeculativeTasks is 17 totalRunningTasks is 2
while the job had only 1 speculative task running and 16 speculative tasks intentionally killed because their corresponding original tasks had finished — together accounting for the 17 "pending" speculative tasks above.
An easy repro of the issue (run in cluster mode with `--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=1000`):
val n = 4000
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => {
  if (index < 300 && index >= 150) {
    Thread.sleep(index * 1000) // Fake running tasks
  } else if (index == 300) {
    Thread.sleep(1000 * 1000) // Fake long running tasks
  }
  it.toList.map(x => index + ", " + x).iterator
}).collect
You will see that, while the last task is running, the job holds 38 executors (see attachment), which is exactly (152 + 3) / 4 = 38, i.e. the integer ceiling of 152 running-or-pending tasks divided by 4 tasks per executor; the extra 3 is tasksPerExecutor - 1, added so that integer division rounds up. A minimal sketch of that arithmetic follows.
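For reference, here is a minimal sketch of that ceiling-division arithmetic, assuming the standard formula the allocation manager uses to compute the maximum number of executors needed. The object and helper names here are illustrative, not Spark's API:

object ExecutorCountSketch {
  // Integer ceiling division: ceil(runningOrPendingTasks / tasksPerExecutor),
  // which is where the "+ 3" (tasksPerExecutor - 1) in the figure comes from.
  def maxExecutorsNeeded(runningOrPendingTasks: Int, tasksPerExecutor: Int): Int =
    (runningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

  def main(args: Array[String]): Unit = {
    // In the repro: 152 tasks still counted as running or pending (most of
    // them stale speculative entries), 4 tasks per executor.
    println(maxExecutorsNeeded(152, 4)) // 38 -- matches (152 + 3) / 4
  }
}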
The Bug
Upon examining the code of pendingSpeculativeTasks:
stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
  numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
}.sum
where stageAttemptToNumSpeculativeTasks(stageAttempt) is incremented in onSpeculativeTaskSubmitted but never decremented when a speculative task ends; the entry is only removed wholesale (stageAttemptToNumSpeculativeTasks -= stageAttempt) on stage completion. This means Spark keeps counting ended speculative tasks as pending, which leads Spark to hold more executors than it actually needs!
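For illustration, here is a minimal sketch of the kind of bookkeeping fix this implies: decrement the counter whenever a speculative task ends (finishes, fails, or is killed), instead of only dropping the whole entry at stage completion. This is a simplified model, not the actual patch; the two map names mirror the fields quoted above, but the class and method signatures are hypothetical:

import scala.collection.mutable

// Simplified model of the ExecutorAllocationManager accounting (hypothetical class).
class SpeculativeTaskAccounting {
  // (stageId, attemptId) -> speculative tasks submitted and not yet ended
  private val stageAttemptToNumSpeculativeTasks = mutable.Map[(Int, Int), Int]()
  // (stageId, attemptId) -> indices of speculative tasks currently running
  private val stageAttemptToSpeculativeTaskIndices = mutable.Map[(Int, Int), mutable.Set[Int]]()

  def onSpeculativeTaskSubmitted(stageAttempt: (Int, Int)): Unit = {
    val n = stageAttemptToNumSpeculativeTasks.getOrElse(stageAttempt, 0)
    stageAttemptToNumSpeculativeTasks(stageAttempt) = n + 1
  }

  def onSpeculativeTaskStart(stageAttempt: (Int, Int), taskIndex: Int): Unit = {
    stageAttemptToSpeculativeTaskIndices
      .getOrElseUpdate(stageAttempt, mutable.Set.empty[Int]) += taskIndex
  }

  // The missing piece: when a speculative task ends for ANY reason (success,
  // failure, or kill), update both structures so it stops inflating the
  // pending count.
  def onSpeculativeTaskEnd(stageAttempt: (Int, Int), taskIndex: Int): Unit = {
    stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach(_ -= taskIndex)
    stageAttemptToNumSpeculativeTasks.get(stageAttempt).foreach { n =>
      stageAttemptToNumSpeculativeTasks(stageAttempt) = math.max(n - 1, 0)
    }
  }

  // Same computation as the snippet quoted above.
  def pendingSpeculativeTasks: Int =
    stageAttemptToNumSpeculativeTasks.map { case (stageAttempt, numTasks) =>
      numTasks - stageAttemptToSpeculativeTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
    }.sum
}

With this accounting, an ended speculative task decrements both the submitted count and the running-index set, so the pending count no longer grows by one for every killed speculative task as it does today.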
I will have a PR ready to fix this issue, along with SPARK-28403.
Attachments
Issue Links
- Blocked
  - SPARK-28403 Executor Allocation Manager can add an extra executor when speculative tasks (Resolved)
- relates to
  - SPARK-27082 Dynamic Allocation: we should consider the scenario that speculative task being killed and never resubmit (Resolved)