Skip to content

Commit

Permalink
feat: creates worker context and worker events
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Jul 14, 2023
1 parent 1d17c14 commit 48a7417
Show file tree
Hide file tree
Showing 27 changed files with 299 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
Expand All @@ -17,5 +18,15 @@ public interface IWorker
/// </summary>
/// <param name="handler"><see cref="Action"/> to be executed</param>
void OnTaskCompleted(Action handler);

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject> WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject> WorkerStopped { get; }
}
}
13 changes: 13 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/IWorkerLifetimeContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace KafkaFlow
{
/// <summary>
/// Provides access to the current consumer worker context. This interface only returns values when inside a middleware with Worker lifetime; otherwise, it will return null.
/// </summary>
public interface IWorkerLifetimeContext
{
/// <summary>
/// Gets the current worker in the context.
/// </summary>
IWorker Worker { get; }
}
}
11 changes: 11 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace KafkaFlow
{
using KafkaFlow.Observer;

/// <summary>
/// Represents a subject specific to worker stopped events where observers can subscribe to receive notifications.
/// </summary>
public class WorkerStoppedSubject : Subject<WorkerStoppedSubject>
{
}
}
11 changes: 11 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace KafkaFlow
{
using KafkaFlow.Observer;

/// <summary>
/// Represents a subject specific to worker stopping events where observers can subscribe to receive notifications.
/// </summary>
public class WorkerStoppingSubject : Subject<WorkerStoppingSubject>
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public static IDependencyConfigurator AddSingleton<TService>(
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.Add<TService>(InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a transient type mapping
/// </summary>
Expand Down
16 changes: 16 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace KafkaFlow.Observer
{
/// <summary>
/// Represents a subject in the observer design pattern that can be observed by observers.
/// </summary>
/// <typeparam name="T">The type of the subject.</typeparam>
public interface ISubject<T>
where T : ISubject<T>
{
/// <summary>
/// Subscribes an observer to the subject.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
void Subscribe(ISubjectObserver<T> observer);
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace KafkaFlow.Observer
{
using System.Threading.Tasks;

/// <summary>
/// Represents an observer in the observer design pattern that can receive notifications from a subject.
/// </summary>
/// <typeparam name="T">The type of the subject.</typeparam>
public interface ISubjectObserver<T>
where T : ISubject<T>
{
/// <summary>
/// Called when a notification is received from the subject.
/// </summary>
/// <returns>A task representing the asynchronous notification handling.</returns>
Task OnNotification();
}
}
33 changes: 33 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace KafkaFlow.Observer
{
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Represents a subject in the observer design pattern that can be observed by multiple observers.
/// </summary>
/// <typeparam name="T">The type of the subject.</typeparam>
public abstract class Subject<T> : ISubject<T>
where T : ISubject<T>
{
private readonly List<ISubjectObserver<T>> observers = new();

/// <summary>
/// Subscribes an observer to the subject, allowing it to receive notifications.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
public void Subscribe(ISubjectObserver<T> observer) => this.observers.Add(observer);

/// <summary>
/// Notifies all subscribed observers asynchronously.
/// </summary>
/// <returns>A task representing the asynchronous notification operation.</returns>
public async Task NotifyAsync()
{
foreach (var observer in this.observers)
{
await observer.OnNotification();
}
}
}
}
3 changes: 2 additions & 1 deletion src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public static IConsumerMiddlewareConfigurationBuilder BatchConsume(
{
return builder.Add(
resolver => new BatchConsumeMiddleware(
resolver.Resolve<IWorkerLifetimeContext>(),
batchSize,
batchTimeout,
resolver.Resolve<ILogHandler>()),
Expand All @@ -38,7 +39,7 @@ public static IReadOnlyCollection<IMessageContext> GetMessagesBatch(this IMessag
{
if (context is BatchConsumeMessageContext ctx)
{
return (IReadOnlyCollection<IMessageContext>) ctx.Message.Value;
return (IReadOnlyCollection<IMessageContext>)ctx.Message.Value;
}

throw new InvalidOperationException($"This method can only be used on {nameof(BatchConsumeMessageContext)}");
Expand Down
38 changes: 25 additions & 13 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using KafkaFlow.Observer;

internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable, IAsyncDisposable
internal class BatchConsumeMiddleware
: IMessageMiddleware,
ISubjectObserver<WorkerStoppedSubject>,
IDisposable
{
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);

Expand All @@ -16,8 +20,10 @@ internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable, IAsyncD

private readonly List<IMessageContext> batch;
private CancellationTokenSource dispatchTokenSource;
private Task<Task> dispatchTask;

public BatchConsumeMiddleware(
IWorkerLifetimeContext workerContext,
int batchSize,
TimeSpan batchTimeout,
ILogHandler logHandler)
Expand All @@ -26,6 +32,8 @@ public BatchConsumeMiddleware(
this.batchTimeout = batchTimeout;
this.logHandler = logHandler;
this.batch = new(batchSize);

workerContext.Worker.WorkerStopped.Subscribe(this);
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
Expand All @@ -45,13 +53,14 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)

this.dispatchTokenSource.CancelAfter(this.batchTimeout);

this.dispatchTokenSource.Token.Register(
async _ =>
{
this.dispatchTokenSource.Dispose();
await this.DispatchAsync(context, next);
},
null);
this.dispatchTask = Task
.Delay(Timeout.Infinite, this.dispatchTokenSource.Token)
.ContinueWith(
async _ =>
{
this.dispatchTokenSource.Dispose();
await this.DispatchAsync(context, next);
});
}

if (this.batch.Count >= this.batchSize)
Expand All @@ -65,16 +74,19 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
}

public async ValueTask DisposeAsync()
async Task ISubjectObserver<WorkerStoppedSubject>.OnNotification()
{
this.dispatchTokenSource.Dispose();
this.dispatchTokenSource?.Cancel();
await (this.dispatchTask ?? Task.CompletedTask);
}

await this.dispatchSemaphore.WaitAsync();
public void Dispose()
{
this.dispatchTask?.Dispose();
this.dispatchTokenSource?.Dispose();
this.dispatchSemaphore.Dispose();
}

public void Dispose() => this.DisposeAsync().GetAwaiter().GetResult();

private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate next)
{
await this.dispatchSemaphore.WaitAsync();
Expand Down
9 changes: 6 additions & 3 deletions src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,19 @@ public KafkaConfiguration Build()

foreach (var cluster in configuration.Clusters)
{
this.dependencyConfigurator.AddSingleton<IClusterManager>(resolver =>
new ClusterManager(resolver.Resolve<ILogHandler>(), cluster));
this.dependencyConfigurator.AddSingleton<IClusterManager>(
resolver =>
new ClusterManager(resolver.Resolve<ILogHandler>(), cluster));
}

this.dependencyConfigurator
.AddTransient(typeof(ILogHandler), this.logHandlerType)
.AddSingleton<IDateTimeProvider, DateTimeProvider>()
.AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
.AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory())
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>();
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>()
.AddScoped<WorkerLifetimeContext>()
.AddScoped<IWorkerLifetimeContext>(r => r.Resolve<WorkerLifetimeContext>());

return configuration;
}
Expand Down
26 changes: 12 additions & 14 deletions src/KafkaFlow/Consumers/ConsumerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
using Confluent.Kafka;
using KafkaFlow.Configuration;

internal class ConsumerManager : IConsumerManager, IDisposable
internal class ConsumerManager : IConsumerManager
{
private readonly IOffsetCommitter offsetCommitter;
private readonly IDependencyResolver dependencyResolver;
private readonly ILogHandler logHandler;
private readonly Timer evaluateWorkersCountTimer;

private Timer evaluateWorkersCountTimer;

public ConsumerManager(
IConsumer consumer,
Expand All @@ -30,12 +31,6 @@ public ConsumerManager(
this.WorkerPool = consumerWorkerPool;
this.Feeder = feeder;

this.evaluateWorkersCountTimer = new Timer(
state => _ = this.EvaluateWorkersCountAsync(),
null,
Timeout.Infinite,
Timeout.Infinite);

this.Consumer.OnPartitionsAssigned((_, _, partitions) => this.OnPartitionAssigned(partitions));
this.Consumer.OnPartitionsRevoked((_, _, partitions) => this.OnPartitionRevoked(partitions));
}
Expand All @@ -46,13 +41,16 @@ public ConsumerManager(

public IConsumer Consumer { get; }

public Task StartAsync()
public async Task StartAsync()
{
this.Feeder.Start();
await this.offsetCommitter.StartAsync();

this.StartEvaluateWorkerCountTimer();

return Task.CompletedTask;
this.evaluateWorkersCountTimer = new Timer(
state => _ = this.EvaluateWorkersCountAsync(),
null,
Timeout.Infinite,
Timeout.Infinite);
}

public async Task StopAsync()
Expand All @@ -62,14 +60,14 @@ public async Task StopAsync()
await this.Feeder.StopAsync().ConfigureAwait(false);
await this.WorkerPool.StopAsync().ConfigureAwait(false);

this.offsetCommitter.Dispose();
await this.offsetCommitter.StopAsync();

this.evaluateWorkersCountTimer.Dispose();
this.Consumer.Dispose();
}

public void Dispose()
{
this.evaluateWorkersCountTimer.Dispose();
}

private void StopEvaluateWorkerCountTimer() => this.evaluateWorkersCountTimer.Change(Timeout.Infinite, Timeout.Infinite);
Expand Down
Loading

0 comments on commit 48a7417

Please sign in to comment.