Skip to content

Commit

Permalink
feat: create telemetry workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jun 1, 2021
1 parent 548154b commit 90b73a7
Show file tree
Hide file tree
Showing 28 changed files with 496 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
<Target Name="PublishRunWebpack" AfterTargets="ComputeFilesToPublish">
<!-- As part of publishing, ensure the JS resources are freshly built in production mode -->
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="npm run build -- --prod" />
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="if not exist $(PublishDir)$(SpaRoot) mkdir $(PublishDir)$(SpaRoot)"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="move dist\dashboard $(PublishDir)$(SpaRoot)dist"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="if not exist $(PublishDir)$(SpaRoot)dist mkdir $(PublishDir)$(SpaRoot)dist"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="del $(PublishDir)$(SpaRoot)dist\*.* /s /q"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="move dist\dashboard\* $(PublishDir)$(SpaRoot)dist"/>
</Target>

</Project>
12 changes: 2 additions & 10 deletions samples/KafkaFlow.Sample.Dashboard/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,15 @@ public void ConfigureServices(IServiceCollection services)
cluster => cluster
.WithBrokers(new[] { "localhost:9092" })
.EnableAdminMessages("kafka-flow.admin", "kafka-flow.admin.group.id")
.EnableTelemetry("kafka-flow.telemetry", "kafka-flow.telemetry.group.id")
.AddConsumer(
consumer => consumer
.Topics("topic-dashboard", "topic-dashboard-new")
.Topics("topic-dashboard")
.WithGroupId("groupid-dashboard")
.WithName("consumer-dashboard")
.WithBufferSize(100)
.WithWorkersCount(20)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
)
.AddConsumer(
consumer => consumer
.Topic("topic-dashboard-other")
.WithGroupId("groupid-dashboard-other")
.WithName("consumer-dashboard-other")
.WithBufferSize(10)
.WithWorkersCount(10)
.WithAutoOffsetReset(AutoOffsetReset.Latest)
))
);

Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaFlow.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using KafkaFlow.Admin;
using Confluent.Kafka;
using global::Microsoft.Extensions.DependencyInjection;
using KafkaFlow.Admin;
using KafkaFlow.Admin.Messages;
using KafkaFlow.Consumers;
using KafkaFlow.Producers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import { AppComponent } from './app.component';
import { HomeComponent } from './home/home.component';
import { ConsumerComponent } from './consumer/consumer.component';
import { HttpErrorInterceptor } from './http-error.interceptor';
import { GroupByPipe } from './group-by.pipe';
import { SortPipe } from './sort.pipe';
import { CallbackPipe } from './callback.pipe';

