diff --git a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj index e9eeebe0f..c9cd49e0c 100644 --- a/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj +++ b/samples/KafkaFlow.Sample.Dashboard/KafkaFlow.Sample.Dashboard.csproj @@ -28,8 +28,9 @@ - - + + + diff --git a/samples/KafkaFlow.Sample.Dashboard/Startup.cs b/samples/KafkaFlow.Sample.Dashboard/Startup.cs index a077be2ab..ffc47b2b7 100644 --- a/samples/KafkaFlow.Sample.Dashboard/Startup.cs +++ b/samples/KafkaFlow.Sample.Dashboard/Startup.cs @@ -21,23 +21,15 @@ public void ConfigureServices(IServiceCollection services) cluster => cluster .WithBrokers(new[] { "localhost:9092" }) .EnableAdminMessages("kafka-flow.admin", "kafka-flow.admin.group.id") + .EnableTelemetry("kafka-flow.telemetry", "kafka-flow.telemetry.group.id") .AddConsumer( consumer => consumer - .Topics("topic-dashboard", "topic-dashboard-new") + .Topics("topic-dashboard") .WithGroupId("groupid-dashboard") .WithName("consumer-dashboard") .WithBufferSize(100) .WithWorkersCount(20) .WithAutoOffsetReset(AutoOffsetReset.Latest) - ) - .AddConsumer( - consumer => consumer - .Topic("topic-dashboard-other") - .WithGroupId("groupid-dashboard-other") - .WithName("consumer-dashboard-other") - .WithBufferSize(10) - .WithWorkersCount(10) - .WithAutoOffsetReset(AutoOffsetReset.Latest) )) ); diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index 135f94da7..142aecf37 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -3,9 +3,9 @@ using System; using System.Linq; using System.Threading.Tasks; + using KafkaFlow.Admin; using Confluent.Kafka; using global::Microsoft.Extensions.DependencyInjection; - using KafkaFlow.Admin; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; using KafkaFlow.Producers; diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/app.module.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/app.module.ts index ee840a248..14b3ae9b5 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/app.module.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/app.module.ts @@ -9,6 +9,9 @@ import { AppComponent } from './app.component'; import { HomeComponent } from './home/home.component'; import { ConsumerComponent } from './consumer/consumer.component'; import { HttpErrorInterceptor } from './http-error.interceptor'; +import { GroupByPipe } from './group-by.pipe'; +import { SortPipe } from './sort.pipe'; +import { CallbackPipe } from './callback.pipe'; import { ConsumerService } from './consumer.service'; import { HttpClientModule, HTTP_INTERCEPTORS } from '@angular/common/http'; @@ -31,6 +34,9 @@ const appRoutes: Routes = [ @NgModule({ declarations: [ AppComponent, + GroupByPipe, + SortPipe, + CallbackPipe, HomeComponent, ConsumerComponent, RewindModalComponent, diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/callback.pipe.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/callback.pipe.ts new file mode 100644 index 000000000..81284f132 --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/callback.pipe.ts @@ -0,0 +1,14 @@ +import { PipeTransform, Pipe } from '@angular/core'; + +@Pipe({ + name: 'callback', + pure: false +}) +export class CallbackPipe implements PipeTransform { + transform(items: any[], callback: (item: any) => boolean): any { + if (!items || !callback) { + return items; + } + return items.filter(item => callback(item)); + } +} diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.html b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.html index 6e9378a0a..9363582be 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.html +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.html @@ -1,71 +1,84 @@ -
Success! {{ successMessage}}
+
Success! {{successMessage}}
-
-
-
-

Group Id: {{ group.groupId }}

-
-

Consumer: {{ consumer.consumerName }}

-

Status: {{consumer.flowStatus}}

-

Lag: 55

-

Workers: {{consumer.workersCount}}

-
- - - - - - -
- -

Topic: {{partitionAssignment.topic}}

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
HostnamePartitionsLagStatusActions
{{partitionAssignment.hostName}}{{partitionAssignment.pausedPartitions}}0Paused - - - -
{{partitionAssignment.hostName}}{{partitionAssignment.runningPartitions}}0Running - - - -
-
-
-
+
+
+
+

Group Id: {{ group.groupId }}

+
+

Consumer: {{ consumer.consumerName }}

+

Status: {{consumer.status}}

+ +

Workers: {{consumer.workersCount}}

+
+ + + + + +
+ +

Topic: {{partitionAssignments.key}}

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Hostname - consumer instancePartitionsStatusLastUpdate
{{partitionAssignment.hostName}}{{partitionAssignment.pausedPartitions}} +
+ - + Paused +
{{partitionAssignment.lastUpdate | date: "medium"}}
{{partitionAssignment.hostName}}{{partitionAssignment.runningPartitions}} +
+ - + Running +
{{partitionAssignment.lastUpdate | date: "medium"}}
+
+
+
+
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts index 37b69f694..866d99483 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/consumer/consumer.component.ts @@ -1,7 +1,6 @@ import { Component, Input, OnInit, ViewChild } from '@angular/core'; import { ConsumerService } from '../consumer.service' import { Subject } from 'rxjs'; -import { delay } from 'rxjs/internal/operators'; import { debounceTime } from 'rxjs/operators'; import { NgbModal, NgbAlert } from '@ng-bootstrap/ng-bootstrap'; import { RewindModalComponent } from '../shared/rewind-modal/rewind-modal.component'; @@ -24,6 +23,10 @@ export class ConsumerComponent implements OnInit { constructor(private modalService: NgbModal, private consumerService: ConsumerService) { } + removeReadonly(group: any) { + return !(group.consumers.length == 1 && group.consumers[0].isReadonly==1); + } + openWorkersCountModal(groupId: any, consumerName: any, workersCount: number) { const modalRef = this.modalService.open(WorkersCountModalComponent); modalRef.componentInstance.groupId = groupId; diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/group-by.pipe.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/group-by.pipe.ts new file mode 100644 index 000000000..cc3b29b52 --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/group-by.pipe.ts @@ -0,0 +1,26 @@ +import { Pipe, PipeTransform } from '@angular/core'; + +@Pipe({ + name: 'groupBy' +}) +export class GroupByPipe implements PipeTransform { + transform(collection: any[], property: string): any[] { + // prevents the application from breaking if the array of objects doesn't exist yet + if(!collection) { + return null as any; + } + + const groupedCollection = collection.reduce((previous, current)=> { + if(!previous[current[property]]) { + previous[current[property]] = [current]; + } else { + previous[current[property]].push(current); + } + + return previous; + }, {}); + + // this will return an array of objects, each object containing a group of objects + return Object.keys(groupedCollection).map(key => ({ key, value: groupedCollection[key] })); + } +} diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.html b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.html index 1d1d5e445..c6433fd9d 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.html +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.html @@ -7,17 +7,18 @@
diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts index 2b73625f0..32071e85d 100644 --- a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/home/home.component.ts @@ -9,7 +9,30 @@ import {interval} from "rxjs"; export class HomeComponent implements OnInit { public groups: Array = []; constructor (private consumerService: ConsumerService) { - interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.groups = data)); + interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.groups = this.buildCalculatedProperties(data))); + } + + buildCalculatedProperties(groups: any) { + var self = this; + groups.forEach(function (g: any) { + g.consumers.forEach(function (c: any) { + c.status = + c.partitionAssignments.some(function (pa: any){ return pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate);}) ? + "Running" : + c.partitionAssignments.some(function (pa: any){ return pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate);}) ? + "Paused" : + "Not Running"; + c.partitionAssignments.forEach(function (pa: any) { + pa.isLost = !self.isActive(pa.lastUpdate); + }) + }) + }); + + return groups; + } + + isActive(date: any) { + return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5; } ngOnInit(): void { diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/sort.pipe.ts b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/sort.pipe.ts new file mode 100644 index 000000000..ef3493684 --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/sort.pipe.ts @@ -0,0 +1,26 @@ +import { Pipe, PipeTransform } from "@angular/core"; + +@Pipe({ + name: "sort" +}) +export class SortPipe implements PipeTransform { + transform(array: any, field: string, order: string = "asc"): any[] { + order = order.toLowerCase(); + if(order!= "asc" && order!= "desc") { + return array; + } + if (!Array.isArray(array)) { + return null as any; + } + array.sort((a: any, b: any) => { + if (a[field] < b[field]) { + return (order == "asc")? -1 : 1; + } else if (a[field] > b[field]) { + return (order == "asc")? 1 : -1; + } else { + return 0; + } + }); + return array; + } +} diff --git a/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/assets/.gitkeep b/src/KafkaFlow.Admin.Dashboard/AngularFiles/src/assets/.gitkeep deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs b/src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs new file mode 100644 index 000000000..0c2bbc1ea --- /dev/null +++ b/src/KafkaFlow.Admin.Dashboard/ApplicationBuilderExtensions.cs @@ -0,0 +1,42 @@ +namespace KafkaFlow.Admin.Dashboard +{ + using System.IO; + using System.Reflection; + using Microsoft.AspNetCore.Builder; + using Microsoft.AspNetCore.Hosting; + using Microsoft.AspNetCore.SpaServices.AngularCli; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Hosting; + + /// + /// Extension methods over IApplicationBuilder + /// + public static class ApplicationBuilderExtensions + { + /// + /// Creates a kafkaflow dashboard + /// + /// Instance of + /// Instance of + /// + public static IApplicationBuilder UseKafkaFlowDashboard(this IApplicationBuilder app, IWebHostEnvironment env) + { + if (!env.IsDevelopment()) + { + app.UseSpaStaticFiles(); + } + + app.UseSpa(spa => + { + spa.Options.SourcePath = @$"{Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)}\AngularFiles"; + + if (env.IsDevelopment()) + { + spa.UseAngularCliServer(npmScript: "start"); + } + }); + + return app; + } + } +} diff --git a/src/KafkaFlow.Admin.Dashboard/ServiceCollectionExtensions.cs b/src/KafkaFlow.Admin.Dashboard/ServiceCollectionExtensions.cs index bf79e5069..83519bda4 100644 --- a/src/KafkaFlow.Admin.Dashboard/ServiceCollectionExtensions.cs +++ b/src/KafkaFlow.Admin.Dashboard/ServiceCollectionExtensions.cs @@ -1,16 +1,18 @@ namespace KafkaFlow.Admin.Dashboard { - using System.IO; - using System.Reflection; using System.Text.Json.Serialization; - using Microsoft.AspNetCore.Builder; - using Microsoft.AspNetCore.Hosting; - using Microsoft.AspNetCore.SpaServices.AngularCli; using Microsoft.Extensions.DependencyInjection; - using Microsoft.Extensions.Hosting; + /// + /// Extension methods over IServiceCollection + /// public static class ServiceCollectionExtensions { + /// + /// Configures a KafkaFlow Dashboard + /// + /// Instance of + /// public static IServiceCollection AddKafkaFlowDashboard(this IServiceCollection services) { // In production, the Angular files will be served from this directory @@ -29,25 +31,5 @@ public static IServiceCollection AddKafkaFlowDashboard(this IServiceCollection s return services; } - - public static IApplicationBuilder UseKafkaFlowDashboard(this IApplicationBuilder app, IWebHostEnvironment env) - { - if (!env.IsDevelopment()) - { - app.UseSpaStaticFiles(); - } - - app.UseSpa(spa => - { - spa.Options.SourcePath = @$"{ Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)}\AngularFiles"; - - if (env.IsDevelopment()) - { - spa.UseAngularCliServer(npmScript: "start"); - } - }); - - return app; - } } } diff --git a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs index 5f3fd0c05..9db72a6d4 100644 --- a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs +++ b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs @@ -6,16 +6,14 @@ namespace KafkaFlow.Admin.WebApi.Adapters using KafkaFlow.Admin.Messages; using KafkaFlow.Admin.WebApi.Contracts; using KafkaFlow.Consumers; - using Microsoft.Extensions.Caching.Memory; internal static class ConsumerResponseAdapter { - internal static ConsumerResponse Adapt(this IMessageConsumer consumer, IMemoryCache cache) + internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetryCache cache) { var consumerResponse = new ConsumerResponse() { Subscription = consumer.Subscription, - IsReadonly = consumer.IsReadonly, ConsumerName = consumer.ConsumerName, GroupId = consumer.GroupId, FlowStatus = consumer.FlowStatus ?? ConsumerFlowStatus.NotRunning, @@ -25,15 +23,16 @@ internal static ConsumerResponse Adapt(this IMessageConsumer consumer, IMemoryCa IsReadonly = consumer.IsReadonly, }; - consumerResponse.PartitionAssignments = - cache.TryGetValue($"{consumer.GroupId}-{consumer.ConsumerName}", out List metric) ? - metric.Select(m => m.Adapt()) : - consumer.GetLocalPartitionAssignments(); + var cachedMetrics = cache.Get(consumer.GroupId, consumer.ConsumerName); + + consumerResponse.PartitionAssignments = cachedMetrics.Any() ? + cachedMetrics.Select(m => m.Adapt()) : + GetLocalInfo(consumer); return consumerResponse; } - private static IEnumerable GetLocalPartitionAssignments(this IMessageConsumer consumer) + private static IEnumerable GetLocalInfo(IMessageConsumer consumer) { return consumer.PausedPartitions .GroupBy(c => c.Topic) @@ -42,7 +41,6 @@ private static IEnumerable GetLocalPartitionAssignments(thi Topic = c.Key, HostName = Environment.MachineName, PausedPartitions = c.Select(x => x.Partition.Value).ToList(), - LastUpdate = DateTime.Now, }) .Union(consumer.RunningPartitions .GroupBy(c => c.Topic) @@ -51,7 +49,6 @@ private static IEnumerable GetLocalPartitionAssignments(thi Topic = c.Key, HostName = Environment.MachineName, RunningPartitions = c.Select(x => x.Partition.Value).ToList(), - LastUpdate = DateTime.Now, })); } diff --git a/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs b/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs index 0a8ee2947..4f363c841 100644 --- a/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs +++ b/src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs @@ -3,8 +3,8 @@ namespace KafkaFlow.Admin.WebApi.Controllers using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; - using Adapters; using KafkaFlow.Admin.Messages; + using KafkaFlow.Admin.WebApi.Adapters; using KafkaFlow.Admin.WebApi.Contracts; using KafkaFlow.Consumers; using Microsoft.AspNetCore.Mvc; @@ -18,16 +18,19 @@ public class ConsumersController : ControllerBase { private readonly IConsumerAccessor consumers; private readonly IAdminProducer adminProducer; + private readonly ITelemetryCache cache; /// /// Initializes a new instance of the class. /// /// The accessor class that provides access to the consumers /// The producer to publish admin messages - public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer) + /// The cache interface to get metric data + public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache) { this.consumers = consumers; this.adminProducer = adminProducer; + this.cache = cache; } /// @@ -43,7 +46,7 @@ public IActionResult Get([FromRoute] string groupId) this.consumers .All .Where(x => x.GroupId == groupId) - .Select(x=> x.Adapt())); + .Select(x=> x.Adapt(this.cache))); } /// @@ -68,7 +71,7 @@ public IActionResult Get( return this.NotFound(); } - return this.Ok(consumer.Adapt()); + return this.Ok(consumer.Adapt(this.cache)); } /// diff --git a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs index 7a3dcfc0c..ea48a8b48 100644 --- a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs +++ b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs @@ -3,8 +3,8 @@ namespace KafkaFlow.Admin.WebApi.Controllers using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; - using Adapters; using KafkaFlow.Admin.Messages; + using KafkaFlow.Admin.WebApi.Adapters; using KafkaFlow.Admin.WebApi.Contracts; using KafkaFlow.Consumers; using Microsoft.AspNetCore.Mvc; @@ -18,18 +18,19 @@ public class GroupsController : ControllerBase { private readonly IConsumerAccessor consumers; private readonly IAdminProducer adminProducer; + private readonly ITelemetryCache cache; /// /// Initializes a new instance of the class. /// /// The accessor class that provides access to the consumers /// The producer to publish admin messages - public GroupsController( - IConsumerAccessor consumers, - IAdminProducer adminProducer) + /// The cache interface to get metric data + public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache) { this.consumers = consumers; this.adminProducer = adminProducer; + this.cache = cache; } /// @@ -47,7 +48,7 @@ public IActionResult Get() x => new GroupResponse { GroupId = x.First().GroupId, - Consumers = x.Select(y => y.Adapt()), + Consumers = x.Select(y => y.Adapt(this.cache)), })); } diff --git a/src/KafkaFlow.Admin/AdminProducer.cs b/src/KafkaFlow.Admin/AdminProducer.cs index ebff0aca6..7f052edc1 100644 --- a/src/KafkaFlow.Admin/AdminProducer.cs +++ b/src/KafkaFlow.Admin/AdminProducer.cs @@ -3,7 +3,6 @@ namespace KafkaFlow.Admin using System; using System.Threading.Tasks; using KafkaFlow.Admin.Messages; - using KafkaFlow.Producers; internal class AdminProducer : IAdminProducer { diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index 52c97e3ae..cb67e5637 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -6,9 +6,9 @@ using KafkaFlow.Admin; using KafkaFlow.Admin.Handlers; using KafkaFlow.Admin.Messages; - using KafkaFlow.Admin.Producers; using KafkaFlow.Configuration; using KafkaFlow.Consumers; + using KafkaFlow.Producers; using KafkaFlow.Serializer; using KafkaFlow.TypedHandler; using Microsoft.Extensions.Caching.Memory; @@ -30,10 +30,11 @@ public static IClusterConfigurationBuilder EnableAdminMessages( string adminTopic, string adminConsumerGroup) { - cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); + return cluster .AddProducer( producer => producer @@ -71,60 +72,72 @@ public static IClusterConfigurationBuilder EnableAdminMessages( /// Creates the telemetry producer and consumer to send and receive metric messages /// /// The cluster configuration builder - /// The topic to be used by the metric commands - /// The consumer group prefix + /// The topic to be used by the metric commands + /// The consumer group prefix /// public static IClusterConfigurationBuilder EnableTelemetry( this IClusterConfigurationBuilder cluster, - string metricTopic, - string metricConsumerGroup) + string topicName, + string consumerGroup) { cluster.DependencyConfigurator.AddSingleton(); + cluster.DependencyConfigurator.AddSingleton(); - cluster.DependencyConfigurator.AddSingleton(); + var groupId = + $"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; + + var producerName = $"telemetry-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}"; return cluster - .AddProducer( + .AddProducer( + producerName, producer => producer - .DefaultTopic(metricTopic) + .DefaultTopic(topicName) .AddMiddlewares( middlewares => middlewares .AddSerializer())) .AddConsumer( consumer => consumer - .Topic(metricTopic) - .WithGroupId( - $"{metricConsumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}") + .Topic(topicName) + .WithGroupId(groupId) .WithWorkersCount(1) .AsReadonly() .WithBufferSize(10) .WithAutoOffsetReset(AutoOffsetReset.Latest) .WithPartitionsAssignedHandler((resolver, partitions) => { - TimerManager.Instance.Set( + TelemetryScheduler.Set( + groupId, async _ => { - foreach (var c in resolver.Resolve().All.Where(c => !c.IsReadonly)) - { - foreach (var assignment in c.Assignment ) - { - await resolver.Resolve().ProduceAsync( - new ConsumerMetric() - { - ConsumerName = c.ConsumerName, - Topic = assignment.Topic, - GroupId = c.GroupId, - HostName = $"{Environment.MachineName}-{c.MemberId}", - PausedPartitions = c.PausedPartitions - .Where(p=> p.Topic == assignment.Topic) - .Select(p => p.Partition.Value), - RunningPartitions = c.RunningPartitions - .Where(p=> p.Topic == assignment.Topic) - .Select(p => p.Partition.Value), - SentAt = DateTime.Now, - }); - } - } + await resolver + .Resolve() + .GetProducer(producerName) + .BatchProduceAsync( + resolver + .Resolve() + .All + .Where(c => !c.IsReadonly) // TODO GET ONLY CONSUMERS FROM THIS CLUSTER + .SelectMany(c => c.Assignment.Select(a => + new BatchProduceItem( + topicName, + Guid.NewGuid().ToString(), + new ConsumerMetric() + { + ConsumerName = c.ConsumerName, + Topic = a.Topic, + GroupId = c.GroupId, + HostName = $"{Environment.MachineName}-{c.MemberId}", + PausedPartitions = c.PausedPartitions + .Where(p => p.Topic == a.Topic) + .Select(p => p.Partition.Value), + RunningPartitions = c.RunningPartitions + .Where(p => p.Topic == a.Topic) + .Select(p => p.Partition.Value), + SentAt = DateTime.Now, + }, + null))) + .ToList()); }, TimeSpan.Zero, TimeSpan.FromSeconds(1)); @@ -135,15 +148,16 @@ await resolver.Resolve().ProduceAsync( .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) - .AddHandlersFromAssemblyOf()))); + .AddHandlersFromAssemblyOf()))) + .OnStop(_ => TelemetryScheduler.Unset(groupId)); } /// public static IClusterConfigurationBuilder EnableTelemetry( this IClusterConfigurationBuilder cluster, - string adminTopic) + string topicName) { - return cluster.EnableTelemetry(adminTopic, $"Telemetry-{Assembly.GetEntryAssembly().GetName().Name}"); + return cluster.EnableTelemetry(topicName, $"Telemetry-{Assembly.GetEntryAssembly().GetName().Name}"); } } } diff --git a/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs b/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs new file mode 100644 index 000000000..9c8ae4c87 --- /dev/null +++ b/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs @@ -0,0 +1,19 @@ +namespace KafkaFlow.Admin.Handlers +{ + using System.Threading.Tasks; + using KafkaFlow.Admin.Messages; + using KafkaFlow.TypedHandler; + + internal class ConsumerMetricHandler : IMessageHandler + { + private readonly ITelemetryCache cache; + + public ConsumerMetricHandler(ITelemetryCache cache) => this.cache = cache; + + public Task Handle(IMessageContext context, ConsumerMetric message) + { + this.cache.Put(message.GroupId, message.ConsumerName, message); + return Task.CompletedTask; + } + } +} diff --git a/src/KafkaFlow.Admin/ITelemetryCache.cs b/src/KafkaFlow.Admin/ITelemetryCache.cs new file mode 100644 index 000000000..e47853dfd --- /dev/null +++ b/src/KafkaFlow.Admin/ITelemetryCache.cs @@ -0,0 +1,27 @@ +namespace KafkaFlow.Admin +{ + using System.Collections.Generic; + using KafkaFlow.Admin.Messages; + + /// + /// Used to implement a cache provider to manage telemetry data + /// + public interface ITelemetryCache + { + /// + /// Gets the cached metric indexed with the parameters provided + /// + /// The group id + /// The consumer name + /// The list of consumer metrics stored in the cache + List Get(string groupId, string consumerName); + + /// + /// Puts in the cache the metric provided + /// + /// The group id + /// The consumer name + /// The consumer metric + void Put(string groupId, string consumerName, ConsumerMetric metric); + } +} diff --git a/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj b/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj index c1444c83c..bab36347b 100644 --- a/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj +++ b/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj @@ -14,4 +14,9 @@ + + + + + diff --git a/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs b/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs new file mode 100644 index 000000000..5904c9da3 --- /dev/null +++ b/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs @@ -0,0 +1,55 @@ +namespace KafkaFlow.Admin.Messages +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + + /// + /// A message that contains data related to consumers partition assigment + /// + [DataContract] + public class ConsumerMetric + { + /// + /// Gets or sets the consumer group id + /// + [DataMember(Order = 1)] + public string GroupId { get; set; } + + /// + /// Gets or sets the consumer name + /// + [DataMember(Order = 2)] + public string ConsumerName { get; set; } + + /// + /// Gets or sets the topic name + /// + [DataMember(Order = 3)] + public string Topic { get; set; } + + /// + /// Gets or sets the consumer host name + /// + [DataMember(Order = 4)] + public string HostName { get; set; } + + /// + /// Gets or sets the list of running partitions + /// + [DataMember(Order = 5)] + public IEnumerable RunningPartitions { get; set; } + + /// + /// Gets or sets the list of paused partitions + /// + [DataMember(Order = 6)] + public IEnumerable PausedPartitions { get; set; } + + /// + /// Gets or sets the datetime at when the metric was sent + /// + [DataMember(Order = 7)] + public DateTime SentAt { get; set; } + } +} diff --git a/src/KafkaFlow.Admin/TelemetryCache.cs b/src/KafkaFlow.Admin/TelemetryCache.cs new file mode 100644 index 000000000..1ff821325 --- /dev/null +++ b/src/KafkaFlow.Admin/TelemetryCache.cs @@ -0,0 +1,40 @@ +namespace KafkaFlow.Admin +{ + using System.Collections.Generic; + using KafkaFlow.Admin.Messages; + using Microsoft.Extensions.Caching.Memory; + + /// + /// Provide cache operations related to telemetry data + /// + public class TelemetryCache : ITelemetryCache + { + private readonly IMemoryCache cache; + + /// + /// Initializes a new instance of the class. + /// + /// The memory cache interface to get metric data + public TelemetryCache(IMemoryCache cache) => this.cache = cache; + + /// + public List Get(string groupId, string consumerName) + { + return this.cache.TryGetValue(this.BuildKey(groupId, consumerName), out List metric) ? + metric : + new List(); + } + + /// + public void Put(string groupId, string consumerName, ConsumerMetric metric) + { + var entry = this.Get(groupId, consumerName); + + entry.RemoveAll(e => e.HostName == metric.HostName); + entry.Add(metric); + this.cache.Set(this.BuildKey(groupId, consumerName), entry); + } + + private string BuildKey(string groupId, string consumerName) => $"{groupId}-{consumerName}"; + } +} diff --git a/src/KafkaFlow.Admin/TelemetryScheduler.cs b/src/KafkaFlow.Admin/TelemetryScheduler.cs new file mode 100644 index 000000000..73be65afa --- /dev/null +++ b/src/KafkaFlow.Admin/TelemetryScheduler.cs @@ -0,0 +1,27 @@ +namespace KafkaFlow.Admin +{ + using System; + using System.Collections.Generic; + using System.Threading; + + internal class TelemetryScheduler + { + private static readonly Lazy> timers = new (() => new Dictionary()); + + public static void Set(string key, TimerCallback callback, TimeSpan dueTime, TimeSpan period) + { + Unset(key); + + timers.Value[key] = new Timer(callback, null, dueTime, period); + } + + public static void Unset(string key) + { + if (timers.Value.TryGetValue(key, out var timer)) + { + timer.Dispose(); + timers.Value.Remove(key); + } + } + } +} diff --git a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs index dcd0c86da..657f89373 100644 --- a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs @@ -121,6 +121,14 @@ public void Resume(IReadOnlyCollection topicPartitions) } } + public void UpdatePausedPartitions(IEnumerable partitionsRunning) + { + foreach (var p in partitionsRunning) + { + this.pausedPartitions.Remove(p); + } + } + public void Dispose() { this.heartbeatTokenSource?.Cancel(); diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs index c112f99c0..2398b1d0b 100644 --- a/src/KafkaFlow/Consumers/ConsumerManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerManager.cs @@ -70,6 +70,8 @@ private void OnPartitionAssigned(IReadOnlyCollection partitions) "Partitions assigned", this.GetConsumerLogInfo(partitions)); + this.Consumer.FlowManager.UpdatePausedPartitions(partitions); + this.WorkerPool .StartAsync(partitions) .GetAwaiter() diff --git a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs index 7b2ff19d6..315b035f2 100644 --- a/src/KafkaFlow/Consumers/IConsumerFlowManager.cs +++ b/src/KafkaFlow/Consumers/IConsumerFlowManager.cs @@ -30,5 +30,7 @@ public interface IConsumerFlowManager : IDisposable /// /// A list of partitions void Resume(IReadOnlyCollection topicPartitions); + + void UpdatePausedPartitions(IEnumerable partitionsRunning); } }