diff --git a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj
index c9cd49e0c..dcd8c3d0e 100644
--- a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj
+++ b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj
@@ -16,7 +16,6 @@
-
@@ -26,11 +25,15 @@
-
-
-
-
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaFlow.Sample.Dashboard/Startup.cs b/samples/KafkaFlow.Sample.Dashboard/Startup.cs
index ffc47b2b7..8f03cf257 100644
--- a/samples/KafkaFlow.Sample.Dashboard/Startup.cs
+++ b/samples/KafkaFlow.Sample.Dashboard/Startup.cs
@@ -51,8 +51,6 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp
bus = app.ApplicationServices.CreateKafkaBus();
bus.StartAsync(lifetime.ApplicationStopped);
});
-
- lifetime.ApplicationStopping.Register(() => bus.StopAsync().GetAwaiter().GetResult());
}
}
}
diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs
index bc7656473..8f4b29354 100644
--- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs
+++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs
@@ -20,6 +20,13 @@ public interface IClusterConfigurationBuilder
///
IClusterConfigurationBuilder WithBrokers(IEnumerable brokers);
+ ///
+ /// Sets a unique name for the cluster
+ ///
+ /// A unique name
+ ///
+ IClusterConfigurationBuilder WithName(string name);
+
///
/// Configures cluster security
///
@@ -55,6 +62,6 @@ public interface IClusterConfigurationBuilder
///
/// A handler to KafkaFlow cluster stop event
///
- IClusterConfigurationBuilder OnStop(Action handler);
+ IClusterConfigurationBuilder OnStopping(Action handler);
}
}
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts
index 8b20327c6..529f25759 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts
@@ -17,42 +17,42 @@ export class ConsumerService {
return this.http.get(this.accessPointUrl + 'groups', {headers: this.headers});
}
- public updateWorkersCount(groupId: any, consumerName: any, workersCount: number) {
+ public updateWorkersCount(groupId: string, consumerName: string, workersCount: number) {
return this.http.post(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/change-worker-count`,
{ workersCount: workersCount },
{headers: this.headers});
}
- public resetOffset(groupId: any, consumerName: any) {
+ public resetOffset(groupId: string, consumerName: string) {
return this.http.post(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/reset-offsets`,
{ confirm: true },
{headers: this.headers});
}
- public pause(groupId: any, consumerName: any) {
+ public pause(groupId: string, consumerName: string) {
return this.http.post(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/pause`,
null,
{headers: this.headers});
}
- public restart(groupId: any, consumerName: any) {
+ public restart(groupId: string, consumerName: string) {
return this.http.post(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/restart`,
null,
{headers: this.headers});
}
- public resume(groupId: any, consumerName: any) {
+ public resume(groupId: string, consumerName: string) {
return this.http.post(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/resume`,
null,
{headers: this.headers});
}
- public rewindOffset(groupId: any, consumerName: any, date: Date) {
+ public rewindOffset(groupId: string, consumerName: string, date: Date) {
return this.http.post(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`,
{ date: new Date(date.getTime() - (date.getTimezoneOffset() * 60000)).toISOString() },
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts
index 866d99483..95ead0b11 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts
@@ -27,7 +27,7 @@ export class ConsumerComponent implements OnInit {
return !(group.consumers.length == 1 && group.consumers[0].isReadonly==1);
}
- openWorkersCountModal(groupId: any, consumerName: any, workersCount: number) {
+ openWorkersCountModal(groupId: string, consumerName: string, workersCount: number) {
const modalRef = this.modalService.open(WorkersCountModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
@@ -39,7 +39,7 @@ export class ConsumerComponent implements OnInit {
});
}
- openResetModal(groupId: any, consumerName: any) {
+ openResetModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(ResetModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
@@ -50,7 +50,7 @@ export class ConsumerComponent implements OnInit {
});
}
- openPauseModal(groupId: any, consumerName: any) {
+ openPauseModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(PauseModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
@@ -61,7 +61,7 @@ export class ConsumerComponent implements OnInit {
});
}
- openRestartModal(groupId: any, consumerName: any) {
+ openRestartModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(RestartModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
@@ -72,7 +72,7 @@ export class ConsumerComponent implements OnInit {
});
}
- openResumeModal(groupId: any, consumerName: any) {
+ openResumeModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(ResumeModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
@@ -83,7 +83,7 @@ export class ConsumerComponent implements OnInit {
});
}
- openRewindModal(groupId: any, consumerName: any) {
+ openRewindModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(RewindModalComponent);
modalRef.componentInstance.consumerName = consumerName;
modalRef.componentInstance.groupId = groupId;
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts
index 32071e85d..ae8b700fe 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts
@@ -31,7 +31,7 @@ export class HomeComponent implements OnInit {
return groups;
}
- isActive(date: any) {
+ isActive(date: string) {
return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5;
}
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts
index 8ee2f93fa..8880a4091 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts
@@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './pause-modal.component.html'
})
export class PauseModalComponent implements OnInit {
- @Input() public groupId: any;
- @Input() public consumerName: any;
+ @Input() public groupId: string;
+ @Input() public consumerName: string;
constructor(public activeModal: NgbActiveModal) { }
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts
index f21905105..f04973c2b 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts
@@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './reset-modal.component.html'
})
export class ResetModalComponent implements OnInit {
- @Input() public groupId: any;
- @Input() public consumerName: any;
+ @Input() public groupId: string;
+ @Input() public consumerName: string;
constructor(public activeModal: NgbActiveModal) { }
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts
index 82e571506..0312741e6 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts
@@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './restart-modal.component.html'
})
export class RestartModalComponent implements OnInit {
- @Input() public groupId: any;
- @Input() public consumerName: any;
+ @Input() public groupId: string;
+ @Input() public consumerName: string;
constructor(public activeModal: NgbActiveModal) { }
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts
index 0d2bde0ac..62c490242 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts
@@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './resume-modal.component.html'
})
export class ResumeModalComponent implements OnInit {
- @Input() public groupId: any;
- @Input() public consumerName: any;
+ @Input() public groupId: string;
+ @Input() public consumerName: string;
constructor(public activeModal: NgbActiveModal) { }
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts
index 82266161e..2f28577d0 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts
@@ -7,8 +7,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
})
export class RewindModalComponent implements OnInit {
public rewindDate: Date | undefined;
- @Input() public groupId: any;
- @Input() public consumerName: any;
+ @Input() public groupId: string;
+ @Input() public consumerName: string;
constructor(public activeModal: NgbActiveModal) { }
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts
index a57e6c0f1..e3d41f20b 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts
@@ -6,10 +6,10 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './workers-count-modal.component.html'
})
export class WorkersCountModalComponent implements OnInit {
- @Input() public workersCount: any;
- @Input() public groupId: any;
- @Input() public consumerName: any;
- public oldWorkersCount: any;
+ @Input() public workersCount: number;
+ @Input() public groupId: string;
+ @Input() public consumerName: string;
+ public oldWorkersCount: number;
constructor(public activeModal: NgbActiveModal) {}
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json b/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json
index 94a24fe1e..c9867cac8 100644
--- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json
+++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json
@@ -14,6 +14,7 @@
"experimentalDecorators": true,
"moduleResolution": "node",
"importHelpers": true,
+ "strictPropertyInitialization": false,
"target": "es2015",
"module": "es2020",
"lib": [
diff --git a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs
index 9db72a6d4..f551330aa 100644
--- a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs
+++ b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs
@@ -9,7 +9,7 @@ namespace KafkaFlow.Admin.WebApi.Adapters
internal static class ConsumerResponseAdapter
{
- internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetryCache cache)
+ internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetryStorage storage)
{
var consumerResponse = new ConsumerResponse()
{
@@ -23,7 +23,7 @@ internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetr
IsReadonly = consumer.IsReadonly,
};
- var cachedMetrics = cache.Get(consumer.GroupId, consumer.ConsumerName);
+ var cachedMetrics = storage.Get(consumer.GroupId, consumer.ConsumerName);
consumerResponse.PartitionAssignments = cachedMetrics.Any() ?
cachedMetrics.Select(m => m.Adapt()) :
@@ -41,6 +41,7 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co
Topic = c.Key,
HostName = Environment.MachineName,
PausedPartitions = c.Select(x => x.Partition.Value).ToList(),
+ LastUpdate = DateTime.Now,
})
.Union(consumer.RunningPartitions
.GroupBy(c => c.Topic)
@@ -49,6 +50,7 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co
Topic = c.Key,
HostName = Environment.MachineName,
RunningPartitions = c.Select(x => x.Partition.Value).ToList(),
+ LastUpdate = DateTime.Now,
}));
}
diff --git a/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs b/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs
index 4f363c841..197a30ae9 100644
--- a/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs
+++ b/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs
@@ -18,19 +18,19 @@ public class ConsumersController : ControllerBase
{
private readonly IConsumerAccessor consumers;
private readonly IAdminProducer adminProducer;
- private readonly ITelemetryCache cache;
+ private readonly ITelemetryStorage storage;
///
/// Initializes a new instance of the class.
///
/// The accessor class that provides access to the consumers
/// The producer to publish admin messages
- /// The cache interface to get metric data
- public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache)
+ /// The cache interface to get metric data
+ public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryStorage storage)
{
this.consumers = consumers;
this.adminProducer = adminProducer;
- this.cache = cache;
+ this.storage = storage;
}
///
@@ -46,7 +46,7 @@ public IActionResult Get([FromRoute] string groupId)
this.consumers
.All
.Where(x => x.GroupId == groupId)
- .Select(x=> x.Adapt(this.cache)));
+ .Select(x=> x.Adapt(this.storage)));
}
///
@@ -71,7 +71,7 @@ public IActionResult Get(
return this.NotFound();
}
- return this.Ok(consumer.Adapt(this.cache));
+ return this.Ok(consumer.Adapt(this.storage));
}
///
diff --git a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs
index ea48a8b48..5710cb359 100644
--- a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs
+++ b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs
@@ -18,19 +18,19 @@ public class GroupsController : ControllerBase
{
private readonly IConsumerAccessor consumers;
private readonly IAdminProducer adminProducer;
- private readonly ITelemetryCache cache;
+ private readonly ITelemetryStorage storage;
///
/// Initializes a new instance of the class.
///
/// The accessor class that provides access to the consumers
/// The producer to publish admin messages
- /// The cache interface to get metric data
- public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache)
+ /// The cache interface to get metric data
+ public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryStorage storage)
{
this.consumers = consumers;
this.adminProducer = adminProducer;
- this.cache = cache;
+ this.storage = storage;
}
///
@@ -48,7 +48,7 @@ public IActionResult Get()
x => new GroupResponse
{
GroupId = x.First().GroupId,
- Consumers = x.Select(y => y.Adapt(this.cache)),
+ Consumers = x.Select(y => y.Adapt(this.storage)),
}));
}
diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs
index cb67e5637..498190b36 100644
--- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs
+++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs
@@ -33,7 +33,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages(
cluster.DependencyConfigurator.AddSingleton();
cluster.DependencyConfigurator.AddSingleton();
- cluster.DependencyConfigurator.AddSingleton();
+ cluster.DependencyConfigurator.AddSingleton();
return cluster
.AddProducer(
@@ -81,16 +81,14 @@ public static IClusterConfigurationBuilder EnableTelemetry(
string consumerGroup)
{
cluster.DependencyConfigurator.AddSingleton();
- cluster.DependencyConfigurator.AddSingleton();
+ cluster.DependencyConfigurator.AddSingleton();
- var groupId =
- $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}";
-
- var producerName = $"telemetry-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}";
+ var groupId = $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}";
+ var name = $"telemetry-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}";
return cluster
.AddProducer(
- producerName,
+ name,
producer => producer
.DefaultTopic(topicName)
.AddMiddlewares(
@@ -99,12 +97,13 @@ public static IClusterConfigurationBuilder EnableTelemetry(
.AddConsumer(
consumer => consumer
.Topic(topicName)
+ .WithName(name)
.WithGroupId(groupId)
.WithWorkersCount(1)
.AsReadonly()
.WithBufferSize(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
- .WithPartitionsAssignedHandler((resolver, partitions) =>
+ .WithPartitionsAssignedHandler((resolver, _) =>
{
TelemetryScheduler.Set(
groupId,
@@ -112,12 +111,15 @@ public static IClusterConfigurationBuilder EnableTelemetry(
{
await resolver
.Resolve()
- .GetProducer(producerName)
+ .GetProducer(name)
.BatchProduceAsync(
resolver
.Resolve()
.All
- .Where(c => !c.IsReadonly) // TODO GET ONLY CONSUMERS FROM THIS CLUSTER
+ .Where(c => !c.IsReadonly &&
+ c.ClusterName.Equals(resolver
+ .Resolve()[name]
+ .ClusterName))
.SelectMany(c => c.Assignment.Select(a =>
new BatchProduceItem(
topicName,
@@ -149,7 +151,7 @@ await resolver
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf())))
- .OnStop(_ => TelemetryScheduler.Unset(groupId));
+ .OnStopping(_ => TelemetryScheduler.Unset(groupId));
}
///
diff --git a/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs b/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs
index 9c8ae4c87..12a511d6e 100644
--- a/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs
+++ b/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs
@@ -6,13 +6,13 @@ namespace KafkaFlow.Admin.Handlers
internal class ConsumerMetricHandler : IMessageHandler
{
- private readonly ITelemetryCache cache;
+ private readonly ITelemetryStorage storage;
- public ConsumerMetricHandler(ITelemetryCache cache) => this.cache = cache;
+ public ConsumerMetricHandler(ITelemetryStorage storage) => this.storage = storage;
public Task Handle(IMessageContext context, ConsumerMetric message)
{
- this.cache.Put(message.GroupId, message.ConsumerName, message);
+ this.storage.Put(message.GroupId, message.ConsumerName, message);
return Task.CompletedTask;
}
}
diff --git a/src/KafkaFlow.Admin/ITelemetryCache.cs b/src/KafkaFlow.Admin/ITelemetryStorage.cs
similarity index 78%
rename from src/KafkaFlow.Admin/ITelemetryCache.cs
rename to src/KafkaFlow.Admin/ITelemetryStorage.cs
index e47853dfd..a31456956 100644
--- a/src/KafkaFlow.Admin/ITelemetryCache.cs
+++ b/src/KafkaFlow.Admin/ITelemetryStorage.cs
@@ -4,12 +4,12 @@ namespace KafkaFlow.Admin
using KafkaFlow.Admin.Messages;
///
- /// Used to implement a cache provider to manage telemetry data
+ /// Used to implement a telemetry data storage provider
///
- public interface ITelemetryCache
+ public interface ITelemetryStorage
{
///
- /// Gets the cached metric indexed with the parameters provided
+ /// Gets the stored metric indexed with the parameters provided
///
/// The group id
/// The consumer name
@@ -17,7 +17,7 @@ public interface ITelemetryCache
List Get(string groupId, string consumerName);
///
- /// Puts in the cache the metric provided
+ /// Store the metric provided
///
/// The group id
/// The consumer name
diff --git a/src/KafkaFlow.Admin/TelemetryCache.cs b/src/KafkaFlow.Admin/MemoryCacheTelemetryStorage.cs
similarity index 65%
rename from src/KafkaFlow.Admin/TelemetryCache.cs
rename to src/KafkaFlow.Admin/MemoryCacheTelemetryStorage.cs
index 1ff821325..d88f774db 100644
--- a/src/KafkaFlow.Admin/TelemetryCache.cs
+++ b/src/KafkaFlow.Admin/MemoryCacheTelemetryStorage.cs
@@ -4,20 +4,12 @@ namespace KafkaFlow.Admin
using KafkaFlow.Admin.Messages;
using Microsoft.Extensions.Caching.Memory;
- ///
- /// Provide cache operations related to telemetry data
- ///
- public class TelemetryCache : ITelemetryCache
+ internal class MemoryCacheTelemetryStorage : ITelemetryStorage
{
private readonly IMemoryCache cache;
- ///
- /// Initializes a new instance of the class.
- ///
- /// The memory cache interface to get metric data
- public TelemetryCache(IMemoryCache cache) => this.cache = cache;
+ public MemoryCacheTelemetryStorage(IMemoryCache cache) => this.cache = cache;
- ///
public List Get(string groupId, string consumerName)
{
return this.cache.TryGetValue(this.BuildKey(groupId, consumerName), out List metric) ?
@@ -25,7 +17,6 @@ public List Get(string groupId, string consumerName)
new List();
}
- ///
public void Put(string groupId, string consumerName, ConsumerMetric metric)
{
var entry = this.Get(groupId, consumerName);
diff --git a/src/KafkaFlow.Admin/TelemetryScheduler.cs b/src/KafkaFlow.Admin/TelemetryScheduler.cs
index 73be65afa..fedbcac70 100644
--- a/src/KafkaFlow.Admin/TelemetryScheduler.cs
+++ b/src/KafkaFlow.Admin/TelemetryScheduler.cs
@@ -6,21 +6,21 @@ namespace KafkaFlow.Admin
internal class TelemetryScheduler
{
- private static readonly Lazy> timers = new (() => new Dictionary());
+ private static readonly Lazy> Timers = new (() => new Dictionary());
public static void Set(string key, TimerCallback callback, TimeSpan dueTime, TimeSpan period)
{
Unset(key);
- timers.Value[key] = new Timer(callback, null, dueTime, period);
+ Timers.Value[key] = new Timer(callback, null, dueTime, period);
}
public static void Unset(string key)
{
- if (timers.Value.TryGetValue(key, out var timer))
+ if (Timers.Value.TryGetValue(key, out var timer))
{
timer.Dispose();
- timers.Value.Remove(key);
+ Timers.Value.Remove(key);
}
}
}
diff --git a/src/KafkaFlow/Configuration/ClusterConfiguration.cs b/src/KafkaFlow/Configuration/ClusterConfiguration.cs
index c5c2e5cfa..4c65b7b01 100644
--- a/src/KafkaFlow/Configuration/ClusterConfiguration.cs
+++ b/src/KafkaFlow/Configuration/ClusterConfiguration.cs
@@ -7,36 +7,40 @@ namespace KafkaFlow.Configuration
internal class ClusterConfiguration
{
private readonly Func securityInformationHandler;
- private Action onStopHandler = _ => { };
+ private readonly Action onStoppingHandler = _ => { };
private readonly List producers = new();
private readonly List consumers = new();
public ClusterConfiguration(
KafkaConfiguration kafka,
+ string name,
IEnumerable brokers,
Func securityInformationHandler,
- Action onStopHandler)
+ Action onStoppingHandler)
{
this.securityInformationHandler = securityInformationHandler;
+ this.Name = name ?? Guid.NewGuid().ToString();
this.Kafka = kafka;
this.Brokers = brokers.ToList();
- this.onStopHandler = onStopHandler;
+ this.onStoppingHandler = onStoppingHandler;
}
public KafkaConfiguration Kafka { get; }
public IReadOnlyCollection Brokers { get; }
+ public string Name { get; }
+
public IReadOnlyCollection Producers => this.producers.AsReadOnly();
public IReadOnlyCollection Consumers => this.consumers.AsReadOnly();
+ public Action OnStoppingHandler => this.onStoppingHandler;
+
public void AddConsumers(IEnumerable configurations) => this.consumers.AddRange(configurations);
public void AddProducers(IEnumerable configurations) => this.producers.AddRange(configurations);
public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke();
-
- public Action OnStopHandler => this.onStopHandler;
}
}
diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
index 29a18f468..a6604f6bc 100644
--- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
@@ -9,9 +9,10 @@ internal class ClusterConfigurationBuilder : IClusterConfigurationBuilder
{
private readonly List producers = new();
private readonly List consumers = new();
- private Action onStopHandler = _ => { };
+ private Action onStoppingHandler = _ => { };
private IEnumerable brokers;
+ private string name;
private Func securityInformationHandler;
public ClusterConfigurationBuilder(IDependencyConfigurator dependencyConfigurator)
@@ -25,9 +26,10 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration)
{
var configuration = new ClusterConfiguration(
kafkaConfiguration,
+ this.name,
this.brokers.ToList(),
this.securityInformationHandler,
- this.onStopHandler);
+ this.onStoppingHandler);
configuration.AddProducers(this.producers.Select(x => x.Build(configuration)));
configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration)));
@@ -41,6 +43,12 @@ public IClusterConfigurationBuilder WithBrokers(IEnumerable brokers)
return this;
}
+ public IClusterConfigurationBuilder WithName(string name)
+ {
+ this.name = name;
+ return this;
+ }
+
public IClusterConfigurationBuilder WithSecurityInformation(Action handler)
{
// Uses a handler to avoid in-memory stored passwords for long periods
@@ -85,9 +93,9 @@ public IClusterConfigurationBuilder AddConsumer(Action handler)
+ public IClusterConfigurationBuilder OnStopping(Action handler)
{
- this.onStopHandler = handler;
+ this.onStoppingHandler = handler;
return this;
}
}
diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
index 29880a34d..cd896da92 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs
@@ -13,6 +13,7 @@ public ConsumerConfiguration(
ConsumerConfig consumerConfig,
IEnumerable topics,
string consumerName,
+ string clusterName,
bool isReadonly,
int workersCount,
int bufferSize,
@@ -39,6 +40,7 @@ public ConsumerConfiguration(
this.AutoCommitInterval = autoCommitInterval;
this.Topics = topics ?? throw new ArgumentNullException(nameof(topics));
this.ConsumerName = consumerName ?? Guid.NewGuid().ToString();
+ this.ClusterName = clusterName;
this.IsReadonly = isReadonly;
this.WorkersCount = workersCount;
this.StatisticsHandlers = statisticsHandlers;
@@ -62,6 +64,8 @@ public ConsumerConfiguration(
public string ConsumerName { get; }
+ public string ClusterName { get; }
+
public bool IsReadonly { get; }
public int WorkersCount
diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
index 5a9d63a15..afe5026bd 100644
--- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
+++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs
@@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration
using Confluent.Kafka;
using KafkaFlow.Consumers.DistributionStrategies;
- internal class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder
+ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder
{
private readonly List topics = new();
private readonly List> statisticsHandlers = new();
@@ -201,6 +201,7 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration)
this.consumerConfig,
this.topics,
this.name,
+ clusterConfiguration.Name,
this.isReadonly,
this.workersCount,
this.bufferSize,
diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
index a953ec80f..255774800 100644
--- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
+++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs
@@ -30,7 +30,12 @@ public interface IConsumerConfiguration
string ConsumerName { get; }
///
- /// Gets the consumer readonly flag
+ /// Gets the cluster name
+ ///
+ string ClusterName { get; }
+
+ ///
+ /// Gets a value indicating whether the consumer is readonly or not
///
bool IsReadonly { get; }
diff --git a/src/KafkaFlow/Configuration/KafkaConfiguration.cs b/src/KafkaFlow/Configuration/KafkaConfiguration.cs
index af58eff94..b5e9895b0 100644
--- a/src/KafkaFlow/Configuration/KafkaConfiguration.cs
+++ b/src/KafkaFlow/Configuration/KafkaConfiguration.cs
@@ -1,14 +1,11 @@
namespace KafkaFlow.Configuration
{
- using System;
using System.Collections.Generic;
internal class KafkaConfiguration
{
private readonly List clusters = new();
- public Action OnStopHandler = _ => { };
-
public IReadOnlyCollection Clusters => this.clusters;
public void AddClusters(IEnumerable configurations) => this.clusters.AddRange(configurations);
diff --git a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs
index 657f89373..bc198701b 100644
--- a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs
+++ b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs
@@ -24,7 +24,7 @@ public ConsumerFlowManager(
this.logHandler = logHandler;
}
- public IReadOnlyList PausedPartitions => this.pausedPartitions.AsReadOnly();
+ public IEnumerable PausedPartitions => this.pausedPartitions.AsReadOnly();
public ConsumerFlowStatus Status
{
@@ -121,12 +121,9 @@ public void Resume(IReadOnlyCollection topicPartitions)
}
}
- public void UpdatePausedPartitions(IEnumerable partitionsRunning)
+ public void CleanPausedPartitions()
{
- foreach (var p in partitionsRunning)
- {
- this.pausedPartitions.Remove(p);
- }
+ this.pausedPartitions.Clear();
}
public void Dispose()
diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs
index 2398b1d0b..c112f99c0 100644
--- a/src/KafkaFlow/Consumers/ConsumerManager.cs
+++ b/src/KafkaFlow/Consumers/ConsumerManager.cs
@@ -70,8 +70,6 @@ private void OnPartitionAssigned(IReadOnlyCollection partitions)
"Partitions assigned",
this.GetConsumerLogInfo(partitions));
- this.Consumer.FlowManager.UpdatePausedPartitions(partitions);
-
this.WorkerPool
.StartAsync(partitions)
.GetAwaiter()
diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
index 18e5cb61e..5e7d5c5ab 100644
--- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
+++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
@@ -41,6 +41,8 @@ public async Task StartAsync(IEnumerable partitions)
this.logHandler),
partitions);
+ this.consumer.FlowManager.CleanPausedPartitions();
+
await Task.WhenAll(
Enumerable
.Range(0, this.consumer.Configuration.WorkersCount)
diff --git a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs
index 315b035f2..df1a4b368 100644
--- a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs
+++ b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs
@@ -17,7 +17,7 @@ public interface IConsumerFlowManager : IDisposable
///
/// Gets a list of the consumer paused partitions
///
- IReadOnlyList PausedPartitions { get; }
+ IEnumerable PausedPartitions { get; }
///
/// Pauses a set of partitions
@@ -31,6 +31,9 @@ public interface IConsumerFlowManager : IDisposable
/// A list of partitions
void Resume(IReadOnlyCollection topicPartitions);
- void UpdatePausedPartitions(IEnumerable partitionsRunning);
+ ///
+ /// Removes all partitions from the list of paused partitions
+ ///
+ void CleanPausedPartitions();
}
}
diff --git a/src/KafkaFlow/Consumers/IMessageConsumer.cs b/src/KafkaFlow/Consumers/IMessageConsumer.cs
index 6e288ae46..d331c3f13 100644
--- a/src/KafkaFlow/Consumers/IMessageConsumer.cs
+++ b/src/KafkaFlow/Consumers/IMessageConsumer.cs
@@ -16,7 +16,12 @@ public interface IMessageConsumer
string ConsumerName { get; }
///
- /// Gets the readonly flag defined in the configuration
+ /// Gets the unique cluster´s name defined in the configuration
+ ///
+ string ClusterName { get; }
+
+ ///
+ /// Gets a value indicating whether the consumer is readonly or not
///
bool IsReadonly { get; }
@@ -67,12 +72,12 @@ public interface IMessageConsumer
///
/// Gets the consumer's paused partitions
///
- IReadOnlyList PausedPartitions { get; }
+ IEnumerable PausedPartitions { get; }
///
/// Gets the consumer's running partitions
///
- IReadOnlyList RunningPartitions { get; }
+ IEnumerable RunningPartitions { get; }
///
/// Overrides the offsets of the given partitions and restart the consumer
diff --git a/src/KafkaFlow/Consumers/MessageConsumer.cs b/src/KafkaFlow/Consumers/MessageConsumer.cs
index 017d2d8b0..bf4eb4b43 100644
--- a/src/KafkaFlow/Consumers/MessageConsumer.cs
+++ b/src/KafkaFlow/Consumers/MessageConsumer.cs
@@ -21,6 +21,8 @@ public MessageConsumer(
public string ConsumerName => this.consumerManager.Consumer.Configuration.ConsumerName;
+ public string ClusterName => this.consumerManager.Consumer.Configuration.ClusterName;
+
public bool IsReadonly => this.consumerManager.Consumer.Configuration.IsReadonly;
public string GroupId => this.consumerManager.Consumer.Configuration.GroupId;
@@ -37,9 +39,9 @@ public MessageConsumer(
public int WorkersCount => this.consumerManager.Consumer.Configuration.WorkersCount;
- public IReadOnlyList PausedPartitions => this.consumerManager.Consumer.FlowManager?.PausedPartitions ?? new List();
+ public IEnumerable PausedPartitions => this.consumerManager.Consumer.FlowManager?.PausedPartitions ?? Enumerable.Empty();
- public IReadOnlyList RunningPartitions => this.Assignment.Except(this.PausedPartitions).ToList();
+ public IEnumerable RunningPartitions => this.Assignment.Except(this.PausedPartitions);
public void Pause(IReadOnlyCollection topicPartitions)
{
diff --git a/src/KafkaFlow/KafkaBus.cs b/src/KafkaFlow/KafkaBus.cs
index 92e8cf0b8..52323dc75 100644
--- a/src/KafkaFlow/KafkaBus.cs
+++ b/src/KafkaFlow/KafkaBus.cs
@@ -58,7 +58,10 @@ public async Task StartAsync(CancellationToken stopCancellationToken = default)
public Task StopAsync()
{
- this.configuration.Clusters.ToList().ForEach(c => c.OnStopHandler(this.dependencyResolver));
+ foreach (var cluster in this.configuration.Clusters)
+ {
+ cluster.OnStoppingHandler(this.dependencyResolver);
+ }
return Task.WhenAll(this.consumerManagers.Select(x => x.StopAsync()));
}