Skip to content

Commit

Permalink
Ensured a new CommandProcessorProvider for each Message Pump (#1866)
Browse files Browse the repository at this point in the history
* Ensured a new CommandProcessorProvider for each Message Pump

* Downgrade Microsoft.Data.SqlClient to 3.0.1

* Plumbed the CommandProcessor Factory down to the creation of each Message Pump
  • Loading branch information
preardon authored Nov 23, 2021
1 parent d87cde7 commit 42e313d
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 43 deletions.
15 changes: 12 additions & 3 deletions samples/ASBTaskQueue/GreetingsSender.Web/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using Greetings.Adaptors.Data;
using Greetings.Adaptors.Services;
using Greetings.Ports.CommandHandlers;
using Greetings.Ports.Commands;
using Microsoft.AspNetCore.Builder;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -11,21 +14,26 @@
using Paramore.Brighter;
using Greetings.Ports.Events;
using Greetings.Ports.Mappers;
using Paramore.Brighter.MessagingGateway.AzureServiceBus.ClientProvider;

var builder = WebApplication.CreateBuilder(args);

string dbConnString = "server=(localdb)\\mssqllocaldb;database=BrighterTests;trusted_connection=yes";
string dbConnString = "Server=127.0.0.1,11433;Database=BrighterTests;User Id=sa;Password=Password1!;Application Name=BrighterTests;MultipleActiveResultSets=True";

//EF
builder.Services.AddDbContext<GreetingsDataContext>(o =>
{
o.UseSqlServer(dbConnString);
});

//Services

builder.Services.AddScoped<IUnitOfWork, UnitOfWork>();

//Brighter
string asbConnectionString = "Endpoint=sb://.servicebus.windows.net/;Authentication=Managed Identity";
string asbEndpoint = ".servicebus.windows.net";

var asbConnection = new AzureServiceBusConfiguration(asbConnectionString, true);
var asbConnection = new ServiceBusVisualStudioCredentialClientProvider(asbEndpoint);
var producer = AzureServiceBusMessageProducerFactory.Get(asbConnection);

var outboxConfig = new MsSqlConfiguration(dbConnString, "BrighterOutbox");
Expand All @@ -43,6 +51,7 @@
{
r.Add(typeof(GreetingEvent), typeof(GreetingEventMessageMapper));
r.Add(typeof(GreetingAsyncEvent), typeof(GreetingEventAsyncMessageMapper));
r.Add(typeof(AddGreetingCommand), typeof(AddGreetingMessageMapper));
});


Expand Down
1 change: 1 addition & 0 deletions samples/ASBTaskQueue/GreetingsWorker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public async static Task Main(string[] args)
options.Subscriptions = subscriptions;
options.ChannelFactory = new AzureServiceBusChannelFactory(asbConsumerFactory);
options.UseScoped = true;
}).UseMsSqlOutbox(outboxConfig, typeof(MsSqlSqlAuthConnectionProvider))
.UseMsSqlTransactionConnectionProvider(typeof(MsSqlEntityFrameworkCoreConnectionProvider<GreetingsDataContext>))
.UseExternalBus(AzureServiceBusMessageProducerFactory.Get(clientProvider))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.5.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="4.0.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="3.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.3" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.5.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="4.0.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="3.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.3" />
</ItemGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Paramore.Brighter.MsSql/Paramore.Brighter.MsSql.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Data.SqlClient" Version="4.0.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="3.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.5.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="4.0.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="3.0.1" />
<PackageReference Include="System.Data.SqlClient" Version="4.8.3" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using Microsoft.Extensions.DependencyInjection;
using Paramore.Brighter.Extensions.DependencyInjection;

Expand Down Expand Up @@ -33,24 +33,26 @@ public static IBrighterBuilder AddServiceActivator(

services.AddSingleton<IDispatcher>(BuildDispatcher);

if (options.UseScoped)
{
services.AddTransient<IAmACommandProcessorProvider, ScopedCommandProcessorProvider>();
}
else
{
services.AddTransient<IAmACommandProcessorProvider, CommandProcessorProvider>();
}

return ServiceCollectionExtensions.BrighterHandlerBuilder(services, options);
}

