Skip to content

Commit

Permalink
feat: changes to Consumer Telemetry Metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jun 16, 2021
1 parent d859c9c commit af846a5
Show file tree
Hide file tree
Showing 32 changed files with 108 additions and 126 deletions.
1 change: 1 addition & 0 deletions src/KafkaFlow.Admin.Dashboard/ClientApp/dist/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<title>KafkaFlow - Dashboard</title>
<base href="/kafka-flow/">
<meta name="viewport" content="width=device-width, initial-scale=1">
<script src="https://code.jquery.com/jquery-3.5.1.slim.min.js" integrity="sha384-DfXdz2htPH0lsSSs5nCTpuj/zy4C+OGpamoFVy38MVBnE+IbbVYUew+OrCXaRkfj" crossorigin="anonymous"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/popper.js@1.16.1/dist/umd/popper.min.js" integrity="sha384-9/reFTGAW83EW2RDu2S0VKaIzap3H66lZH81PoYlFhbGU+6BZp6G7niu735Sk7lN" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.min.js" integrity="sha384-+YQ4JLhjyBLPDQt//I+STsc9iw4uQqACwlvpslubQzn4u2UU2UFM80nGisd026JF" crossorigin="anonymous"></script>
Expand Down
70 changes: 35 additions & 35 deletions src/KafkaFlow.Admin.Dashboard/ClientApp/dist/main.js

Large diffs are not rendered by default.

16 changes: 7 additions & 9 deletions src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { RouterModule, Routes } from '@angular/router';
import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { FormsModule } from '@angular/forms';
import { NgxMaskModule, IConfig } from 'ngx-mask'
import { NgxMaskModule, IConfig } from 'ngx-mask';

import { AppRoutingModule } from './app-routing.module';
import { AppComponent } from './app.component';
Expand All @@ -11,17 +11,16 @@ import { ConsumerComponent } from './consumer/consumer.component';
import { HttpErrorInterceptor } from './http-error.interceptor';
import { GroupByPipe } from './group-by.pipe';
import { SortPipe } from './sort.pipe';
import { CallbackPipe } from './callback.pipe';

import { ConsumerService } from './consumer.service';
import { HttpClientModule, HTTP_INTERCEPTORS } from '@angular/common/http';
import { NgbModule } from '@ng-bootstrap/ng-bootstrap';
import { RewindModalComponent } from './shared/rewind-modal/rewind-modal.component';
import { WorkersCountModalComponent } from './shared/workers-count-modal/workers-count-modal.component';
import { ResetModalComponent } from './shared/reset-modal/reset-modal.component';
import { PauseModalComponent } from './shared/pause-modal/pause-modal.component';
import { RestartModalComponent } from './shared/restart-modal/restart-modal.component';
import { ResumeModalComponent } from './shared/resume-modal/resume-modal.component';
import { RewindModalComponent } from './consumer/shared/rewind-modal/rewind-modal.component';
import { WorkersCountModalComponent } from './consumer/shared/workers-count-modal/workers-count-modal.component';
import { ResetModalComponent } from './consumer/shared/reset-modal/reset-modal.component';
import { PauseModalComponent } from './consumer/shared/pause-modal/pause-modal.component';
import { RestartModalComponent } from './consumer/shared/restart-modal/restart-modal.component';
import { ResumeModalComponent } from './consumer/shared/resume-modal/resume-modal.component';

