Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.3.0
Fix Version/s: None
Description
Update (15/Nov/19):
This blog post resolved our confusion:
http://www.russellspitzer.com/2018/05/10/SparkPartitions/
--------------------------------------------------------
Update (15/Nov/19):
I discussed this issue with my colleagues today. We think Spark hit an integer overflow (an out-of-range index) during the shuffle.
The problem may be in the sort-based shuffle stage. When a map task partition is too large and the writerIndex variable is stored as an int, writerIndex may overflow and become negative. If this is the case, changing writerIndex from int to long should solve the problem.
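The overflow hypothesis can be checked with plain integer arithmetic. The sketch below is illustrative only: the class and method names are hypothetical and are not Spark or Netty APIs, and it assumes the byte count was narrowed from a 64-bit value to a 32-bit int somewhere along the fetch path, which is not confirmed here. It takes the two negative writerIndex values from the logs in this report and recovers the smallest byte count that would wrap to each of them.

```java
// Illustrative only: WriterIndexOverflow and its methods are hypothetical names,
// not part of Spark or Netty.
class WriterIndexOverflow {

    // Narrowing a long byte count to int keeps only the low 32 bits,
    // so a size above Integer.MAX_VALUE can come out negative.
    static int toIntIndex(long sizeInBytes) {
        return (int) sizeInBytes;
    }

    // Smallest non-negative byte count that wraps to the given int value.
    static long smallestSizeFor(int wrappedIndex) {
        return Integer.toUnsignedLong(wrappedIndex);
    }

    public static void main(String[] args) {
        // writerIndex values copied from the ChunkFetchFailureException messages.
        int[] observed = {-2137154997, -2138342822};
        for (int idx : observed) {
            long size = smallestSizeFor(idx);
            System.out.println(idx + " corresponds to at least " + size
                    + " bytes (Integer.MAX_VALUE is " + Integer.MAX_VALUE + ")");
        }
    }
}
```

Under that assumption, both failing blocks would be slightly larger than Integer.MAX_VALUE bytes (about 2 GiB), which matches the observation that the failure appears only once the partitions grow.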
--------------------------------------------------------
I created a Dataset<Row> df with 200 partitions and requested 100 executors for the job, each with 1 core; driver memory is 8G and executor memory is 16G. I call df.cache() before df.coalesce(10). When the partitions of the Dataset<Row> are small, the program works well. But when I increase the partition size, df.coalesce(10) throws ChunkFetchFailureException.
19/09/17 08:26:44 INFO CoarseGrainedExecutorBackend: Got assigned task 210
19/09/17 08:26:44 INFO Executor: Running task 0.0 in stage 3.0 (TID 210)
19/09/17 08:26:44 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
19/09/17 08:26:44 INFO TorrentBroadcast: Started reading broadcast variable 1003
19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 3.8 GB)
19/09/17 08:26:44 INFO TorrentBroadcast: Reading broadcast variable 1003 took 7 ms
19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 3.8 GB)
19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_0 locally
19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_1 locally
19/09/17 08:26:44 INFO TransportClientFactory: Successfully created connection to /100.76.29.130:54238 after 1 ms (0 ms spent in bootstraps)
19/09/17 08:26:46 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_18, and will not retry (0 retries)
org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
19/09/17 08:26:46 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:115)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:691)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:634)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:747)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:802)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
... 1 more
19/09/17 08:26:46 INFO NewHadoopRDD: Input split: 9.46.2.233:0935a68ad8000000,0935d7fb180999FF
19/09/17 08:26:46 INFO TorrentBroadcast: Started reading broadcast variable 93
19/09/17 08:26:46 INFO MemoryStore: Block broadcast_93_piece0 stored as bytes in memory (estimated size 32.5 KB, free 3.8 GB)
19/09/17 08:26:46 INFO TorrentBroadcast: Reading broadcast variable 93 took 8 ms
19/09/17 08:26:47 INFO MemoryStore: Block broadcast_93 stored as values in memory (estimated size 372.0 KB, free 3.8 GB)
19/09/17 08:26:47 INFO RecoverableZooKeeper: Process identifier=hconnection-0x1aa852f0 connecting to ZooKeeper ensemble=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181
19/09/17 08:26:47 INFO ZooKeeper: Initiating client connection, connectString=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181 sessionTimeout=90000 watcher=hconnection-0x1aa852f0, quorum=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181, baseZNode=/hbase-qq-mp-ss-slave
19/09/17 08:26:47 INFO ClientCnxn: Opening socket connection to server 10.254.82.84/10.254.82.84:2181. Will not attempt to authenticate using SASL (unknown error)
19/09/17 08:26:47 INFO ClientCnxn: Socket connection established to 10.254.82.84/10.254.82.84:2181, initiating session
19/09/17 08:26:47 INFO ClientCnxn: Session establishment complete on server 10.254.82.84/10.254.82.84:2181, sessionid = 0x36c7f371e67307f, negotiated timeout = 90000
19/09/17 08:26:47 INFO TableInputFormatBase: Input split length: 19.8 G bytes.
19/09/17 08:41:37 INFO HConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x36c7f371e67307f
19/09/17 08:41:38 INFO ZooKeeper: Session: 0x36c7f371e67307f closed
19/09/17 08:41:38 INFO ClientCnxn: EventThread shut down
19/09/17 08:41:38 INFO MemoryStore: Block rdd_1005_18 stored as values in memory (estimated size 1822.9 MB, free 2025.1 MB)
19/09/17 08:41:38 INFO TransportClientFactory: Successfully created connection to /9.10.29.145:37002 after 0 ms (0 ms spent in bootstraps)
19/09/17 08:41:39 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_32, and will not retry (0 retries)
org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1800856515000, chunkIndex=0}: readerIndex: 0, writerIndex: -2138342822 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2138342822))
Here are more details on the experiments.
When each partition size is small:
- Storage Level: Memory Deserialized 1x Replicated
- Cached Partitions: 200
- Total Partitions: 200
- Memory Size: 356.2 GB
- Disk Size: 0.0 B
Data Distribution on 100 Executors
The log of the successful task is as follows:
19/09/17 09:33:17 INFO CoarseGrainedExecutorBackend: Got assigned task 211
19/09/17 09:33:17 INFO Executor: Running task 3.0 in stage 3.0 (TID 211)
19/09/17 09:33:17 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
19/09/17 09:33:17 INFO TorrentBroadcast: Started reading broadcast variable 1003
19/09/17 09:33:17 INFO TransportClientFactory: Successfully created connection to /9.10.19.210:51072 after 8 ms (0 ms spent in bootstraps)
19/09/17 09:33:17 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 5.5 GB)
19/09/17 09:33:17 INFO TorrentBroadcast: Reading broadcast variable 1003 took 36 ms
19/09/17 09:33:18 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 5.5 GB)
19/09/17 09:33:18 INFO BlockManager: Found block rdd_1005_6 locally
19/09/17 09:33:18 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:37220 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:33:36 INFO BlockManager: Found block rdd_1005_33 remotely
19/09/17 09:33:37 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:32935 after 10 ms (0 ms spent in bootstraps)
19/09/17 09:33:50 INFO BlockManager: Found block rdd_1005_40 remotely
19/09/17 09:33:52 INFO TransportClientFactory: Successfully created connection to /100.76.25.87:45875 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:34:06 INFO BlockManager: Found block rdd_1005_46 remotely
19/09/17 09:34:08 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:35134 after 32 ms (0 ms spent in bootstraps)
19/09/17 09:34:18 INFO BlockManager: Found block rdd_1005_48 remotely
19/09/17 09:34:20 INFO TransportClientFactory: Successfully created connection to /9.47.25.185:47504 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:34:42 INFO BlockManager: Found block rdd_1005_49 remotely
19/09/17 09:34:44 INFO TransportClientFactory: Successfully created connection to /100.76.33.91:35365 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:34:59 INFO BlockManager: Found block rdd_1005_51 remotely
19/09/17 09:35:01 INFO TransportClientFactory: Successfully created connection to /9.10.7.26:49383 after 3 ms (0 ms spent in bootstraps)
19/09/17 09:35:16 INFO BlockManager: Found block rdd_1005_71 remotely
19/09/17 09:35:18 INFO TransportClientFactory: Successfully created connection to /100.76.72.246:51684 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:35:28 INFO BlockManager: Found block rdd_1005_75 remotely
19/09/17 09:35:30 INFO TransportClientFactory: Successfully created connection to /9.47.30.46:51291 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:35:45 INFO BlockManager: Found block rdd_1005_98 remotely
19/09/17 09:35:47 INFO TransportClientFactory: Successfully created connection to /9.10.137.17:56554 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:36:00 INFO BlockManager: Found block rdd_1005_116 remotely
19/09/17 09:36:02 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:58951 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:36:16 INFO BlockManager: Found block rdd_1005_121 remotely
19/09/17 09:36:19 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:50992 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:36:27 INFO BlockManager: Found block rdd_1005_128 remotely
19/09/17 09:36:39 INFO BlockManager: Found block rdd_1005_134 remotely
19/09/17 09:36:42 INFO TransportClientFactory: Successfully created connection to /9.10.7.92:41607 after 73 ms (0 ms spent in bootstraps)
19/09/17 09:36:54 INFO BlockManager: Found block rdd_1005_153 remotely
19/09/17 09:37:06 INFO BlockManager: Found block rdd_1005_167 remotely
19/09/17 09:37:08 INFO BlockManager: Found block rdd_1005_174 locally
19/09/17 09:37:08 INFO TransportClientFactory: Successfully created connection to /9.10.29.150:43709 after 10 ms (0 ms spent in bootstraps)
19/09/17 09:37:20 INFO BlockManager: Found block rdd_1005_182 remotely
19/09/17 09:37:22 INFO TransportClientFactory: Successfully created connection to /9.10.8.84:55958 after 14 ms (0 ms spent in bootstraps)
19/09/17 09:37:32 INFO BlockManager: Found block rdd_1005_189 remotely
19/09/17 09:37:34 INFO Executor: Finished task 3.0 in stage 3.0 (TID 211). 1752 bytes result sent to driver
When I increase the size of each partition:
- Storage Level: Disk Serialized 1x Replicated
- Cached Partitions: 200
- Total Partitions: 200
- Memory Size: 390.8 GB
- Disk Size: 166.8 GB
Data Distribution on 100 Executors
In this situation, 10 executors used disk storage. Because df.coalesce(10) threw ChunkFetchFailureException, those 10 executors fetched the data from the original data source again and cached it in the new Dataset<Row>.