Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new SendWithRetry API method #172

Merged
merged 1 commit into from
Sep 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Client.UnitTests/BaseZeebeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class BaseZeebeTest
private GatewayTestService testService;
private IZeebeClient client;

public Server Server => server;
protected GatewayTestService TestService => testService;
protected IZeebeClient ZeebeClient => client;

Expand All @@ -49,6 +48,7 @@ public void Init()
.Builder()
.UseGatewayAddress("localhost:26500")
.UsePlainText()
.UseRetrySleepDurationProvider(retryAttempt => TimeSpan.Zero)
.Build();
}

Expand Down
90 changes: 89 additions & 1 deletion Client.UnitTests/CancelWorkflowInstanceTest.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;
using NUnit.Framework;
using Type = Google.Protobuf.WellKnownTypes.Type;

namespace Zeebe.Client
{
Expand Down Expand Up @@ -38,7 +40,93 @@ public void ShouldTimeoutRequest()
var rpcException = (RpcException) aggregateException.InnerExceptions[0];

// then
Assert.AreEqual(Grpc.Core.StatusCode.DeadlineExceeded, rpcException.Status.StatusCode);
Assert.AreEqual(StatusCode.DeadlineExceeded, rpcException.Status.StatusCode);
}

[Test]
public async Task ShouldRetrySendRequestOnResourceExhaustedUntilSucceed()
{
// given
var countdownEvent = new CountdownEvent(5);
TestService.AddRequestHandler(
typeof(CancelWorkflowInstanceRequest),
req =>
{
countdownEvent.Signal();
throw new RpcException(new Status(StatusCode.ResourceExhausted, "exhausted"));
});
var expectedRequest = new CancelWorkflowInstanceRequest
{
WorkflowInstanceKey = 12113
};

// when
var resultTask = ZeebeClient.NewCancelInstanceCommand(12113).SendWithRetry();
countdownEvent.Wait(TimeSpan.FromSeconds(10));

// then
Assert.AreEqual(0, countdownEvent.CurrentCount);
TestService.AddRequestHandler(typeof(CancelWorkflowInstanceRequest), req => new CancelWorkflowInstanceResponse());
await resultTask;

var request = TestService.Requests[typeof(CancelWorkflowInstanceRequest)][0];
Assert.AreEqual(expectedRequest, request);

var requestCount = TestService.Requests[typeof(CancelWorkflowInstanceRequest)].Count;
Assert.GreaterOrEqual(requestCount, 5);
}

[Test]
public async Task ShouldRetrySendRequestOnUnavailableUntilSucceed()
{
// given
var countdownEvent = new CountdownEvent(5);
TestService.AddRequestHandler(
typeof(CancelWorkflowInstanceRequest),
req =>
{
countdownEvent.Signal();
throw new RpcException(new Status(StatusCode.Unavailable, "exhausted"));
});
var expectedRequest = new CancelWorkflowInstanceRequest
{
WorkflowInstanceKey = 12113
};

// when
var resultTask = ZeebeClient.NewCancelInstanceCommand(12113).SendWithRetry();
countdownEvent.Wait(TimeSpan.FromSeconds(10));

// then
Assert.AreEqual(0, countdownEvent.CurrentCount);
TestService.AddRequestHandler(typeof(CancelWorkflowInstanceRequest), req => new CancelWorkflowInstanceResponse());
await resultTask;

var request = TestService.Requests[typeof(CancelWorkflowInstanceRequest)][0];
Assert.AreEqual(expectedRequest, request);

var requestCount = TestService.Requests[typeof(CancelWorkflowInstanceRequest)].Count;
Assert.GreaterOrEqual(requestCount, 5);
}

[Test]
public void ShouldNotRetrySendRequest()
{
// given
TestService.AddRequestHandler(
typeof(CancelWorkflowInstanceRequest),
req =>
{
throw new RpcException(new Status(StatusCode.Internal, "exhausted"));
});

// when
var resultTask = ZeebeClient.NewCancelInstanceCommand(12113).SendWithRetry();
var aggregateException = Assert.Throws<AggregateException>(() => resultTask.Wait());
var rpcException = (RpcException) aggregateException.InnerExceptions[0];

// then
Assert.AreEqual(StatusCode.Internal, rpcException.Status.StatusCode);
}
}
}
1 change: 0 additions & 1 deletion Client.UnitTests/GatewayTestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
Expand Down
167 changes: 167 additions & 0 deletions Client.UnitTests/TransientGrpcErrorRetryStrategyTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using NUnit.Framework;
using Zeebe.Client.Impl.Misc;

namespace Zeebe.Client
{
[TestFixture]
public class TransientGrpcErrorRetryStrategyTest
{
[Test]
public async Task ShouldRetryOnResourceExhaustedException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var result = await strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.ResourceExhausted, "resourceExhausted"));
});

// then
Assert.AreEqual(3, result);
}

[Test]
public async Task ShouldRetryOnUnavailableException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var result = await strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.Unavailable, "resourceExhausted"));
});

// then
Assert.AreEqual(3, result);
}

[Test]
public async Task ShouldIncrementRetriesOnWaitTimeProvider()
{
// given
var retries = 0;
var values = new List<int>();
var strategy = new TransientGrpcErrorRetryStrategy(retry =>
{
values.Add(retry);
return TimeSpan.Zero;
});

// when
var result = await strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.ResourceExhausted, "resourceExhausted"));
});

// then
Assert.AreEqual(3, result);
CollectionAssert.AreEqual(new List<int> { 1, 2, 3 }, values);
}

[Test]
public void ShouldWaitProvidedTime()
{
// given
var retries = 0;
var countdownEvent = new CountdownEvent(2);
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.FromSeconds(1));

// when
strategy.DoWithRetry(() =>
{
countdownEvent.Signal();
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.ResourceExhausted, "resourceExhausted"));
});
countdownEvent.Wait(TimeSpan.FromMilliseconds(10));

// then
Assert.AreEqual(countdownEvent.CurrentCount, 1);
Assert.AreEqual(retries, 1);
}

[Test]
public void ShouldNotRetryOnOtherRpcException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var resultTask = strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new RpcException(new Status(StatusCode.Unknown, "idk"));
});

// then
var aggregateException = Assert.Throws<AggregateException>(() => resultTask.Wait());
var rpcException = (RpcException) aggregateException.InnerExceptions[0];
Assert.AreEqual(StatusCode.Unknown, rpcException.Status.StatusCode);
}

[Test]
public void ShouldNotRetryOnOtherException()
{
// given
int retries = 0;
var strategy = new TransientGrpcErrorRetryStrategy(retry => TimeSpan.Zero);

// when
var resultTask = strategy.DoWithRetry(() =>
{
if (retries == 3)
{
return Task.FromResult(retries);
}

retries++;
throw new Exception("exception");
});

// then
var aggregateException = Assert.Throws<AggregateException>(() => resultTask.Wait());
var exception = aggregateException.InnerExceptions[0];
Assert.AreEqual("exception", exception.Message);
}
}
}
12 changes: 12 additions & 0 deletions Client/Api/Builder/IZeebeClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ public interface IZeebeClientFinalBuildStep
/// <returns>the final step builder</returns>
IZeebeClientFinalBuildStep UseKeepAlive(TimeSpan keepAlive);

/// <summary>
/// Uses the given duration provider for the send retries.
/// On each retry, the duration to wait is calculated by calling <paramref name="sleepDurationProvider" /> with
/// the current retry number (1 for first retry, 2 for second etc)
///
/// <p>This is an optional configuration. Per default the wait time provider provides base two wait time,
/// 2^1 seconds, 2^2 seconds, 2^3 seconds etc. until one minute is reached.</p>
/// </summary>
/// <param name="sleepDurationProvider">The function that provides the duration to wait for for a particular retry attempt.</param>
/// <returns>the final step builder</returns>
IZeebeClientFinalBuildStep UseRetrySleepDurationProvider(Func<int, TimeSpan> sleepDurationProvider);

/// <summary>
/// Builds the client with the given configuration.
/// </summary>
Expand Down
25 changes: 23 additions & 2 deletions Client/Api/Commands/IFinalCommandStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,37 @@ public interface IFinalCommandStep<T>
/// Sends the command to the Zeebe broker. This operation is asynchronous. In case of success, the
/// task returns the event that was generated by the Zeebe broker in response to the command.
///
/// <para>Use <c>await ...send();</c> to wait until the response is available.</para>
/// <para>Use <c>await ...Send();</c> to wait until the response is available.</para>
///
/// <example>
/// <code>
/// T response = await command.send();
/// T response = await command.Send();
/// </code>
/// </example>
/// </summary>
/// <param name="timeout">the time span after request should be timed out</param>
/// <returns>a task tracking state of success/failure of the command.</returns>
Task<T> Send(TimeSpan? timeout = null);

/// <summary>
/// Sends the command with retry to the Zeebe broker. This operation is asynchronous. In case of success, the
/// task returns the event that was generated by the Zeebe broker in response to the command.
/// If the sending of the command fails, because of broker back pressure or network issues the request is
/// retried until the command succeeds. The wait time between retries can be configured on the
/// ZeebeClientBuilder. Per default the wait time is based on power two, which means 2^1 seconds, 2^2 seconds
/// etc. until it reaches the maximum of one minute.
///
/// <para>Use <c>await ...SendWithRetry();</c> to wait until the response is available.</para>
///
/// <example>
/// <code>
/// T response = await command.SendWithRetry();
/// </code>
/// </example>
/// </summary>
///
/// <param name="timeout">the time span after request should be timed out</param>
/// <returns>a task tracking state of success/failure of the command.</returns>
Task<T> SendWithRetry(TimeSpan? timeout = null);
}
}
17 changes: 17 additions & 0 deletions Client/Api/Misc/IAsyncRetryStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;
using System.Threading.Tasks;

namespace Zeebe.Client.Api.Misc
{
public interface IAsyncRetryStrategy
{
/// <summary>
/// Runs the given asynchronous action asynchronously and retries it if it fails.
/// When and how it is retried is depended of the implementation.
/// </summary>
/// <param name="action">the action which should be run and retried</param>
/// <typeparam name="TResult">the result type</typeparam>
/// <returns>the result of the action</returns>
Task<TResult> DoWithRetry<TResult>(Func<Task<TResult>> action);
}
}
Loading