diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt index a569d08c7ba..810fe0b543c 100644 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt @@ -294,6 +294,23 @@ namespace NServiceBus { public static NServiceBus.IStartableEndpointWithExternallyManagedContainer Create(NServiceBus.EndpointConfiguration configuration, Microsoft.Extensions.DependencyInjection.IServiceCollection serviceCollection) { } } + public interface IMultiEndpointInstance : System.IAsyncDisposable + { + System.Collections.Generic.IReadOnlyCollection Endpoints { get; } + NServiceBus.IEndpointInstance this[string endpointName] { get; } + NServiceBus.IEndpointInstance this[object serviceKey] { get; } + System.Threading.Tasks.Task Stop(System.Threading.CancellationToken cancellationToken = default); + public sealed class EndpointInstanceInfo + { + public string EndpointName { get; } + public object ServiceKey { get; } + public NServiceBus.IEndpointInstance Instance { get; } + } + } + public interface IStartableMultiEndpointWithExternallyManagedContainer + { + System.Threading.Tasks.Task Start(System.IServiceProvider serviceProvider, System.Threading.CancellationToken cancellationToken = default); + } public static class ErrorQueueSettings { public const string SettingsKey = "errorQueue"; @@ -384,6 +401,18 @@ namespace NServiceBus public NServiceBus.HostInfoSettings UsingInstalledFilePath() { } public NServiceBus.HostInfoSettings UsingNames(string instanceName, string hostName) { } } + public static class MultiEndpoint + { + public static NServiceBus.IStartableMultiEndpointWithExternallyManagedContainer Create(Microsoft.Extensions.DependencyInjection.IServiceCollection serviceCollection, System.Action configure) { } + public static System.Threading.Tasks.Task Start(System.Action configure, System.Threading.CancellationToken cancellationToken = default) { } + } + public sealed class MultiEndpointConfiguration + { + public MultiEndpointConfiguration() { } + public NServiceBus.EndpointConfiguration AddEndpoint(string endpointName, System.Action? configure = null) { } + public NServiceBus.EndpointConfiguration AddEndpoint(object serviceKey, string endpointName, System.Action? configure = null) { } + public void AddEndpoint(NServiceBus.EndpointConfiguration configuration, object? serviceKey = null) { } + } public interface IAmStartedByMessages : NServiceBus.IHandleMessages { } public interface ICancellableContext { @@ -2369,4 +2398,4 @@ namespace NServiceBus.Unicast.Transport { public static NServiceBus.Transport.OutgoingMessage Create(NServiceBus.MessageIntent intent) { } } -} \ No newline at end of file +} diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/ErrorLoggingTest.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/ErrorLoggingTest.cs index b51e39cfcbb..0ca061e5d3b 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/ErrorLoggingTest.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/ErrorLoggingTest.cs @@ -3,7 +3,7 @@ using System; using System.IO; using System.Reflection; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; using Particular.Approvals; diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_handler_dll_is_scanned.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_handler_dll_is_scanned.cs index 0581d3d3935..375d21bbfcc 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_handler_dll_is_scanned.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_handler_dll_is_scanned.cs @@ -1,7 +1,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner; using System.Linq; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_messages_referencing_core_or_interfaces_is_scanned.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_messages_referencing_core_or_interfaces_is_scanned.cs index a14c2c25ab3..e7cf1732821 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_messages_referencing_core_or_interfaces_is_scanned.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_messages_referencing_core_or_interfaces_is_scanned.cs @@ -2,7 +2,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner; using System.IO; using System.Linq; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_no_reference_dlls_is_scanned.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_no_reference_dlls_is_scanned.cs index 748e9670d39..76da6580e28 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_no_reference_dlls_is_scanned.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_no_reference_dlls_is_scanned.cs @@ -2,7 +2,7 @@ using System.IO; using System.Reflection; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_non_dot_net_dll_is_scanned.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_non_dot_net_dll_is_scanned.cs index b211de9a772..b6c48dceb7d 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_non_dot_net_dll_is_scanned.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_directory_with_non_dot_net_dll_is_scanned.cs @@ -2,7 +2,7 @@ using System.IO; using System.Linq; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_exclusion_predicate_is_used.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_exclusion_predicate_is_used.cs index 5b6917ce934..6d56f7ca032 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_exclusion_predicate_is_used.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_exclusion_predicate_is_used.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_assemblies_with_circular_dependencies.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_assemblies_with_circular_dependencies.cs index ca19351a205..bed37fc30ed 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_assemblies_with_circular_dependencies.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_assemblies_with_circular_dependencies.cs @@ -2,7 +2,7 @@ using System.IO; using System.Linq; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_directory_with_nested_directories.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_directory_with_nested_directories.cs index 14dace35a4b..1013581902d 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_directory_with_nested_directories.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_directory_with_nested_directories.cs @@ -2,7 +2,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner; using System.IO; using System.Linq; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_top_level_only.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_top_level_only.cs index 4c9218f0cd7..a18e2130944 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_top_level_only.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_scanning_top_level_only.cs @@ -2,7 +2,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner; using System.IO; using System.Linq; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/AssemblyScanner/When_using_type_forwarding.cs b/src/NServiceBus.Core.Tests/AssemblyScanner/When_using_type_forwarding.cs index 5379f65d114..bf87aab5126 100644 --- a/src/NServiceBus.Core.Tests/AssemblyScanner/When_using_type_forwarding.cs +++ b/src/NServiceBus.Core.Tests/AssemblyScanner/When_using_type_forwarding.cs @@ -4,7 +4,7 @@ namespace NServiceBus.Core.Tests.AssemblyScanner; using System.Linq; using System.Reflection.Metadata; using System.Reflection.PortableExecutable; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/Config/When_scanning_assemblies.cs b/src/NServiceBus.Core.Tests/Config/When_scanning_assemblies.cs index 9559979cd5b..212756d1cf5 100644 --- a/src/NServiceBus.Core.Tests/Config/When_scanning_assemblies.cs +++ b/src/NServiceBus.Core.Tests/Config/When_scanning_assemblies.cs @@ -3,7 +3,7 @@ namespace NServiceBus.Core.Tests.Config; using System.Collections.Generic; using System.Linq; using System.Reflection; -using Hosting.Helpers; +using NServiceBus.Hosting.Helpers; using NUnit.Framework; [TestFixture] diff --git a/src/NServiceBus.Core.Tests/Hosting/MultiEndpointTests.cs b/src/NServiceBus.Core.Tests/Hosting/MultiEndpointTests.cs new file mode 100644 index 00000000000..2b412e85dff --- /dev/null +++ b/src/NServiceBus.Core.Tests/Hosting/MultiEndpointTests.cs @@ -0,0 +1,310 @@ +namespace NServiceBus.Core.Tests.Hosting; + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using NServiceBus; +using NServiceBus.Features; +using NUnit.Framework; + +[TestFixture] +public class MultiEndpointTests +{ + [Test] + public void Create_requires_at_least_one_endpoint() + { + var services = new ServiceCollection(); + + var exception = Assert.Throws(() => MultiEndpoint.Create(services, _ => { })); + + Assert.That(exception!.Message, Does.Contain("At least one endpoint must be configured.")); + } + + [Test] + public void Create_registers_keyed_endpoint_services() + { + var services = new ServiceCollection(); + + MultiEndpoint.Create(services, configuration => + { + var sales = configuration.AddEndpoint("Sales"); + + sales.TypesToScanInternal([]); + sales.UseSerialization(); + sales.UseTransport(new LearningTransport()); + sales.SendOnly(); + + var shipping = configuration.AddEndpoint("Shipping", c => c.TypesToScanInternal([])); + shipping.TypesToScanInternal([]); + shipping.UseSerialization(); + shipping.UseTransport(new LearningTransport()); + shipping.SendOnly(); + }); + + var messageSessions = services.Where(descriptor => descriptor.ServiceType == typeof(IMessageSession)).ToList(); + Assert.That(messageSessions, Has.Count.EqualTo(2)); + Assert.That(messageSessions.Select(d => d.ServiceKey), Is.SupersetOf(new object[] { "Sales", "Shipping" })); + + var endpointInstances = services.Where(descriptor => descriptor.ServiceType == typeof(IEndpointInstance)).ToList(); + Assert.That(endpointInstances, Has.Count.EqualTo(2)); + Assert.That(endpointInstances.Select(d => d.ServiceKey), Is.SupersetOf(new object[] { "Sales", "Shipping" })); + + var lazySessions = services.Where(descriptor => descriptor.ServiceType == typeof(Lazy)).ToList(); + Assert.That(lazySessions, Has.Count.EqualTo(2)); + Assert.That(lazySessions.Select(d => d.ServiceKey), Is.SupersetOf(new object[] { "Sales", "Shipping" })); + } + + [Test] + public async Task DemoTest() + { + var services = new ServiceCollection(); + + var startable = MultiEndpoint.Create(services, configuration => + { + var sales = configuration.AddEndpoint("Sales"); + var salesRecoverability = sales.Recoverability(); + salesRecoverability.Immediate(immediate => immediate.NumberOfRetries(0)); + salesRecoverability.Delayed(immediate => immediate.NumberOfRetries(0)); + sales.TypesToScanInternal([typeof(MySalesCommandHandler), typeof(MyFancyFeature)]); + sales.UseSerialization(); + sales.UseTransport(new LearningTransport()); + sales.EnableFeature(); + + var shipping = configuration.AddEndpoint("Shipping", c => c.TypesToScanInternal([])); + var shippingRecoverability = shipping.Recoverability(); + shippingRecoverability.Immediate(immediate => immediate.NumberOfRetries(0)); + shippingRecoverability.Delayed(immediate => immediate.NumberOfRetries(0)); + shipping.TypesToScanInternal([typeof(MySalesEventHandler), typeof(MyFancyFeature)]); + // notice doesn't enable feature and therefore leads to dependency of MySalesEventHandler not being resolved + shipping.EnableFeature(); + shipping.UseSerialization(); + shipping.UseTransport(new LearningTransport()); + }); + + await using var serviceProvider = services.BuildServiceProvider(); + await startable.Start(serviceProvider); + + var salesSession = serviceProvider.GetKeyedService("Sales"); + var shippingSession = serviceProvider.GetKeyedService("Shipping"); + + await salesSession.SendLocal(new MySalesCommand { Message = "Hello from sales" }); + //await shippingSession.SendLocal(new MySalesCommand { Message = "Should result in no handlers found" }); + await shippingSession.Send("Sales", new MySalesCommand { Message = "Hello from shipping" }); + + await salesSession.Publish(new MySalesEvent()); + + await Task.Delay(TimeSpan.FromSeconds(3)); + } + + class MyFancyFeature : Feature + { + static int instanceCounter; + readonly int instanceCount; + + public MyFancyFeature() => instanceCount = Interlocked.Increment(ref instanceCounter); + + protected internal override void Setup(FeatureConfigurationContext context) + { + int scopeCount = 0; + // this is just to demonstrate arbitrary complexity + // this would not work + // context.Services.AddKeyedSingleton("Hello from MyFancyFeature"+ instanceCount, "hellostring"); + // context.Services.AddScoped(sp => new ScopeDependency(sp.GetRequiredKeyedService("hellostring") + scopeCount++)); + context.Services.AddSingleton(new SingletonDependencyOfScopedDependency("Hello from MyFancyFeature"+ instanceCount)); + context.Services.AddScoped(sp => new ScopeDependency(sp.GetRequiredService().Value + scopeCount++)); + } + } + + public class MySalesCommand : ICommand + { + public string Message { get; set; } + } + + public class MySalesEvent : IEvent; + + class MySalesCommandHandler(ScopeDependency dependency) : IHandleMessages + { + public async Task Handle(MySalesCommand message, IMessageHandlerContext context) + { + await TestContext.Out.WriteLineAsync(message.Message); + await TestContext.Out.WriteLineAsync(dependency.Value); + } + } + + // Bombs if MyFancyFeature is not enabled + class MySalesEventHandler(ScopeDependency dependency) : IHandleMessages + { + public async Task Handle(MySalesEvent message, IMessageHandlerContext context) + { + await TestContext.Out.WriteAsync("Got the my sales event"); + await TestContext.Out.WriteLineAsync(dependency.Value); + } + } + + [Test] + public void Configuration_throws_for_duplicate_endpoint_names() + { + var configuration = new MultiEndpointConfiguration(); + + configuration.AddEndpoint("Sales"); + + var exception = Assert.Throws(() => configuration.AddEndpoint("Sales")); + + Assert.That(exception!.Message, Does.Contain("already been added")); + } + + [Test] + public void Configuration_throws_for_duplicate_service_keys() + { + var configuration = new MultiEndpointConfiguration(); + + configuration.AddEndpoint("custom", "Sales"); + + var exception = Assert.Throws(() => configuration.AddEndpoint("custom", "Shipping")); + + Assert.That(exception!.Message, Does.Contain("unique service key")); + } + + [Test] + public void Keyed_provider_accesses_endpoint_and_shared_services() + { + var services = new ServiceCollection(); + services.AddSingleton("shared"); + + var adapter = new KeyedServiceCollectionAdapter(services, "key"); + adapter.AddSingleton(new EndpointInstanceAccessor()); + + var provider = services.BuildServiceProvider(); + var keyedProvider = new KeyedServiceProviderAdapter(provider, "key", adapter); + + Assert.That(keyedProvider.GetService(typeof(EndpointInstanceAccessor)), Is.Not.Null); + Assert.That(keyedProvider.GetService(typeof(string)), Is.EqualTo("shared")); + } + + [Test] + public void Keyed_provider_gets_keyed_services() + { + var services = new ServiceCollection(); + + var salesAdapter = new KeyedServiceCollectionAdapter(services, "sales"); + salesAdapter.AddSingleton(); + salesAdapter.AddSingleton(); + + var billingAdapter = new KeyedServiceCollectionAdapter(services, "billing"); + billingAdapter.AddSingleton(); + billingAdapter.AddSingleton(); + + var provider = services.BuildServiceProvider(); + var salesKeyedProvider = new KeyedServiceProviderAdapter(provider, "sales", salesAdapter); + var billingKeyedProvider = new KeyedServiceProviderAdapter(provider, "billing", billingAdapter); + + Assert.That(salesKeyedProvider.GetServices().ToList(), Has.Count.EqualTo(2)); + Assert.That(billingKeyedProvider.GetServices().ToList(), Has.Count.EqualTo(2)); + } + + interface IMyService; + + class MyService1 : IMyService; + class MyService2 : IMyService; + + [Test] + public void Implementation_factory_gets_keyed_provider() + { + var services = new ServiceCollection(); + var adapter = new KeyedServiceCollectionAdapter(services, "endpoint"); + + IServiceProvider? capturedProvider = null; + + adapter.AddSingleton(sp => + { + capturedProvider = sp; + return new object(); + }); + + var provider = services.BuildServiceProvider(); + var keyedProvider = new KeyedServiceProviderAdapter(provider, "endpoint", adapter); + + Assert.That(keyedProvider.GetService(typeof(object)), Is.Not.Null); + Assert.That(capturedProvider, Is.TypeOf()); + } + + [Test] + public async Task Implementation_factory_with_scope_gets_keyed_provider() + { + var services = new ServiceCollection(); + + var salesAdapter = new KeyedServiceCollectionAdapter(services, "sales"); + int salesScopeCount = 0; + salesAdapter.AddSingleton(new SingletonDependencyOfScopedDependency("Hello from sales")); + salesAdapter.AddScoped(sp => new ScopeDependency(sp.GetRequiredService().Value + salesScopeCount++)); + + var billingAdapter = new KeyedServiceCollectionAdapter(services, "billing"); + int billingScopeCount = 0; + billingAdapter.AddSingleton(new SingletonDependencyOfScopedDependency("Hello from billing")); + billingAdapter.AddScoped(sp => new ScopeDependency(sp.GetRequiredService().Value + billingScopeCount++)); + + var provider = services.BuildServiceProvider(); + + ScopeDependency salesScope1Dependency; + ScopeDependency salesScope2Dependency; + ScopeDependency billingScope1Dependency; + ScopeDependency billingScope2Dependency; + await using (var scope1 = provider.CreateAsyncScope()) + await using (var scope2 = provider.CreateAsyncScope()) + { + salesScope1Dependency = scope1.ServiceProvider.GetRequiredKeyedService("sales"); + var salesScope1DependencySecondResolve = scope1.ServiceProvider.GetRequiredKeyedService("sales"); + salesScope2Dependency = scope2.ServiceProvider.GetRequiredKeyedService("sales"); + billingScope1Dependency = scope1.ServiceProvider.GetRequiredKeyedService("billing"); + var billingScope1DependencySecondResolve = scope1.ServiceProvider.GetRequiredKeyedService("billing"); + billingScope2Dependency = scope2.ServiceProvider.GetRequiredKeyedService("billing"); + + using (Assert.EnterMultipleScope()) + { + Assert.That(salesScope1Dependency.Disposed, Is.Zero); + Assert.That(salesScope1Dependency.Value, Is.EqualTo("Hello from sales0")); + Assert.That(salesScope1Dependency, Is.EqualTo(salesScope1DependencySecondResolve)); + Assert.That(salesScope1Dependency, Is.Not.EqualTo(salesScope2Dependency)); + } + using (Assert.EnterMultipleScope()) + { + Assert.That(billingScope1Dependency.Disposed, Is.Zero); + Assert.That(billingScope1Dependency.Value, Is.EqualTo("Hello from billing0")); + Assert.That(billingScope1Dependency, Is.EqualTo(billingScope1DependencySecondResolve)); + Assert.That(billingScope1Dependency, Is.Not.EqualTo(billingScope2Dependency)); + } + + using (Assert.EnterMultipleScope()) + { + Assert.That(salesScope2Dependency.Disposed, Is.Zero); + Assert.That(salesScope2Dependency.Value, Is.EqualTo("Hello from sales1")); + } + using (Assert.EnterMultipleScope()) + { + Assert.That(billingScope2Dependency.Disposed, Is.Zero); + Assert.That(billingScope2Dependency.Value, Is.EqualTo("Hello from billing1")); + } + } + + using (Assert.EnterMultipleScope()) + { + Assert.That(salesScope1Dependency.Disposed, Is.EqualTo(1)); + Assert.That(salesScope2Dependency.Disposed, Is.EqualTo(1)); + Assert.That(billingScope1Dependency.Disposed, Is.EqualTo(1)); + Assert.That(billingScope2Dependency.Disposed, Is.EqualTo(1)); + } + } + + sealed class ScopeDependency(string value) : IDisposable + { + public string Value { get; } = value; + + public int Disposed { get; private set; } + + public void Dispose() => Disposed++; + } + + record SingletonDependencyOfScopedDependency(string Value); +} \ No newline at end of file diff --git a/src/NServiceBus.Core/IMultiEndpointInstance.cs b/src/NServiceBus.Core/IMultiEndpointInstance.cs new file mode 100644 index 00000000000..fe99a783c0a --- /dev/null +++ b/src/NServiceBus.Core/IMultiEndpointInstance.cs @@ -0,0 +1,62 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +/// +/// Represents multiple running endpoints that share a single service collection. +/// +public interface IMultiEndpointInstance : IAsyncDisposable +{ + /// + /// Gets metadata about all running endpoints. + /// + IReadOnlyCollection Endpoints { get; } + + /// + /// Gets the running endpoint instance associated with the provided endpoint name. + /// + IEndpointInstance GetByEndpointName(string endpointName); + + /// + /// Gets the running endpoint instance associated with the provided service key. + /// + IEndpointInstance GetByKey(string serviceKey); + + /// + /// Stops all running endpoints. + /// + Task Stop(CancellationToken cancellationToken = default); + + /// + /// Describes a running endpoint instance managed by . + /// + public sealed class EndpointInstanceInfo + { + internal EndpointInstanceInfo(string endpointName, string serviceKey, IEndpointInstance instance) + { + EndpointName = endpointName; + ServiceKey = serviceKey; + Instance = instance; + } + + /// + /// The endpoint name. + /// + public string EndpointName { get; } + + /// + /// The service key associated with the endpoint registrations. + /// + public string ServiceKey { get; } + + /// + /// The running endpoint instance. + /// + public IEndpointInstance Instance { get; } + } +} diff --git a/src/NServiceBus.Core/IStartableMultiEndpointWithExternallyManagedContainer.cs b/src/NServiceBus.Core/IStartableMultiEndpointWithExternallyManagedContainer.cs new file mode 100644 index 00000000000..36dc766a0bc --- /dev/null +++ b/src/NServiceBus.Core/IStartableMultiEndpointWithExternallyManagedContainer.cs @@ -0,0 +1,18 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Threading; +using System.Threading.Tasks; + +/// +/// Represents a multi-endpoint host in the start-up phase where the container is externally managed. +/// +public interface IStartableMultiEndpointWithExternallyManagedContainer +{ + /// + /// Starts all configured endpoints using the provided . + /// + Task Start(IServiceProvider serviceProvider, CancellationToken cancellationToken = default); +} diff --git a/src/NServiceBus.Core/MultiEndpoint/EndpointInstanceAccessor.cs b/src/NServiceBus.Core/MultiEndpoint/EndpointInstanceAccessor.cs new file mode 100644 index 00000000000..22266efcaf5 --- /dev/null +++ b/src/NServiceBus.Core/MultiEndpoint/EndpointInstanceAccessor.cs @@ -0,0 +1,32 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Threading; + +class EndpointInstanceAccessor +{ + public IEndpointInstance Get() + { + var current = Volatile.Read(ref instance); + if (current == null) + { + throw new InvalidOperationException("The endpoint instance is only available after the endpoint has started."); + } + + return current; + } + + public void Set(IEndpointInstance endpointInstance) + { + ArgumentNullException.ThrowIfNull(endpointInstance); + + if (Interlocked.CompareExchange(ref instance, endpointInstance, null) != null) + { + throw new InvalidOperationException("The endpoint instance has already been set."); + } + } + + IEndpointInstance? instance; +} diff --git a/src/NServiceBus.Core/MultiEndpoint/KeyedServiceCollectionAdapter.cs b/src/NServiceBus.Core/MultiEndpoint/KeyedServiceCollectionAdapter.cs new file mode 100644 index 00000000000..af9c3e94121 --- /dev/null +++ b/src/NServiceBus.Core/MultiEndpoint/KeyedServiceCollectionAdapter.cs @@ -0,0 +1,163 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Collections; +using System.Collections.Generic; +using Microsoft.Extensions.DependencyInjection; + +class KeyedServiceCollectionAdapter : IServiceCollection +{ + public KeyedServiceCollectionAdapter(IServiceCollection inner, object serviceKey) + { + ArgumentNullException.ThrowIfNull(inner); + ArgumentNullException.ThrowIfNull(serviceKey); + + this.inner = inner; + this.serviceKey = serviceKey; + } + + public ServiceDescriptor this[int index] + { + get => descriptors[index]; + set => throw new NotSupportedException("Replacing service descriptors is not supported for multi endpoint services."); + } + + public int Count => descriptors.Count; + + public bool IsReadOnly => false; + + public void Add(ServiceDescriptor item) + { + ArgumentNullException.ThrowIfNull(item); + + var keyedDescriptor = EnsureKeyedDescriptor(item); + descriptors.Add(keyedDescriptor); + inner.Add(keyedDescriptor); + } + + public void Clear() + { + foreach (var descriptor in descriptors) + { + _ = inner.Remove(descriptor); + } + + descriptors.Clear(); + serviceTypes.Clear(); + } + + public bool Contains(ServiceDescriptor item) + { + ArgumentNullException.ThrowIfNull(item); + + return descriptors.Contains(item); + } + + public void CopyTo(ServiceDescriptor[] array, int arrayIndex) + { + descriptors.CopyTo(array, arrayIndex); + } + + public IEnumerator GetEnumerator() => descriptors.GetEnumerator(); + + public int IndexOf(ServiceDescriptor item) + { + ArgumentNullException.ThrowIfNull(item); + + return descriptors.IndexOf(item); + } + + public void Insert(int index, ServiceDescriptor item) + { + throw new NotSupportedException("Inserting service descriptors at specific positions is not supported for multi endpoint services."); + } + + public bool Remove(ServiceDescriptor item) + { + ArgumentNullException.ThrowIfNull(item); + + if (!descriptors.Remove(item)) + { + return false; + } + + _ = inner.Remove(item); + _ = serviceTypes.Remove(item.ServiceType); + return true; + } + + public void RemoveAt(int index) + { + var descriptor = descriptors[index]; + descriptors.RemoveAt(index); + _ = inner.Remove(descriptor); + _ = serviceTypes.Remove(descriptor.ServiceType); + } + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + public bool ContainsService(Type serviceType) + { + ArgumentNullException.ThrowIfNull(serviceType); + + if (serviceTypes.Contains(serviceType)) + { + return true; + } + + if (serviceType.IsGenericType) + { + var definition = serviceType.GetGenericTypeDefinition(); + return serviceTypes.Contains(definition); + } + + return false; + } + + ServiceDescriptor EnsureKeyedDescriptor(ServiceDescriptor descriptor) + { + if (descriptor.IsKeyedService) + { + if (!Equals(descriptor.ServiceKey, serviceKey)) + { + throw new InvalidOperationException("Endpoint scoped registrations must use the endpoint service key."); + } + + serviceTypes.Add(descriptor.ServiceType); + return descriptor; + } + + ServiceDescriptor keyedDescriptor; + + if (descriptor.ImplementationInstance is not null) + { + keyedDescriptor = new ServiceDescriptor(descriptor.ServiceType, serviceKey, descriptor.ImplementationInstance); + } + else if (descriptor.ImplementationFactory is not null) + { + keyedDescriptor = new ServiceDescriptor(descriptor.ServiceType, serviceKey, (serviceProvider, key) => + { + var keyedProvider = new KeyedServiceProviderAdapter(serviceProvider, key ?? serviceKey, this); + return descriptor.ImplementationFactory!(keyedProvider); + }, descriptor.Lifetime); + } + else if (descriptor.ImplementationType is not null) + { + keyedDescriptor = new ServiceDescriptor(descriptor.ServiceType, serviceKey, descriptor.ImplementationType, descriptor.Lifetime); + } + else + { + throw new InvalidOperationException($"Unsupported service descriptor configuration for service type '{descriptor.ServiceType}'."); + } + + serviceTypes.Add(keyedDescriptor.ServiceType); + return keyedDescriptor; + } + + readonly IServiceCollection inner; + readonly object serviceKey; + readonly List descriptors = []; + readonly HashSet serviceTypes = []; +} \ No newline at end of file diff --git a/src/NServiceBus.Core/MultiEndpoint/KeyedServiceProviderAdapter.cs b/src/NServiceBus.Core/MultiEndpoint/KeyedServiceProviderAdapter.cs new file mode 100644 index 00000000000..f38609864b9 --- /dev/null +++ b/src/NServiceBus.Core/MultiEndpoint/KeyedServiceProviderAdapter.cs @@ -0,0 +1,60 @@ +#nullable enable +namespace NServiceBus; + +using System; +using System.Linq; +using Microsoft.Extensions.DependencyInjection; + +class KeyedServiceProviderAdapter : IServiceProvider +{ + public KeyedServiceProviderAdapter(IServiceProvider inner, object serviceKey, KeyedServiceCollectionAdapter serviceCollection) + { + ArgumentNullException.ThrowIfNull(inner); + ArgumentNullException.ThrowIfNull(serviceKey); + ArgumentNullException.ThrowIfNull(serviceCollection); + + this.inner = inner; + this.serviceKey = serviceKey; + this.serviceCollection = serviceCollection; + } + + public object? GetService(Type serviceType) + { + ArgumentNullException.ThrowIfNull(serviceType); + + if (serviceType == typeof(IServiceScopeFactory)) + { + var scopeFactory = inner.GetService(); + if (scopeFactory != null) + { + return new KeyedServiceScopeFactory(scopeFactory, serviceKey, serviceCollection); + } + } + + if (serviceType.IsGenericType && + serviceType.GetGenericTypeDefinition() == typeof(System.Collections.Generic.IEnumerable<>)) + { + var itemType = serviceType.GetGenericArguments()[0]; + + var keyedServices = inner.GetKeyedServices(itemType, serviceKey); + if (keyedServices.Any() || serviceCollection.ContainsService(serviceType)) + { + return keyedServices; + } + + return inner.GetServices(serviceType); + } + + var keyed = inner.GetKeyedService(serviceType, serviceKey); + if (keyed != null || serviceCollection.ContainsService(serviceType)) + { + return keyed; + } + + return inner.GetService(serviceType); + } + + readonly IServiceProvider inner; + readonly object serviceKey; + readonly KeyedServiceCollectionAdapter serviceCollection; +} \ No newline at end of file diff --git a/src/NServiceBus.Core/MultiEndpoint/KeyedServiceScopeFactory.cs b/src/NServiceBus.Core/MultiEndpoint/KeyedServiceScopeFactory.cs new file mode 100644 index 00000000000..eced6ad14b9 --- /dev/null +++ b/src/NServiceBus.Core/MultiEndpoint/KeyedServiceScopeFactory.cs @@ -0,0 +1,43 @@ +#nullable enable +namespace NServiceBus; + +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; + +class KeyedServiceScopeFactory(IServiceScopeFactory innerFactory, object serviceKey, KeyedServiceCollectionAdapter serviceCollection) : IServiceScopeFactory +{ + public IServiceScope CreateScope() + { + var innerScope = innerFactory.CreateScope(); + return new KeyedServiceScope(innerScope, serviceKey, serviceCollection); + } + + class KeyedServiceScope : IServiceScope, IAsyncDisposable + { + public KeyedServiceScope(IServiceScope innerScope, object serviceKey, KeyedServiceCollectionAdapter serviceCollection) + { + ArgumentNullException.ThrowIfNull(innerScope); + + this.innerScope = innerScope; + ServiceProvider = new KeyedServiceProviderAdapter(innerScope.ServiceProvider, serviceKey, serviceCollection); + } + + public IServiceProvider ServiceProvider { get; } + + public void Dispose() => innerScope.Dispose(); + + public ValueTask DisposeAsync() + { + if (innerScope is IAsyncDisposable asyncDisposable) + { + return asyncDisposable.DisposeAsync(); + } + + innerScope.Dispose(); + return ValueTask.CompletedTask; + } + + readonly IServiceScope innerScope; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/MultiEndpoint/MultiEndpoint.cs b/src/NServiceBus.Core/MultiEndpoint/MultiEndpoint.cs new file mode 100644 index 00000000000..38c30ea165f --- /dev/null +++ b/src/NServiceBus.Core/MultiEndpoint/MultiEndpoint.cs @@ -0,0 +1,243 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; + +/// +/// Entry point for configuring and starting multiple endpoints that share a single service collection. +/// +public static class MultiEndpoint +{ + /// + /// Configures multiple endpoints on the provided and returns a startable host that assumes the service provider will be externally managed. + /// + public static IStartableMultiEndpointWithExternallyManagedContainer Create(IServiceCollection serviceCollection, Action configure) + { + ArgumentNullException.ThrowIfNull(serviceCollection); + ArgumentNullException.ThrowIfNull(configure); + + var configuration = new MultiEndpointConfiguration(); + configure(configuration); + + var definitions = configuration.Build(); + if (definitions.Count == 0) + { + throw new InvalidOperationException("At least one endpoint must be configured."); + } + + var endpointHosts = new List(definitions.Count); + + foreach (var definition in definitions) + { + var keyedServices = new KeyedServiceCollectionAdapter(serviceCollection, definition.ServiceKey); + var accessor = new EndpointInstanceAccessor(); + + keyedServices.AddSingleton(accessor); + keyedServices.AddSingleton(_ => new Lazy(() => accessor.Get())); + keyedServices.AddSingleton(_ => accessor.Get()); + keyedServices.AddSingleton(_ => accessor.Get()); + + var endpointCreator = EndpointCreator.Create(definition.Configuration, keyedServices); + endpointHosts.Add(new EndpointHost(definition.EndpointName, definition.ServiceKey, endpointCreator, keyedServices, accessor)); + } + + return new MultiEndpointHost(endpointHosts); + } + + /// + /// Configures, builds, and starts endpoints on a new managed by the caller. + /// + public static async Task Start(Action configure, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(configure); + + var services = new ServiceCollection(); + var startable = Create(services, configure); + var serviceProvider = services.BuildServiceProvider(); + + var instance = await startable.Start(serviceProvider, cancellationToken).ConfigureAwait(false); + return new OwnedServiceProviderMultiEndpointInstance(instance, serviceProvider); + } + + sealed class EndpointHost(string endpointName, string serviceKey, EndpointCreator creator, KeyedServiceCollectionAdapter services, EndpointInstanceAccessor accessor) + { + public string EndpointName { get; } = endpointName; + public string ServiceKey { get; } = serviceKey; + public EndpointCreator EndpointCreator { get; } = creator; + public KeyedServiceCollectionAdapter Services { get; } = services; + public EndpointInstanceAccessor Accessor { get; } = accessor; + } + + sealed class MultiEndpointHost(IReadOnlyCollection endpointHosts) : IStartableMultiEndpointWithExternallyManagedContainer + { + public async Task Start(IServiceProvider serviceProvider, CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(serviceProvider); + + var startedEndpoints = new List(endpointHosts.Count); + + try + { + foreach (var endpoint in endpointHosts) + { + var keyedProvider = new KeyedServiceProviderAdapter(serviceProvider, endpoint.ServiceKey, endpoint.Services); + var startable = endpoint.EndpointCreator.CreateStartableEndpoint(keyedProvider, serviceProviderIsExternallyManaged: true); + + await startable.RunInstallers(cancellationToken).ConfigureAwait(false); + await startable.Setup(cancellationToken).ConfigureAwait(false); + var runningInstance = await startable.Start(cancellationToken).ConfigureAwait(false); + + endpoint.Accessor.Set(runningInstance); + startedEndpoints.Add(new IMultiEndpointInstance.EndpointInstanceInfo(endpoint.EndpointName, endpoint.ServiceKey, runningInstance)); + } + } + catch + { + await StopEndpoints(startedEndpoints, CancellationToken.None).ConfigureAwait(false); + throw; + } + + return new MultiEndpointInstance(startedEndpoints); + } + + static async Task StopEndpoints(List endpoints, CancellationToken cancellationToken) + { + for (var index = endpoints.Count - 1; index >= 0; index--) + { + try + { + await endpoints[index].Instance.Stop(cancellationToken).ConfigureAwait(false); + } + catch + { + // Best effort stop when recovering from startup failures. + } + } + } + } + + sealed class MultiEndpointInstance : IMultiEndpointInstance + { + public MultiEndpointInstance(IReadOnlyList endpoints) + { + endpointList = endpoints; + foreach (var endpoint in endpoints) + { + endpointsByName.Add(endpoint.EndpointName, endpoint); + endpointsByServiceKey.Add(endpoint.ServiceKey, endpoint); + } + } + + public IReadOnlyCollection Endpoints => endpointList; + public IEndpointInstance GetByEndpointName(string endpointName) + { + ArgumentNullException.ThrowIfNull(endpointName); + return endpointsByName[endpointName].Instance; + } + + public IEndpointInstance GetByKey(string serviceKey) + { + ArgumentNullException.ThrowIfNull(serviceKey); + return endpointsByServiceKey[serviceKey].Instance; + } + + public async Task Stop(CancellationToken cancellationToken = default) + { + if (Interlocked.Exchange(ref stopped, 1) != 0) + { + return; + } + + List? exceptions = null; + + for (var index = endpointList.Count - 1; index >= 0; index--) + { + var endpoint = endpointList[index]; + + try + { + await endpoint.Instance.Stop(cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + exceptions ??= []; + exceptions.Add(ex); + } + } + + if (exceptions != null) + { + throw new AggregateException("One or more endpoints failed to stop.", exceptions); + } + } + + public async ValueTask DisposeAsync() + { + await Stop().ConfigureAwait(false); + } + + readonly Dictionary endpointsByName = new(StringComparer.OrdinalIgnoreCase); + readonly Dictionary endpointsByServiceKey = []; + readonly IReadOnlyList endpointList; + int stopped; + } + + sealed class OwnedServiceProviderMultiEndpointInstance(IMultiEndpointInstance inner, IServiceProvider serviceProvider) + : IMultiEndpointInstance + { + public IReadOnlyCollection Endpoints => inner.Endpoints; + public IEndpointInstance GetByEndpointName(string endpointName) => inner.GetByEndpointName(endpointName); + + public IEndpointInstance GetByKey(string serviceKey) => inner.GetByKey(serviceKey); + + public async Task Stop(CancellationToken cancellationToken = default) + { + if (Interlocked.Exchange(ref disposed, 1) != 0) + { + return; + } + + Exception? stopException = null; + + try + { + await inner.Stop(cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + stopException = ex; + } + + await DisposeServiceProviderAsync().ConfigureAwait(false); + + if (stopException != null) + { + throw stopException; + } + } + + public async ValueTask DisposeAsync() + { + await Stop().ConfigureAwait(false); + } + + async Task DisposeServiceProviderAsync() + { + if (serviceProvider is IAsyncDisposable asyncDisposable) + { + await asyncDisposable.DisposeAsync().ConfigureAwait(false); + } + else if (serviceProvider is IDisposable disposable) + { + disposable.Dispose(); + } + } + + int disposed; + } +} diff --git a/src/NServiceBus.Core/MultiEndpoint/MultiEndpointConfiguration.cs b/src/NServiceBus.Core/MultiEndpoint/MultiEndpointConfiguration.cs new file mode 100644 index 00000000000..7bf93da4d83 --- /dev/null +++ b/src/NServiceBus.Core/MultiEndpoint/MultiEndpointConfiguration.cs @@ -0,0 +1,79 @@ +#nullable enable + +namespace NServiceBus; + +using System; +using System.Collections.Generic; + +/// +/// Configuration builder for hosting multiple endpoints within the same service collection. +/// +public sealed class MultiEndpointConfiguration +{ + /// + /// Adds a new endpoint to the multi-endpoint host and returns its configuration object for further customization. + /// + /// The name of the endpoint. + /// Optional delegate used to customize the endpoint configuration. + public EndpointConfiguration AddEndpoint(string endpointName, Action? configure = null) => + AddEndpoint(endpointName, endpointName, configure); + + /// + /// Adds a new endpoint associated with the provided service key. + /// + /// The service key used to register endpoint specific services in the service collection. + /// The name of the endpoint. + /// Optional delegate used to customize the endpoint configuration. + public EndpointConfiguration AddEndpoint(string serviceKey, string endpointName, Action? configure = null) + { + ArgumentNullException.ThrowIfNull(serviceKey); + ArgumentNullException.ThrowIfNull(endpointName); + + var configuration = new EndpointConfiguration(endpointName); + configure?.Invoke(configuration); + AddEndpointInternal(configuration, serviceKey); + return configuration; + } + + /// + /// Adds an existing to the multi-endpoint host. + /// + /// The endpoint configuration to add. + /// Optional service key used to register endpoint specific services. If not provided the endpoint name is used. + public void AddEndpoint(EndpointConfiguration configuration, string? serviceKey = null) + { + ArgumentNullException.ThrowIfNull(configuration); + + var endpointName = configuration.Settings.EndpointName(); + var resolvedKey = serviceKey ?? endpointName; + AddEndpointInternal(configuration, resolvedKey); + } + + internal IReadOnlyCollection Build() => endpointDefinitions.AsReadOnly(); + + void AddEndpointInternal(EndpointConfiguration configuration, string serviceKey) + { + ArgumentNullException.ThrowIfNull(configuration); + ArgumentNullException.ThrowIfNull(serviceKey); + + var endpointName = configuration.Settings.EndpointName(); + + if (!endpointNames.Add(endpointName)) + { + throw new InvalidOperationException($"An endpoint named '{endpointName}' has already been added."); + } + + if (!serviceKeys.Add(serviceKey)) + { + throw new InvalidOperationException("Each endpoint requires a unique service key."); + } + + endpointDefinitions.Add(new EndpointDefinition(endpointName, serviceKey, configuration)); + } + + readonly List endpointDefinitions = []; + readonly HashSet endpointNames = new(StringComparer.OrdinalIgnoreCase); + readonly HashSet serviceKeys = []; + + internal readonly record struct EndpointDefinition(string EndpointName, string ServiceKey, EndpointConfiguration Configuration); +} diff --git a/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs b/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs index 69cf839d750..544b789a507 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/LoadHandlersConnector.cs @@ -48,7 +48,11 @@ public override async Task Invoke(IIncomingLogicalMessageContext context, Func