Skip to content

Commit

Permalink
Disable netty direct buffer pooling by default (#44837)
Browse files Browse the repository at this point in the history
Elasticsearch does not grant Netty reflection access to get Unsafe. The
only mechanism that currently exists to free direct buffers in a timely
manner is to use Unsafe. This leads to the occasional scenario, under
heavy network load, that direct byte buffers can slowly build up without
being freed.

This commit disables Netty direct buffer pooling and moves to a strategy
of using a single thread-local direct buffer for interfacing with sockets.
This will reduce the memory usage from networking. Elasticsearch
currently derives very little value from direct buffer usage (TLS,
compression, Lucene, Elasticsearch handling, etc all use heap bytes). So
this seems like the correct trade-off until that changes.
  • Loading branch information
Tim-Brooks authored Aug 8, 2019
1 parent fd4acb3 commit e0f9d61
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ static List<String> choose(final List<String> userDefinedJvmOptions) throws Inte
ergonomicChoices.add("-Dio.netty.allocator.type=pooled");
}
}
if (systemProperties.containsKey("io.netty.allocator.numDirectArenas") == false) {
ergonomicChoices.add("-Dio.netty.allocator.numDirectArenas=0");
}
final long maxDirectMemorySize = extractMaxDirectMemorySize(finalJvmOptions);
if (maxDirectMemorySize == 0) {
ergonomicChoices.add("-XX:MaxDirectMemorySize=" + heapSize / 2);
Expand Down
6 changes: 6 additions & 0 deletions modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ test {
* other if we allow them to set the number of available processors as it's set-once in Netty.
*/
systemProperty 'es.set.netty.runtime.available.processors', 'false'

// Disable direct buffer pooling as it is disabled by default in Elasticsearch
systemProperty 'io.netty.allocator.numDirectArenas', '0'
}

integTestRunner {
Expand All @@ -59,6 +62,9 @@ integTestRunner {
* other if we allow them to set the number of available processors as it's set-once in Netty.
*/
systemProperty 'es.set.netty.runtime.available.processors', 'false'

// Disable direct buffer pooling as it is disabled by default in Elasticsearch
systemProperty 'io.netty.allocator.numDirectArenas', '0'
}

thirdPartyAudit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.http.netty4;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.net.InetSocketAddress;
Expand Down Expand Up @@ -145,7 +147,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {

private final int maxCompositeBufferComponents;

protected volatile ServerBootstrap serverBootstrap;
private volatile ServerBootstrap serverBootstrap;

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
Expand Down Expand Up @@ -183,7 +185,15 @@ protected void doStart() {

serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
serverBootstrap.channel(NioServerSocketChannel.class);

// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pools a single direct buffer
// per-event-loop thread to be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
serverBootstrap.channel(NioServerSocketChannel.class);
} else {
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
}

serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;

import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.internal.SocketUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.nio.channels.SocketChannel;
import java.util.List;

/**
 * Server socket channel adapted from Netty's {@link NioServerSocketChannel}. The only behavioral difference
 * is in how accepted connections are wrapped: every accepted socket becomes a {@link CopyBytesSocketChannel}
 * instead of the default Netty child channel type.
 */
public class CopyBytesServerSocketChannel extends NioServerSocketChannel {

    private static final Logger logger = LogManager.getLogger(CopyBytesServerSocketChannel.class);

    /**
     * Accepts at most one pending connection and wraps it in a {@link CopyBytesSocketChannel}.
     *
     * @param buf the list that receives the newly created child channel
     * @return {@code 1} if a child channel was added to {@code buf}, {@code 0} if nothing was accepted or
     *         the child channel could not be created
     * @throws Exception if accepting the connection fails
     */
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        final SocketChannel accepted = SocketUtils.accept(javaChannel());
        if (accepted == null) {
            // Nothing was pending on the server socket.
            return 0;
        }

        try {
            buf.add(new CopyBytesSocketChannel(this, accepted));
            return 1;
        } catch (Throwable creationFailure) {
            logger.warn("Failed to create a new channel from an accepted socket.", creationFailure);

            // The socket was accepted but cannot be used; close it so that it is not leaked.
            try {
                accepted.close();
            } catch (Throwable closeFailure) {
                logger.warn("Failed to close a socket.", closeFailure);
            }
        }

        return 0;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.elasticsearch.common.SuppressForbidden;

import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Objects;

import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;


/**
 * This class is adapted from {@link NioSocketChannel} class in the Netty project. It overrides the channel
 * read/write behavior to ensure that the bytes are always copied to a thread-local direct bytes buffer. This
 * happens BEFORE the call to the Java {@link SocketChannel} is issued.
 *
 * The purpose of this class is to allow the disabling of netty direct buffer pooling while allowing us to
 * control how bytes end up being copied to direct memory. If we simply disabled netty pooling, we would rely
 * on the JDK's internal thread local buffer pooling. Instead, this class allows us to create a single
 * thread-local buffer with a defined size ({@link #MAX_BYTES_PER_WRITE} bytes).
 */
@SuppressForbidden(reason = "Channel#write")
public class CopyBytesSocketChannel extends NioSocketChannel {

    /** Capacity of the thread-local direct buffer, and therefore the upper bound of a single socket read or write. */
    private static final int MAX_BYTES_PER_WRITE = 1 << 20;

    /** One direct buffer per thread, lazily allocated on first use by that thread. */
    private static final ThreadLocal<ByteBuffer> ioBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(MAX_BYTES_PER_WRITE));
    private final WriteConfig writeConfig = new WriteConfig();

    public CopyBytesSocketChannel() {
        super();
    }

    CopyBytesSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
    }

    /**
     * Flushes pending outbound data by copying it into the thread-local direct buffer and issuing a single
     * write of that buffer to the socket per iteration.
     *
     * @param in the outbound buffer holding the pending writes
     * @throws Exception if writing to the socket fails
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = writeConfig.getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            assert Arrays.stream(nioBuffers).filter(Objects::nonNull).noneMatch(ByteBuffer::isDirect) : "Expected all to be heap buffers";
            int nioBufferCnt = in.nioBufferCount();

            if (nioBufferCnt == 0) {// We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
            } else {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer directBuffer = getIoBuffer();
                copyBytes(nioBuffers, nioBufferCnt, directBuffer);
                directBuffer.flip();

                int attemptedBytes = directBuffer.remaining();
                final int localWrittenBytes = ch.write(directBuffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // Only the bytes the socket accepted are removed; anything left over is copied again on the next pass.
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

    /**
     * Reads from the socket into the thread-local direct buffer and copies the bytes read into {@code byteBuf}.
     * The read is capped at the writable space of {@code byteBuf} (and at the direct buffer capacity), so the
     * size reported through {@code attemptedBytesRead} is the size actually attempted and {@code byteBuf} is
     * never asked to absorb more bytes than it currently has room for.
     *
     * @param byteBuf the buffer that receives the bytes read from the socket
     * @return the value returned by the underlying {@link SocketChannel#read(ByteBuffer)} call
     * @throws Exception if reading from the socket fails
     */
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        final int bytesToAttempt = Math.min(byteBuf.writableBytes(), MAX_BYTES_PER_WRITE);
        allocHandle.attemptedBytesRead(bytesToAttempt);
        ByteBuffer directBuffer = getIoBuffer();
        // Restrict the read window; without this the socket read could fill the whole 1 MiB buffer.
        directBuffer.limit(bytesToAttempt);
        int bytesRead = javaChannel().read(directBuffer);
        directBuffer.flip();
        if (bytesRead > 0) {
            byteBuf.writeBytes(directBuffer);
        }
        return bytesRead;
    }

    /**
     * Returns the calling thread's direct buffer, cleared so that it is ready to be filled from position zero.
     */
    private static ByteBuffer getIoBuffer() {
        ByteBuffer threadBuffer = CopyBytesSocketChannel.ioBuffer.get();
        threadBuffer.clear();
        return threadBuffer;
    }

    private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
        // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
        // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
        // make a best effort to adjust as OS behavior changes.
        if (attempted == written) {
            if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
                writeConfig.setMaxBytesPerGatheringWrite(attempted << 1);
            }
        } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
            writeConfig.setMaxBytesPerGatheringWrite(attempted >>> 1);
        }
    }

    /**
     * Copies bytes from the first {@code nioBufferCnt} source buffers into {@code destination} until either the
     * sources are exhausted or the destination is full. The limit of every source buffer is restored after the
     * copy; its position is advanced by the number of bytes copied.
     */
    private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
        for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
            ByteBuffer buffer = source[i];
            int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
            int initialLimit = buffer.limit();
            // Temporarily shrink the source so that put(...) cannot overflow the destination.
            buffer.limit(buffer.position() + nBytesToCopy);
            destination.put(buffer);
            buffer.limit(initialLimit);
        }
    }

    /**
     * Tracks how many bytes should be requested from the outbound buffer per write. The value never exceeds
     * {@link #MAX_BYTES_PER_WRITE}.
     */
    private final class WriteConfig {

        private volatile int maxBytesPerGatheringWrite = MAX_BYTES_PER_WRITE;

        private WriteConfig() {
            calculateMaxBytesPerGatheringWrite();
        }

        void setMaxBytesPerGatheringWrite(int maxBytesPerGatheringWrite) {
            this.maxBytesPerGatheringWrite = Math.min(maxBytesPerGatheringWrite, MAX_BYTES_PER_WRITE);
        }

        int getMaxBytesPerGatheringWrite() {
            return maxBytesPerGatheringWrite;
        }

        private void calculateMaxBytesPerGatheringWrite() {
            // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
            int newSendBufferSize = config().getSendBufferSize() << 1;
            // A non-positive result (e.g. after overflow of the shift) leaves the default in place.
            if (newSendBufferSize > 0) {
                setMaxBytesPerGatheringWrite(newSendBufferSize);
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -58,6 +59,8 @@
import org.elasticsearch.core.internal.net.NetUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.CopyBytesSocketChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;

Expand Down Expand Up @@ -148,7 +151,14 @@ protected void doStart() {
private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);

// If direct buffer pooling is disabled, use the CopyBytesSocketChannel which will pool a single
// direct buffer per-event-loop thread which will be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
bootstrap.channel(NioSocketChannel.class);
} else {
bootstrap.channel(CopyBytesSocketChannel.class);
}

bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
Expand Down Expand Up @@ -205,7 +215,15 @@ private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoop
final ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(eventLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class);

// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pools a single direct buffer
// per-event-loop thread to be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
serverBootstrap.channel(NioServerSocketChannel.class);
} else {
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
}

serverBootstrap.childHandler(getServerChannelInitializer(name));
serverBootstrap.handler(new ServerChannelExceptionHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void close() throws IOException {
* Wraps the {@link RefCountedNioGroup}. Calls {@link RefCountedNioGroup#decRef()} on close. After close,
* this wrapped instance can no longer be used.
*/
private class WrappedNioGroup implements NioGroup {
private static class WrappedNioGroup implements NioGroup {

private final RefCountedNioGroup refCountedNioGroup;

Expand Down
Loading

0 comments on commit e0f9d61

Please sign in to comment.