diff --git a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj index c9cd49e0c..dcd8c3d0e 100644 --- a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj +++ b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj @@ -16,7 +16,6 @@ - @@ -26,11 +25,15 @@ - - - - + + + + + + + + diff --git a/samples/KafkaFlow.Sample.Dashboard/Startup.cs b/samples/KafkaFlow.Sample.Dashboard/Startup.cs index ffc47b2b7..8f03cf257 100644 --- a/samples/KafkaFlow.Sample.Dashboard/Startup.cs +++ b/samples/KafkaFlow.Sample.Dashboard/Startup.cs @@ -51,8 +51,6 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp bus = app.ApplicationServices.CreateKafkaBus(); bus.StartAsync(lifetime.ApplicationStopped); }); - - lifetime.ApplicationStopping.Register(() => bus.StopAsync().GetAwaiter().GetResult()); } } } diff --git a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs index bc7656473..2986f7caa 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IClusterConfigurationBuilder.cs @@ -20,6 +20,13 @@ public interface IClusterConfigurationBuilder /// IClusterConfigurationBuilder WithBrokers(IEnumerable brokers); + /// + /// Sets a unique name for the cluster + /// + /// A unique name + /// + IClusterConfigurationBuilder WithName(string name); + /// /// Configures cluster security /// @@ -51,10 +58,17 @@ public interface IClusterConfigurationBuilder IClusterConfigurationBuilder AddConsumer(Action consumer); /// - /// Adds a handler to KafkaFlow cluster stop event + /// Adds a handler to be executed when KafkaFlow cluster is stopping + /// + /// A handler to KafkaFlow cluster stopping event + /// + IClusterConfigurationBuilder OnStopping(Action handler); + + /// + /// Adds a handler to be executed after KafkaFlow cluster started /// - /// A handler to KafkaFlow cluster stop event + /// A handler to KafkaFlow cluster start event /// - IClusterConfigurationBuilder OnStop(Action handler); + IClusterConfigurationBuilder OnStarted(Action handler); } } diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs index 6cc16b6a3..c040a9636 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs @@ -45,7 +45,7 @@ public interface IConsumerConfigurationBuilder /// Sets the consumer as readonly, that means this consumer can not be managed and it will not send telemetry data /// /// - IConsumerConfigurationBuilder AsReadonly(); + IConsumerConfigurationBuilder DisableManagement(); /// /// Sets the group id used by the consumer diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts index 8b20327c6..529f25759 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer.service.ts @@ -17,42 +17,42 @@ export class ConsumerService { return this.http.get(this.accessPointUrl + 'groups', {headers: this.headers}); } - public updateWorkersCount(groupId: any, consumerName: any, workersCount: number) { + public updateWorkersCount(groupId: string, consumerName: string, workersCount: number) { return this.http.post( this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/change-worker-count`, { workersCount: workersCount }, {headers: this.headers}); } - public resetOffset(groupId: any, consumerName: any) { + public resetOffset(groupId: string, consumerName: string) { return this.http.post( this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/reset-offsets`, { confirm: true }, {headers: this.headers}); } - public pause(groupId: any, consumerName: any) { + public pause(groupId: string, consumerName: string) { return this.http.post( this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/pause`, null, {headers: this.headers}); } - public restart(groupId: any, consumerName: any) { + public restart(groupId: string, consumerName: string) { return this.http.post( this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/restart`, null, {headers: this.headers}); } - public resume(groupId: any, consumerName: any) { + public resume(groupId: string, consumerName: string) { return this.http.post( this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/resume`, null, {headers: this.headers}); } - public rewindOffset(groupId: any, consumerName: any, date: Date) { + public rewindOffset(groupId: string, consumerName: string, date: Date) { return this.http.post( this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`, { date: new Date(date.getTime() - (date.getTimezoneOffset() * 60000)).toISOString() }, diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts index 866d99483..f3ffe1b81 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts @@ -24,10 +24,10 @@ export class ConsumerComponent implements OnInit { constructor(private modalService: NgbModal, private consumerService: ConsumerService) { } removeReadonly(group: any) { - return !(group.consumers.length == 1 && group.consumers[0].isReadonly==1); + return !(group.consumers.length == 1 && group.consumers[0].managementDisabled==1); } - openWorkersCountModal(groupId: any, consumerName: any, workersCount: number) { + openWorkersCountModal(groupId: string, consumerName: string, workersCount: number) { const modalRef = this.modalService.open(WorkersCountModalComponent); modalRef.componentInstance.groupId = groupId; modalRef.componentInstance.consumerName = consumerName; @@ -39,7 +39,7 @@ export class ConsumerComponent implements OnInit { }); } - openResetModal(groupId: any, consumerName: any) { + openResetModal(groupId: string, consumerName: string) { const modalRef = this.modalService.open(ResetModalComponent); modalRef.componentInstance.groupId = groupId; modalRef.componentInstance.consumerName = consumerName; @@ -50,7 +50,7 @@ export class ConsumerComponent implements OnInit { }); } - openPauseModal(groupId: any, consumerName: any) { + openPauseModal(groupId: string, consumerName: string) { const modalRef = this.modalService.open(PauseModalComponent); modalRef.componentInstance.groupId = groupId; modalRef.componentInstance.consumerName = consumerName; @@ -61,7 +61,7 @@ export class ConsumerComponent implements OnInit { }); } - openRestartModal(groupId: any, consumerName: any) { + openRestartModal(groupId: string, consumerName: string) { const modalRef = this.modalService.open(RestartModalComponent); modalRef.componentInstance.groupId = groupId; modalRef.componentInstance.consumerName = consumerName; @@ -72,7 +72,7 @@ export class ConsumerComponent implements OnInit { }); } - openResumeModal(groupId: any, consumerName: any) { + openResumeModal(groupId: string, consumerName: string) { const modalRef = this.modalService.open(ResumeModalComponent); modalRef.componentInstance.groupId = groupId; modalRef.componentInstance.consumerName = consumerName; @@ -83,7 +83,7 @@ export class ConsumerComponent implements OnInit { }); } - openRewindModal(groupId: any, consumerName: any) { + openRewindModal(groupId: string, consumerName: string) { const modalRef = this.modalService.open(RewindModalComponent); modalRef.componentInstance.consumerName = consumerName; modalRef.componentInstance.groupId = groupId; diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts index 32071e85d..ae8b700fe 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts @@ -31,7 +31,7 @@ export class HomeComponent implements OnInit { return groups; } - isActive(date: any) { + isActive(date: string) { return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5; } diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts index 8ee2f93fa..8880a4091 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/pause-modal/pause-modal.component.ts @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap'; templateUrl: './pause-modal.component.html' }) export class PauseModalComponent implements OnInit { - @Input() public groupId: any; - @Input() public consumerName: any; + @Input() public groupId: string; + @Input() public consumerName: string; constructor(public activeModal: NgbActiveModal) { } diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts index f21905105..f04973c2b 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/reset-modal/reset-modal.component.ts @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap'; templateUrl: './reset-modal.component.html' }) export class ResetModalComponent implements OnInit { - @Input() public groupId: any; - @Input() public consumerName: any; + @Input() public groupId: string; + @Input() public consumerName: string; constructor(public activeModal: NgbActiveModal) { } diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts index 82e571506..0312741e6 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/restart-modal/restart-modal.component.ts @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap'; templateUrl: './restart-modal.component.html' }) export class RestartModalComponent implements OnInit { - @Input() public groupId: any; - @Input() public consumerName: any; + @Input() public groupId: string; + @Input() public consumerName: string; constructor(public activeModal: NgbActiveModal) { } diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts index 0d2bde0ac..62c490242 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/resume-modal/resume-modal.component.ts @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap'; templateUrl: './resume-modal.component.html' }) export class ResumeModalComponent implements OnInit { - @Input() public groupId: any; - @Input() public consumerName: any; + @Input() public groupId: string; + @Input() public consumerName: string; constructor(public activeModal: NgbActiveModal) { } diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts index 82266161e..2f28577d0 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/rewind-modal/rewind-modal.component.ts @@ -7,8 +7,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap'; }) export class RewindModalComponent implements OnInit { public rewindDate: Date | undefined; - @Input() public groupId: any; - @Input() public consumerName: any; + @Input() public groupId: string; + @Input() public consumerName: string; constructor(public activeModal: NgbActiveModal) { } diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts index a57e6c0f1..e3d41f20b 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/shared/workers-count-modal/workers-count-modal.component.ts @@ -6,10 +6,10 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap'; templateUrl: './workers-count-modal.component.html' }) export class WorkersCountModalComponent implements OnInit { - @Input() public workersCount: any; - @Input() public groupId: any; - @Input() public consumerName: any; - public oldWorkersCount: any; + @Input() public workersCount: number; + @Input() public groupId: string; + @Input() public consumerName: string; + public oldWorkersCount: number; constructor(public activeModal: NgbActiveModal) {} diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json b/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json index 94a24fe1e..c9867cac8 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json @@ -14,6 +14,7 @@ "experimentalDecorators": true, "moduleResolution": "node", "importHelpers": true, + "strictPropertyInitialization": false, "target": "es2015", "module": "es2020", "lib": [ diff --git a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs index 9db72a6d4..195d7115e 100644 --- a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs +++ b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs @@ -9,7 +9,7 @@ namespace KafkaFlow.Admin.WebApi.Adapters internal static class ConsumerResponseAdapter { - internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetryCache cache) + internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetryStorage storage) { var consumerResponse = new ConsumerResponse() { @@ -20,10 +20,10 @@ internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetr MemberId = consumer.MemberId, WorkersCount = consumer.WorkersCount, ClientInstanceName = consumer.ClientInstanceName, - IsReadonly = consumer.IsReadonly, + ManagementDisabled = consumer.ManagementDisabled, }; - var cachedMetrics = cache.Get(consumer.GroupId, consumer.ConsumerName); + var cachedMetrics = storage.Get(consumer.GroupId, consumer.ConsumerName); consumerResponse.PartitionAssignments = cachedMetrics.Any() ? cachedMetrics.Select(m => m.Adapt()) : @@ -40,7 +40,8 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co { Topic = c.Key, HostName = Environment.MachineName, - PausedPartitions = c.Select(x => x.Partition.Value).ToList(), + PausedPartitions = c.Select(x => x.Partition.Value), + LastUpdate = DateTime.Now, }) .Union(consumer.RunningPartitions .GroupBy(c => c.Topic) @@ -48,7 +49,8 @@ private static IEnumerable GetLocalInfo(IMessageConsumer co { Topic = c.Key, HostName = Environment.MachineName, - RunningPartitions = c.Select(x => x.Partition.Value).ToList(), + RunningPartitions = c.Select(x => x.Partition.Value), + LastUpdate = DateTime.Now, })); } @@ -56,7 +58,7 @@ private static PartitionAssignment Adapt(this ConsumerMetric consumer) => new() { Topic = consumer.Topic, - HostName = consumer.HostName, + HostName = consumer.InstanceName, PausedPartitions = consumer.PausedPartitions, RunningPartitions = consumer.RunningPartitions, LastUpdate = consumer.SentAt, diff --git a/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs b/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs index e5a78110a..9b2e4a81a 100644 --- a/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs +++ b/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs @@ -14,9 +14,9 @@ public class ConsumerResponse public string ConsumerName { get; set; } /// - /// Gets or sets a value indicating whether the consumer is readonly or not + /// Gets or sets a value indicating whether the consumer is able to be manageable or not /// - public bool IsReadonly { get; set; } + public bool ManagementDisabled { get; set; } /// /// Gets or sets the group id diff --git a/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs b/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs index 4f363c841..197a30ae9 100644 --- a/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs +++ b/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs @@ -18,19 +18,19 @@ public class ConsumersController : ControllerBase { private readonly IConsumerAccessor consumers; private readonly IAdminProducer adminProducer; - private readonly ITelemetryCache cache; + private readonly ITelemetryStorage storage; /// /// Initializes a new instance of the class. /// /// The accessor class that provides access to the consumers /// The producer to publish admin messages - /// The cache interface to get metric data - public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache) + /// The cache interface to get metric data + public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryStorage storage) { this.consumers = consumers; this.adminProducer = adminProducer; - this.cache = cache; + this.storage = storage; } /// @@ -46,7 +46,7 @@ public IActionResult Get([FromRoute] string groupId) this.consumers .All .Where(x => x.GroupId == groupId) - .Select(x=> x.Adapt(this.cache))); + .Select(x=> x.Adapt(this.storage))); } /// @@ -71,7 +71,7 @@ public IActionResult Get( return this.NotFound(); } - return this.Ok(consumer.Adapt(this.cache)); + return this.Ok(consumer.Adapt(this.storage)); } /// diff --git a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs index ea48a8b48..5710cb359 100644 --- a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs +++ b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs @@ -18,19 +18,19 @@ public class GroupsController : ControllerBase { private readonly IConsumerAccessor consumers; private readonly IAdminProducer adminProducer; - private readonly ITelemetryCache cache; + private readonly ITelemetryStorage storage; /// /// Initializes a new instance of the class. /// /// The accessor class that provides access to the consumers /// The producer to publish admin messages - /// The cache interface to get metric data - public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache) + /// The cache interface to get metric data + public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryStorage storage) { this.consumers = consumers; this.adminProducer = adminProducer; - this.cache = cache; + this.storage = storage; } /// @@ -48,7 +48,7 @@ public IActionResult Get() x => new GroupResponse { GroupId = x.First().GroupId, - Consumers = x.Select(y => y.Adapt(this.cache)), + Consumers = x.Select(y => y.Adapt(this.storage)), })); } diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index cb67e5637..85018a1d0 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -33,7 +33,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages( cluster.DependencyConfigurator.AddSingleton(); cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); return cluster .AddProducer( @@ -50,7 +50,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages( .WithWorkersCount(1) .WithBufferSize(1) .WithAutoOffsetReset(AutoOffsetReset.Latest) - .AsReadonly() + .DisableManagement() .AddMiddlewares( middlewares => middlewares .AddSerializer() @@ -81,16 +81,14 @@ public static IClusterConfigurationBuilder EnableTelemetry( string consumerGroup) { cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); - var groupId = - $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; - - var producerName = $"telemetry-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; + var groupId = $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; + var name = $"telemetry-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; return cluster .AddProducer( - producerName, + name, producer => producer .DefaultTopic(topicName) .AddMiddlewares( @@ -99,49 +97,12 @@ public static IClusterConfigurationBuilder EnableTelemetry( .AddConsumer( consumer => consumer .Topic(topicName) + .WithName(name) .WithGroupId(groupId) .WithWorkersCount(1) - .AsReadonly() + .DisableManagement() .WithBufferSize(10) .WithAutoOffsetReset(AutoOffsetReset.Latest) - .WithPartitionsAssignedHandler((resolver, partitions) => - { - TelemetryScheduler.Set( - groupId, - async _ => - { - await resolver - .Resolve() - .GetProducer(producerName) - .BatchProduceAsync( - resolver - .Resolve() - .All - .Where(c => !c.IsReadonly) // TODO GET ONLY CONSUMERS FROM THIS CLUSTER - .SelectMany(c => c.Assignment.Select(a => - new BatchProduceItem( - topicName, - Guid.NewGuid().ToString(), - new ConsumerMetric() - { - ConsumerName = c.ConsumerName, - Topic = a.Topic, - GroupId = c.GroupId, - HostName = $"{Environment.MachineName}-{c.MemberId}", - PausedPartitions = c.PausedPartitions - .Where(p => p.Topic == a.Topic) - .Select(p => p.Partition.Value), - RunningPartitions = c.RunningPartitions - .Where(p => p.Topic == a.Topic) - .Select(p => p.Partition.Value), - SentAt = DateTime.Now, - }, - null))) - .ToList()); - }, - TimeSpan.Zero, - TimeSpan.FromSeconds(1)); - }) .AddMiddlewares( middlewares => middlewares .AddSerializer() @@ -149,7 +110,8 @@ await resolver handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) .AddHandlersFromAssemblyOf()))) - .OnStop(_ => TelemetryScheduler.Unset(groupId)); + .OnStarted(resolver => resolver.Resolve().Start(name, topicName)) + .OnStopping(resolver => resolver.Resolve().Stop(name)); } /// diff --git a/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs b/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs index 9c8ae4c87..12a511d6e 100644 --- a/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs @@ -6,13 +6,13 @@ namespace KafkaFlow.Admin.Handlers internal class ConsumerMetricHandler : IMessageHandler { - private readonly ITelemetryCache cache; + private readonly ITelemetryStorage storage; - public ConsumerMetricHandler(ITelemetryCache cache) => this.cache = cache; + public ConsumerMetricHandler(ITelemetryStorage storage) => this.storage = storage; public Task Handle(IMessageContext context, ConsumerMetric message) { - this.cache.Put(message.GroupId, message.ConsumerName, message); + this.storage.Put(message.GroupId, message.ConsumerName, message); return Task.CompletedTask; } } diff --git a/src/KafkaFlow.Admin/ITelemetryScheduler.cs b/src/KafkaFlow.Admin/ITelemetryScheduler.cs new file mode 100644 index 000000000..99441a104 --- /dev/null +++ b/src/KafkaFlow.Admin/ITelemetryScheduler.cs @@ -0,0 +1,9 @@ +namespace KafkaFlow.Admin +{ + internal interface ITelemetryScheduler + { + void Start(string key, string topicName); + + void Stop(string key); + } +} diff --git a/src/KafkaFlow.Admin/ITelemetryCache.cs b/src/KafkaFlow.Admin/ITelemetryStorage.cs similarity index 78% rename from src/KafkaFlow.Admin/ITelemetryCache.cs rename to src/KafkaFlow.Admin/ITelemetryStorage.cs index e47853dfd..a31456956 100644 --- a/src/KafkaFlow.Admin/ITelemetryCache.cs +++ b/src/KafkaFlow.Admin/ITelemetryStorage.cs @@ -4,12 +4,12 @@ namespace KafkaFlow.Admin using KafkaFlow.Admin.Messages; /// - /// Used to implement a cache provider to manage telemetry data + /// Used to implement a telemetry data storage provider /// - public interface ITelemetryCache + public interface ITelemetryStorage { /// - /// Gets the cached metric indexed with the parameters provided + /// Gets the stored metric indexed with the parameters provided /// /// The group id /// The consumer name @@ -17,7 +17,7 @@ public interface ITelemetryCache List Get(string groupId, string consumerName); /// - /// Puts in the cache the metric provided + /// Store the metric provided /// /// The group id /// The consumer name diff --git a/src/KafkaFlow.Admin/TelemetryCache.cs b/src/KafkaFlow.Admin/MemoryCacheTelemetryStorage.cs similarity index 60% rename from src/KafkaFlow.Admin/TelemetryCache.cs rename to src/KafkaFlow.Admin/MemoryCacheTelemetryStorage.cs index 1ff821325..960fd2788 100644 --- a/src/KafkaFlow.Admin/TelemetryCache.cs +++ b/src/KafkaFlow.Admin/MemoryCacheTelemetryStorage.cs @@ -4,20 +4,12 @@ namespace KafkaFlow.Admin using KafkaFlow.Admin.Messages; using Microsoft.Extensions.Caching.Memory; - /// - /// Provide cache operations related to telemetry data - /// - public class TelemetryCache : ITelemetryCache + internal class MemoryCacheTelemetryStorage : ITelemetryStorage { private readonly IMemoryCache cache; - /// - /// Initializes a new instance of the class. - /// - /// The memory cache interface to get metric data - public TelemetryCache(IMemoryCache cache) => this.cache = cache; + public MemoryCacheTelemetryStorage(IMemoryCache cache) => this.cache = cache; - /// public List Get(string groupId, string consumerName) { return this.cache.TryGetValue(this.BuildKey(groupId, consumerName), out List metric) ? @@ -25,12 +17,11 @@ public List Get(string groupId, string consumerName) new List(); } - /// public void Put(string groupId, string consumerName, ConsumerMetric metric) { var entry = this.Get(groupId, consumerName); - entry.RemoveAll(e => e.HostName == metric.HostName); + entry.RemoveAll(e => e.InstanceName == metric.InstanceName); entry.Add(metric); this.cache.Set(this.BuildKey(groupId, consumerName), entry); } diff --git a/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs b/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs index 5904c9da3..dadbfd94f 100644 --- a/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs +++ b/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs @@ -32,7 +32,7 @@ public class ConsumerMetric /// Gets or sets the consumer host name /// [DataMember(Order = 4)] - public string HostName { get; set; } + public string InstanceName { get; set; } /// /// Gets or sets the list of running partitions diff --git a/src/KafkaFlow.Admin/TelemetryScheduler.cs b/src/KafkaFlow.Admin/TelemetryScheduler.cs index 73be65afa..0ab76b9ad 100644 --- a/src/KafkaFlow.Admin/TelemetryScheduler.cs +++ b/src/KafkaFlow.Admin/TelemetryScheduler.cs @@ -2,26 +2,75 @@ namespace KafkaFlow.Admin { using System; using System.Collections.Generic; + using System.Linq; using System.Threading; + using KafkaFlow.Admin.Messages; + using KafkaFlow.Consumers; + using KafkaFlow.Producers; - internal class TelemetryScheduler + internal class TelemetryScheduler : ITelemetryScheduler { - private static readonly Lazy> timers = new (() => new Dictionary()); + private readonly Dictionary timers = new(); + private readonly IDependencyResolver dependencyResolver; - public static void Set(string key, TimerCallback callback, TimeSpan dueTime, TimeSpan period) + public TelemetryScheduler(IDependencyResolver dependencyResolver) { - Unset(key); + this.dependencyResolver = dependencyResolver; + } + + public void Start(string key, string topicName) + { + this.Stop(key); + + var consumers = this.dependencyResolver + .Resolve() + .All + .Where(c => !c.ManagementDisabled && + c.ClusterName.Equals(this.dependencyResolver + .Resolve()[key] + .ClusterName)); + + var producer = this.dependencyResolver.Resolve().GetProducer(key); - timers.Value[key] = new Timer(callback, null, dueTime, period); + this.timers[key] = new Timer( + _ => + { + producer.BatchProduceAsync( + consumers.SelectMany(c => c.Assignment.Select(a => + new BatchProduceItem( + topicName, + Guid.NewGuid(), + new ConsumerMetric() + { + ConsumerName = c.ConsumerName, + Topic = a.Topic, + GroupId = c.GroupId, + InstanceName = $"{Environment.MachineName}-{c.MemberId}", + PausedPartitions = c.PausedPartitions + .Where(p => p.Topic == a.Topic) + .Select(p => p.Partition.Value), + RunningPartitions = c.RunningPartitions + .Where(p => p.Topic == a.Topic) + .Select(p => p.Partition.Value), + SentAt = DateTime.Now, + }, + null))) + .ToList()) + .GetAwaiter() + .GetResult(); + }, + null, + TimeSpan.Zero, + TimeSpan.FromSeconds(1)); } - public static void Unset(string key) + public void Stop(string key) { - if (timers.Value.TryGetValue(key, out var timer)) + if (this.timers.TryGetValue(key, out var timer)) { timer.Dispose(); - timers.Value.Remove(key); + this.timers.Remove(key); } } } -} +} \ No newline at end of file diff --git a/src/KafkaFlow/Configuration/ClusterConfiguration.cs b/src/KafkaFlow/Configuration/ClusterConfiguration.cs index c5c2e5cfa..a7a2e2029 100644 --- a/src/KafkaFlow/Configuration/ClusterConfiguration.cs +++ b/src/KafkaFlow/Configuration/ClusterConfiguration.cs @@ -7,36 +7,47 @@ namespace KafkaFlow.Configuration internal class ClusterConfiguration { private readonly Func securityInformationHandler; - private Action onStopHandler = _ => { }; + private readonly Action onStartedHandler; + private readonly Action onStoppingHandler; private readonly List producers = new(); private readonly List consumers = new(); public ClusterConfiguration( KafkaConfiguration kafka, + string name, IEnumerable brokers, Func securityInformationHandler, - Action onStopHandler) + Action onStartedHandler, + Action onStoppingHandler) { this.securityInformationHandler = securityInformationHandler; + this.Name = name ?? Guid.NewGuid().ToString(); this.Kafka = kafka; this.Brokers = brokers.ToList(); - this.onStopHandler = onStopHandler; + this.onStoppingHandler = onStoppingHandler; + this.onStartedHandler = onStartedHandler; } public KafkaConfiguration Kafka { get; } public IReadOnlyCollection Brokers { get; } + public string Name { get; } + public IReadOnlyCollection Producers => this.producers.AsReadOnly(); public IReadOnlyCollection Consumers => this.consumers.AsReadOnly(); - public void AddConsumers(IEnumerable configurations) => this.consumers.AddRange(configurations); + public Action OnStartedHandler => this.onStartedHandler; - public void AddProducers(IEnumerable configurations) => this.producers.AddRange(configurations); + public Action OnStoppingHandler => this.onStoppingHandler; - public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke(); + public void AddConsumers(IEnumerable configurations) => + this.consumers.AddRange(configurations); - public Action OnStopHandler => this.onStopHandler; + public void AddProducers(IEnumerable configurations) => + this.producers.AddRange(configurations); + + public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke(); } -} +} \ No newline at end of file diff --git a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs index 29a18f468..fca01c20d 100644 --- a/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs @@ -9,9 +9,10 @@ internal class ClusterConfigurationBuilder : IClusterConfigurationBuilder { private readonly List producers = new(); private readonly List consumers = new(); - private Action onStopHandler = _ => { }; - + private Action onStartedHandler = _ => { }; + private Action onStoppingHandler = _ => { }; private IEnumerable brokers; + private string name; private Func securityInformationHandler; public ClusterConfigurationBuilder(IDependencyConfigurator dependencyConfigurator) @@ -25,9 +26,11 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration) { var configuration = new ClusterConfiguration( kafkaConfiguration, + this.name, this.brokers.ToList(), this.securityInformationHandler, - this.onStopHandler); + this.onStartedHandler, + this.onStoppingHandler); configuration.AddProducers(this.producers.Select(x => x.Build(configuration))); configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration))); @@ -41,6 +44,12 @@ public IClusterConfigurationBuilder WithBrokers(IEnumerable brokers) return this; } + public IClusterConfigurationBuilder WithName(string name) + { + this.name = name; + return this; + } + public IClusterConfigurationBuilder WithSecurityInformation(Action handler) { // Uses a handler to avoid in-memory stored passwords for long periods @@ -85,9 +94,15 @@ public IClusterConfigurationBuilder AddConsumer(Action handler) + public IClusterConfigurationBuilder OnStopping(Action handler) + { + this.onStoppingHandler = handler; + return this; + } + + public IClusterConfigurationBuilder OnStarted(Action handler) { - this.onStopHandler = handler; + this.onStartedHandler = handler; return this; } } diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs index 29880a34d..6f3c4e8be 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs @@ -13,7 +13,8 @@ public ConsumerConfiguration( ConsumerConfig consumerConfig, IEnumerable topics, string consumerName, - bool isReadonly, + string clusterName, + bool managementDisabled, int workersCount, int bufferSize, Factory distributionStrategyFactory, @@ -39,7 +40,8 @@ public ConsumerConfiguration( this.AutoCommitInterval = autoCommitInterval; this.Topics = topics ?? throw new ArgumentNullException(nameof(topics)); this.ConsumerName = consumerName ?? Guid.NewGuid().ToString(); - this.IsReadonly = isReadonly; + this.ClusterName = clusterName; + this.ManagementDisabled = managementDisabled; this.WorkersCount = workersCount; this.StatisticsHandlers = statisticsHandlers; this.PartitionsAssignedHandlers = partitionsAssignedHandlers; @@ -62,7 +64,9 @@ public ConsumerConfiguration( public string ConsumerName { get; } - public bool IsReadonly { get; } + public string ClusterName { get; } + + public bool ManagementDisabled { get; } public int WorkersCount { diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 5a9d63a15..5864018ed 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration using Confluent.Kafka; using KafkaFlow.Consumers.DistributionStrategies; - internal class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder + internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder { private readonly List topics = new(); private readonly List> statisticsHandlers = new(); @@ -18,7 +18,7 @@ internal class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder private ConsumerConfig consumerConfig; private string name; - private bool isReadonly; + private bool disableManagement; private string groupId; private AutoOffsetReset? autoOffsetReset; private int? maxPollIntervalMs; @@ -66,9 +66,9 @@ public IConsumerConfigurationBuilder WithName(string name) return this; } - public IConsumerConfigurationBuilder AsReadonly() + public IConsumerConfigurationBuilder DisableManagement() { - this.isReadonly = true; + this.disableManagement = true; return this; } @@ -201,7 +201,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) this.consumerConfig, this.topics, this.name, - this.isReadonly, + clusterConfiguration.Name, + this.disableManagement, this.workersCount, this.bufferSize, this.distributionStrategyFactory, diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs index a953ec80f..b41bd8987 100644 --- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs @@ -30,9 +30,14 @@ public interface IConsumerConfiguration string ConsumerName { get; } /// - /// Gets the consumer readonly flag + /// Gets the cluster name /// - bool IsReadonly { get; } + string ClusterName { get; } + + /// + /// Gets a value indicating whether the consumer is able to be manageable or not + /// + bool ManagementDisabled { get; } /// /// Gets or sets the number of workers diff --git a/src/KafkaFlow/Configuration/KafkaConfiguration.cs b/src/KafkaFlow/Configuration/KafkaConfiguration.cs index af58eff94..b5e9895b0 100644 --- a/src/KafkaFlow/Configuration/KafkaConfiguration.cs +++ b/src/KafkaFlow/Configuration/KafkaConfiguration.cs @@ -1,14 +1,11 @@ namespace KafkaFlow.Configuration { - using System; using System.Collections.Generic; internal class KafkaConfiguration { private readonly List clusters = new(); - public Action OnStopHandler = _ => { }; - public IReadOnlyCollection Clusters => this.clusters; public void AddClusters(IEnumerable configurations) => this.clusters.AddRange(configurations); diff --git a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs index 657f89373..bc198701b 100644 --- a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs @@ -24,7 +24,7 @@ public ConsumerFlowManager( this.logHandler = logHandler; } - public IReadOnlyList PausedPartitions => this.pausedPartitions.AsReadOnly(); + public IEnumerable PausedPartitions => this.pausedPartitions.AsReadOnly(); public ConsumerFlowStatus Status { @@ -121,12 +121,9 @@ public void Resume(IReadOnlyCollection topicPartitions) } } - public void UpdatePausedPartitions(IEnumerable partitionsRunning) + public void CleanPausedPartitions() { - foreach (var p in partitionsRunning) - { - this.pausedPartitions.Remove(p); - } + this.pausedPartitions.Clear(); } public void Dispose() diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs index 2398b1d0b..330508cb8 100644 --- a/src/KafkaFlow/Consumers/ConsumerManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerManager.cs @@ -10,8 +10,6 @@ internal class ConsumerManager : IConsumerManager { private readonly ILogHandler logHandler; - private CancellationTokenSource stopCancellationTokenSource; - public ConsumerManager( IConsumer consumer, IConsumerWorkerPool consumerWorkerPool, @@ -35,8 +33,6 @@ public ConsumerManager( public Task StartAsync() { - this.stopCancellationTokenSource = new CancellationTokenSource(); - this.Feeder.Start(); return Task.CompletedTask; @@ -44,11 +40,6 @@ public Task StartAsync() public async Task StopAsync() { - if (this.stopCancellationTokenSource != null && this.stopCancellationTokenSource.Token.CanBeCanceled) - { - this.stopCancellationTokenSource.Cancel(); - } - await this.Feeder.StopAsync().ConfigureAwait(false); await this.WorkerPool.StopAsync().ConfigureAwait(false); @@ -70,8 +61,6 @@ private void OnPartitionAssigned(IReadOnlyCollection partitions) "Partitions assigned", this.GetConsumerLogInfo(partitions)); - this.Consumer.FlowManager.UpdatePausedPartitions(partitions); - this.WorkerPool .StartAsync(partitions) .GetAwaiter() diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index 18e5cb61e..5e7d5c5ab 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -41,6 +41,8 @@ public async Task StartAsync(IEnumerable partitions) this.logHandler), partitions); + this.consumer.FlowManager.CleanPausedPartitions(); + await Task.WhenAll( Enumerable .Range(0, this.consumer.Configuration.WorkersCount) diff --git a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs index 315b035f2..df1a4b368 100644 --- a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs +++ b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs @@ -17,7 +17,7 @@ public interface IConsumerFlowManager : IDisposable /// /// Gets a list of the consumer paused partitions /// - IReadOnlyList PausedPartitions { get; } + IEnumerable PausedPartitions { get; } /// /// Pauses a set of partitions @@ -31,6 +31,9 @@ public interface IConsumerFlowManager : IDisposable /// A list of partitions void Resume(IReadOnlyCollection topicPartitions); - void UpdatePausedPartitions(IEnumerable partitionsRunning); + /// + /// Removes all partitions from the list of paused partitions + /// + void CleanPausedPartitions(); } } diff --git a/src/KafkaFlow/Consumers/IMessageConsumer.cs b/src/KafkaFlow/Consumers/IMessageConsumer.cs index 6e288ae46..7cc873f4e 100644 --- a/src/KafkaFlow/Consumers/IMessageConsumer.cs +++ b/src/KafkaFlow/Consumers/IMessageConsumer.cs @@ -16,9 +16,14 @@ public interface IMessageConsumer string ConsumerName { get; } /// - /// Gets the readonly flag defined in the configuration + /// Gets the unique cluster´s name defined in the configuration /// - bool IsReadonly { get; } + string ClusterName { get; } + + /// + /// Gets a value indicating whether the consumer is able to be manageable or not + /// + bool ManagementDisabled { get; } /// /// Gets the group id define in the configuration @@ -67,12 +72,12 @@ public interface IMessageConsumer /// /// Gets the consumer's paused partitions /// - IReadOnlyList PausedPartitions { get; } + IEnumerable PausedPartitions { get; } /// /// Gets the consumer's running partitions /// - IReadOnlyList RunningPartitions { get; } + IEnumerable RunningPartitions { get; } /// /// Overrides the offsets of the given partitions and restart the consumer diff --git a/src/KafkaFlow/Consumers/MessageConsumer.cs b/src/KafkaFlow/Consumers/MessageConsumer.cs index 017d2d8b0..7f79ee603 100644 --- a/src/KafkaFlow/Consumers/MessageConsumer.cs +++ b/src/KafkaFlow/Consumers/MessageConsumer.cs @@ -21,7 +21,9 @@ public MessageConsumer( public string ConsumerName => this.consumerManager.Consumer.Configuration.ConsumerName; - public bool IsReadonly => this.consumerManager.Consumer.Configuration.IsReadonly; + public string ClusterName => this.consumerManager.Consumer.Configuration.ClusterName; + + public bool ManagementDisabled => this.consumerManager.Consumer.Configuration.ManagementDisabled; public string GroupId => this.consumerManager.Consumer.Configuration.GroupId; @@ -37,9 +39,9 @@ public MessageConsumer( public int WorkersCount => this.consumerManager.Consumer.Configuration.WorkersCount; - public IReadOnlyList PausedPartitions => this.consumerManager.Consumer.FlowManager?.PausedPartitions ?? new List(); + public IEnumerable PausedPartitions => this.consumerManager.Consumer.FlowManager?.PausedPartitions ?? Enumerable.Empty(); - public IReadOnlyList RunningPartitions => this.Assignment.Except(this.PausedPartitions).ToList(); + public IEnumerable RunningPartitions => this.Assignment.Except(this.PausedPartitions); public void Pause(IReadOnlyCollection topicPartitions) { diff --git a/src/KafkaFlow/KafkaBus.cs b/src/KafkaFlow/KafkaBus.cs index 92e8cf0b8..cf06f25a4 100644 --- a/src/KafkaFlow/KafkaBus.cs +++ b/src/KafkaFlow/KafkaBus.cs @@ -40,25 +40,33 @@ public async Task StartAsync(CancellationToken stopCancellationToken = default) stopTokenSource.Token.Register(() => this.StopAsync().GetAwaiter().GetResult()); - foreach (var consumerConfiguration in this.configuration.Clusters.SelectMany(cl => cl.Consumers)) + foreach (var cluster in this.configuration.Clusters) { - var dependencyScope = this.dependencyResolver.CreateScope(); + foreach (var consumerConfiguration in cluster.Consumers) + { + var dependencyScope = this.dependencyResolver.CreateScope(); - var consumerManager = this.consumerManagerFactory.Create(consumerConfiguration, dependencyScope.Resolver); + var consumerManager = this.consumerManagerFactory.Create(consumerConfiguration, dependencyScope.Resolver); - this.consumerManagers.Add(consumerManager); - this.Consumers.Add( - new MessageConsumer( - consumerManager, - dependencyScope.Resolver.Resolve())); + this.consumerManagers.Add(consumerManager); + this.Consumers.Add( + new MessageConsumer( + consumerManager, + dependencyScope.Resolver.Resolve())); - await consumerManager.StartAsync().ConfigureAwait(false); + await consumerManager.StartAsync().ConfigureAwait(false); + } + + cluster.OnStartedHandler(this.dependencyResolver); } } public Task StopAsync() { - this.configuration.Clusters.ToList().ForEach(c => c.OnStopHandler(this.dependencyResolver)); + foreach (var cluster in this.configuration.Clusters) + { + cluster.OnStoppingHandler(this.dependencyResolver); + } return Task.WhenAll(this.consumerManagers.Select(x => x.StopAsync())); }