Skip to content

Commit

Permalink
feat: add consumer lag to telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jul 1, 2021
1 parent 332a332 commit 6f434b1
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/KafkaFlow.Admin.Dashboard/ClientApp/dist/main.js

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
/* eslint-disable */
export interface TopicPartitionAssignment {
instanceName: string;
lag?: number;
lastUpdate?: string;
pausedPartitions?: null | Array<number>;
runningPartitions?: null | Array<number>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ <h4>Status: <span [ngClass]="{
'text-warning': $any(consumer).status == 'Paused',
'text-danger': $any(consumer).status == 'Not Running'
}">{{$any(consumer).status}}</span></h4>
<!-- <h4>Lag: 55</h4> -->
<h4>Lag: {{$any(consumer).lag}}</h4>
<h4>Workers: {{consumer.workersCount}}</h4>
<div class="mt-3 mb-3">
<button class="btn btn-outline-success" type="button" (click)="openResumeModal(group.groupId, consumer.name)" *ngIf="$any(consumer).status == 'Paused'">Resume</button>
Expand All @@ -29,7 +29,7 @@ <h4>Workers: {{consumer.workersCount}}</h4>
<tr>
<th>Consumer instance</th>
<th>Partitions</th>
<!--<th>Lag</th>-->
<th>Lag</th>
<th>Status</th>
<th>LastUpdate</th>
<!--<th>Actions</th>-->
Expand All @@ -41,10 +41,10 @@ <h4>Workers: {{consumer.workersCount}}</h4>
<tr>
<td class="text-left">{{partitionAssignment.instanceName}}</td>
<td class="text-left">{{partitionAssignment.pausedPartitions}}</td>
<!--<td class="text-left">0</td>-->
<td class="text-left">{{partitionAssignment.lag}}</td>
<td class="text-left">
<div *ngIf="partitionAssignment.isLost;then consumer_lost else consumer_on"></div>
<ng-template #consumer_lost><span>-</span></ng-template>
<ng-template #consumer_lost><span>Offline</span></ng-template>
<ng-template #consumer_on><span class="font-weight-bold text-warning">Paused</span></ng-template>
</td>
<td class="text-left"><span [ngClass]="partitionAssignment.isLost ? 'text-secondary' : 'text-success'">{{partitionAssignment.lastUpdate | date: "medium"}}</span></td>
Expand All @@ -59,13 +59,13 @@ <h4>Workers: {{consumer.workersCount}}</h4>
<tr>
<td class="text-left">{{partitionAssignment.instanceName}}</td>
<td class="text-left">{{partitionAssignment.runningPartitions}}</td>
<!--<td class="text-left">0</td>-->
<td class="text-left">{{partitionAssignment.lag}}</td>
<td class="text-left">
<div *ngIf="partitionAssignment.isLost;then consumer_lost else consumer_on"></div>
<ng-template #consumer_lost><span>-</span></ng-template>
<ng-template #consumer_lost><span>Offline</span></ng-template>
<ng-template #consumer_on><span class="font-weight-bold text-success">Running</span></ng-template>
</td>
<td class="text-left"><span [ngClass]="partitionAssignment.isLost ? 'text-secondary' : 'text-success'">{{partitionAssignment.lastUpdate | date: "medium"}}</span></td>
<td class="text-left"><span [ngClass]="partitionAssignment.isLost ? 'text-secondary' : 'text-success'">{{partitionAssignment.lastUpdate+'Z' | date: "medium"}}</span></td>
<!--<td class="text-left">
<button class="btn btn-sm btn-warning" type="button" *ngIf="!partitionAssignment.isLost" disabled>Pause</button>
<button class="btn btn-sm btn-info ml-1" type="button" *ngIf="!partitionAssignment.isLost" disabled>Rewind</button>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {ChangeWorkersCountRequest} from '../api/models/change-workers-count-requ
import {ResetOffsetsRequest} from '../api/models/reset-offsets-request';
import {RewindOffsetsToDateRequest} from '../api/models/rewind-offsets-to-date-request';
import {ConsumerGroup} from '../api/models/consumer-group';
import {TopicPartitionAssignment} from '../api/models/topic-partition-assignment';

