From 4d0d944284bc43a5e4d15dd6bbf2db49b46137a2 Mon Sep 17 00:00:00 2001 From: Sam Rumley <45108474+samrumley88@users.noreply.github.com> Date: Wed, 12 Jan 2022 11:16:39 +0000 Subject: [PATCH] Postgres outbox transaction provider (#1936) * Added PostgreSql transaction support, inc EntityFramework connection provider. * Separate methods for getting the Connection provider and getting an open connection in PostgreSqlOutboxSync and PostgreSqlInbox classes. * Remove author attribute from PostgreSql Inbox csproj file. Added in error (copy-paste). * Tidy up Brighter.sln --- Brighter.sln | 30 +- .../Paramore.Brighter.Inbox.Postgres.csproj | 21 +- .../PostgresSqlInbox.cs | 224 ++++++---- .../PostgresSqlInboxConfiguration.cs | 18 +- ...Paramore.Brighter.Outbox.PostgreSql.csproj | 1 + .../PostgreSqlOutboxConfiguration.cs | 21 +- .../PostgreSqlOutboxSync.cs | 384 +++++++++++------- .../ServiceCollectionExtensions.cs | 48 ++- ...hter.PostgreSql.EntityFrameworkCore.csproj | 22 + ...greSqlEntityFrameworkConnectionProvider.cs | 60 +++ .../IPostgreSqlConnectionProvider.cs | 19 + ...PostgreSqlTransactionConnectionProvider.cs | 4 + .../Paramore.Brighter.PostgreSql.csproj | 18 + .../PostgreSqlConfiguration.cs | 12 + .../PostgreSqlNpgsqlConnectionProvider.cs | 42 ++ 15 files changed, 662 insertions(+), 262 deletions(-) create mode 100644 src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj create mode 100644 src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/PostgreSqlEntityFrameworkConnectionProvider.cs create mode 100644 src/Paramore.Brighter.PostgreSql/IPostgreSqlConnectionProvider.cs create mode 100644 src/Paramore.Brighter.PostgreSql/IPostgreSqlTransactionConnectionProvider.cs create mode 100644 src/Paramore.Brighter.PostgreSql/Paramore.Brighter.PostgreSql.csproj create mode 100644 src/Paramore.Brighter.PostgreSql/PostgreSqlConfiguration.cs create mode 100644 src/Paramore.Brighter.PostgreSql/PostgreSqlNpgsqlConnectionProvider.cs diff --git a/Brighter.sln b/Brighter.sln index 1dc116a904..0e5341c19c 100644 --- a/Brighter.sln +++ b/Brighter.sln @@ -247,7 +247,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Greetings_MySqlMigrations", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreetingsScopedReceiverConsole", "samples\ASBTaskQueue\GreetingsScopedReceiverConsole\GreetingsScopedReceiverConsole.csproj", "{9D9F08A9-66EE-4AA2-8F11-2FA662EAADE2}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GreetingsWorker", "samples\ASBTaskQueue\GreetingsWorker\GreetingsWorker.csproj", "{93589653-2B49-4818-BE98-FE6F16EC72EC}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GreetingsWorker", "samples\ASBTaskQueue\GreetingsWorker\GreetingsWorker.csproj", "{93589653-2B49-4818-BE98-FE6F16EC72EC}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Paramore.Brighter.PostgreSql", "src\Paramore.Brighter.PostgreSql\Paramore.Brighter.PostgreSql.csproj", "{08E6D0F8-B6CE-454F-8761-77731D99F743}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.PostgreSql.EntityFrameworkCore", "src\Paramore.Brighter.PostgreSql.EntityFrameworkCore\Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj", "{AA85493A-4120-4DA0-BAA5-CBF34D238A64}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -1327,6 +1331,30 @@ Global {93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|Mixed Platforms.Build.0 = Release|Any CPU {93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|x86.ActiveCfg = Release|Any CPU {93589653-2B49-4818-BE98-FE6F16EC72EC}.Release|x86.Build.0 = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Any CPU.Build.0 = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|x86.ActiveCfg = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Debug|x86.Build.0 = Debug|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Any CPU.ActiveCfg = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Any CPU.Build.0 = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|x86.ActiveCfg = Release|Any CPU + {08E6D0F8-B6CE-454F-8761-77731D99F743}.Release|x86.Build.0 = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|x86.ActiveCfg = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Debug|x86.Build.0 = Debug|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Any CPU.Build.0 = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|x86.ActiveCfg = Release|Any CPU + {AA85493A-4120-4DA0-BAA5-CBF34D238A64}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj b/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj index f5743fbd06..3ae8575457 100644 --- a/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj +++ b/src/Paramore.Brighter.Inbox.Postgres/Paramore.Brighter.Inbox.Postgres.csproj @@ -1,15 +1,18 @@ - - netstandard2.0 - + + This is an implementation of the inbox used for decoupled invocation of commands by Paramore.Brighter, using PostgreSql + netstandard2.0 + MySql;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability + - - - + + + + - - - + + + diff --git a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs index 691d0afa11..565c24030a 100644 --- a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs +++ b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInbox.cs @@ -34,12 +34,14 @@ THE SOFTWARE. */ using NpgsqlTypes; using Paramore.Brighter.Inbox.Exceptions; using Paramore.Brighter.Logging; +using Paramore.Brighter.PostgreSql; namespace Paramore.Brighter.Inbox.Postgres { public class PostgresSqlInbox : IAmAnInbox, IAmAnInboxAsync { private readonly PostgresSqlInboxConfiguration _configuration; + private readonly IPostgreSqlConnectionProvider _connectionProvider; private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); /// /// If false we the default thread synchronization context to run any continuation, if true we re-use the original @@ -51,38 +53,44 @@ public class PostgresSqlInbox : IAmAnInbox, IAmAnInboxAsync /// public bool ContinueOnCapturedContext { get; set; } - public PostgresSqlInbox(PostgresSqlInboxConfiguration postgresSqlInboxConfiguration) - { - _configuration = postgresSqlInboxConfiguration; - ContinueOnCapturedContext = false; - } - + public PostgresSqlInbox(PostgresSqlInboxConfiguration configuration, IPostgreSqlConnectionProvider connectionProvider = null) + { + _configuration = configuration; + _connectionProvider = connectionProvider; + ContinueOnCapturedContext = false; + } public void Add(T command, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest { - var parameters = InitAddDbParameters(command, contextKey); - - using (var connection = GetConnection()) - { - connection.Open(); - var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); - try - { - sqlcmd.ExecuteNonQuery(); - } - catch (PostgresException sqlException) - { - if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) - { - s_logger.LogWarning( - "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - return; - } - - throw; - } - } + var connectionProvider = GetConnectionProvider(); + var parameters = InitAddDbParameters(command, contextKey); + var connection = GetOpenConnection(connectionProvider); + + try + { + using (var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds)) + { + sqlcmd.ExecuteNonQuery(); + } + } + catch (PostgresException sqlException) + { + if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) + { + s_logger.LogWarning( + "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); + return; + } + throw; + } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); + } } public T Get(Guid id, string contextKey, int timeoutInMilliseconds = -1) where T : class, IRequest @@ -90,8 +98,8 @@ public T Get(Guid id, string contextKey, int timeoutInMilliseconds = -1) wher var sql = $"SELECT * FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey"; var parameters = new[] { - CreateNpgsqlParameter("CommandId", id), - CreateNpgsqlParameter("ContextKey", contextKey) + InitNpgsqlParameter("CommandId", id), + InitNpgsqlParameter("ContextKey", contextKey) }; return ExecuteCommand(command => ReadCommand(command.ExecuteReader(), id), sql, timeoutInMilliseconds, parameters); @@ -102,38 +110,45 @@ public bool Exists(Guid id, string contextKey, int timeoutInMilliseconds = -1 var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey FETCH FIRST 1 ROWS ONLY"; var parameters = new[] { - CreateNpgsqlParameter("CommandId", id), - CreateNpgsqlParameter("ContextKey", contextKey) + InitNpgsqlParameter("CommandId", id), + InitNpgsqlParameter("ContextKey", contextKey) }; return ExecuteCommand(command => command.ExecuteReader().HasRows, sql, timeoutInMilliseconds, parameters); } public async Task AddAsync(T command, string contextKey, int timeoutInMilliseconds = -1, - CancellationToken cancellationToken = default(CancellationToken)) where T : class, IRequest + CancellationToken cancellationToken = default) where T : class, IRequest { + var connectionProvider = GetConnectionProvider(); var parameters = InitAddDbParameters(command, contextKey); + var connection = await GetOpenConnectionAsync(connectionProvider, cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - using (var connection = GetConnection()) + try { - await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds); - try + using (var sqlcmd = InitAddDbCommand(connection, parameters, timeoutInMilliseconds)) { await sqlcmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); } - catch (PostgresException sqlException) + } + catch (PostgresException sqlException) + { + if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) { - if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) - { - s_logger.LogWarning( - "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", - command.Id); - return; - } - - throw; + s_logger.LogWarning( + "PostgresSqlOutbox: A duplicate Command with the CommandId {Id} was inserted into the Outbox, ignoring and continuing", + command.Id); + return; } + + throw; + } + finally + { + if (!connectionProvider.IsSharedConnection) + await connection.DisposeAsync().ConfigureAwait(ContinueOnCapturedContext); + else if (!connectionProvider.HasOpenTransaction) + await connection.CloseAsync().ConfigureAwait(ContinueOnCapturedContext); } } @@ -143,8 +158,8 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise var parameters = new[] { - CreateNpgsqlParameter("CommandId", id), - CreateNpgsqlParameter("ContextKey", contextKey) + InitNpgsqlParameter("CommandId", id), + InitNpgsqlParameter("ContextKey", contextKey) }; return await ExecuteCommandAsync( @@ -161,8 +176,8 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise var sql = $"SELECT DISTINCT CommandId FROM {_configuration.InBoxTableName} WHERE CommandId = @CommandId AND ContextKey = @ContextKey FETCH FIRST 1 ROWS ONLY"; var parameters = new[] { - CreateNpgsqlParameter("CommandId", id), - CreateNpgsqlParameter("ContextKey", contextKey) + InitNpgsqlParameter("CommandId", id), + InitNpgsqlParameter("ContextKey", contextKey) }; return await ExecuteCommandAsync( @@ -178,12 +193,42 @@ public async Task AddAsync(T command, string contextKey, int timeoutInMillise .ConfigureAwait(ContinueOnCapturedContext); } - private NpgsqlConnection GetConnection() + private IPostgreSqlConnectionProvider GetConnectionProvider(IAmABoxTransactionConnectionProvider transactionConnectionProvider = null) { - return new NpgsqlConnection(_configuration.ConnectionString); + var connectionProvider = _connectionProvider ?? new PostgreSqlNpgsqlConnectionProvider(_configuration); + + if (transactionConnectionProvider != null) + { + if (transactionConnectionProvider is IPostgreSqlTransactionConnectionProvider provider) + connectionProvider = provider; + else + throw new Exception($"{nameof(transactionConnectionProvider)} does not implement interface {nameof(IPostgreSqlTransactionConnectionProvider)}."); + } + + return connectionProvider; } - private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value) + private NpgsqlConnection GetOpenConnection(IPostgreSqlConnectionProvider connectionProvider) + { + NpgsqlConnection connection = connectionProvider.GetConnection(); + + if (connection.State != ConnectionState.Open) + connection.Open(); + + return connection; + } + + private async Task GetOpenConnectionAsync(IPostgreSqlConnectionProvider connectionProvider, CancellationToken cancellationToken = default) + { + NpgsqlConnection connection = await connectionProvider.GetConnectionAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + + if (connection.State != ConnectionState.Open) + await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + + return connection; + } + + private NpgsqlParameter InitNpgsqlParameter(string parametername, object value) { if (value != null) return new NpgsqlParameter(parametername, value); @@ -194,10 +239,9 @@ private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value private DbCommand InitAddDbCommand(DbConnection connection, DbParameter[] parameters, int timeoutInMilliseconds) { var command = connection.CreateCommand(); - var sql = string.Format( + command.CommandText = string.Format( "INSERT INTO {0} (CommandID, CommandType, CommandBody, Timestamp, ContextKey) VALUES (@CommandID, @CommandType, @CommandBody, @Timestamp, @ContextKey)", _configuration.InBoxTableName); - command.CommandText = sql; command.Parameters.AddRange(parameters); return command; } @@ -207,11 +251,11 @@ private DbParameter[] InitAddDbParameters(T command, string contextKey) where var commandJson = JsonSerializer.Serialize(command, JsonSerialisationOptions.Options); var parameters = new[] { - CreateNpgsqlParameter("CommandID", command.Id), - CreateNpgsqlParameter("CommandType", typeof (T).Name), - CreateNpgsqlParameter("CommandBody", commandJson), + InitNpgsqlParameter("CommandID", command.Id), + InitNpgsqlParameter("CommandType", typeof (T).Name), + InitNpgsqlParameter("CommandBody", commandJson), new NpgsqlParameter("Timestamp", NpgsqlDbType.TimestampTz) {Value = DateTimeOffset.UtcNow}, - CreateNpgsqlParameter("ContextKey", contextKey) + InitNpgsqlParameter("ContextKey", contextKey) }; return parameters; } @@ -219,16 +263,28 @@ private DbParameter[] InitAddDbParameters(T command, string contextKey) where private T ExecuteCommand(Func execute, string sql, int timeoutInMilliseconds, params DbParameter[] parameters) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) + var connectionProvider = GetConnectionProvider(); + var connection = GetOpenConnection(connectionProvider); + + try { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); + using (var command = connection.CreateCommand()) + { + if (timeoutInMilliseconds != -1) + command.CommandTimeout = timeoutInMilliseconds; - connection.Open(); - var item = execute(command); - return item; + command.CommandText = sql; + command.Parameters.AddRange(parameters); + + return execute(command); + } + } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); } } @@ -236,19 +292,31 @@ private async Task ExecuteCommandAsync( Func> execute, string sql, int timeoutInMilliseconds, - CancellationToken cancellationToken = default(CancellationToken), + CancellationToken cancellationToken = default, params DbParameter[] parameters) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) + var connectionProvider = GetConnectionProvider(); + var connection = await GetOpenConnectionAsync(connectionProvider, cancellationToken).ConfigureAwait(ContinueOnCapturedContext); + + try { - if (timeoutInMilliseconds != -1) command.CommandTimeout = timeoutInMilliseconds; - command.CommandText = sql; - command.Parameters.AddRange(parameters); + using (var command = connection.CreateCommand()) + { + if (timeoutInMilliseconds != -1) + command.CommandTimeout = timeoutInMilliseconds; - await connection.OpenAsync(cancellationToken).ConfigureAwait(ContinueOnCapturedContext); - var item = await execute(command).ConfigureAwait(ContinueOnCapturedContext); - return item; + command.CommandText = sql; + command.Parameters.AddRange(parameters); + + return await execute(command).ConfigureAwait(ContinueOnCapturedContext); + } + } + finally + { + if (!connectionProvider.IsSharedConnection) + await connection.DisposeAsync().ConfigureAwait(ContinueOnCapturedContext); + else if (!connectionProvider.HasOpenTransaction) + await connection.CloseAsync().ConfigureAwait(ContinueOnCapturedContext); } } diff --git a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs index 7d9194c192..31ff311cd9 100644 --- a/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs +++ b/src/Paramore.Brighter.Inbox.Postgres/PostgresSqlInboxConfiguration.cs @@ -23,28 +23,26 @@ THE SOFTWARE. */ #endregion +using Paramore.Brighter.PostgreSql; namespace Paramore.Brighter.Inbox.Postgres { - public class PostgresSqlInboxConfiguration + public class PostgresSqlInboxConfiguration : PostgreSqlConfiguration { - public PostgresSqlInboxConfiguration(string connectionString, string tableName) + public PostgresSqlInboxConfiguration(string connectionString, string tableName) : base(connectionString) { - ConnectionString = connectionString; InBoxTableName = tableName; } - /// - /// Gets the connection string. - /// - /// The connection string. - public string ConnectionString { get; private set; } + public PostgresSqlInboxConfiguration(string tableName) : base(null) + { + InBoxTableName = tableName; + } /// /// Gets the name of the outbox table. /// /// The name of the outbox table. - public string InBoxTableName { get; private set; } - + public string InBoxTableName { get; } } } diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj b/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj index 6c78d8a9b6..636ba175fd 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj +++ b/src/Paramore.Brighter.Outbox.PostgreSql/Paramore.Brighter.Outbox.PostgreSql.csproj @@ -7,6 +7,7 @@ + diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs index b593c5ccc3..73a7a44c3c 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs +++ b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxConfiguration.cs @@ -22,36 +22,35 @@ THE SOFTWARE. */ #endregion -using System; -using System.Collections.Generic; -using System.Text; +using Paramore.Brighter.PostgreSql; namespace Paramore.Brighter.Outbox.PostgreSql { - public class PostgreSqlOutboxConfiguration + public class PostgreSqlOutboxConfiguration : PostgreSqlConfiguration { /// /// Initialises a new instance of /// /// The Subscription String /// Name of the OutBox table - public PostgreSqlOutboxConfiguration(string connectionstring,string outBoxTablename) + public PostgreSqlOutboxConfiguration(string connectionstring, string outBoxTablename) : base(connectionstring) { - ConnectionString = connectionstring; OutboxTableName = outBoxTablename; } /// - /// Gets the subscription string. + /// Initialises a new instance of /// - /// The subscription string. - public string ConnectionString { get; private set; } + /// Name of the OutBox table + public PostgreSqlOutboxConfiguration(string outBoxTablename) : base(null) + { + OutboxTableName = outBoxTablename; + } /// /// Gets the name of the outbox table. /// /// The name of the outbox table. - public string OutboxTableName { get; private set; } - + public string OutboxTableName { get; } } } diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs index 8bf59bfaa7..2ec70def27 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs +++ b/src/Paramore.Brighter.Outbox.PostgreSql/PostgreSqlOutboxSync.cs @@ -31,13 +31,15 @@ THE SOFTWARE. */ using Npgsql; using NpgsqlTypes; using Paramore.Brighter.Logging; +using Paramore.Brighter.PostgreSql; namespace Paramore.Brighter.Outbox.PostgreSql { public class PostgreSqlOutboxSync : IAmAnOutboxSync, IAmAnOutboxViewer { - private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); private readonly PostgreSqlOutboxConfiguration _configuration; + private readonly IPostgreSqlConnectionProvider _connectionProvider; + private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); public bool ContinueOnCapturedContext { @@ -48,10 +50,11 @@ public bool ContinueOnCapturedContext /// /// Initialises a new instance of class. /// - /// PostgreSql Configuration. - public PostgreSqlOutboxSync(PostgreSqlOutboxConfiguration configuration) + /// PostgreSql Outbox Configuration. + public PostgreSqlOutboxSync(PostgreSqlOutboxConfiguration configuration, IPostgreSqlConnectionProvider connectionProvider = null) { _configuration = configuration; + _connectionProvider = connectionProvider; } /// @@ -61,30 +64,36 @@ public PostgreSqlOutboxSync(PostgreSqlOutboxConfiguration configuration) /// The time allowed for the write in milliseconds; on a -1 default public void Add(Message message, int outBoxTimeout = -1, IAmABoxTransactionConnectionProvider transactionConnectionProvider = null) { + var connectionProvider = GetConnectionProvider(transactionConnectionProvider); var parameters = InitAddDbParameters(message); - using (var connection = GetConnection()) + var connection = GetOpenConnection(connectionProvider); + + try { - connection.Open(); using (var command = InitAddDbCommand(connection, parameters)) { - try - { - command.ExecuteNonQuery(); - } - catch (PostgresException sqlException) - { - if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) - { - s_logger.LogWarning( - "PostgresSQLOutbox: A duplicate Message with the MessageId {Id} was inserted into the Outbox, ignoring and continuing", - message.Id); - return; - } - - throw; - } + command.ExecuteNonQuery(); } } + catch (PostgresException sqlException) + { + if (sqlException.SqlState == PostgresErrorCodes.UniqueViolation) + { + s_logger.LogWarning( + "PostgresSQLOutbox: A duplicate Message with the MessageId {Id} was inserted into the Outbox, ignoring and continuing", + message.Id); + return; + } + + throw; + } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); + } } /// @@ -103,22 +112,30 @@ public IEnumerable DispatchedMessages( int outboxTimeout = -1, Dictionary args = null) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - CreatePagedDispatchedCommand(command, millisecondsDispatchedSince, pageSize, pageNumber); + var connectionProvider = GetConnectionProvider(); + var connection = GetOpenConnection(connectionProvider); - connection.Open(); + try + { + using (var command = InitPagedDispatchedCommand(connection, millisecondsDispatchedSince, pageSize, pageNumber)) + { + var messages = new List(); - var dbDataReader = command.ExecuteReader(); + using (var dbDataReader = command.ExecuteReader()) + { + while (dbDataReader.Read()) + messages.Add(MapAMessage(dbDataReader)); + } - var messages = new List(); - while (dbDataReader.Read()) - { - messages.Add(MapAMessage(dbDataReader)); + return messages; } - - return messages; + } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); } } @@ -131,26 +148,35 @@ public IEnumerable DispatchedMessages( /// A list of messages public IList Get(int pageSize = 100, int pageNumber = 1, Dictionary args = null) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - CreatePagedReadCommand(command, _configuration, pageSize, pageNumber); + var connectionProvider = GetConnectionProvider(); + var connection = GetOpenConnection(connectionProvider); - connection.Open(); + try + { + using (var command = InitPagedReadCommand(connection, pageSize, pageNumber)) + { + var messages = new List(); - var dbDataReader = command.ExecuteReader(); + using (var dbDataReader = command.ExecuteReader()) + { + while (dbDataReader.Read()) + { + messages.Add(MapAMessage(dbDataReader)); + } + } - var messages = new List(); - while (dbDataReader.Read()) - { - messages.Add(MapAMessage(dbDataReader)); + return messages; } - - return messages; + } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); } } - /// /// Gets the specified message identifier. /// @@ -162,7 +188,7 @@ public Message Get(Guid messageId, int outBoxTimeout = -1) var sql = string.Format( "SELECT Id, MessageId, Topic, MessageType, Timestamp, Correlationid, ReplyTo, ContentType, HeaderBag, Body FROM {0} WHERE MessageId = @MessageId", _configuration.OutboxTableName); - var parameters = new[] {CreateNpgsqlParameter("MessageId", messageId)}; + var parameters = new[] { InitNpgsqlParameter("MessageId", messageId) }; return ExecuteCommand(command => MapFunction(command.ExecuteReader()), sql, outBoxTimeout, parameters); } @@ -174,14 +200,23 @@ public Message Get(Guid messageId, int outBoxTimeout = -1) /// When was the message dispatched, defaults to UTC now public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary args = null) { - using (var connection = GetConnection()) + var connectionProvider = GetConnectionProvider(); + var connection = GetOpenConnection(connectionProvider); + + try { - connection.Open(); using (var command = InitMarkDispatchedCommand(connection, id, dispatchedAt)) { command.ExecuteNonQuery(); } } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); + } } /// @@ -190,34 +225,69 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, DictionaryHow long ago as the message sent? /// How many messages to return at once? /// Which page number of messages - /// Additional parameters required for search, if any - /// A list of messages that are outstanding for dispatch - public IEnumerable OutstandingMessages( - double millSecondsSinceSent, - int pageSize = 100, - int pageNumber = 1, - Dictionary args = null) + /// Additional parameters required for search, if any + /// A list of messages that are outstanding for dispatch + public IEnumerable OutstandingMessages( + double millSecondsSinceSent, + int pageSize = 100, + int pageNumber = 1, + Dictionary args = null) { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - CreatePagedOutstandingCommand(command, millSecondsSinceSent, pageSize, pageNumber); + var connectionProvider = GetConnectionProvider(); + var connection = GetOpenConnection(connectionProvider); - connection.Open(); + try + { + using (var command = InitPagedOutstandingCommand(connection, millSecondsSinceSent, pageSize, pageNumber)) + { + var messages = new List(); - var dbDataReader = command.ExecuteReader(); + using (var dbDataReader = command.ExecuteReader()) + { + while (dbDataReader.Read()) + { + messages.Add(MapAMessage(dbDataReader)); + } + } - var messages = new List(); - while (dbDataReader.Read()) - { - messages.Add(MapAMessage(dbDataReader)); + return messages; } + } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); + } + } - return messages; + private IPostgreSqlConnectionProvider GetConnectionProvider(IAmABoxTransactionConnectionProvider transactionConnectionProvider = null) + { + var connectionProvider = _connectionProvider ?? new PostgreSqlNpgsqlConnectionProvider(_configuration); + + if (transactionConnectionProvider != null) + { + if (transactionConnectionProvider is IPostgreSqlTransactionConnectionProvider provider) + connectionProvider = provider; + else + throw new Exception($"{nameof(transactionConnectionProvider)} does not implement interface {nameof(IPostgreSqlTransactionConnectionProvider)}."); } + + return connectionProvider; + } + + private NpgsqlConnection GetOpenConnection(IPostgreSqlConnectionProvider connectionProvider) + { + NpgsqlConnection connection = connectionProvider.GetConnection(); + + if (connection.State != ConnectionState.Open) + connection.Open(); + + return connection; } - private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value) + private NpgsqlParameter InitNpgsqlParameter(string parametername, object value) { if (value != null) return new NpgsqlParameter(parametername, value); @@ -225,121 +295,133 @@ private NpgsqlParameter CreateNpgsqlParameter(string parametername, object value return new NpgsqlParameter(parametername, DBNull.Value); } - private void CreatePagedDispatchedCommand(NpgsqlCommand command, double millisecondsDispatchedSince, + private NpgsqlCommand InitPagedDispatchedCommand(NpgsqlConnection connection, double millisecondsDispatchedSince, int pageSize, int pageNumber) { - var pagingSqlFormat = - "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; + var command = connection.CreateCommand(); + var parameters = new[] { - CreateNpgsqlParameter("PageNumber", pageNumber), - CreateNpgsqlParameter("PageSize", pageSize), - CreateNpgsqlParameter("OutstandingSince", -1 * millisecondsDispatchedSince) + InitNpgsqlParameter("PageNumber", pageNumber), + InitNpgsqlParameter("PageSize", pageSize), + InitNpgsqlParameter("OutstandingSince", -1 * millisecondsDispatchedSince) }; - var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName); + var pagingSqlFormat = + "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE DISPATCHED IS NOT NULL AND DISPATCHED < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; - command.CommandText = sql; + command.CommandText = string.Format(pagingSqlFormat, _configuration.OutboxTableName); command.Parameters.AddRange(parameters); + + return command; } - private void CreatePagedReadCommand(NpgsqlCommand command, PostgreSqlOutboxConfiguration configuration, - int pageSize, int pageNumber) + private NpgsqlCommand InitPagedReadCommand(NpgsqlConnection connection, int pageSize, int pageNumber) { - var pagingSqlFormat = - "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; + var command = connection.CreateCommand(); + var parameters = new[] { - CreateNpgsqlParameter("PageNumber", pageNumber), - CreateNpgsqlParameter("PageSize", pageSize) + InitNpgsqlParameter("PageNumber", pageNumber), + InitNpgsqlParameter("PageSize", pageSize) }; - var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName); + var pagingSqlFormat = + "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp DESC) AS NUMBER, * FROM {0}) AS TBL WHERE NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp DESC"; - command.CommandText = sql; + command.CommandText = string.Format(pagingSqlFormat, _configuration.OutboxTableName); command.Parameters.AddRange(parameters); + + return command; } - private void CreatePagedOutstandingCommand(NpgsqlCommand command, double milliSecondsSinceAdded, int pageSize, + private NpgsqlCommand InitPagedOutstandingCommand(NpgsqlConnection connection, double milliSecondsSinceAdded, int pageSize, int pageNumber) { - var pagingSqlFormat = - "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC"; + var command = connection.CreateCommand(); + var parameters = new[] { - CreateNpgsqlParameter("PageNumber", pageNumber), - CreateNpgsqlParameter("PageSize", pageSize), - CreateNpgsqlParameter("OutstandingSince", milliSecondsSinceAdded) + InitNpgsqlParameter("PageNumber", pageNumber), + InitNpgsqlParameter("PageSize", pageSize), + InitNpgsqlParameter("OutstandingSince", milliSecondsSinceAdded) }; - var sql = string.Format(pagingSqlFormat, _configuration.OutboxTableName); + var pagingSqlFormat = + "SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY Timestamp ASC) AS NUMBER, * FROM {0} WHERE DISPATCHED IS NULL) AS TBL WHERE TIMESTAMP < (CURRENT_TIMESTAMP + (@OutstandingSince || ' millisecond')::INTERVAL) AND NUMBER BETWEEN ((@PageNumber-1)*@PageSize+1) AND (@PageNumber*@PageSize) ORDER BY Timestamp ASC"; - command.CommandText = sql; + command.CommandText = string.Format(pagingSqlFormat, _configuration.OutboxTableName); command.Parameters.AddRange(parameters); - } - - private T ExecuteCommand(Func execute, string sql, int messageStoreTimeout, - NpgsqlParameter[] parameters) - { - using (var connection = GetConnection()) - using (var command = connection.CreateCommand()) - { - command.CommandText = sql; - command.Parameters.AddRange(parameters); - - if (messageStoreTimeout != -1) command.CommandTimeout = messageStoreTimeout; - connection.Open(); - return execute(command); - } - } - - private NpgsqlConnection GetConnection() - { - return new NpgsqlConnection(_configuration.ConnectionString); - } - - private NpgsqlCommand InitAddDbCommand(NpgsqlConnection connection, NpgsqlParameter[] parameters) - { - var command = connection.CreateCommand(); - var sql = string.Format( - "INSERT INTO {0} (MessageId, MessageType, Topic, Timestamp, CorrelationId, ReplyTo, ContentType, HeaderBag, Body) VALUES (@MessageId, @MessageType, @Topic, @Timestamp::timestamptz, @CorrelationId, @ReplyTo, @ContentType, @HeaderBag, @Body)", - _configuration.OutboxTableName); - command.CommandText = sql; - command.Parameters.AddRange(parameters); return command; } private NpgsqlParameter[] InitAddDbParameters(Message message) { var bagjson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); - var parameters = new NpgsqlParameter[] + return new NpgsqlParameter[] { - CreateNpgsqlParameter("MessageId", message.Id), - CreateNpgsqlParameter("MessageType", message.Header.MessageType.ToString()), - CreateNpgsqlParameter("Topic", message.Header.Topic), + InitNpgsqlParameter("MessageId", message.Id), + InitNpgsqlParameter("MessageType", message.Header.MessageType.ToString()), + InitNpgsqlParameter("Topic", message.Header.Topic), new NpgsqlParameter("Timestamp", NpgsqlDbType.TimestampTz) {Value = message.Header.TimeStamp}, - CreateNpgsqlParameter("CorrelationId", message.Header.CorrelationId), - CreateNpgsqlParameter("ReplyTo", message.Header.ReplyTo), - CreateNpgsqlParameter("ContentType", message.Header.ContentType), - CreateNpgsqlParameter("HeaderBag", bagjson), - CreateNpgsqlParameter("Body", message.Body.Value) + InitNpgsqlParameter("CorrelationId", message.Header.CorrelationId), + InitNpgsqlParameter("ReplyTo", message.Header.ReplyTo), + InitNpgsqlParameter("ContentType", message.Header.ContentType), + InitNpgsqlParameter("HeaderBag", bagjson), + InitNpgsqlParameter("Body", message.Body.Value) }; - return parameters; } private NpgsqlCommand InitMarkDispatchedCommand(NpgsqlConnection connection, Guid messageId, DateTime? dispatchedAt) { var command = connection.CreateCommand(); - var sql = - $"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId"; - command.CommandText = sql; - command.Parameters.Add(CreateNpgsqlParameter("MessageId", messageId)); - command.Parameters.Add(CreateNpgsqlParameter("DispatchedAt", dispatchedAt)); + command.CommandText = $"UPDATE {_configuration.OutboxTableName} SET Dispatched = @DispatchedAt WHERE MessageId = @MessageId"; + command.Parameters.Add(InitNpgsqlParameter("MessageId", messageId)); + command.Parameters.Add(InitNpgsqlParameter("DispatchedAt", dispatchedAt)); return command; } + private T ExecuteCommand(Func execute, string sql, int messageStoreTimeout, + NpgsqlParameter[] parameters) + { + var connectionProvider = GetConnectionProvider(); + var connection = GetOpenConnection(connectionProvider); + + try + { + using (var command = connection.CreateCommand()) + { + command.CommandText = sql; + command.Parameters.AddRange(parameters); + + if (messageStoreTimeout != -1) + command.CommandTimeout = messageStoreTimeout; + + return execute(command); + } + } + finally + { + if (!connectionProvider.IsSharedConnection) + connection.Dispose(); + else if (!connectionProvider.HasOpenTransaction) + connection.Close(); + } + } + + private NpgsqlCommand InitAddDbCommand(NpgsqlConnection connection, NpgsqlParameter[] parameters) + { + var command = connection.CreateCommand(); + + var addSqlFormat = "INSERT INTO {0} (MessageId, MessageType, Topic, Timestamp, CorrelationId, ReplyTo, ContentType, HeaderBag, Body) VALUES (@MessageId, @MessageType, @Topic, @Timestamp::timestamptz, @CorrelationId, @ReplyTo, @ContentType, @HeaderBag, @Body)"; + + command.CommandText = string.Format(addSqlFormat, _configuration.OutboxTableName); + command.Parameters.AddRange(parameters); + + return command; + } private Message MapFunction(IDataReader reader) { @@ -363,11 +445,11 @@ private Message MapAMessage(IDataReader dr) var contentType = GetContentType(dr); var header = new MessageHeader( - messageId:id, - topic:topic, - messageType:messageType, - timeStamp:timeStamp, - handledCount:0, + messageId: id, + topic: topic, + messageType: messageType, + timeStamp: timeStamp, + handledCount: 0, delayedMilliseconds: 0, correlationId: correlationId, replyTo: replyTo, @@ -405,7 +487,8 @@ private static Guid GetMessageId(IDataReader dr) private string GetContentType(IDataReader dr) { var ordinal = dr.GetOrdinal("ContentType"); - if (dr.IsDBNull(ordinal)) return null; + if (dr.IsDBNull(ordinal)) + return null; var replyTo = dr.GetString(ordinal); return replyTo; @@ -413,11 +496,12 @@ private string GetContentType(IDataReader dr) private string GetReplyTo(IDataReader dr) { - var ordinal = dr.GetOrdinal("ReplyTo"); - if (dr.IsDBNull(ordinal)) return null; + var ordinal = dr.GetOrdinal("ReplyTo"); + if (dr.IsDBNull(ordinal)) + return null; - var replyTo = dr.GetString(ordinal); - return replyTo; + var replyTo = dr.GetString(ordinal); + return replyTo; } private static Dictionary GetContextBag(IDataReader dr) @@ -431,7 +515,8 @@ private static Dictionary GetContextBag(IDataReader dr) private Guid? GetCorrelationId(IDataReader dr) { var ordinal = dr.GetOrdinal("CorrelationId"); - if (dr.IsDBNull(ordinal)) return null; + if (dr.IsDBNull(ordinal)) + return null; var correlationId = dr.GetGuid(ordinal); return correlationId; @@ -447,7 +532,4 @@ private static DateTime GetTimeStamp(IDataReader dr) } } - - } - diff --git a/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs b/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs index f8ff53093e..16637e5e10 100644 --- a/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs +++ b/src/Paramore.Brighter.Outbox.PostgreSql/ServiceCollectionExtensions.cs @@ -1,26 +1,70 @@ using System; using Microsoft.Extensions.DependencyInjection; using Paramore.Brighter.Extensions.DependencyInjection; +using Paramore.Brighter.PostgreSql; namespace Paramore.Brighter.Outbox.PostgreSql { public static class ServiceCollectionExtensions { public static IBrighterBuilder UsePostgreSqlOutbox( - this IBrighterBuilder brighterBuilder, PostgreSqlOutboxConfiguration configuration, ServiceLifetime serviceLifetime = ServiceLifetime.Singleton) + this IBrighterBuilder brighterBuilder, PostgreSqlOutboxConfiguration configuration, Type connectionProvider = null, ServiceLifetime serviceLifetime = ServiceLifetime.Singleton) { + if (brighterBuilder is null) + throw new ArgumentNullException($"{nameof(brighterBuilder)} cannot be null.", nameof(brighterBuilder)); + + if (configuration is null) + throw new ArgumentNullException($"{nameof(configuration)} cannot be null.", nameof(configuration)); + brighterBuilder.Services.AddSingleton(configuration); + if (connectionProvider is object) + { + if (!typeof(IPostgreSqlConnectionProvider).IsAssignableFrom(connectionProvider)) + throw new Exception($"Unable to register provider of type {connectionProvider.GetType().Name}. Class does not implement interface {nameof(IPostgreSqlConnectionProvider)}."); + + brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IPostgreSqlConnectionProvider), connectionProvider, serviceLifetime)); + } + brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmAnOutboxSync), BuildPostgreSqlOutboxSync, serviceLifetime)); return brighterBuilder; } + /// + /// Use this transaction provider to ensure that the Outbox and the Entity Store are correct + /// + /// Allows extension method + /// What is the type of the connection provider. Must implement interface IPostgreSqlTransactionConnectionProvider + /// What is the lifetime of registered interfaces + /// Allows fluent syntax + /// This is paired with Use Outbox (above) when required + /// Registers the following + /// -- IAmABoxTransactionConnectionProvider: the provider of a connection for any existing transaction + public static IBrighterBuilder UsePostgreSqlTransactionConnectionProvider( + this IBrighterBuilder brighterBuilder, Type connectionProvider, + ServiceLifetime serviceLifetime = ServiceLifetime.Scoped) + { + if (brighterBuilder is null) + throw new ArgumentNullException($"{nameof(brighterBuilder)} cannot be null.", nameof(brighterBuilder)); + + if (connectionProvider is null) + throw new ArgumentNullException($"{nameof(connectionProvider)} cannot be null.", nameof(connectionProvider)); + + if (!typeof(IPostgreSqlTransactionConnectionProvider).IsAssignableFrom(connectionProvider)) + throw new Exception($"Unable to register provider of type {connectionProvider.GetType().Name}. Class does not implement interface {nameof(IPostgreSqlTransactionConnectionProvider)}."); + + brighterBuilder.Services.Add(new ServiceDescriptor(typeof(IAmABoxTransactionConnectionProvider), connectionProvider, serviceLifetime)); + + return brighterBuilder; + } + private static PostgreSqlOutboxSync BuildPostgreSqlOutboxSync(IServiceProvider provider) { var config = provider.GetService(); + var connectionProvider = provider.GetService(); - return new PostgreSqlOutboxSync(config); + return new PostgreSqlOutboxSync(config, connectionProvider); } } } diff --git a/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj new file mode 100644 index 0000000000..6736d47c44 --- /dev/null +++ b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/Paramore.Brighter.PostgreSql.EntityFrameworkCore.csproj @@ -0,0 +1,22 @@ + + + + Sam Rumley + Common components required to get a PostgreSql connection from Entity Framework Core. + netstandard2.1;net6.0 + RabbitMQ;AMQP;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability + + + + + + + + + + + + + + + diff --git a/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/PostgreSqlEntityFrameworkConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/PostgreSqlEntityFrameworkConnectionProvider.cs new file mode 100644 index 0000000000..9f14c6d1e8 --- /dev/null +++ b/src/Paramore.Brighter.PostgreSql.EntityFrameworkCore/PostgreSqlEntityFrameworkConnectionProvider.cs @@ -0,0 +1,60 @@ +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Npgsql; + +namespace Paramore.Brighter.PostgreSql.EntityFrameworkCore +{ + /// + /// A connection provider that uses the same connection as EF Core + /// + /// The Db Context to take the connection from + public class PostgreSqlEntityFrameworkConnectionProvider : IPostgreSqlTransactionConnectionProvider where T : DbContext + { + private readonly T _context; + + /// + /// Constructs and instance from a database context + /// + /// The database context to use + public PostgreSqlEntityFrameworkConnectionProvider(T context) + { + _context = context; + } + + /// + /// Get the current connection of the database context + /// + /// The NpgsqlConnection that is in use + public NpgsqlConnection GetConnection() + { + return (NpgsqlConnection)_context.Database.GetDbConnection(); + } + + /// + /// Get the current connection of the database context + /// + /// A cancellation token + /// + public Task GetConnectionAsync(CancellationToken cancellationToken = default) + { + var tcs = new TaskCompletionSource(); + tcs.SetResult((NpgsqlConnection)_context.Database.GetDbConnection()); + return tcs.Task; + } + + /// + /// Get the ambient Transaction + /// + /// The NpgsqlTransaction + public NpgsqlTransaction GetTransaction() + { + return (NpgsqlTransaction)_context.Database.CurrentTransaction?.GetDbTransaction(); + } + + public bool HasOpenTransaction { get => _context.Database.CurrentTransaction != null; } + + public bool IsSharedConnection { get => true; } + } +} diff --git a/src/Paramore.Brighter.PostgreSql/IPostgreSqlConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql/IPostgreSqlConnectionProvider.cs new file mode 100644 index 0000000000..c0df3cdf5f --- /dev/null +++ b/src/Paramore.Brighter.PostgreSql/IPostgreSqlConnectionProvider.cs @@ -0,0 +1,19 @@ +using System.Threading; +using System.Threading.Tasks; +using Npgsql; + +namespace Paramore.Brighter.PostgreSql +{ + public interface IPostgreSqlConnectionProvider + { + NpgsqlConnection GetConnection(); + + Task GetConnectionAsync(CancellationToken cancellationToken = default); + + NpgsqlTransaction GetTransaction(); + + bool HasOpenTransaction { get; } + + bool IsSharedConnection { get; } + } +} diff --git a/src/Paramore.Brighter.PostgreSql/IPostgreSqlTransactionConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql/IPostgreSqlTransactionConnectionProvider.cs new file mode 100644 index 0000000000..82df857aca --- /dev/null +++ b/src/Paramore.Brighter.PostgreSql/IPostgreSqlTransactionConnectionProvider.cs @@ -0,0 +1,4 @@ +namespace Paramore.Brighter.PostgreSql +{ + public interface IPostgreSqlTransactionConnectionProvider : IPostgreSqlConnectionProvider, IAmABoxTransactionConnectionProvider { } +} diff --git a/src/Paramore.Brighter.PostgreSql/Paramore.Brighter.PostgreSql.csproj b/src/Paramore.Brighter.PostgreSql/Paramore.Brighter.PostgreSql.csproj new file mode 100644 index 0000000000..8f2e074ee9 --- /dev/null +++ b/src/Paramore.Brighter.PostgreSql/Paramore.Brighter.PostgreSql.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.0 + Sam Rumley + Common components required to connect to PostgreSql for inbox and outbox. + RabbitMQ;AMQP;Command;Event;Service Activator;Decoupled;Invocation;Messaging;Remote;Command Dispatcher;Command Processor;Request;Service;Task Queue;Work Queue;Retry;Circuit Breaker;Availability + + + + + + + + + + + diff --git a/src/Paramore.Brighter.PostgreSql/PostgreSqlConfiguration.cs b/src/Paramore.Brighter.PostgreSql/PostgreSqlConfiguration.cs new file mode 100644 index 0000000000..649f613e04 --- /dev/null +++ b/src/Paramore.Brighter.PostgreSql/PostgreSqlConfiguration.cs @@ -0,0 +1,12 @@ +namespace Paramore.Brighter.PostgreSql +{ + public abstract class PostgreSqlConfiguration + { + protected PostgreSqlConfiguration(string connectionString) + { + ConnectionString = connectionString; + } + + public string ConnectionString { get; } + } +} diff --git a/src/Paramore.Brighter.PostgreSql/PostgreSqlNpgsqlConnectionProvider.cs b/src/Paramore.Brighter.PostgreSql/PostgreSqlNpgsqlConnectionProvider.cs new file mode 100644 index 0000000000..0eb773f027 --- /dev/null +++ b/src/Paramore.Brighter.PostgreSql/PostgreSqlNpgsqlConnectionProvider.cs @@ -0,0 +1,42 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Npgsql; + +namespace Paramore.Brighter.PostgreSql +{ + public class PostgreSqlNpgsqlConnectionProvider : IPostgreSqlConnectionProvider + { + private readonly string _connectionString; + + public PostgreSqlNpgsqlConnectionProvider(PostgreSqlConfiguration configuration) + { + if (string.IsNullOrWhiteSpace(configuration?.ConnectionString)) + throw new ArgumentNullException(nameof(configuration.ConnectionString)); + + _connectionString = configuration.ConnectionString; + } + + public NpgsqlConnection GetConnection() + { + return new NpgsqlConnection(_connectionString); + } + + public async Task GetConnectionAsync(CancellationToken cancellationToken = default) + { + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + tcs.SetResult(GetConnection()); + return await tcs.Task; + } + + public NpgsqlTransaction GetTransaction() + { + //This connection factory does not support transactions + return null; + } + + public bool HasOpenTransaction { get => false; } + + public bool IsSharedConnection { get => false; } + } +}