Conversation

@sarutak (Member) commented Jul 9, 2020

What changes were proposed in this pull request?

This PR introduces a new internal configuration property, spark.localCluster.shutdown.wait.timeoutMs, to shut down the local cluster gracefully.
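
For illustration, internal properties like this are typically declared with ConfigBuilder in Spark's org.apache.spark.internal.config package object. A minimal sketch of how the new property might look there; only the property name is confirmed by this PR, while the default value and doc text below are my assumptions:

    import java.util.concurrent.TimeUnit

    import org.apache.spark.internal.config.ConfigBuilder

    // Hypothetical declaration; the default and doc string are assumptions.
    private[spark] val LOCAL_CLUSTER_SHUTDOWN_WAIT_TIMEOUT =
      ConfigBuilder("spark.localCluster.shutdown.wait.timeoutMs")
        .internal()
        .doc("How long LocalSparkCluster waits on shutdown so that in-flight " +
          "RPC messages such as KillExecutor can be processed before the " +
          "Worker's RpcEnv stops.")
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefault(1000L)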

Why are the changes needed?

Almost every time we call sc.stop in local-cluster mode, exceptions like the following are thrown.

20/07/09 08:36:45 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:167)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:691)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

This is confusing for users, so I think it's worth fixing.

The reason is that the KillExecutor RPC message, sent asynchronously by the Master, can arrive after the Worker's message loop has already stopped.
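
To make the ordering concrete, here is a small self-contained Scala sketch (hypothetical names, not Spark's actual classes): a one-way message posted after the receiving loop has stopped fails in the same way as the RpcEnvStoppedException above.

    // Receiver stands in for the Worker's Dispatcher; the extra thread
    // stands in for the Master's asynchronous KillExecutor send.
    object ShutdownRaceSketch {
      class Receiver {
        @volatile private var stopped = false
        def stop(): Unit = stopped = true
        def post(msg: String): Unit =
          if (stopped) throw new IllegalStateException("RpcEnv already stopped.")
          else println(s"processed $msg")
      }

      def main(args: Array[String]): Unit = {
        val worker = new Receiver
        worker.stop() // sc.stop() halts the Worker's message loop first...
        new Thread(() => worker.post("KillExecutor")).start()
        // ...so the late KillExecutor message hits the "already stopped" path.
      }
    }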

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Before applying this change, we can almost always reproduce the issue with the following command.

$ bin/spark-shell --master "local-cluster[10, 1, 1024]" 
scala> :q

I confirmed that the exception is no longer thrown after this change, using the same commands shown above.

@sarutak (Member, Author) commented Jul 9, 2020

Even after applying this change, warning messages like the following can still appear, but this also happens in standalone cluster mode, so it should be treated as a separate issue.

20/07/09 11:56:39 WARN Master: Got status update for unknown executor app-20200709115631-0000/0

@SparkQA commented Jul 9, 2020

Test build #125429 has finished for PR 29049 at commit 3607274.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member) commented Jul 9, 2020

I've actually opened a fix for the same issue in #28746. However, people don't really like that non-deterministic solution, even though it's the simplest way to fix the issue.

@sarutak (Member, Author) commented Jul 9, 2020

Ah, I didn't notice the existing PR. Yes, I think this is the simplest compromise, but it's not deterministic.
Will you continue working on your PR? If so, I'll close this one.

@Ngone51 (Member) commented Jul 9, 2020

I'd like to continue the work. Please also give more input there if possible. Thanks!

@sarutak (Member, Author) commented Jul 9, 2020

All right. Please go ahead.

@sarutak closed this Jul 9, 2020