import { ConsumerService } from './consumer.service';
import { HttpClientModule, HTTP_INTERCEPTORS } from '@angular/common/http';
Expand All @@ -31,6 +34,9 @@ const appRoutes: Routes = [
@NgModule({
declarations: [
AppComponent,
GroupByPipe,
SortPipe,
CallbackPipe,
HomeComponent,
ConsumerComponent,
RewindModalComponent,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { PipeTransform, Pipe } from '@angular/core';

@Pipe({
name: 'callback',
pure: false
})
export class CallbackPipe implements PipeTransform {
transform(items: any[], callback: (item: any) => boolean): any {
if (!items || !callback) {
return items;
}
return items.filter(item => callback(item));
}
}
Original file line number Diff line number Diff line change
@@ -1,71 +1,84 @@
<ngb-alert #successAlert *ngIf="successMessage" type="success" (closed)="successMessage=''">
<div class="text-center"><b>Success! </b> <span class="text-center">{{ successMessage}}</span></div>
<div class="text-center"><b>Success! </b> <span class="text-center">{{successMessage}}</span></div>
</ngb-alert>
<div class="container" *ngFor="let group of groups | slice: 1 : groups.length">
<div class="card my-3">
<div class="card-body">
<h3>Group Id: {{ group.groupId }}</h3>
<div *ngFor="let consumer of group.consumers">
<h4>Consumer: {{ consumer.consumerName }}</h4>
<h4>Status: <span [ngClass]="{
'text-success': consumer.flowStatus == 'Running',
'text-warning': consumer.flowStatus == 'Paused',
'text-danger': consumer.flowStatus == 'NotRunning'
}">{{consumer.flowStatus}}</span></h4>
<h4>Lag: 55</h4>
<h4>Workers: {{consumer.workersCount}}</h4>
<div class="mt-3 mb-3">
<button class="btn btn-outline-success" type="button" (click)="openResumeModal(group.groupId, consumer.consumerName)" *ngIf="consumer.flowStatus == 'Paused'">Resume</button>
<button class="btn btn-outline-warning ml-2" type="button" (click)="openPauseModal(group.groupId, consumer.consumerName)" *ngIf="consumer.flowStatus == 'Running'">Pause</button>
<button class="btn btn-outline-danger ml-2" type="button" (click)="openRestartModal(group.groupId, consumer.consumerName)">Restart</button>
<button class="btn btn-outline-info ml-2" type="button" (click)="openRewindModal(group.groupId, consumer.consumerName)">Rewind Offset</button>
<button class="btn btn-outline-secondary ml-2" type="button" (click)="openResetModal(group.groupId, consumer.consumerName)">Reset Offset</button>
<button class="btn btn-outline-success ml-2" type="button" (click)="openWorkersCountModal(group.groupId, consumer.consumerName, consumer.workersCount)">Update number of workers</button>
</div>
<ng-template let-partitionAssignment ngFor [ngForOf]="consumer.partitionAssignments">
<p class="mb-0 font-weight-bold">Topic: {{partitionAssignment.topic}}</p>
<table class="table table-striped table-hover mt-1">
<thead>
<tr>
<th>Hostname</th>
<th>Partitions</th>
<th>Lag</th>
<th>Status</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
<ng-template [ngIf]="partitionAssignment.pausedPartitions && partitionAssignment.pausedPartitions.length > 0">
<tr>
<td class="text-left">{{partitionAssignment.hostName}}</td>
<td class="text-left">{{partitionAssignment.pausedPartitions}}</td>
<td class="text-left">0</td>
<td class="text-left"><span class="font-weight-bold text-warning">Paused</span></td>
<td class="text-left">
<button class="btn btn-sm btn-success" type="button">Resume</button>
<button class="btn btn-sm btn-info ml-1" type="button">Rewind Offset</button>
<button class="btn btn-sm btn-secondary ml-1" type="button">Reset Offset</button>
</td>
</tr>
</ng-template>
<ng-template [ngIf]="partitionAssignment.runningPartitions?.length > 0">
<tr>
<td class="text-left">{{partitionAssignment.hostName}}</td>
<td class="text-left">{{partitionAssignment.runningPartitions}}</td>
<td class="text-left">0</td>
<td class="text-left"><span class="font-weight-bold text-success">Running</span></td>
<td class="text-left">
<button class="btn btn-sm btn-warning" type="button">Pause</button>
<button class="btn btn-sm btn-info ml-1" type="button">Rewind Offset</button>
<button class="btn btn-sm btn-secondary ml-1" type="button">Reset Offset</button>
</td>
</tr>
</ng-template>
</tbody>
</table>
</ng-template>
<hr />
</div>
<div class="container" *ngFor="let group of groups | callback: removeReadonly">
<div class="card my-3">
<div class="card-body">
<h3>Group Id: {{ group.groupId }}</h3>
<div *ngFor="let consumer of group.consumers">
<h4>Consumer: {{ consumer.consumerName }}</h4>
<h4>Status: <span [ngClass]="{
'text-success': consumer.status == 'Running',
'text-warning': consumer.status == 'Paused',
'text-danger': consumer.status == 'Not Running'
}">{{consumer.status}}</span></h4>
<!-- <h4>Lag: 55</h4> -->
<h4>Workers: {{consumer.workersCount}}</h4>
<div class="mt-3 mb-3">
<button class="btn btn-outline-success" type="button" (click)="openResumeModal(group.groupId, consumer.consumerName)" *ngIf="consumer.status == 'Paused'">Resume</button>
<button class="btn btn-outline-warning ml-2" type="button" (click)="openPauseModal(group.groupId, consumer.consumerName)" *ngIf="consumer.status == 'Running'">Pause</button>
<button class="btn btn-outline-danger ml-2" type="button" (click)="openRestartModal(group.groupId, consumer.consumerName)">Restart</button>
<button class="btn btn-outline-info ml-2" type="button" (click)="openRewindModal(group.groupId, consumer.consumerName)">Rewind Offset</button>
<button class="btn btn-outline-secondary ml-2" type="button" (click)="openResetModal(group.groupId, consumer.consumerName)">Reset Offset</button>
<button class="btn btn-outline-success ml-2" type="button" (click)="openWorkersCountModal(group.groupId, consumer.consumerName, consumer.workersCount)">Update number of workers</button>
</div>
<ng-template let-partitionAssignments ngFor [ngForOf]="consumer.partitionAssignments | groupBy:'topic'">
<p class="mb-0 font-weight-bold">Topic: {{partitionAssignments.key}}</p>
<table class="table table-striped table-hover mt-1">
<thead>
<tr>
<th>Hostname - consumer instance</th>
<th>Partitions</th>
<!--<th>Lag</th>-->
<th>Status</th>
<th>LastUpdate</th>
<!--<th>Actions</th>-->
</tr>
</thead>
<tbody>
<ng-template let-partitionAssignment ngFor [ngForOf]="partitionAssignments.value | sort:'hostName'">
<ng-template [ngIf]="partitionAssignment.pausedPartitions?.length > 0">
<tr>
<td class="text-left">{{partitionAssignment.hostName}}</td>
<td class="text-left">{{partitionAssignment.pausedPartitions}}</td>
<!--<td class="text-left">0</td>-->
<td class="text-left">
<div *ngIf="partitionAssignment.isLost;then consumer_lost else consumer_on"></div>
<ng-template #consumer_lost><span>-</span></ng-template>
<ng-template #consumer_on><span class="font-weight-bold text-warning">Paused</span></ng-template>
</td>
<td class="text-left"><span [ngClass]="partitionAssignment.isLost ? 'text-secondary' : 'text-success'">{{partitionAssignment.lastUpdate | date: "medium"}}</span></td>
<!--<td class="text-left">
<button class="btn btn-sm btn-success" *ngIf="!partitionAssignment.isLost" type="button" disabled>Resume</button>
<button class="btn btn-sm btn-info ml-1" *ngIf="!partitionAssignment.isLost" type="button" disabled>Rewind</button>
<button class="btn btn-sm btn-secondary ml-1" *ngIf="!partitionAssignment.isLost" type="button" disabled>Reset</button>
</td>-->
</tr>
</ng-template>
<ng-template [ngIf]="partitionAssignment.runningPartitions?.length > 0">
<tr>
<td class="text-left">{{partitionAssignment.hostName}}</td>
<td class="text-left">{{partitionAssignment.runningPartitions}}</td>
<!--<td class="text-left">0</td>-->
<td class="text-left">
<div *ngIf="partitionAssignment.isLost;then consumer_lost else consumer_on"></div>
<ng-template #consumer_lost><span>-</span></ng-template>
<ng-template #consumer_on><span class="font-weight-bold text-success">Running</span></ng-template>
</td>
<td class="text-left"><span [ngClass]="partitionAssignment.isLost ? 'text-secondary' : 'text-success'">{{partitionAssignment.lastUpdate | date: "medium"}}</span></td>
<!--<td class="text-left">
<button class="btn btn-sm btn-warning" type="button" *ngIf="!partitionAssignment.isLost" disabled>Pause</button>
<button class="btn btn-sm btn-info ml-1" type="button" *ngIf="!partitionAssignment.isLost" disabled>Rewind</button>
<button class="btn btn-sm btn-secondary ml-1" type="button" *ngIf="!partitionAssignment.isLost" disabled>Reset</button>
</td>-->
</tr>
</ng-template>
</ng-template>
</tbody>
</table>
</ng-template>
<hr />
</div>
</div>
</div>
</div>
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Component, Input, OnInit, ViewChild } from '@angular/core';
import { ConsumerService } from '../consumer.service'
import { Subject } from 'rxjs';
import { delay } from 'rxjs/internal/operators';
import { debounceTime } from 'rxjs/operators';
import { NgbModal, NgbAlert } from '@ng-bootstrap/ng-bootstrap';
import { RewindModalComponent } from '../shared/rewind-modal/rewind-modal.component';
Expand All @@ -24,6 +23,10 @@ export class ConsumerComponent implements OnInit {

constructor(private modalService: NgbModal, private consumerService: ConsumerService) { }

removeReadonly(group: any) {
return !(group.consumers.length == 1 && group.consumers[0].isReadonly==1);
}

openWorkersCountModal(groupId: any, consumerName: any, workersCount: number) {
const modalRef = this.modalService.open(WorkersCountModalComponent);
modalRef.componentInstance.groupId = groupId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Pipe, PipeTransform } from '@angular/core';

@Pipe({
name: 'groupBy'
})
export class GroupByPipe implements PipeTransform {
transform(collection: any[], property: string): any[] {
// prevents the application from breaking if the array of objects doesn't exist yet
if(!collection) {
return null as any;
}

const groupedCollection = collection.reduce((previous, current)=> {
if(!previous[current[property]]) {
previous[current[property]] = [current];
} else {
previous[current[property]].push(current);
}

return previous;
}, {});

// this will return an array of objects, each object containing a group of objects
return Object.keys(groupedCollection).map(key => ({ key, value: groupedCollection[key] }));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="#">Clusters</a>
<a class="nav-link active" aria-current="page" href="#">Consumers</a>
</li>
<!--
<li class="nav-item">
<a class="nav-link active" aria-current="page" href="#">Consumers</a>
<a class="nav-link" href="#">Clusters</a>
</li>
<li class="nav-item">
<a class="nav-link" href="#">Data</a>
</li>
<li class="nav-item">
<a class="nav-link" href="#">Monitoring</a>
</li>
</li>-->
</ul>
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,30 @@ import {interval} from "rxjs";
export class HomeComponent implements OnInit {
public groups: Array<any> = [];
constructor (private consumerService: ConsumerService) {
interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.groups = data));
interval(1000).subscribe(_ => consumerService.get().subscribe((data: any) => this.groups = this.buildCalculatedProperties(data)));
}

buildCalculatedProperties(groups: any) {
var self = this;
groups.forEach(function (g: any) {
g.consumers.forEach(function (c: any) {
c.status =
c.partitionAssignments.some(function (pa: any){ return pa.runningPartitions?.length > 0 && self.isActive(pa.lastUpdate);}) ?
"Running" :
c.partitionAssignments.some(function (pa: any){ return pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate);}) ?
"Paused" :
"Not Running";
c.partitionAssignments.forEach(function (pa: any) {
pa.isLost = !self.isActive(pa.lastUpdate);
})
})
});

return groups;
}

isActive(date: any) {
return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5;
}

ngOnInit(): void {
Expand Down
26 changes: 26 additions & 0 deletions src/KafkaFlow.Admin.Dashboard/AngularFiles/src/app/sort.pipe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Pipe, PipeTransform } from "@angular/core";

@Pipe({
name: "sort"
})
export class SortPipe implements PipeTransform {
transform(array: any, field: string, order: string = "asc"): any[] {
order = order.toLowerCase();
if(order!= "asc" && order!= "desc") {
return array;
}
if (!Array.isArray(array)) {
return null as any;
}
array.sort((a: any, b: any) => {
if (a[field] < b[field]) {
return (order == "asc")? -1 : 1;
} else if (a[field] > b[field]) {
return (order == "asc")? 1 : -1;
} else {
return 0;
}
});
return array;
}
}
Empty file.
Loading

0 comments on commit 90b73a7

Please sign in to comment.