Details
- Bug
- Status: Resolved
- Critical
- Resolution: Fixed
- 1.1.0, 1.1.1, 1.2.0
- None
Description
The current job group ID of a SparkContext is stored in its localProperties member. This data structure is designed to be thread-local, so its settings are not carried over when ComplexFutureAction instantiates a new Future, which may run on a different thread.
One consequence is that takeAsync() does not behave the same way as other async actions, e.g. countAsync(). For example, the following test, if copied into StatusTrackerSuite.scala, will fail because "my-job-group2" is not propagated to the Future that actually instantiates the job:
test("getJobIdsForGroup() with takeAsync()") {
  sc = new SparkContext("local", "test", new SparkConf(false))
  sc.setJobGroup("my-job-group2", "description")
  sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty)
  val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
  val firstJobId = eventually(timeout(10 seconds)) {
    firstJobFuture.jobIds.head
  }
  eventually(timeout(10 seconds)) {
    sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId))
  }
}
It also impacts the current PR for SPARK-1021, which involves additional uses of ComplexFutureAction.
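The underlying behavior can be reproduced without Spark. The following is a minimal sketch, not the actual fix: it uses a plain ThreadLocal as a simplified stand-in for localProperties, and the names jobGroup, readWithoutPropagation, and readWithPropagation are hypothetical. It shows that a value set on the calling thread is invisible inside a Future running on another thread, and that capturing the value before creating the Future and reinstalling it inside the body is one possible way to carry it across.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object JobGroupPropagationSketch {
  // Hypothetical stand-in for the per-thread job group setting;
  // this is NOT Spark's actual localProperties field.
  val jobGroup = new ThreadLocal[String]

  // Reads the thread-local inside a Future with no propagation:
  // the worker thread sees its own (unset) copy.
  def readWithoutPropagation()(implicit ec: ExecutionContext): Future[Option[String]] =
    Future { Option(jobGroup.get()) }

  // Captures the value on the caller's thread, then reinstalls it
  // inside the Future body before reading it.
  def readWithPropagation()(implicit ec: ExecutionContext): Future[Option[String]] = {
    val captured = jobGroup.get() // evaluated on the calling thread
    Future {
      jobGroup.set(captured)
      try Option(jobGroup.get()) finally jobGroup.remove()
    }
  }

  // Runs both variants on a dedicated single-thread pool so the Future
  // body is guaranteed to execute on a thread other than the caller's.
  def demo(): (Option[String], Option[String]) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      jobGroup.set("my-job-group2")
      val lost = Await.result(readWithoutPropagation(), 10.seconds)
      val kept = Await.result(readWithPropagation(), 10.seconds)
      (lost, kept)
    } finally {
      jobGroup.remove()
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (lost, kept) = demo()
    println(s"without propagation: $lost") // None
    println(s"with propagation: $kept")    // Some(my-job-group2)
  }
}
```

The capture has to happen outside the Future block; reading the thread-local inside the body would already be too late, since that code runs on the worker thread.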
Issue Links
- blocks SPARK-1021: sortByKey() launches a cluster job when it shouldn't (Resolved)