From 81ed622c09e2f44228c0df0e5b949c9b0877185e Mon Sep 17 00:00:00 2001 From: gardusig Date: Thu, 11 May 2023 13:24:32 -0300 Subject: [PATCH 1/6] Fixed extension with host creation and updated related tests --- Conductor/Client/Worker/WorkflowTaskHost.cs | 3 +-- Tests/Worker/WorkerTests.cs | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Conductor/Client/Worker/WorkflowTaskHost.cs b/Conductor/Client/Worker/WorkflowTaskHost.cs index 06a059d6..d1efe4ed 100644 --- a/Conductor/Client/Worker/WorkflowTaskHost.cs +++ b/Conductor/Client/Worker/WorkflowTaskHost.cs @@ -1,7 +1,6 @@ using Conductor.Client.Interfaces; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using System.Collections.Generic; namespace Conductor.Client.Extensions { @@ -14,7 +13,7 @@ public static IHost CreateWorkerHost(LogLevel logLevel = LogLevel.Information) public static IHost CreateWorkerHost(Configuration configuration, LogLevel logLevel = LogLevel.Information) { - return CreateWorkerHost(configuration, logLevel); + return CreateWorkerHost(configuration, logLevel, workers: new IWorkflowTask[0]); } public static IHost CreateWorkerHost(LogLevel logLevel = LogLevel.Information, params T[] workers) where T : IWorkflowTask diff --git a/Tests/Worker/WorkerTests.cs b/Tests/Worker/WorkerTests.cs index bfa2e277..544d1ca3 100644 --- a/Tests/Worker/WorkerTests.cs +++ b/Tests/Worker/WorkerTests.cs @@ -58,7 +58,8 @@ private async System.Threading.Tasks.Task> StartWorkflows( private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout) { - var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug, new ClassWorker()); + var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug); + host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug, new ClassWorker()); await host.StartAsync(); Thread.Sleep(workflowCompletionTimeout); await host.StopAsync(); From 4ade115fbd432a355e93594af3dca685361be86c Mon Sep 17 00:00:00 2001 From: gardusig Date: Thu, 11 May 2023 13:28:59 -0300 Subject: [PATCH 2/6] Updated load for integration tests --- Tests/Worker/WorkerTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/Worker/WorkerTests.cs b/Tests/Worker/WorkerTests.cs index 544d1ca3..6695b9b9 100644 --- a/Tests/Worker/WorkerTests.cs +++ b/Tests/Worker/WorkerTests.cs @@ -30,8 +30,8 @@ public async System.Threading.Tasks.Task TestWorkflowAsyncExecution() { var workflow = GetConductorWorkflow(); ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(workflow, true); - var workflowIdList = await StartWorkflows(workflow, quantity: 32); - await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(10)); + var workflowIdList = await StartWorkflows(workflow, quantity: 35); + await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20)); await ValidateWorkflowCompletion(workflowIdList.ToArray()); } From c00f10d60e33601496be950e9ef36aa203e68529 Mon Sep 17 00:00:00 2001 From: gardusig Date: Thu, 11 May 2023 14:14:59 -0300 Subject: [PATCH 3/6] Updated test worker --- Tests/Worker/Workers.cs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/Tests/Worker/Workers.cs b/Tests/Worker/Workers.cs index d083d6c8..7560dbc7 100644 --- a/Tests/Worker/Workers.cs +++ b/Tests/Worker/Workers.cs @@ -2,23 +2,34 @@ using Conductor.Client.Interfaces; using Conductor.Client.Models; using Conductor.Client.Worker; +using System; namespace Tests.Worker { [WorkerTask] public class FunctionalWorkers { - // Polls for 1 task every 35ms - [WorkerTask("test-sdk-csharp-task", 1, "taskDomain", 35, "workerId")] - public static TaskResult SimpleWorkerStatic(Task task) + private static Random _random; + + static FunctionalWorkers() + { + _random = new Random(); + } + + // Polls for 5 task every 200ms + [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")] + public static TaskResult SimpleWorker(Task task) { return task.Completed(); } // Polls for 12 tasks every 420ms [WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")] - public TaskResult SimpleWorker(Task task) + public TaskResult LazyWorker(Task task) { + var timeSpan = System.TimeSpan.FromSeconds(_random.Next(2, 6)); + Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Seconds} seconds"); + System.Threading.Tasks.Task.Delay(timeSpan).RunSynchronously(); return task.Completed(); } } From 27e9f51447aa6dcf7d3268c6d8141e53ba1409a7 Mon Sep 17 00:00:00 2001 From: gardusig Date: Thu, 11 May 2023 14:26:06 -0300 Subject: [PATCH 4/6] Updated test worker --- Tests/Worker/Workers.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Tests/Worker/Workers.cs b/Tests/Worker/Workers.cs index 7560dbc7..9040539c 100644 --- a/Tests/Worker/Workers.cs +++ b/Tests/Worker/Workers.cs @@ -17,7 +17,7 @@ static FunctionalWorkers() } // Polls for 5 task every 200ms - [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")] + [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 100, "workerId")] public static TaskResult SimpleWorker(Task task) { return task.Completed(); @@ -27,8 +27,8 @@ public static TaskResult SimpleWorker(Task task) [WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")] public TaskResult LazyWorker(Task task) { - var timeSpan = System.TimeSpan.FromSeconds(_random.Next(2, 6)); - Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Seconds} seconds"); + var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048)); + Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Milliseconds} ms"); System.Threading.Tasks.Task.Delay(timeSpan).RunSynchronously(); return task.Completed(); } From 31aec6b13ab60094ce6968e2ecfb0d6e481777c6 Mon Sep 17 00:00:00 2001 From: gardusig Date: Thu, 11 May 2023 18:23:40 -0300 Subject: [PATCH 5/6] Updated tests --- Tests/Worker/Workers.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/Worker/Workers.cs b/Tests/Worker/Workers.cs index 9040539c..975cc5e8 100644 --- a/Tests/Worker/Workers.cs +++ b/Tests/Worker/Workers.cs @@ -17,7 +17,7 @@ static FunctionalWorkers() } // Polls for 5 task every 200ms - [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 100, "workerId")] + [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")] public static TaskResult SimpleWorker(Task task) { return task.Completed(); @@ -29,7 +29,7 @@ public TaskResult LazyWorker(Task task) { var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048)); Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Milliseconds} ms"); - System.Threading.Tasks.Task.Delay(timeSpan).RunSynchronously(); + System.Threading.Tasks.Task.Delay(timeSpan).GetAwaiter().GetResult(); return task.Completed(); } } From 0b01347f59758d57b17d7c71257ab6f185660c77 Mon Sep 17 00:00:00 2001 From: gardusig Date: Thu, 11 May 2023 18:31:51 -0300 Subject: [PATCH 6/6] Improve WorkflowTaskExecutor logic to create a failed task instead of raising an Exception --- Conductor/Client/Worker/WorkflowTaskExecutor.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Conductor/Client/Worker/WorkflowTaskExecutor.cs b/Conductor/Client/Worker/WorkflowTaskExecutor.cs index a51dc4d8..9c05f528 100644 --- a/Conductor/Client/Worker/WorkflowTaskExecutor.cs +++ b/Conductor/Client/Worker/WorkflowTaskExecutor.cs @@ -1,4 +1,5 @@ using Conductor.Client.Interfaces; +using Conductor.Client.Extensions; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; @@ -140,8 +141,6 @@ private void ProcessTask(Models.Task task) try { var taskResult = _worker.Execute(task); - taskResult.WorkerId = _workerSettings.WorkerId; - UpdateTask(taskResult); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Done processing task for worker" + $", taskType: {_worker.TaskType}" @@ -149,6 +148,7 @@ private void ProcessTask(Models.Task task) + $", taskId: {task.TaskId}" + $", workflowId: {task.WorkflowInstanceId}" ); + UpdateTask(taskResult); } catch (Exception e) { @@ -159,6 +159,8 @@ private void ProcessTask(Models.Task task) + $", taskId: {task.TaskId}" + $", workflowId: {task.WorkflowInstanceId}" ); + var taskResult = task.Failed(e.Message); + UpdateTask(taskResult); } finally { @@ -168,6 +170,7 @@ private void ProcessTask(Models.Task task) private void UpdateTask(Models.TaskResult taskResult) { + taskResult.WorkerId = taskResult.WorkerId ?? _workerSettings.WorkerId; for (var attemptCounter = 0; attemptCounter < UPDATE_TASK_RETRY_COUNT_LIMIT; attemptCounter += 1) { try