Skip to content

Commit

Permalink
fix: collect all job responses
Browse files Browse the repository at this point in the history
Loops over the activation stream until the stream is closed. An consumer was added to get intermediate jobs. In the ActivateJobsCommand the consumer is used to collect all jobs in one response. This is not necessary in the worker, here we want to already work on jobs if they are available. This need to be done as follow up
  • Loading branch information
ChrisKujawa committed Aug 20, 2021
1 parent 257c4ac commit 45fff2a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
4 changes: 3 additions & 1 deletion Client/Impl/Commands/ActivateJobsCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public IActivateJobsCommandStep3 WorkerName(string workerName)

public async Task<IActivateJobsResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), token);
var activateJobsResponses = new Responses.ActivateJobsResponses();
await activator.SendActivateRequest(Request, response => activateJobsResponses.Add(response), timeout?.FromUtcNow(), token);
return activateJobsResponses;
}

public async Task<IActivateJobsResponse> Send(CancellationToken cancellationToken)
Expand Down
14 changes: 7 additions & 7 deletions Client/Impl/Commands/JobActivator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace Zeebe.Client.Impl.Commands
{
public delegate void ConsumeJob(IActivateJobsResponse response);
internal class JobActivator
{
private readonly GatewayClient client;
Expand All @@ -18,20 +19,19 @@ public JobActivator(GatewayClient client)
this.client = client;
}

public async Task<IActivateJobsResponse> SendActivateRequest(ActivateJobsRequest request, DateTime? requestTimeout = null, CancellationToken? cancellationToken = null)
public async Task SendActivateRequest(ActivateJobsRequest request, ConsumeJob consumer, DateTime? requestTimeout = null, CancellationToken? cancellationToken = null)
{
DateTime activateRequestTimeout = requestTimeout ?? CalculateRequestTimeout(request);
using (var stream = client.ActivateJobs(request, deadline: activateRequestTimeout))
{
var responseStream = stream.ResponseStream;
if (await MoveNext(responseStream, cancellationToken))

while (await MoveNext(responseStream, cancellationToken))
{
var response = responseStream.Current;
return new ActivateJobsResponses(response);
var currentResponse = responseStream.Current;
var response = new ActivateJobsResponses(currentResponse);
consumer.Invoke(response);
}

// empty response
return new ActivateJobsResponses();
}
}

Expand Down
13 changes: 12 additions & 1 deletion Client/Impl/Responses/ActivateJobsResponses.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using GatewayProtocol;
using Zeebe.Client.Api.Responses;

namespace Zeebe.Client.Impl.Responses
Expand All @@ -15,7 +16,17 @@ public ActivateJobsResponses()

public ActivateJobsResponses(GatewayProtocol.ActivateJobsResponse jobsResponse)
{
Jobs = jobsResponse.Jobs
Jobs = ConvertToList(jobsResponse);
}

public void Add(IActivateJobsResponse jobsResponse)
{
((List<IJob>) Jobs).AddRange(jobsResponse.Jobs);
}

private static List<IJob> ConvertToList(ActivateJobsResponse jobsResponse)
{
return jobsResponse.Jobs
.Select(job => new ActivatedJob(job))
.Cast<IJob>()
.ToList();
Expand Down

0 comments on commit 45fff2a

Please sign in to comment.