Skip to content

Commit

Permalink
feat: create telemetry workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jun 11, 2021
1 parent 0a875a2 commit f86e98d
Show file tree
Hide file tree
Showing 34 changed files with 144 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
</ItemGroup>

<Target Name="DebugNpmInstall" BeforeTargets="Build">
<!-- Ensure Node.js is installed -->
<Exec Command="node --version" ContinueOnError="true">
<Output TaskParameter="ExitCode" PropertyName="ErrorCode" />
</Exec>
Expand All @@ -26,11 +25,15 @@
</Target>

<Target Name="PublishRunWebpack" AfterTargets="ComputeFilesToPublish">
<!-- As part of publishing, ensure the JS resources are freshly built in production mode -->
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="npm run build -- --prod" />
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="if not exist $(PublishDir)$(SpaRoot)dist mkdir $(PublishDir)$(SpaRoot)dist"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="del $(PublishDir)$(SpaRoot)dist\*.* /s /q"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="move dist\dashboard\* $(PublishDir)$(SpaRoot)dist"/>

<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="if not exist $(PublishDir)$(SpaRoot)dist mkdir $(PublishDir)$(SpaRoot)dist" Condition=" '$(OS)' == 'Windows_NT' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="del $(PublishDir)$(SpaRoot)dist\*.* /s /q" Condition=" '$(OS)' == 'Windows_NT' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="move dist\dashboard\* $(PublishDir)$(SpaRoot)dist" Condition=" '$(OS)' == 'Windows_NT' "/>

