From 4a0edfabd254094ebbcfb9c258dfa3179f3d45aa Mon Sep 17 00:00:00 2001 From: Paul Reardon Date: Thu, 2 Dec 2021 00:25:16 +0000 Subject: [PATCH] Added Subscription Configuration - Added a SubscriptionConfiguration so we have controller over the created Subscriptions - Plumbed in Subscription Configuration throught --- .../ASBTaskQueue/GreetingsWorker/Program.cs | 3 +- .../AzureServiceBusChannelFactory.cs | 2 +- .../AzureServiceBusConsumer.cs | 11 +++---- .../AzureServiceBusConsumerFactory.cs | 5 ++- .../AzureServiceBusSubscription.cs | 14 ++++---- ...zureServiceBusSubscriptionConfiguration.cs | 29 +++++++++++++++++ .../AdministrationClientWrapper.cs | 32 ++++++++++++++----- .../IAdministrationClientWrapper.cs | 17 ++++++++-- .../AzureServiceBusConsumerTests.cs | 22 +++++++------ ...en_consuming_a_message_via_the_consumer.cs | 27 +++++++++++++++- ...When_posting_a_message_via_the_producer.cs | 2 +- 11 files changed, 126 insertions(+), 38 deletions(-) create mode 100644 src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscriptionConfiguration.cs diff --git a/samples/ASBTaskQueue/GreetingsWorker/Program.cs b/samples/ASBTaskQueue/GreetingsWorker/Program.cs index 5df8c125ae..1a9175c322 100644 --- a/samples/ASBTaskQueue/GreetingsWorker/Program.cs +++ b/samples/ASBTaskQueue/GreetingsWorker/Program.cs @@ -50,8 +50,7 @@ public async static Task Main(string[] args) makeChannels: OnMissingChannel.Create, requeueCount: 3, isAsync: true, - noOfPerformers: 2, - sqlFilter: ""), + noOfPerformers: 2), new AzureServiceBusSubscription( new SubscriptionName(GreetingEventMessageMapper.Topic), new ChannelName(subscriptionName), diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusChannelFactory.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusChannelFactory.cs index f68115d8c3..920f16e7da 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusChannelFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusChannelFactory.cs @@ -36,7 +36,7 @@ public IAmAChannel CreateChannel(Subscription subscription) } IAmAMessageConsumer messageConsumer = - _azureServiceBusConsumerFactory.Create(azureServiceBusSubscription, azureServiceBusSubscription.SqlFilter); + _azureServiceBusConsumerFactory.Create(azureServiceBusSubscription); return new Channel( channelName: subscription.ChannelName, diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs index eeeb37bf26..8ae6428a70 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs @@ -19,10 +19,10 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer private readonly int _batchSize; private IServiceBusReceiverWrapper _serviceBusReceiver; private readonly string _subscriptionName; - private readonly string _sqlFilter; private bool _subscriptionCreated; private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); private readonly OnMissingChannel _makeChannel; + private readonly AzureServiceBusSubscriptionConfiguration _subscriptionConfiguration; private readonly ServiceBusReceiveMode _receiveMode; /// @@ -36,12 +36,13 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer /// How many messages to receive at a time. /// The mode in which to Receive. /// The mode in which to make Channels. + /// The configuration options for the subscriptions. public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMessageProducerSync messageProducerSync, IAdministrationClientWrapper administrationClientWrapper, IServiceBusReceiverProvider serviceBusReceiverProvider, int batchSize = 10, ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.ReceiveAndDelete, OnMissingChannel makeChannels = OnMissingChannel.Create, - string sqlFilter = "") + AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = default) { _subscriptionName = subscriptionName; _topicName = topicName; @@ -50,8 +51,8 @@ public AzureServiceBusConsumer(string topicName, string subscriptionName, _serviceBusReceiverProvider = serviceBusReceiverProvider; _batchSize = batchSize; _makeChannel = makeChannels; + _subscriptionConfiguration = subscriptionConfiguration ?? new AzureServiceBusSubscriptionConfiguration(); _receiveMode = receiveMode; - _sqlFilter = sqlFilter; GetMessageReceiverProvider(); } @@ -291,8 +292,6 @@ private static int GetHandledCount(IBrokeredMessageWrapper azureServiceBusMessag private void EnsureSubscription() { - const int maxDeliveryCount = 2000; - if (_subscriptionCreated || _makeChannel.Equals(OnMissingChannel.Assume)) return; @@ -310,7 +309,7 @@ private void EnsureSubscription() $"Subscription {_subscriptionName} does not exist on topic {_topicName} and missing channel mode set to Validate."); } - _administrationClientWrapper.CreateSubscription(_topicName, _subscriptionName, maxDeliveryCount, _sqlFilter); + _administrationClientWrapper.CreateSubscription(_topicName, _subscriptionName, _subscriptionConfiguration); _subscriptionCreated = true; } catch (ServiceBusException ex) diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumerFactory.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumerFactory.cs index 7d73ede745..2d7b0ca6e0 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumerFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumerFactory.cs @@ -41,6 +41,9 @@ public IAmAMessageConsumer Create(Subscription subscription) { var nameSpaceManagerWrapper = new AdministrationClientWrapper(_clientProvider); + AzureServiceBusSubscriptionConfiguration config = new AzureServiceBusSubscriptionConfiguration(); + if (subscription is AzureServiceBusSubscription sub) config = sub.Configuration; + return new AzureServiceBusConsumer(subscription.RoutingKey, subscription.ChannelName, new AzureServiceBusMessageProducer(nameSpaceManagerWrapper, new ServiceBusSenderProvider(_clientProvider), subscription.MakeChannels), nameSpaceManagerWrapper, @@ -48,7 +51,7 @@ public IAmAMessageConsumer Create(Subscription subscription) makeChannels: subscription.MakeChannels, receiveMode: _ackOnRead ? ServiceBusReceiveMode.ReceiveAndDelete : ServiceBusReceiveMode.PeekLock, batchSize: subscription.BufferSize, - sqlFilter: sqlFilter); + subscriptionConfiguration: config); } } } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs index d72416f9e1..73914ee36a 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscription.cs @@ -7,8 +7,8 @@ namespace Paramore.Brighter.MessagingGateway.AzureServiceBus /// public class AzureServiceBusSubscription : Subscription { - public string SqlFilter { get; } - + public AzureServiceBusSubscriptionConfiguration Configuration { get; } + /// /// Initializes an Instance of /// @@ -25,6 +25,7 @@ public class AzureServiceBusSubscription : Subscription /// /// The channel factory to create channels for Consumer. /// Should we make channels if they don't exist, defaults to creating + /// The configuration options for the subscriptions. public AzureServiceBusSubscription( Type dataType, SubscriptionName name = null, @@ -39,11 +40,11 @@ public AzureServiceBusSubscription( bool isAsync = false, IAmAChannelFactory channelFactory = null, OnMissingChannel makeChannels = OnMissingChannel.Create, - string sqlFilter = "") + AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null) : base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount, requeueDelayInMs, unacceptableMessageLimit, isAsync, channelFactory, makeChannels) { - SqlFilter = sqlFilter; + Configuration = subscriptionConfiguration ?? new AzureServiceBusSubscriptionConfiguration(); } } @@ -68,6 +69,7 @@ public class AzureServiceBusSubscription : AzureServiceBusSubscription where /// /// The channel factory to create channels for Consumer. /// Should we make channels if they don't exist, defaults to creating + /// The configuration options for the subscriptions. public AzureServiceBusSubscription( SubscriptionName name = null, ChannelName channelName = null, @@ -81,10 +83,10 @@ public AzureServiceBusSubscription( bool isAsync = false, IAmAChannelFactory channelFactory = null, OnMissingChannel makeChannels = OnMissingChannel.Create, - string sqlFilter = "") + AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null) : base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount, requeueDelayInMs, unacceptableMessageLimit, - isAsync, channelFactory, makeChannels, sqlFilter) + isAsync, channelFactory, makeChannels, subscriptionConfiguration) { } } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscriptionConfiguration.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscriptionConfiguration.cs new file mode 100644 index 0000000000..fc5e3d5b70 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusSubscriptionConfiguration.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; + +namespace Paramore.Brighter.MessagingGateway.AzureServiceBus +{ + public class AzureServiceBusSubscriptionConfiguration + { + /// + /// The Maximum amount of times that a Message can be delivered before it is dead Lettered + /// + public int MaxDeliveryCount { get; set; } = 5; + /// + /// Dead letter a message when it expires + /// + public bool DeadLetteringOnMessageExpiration { get; set; } = true; + /// + /// How long message locks are held for + /// + public TimeSpan LockDuration { get; set; } = TimeSpan.FromMinutes(1); + /// + /// How long messages sit in the queue before they expire + /// + public TimeSpan DefaultMessageTimeToLive { get; set; } = TimeSpan.FromDays(3); + /// + /// A Sql Filter to apply to the subscription + /// + public string SqlFilter = String.Empty; + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/AdministrationClientWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/AdministrationClientWrapper.cs index ee6e6422db..42bfd8a73b 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/AdministrationClientWrapper.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/AdministrationClientWrapper.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus.Administration; @@ -147,12 +148,24 @@ public bool SubscriptionExists(string topicName, string subscriptionName) /// /// The name of the Topic. /// The name of the Subscription. - /// Maximum message delivery count. - public void CreateSubscription(string topicName, string subscriptionName, int maxDeliveryCount = 2000, string sqlFilter = "") + /// The configuration options for the subscriptions. + public void CreateSubscription(string topicName, string subscriptionName, AzureServiceBusSubscriptionConfiguration subscriptionConfiguration) { - CreateSubscriptionAsync(topicName, subscriptionName, maxDeliveryCount).Wait(); + CreateSubscriptionAsync(topicName, subscriptionName, subscriptionConfiguration).Wait(); } - + + /// + /// Get a Subscription. + /// + /// The name of the Topic. + /// The name of the Subscription. + /// The Cancellation Token. + public async Task GetSubscriptionAsync(string topicName, string subscriptionName, + CancellationToken cancellationToken = default) + { + return await _administrationClient.GetSubscriptionAsync(topicName, subscriptionName, cancellationToken); + } + private void Initialise() { s_logger.LogDebug("Initialising new management client wrapper..."); @@ -170,7 +183,7 @@ private void Initialise() s_logger.LogDebug("New management client wrapper initialised."); } - private async Task CreateSubscriptionAsync(string topicName, string subscriptionName, int maxDeliveryCount = 2000, string sqlFilter = "") + private async Task CreateSubscriptionAsync(string topicName, string subscriptionName, AzureServiceBusSubscriptionConfiguration subscriptionConfiguration) { s_logger.LogInformation("Creating subscription {ChannelName} for topic {Topic}...", subscriptionName, topicName); @@ -181,11 +194,14 @@ private async Task CreateSubscriptionAsync(string topicName, string subscription var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName) { - MaxDeliveryCount = maxDeliveryCount + MaxDeliveryCount = subscriptionConfiguration.MaxDeliveryCount, + DeadLetteringOnMessageExpiration = subscriptionConfiguration.DeadLetteringOnMessageExpiration, + LockDuration = subscriptionConfiguration.LockDuration, + DefaultMessageTimeToLive = subscriptionConfiguration.DefaultMessageTimeToLive }; - var ruleOptions = string.IsNullOrEmpty(sqlFilter) - ? new CreateRuleOptions() : new CreateRuleOptions("sqlFilter",new SqlRuleFilter(sqlFilter)); + var ruleOptions = string.IsNullOrEmpty(subscriptionConfiguration.SqlFilter) + ? new CreateRuleOptions() : new CreateRuleOptions("sqlFilter",new SqlRuleFilter(subscriptionConfiguration.SqlFilter)); try { diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IAdministrationClientWrapper.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IAdministrationClientWrapper.cs index 0ca4be954e..fd87c1ec0e 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IAdministrationClientWrapper.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/IAdministrationClientWrapper.cs @@ -1,4 +1,6 @@ -using System.Threading.Tasks; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers { @@ -39,12 +41,21 @@ public interface IAdministrationClientWrapper /// /// The name of the Topic. /// The name of the Subscription. - /// Maximum message delivery count. - void CreateSubscription(string topicName, string subscriptionName, int maxDeliveryCount, string sqlFilter = ""); + /// The configuration options for the subscriptions. + void CreateSubscription(string topicName, string subscriptionName, AzureServiceBusSubscriptionConfiguration subscriptionConfiguration); /// /// Reset the Connection. /// void Reset(); + + /// + /// Get a Subscription. + /// + /// The name of the Topic. + /// The name of the Subscription. + /// The Cancellation Token. + Task GetSubscriptionAsync(string topicName, string subscriptionName, + CancellationToken cancellationToken = default); } } diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs index 125ea50455..4b512569b0 100644 --- a/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/AzureServiceBusConsumerTests.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -19,6 +20,9 @@ public class AzureServiceBusConsumerTests private readonly Mock _mockMessageProducer; private readonly Mock _mockMessageReceiver; + private readonly AzureServiceBusSubscriptionConfiguration _subConfig = + new AzureServiceBusSubscriptionConfiguration(); + public AzureServiceBusConsumerTests() { _nameSpaceManagerWrapper = new Mock(); @@ -30,7 +34,7 @@ public AzureServiceBusConsumerTests() _mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ServiceBusReceiveMode.ReceiveAndDelete)).Returns(_messageReceiver.Object); _azureServiceBusConsumer = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object, - _nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object, makeChannels: OnMissingChannel.Create); + _nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object, makeChannels: OnMissingChannel.Create, subscriptionConfiguration: _subConfig); } [Fact] @@ -78,7 +82,7 @@ public void When_a_subscription_does_not_exist_and_messages_are_in_the_queue_the Message[] result = _azureServiceBusConsumer.Receive(400); - _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000, "")); + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", _subConfig)); Assert.Equal("somebody", result[0].Body.Value); } @@ -171,7 +175,7 @@ public void When_the_user_properties_on_the_azure_sb_message_is_null_it_should_d var brokeredMessageList = new List(); var message1 = new Mock(); message1.Setup(m => m.MessageBodyValue).Returns(Encoding.UTF8.GetBytes("somebody")); - message1.Setup(m => m.ApplicationProperties).Returns(null as IReadOnlyDictionary); + message1.Setup(m => m.ApplicationProperties).Returns(new Dictionary()); brokeredMessageList.Add(message1.Object); _messageReceiver.Setup(x => x.Receive(10, TimeSpan.FromMilliseconds(400))).Returns(Task.FromResult>(brokeredMessageList)); @@ -199,7 +203,7 @@ public void When_there_are_no_messages_then_it_returns_an_empty_array() public void When_trying_to_create_a_subscription_which_was_already_created_by_another_thread_it_should_ignore_the_error() { _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); - _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", 2000, "")) + _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", _subConfig)) .Throws(new ServiceBusException("whatever", ServiceBusFailureReason.MessagingEntityAlreadyExists)); var brokeredMessageList = new List(); @@ -213,7 +217,7 @@ public void When_trying_to_create_a_subscription_which_was_already_created_by_an Message[] result = _azureServiceBusConsumer.Receive(400); - _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000, "")); + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", _subConfig)); Assert.Equal("somebody", result[0].Body.Value); } @@ -264,7 +268,7 @@ public void public void When_there_is_an_error_talking_to_servicebus_when_creating_the_subscription_then_a_ChannelFailureException_is_raised_and_ManagementClientWrapper_is_reinitilised() { _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); - _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", 2000, "")).Throws(new Exception()); + _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", _subConfig)).Throws(new Exception()); Assert.Throws(() => _azureServiceBusConsumer.Receive(400)); _nameSpaceManagerWrapper.Verify(managementClientWrapper => managementClientWrapper.Reset(), Times.Once); @@ -300,7 +304,7 @@ public void Once_the_subscription_is_created_or_exits_it_does_not_check_if_it_ex if (subscriptionExists == false) { - _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000, ""), Times.Once); + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", _subConfig), Times.Once); } _nameSpaceManagerWrapper.Verify(f => f.SubscriptionExists("topic", "subscription"), Times.Once); @@ -310,7 +314,7 @@ public void Once_the_subscription_is_created_or_exits_it_does_not_check_if_it_ex public void When_MessagingEntityAlreadyExistsException_does_not_check_if_subscription_exists() { _nameSpaceManagerWrapper.Setup(f => f.SubscriptionExists("topic", "subscription")).Returns(false); - _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", 2000, "")) + _nameSpaceManagerWrapper.Setup(f => f.CreateSubscription("topic", "subscription", new AzureServiceBusSubscriptionConfiguration())) .Throws(new ServiceBusException("whatever", ServiceBusFailureReason.MessagingEntityAlreadyExists)); var brokeredMessageList = new List(); @@ -325,7 +329,7 @@ public void When_MessagingEntityAlreadyExistsException_does_not_check_if_subscri Message[] result = _azureServiceBusConsumer.Receive(400); _azureServiceBusConsumer.Receive(400); - _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", 2000, "")); + _nameSpaceManagerWrapper.Verify(f => f.CreateSubscription("topic", "subscription", _subConfig)); Assert.Equal("somebody", result[0].Body.Value); _nameSpaceManagerWrapper.Verify(f => f.SubscriptionExists("topic", "subscription"), Times.Once); diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_consuming_a_message_via_the_consumer.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_consuming_a_message_via_the_consumer.cs index d8c29d9f45..b1fccf3a9f 100644 --- a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_consuming_a_message_via_the_consumer.cs +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_consuming_a_message_via_the_consumer.cs @@ -1,5 +1,6 @@ using System; using System.Text.Json; +using System.Threading; using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using FluentAssertions; @@ -23,6 +24,7 @@ public class ASBConsumerTests : IDisposable private readonly string _channelName; private readonly ServiceBusClient _serviceBusClient; private readonly IAdministrationClientWrapper _administrationClient; + private readonly AzureServiceBusSubscriptionConfiguration _subscriptionConfiguration; public ASBConsumerTests() { @@ -50,9 +52,18 @@ public ASBConsumerTests() new MessageBody(JsonSerializer.Serialize(command, JsonSerialisationOptions.Options)) ); + _subscriptionConfiguration = new AzureServiceBusSubscriptionConfiguration() + { + DeadLetteringOnMessageExpiration = true, + DefaultMessageTimeToLive = TimeSpan.FromDays(4), + LockDuration = TimeSpan.FromMinutes(3), + MaxDeliveryCount = 7, + SqlFilter = "1=1" + }; + var clientProvider = ASBCreds.ASBClientProvider; _administrationClient = new AdministrationClientWrapper(clientProvider); - _administrationClient.CreateSubscription(_topicName, _channelName, 5); + _administrationClient.CreateSubscription(_topicName, _channelName, _subscriptionConfiguration); _serviceBusClient = clientProvider.GetServiceBusClient(); @@ -114,6 +125,20 @@ public async Task When_Requeueing_a_message_via_the_consumer() requeuedMessage.Body.Value.Should().Be(message.Body.Value); } + [Fact] + public async Task When_A_Subscription_is_created_the_properties_are_set_as_Expected() + { + var sub = await _administrationClient.GetSubscriptionAsync(_topicName, _channelName, CancellationToken.None); + + sub.DeadLetteringOnMessageExpiration.Should() + .Be(_subscriptionConfiguration.DeadLetteringOnMessageExpiration); + sub.DefaultMessageTimeToLive.Should().Be(_subscriptionConfiguration.DefaultMessageTimeToLive); + sub.LockDuration.Should().Be(_subscriptionConfiguration.LockDuration); + sub.MaxDeliveryCount.Should().Be(_subscriptionConfiguration.MaxDeliveryCount); + + //ToDo: Need to Add Test for Filter + } + public void Dispose() { _administrationClient.DeleteTopicAsync(_topicName).GetAwaiter().GetResult(); diff --git a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_posting_a_message_via_the_producer.cs b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_posting_a_message_via_the_producer.cs index 9b465574dc..f799cd484b 100644 --- a/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_posting_a_message_via_the_producer.cs +++ b/tests/Paramore.Brighter.AzureServiceBus.Tests/MessagingGateway/When_posting_a_message_via_the_producer.cs @@ -50,7 +50,7 @@ public ASBProducerTests() var clientProvider = ASBCreds.ASBClientProvider; _administrationClient = new AdministrationClientWrapper(clientProvider); - _administrationClient.CreateSubscription(_topicName, channelName, 5); + _administrationClient.CreateSubscription(_topicName, channelName, new AzureServiceBusSubscriptionConfiguration()); var channelFactory = new AzureServiceBusChannelFactory(new AzureServiceBusConsumerFactory(clientProvider, false));