From b0ee22776dbceac56d1d91ed1bb912ccd96830fc Mon Sep 17 00:00:00 2001 From: Robert Cao Date: Mon, 3 Jun 2024 12:50:05 -0500 Subject: [PATCH] refactor comments --- src/Temporalio.Extensions.Hosting/README.md | 2 +- .../TemporalWorkerClientUpdateSubscriber.cs | 68 ------------------- .../TemporalWorkerClientUpdatedEventArgs.cs | 22 ------ .../TemporalWorkerClientUpdater.cs | 50 +++++++------- .../TemporalWorkerService.cs | 23 +++++-- .../TemporalWorkerServiceOptions.cs | 5 -- tests/Temporalio.Tests/AssertMore.cs | 2 +- .../Hosting/TemporalWorkerServiceTests.cs | 24 +++++-- 8 files changed, 63 insertions(+), 133 deletions(-) delete mode 100644 src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdateSubscriber.cs delete mode 100644 src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdatedEventArgs.cs diff --git a/src/Temporalio.Extensions.Hosting/README.md b/src/Temporalio.Extensions.Hosting/README.md index bb93db49..09333055 100644 --- a/src/Temporalio.Extensions.Hosting/README.md +++ b/src/Temporalio.Extensions.Hosting/README.md @@ -139,7 +139,7 @@ TemporalClientConnectOptions clientConnectOptions = new("my-other-temporal-host: ITemporalClient updatedClient = await TemporalClient.ConnectAsync(clientConnectOptions).ConfigureAwait(false); -workerClientUpdater.UpdateTemporalWorkerClient(updatedClient); +workerClientUpdater.UpdateClient(updatedClient); // Make sure you use RunAsync and not Run, see https://github.com/temporalio/sdk-dotnet/issues/220 await host.RunAsync(); diff --git a/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdateSubscriber.cs b/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdateSubscriber.cs deleted file mode 100644 index e8619870..00000000 --- a/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdateSubscriber.cs +++ /dev/null @@ -1,68 +0,0 @@ -using System; -using Temporalio.Worker; - -namespace Temporalio.Extensions.Hosting -{ - /// - /// Subscribes to the , and propagates the worker client changes to the Temporal worker. - /// - public class TemporalWorkerClientUpdateSubscriber : IDisposable - { - private readonly TemporalWorkerClientUpdater temporalWorkerClientUpdater; - private readonly TemporalWorker worker; - private bool disposedValue; - - /// - /// Initializes a new instance of the class. - /// - /// The optional used to subscribe to updates. - /// The that will be updated when the worker client updates. - public TemporalWorkerClientUpdateSubscriber( - TemporalWorkerClientUpdater temporalWorkerClientUpdater, - TemporalWorker worker) - { - this.temporalWorkerClientUpdater = temporalWorkerClientUpdater; - this.worker = worker; - this.temporalWorkerClientUpdater.TemporalWorkerClientUpdated += OnTemporalWorkerClientUpdated; - } - - /// - /// Finalizes an instance of the class. - /// - ~TemporalWorkerClientUpdateSubscriber() => Dispose(false); - - /// - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// - /// Unsubscribes from the worker client updater if one exists. - /// - /// If set to , the worker will unsubscribe from the worker client updater. - protected virtual void Dispose(bool disposing) - { - if (!disposedValue) - { - if (disposing) - { - temporalWorkerClientUpdater.TemporalWorkerClientUpdated -= OnTemporalWorkerClientUpdated; - } - - disposedValue = true; - } - } - - /// - /// Callback invoked when a worker client updated is pushed through the . - /// - /// The sender of the event. - /// The of the event. - private void OnTemporalWorkerClientUpdated(object? sender, TemporalWorkerClientUpdatedEventArgs eventArgs) - { - worker.Client = eventArgs.WorkerClient; - } - } -} diff --git a/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdatedEventArgs.cs b/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdatedEventArgs.cs deleted file mode 100644 index 084e8653..00000000 --- a/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdatedEventArgs.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using Temporalio.Worker; - -namespace Temporalio.Extensions.Hosting -{ - /// - /// Event raised when a worker client is updated. - /// - public class TemporalWorkerClientUpdatedEventArgs : EventArgs - { - /// - /// Initializes a new instance of the class. - /// - /// The client to update workers with. - public TemporalWorkerClientUpdatedEventArgs(IWorkerClient workerClient) => WorkerClient = workerClient; - - /// - /// Gets the that will be propagated to all event listeners. - /// - public IWorkerClient WorkerClient { get; } - } -} diff --git a/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdater.cs b/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdater.cs index 7062f9c4..95e6e4b0 100644 --- a/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdater.cs +++ b/src/Temporalio.Extensions.Hosting/TemporalWorkerClientUpdater.cs @@ -10,39 +10,39 @@ public class TemporalWorkerClientUpdater { private readonly object clientLock = new(); + private event EventHandler? OnClientUpdatedEvent; + /// - /// The used to dispatch notifications to subscribers that the Temporal worker client was updated. + /// Dispatches a notification to all subscribers that a new worker client should be used. /// - public event EventHandler TemporalWorkerClientUpdated + /// The new that should be pushed out to all subscribing workers. + public void UpdateClient(IWorkerClient client) { - add - { - lock (clientLock) - { - TemporalWorkerClientUpdatedEvent += value; - } - } + OnClientUpdatedEvent?.Invoke(this, client); + } - remove + /// + /// Adds a new subscriber that will be notified when a new worker client should be used. + /// + /// The event handler to add to the event listeners. + internal void Subscribe(EventHandler eventHandler) + { + lock (clientLock) { - lock (clientLock) - { - TemporalWorkerClientUpdatedEvent -= value; - } + OnClientUpdatedEvent += eventHandler; } - } - - private event EventHandler? TemporalWorkerClientUpdatedEvent; + } - /// - /// Dispatches a notification to all subscribers that a new worker client should be used. - /// - /// The new that should be pushed out to all subscribing workers. - public void UpdateTemporalWorkerClient(IWorkerClient client) + /// + /// Removes an existing subscriber from receiving notifications when a new worker client should be used. + /// + /// The event handler to remove from the event listeners. + internal void Unsubscribe(EventHandler eventHandler) { - TemporalWorkerClientUpdatedEventArgs eventArgs = new TemporalWorkerClientUpdatedEventArgs(client); - - TemporalWorkerClientUpdatedEvent?.Invoke(this, eventArgs); + lock (clientLock) + { + OnClientUpdatedEvent -= eventHandler; + } } } } diff --git a/src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs b/src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs index 88a02cda..39a5de1f 100644 --- a/src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs +++ b/src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs @@ -15,7 +15,7 @@ namespace Temporalio.Extensions.Hosting /// public class TemporalWorkerService : BackgroundService { - // These two are mutually exclusive + // These two (newClientOptions and existingClient) are mutually exclusive private readonly TemporalClientConnectOptions? newClientOptions; private readonly ITemporalClient? existingClient; private readonly TemporalWorkerOptions workerOptions; @@ -31,8 +31,11 @@ public class TemporalWorkerService : BackgroundService /// Options to use to create the worker service. public TemporalWorkerService(TemporalWorkerServiceOptions options) { - newClientOptions = options.ClientOptions ?? throw new ArgumentException( - "Client options is required", nameof(options)); + if (options.ClientOptions == null) + { + throw new ArgumentException("Client options is required", nameof(options)); + } + workerOptions = options; } @@ -157,7 +160,7 @@ public TemporalWorkerService( if (newClientOptions != null && workerOptions.LoggerFactory != null) { newClientOptions.LoggerFactory = workerOptions.LoggerFactory; - } + } if (options.WorkerClientUpdater != null) { @@ -175,10 +178,20 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) if (workerClientUpdater != null) { - using (new TemporalWorkerClientUpdateSubscriber(workerClientUpdater, worker)) + void SubscribeToClientUpdates(object? sender, IWorkerClient updatedClient) { + worker!.Client = updatedClient; + } + + try + { + workerClientUpdater.Subscribe(SubscribeToClientUpdates); await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false); } + finally + { + workerClientUpdater.Unsubscribe(SubscribeToClientUpdates); + } } else { diff --git a/src/Temporalio.Extensions.Hosting/TemporalWorkerServiceOptions.cs b/src/Temporalio.Extensions.Hosting/TemporalWorkerServiceOptions.cs index d9e41f0a..94e859ad 100644 --- a/src/Temporalio.Extensions.Hosting/TemporalWorkerServiceOptions.cs +++ b/src/Temporalio.Extensions.Hosting/TemporalWorkerServiceOptions.cs @@ -46,11 +46,6 @@ public override object Clone() options.ClientOptions = (TemporalClientConnectOptions)ClientOptions.Clone(); } - if (WorkerClientUpdater != null) - { - options.WorkerClientUpdater = WorkerClientUpdater; - } - return options; } diff --git a/tests/Temporalio.Tests/AssertMore.cs b/tests/Temporalio.Tests/AssertMore.cs index 32eeaafe..3175bb50 100644 --- a/tests/Temporalio.Tests/AssertMore.cs +++ b/tests/Temporalio.Tests/AssertMore.cs @@ -49,7 +49,7 @@ await HasEventEventuallyAsync( public static Task HasEventEventuallyAsync(WorkflowHandle handle, Func predicate) { - return AssertMore.EventuallyAsync(async () => + return EventuallyAsync(async () => { await foreach (var evt in handle.FetchHistoryEventsAsync()) { diff --git a/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs b/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs index 32fc3e4e..e9981823 100644 --- a/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs +++ b/tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs @@ -1,5 +1,3 @@ -using Temporalio.Tests.Worker; - #pragma warning disable SA1201, SA1204 // We want to have classes near their tests namespace Temporalio.Tests.Extensions.Hosting; @@ -210,6 +208,20 @@ public async Task TemporalWorkerService_ExecuteAsync_MultipleWorkers() result); } + [Workflow] + public class TickingWorkflow + { + [WorkflowRun] + public async Task RunAsync() + { + // Just tick every 100ms for 10s + for (var i = 0; i < 100; i++) + { + await Workflow.DelayAsync(100); + } + } + } + [Fact] public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient() { @@ -224,10 +236,10 @@ public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient() // Start both workflows on different servers var taskQueue = $"tq-{Guid.NewGuid()}"; var handle1 = await Client.StartWorkflowAsync( - (WorkflowWorkerTests.TickingWorkflow wf) => wf.RunAsync(), + (TickingWorkflow wf) => wf.RunAsync(), new(id: $"workflow-{Guid.NewGuid()}", taskQueue)); var handle2 = await otherEnv.Client.StartWorkflowAsync( - (WorkflowWorkerTests.TickingWorkflow wf) => wf.RunAsync(), + (TickingWorkflow wf) => wf.RunAsync(), new(id: $"workflow-{Guid.NewGuid()}", taskQueue)); var bld = Host.CreateApplicationBuilder(); @@ -241,7 +253,7 @@ public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient() bld.Services. AddSingleton(Client). AddHostedTemporalWorker(taskQueue). - AddWorkflow() + AddWorkflow() .ConfigureOptions() .Configure((options, updater) => { @@ -264,7 +276,7 @@ public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient() // Now replace the client, which should be used fairly quickly because we should have // timer-done poll completions every 100ms - workerClientUpdater.UpdateTemporalWorkerClient(otherEnv.Client); + workerClientUpdater.UpdateClient(otherEnv.Client); // Now confirm the other workflow has started await AssertMore.HasEventEventuallyAsync(handle2, e => e.WorkflowTaskCompletedEventAttributes != null);