const maskConfig: Partial<IConfig> = {
validation: false,
Expand All @@ -36,7 +35,6 @@ const appRoutes: Routes = [
AppComponent,
GroupByPipe,
SortPipe,
CallbackPipe,
HomeComponent,
ConsumerComponent,
RewindModalComponent,
Expand Down
14 changes: 0 additions & 14 deletions src/KafkaFlow.Admin.Dashboard/ClientApp/src/app/callback.pipe.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { HttpClient, HttpHeaders } from '@angular/common/http';

export class ConsumerService {
private headers: HttpHeaders;
private accessPointUrl: string = '/kafka-flow';
private accessPointUrl = '/kafka-flow';

constructor(private http: HttpClient) {
this.headers = new HttpHeaders({'Content-Type': 'application/json; charset=utf-8'});
Expand All @@ -19,14 +19,14 @@ export class ConsumerService {

public updateWorkersCount(groupId: string, consumerName: string, workersCount: number) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/change-worker-count`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/change-worker-count`,
{ workersCount: workersCount },
{headers: this.headers});
}

public resetOffset(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/reset-offsets`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/reset-offsets`,
{ confirm: true },
{headers: this.headers});
}
Expand All @@ -40,21 +40,21 @@ export class ConsumerService {

public restart(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/restart`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/restart`,
null,
{headers: this.headers});
}

public resume(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/resume`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/resume`,
null,
{headers: this.headers});
}

public rewindOffset(groupId: string, consumerName: string, date: Date) {
return this.http.post<any>(
this.accessPointUrl +`/groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`,
this.accessPointUrl + `/groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`,
{ date: new Date(date.getTime() - (date.getTimezoneOffset() * 60000)).toISOString() },
{headers: this.headers});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<ngb-alert #successAlert *ngIf="successMessage" type="success" (closed)="successMessage=''">
<div class="text-center"><b>Success! </b> <span class="text-center">{{successMessage}}</span></div>
</ngb-alert>
<div class="container" *ngFor="let group of enrichGroups(groups) | callback: removeReadonly">
<div class="container" *ngFor="let group of telemetryResponse.groups ">
<div class="card my-3">
<div class="card-body">
<h3>Group Id: {{ group.groupId }}</h3>
<div *ngFor="let consumer of group.consumers">
<h4>Consumer: {{ consumer.consumerName }}</h4>
<h4>Consumer: {{ consumer.name }}</h4>
<h4>Status: <span [ngClass]="{
'text-success': consumer.status == 'Running',
'text-warning': consumer.status == 'Paused',
Expand All @@ -15,15 +15,15 @@ <h4>Status: <span [ngClass]="{
<!-- <h4>Lag: 55</h4> -->
<h4>Workers: {{consumer.workersCount}}</h4>
<div class="mt-3 mb-3">
<button class="btn btn-outline-success" type="button" (click)="openResumeModal(group.groupId, consumer.consumerName)" *ngIf="consumer.status == 'Paused'">Resume</button>
<button class="btn btn-outline-warning ml-2" type="button" (click)="openPauseModal(group.groupId, consumer.consumerName)" *ngIf="consumer.status == 'Running'">Pause</button>
<button class="btn btn-outline-danger ml-2" type="button" (click)="openRestartModal(group.groupId, consumer.consumerName)">Restart</button>
<button class="btn btn-outline-info ml-2" type="button" (click)="openRewindModal(group.groupId, consumer.consumerName)">Rewind Offset</button>
<button class="btn btn-outline-secondary ml-2" type="button" (click)="openResetModal(group.groupId, consumer.consumerName)">Reset Offset</button>
<button class="btn btn-outline-success ml-2" type="button" (click)="openWorkersCountModal(group.groupId, consumer.consumerName, consumer.workersCount)">Update number of workers</button>
<button class="btn btn-outline-success" type="button" (click)="openResumeModal(group.groupId, consumer.name)" *ngIf="consumer.status == 'Paused'">Resume</button>
<button class="btn btn-outline-warning ml-2" type="button" (click)="openPauseModal(group.groupId, consumer.name)" *ngIf="consumer.status == 'Running'">Pause</button>
<button class="btn btn-outline-danger ml-2" type="button" (click)="openRestartModal(group.groupId, consumer.name)">Restart</button>
<button class="btn btn-outline-info ml-2" type="button" (click)="openRewindModal(group.groupId, consumer.name)">Rewind Offset</button>
<button class="btn btn-outline-secondary ml-2" type="button" (click)="openResetModal(group.groupId, consumer.name)">Reset Offset</button>
<button class="btn btn-outline-success ml-2" type="button" (click)="openWorkersCountModal(group.groupId, consumer.name, consumer.workersCount)">Update number of workers</button>
</div>
<ng-template let-partitionAssignments ngFor [ngForOf]="consumer.partitionAssignments | groupBy:'topic'">
<p class="mb-0 font-weight-bold">Topic: {{partitionAssignments.key}}</p>
<ng-template let-assignment ngFor [ngForOf]="consumer.assignments | groupBy:'topicName'">
<p class="mb-0 font-weight-bold">Topic: {{assignment.key}}</p>
<table class="table table-striped table-hover mt-1">
<thead>
<tr>
Expand All @@ -36,7 +36,7 @@ <h4>Workers: {{consumer.workersCount}}</h4>
</tr>
</thead>
<tbody>
<ng-template let-partitionAssignment ngFor [ngForOf]="partitionAssignments.value | sort:'instanceName'">
<ng-template let-partitionAssignment ngFor [ngForOf]="assignment.value | sort:'instanceName'">
<ng-template [ngIf]="partitionAssignment.pausedPartitions?.length > 0">
<tr>
<td class="text-left">{{partitionAssignment.instanceName}}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,49 @@ import { ConsumerService } from '../consumer.service'
import {interval, Subject} from 'rxjs';
import { debounceTime } from 'rxjs/operators';
import { NgbModal, NgbAlert } from '@ng-bootstrap/ng-bootstrap';
import { RewindModalComponent } from '../shared/rewind-modal/rewind-modal.component';
import { WorkersCountModalComponent } from '../shared/workers-count-modal/workers-count-modal.component';
import { ResetModalComponent } from '../shared/reset-modal/reset-modal.component';
import { PauseModalComponent } from '../shared/pause-modal/pause-modal.component';
import { ResumeModalComponent } from '../shared/resume-modal/resume-modal.component';
import { RestartModalComponent } from '../shared/restart-modal/restart-modal.component';
import { RewindModalComponent } from './shared/rewind-modal/rewind-modal.component';
import { WorkersCountModalComponent } from './shared/workers-count-modal/workers-count-modal.component';
import { ResetModalComponent } from './shared/reset-modal/reset-modal.component';
import { PauseModalComponent } from './shared/pause-modal/pause-modal.component';
import { ResumeModalComponent } from './shared/resume-modal/resume-modal.component';
import { RestartModalComponent } from './shared/restart-modal/restart-modal.component';

@Component({
selector: 'app-consumer',
templateUrl: './consumer.component.html'
})
export class ConsumerComponent implements OnInit {
public groups: Array<any> = [];
public telemetryResponse: any = [];
@ViewChild('successAlert', { static: false }) successAlert: NgbAlert | undefined;
private successSubject = new Subject<string>();
private delayMs = 1000;
successMessage = '';

constructor(private modalService: NgbModal, private consumerService: ConsumerService) {
interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.groups = this.enrichGroups(data)));
interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.telemetryResponse = this.enrichConsumers(data)));
}

