Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public static Metadata GetContextMeta(IMessageConsumeContext context) {
[PublicAPI]
public static class ProducedMessageExtensions {
extension(ProducedMessage message) {
public Stream? GetOriginalStream()
=> message.AdditionalHeaders?.Get<Stream>(GatewayContextItems.OriginalStream);
public StreamName? GetOriginalStream()
=> message.AdditionalHeaders?.Get<StreamName>(GatewayContextItems.OriginalStream);
Comment on lines +33 to +34

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Public api return-type break 🐞 Bug ⛯ Reliability

ProducedMessageExtensions.GetOriginalStream changed its return type from Stream? to StreamName?,
which is a source/binary breaking change for downstream consumers of this library. If this is
released as a patch/minor version, it can break compilation and already-built binaries that
reference the old signature.
Agent Prompt
## Issue description
`ProducedMessageExtensions.GetOriginalStream()` is marked as public API and its return type was changed from `System.IO.Stream?` to `StreamName?`. This is a breaking change for consumers and must be handled explicitly (SemVer major bump or compatibility shim).

## Issue Context
The old implementation was incorrect because the stored value is `StreamName` (from `IBaseConsumeContext.Stream`) and `Metadata.Get<T>` only returns values when the stored type exactly matches `T`.

## Fix Focus Areas
- src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[30-35]
- src/Gateway/src/Eventuous.Gateway/GatewayMetaHelper.cs[15-24]

## Suggested approach
Choose one:
1) **Breaking-change approach (preferred if you can bump major):** keep current signature, and add release notes/CHANGELOG entry.
2) **Compatibility approach (preferred for patch/minor):**
   - Add `StreamName? GetOriginalStreamName()` (new method).
   - Re-introduce old `Stream? GetOriginalStream()` signature, mark `[Obsolete]`, and keep its old behavior (or return `null`) to avoid breaking builds.
   - Update internal docs/examples to use `GetOriginalStreamName()`.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


public object? GetOriginalMessage()
=> message.AdditionalHeaders?.Get<object>(GatewayContextItems.OriginalMessage);
Expand Down
64 changes: 64 additions & 0 deletions src/Gateway/test/Eventuous.Tests.Gateway/GatewayMetaTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using Eventuous.Gateway;
using Eventuous.Producers;

namespace Eventuous.Tests.Gateway;

public class GatewayMetaTests {
[Test]
public async Task GetOriginalStream_ReturnsStreamName() {
var streamName = new StreamName("Test-123");

var headers = new Metadata(new Dictionary<string, object?> {
[GatewayContextItems.OriginalStream] = streamName
});

var message = new ProducedMessage("test", null, headers);

var result = message.GetOriginalStream();

await Assert.That(result).IsNotNull();
await Assert.That(result!.Value).IsEqualTo(streamName);
}

[Test]
public async Task GetOriginalMessageId_ReturnsMessageId() {
var messageId = Guid.NewGuid().ToString();

var headers = new Metadata(new Dictionary<string, object?> {
[GatewayContextItems.OriginalMessageId] = messageId
});

var message = new ProducedMessage("test", null, headers);

await Assert.That(message.GetOriginalMessageId()).IsEqualTo(messageId);
}

[Test]
public async Task GetOriginalMessageType_ReturnsMessageType() {
var headers = new Metadata(new Dictionary<string, object?> {
[GatewayContextItems.OriginalMessageType] = "test-event"
});

var message = new ProducedMessage("test", null, headers);

await Assert.That(message.GetOriginalMessageType()).IsEqualTo("test-event");
}

[Test]
public async Task GetOriginalStreamPosition_ReturnsPosition() {
var headers = new Metadata(new Dictionary<string, object?> {
[GatewayContextItems.OriginalStreamPosition] = 42UL
});

var message = new ProducedMessage("test", null, headers);

await Assert.That(message.GetOriginalStreamPosition()).IsEqualTo(42UL);
}

[Test]
public async Task GetOriginalStream_WithNullHeaders_ReturnsNull() {
var message = new ProducedMessage("test", null);

await Assert.That(message.GetOriginalStream()).IsNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ protected override ValueTask Subscribe(CancellationToken cancellationToken) {
Options.QueueOptions.Arguments
);

Log.InfoLog?.Log("Binding exchange {Exchange} to queue {Queue}", exchange, Options.SubscriptionId);
Log.InfoLog?.Log("Binding exchange {Exchange} to queue {Queue}", exchange, queue);

_channel.QueueBind(
Options.SubscriptionId,
queue,
exchange,
Options.BindingOptions.RoutingKey,
Options.BindingOptions.Arguments
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using Eventuous.Producers;
using Eventuous.RabbitMq.Producers;
using Eventuous.RabbitMq.Subscriptions;
using Eventuous.Subscriptions.Filters;
using Eventuous.TestHelpers.TUnit;
using Eventuous.TestHelpers.TUnit.Logging;
using Eventuous.Tests.Subscriptions.Base;

namespace Eventuous.Tests.RabbitMq;

[ClassDataSource<RabbitMqFixture>]
public class CustomQueueSubscriptionSpec {
static CustomQueueSubscriptionSpec() => TypeMap.Instance.RegisterKnownEventTypes(typeof(TestEvent).Assembly);

RabbitMqProducer _producer = null!;
TestEventHandler _handler = null!;
#pragma warning disable TUnit0023
RabbitMqSubscription _subscription = null!;
TestEventListener _es = null!;
#pragma warning restore TUnit0023
readonly StreamName _exchange;
readonly ILogger<CustomQueueSubscriptionSpec> _log;
readonly ILoggerFactory _loggerFactory;
readonly RabbitMqFixture _fixture;

public CustomQueueSubscriptionSpec(RabbitMqFixture fixture) {
_fixture = fixture;
_exchange = new(Guid.NewGuid().ToString());
_loggerFactory = LoggingExtensions.GetLoggerFactory();
_log = _loggerFactory.CreateLogger<CustomQueueSubscriptionSpec>();
}

[Test]
public async Task SubscribeWithCustomQueueName(CancellationToken cancellationToken) {
var testEvent = TestEvent.Create();
await _producer.Produce(_exchange, testEvent, new(), cancellationToken: cancellationToken);
await _handler.AssertThat().Timebox(10.Seconds()).Any().Match(x => x as TestEvent == testEvent).Validate(cancellationToken);
}

[Before(Test)]
public async ValueTask InitializeAsync() {
_es = new();
_handler = new();
_producer = new(_fixture.ConnectionFactory);

var subscriptionId = Guid.NewGuid().ToString();
var customQueue = Guid.NewGuid().ToString();

_subscription = new(
_fixture.ConnectionFactory,
new RabbitMqSubscriptionOptions {
ConcurrencyLimit = 10,
SubscriptionId = subscriptionId,
Exchange = _exchange,
ThrowOnError = true,
QueueOptions = new RabbitMqSubscriptionOptions.RabbitMqQueueOptions { Queue = customQueue }
},
new ConsumePipe().AddDefaultConsumer(_handler),
_loggerFactory
);
await _subscription.SubscribeWithLog(_log);
await _producer.StartAsync();
}

[After(Test)]
public async ValueTask DisposeAsync() {
await _producer.StopAsync();
await _subscription.UnsubscribeWithLog(_log);
_es.Dispose();
await _subscription.DisposeAsync();
}
}
Loading