Skip to content

Commit

Permalink
Merge pull request #172 from zeebe-io/zell-retry
Browse files Browse the repository at this point in the history
Add new SendWithRetry API method
  • Loading branch information
ChrisKujawa authored Sep 13, 2020
2 parents af22fbb + 61655f6 commit e8421da
Show file tree
Hide file tree
Showing 23 changed files with 455 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Client.UnitTests/BaseZeebeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class BaseZeebeTest
private GatewayTestService testService;
private IZeebeClient client;

public Server Server => server;
protected GatewayTestService TestService => testService;
protected IZeebeClient ZeebeClient => client;

Expand All @@ -49,6 +48,7 @@ public void Init()
.Builder()
.UseGatewayAddress("localhost:26500")
.UsePlainText()
.UseRetrySleepDurationProvider(retryAttempt => TimeSpan.Zero)
.Build();
}

Expand Down
90 changes: 89 additions & 1 deletion Client.UnitTests/CancelWorkflowInstanceTest.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;
using NUnit.Framework;
using Type = Google.Protobuf.WellKnownTypes.Type;

namespace Zeebe.Client
{
Expand Down Expand Up @@ -38,7 +40,93 @@ public void ShouldTimeoutRequest()
var rpcException = (RpcException) aggregateException.InnerExceptions[0];

// then
Assert.AreEqual(Grpc.Core.StatusCode.DeadlineExceeded, rpcException.Status.StatusCode);
Assert.AreEqual(StatusCode.DeadlineExceeded, rpcException.Status.StatusCode);
}

[Test]
public async Task ShouldRetrySendRequestOnResourceExhaustedUntilSucceed()
{
// given
var countdownEvent = new CountdownEvent(5);
TestService.AddRequestHandler(
typeof(CancelWorkflowInstanceRequest),
req =>
{
countdownEvent.Signal();
throw new RpcException(new Status(StatusCode.ResourceExhausted, "exhausted"));
});
var expectedRequest = new CancelWorkflowInstanceRequest
{
WorkflowInstanceKey = 12113
};

// when
var resultTask = ZeebeClient.NewCancelInstanceCommand(12113).SendWithRetry();
countdownEvent.Wait(TimeSpan.FromSeconds(10));

// then
Assert.AreEqual(0, countdownEvent.CurrentCount);
TestService.AddRequestHandler(typeof(CancelWorkflowInstanceRequest), req => new CancelWorkflowInstanceResponse());
await resultTask;

var request = TestService.Requests[typeof(CancelWorkflowInstanceRequest)][0];
Assert.AreEqual(expectedRequest, request);

var requestCount = TestService.Requests[typeof(CancelWorkflowInstanceRequest)].Count;
Assert.GreaterOrEqual(requestCount, 5);
}

[Test]
public async Task ShouldRetrySendRequestOnUnavailableUntilSucceed()
{
// given
var countdownEvent = new CountdownEvent(5);
TestService.AddRequestHandler(
typeof(CancelWorkflowInstanceRequest),
req =>
{
countdownEvent.Signal();
throw new RpcException(new Status(StatusCode.Unavailable, "exhausted"));
});
var expectedRequest = new CancelWorkflowInstanceRequest
{
WorkflowInstanceKey = 12113
};

// when
var resultTask = ZeebeClient.NewCancelInstanceCommand(12113).SendWithRetry();
countdownEvent.Wait(TimeSpan.FromSeconds(10));

// then
Assert.AreEqual(0, countdownEvent.CurrentCount);
TestService.AddRequestHandler(typeof(CancelWorkflowInstanceRequest), req => new CancelWorkflowInstanceResponse());
await resultTask;

var request = TestService.Requests[typeof(CancelWorkflowInstanceRequest)][0];
Assert.AreEqual(expectedRequest, request);

var requestCount = TestService.Requests[typeof(CancelWorkflowInstanceRequest)].Count;
Assert.GreaterOrEqual(requestCount, 5);
}

[Test]
public void ShouldNotRetrySendRequest()
{
// given
TestService.AddRequestHandler(
typeof(CancelWorkflowInstanceRequest),
req =>
{
throw new RpcException(new Status(StatusCode.Internal, "exhausted"));
});

// when
var resultTask = ZeebeClient.NewCancelInstanceCommand(12113).SendWithRetry();
var aggregateException = Assert.Throws<AggregateException>(() => resultTask.Wait());
var rpcException = (RpcException) aggregateException.InnerExceptions[0];

// then
Assert.AreEqual(StatusCode.Internal, rpcException.Status.StatusCode);
}
}
}
1 change: 0 additions & 1 deletion Client.UnitTests/GatewayTestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
Expand Down
167 changes: 167 additions & 0 deletions Client.UnitTests/TransientGrpcErrorRetryStrategyTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using NUnit.Framework;
using Zeebe.Client.Impl.Misc;

namespace Zeebe.Client
{
[TestFixture]
public class TransientGrpcErrorRetryStrategyTest
{
[Test]
public async Task ShouldRetryOnResourceExhaustedException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var result = await strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.ResourceExhausted, "resourceExhausted"));
});

// then
Assert.AreEqual(3, result);
}

[Test]
public async Task ShouldRetryOnUnavailableException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var result = await strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.Unavailable, "resourceExhausted"));
});

