diff --git a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj index c9cd49e0c..dcd8c3d0e 100644 --- a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj +++ b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj @@ -16,7 +16,6 @@ - @@ -26,11 +25,15 @@ - - - - + + + + + + + + diff --git a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs index 9db72a6d4..fbdd76f9e 100644 --- a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs +++ b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs @@ -41,6 +41,7 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co Topic = c.Key, HostName = Environment.MachineName, PausedPartitions = c.Select(x => x.Partition.Value).ToList(), + LastUpdate = DateTime.Now, }) .Union(consumer.RunningPartitions .GroupBy(c => c.Topic) @@ -49,6 +50,7 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co Topic = c.Key, HostName = Environment.MachineName, RunningPartitions = c.Select(x => x.Partition.Value).ToList(), + LastUpdate = DateTime.Now, })); } diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index cb67e5637..22748a32b 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -33,7 +33,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages( cluster.DependencyConfigurator.AddSingleton(); cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); return cluster .AddProducer( @@ -81,7 +81,7 @@ public static IClusterConfigurationBuilder EnableTelemetry( string consumerGroup) { cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); var groupId = $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; diff --git a/src/KafkaFlow.Admin/TelemetryCache.cs b/src/KafkaFlow.Admin/InMemoryTelemetryCache.cs similarity index 77% rename from src/KafkaFlow.Admin/TelemetryCache.cs rename to src/KafkaFlow.Admin/InMemoryTelemetryCache.cs index 1ff821325..61e7c766c 100644 --- a/src/KafkaFlow.Admin/TelemetryCache.cs +++ b/src/KafkaFlow.Admin/InMemoryTelemetryCache.cs @@ -7,15 +7,15 @@ namespace KafkaFlow.Admin /// /// Provide cache operations related to telemetry data /// - public class TelemetryCache : ITelemetryCache + public class InMemoryTelemetryCache : ITelemetryCache { private readonly IMemoryCache cache; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// - /// The memory cache interface to get metric data - public TelemetryCache(IMemoryCache cache) => this.cache = cache; + /// The memory cache class to manage telemetry data + public InMemoryTelemetryCache(IMemoryCache cache) => this.cache = cache; /// public List Get(string groupId, string consumerName) diff --git a/src/KafkaFlow.Admin/TelemetryScheduler.cs b/src/KafkaFlow.Admin/TelemetryScheduler.cs index 73be65afa..fedbcac70 100644 --- a/src/KafkaFlow.Admin/TelemetryScheduler.cs +++ b/src/KafkaFlow.Admin/TelemetryScheduler.cs @@ -6,21 +6,21 @@ namespace KafkaFlow.Admin internal class TelemetryScheduler { - private static readonly Lazy> timers = new (() => new Dictionary()); + private static readonly Lazy> Timers = new (() => new Dictionary()); public static void Set(string key, TimerCallback callback, TimeSpan dueTime, TimeSpan period) { Unset(key); - timers.Value[key] = new Timer(callback, null, dueTime, period); + Timers.Value[key] = new Timer(callback, null, dueTime, period); } public static void Unset(string key) { - if (timers.Value.TryGetValue(key, out var timer)) + if (Timers.Value.TryGetValue(key, out var timer)) { timer.Dispose(); - timers.Value.Remove(key); + Timers.Value.Remove(key); } } } diff --git a/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs b/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs index 902dd184d..a2e53ec90 100644 --- a/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs +++ b/src/KafkaFlow.UnitTests/Consumer/ConsumerManagerTests.cs @@ -36,6 +36,10 @@ public void Setup() this.logHandlerMock = new Mock(MockBehavior.Strict); this.dependencyResolver = new Mock(); + this.consumerMock + .Setup(x => x.FlowManager) + .Returns(new Mock().Object); + this.consumerMock .Setup(x => x.OnPartitionsAssigned(It.IsAny, List>>())) .Callback((Action, List> value) => this.onPartitionAssignedHandler = value); diff --git a/src/KafkaFlow/Configuration/ClusterConfiguration.cs b/src/KafkaFlow/Configuration/ClusterConfiguration.cs index c5c2e5cfa..68670106c 100644 --- a/src/KafkaFlow/Configuration/ClusterConfiguration.cs +++ b/src/KafkaFlow/Configuration/ClusterConfiguration.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration internal class ClusterConfiguration { private readonly Func securityInformationHandler; - private Action onStopHandler = _ => { }; + private readonly Action onStopHandler = _ => { }; private readonly List producers = new(); private readonly List consumers = new(); @@ -31,12 +31,12 @@ public ClusterConfiguration( public IReadOnlyCollection Consumers => this.consumers.AsReadOnly(); + public Action OnStopHandler => this.onStopHandler; + public void AddConsumers(IEnumerable configurations) => this.consumers.AddRange(configurations); public void AddProducers(IEnumerable configurations) => this.producers.AddRange(configurations); public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke(); - - public Action OnStopHandler => this.onStopHandler; } } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 5a9d63a15..7ea3664eb 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration using Confluent.Kafka; using KafkaFlow.Consumers.DistributionStrategies; - internal class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder + internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder { private readonly List topics = new(); private readonly List> statisticsHandlers = new(); diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs index a953ec80f..52420cfd2 100644 --- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs @@ -30,7 +30,7 @@ public interface IConsumerConfiguration string ConsumerName { get; } /// - /// Gets the consumer readonly flag + /// Gets a value indicating whether the consumer is readonly or not /// bool IsReadonly { get; } diff --git a/src/KafkaFlow/Configuration/KafkaConfiguration.cs b/src/KafkaFlow/Configuration/KafkaConfiguration.cs index af58eff94..b5e9895b0 100644 --- a/src/KafkaFlow/Configuration/KafkaConfiguration.cs +++ b/src/KafkaFlow/Configuration/KafkaConfiguration.cs @@ -1,14 +1,11 @@ namespace KafkaFlow.Configuration { - using System; using System.Collections.Generic; internal class KafkaConfiguration { private readonly List clusters = new(); - public Action OnStopHandler = _ => { }; - public IReadOnlyCollection Clusters => this.clusters; public void AddClusters(IEnumerable configurations) => this.clusters.AddRange(configurations); diff --git a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs index 315b035f2..d35e5a280 100644 --- a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs +++ b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs @@ -31,6 +31,10 @@ public interface IConsumerFlowManager : IDisposable /// A list of partitions void Resume(IReadOnlyCollection topicPartitions); + /// + /// Removes the running partitions from the list of paused partitions + /// + /// A list of partitions that are running void UpdatePausedPartitions(IEnumerable partitionsRunning); } } diff --git a/src/KafkaFlow/Consumers/IMessageConsumer.cs b/src/KafkaFlow/Consumers/IMessageConsumer.cs index 6e288ae46..4838d6813 100644 --- a/src/KafkaFlow/Consumers/IMessageConsumer.cs +++ b/src/KafkaFlow/Consumers/IMessageConsumer.cs @@ -16,7 +16,7 @@ public interface IMessageConsumer string ConsumerName { get; } /// - /// Gets the readonly flag defined in the configuration + /// Gets a value indicating whether the consumer is readonly or not /// bool IsReadonly { get; }