Skip to content

Conversation

@sakrah
Copy link

@sakrah sakrah commented Dec 9, 2025

Description

This PR adds circuit breaker support to the gRPC transport layer, similar to how the REST API handles memory protection. This prevents out-of-memory errors by rejecting requests when memory pressure is too high.

Changes

  • Modified DocumentServiceImpl to check circuit breakers before processing bulk requests
  • Added circuit breaker byte tracking that releases bytes on request completion (success or failure)
  • Updated GrpcPlugin to pass CircuitBreakerService to DocumentServiceImpl
  • Added comprehensive tests for circuit breaker functionality

Testing

All existing tests pass. Added new tests for:

  • Circuit breaker checked before processing
  • Request rejection when circuit breaker trips
  • Proper byte release on success/failure
  • No double-release of bytes

Related Issues

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Summary by CodeRabbit

  • New Features

    • Added circuit breaker protection for gRPC bulk requests to limit in-flight memory usage and improve stability.
  • Tests

    • Added tests validating circuit breaker checks, rejection behavior, and guaranteed release of tracked bytes on success or failure.
  • Changelog

    • Documented the new circuit breaker support in the Unreleased changelog entry.

✏️ Tip: You can customize this high-level summary in your review settings.

@sakrah sakrah requested a review from a team as a code owner December 9, 2025 21:11
@github-actions github-actions bot added stalled Issues that have stalled labels Dec 9, 2025
@coderabbitai
Copy link

coderabbitai bot commented Dec 9, 2025

Walkthrough

DocumentServiceImpl now accepts a CircuitBreakerService and enforces an IN_FLIGHT_REQUESTS circuit-breaker check for bulk requests: it estimates request bytes, calls addEstimateBytesAndMaybeBreak before sending, and ensures bytes are released on success, failure, or pre-call exceptions.

Changes

Cohort / File(s) Summary
Plugin wiring & service
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java, modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
DocumentServiceImpl constructor updated to accept CircuitBreakerService. bulk(...) now computes content length, obtains the IN_FLIGHT_REQUESTS breaker, calls addEstimateBytesAndMaybeBreak, wraps the listener to call addWithoutBreaking(-size) on completion or failure, and releases bytes on pre-call exceptions.
Tests and collection changes
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java, modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java
Replaced Stream.toList() with Stream.collect(Collectors.toList()) in GrpcPluginTests. DocumentServiceImplTests updated to mock CircuitBreakerService/CircuitBreaker and add tests verifying breaker check, trip/reject behavior, and that bytes are released exactly once on success or failure.
Changelog
CHANGELOG.md
Added Unreleased 3.x entry documenting circuit breaker support for gRPC transport.

Sequence Diagram

sequenceDiagram
    actor Client
    participant DocumentServiceImpl
    participant CircuitBreakerService
    participant CircuitBreaker
    participant BulkService
    participant ResponseObserver

    Client->>DocumentServiceImpl: bulk(request)
    DocumentServiceImpl->>DocumentServiceImpl: compute contentLength
    DocumentServiceImpl->>CircuitBreakerService: getBreaker(IN_FLIGHT_REQUESTS)
    CircuitBreakerService-->>DocumentServiceImpl: CircuitBreaker

    rect rgb(240,248,255)
    DocumentServiceImpl->>CircuitBreaker: addEstimateBytesAndMaybeBreak(size, "<grpc_bulk_request>")
    alt breaker throws
        CircuitBreaker-->>DocumentServiceImpl: throws CircuitBreakingException
        DocumentServiceImpl->>CircuitBreaker: addWithoutBreaking(-size)
        DocumentServiceImpl->>ResponseObserver: onError(exception)
    else breaker allows
        CircuitBreaker-->>DocumentServiceImpl: ok
        Note over DocumentServiceImpl,BulkService: wrap listener to release bytes on completion/failure
        DocumentServiceImpl->>BulkService: bulk(request, wrappedListener)
        BulkService-->>DocumentServiceImpl: onResponse / onFailure
        DocumentServiceImpl->>CircuitBreaker: addWithoutBreaking(-size)
        DocumentServiceImpl->>ResponseObserver: onNext/onError
    end
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Verify listener wrapping ensures bytes released exactly once (closed guard).
  • Confirm GrpcPlugin registers services with the new constructor in all transport registration paths.
  • Review tests for correct mocking and that exception paths assert observer onError and no client.bulk invocation.