// then
Assert.AreEqual(3, result);
}

[Test]
public async Task ShouldIncrementRetriesOnWaitTimeProvider()
{
// given
var retries = 0;
var values = new List<int>();
var strategy = new TransientGrpcErrorRetryStrategy(retry =>
{
values.Add(retry);
return TimeSpan.Zero;
});

// when
var result = await strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.ResourceExhausted, "resourceExhausted"));
});

// then
Assert.AreEqual(3, result);
CollectionAssert.AreEqual(new List<int> { 1, 2, 3 }, values);
}

[Test]
public void ShouldWaitProvidedTime()
{
// given
var retries = 0;
var countdownEvent = new CountdownEvent(2);
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.FromSeconds(1));

// when
strategy.DoWithRetry(() =>
{
countdownEvent.Signal();
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.ResourceExhausted, "resourceExhausted"));
});
countdownEvent.Wait(TimeSpan.FromMilliseconds(10));

// then
Assert.AreEqual(countdownEvent.CurrentCount, 1);
Assert.AreEqual(retries, 1);
}

[Test]
public void ShouldNotRetryOnOtherRpcException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var resultTask = strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.Unknown, "idk"));
});

// then
var aggregateException = Assert.Throws<AggregateException>(() => resultTask.Wait());
var rpcException = (RpcException) aggregateException.InnerExceptions[0];
Assert.AreEqual(StatusCode.Unknown, rpcException.Status.StatusCode);
}

[Test]
public void ShouldNotRetryOnOtherException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var resultTask = strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new Exception("exception");
});

// then
var aggregateException = Assert.Throws<AggregateException>(() => resultTask.Wait());
var exception = aggregateException.InnerExceptions[0];
Assert.AreEqual("exception", exception.Message);
}
}
}
12 changes: 12 additions & 0 deletions Client/Api/Builder/IZeebeClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ public interface IZeebeClientFinalBuildStep
/// <returns>the final step builder</returns>
IZeebeClientFinalBuildStep UseKeepAlive(TimeSpan keepAlive);

/// <summary>
/// Uses the given duration provider for the send retries.
/// On each retry, the duration to wait is calculated by calling <paramref name="sleepDurationProvider" /> with
/// the current retry number (1 for first retry, 2 for second etc)
///
/// <p>This is an optional configuration. Per default the wait time provider provides base two wait time,
/// 2^1 seconds, 2^2 seconds, 2^3 seconds etc. until one minute is reached.</p>
/// </summary>
/// <param name="sleepDurationProvider">The function that provides the duration to wait for for a particular retry attempt.</param>
/// <returns>the final step builder</returns>
IZeebeClientFinalBuildStep UseRetrySleepDurationProvider(Func<int, TimeSpan> sleepDurationProvider);

/// <summary>
/// Builds the client with the given configuration.
/// </summary>
Expand Down
25 changes: 23 additions & 2 deletions Client/Api/Commands/IFinalCommandStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,37 @@ public interface IFinalCommandStep<T>
/// Sends the command to the Zeebe broker. This operation is asynchronous. In case of success, the
/// task returns the event that was generated by the Zeebe broker in response to the command.
///
/// <para>Use <c>await ...send();</c> to wait until the response is available.</para>
/// <para>Use <c>await ...Send();</c> to wait until the response is available.</para>
///
/// <example>
/// <code>
/// T response = await command.send();
/// T response = await command.Send();
/// </code>
/// </example>
/// </summary>
/// <param name="timeout">the time span after request should be timed out</param>
/// <returns>a task tracking state of success/failure of the command.</returns>
Task<T> Send(TimeSpan? timeout = null);

/// <summary>
/// Sends the command with retry to the Zeebe broker. This operation is asynchronous. In case of success, the
/// task returns the event that was generated by the Zeebe broker in response to the command.
/// If the sending of the command fails, because of broker back pressure or network issues the request is
/// retried until the command succeeds. The wait time between retries can be configured on the
/// ZeebeClientBuilder. Per default the wait time is based on power two, which means 2^1 seconds, 2^2 seconds
/// etc. until it reaches the maximum of one minute.
///
/// <para>Use <c>await ...SendWithRetry();</c> to wait until the response is available.</para>
///
/// <example>
/// <code>
/// T response = await command.SendWithRetry();
/// </code>
/// </example>
/// </summary>
///
/// <param name="timeout">the time span after request should be timed out</param>
/// <returns>a task tracking state of success/failure of the command.</returns>
Task<T> SendWithRetry(TimeSpan? timeout = null);
}
}
17 changes: 17 additions & 0 deletions Client/Api/Misc/IAsyncRetryStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Threading.Tasks;

namespace Zeebe.Client.Api.Misc
{
public interface IAsyncRetryStrategy
{
/// <summary>
/// Runs the given asynchronous action asynchronously and retries it if it fails.
/// When and how it is retried is depended of the implementation.
/// </summary>
/// <param name="action">the action which should be run and retried</param>
/// <typeparam name="TResult">the result type</typeparam>
/// <returns>the result of the action</returns>
Task<TResult> DoWithRetry<TResult>(Func<Task<TResult>> action);
}
}
Loading

0 comments on commit e8421da

Please sign in to comment.