Skip to content

Commit 7f08056

Browse files
committed
binder: make Listener callbacks mutually exclusive and fix in-use race
- Serialize transportShutdown/transportTerminated/transportReady/transportInUse under listener lock - Atomic in-use counter with reconcile to prevent incorrect false when −1/+1 race occurs - Dispatch callbacks asynchronously to avoid lock-order deadlocks - Behavior unchanged for users; improves correctness under concurrency
1 parent 896c2fd commit 7f08056

File tree

1 file changed

+75
-19
lines changed

1 file changed

+75
-19
lines changed

binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java

Lines changed: 75 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -314,22 +314,38 @@ public synchronized void shutdownNow(Status reason) {
314314
@Override
315315
@GuardedBy("this")
316316
void notifyShutdown(Status status) {
317-
clientTransportListener.transportShutdown(status);
317+
// Defer to listener executor with external synchronization
318+
scheduleOnListener(
319+
new Runnable() {
320+
@Override
321+
public void run() {
322+
clientTransportListener.transportShutdown(status);
323+
}
324+
});
318325
}
319326

320327
@Override
321328
@GuardedBy("this")
322329
void notifyTerminated() {
323330
if (numInUseStreams.getAndSet(0) > 0) {
324-
listenerInUse.set(false);
325-
clientTransportListener.transportInUse(false);
331+
if (listenerInUse.compareAndSet(true, false)) {
332+
scheduleTransportInUseNotification(false);
333+
} else {
334+
listenerInUse.set(false);
335+
}
326336
}
327337
if (readyTimeoutFuture != null) {
328338
readyTimeoutFuture.cancel(false);
329339
readyTimeoutFuture = null;
330340
}
331341
serviceBinding.unbind();
332-
clientTransportListener.transportTerminated();
342+
scheduleOnListener(
343+
new Runnable() {
344+
@Override
345+
public void run() {
346+
clientTransportListener.transportTerminated();
347+
}
348+
});
333349
}
334350

335351
@Override
@@ -449,8 +465,11 @@ public void handleSetupTransport() {
449465
@GuardedBy("this")
450466
private void onHandshakeComplete() {
451467
setState(TransportState.READY);
452-
attributes = clientTransportListener.filterTransport(attributes);
453-
clientTransportListener.transportReady();
468+
final Attributes currentAttrs = attributes;
469+
// Perform filter on listener thread with external synchronization, then update attrs and
470+
// notify ready without holding transport lock to avoid deadlocks.
471+
scheduleFilterTransportAndReady(currentAttrs);
472+
454473
if (readyTimeoutFuture != null) {
455474
readyTimeoutFuture.cancel(false);
456475
readyTimeoutFuture = null;
@@ -471,34 +490,32 @@ private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta)
471490
if (!countsForInUse) {
472491
return;
473492
}
474-
int prev, next;
475493

476494
if (delta > 0) {
477-
next = numInUseStreams.incrementAndGet();
478-
prev = next - 1;
495+
numInUseStreams.incrementAndGet();
479496
} else {
480-
prev = numInUseStreams.get();
481-
int updated;
497+
// Decrement with floor at 0
498+
int prev = numInUseStreams.get();
482499

483500
while (true) {
484501
int current = prev;
485502
int newValue = current > 0 ? current - 1 : 0;
486503
if (numInUseStreams.compareAndSet(current, newValue)) {
487-
updated = newValue;
488504
break;
489505
}
490506
prev = numInUseStreams.get();
491507
}
492-
next = updated;
493508
}
509+
reconcileInUseState();
510+
}
494511

495-
boolean prevInUse = prev > 0;
496-
boolean nextInUse = next > 0;
512+
/** Reconcile listenerInUse with the current stream count to avoid stale toggles under races. */
513+
private void reconcileInUseState() {
514+
boolean nowInUse = numInUseStreams.get() > 0;
515+
boolean prev = listenerInUse.get();
497516

498-
if (prevInUse != nextInUse) {
499-
if (listenerInUse.compareAndSet(prevInUse, nextInUse)) {
500-
scheduleTransportInUseNotification(nextInUse);
501-
}
517+
if(prev != nowInUse && listenerInUse.compareAndSet(prev, nowInUse)) {
518+
scheduleTransportInUseNotification(nowInUse);
502519
}
503520
}
504521

@@ -519,6 +536,45 @@ public void run() {
519536
});
520537
}
521538

539+
private void scheduleFilterTransportAndReady(final Attributes attrsSnapshot) {
540+
getScheduledExecutorService()
541+
.execute(
542+
new Runnable() {
543+
@Override
544+
public void run() {
545+
final Attributes filtered;
546+
synchronized (listenerNotifyLock) {
547+
filtered = clientTransportListener.filterTransport(attrsSnapshot);
548+
}
549+
550+
synchronized (BinderClientTransport.class) {
551+
attributes = filtered;
552+
}
553+
554+
scheduleOnListener(
555+
new Runnable() {
556+
@Override
557+
public void run() {
558+
clientTransportListener.transportReady();
559+
}
560+
});
561+
}
562+
});
563+
}
564+
565+
private void scheduleOnListener(final Runnable task) {
566+
getScheduledExecutorService()
567+
.execute(
568+
new Runnable() {
569+
@Override
570+
public void run() {
571+
synchronized (listenerNotifyLock) {
572+
task.run();
573+
}
574+
}
575+
});
576+
}
577+
522578
@GuardedBy("this")
523579
@Override
524580
protected void handlePingResponse(Parcel parcel) {

0 commit comments

Comments
 (0)