Skip to content

Commit

Permalink
feat: changes to Consumer Telemetry Metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jun 16, 2021
1 parent d859c9c commit 9c809da
Show file tree
Hide file tree
Showing 18 changed files with 60 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { RouterModule, Routes } from '@angular/router';
import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { FormsModule } from '@angular/forms';
import { NgxMaskModule, IConfig } from 'ngx-mask'
import { NgxMaskModule, IConfig } from 'ngx-mask';

import { AppRoutingModule } from './app-routing.module';
import { AppComponent } from './app.component';
Expand All @@ -11,7 +11,6 @@ import { ConsumerComponent } from './consumer/consumer.component';
import { HttpErrorInterceptor } from './http-error.interceptor';
import { GroupByPipe } from './group-by.pipe';
import { SortPipe } from './sort.pipe';
import { CallbackPipe } from './callback.pipe';

import { ConsumerService } from './consumer.service';
import { HttpClientModule, HTTP_INTERCEPTORS } from '@angular/common/http';
Expand All @@ -36,7 +35,6 @@ const appRoutes: Routes = [
AppComponent,
GroupByPipe,
SortPipe,
CallbackPipe,
HomeComponent,
ConsumerComponent,
RewindModalComponent,
Expand Down
14 changes: 0 additions & 14 deletions src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/callback.pipe.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { HttpClient, HttpHeaders } from '@angular/common/http';

export class ConsumerService {
private headers: HttpHeaders;
private accessPointUrl: string = '/kafka-flow';
private accessPointUrl = 'api/kafka-flow';

constructor(private http: HttpClient) {
this.headers = new HttpHeaders({'Content-Type': 'application/json; charset=utf-8'});
Expand All @@ -19,14 +19,14 @@ export class ConsumerService {

public updateWorkersCount(groupId: string, consumerName: string, workersCount: number) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/change-worker-count`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/change-worker-count`,
{ workersCount: workersCount },
{headers: this.headers});
}

public resetOffset(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/reset-offsets`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/reset-offsets`,
{ confirm: true },
{headers: this.headers});
}
Expand All @@ -40,21 +40,21 @@ export class ConsumerService {

public restart(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/restart`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/restart`,
null,
{headers: this.headers});
}

public resume(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/resume`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/resume`,
null,
{headers: this.headers});
}

