Details
-
Bug
-
Status: In Progress
-
Minor
-
Resolution: Unresolved
-
3.0.0, 3.0.1, 3.1.0
-
None
-
None
Description
I used this command to run SparkPi on a yarn cluster with dynamicAllocation enabled: "$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ./spark-examples.jar 1000" and received error log below every time.
20/05/06 16:31:44 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message. org.apache.spark.SparkException: Could not find CoarseGrainedScheduler. at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:169) at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150) at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:684) at org.apache.spark.network.server.AbstractAuthRpcHandler.receive(AbstractAuthRpcHandler.java:66) at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) 20/05/06 16:31:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 20/05/06 16:31:45 INFO MemoryStore: MemoryStore cleared 20/05/06 16:31:45 INFO BlockManager: BlockManager stopped
After some investigation, I found this issue might be introduced in https://github.com/apache/spark/pull/25964. There is a race between driver backend and executor backend that could happen when driver shutdown.
PR#25964 added a new message type LaunchedExecutor and updated the communication mechanism between executor and driver when launching executor to:
- executor backend sends "RegisterExecutor" to the driver backend.
- the driver backend replies "true".
- executor backend instantiates executor once it receives "true" from driver backend.
- after the executor is instantiated, the executor backend sends "LaunchedExecutor" to the driver backend.
- the driver backend makes offers for executor when received "LaunchedExecutor".
So the issue occurs in steps 3 and 4. If the driver backend is stopped(hence driver endpoint removed in dispatcher) during step 3, in step 4, when executor backend tries to send "LaunchedExecutor" to driver backend, RPC dispatcher will throw a SparkException for "Could not find CoarseGrainedScheduler". These exception logs are verbose and somewhat misleading.
This race can be fixed or greatly alleviated through these changes:
When the stop() in CoarseGrainedSchedulerBackend is called:
- A stopping boolean variable is set to true.
- driverEndpoint will not be stopped at this time. (dispatcher will stop it at the end)
And when the stopping is set to true, the driver backend will:
- replies sendFailure to executor backend when receives "RegisterExecutor".
- replies "StopExecutor" to executor backend (or "RemoveExecutor" to self) when receives "LaunchedExecutor"
Attachments
Issue Links
- is related to
-
SPARK-28488 Race in k8s scheduler shutdown can lead to misleading exceptions.
- Open
- links to