Skip to content

Fix RabbitMQ queue binding and Gateway GetOriginalStream#498

Merged
alexeyzimarev merged 2 commits intodevfrom
fix/rabbitmq-queue-bind-and-gateway-stream
Feb 27, 2026
Merged

Fix RabbitMQ queue binding and Gateway GetOriginalStream#498
alexeyzimarev merged 2 commits intodevfrom
fix/rabbitmq-queue-bind-and-gateway-stream

Conversation

@alexeyzimarev
Copy link
Contributor

Summary

  • RabbitMQ: QueueBind used Options.SubscriptionId instead of the resolved queue name, so overriding the queue via QueueOptions.Queue broke exchange-to-queue binding and message delivery
  • Gateway: GetOriginalStream() retrieved the header as System.IO.Stream instead of StreamName, causing it to always return null

Both bugs were identified by Qodo review on #495.

Test plan

  • RabbitMQ: verify subscription works with a custom QueueOptions.Queue name
  • Gateway: verify GetOriginalStream() returns the correct StreamName

🤖 Generated with Claude Code

RabbitMQ: QueueBind used SubscriptionId instead of the resolved queue
name, breaking message delivery when QueueOptions.Queue was overridden.

Gateway: GetOriginalStream() retrieved the header as System.IO.Stream
instead of StreamName, so it always returned null.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@qodo-free-for-open-source-projects
Copy link
Contributor

Review Summary by Qodo

Fix RabbitMQ queue binding and Gateway stream retrieval

🐞 Bug fix

Grey Divider

Walkthroughs

Description
• Fixed RabbitMQ queue binding using resolved queue name instead of subscription ID
• Fixed Gateway GetOriginalStream() return type from Stream to StreamName
• Corrected logging to display actual queue name in RabbitMQ binding
Diagram
flowchart LR
  A["RabbitMQ QueueBind"] -->|"Use resolved queue name"| B["Correct binding"]
  C["Gateway GetOriginalStream"] -->|"Return StreamName type"| D["Correct stream retrieval"]
Loading

Grey Divider

File Changes

1. src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs 🐞 Bug fix +2/-2

Fix GetOriginalStream return type to StreamName

• Changed GetOriginalStream() return type from Stream? to StreamName?
• Updated header retrieval to use StreamName type instead of Stream
• Fixes null return issue when retrieving original stream name from message headers

src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs


2. src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs 🐞 Bug fix +2/-2

Fix RabbitMQ queue binding to use resolved queue name

• Changed QueueBind() call to use resolved queue variable instead of Options.SubscriptionId
• Updated logging statement to display actual queue name instead of subscription ID
• Fixes message delivery failure when custom queue name is specified via QueueOptions.Queue

src/RabbitMq/src/Eventuous.RabbitMq/Subscriptions/RabbitMqSubscription.cs


Grey Divider

Qodo Logo

@qodo-free-for-open-source-projects
Copy link
Contributor

qodo-free-for-open-source-projects bot commented Feb 27, 2026

Code Review by Qodo

🐞 Bugs (1) 📘 Rule violations (0) 📎 Requirement gaps (0)

Grey Divider


Action required

1. Public API return-type break 🐞 Bug ⛯ Reliability
Description
ProducedMessageExtensions.GetOriginalStream changed its return type from Stream? to StreamName?,
which is a source/binary breaking change for downstream consumers of this library. If this is
released as a patch/minor version, it can break compilation and already-built binaries that
reference the old signature.
Code

src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[R33-34]

+        public StreamName? GetOriginalStream()
+            => message.AdditionalHeaders?.Get<StreamName>(GatewayContextItems.OriginalStream);
Evidence
The extension method is explicitly part of the public surface ([PublicAPI]) and the method signature
now returns StreamName?. The stored header value is a StreamName (from
IMessageConsumeContext.Stream), and Metadata.Get<T> requires an exact type match, which explains why
the old Stream-based signature always returned null—but changing the signature is still a breaking
API change that must be handled via versioning/compat strategy.

src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[30-35]
src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[15-24]
src/Core/src/Eventuous.Subscriptions/Context/IMessageConsumeContext.cs[27-30]
src/Core/src/Eventuous.Shared/Meta/Metadata.cs[24-27]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`ProducedMessageExtensions.GetOriginalStream()` is marked as public API and its return type was changed from `System.IO.Stream?` to `StreamName?`. This is a breaking change for consumers and must be handled explicitly (SemVer major bump or compatibility shim).

## Issue Context
The old implementation was incorrect because the stored value is `StreamName` (from `IBaseConsumeContext.Stream`) and `Metadata.Get&lt;T&gt;` only returns values when the stored type exactly matches `T`.

## Fix Focus Areas
- src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[30-35]
- src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[15-24]

## Suggested approach
Choose one:
1) **Breaking-change approach (preferred if you can bump major):** keep current signature, and add release notes/CHANGELOG entry.
2) **Compatibility approach (preferred for patch/minor):**
  - Add `StreamName? GetOriginalStreamName()` (new method).
  - Re-introduce old `Stream? GetOriginalStream()` signature, mark `[Obsolete]`, and keep its old behavior (or return `null`) to avoid breaking builds.
  - Update internal docs/examples to use `GetOriginalStreamName()`.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

