Skip to content

Commit

Permalink
fix: avoid to access client consumer assignment directly
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jun 22, 2021
1 parent c8be4ee commit 954510b
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 61 deletions.
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample.Dashboard/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void ConfigureServices(IServiceCollection services)
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.EnableAdminMessages("kafka-flow.admin", "kafka-flow.admin.group.id")
.EnableTelemetry("kafka-flow.telemetry", "kafka-flow.telemetry.group.id")
.EnableTelemetry("kafka-flow.admin", "kafka-flow.telemetry.group.id")
.AddConsumer(
consumer => consumer
.Topics("topic-dashboard")
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow.Admin.Dashboard/ClientApp/dist/main.js

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<ngb-alert #successAlert *ngIf="successMessage" type="success" (closed)="successMessage=''">
<div class="text-center"><b>Success! </b> <span class="text-center">{{successMessage}}</span></div>
</ngb-alert>
<div class="container" *ngFor="let group of telemetryResponse.groups ">
<div class="container" *ngFor="let group of telemetryResponse?.groups ">
<div class="card my-3">
<div class="card-body">
<h3>Group Id: {{ group.groupId }}</h3>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,19 @@ public static IClusterConfigurationBuilder EnableAdminMessages(
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf<ResetConsumerOffsetHandler>())));
.AddHandlers(new[]
{
typeof(ChangeConsumerWorkersCountHandler),
typeof(PauseConsumerByNameHandler),
typeof(PauseConsumersByGroupHandler),
typeof(PauseConsumersByGroupTopicHandler),
typeof(ResetConsumerOffsetHandler),
typeof(RestartConsumerByNameHandler),
typeof(ResumeConsumerByNameHandler),
typeof(ResumeConsumersByGroupHandler),
typeof(ResumeConsumersByGroupTopicHandler),
typeof(RewindConsumerOffsetToDateTimeHandler),
}))));
}

/// <inheritdoc cref="EnableAdminMessages(KafkaFlow.Configuration.IClusterConfigurationBuilder,string,string)"/>
Expand Down Expand Up @@ -108,7 +120,7 @@ public static IClusterConfigurationBuilder EnableTelemetry(
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf<ConsumerTelemetryMetricHandler>())))
.AddHandler<ConsumerTelemetryMetricHandler>())))
.OnStarted(resolver => resolver.Resolve<ITelemetryScheduler>().Start(telemetryId, topicName))
.OnStopping(resolver => resolver.Resolve<ITelemetryScheduler>().Stop(telemetryId));
}
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Configuration/ClusterConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ public void AddProducers(IEnumerable<IProducerConfiguration> configurations) =>

public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke();
}
}
}
36 changes: 22 additions & 14 deletions src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, Lis

private readonly List<Action<IConsumer<byte[], byte[]>, Error>> errorsHandlers = new();
private readonly List<Action<IConsumer<byte[], byte[]>, string>> statisticsHandlers = new();
private readonly ConsumerFlowManager flowManager;

private IConsumer<byte[], byte[]> consumer;

