diff --git a/Conductor/Client/Worker/WorkflowTaskExecutor.cs b/Conductor/Client/Worker/WorkflowTaskExecutor.cs index a51dc4d8..9c05f528 100644 --- a/Conductor/Client/Worker/WorkflowTaskExecutor.cs +++ b/Conductor/Client/Worker/WorkflowTaskExecutor.cs @@ -1,4 +1,5 @@ using Conductor.Client.Interfaces; +using Conductor.Client.Extensions; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; @@ -140,8 +141,6 @@ private void ProcessTask(Models.Task task) try { var taskResult = _worker.Execute(task); - taskResult.WorkerId = _workerSettings.WorkerId; - UpdateTask(taskResult); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Done processing task for worker" + $", taskType: {_worker.TaskType}" @@ -149,6 +148,7 @@ private void ProcessTask(Models.Task task) + $", taskId: {task.TaskId}" + $", workflowId: {task.WorkflowInstanceId}" ); + UpdateTask(taskResult); } catch (Exception e) { @@ -159,6 +159,8 @@ private void ProcessTask(Models.Task task) + $", taskId: {task.TaskId}" + $", workflowId: {task.WorkflowInstanceId}" ); + var taskResult = task.Failed(e.Message); + UpdateTask(taskResult); } finally { @@ -168,6 +170,7 @@ private void ProcessTask(Models.Task task) private void UpdateTask(Models.TaskResult taskResult) { + taskResult.WorkerId = taskResult.WorkerId ?? _workerSettings.WorkerId; for (var attemptCounter = 0; attemptCounter < UPDATE_TASK_RETRY_COUNT_LIMIT; attemptCounter += 1) { try diff --git a/Conductor/Client/Worker/WorkflowTaskHost.cs b/Conductor/Client/Worker/WorkflowTaskHost.cs index 06a059d6..d1efe4ed 100644 --- a/Conductor/Client/Worker/WorkflowTaskHost.cs +++ b/Conductor/Client/Worker/WorkflowTaskHost.cs @@ -1,7 +1,6 @@ using Conductor.Client.Interfaces; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using System.Collections.Generic; namespace Conductor.Client.Extensions { @@ -14,7 +13,7 @@ public static IHost CreateWorkerHost(LogLevel logLevel = LogLevel.Information) public static IHost CreateWorkerHost(Configuration configuration, LogLevel logLevel = LogLevel.Information) { - return CreateWorkerHost(configuration, logLevel); + return CreateWorkerHost(configuration, logLevel, workers: new IWorkflowTask[0]); } public static IHost CreateWorkerHost(LogLevel logLevel = LogLevel.Information, params T[] workers) where T : IWorkflowTask diff --git a/Tests/Worker/WorkerTests.cs b/Tests/Worker/WorkerTests.cs index bfa2e277..6695b9b9 100644 --- a/Tests/Worker/WorkerTests.cs +++ b/Tests/Worker/WorkerTests.cs @@ -30,8 +30,8 @@ public async System.Threading.Tasks.Task TestWorkflowAsyncExecution() { var workflow = GetConductorWorkflow(); ApiExtensions.GetWorkflowExecutor().RegisterWorkflow(workflow, true); - var workflowIdList = await StartWorkflows(workflow, quantity: 32); - await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(10)); + var workflowIdList = await StartWorkflows(workflow, quantity: 35); + await ExecuteWorkflowTasks(workflowCompletionTimeout: TimeSpan.FromSeconds(20)); await ValidateWorkflowCompletion(workflowIdList.ToArray()); } @@ -58,7 +58,8 @@ private async System.Threading.Tasks.Task> StartWorkflows( private async System.Threading.Tasks.Task ExecuteWorkflowTasks(TimeSpan workflowCompletionTimeout) { - var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug, new ClassWorker()); + var host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug); + host = WorkflowTaskHost.CreateWorkerHost(Microsoft.Extensions.Logging.LogLevel.Debug, new ClassWorker()); await host.StartAsync(); Thread.Sleep(workflowCompletionTimeout); await host.StopAsync(); diff --git a/Tests/Worker/Workers.cs b/Tests/Worker/Workers.cs index d083d6c8..975cc5e8 100644 --- a/Tests/Worker/Workers.cs +++ b/Tests/Worker/Workers.cs @@ -2,23 +2,34 @@ using Conductor.Client.Interfaces; using Conductor.Client.Models; using Conductor.Client.Worker; +using System; namespace Tests.Worker { [WorkerTask] public class FunctionalWorkers { - // Polls for 1 task every 35ms - [WorkerTask("test-sdk-csharp-task", 1, "taskDomain", 35, "workerId")] - public static TaskResult SimpleWorkerStatic(Task task) + private static Random _random; + + static FunctionalWorkers() + { + _random = new Random(); + } + + // Polls for 5 task every 200ms + [WorkerTask("test-sdk-csharp-task", 5, "taskDomain", 200, "workerId")] + public static TaskResult SimpleWorker(Task task) { return task.Completed(); } // Polls for 12 tasks every 420ms [WorkerTask("test-sdk-csharp-task", 12, "taskDomain", 420, "workerId")] - public TaskResult SimpleWorker(Task task) + public TaskResult LazyWorker(Task task) { + var timeSpan = System.TimeSpan.FromMilliseconds(_random.Next(128, 2048)); + Console.WriteLine($"Lazy worker is going to rest for {timeSpan.Milliseconds} ms"); + System.Threading.Tasks.Task.Delay(timeSpan).GetAwaiter().GetResult(); return task.Completed(); } }