From 9c809daa8365b9cfb5bb96148ebb1c4712e22bb1 Mon Sep 17 00:00:00 2001 From: Douglas Lima Date: Tue, 15 Jun 2021 22:01:50 -0300 Subject: [PATCH] feat: changes to Consumer Telemetry Metric --- .../ClientApp/src/app/app.module.ts | 4 +--- .../ClientApp/src/app/callback.pipe.ts | 14 ------------ .../ClientApp/src/app/consumer.service.ts | 12 +++++----- .../src/app/consumer/consumer.component.html | 22 +++++++++---------- .../src/app/consumer/consumer.component.ts | 21 +++++++----------- .../ClientApp/src/index.html | 1 + .../Adapters/ConsumerResponseAdapter.cs | 2 +- .../Adapters/TelemetryResponseAdapter.cs | 4 ++-- .../Contracts/ConsumerResponse.cs | 4 ++-- .../Controllers/GroupsController.cs | 1 - .../ClusterConfigurationBuilderExtensions.cs | 2 +- ...r.cs => ConsumerTelemetryMetricHandler.cs} | 6 ++--- src/KafkaFlow.Admin/ITelemetryStorage.cs | 10 ++++----- src/KafkaFlow.Admin/MemoryTelemetryStorage.cs | 12 +++++----- ...erMetric.cs => ConsumerTelemetryMetric.cs} | 2 +- src/KafkaFlow.Admin/TelemetryScheduler.cs | 6 +++-- .../MemoryTelemetryStorageTests.cs | 14 ++++++------ .../Consumers/ConsumerFlowManager.cs | 2 +- 18 files changed, 60 insertions(+), 79 deletions(-) delete mode 100644 src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/callback.pipe.ts rename src/KafkaFlow.Admin/Handlers/{ConsumerMetricHandler.cs => ConsumerTelemetryMetricHandler.cs} (52%) rename src/KafkaFlow.Admin/Messages/{ConsumerMetric.cs => ConsumerTelemetryMetric.cs} (97%) diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/app.module.ts b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/app.module.ts index 14b3ae9b5..5cbb80bac 100644 --- a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/app.module.ts +++ b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/app.module.ts @@ -2,7 +2,7 @@ import { RouterModule, Routes } from '@angular/router'; import { NgModule } from '@angular/core'; import { BrowserModule } from '@angular/platform-browser'; import { FormsModule } from '@angular/forms'; -import { NgxMaskModule, IConfig } from 'ngx-mask' +import { NgxMaskModule, IConfig } from 'ngx-mask'; import { AppRoutingModule } from './app-routing.module'; import { AppComponent } from './app.component'; @@ -11,7 +11,6 @@ import { ConsumerComponent } from './consumer/consumer.component'; import { HttpErrorInterceptor } from './http-error.interceptor'; import { GroupByPipe } from './group-by.pipe'; import { SortPipe } from './sort.pipe'; -import { CallbackPipe } from './callback.pipe'; import { ConsumerService } from './consumer.service'; import { HttpClientModule, HTTP_INTERCEPTORS } from '@angular/common/http'; @@ -36,7 +35,6 @@ const appRoutes: Routes = [ AppComponent, GroupByPipe, SortPipe, - CallbackPipe, HomeComponent, ConsumerComponent, RewindModalComponent, diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/callback.pipe.ts b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/callback.pipe.ts deleted file mode 100644 index 1a30424c5..000000000 --- a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/callback.pipe.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { PipeTransform, Pipe } from '@angular/core'; - -@Pipe({ - name: 'callback', - pure: false -}) -export class CallbackPipe implements PipeTransform { - transform(items: any[], callback: (item: any) => boolean): any { - if (!items || !callback) { - return items; - } - return items.filter((item: any) => callback(item)); - } -} diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer.service.ts b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer.service.ts index d5c2f3c87..7fb1a7fd5 100644 --- a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer.service.ts +++ b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer.service.ts @@ -7,7 +7,7 @@ import { HttpClient, HttpHeaders } from '@angular/common/http'; export class ConsumerService { private headers: HttpHeaders; - private accessPointUrl: string = '/kafka-flow'; + private accessPointUrl = 'api/kafka-flow'; constructor(private http: HttpClient) { this.headers = new HttpHeaders({'Content-Type': 'application/json; charset=utf-8'}); @@ -19,14 +19,14 @@ export class ConsumerService { public updateWorkersCount(groupId: string, consumerName: string, workersCount: number) { return this.http.post( - this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/change-worker-count`, + this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/change-worker-count`, { workersCount: workersCount }, {headers: this.headers}); } public resetOffset(groupId: string, consumerName: string) { return this.http.post( - this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/reset-offsets`, + this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/reset-offsets`, { confirm: true }, {headers: this.headers}); } @@ -40,21 +40,21 @@ export class ConsumerService { public restart(groupId: string, consumerName: string) { return this.http.post( - this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/restart`, + this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/restart`, null, {headers: this.headers}); } public resume(groupId: string, consumerName: string) { return this.http.post( - this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/resume`, + this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/resume`, null, {headers: this.headers}); } public rewindOffset(groupId: string, consumerName: string, date: Date) { return this.http.post( - this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`, + this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`, { date: new Date(date.getTime() - (date.getTimezoneOffset() * 60000)).toISOString() }, {headers: this.headers}); } diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.html b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.html index 1d7490353..eead37bf2 100644 --- a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.html +++ b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.html @@ -1,12 +1,12 @@
Success! {{successMessage}}
-
+

Group Id: {{ group.groupId }}

-

Consumer: {{ consumer.consumerName }}

+

Consumer: {{ consumer.name }}

Status: Lag: 55

-->

Workers: {{consumer.workersCount}}

- - - - - - + + + + + +
- -

Topic: {{partitionAssignments.key}}

+ +

Topic: {{assignment.key}}

@@ -36,7 +36,7 @@

Workers: {{consumer.workersCount}}

- + diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.ts b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.ts index af93acea3..0a1a05080 100644 --- a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.ts +++ b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/consumer/consumer.component.ts @@ -15,42 +15,37 @@ import { RestartModalComponent } from '../shared/restart-modal/restart-modal.com templateUrl: './consumer.component.html' }) export class ConsumerComponent implements OnInit { - public groups: Array = []; + public telemetryResponse: any = []; @ViewChild('successAlert', { static: false }) successAlert: NgbAlert | undefined; private successSubject = new Subject(); private delayMs = 1000; successMessage = ''; constructor(private modalService: NgbModal, private consumerService: ConsumerService) { - interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.groups = this.enrichGroups(data))); + interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.telemetryResponse = this.enrichConsumers(data))); } - enrichGroups(groups: any) { + enrichConsumers(telemetryResponse: any) { var self = this; - groups.forEach(function (g: any) { + telemetryResponse.groups.forEach(function (g: any) { g.consumers.forEach(function (c: any) { c.status = - c.partitionAssignments.some((pa: any) => pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ? + c.assignments.some((pa: any) => pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ? "Running" : - c.partitionAssignments.some((pa: any) => pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ? + c.assignments.some((pa: any) => pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ? "Paused" : "Not Running"; - c.partitionAssignments.forEach( (pa: any) => pa.isLost = !self.isActive(pa.lastUpdate) + c.assignments.forEach( (pa: any) => pa.isLost = !self.isActive(pa.lastUpdate) ) }) }); - - return groups; + return telemetryResponse; } isActive(date: string) { return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5; } - removeReadonly(group: any) { - return !(group.consumers[0].managementDisabled==1); - } - openWorkersCountModal(groupId: string, consumerName: string, workersCount: number) { const modalRef = this.modalService.open(WorkersCountModalComponent); modalRef.componentInstance.groupId = groupId; diff --git a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/index.html b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/index.html index c458fe7f1..59a4f336f 100644 --- a/src/KafkaFlow.Admin.Dashboard/ClientApp/src/index.html +++ b/src/KafkaFlow.Admin.Dashboard/ClientApp/src/index.html @@ -5,6 +5,7 @@ KafkaFlow - Dashboard + diff --git a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs index 5e72cb641..d10c573a8 100644 --- a/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs +++ b/src/KafkaFlow.Admin.WebApi/Adapters/ConsumerResponseAdapter.cs @@ -14,7 +14,7 @@ internal static ConsumerResponse Adapt(this IMessageConsumer consumer) Subscription = consumer.Subscription, ConsumerName = consumer.ConsumerName, GroupId = consumer.GroupId, - FlowStatus = consumer.FlowStatus?.ToString() ?? ConsumerStatus.NotRunning.ToString(), + Status = consumer.FlowStatus?.ToString() ?? ConsumerStatus.NotRunning.ToString(), MemberId = consumer.MemberId, WorkersCount = consumer.WorkersCount, ClientInstanceName = consumer.ClientInstanceName, diff --git a/src/KafkaFlow.Admin.WebApi/Adapters/TelemetryResponseAdapter.cs b/src/KafkaFlow.Admin.WebApi/Adapters/TelemetryResponseAdapter.cs index 327f4ce77..f178252db 100644 --- a/src/KafkaFlow.Admin.WebApi/Adapters/TelemetryResponseAdapter.cs +++ b/src/KafkaFlow.Admin.WebApi/Adapters/TelemetryResponseAdapter.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Admin.WebApi.Adapters internal static class TelemetryResponseAdapter { - internal static TelemetryResponse Adapt(this IEnumerable metrics) + internal static TelemetryResponse Adapt(this IEnumerable metrics) { return new TelemetryResponse { @@ -24,7 +24,7 @@ internal static TelemetryResponse Adapt(this IEnumerable metrics metric => new TelemetryResponse.Consumer { Name = metric.First().ConsumerName, - WorkersCount = metric.First().WorkersCount, + WorkersCount = metric.OrderByDescending(x=> x.SentAt).First().WorkersCount, Assignments = metric.Select( x => new TelemetryResponse.TopicPartitionAssignment { diff --git a/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs b/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs index dfd66de83..84c1a2daf 100644 --- a/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs +++ b/src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs @@ -50,8 +50,8 @@ public class ConsumerResponse public string ClientInstanceName { get; set; } /// - /// Gets or sets the current consumer flow status + /// Gets or sets the current consumer status /// - public string FlowStatus { get; set; } + public string Status { get; set; } } } diff --git a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs index dd7c7af7d..c4ac596dd 100644 --- a/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs +++ b/src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs @@ -24,7 +24,6 @@ public class GroupsController : ControllerBase /// /// The accessor class that provides access to the consumers /// The producer to publish admin messages - /// The cache interface to get metric data public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer) { this.consumers = consumers; diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index 8e9c7996c..e37d5a7e1 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -108,7 +108,7 @@ public static IClusterConfigurationBuilder EnableTelemetry( .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) - .AddHandlersFromAssemblyOf()))) + .AddHandlersFromAssemblyOf()))) .OnStarted(resolver => resolver.Resolve().Start(telemetryId, topicName)) .OnStopping(resolver => resolver.Resolve().Stop(telemetryId)); } diff --git a/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs b/src/KafkaFlow.Admin/Handlers/ConsumerTelemetryMetricHandler.cs similarity index 52% rename from src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs rename to src/KafkaFlow.Admin/Handlers/ConsumerTelemetryMetricHandler.cs index 50fd32791..0e7c038c9 100644 --- a/src/KafkaFlow.Admin/Handlers/ConsumerMetricHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/ConsumerTelemetryMetricHandler.cs @@ -4,13 +4,13 @@ namespace KafkaFlow.Admin.Handlers using KafkaFlow.Admin.Messages; using KafkaFlow.TypedHandler; - internal class ConsumerMetricHandler : IMessageHandler + internal class ConsumerTelemetryMetricHandler : IMessageHandler { private readonly ITelemetryStorage storage; - public ConsumerMetricHandler(ITelemetryStorage storage) => this.storage = storage; + public ConsumerTelemetryMetricHandler(ITelemetryStorage storage) => this.storage = storage; - public Task Handle(IMessageContext context, ConsumerMetric message) + public Task Handle(IMessageContext context, ConsumerTelemetryMetric message) { this.storage.Put(message); return Task.CompletedTask; diff --git a/src/KafkaFlow.Admin/ITelemetryStorage.cs b/src/KafkaFlow.Admin/ITelemetryStorage.cs index 4a57970a4..e9c81043b 100644 --- a/src/KafkaFlow.Admin/ITelemetryStorage.cs +++ b/src/KafkaFlow.Admin/ITelemetryStorage.cs @@ -9,15 +9,15 @@ namespace KafkaFlow.Admin public interface ITelemetryStorage { /// - /// Gets the stored metric indexed with the parameters provided + /// Gets all the consumer telemetry metrics /// - /// The list of consumer metrics stored in the cache - IEnumerable Get(); + /// The list of consumer metrics + IEnumerable Get(); /// /// Store the metric provided /// - /// The consumer metric - void Put(ConsumerMetric metric); + /// The consumer telemetry metric + void Put(ConsumerTelemetryMetric telemetryMetric); } } diff --git a/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs b/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs index 27c01757e..43c06245a 100644 --- a/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs +++ b/src/KafkaFlow.Admin/MemoryTelemetryStorage.cs @@ -14,7 +14,7 @@ internal class MemoryTelemetryStorage : ITelemetryStorage private readonly TimeSpan expiryTime; private readonly object cleanSyncRoot = new(); - private readonly ConcurrentDictionary<(string, string, string), ConsumerMetric> metrics = new(); + private readonly ConcurrentDictionary<(string, string, string), ConsumerTelemetryMetric> metrics = new(); private DateTime lastCleanDate; @@ -26,17 +26,17 @@ public MemoryTelemetryStorage(TimeSpan cleanRunInterval, TimeSpan expiryTime, ID this.lastCleanDate = dateTimeProvider.MinValue; } - public IEnumerable Get() => this.metrics.Values; + public IEnumerable Get() => this.metrics.Values; - public void Put(ConsumerMetric metric) + public void Put(ConsumerTelemetryMetric telemetryMetric) { this.TryCleanItems(); - this.metrics[BuildKey(metric)] = metric; + this.metrics[BuildKey(telemetryMetric)] = telemetryMetric; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static (string, string, string) BuildKey(ConsumerMetric metric) => - (metric.InstanceName, metric.GroupId, metric.ConsumerName); + private static (string, string, string) BuildKey(ConsumerTelemetryMetric telemetryMetric) => + (telemetryMetric.InstanceName, telemetryMetric.GroupId, telemetryMetric.ConsumerName); private void TryCleanItems() { diff --git a/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs b/src/KafkaFlow.Admin/Messages/ConsumerTelemetryMetric.cs similarity index 97% rename from src/KafkaFlow.Admin/Messages/ConsumerMetric.cs rename to src/KafkaFlow.Admin/Messages/ConsumerTelemetryMetric.cs index d91e85ff6..14cbcac20 100644 --- a/src/KafkaFlow.Admin/Messages/ConsumerMetric.cs +++ b/src/KafkaFlow.Admin/Messages/ConsumerTelemetryMetric.cs @@ -9,7 +9,7 @@ namespace KafkaFlow.Admin.Messages /// A message that contains data related to consumers partition assigment /// [DataContract] - public class ConsumerMetric + public class ConsumerTelemetryMetric { /// /// Gets or sets the consumer group id diff --git a/src/KafkaFlow.Admin/TelemetryScheduler.cs b/src/KafkaFlow.Admin/TelemetryScheduler.cs index 03dd4ab65..015696425 100644 --- a/src/KafkaFlow.Admin/TelemetryScheduler.cs +++ b/src/KafkaFlow.Admin/TelemetryScheduler.cs @@ -53,12 +53,12 @@ public void Stop(string telemetryId) private static void ProduceTelemetry( string topicName, - IReadOnlyCollection consumers, + IList consumers, IMessageProducer producer) { var items = consumers.SelectMany( c => c.Assignment.Select( - a => new ConsumerMetric() + a => new ConsumerTelemetryMetric() { ConsumerName = c.ConsumerName, Topic = a.Topic, @@ -70,6 +70,8 @@ private static void ProduceTelemetry( RunningPartitions = c.RunningPartitions .Where(p => p.Topic == a.Topic) .Select(p => p.Partition.Value), + WorkersCount = c.WorkersCount, + Status = c.FlowStatus.GetValueOrDefault(), SentAt = DateTime.Now, })); diff --git a/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs b/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs index f2e5467b5..e13828790 100644 --- a/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs +++ b/src/KafkaFlow.UnitTests/MemoryTelemetryStorageTests.cs @@ -51,7 +51,7 @@ public void Put_OneItem_ReturnsOneItem() .Returns(now); // Act - this.target.Put(new ConsumerMetric { SentAt = now }); + this.target.Put(new ConsumerTelemetryMetric { SentAt = now }); // Assert this.target.Get().Should().HaveCount(1); @@ -67,7 +67,7 @@ public void PutTwoItems_SameInstanceGroupConsumer_ReplaceOlder() .SetupGet(x => x.Now) .Returns(now); - var metric1 = new ConsumerMetric + var metric1 = new ConsumerTelemetryMetric { InstanceName = Guid.NewGuid().ToString(), GroupId = Guid.NewGuid().ToString(), @@ -75,7 +75,7 @@ public void PutTwoItems_SameInstanceGroupConsumer_ReplaceOlder() SentAt = now, }; - var metric2 = new ConsumerMetric + var metric2 = new ConsumerTelemetryMetric { InstanceName = metric1.InstanceName, GroupId = metric1.GroupId, @@ -102,7 +102,7 @@ public void PutTwoItems_DifferentInstanceGroupConsumer_ReturnsTwo() .SetupGet(x => x.Now) .Returns(now); - var metric1 = new ConsumerMetric + var metric1 = new ConsumerTelemetryMetric { InstanceName = Guid.NewGuid().ToString(), GroupId = Guid.NewGuid().ToString(), @@ -110,7 +110,7 @@ public void PutTwoItems_DifferentInstanceGroupConsumer_ReturnsTwo() SentAt = now, }; - var metric2 = new ConsumerMetric + var metric2 = new ConsumerTelemetryMetric { InstanceName = Guid.NewGuid().ToString(), GroupId = Guid.NewGuid().ToString(), @@ -136,7 +136,7 @@ public void PutTwoItems_ExpiryOne_ReturnsOne() .SetupGet(x => x.Now) .Returns(now); - var metric1 = new ConsumerMetric + var metric1 = new ConsumerTelemetryMetric { InstanceName = Guid.NewGuid().ToString(), GroupId = Guid.NewGuid().ToString(), @@ -146,7 +146,7 @@ public void PutTwoItems_ExpiryOne_ReturnsOne() this.target.Put(metric1); - var metric2 = new ConsumerMetric + var metric2 = new ConsumerTelemetryMetric { InstanceName = Guid.NewGuid().ToString(), GroupId = Guid.NewGuid().ToString(), diff --git a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs index 840b7f7e0..ce71922a1 100644 --- a/src/KafkaFlow/Consumers/ConsumerFlowManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerFlowManager.cs @@ -24,7 +24,7 @@ public ConsumerFlowManager( this.logHandler = logHandler; } - public IEnumerable PausedPartitions => this.pausedPartitions.AsReadOnly(); + public IEnumerable PausedPartitions => this.pausedPartitions.AsEnumerable(); public ConsumerStatus Status {
{{partitionAssignment.instanceName}}