Expand All @@ -31,6 +32,9 @@ public Consumer(
this.dependencyResolver = dependencyResolver;
this.logHandler = logHandler;
this.Configuration = configuration;
this.flowManager = new ConsumerFlowManager(
this,
this.logHandler);

foreach (var handler in this.Configuration.StatisticsHandlers)
{
Expand All @@ -52,9 +56,9 @@ public Consumer(

public IReadOnlyList<string> Subscription => this.consumer?.Subscription.AsReadOnly();

public IReadOnlyList<TopicPartition> Assignment => this.consumer?.Assignment.AsReadOnly();
public IReadOnlyList<TopicPartition> Assignment { get; private set; } = new List<TopicPartition>();

public IConsumerFlowManager FlowManager { get; private set; }
public IConsumerFlowManager FlowManager => this.flowManager;

public string MemberId => this.consumer?.MemberId;

Expand All @@ -64,7 +68,7 @@ public ConsumerStatus Status
{
get
{
if (this.FlowManager is null || this.Assignment.Count == 0)
if (this.FlowManager is null)
{
return ConsumerStatus.Stopped;
}
Expand All @@ -74,7 +78,7 @@ public ConsumerStatus Status
return ConsumerStatus.Running;
}

return this.FlowManager.PausedPartitions.Count == this.consumer.Assignment.Count ?
return this.FlowManager.PausedPartitions.Count == this.Assignment.Count ?
ConsumerStatus.Paused :
ConsumerStatus.PartiallyRunning;
}
Expand Down Expand Up @@ -115,7 +119,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
try
{
this.EnsureConsumer();

await this.flowManager.SemaphoreWait();
return this.consumer.Consume(cancellationToken);
}
catch (OperationCanceledException)
Expand All @@ -137,6 +141,10 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
{
this.logHandler.Error("Kafka Consumer Error", ex, null);
}
finally
{
this.flowManager.SemaphoreRelease();
}
}
}

Expand All @@ -158,20 +166,20 @@ private void EnsureConsumer()
.SetPartitionsAssignedHandler(
(consumer, partitions) =>
{
this.FlowManager = new ConsumerFlowManager(
this,
this.consumer,
this.logHandler);
this.Assignment = partitions;
this.flowManager.Start(consumer);

this.partitionsAssignedHandlers.ForEach(x => x(this.dependencyResolver, consumer, partitions));
this.partitionsAssignedHandlers.ForEach(x =>
x(this.dependencyResolver, consumer, partitions));
})
.SetPartitionsRevokedHandler(
(consumer, partitions) =>
{
this.FlowManager.Dispose();
this.FlowManager = null;
this.Assignment = new List<TopicPartition>();
this.flowManager.Stop();

this.partitionsRevokedHandlers.ForEach(x => x(this.dependencyResolver, consumer, partitions));
this.partitionsRevokedHandlers.ForEach(
x => x(this.dependencyResolver, consumer, partitions));
})
.SetErrorHandler((consumer, error) => this.errorsHandlers.ForEach(x => x(consumer, error)))
.SetStatisticsHandler((consumer, statistics) => this.statisticsHandlers.ForEach(x => x(consumer, statistics)))
Expand All @@ -186,4 +194,4 @@ private void InvalidateConsumer()
this.consumer = null;
}
}
}
}
107 changes: 67 additions & 40 deletions src/KafkaFlow/Consumers/ConsumerFlowManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,19 @@ namespace KafkaFlow.Consumers
internal class ConsumerFlowManager : IConsumerFlowManager
{
private readonly IConsumer consumer;
private readonly IConsumer<byte[], byte[]> clientConsumer;
private readonly ILogHandler logHandler;
private readonly List<TopicPartition> pausedPartitions = new();
private readonly SemaphoreSlim consumerSemaphore = new(1, 1);

private IConsumer<byte[], byte[]> clientConsumer;
private CancellationTokenSource heartbeatTokenSource;
private Task heartbeatTask;

public ConsumerFlowManager(
IConsumer consumer,
IConsumer<byte[], byte[]> clientConsumer,
ILogHandler logHandler)
{
this.consumer = consumer;
this.clientConsumer = clientConsumer;
this.logHandler = logHandler;
}

Expand All @@ -48,37 +47,18 @@ public void Pause(IReadOnlyCollection<TopicPartition> topicPartitions)
return;
}

this.heartbeatTokenSource = new CancellationTokenSource();

this.heartbeatTask = Task.Run(
() =>
{
const int consumeTimeoutCall = 1000;
this.StartHeartbeat();
}
}

try
{
while (!this.heartbeatTokenSource.IsCancellationRequested)
{
var result = this.clientConsumer.Consume(consumeTimeoutCall);
public Task SemaphoreWait()
{
return this.consumerSemaphore.WaitAsync();
}

if (result != null)
{
this.logHandler.Warning(
"Paused consumer heartbeat process wrongly read a message, please report this issue",
null);
}
}
}
catch (Exception ex)
{
this.logHandler.Error(
"Error executing paused consumer background heartbeat",
ex,
null);
}
},
this.heartbeatTokenSource.Token);
}
public void SemaphoreRelease()
{
this.consumerSemaphore.Release();
}

public void Resume(IReadOnlyCollection<TopicPartition> topicPartitions)
Expand All @@ -100,22 +80,69 @@ public void Resume(IReadOnlyCollection<TopicPartition> topicPartitions)
return;
}

this.heartbeatTokenSource?.Cancel();

this.heartbeatTask.GetAwaiter().GetResult();
this.heartbeatTask.Dispose();
this.StopHeartbeat();

this.clientConsumer.Resume(topicPartitions);
}
}

public void Dispose()

public void Start(IConsumer<byte[], byte[]> clientConsumer)
{
this.heartbeatTokenSource?.Cancel();
this.heartbeatTokenSource?.Dispose();
this.clientConsumer = clientConsumer;
}

public void Stop()
{
this.pausedPartitions.Clear();
this.StopHeartbeat();
}

private void StartHeartbeat()
{
this.heartbeatTokenSource = new CancellationTokenSource();

this.heartbeatTask = Task.Run(
() =>
{
if (this.consumerSemaphore.Wait(0))
{
try
{
const int consumeTimeoutCall = 1000;

while (!this.heartbeatTokenSource.IsCancellationRequested)
{
var result = this.clientConsumer.Consume(consumeTimeoutCall);

if (result != null)
{
this.logHandler.Warning(
"Paused consumer heartbeat process wrongly read a message, please report this issue",
null);
}
}
}
catch (Exception ex)
{
this.logHandler.Error(
"Error executing paused consumer background heartbeat",
ex,
null);
}
finally
{
this.consumerSemaphore.Release();
}
}
});
}

private void StopHeartbeat()
{
this.heartbeatTokenSource?.Cancel();
this.heartbeatTask?.GetAwaiter().GetResult();
this.heartbeatTask?.Dispose();
}
}
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/IConsumerFlowManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KafkaFlow.Consumers
/// <summary>
/// The consumer flow manager
/// </summary>
public interface IConsumerFlowManager : IDisposable
public interface IConsumerFlowManager
{
/// <summary>
/// Gets a list of the consumer paused partitions
Expand Down

0 comments on commit 954510b

Please sign in to comment.