Skip to content
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
Premium large-message entities by reading the `com.microsoft:max-message-batch-size` vendor property
from the AMQP sender link instead of using `max-message-size`. ([#48214](https://github.com/Azure/azure-sdk-for-java/pull/48214))
- Fixed `ServiceBusAdministrationClient.updateSubscription()` silently ignoring `defaultMessageTimeToLive` changes. The property was incorrectly nullified before serialization. ([#48495](https://github.com/Azure/azure-sdk-for-java/issues/48495))
- Fixed session-enabled `ServiceBusProcessorClient` logging a spurious `DeliveryNotOnLinkException`
("...does not exist in the link's DeliveryMap") at ERROR when a message handler settles a message
manually (e.g. `complete()`) while auto-complete is left enabled. The V2 session disposition path now
marks the message settled on success, so the redundant auto-settlement short-circuits at the
already-settled guard instead of attempting a second disposition on the receive-link. The message was
always settled correctly; only the misleading error log is removed. ([#47356](https://github.com/Azure/azure-sdk-for-java/issues/47356))

### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,15 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit

Mono<Void> updateDispositionMono;
if (receiver != null) {
updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState);
// Mark the message settled once the broker acknowledges the disposition, mirroring the non-session
// and V1 session paths (ServiceBusReceiverAsyncClient). This arms the message.isSettled() guard above
// so that a redundant settlement attempt (e.g. when the handler calls complete() manually AND
// auto-complete is left enabled) short-circuits here with a benign "already settled" error instead of
// reaching the receive-link and logging a spurious DeliveryNotOnLinkException once the delivery has
// already been removed from the link's DeliveryMap.
// See https://github.com/Azure/azure-sdk-for-java/issues/47356
updateDispositionMono = receiver.updateDisposition(message.getLockToken(), deliveryState)
.<Void>then(Mono.fromRunnable(() -> message.setIsSettled()));
} else {
updateDispositionMono
= Mono.error(DeliveryNotOnLinkException.noMatchingDelivery(message.getLockToken(), deliveryState));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.Supplier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -501,6 +502,62 @@ public void shouldCompleteMessageWhenSessionIdDiffersInCase() {
verify(onTerminate, times(1)).run();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldNotReDispositionWhenHandlerSettlesWithAutoCompleteEnabled() {
// Regression test for https://github.com/Azure/azure-sdk-for-java/issues/47356
// When a session message handler settles the message manually (e.g. complete()) while auto-complete is
// left enabled, the redundant auto-settlement must short-circuit at the message.isSettled() guard and
// must NOT issue a second disposition on the receive-link (which would log a spurious
// DeliveryNotOnLinkException once the delivery has been removed from the link's DeliveryMap).
final String session1Id = "1";

final HashMap<Message, ServiceBusReceivedMessage> session1Messages = createMockMessages(session1Id, 1);
// Wire the mock message's settled flag so the isSettled() guard in updateDisposition() behaves like the
// real message: setIsSettled() flips it to true and isSettled() reflects the current value.
final ServiceBusReceivedMessage message = session1Messages.values().iterator().next();
final AtomicBoolean settled = new AtomicBoolean(false);
when(message.isSettled()).thenAnswer(__ -> settled.get());
doAnswer(__ -> {
settled.set(true);
return null;
}).when(message).setIsSettled();

final TestPublisher<AmqpEndpointState> session1EpStates = TestPublisher.createCold();
session1EpStates.next(AmqpEndpointState.ACTIVE);
final Session session1 = createMockSession(session1Id, session1Messages, session1EpStates);
when(session1.getLink().updateDisposition(any(), any())).thenReturn(Mono.empty());
final MessageSerializer serializer = createMockmessageSerializer(session1Messages);
final ServiceBusSessionAcquirer sessionAcquirer = createMockSessionAcquirer(session1);
final Runnable onTerminate = createMockOnTerminate();

final int maxSessions = 1;
final int concurrency = 1;
final boolean autoDispositionDisabled = false; // auto-complete enabled.
// The handler settles the message manually, mirroring the customer scenario in the issue.
final Consumer<ServiceBusReceivedMessageContext> processMessage = ServiceBusReceivedMessageContext::complete;
final Consumer<ServiceBusErrorContext> processError = e -> {
};
final SessionsMessagePump pump = createSessionsMessagePump(sessionAcquirer, idleTimeoutDisabled, maxSessions,
concurrency, autoDispositionDisabled, serializer, processMessage, processError, onTerminate);

try (VirtualTimeStepVerifier verifier = new VirtualTimeStepVerifier()) {
verifier.create(() -> pump.begin()).thenAwait().thenCancel().verify();
}

// The successful manual disposition marks the message settled...
Assertions.assertTrue(message.isSettled());
verify(message, times(1)).setIsSettled();
// ...so the redundant auto-complete short-circuits at the isSettled() guard and the receive-link sees
// exactly ONE disposition (the manual one), never a second that would raise DeliveryNotOnLinkException.
verify(session1.getLink(), times(1)).updateDisposition(lockTokenCaptor.capture(),
deliveryStateCaptor.capture());
Assertions.assertEquals(message.getLockToken(), lockTokenCaptor.getValue());
Assertions.assertEquals(Accepted.getInstance(), deliveryStateCaptor.getValue());
verify(session1.getLink(), times(1)).closeAsync();
verify(onTerminate, times(1)).run();
}

@Test
@Execution(ExecutionMode.SAME_THREAD)
public void shouldEmitErrorIfBeginInvokedMoreThanOnce() {
Expand Down
Loading