Commit 258f392

Merge branch 'main' into 0.18.0_script
Signed-off-by: Karen X <karenxyr@gmail.com>
2 parents 3bd9a21 + cef8d98 commit 258f392

12 files changed (+596, -27 lines)

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
@@ -30,7 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Adding logic for histogram aggregation using skiplist ([#19130](https://github.com/opensearch-project/OpenSearch/pull/19130))
 - Add skip_list param for date, scaled float and token count fields ([#19142](https://github.com/opensearch-project/OpenSearch/pull/19142))
 - Implement GRPC MatchPhrase, MultiMatch queries ([#19449](https://github.com/opensearch-project/OpenSearch/pull/19449))
-- Implement GRPC Script query ([#19455](https://github.com/opensearch-project/OpenSearch/pull/19455))
+- Optimize gRPC transport thread management for improved throughput ([#19278](https://github.com/opensearch-project/OpenSearch/pull/19278))
 
 ### Changed
 - Refactor `if-else` chains to use `Java 17 pattern matching switch expressions`(([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
@@ -76,6 +76,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fix lag metric for pull-based ingestion when streaming source is empty ([#19393](https://github.com/opensearch-project/OpenSearch/pull/19393))
 - Fix ingestion state xcontent serialization in IndexMetadata and fail fast on mapping errors([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))
 - Fix updated keyword field params leading to stale responses from request cache ([#19385](https://github.com/opensearch-project/OpenSearch/pull/19385))
+- Implement SslHandler retrieval logic for transport-reactor-netty4 plugin ([#19458](https://github.com/opensearch-project/OpenSearch/pull/19458))
 
 ### Dependencies
 - Bump `com.gradleup.shadow:shadow-gradle-plugin` from 8.3.5 to 8.3.9 ([#19400](https://github.com/opensearch-project/OpenSearch/pull/19400))

modules/transport-grpc/README.md

Lines changed: 13 additions & 1 deletion
@@ -29,7 +29,8 @@ setting 'aux.transport.secure-transport-grpc.port', '9400-9500' //optional
 | **grpc.host** | List of addresses the gRPC server will bind to. | `["0.0.0.0"]` | `[]` |
 | **grpc.bind_host** | List of addresses to bind the gRPC server to. Can be distinct from publish hosts. | `["0.0.0.0", "::"]` | Value of `grpc.host` |
 | **grpc.publish_host** | List of hostnames or IPs published to peers for client connections. | `["thisnode.example.com"]` | Value of `grpc.host` |
-| **grpc.netty.worker_count** | Number of Netty worker threads for the gRPC server. Controls concurrency and parallelism. | `2` | Number of processors |
+| **grpc.netty.worker_count** | Number of Netty worker threads for the gRPC server. Controls network I/O concurrency. | `2` | Number of processors |
+| **grpc.netty.executor_count** | Number of threads in the ForkJoinPool for processing gRPC service calls. Controls request processing parallelism. | `32` | 2 × Number of processors |
 | **grpc.netty.max_concurrent_connection_calls** | Maximum number of simultaneous in-flight requests allowed per client connection. | `200` | `100` |
 | **grpc.netty.max_connection_age** | Maximum age a connection is allowed before being gracefully closed. Supports time units like `ms`, `s`, `m`. | `500ms` | Not set (no limit) |
 | **grpc.netty.max_connection_idle** | Maximum duration a connection can be idle before being closed. Supports time units like `ms`, `s`, `m`. | `2m` | Not set (no limit) |
@@ -50,13 +51,24 @@ setting 'grpc.host', '["0.0.0.0"]'
5051
setting 'grpc.bind_host', '["0.0.0.0", "::", "10.0.0.1"]'
5152
setting 'grpc.publish_host', '["thisnode.example.com"]'
5253
setting 'grpc.netty.worker_count', '2'
54+
setting 'grpc.netty.executor_count', '32'
5355
setting 'grpc.netty.max_concurrent_connection_calls', '200'
5456
setting 'grpc.netty.max_connection_age', '500ms'
5557
setting 'grpc.netty.max_connection_idle', '2m'
5658
setting 'grpc.netty.max_msg_size', '10mb'
5759
setting 'grpc.netty.keepalive_timeout', '1s'
5860
```
5961

62+
## Thread Pool Monitoring
63+
64+
The dedicated thread pool used for gRPC request processing is registered as a standard OpenSearch thread pool named `grpc`, controlled by the `grpc.netty.executor_count` setting.
65+
66+
The gRPC thread pool stats can be monitored using:
67+
68+
```bash
69+
curl -X GET "localhost:9200/_nodes/stats/thread_pool?filter_path=nodes.*.thread_pool.grpc"
70+
```
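For programmatic polling, the same request can be built from Java. This is a minimal sketch using only the JDK's `java.net.http` package; the class name `GrpcPoolStatsRequest`, the `http://localhost:9200` base URL, and the absence of TLS or authentication are assumptions for illustration. The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds (but does not send) the node-stats request shown in the README.
final class GrpcPoolStatsRequest {
    // filter_path limits the response to the thread pool named "grpc".
    static final String PATH = "/_nodes/stats/thread_pool?filter_path=nodes.*.thread_pool.grpc";

    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + PATH))
            .header("Accept", "application/json")
            .GET()
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:9200");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Against a running node, the request could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.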
71+
6072
## Testing
6173

6274
### Unit Tests

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java

Lines changed: 31 additions & 2 deletions
@@ -27,6 +27,8 @@
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.script.ScriptService;
 import org.opensearch.telemetry.tracing.Tracer;
+import org.opensearch.threadpool.ExecutorBuilder;
+import org.opensearch.threadpool.FixedExecutorBuilder;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.AuxTransport;
 import org.opensearch.transport.client.Client;
@@ -49,6 +51,7 @@
 
 import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.GRPC_TRANSPORT_SETTING_KEY;
 import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST;
+import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_EXECUTOR_COUNT;
 import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_HOST;
 import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_KEEPALIVE_TIMEOUT;
 import static org.opensearch.transport.grpc.Netty4GrpcServerTransport.SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS;
@@ -69,6 +72,9 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl
 
     private static final Logger logger = LogManager.getLogger(GrpcPlugin.class);
 
+    /** The name of the gRPC thread pool */
+    public static final String GRPC_THREAD_POOL_NAME = "grpc";
+
     private Client client;
     private final List<QueryBuilderProtoConverter> queryConverters = new ArrayList<>();
     private QueryBuilderProtoConverterRegistryImpl queryRegistry;
@@ -163,7 +169,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
         );
         return Collections.singletonMap(
             GRPC_TRANSPORT_SETTING_KEY,
-            () -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)
+            () -> new Netty4GrpcServerTransport(settings, grpcServices, networkService, threadPool)
         );
     }
 
@@ -206,7 +212,13 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
         );
         return Collections.singletonMap(
             GRPC_SECURE_TRANSPORT_SETTING_KEY,
-            () -> new SecureNetty4GrpcServerTransport(settings, grpcServices, networkService, secureAuxTransportSettingsProvider)
+            () -> new SecureNetty4GrpcServerTransport(
+                settings,
+                grpcServices,
+                networkService,
+                threadPool,
+                secureAuxTransportSettingsProvider
+            )
         );
     }
 
@@ -235,6 +247,7 @@ public List<Setting<?>> getSettings() {
             SETTING_GRPC_PUBLISH_HOST,
             SETTING_GRPC_BIND_HOST,
             SETTING_GRPC_WORKER_COUNT,
+            SETTING_GRPC_EXECUTOR_COUNT,
             SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS,
             SETTING_GRPC_MAX_MSG_SIZE,
             SETTING_GRPC_MAX_CONNECTION_AGE,
@@ -243,6 +256,22 @@ public List<Setting<?>> getSettings() {
         );
     }
 
+    /**
+     * Returns the executor builders for this plugin's custom thread pools.
+     * Creates a dedicated thread pool for gRPC request processing that integrates
+     * with OpenSearch's thread pool monitoring and management system.
+     *
+     * @param settings the current settings
+     * @return executor builders for this plugin's custom thread pools
+     */
+    @Override
+    public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
+        final int executorCount = SETTING_GRPC_EXECUTOR_COUNT.get(settings);
+        return List.of(
+            new FixedExecutorBuilder(settings, GRPC_THREAD_POOL_NAME, executorCount, 1000, "thread_pool." + GRPC_THREAD_POOL_NAME)
+        );
+    }
+
     /**
      * Creates components used by the plugin.
      * Stores the client for later use in creating gRPC services, and the query registry which registers the types of supported GRPC Search queries.
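
`getExecutorBuilders` registers a fixed-size pool named `grpc` with a queue size of 1000, so the thread count bounds concurrent request handlers and the queue bounds the backlog. The sketch below imitates that shape with plain JDK classes. It is not `FixedExecutorBuilder` itself: the class name, the abort-on-overflow policy, and the tiny sizes used in `main` are assumptions chosen to make the behaviour visible.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only stand-in for a fixed pool with a bounded queue (assumed rejection policy: abort).
final class BoundedFixedPoolSketch {
    static ThreadPoolExecutor newFixedBoundedPool(int threads, int queueSize) {
        return new ThreadPoolExecutor(
            threads, threads,                    // core == max: the pool never grows or shrinks
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueSize), // bounded backlog
            new ThreadPoolExecutor.AbortPolicy() // throw once threads and queue are both full
        );
    }

    // Submits tasks that block until released and returns how many were accepted.
    static int acceptedBeforeRejection(int threads, int queueSize, int attempts) {
        ThreadPoolExecutor pool = newFixedBoundedPool(threads, queueSize);
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until the test is over
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    break; // capacity reached: all threads busy and queue full
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 2 busy threads + 3 queued tasks: the sixth submission is rejected.
        System.out.println(acceptedBeforeRejection(2, 3, 10)); // prints 5
    }
}
```

With the values from the diff, the equivalent capacity would be `executorCount` running handlers plus 1000 queued ones; how OpenSearch reports or handles rejections beyond that is not shown in this commit.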

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java

Lines changed: 83 additions & 12 deletions
@@ -21,6 +21,7 @@
 import org.opensearch.core.common.transport.TransportAddress;
 import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.AuxTransport;
 import org.opensearch.transport.BindTransportException;
 
@@ -32,6 +33,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -118,6 +120,16 @@ public class Netty4GrpcServerTransport extends AuxTransport {
         Setting.Property.NodeScope
     );
 
+    /**
+     * Configure size of executor thread pool for handling gRPC calls.
+     */
+    public static final Setting<Integer> SETTING_GRPC_EXECUTOR_COUNT = new Setting<>(
+        "grpc.netty.executor_count",
+        (s) -> Integer.toString(OpenSearchExecutors.allocatedProcessors(s) * 2),
+        (s) -> Setting.parseInt(s, 1, "grpc.netty.executor_count"),
+        Setting.Property.NodeScope
+    );
+
     /**
      * Controls the number of allowed simultaneous in flight requests a single client connection may send.
      */
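
The new setting derives its default from the allocated processor count (times two) and rejects explicit values below 1. The rule can be restated in plain Java as follows; `defaultExecutorCount` and `parseExecutorCount` are hypothetical helpers, `Runtime.availableProcessors()` stands in for `OpenSearchExecutors.allocatedProcessors`, and the error message wording is an assumption rather than the text OpenSearch emits.

```java
// Hypothetical, JDK-only restatement of the default and lower bound for grpc.netty.executor_count.
final class ExecutorCountSketch {
    static final String KEY = "grpc.netty.executor_count";

    // Default: twice the number of processors.
    static int defaultExecutorCount(int processors) {
        return processors * 2;
    }

    // Parses an explicit value; anything below 1 is rejected.
    static int parseExecutorCount(String raw) {
        int value = Integer.parseInt(raw);
        if (value < 1) {
            throw new IllegalArgumentException("value [" + raw + "] for setting [" + KEY + "] must be >= 1");
        }
        return value;
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("default  = " + defaultExecutorCount(processors));
        System.out.println("explicit = " + parseExecutorCount("32"));
    }
}
```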
@@ -189,10 +201,12 @@ public class Netty4GrpcServerTransport extends AuxTransport {
     protected final Settings settings;
 
     private final NetworkService networkService;
+    private final ThreadPool threadPool;
     private final List<BindableService> services;
     private final String[] bindHosts;
     private final String[] publishHosts;
     private final int nettyEventLoopThreads;
+    private final int executorThreads;
     private final long maxInboundMessageSize;
     private final long maxConcurrentConnectionCalls;
     private final TimeValue maxConnectionAge;
@@ -202,19 +216,28 @@ public class Netty4GrpcServerTransport extends AuxTransport {
     private final List<UnaryOperator<NettyServerBuilder>> serverBuilderConfigs = new ArrayList<>();
 
     private volatile BoundTransportAddress boundAddress;
-    private volatile EventLoopGroup eventLoopGroup;
+    private volatile EventLoopGroup bossEventLoopGroup;
+    private volatile EventLoopGroup workerEventLoopGroup;
+    private volatile ExecutorService grpcExecutor;
 
     /**
      * Creates a new Netty4GrpcServerTransport instance.
      * @param settings the configured settings.
      * @param services the gRPC compatible services to be registered with the server.
      * @param networkService the bind/publish addresses.
+     * @param threadPool the thread pool for gRPC request processing.
      */
-    public Netty4GrpcServerTransport(Settings settings, List<BindableService> services, NetworkService networkService) {
+    public Netty4GrpcServerTransport(
+        Settings settings,
+        List<BindableService> services,
+        NetworkService networkService,
+        ThreadPool threadPool
+    ) {
         logger.debug("Initializing Netty4GrpcServerTransport with settings = {}", settings);
         this.settings = Objects.requireNonNull(settings);
         this.services = Objects.requireNonNull(services);
         this.networkService = Objects.requireNonNull(networkService);
+        this.threadPool = Objects.requireNonNull(threadPool);
         final List<String> grpcBindHost = SETTING_GRPC_BIND_HOST.get(settings);
         this.bindHosts = (grpcBindHost.isEmpty() ? NetworkService.GLOBAL_NETWORK_BIND_HOST_SETTING.get(settings) : grpcBindHost).toArray(
             Strings.EMPTY_ARRAY
@@ -224,6 +247,7 @@ public Netty4GrpcServerTransport(Settings settings, List<BindableService> servic
             .toArray(Strings.EMPTY_ARRAY);
         this.port = SETTING_GRPC_PORT.get(settings);
         this.nettyEventLoopThreads = SETTING_GRPC_WORKER_COUNT.get(settings);
+        this.executorThreads = SETTING_GRPC_EXECUTOR_COUNT.get(settings);
         this.maxInboundMessageSize = SETTING_GRPC_MAX_MSG_SIZE.get(settings).getBytes();
         this.maxConcurrentConnectionCalls = SETTING_GRPC_MAX_CONCURRENT_CONNECTION_CALLS.get(settings);
         this.maxConnectionAge = SETTING_GRPC_MAX_CONNECTION_AGE.get(settings);
@@ -232,12 +256,22 @@ public Netty4GrpcServerTransport(Settings settings, List<BindableService> servic
         this.portSettingKey = SETTING_GRPC_PORT.getKey();
     }
 
+    /**
+     * Returns the setting key used to identify this transport type.
+     *
+     * @return the gRPC transport setting key
+     */
     @Override
     public String settingKey() {
         return GRPC_TRANSPORT_SETTING_KEY;
     }
 
-    // public for tests
+    /**
+     * Returns the bound transport addresses for this gRPC server.
+     * This method is public for testing purposes.
+     *
+     * @return the bound transport address containing all bound addresses and publish address
+     */
     @Override
     public BoundTransportAddress getBoundAddress() {
         return this.boundAddress;
@@ -259,10 +293,16 @@ protected void addServerConfig(UnaryOperator<NettyServerBuilder> configModifier)
     protected void doStart() {
         boolean success = false;
         try {
-            this.eventLoopGroup = new NioEventLoopGroup(nettyEventLoopThreads, daemonThreadFactory(settings, "grpc_event_loop"));
+            // Create separate boss and worker event loop groups for better isolation
+            this.bossEventLoopGroup = new NioEventLoopGroup(1, daemonThreadFactory(settings, "grpc_boss"));
+            this.workerEventLoopGroup = new NioEventLoopGroup(nettyEventLoopThreads, daemonThreadFactory(settings, "grpc_worker"));
+
+            // Use OpenSearch's managed thread pool for gRPC request processing
+            this.grpcExecutor = threadPool.executor("grpc");
+
             bindServer();
             success = true;
-            logger.info("Started gRPC server on port {}", port);
+            logger.info("Started gRPC server on port {} with {} executor threads", port, executorThreads);
         } finally {
             if (!success) {
                 doStop();
@@ -289,12 +329,25 @@ protected void doStop() {
                 }
             }
         }
-        if (eventLoopGroup != null) {
+
+        // Note: grpcExecutor is managed by OpenSearch's ThreadPool, so we don't shut it down here
+
+        // Shutdown event loop groups
+        if (bossEventLoopGroup != null) {
             try {
-                eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
+                bossEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
-                logger.warn("Failed to shut down event loop group");
+                logger.warn("Failed to shut down boss event loop group");
+            }
+        }
+
+        if (workerEventLoopGroup != null) {
+            try {
+                workerEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.warn("Failed to shut down worker event loop group");
             }
         }
     }
@@ -305,7 +358,12 @@ protected void doStop() {
      */
     @Override
     protected void doClose() {
-        eventLoopGroup.close();
+        if (bossEventLoopGroup != null) {
+            bossEventLoopGroup.close();
+        }
+        if (workerEventLoopGroup != null) {
+            workerEventLoopGroup.close();
+        }
     }
 
     private void bindServer() {
@@ -356,9 +414,9 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan
             try {
                 final InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber);
                 final NettyServerBuilder serverBuilder = NettyServerBuilder.forAddress(address)
-                    .directExecutor()
-                    .bossEventLoopGroup(eventLoopGroup)
-                    .workerEventLoopGroup(eventLoopGroup)
+                    .executor(grpcExecutor)
+                    .bossEventLoopGroup(bossEventLoopGroup)
+                    .workerEventLoopGroup(workerEventLoopGroup)
                     .maxInboundMessageSize((int) maxInboundMessageSize)
                     .maxConcurrentCallsPerConnection((int) maxConcurrentConnectionCalls)
                     .maxConnectionAge(maxConnectionAge.duration(), maxConnectionAge.timeUnit())
@@ -391,4 +449,17 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan
 
         return addr.get();
     }
+
+    // Package-private methods for testing
+    ExecutorService getGrpcExecutorForTesting() {
+        return grpcExecutor;
+    }
+
+    EventLoopGroup getBossEventLoopGroupForTesting() {
+        return bossEventLoopGroup;
+    }
+
+    EventLoopGroup getWorkerEventLoopGroupForTesting() {
+        return workerEventLoopGroup;
+    }
 }
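
The central behavioural change in this file is the switch from `.directExecutor()` to `.executor(grpcExecutor)`: service handlers no longer run on the event loop thread that read the request. The sketch below mimics that hand-off with JDK executors only. No Netty or gRPC classes are involved, and the class name, `handlerThreadName`, and the thread names `io-worker` and `grpc-exec` are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only illustration of running a handler inline versus on a separate pool.
final class ExecutorHandOffSketch {
    // Simulates an I/O thread receiving a call and dispatching the handler via handlerExecutor.
    // Returns the name of the thread that actually ran the handler.
    static String handlerThreadName(Executor handlerExecutor) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            ioThread.execute(() -> handlerExecutor.execute(() -> ranOn.complete(Thread.currentThread().getName())));
            return ranOn.join();
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // runs the task on whichever thread calls execute()
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "grpc-exec"));
        try {
            System.out.println("direct:    " + handlerThreadName(direct)); // io-worker
            System.out.println("offloaded: " + handlerThreadName(pool));   // grpc-exec
        } finally {
            pool.shutdown();
        }
    }
}
```

In the direct case a slow handler occupies the I/O thread for its whole duration; with a separate pool the I/O thread only pays for the hand-off, which is presumably the motivation behind the "improved throughput" changelog entry.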

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/ssl/SecureNetty4GrpcServerTransport.java

Lines changed: 4 additions & 1 deletion
@@ -13,6 +13,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.transport.PortsRange;
 import org.opensearch.plugins.SecureAuxTransportSettingsProvider;
+import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.transport.grpc.Netty4GrpcServerTransport;
 
 import javax.net.ssl.SSLContext;
@@ -73,15 +74,17 @@ public Collection<String> cipherSuites() {
      * @param settings the configured settings.
      * @param services the gRPC compatible services to be registered with the server.
      * @param networkService the bind/publish addresses.
+     * @param threadPool the thread pool for managing gRPC executor and monitoring.
      * @param secureTransportSettingsProvider TLS configuration settings.
      */
     public SecureNetty4GrpcServerTransport(
         Settings settings,
         List<BindableService> services,
         NetworkService networkService,
+        ThreadPool threadPool,
         SecureAuxTransportSettingsProvider secureTransportSettingsProvider
     ) {
-        super(settings, services, networkService);
+        super(settings, services, networkService, threadPool);
         this.port = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.get(settings);
         this.portSettingKey = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.getKey();
         try {