@Component({
selector: 'app-consumer',
Expand Down Expand Up @@ -53,6 +54,7 @@ export class ConsumerComponent implements OnInit {
c.assignments.some((pa: any) => pa.pausedPartitions?.length > 0 && self.isActive(pa.lastUpdate)) ?
'Paused' :
'Not Running';
c.lag = c.assignments.map((item: TopicPartitionAssignment) => item.lag).reduce((prev: number, next: number) => prev + next);
c.assignments.forEach((pa: any) => pa.isLost = !self.isActive(pa.lastUpdate)
);
});
Expand All @@ -61,7 +63,7 @@ export class ConsumerComponent implements OnInit {
return telemetryResponse;
}

isActive = (date: string) => Math.abs((new Date().getTime() - new Date(date).getTime()) / 1000) < 5;
isActive = (date: string) => Math.abs((new Date().getTime() - new Date(date + 'Z').getTime()) / 1000) < 5;

openWorkersCountModal = (groupId: string, consumerName: string, workersCount?: number) => {
const modalRef = this.modalService.open(WorkersCountModalComponent);
Expand Down
15 changes: 8 additions & 7 deletions src/KafkaFlow.Admin.WebApi/Adapters/TelemetryResponseAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ internal static TelemetryResponse Adapt(this IEnumerable<ConsumerTelemetryMetric
Name = metric.First().ConsumerName,
WorkersCount = metric.OrderByDescending(x=> x.SentAt).First().WorkersCount,
Assignments = metric.Select(
x => new TelemetryResponse.TopicPartitionAssignment
m => new TelemetryResponse.TopicPartitionAssignment
{
InstanceName = x.InstanceName,
TopicName = x.Topic,
Status = x.Status.ToString(),
LastUpdate = x.SentAt,
PausedPartitions = x.PausedPartitions,
RunningPartitions = x.RunningPartitions,
InstanceName = m.InstanceName,
TopicName = m.Topic,
Status = m.Status.ToString(),
LastUpdate = m.SentAt,
PausedPartitions = m.PausedPartitions,
RunningPartitions = m.RunningPartitions,
Lag = m.Lag,
}),
}),
}),
Expand Down
5 changes: 5 additions & 0 deletions src/KafkaFlow.Admin.WebApi/Contracts/TelemetryResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ public class TopicPartitionAssignment
/// Gets or sets the date and time at which the partition assignment was last updated
/// </summary>
public DateTime LastUpdate { get; set; }

/// <summary>
/// Gets or sets the consumer lag
/// </summary>
public long Lag { get; set; }
}
}
}
6 changes: 6 additions & 0 deletions src/KafkaFlow.Admin/Messages/ConsumerTelemetryMetric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,11 @@ public class ConsumerTelemetryMetric
/// </summary>
[DataMember(Order = 9)]
public ConsumerStatus Status { get; set; }

/// <summary>
/// Gets or sets the consumer lag
/// </summary>
[DataMember(Order = 10)]
public long Lag { get; set; }
}
}
21 changes: 13 additions & 8 deletions src/KafkaFlow.Admin/TelemetryScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,29 @@ private static void ProduceTelemetry(
IList<IMessageConsumer> consumers,
IMessageProducer producer)
{
var items = consumers.SelectMany(
c => c.Assignment.Select(
a => new ConsumerTelemetryMetric()
var items = consumers
.Where(c => c.Subscription?.Any() == true)
.SelectMany(c =>
{
var consumerLag = c.GetTopicPartitionsLag();
return c.Subscription?.Select(topic => new ConsumerTelemetryMetric
{
ConsumerName = c.ConsumerName,
Topic = a.Topic,
Topic = topic,
GroupId = c.GroupId,
InstanceName = Environment.MachineName,
PausedPartitions = c.PausedPartitions
.Where(p => p.Topic == a.Topic)
.Where(p => p.Topic == topic)
.Select(p => p.Partition.Value),
RunningPartitions = c.RunningPartitions
.Where(p => p.Topic == a.Topic)
.Where(p => p.Topic == topic)
.Select(p => p.Partition.Value),
WorkersCount = c.WorkersCount,
Status = c.Status,
SentAt = DateTime.Now,
}));
Lag = consumerLag.Where(l => l.topic == topic).Sum(l => l.lag),
SentAt = DateTime.Now.ToUniversalTime(),
});
});