public rewindOffset(groupId: string, consumerName: string, date: Date) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`,
{ date: new Date(date.getTime() - (date.getTimezoneOffset() * 60000)).toISOString() },
{headers: this.headers});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<ngb-alert #successAlert *ngIf="successMessage" type="success" (closed)="successMessage=''">
<div class="text-center"><b>Success! </b> <span class="text-center">{{successMessage}}</span></div>
</ngb-alert>
<div class="container" *ngFor="let group of enrichGroups(groups) | callback: removeReadonly">
<div class="container" *ngFor="let group of telemetryResponse.groups ">
<div class="card my-3">
<div class="card-body">
<h3>Group Id: {{ group.groupId }}</h3>
<div *ngFor="let consumer of group.consumers">
<h4>Consumer: {{ consumer.consumerName }}</h4>
<h4>Consumer: {{ consumer.name }}</h4>
<h4>Status: <span [ngClass]="{
'text-success': consumer.status == 'Running',
'text-warning': consumer.status == 'Paused',
Expand All @@ -15,15 +15,15 @@ <h4>Status: <span [ngClass]="{
<!-- <h4>Lag: 55</h4> -->
<h4>Workers: {{consumer.workersCount}}</h4>
<div class="mt-3 mb-3">
<button class="btn btn-outline-success" type="button" (click)="openResumeModal(group.groupId, consumer.consumerName)" *ngIf="consumer.status == 'Paused'">Resume</button>
<button class="btn btn-outline-warning ml-2" type="button" (click)="openPauseModal(group.groupId, consumer.consumerName)" *ngIf="consumer.status == 'Running'">Pause</button>
<button class="btn btn-outline-danger ml-2" type="button" (click)="openRestartModal(group.groupId, consumer.consumerName)">Restart</button>
<button class="btn btn-outline-info ml-2" type="button" (click)="openRewindModal(group.groupId, consumer.consumerName)">Rewind Offset</button>
<button class="btn btn-outline-secondary ml-2" type="button" (click)="openResetModal(group.groupId, consumer.consumerName)">Reset Offset</button>
<button class="btn btn-outline-success ml-2" type="button" (click)="openWorkersCountModal(group.groupId, consumer.consumerName, consumer.workersCount)">Update number of workers</button>
<button class="btn btn-outline-success" type="button" (click)="openResumeModal(group.groupId, consumer.name)" *ngIf="consumer.status == 'Paused'">Resume</button>
<button class="btn btn-outline-warning ml-2" type="button" (click)="openPauseModal(group.groupId, consumer.name)" *ngIf="consumer.status == 'Running'">Pause</button>
<button class="btn btn-outline-danger ml-2" type="button" (click)="openRestartModal(group.groupId, consumer.name)">Restart</button>
<button class="btn btn-outline-info ml-2" type="button" (click)="openRewindModal(group.groupId, consumer.name)">Rewind Offset</button>
<button class="btn btn-outline-secondary ml-2" type="button" (click)="openResetModal(group.groupId, consumer.name)">Reset Offset</button>
<button class="btn btn-outline-success ml-2" type="button" (click)="openWorkersCountModal(group.groupId, consumer.name, consumer.workersCount)">Update number of workers</button>
</div>
<ng-template let-partitionAssignments ngFor [ngForOf]="consumer.partitionAssignments | groupBy:'topic'">
<p class="mb-0 font-weight-bold">Topic: {{partitionAssignments.key}}</p>
<ng-template let-assignment ngFor [ngForOf]="consumer.assignments | groupBy:'topicName'">
<p class="mb-0 font-weight-bold">Topic: {{assignment.key}}</p>
<table class="table table-striped table-hover mt-1">
<thead>
<tr>
Expand All @@ -36,7 +36,7 @@ <h4>Workers: {{consumer.workersCount}}</h4>
</tr>
</thead>
<tbody>
<ng-template let-partitionAssignment ngFor [ngForOf]="partitionAssignments.value | sort:'instanceName'">
<ng-template let-partitionAssignment ngFor [ngForOf]="assignment.value | sort:'instanceName'">
<ng-template [ngIf]="partitionAssignment.pausedPartitions?.length > 0">
<tr>
<td class="text-left">{{partitionAssignment.instanceName}}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,37 @@ import { RestartModalComponent } from '../shared/restart-modal/restart-modal.com
templateUrl: './consumer.component.html'
})
export class ConsumerComponent implements OnInit {
public groups: Array<any> = [];
public telemetryResponse: any = [];
@ViewChild('successAlert', { static: false }) successAlert: NgbAlert | undefined;
private successSubject = new Subject<string>();
private delayMs = 1000;
successMessage = '';

constructor(private modalService: NgbModal, private consumerService: ConsumerService) {
interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.groups = this.enrichGroups(data)));
interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.telemetryResponse = this.enrichConsumers(data)));
}

enrichGroups(groups: any) {
enrichConsumers(telemetryResponse: any) {
var self = this;
groups.forEach(function (g: any) {
telemetryResponse.groups.forEach(function (g: any) {
g.consumers.forEach(function (c: any) {
c.status =
c.partitionAssignments.some((pa: any) => pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
c.assignments.some((pa: any) => pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
"Running" :
c.partitionAssignments.some((pa: any) => pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
c.assignments.some((pa: any) => pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
"Paused" :
"Not Running";
c.partitionAssignments.forEach( (pa: any) => pa.isLost = !self.isActive(pa.lastUpdate)
c.assignments.forEach( (pa: any) => pa.isLost = !self.isActive(pa.lastUpdate)
)
})
});

return groups;
return telemetryResponse;
}

isActive(date: string) {
return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5;
}

removeReadonly(group: any) {
return !(group.consumers[0].managementDisabled==1);
}

openWorkersCountModal(groupId: string, consumerName: string, workersCount: number) {
const modalRef = this.modalService.open(WorkersCountModalComponent);
modalRef.componentInstance.groupId = groupId;
Expand Down
1 change: 1 addition & 0 deletions src/KafkaFlow.Admin.Dashboard/ClientApp/src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<title>KafkaFlow - Dashboard</title>
<base href="/">
<meta name="viewport" content="width=device-width, initial-scale=1">
<script src="https://code.jquery.com/jquery-3.5.1.slim.min.js" integrity="sha384-DfXdz2htPH0lsSSs5nCTpuj/zy4C+OGpamoFVy38MVBnE+IbbVYUew+OrCXaRkfj" crossorigin="anonymous"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/popper.js@1.16.1/dist/umd/popper.min.js" integrity="sha384-9/reFTGAW83EW2RDu2S0VKaIzap3H66lZH81PoYlFhbGU+6BZp6G7niu735Sk7lN" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.min.js" integrity="sha384-+YQ4JLhjyBLPDQt//I+STsc9iw4uQqACwlvpslubQzn4u2UU2UFM80nGisd026JF" crossorigin="anonymous"></script>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal static ConsumerResponse Adapt(this IMessageConsumer consumer)
Subscription = consumer.Subscription,
ConsumerName = consumer.ConsumerName,
GroupId = consumer.GroupId,
FlowStatus = consumer.FlowStatus?.ToString() ?? ConsumerStatus.NotRunning.ToString(),
Status = consumer.FlowStatus?.ToString() ?? ConsumerStatus.NotRunning.ToString(),
MemberId = consumer.MemberId,
WorkersCount = consumer.WorkersCount,
ClientInstanceName = consumer.ClientInstanceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KafkaFlow.Admin.WebApi.Adapters

internal static class TelemetryResponseAdapter
{
internal static TelemetryResponse Adapt(this IEnumerable<ConsumerMetric> metrics)
internal static TelemetryResponse Adapt(this IEnumerable<ConsumerTelemetryMetric> metrics)
{
return new TelemetryResponse
{
Expand All @@ -24,7 +24,7 @@ internal static TelemetryResponse Adapt(this IEnumerable<ConsumerMetric> metrics
metric => new TelemetryResponse.Consumer
{
Name = metric.First().ConsumerName,
WorkersCount = metric.First().WorkersCount,
WorkersCount = metric.OrderByDescending(x=> x.SentAt).First().WorkersCount,
Assignments = metric.Select(
x => new TelemetryResponse.TopicPartitionAssignment
{
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class ConsumerResponse
public string ClientInstanceName { get; set; }

/// <summary>
/// Gets or sets the current consumer flow status
/// Gets or sets the current consumer status
/// </summary>
public string FlowStatus { get; set; }
public string Status { get; set; }
}
}
1 change: 0 additions & 1 deletion src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class GroupsController : ControllerBase
/// </summary>
/// <param name="consumers">The accessor class that provides access to the consumers</param>
/// <param name="adminProducer">The producer to publish admin messages</param>
/// <param name="storage">The cache interface to get metric data</param>
public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer)
{
this.consumers = consumers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static IClusterConfigurationBuilder EnableTelemetry(
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf<ConsumerMetricHandler>())))
.AddHandlersFromAssemblyOf<ConsumerTelemetryMetricHandler>())))
.OnStarted(resolver => resolver.Resolve<ITelemetryScheduler>().Start(telemetryId, topicName))
.OnStopping(resolver => resolver.Resolve<ITelemetryScheduler>().Stop(telemetryId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ namespace KafkaFlow.Admin.Handlers
using KafkaFlow.Admin.Messages;
using KafkaFlow.TypedHandler;

internal class ConsumerMetricHandler : IMessageHandler<ConsumerMetric>
internal class ConsumerTelemetryMetricHandler : IMessageHandler<ConsumerTelemetryMetric>
{
private readonly ITelemetryStorage storage;

public ConsumerMetricHandler(ITelemetryStorage storage) => this.storage = storage;
public ConsumerTelemetryMetricHandler(ITelemetryStorage storage) => this.storage = storage;

public Task Handle(IMessageContext context, ConsumerMetric message)
public Task Handle(IMessageContext context, ConsumerTelemetryMetric message)
{
this.storage.Put(message);
return Task.CompletedTask;
Expand Down
10 changes: 5 additions & 5 deletions src/KafkaFlow.Admin/ITelemetryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace KafkaFlow.Admin
public interface ITelemetryStorage
{
/// <summary>
/// Gets the stored metric indexed with the parameters provided
/// Gets all the consumer telemetry metrics
/// </summary>
/// <returns>The list of consumer metrics stored in the cache</returns>
IEnumerable<ConsumerMetric> Get();
/// <returns>The list of consumer metrics</returns>
IEnumerable<ConsumerTelemetryMetric> Get();

/// <summary>
/// Store the metric provided
/// </summary>
/// <param name="metric">The consumer metric</param>
void Put(ConsumerMetric metric);
/// <param name="telemetryMetric">The consumer telemetry metric</param>
void Put(ConsumerTelemetryMetric telemetryMetric);
}
}
12 changes: 6 additions & 6 deletions src/KafkaFlow.Admin/MemoryTelemetryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal class MemoryTelemetryStorage : ITelemetryStorage
private readonly TimeSpan expiryTime;
private readonly object cleanSyncRoot = new();

private readonly ConcurrentDictionary<(string, string, string), ConsumerMetric> metrics = new();
private readonly ConcurrentDictionary<(string, string, string), ConsumerTelemetryMetric> metrics = new();

private DateTime lastCleanDate;

Expand All @@ -26,17 +26,17 @@ public MemoryTelemetryStorage(TimeSpan cleanRunInterval, TimeSpan expiryTime, ID
this.lastCleanDate = dateTimeProvider.MinValue;
}

public IEnumerable<ConsumerMetric> Get() => this.metrics.Values;
public IEnumerable<ConsumerTelemetryMetric> Get() => this.metrics.Values;

public void Put(ConsumerMetric metric)
public void Put(ConsumerTelemetryMetric telemetryMetric)
{
this.TryCleanItems();
this.metrics[BuildKey(metric)] = metric;
this.metrics[BuildKey(telemetryMetric)] = telemetryMetric;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static (string, string, string) BuildKey(ConsumerMetric metric) =>
(metric.InstanceName, metric.GroupId, metric.ConsumerName);
private static (string, string, string) BuildKey(ConsumerTelemetryMetric telemetryMetric) =>
(telemetryMetric.InstanceName, telemetryMetric.GroupId, telemetryMetric.ConsumerName);

private void TryCleanItems()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace KafkaFlow.Admin.Messages
/// A message that contains data related to consumers partition assigment
/// </summary>
[DataContract]
public class ConsumerMetric
public class ConsumerTelemetryMetric
{
/// <summary>
/// Gets or sets the consumer group id
Expand Down
6 changes: 4 additions & 2 deletions src/KafkaFlow.Admin/TelemetryScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ public void Stop(string telemetryId)

private static void ProduceTelemetry(
string topicName,
IReadOnlyCollection<IMessageConsumer> consumers,
IList<IMessageConsumer> consumers,
IMessageProducer producer)
{
var items = consumers.SelectMany(
c => c.Assignment.Select(
a => new ConsumerMetric()
a => new ConsumerTelemetryMetric()
{
ConsumerName = c.ConsumerName,
Topic = a.Topic,
Expand All @@ -70,6 +70,8 @@ private static void ProduceTelemetry(
RunningPartitions = c.RunningPartitions
.Where(p => p.Topic == a.Topic)
.Select(p => p.Partition.Value),
WorkersCount = c.WorkersCount,
Status = c.FlowStatus.GetValueOrDefault(),
SentAt = DateTime.Now,
}));

Expand Down
Loading

0 comments on commit 9c809da

Please sign in to comment.