Skip to content

Commit

Permalink
Merge pull request #1879 from preardon/feature/asb-sqlFilter
Browse files Browse the repository at this point in the history
Add Control over Subscription Creation
  • Loading branch information
preardon authored Dec 2, 2021
2 parents 40e8c30 + 4a0edfa commit dc11d11
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 31 deletions.
1 change: 1 addition & 0 deletions samples/ASBTaskQueue/GreetingsWorker/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
using Greetings.Adaptors.Data;
using Greetings.Adaptors.Services;
using Greetings.Ports.CommandHandlers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public IAmAChannel CreateChannel(Subscription subscription)
throw new ArgumentException("The minimum allowed timeout is 400 milliseconds");
}

IAmAMessageConsumer messageConsumer = _azureServiceBusConsumerFactory.Create(azureServiceBusSubscription);
IAmAMessageConsumer messageConsumer =
_azureServiceBusConsumerFactory.Create(azureServiceBusSubscription);

return new Channel(
channelName: subscription.ChannelName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer
private bool _subscriptionCreated;
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<AzureServiceBusConsumer>();
private readonly OnMissingChannel _makeChannel;
private readonly AzureServiceBusSubscriptionConfiguration _subscriptionConfiguration;
private readonly ServiceBusReceiveMode _receiveMode;

/// <summary>
Expand All @@ -35,8 +36,13 @@ public class AzureServiceBusConsumer : IAmAMessageConsumer
/// <param name="batchSize">How many messages to receive at a time.</param>
/// <param name="receiveMode">The mode in which to Receive.</param>
/// <param name="makeChannels">The mode in which to make Channels.</param>
public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMessageProducerSync messageProducerSync, IAdministrationClientWrapper administrationClientWrapper,
IServiceBusReceiverProvider serviceBusReceiverProvider, int batchSize = 10, ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.ReceiveAndDelete, OnMissingChannel makeChannels = OnMissingChannel.Create)
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
public AzureServiceBusConsumer(string topicName, string subscriptionName,
IAmAMessageProducerSync messageProducerSync, IAdministrationClientWrapper administrationClientWrapper,
IServiceBusReceiverProvider serviceBusReceiverProvider, int batchSize = 10,
ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.ReceiveAndDelete,
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = default)
{
_subscriptionName = subscriptionName;
_topicName = topicName;
Expand All @@ -45,6 +51,7 @@ public AzureServiceBusConsumer(string topicName, string subscriptionName, IAmAMe
_serviceBusReceiverProvider = serviceBusReceiverProvider;
_batchSize = batchSize;
_makeChannel = makeChannels;
_subscriptionConfiguration = subscriptionConfiguration ?? new AzureServiceBusSubscriptionConfiguration();
_receiveMode = receiveMode;

GetMessageReceiverProvider();
Expand Down Expand Up @@ -285,8 +292,6 @@ private static int GetHandledCount(IBrokeredMessageWrapper azureServiceBusMessag

private void EnsureSubscription()
{
const int maxDeliveryCount = 2000;

if (_subscriptionCreated || _makeChannel.Equals(OnMissingChannel.Assume))
return;

Expand All @@ -304,7 +309,7 @@ private void EnsureSubscription()
$"Subscription {_subscriptionName} does not exist on topic {_topicName} and missing channel mode set to Validate.");
}

_administrationClientWrapper.CreateSubscription(_topicName, _subscriptionName, maxDeliveryCount);
_administrationClientWrapper.CreateSubscription(_topicName, _subscriptionName, _subscriptionConfiguration);
_subscriptionCreated = true;
}
catch (ServiceBusException ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@ public IAmAMessageConsumer Create(Subscription subscription)
{
var nameSpaceManagerWrapper = new AdministrationClientWrapper(_clientProvider);

AzureServiceBusSubscriptionConfiguration config = new AzureServiceBusSubscriptionConfiguration();
if (subscription is AzureServiceBusSubscription sub) config = sub.Configuration;

return new AzureServiceBusConsumer(subscription.RoutingKey, subscription.ChannelName,
new AzureServiceBusMessageProducer(nameSpaceManagerWrapper,
new ServiceBusSenderProvider(_clientProvider), subscription.MakeChannels), nameSpaceManagerWrapper,
new ServiceBusReceiverProvider(_clientProvider),
makeChannels: subscription.MakeChannels,
receiveMode: _ackOnRead ? ServiceBusReceiveMode.ReceiveAndDelete : ServiceBusReceiveMode.PeekLock,
batchSize: subscription.BufferSize);
batchSize: subscription.BufferSize,
subscriptionConfiguration: config);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ namespace Paramore.Brighter.MessagingGateway.AzureServiceBus
/// </summary>
public class AzureServiceBusSubscription : Subscription
{
public AzureServiceBusSubscriptionConfiguration Configuration { get; }

/// <summary>
/// Initializes an Instance of <see cref="AzureServiceBusSubscription"/>
/// </summary>
Expand All @@ -23,6 +25,7 @@ public class AzureServiceBusSubscription : Subscription
/// <param name="isAsync"></param>
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
public AzureServiceBusSubscription(
Type dataType,
SubscriptionName name = null,
Expand All @@ -36,10 +39,12 @@ public AzureServiceBusSubscription(
int unacceptableMessageLimit = 0,
bool isAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount, requeueDelayInMs, unacceptableMessageLimit, isAsync, channelFactory,
makeChannels)
{
Configuration = subscriptionConfiguration ?? new AzureServiceBusSubscriptionConfiguration();
}
}

Expand All @@ -64,6 +69,7 @@ public class AzureServiceBusSubscription<T> : AzureServiceBusSubscription where
/// <param name="isAsync"></param>
/// <param name="channelFactory">The channel factory to create channels for Consumer.</param>
/// <param name="makeChannels">Should we make channels if they don't exist, defaults to creating</param>
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
public AzureServiceBusSubscription(
SubscriptionName name = null,
ChannelName channelName = null,
Expand All @@ -76,10 +82,11 @@ public AzureServiceBusSubscription(
int unacceptableMessageLimit = 0,
bool isAsync = false,
IAmAChannelFactory channelFactory = null,
OnMissingChannel makeChannels = OnMissingChannel.Create)
OnMissingChannel makeChannels = OnMissingChannel.Create,
AzureServiceBusSubscriptionConfiguration subscriptionConfiguration = null)
: base(typeof(T), name, channelName, routingKey, bufferSize, noOfPerformers,
timeoutInMilliseconds, requeueCount, requeueDelayInMs, unacceptableMessageLimit,
isAsync, channelFactory, makeChannels)
isAsync, channelFactory, makeChannels, subscriptionConfiguration)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;

namespace Paramore.Brighter.MessagingGateway.AzureServiceBus
{
public class AzureServiceBusSubscriptionConfiguration
{
/// <summary>
/// The Maximum amount of times that a Message can be delivered before it is dead Lettered
/// </summary>
public int MaxDeliveryCount { get; set; } = 5;
/// <summary>
/// Dead letter a message when it expires
/// </summary>
public bool DeadLetteringOnMessageExpiration { get; set; } = true;
/// <summary>
/// How long message locks are held for
/// </summary>
public TimeSpan LockDuration { get; set; } = TimeSpan.FromMinutes(1);
/// <summary>
/// How long messages sit in the queue before they expire
/// </summary>
public TimeSpan DefaultMessageTimeToLive { get; set; } = TimeSpan.FromDays(3);
/// <summary>
/// A Sql Filter to apply to the subscription
/// </summary>
public string SqlFilter = String.Empty;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -146,12 +148,24 @@ public bool SubscriptionExists(string topicName, string subscriptionName)
/// </summary>
/// <param name="topicName">The name of the Topic.</param>
/// <param name="subscriptionName">The name of the Subscription.</param>
/// <param name="maxDeliveryCount">Maximum message delivery count.</param>
public void CreateSubscription(string topicName, string subscriptionName, int maxDeliveryCount = 2000)
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
public void CreateSubscription(string topicName, string subscriptionName, AzureServiceBusSubscriptionConfiguration subscriptionConfiguration)
{
CreateSubscriptionAsync(topicName, subscriptionName, maxDeliveryCount).Wait();
CreateSubscriptionAsync(topicName, subscriptionName, subscriptionConfiguration).Wait();
}


/// <summary>
/// Get a Subscription.
/// </summary>
/// <param name="topicName">The name of the Topic.</param>
/// <param name="subscriptionName">The name of the Subscription.</param>
/// <param name="cancellationToken">The Cancellation Token.</param>
public async Task<SubscriptionProperties> GetSubscriptionAsync(string topicName, string subscriptionName,
CancellationToken cancellationToken = default)
{
return await _administrationClient.GetSubscriptionAsync(topicName, subscriptionName, cancellationToken);
}

private void Initialise()
{
s_logger.LogDebug("Initialising new management client wrapper...");
Expand All @@ -169,7 +183,7 @@ private void Initialise()
s_logger.LogDebug("New management client wrapper initialised.");
}

private async Task CreateSubscriptionAsync(string topicName, string subscriptionName, int maxDeliveryCount = 2000)
private async Task CreateSubscriptionAsync(string topicName, string subscriptionName, AzureServiceBusSubscriptionConfiguration subscriptionConfiguration)
{
s_logger.LogInformation("Creating subscription {ChannelName} for topic {Topic}...", subscriptionName, topicName);

Expand All @@ -180,12 +194,18 @@ private async Task CreateSubscriptionAsync(string topicName, string subscription

var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
{
MaxDeliveryCount = maxDeliveryCount
MaxDeliveryCount = subscriptionConfiguration.MaxDeliveryCount,
DeadLetteringOnMessageExpiration = subscriptionConfiguration.DeadLetteringOnMessageExpiration,
LockDuration = subscriptionConfiguration.LockDuration,
DefaultMessageTimeToLive = subscriptionConfiguration.DefaultMessageTimeToLive
};

var ruleOptions = string.IsNullOrEmpty(subscriptionConfiguration.SqlFilter)
? new CreateRuleOptions() : new CreateRuleOptions("sqlFilter",new SqlRuleFilter(subscriptionConfiguration.SqlFilter));

try
{
await _administrationClient.CreateSubscriptionAsync(subscriptionOptions);
await _administrationClient.CreateSubscriptionAsync(subscriptionOptions, ruleOptions);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers
{
Expand Down Expand Up @@ -39,12 +41,21 @@ public interface IAdministrationClientWrapper
/// </summary>
/// <param name="topicName">The name of the Topic.</param>
/// <param name="subscriptionName">The name of the Subscription.</param>
/// <param name="maxDeliveryCount">Maximum message delivery count.</param>
void CreateSubscription(string topicName, string subscriptionName, int maxDeliveryCount);
/// <param name="subscriptionConfiguration">The configuration options for the subscriptions.</param>
void CreateSubscription(string topicName, string subscriptionName, AzureServiceBusSubscriptionConfiguration subscriptionConfiguration);

/// <summary>
/// Reset the Connection.
/// </summary>
void Reset();

/// <summary>
/// Get a Subscription.
/// </summary>
/// <param name="topicName">The name of the Topic.</param>
/// <param name="subscriptionName">The name of the Subscription.</param>
/// <param name="cancellationToken">The Cancellation Token.</param>
Task<SubscriptionProperties> GetSubscriptionAsync(string topicName, string subscriptionName,
CancellationToken cancellationToken = default);
}
}
Loading

0 comments on commit dc11d11

Please sign in to comment.