Spark / SPARK-30677

Spark Streaming job gets stuck when the number of Kinesis shards is increased while the job is running

Details

    Description

      The Spark job stopped processing when the number of shards was increased while the job was already running.

      We observed the following exceptions:

       

      2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
      2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
      2020-01-27 06:42:29 ERROR FileBasedWriteAheadLog_ReceivedBlockTracker:70 - Failed to write to write ahead log after 3 failures
      2020-01-27 06:42:29 WARN BatchedWriteAheadLog:87 - BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0 lim=1845 cap=1845],1580107349095,Future(<not completed>)))
      java.io.IOException: Not supported
      at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
      at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
      at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
      at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
      at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
      at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
      at java.lang.Thread.run(Thread.java:748)
      2020-01-27 06:42:29 WARN ReceivedBlockTracker:87 - Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(36),Some(SequenceNumberRanges(SequenceNumberRange(XXXXXXXXXXX,shardId-000000000006,49603657998853972269624727295162770770442241924489281634,49603657998853972269624727295206292099948368574778703970,36))),WriteAheadLogBasedStoreResult(input-0-1580106915391,Some(36),FileBasedWriteAheadLogSegment(s3://XXXXXXXXXXX/spark/checkpoint/XX/XXXXXXXXXXX/receivedData/0/log-1580107349000-1580107409000,0,31769)))) to the WriteAheadLog.
      org.apache.spark.SparkException: Exception thrown in awaitResult:
      at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
      at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
      at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
      at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
      at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
      at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: Not supported
      at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
      at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
      at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
      at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
      at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
      at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
      ... 1 more
      2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-0-1580106915392 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
      2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349323-1580107409323
      2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-3-1580106915123 in memory on XXXXXXXXXXX:42027 (size: 25.9 KB, free: 3.4 GB)
      2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349908-1580107409908
      2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-2-1580106915311 in memory on XXXXXXXXXXX:38393 (size: 29.3 KB, free: 3.4 GB)
      2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-0-1580106915393 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
      2020-01-27 06:42:30 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107350000-1580107410000
      2020-01-27 06:42:30 INFO JobScheduler:54 - Added jobs for time 1580107350000 ms
      2020-01-27 06:42:30 INFO JobGenerator:54 - Checkpointing graph for time 1580107350000 ms
      2020-01-27 06:42:30 INFO DStreamGraph:54 - Updating checkpoint data for time 1580107350000 ms
      2020-01-27 06:42:30 INFO DStreamGraph:54 - Updated checkpoint data for time 1580107350000 ms
      2020-01-27 06:42:30 INFO CheckpointWriter:54 - Submitted checkpoint of time 1580107350000 ms to writer queue
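
      The failing writes can be tied to specific S3 objects through the WAL segment names, which encode their time window as `log-<startMs>-<stopMs>` in epoch milliseconds. The trace shows the driver's WAL writer reaching `FileSystem.append`, which EMRFS rejects with `Not supported`; in the Spark 2.x source, `HdfsUtils.getOutputStream` appears to take the append path only when the segment file already exists, so knowing which window a segment covers helps when inspecting the bucket. The following is a minimal shell sketch for decoding those names; `wal_segment_info` is a hypothetical helper (not part of Spark or EMR), and GNU `date` is assumed for the UTC conversion.

```shell
#!/usr/bin/env bash
# Hypothetical helper: decode a Spark WAL segment name of the form
# log-<startMs>-<stopMs> (epoch milliseconds). Assumes GNU date (-d @SECONDS).
wal_segment_info() {
  local name start stop re='^log-([0-9]+)-([0-9]+)$'
  name=$(basename "$1")            # accepts a bare name or a full s3:// path
  if ! [[ $name =~ $re ]]; then
    echo "not a WAL segment name: $name" >&2
    return 1
  fi
  start=${BASH_REMATCH[1]}
  stop=${BASH_REMATCH[2]}
  # Print the start time in UTC and the window length in seconds.
  printf '%s start=%s (%s UTC) window=%ss\n' \
    "$name" "$start" \
    "$(date -u -d "@$((start / 1000))" '+%Y-%m-%d %H:%M:%S')" \
    "$(( (stop - start) / 1000 ))"
}

# Segment names taken from the log above.
wal_segment_info 's3://XXXXXXXXXXX/spark/checkpoint/XX/XXXXXXXXXXX/receivedData/0/log-1580107349000-1580107409000'
wal_segment_info 'log-1580107349323-1580107409323'
```

      Both example segments decode to a 60-second window, consistent with Spark's default WAL rolling interval of 60 seconds (the submit command does not set a `rollingIntervalSecs` value), and both start at 2020-01-27 06:42:29 UTC, the same second as the failed writes if the log timestamps are in UTC.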

       

      Notes:
      1. Write-ahead logs and checkpoints are maintained in an AWS S3 bucket.

      2. spark-submit configuration:

      spark-submit --deploy-mode cluster --executor-memory 4608M --driver-memory 4608M \
      --conf spark.yarn.driver.memoryOverhead=710M \
      --conf spark.yarn.executor.memoryOverhead=710M --driver-cores 3 --executor-cores 3 \
      --conf spark.dynamicAllocation.minExecutors=1 \
      --conf spark.dynamicAllocation.maxExecutors=2 \
      --conf spark.dynamicAllocation.initialExecutors=2 \
      --conf spark.locality.wait.node=0 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf maximizeResourceAllocation=false --class XXXXXXXXXXXX \
      --conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true \
      --conf spark.scheduler.mode=FAIR \
      --conf spark.metrics.conf=XXXXXXXXXXXX.properties --files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties \
      --conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true \
      --conf spark.streaming.receiver.writeAheadLog.enable=true \
      --conf spark.streaming.receiver.blockStoreTimeout=59 \
      --conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000 \
      --conf spark.streaming.receiver.maxRate=120 s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar yarn XXXXXXXXXXXX applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60 &

      3. EMR version: 5.26

      4. Hadoop distribution: Amazon 2.8.5

      5. Hardware configuration:

      • Master (3 instances, multi-master cluster)
        c5.2xlarge
        8 vCore, 16 GiB memory, EBS-only storage
        EBS storage: 64 GiB
      • Core (6 instances; min 2, max 6)
        c5.4xlarge
        16 vCore, 32 GiB memory, EBS-only storage
        EBS storage: 1000 GiB

      6. Three Spark jobs run on the same cluster.

      7. Streaming source: Kinesis

      8. Screenshots of the cluster and instance configuration are attached.
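
      For reference, the memory and core values in the submit command translate into the following YARN requests. This is a rough sketch: it assumes each container asks for heap plus the configured overhead, ignores any rounding by `yarn.scheduler.minimum-allocation-mb`, and assumes the other two jobs use the same settings, which the notes above do not state. The variable names are illustrative only.

```shell
#!/usr/bin/env bash
# Values copied from the spark-submit command above (MiB and cores).
executor_mem=4608; executor_overhead=710; executor_cores=3
driver_mem=4608;   driver_overhead=710;   driver_cores=3
max_executors=2    # spark.dynamicAllocation.maxExecutors
jobs=3             # streaming jobs sharing the cluster (note 6); same settings assumed

# In cluster mode the driver runs inside its own YARN container.
executor_container=$(( executor_mem + executor_overhead ))
driver_container=$(( driver_mem + driver_overhead ))
per_job_mem=$(( driver_container + max_executors * executor_container ))
per_job_cores=$(( driver_cores + max_executors * executor_cores ))

echo "executor container: ${executor_container} MiB, ${executor_cores} cores"
echo "per job at maxExecutors: ${per_job_mem} MiB, ${per_job_cores} cores"
echo "all ${jobs} jobs: $(( jobs * per_job_mem )) MiB, $(( jobs * per_job_cores )) cores"
```

      The totals can be compared with the core-node capacity listed in note 5.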

      Attachments

        1. Cluster-Config-P1.JPG (116 kB, Mullaivendhan Ariaputhri)
        2. Cluster-Config-P2.JPG (32 kB, Mullaivendhan Ariaputhri)
        3. Instance-Config-P1.JPG (118 kB, Mullaivendhan Ariaputhri)
        4. Instance-Config-P2.JPG (35 kB, Mullaivendhan Ariaputhri)


          People

            Assignee: Unassigned
            Reporter: Mullaivendhan Ariaputhri (jasmineemullai@gmail.com)
            Votes: 0
            Watchers: 1
