Skip to content

Commit

Permalink
feat: create telemetry workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jun 2, 2021
1 parent 0967b93 commit cbf6afe
Show file tree
Hide file tree
Showing 13 changed files with 32 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
</ItemGroup>

<Target Name="DebugNpmInstall" BeforeTargets="Build">
<!-- Ensure Node.js is installed -->
<Exec Command="node --version" ContinueOnError="true">
<Output TaskParameter="ExitCode" PropertyName="ErrorCode" />
</Exec>
Expand All @@ -26,11 +25,15 @@
</Target>

<Target Name="PublishRunWebpack" AfterTargets="ComputeFilesToPublish">
<!-- As part of publishing, ensure the JS resources are freshly built in production mode -->
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="npm run build -- --prod" />
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="if not exist $(PublishDir)$(SpaRoot)dist mkdir $(PublishDir)$(SpaRoot)dist"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="del $(PublishDir)$(SpaRoot)dist\*.* /s /q"/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="move dist\dashboard\* $(PublishDir)$(SpaRoot)dist"/>

<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="if not exist $(PublishDir)$(SpaRoot)dist mkdir $(PublishDir)$(SpaRoot)dist" Condition=" '$(OS)' == 'Windows_NT' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="del $(PublishDir)$(SpaRoot)dist\*.* /s /q" Condition=" '$(OS)' == 'Windows_NT' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="move dist\dashboard\* $(PublishDir)$(SpaRoot)dist" Condition=" '$(OS)' == 'Windows_NT' "/>

<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="mkdir $(PublishDir)$(SpaRoot)dist -p" Condition=" '$(OS)' == 'Unix' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="rm $(PublishDir)$(SpaRoot)dist/* -rf" Condition=" '$(OS)' == 'Unix' "/>
<Exec WorkingDirectory="$(OutDir)$(SpaRoot)" Command="mv dist/dashboard/* $(PublishDir)$(SpaRoot)dist" Condition=" '$(OS)' == 'Unix' "/>
</Target>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ private static IEnumerable<PartitionAssignment> GetLocalInfo(IMessageConsumer co
Topic = c.Key,
HostName = Environment.MachineName,
PausedPartitions = c.Select(x => x.Partition.Value).ToList(),
LastUpdate = DateTime.Now,
})
.Union(consumer.RunningPartitions
.GroupBy(c => c.Topic)
Expand All @@ -49,6 +50,7 @@ private static IEnumerable<PartitionAssignment> GetLocalInfo(IMessageConsumer co
Topic = c.Key,
HostName = Environment.MachineName,
RunningPartitions = c.Select(x => x.Partition.Value).ToList(),
LastUpdate = DateTime.Now,
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static IClusterConfigurationBuilder EnableAdminMessages(
cluster.DependencyConfigurator.AddSingleton<IAdminProducer, AdminProducer>();

cluster.DependencyConfigurator.AddSingleton<IMemoryCache, MemoryCache>();
cluster.DependencyConfigurator.AddSingleton<ITelemetryCache, TelemetryCache>();
cluster.DependencyConfigurator.AddSingleton<ITelemetryCache, InMemoryTelemetryCache>();

return cluster
.AddProducer<AdminProducer>(
Expand Down Expand Up @@ -81,7 +81,7 @@ public static IClusterConfigurationBuilder EnableTelemetry(
string consumerGroup)
{
cluster.DependencyConfigurator.AddSingleton<IMemoryCache, MemoryCache>();
cluster.DependencyConfigurator.AddSingleton<ITelemetryCache, TelemetryCache>();
cluster.DependencyConfigurator.AddSingleton<ITelemetryCache, InMemoryTelemetryCache>();

var groupId =
$"{consumerGroup}-{Environment.MachineName}-{Convert.ToBase64String(Guid.NewGuid().ToByteArray())}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ namespace KafkaFlow.Admin
/// <summary>
/// Provide cache operations related to telemetry data
/// </summary>
public class TelemetryCache : ITelemetryCache
public class InMemoryTelemetryCache : ITelemetryCache
{
private readonly IMemoryCache cache;

/// <summary>
/// Initializes a new instance of the <see cref="TelemetryCache"/> class.
/// Initializes a new instance of the <see cref="InMemoryTelemetryCache"/> class.
/// </summary>
/// <param name="cache">The memory cache interface to get metric data</param>
public TelemetryCache(IMemoryCache cache) => this.cache = cache;
/// <param name="cache">The memory cache class to manage telemetry data</param>
public InMemoryTelemetryCache(IMemoryCache cache) => this.cache = cache;

/// <inheritdoc />
public List<ConsumerMetric> Get(string groupId, string consumerName)
Expand Down
8 changes: 4 additions & 4 deletions src/KafkaFlow.Admin/TelemetryScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ namespace KafkaFlow.Admin

internal class TelemetryScheduler
{
private static readonly Lazy<Dictionary<string, Timer>> timers = new (() => new Dictionary<string, Timer>());
private static readonly Lazy<Dictionary<string, Timer>> Timers = new (() => new Dictionary<string, Timer>());

public static void Set(string key, TimerCallback callback, TimeSpan dueTime, TimeSpan period)
{
Unset(key);

timers.Value[key] = new Timer(callback, null, dueTime, period);
Timers.Value[key] = new Timer(callback, null, dueTime, period);
}

public static void Unset(string key)
{
if (timers.Value.TryGetValue(key, out var timer))
if (Timers.Value.TryGetValue(key, out var timer))
{
timer.Dispose();
timers.Value.Remove(key);
Timers.Value.Remove(key);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaFlow/Configuration/ClusterConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration
internal class ClusterConfiguration
{
private readonly Func<SecurityInformation> securityInformationHandler;
private Action<IDependencyResolver> onStopHandler = _ => { };
private readonly Action<IDependencyResolver> onStopHandler = _ => { };
private readonly List<IProducerConfiguration> producers = new();
private readonly List<IConsumerConfiguration> consumers = new();

Expand All @@ -31,12 +31,12 @@ public ClusterConfiguration(

public IReadOnlyCollection<IConsumerConfiguration> Consumers => this.consumers.AsReadOnly();

public Action<IDependencyResolver> OnStopHandler => this.onStopHandler;

public void AddConsumers(IEnumerable<IConsumerConfiguration> configurations) => this.consumers.AddRange(configurations);

public void AddProducers(IEnumerable<IProducerConfiguration> configurations) => this.producers.AddRange(configurations);

public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke();

public Action<IDependencyResolver> OnStopHandler => this.onStopHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KafkaFlow.Configuration
using Confluent.Kafka;
using KafkaFlow.Consumers.DistributionStrategies;

internal class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder
internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuilder
{
private readonly List<string> topics = new();
private readonly List<Action<string>> statisticsHandlers = new();
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Configuration/IConsumerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface IConsumerConfiguration
string ConsumerName { get; }

/// <summary>
/// Gets the consumer readonly flag
/// Gets a value indicating whether the consumer is readonly or not
/// </summary>
bool IsReadonly { get; }

Expand Down
3 changes: 0 additions & 3 deletions src/KafkaFlow/Configuration/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;

internal class KafkaConfiguration
{
private readonly List<ClusterConfiguration> clusters = new();

public Action<IDependencyResolver> OnStopHandler = _ => { };

public IReadOnlyCollection<ClusterConfiguration> Clusters => this.clusters;

public void AddClusters(IEnumerable<ClusterConfiguration> configurations) => this.clusters.AddRange(configurations);
Expand Down
2 changes: 0 additions & 2 deletions src/KafkaFlow/Consumers/ConsumerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ private void OnPartitionAssigned(IReadOnlyCollection<TopicPartition> partitions)
"Partitions assigned",
this.GetConsumerLogInfo(partitions));

this.Consumer.FlowManager.UpdatePausedPartitions(partitions);

this.WorkerPool
.StartAsync(partitions)
.GetAwaiter()
Expand Down
2 changes: 2 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public async Task StartAsync(IEnumerable<TopicPartition> partitions)
this.logHandler),
partitions);

this.consumer.FlowManager.UpdatePausedPartitions(partitions);

await Task.WhenAll(
Enumerable
.Range(0, this.consumer.Configuration.WorkersCount)
Expand Down
4 changes: 4 additions & 0 deletions src/KafkaFlow/Consumers/IConsumerFlowManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public interface IConsumerFlowManager : IDisposable
/// <param name="topicPartitions">A list of partitions</param>
void Resume(IReadOnlyCollection<TopicPartition> topicPartitions);

/// <summary>
/// Removes the running partitions from the list of paused partitions
/// </summary>
/// <param name="partitionsRunning">A list of partitions that are running</param>
void UpdatePausedPartitions(IEnumerable<TopicPartition> partitionsRunning);
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/IMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IMessageConsumer
string ConsumerName { get; }

/// <summary>
/// Gets the readonly flag defined in the configuration
/// Gets a value indicating whether the consumer is readonly or not
/// </summary>
bool IsReadonly { get; }

Expand Down

0 comments on commit cbf6afe

Please sign in to comment.