Details
Type: Sub-task
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 2.3.0
Description
Spark's networking layer supports sending messages backed by either a FileRegion or a ByteBuf. Sending large FileRegions works, since netty supports FileRegions over 2 GB; a ByteBuf, however, is limited to 2 GB because its indices are Java ints. This is particularly a problem for sending large datasets that are already in memory, e.g. cached RDD blocks.
For example, if you try to replicate a block stored in memory that is over 2 GB, you will see an exception like:
18/05/16 12:40:57 ERROR client.TransportClient: Failed to send RPC 7420542363232096629 to xyz.com/172.31.113.213:44358: io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
	at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
	at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
	at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
	at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:129)
	at io.netty.buffer.CompositeByteBuf.setIndex(CompositeByteBuf.java:1688)
	at io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:110)
	at io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:359)
	at org.apache.spark.util.io.ChunkedByteBuffer.toNetty(ChunkedByteBuffer.scala:87)
	at org.apache.spark.storage.ByteBufferBlockData.toNetty(BlockManager.scala:95)
	at org.apache.spark.storage.BlockManagerManagedBuffer.convertToNetty(BlockManagerManagedBuffer.scala:52)
	at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:58)
	at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:33)
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)
	... 17 more
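As an aside, the negative index in the log is plain 32-bit overflow: ByteBuf and CompositeByteBuf track indices and capacity as Java ints, so a block a bit under 3 GB wraps around to a negative value. A quick check (illustrative only, not from the codebase):

    // Recover the block size implied by the exception message.
    val wrapped = -1294617291                     // writerIndex from the log
    val actualSize = wrapped.toLong + (1L << 32)  // 3000350005 bytes, ~2.8 GiB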
A simple solution to this is to create a FileRegion which is backed by a ChunkedByteBuffer (Spark's existing data structure for supporting blocks over 2 GB in memory).
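As a rough sketch (illustrative only, with made-up names; not the actual patch), such a region can walk the chunks of a ChunkedByteBuffer and hand them to the channel one at a time, tracking progress in a Long so no single buffer ever needs more than an int index:

    import java.nio.ByteBuffer
    import java.nio.channels.WritableByteChannel

    import io.netty.channel.FileRegion
    import io.netty.util.AbstractReferenceCounted

    // Hypothetical sketch: a netty FileRegion over the chunks of a
    // ChunkedByteBuffer; `chunks` stands in for ChunkedByteBuffer.getChunks().
    class ChunkedByteBufferFileRegion(chunks: Array[ByteBuffer])
        extends AbstractReferenceCounted with FileRegion {

      private val totalSize: Long = chunks.map(_.remaining().toLong).sum
      private var _transferred: Long = 0L
      private var currentChunk: Int = 0

      override def position(): Long = 0L
      override def count(): Long = totalSize
      override def transferred(): Long = _transferred
      override def transfered(): Long = _transferred // pre-netty-4.1 spelling

      // netty calls this repeatedly, with `position` equal to the bytes already
      // sent, until transferred() == count(); write whatever the channel accepts.
      override def transferTo(target: WritableByteChannel, position: Long): Long = {
        assert(position == _transferred, "FileRegions are written sequentially")
        var written = 0L
        var channelFull = false
        while (!channelFull && currentChunk < chunks.length) {
          val chunk = chunks(currentChunk)
          written += target.write(chunk)
          if (chunk.hasRemaining) {
            channelFull = true // channel refused more data; netty will retry later
          } else {
            currentChunk += 1
          }
        }
        _transferred += written
        written
      }

      override def retain(): ChunkedByteBufferFileRegion = { super.retain(); this }
      override def retain(n: Int): ChunkedByteBufferFileRegion = { super.retain(n); this }
      override def touch(): ChunkedByteBufferFileRegion = this
      override def touch(hint: AnyRef): ChunkedByteBufferFileRegion = this
      // chunk memory is owned by the block manager, so nothing to free here
      override protected def deallocate(): Unit = ()
    }

MessageEncoder could then emit a region like this in place of the ByteBuf from Unpooled.wrappedBuffer seen in the stack trace above, so the 2 GB ByteBuf limit is never hit on the write path.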
A drawback of this approach is that blocks cached in memory as deserialized values would need to be serialized entirely into memory before they can be sent. Avoiding that full serialization would involve a larger change to the block manager as well, and is not strictly necessary, so it can be handled separately as a performance improvement.