Skip to content

Commit c0a32d4

Browse files
Preserve Thread Context by gRPC Interceptor (#19776)
1 parent b08de24 commit c0a32d4

File tree

10 files changed

+287
-82
lines changed

10 files changed

+287
-82
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4949
- Refactor the GetStats, FlushStats and QueryCacheStats class to use the Builder pattern instead of constructors ([#19935](https://github.com/opensearch-project/OpenSearch/pull/19935))
5050
- Add RangeSemver for `dependencies` in `plugin-descriptor.properties` ([#19939](https://github.com/opensearch-project/OpenSearch/pull/19939))
5151
- Refactor the FieldDataStats and CompletionStats class to use the Builder pattern instead of constructors ([#19936](https://github.com/opensearch-project/OpenSearch/pull/19936))
52+
- Thread Context Preservation by gRPC Interceptor ([#19776](https://github.com/opensearch-project/OpenSearch/pull/19776))
53+
5254

5355
### Fixed
5456
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))

modules/transport-grpc/spi/README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,15 +334,17 @@ The k-NN query's `filter` field is a `QueryContainer` protobuf type that can con
334334

335335
### Overview
336336

337-
Intercept incoming gRPC requests for authentication, authorization, logging, metrics, rate limiting,etc
337+
Intercept incoming gRPC requests for authentication, authorization, logging, metrics, rate limiting, etc. Interceptors have access to OpenSearch's `ThreadContext` to store and retrieve request-scoped data.
338+
339+
**Context Preservation:** The transport-grpc module automatically preserves ThreadContext across async boundaries. Any data set by interceptors will be available in the gRPC service implementation, even when execution switches to different threads.
338340

339341
### Basic Usage
340342

341343
**1. Implement Provider:**
342344
```java
343345
public class SampleInterceptorProvider implements GrpcInterceptorProvider {
344346
@Override
345-
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors() {
347+
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext) {
346348
return Arrays.asList(
347349
// First interceptor (order = 5, runs first)
348350
new GrpcInterceptorProvider.OrderedGrpcInterceptor() {
@@ -353,6 +355,7 @@ public class SampleInterceptorProvider implements GrpcInterceptorProvider {
353355
public ServerInterceptor getInterceptor() {
354356
return (call, headers, next) -> {
355357
String methodName = call.getMethodDescriptor().getFullMethodName();
358+
threadContext.putTransient("grpc.method", methodName);
356359
System.out.println("First interceptor - Method: " + methodName);
357360
return next.startCall(call, headers);
358361
};

modules/transport-grpc/spi/src/main/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProvider.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*/
88
package org.opensearch.transport.grpc.spi;
99

10+
import org.opensearch.common.util.concurrent.ThreadContext;
11+
1012
import java.util.List;
1113

1214
import io.grpc.ServerInterceptor;
@@ -19,12 +21,17 @@
1921
public interface GrpcInterceptorProvider {
2022

2123
/**
22-
* Returns a list of ordered gRPC interceptors.
24+
* Returns a list of ordered gRPC interceptors with access to ThreadContext.
2325
* Each interceptor must have a unique order value.
2426
*
27+
* This follows the pattern established by REST handler wrappers where
28+
* the thread context is provided to allow interceptors to:
29+
* - Extract headers from gRPC metadata and store in ThreadContext
30+
* - Preserve context across async boundaries
31+
* @param threadContext The thread context for managing request context
2532
* @return List of ordered gRPC interceptors
2633
*/
27-
List<OrderedGrpcInterceptor> getOrderedGrpcInterceptors();
34+
List<OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext);
2835

2936
/**
3037
* Provides a gRPC interceptor with an order value for execution priority.
@@ -42,6 +49,8 @@ interface OrderedGrpcInterceptor {
4249

4350
/**
4451
* Returns the actual gRPC ServerInterceptor instance.
52+
* The interceptor can use the ThreadContext provided to the parent
53+
* GrpcInterceptorProvider to manage request context.
4554
*
4655
* @return the server interceptor
4756
*/

modules/transport-grpc/spi/src/test/java/org/opensearch/transport/grpc/spi/GrpcInterceptorProviderTests.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
package org.opensearch.transport.grpc.spi;
1010

11+
import org.opensearch.common.settings.Settings;
12+
import org.opensearch.common.util.concurrent.ThreadContext;
1113
import org.opensearch.test.OpenSearchTestCase;
1214

1315
import java.util.Collections;
@@ -22,26 +24,45 @@ public class GrpcInterceptorProviderTests extends OpenSearchTestCase {
2224

2325
public void testBasicProviderImplementation() {
2426
TestGrpcInterceptorProvider provider = new TestGrpcInterceptorProvider(10);
27+
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
2528

26-
List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors();
29+
List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors(threadContext);
2730
assertNotNull(interceptors);
2831
assertEquals(1, interceptors.size());
2932
assertEquals(10, interceptors.get(0).order());
3033
}
3134

3235
public void testProviderReturnsEmptyList() {
36+
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
3337
GrpcInterceptorProvider provider = new GrpcInterceptorProvider() {
3438
@Override
35-
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors() {
39+
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext) {
3640
return Collections.emptyList();
3741
}
3842
};
3943

40-
List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors();
44+
List<GrpcInterceptorProvider.OrderedGrpcInterceptor> interceptors = provider.getOrderedGrpcInterceptors(threadContext);
4145
assertNotNull(interceptors);
4246
assertTrue(interceptors.isEmpty());
4347
}
4448

49+
public void testProviderReceivesThreadContext() {
50+
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
51+
threadContext.putHeader("X-Test-Header", "test-value");
52+
53+
GrpcInterceptorProvider provider = new GrpcInterceptorProvider() {
54+
@Override
55+
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext ctx) {
56+
// Verify that the provider receives the ThreadContext
57+
assertNotNull("ThreadContext should not be null", ctx);
58+
assertEquals("test-value", ctx.getHeader("X-Test-Header"));
59+
return Collections.emptyList();
60+
}
61+
};
62+
63+
provider.getOrderedGrpcInterceptors(threadContext);
64+
}
65+
4566
private static class TestGrpcInterceptorProvider implements GrpcInterceptorProvider {
4667
private final int order;
4768

@@ -50,7 +71,7 @@ private static class TestGrpcInterceptorProvider implements GrpcInterceptorProvi
5071
}
5172

5273
@Override
53-
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors() {
74+
public List<GrpcInterceptorProvider.OrderedGrpcInterceptor> getOrderedGrpcInterceptors(ThreadContext threadContext) {
5475
return Collections.singletonList(createTestInterceptor(order, "test-interceptor"));
5576
}
5677
}

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.List;
5353
import java.util.Map;
5454
import java.util.function.Supplier;
55+
import java.util.stream.Collectors;
5556

5657
import io.grpc.BindableService;
5758

@@ -84,7 +85,8 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl
8485
private final List<GrpcServiceFactory> servicesFactory = new ArrayList<>();
8586
private QueryBuilderProtoConverterRegistryImpl queryRegistry;
8687
private AbstractQueryBuilderProtoUtils queryUtils;
87-
private GrpcInterceptorChain serverInterceptor = new GrpcInterceptorChain();
88+
private GrpcInterceptorChain serverInterceptor; // Initialized in createComponents
89+
private List<GrpcInterceptorProvider> interceptorProviders = new ArrayList<>();
8890
private Client client;
8991

9092
/**
@@ -118,39 +120,10 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) {
118120
}
119121
List<GrpcInterceptorProvider> providers = loader.loadExtensions(GrpcInterceptorProvider.class);
120122
if (providers != null) {
121-
List<OrderedGrpcInterceptor> orderedList = new ArrayList<>();
122-
for (GrpcInterceptorProvider provider : providers) {
123-
orderedList.addAll(provider.getOrderedGrpcInterceptors());
124-
}
125-
126-
// Validate that no two interceptors have the same order
127-
Map<Integer, List<OrderedGrpcInterceptor>> orderMap = new HashMap<>();
128-
for (OrderedGrpcInterceptor interceptor : orderedList) {
129-
int order = interceptor.order();
130-
orderMap.computeIfAbsent(order, k -> new ArrayList<>()).add(interceptor);
131-
}
132-
133-
// Check for duplicates and throw exception if found
134-
for (Map.Entry<Integer, List<OrderedGrpcInterceptor>> entry : orderMap.entrySet()) {
135-
if (entry.getValue().size() > 1) {
136-
throw new IllegalArgumentException(
137-
"Multiple gRPC interceptors have the same order value: "
138-
+ entry.getKey()
139-
+ ". Each interceptor must have a unique order value."
140-
);
141-
}
142-
}
143-
144-
// Sort by order and create a chain - similar to OpenSearch's ActionFilter pattern
145-
orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order));
146-
147-
if (!orderedList.isEmpty()) {
148-
// Create a single chain interceptor that manages the execution
149-
// This ensures proper ordering and exception handling
150-
serverInterceptor.addInterceptors(orderedList);
151-
152-
logger.info("Loaded {} gRPC interceptors into chain", orderedList.size());
153-
}
123+
// Note: ThreadContext will be provided during component creation
124+
// For now, we collect providers to be initialized later with ThreadContext
125+
this.interceptorProviders = providers;
126+
logger.info("Found {} gRPC interceptor providers, will initialize during component creation", providers.size());
154127
}
155128
// Load discovered gRPC service factories
156129
List<GrpcServiceFactory> services = loader.loadExtensions(GrpcServiceFactory.class);
@@ -363,6 +336,53 @@ public Collection<Object> createComponents(
363336
) {
364337
this.client = client;
365338

339+
// Initialize the interceptor chain with ThreadContext
340+
this.serverInterceptor = new GrpcInterceptorChain(threadPool.getThreadContext());
341+
342+
List<OrderedGrpcInterceptor> orderedList = new ArrayList<>();
343+
344+
// Then add plugin-provided interceptors
345+
if (!interceptorProviders.isEmpty()) {
346+
for (GrpcInterceptorProvider provider : interceptorProviders) {
347+
orderedList.addAll(provider.getOrderedGrpcInterceptors(threadPool.getThreadContext()));
348+
}
349+
350+
// Validate that no two interceptors have the same order
351+
Map<Integer, List<OrderedGrpcInterceptor>> orderMap = new HashMap<>();
352+
for (OrderedGrpcInterceptor interceptor : orderedList) {
353+
int order = interceptor.order();
354+
orderMap.computeIfAbsent(order, k -> new ArrayList<>()).add(interceptor);
355+
}
356+
357+
// Check for duplicates and throw exception if found
358+
for (Map.Entry<Integer, List<OrderedGrpcInterceptor>> entry : orderMap.entrySet()) {
359+
if (entry.getValue().size() > 1) {
360+
String conflictingInterceptors = entry.getValue()
361+
.stream()
362+
.map(i -> i.getInterceptor().getClass().getName())
363+
.collect(Collectors.joining(", "));
364+
throw new IllegalArgumentException(
365+
"Multiple gRPC interceptors have the same order value ["
366+
+ entry.getKey()
367+
+ "]: "
368+
+ conflictingInterceptors
369+
+ ". Each interceptor must have a unique order value."
370+
);
371+
}
372+
}
373+
374+
// Sort by order and create a chain - similar to OpenSearch's ActionFilter pattern
375+
orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::order));
376+
377+
if (!orderedList.isEmpty()) {
378+
// Create a single chain interceptor that manages the execution
379+
// This ensures proper ordering and exception handling
380+
serverInterceptor.addInterceptors(orderedList);
381+
382+
logger.info("Loaded {} gRPC interceptors into chain", orderedList.size());
383+
}
384+
}
385+
366386
// Create the registry
367387
this.queryRegistry = new QueryBuilderProtoConverterRegistryImpl();
368388

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/Netty4GrpcServerTransport.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import static org.opensearch.common.settings.Setting.listSetting;
5656
import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
5757
import static org.opensearch.transport.Transport.resolveTransportPublishPort;
58+
import static org.opensearch.transport.grpc.GrpcPlugin.GRPC_THREAD_POOL_NAME;
5859

5960
/**
6061
* Netty4 gRPC server implemented as a LifecycleComponent.
@@ -275,7 +276,7 @@ public Netty4GrpcServerTransport(
275276
NetworkService networkService,
276277
ThreadPool threadPool
277278
) {
278-
this(settings, services, networkService, threadPool, new GrpcInterceptorChain());
279+
this(settings, services, networkService, threadPool, new GrpcInterceptorChain(threadPool.getThreadContext()));
279280
}
280281

281282
/**
@@ -320,7 +321,7 @@ protected void doStart() {
320321
this.workerEventLoopGroup = new NioEventLoopGroup(nettyEventLoopThreads, daemonThreadFactory(settings, "grpc_worker"));
321322

322323
// Use OpenSearch's managed thread pool for gRPC request processing
323-
this.grpcExecutor = threadPool.executor("grpc");
324+
this.grpcExecutor = threadPool.executor(GRPC_THREAD_POOL_NAME);
324325

325326
bindServer();
326327
success = true;

0 commit comments

Comments
 (0)