Skip to content

Commit

Permalink
Remove support for Co-operative Sticky
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Apr 16, 2024
1 parent 4aa013b commit 4d6daa2
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 29 deletions.
2 changes: 1 addition & 1 deletion release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ This section lists features in master, available by [AppVeyor](https://ci.appvey
## Release 9.7.7

- Fix for Kafka transport to remove spurious call to StoreOffset (we commit offsets in the consumer)
- Fix for Kafka transport when partitions re-assigned, new behavior
- Do not allow a Co-operative Sticky strategy as it breaks with manual commits https://github.com/confluentinc/confluent-kafka-dotnet/issues/2206

## Release 9.3.6 ##

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,24 +157,20 @@ public KafkaMessageConsumer(
_readCommittedOffsetsTimeoutMs = readCommittedOffsetsTimeoutMs;

_consumer = new ConsumerBuilder<string, byte[]>(_consumerConfig)
.SetPartitionsAssignedHandler((consumer, partitions) =>
.SetPartitionsAssignedHandler((consumer, list) =>
{
var partitionInfo = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
var partitions = list.Select(p => $"{p.Topic} : {p.Partition.Value}");
s_logger.LogInformation("Partition Added {Channels}", String.Join(",", partitions));
// Determine strategy and act accordingly
if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
consumer.IncrementalAssign(partitions);
else
consumer.Assign(partitions);
_partitions.AddRange(list);
})
.SetPartitionsRevokedHandler((consumer, partitions) =>
.SetPartitionsRevokedHandler((consumer, list) =>
{
//We should commit any offsets we have stored for these partitions
try
{
_consumer?.Commit(partitions);
_consumer?.Commit(list);
}
catch (KafkaException error)
{
Expand All @@ -184,31 +180,21 @@ public KafkaMessageConsumer(
);
}
var revokedPartitions = partitions.Select(tpo => tpo.TopicPartition);
var revokedPartitionInfo = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
var revokedPartitionInfo = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitionInfo));
// Determine strategy and act accordingly
if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
consumer.IncrementalUnassign(revokedPartitions );
else
consumer.Unassign();
_partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();
})
.SetPartitionsLostHandler((consumer, partitions) =>
_partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
})
.SetPartitionsLostHandler((consumer, list) =>
{
var lostPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
var lostPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
s_logger.LogInformation("Partitions for consumer lost {Channels}", string.Join(",", lostPartitions));
_partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();
// This is typically treated the same as revocation
consumer.IncrementalUnassign(_partitions);
_partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
})
.SetErrorHandler((_, error) =>
.SetErrorHandler((consumer, error) =>
{
s_logger.LogError("Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}", error.Code,
error.Reason, error.IsFatal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public KafkaSubscription (
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000,
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky)
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin)
: base(dataType, name, channelName, routingKey, bufferSize, noOfPerformers, timeoutInMilliseconds, requeueCount,
requeueDelayInMilliseconds, unacceptableMessageLimit, isAsync, channelFactory, makeChannels, emptyChannelDelay, channelFailureDelay)
{
Expand All @@ -170,6 +170,10 @@ public KafkaSubscription (
NumPartitions = numOfPartitions;
ReplicationFactor = replicationFactor;
PartitionAssignmentStrategy = partitionAssignmentStrategy;

if (PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
throw new ArgumentOutOfRangeException("partitionAssignmentStrategy",
"CooperativeSticky is not supported for with manual commits, see https://github.com/confluentinc/librdkafka/issues/4059");
}
}

Expand Down Expand Up @@ -226,7 +230,7 @@ public KafkaSubscription(
OnMissingChannel makeChannels = OnMissingChannel.Create,
int emptyChannelDelay = 500,
int channelFailureDelay = 1000,
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky)
PartitionAssignmentStrategy partitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin)
: base(typeof(T), name, channelName, routingKey, groupId, bufferSize, noOfPerformers, timeoutInMilliseconds,
requeueCount, requeueDelayInMilliseconds, unacceptableMessageLimit, offsetDefault, commitBatchSize,
sessionTimeoutMs, maxPollIntervalMs, sweepUncommittedOffsetsIntervalMs, isolationLevel, isAsync,
Expand Down

0 comments on commit 4d6daa2

Please sign in to comment.