private static Dispatcher BuildDispatcher(IServiceProvider serviceProvider)
{
var commandProcessorProvider = serviceProvider.GetService<IAmACommandProcessorProvider>();
var options = serviceProvider.GetService<ServiceActivatorOptions>();

var dispatcherBuilder = DispatchBuilder.With().CommandProcessorProvider(commandProcessorProvider);
Func<IAmACommandProcessorProvider> providerFactory;

if (options.UseScoped)
{
providerFactory = () => new ScopedCommandProcessorProvider(serviceProvider);
}
else
{
var commandProcessor = serviceProvider.GetService<IAmACommandProcessor>();
providerFactory = () => new CommandProcessorProvider(commandProcessor);
}

var dispatcherBuilder = DispatchBuilder.With().CommandProcessorFactory(providerFactory);

var messageMapperRegistry = ServiceCollectionExtensions.MessageMapperRegistry(serviceProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ public Dispatcher Build(string hostName)
.ExternalBus(new MessagingConfiguration(producer, outgoingMessageMapperRegistry), outbox)
.RequestContextFactory(new InMemoryRequestContextFactory())
.Build();

var commandProcessorProvider = new CommandProcessorProvider(commandProcessor);


// These are the control bus channels, we hardcode them because we want to know they exist, but we use
// a base naming scheme to allow centralized management.
var connectionsConfiguration = new Subscription[]
Expand All @@ -179,7 +177,7 @@ public Dispatcher Build(string hostName)
};

return DispatchBuilder.With()
.CommandProcessorProvider(commandProcessorProvider)
.CommandProcessorFactory(() => new CommandProcessorProvider(commandProcessor))
.MessageMappers(incomingMessageMapperRegistry)
.DefaultChannelFactory(_channelFactory)
.Connections(connectionsConfiguration)
Expand Down
23 changes: 12 additions & 11 deletions src/Paramore.Brighter.ServiceActivator/DispatchBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Licence
#region Licence
/* The MIT License (MIT)
Copyright © 2014 Ian Cooper <[email protected]>
Expand All @@ -22,6 +22,7 @@ THE SOFTWARE. */

#endregion

using System;
using System.Collections.Generic;
using System.Linq;

Expand All @@ -33,9 +34,9 @@ namespace Paramore.Brighter.ServiceActivator
/// progressive interfaces to manage the requirements for a complete Dispatcher via Intellisense in the IDE. The intent is to make it easier to
/// recognize those dependencies that you need to configure
/// </summary>
public class DispatchBuilder : INeedACommandProcessorProvider, INeedAChannelFactory, INeedAMessageMapper, INeedAListOfConnections, IAmADispatchBuilder
public class DispatchBuilder : INeedACommandProcessorFactory, INeedAChannelFactory, INeedAMessageMapper, INeedAListOfConnections, IAmADispatchBuilder
{
private IAmACommandProcessorProvider _commandProcessorProvider;
private Func<IAmACommandProcessorProvider> _commandProcessorFactory;
private IAmAMessageMapperRegistry _messageMapperRegistry;
private IAmAChannelFactory _defaultChannelFactory;
private IEnumerable<Subscription> _connections;
Expand All @@ -46,19 +47,19 @@ private DispatchBuilder() { }
/// Begins the fluent interface
/// </summary>
/// <returns>INeedALogger.</returns>
public static INeedACommandProcessorProvider With()
public static INeedACommandProcessorFactory With()
{
return new DispatchBuilder();
}

/// <summary>
/// The command processor used to send and publish messages to handlers by the service activator.
/// </summary>
/// <param name="theCommandProcessorProvider">The command processor provider.</param>
/// <param name="commandProcessorFactory">The command processor Factory.</param>
/// <returns>INeedAMessageMapper.</returns>
public INeedAMessageMapper CommandProcessorProvider(IAmACommandProcessorProvider theCommandProcessorProvider)
public INeedAMessageMapper CommandProcessorFactory(Func<IAmACommandProcessorProvider> commandProcessorFactory)
{
_commandProcessorProvider = theCommandProcessorProvider;
_commandProcessorFactory = commandProcessorFactory;
return this;
}

Expand Down Expand Up @@ -109,7 +110,7 @@ public IAmADispatchBuilder Connections(IEnumerable<Subscription> connections)
/// <returns>Dispatcher.</returns>
public Dispatcher Build()
{
return new Dispatcher(_commandProcessorProvider, _messageMapperRegistry, _connections);
return new Dispatcher(_commandProcessorFactory, _messageMapperRegistry, _connections);
}
}

Expand All @@ -118,14 +119,14 @@ public Dispatcher Build()
/// <summary>
/// Interface INeedACommandProcessor
/// </summary>
public interface INeedACommandProcessorProvider
public interface INeedACommandProcessorFactory
{
/// <summary>
/// The command processor used to send and publish messages to handlers by the service activator.
/// </summary>
/// <param name="commandProcessorProvider">The command processor provider.</param>
/// <param name="commandProcessorFactory">The command processor provider Factory.</param>
/// <returns>INeedAMessageMapper.</returns>
INeedAMessageMapper CommandProcessorProvider(IAmACommandProcessorProvider commandProcessorProvider);
INeedAMessageMapper CommandProcessorFactory(Func<IAmACommandProcessorProvider> commandProcessorFactory);
}

/// <summary>
Expand Down
14 changes: 7 additions & 7 deletions src/Paramore.Brighter.ServiceActivator/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ public class Dispatcher : IDispatcher
/// Gets the command processor.
/// </summary>
/// <value>The command processor.</value>
public IAmACommandProcessor CommandProcessor { get => CommandProcessorProvider.Get(); }
public IAmACommandProcessor CommandProcessor { get => CommandProcessorFactory.Invoke().Get(); }

/// <summary>
///
/// </summary>
public IAmACommandProcessorProvider CommandProcessorProvider { get; }
public Func<IAmACommandProcessorProvider> CommandProcessorFactory { get; }

/// <summary>
/// Gets the connections.
Expand Down Expand Up @@ -87,15 +87,15 @@ public class Dispatcher : IDispatcher
/// <summary>
/// Initializes a new instance of the <see cref="Dispatcher"/> class.
/// </summary>
/// <param name="commandProcessorProvider">The command processor Provider.</param>
/// <param name="commandProcessorFactory">The command processor Factory.</param>
/// <param name="messageMapperRegistry">The message mapper registry.</param>
/// <param name="connections">The connections.</param>
public Dispatcher(
IAmACommandProcessorProvider commandProcessorProvider,
Func<IAmACommandProcessorProvider> commandProcessorFactory,
IAmAMessageMapperRegistry messageMapperRegistry,
IEnumerable<Subscription> connections)
{
CommandProcessorProvider = commandProcessorProvider;
CommandProcessorFactory = commandProcessorFactory;

Connections = connections;
_messageMapperRegistry = messageMapperRegistry;
Expand All @@ -109,7 +109,7 @@ public Dispatcher(
}

public Dispatcher(IAmACommandProcessor commandProcessor, IAmAMessageMapperRegistry messageMapperRegistry,
IEnumerable<Subscription> connection) : this(new CommandProcessorProvider(commandProcessor),
IEnumerable<Subscription> connection) : this(() => new CommandProcessorProvider(commandProcessor),
messageMapperRegistry, connection)
{
}
Expand Down Expand Up @@ -287,7 +287,7 @@ private IEnumerable<Consumer> CreateConsumers(IEnumerable<Subscription> connecti
int performer = i;
s_logger.LogInformation("Dispatcher: Creating consumer number {ConsumerNumber} for subscription: {ChannelName}", performer + 1, connection.Name);
var consumerFactoryType = typeof(ConsumerFactory<>).MakeGenericType(connection.DataType);
var consumerFactory = (IConsumerFactory)Activator.CreateInstance(consumerFactoryType, CommandProcessorProvider, _messageMapperRegistry, connection);
var consumerFactory = (IConsumerFactory)Activator.CreateInstance(consumerFactoryType, CommandProcessorFactory.Invoke(), _messageMapperRegistry, connection);
list.Add(consumerFactory.Create());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public DispatchBuilderTests()
.Build();

_builder = DispatchBuilder.With()
.CommandProcessorProvider(new CommandProcessorProvider(commandProcessor))
.CommandProcessorFactory(() =>new CommandProcessorProvider(commandProcessor))
.MessageMappers(messageMapperRegistry)
.DefaultChannelFactory(new ChannelFactory(rmqMessageConsumerFactory))
.Connections(new []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public DispatchBuilderWithNamedGateway()
.Build();

_builder = DispatchBuilder.With()
.CommandProcessorProvider(new CommandProcessorProvider(commandProcessor))
.CommandProcessorFactory(() => new CommandProcessorProvider(commandProcessor))
.MessageMappers(messageMapperRegistry)
.DefaultChannelFactory(new ChannelFactory(rmqMessageConsumerFactory))
.Connections(new []
Expand Down

0 comments on commit 42e313d

Please sign in to comment.