enrichGroups(groups: any) {
enrichConsumers(telemetryResponse: any) {
var self = this;
groups.forEach(function (g: any) {
telemetryResponse.groups.forEach(function (g: any) {
g.consumers.forEach(function (c: any) {
c.status =
c.partitionAssignments.some((pa: any) => pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
c.assignments.some((pa: any) => pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
"Running" :
c.partitionAssignments.some((pa: any) => pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
c.assignments.some((pa: any) => pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
"Paused" :
"Not Running";
c.partitionAssignments.forEach( (pa: any) => pa.isLost = !self.isActive(pa.lastUpdate)
c.assignments.forEach( (pa: any) => pa.isLost = !self.isActive(pa.lastUpdate)
)
})
});

return groups;
return telemetryResponse;
}

isActive(date: string) {
return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5;
}

removeReadonly(group: any) {
return !(group.consumers[0].managementDisabled==1);
}

openWorkersCountModal(groupId: string, consumerName: string, workersCount: number) {
const modalRef = this.modalService.open(WorkersCountModalComponent);
modalRef.componentInstance.groupId = groupId;
Expand Down
1 change: 1 addition & 0 deletions src/KafkaFlow.Admin.Dashboard/ClientApp/src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<title>KafkaFlow - Dashboard</title>
<base href="/">
<meta name="viewport" content="width=device-width, initial-scale=1">
<script src="https://code.jquery.com/jquery-3.5.1.slim.min.js" integrity="sha384-DfXdz2htPH0lsSSs5nCTpuj/zy4C+OGpamoFVy38MVBnE+IbbVYUew+OrCXaRkfj" crossorigin="anonymous"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/popper.js@1.16.1/dist/umd/popper.min.js" integrity="sha384-9/reFTGAW83EW2RDu2S0VKaIzap3H66lZH81PoYlFhbGU+6BZp6G7niu735Sk7lN" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.min.js" integrity="sha384-+YQ4JLhjyBLPDQt//I+STsc9iw4uQqACwlvpslubQzn4u2UU2UFM80nGisd026JF" crossorigin="anonymous"></script>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ internal static ConsumerResponse Adapt(this IMessageConsumer consumer)
Subscription = consumer.Subscription,
ConsumerName = consumer.ConsumerName,
GroupId = consumer.GroupId,
FlowStatus = consumer.FlowStatus?.ToString() ?? ConsumerStatus.NotRunning.ToString(),
Status = consumer.FlowStatus?.ToString() ?? ConsumerStatus.NotRunning.ToString(),
MemberId = consumer.MemberId,
WorkersCount = consumer.WorkersCount,
ClientInstanceName = consumer.ClientInstanceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KafkaFlow.Admin.WebApi.Adapters

internal static class TelemetryResponseAdapter
{
internal static TelemetryResponse Adapt(this IEnumerable<ConsumerMetric> metrics)
internal static TelemetryResponse Adapt(this IEnumerable<ConsumerTelemetryMetric> metrics)
{
return new TelemetryResponse
{
Expand All @@ -24,7 +24,7 @@ internal static TelemetryResponse Adapt(this IEnumerable<ConsumerMetric> metrics
metric => new TelemetryResponse.Consumer
{
Name = metric.First().ConsumerName,
WorkersCount = metric.First().WorkersCount,
WorkersCount = metric.OrderByDescending(x=> x.SentAt).First().WorkersCount,
Assignments = metric.Select(
x => new TelemetryResponse.TopicPartitionAssignment
{
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow.Admin.WebApi/Contracts/ConsumerResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class ConsumerResponse
public string ClientInstanceName { get; set; }

/// <summary>
/// Gets or sets the current consumer flow status
/// Gets or sets the current consumer status
/// </summary>
public string FlowStatus { get; set; }
public string Status { get; set; }
}
}
1 change: 0 additions & 1 deletion src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class GroupsController : ControllerBase
/// </summary>
/// <param name="consumers">The accessor class that provides access to the consumers</param>
/// <param name="adminProducer">The producer to publish admin messages</param>
/// <param name="storage">The cache interface to get metric data</param>
public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer)
{
this.consumers = consumers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public static IClusterConfigurationBuilder EnableTelemetry(
.AddTypedHandlers(
handlers => handlers
.WithHandlerLifetime(InstanceLifetime.Singleton)
.AddHandlersFromAssemblyOf<ConsumerMetricHandler>())))
.AddHandlersFromAssemblyOf<ConsumerTelemetryMetricHandler>())))
.OnStarted(resolver => resolver.Resolve<ITelemetryScheduler>().Start(telemetryId, topicName))
.OnStopping(resolver => resolver.Resolve<ITelemetryScheduler>().Stop(telemetryId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ namespace KafkaFlow.Admin.Handlers
using KafkaFlow.Admin.Messages;
using KafkaFlow.TypedHandler;

internal class ConsumerMetricHandler : IMessageHandler<ConsumerMetric>
internal class ConsumerTelemetryMetricHandler : IMessageHandler<ConsumerTelemetryMetric>
{
private readonly ITelemetryStorage storage;

public ConsumerMetricHandler(ITelemetryStorage storage) => this.storage = storage;
public ConsumerTelemetryMetricHandler(ITelemetryStorage storage) => this.storage = storage;

public Task Handle(IMessageContext context, ConsumerMetric message)
public Task Handle(IMessageContext context, ConsumerTelemetryMetric message)
{
this.storage.Put(message);
return Task.CompletedTask;
Expand Down
10 changes: 5 additions & 5 deletions src/KafkaFlow.Admin/ITelemetryStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ namespace KafkaFlow.Admin
public interface ITelemetryStorage
{
/// <summary>
/// Gets the stored metric indexed with the parameters provided
/// Gets all the consumer telemetry metrics
/// </summary>
/// <returns>The list of consumer metrics stored in the cache</returns>
IEnumerable<ConsumerMetric> Get();
/// <returns>The list of consumer metrics</returns>
IEnumerable<ConsumerTelemetryMetric> Get();

/// <summary>
/// Store the metric provided
/// </summary>
/// <param name="metric">The consumer metric</param>
void Put(ConsumerMetric metric);
/// <param name="telemetryMetric">The consumer telemetry metric</param>
void Put(ConsumerTelemetryMetric telemetryMetric);
}
}
Loading

0 comments on commit af846a5

Please sign in to comment.