Skip to content

Commit fc30e05

Browse files
authored
Remove option to enable direct buffer pooling (#47956)
This commit removes the option to change the netty system properties to reenable the direct buffer pooling. It also removes the need for us to disable the buffer pooling in the system properties file. Instead, we programmatically craete an allocator that is used by our networking layer. This commit does introduce an Elasticsearch property which allows the user to fallback on the netty default allocator. If they choose this option, they can configure the default allocator how they wish using the standard netty properties.
1 parent f4ac711 commit fc30e05

File tree

11 files changed

+257
-62
lines changed

11 files changed

+257
-62
lines changed

buildSrc/src/main/groovy/org/elasticsearch/gradle/BuildPlugin.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -889,7 +889,6 @@ class BuildPlugin implements Plugin<Project> {
889889
test.systemProperty('io.netty.noUnsafe', 'true')
890890
test.systemProperty('io.netty.noKeySetOptimization', 'true')
891891
test.systemProperty('io.netty.recycler.maxCapacityPerThread', '0')
892-
test.systemProperty('io.netty.allocator.numDirectArenas', '0')
893892

894893
test.testLogging { TestLoggingContainer logging ->
895894
logging.showExceptions = true

distribution/src/config/jvm.options

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
-Dio.netty.noUnsafe=true
8181
-Dio.netty.noKeySetOptimization=true
8282
-Dio.netty.recycler.maxCapacityPerThread=0
83-
-Dio.netty.allocator.numDirectArenas=0
8483

8584
# log4j 2
8685
-Dlog4j.shutdownHookEnabled=false

distribution/tools/launchers/src/main/java/org/elasticsearch/tools/launchers/JvmErgonomics.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,6 @@ static List<String> choose(final List<String> userDefinedJvmOptions) throws Inte
5555
final List<String> ergonomicChoices = new ArrayList<>();
5656
final Map<String, Optional<String>> finalJvmOptions = finalJvmOptions(userDefinedJvmOptions);
5757
final long heapSize = extractHeapSize(finalJvmOptions);
58-
final Map<String, String> systemProperties = extractSystemProperties(userDefinedJvmOptions);
59-
if (systemProperties.containsKey("io.netty.allocator.type") == false) {
60-
if (heapSize <= 1 << 30) {
61-
ergonomicChoices.add("-Dio.netty.allocator.type=unpooled");
62-
} else {
63-
ergonomicChoices.add("-Dio.netty.allocator.type=pooled");
64-
}
65-
}
6658
final long maxDirectMemorySize = extractMaxDirectMemorySize(finalJvmOptions);
6759
if (maxDirectMemorySize == 0) {
6860
ergonomicChoices.add("-XX:MaxDirectMemorySize=" + heapSize / 2);

distribution/tools/launchers/src/test/java/org/elasticsearch/tools/launchers/JvmErgonomicsTests.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,20 +117,6 @@ public void testExtractNoSystemProperties() {
117117
assertTrue(parsedSystemProperties.isEmpty());
118118
}
119119

120-
public void testPooledMemoryChoiceOnSmallHeap() throws InterruptedException, IOException {
121-
final String smallHeap = randomFrom(Arrays.asList("64M", "512M", "1024M", "1G"));
122-
assertThat(
123-
JvmErgonomics.choose(Arrays.asList("-Xms" + smallHeap, "-Xmx" + smallHeap)),
124-
hasItem("-Dio.netty.allocator.type=unpooled"));
125-
}
126-
127-
public void testPooledMemoryChoiceOnNotSmallHeap() throws InterruptedException, IOException {
128-
final String largeHeap = randomFrom(Arrays.asList("1025M", "2048M", "2G", "8G"));
129-
assertThat(
130-
JvmErgonomics.choose(Arrays.asList("-Xms" + largeHeap, "-Xmx" + largeHeap)),
131-
hasItem("-Dio.netty.allocator.type=pooled"));
132-
}
133-
134120
public void testMaxDirectMemorySizeChoice() throws InterruptedException, IOException {
135121
final Map<String, String> heapMaxDirectMemorySize = Map.of(
136122
"64M", Long.toString((64L << 20) / 2),

modules/transport-netty4/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ integTestRunner {
6666
TaskProvider<Test> pooledTest = tasks.register("pooledTest", Test) {
6767
include '**/*Tests.class'
6868
systemProperty 'es.set.netty.runtime.available.processors', 'false'
69-
systemProperty 'io.netty.allocator.type', 'pooled'
69+
systemProperty 'es.use_unpooled_allocator', 'false'
7070
}
7171
// TODO: we can't use task avoidance here because RestIntegTestTask does the testcluster creation
7272
RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTestTask) {
@@ -75,7 +75,7 @@ RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTes
7575
}
7676
}
7777
testClusters.pooledIntegTest {
78-
systemProperty 'io.netty.allocator.type', 'pooled'
78+
systemProperty 'es.use_unpooled_allocator', 'false'
7979
}
8080
check.dependsOn(pooledTest, pooledIntegTest)
8181

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.elasticsearch.http.netty4;
2121

2222
import io.netty.bootstrap.ServerBootstrap;
23-
import io.netty.buffer.ByteBufAllocator;
2423
import io.netty.channel.Channel;
2524
import io.netty.channel.ChannelFuture;
2625
import io.netty.channel.ChannelHandler;
@@ -32,7 +31,6 @@
3231
import io.netty.channel.RecvByteBufAllocator;
3332
import io.netty.channel.nio.NioEventLoopGroup;
3433
import io.netty.channel.socket.nio.NioChannelOption;
35-
import io.netty.channel.socket.nio.NioServerSocketChannel;
3634
import io.netty.handler.codec.ByteToMessageDecoder;
3735
import io.netty.handler.codec.http.HttpContentCompressor;
3836
import io.netty.handler.codec.http.HttpContentDecompressor;
@@ -63,7 +61,7 @@
6361
import org.elasticsearch.http.HttpServerChannel;
6462
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
6563
import org.elasticsearch.threadpool.ThreadPool;
66-
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
64+
import org.elasticsearch.transport.NettyAllocator;
6765
import org.elasticsearch.transport.netty4.Netty4Utils;
6866

6967
import java.net.InetSocketAddress;
@@ -186,14 +184,12 @@ protected void doStart() {
186184
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
187185
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
188186

189-
// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
190-
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer
191-
// per-event-loop thread to be used for IO operations.
192-
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
193-
serverBootstrap.channel(NioServerSocketChannel.class);
194-
} else {
195-
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
196-
}
187+
// NettyAllocator will return the channel type designed to work with the configuredAllocator
188+
serverBootstrap.channel(NettyAllocator.getServerChannelType());
189+
190+
// Set the allocators for both the server channel and the child channels created
191+
serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
192+
serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
197193

198194
serverBootstrap.childHandler(configureServerChannelHandler());
199195
serverBootstrap.handler(new ServerChannelExceptionHandler(this));
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.transport;
21+
22+
import io.netty.buffer.ByteBuf;
23+
import io.netty.buffer.ByteBufAllocator;
24+
import io.netty.buffer.CompositeByteBuf;
25+
import io.netty.buffer.PooledByteBufAllocator;
26+
import io.netty.buffer.UnpooledByteBufAllocator;
27+
import io.netty.channel.Channel;
28+
import io.netty.channel.ServerChannel;
29+
import io.netty.channel.socket.nio.NioServerSocketChannel;
30+
import io.netty.channel.socket.nio.NioSocketChannel;
31+
import org.elasticsearch.common.Booleans;
32+
import org.elasticsearch.monitor.jvm.JvmInfo;
33+
34+
public class NettyAllocator {
35+
36+
private static final ByteBufAllocator ALLOCATOR;
37+
38+
private static final String USE_UNPOOLED = "es.use_unpooled_allocator";
39+
private static final String USE_NETTY_DEFAULT = "es.unsafe.use_netty_default_allocator";
40+
41+
static {
42+
if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
43+
ALLOCATOR = ByteBufAllocator.DEFAULT;
44+
} else {
45+
ByteBufAllocator delegate;
46+
if (useUnpooled()) {
47+
delegate = new NoDirectBuffers(UnpooledByteBufAllocator.DEFAULT);
48+
} else {
49+
int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena();
50+
int pageSize = PooledByteBufAllocator.defaultPageSize();
51+
int maxOrder = PooledByteBufAllocator.defaultMaxOrder();
52+
int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize();
53+
int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize();
54+
int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize();
55+
boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
56+
delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize,
57+
smallCacheSize, normalCacheSize, useCacheForAllThreads);
58+
}
59+
ALLOCATOR = new NoDirectBuffers(delegate);
60+
}
61+
}
62+
63+
public static boolean useCopySocket() {
64+
return ALLOCATOR instanceof NoDirectBuffers;
65+
}
66+
67+
public static ByteBufAllocator getAllocator() {
68+
return ALLOCATOR;
69+
}
70+
71+
public static Class<? extends Channel> getChannelType() {
72+
if (ALLOCATOR instanceof NoDirectBuffers) {
73+
return CopyBytesSocketChannel.class;
74+
} else {
75+
return NioSocketChannel.class;
76+
}
77+
}
78+
79+
public static Class<? extends ServerChannel> getServerChannelType() {
80+
if (ALLOCATOR instanceof NoDirectBuffers) {
81+
return CopyBytesServerSocketChannel.class;
82+
} else {
83+
return NioServerSocketChannel.class;
84+
}
85+
}
86+
87+
private static boolean useUnpooled() {
88+
if (System.getProperty(USE_UNPOOLED) != null) {
89+
return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED));
90+
} else {
91+
long heapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
92+
return heapSize <= 1 << 30;
93+
}
94+
}
95+
96+
private static class NoDirectBuffers implements ByteBufAllocator {
97+
98+
private final ByteBufAllocator delegate;
99+
100+
private NoDirectBuffers(ByteBufAllocator delegate) {
101+
this.delegate = delegate;
102+
}
103+
104+
@Override
105+
public ByteBuf buffer() {
106+
return heapBuffer();
107+
}
108+
109+
@Override
110+
public ByteBuf buffer(int initialCapacity) {
111+
return heapBuffer(initialCapacity);
112+
}
113+
114+
@Override
115+
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
116+
return heapBuffer(initialCapacity, maxCapacity);
117+
}
118+
119+
@Override
120+
public ByteBuf ioBuffer() {
121+
return heapBuffer();
122+
}
123+
124+
@Override
125+
public ByteBuf ioBuffer(int initialCapacity) {
126+
return heapBuffer(initialCapacity);
127+
}
128+
129+
@Override
130+
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
131+
return heapBuffer(initialCapacity, maxCapacity);
132+
}
133+
134+
@Override
135+
public ByteBuf heapBuffer() {
136+
return delegate.heapBuffer();
137+
}
138+
139+
@Override
140+
public ByteBuf heapBuffer(int initialCapacity) {
141+
return delegate.heapBuffer(initialCapacity);
142+
}
143+
144+
@Override
145+
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
146+
return delegate.heapBuffer(initialCapacity, maxCapacity);
147+
}
148+
149+
@Override
150+
public ByteBuf directBuffer() {
151+
// TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the
152+
// JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap
153+
// ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException
154+
return heapBuffer();
155+
}
156+
157+
@Override
158+
public ByteBuf directBuffer(int initialCapacity) {
159+
// TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the
160+
// JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap
161+
// ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException
162+
return heapBuffer(initialCapacity);
163+
}
164+
165+
@Override
166+
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
167+
// TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the
168+
// JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap
169+
// ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException
170+
return heapBuffer(initialCapacity, maxCapacity);
171+
}
172+
173+
@Override
174+
public CompositeByteBuf compositeBuffer() {
175+
return compositeHeapBuffer();
176+
}
177+
178+
@Override
179+
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
180+
return compositeHeapBuffer(maxNumComponents);
181+
}
182+
183+
@Override
184+
public CompositeByteBuf compositeHeapBuffer() {
185+
return delegate.compositeHeapBuffer();
186+
}
187+
188+
@Override
189+
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
190+
return delegate.compositeHeapBuffer(maxNumComponents);
191+
}
192+
193+
@Override
194+
public CompositeByteBuf compositeDirectBuffer() {
195+
throw new UnsupportedOperationException("Direct buffers not supported.");
196+
}
197+
198+
@Override
199+
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
200+
throw new UnsupportedOperationException("Direct buffers not supported.");
201+
}
202+
203+
@Override
204+
public boolean isDirectBufferPooled() {
205+
assert delegate.isDirectBufferPooled() == false;
206+
return false;
207+
}
208+
209+
@Override
210+
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
211+
return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
212+
}
213+
}
214+
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import io.netty.bootstrap.Bootstrap;
2323
import io.netty.bootstrap.ServerBootstrap;
24-
import io.netty.buffer.ByteBufAllocator;
2524
import io.netty.channel.AdaptiveRecvByteBufAllocator;
2625
import io.netty.channel.Channel;
2726
import io.netty.channel.ChannelFuture;
@@ -34,8 +33,6 @@
3433
import io.netty.channel.RecvByteBufAllocator;
3534
import io.netty.channel.nio.NioEventLoopGroup;
3635
import io.netty.channel.socket.nio.NioChannelOption;
37-
import io.netty.channel.socket.nio.NioServerSocketChannel;
38-
import io.netty.channel.socket.nio.NioSocketChannel;
3936
import io.netty.util.AttributeKey;
4037
import io.netty.util.concurrent.Future;
4138
import org.apache.logging.log4j.LogManager;
@@ -59,8 +56,7 @@
5956
import org.elasticsearch.core.internal.net.NetUtils;
6057
import org.elasticsearch.indices.breaker.CircuitBreakerService;
6158
import org.elasticsearch.threadpool.ThreadPool;
62-
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
63-
import org.elasticsearch.transport.CopyBytesSocketChannel;
59+
import org.elasticsearch.transport.NettyAllocator;
6460
import org.elasticsearch.transport.TcpTransport;
6561
import org.elasticsearch.transport.TransportSettings;
6662

