SPARK-39152

StreamCorruptedException causes job failure for disk persisted RDD


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.0
    • Component/s: Block Manager
    • Labels: None

    Description

      In case of a disk corruption, a disk-persisted RDD block will lead to job failure, because the block registration always points to the same (corrupted) file. So even when the task is rescheduled on a different executor, the job will fail.
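      To illustrate the affected pattern, here is a minimal sketch of a standard Spark application that persists an RDD at DISK_ONLY (the corruption itself comes from the disk, not from the code): every re-read of the persisted blocks goes through the BlockManager and deserializes the on-disk bytes, which is where the StreamCorruptedException surfaces.

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.storage.StorageLevel

      object DiskPersistedRDDExample {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().appName("disk-persisted-rdd").getOrCreate()
          val sc = spark.sparkContext

          // DISK_ONLY: computed partitions are written to the executors' local
          // disks and every later access re-reads and deserializes those files
          // through the BlockManager (getOrElseUpdate -> getLocalValues).
          val rdd = sc.parallelize(1 to 1000000).map(_ * 2).persist(StorageLevel.DISK_ONLY)

          rdd.count() // materializes the blocks and writes them to disk
          rdd.count() // re-reads the persisted blocks; a corrupted file fails here

          spark.stop()
        }
      }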

      Example

      First failure (the block is locally available):

      22/04/25 07:15:28 ERROR executor.Executor: Exception in task 17024.0 in stage 12.0 (TID 51853)
      java.io.StreamCorruptedException: invalid stream header: 00000000
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
        at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:617)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:897)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
      

      Then the task might be rescheduled on a different executor, but since the block is still registered to the first block manager, the rescheduled task fetches the same corrupted data remotely and fails with the same error:

      java.io.StreamCorruptedException: invalid stream header: 00000000
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
        at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
        at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
        at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
        at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:831)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
      

      My idea is to retry the IO operations a few times and, when all of them have failed, deregister the block and let the following task recompute it.
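      As a rough illustration of that idea (not the actual implementation; the helper and the callback names are hypothetical), a generic retry-then-give-up wrapper in Scala could look like the sketch below, where the give-up callback is the place to deregister the corrupted block so the next task attempt recomputes the partition instead of re-reading it:

      import scala.util.{Failure, Success, Try}

      object BlockReadRetry {

        // Hypothetical sketch of the proposed handling: retry the block read a
        // few times; if every attempt fails, invoke a cleanup callback (in Spark
        // this would be deregistering/removing the corrupted block) and rethrow,
        // so the rescheduled task recomputes the data instead of re-reading it.
        @annotation.tailrec
        def readWithRetry[T](attemptsLeft: Int)(read: => T)(onGiveUp: Throwable => Unit): T =
          Try(read) match {
            case Success(value) => value
            case Failure(_) if attemptsLeft > 1 =>
              readWithRetry(attemptsLeft - 1)(read)(onGiveUp)
            case Failure(e) =>
              onGiveUp(e) // e.g. remove the block registration for this block
              throw e
          }

        // Illustrative usage (deserializeBlock and deregisterBlock are placeholders):
        //   val values = readWithRetry(3)(deserializeBlock(blockId)) { _ =>
        //     deregisterBlock(blockId)
        //   }
      }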

      People

        Assignee: Attila Zsolt Piros (attilapiros)
        Reporter: Attila Zsolt Piros (attilapiros)