SPARK-3426: Sort-based shuffle compression behavior is inconsistent

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.1.0, 1.2.0
    • Fix Version/s: 1.1.1, 1.2.0
    • Component/s: Spark Core
    • Labels: None

    Description

      We have the following configs:

      spark.shuffle.compress
      spark.shuffle.spill.compress
      

      When these two configs diverge, sort-based shuffle fails with a compression exception under certain workloads. This is because in sort-based shuffle we serve the index file (using spark.shuffle.spill.compress) as a normal shuffle file (using spark.shuffle.compress). In retrospect it is unfortunate that these two configs were ever exposed, since now we cannot easily remove them.
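      To see the shape of the bug in isolation, here is a minimal sketch, not Spark's actual code: wrapWrite, wrapRead, and the two-entry conf map are hypothetical stand-ins for a write path and a read path that consult different flags.

      import java.io.{InputStream, OutputStream}
      import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

      // Hypothetical sketch: the write path consults one flag while the
      // read path consults the other, so the bytes on disk and the codec
      // used to read them back disagree whenever the two flags diverge.
      val conf = Map(
        "spark.shuffle.spill.compress" -> false,  // consulted when writing
        "spark.shuffle.compress"       -> true    // consulted when reading
      )
      def wrapWrite(out: OutputStream): OutputStream =
        if (conf("spark.shuffle.spill.compress")) new SnappyOutputStream(out) else out
      def wrapRead(in: InputStream): InputStream =
        if (conf("spark.shuffle.compress")) new SnappyInputStream(in) else in
      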

      Here is how this can be reproduced. Set the following in your spark-defaults.conf (the tiny spark.shuffle.memoryFraction forces the shuffle to spill to disk):

      spark.master                  local-cluster[1,1,512]
      spark.shuffle.spill.compress  false
      spark.shuffle.compress        true
      spark.shuffle.manager         sort
      spark.shuffle.memoryFraction  0.001
      

      Then run the following in spark-shell:

      sc.parallelize(0 until 100000).map(i => (i/4, i)).groupByKey().collect()
      
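      Equivalently, the same settings can be passed on the command line, assuming a Spark 1.1+ spark-shell, where spark-submit's --master and --conf flags apply:

      bin/spark-shell \
        --master local-cluster[1,1,512] \
        --conf spark.shuffle.spill.compress=false \
        --conf spark.shuffle.compress=true \
        --conf spark.shuffle.manager=sort \
        --conf spark.shuffle.memoryFraction=0.001
      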

      This leads to compression errors such as the following, where the read side tries to Snappy-decompress shuffle bytes that were written uncompressed:

      [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, joshs-mbp): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
      [info]         org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
      [info]         org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
      [info]         org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
      [info]         org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
      [info]         org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
      [info]         org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
      [info]         org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
      [info]         org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
      [info]         org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
      [info]         org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:350)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:196)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
      [info]         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
      [info]         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
      [info]         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      [info]         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
      [info]         org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
      [info]         org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
      [info]         org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
      [info]         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      [info]         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      [info]         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
      [info]         org.apache.spark.scheduler.Task.run(Task.scala:56)
      [info]         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
      [info]         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      [info]         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      [info]         java.lang.Thread.run(Thread.java:745)
      

      Similarly, with

      spark.shuffle.spill.compress  true
      spark.shuffle.compress        false
      

      we see the opposite failure, where plain Java deserialization runs into snappy-java's stream header (82534E41 is the byte 0x82 followed by ASCII "SNA"):

      [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 8, joshs-mbp): java.io.StreamCorruptedException: invalid stream header: 82534E41
      [info]         java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
      [info]         java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
      [info]         org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:57)
      [info]         org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:57)
      [info]         org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:95)
      [info]         org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:355)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$fetchLocalBlocks$1$$anonfun$apply$4.apply(ShuffleBlockFetcherIterator.scala:197)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:244)
      [info]         org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
      [info]         scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
      [info]         org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
      [info]         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      [info]         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
      [info]         org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
      [info]         org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
      [info]         org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
      [info]         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      [info]         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      [info]         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
      [info]         org.apache.spark.scheduler.Task.run(Task.scala:56)
      [info]         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
      [info]         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      [info]         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      [info]         java.lang.Thread.run(Thread.java:745)
      
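      Both traces are the same mismatch seen from opposite ends: one side of the stream is wrapped in the Snappy codec and the other is not. The pair of failures can be reproduced outside Spark entirely; the following is a minimal sketch to paste into a Scala REPL, assuming snappy-java is on the classpath (exact messages vary across snappy-java versions; the FAILED_TO_UNCOMPRESS form below is what snappy-java 1.0.x produces):

      import java.io._
      import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

      // Writer compresses, reader does not: plain Java deserialization
      // trips over Snappy's stream header, as in the second trace above.
      val compressed = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(new SnappyOutputStream(compressed))
      oos.writeObject("shuffle bytes"); oos.close()
      new ObjectInputStream(new ByteArrayInputStream(compressed.toByteArray))
      // => java.io.StreamCorruptedException: invalid stream header: 82534E41

      // Writer does not compress, reader does: SnappyInputStream cannot
      // decompress plain serialized bytes, as in the first trace above.
      val plain = new ByteArrayOutputStream()
      val oos2 = new ObjectOutputStream(plain)
      oos2.writeObject("shuffle bytes"); oos2.close()
      new SnappyInputStream(new ByteArrayInputStream(plain.toByteArray))
      // => java.io.IOException: FAILED_TO_UNCOMPRESS(5)
      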

People

    Assignee: joshrosen (Josh Rosen)
    Reporter: andrewor14 (Andrew Or)