foreach (var item in items)
{
Expand Down
34 changes: 30 additions & 4 deletions src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace KafkaFlow.Consumers
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
Expand All @@ -20,6 +21,7 @@ private readonly List<Action<IDependencyResolver, IConsumer<byte[], byte[]>, Lis

private readonly List<Action<IConsumer<byte[], byte[]>, Error>> errorsHandlers = new();
private readonly List<Action<IConsumer<byte[], byte[]>, string>> statisticsHandlers = new();
private readonly Dictionary<(string topic, int partition), long> committedOffsets = new();
private readonly ConsumerFlowManager flowManager;

private IConsumer<byte[], byte[]> consumer;
Expand Down Expand Up @@ -54,7 +56,7 @@ public Consumer(

public IConsumerConfiguration Configuration { get; }

public IReadOnlyList<string> Subscription => this.consumer?.Subscription.AsReadOnly();
public IReadOnlyList<string> Subscription { get; private set; } = new List<string>();

public IReadOnlyList<TopicPartition> Assignment { get; private set; } = new List<TopicPartition>();

Expand Down Expand Up @@ -110,7 +112,27 @@ public List<TopicPartitionOffset> OffsetsForTimes(
TimeSpan timeout) =>
this.consumer.OffsetsForTimes(topicPartitions, timeout);

public void Commit(IEnumerable<TopicPartitionOffset> offsetsValues) => this.consumer.Commit(offsetsValues);
/// <summary>
/// Computes the lag of every topic partition currently assigned to this consumer.
/// </summary>
/// <remarks>
/// Lag is the partition's high watermark minus the last offset recorded by Commit.
/// Partitions with no recorded commit are treated as being at offset 0, so their
/// lag equals the high watermark.
/// </remarks>
/// <returns>One (topic, partition, lag) tuple per assigned partition</returns>
public IEnumerable<(string topic, int partition, long lag)> GetTopicPartitionsLag()
{
    // Materialize the result once. A deferred query would re-read the watermarks and
    // the committed offsets every time a caller enumerates or filters the sequence
    // (for example once per subscribed topic), and could observe different values
    // on each pass.
    return this.Assignment
        .Select(tp =>
        {
            var offsetEnd = this.GetWatermarkOffsets(tp).High.Value;
            var offsetCurrent = this.committedOffsets.TryGetValue((tp.Topic, tp.Partition.Value), out var offset)
                ? offset
                : 0;

            return (topic: tp.Topic, partition: tp.Partition.Value, lag: offsetEnd - offsetCurrent);
        })
        .ToList();
}

/// <summary>
/// Commits the given offsets through the underlying consumer and records them
/// locally so that the lag of each partition can be calculated later.
/// </summary>
/// <param name="offsetsValues">The topic partition offsets to commit</param>
public void Commit(IEnumerable<TopicPartitionOffset> offsetsValues)
{
    // Enumerate the input exactly once: a lazy or one-shot sequence could otherwise
    // hand different offsets to the broker commit and to the local bookkeeping.
    var offsets = offsetsValues.ToList();

    this.consumer.Commit(offsets);

    // Only reached when the commit did not throw, so the dictionary never holds
    // offsets that failed to commit.
    foreach (var offset in offsets)
    {
        this.committedOffsets[(offset.Topic, offset.Partition.Value)] = offset.Offset.Value;
    }
}

public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationToken cancellationToken)
{
Expand All @@ -119,7 +141,7 @@ public async ValueTask<ConsumeResult<byte[], byte[]>> ConsumeAsync(CancellationT
try
{
this.EnsureConsumer();
await this.flowManager.BlockHeartbeat();
await this.flowManager.BlockHeartbeat(cancellationToken);
return this.consumer.Consume(cancellationToken);
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -167,6 +189,7 @@ private void EnsureConsumer()
(consumer, partitions) =>
{
this.Assignment = partitions;
this.Subscription = consumer.Subscription;
this.flowManager.Start(consumer);

this.partitionsAssignedHandlers.ForEach(x =>
Expand All @@ -176,13 +199,16 @@ private void EnsureConsumer()
(consumer, partitions) =>
{
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
this.committedOffsets.Clear();
this.flowManager.Stop();

this.partitionsRevokedHandlers.ForEach(
x => x(this.dependencyResolver, consumer, partitions));
})
.SetErrorHandler((consumer, error) => this.errorsHandlers.ForEach(x => x(consumer, error)))
.SetStatisticsHandler((consumer, statistics) => this.statisticsHandlers.ForEach(x => x(consumer, statistics)))
.SetStatisticsHandler((consumer, statistics) =>
this.statisticsHandlers.ForEach(x => x(consumer, statistics)))
.Build();

this.consumer.Subscribe(this.Configuration.Topics);
Expand Down
11 changes: 7 additions & 4 deletions src/KafkaFlow/Consumers/ConsumerFlowManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ public void Pause(IReadOnlyCollection<TopicPartition> topicPartitions)
}
}

public Task BlockHeartbeat()
public Task BlockHeartbeat(CancellationToken cancellationToken)
{
return this.consumerSemaphore.WaitAsync();
return this.consumerSemaphore.WaitAsync(cancellationToken);
}

public void ReleaseHeartbeat()
{
this.consumerSemaphore.Release();
if (this.consumerSemaphore.CurrentCount != 1)
{
this.consumerSemaphore.Release();
}
}

public void Resume(IReadOnlyCollection<TopicPartition> topicPartitions)
Expand Down Expand Up @@ -131,7 +134,7 @@ private void StartHeartbeat()
}
finally
{
this.consumerSemaphore.Release();
this.ReleaseHeartbeat();
}
}
});
Expand Down
6 changes: 6 additions & 0 deletions src/KafkaFlow/Consumers/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public interface IConsumer : IDisposable
/// </summary>
ConsumerStatus Status { get; }

