Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Incomplete
-
2.4.3
-
None
Description
Spark job stopped processing when the number of shards is increased when the job is already running.
We have observed the below exceptions.
2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
2020-01-27 06:42:29 WARN FileBasedWriteAheadLog_ReceivedBlockTracker:66 - Failed to write to write ahead log
2020-01-27 06:42:29 ERROR FileBasedWriteAheadLog_ReceivedBlockTracker:70 - Failed to write to write ahead log after 3 failures
2020-01-27 06:42:29 WARN BatchedWriteAheadLog:87 - BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=0 lim=1845 cap=1845],1580107349095,Future(<not completed>)))
java.io.IOException: Not supported
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
at java.lang.Thread.run(Thread.java:748)
2020-01-27 06:42:29 WARN ReceivedBlockTracker:87 - Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(36),Some(SequenceNumberRanges(SequenceNumberRange(XXXXXXXXXXX,shardId-000000000006,49603657998853972269624727295162770770442241924489281634,49603657998853972269624727295206292099948368574778703970,36))),WriteAheadLogBasedStoreResult(input-0-1580106915391,Some(36),FileBasedWriteAheadLogSegment(s3://XXXXXXXXXXX/spark/checkpoint/XX/XXXXXXXXXXX/receivedData/0/log-1580107349000-1580107409000,0,31769)))) to the WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:84)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:242)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:89)
at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:347)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:522)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:520)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Not supported
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.append(S3NativeFileSystem.java:588)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1181)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.append(EmrFileSystem.java:295)
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream$lzycompute(FileBasedWriteAheadLogWriter.scala:32)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.stream(FileBasedWriteAheadLogWriter.scala:32)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:229)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:94)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:50)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:175)
at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:142)
... 1 more
2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-0-1580106915392 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349323-1580107409323
2020-01-27 06:42:29 INFO BlockManagerInfo:54 - Added input-3-1580106915123 in memory on XXXXXXXXXXX:42027 (size: 25.9 KB, free: 3.4 GB)
2020-01-27 06:42:29 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107349908-1580107409908
2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-2-1580106915311 in memory on XXXXXXXXXXX:38393 (size: 29.3 KB, free: 3.4 GB)
2020-01-27 06:42:30 INFO BlockManagerInfo:54 - Added input-0-1580106915393 in memory on XXXXXXXXXXX:38393 (size: 31.0 KB, free: 3.4 GB)
2020-01-27 06:42:30 INFO MultipartUploadOutputStream:414 - close closed:false s3://XXXXXXXXXXX/spark/checkpoint/XXXXXXXXXXX/XXXXXXXXXXX/receivedBlockMetadata/log-1580107350000-1580107410000
2020-01-27 06:42:30 INFO JobScheduler:54 - Added jobs for time 1580107350000 ms
2020-01-27 06:42:30 INFO JobGenerator:54 - Checkpointing graph for time 1580107350000 ms
2020-01-27 06:42:30 INFO DStreamGraph:54 - Updating checkpoint data for time 1580107350000 ms
2020-01-27 06:42:30 INFO DStreamGraph:54 - Updated checkpoint data for time 1580107350000 ms
2020-01-27 06:42:30 INFO CheckpointWriter:54 - Submitted checkpoint of time 1580107350000 ms to writer queue
Note :
1. Writeahead logs and Checkpoint is being maitained in AWS S3 bucket
2. Spark submit Configuration as below:
spark-submit --deploy-mode cluster --executor-memory 4608M --driver-memory 4608M
--conf spark.yarn.driver.memoryOverhead=710M
--conf spark.yarn.executor.memoryOverhead=710M --driver-cores 3 --executor-cores 3
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=2
--conf spark.dynamicAllocation.initialExecutors=2
--conf spark.locality.wait.node=0
--conf spark.dynamicAllocation.enabled=true
--conf maximizeResourceAllocation=false --class XXXXXXXXXXXX
--conf spark.streaming.driver.writeAheadLog.closeFileAfterWrite=true
--conf spark.scheduler.mode=FAIR
--conf spark.metrics.conf=XXXXXXXXXXXX.properties --files=s3://XXXXXXXXXXXX/XXXXXXXXXXXX.properties
--conf spark.streaming.receiver.writeAheadLog.closeFileAfterWrite=true
--conf spark.streaming.receiver.writeAheadLog.enable=true
--conf spark.streaming.receiver.blockStoreTimeout=59
--conf spark.streaming.driver.writeAheadLog.batchingTimeout=30000
--conf spark.streaming.receiver.maxRate=120 s3://XXXXXXXXXXXX/XXXXXXXXXXXX.jar yarn XXXXXXXXXXXX applicationContext-XXXXXXXXXXXX-streaming.xml root kinesis 60 &
3. EMR Version - 5.26
4. Hadoop Distribution - Amazon 2.8.5
5. Hardware Config
- Master (3 instances - Multi Master Cluster)
c5.2xlarge
8 vCore, 16 GiB memory, EBS only storage
EBS Storage:64 GiB - Core (6 instances [Min - 2, Max - 6])
c5.4xlarge
16 vCore, 32 GiB memory, EBS only storage
EBS Storage:1000 GiB
6. There are 3 spark jobs running on the same cluster
7. Streaming - Kinesis
8. Cluster Config and Instance Config is attached