<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="mkdir $(PublishDir)$(SpaRoot)dist -p" Condition=" '$(OS)' == 'Unix' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="rm $(PublishDir)$(SpaRoot)dist/* -rf" Condition=" '$(OS)' == 'Unix' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="mv dist/dashboard/* $(PublishDir)$(SpaRoot)dist" Condition=" '$(OS)' == 'Unix' "/>
</Target>

</Project>
2 changes: 0 additions & 2 deletions samples/KafkaFlow.Sample.Dashboard/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApp
bus = app.ApplicationServices.CreateKafkaBus();
bus.StartAsync(lifetime.ApplicationStopped);
});

lifetime.ApplicationStopping.Register(() => bus.StopAsync().GetAwaiter().GetResult());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ public interface IClusterConfigurationBuilder
/// <returns></returns>
IClusterConfigurationBuilder WithBrokers(IEnumerable<string> brokers);

/// <summary>
/// Sets a unique name for the cluster
/// </summary>
/// <param name="name">A unique name</param>
/// <returns></returns>
IClusterConfigurationBuilder WithName(string name);

/// <summary>
/// Configures cluster security
/// </summary>
Expand Down Expand Up @@ -55,6 +62,6 @@ public interface IClusterConfigurationBuilder
/// </summary>
/// <param name="handler">A handler to KafkaFlow cluster stop event</param>
/// <returns></returns>
IClusterConfigurationBuilder OnStop(Action<IDependencyResolver> handler);
IClusterConfigurationBuilder OnStopping(Action<IDependencyResolver> handler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,42 @@ export class ConsumerService {
return this.http.get(this.accessPointUrl + 'groups', {headers: this.headers});
}

public updateWorkersCount(groupId: any, consumerName: any, workersCount: number) {
public updateWorkersCount(groupId: string, consumerName: string, workersCount: number) {
return this.http.post<any>(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/change-worker-count`,
{ workersCount: workersCount },
{headers: this.headers});
}

public resetOffset(groupId: any, consumerName: any) {
public resetOffset(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/reset-offsets`,
{ confirm: true },
{headers: this.headers});
}

public pause(groupId: any, consumerName: any) {
public pause(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/pause`,
null,
{headers: this.headers});
}

public restart(groupId: any, consumerName: any) {
public restart(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/restart`,
null,
{headers: this.headers});
}

public resume(groupId: any, consumerName: any) {
public resume(groupId: string, consumerName: string) {
return this.http.post<any>(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/resume`,
null,
{headers: this.headers});
}

public rewindOffset(groupId: any, consumerName: any, date: Date) {
public rewindOffset(groupId: string, consumerName: string, date: Date) {
return this.http.post<any>(
this.accessPointUrl +`groups/${groupId}/consumers/${consumerName}/rewind-offsets-to-date`,
{ date: new Date(date.getTime() - (date.getTimezoneOffset() * 60000)).toISOString() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ConsumerComponent implements OnInit {
return !(group.consumers.length == 1 && group.consumers[0].isReadonly==1);
}

openWorkersCountModal(groupId: any, consumerName: any, workersCount: number) {
openWorkersCountModal(groupId: string, consumerName: string, workersCount: number) {
const modalRef = this.modalService.open(WorkersCountModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
Expand All @@ -39,7 +39,7 @@ export class ConsumerComponent implements OnInit {
});
}

openResetModal(groupId: any, consumerName: any) {
openResetModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(ResetModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
Expand All @@ -50,7 +50,7 @@ export class ConsumerComponent implements OnInit {
});
}

openPauseModal(groupId: any, consumerName: any) {
openPauseModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(PauseModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
Expand All @@ -61,7 +61,7 @@ export class ConsumerComponent implements OnInit {
});
}

openRestartModal(groupId: any, consumerName: any) {
openRestartModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(RestartModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
Expand All @@ -72,7 +72,7 @@ export class ConsumerComponent implements OnInit {
});
}

openResumeModal(groupId: any, consumerName: any) {
openResumeModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(ResumeModalComponent);
modalRef.componentInstance.groupId = groupId;
modalRef.componentInstance.consumerName = consumerName;
Expand All @@ -83,7 +83,7 @@ export class ConsumerComponent implements OnInit {
});
}

openRewindModal(groupId: any, consumerName: any) {
openRewindModal(groupId: string, consumerName: string) {
const modalRef = this.modalService.open(RewindModalComponent);
modalRef.componentInstance.consumerName = consumerName;
modalRef.componentInstance.groupId = groupId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class HomeComponent implements OnInit {
return groups;
}

isActive(date: any) {
isActive(date: string) {
return Math.abs((new Date().getTime() - new Date(date).getTime())/1000) < 5;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './pause-modal.component.html'
})
export class PauseModalComponent implements OnInit {
@Input() public groupId: any;
@Input() public consumerName: any;
@Input() public groupId: string;
@Input() public consumerName: string;

constructor(public activeModal: NgbActiveModal) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './reset-modal.component.html'
})
export class ResetModalComponent implements OnInit {
@Input() public groupId: any;
@Input() public consumerName: any;
@Input() public groupId: string;
@Input() public consumerName: string;

constructor(public activeModal: NgbActiveModal) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './restart-modal.component.html'
})
export class RestartModalComponent implements OnInit {
@Input() public groupId: any;
@Input() public consumerName: any;
@Input() public groupId: string;
@Input() public consumerName: string;

constructor(public activeModal: NgbActiveModal) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './resume-modal.component.html'
})
export class ResumeModalComponent implements OnInit {
@Input() public groupId: any;
@Input() public consumerName: any;
@Input() public groupId: string;
@Input() public consumerName: string;

constructor(public activeModal: NgbActiveModal) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
})
export class RewindModalComponent implements OnInit {
public rewindDate: Date | undefined;
@Input() public groupId: any;
@Input() public consumerName: any;
@Input() public groupId: string;
@Input() public consumerName: string;

constructor(public activeModal: NgbActiveModal) { }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { NgbActiveModal } from '@ng-bootstrap/ng-bootstrap';
templateUrl: './workers-count-modal.component.html'
})
export class WorkersCountModalComponent implements OnInit {
@Input() public workersCount: any;
@Input() public groupId: any;
@Input() public consumerName: any;
public oldWorkersCount: any;
@Input() public workersCount: number;
@Input() public groupId: string;
@Input() public consumerName: string;
public oldWorkersCount: number;

constructor(public activeModal: NgbActiveModal) {}

Expand Down
1 change: 1 addition & 0 deletions src/KafkaFlow.Admin.Dashboard/AngularFiles/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"experimentalDecorators": true,
"moduleResolution": "node",
"importHelpers": true,
"strictPropertyInitialization": false,
"target": "es2015",
"module": "es2020",
"lib": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace KafkaFlow.Admin.WebApi.Adapters

internal static class ConsumerResponseAdapter
{
internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetryCache cache)
internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetryStorage storage)
{
var consumerResponse = new ConsumerResponse()
{
Expand All @@ -23,7 +23,7 @@ internal static ConsumerResponse Adapt(this IMessageConsumer consumer, ITelemetr
IsReadonly = consumer.IsReadonly,
};

var cachedMetrics = cache.Get(consumer.GroupId, consumer.ConsumerName);
var cachedMetrics = storage.Get(consumer.GroupId, consumer.ConsumerName);

consumerResponse.PartitionAssignments = cachedMetrics.Any() ?
cachedMetrics.Select(m => m.Adapt()) :
Expand All @@ -41,6 +41,7 @@ private static IEnumerable<PartitionAssignment> GetLocalInfo(IMessageConsumer co
Topic = c.Key,
HostName = Environment.MachineName,
PausedPartitions = c.Select(x => x.Partition.Value).ToList(),
LastUpdate = DateTime.Now,
})
.Union(consumer.RunningPartitions
.GroupBy(c => c.Topic)
Expand All @@ -49,6 +50,7 @@ private static IEnumerable<PartitionAssignment> GetLocalInfo(IMessageConsumer co
Topic = c.Key,
HostName = Environment.MachineName,
RunningPartitions = c.Select(x => x.Partition.Value).ToList(),
LastUpdate = DateTime.Now,
}));
}

Expand Down
12 changes: 6 additions & 6 deletions src/KafkaFlow.Admin.WebApi/Controllers/ConsumersController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ public class ConsumersController : ControllerBase
{
private readonly IConsumerAccessor consumers;
private readonly IAdminProducer adminProducer;
private readonly ITelemetryCache cache;
private readonly ITelemetryStorage storage;

/// <summary>
/// Initializes a new instance of the <see cref="ConsumersController"/> class.
/// </summary>
/// <param name="consumers">The accessor class that provides access to the consumers</param>
/// <param name="adminProducer">The producer to publish admin messages</param>
/// <param name="cache">The cache interface to get metric data</param>
public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache)
/// <param name="storage">The cache interface to get metric data</param>
public ConsumersController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryStorage storage)
{
this.consumers = consumers;
this.adminProducer = adminProducer;
this.cache = cache;
this.storage = storage;
}

/// <summary>
Expand All @@ -46,7 +46,7 @@ public IActionResult Get([FromRoute] string groupId)
this.consumers
.All
.Where(x => x.GroupId == groupId)
.Select(x=> x.Adapt(this.cache)));
.Select(x=> x.Adapt(this.storage)));
}

/// <summary>
Expand All @@ -71,7 +71,7 @@ public IActionResult Get(
return this.NotFound();
}

return this.Ok(consumer.Adapt(this.cache));
return this.Ok(consumer.Adapt(this.storage));
}

/// <summary>
Expand Down
10 changes: 5 additions & 5 deletions src/KafkaFlow.Admin.WebApi/Controllers/GroupsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@ public class GroupsController : ControllerBase
{
private readonly IConsumerAccessor consumers;
private readonly IAdminProducer adminProducer;
private readonly ITelemetryCache cache;
private readonly ITelemetryStorage storage;

/// <summary>
/// Initializes a new instance of the <see cref="GroupsController"/> class.
/// </summary>
/// <param name="consumers">The accessor class that provides access to the consumers</param>
/// <param name="adminProducer">The producer to publish admin messages</param>
/// <param name="cache">The cache interface to get metric data</param>
public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryCache cache)
/// <param name="storage">The cache interface to get metric data</param>
public GroupsController(IConsumerAccessor consumers, IAdminProducer adminProducer, ITelemetryStorage storage)
{
this.consumers = consumers;
this.adminProducer = adminProducer;
this.cache = cache;
this.storage = storage;
}

/// <summary>
Expand All @@ -48,7 +48,7 @@ public IActionResult Get()
x => new GroupResponse
{
GroupId = x.First().GroupId,
Consumers = x.Select(y => y.Adapt(this.cache)),
Consumers = x.Select(y => y.Adapt(this.storage)),
}));
}

Expand Down
Loading

0 comments on commit f86e98d

Please sign in to comment.