Skip to content

Commit

Permalink
feat: creates worker context and worker events
Browse files Browse the repository at this point in the history
  • Loading branch information
filipeesch committed Jul 13, 2023
1 parent 1d17c14 commit a6ee2fe
Show file tree
Hide file tree
Showing 17 changed files with 180 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
Expand All @@ -17,5 +18,15 @@ public interface IWorker
/// </summary>
/// <param name="handler"><see cref="Action"/> to be executed</param>
void OnTaskCompleted(Action handler);

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject> WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject> WorkerStopped { get; }
}
}
13 changes: 13 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/IWorkerContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace KafkaFlow
{
/// <summary>
/// Provides access to the current consumer worker context. This interface only returns values when inside a middleware with Worker lifetime; otherwise, it will return null.
/// </summary>
public interface IWorkerContext
{
/// <summary>
/// Gets the current worker in the context.
/// </summary>
IWorker Worker { get; }
}
}
11 changes: 11 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace KafkaFlow
{
using KafkaFlow.Observer;

/// <summary>
/// Represents a subject specific to worker stopped events where observers can subscribe to receive notifications.
/// </summary>
public class WorkerStoppedSubject : Subject<WorkerStoppedSubject>
{
}
}
11 changes: 11 additions & 0 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace KafkaFlow
{
using KafkaFlow.Observer;

/// <summary>
/// Represents a subject specific to worker stopping events where observers can subscribe to receive notifications.
/// </summary>
public class WorkerStoppingSubject : Subject<WorkerStoppingSubject>
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,35 @@ public static IDependencyConfigurator AddSingleton<TService>(
InstanceLifetime.Singleton);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(this IDependencyConfigurator configurator)
where TService : class
{
return configurator.Add<TService>(InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a scoped type mapping where the returned instance will be given by the provided factory
/// </summary>
/// <param name="configurator">The <see cref="IDependencyConfigurator"/> object that this method was called on</param>
/// <param name="factory">A factory to create new instances of the service implementation</param>
/// <typeparam name="TService">Type that will be created</typeparam>
/// <returns></returns>
public static IDependencyConfigurator AddScoped<TService>(
this IDependencyConfigurator configurator,
Func<IDependencyResolver, TService> factory)
{
return configurator.Add(
typeof(TService),
factory,
InstanceLifetime.Scoped);
}

/// <summary>
/// Registers a transient type mapping
/// </summary>
Expand Down
16 changes: 16 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace KafkaFlow.Observer
{
/// <summary>
/// Represents a subject in the observer design pattern that can be observed by observers.
/// </summary>
/// <typeparam name="T">The type of the subject.</typeparam>
public interface ISubject<T>
where T : ISubject<T>
{
/// <summary>
/// Subscribes an observer to the subject.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
void Subscribe(ISubjectObserver<T> observer);
}
}
18 changes: 18 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace KafkaFlow.Observer
{
using System.Threading.Tasks;

/// <summary>
/// Represents an observer in the observer design pattern that can receive notifications from a subject.
/// </summary>
/// <typeparam name="T">The type of the subject.</typeparam>
public interface ISubjectObserver<T>
where T : ISubject<T>
{
/// <summary>
/// Called when a notification is received from the subject.
/// </summary>
/// <returns>A task representing the asynchronous notification handling.</returns>
Task OnNotification();
}
}
33 changes: 33 additions & 0 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace KafkaFlow.Observer
{
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Represents a subject in the observer design pattern that can be observed by multiple observers.
/// </summary>
/// <typeparam name="T">The type of the subject.</typeparam>
public abstract class Subject<T> : ISubject<T>
where T : ISubject<T>
{
private readonly List<ISubjectObserver<T>> observers = new();

/// <summary>
/// Subscribes an observer to the subject, allowing it to receive notifications.
/// </summary>
/// <param name="observer">The observer to subscribe.</param>
public void Subscribe(ISubjectObserver<T> observer) => this.observers.Add(observer);

/// <summary>
/// Notifies all subscribed observers asynchronously.
/// </summary>
/// <returns>A task representing the asynchronous notification operation.</returns>
public async Task NotifyAsync()
{
foreach (var observer in this.observers)
{
await observer.OnNotification();
}
}
}
}
9 changes: 6 additions & 3 deletions src/KafkaFlow/Configuration/KafkaConfigurationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,19 @@ public KafkaConfiguration Build()

foreach (var cluster in configuration.Clusters)
{
this.dependencyConfigurator.AddSingleton<IClusterManager>(resolver =>
new ClusterManager(resolver.Resolve<ILogHandler>(), cluster));
this.dependencyConfigurator.AddSingleton<IClusterManager>(
resolver =>
new ClusterManager(resolver.Resolve<ILogHandler>(), cluster));
}

this.dependencyConfigurator
.AddTransient(typeof(ILogHandler), this.logHandlerType)
.AddSingleton<IDateTimeProvider, DateTimeProvider>()
.AddSingleton<IConsumerAccessor>(new ConsumerAccessor())
.AddSingleton<IConsumerManagerFactory>(new ConsumerManagerFactory())
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>();
.AddSingleton<IClusterManagerAccessor, ClusterManagerAccessor>()
.AddScoped<WorkerContext>()
.AddScoped<IWorkerContext>(r => r.Resolve<WorkerContext>());

return configuration;
}
Expand Down
22 changes: 18 additions & 4 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ namespace KafkaFlow.Consumers
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using KafkaFlow.Observer;

internal class ConsumerWorker : IConsumerWorker
{
private readonly IConsumer consumer;
private readonly IDependencyResolver consumerDependencyResolver;
private readonly IDependencyResolverScope workerDependencyResolverScope;
private readonly IMiddlewareExecutor middlewareExecutor;
private readonly ILogHandler logHandler;

private readonly Channel<IMessageContext> messagesBuffer;

private readonly WorkerStoppingSubject workerStoppingSubject = new();
private readonly WorkerStoppedSubject workerStoppedSubject = new();

private CancellationTokenSource stopCancellationTokenSource;
private IDependencyResolverScope workerDependencyResolverScope;
private Task backgroundTask;
private Action onMessageFinishedHandler;

Expand All @@ -28,10 +31,14 @@ public ConsumerWorker(
{
this.Id = workerId;
this.consumer = consumer;
this.consumerDependencyResolver = consumerDependencyResolver;
this.workerDependencyResolverScope = consumerDependencyResolver.CreateScope();
this.middlewareExecutor = middlewareExecutor;
this.logHandler = logHandler;
this.messagesBuffer = Channel.CreateBounded<IMessageContext>(consumer.Configuration.BufferSize);

var workerContext = this.workerDependencyResolverScope.Resolver.Resolve<WorkerContext>();

workerContext.Worker = this;
}

public int Id { get; }
Expand All @@ -40,6 +47,10 @@ public ConsumerWorker(

public IDependencyResolver WorkerDependencyResolver => this.workerDependencyResolverScope.Resolver;

public ISubject<WorkerStoppingSubject> WorkerStopping => this.workerStoppingSubject;

public ISubject<WorkerStoppedSubject> WorkerStopped => this.workerStoppedSubject;

public ValueTask EnqueueAsync(
IMessageContext context,
CancellationToken stopCancellationToken)
Expand All @@ -50,7 +61,6 @@ public ValueTask EnqueueAsync(
public Task StartAsync()
{
this.stopCancellationTokenSource = new CancellationTokenSource();
this.workerDependencyResolverScope = this.consumerDependencyResolver.CreateScope();

this.backgroundTask = Task.Run(
async () =>
Expand Down Expand Up @@ -84,6 +94,8 @@ public Task StartAsync()

public async Task StopAsync()
{
await this.workerStoppingSubject.NotifyAsync();

this.messagesBuffer.Writer.TryComplete();

if (this.stopCancellationTokenSource.Token.CanBeCanceled)
Expand All @@ -92,6 +104,8 @@ public async Task StopAsync()
}

await this.backgroundTask.ConfigureAwait(false);

await this.workerStoppedSubject.NotifyAsync();
}

public void Dispose()
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace KafkaFlow.Consumers
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaFlow.Configuration;
using KafkaFlow.Core.Observer;
using KafkaFlow.Observer;

internal class ConsumerWorkerPool : IConsumerWorkerPool, IDisposable
{
Expand Down Expand Up @@ -109,7 +109,7 @@ public async Task StopAsync()

this.offsetManager = null;

this.workerPoolStoppedSubject.Notify();
await this.workerPoolStoppedSubject.NotifyAsync();
}

public async Task EnqueueAsync(ConsumeResult<byte[], byte[]> message, CancellationToken stopCancellationToken)
Expand Down
7 changes: 7 additions & 0 deletions src/KafkaFlow/Consumers/WorkerContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace KafkaFlow.Consumers
{
internal class WorkerContext : IWorkerContext
{
public IWorker Worker { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace KafkaFlow.Consumers
{
using KafkaFlow.Core.Observer;
using KafkaFlow.Observer;

internal class WorkerPoolStoppedSubject : Subject<WorkerPoolStoppedSubject>
{
Expand Down
8 changes: 0 additions & 8 deletions src/KafkaFlow/Core/Observer/ISubject.cs

This file was deleted.

8 changes: 0 additions & 8 deletions src/KafkaFlow/Core/Observer/ISubjectObserver.cs

This file was deleted.

20 changes: 0 additions & 20 deletions src/KafkaFlow/Core/Observer/Subject.cs

This file was deleted.

7 changes: 4 additions & 3 deletions src/KafkaFlow/MiddlewareExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace KafkaFlow
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Core.Observer;
using KafkaFlow.Observer;

internal class MiddlewareExecutor
: IMiddlewareExecutor,
Expand All @@ -27,9 +27,10 @@ public Task Execute(IMessageContext context, Func<IMessageContext, Task> nextOpe
return this.ExecuteDefinition(0, context, nextOperation);
}

void ISubjectObserver<WorkerPoolStoppedSubject>.OnNotification()
Task ISubjectObserver<WorkerPoolStoppedSubject>.OnNotification()
{
this.workersMiddlewares.Clear();
this.workersMiddlewares.Clear();
return Task.CompletedTask;
}

private static IMessageMiddleware CreateInstance(
Expand Down

0 comments on commit a6ee2fe

Please sign in to comment.