5757import io .grpc .internal .StatsTraceContext ;
5858import java .util .concurrent .Executor ;
5959import java .util .concurrent .ScheduledFuture ;
60- import java . util . concurrent . atomic . AtomicInteger ;
60+
6161import javax .annotation .Nullable ;
6262import javax .annotation .concurrent .ThreadSafe ;
6363
@@ -73,7 +73,12 @@ public final class BinderClientTransport extends BinderTransport
7373 private final Bindable serviceBinding ;
7474
7575 /** Number of ongoing calls which keep this transport "in-use". */
76- private final AtomicInteger numInUseStreams ;
76+ @ GuardedBy ("this" )
77+ private int numInUseStreams ;
78+
79+ /** Last in-use state that was reported to the listener */
80+ @ GuardedBy ("this" )
81+ private boolean listenerInUse ;
7782
7883 private final long readyTimeoutMillis ;
7984 private final PingTracker pingTracker ;
@@ -114,7 +119,8 @@ public BinderClientTransport(
114119 Boolean preAuthServerOverride = options .getEagAttributes ().get (PRE_AUTH_SERVER_OVERRIDE );
115120 this .preAuthorizeServer =
116121 preAuthServerOverride != null ? preAuthServerOverride : factory .preAuthorizeServers ;
117- numInUseStreams = new AtomicInteger ();
122+ numInUseStreams = 0 ;
123+ listenerInUse = false ;
118124 pingTracker = new PingTracker (Ticker .systemTicker (), (id ) -> sendPing (id ));
119125
120126 serviceBinding =
@@ -259,9 +265,7 @@ public synchronized ClientStream newStream(
259265 return newFailingClientStream (failure , attributes , headers , tracers );
260266 }
261267
262- if (inbound .countsForInUse () && numInUseStreams .getAndIncrement () == 0 ) {
263- clientTransportListener .transportInUse (true );
264- }
268+ updateInUseStreamsIfNeed (inbound .countsForInUse (), 1 );
265269 Outbound .ClientOutbound outbound =
266270 new Outbound .ClientOutbound (this , callId , method , headers , statsTraceContext );
267271 if (method .getType ().clientSendsOneMessage ()) {
@@ -273,9 +277,7 @@ public synchronized ClientStream newStream(
273277
274278 @ Override
275279 protected void unregisterInbound (Inbound <?> inbound ) {
276- if (inbound .countsForInUse () && numInUseStreams .decrementAndGet () == 0 ) {
277- clientTransportListener .transportInUse (false );
278- }
280+ updateInUseStreamsIfNeed (inbound .countsForInUse (), -1 );
279281 super .unregisterInbound (inbound );
280282 }
281283
@@ -305,7 +307,9 @@ void notifyShutdown(Status status) {
305307 @ Override
306308 @ GuardedBy ("this" )
307309 void notifyTerminated () {
308- if (numInUseStreams .getAndSet (0 ) > 0 ) {
310+ if (numInUseStreams > 0 ) {
311+ numInUseStreams = 0 ;
312+ listenerInUse = false ;
309313 clientTransportListener .transportInUse (false );
310314 }
311315 if (readyTimeoutFuture != null ) {
@@ -391,6 +395,25 @@ private synchronized void handleAuthResult(Throwable t) {
391395 Status .INTERNAL .withDescription ("Could not evaluate SecurityPolicy" ).withCause (t ), true );
392396 }
393397
398+ /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */
399+ private synchronized void updateInUseStreamsIfNeed (boolean countsForInUse , int delta ) {
400+ if (!countsForInUse ) {
401+ return ;
402+ }
403+
404+ numInUseStreams += delta ;
405+ if (numInUseStreams < 0 ) {
406+ // Defensive: prevent negative due to unexpected double-decrement
407+ numInUseStreams = 0 ;
408+ }
409+
410+ boolean nowInUseStream = numInUseStreams > 0 ;
411+ if (nowInUseStream != listenerInUse ) {
412+ listenerInUse = nowInUseStream ;
413+ clientTransportListener .transportInUse (nowInUseStream );
414+ }
415+ }
416+
394417 @ GuardedBy ("this" )
395418 @ Override
396419 protected void handlePingResponse (Parcel parcel ) {
0 commit comments