/// <summary>
/// Gets the lag of each topic partition assigned to the consumer
/// </summary>
/// <returns>A sequence of tuples holding the topic name, the partition number and the lag of that partition</returns>
IEnumerable<(string topic, int partition, long lag)> GetTopicPartitionsLag();

/// <summary>
/// Register a handler to be executed when the partitions are assigned
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/KafkaFlow/Consumers/IMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public interface IMessageConsumer
/// </summary>
IEnumerable<TopicPartition> RunningPartitions { get; }

/// <summary>
/// Gets the lag of each topic partition assigned to the consumer
/// </summary>
/// <returns>A sequence of tuples holding the topic name, the partition number and the lag of that partition</returns>
IEnumerable<(string topic, int partition, long lag)> GetTopicPartitionsLag();

/// <summary>
/// Overrides the offsets of the given partitions and restart the consumer
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/KafkaFlow/Consumers/MessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public List<TopicPartitionOffset> GetOffsets(
TimeSpan timeout) =>
this.consumerManager.Consumer.OffsetsForTimes(topicPartitions, timeout);

/// <summary>
/// Gets the lag of each topic partition by delegating to the consumer
/// currently held by the consumer manager.
/// </summary>
/// <returns>The topic, partition and lag tuples produced by that consumer</returns>
public IEnumerable<(string topic, int partition, long lag)> GetTopicPartitionsLag()
{
    var currentConsumer = this.consumerManager.Consumer;

    return currentConsumer.GetTopicPartitionsLag();
}

public async Task OverrideOffsetsAndRestartAsync(IReadOnlyCollection<TopicPartitionOffset> offsets)
{
if (offsets is null)
Expand Down

0 comments on commit 6f434b1

Please sign in to comment.