Add Circuit breaker support #20203
Signed-off-by: sakrah <[email protected]>
Walkthrough
DocumentServiceImpl now accepts a CircuitBreakerService and enforces an IN_FLIGHT_REQUESTS circuit-breaker check for bulk requests: it estimates request bytes, calls addEstimateBytesAndMaybeBreak before sending, and ensures bytes are released on success, failure, or pre-call exceptions.
Sequence Diagram
sequenceDiagram
actor Client
participant DocumentServiceImpl
participant CircuitBreakerService
participant CircuitBreaker
participant BulkService
participant ResponseObserver
Client->>DocumentServiceImpl: bulk(request)
DocumentServiceImpl->>DocumentServiceImpl: compute contentLength
DocumentServiceImpl->>CircuitBreakerService: getBreaker(IN_FLIGHT_REQUESTS)
CircuitBreakerService-->>DocumentServiceImpl: CircuitBreaker
rect rgb(240,248,255)
DocumentServiceImpl->>CircuitBreaker: addEstimateBytesAndMaybeBreak(size, "<grpc_bulk_request>")
alt breaker throws
CircuitBreaker-->>DocumentServiceImpl: throws CircuitBreakingException
DocumentServiceImpl->>CircuitBreaker: addWithoutBreaking(-size)
DocumentServiceImpl->>ResponseObserver: onError(exception)
else breaker allows
CircuitBreaker-->>DocumentServiceImpl: ok
Note over DocumentServiceImpl,BulkService: wrap listener to release bytes on completion/failure
DocumentServiceImpl->>BulkService: bulk(request, wrappedListener)
BulkService-->>DocumentServiceImpl: onResponse / onFailure
DocumentServiceImpl->>CircuitBreaker: addWithoutBreaking(-size)
DocumentServiceImpl->>ResponseObserver: onNext/onError
end
end
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 0
🧹 Nitpick comments (2)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
30-41: Circuit breaker integration in `bulk` looks correct; consider tightening try/catch scope to avoid theoretical double-release

The new `CircuitBreakerService` dependency and the `bulk` implementation cover the important paths:
- Reserve bytes via `IN_FLIGHT_REQUESTS` before work.
- Release bytes in the wrapped listener on both `onResponse` and `onFailure`.
- Release bytes in the `catch` block when failing before `client.bulk(...)` (e.g., CB trip or request parsing error).

One subtle edge case to be aware of: the `try` currently also wraps the `client.bulk(bulkRequest, wrappedListener)` call. If `client.bulk` were ever to:
- Invoke the provided listener (causing `addWithoutBreaking(-contentLength)` via the `finally`), and
- Then throw a `RuntimeException` synchronously,

the `catch` block would call `addWithoutBreaking(-contentLength)` a second time, double-decrementing the in-flight breaker.

If `NodeClient.bulk` is guaranteed to either invoke the listener or throw (but not both), the current code is fine. If you want this to be future-proof, a defensive option would be to narrow the `try/catch` to only cover the breaker reservation + `BulkRequestProtoUtils.prepareRequest(...)`, letting the wrapped listener be solely responsible for releasing bytes once `client.bulk` has been invoked.

Not a blocker, but worth considering to make the accounting robust against changes in `client.bulk` behavior.

Also applies to: 52-88
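The narrowed `try/catch` suggested above could look roughly like the following. This is only a sketch under stated assumptions: `ToyBreaker` and `ToyClient` are hypothetical stand-ins for the OpenSearch `CircuitBreaker` and `NodeClient`, and `raw.trim()` stands in for `BulkRequestProtoUtils.prepareRequest(...)`. It is not the PR's code.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch: ToyBreaker and ToyClient are stand-ins, not OpenSearch classes.
class NarrowScopeSketch {
    /** Toy byte counter with a limit, standing in for the in-flight requests breaker. */
    static final class ToyBreaker {
        final AtomicLong used = new AtomicLong();
        final long limit;
        ToyBreaker(long limit) { this.limit = limit; }
        void addEstimateBytesAndMaybeBreak(long bytes) {
            if (used.get() + bytes > limit) throw new IllegalStateException("breaker tripped");
            used.addAndGet(bytes);
        }
        void addWithoutBreaking(long bytes) { used.addAndGet(bytes); }
    }

    interface ToyClient { void bulk(String request, Consumer<String> listener); }

    static void bulk(ToyBreaker breaker, ToyClient client, String raw, Consumer<String> observer) {
        final long size = raw.length();
        final String prepared;
        boolean reserved = false;
        try {
            // Narrow scope: only the reservation and the request preparation are guarded.
            breaker.addEstimateBytesAndMaybeBreak(size);
            reserved = true;
            prepared = raw.trim(); // stand-in for BulkRequestProtoUtils.prepareRequest(...)
        } catch (RuntimeException e) {
            if (reserved) breaker.addWithoutBreaking(-size);
            observer.accept("error: " + e.getMessage());
            return;
        }
        // From here on, the wrapped listener alone releases the reserved bytes.
        client.bulk(prepared, response -> {
            try { observer.accept(response); } finally { breaker.addWithoutBreaking(-size); }
        });
    }

    /** Runs a client that calls the listener and then throws; returns the bytes still accounted. */
    static long demo() {
        ToyBreaker breaker = new ToyBreaker(1024);
        ToyClient misbehaving = (request, listener) -> {
            listener.accept("ok");
            throw new IllegalStateException("thrown after the listener ran");
        };
        try {
            bulk(breaker, misbehaving, "{\"index\":{}}", response -> {});
        } catch (IllegalStateException expected) {
            // The exception propagates to the caller, but no second release happens.
        }
        return breaker.used.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off in this layout is that a client which throws without ever invoking the listener would leave the bytes reserved, so it relies on the "invoke the listener or throw, but not both" contract in the other direction.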
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (1)
11-17: Circuit breaker tests give solid coverage of the new behavior

The added mocks and tests around `CircuitBreakerService` and `CircuitBreaker` exercise all the key flows:
- CB is consulted before processing (`addEstimateBytesAndMaybeBreak` with the expected source tag).
- Trip path rejects the request, does not call `client.bulk`, releases bytes, and surfaces an error.
- Both success and failure paths through the wrapped `ActionListener` release bytes.
- The “exactly once” test guards against double-release in the happy path.

If you want to tighten them further, a minor optional enhancement would be to capture the `long` arguments passed to `addWithoutBreaking` and assert they are negative and consistent with the serialized request size, but the current tests are already effective for regressions in control flow.

Also applies to: 27-42, 50-55, 59-64, 91-203
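As an illustration of the optional enhancement above, the check on the released byte count might be sketched as follows. The PR's tests use mocks; this sketch instead uses a hand-written recording fake so that it stays self-contained. `RecordingBreaker` and `handle` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: RecordingBreaker is a hand-written fake, not an OpenSearch class.
class RecordingBreakerSketch {
    /** Records every reservation and release so a test can inspect the arguments. */
    static final class RecordingBreaker {
        final List<Long> reserved = new ArrayList<>();
        final List<Long> released = new ArrayList<>();
        void addEstimateBytesAndMaybeBreak(long bytes, String label) { reserved.add(bytes); }
        void addWithoutBreaking(long bytes) { released.add(bytes); }
    }

    /** Simplified synchronous request path: reserve, do the work, release. */
    static void handle(RecordingBreaker breaker, byte[] payload) {
        long size = payload.length; // stand-in for request.getSerializedSize()
        breaker.addEstimateBytesAndMaybeBreak(size, "<grpc_bulk_request>");
        try {
            // Request processing would happen here.
        } finally {
            breaker.addWithoutBreaking(-size);
        }
    }

    /** True if exactly one release happened, it was negative, and it mirrors the reservation. */
    static boolean releaseMatchesReservation(byte[] payload) {
        RecordingBreaker breaker = new RecordingBreaker();
        handle(breaker, payload);
        if (breaker.reserved.size() != 1 || breaker.released.size() != 1) return false;
        long reservedBytes = breaker.reserved.get(0);
        long releasedBytes = breaker.released.get(0);
        return releasedBytes < 0 && releasedBytes == -reservedBytes && reservedBytes == payload.length;
    }

    public static void main(String[] args) {
        System.out.println(releaseMatchesReservation(new byte[128]));
    }
}
```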
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (2 hunks)
- modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (2 hunks)
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java (3 hunks)
- modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
BulkRequestProtoUtils(24-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: detect-breaking-change
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: Analyze (java)
🔇 Additional comments (2)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java (1)
41-41: Stream → Collectors migration for interceptor lists looks correct

Using `Collectors.toList()` here is compatible with older JDKs and preserves the intended behavior for the interceptor collections in tests. No issues from my side.

Also applies to: 633-636, 653-658
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (1)
19-19: CircuitBreakerService injection into gRPC document service is wired correctly

Both `getAuxTransports` and `getSecureAuxTransports` now construct `DocumentServiceImpl` with the shared `CircuitBreakerService`, keeping behavior consistent across secure/non-secure transports. This matches the updated constructor and looks good.

Also applies to: 184-187, 236-238
❌ Gradle check result for 015c303: FAILURE
Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
- Add entry for PR opensearch-project#20203
- Document circuit breaker support for gRPC transport
Signed-off-by: sakrah <[email protected]>
Signed-off-by: sakrah <[email protected]>
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
BulkRequestProtoUtils(24-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: detect-breaking-change
- GitHub Check: Analyze (java)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, macos-15-intel)
🔇 Additional comments (3)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (3)
13-14: Imports look correct for the circuit breaker integration.

The imports are appropriate for the IN_FLIGHT_REQUESTS circuit breaker pattern and the AtomicBoolean for ensuring single-release semantics.

Also applies to: 24-25

32-43: Constructor changes for circuit breaker integration look good.

The `circuitBreakerService` field is properly declared as `final` and the constructor signature update is documented. The AI summary confirms that `GrpcPlugin` has been updated to pass the `CircuitBreakerService`.

63-89: Wrapped listener implementation correctly ensures byte release.

The anonymous `ActionListener` properly wraps the base listener and uses the `AtomicBoolean` guard with `compareAndSet` to ensure bytes are released exactly once on either success or failure path. The `finally` blocks guarantee release even if the base listener throws.
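For readers without the diff at hand, the exactly-once release described above can be sketched as below. This is a minimal illustration, not the PR's implementation: the `Listener` interface is a hypothetical stand-in for OpenSearch's `ActionListener`, and an `AtomicLong` stands in for the breaker's byte counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: Listener stands in for ActionListener, AtomicLong for the breaker counter.
class ReleaseOnceSketch {
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Wraps a delegate so that the reserved bytes are released once, whichever callback fires. */
    static <T> Listener<T> releasing(Listener<T> delegate, AtomicLong inFlightBytes, long size) {
        final AtomicBoolean closed = new AtomicBoolean(false);
        return new Listener<T>() {
            private void release() {
                // compareAndSet succeeds for the first caller only, so later calls are no-ops.
                if (closed.compareAndSet(false, true)) {
                    inFlightBytes.addAndGet(-size);
                }
            }

            @Override
            public void onResponse(T response) {
                try { delegate.onResponse(response); } finally { release(); }
            }

            @Override
            public void onFailure(Exception e) {
                try { delegate.onFailure(e); } finally { release(); }
            }
        };
    }

    /** Fires both callbacks on one wrapped listener; returns the bytes still accounted. */
    static long demo() {
        AtomicLong inFlightBytes = new AtomicLong(64); // pretend 64 bytes were reserved earlier
        Listener<String> noop = new Listener<String>() {
            @Override public void onResponse(String response) {}
            @Override public void onFailure(Exception e) {}
        };
        Listener<String> wrapped = releasing(noop, inFlightBytes, 64);
        wrapped.onResponse("ok");
        wrapped.onFailure(new RuntimeException("late failure"));
        return inFlightBytes.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the `AtomicBoolean` guard, the second callback in `demo()` would subtract the 64 bytes again and leave the counter negative.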
final int contentLength = request.getSerializedSize();
CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
final AtomicBoolean closed = new AtomicBoolean(false);

try {
    inFlightRequestsBreaker.addEstimateBytesAndMaybeBreak(contentLength, "<grpc_bulk_request>");
Critical: Bytes released even when breaker trips before adding them.
If addEstimateBytesAndMaybeBreak() throws a CircuitBreakingException, the bytes were not added to the breaker. However, the catch block at lines 91-93 unconditionally calls addWithoutBreaking(-contentLength), which will subtract bytes that were never reserved. This corrupts the circuit breaker accounting and can lead to negative byte tracking.
Track whether bytes were successfully added before attempting release:
  final int contentLength = request.getSerializedSize();
  CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
  final AtomicBoolean closed = new AtomicBoolean(false);
+ boolean bytesReserved = false;
  try {
      inFlightRequestsBreaker.addEstimateBytesAndMaybeBreak(contentLength, "<grpc_bulk_request>");
+     bytesReserved = true;
      org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request);
      // ... (listener code unchanged)
      client.bulk(bulkRequest, wrappedListener);
  } catch (RuntimeException e) {
-     if (closed.compareAndSet(false, true)) {
+     if (bytesReserved && closed.compareAndSet(false, true)) {
          inFlightRequestsBreaker.addWithoutBreaking(-contentLength);
      }
      logger.debug("DocumentServiceImpl failed: {} - {}", e.getClass().getSimpleName(), e.getMessage());
Also applies to: 90-93
🤖 Prompt for AI Agents
In
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
around lines 54-59 (and also address lines 90-93), the code currently calls
inFlightRequestsBreaker.addEstimateBytesAndMaybeBreak(contentLength, ...) and
later unconditionally calls addWithoutBreaking(-contentLength) even if the
addEstimateBytesAndMaybeBreak threw, which can decrement bytes that were never
added; fix by introducing a boolean flag (e.g., bytesReserved) set to true only
after addEstimateBytesAndMaybeBreak returns successfully, and only call
addWithoutBreaking(-contentLength) in the cleanup/close path when bytesReserved
is true (ensure the same pattern is applied to the code at lines 90-93) so you
never release bytes that were not reserved.
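The accounting problem described in this comment can be reproduced with a toy model. The sketch below is an illustration under assumptions, not the PR's code: `ToyBreaker` is a hypothetical stand-in whose `addEstimateBytesAndMaybeBreak` adds nothing when it throws, and only the pre-call rejection path is modeled.

```java
// Hypothetical sketch: ToyBreaker is a stand-in, not the OpenSearch CircuitBreaker.
class ReservedFlagSketch {
    /** Toy breaker that rejects a reservation without adding bytes when the limit is exceeded. */
    static final class ToyBreaker {
        private long used;
        private final long limit;
        ToyBreaker(long limit) { this.limit = limit; }
        void addEstimateBytesAndMaybeBreak(long bytes) {
            if (used + bytes > limit) throw new IllegalStateException("breaker tripped");
            used += bytes;
        }
        void addWithoutBreaking(long bytes) { used += bytes; }
        long used() { return used; }
    }

    /**
     * Attempts a reservation and returns the accounted bytes afterwards.
     * With guardRelease=false the catch block releases unconditionally (the reported bug);
     * with guardRelease=true it releases only if the reservation succeeded.
     */
    static long accountAfterAttempt(ToyBreaker breaker, long size, boolean guardRelease) {
        boolean bytesReserved = false;
        try {
            breaker.addEstimateBytesAndMaybeBreak(size);
            bytesReserved = true;
            // On success the real code hands off to a listener that releases later (omitted here).
        } catch (RuntimeException e) {
            if (!guardRelease || bytesReserved) {
                breaker.addWithoutBreaking(-size);
            }
        }
        return breaker.used();
    }

    public static void main(String[] args) {
        // A 50-byte request against a 10-byte limit trips the breaker in both runs.
        System.out.println(accountAfterAttempt(new ToyBreaker(10), 50, false)); // unguarded release
        System.out.println(accountAfterAttempt(new ToyBreaker(10), 50, true));  // guarded release
    }
}
```

In the unguarded run the counter ends at -50 even though nothing was reserved, while the guarded run leaves it at 0.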
❌ Gradle check result for f0f43c0: FAILURE
Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Description
This PR adds circuit breaker support to the gRPC transport layer, similar to how the REST API handles memory protection. This prevents out-of-memory errors by rejecting requests when memory pressure is too high.
Changes
- `DocumentServiceImpl` to check circuit breakers before processing bulk requests
- `GrpcPlugin` to pass `CircuitBreakerService` to `DocumentServiceImpl`
Testing
All existing tests pass. Added new tests for:
Related Issues
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.