
Improve TaskBatcher performance in case of a datacenter failure #41406

Closed
@incubos

Description


We have a production cluster containing several hundred shards distributed across several datacenters. When a datacenter fails, it takes 10-15 minutes for the cluster to converge and resume normal operation, which is not acceptable for us.
Stack trace analysis shows that during that downtime interval all transport_server_worker.default threads are blocked in TaskBatcher.submitTasks():

"elasticsearch[XXX][[transport_server_worker.default]][T#72]" - Thread t@251
   java.lang.Thread.State: BLOCKED
        at org.elasticsearch.cluster.service.TaskBatcher.submitTasks(TaskBatcher.java:70)
        - waiting to lock <7d67b8b6> (a java.util.HashMap) owned by "elasticsearch[XXX][[transport_server_worker.default]][T#91]" t@270
        at org.elasticsearch.cluster.service.ClusterService.submitStateUpdateTasks(ClusterService.java:476)
        at org.elasticsearch.cluster.service.ClusterService.submitStateUpdateTask(ClusterService.java:450)
        at org.elasticsearch.cluster.action.shard.ShardStateAction$ShardFailedTransportHandler.messageReceived(ShardStateAction.java:211)
        at org.elasticsearch.cluster.action.shard.ShardStateAction$ShardFailedTransportHandler.messageReceived(ShardStateAction.java:197)
        at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
        at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:69)
        at org.elasticsearch.transport.TcpTransport$RequestHandler.doRun(TcpTransport.java:1556)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
        at org.elasticsearch.common.util.concurrent.EsExecutors$1.execute(EsExecutors.java:110)
        at org.elasticsearch.transport.TcpTransport.handleRequest(TcpTransport.java:1513)
        at org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1396)
        at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:75)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:544)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at java.lang.Thread.run(Thread.java:745)

At the same time, the TaskBatcher queue contains tens of thousands of tasks.
There are two major causes of the bottleneck (illustrated by the sketch after this list):

  1. TaskBatcher effectively serializes all attempts to submit state update tasks, because submission happens inside a synchronized (tasksPerBatchingKey) block
  2. TaskBatcher compares each newly added task against all tasks already queued for the same batching key, an O(n) scan per submission to ensure no duplicate exists, i.e. O(n^2) overall for n submissions
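
For illustration, below is a minimal Java sketch of the submission pattern described above. The class and method names are stand-ins and the duplicate check is simplified, so this is not the actual org.elasticsearch.cluster.service.TaskBatcher source; it only shows the two problematic ingredients, a single map-wide lock and a linear scan over the already-queued tasks.

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Simplified stand-in for TaskBatcher.submitTasks(), not the real implementation.
class BatcherSketch<T> {
    // pending tasks, grouped by batching key (e.g. one key per executor such as shard-failed)
    private final Map<Object, LinkedHashSet<T>> tasksPerBatchingKey = new HashMap<>();

    void submitTasks(Object batchingKey, List<T> newTasks) {
        // Cause 1: every submitter contends on this single lock, so transport threads
        // delivering shard-failed requests queue up behind each other (the BLOCKED
        // state visible in the stack trace above).
        synchronized (tasksPerBatchingKey) {
            LinkedHashSet<T> existing =
                tasksPerBatchingKey.computeIfAbsent(batchingKey, k -> new LinkedHashSet<>());
            // Cause 2: each submission walks all already-queued tasks to reject
            // duplicates, so enqueueing n tasks one at a time costs O(n^2) in total.
            for (T queued : existing) {
                if (newTasks.contains(queued)) {
                    throw new IllegalStateException("task [" + queued + "] is already queued");
                }
            }
            existing.addAll(newTasks);
        }
    }
}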

The problem can be easily reproduced with the v5.6.11 release. TaskBatcher remains unchanged in master.
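
To see the quadratic cost in isolation, one can drive the sketch above with many single-task submissions; the driver below is purely illustrative and not part of Elasticsearch. Its runtime grows roughly quadratically with the number of queued tasks, which matches what happens when tens of thousands of shard-failed tasks pile up after a datacenter failure.

import java.util.List;

// Hypothetical micro-benchmark for the BatcherSketch class above.
class BatcherSketchDemo {
    public static void main(String[] args) {
        BatcherSketch<Integer> batcher = new BatcherSketch<>();
        Object shardFailedKey = new Object(); // stands in for the shard-failed batching key
        long start = System.nanoTime();
        for (int i = 0; i < 50_000; i++) {
            // each submit rescans everything queued so far before appending
            batcher.submitTasks(shardFailedKey, List.of(i));
        }
        System.out.printf("50k single-task submits took %d ms%n",
            (System.nanoTime() - start) / 1_000_000);
    }
}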

    Labels

    :Distributed Indexing/Distributed
    Team:Distributed (Obsolete)
