Skip to content

Commit

Permalink
refactor comments
Browse files Browse the repository at this point in the history
  • Loading branch information
robcao committed Jun 3, 2024
1 parent 6958e1d commit b0ee227
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 133 deletions.
2 changes: 1 addition & 1 deletion src/Temporalio.Extensions.Hosting/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ TemporalClientConnectOptions clientConnectOptions = new("my-other-temporal-host:

ITemporalClient updatedClient = await TemporalClient.ConnectAsync(clientConnectOptions).ConfigureAwait(false);

workerClientUpdater.UpdateTemporalWorkerClient(updatedClient);
workerClientUpdater.UpdateClient(updatedClient);

// Make sure you use RunAsync and not Run, see https://github.com/temporalio/sdk-dotnet/issues/220
await host.RunAsync();
Expand Down

This file was deleted.

This file was deleted.

50 changes: 25 additions & 25 deletions src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdater.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,39 @@ public class TemporalWorkerClientUpdater
{
private readonly object clientLock = new();

private event EventHandler<IWorkerClient>? OnClientUpdatedEvent;

/// <summary>
/// The <see cref="EventHandler"/> used to dispatch notifications to subscribers that the Temporal worker client was updated.
/// Dispatches a notification to all subscribers that a new worker client should be used.
/// </summary>
public event EventHandler<TemporalWorkerClientUpdatedEventArgs> TemporalWorkerClientUpdated
/// <param name="client">The new <see cref="IWorkerClient"/> that should be pushed out to all subscribing workers.</param>
public void UpdateClient(IWorkerClient client)
{
add
{
lock (clientLock)
{
TemporalWorkerClientUpdatedEvent += value;
}
}
OnClientUpdatedEvent?.Invoke(this, client);
}

remove
/// <summary>
/// Adds a new subscriber that will be notified when a new worker client should be used.
/// </summary>
/// <param name="eventHandler">The event handler to add to the event listeners.</param>
internal void Subscribe(EventHandler<IWorkerClient> eventHandler)
{
lock (clientLock)
{
lock (clientLock)
{
TemporalWorkerClientUpdatedEvent -= value;
}
OnClientUpdatedEvent += eventHandler;
}
}

private event EventHandler<TemporalWorkerClientUpdatedEventArgs>? TemporalWorkerClientUpdatedEvent;
}

/// <summary>
/// Dispatches a notification to all subscribers that a new worker client should be used.
/// </summary>
/// <param name="client">The new <see cref="IWorkerClient"/> that should be pushed out to all subscribing workers.</param>
public void UpdateTemporalWorkerClient(IWorkerClient client)
/// <summary>
/// Removes an existing subscriber from receiving notifications when a new worker client should be used.
/// </summary>
/// <param name="eventHandler">The event handler to remove from the event listeners.</param>
internal void Unsubscribe(EventHandler<IWorkerClient> eventHandler)
{
TemporalWorkerClientUpdatedEventArgs eventArgs = new TemporalWorkerClientUpdatedEventArgs(client);

TemporalWorkerClientUpdatedEvent?.Invoke(this, eventArgs);
lock (clientLock)
{
OnClientUpdatedEvent -= eventHandler;
}
}
}
}
23 changes: 18 additions & 5 deletions src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Temporalio.Extensions.Hosting
/// </summary>
public class TemporalWorkerService : BackgroundService
{
// These two are mutually exclusive
// These two (newClientOptions and existingClient) are mutually exclusive
private readonly TemporalClientConnectOptions? newClientOptions;
private readonly ITemporalClient? existingClient;
private readonly TemporalWorkerOptions workerOptions;
Expand All @@ -31,8 +31,11 @@ public class TemporalWorkerService : BackgroundService
/// <param name="options">Options to use to create the worker service.</param>
public TemporalWorkerService(TemporalWorkerServiceOptions options)
{
newClientOptions = options.ClientOptions ?? throw new ArgumentException(
"Client options is required", nameof(options));
if (options.ClientOptions == null)
{
throw new ArgumentException("Client options is required", nameof(options));
}

workerOptions = options;
}

Expand Down Expand Up @@ -157,7 +160,7 @@ public TemporalWorkerService(
if (newClientOptions != null && workerOptions.LoggerFactory != null)
{
newClientOptions.LoggerFactory = workerOptions.LoggerFactory;
}
}

if (options.WorkerClientUpdater != null)
{
Expand All @@ -175,10 +178,20 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

if (workerClientUpdater != null)
{
using (new TemporalWorkerClientUpdateSubscriber(workerClientUpdater, worker))
void SubscribeToClientUpdates(object? sender, IWorkerClient updatedClient)
{
worker!.Client = updatedClient;
}

try
{
workerClientUpdater.Subscribe(SubscribeToClientUpdates);
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
}
finally
{
workerClientUpdater.Unsubscribe(SubscribeToClientUpdates);
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ public override object Clone()
options.ClientOptions = (TemporalClientConnectOptions)ClientOptions.Clone();
}

if (WorkerClientUpdater != null)
{
options.WorkerClientUpdater = WorkerClientUpdater;
}

return options;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Temporalio.Tests/AssertMore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ await HasEventEventuallyAsync(

public static Task HasEventEventuallyAsync(WorkflowHandle handle, Func<HistoryEvent, bool> predicate)
{
return AssertMore.EventuallyAsync(async () =>
return EventuallyAsync(async () =>
{
await foreach (var evt in handle.FetchHistoryEventsAsync())
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using Temporalio.Tests.Worker;

#pragma warning disable SA1201, SA1204 // We want to have classes near their tests
namespace Temporalio.Tests.Extensions.Hosting;

Expand Down Expand Up @@ -210,6 +208,20 @@ public async Task TemporalWorkerService_ExecuteAsync_MultipleWorkers()
result);
}

[Workflow]
public class TickingWorkflow
{
[WorkflowRun]
public async Task RunAsync()
{
// Just tick every 100ms for 10s
for (var i = 0; i < 100; i++)
{
await Workflow.DelayAsync(100);
}
}
}

[Fact]
public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient()
{
Expand All @@ -224,10 +236,10 @@ public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient()
// Start both workflows on different servers
var taskQueue = $"tq-{Guid.NewGuid()}";
var handle1 = await Client.StartWorkflowAsync(
(WorkflowWorkerTests.TickingWorkflow wf) => wf.RunAsync(),
(TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));
var handle2 = await otherEnv.Client.StartWorkflowAsync(
(WorkflowWorkerTests.TickingWorkflow wf) => wf.RunAsync(),
(TickingWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));

var bld = Host.CreateApplicationBuilder();
Expand All @@ -241,7 +253,7 @@ public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient()
bld.Services.
AddSingleton(Client).
AddHostedTemporalWorker(taskQueue).
AddWorkflow<WorkflowWorkerTests.TickingWorkflow>()
AddWorkflow<TickingWorkflow>()
.ConfigureOptions()
.Configure<TemporalWorkerClientUpdater>((options, updater) =>
{
Expand All @@ -264,7 +276,7 @@ public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient()

// Now replace the client, which should be used fairly quickly because we should have
// timer-done poll completions every 100ms
workerClientUpdater.UpdateTemporalWorkerClient(otherEnv.Client);
workerClientUpdater.UpdateClient(otherEnv.Client);

// Now confirm the other workflow has started
await AssertMore.HasEventEventuallyAsync(handle2, e => e.WorkflowTaskCompletedEventAttributes != null);
Expand Down

0 comments on commit b0ee227

Please sign in to comment.