Poem

🐇 I counted bytes beneath the moon,

I held them tight, then let them swoon,
If weight grows fierce and memory weeps,
I trip the line and guard your sleeps.
One hop, one heartbeat—bytes set free.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 35.29% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main change: adding circuit breaker support to the gRPC transport layer.
Description check ✅ Passed The description is comprehensive and follows the template structure with clear sections for Changes, Testing, and completed checklist items.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)

30-41: Circuit breaker integration in bulk looks correct; consider tightening try/catch scope to avoid theoretical double-release

The new CircuitBreakerService dependency and the bulk implementation cover the important paths:

  • Reserve bytes via IN_FLIGHT_REQUESTS before work.
  • Release bytes in the wrapped listener on both onResponse and onFailure.
  • Release bytes in the catch block when failing before client.bulk(...) (e.g., CB trip or request parsing error).

One subtle edge case to be aware of: the try currently also wraps the client.bulk(bulkRequest, wrappedListener) call. If client.bulk were ever to:

  1. Invoke the provided listener (causing addWithoutBreaking(-contentLength) via the finally), and
  2. Then throw a RuntimeException synchronously,

the catch block would call addWithoutBreaking(-contentLength) a second time, double-decrementing the in-flight breaker.

If NodeClient.bulk is guaranteed to either invoke the listener or throw (but not both), the current code is fine. If you want this to be future-proof, a defensive option would be to narrow the try/catch to only cover the breaker reservation + BulkRequestProtoUtils.prepareRequest(...), letting the wrapped listener be solely responsible for releasing bytes once client.bulk has been invoked.

Not a blocker, but worth considering to make the accounting robust against changes in client.bulk behavior.

Also applies to: 52-88

modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (1)

11-17: Circuit breaker tests give solid coverage of the new behavior

The added mocks and tests around CircuitBreakerService and CircuitBreaker exercise all the key flows:

  • CB is consulted before processing (addEstimateBytesAndMaybeBreak with the expected source tag).
  • Trip path rejects the request, does not call client.bulk, releases bytes, and surfaces an error.
  • Both success and failure paths through the wrapped ActionListener release bytes.
  • The “exactly once” test guards against double-release in the happy path.

If you want to tighten them further, a minor optional enhancement would be to capture the long arguments passed to addWithoutBreaking and assert they are negative and consistent with the serialized request size, but the current tests are already effective for regressions in control flow.

Also applies to: 27-42, 50-55, 59-64, 91-203

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba9ec47 and 015c303.

