From aee26e00e76aa4e925972f000c28f61cd8c59cd4 Mon Sep 17 00:00:00 2001 From: Douglas Lima Date: Tue, 1 Jun 2021 12:39:07 -0300 Subject: [PATCH] feat: create telemetry workflow --- .../KafkaFlow.Sample.Dashboard.csproj | 13 ++++++++----- .../Adapters/ConsumerResponseAdapter.cs | 2 ++ .../ClusterConfigurationBuilderExtensions.cs | 4 ++-- ...{TelemetryCache.cs => InMemoryTelemetryCache.cs} | 8 ++++---- src/KafkaFlow.Admin/TelemetryScheduler.cs | 8 ++++---- src/KafkaFlow/Configuration/ClusterConfiguration.cs | 6 +++--- .../Configuration/ConsumerConfigurationBuilder.cs | 2 +- .../Configuration/IConsumerConfiguration.cs | 2 +- src/KafkaFlow/Configuration/KafkaConfiguration.cs | 3 --- src/KafkaFlow/Consumers/ConsumerManager.cs | 2 -- src/KafkaFlow/Consumers/ConsumerWorkerPool.cs | 2 ++ src/KafkaFlow/Consumers/IConsumerFlowManager.cs | 4 ++++ src/KafkaFlow/Consumers/IMessageConsumer.cs | 2 +- 13 files changed, 32 insertions(+), 26 deletions(-) rename src/KafkaFlow.Admin/{TelemetryCache.cs => InMemoryTelemetryCache.cs} (77%) diff --git a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj index c9cd49e0c..dcd8c3d0e 100644 --- a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj +++ b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj @@ -16,7 +16,6 @@ - @@ -26,11 +25,15 @@ - - - - + + + + + + + + diff --git a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs index 9db72a6d4..fbdd76f9e 100644 --- a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs +++ b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs @@ -41,6 +41,7 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co Topic = c.Key, HostName = Environment.MachineName, PausedPartitions = c.Select(x => x.Partition.Value).ToList(), + LastUpdate = DateTime.Now, }) .Union(consumer.RunningPartitions .GroupBy(c => c.Topic) @@ -49,6 +50,7 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co Topic = c.Key, HostName = Environment.MachineName, RunningPartitions = c.Select(x => x.Partition.Value).ToList(), + LastUpdate = DateTime.Now, })); } diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index cb67e5637..22748a32b 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -33,7 +33,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages( cluster.DependencyConfigurator.AddSingleton(); cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); return cluster .AddProducer( @@ -81,7 +81,7 @@ public static IClusterConfigurationBuilder EnableTelemetry( string consumerGroup) { cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); var groupId = $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; diff --git a/src/KafkaFlow.Admin/TelemetryCache.cs b/src/KafkaFlow.Admin/InMemoryTelemetryCache.cs similarity index 77% rename from src/KafkaFlow.Admin/TelemetryCache.cs rename to src/KafkaFlow.Admin/InMemoryTelemetryCache.cs index 1ff821325..61e7c766c 100644 --- a/src/KafkaFlow.Admin/TelemetryCache.cs +++ b/src/KafkaFlow.Admin/InMemoryTelemetryCache.cs @@ -7,15 +7,15 @@ namespace KafkaFlow.Admin /// /// Provide cache operations related to telemetry data /// - public class TelemetryCache : ITelemetryCache + public class InMemoryTelemetryCache : ITelemetryCache { private readonly IMemoryCache cache; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// - /// The memory cache interface to get metric data - public TelemetryCache(IMemoryCache cache) => this.cache = cache; + /// The memory cache class to manage telemetry data + public InMemoryTelemetryCache(IMemoryCache cache) => this.cache = cache; /// public List Get(string groupId, string consumerName) diff --git a/src/KafkaFlow.Admin/TelemetryScheduler.cs b/src/KafkaFlow.Admin/TelemetryScheduler.cs index 73be65afa..fedbcac70 100644 --- a/src/KafkaFlow.Admin/TelemetryScheduler.cs +++ b/src/KafkaFlow.Admin/TelemetryScheduler.cs @@ -6,21 +6,21 @@ namespace KafkaFlow.Admin internal class TelemetryScheduler { - private static readonly Lazy> timers = new (() => new Dictionary()); + private static readonly Lazy> Timers = new (() => new Dictionary()); public static void Set(string key, TimerCallback callback, TimeSpan dueTime, TimeSpan period) { Unset(key); - timers.Value[key] = new Timer(callback, null, dueTime, period); + Timers.Value[key] = new Timer(callback, null, dueTime, period); } public static void Unset(string key) { - if (timers.Value.TryGetValue(key, out var timer)) + if (Timers.Value.TryGetValue(key, out var timer)) { timer.Dispose(); - timers.Value.Remove(key); + Timers.Value.Remove(key); } } } diff --git a/src/KafkaFlow/Configuration/ClusterConfiguration.cs b/src/KafkaFlow/Configuration/ClusterConfiguration.cs index c5c2e5cfa..68670106c 100644 --- a/src/KafkaFlow/Configuration/ClusterConfiguration.cs +++ b/src/KafkaFlow/Configuration/ClusterConfiguration.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration internal class ClusterConfiguration { private readonly Func securityInformationHandler; - private Action onStopHandler = _ => { }; + private readonly Action onStopHandler = _ => { }; private readonly List producers = new(); private readonly List consumers = new(); @@ -31,12 +31,12 @@ public ClusterConfiguration( public IReadOnlyCollection Consumers => this.consumers.AsReadOnly(); + public Action OnStopHandler => this.onStopHandler; + public void AddConsumers(IEnumerable configurations) => this.consumers.AddRange(configurations); public void AddProducers(IEnumerable configurations) => this.producers.AddRange(configurations); public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke(); - - public Action OnStopHandler => this.onStopHandler; } } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 5a9d63a15..7ea3664eb 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration using Confluent.Kafka; using KafkaFlow.Consumers.DistributionStrategies; - internal class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder + internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder { private readonly List topics = new(); private readonly List> statisticsHandlers = new(); diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs index a953ec80f..52420cfd2 100644 --- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs @@ -30,7 +30,7 @@ public interface IConsumerConfiguration string ConsumerName { get; } /// - /// Gets the consumer readonly flag + /// Gets a value indicating whether the consumer is readonly or not /// bool IsReadonly { get; } diff --git a/src/KafkaFlow/Configuration/KafkaConfiguration.cs b/src/KafkaFlow/Configuration/KafkaConfiguration.cs index af58eff94..b5e9895b0 100644 --- a/src/KafkaFlow/Configuration/KafkaConfiguration.cs +++ b/src/KafkaFlow/Configuration/KafkaConfiguration.cs @@ -1,14 +1,11 @@ namespace KafkaFlow.Configuration { - using System; using System.Collections.Generic; internal class KafkaConfiguration { private readonly List clusters = new(); - public Action OnStopHandler = _ => { }; - public IReadOnlyCollection Clusters => this.clusters; public void AddClusters(IEnumerable configurations) => this.clusters.AddRange(configurations); diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs index 2398b1d0b..c112f99c0 100644 --- a/src/KafkaFlow/Consumers/ConsumerManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerManager.cs @@ -70,8 +70,6 @@ private void OnPartitionAssigned(IReadOnlyCollection partitions) "Partitions assigned", this.GetConsumerLogInfo(partitions)); - this.Consumer.FlowManager.UpdatePausedPartitions(partitions); - this.WorkerPool .StartAsync(partitions) .GetAwaiter() diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index 18e5cb61e..ad27ddbe0 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -41,6 +41,8 @@ public async Task StartAsync(IEnumerable partitions) this.logHandler), partitions); + this.consumer.FlowManager.UpdatePausedPartitions(partitions); + await Task.WhenAll( Enumerable .Range(0, this.consumer.Configuration.WorkersCount) diff --git a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs index 315b035f2..d35e5a280 100644 --- a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs +++ b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs @@ -31,6 +31,10 @@ public interface IConsumerFlowManager : IDisposable /// A list of partitions void Resume(IReadOnlyCollection topicPartitions); + /// + /// Removes the running partitions from the list of paused partitions + /// + /// A list of partitions that are running void UpdatePausedPartitions(IEnumerable partitionsRunning); } } diff --git a/src/KafkaFlow/Consumers/IMessageConsumer.cs b/src/KafkaFlow/Consumers/IMessageConsumer.cs index 6e288ae46..4838d6813 100644 --- a/src/KafkaFlow/Consumers/IMessageConsumer.cs +++ b/src/KafkaFlow/Consumers/IMessageConsumer.cs @@ -16,7 +16,7 @@ public interface IMessageConsumer string ConsumerName { get; } /// - /// Gets the readonly flag defined in the configuration + /// Gets a value indicating whether the consumer is readonly or not /// bool IsReadonly { get; }