Comment on lines +33 to +34
public StreamName? GetOriginalStream()
=> message.AdditionalHeaders?.Get<StreamName>(GatewayContextItems.OriginalStream);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Public api return-type break 🐞 Bug ⛯ Reliability

ProducedMessageExtensions.GetOriginalStream changed its return type from Stream? to StreamName?,
which is a source/binary breaking change for downstream consumers of this library. If this is
released as a patch/minor version, it can break compilation and already-built binaries that
reference the old signature.
Agent Prompt
## Issue description
`ProducedMessageExtensions.GetOriginalStream()` is marked as public API and its return type was changed from `System.IO.Stream?` to `StreamName?`. This is a breaking change for consumers and must be handled explicitly (SemVer major bump or compatibility shim).

## Issue Context
The old implementation was incorrect because the stored value is `StreamName` (from `IBaseConsumeContext.Stream`) and `Metadata.Get<T>` only returns values when the stored type exactly matches `T`.

## Fix Focus Areas
- src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[30-35]
- src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[15-24]

## Suggested approach
Choose one:
1) **Breaking-change approach (preferred if you can bump major):** keep current signature, and add release notes/CHANGELOG entry.
2) **Compatibility approach (preferred for patch/minor):**
   - Add `StreamName? GetOriginalStreamName()` (new method).
   - Re-introduce old `Stream? GetOriginalStream()` signature, mark `[Obsolete]`, and keep its old behavior (or return `null`) to avoid breaking builds.
   - Update internal docs/examples to use `GetOriginalStreamName()`.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

@github-actions
Copy link

github-actions bot commented Feb 27, 2026

Test Results

 57 files   57 suites   34m 10s ⏱️
299 tests 299 ✅ 0 💤 0 ❌
900 runs  900 ✅ 0 💤 0 ❌

Results for commit ba7ba9c.

♻️ This comment has been updated with latest results.

…ream

RabbitMQ: integration test that uses a custom QueueOptions.Queue name
different from SubscriptionId, verifying messages are still delivered.

Gateway: unit tests for ProducedMessageExtensions verifying that
GetOriginalStream returns StreamName (not System.IO.Stream), plus
tests for other header accessors and null-header edge case.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@alexeyzimarev
Copy link
Contributor Author

/review

@qodo-free-for-open-source-projects
Copy link
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 2 🔵🔵⚪⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Type Compatibility

Changing GetOriginalStream() to return StreamName? fixes the bug, but verify all call sites and any serialized metadata still store GatewayContextItems.OriginalStream as a StreamName (and not a string or other representation). If headers can come from external systems, consider whether a fallback conversion is needed.

public StreamName? GetOriginalStream()
    => message.AdditionalHeaders?.Get<StreamName>(GatewayContextItems.OriginalStream);
Binding Correctness

QueueBind now uses the resolved queue name, which is correct for custom queues. Validate that queue is always the final declared queue name in all configuration paths (including server-named queues if supported) and that the log statement matches the actual binding target.

Log.InfoLog?.Log("Binding exchange {Exchange} to queue {Queue}", exchange, queue);

_channel.QueueBind(
    queue,
    exchange,
    Options.BindingOptions.RoutingKey,
    Options.BindingOptions.Arguments
);
Test Reliability

The test produces an event before the subscription is created/started, which can be flaky unless the exchange/queue binding already exists and RabbitMQ reliably retains it until the consumer comes up. Consider producing after SubscribeWithLog completes (or add an explicit synchronization step) to ensure the test strictly validates binding and delivery.

public async Task SubscribeWithCustomQueueName(CancellationToken cancellationToken) {
    var testEvent = TestEvent.Create();
    await _producer.Produce(_exchange, testEvent, new(), cancellationToken: cancellationToken);
    await _handler.AssertThat().Timebox(10.Seconds()).Any().Match(x => x as TestEvent == testEvent).Validate(cancellationToken);
}

[Before(Test)]
public async ValueTask InitializeAsync() {
    _es       = new();
    _handler  = new();
    _producer = new(_fixture.ConnectionFactory);

    var subscriptionId = Guid.NewGuid().ToString();
    var customQueue    = Guid.NewGuid().ToString();

    _subscription = new(
        _fixture.ConnectionFactory,
        new RabbitMqSubscriptionOptions {
            ConcurrencyLimit = 10,
            SubscriptionId   = subscriptionId,
            Exchange         = _exchange,
            ThrowOnError     = true,
            QueueOptions     = new RabbitMqSubscriptionOptions.RabbitMqQueueOptions { Queue = customQueue }
        },
        new ConsumePipe().AddDefaultConsumer(_handler),
        _loggerFactory
    );
    await _subscription.SubscribeWithLog(_log);
    await _producer.StartAsync();
}
📄 References
  1. No matching references available

@alexeyzimarev alexeyzimarev merged commit 887e0ac into dev Feb 27, 2026
5 checks passed
@alexeyzimarev alexeyzimarev deleted the fix/rabbitmq-queue-bind-and-gateway-stream branch February 27, 2026 10:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant