From 42e313d888a32f2ea6a9cdd5c86cc7af40dbabc4 Mon Sep 17 00:00:00 2001 From: Paul Reardon Date: Tue, 23 Nov 2021 11:50:21 +0000 Subject: [PATCH] Ensured a new CommandProcessorProvider for each Message Pump (#1866) * Ensured a new CommandProcessorProvider for each Message Pump * Downgrade Microsoft.Data.SqlClient to 3.0.1 * Plumbed the CommandProcessor Factory down to the creation of each Message Pump --- .../GreetingsSender.Web/Program.cs | 15 ++++++++--- .../ASBTaskQueue/GreetingsWorker/Program.cs | 1 + .../Paramore.Brighter.Inbox.MsSql.csproj | 2 +- ...ore.Brighter.MessagingGateway.MsSql.csproj | 2 +- .../Paramore.Brighter.MsSql.csproj | 2 +- .../Paramore.Brighter.Outbox.MsSql.csproj | 2 +- .../ServiceCollectionExtensions.cs | 26 ++++++++++--------- .../ControlBusReceiverBuilder.cs | 6 ++--- .../DispatchBuilder.cs | 23 ++++++++-------- .../Dispatcher.cs | 14 +++++----- .../When_building_a_dispatcher.cs | 2 +- ...uilding_a_dispatcher_with_named_gateway.cs | 2 +- 12 files changed, 54 insertions(+), 43 deletions(-) diff --git a/samples/ASBTaskQueue/GreetingsSender.Web/Program.cs b/samples/ASBTaskQueue/GreetingsSender.Web/Program.cs index 29e989cb27..aa197cc307 100644 --- a/samples/ASBTaskQueue/GreetingsSender.Web/Program.cs +++ b/samples/ASBTaskQueue/GreetingsSender.Web/Program.cs @@ -1,4 +1,7 @@ using Greetings.Adaptors.Data; +using Greetings.Adaptors.Services; +using Greetings.Ports.CommandHandlers; +using Greetings.Ports.Commands; using Microsoft.AspNetCore.Builder; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; @@ -11,10 +14,11 @@ using Paramore.Brighter; using Greetings.Ports.Events; using Greetings.Ports.Mappers; +using Paramore.Brighter.MessagingGateway.AzureServiceBus.ClientProvider; var builder = WebApplication.CreateBuilder(args); -string dbConnString = "server=(localdb)\\mssqllocaldb;database=BrighterTests;trusted_connection=yes"; +string dbConnString = "Server=127.0.0.1,11433;Database=BrighterTests;User Id=sa;Password=Password1!;Application Name=BrighterTests;MultipleActiveResultSets=True"; //EF builder.Services.AddDbContext(o => @@ -22,10 +26,14 @@ o.UseSqlServer(dbConnString); }); +//Services + +builder.Services.AddScoped(); + //Brighter -string asbConnectionString = "Endpoint=sb://.servicebus.windows.net/;Authentication=Managed Identity"; +string asbEndpoint = ".servicebus.windows.net"; -var asbConnection = new AzureServiceBusConfiguration(asbConnectionString, true); +var asbConnection = new ServiceBusVisualStudioCredentialClientProvider(asbEndpoint); var producer = AzureServiceBusMessageProducerFactory.Get(asbConnection); var outboxConfig = new MsSqlConfiguration(dbConnString, "BrighterOutbox"); @@ -43,6 +51,7 @@ { r.Add(typeof(GreetingEvent), typeof(GreetingEventMessageMapper)); r.Add(typeof(GreetingAsyncEvent), typeof(GreetingEventAsyncMessageMapper)); + r.Add(typeof(AddGreetingCommand), typeof(AddGreetingMessageMapper)); }); diff --git a/samples/ASBTaskQueue/GreetingsWorker/Program.cs b/samples/ASBTaskQueue/GreetingsWorker/Program.cs index 002795c232..1c60402cb9 100644 --- a/samples/ASBTaskQueue/GreetingsWorker/Program.cs +++ b/samples/ASBTaskQueue/GreetingsWorker/Program.cs @@ -89,6 +89,7 @@ public async static Task Main(string[] args) options.Subscriptions = subscriptions; options.ChannelFactory = new AzureServiceBusChannelFactory(asbConsumerFactory); options.UseScoped = true; + }).UseMsSqlOutbox(outboxConfig, typeof(MsSqlSqlAuthConnectionProvider)) .UseMsSqlTransactionConnectionProvider(typeof(MsSqlEntityFrameworkCoreConnectionProvider)) .UseExternalBus(AzureServiceBusMessageProducerFactory.Get(clientProvider)) diff --git a/src/Paramore.Brighter.Inbox.MsSql/Paramore.Brighter.Inbox.MsSql.csproj b/src/Paramore.Brighter.Inbox.MsSql/Paramore.Brighter.Inbox.MsSql.csproj index 3d4856f5db..e1078f9c58 100644 --- a/src/Paramore.Brighter.Inbox.MsSql/Paramore.Brighter.Inbox.MsSql.csproj +++ b/src/Paramore.Brighter.Inbox.MsSql/Paramore.Brighter.Inbox.MsSql.csproj @@ -17,7 +17,7 @@ - + \ No newline at end of file diff --git a/src/Paramore.Brighter.MessagingGateway.MsSql/Paramore.Brighter.MessagingGateway.MsSql.csproj b/src/Paramore.Brighter.MessagingGateway.MsSql/Paramore.Brighter.MessagingGateway.MsSql.csproj index 964b8fabc6..39dfa1ac82 100644 --- a/src/Paramore.Brighter.MessagingGateway.MsSql/Paramore.Brighter.MessagingGateway.MsSql.csproj +++ b/src/Paramore.Brighter.MessagingGateway.MsSql/Paramore.Brighter.MessagingGateway.MsSql.csproj @@ -11,7 +11,7 @@ - + \ No newline at end of file diff --git a/src/Paramore.Brighter.MsSql/Paramore.Brighter.MsSql.csproj b/src/Paramore.Brighter.MsSql/Paramore.Brighter.MsSql.csproj index 1d05a9597b..9c75f7fb2d 100644 --- a/src/Paramore.Brighter.MsSql/Paramore.Brighter.MsSql.csproj +++ b/src/Paramore.Brighter.MsSql/Paramore.Brighter.MsSql.csproj @@ -8,7 +8,7 @@ - + diff --git a/src/Paramore.Brighter.Outbox.MsSql/Paramore.Brighter.Outbox.MsSql.csproj b/src/Paramore.Brighter.Outbox.MsSql/Paramore.Brighter.Outbox.MsSql.csproj index b8fbf6dbaf..e013deab9a 100644 --- a/src/Paramore.Brighter.Outbox.MsSql/Paramore.Brighter.Outbox.MsSql.csproj +++ b/src/Paramore.Brighter.Outbox.MsSql/Paramore.Brighter.Outbox.MsSql.csproj @@ -18,7 +18,7 @@ - + \ No newline at end of file diff --git a/src/Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection/ServiceCollectionExtensions.cs b/src/Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection/ServiceCollectionExtensions.cs index 694f32bbb8..7c86632f09 100644 --- a/src/Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection/ServiceCollectionExtensions.cs @@ -1,4 +1,4 @@ -using System; +using System; using Microsoft.Extensions.DependencyInjection; using Paramore.Brighter.Extensions.DependencyInjection; @@ -33,24 +33,26 @@ public static IBrighterBuilder AddServiceActivator( services.AddSingleton(BuildDispatcher); - if (options.UseScoped) - { - services.AddTransient(); - } - else - { - services.AddTransient(); - } - return ServiceCollectionExtensions.BrighterHandlerBuilder(services, options); } private static Dispatcher BuildDispatcher(IServiceProvider serviceProvider) { - var commandProcessorProvider = serviceProvider.GetService(); var options = serviceProvider.GetService(); - var dispatcherBuilder = DispatchBuilder.With().CommandProcessorProvider(commandProcessorProvider); + Func providerFactory; + + if (options.UseScoped) + { + providerFactory = () => new ScopedCommandProcessorProvider(serviceProvider); + } + else + { + var commandProcessor = serviceProvider.GetService(); + providerFactory = () => new CommandProcessorProvider(commandProcessor); + } + + var dispatcherBuilder = DispatchBuilder.With().CommandProcessorFactory(providerFactory); var messageMapperRegistry = ServiceCollectionExtensions.MessageMapperRegistry(serviceProvider); diff --git a/src/Paramore.Brighter.ServiceActivator/ControlBusReceiverBuilder.cs b/src/Paramore.Brighter.ServiceActivator/ControlBusReceiverBuilder.cs index af1a797317..68159c2e45 100644 --- a/src/Paramore.Brighter.ServiceActivator/ControlBusReceiverBuilder.cs +++ b/src/Paramore.Brighter.ServiceActivator/ControlBusReceiverBuilder.cs @@ -161,9 +161,7 @@ public Dispatcher Build(string hostName) .ExternalBus(new MessagingConfiguration(producer, outgoingMessageMapperRegistry), outbox) .RequestContextFactory(new InMemoryRequestContextFactory()) .Build(); - - var commandProcessorProvider = new CommandProcessorProvider(commandProcessor); - + // These are the control bus channels, we hardcode them because we want to know they exist, but we use // a base naming scheme to allow centralized management. var connectionsConfiguration = new Subscription[] @@ -179,7 +177,7 @@ public Dispatcher Build(string hostName) }; return DispatchBuilder.With() - .CommandProcessorProvider(commandProcessorProvider) + .CommandProcessorFactory(() => new CommandProcessorProvider(commandProcessor)) .MessageMappers(incomingMessageMapperRegistry) .DefaultChannelFactory(_channelFactory) .Connections(connectionsConfiguration) diff --git a/src/Paramore.Brighter.ServiceActivator/DispatchBuilder.cs b/src/Paramore.Brighter.ServiceActivator/DispatchBuilder.cs index 2ae6510bf1..9fce8735c6 100644 --- a/src/Paramore.Brighter.ServiceActivator/DispatchBuilder.cs +++ b/src/Paramore.Brighter.ServiceActivator/DispatchBuilder.cs @@ -1,4 +1,4 @@ -#region Licence +#region Licence /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -22,6 +22,7 @@ THE SOFTWARE. */ #endregion +using System; using System.Collections.Generic; using System.Linq; @@ -33,9 +34,9 @@ namespace Paramore.Brighter.ServiceActivator /// progressive interfaces to manage the requirements for a complete Dispatcher via Intellisense in the IDE. The intent is to make it easier to /// recognize those dependencies that you need to configure /// - public class DispatchBuilder : INeedACommandProcessorProvider, INeedAChannelFactory, INeedAMessageMapper, INeedAListOfConnections, IAmADispatchBuilder + public class DispatchBuilder : INeedACommandProcessorFactory, INeedAChannelFactory, INeedAMessageMapper, INeedAListOfConnections, IAmADispatchBuilder { - private IAmACommandProcessorProvider _commandProcessorProvider; + private Func _commandProcessorFactory; private IAmAMessageMapperRegistry _messageMapperRegistry; private IAmAChannelFactory _defaultChannelFactory; private IEnumerable _connections; @@ -46,7 +47,7 @@ private DispatchBuilder() { } /// Begins the fluent interface /// /// INeedALogger. - public static INeedACommandProcessorProvider With() + public static INeedACommandProcessorFactory With() { return new DispatchBuilder(); } @@ -54,11 +55,11 @@ public static INeedACommandProcessorProvider With() /// /// The command processor used to send and publish messages to handlers by the service activator. /// - /// The command processor provider. + /// The command processor Factory. /// INeedAMessageMapper. - public INeedAMessageMapper CommandProcessorProvider(IAmACommandProcessorProvider theCommandProcessorProvider) + public INeedAMessageMapper CommandProcessorFactory(Func commandProcessorFactory) { - _commandProcessorProvider = theCommandProcessorProvider; + _commandProcessorFactory = commandProcessorFactory; return this; } @@ -109,7 +110,7 @@ public IAmADispatchBuilder Connections(IEnumerable connections) /// Dispatcher. public Dispatcher Build() { - return new Dispatcher(_commandProcessorProvider, _messageMapperRegistry, _connections); + return new Dispatcher(_commandProcessorFactory, _messageMapperRegistry, _connections); } } @@ -118,14 +119,14 @@ public Dispatcher Build() /// /// Interface INeedACommandProcessor /// - public interface INeedACommandProcessorProvider + public interface INeedACommandProcessorFactory { /// /// The command processor used to send and publish messages to handlers by the service activator. /// - /// The command processor provider. + /// The command processor provider Factory. /// INeedAMessageMapper. - INeedAMessageMapper CommandProcessorProvider(IAmACommandProcessorProvider commandProcessorProvider); + INeedAMessageMapper CommandProcessorFactory(Func commandProcessorFactory); } /// diff --git a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs index 8fadf3e788..d10f3c6ebf 100644 --- a/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs +++ b/src/Paramore.Brighter.ServiceActivator/Dispatcher.cs @@ -52,12 +52,12 @@ public class Dispatcher : IDispatcher /// Gets the command processor. /// /// The command processor. - public IAmACommandProcessor CommandProcessor { get => CommandProcessorProvider.Get(); } + public IAmACommandProcessor CommandProcessor { get => CommandProcessorFactory.Invoke().Get(); } /// /// /// - public IAmACommandProcessorProvider CommandProcessorProvider { get; } + public Func CommandProcessorFactory { get; } /// /// Gets the connections. @@ -87,15 +87,15 @@ public class Dispatcher : IDispatcher /// /// Initializes a new instance of the class. /// - /// The command processor Provider. + /// The command processor Factory. /// The message mapper registry. /// The connections. public Dispatcher( - IAmACommandProcessorProvider commandProcessorProvider, + Func commandProcessorFactory, IAmAMessageMapperRegistry messageMapperRegistry, IEnumerable connections) { - CommandProcessorProvider = commandProcessorProvider; + CommandProcessorFactory = commandProcessorFactory; Connections = connections; _messageMapperRegistry = messageMapperRegistry; @@ -109,7 +109,7 @@ public Dispatcher( } public Dispatcher(IAmACommandProcessor commandProcessor, IAmAMessageMapperRegistry messageMapperRegistry, - IEnumerable connection) : this(new CommandProcessorProvider(commandProcessor), + IEnumerable connection) : this(() => new CommandProcessorProvider(commandProcessor), messageMapperRegistry, connection) { } @@ -287,7 +287,7 @@ private IEnumerable CreateConsumers(IEnumerable connecti int performer = i; s_logger.LogInformation("Dispatcher: Creating consumer number {ConsumerNumber} for subscription: {ChannelName}", performer + 1, connection.Name); var consumerFactoryType = typeof(ConsumerFactory<>).MakeGenericType(connection.DataType); - var consumerFactory = (IConsumerFactory)Activator.CreateInstance(consumerFactoryType, CommandProcessorProvider, _messageMapperRegistry, connection); + var consumerFactory = (IConsumerFactory)Activator.CreateInstance(consumerFactoryType, CommandProcessorFactory.Invoke(), _messageMapperRegistry, connection); list.Add(consumerFactory.Create()); } diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher.cs b/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher.cs index 6488864539..f3dc5ac411 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher.cs @@ -81,7 +81,7 @@ public DispatchBuilderTests() .Build(); _builder = DispatchBuilder.With() - .CommandProcessorProvider(new CommandProcessorProvider(commandProcessor)) + .CommandProcessorFactory(() =>new CommandProcessorProvider(commandProcessor)) .MessageMappers(messageMapperRegistry) .DefaultChannelFactory(new ChannelFactory(rmqMessageConsumerFactory)) .Connections(new [] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher_with_named_gateway.cs b/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher_with_named_gateway.cs index dcedff874f..1771e79058 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher_with_named_gateway.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessageDispatch/When_building_a_dispatcher_with_named_gateway.cs @@ -76,7 +76,7 @@ public DispatchBuilderWithNamedGateway() .Build(); _builder = DispatchBuilder.With() - .CommandProcessorProvider(new CommandProcessorProvider(commandProcessor)) + .CommandProcessorFactory(() => new CommandProcessorProvider(commandProcessor)) .MessageMappers(messageMapperRegistry) .DefaultChannelFactory(new ChannelFactory(rmqMessageConsumerFactory)) .Connections(new []