Details
- Type: Bug
- Status: Resolved
- Priority: Blocker
- Resolution: Fixed
- Affects Version/s: 1.1.0, 1.2.0
- Labels: None
Description
We have the following two configs:

spark.shuffle.compress
spark.shuffle.spill.compress

When these two settings diverge, sort-based shuffle fails with a compression exception under certain workloads. This is because in sort-based shuffle a spill file can be served directly as a normal shuffle file: the bytes are written with the codec selected by spark.shuffle.spill.compress but read back with the codec selected by spark.shuffle.compress. In retrospect it was unfortunate that these two configs were ever exposed separately, because now we cannot easily remove either of them.
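The mismatch can be demonstrated outside Spark with a minimal sketch (the file name and payload below are illustrative, not taken from the Spark code base). Writing raw bytes and then reading them back through xerial's SnappyInputStream fails the same way BlockManager.wrapForCompression does in the first trace below:

import java.io.{File, FileInputStream, FileOutputStream}
import org.xerial.snappy.SnappyInputStream

object UncompressedWriteSnappyRead {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("shuffle-block", ".bin")
    // Writer side: spill compression off, so raw bytes hit the disk.
    val out = new FileOutputStream(file)
    out.write("plain, uncompressed shuffle bytes".getBytes("UTF-8"))
    out.close()
    // Reader side: shuffle compression on, so the stream is wrapped in
    // Snappy. The constructor reads a "header" that is not there and
    // throws java.io.IOException: FAILED_TO_UNCOMPRESS, as in the trace.
    val in = new SnappyInputStream(new FileInputStream(file))
    in.close()
  }
}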
Here is how this can be reproduced. Set the following in your spark-defaults.conf (the tiny spark.shuffle.memoryFraction forces the shuffle to spill):
spark.master                  local-cluster[1,1,512]
spark.shuffle.spill.compress  false
spark.shuffle.compress        true
spark.shuffle.manager         sort
spark.shuffle.memoryFraction  0.001
Then run the following in spark-shell:
sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
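Equivalently, the whole repro can be expressed programmatically (a sketch; the app name is arbitrary, and the settings mirror the spark-defaults.conf above):

import org.apache.spark.{SparkConf, SparkContext}

object CompressionMismatchRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local-cluster[1,1,512]")
      .setAppName("compression-mismatch-repro") // arbitrary name
      .set("spark.shuffle.spill.compress", "false")
      .set("spark.shuffle.compress", "true")
      .set("spark.shuffle.manager", "sort")
      .set("spark.shuffle.memoryFraction", "0.001")
    val sc = new SparkContext(conf)
    // Few keys, many values: forces a shuffle that spills under the
    // tiny memory fraction above.
    sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
    sc.stop()
  }
}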
This leads to compression errors, such as the following:
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, joshs-mbp): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
[info]     org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
[info]     org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
[info]     org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
[info]     org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
[info]     org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
[info]     org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
[info]     org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
[info]     org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
[info]     org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
[info]     org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:350)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
[info]     scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
[info]     org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
[info]     org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
[info]     org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
[info]     org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
[info]     org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
[info]     org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
[info]     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[info]     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[info]     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
[info]     org.apache.spark.scheduler.Task.run(Task.scala:56)
[info]     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
[info]     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]     java.lang.Thread.run(Thread.java:745)
Similarly, with

spark.shuffle.spill.compress  true
spark.shuffle.compress        false
we see
[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, joshs-mbp): java.io.StreamCorruptedException: invalid stream header: 82534E41
[info]     java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
[info]     java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
[info]     org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57)
[info]     org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:57)
[info]     org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95)
[info]     org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:355)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:244)
[info]     org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
[info]     scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
[info]     org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
[info]     org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
[info]     org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
[info]     org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
[info]     org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
[info]     org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
[info]     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
[info]     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
[info]     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
[info]     org.apache.spark.scheduler.Task.run(Task.scala:56)
[info]     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
[info]     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]     java.lang.Thread.run(Thread.java:745)
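Note that "invalid stream header: 82534E41" is simply the first four bytes of the Snappy stream magic (0x82, 'S', 'N', 'A'): the reader is handed Snappy-compressed bytes but never decompresses them. A mirror image of the earlier sketch shows the same failure outside Spark (again illustrative):

import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import org.xerial.snappy.SnappyOutputStream

object SnappyWriteUncompressedRead {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("shuffle-block", ".bin")
    // Writer side: spill compression on, so Java-serialized records are
    // wrapped in a Snappy stream before hitting the disk.
    val out = new ObjectOutputStream(new SnappyOutputStream(new FileOutputStream(file)))
    out.writeObject("some shuffle record")
    out.close()
    // Reader side: shuffle compression off, so ObjectInputStream sees the
    // raw Snappy magic where it expects the Java serialization header and
    // throws java.io.StreamCorruptedException: invalid stream header: 82534E41
    val in = new ObjectInputStream(new FileInputStream(file))
    in.close()
  }
}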
Issue Links
- is related to: SPARK-3367 Remove spark.shuffle.spill.compress (replace it with existing spark.shuffle.compress) (Resolved)
- relates to: SPARK-3630 Identify cause of Kryo+Snappy PARSING_ERROR (Resolved)