@@ -152,13 +148,9 @@ private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) {
152148
final Bootstrap bootstrap = new Bootstrap();
153149
bootstrap.group(eventLoopGroup);
154150

155-
// If direct buffer pooling is disabled, use the CopyBytesSocketChannel which will pool a single
156-
// direct buffer per-event-loop thread which will be used for IO operations.
157-
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
158-
bootstrap.channel(NioSocketChannel.class);
159-
} else {
160-
bootstrap.channel(CopyBytesSocketChannel.class);
161-
}
151+
// NettyAllocator will return the channel type designed to work with the configured allocator
152+
bootstrap.channel(NettyAllocator.getChannelType());
153+
bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
162154

163155
bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
164156
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
@@ -216,14 +208,12 @@ private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoop
216208

217209
serverBootstrap.group(eventLoopGroup);
218210

219-
// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
220-
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer
221-
// per-event-loop thread to be used for IO operations.
222-
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
223-
serverBootstrap.channel(NioServerSocketChannel.class);
224-
} else {
225-
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
226-
}
211+
// NettyAllocator will return the channel type designed to work with the configuredAllocator
212+
serverBootstrap.channel(NettyAllocator.getServerChannelType());
213+
214+
// Set the allocators for both the server channel and the child channels created
215+
serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
216+
serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
227217

228218
serverBootstrap.childHandler(getServerChannelInitializer(name));
229219
serverBootstrap.handler(new ServerChannelExceptionHandler());

0 commit comments

Comments
 (0)