From 45fff2a03621afc2c47e7dafca05a07f8aadf784 Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Fri, 20 Aug 2021 15:10:41 +0200 Subject: [PATCH] fix: collect all job responses Loops over the activation stream until the stream is closed. An consumer was added to get intermediate jobs. In the ActivateJobsCommand the consumer is used to collect all jobs in one response. This is not necessary in the worker, here we want to already work on jobs if they are available. This need to be done as follow up --- Client/Impl/Commands/ActivateJobsCommand.cs | 4 +++- Client/Impl/Commands/JobActivator.cs | 14 +++++++------- Client/Impl/Responses/ActivateJobsResponses.cs | 13 ++++++++++++- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/Client/Impl/Commands/ActivateJobsCommand.cs b/Client/Impl/Commands/ActivateJobsCommand.cs index 3d8a977a..aff0ee89 100644 --- a/Client/Impl/Commands/ActivateJobsCommand.cs +++ b/Client/Impl/Commands/ActivateJobsCommand.cs @@ -67,7 +67,9 @@ public IActivateJobsCommandStep3 WorkerName(string workerName) public async Task Send(TimeSpan? timeout = null, CancellationToken token = default) { - return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), token); + var activateJobsResponses = new Responses.ActivateJobsResponses(); + await activator.SendActivateRequest(Request, response => activateJobsResponses.Add(response), timeout?.FromUtcNow(), token); + return activateJobsResponses; } public async Task Send(CancellationToken cancellationToken) diff --git a/Client/Impl/Commands/JobActivator.cs b/Client/Impl/Commands/JobActivator.cs index c282dc1a..a241d873 100644 --- a/Client/Impl/Commands/JobActivator.cs +++ b/Client/Impl/Commands/JobActivator.cs @@ -9,6 +9,7 @@ namespace Zeebe.Client.Impl.Commands { + public delegate void ConsumeJob(IActivateJobsResponse response); internal class JobActivator { private readonly GatewayClient client; @@ -18,20 +19,19 @@ public JobActivator(GatewayClient client) this.client = client; } - public async Task SendActivateRequest(ActivateJobsRequest request, DateTime? requestTimeout = null, CancellationToken? cancellationToken = null) + public async Task SendActivateRequest(ActivateJobsRequest request, ConsumeJob consumer, DateTime? requestTimeout = null, CancellationToken? cancellationToken = null) { DateTime activateRequestTimeout = requestTimeout ?? CalculateRequestTimeout(request); using (var stream = client.ActivateJobs(request, deadline: activateRequestTimeout)) { var responseStream = stream.ResponseStream; - if (await MoveNext(responseStream, cancellationToken)) + + while (await MoveNext(responseStream, cancellationToken)) { - var response = responseStream.Current; - return new ActivateJobsResponses(response); + var currentResponse = responseStream.Current; + var response = new ActivateJobsResponses(currentResponse); + consumer.Invoke(response); } - - // empty response - return new ActivateJobsResponses(); } } diff --git a/Client/Impl/Responses/ActivateJobsResponses.cs b/Client/Impl/Responses/ActivateJobsResponses.cs index 3c4e0a77..69c9998b 100644 --- a/Client/Impl/Responses/ActivateJobsResponses.cs +++ b/Client/Impl/Responses/ActivateJobsResponses.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Linq; +using GatewayProtocol; using Zeebe.Client.Api.Responses; namespace Zeebe.Client.Impl.Responses @@ -15,7 +16,17 @@ public ActivateJobsResponses() public ActivateJobsResponses(GatewayProtocol.ActivateJobsResponse jobsResponse) { - Jobs = jobsResponse.Jobs + Jobs = ConvertToList(jobsResponse); + } + + public void Add(IActivateJobsResponse jobsResponse) + { + ((List) Jobs).AddRange(jobsResponse.Jobs); + } + + private static List ConvertToList(ActivateJobsResponse jobsResponse) + { + return jobsResponse.Jobs .Select(job => new ActivatedJob(job)) .Cast() .ToList();