From 177f3831e80fa02cda118abeecc98e5aead20ee1 Mon Sep 17 00:00:00 2001 From: Terje Innerdal Date: Fri, 19 Feb 2021 11:41:19 +0100 Subject: [PATCH] fix(client): Call SendWithRetry from JobWorker.PollJobs to avoid looping without delay if an RpcException is thrown --- Client/Impl/Commands/ActivateJobsCommand.cs | 14 +++++++++++--- Client/Impl/Worker/JobWorker.cs | 2 +- Client/ZeebeClient.cs | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/Client/Impl/Commands/ActivateJobsCommand.cs b/Client/Impl/Commands/ActivateJobsCommand.cs index fd334b0f..23bfd173 100644 --- a/Client/Impl/Commands/ActivateJobsCommand.cs +++ b/Client/Impl/Commands/ActivateJobsCommand.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using GatewayProtocol; using Zeebe.Client.Api.Commands; +using Zeebe.Client.Api.Misc; using Zeebe.Client.Api.Responses; using static GatewayProtocol.Gateway; @@ -13,9 +14,11 @@ internal class ActivateJobsCommand : IActivateJobsCommandStep1, IActivateJobsCom { private readonly JobActivator activator; public ActivateJobsRequest Request { get; } + private readonly IAsyncRetryStrategy asyncRetryStrategy; - public ActivateJobsCommand(GatewayClient client) + public ActivateJobsCommand(GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy) { + this.asyncRetryStrategy = asyncRetryStrategy; activator = new JobActivator(client); Request = new ActivateJobsRequest(); } @@ -72,9 +75,14 @@ public async Task Send(TimeSpan? timeout, CancellationTok return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), cancellationToken); } - public Task SendWithRetry(TimeSpan? timespan = null) + public async Task SendWithRetry(TimeSpan? timespan) { - throw new NotImplementedException(); + return await asyncRetryStrategy.DoWithRetry(() => Send(timespan)); + } + + public async Task SendWithRetry(TimeSpan? timespan, CancellationToken? cancellationToken) + { + return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, cancellationToken)); } } } diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index 71319f0b..eaa361d2 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -145,7 +145,7 @@ private async Task PollJobs(ITargetBlock input, CancellationToken cancella try { - var response = await activateJobsCommand.Send(null, cancellationToken); + var response = await activateJobsCommand.SendWithRetry(null, cancellationToken); await HandleActivationResponse(input, response, jobCount); } catch (RpcException rpcException) diff --git a/Client/ZeebeClient.cs b/Client/ZeebeClient.cs index 1e2a3ab7..f70ef92c 100644 --- a/Client/ZeebeClient.cs +++ b/Client/ZeebeClient.cs @@ -102,7 +102,7 @@ public IJobWorkerBuilderStep1 NewWorker() public IActivateJobsCommandStep1 NewActivateJobsCommand() { - return new ActivateJobsCommand(gatewayClient); + return new ActivateJobsCommand(gatewayClient, asyncRetryStrategy); } public ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey)