Add Circuit breaker support for gRPC #20203
📝 Walkthrough
DocumentServiceImpl and SearchServiceImpl now accept a CircuitBreakerService, estimate request sizes and charge the IN_FLIGHT_REQUESTS breaker before processing, wrap response observers to release bytes once on terminal events, and convert circuit-breaking/runtime errors into gRPC errors.
Sequence Diagram
sequenceDiagram
participant Caller as gRPC Client
participant Service as DocumentServiceImpl / SearchServiceImpl
participant CBService as CircuitBreakerService
participant Breaker as CircuitBreaker (IN_FLIGHT_REQUESTS)
participant NodeClient as NodeClient (bulk/search)
participant Resp as StreamObserver (response)
Caller->>Service: bulk/search(request)
Service->>Service: estimate requestSize
Service->>CBService: getBreaker(IN_FLIGHT_REQUESTS)
CBService-->>Service: Breaker
Service->>Breaker: addEstimateBytesAndMaybeBreak(requestSize, tag)
alt Breaker throws CircuitBreakingException
Breaker-->>Service: throws CircuitBreakingException
Service->>Resp: onError(mapped gRPC error)
else Allowed
Breaker-->>Service: allowed
Service->>Resp: wrap Resp with CircuitBreakerStreamObserver(requestSize)
Service->>NodeClient: invoke bulk/search(request, listener(for wrapped Resp))
NodeClient-->>Service: listener.onResponse / listener.onFailure
Resp-->>Breaker: CircuitBreakerStreamObserver releases bytes once (addWithoutBreaking(-requestSize))
Resp-->>Caller: onNext/onCompleted or onError
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 0
🧹 Nitpick comments (2)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
30-41: Circuit breaker integration in `bulk` looks correct; consider tightening try/catch scope to avoid theoretical double-release

The new `CircuitBreakerService` dependency and the `bulk` implementation cover the important paths:
- Reserve bytes via `IN_FLIGHT_REQUESTS` before work.
- Release bytes in the wrapped listener on both `onResponse` and `onFailure`.
- Release bytes in the `catch` block when failing before `client.bulk(...)` (e.g., CB trip or request parsing error).

One subtle edge case to be aware of: the `try` currently also wraps the `client.bulk(bulkRequest, wrappedListener)` call. If `client.bulk` were ever to:
- Invoke the provided listener (causing `addWithoutBreaking(-contentLength)` via the `finally`), and
- Then throw a `RuntimeException` synchronously,

the `catch` block would call `addWithoutBreaking(-contentLength)` a second time, double-decrementing the in-flight breaker.

If `NodeClient.bulk` is guaranteed to either invoke the listener or throw (but not both), the current code is fine. If you want this to be future-proof, a defensive option would be to narrow the `try/catch` to only cover the breaker reservation + `BulkRequestProtoUtils.prepareRequest(...)`, letting the wrapped listener be solely responsible for releasing bytes once `client.bulk` has been invoked.

Not a blocker, but worth considering to make the accounting robust against changes in `client.bulk` behavior.

Also applies to: 52-88
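The narrowing suggested in this comment can be sketched in isolation. This is a minimal illustration and not the PR's code: `InFlightCounter` and `NarrowScopeBulk` are hypothetical stand-ins, `prepare` models `BulkRequestProtoUtils.prepareRequest(...)`, and `client` models `client.bulk(...)`, with the `Runnable` playing the role of the wrapped listener's release step.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the in-flight breaker; it only tracks a byte counter.
class InFlightCounter {
    private final AtomicLong used = new AtomicLong();

    void reserve(long bytes) { used.addAndGet(bytes); }

    void release(long bytes) { used.addAndGet(-bytes); }

    long used() { return used.get(); }
}

class NarrowScopeBulk {
    // prepare models request parsing; client models the asynchronous bulk call.
    static void bulk(InFlightCounter breaker, long size,
                     Supplier<String> prepare, BiConsumer<String, Runnable> client) {
        breaker.reserve(size);
        String prepared;
        try {
            prepared = prepare.get();
        } catch (RuntimeException e) {
            breaker.release(size); // failed before the client call: release here
            throw e;
        }
        // Outside the try: from here on, only the listener releases the bytes.
        client.accept(prepared, () -> breaker.release(size));
    }

    public static void main(String[] args) {
        InFlightCounter breaker = new InFlightCounter();
        try {
            // Edge case from the review: the listener fires, then the call throws.
            bulk(breaker, 128, () -> "request", (request, release) -> {
                release.run();
                throw new IllegalStateException("synchronous failure after callback");
            });
        } catch (IllegalStateException expected) {
            // No second release happens on this path.
        }
        System.out.println("bytes in flight: " + breaker.used());
    }
}
```

With the client call outside the `try`, the counter returns to zero even when the listener fires and the call then throws. The trade-off is that a client that throws without ever notifying the listener would leave the bytes reserved, so the sketch assumes the listener is always invoked.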
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (1)
11-17: Circuit breaker tests give solid coverage of the new behavior

The added mocks and tests around `CircuitBreakerService` and `CircuitBreaker` exercise all the key flows:
- CB is consulted before processing (`addEstimateBytesAndMaybeBreak` with the expected source tag).
- Trip path rejects the request, does not call `client.bulk`, releases bytes, and surfaces an error.
- Both success and failure paths through the wrapped `ActionListener` release bytes.
- The “exactly once” test guards against double-release in the happy path.

If you want to tighten them further, a minor optional enhancement would be to capture the `long` arguments passed to `addWithoutBreaking` and assert they are negative and consistent with the serialized request size, but the current tests are already effective for regressions in control flow.

Also applies to: 27-42, 50-55, 59-64, 91-203
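The optional enhancement above (checking the actual `long` values rather than only the call count) might look roughly like this. The sketch uses a hand-rolled recording fake instead of the mocking setup the PR's tests use; `RecordingBreaker`, `ReleaseArgumentCheck`, `handle`, and the `"<grpc_request>"` label are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-rolled recording fake standing in for a mocked breaker.
class RecordingBreaker {
    final List<Long> reserved = new ArrayList<>();
    final List<Long> released = new ArrayList<>();

    void addEstimateBytesAndMaybeBreak(long bytes, String label) { reserved.add(bytes); }

    void addWithoutBreaking(long bytes) { released.add(bytes); }
}

class ReleaseArgumentCheck {
    // Hypothetical code under test: reserve the request size, then release it.
    static void handle(RecordingBreaker breaker, byte[] serializedRequest) {
        long size = serializedRequest.length;
        breaker.addEstimateBytesAndMaybeBreak(size, "<grpc_request>");
        breaker.addWithoutBreaking(-size);
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        RecordingBreaker breaker = new RecordingBreaker();
        byte[] request = new byte[42];
        handle(breaker, request);

        check(breaker.released.size() == 1, "bytes must be released exactly once");
        long delta = breaker.released.get(0);
        check(delta < 0, "release delta must be negative");
        check(-delta == request.length, "release must match the serialized request size");
        check(-delta == breaker.reserved.get(0), "release must match the reserved amount");
        System.out.println("release delta: " + delta);
    }
}
```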
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (2 hunks)
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (2 hunks)
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java (3 hunks)
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
BulkRequestProtoUtils(24-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: detect-breaking-change
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: Analyze (java)
🔇 Additional comments (2)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java (1)
41-41: Stream → Collectors migration for interceptor lists looks correct

Using `Collectors.toList()` here is compatible with older JDKs and preserves the intended behavior for the interceptor collections in tests. No issues from my side.

Also applies to: 633-636, 653-658
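For context on the JDK compatibility point: `Collectors.toList()` has been available since Java 8, whereas `Stream.toList()` was only added in Java 16. A minimal sketch of the collector form follows; `InterceptorListDemo` and the string "interceptors" are illustrative placeholders, not the types used in the PR's tests.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InterceptorListDemo {
    // Collects with Collectors.toList(), which compiles on Java 8 and later.
    static List<String> names(Stream<String> interceptors) {
        return interceptors.map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(names(Stream.of("auth", "logging"))); // prints [AUTH, LOGGING]
    }
}
```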
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (1)
19-19: CircuitBreakerService injection into gRPC document service is wired correctly

Both `getAuxTransports` and `getSecureAuxTransports` now construct `DocumentServiceImpl` with the shared `CircuitBreakerService`, keeping behavior consistent across secure/non-secure transports. This matches the updated constructor and looks good.

Also applies to: 184-187, 236-238
❌ Gradle check result for 015c303: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
- Add entry for PR opensearch-project#20203 - Document circuit breaker support for gRPC transport Signed-off-by: sakrah <sakrah@uber.com>
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
BulkRequestProtoUtils(24-100)
🔇 Additional comments (3)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (3)
13-14: Imports look correct for the circuit breaker integration.

The imports are appropriate for the IN_FLIGHT_REQUESTS circuit breaker pattern and the AtomicBoolean for ensuring single-release semantics.

Also applies to: 24-25

32-43: Constructor changes for circuit breaker integration look good.

The `circuitBreakerService` field is properly declared as `final` and the constructor signature update is documented. The AI summary confirms that `GrpcPlugin` has been updated to pass the `CircuitBreakerService`.

63-89: Wrapped listener implementation correctly ensures byte release.

The anonymous `ActionListener` properly wraps the base listener and uses the `AtomicBoolean` guard with `compareAndSet` to ensure bytes are released exactly once on either success or failure path. The `finally` blocks guarantee release even if the base listener throws.
❌ Gradle check result for f0f43c0: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
f0f43c0 to 47c6865
- Add entry for PR opensearch-project#20203 - Document circuit breaker support for gRPC transport Signed-off-by: sakrah <sakrah@uber.com>
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
51-76: Remove the byte decrement in the CircuitBreakingException catch block.

The `addEstimateBytesAndMaybeBreak` method only adds bytes if the limit check passes. When the method throws `CircuitBreakingException`, bytes were never added to the breaker, so calling `addWithoutBreaking(-requestSize)` incorrectly decrements the breaker's used bytes. Additionally, if the parent breaker limit is exceeded, `addEstimateBytesAndMaybeBreak` already handles the decrement internally before throwing.

Lines 65-69 should not call `addWithoutBreaking(-requestSize)` in the catch block. Bytes should only be released via `CircuitBreakerStreamObserver` when the response completes.
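The accounting problem described in this comment can be reproduced with a toy model. The sketch below assumes the semantics the comment states (a tripped breaker reserves nothing); `ToyBreaker` and `TrippedBreakerDemo` are hypothetical classes written for illustration, not the OpenSearch breaker implementation, and `IllegalStateException` stands in for `CircuitBreakingException`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy breaker modelling the semantics described in the review comment.
class ToyBreaker {
    private final long limit;
    private final AtomicLong used = new AtomicLong();

    ToyBreaker(long limit) { this.limit = limit; }

    // Reserves bytes only when the new total stays within the limit.
    void addEstimateBytesAndMaybeBreak(long bytes) {
        long current;
        long next;
        do {
            current = used.get();
            next = current + bytes;
            if (next > limit) {
                throw new IllegalStateException("would use " + next + " bytes, limit is " + limit);
            }
        } while (!used.compareAndSet(current, next));
    }

    void addWithoutBreaking(long bytes) { used.addAndGet(bytes); }

    long getUsed() { return used.get(); }
}

class TrippedBreakerDemo {
    public static void main(String[] args) {
        ToyBreaker breaker = new ToyBreaker(100);
        breaker.addEstimateBytesAndMaybeBreak(80); // another request already in flight
        try {
            breaker.addEstimateBytesAndMaybeBreak(50); // trips: 130 > 100
        } catch (IllegalStateException tripped) {
            // Nothing was reserved for this request, so nothing is released here.
            // Calling breaker.addWithoutBreaking(-50) would drop the total to 30
            // and under-count the request that is still in flight.
        }
        System.out.println("used after trip: " + breaker.getUsed()); // prints 80
    }
}
```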
🧹 Nitpick comments (2)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (1)
40-61: Constructor properly validates dependencies.

Good defensive programming with explicit null-checks for all dependencies. This provides clear error messages when misconfigured.

Note: `DocumentServiceImpl` does not have similar null-checks in its constructor. Consider adding them for consistency across service implementations.

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (1)
17-57: Well-designed wrapper with single-release guarantee.

The `CircuitBreakerStreamObserver` correctly:
- Delegates `onNext` calls without releasing bytes
- Releases bytes exactly once on terminal events (`onCompleted`/`onError`) using `AtomicBoolean.compareAndSet`
- Ensures bytes are released before delegating terminal events

Consider adding Javadoc to document the class purpose and thread-safety guarantees for future maintainers.
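The three properties listed above can be illustrated with a self-contained sketch. It is not the PR's `CircuitBreakerStreamObserver`: `Observer` is a minimal stand-in for gRPC's `StreamObserver` so the example compiles without dependencies, an `AtomicLong` stands in for the breaker's byte counter, and `ReleasingObserver` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Minimal stand-in for io.grpc.stub.StreamObserver.
interface Observer<T> {
    void onNext(T value);

    void onError(Throwable t);

    void onCompleted();
}

// Wraps a delegate and releases the reserved bytes once, on the first terminal event.
class ReleasingObserver<T> implements Observer<T> {
    private final Observer<T> delegate;
    private final AtomicLong inFlightBytes; // stands in for the breaker's counter
    private final long requestSize;
    private final AtomicBoolean released = new AtomicBoolean(false);

    ReleasingObserver(Observer<T> delegate, AtomicLong inFlightBytes, long requestSize) {
        this.delegate = delegate;
        this.inFlightBytes = inFlightBytes;
        this.requestSize = requestSize;
    }

    @Override
    public void onNext(T value) {
        delegate.onNext(value); // not terminal: keep the bytes reserved
    }

    @Override
    public void onError(Throwable t) {
        releaseOnce(); // release first, so accounting is fixed even if the delegate throws
        delegate.onError(t);
    }

    @Override
    public void onCompleted() {
        releaseOnce();
        delegate.onCompleted();
    }

    private void releaseOnce() {
        // compareAndSet lets only the first caller through, even under concurrency.
        if (released.compareAndSet(false, true)) {
            inFlightBytes.addAndGet(-requestSize);
        }
    }
}
```

Calling `onCompleted` and then `onError` on the same wrapper decrements the counter only once, which is the behavior the single-release tests in this PR are described as checking.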
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- CHANGELOG.md
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserverTests.java
🚧 Files skipped from review as they are similar to previous changes (1)
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-12-13T20:16:15.318Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4Http3ServerTransport.java:101-123
Timestamp: 2025-12-13T20:16:15.318Z
Learning: In OpenSearch, only one HTTP transport implementation can be active and loaded at a time, so duplicate setting definitions (such as h3.max_stream_local_length, h3.max_stream_remote_length, and h3.max_streams) across different transport implementations like Netty4Http3ServerTransport and ReactorNetty4HttpServerTransport will not cause setting registration conflicts.
Applied to files:
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
📚 Learning: 2025-12-19T21:26:37.090Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4QuicServerTransport.java:0-0
Timestamp: 2025-12-19T21:26:37.090Z
Learning: In Netty4QuicServerTransport (modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4QuicServerTransport.java), the secureHttpTransportSettingsProvider parameter is guaranteed to be non-null because the plugin verifies its presence during instantiation, so no explicit null-check is needed in the constructor.
Applied to files:
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
🧬 Code graph analysis (2)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (3)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/SearchRequestActionListener.java (1)
SearchRequestActionListener(26-61)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (1)
CircuitBreakerStreamObserver(17-57)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/GrpcErrorHandler.java (1)
GrpcErrorHandler(34-178)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (1)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/search/query/QueryBuilderProtoTestUtils.java (1)
QueryBuilderProtoTestUtils(13-51)
🪛 LanguageTool
CHANGELOG.md
[grammar] ~18-~18: Use a hyphen to join words.
Context: .../pull/19539)) - Add a mapper for context aware segments grouping criteria ([#1923...
(QB_NEW_EN_HYPHEN)
[uncategorized] ~20-~20: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...568)) - Add support for repository with Server side encryption enabled and client side encr...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
[grammar] ~20-~20: Use a hyphen to join words.
Context: ...erver side encryption enabled and client side encryption as well based on a flag....
(QB_NEW_EN_HYPHEN)
[uncategorized] ~37-~37: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...enSearch/pull/19878)) - Add support for context aware segments ([#19098](https://github.com/o...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
🔇 Additional comments (9)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (2)
171-191: LGTM! Circuit breaker service is correctly wired to gRPC services.

The `CircuitBreakerService` is properly passed to both `DocumentServiceImpl` and `SearchServiceImpl`, enabling in-flight request tracking for the non-secure transport.

222-250: Consistent wiring for secure transport.

The secure transport follows the same pattern as the non-secure transport, ensuring circuit breaker protection is applied uniformly.
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (1)
69-97: Circuit breaker integration follows the same pattern as DocumentServiceImpl.

The implementation is consistent with `DocumentServiceImpl`:
- Bytes are reserved before processing
- Response observer is wrapped with `CircuitBreakerStreamObserver`
- Exceptions properly release bytes and propagate errors
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (2)
125-166: Comprehensive circuit breaker test coverage.

Tests properly verify:
- Circuit breaker is checked before processing requests
- Tripped breaker rejects requests and prevents `client.search()` from being called
- Bytes are released after rejection
- Errors are propagated to the client

168-202: Good integration test for the wrapped observer pattern.

The test correctly verifies that:
- The `CircuitBreakerStreamObserver` wrapper is used
- Successful responses flow through to the original observer
- Bytes are released via the wrapper on completion
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserverTests.java (2)
52-78: Core functionality tests are well-structured.

Tests verify the essential behaviors:
- `onNext` delegates without releasing bytes
- `onCompleted` and `onError` both release bytes exactly once
- Delegation to the underlying observer works correctly

81-107: Single-release guarantee is properly tested.

These tests verify the critical `AtomicBoolean` logic that prevents double-release of circuit breaker bytes, which is essential for correct accounting.

modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (2)
96-137: Comprehensive circuit breaker tests for bulk operations.

Tests properly verify the circuit breaker integration:
- Breaker is checked before processing
- Tripped breaker prevents `client.bulk()` from being called
- Bytes are released after rejection
- Errors are propagated correctly

139-231: Thorough lifecycle testing with real response objects.

Good approach using `ArgumentCaptor` to capture the listener and simulate both success and failure paths. The tests verify:
- Bytes released on successful completion
- Bytes released on failure
- Single-release guarantee (no double-release)

The use of real `BulkResponse` objects (lines 154-161) provides more realistic test coverage than pure mocking.
❌ Gradle check result for 47c6865: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Actionable comments posted: 0
♻️ Duplicate comments (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
68-72: Same concern as SearchServiceImpl: verify breaker semantics.

This has the same pattern where bytes may be released without being reserved if `addEstimateBytesAndMaybeBreak()` throws before adding. The verification request for `SearchServiceImpl` applies here as well.
🧹 Nitpick comments (2)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (1)
24-41: Well-designed wrapper with idempotent release.

The use of `AtomicBoolean` for single-release semantics is the correct pattern. Consider adding null checks for constructor parameters for defensive programming consistency with other service classes.

🔎 Optional: Add defensive null checks
 public CircuitBreakerStreamObserver(StreamObserver<T> delegate, CircuitBreakerService circuitBreakerService, int requestSize) {
+    if (delegate == null) {
+        throw new IllegalArgumentException("Delegate observer cannot be null");
+    }
+    if (circuitBreakerService == null) {
+        throw new IllegalArgumentException("CircuitBreakerService cannot be null");
+    }
     this.delegate = delegate;
     this.circuitBreakerService = circuitBreakerService;
     this.requestSize = requestSize;
 }

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
40-42: Constructor correctly stores circuit breaker service.

Note: Unlike `SearchServiceImpl`, this constructor doesn't have null checks. Consider adding them for consistency.

🔎 Add null checks for consistency
 public DocumentServiceImpl(Client client, CircuitBreakerService circuitBreakerService) {
+    if (client == null) {
+        throw new IllegalArgumentException("Client cannot be null");
+    }
+    if (circuitBreakerService == null) {
+        throw new IllegalArgumentException("CircuitBreakerService cannot be null");
+    }
     this.client = client;
     this.circuitBreakerService = circuitBreakerService;
 }
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserverTests.java
🚧 Files skipped from review as they are similar to previous changes (2)
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserverTests.java
🧰 Additional context used
🧬 Code graph analysis (2)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (3)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/SearchRequestActionListener.java (1)
SearchRequestActionListener(26-61)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (1)
CircuitBreakerStreamObserver(24-80)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/GrpcErrorHandler.java (1)
GrpcErrorHandler(34-178)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (4)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/BulkRequestActionListener.java (1)
BulkRequestActionListener(26-72)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
BulkRequestProtoUtils(24-100)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (1)
CircuitBreakerStreamObserver(24-80)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/GrpcErrorHandler.java (1)
GrpcErrorHandler(34-178)
🔇 Additional comments (13)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (2)
186-189: LGTM!

The circuit breaker service is correctly wired to both `DocumentServiceImpl` and `SearchServiceImpl`. The pattern is consistent with OpenSearch's approach of passing infrastructure services through constructors.

240-243: Consistent wiring for secure transport.

The circuit breaker integration mirrors the non-secure transport path, ensuring both transport types have memory protection.
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (3)
47-60: Constructor properly validates dependencies.

Good defensive programming with explicit null checks for all required dependencies.

94-98: RuntimeException handling correctly releases bytes.

Since this catch block is only reached after `addEstimateBytesAndMaybeBreak()` succeeds at line 78, releasing bytes here is correct.

89-93: No issue.

`addEstimateBytesAndMaybeBreak()` increments the breaker's tracked bytes before throwing `CircuitBreakingException`, so the subsequent `addWithoutBreaking(-requestSize)` at line 90 correctly releases the reserved bytes.

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (2)
58-71: Correct ordering: release before delegate.

Releasing bytes before calling delegate ensures memory accounting is updated even if the delegate throws. This is the right pattern for resource cleanup.

73-79: LGTM!

The `compareAndSet` pattern correctly prevents double-release in concurrent scenarios.

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
59-67: Proper integration with async callback flow.

The wrapped observer ensures bytes are released when the async bulk operation completes (success or failure), preventing leaks in the normal execution path.
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (5)
54-67: Test setup correctly mocks circuit breaker infrastructure.

Good pattern: mocking both `CircuitBreakerService` and `CircuitBreaker` allows fine-grained control over circuit breaker behavior in tests.

95-107: Good test for verifying circuit breaker check ordering.

This ensures the breaker is consulted before any request processing occurs.

109-136: Test verifies rejection but byte release assertion may need review.

Line 132 verifies `addWithoutBreaking` is called after `CircuitBreakingException`. If the production fix ensures bytes aren't released when they weren't reserved, this assertion would need to change to `verify(circuitBreaker, never()).addWithoutBreaking(anyLong())`.

138-230: Comprehensive tests for byte release semantics.

Good coverage of:
- Release on success (via `onCompleted`)
- Release on failure (via `onError`)
- Single-release guarantee (via `times(1)` assertion)

These tests effectively validate the `CircuitBreakerStreamObserver` integration.

232-241: Clean test helper method.

Creates a minimal valid request suitable for circuit breaker testing.
Actionable comments posted: 0
♻️ Duplicate comments (1)
CHANGELOG.md (1)
15-15: Typo: "seperate" should be "separate".

This issue was already flagged in a previous review.

🔎 Proposed fix
-- Add seperate shard limit validation for local and remote indices ([#19532](https://github.com/opensearch-project/OpenSearch/pull/19532))
+- Add separate shard limit validation for local and remote indices ([#19532](https://github.com/opensearch-project/OpenSearch/pull/19532))
🧹 Nitpick comments (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
40-43: Add null checks in constructor for consistency. The constructor should validate its parameters to fail fast with clear error messages.
SearchServiceImpl includes null checks for all constructor parameters (lines 48-56), but DocumentServiceImpl does not.
🔎 Proposed fix
 public DocumentServiceImpl(Client client, CircuitBreakerService circuitBreakerService) {
+    if (client == null) {
+        throw new IllegalArgumentException("Client cannot be null");
+    }
+    if (circuitBreakerService == null) {
+        throw new IllegalArgumentException("Circuit breaker service cannot be null");
+    }
     this.client = client;
     this.circuitBreakerService = circuitBreakerService;
 }
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- CHANGELOG.md
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java
🧰 Additional context used
🧬 Code graph analysis (3)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (4)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/BulkRequestActionListener.java (1)
BulkRequestActionListener(26-72)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
BulkRequestProtoUtils(24-100)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (1)
CircuitBreakerStreamObserver(24-80)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/GrpcErrorHandler.java (1)
GrpcErrorHandler(34-178)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (1)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/search/query/QueryBuilderProtoTestUtils.java (1)
QueryBuilderProtoTestUtils(13-51)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (3)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchRequestProtoUtils.java (1)
SearchRequestProtoUtils(43-321)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java (1)
CircuitBreakerStreamObserver(24-80)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/GrpcErrorHandler.java (1)
GrpcErrorHandler(34-178)
🪛 LanguageTool
CHANGELOG.md
[uncategorized] ~18-~18: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...nSearch/pull/19539)) - Add a mapper for context aware segments grouping criteria ([#19233](ht...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
[uncategorized] ~20-~20: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...568)) - Add support for repository with Server side encryption enabled and client side encr...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
[uncategorized] ~20-~20: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...with Server side encryption enabled and client side encryption as well based on a flag ([#1...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
[uncategorized] ~37-~37: If this is a compound adjective that modifies the following noun, use a hyphen.
Context: ...enSearch/pull/19878)) - Add support for context aware segments ([#19098](https://github.com/o...
(EN_COMPOUND_ADJECTIVE_INTERNAL)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: detect-breaking-change
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: Analyze (java)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
🔇 Additional comments (6)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
52-78: Circuit breaker integration implemented correctly. The circuit breaker lifecycle is properly managed:
- Bytes are reserved before processing (line 57)
- CircuitBreakingException is caught without manual byte release (line 68), since bytes were never added
- Synchronous exceptions trigger manual release (line 73) and use the original observer
- Asynchronous success/failure paths use CircuitBreakerStreamObserver to release bytes exactly once
This design prevents double-release and ensures proper accounting.
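The lifecycle in the bullets above can be sketched without any OpenSearch or gRPC dependency. This is a minimal illustration, not the code under review: ToyBreaker, BreakerTrippedException, and handle are hypothetical stand-ins, the breaker method names only mirror the ones mentioned in the review, and the success path releases inline where the real service defers release to the wrapped observer.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the reserve / reject / release flow; not the PR's code. */
final class BreakerFlowSketch {

    /** Stand-in for a breaker trip; the real exception type lives in OpenSearch. */
    static final class BreakerTrippedException extends RuntimeException {
        BreakerTrippedException(String message) {
            super(message);
        }
    }

    /** Toy breaker: a reservation that would exceed the limit is rolled back and rejected. */
    static final class ToyBreaker {
        private final long limitBytes;
        private final AtomicLong usedBytes = new AtomicLong();

        ToyBreaker(long limitBytes) {
            this.limitBytes = limitBytes;
        }

        void addEstimateBytesAndMaybeBreak(long bytes, String label) {
            long newUsed = usedBytes.addAndGet(bytes);
            if (newUsed > limitBytes) {
                usedBytes.addAndGet(-bytes); // nothing stays reserved after a trip
                throw new BreakerTrippedException(label + " would use " + newUsed + " of " + limitBytes + " bytes");
            }
        }

        void addWithoutBreaking(long bytes) {
            usedBytes.addAndGet(bytes);
        }

        long getUsed() {
            return usedBytes.get();
        }
    }

    /** Returns "OK", "REJECTED" (breaker tripped), or "FAILED" (work threw after reserving). */
    static String handle(ToyBreaker breaker, long requestSize, Runnable work) {
        try {
            breaker.addEstimateBytesAndMaybeBreak(requestSize, "<grpc_request>");
            work.run();
            breaker.addWithoutBreaking(-requestSize); // simplification: released inline on success
            return "OK";
        } catch (BreakerTrippedException e) {
            return "REJECTED"; // no release: the bytes were never added
        } catch (RuntimeException e) {
            breaker.addWithoutBreaking(-requestSize); // reserved earlier, so give the bytes back
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        ToyBreaker breaker = new ToyBreaker(100);
        System.out.println(handle(breaker, 40, () -> {}));
        System.out.println(handle(breaker, 40, () -> { throw new IllegalStateException("boom"); }));
        System.out.println(handle(breaker, 500, () -> {}));
        System.out.println(breaker.getUsed());
    }
}
```

The ordering of the catch blocks matters in this sketch: the trip is handled first so the generic failure path never subtracts bytes that were not reserved.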
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (2)
47-60: LGTM: Proper null checks in constructor. The constructor validates all parameters with clear error messages, following defensive programming best practices.
70-99: Circuit breaker integration implemented correctly. The implementation follows the same well-designed pattern as DocumentServiceImpl:
- Request size is tracked via the IN_FLIGHT_REQUESTS breaker
- CircuitBreakerStreamObserver wraps the response observer to ensure bytes are released on completion or error
- Synchronous exceptions are handled with manual byte release
- The CircuitBreakingException handler correctly avoids releasing bytes that were never reserved
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (1)
95-227: Comprehensive circuit breaker test coverage. The test suite thoroughly validates the circuit breaker integration:
- Pre-processing checks verify the breaker is consulted before executing requests
- Rejection scenarios confirm requests are blocked when the breaker trips
- Success and failure paths verify bytes are released via CircuitBreakerStreamObserver
- The exactly-once test guards against double-release bugs
The tests properly use ArgumentCaptor to simulate asynchronous listener callbacks and verify the complete lifecycle.
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (2)
88-96: Good defensive testing for null parameters. The test validates that the constructor properly rejects a null CircuitBreakerService, ensuring fail-fast behavior with clear error messages.
137-228: Comprehensive circuit breaker test coverage. The test suite validates all critical circuit breaker scenarios:
- Pre-processing verification confirms the breaker is checked before processing
- Rejection handling ensures requests are properly blocked when the breaker trips
- Observer wrapping tests verify the CircuitBreakerStreamObserver correctly releases bytes on success
- Exception handling tests confirm bytes are released when errors occur
The tests properly simulate both synchronous exceptions and asynchronous listener callbacks.
❌ Gradle check result for 9a1bb41: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #20203 +/- ##
============================================
- Coverage 73.30% 73.28% -0.02%
+ Complexity 71861 71819 -42
============================================
Files 5791 5792 +1
Lines 328565 328610 +45
Branches 47308 47311 +3
============================================
- Hits 240845 240831 -14
- Misses 68420 68467 +47
- Partials 19300 19312 +12
Signed-off-by: sakrah <sakrah@uber.com>
- Add entry for PR opensearch-project#20203 - Document circuit breaker support for gRPC transport Signed-off-by: sakrah <sakrah@uber.com>
Signed-off-by: sakrah <sakrah@uber.com>
…t cleanup Signed-off-by: Sam Akrah <sakrah@uber.com>
Signed-off-by: Sam Akrah <sakrah@uber.com>
Signed-off-by: Sam Akrah <sakrah@uber.com>
Signed-off-by: Sam Akrah <sakrah@uber.com>
…locks CircuitBreakingException is thrown before bytes are added (current breaker limit) or bytes are already decremented internally (parent breaker limit). Only RuntimeException/IOException catch blocks should decrement bytes since those exceptions occur after bytes were successfully added. Also fix CHANGELOG formatting issues. Signed-off-by: Sam Akrah <sakrah@uber.com>
9a1bb41 to
1837d3f
Compare
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In
@modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java:
- Around line 40-42: The DocumentServiceImpl constructor does not validate the
circuitBreakerService parameter, risking an NPE when getBreaker is called;
update the DocumentServiceImpl(Client client, CircuitBreakerService
circuitBreakerService) constructor to check for null on circuitBreakerService
(similar to SearchServiceImpl’s parameter validation), throw a
NullPointerException or use Objects.requireNonNull with a clear message if it is
null, and ensure the field assignment only proceeds after validation so
getBreaker cannot be invoked on a null service.
In
@modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java:
- Around line 120-135: Remove the duplicate test method testSearchWithException
and retain the more descriptive testCircuitBreakerBytesReleasedOnException;
specifically delete the entire testSearchWithException method (the block that
creates a SearchRequest, stubs client.search to throw, calls service.search, and
verifies circuitBreaker.addWithoutBreaking and responseObserver.onError) so only
testCircuitBreakerBytesReleasedOnException remains, then run tests to ensure no
unused imports or references remain and clean them up if needed.
🧹 Nitpick comments (2)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (2)
109-118: Consider adding circuit breaker verification (optional). While this test correctly verifies the basic search flow, it could optionally verify that the circuit breaker was checked before processing. However, since other tests specifically cover circuit breaker behavior, keeping this as a simple smoke test is also reasonable.
Optional: Add circuit breaker verification
 public void testSearchSuccess() throws IOException {
     // Create a test request
     org.opensearch.protobufs.SearchRequest request = createTestSearchRequest();

     // Call the search method
     service.search(request, responseObserver);

+    // Verify circuit breaker was checked
+    verify(circuitBreaker).addEstimateBytesAndMaybeBreak(anyLong(), eq("<grpc_request>"));
+
     // Verify that client.search was called with any SearchRequest and any ActionListener
     verify(client).search(any(org.opensearch.action.search.SearchRequest.class), any());
 }
43-236: Consider adding explicit double-release prevention test. The PR objectives mention "prevention of double-release" as a tested scenario. While the CircuitBreakerStreamObserver likely has internal guards against this, consider adding an explicit test that verifies bytes are released exactly once even if multiple terminal events (onCompleted, onError) are triggered.
Suggested test for double-release prevention
public void testCircuitBreakerBytesReleasedOnlyOnce() throws IOException {
    // Create a test request
    org.opensearch.protobufs.SearchRequest request = createTestSearchRequest();

    // Capture the ActionListener to simulate multiple terminal events
    @SuppressWarnings("unchecked")
    ArgumentCaptor<ActionListener<SearchResponse>> listenerCaptor = ArgumentCaptor.forClass(ActionListener.class);

    // Call the search method
    service.search(request, responseObserver);

    // Verify client.search was called and capture the listener
    verify(client).search(any(org.opensearch.action.search.SearchRequest.class), listenerCaptor.capture());

    // Simulate successful response
    SearchResponse mockResponse = mock(SearchResponse.class);
    when(mockResponse.getTook()).thenReturn(TimeValue.timeValueMillis(100));
    when(mockResponse.isTimedOut()).thenReturn(false);
    when(mockResponse.getTotalShards()).thenReturn(5);
    when(mockResponse.getSuccessfulShards()).thenReturn(5);
    when(mockResponse.getSkippedShards()).thenReturn(0);
    when(mockResponse.getFailedShards()).thenReturn(0);
    when(mockResponse.getShardFailures()).thenReturn(new org.opensearch.action.search.ShardSearchFailure[0]);
    when(mockResponse.getClusters()).thenReturn(new SearchResponse.Clusters(0, 0, 0));
    when(mockResponse.getHits()).thenReturn(SearchHits.empty());
    when(mockResponse.getInternalResponse()).thenReturn(mock(SearchResponseSections.class));

    // Trigger onResponse
    listenerCaptor.getValue().onResponse(mockResponse);

    // Verify bytes were released once after onCompleted
    verify(circuitBreaker).addWithoutBreaking(anyLong());

    // Try to trigger multiple completions on the wrapped observer (if accessible)
    // This would verify the internal guard prevents double-release
    // Note: This may require exposing test hooks or using reflection
}
📒 Files selected for processing (9)
- CHANGELOG.md
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserverTests.java
🚧 Files skipped from review as they are similar to previous changes (3)
- CHANGELOG.md
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserver.java
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:29.698Z
Learning: The gRPC search API in OpenSearch is marked as "experimental" in official documentation, so changes to proto schemas that remove previously unsupported fields (those throwing UnsupportedOperationException) are not considered breaking changes.
📚 Learning: 2026-01-02T19:23:16.689Z
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:16.689Z
Learning: In OpenSearch's gRPC transport module, treat removals of previously-unsupported fields (those throwing UnsupportedOperationException) from experimental proto schemas as non-breaking changes. When reviewing changes to proto-related Java code in this module, document that such removals do not count as breaking API changes, and ensure tests reflect compatibility expectations accordingly.
Applied to files:
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserverTests.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java
📚 Learning: 2026-01-02T19:23:29.698Z
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:29.698Z
Learning: In the transport-grpc module, suggest and aggregations protos were removed from SearchRequestBody in protobufs 1.0.0 because they haven't been vetted for accuracy in the API specification. The URL parameter suggest support (suggest_field, suggest_mode, suggest_size, suggest_text) is a minimized subset and not intended as a replacement for full Suggester functionality.
Applied to files:
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
🔇 Additional comments (12)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/CircuitBreakerStreamObserverTests.java (1)
1-127: Excellent test coverage for the CircuitBreakerStreamObserver wrapper! The test suite comprehensively validates the circuit breaker lifecycle:
- Delegation of all methods to the underlying observer
- Byte release exactly once on terminal events (onCompleted/onError)
- Prevention of double-release across multiple terminal event combinations
- No byte release on non-terminal events (onNext)
The tests ensure the wrapper correctly manages circuit breaker accounting in all scenarios.
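The wrapper behaviour these tests describe (release on the first terminal event only, release before delegating, no release on onNext) can be sketched as follows. This is an illustration under stated assumptions rather than the actual CircuitBreakerStreamObserver: the Observer interface and the AtomicLong counter are hypothetical stand-ins for the gRPC StreamObserver and the breaker.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a release-exactly-once observer wrapper; not the PR's class. */
final class ReleaseOnceSketch {

    /** Minimal observer shape with the same three callbacks as a stream observer. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Gives reserved bytes back on the first terminal event and never again. */
    static final class ReleasingObserver<T> implements Observer<T> {
        private final Observer<T> delegate;
        private final AtomicLong inFlightBytes; // stands in for the breaker's counter
        private final long reservedBytes;
        private final AtomicBoolean released = new AtomicBoolean(false);

        ReleasingObserver(Observer<T> delegate, AtomicLong inFlightBytes, long reservedBytes) {
            this.delegate = delegate;
            this.inFlightBytes = inFlightBytes;
            this.reservedBytes = reservedBytes;
        }

        @Override
        public void onNext(T value) {
            delegate.onNext(value); // non-terminal event: nothing is released
        }

        @Override
        public void onError(Throwable t) {
            release(); // release first so accounting is fixed even if the delegate throws
            delegate.onError(t);
        }

        @Override
        public void onCompleted() {
            release();
            delegate.onCompleted();
        }

        private void release() {
            // Only the first caller flips the flag, so the subtraction happens once.
            if (released.compareAndSet(false, true)) {
                inFlightBytes.addAndGet(-reservedBytes);
            }
        }
    }

    /** Reserves 128 bytes, fires both terminal events, and returns what is still in flight. */
    static long demo() {
        AtomicLong inFlight = new AtomicLong(128);
        Observer<String> sink = new Observer<String>() {
            @Override public void onNext(String value) {}
            @Override public void onError(Throwable t) {}
            @Override public void onCompleted() {}
        };
        ReleasingObserver<String> observer = new ReleasingObserver<>(sink, inFlight, 128);
        observer.onNext("partial");
        observer.onCompleted();
        observer.onError(new RuntimeException("late error"));
        return inFlight.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Using an atomic flag rather than a plain boolean keeps the guarantee when two threads race to deliver terminal events.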
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (1)
186-189: Correct wiring of CircuitBreakerService into gRPC services. The circuit breaker service is consistently passed to both DocumentServiceImpl and SearchServiceImpl constructors across both secure and non-secure transport paths, ensuring memory protection is enabled for all gRPC requests.
Also applies to: 240-243
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/SearchServiceImpl.java (2)
47-60: Constructor properly validates circuit breaker service. The constructor includes appropriate null checks for the new CircuitBreakerService parameter, maintaining consistency with existing validation for client and queryUtils.
74-98: Circuit breaker integration follows correct lifecycle pattern. The implementation properly:
- Estimates and reserves bytes before processing (line 78)
- Wraps the observer to ensure bytes are released exactly once on terminal events (lines 80-84)
- Skips byte release when CircuitBreakingException occurs, since bytes were never added (lines 89-92)
- Releases bytes on synchronous exceptions after successful reservation (line 94)
- Delegates async byte release to CircuitBreakerStreamObserver (line 87)
The pattern prevents double-release because OpenSearch's ActionListener contract guarantees that either onResponse() or onFailure() is invoked, never both. If client.search() throws synchronously, the listener is never invoked and bytes are released at line 94. If the search executes asynchronously, the listener invokes the wrapped observer exactly once (either onCompleted() or onError()), which releases bytes via an atomic flag.
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
53-77: Circuit breaker integration correctly implemented. The bulk method properly handles the circuit breaker lifecycle:
- Reserves bytes before processing (line 57)
- Wraps the observer to ensure exactly-once byte release on terminal events (lines 59-63)
- Correctly avoids releasing bytes when CircuitBreakingException occurs, since they were never added (lines 68-71)
- Releases bytes on synchronous failures after successful reservation (line 73)
- Delegates async byte release to CircuitBreakerStreamObserver (line 66)
The previous critical issue regarding byte release on breaker trips has been addressed—the CircuitBreakingException catch block no longer attempts to release unreserved bytes.
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (2)
54-67: Test setup correctly mocks circuit breaker dependencies. The setup properly creates mocks for CircuitBreakerService and CircuitBreaker, and wires them into the service under test. This enables comprehensive testing of circuit breaker interactions.
95-227: Comprehensive test coverage for circuit breaker integration. The test suite thoroughly validates all circuit breaker scenarios:
- Pre-processing breaker check (lines 95-107)
- Request rejection when breaker trips (lines 109-133)
- Byte release on successful completion (lines 135-166)
- Byte release on failure (lines 168-191)
- Prevention of double-release (lines 193-227)
The tests use ArgumentCaptor to simulate async callbacks, enabling thorough verification of the circuit breaker lifecycle in both success and failure paths. This pattern ensures the integration works correctly in production.
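The capture-then-fire pattern described here can be approximated without Mockito. The sketch below is an assumption-laden illustration, not the PR's tests: FakeClient, RecordingBreaker, and serve are hypothetical names, and a plain Consumer stands in for the captured ActionListener. It checks that the amount released is the negative of the amount reserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of testing async release by capturing the callback; not the PR's tests. */
final class CapturedListenerSketch {

    /** Records every byte delta applied, like verifying calls on a mocked breaker. */
    static final class RecordingBreaker {
        final List<Long> deltas = new ArrayList<>();

        void add(long bytes) {
            deltas.add(bytes);
        }
    }

    /** Stores the callback instead of running it, playing the role of an argument captor. */
    static final class FakeClient {
        Consumer<String> captured;

        void execute(Consumer<String> onDone) {
            this.captured = onDone;
        }
    }

    /** Hypothetical service method: reserve, dispatch, release inside the callback. */
    static void serve(RecordingBreaker breaker, FakeClient client, long requestSize) {
        breaker.add(requestSize);
        client.execute(result -> breaker.add(-requestSize));
    }

    /** Runs the scenario and returns the recorded deltas for inspection. */
    static List<Long> demo() {
        RecordingBreaker breaker = new RecordingBreaker();
        FakeClient client = new FakeClient();
        serve(breaker, client, 64);
        // Before the callback fires, only the reservation has been recorded.
        if (breaker.deltas.size() != 1) {
            throw new IllegalStateException("release happened before the callback fired");
        }
        client.captured.accept("done"); // simulate the asynchronous completion
        return breaker.deltas;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Asserting on the recorded deltas, rather than only on the call count, catches a release of the wrong size as well as a missing or repeated release.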
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (5)
51-66: LGTM! Circuit breaker test setup is correct. The mock setup properly wires the CircuitBreakerService to return the IN_FLIGHT_REQUESTS breaker, and the constructor call correctly includes the circuitBreakerService parameter.
88-96: LGTM! Null validation test added. The new test properly validates that CircuitBreakerService cannot be null and verifies the appropriate error message.
137-149: LGTM! Circuit breaker pre-processing check verified. This test correctly verifies that the circuit breaker is checked before processing the search request, aligning with the PR objective.
151-175: LGTM! Breaker trip and rejection logic verified. This test correctly verifies that when the circuit breaker trips, the request is rejected before processing (client.search is never called), and an error is propagated to the client. The test aligns with the PR objective for request rejection behavior.
177-211: LGTM! StreamObserver wrapper and success path verified. This test correctly verifies that the CircuitBreakerStreamObserver wraps the response observer and releases bytes on successful completion, aligning with the PR objective for byte release on success.
❌ Gradle check result for 1837d3f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Sam Akrah <sakrah@uber.com>
❌ Gradle check result for 902e1af: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: sakrah <sakrah.sky@gmail.com>
- Remove duplicate testSearchWithException test - Enhance testCircuitBreakerBytesReleasedOnException to capture and verify that released bytes equal negative of added bytes Signed-off-by: Sam Akrah <sakrah@uber.com>
Actionable comments posted: 0
🧹 Nitpick comments (1)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (1)
160-194: Consider simplifying the extensive SearchResponse mocking. The test correctly verifies that bytes are released on successful completion through the CircuitBreakerStreamObserver wrapper. However, the SearchResponse mocking (lines 175-185) is quite extensive and could make the test brittle to structural changes.
💡 Alternative: Extract response creation to a test helper
Consider extracting the SearchResponse creation to a helper method to reduce duplication and improve maintainability:
private SearchResponse createMockSearchResponse() {
    SearchResponse mockResponse = mock(SearchResponse.class);
    when(mockResponse.getTook()).thenReturn(TimeValue.timeValueMillis(100));
    when(mockResponse.isTimedOut()).thenReturn(false);
    when(mockResponse.getTotalShards()).thenReturn(5);
    when(mockResponse.getSuccessfulShards()).thenReturn(5);
    when(mockResponse.getSkippedShards()).thenReturn(0);
    when(mockResponse.getFailedShards()).thenReturn(0);
    when(mockResponse.getShardFailures()).thenReturn(new org.opensearch.action.search.ShardSearchFailure[0]);
    when(mockResponse.getClusters()).thenReturn(new SearchResponse.Clusters(0, 0, 0));
    when(mockResponse.getHits()).thenReturn(SearchHits.empty());
    when(mockResponse.getInternalResponse()).thenReturn(mock(SearchResponseSections.class));
    return mockResponse;
}
This would allow reuse if similar response mocking is needed in future tests.
📒 Files selected for processing (1)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:29.698Z
Learning: The gRPC search API in OpenSearch is marked as "experimental" in official documentation, so changes to proto schemas that remove previously unsupported fields (those throwing UnsupportedOperationException) are not considered breaking changes.
📚 Learning: 2026-01-02T19:23:16.689Z
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:16.689Z
Learning: In OpenSearch's gRPC transport module, treat removals of previously-unsupported fields (those throwing UnsupportedOperationException) from experimental proto schemas as non-breaking changes. When reviewing changes to proto-related Java code in this module, document that such removals do not count as breaking API changes, and ensure tests reflect compatibility expectations accordingly.
Applied to files:
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
📚 Learning: 2025-12-12T18:40:08.452Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:256-256
Timestamp: 2025-12-12T18:40:08.452Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), URI limit validation has been moved from the protocol layer to the transport layer, making it protocol-agnostic. The random protocol selection in ReactorHttpClient.https(settings) is intentional to ensure all tests validate correct behavior across HTTP/1.1, HTTP/2, and HTTP/3.
Applied to files:
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
📚 Learning: 2025-12-12T13:31:51.234Z
Learnt from: andriyredko
Repo: opensearch-project/OpenSearch PR: 20017
File: plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java:642-646
Timestamp: 2025-12-12T13:31:51.234Z
Learning: In the OpenSearch ReactorNetty4 secure HTTP transport tests (plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ssl/SecureReactorNetty4HttpServerTransportTests.java), the createBuilderWithPort() helper intentionally uses randomBoolean() for the HTTP/3 enabled setting to ensure all tests validate correct behavior with both HTTP/3 enabled and disabled configurations.
Applied to files:
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
📚 Learning: 2026-01-02T19:23:29.698Z
Learnt from: karenyrx
Repo: opensearch-project/OpenSearch PR: 20335
File: modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/search/SearchSourceBuilderProtoUtils.java:155-172
Timestamp: 2026-01-02T19:23:29.698Z
Learning: In the transport-grpc module, suggest and aggregations protos were removed from SearchRequestBody in protobufs 1.0.0 because they haven't been vetted for accuracy in the API specification. The URL parameter suggest support (suggest_field, suggest_mode, suggest_size, suggest_text) is a minimized subset and not intended as a replacement for full Suggester functionality.
Applied to files:
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java
🔇 Additional comments (5)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/SearchServiceImplTests.java (5)
51-66: LGTM: Circuit breaker test infrastructure properly configured. The mock setup correctly chains the CircuitBreakerService to return the mocked CircuitBreaker, and the constructor now includes the circuit breaker service dependency.
88-96: LGTM: Comprehensive constructor validation. The new null-check test for the circuit breaker service parameter completes the constructor validation coverage.
120-158: LGTM: Circuit breaker integration properly tested. These tests correctly verify:
- The circuit breaker is checked before processing requests
- Requests are rejected when the breaker trips without invoking the downstream client
- Errors are properly propagated to the response observer
196-224: LGTM: Excellent byte accounting validation. The test verifies that bytes are tracked and released on exception paths, and checks that the released amount equals the negative of the added amount.
43-231: Double-release protection is already tested comprehensively. The CircuitBreakerStreamObserverTests.java file contains explicit tests for all double-release scenarios:
- testBytesReleasedExactlyOnceOnMultipleCalls() verifies that calling onCompleted() then onError() releases bytes exactly once
- testBytesReleasedExactlyOnceOnErrorThenCompleted() verifies that calling onError() then onCompleted() releases bytes exactly once
Both tests use verify(circuitBreaker, times(1)).addWithoutBreaking() to enforce that bytes are released only once regardless of multiple terminal call attempts. The concern in the original comment is not applicable.
Likely an incorrect or invalid review comment.
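The exactly-once release behavior described in this comment can be illustrated with a minimal sketch. It does not reproduce the PR's CircuitBreakerStreamObserver: the Observer interface is a simplified, hypothetical stand-in for gRPC's StreamObserver, and an AtomicLong stands in for the breaker's byte counter. The sketch only shows one common way to make the release idempotent, namely guarding it with AtomicBoolean.compareAndSet.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

class ReleaseOnceSketch {

    /** Simplified, hypothetical stand-in for gRPC's StreamObserver. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** A delegate that ignores every event; used only to keep the example small. */
    static <T> Observer<T> noop() {
        return new Observer<T>() {
            public void onNext(T value) {}
            public void onError(Throwable t) {}
            public void onCompleted() {}
        };
    }

    /** Forwards events to a delegate and gives reserved bytes back on the first terminal event only. */
    static class ReleasingObserver<T> implements Observer<T> {
        private final Observer<T> delegate;
        private final AtomicLong usedBytes;   // assumption: stands in for the breaker's counter
        private final long reservedBytes;
        private final AtomicBoolean released = new AtomicBoolean(false);

        ReleasingObserver(Observer<T> delegate, AtomicLong usedBytes, long reservedBytes) {
            this.delegate = delegate;
            this.usedBytes = usedBytes;
            this.reservedBytes = reservedBytes;
        }

        public void onNext(T value) {
            delegate.onNext(value); // not a terminal event, so nothing is released
        }

        public void onError(Throwable t) {
            try {
                delegate.onError(t);
            } finally {
                releaseOnce();
            }
        }

        public void onCompleted() {
            try {
                delegate.onCompleted();
            } finally {
                releaseOnce();
            }
        }

        /** Only the first caller wins the compareAndSet, so the bytes are subtracted once. */
        private void releaseOnce() {
            if (released.compareAndSet(false, true)) {
                usedBytes.addAndGet(-reservedBytes);
            }
        }
    }
}
```

With this shape, calling onCompleted() followed by onError() (or the reverse) subtracts the reserved bytes a single time, which is the property the two tests named above assert with times(1).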
❌ Gradle check result for b0c9ee5: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: sakrah <sakrah.sky@gmail.com>
❕ Gradle check result for 4876c61: UNSTABLE Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure. |
Description
This PR adds circuit breaker support to the gRPC transport layer, similar to how the REST API handles memory protection. Requests are rejected when memory pressure is too high, which helps prevent out-of-memory errors.
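As a rough illustration of the idea (not the PR's code), the sketch below reserves a request's estimated size against a byte budget before doing any work, rejects the request if the budget would be exceeded, and gives the bytes back when handling finishes. SimpleBreaker, BreakerTrippedException, the handle method, and the "<grpc_request>" label are all hypothetical; only the method names addEstimateBytesAndMaybeBreak and addWithoutBreaking are borrowed from the breaker calls this PR describes.

```java
import java.util.concurrent.atomic.AtomicLong;

class InFlightBreakerSketch {

    /** Hypothetical exception thrown when a reservation would exceed the limit. */
    static class BreakerTrippedException extends RuntimeException {
        BreakerTrippedException(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory breaker; a stand-in, not the OpenSearch CircuitBreaker. */
    static class SimpleBreaker {
        private final long limitBytes;
        private final AtomicLong usedBytes = new AtomicLong();

        SimpleBreaker(long limitBytes) {
            this.limitBytes = limitBytes;
        }

        /** Reserves bytes; if the limit is exceeded, undoes the reservation and throws. */
        void addEstimateBytesAndMaybeBreak(long bytes, String label) {
            long newUsed = usedBytes.addAndGet(bytes);
            if (newUsed > limitBytes) {
                usedBytes.addAndGet(-bytes);
                throw new BreakerTrippedException(
                    label + " would use " + newUsed + " bytes, limit is " + limitBytes);
            }
        }

        /** Adjusts usage without a limit check; a negative value releases bytes. */
        void addWithoutBreaking(long bytes) {
            usedBytes.addAndGet(bytes);
        }

        long getUsed() {
            return usedBytes.get();
        }
    }

    /** Charges the breaker before processing and releases the bytes afterwards. */
    static String handle(SimpleBreaker breaker, byte[] payload) {
        long requestSize = payload.length; // assumption: payload length as the size estimate
        try {
            breaker.addEstimateBytesAndMaybeBreak(requestSize, "<grpc_request>");
        } catch (BreakerTrippedException e) {
            return "REJECTED"; // a real service would map this to a gRPC error instead
        }
        try {
            return "OK"; // placeholder for the actual bulk or search work
        } finally {
            breaker.addWithoutBreaking(-requestSize);
        }
    }
}
```

The sketch releases in a finally block because its handler is synchronous. For asynchronous handlers, the release has to happen when the response stream terminates rather than when the method returns.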
Changes
- DocumentServiceImpl now checks circuit breakers before processing bulk requests
- SearchServiceImpl now checks circuit breakers before processing search requests
- GrpcPlugin now passes CircuitBreakerService to DocumentServiceImpl and SearchServiceImpl
Testing
All existing tests pass. Added new tests for:
Related Issues
#20286
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.