📒 Files selected for processing (4)
  • modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (2 hunks)
  • modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (2 hunks)
  • modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java (3 hunks)
  • modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/services/document/DocumentServiceImplTests.java (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
  • BulkRequestProtoUtils (24-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: detect-breaking-change
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: Analyze (java)
🔇 Additional comments (2)
modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/GrpcPluginTests.java (1)

41-41: Stream → Collectors migration for interceptor lists looks correct

Using Collectors.toList() here is compatible with older JDKs and preserves the intended behavior for the interceptor collections in tests. No issues from my side.

Also applies to: 633-636, 653-658

modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/GrpcPlugin.java (1)

19-19: CircuitBreakerService injection into gRPC document service is wired correctly

Both getAuxTransports and getSecureAuxTransports now construct DocumentServiceImpl with the shared CircuitBreakerService, keeping behavior consistent across secure/non-secure transports. This matches the updated constructor and looks good.

Also applies to: 184-187, 236-238

@github-actions
Copy link
Contributor

github-actions bot commented Dec 9, 2025

❌ Gradle check result for 015c303: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

- Add entry for PR opensearch-project#20203
- Document circuit breaker support for gRPC transport

Signed-off-by: sakrah <[email protected]>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dfa35ba and f0f43c0.

📒 Files selected for processing (1)
  • modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (1)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestProtoUtils.java (1)
  • BulkRequestProtoUtils (24-100)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: Analyze (java)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, macos-15-intel)
🔇 Additional comments (3)
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java (3)

13-14: Imports look correct for the circuit breaker integration.

The imports are appropriate for the IN_FLIGHT_REQUESTS circuit breaker pattern and the AtomicBoolean for ensuring single-release semantics.

Also applies to: 24-25


32-43: Constructor changes for circuit breaker integration look good.

The circuitBreakerService field is properly declared as final and the constructor signature update is documented. The AI summary confirms that GrpcPlugin has been updated to pass the CircuitBreakerService.


63-89: Wrapped listener implementation correctly ensures byte release.

The anonymous ActionListener properly wraps the base listener and uses the AtomicBoolean guard with compareAndSet to ensure bytes are released exactly once on either success or failure path. The finally blocks guarantee release even if the base listener throws.

Comment on lines +54 to +59
final int contentLength = request.getSerializedSize();
CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
final AtomicBoolean closed = new AtomicBoolean(false);

try {
inFlightRequestsBreaker.addEstimateBytesAndMaybeBreak(contentLength, "<grpc_bulk_request>");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: Bytes released even when breaker trips before adding them.

If addEstimateBytesAndMaybeBreak() throws a CircuitBreakingException, the bytes were not added to the breaker. However, the catch block at lines 91-93 unconditionally calls addWithoutBreaking(-contentLength), which will subtract bytes that were never reserved. This corrupts the circuit breaker accounting and can lead to negative byte tracking.

Track whether bytes were successfully added before attempting release:

     final int contentLength = request.getSerializedSize();
     CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
     final AtomicBoolean closed = new AtomicBoolean(false);
+    boolean bytesReserved = false;

     try {
         inFlightRequestsBreaker.addEstimateBytesAndMaybeBreak(contentLength, "<grpc_bulk_request>");
+        bytesReserved = true;

         org.opensearch.action.bulk.BulkRequest bulkRequest = BulkRequestProtoUtils.prepareRequest(request);
         // ... (listener code unchanged)

         client.bulk(bulkRequest, wrappedListener);
     } catch (RuntimeException e) {
-        if (closed.compareAndSet(false, true)) {
+        if (bytesReserved && closed.compareAndSet(false, true)) {
             inFlightRequestsBreaker.addWithoutBreaking(-contentLength);
         }
         logger.debug("DocumentServiceImpl failed: {} - {}", e.getClass().getSimpleName(), e.getMessage());

Also applies to: 90-93

🤖 Prompt for AI Agents
In
modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java
around lines 54-59 (and also address lines 90-93), the code currently calls
inFlightRequestsBreaker.addEstimateBytesAndMaybeBreak(contentLength, ...) and
later unconditionally calls addWithoutBreaking(-contentLength) even if the
addEstimateBytesAndMaybeBreak threw, which can decrement bytes that were never
added; fix by introducing a boolean flag (e.g., bytesReserved) set to true only
after addEstimateBytesAndMaybeBreak returns successfully, and only call
addWithoutBreaking(-contentLength) in the cleanup/close path when bytesReserved
is true (ensure the same pattern is applied to the code at lines 90-93) so you
never release bytes that were not reserved.

@github-actions
Copy link
Contributor

github-actions bot commented Dec 9, 2025

❌ Gradle check result for f0f43c0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@opensearch-trigger-bot opensearch-trigger-bot bot removed the stalled Issues that have stalled label Dec 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants