Skip to content

Commit

Permalink
feat: expose OnStop cluster handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Douglas Lima committed Jun 9, 2021
1 parent 7b9d0b6 commit 939db93
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,12 @@ public interface IClusterConfigurationBuilder
/// <param name="consumer">A handler to configure the consumer</param>
/// <returns></returns>
IClusterConfigurationBuilder AddConsumer(Action<IConsumerConfigurationBuilder> consumer);

/// <summary>
/// Adds a handler to KafkaFlow cluster stop event
/// </summary>
/// <param name="handler">A handler to KafkaFlow cluster stop event</param>
/// <returns></returns>
IClusterConfigurationBuilder OnStop(Action<IDependencyResolver> handler);
}
}
8 changes: 6 additions & 2 deletions src/KafkaFlow/Configuration/ClusterConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ namespace KafkaFlow.Configuration
internal class ClusterConfiguration
{
private readonly Func<SecurityInformation> securityInformationHandler;

private Action<IDependencyResolver> onStopHandler = _ => { };
private readonly List<IProducerConfiguration> producers = new();
private readonly List<IConsumerConfiguration> consumers = new();

public ClusterConfiguration(
KafkaConfiguration kafka,
IEnumerable<string> brokers,
Func<SecurityInformation> securityInformationHandler)
Func<SecurityInformation> securityInformationHandler,
Action<IDependencyResolver> onStopHandler)
{
this.securityInformationHandler = securityInformationHandler;
this.Kafka = kafka;
this.Brokers = brokers.ToList();
this.onStopHandler = onStopHandler;
}

public KafkaConfiguration Kafka { get; }
Expand All @@ -34,5 +36,7 @@ public ClusterConfiguration(
public void AddProducers(IEnumerable<IProducerConfiguration> configurations) => this.producers.AddRange(configurations);

public SecurityInformation GetSecurityInformation() => this.securityInformationHandler?.Invoke();

public Action<IDependencyResolver> OnStopHandler => this.onStopHandler;
}
}
10 changes: 9 additions & 1 deletion src/KafkaFlow/Configuration/ClusterConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ internal class ClusterConfigurationBuilder : IClusterConfigurationBuilder
{
private readonly List<ProducerConfigurationBuilder> producers = new();
private readonly List<ConsumerConfigurationBuilder> consumers = new();
private Action<IDependencyResolver> onStopHandler = _ => { };

private IEnumerable<string> brokers;
private Func<SecurityInformation> securityInformationHandler;
Expand All @@ -25,7 +26,8 @@ public ClusterConfiguration Build(KafkaConfiguration kafkaConfiguration)
var configuration = new ClusterConfiguration(
kafkaConfiguration,
this.brokers.ToList(),
this.securityInformationHandler);
this.securityInformationHandler,
this.onStopHandler);

configuration.AddProducers(this.producers.Select(x => x.Build(configuration)));
configuration.AddConsumers(this.consumers.Select(x => x.Build(configuration)));
Expand Down Expand Up @@ -82,5 +84,11 @@ public IClusterConfigurationBuilder AddConsumer(Action<IConsumerConfigurationBui

return this;
}

public IClusterConfigurationBuilder OnStop(Action<IDependencyResolver> handler)
{
this.onStopHandler = handler;
return this;
}
}
}
3 changes: 3 additions & 0 deletions src/KafkaFlow/Configuration/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
namespace KafkaFlow.Configuration
{
using System;
using System.Collections.Generic;

internal class KafkaConfiguration
{
private readonly List<ClusterConfiguration> clusters = new();

public Action<IDependencyResolver> OnStopHandler = _ => { };

public IReadOnlyCollection<ClusterConfiguration> Clusters => this.clusters;

public void AddClusters(IEnumerable<ClusterConfiguration> configurations) => this.clusters.AddRange(configurations);
Expand Down
2 changes: 2 additions & 0 deletions src/KafkaFlow/KafkaBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public async Task StartAsync(CancellationToken stopCancellationToken = default)

public Task StopAsync()
{
this.configuration.Clusters.ToList().ForEach(c => c.OnStopHandler(this.dependencyResolver));

return Task.WhenAll(this.consumerManagers.Select(x => x.StopAsync()));
}
}
Expand Down

0 comments on commit 939db93

Please sign in to comment.