Skip to content

Commit

Permalink
feat: add sendWithRetry on PublicMessageCommand
Browse files Browse the repository at this point in the history
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
geekusa33 and ChrisKujawa authored Oct 14, 2021
1 parent 68215ef commit e2faa57
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 20 deletions.
3 changes: 1 addition & 2 deletions Client.IntegrationTests/JobWorkerMultiPartitionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class JobWorkerMultiPartitionTest
{
private static readonly string DemoProcessPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Resources", "oneTaskProcess.bpmn");

private readonly ZeebeIntegrationTestHelper testHelper = ZeebeIntegrationTestHelper.Latest().withPartitionCount(3);
private readonly ZeebeIntegrationTestHelper testHelper = ZeebeIntegrationTestHelper.Latest().WithPartitionCount(3);
private IZeebeClient zeebeClient;
private long processDefinitionKey;

Expand Down Expand Up @@ -74,7 +74,6 @@ await zeebeClient.NewCreateProcessInstanceCommand()
Assert.AreEqual(3, handledJobs.Count);
}


[Test]
public async Task ShouldActivateAllJobs()
{
Expand Down
2 changes: 1 addition & 1 deletion Client.IntegrationTests/ZeebeIntegrationTestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static ZeebeIntegrationTestHelper Latest()
return new ZeebeIntegrationTestHelper(LatestVersion);
}

public ZeebeIntegrationTestHelper withPartitionCount(int count)
public ZeebeIntegrationTestHelper WithPartitionCount(int count)
{
this.count = count;
return this;
Expand Down
5 changes: 5 additions & 0 deletions Client.UnitTests/TestDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public static IEnumerable<TestCaseData> Provider()
}, new GatewayProtocol.ThrowErrorResponse(),
(RequestCreator<IThrowErrorResponse>)
(zeebeClient => zeebeClient.NewThrowErrorCommand(12113).ErrorCode("Code 13").ErrorMessage("This is a business error!")));
yield return new TestCaseData(
new PublishMessageRequest(),
new GatewayProtocol.PublishMessageResponse(),
(RequestCreator<IPublishMessageResponse>)
(zeebeClient => zeebeClient.NewPublishMessageCommand().MessageName("messageName").CorrelationKey("p-1")));
yield return new TestCaseData(
new ResolveIncidentRequest
{
Expand Down
14 changes: 7 additions & 7 deletions Client/Api/Commands/IPublishMessageCommandStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public interface IPublishMessageCommandStep1
/// <summary>
/// Set the name of the message.
/// </summary>
/// <param name="messageName"> the name of the message</param>
/// <returns>the builder for this command</returns>
/// <param name="messageName"> the name of the message.</param>
/// <returns>the builder for this command.</returns>
IPublishMessageCommandStep2 MessageName(string messageName);
}

Expand All @@ -36,19 +36,19 @@ public interface IPublishMessageCommandStep2
/// This value will be used together with the message name
/// to find matching message subscriptions.
/// </summary>
/// <param name="correlationKey">the correlation key value of the message</param>
/// <param name="correlationKey">the correlation key value of the message.</param>
/// <returns>the builder for this command</returns>
IPublishMessageCommandStep3 CorrelationKey(string correlationKey);
}

public interface IPublishMessageCommandStep3 : IFinalCommandStep<IPublishMessageResponse>
public interface IPublishMessageCommandStep3 : IFinalCommandWithRetryStep<IPublishMessageResponse>
{
/// <summary>
/// Set the id of the message. The message is rejected if another message is already published
/// with the same id, name and correlation-key.
/// </summary>
/// <param name="messageId">the id of the message</param>
/// <returns>the builder for this command. Call <see cref="IFinalCommandStep{T}.Send"/> to complete the command and send
/// <param name="messageId">the id of the message.</param>
/// <returns>the builder for this command. Call <see cref="IFinalCommandWithRetryStep{T}.Send"/> to complete the command and send
/// it to the broker.</returns>
IPublishMessageCommandStep3 MessageId(string messageId);

Expand All @@ -70,7 +70,7 @@ public interface IPublishMessageCommandStep3 : IFinalCommandStep<IPublishMessage
/// Set the variables of the message.
/// </summary>
///
/// <param name="variables">the variables (JSON) as String</param>
/// <param name="variables">the variables (JSON) as String.</param>
/// <returns>the builder for this command. Call <see cref="IFinalCommandStep{T}.Send"/> to complete the command and send
/// it to the broker.</returns>
IPublishMessageCommandStep3 Variables(string variables);
Expand Down
4 changes: 2 additions & 2 deletions Client/Api/Commands/IUpdateRetriesCommandStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public interface IUpdateRetriesCommandStep1
/// If the given retries are greater than zero then this job will be picked up again by a job
/// subscription and a related incident will be marked as resolved.
/// </para>
/// <param name="retries">retries the retries of this job</param>
/// <param name="retries">retries the retries of this job.</param>
/// <returns>
/// the builder for this command. Call <see cref="IFinalCommandStep{T}.Send"/> to complete the command and send it to the broker.
/// </returns>
Expand All @@ -21,6 +21,6 @@ public interface IUpdateRetriesCommandStep1

public interface IUpdateRetriesCommandStep2 : IFinalCommandWithRetryStep<IUpdateRetriesResponse>
{
// the place for new optional parameters
// the place for new optional parameters
}
}
10 changes: 9 additions & 1 deletion Client/Impl/Commands/PublishMessageCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Threading.Tasks;
using GatewayProtocol;
using Zeebe.Client.Api.Commands;
using Zeebe.Client.Api.Misc;
using Zeebe.Client.Api.Responses;
using static GatewayProtocol.Gateway;
using PublishMessageResponse = Zeebe.Client.Impl.Responses.PublishMessageResponse;
Expand All @@ -28,11 +29,13 @@ public class PublishMessageCommand : IPublishMessageCommandStep1, IPublishMessag
{
private readonly PublishMessageRequest request;
private readonly GatewayClient gatewayClient;
private readonly IAsyncRetryStrategy asyncRetryStrategy;

public PublishMessageCommand(GatewayClient client)
public PublishMessageCommand(GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy)
{
gatewayClient = client;
request = new PublishMessageRequest();
this.asyncRetryStrategy = asyncRetryStrategy;
}

public IPublishMessageCommandStep3 CorrelationKey(string correlationKey)
Expand Down Expand Up @@ -76,5 +79,10 @@ public async Task<IPublishMessageResponse> Send(CancellationToken cancellationTo
{
return await Send(token: cancellationToken);
}

public async Task<IPublishMessageResponse> SendWithRetry(TimeSpan? timespan = null, CancellationToken token = default)
{
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, token));
}
}
}
4 changes: 2 additions & 2 deletions Client/Impl/Commands/ThrowErrorCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ThrowErrorCommand : IThrowErrorCommandStep1, IThrowErrorCommandStep
private readonly ThrowErrorRequest request;
private readonly GatewayClient gatewayClient;
private readonly IAsyncRetryStrategy asyncRetryStrategy;

public ThrowErrorCommand(GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy, long jobKey)
{
gatewayClient = client;
Expand Down Expand Up @@ -63,7 +63,7 @@ public async Task<IThrowErrorResponse> Send(CancellationToken cancellationToken)
{
return await Send(token: cancellationToken);
}

public async Task<IThrowErrorResponse> SendWithRetry(TimeSpan? timespan = null, CancellationToken token = default)
{
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, token));
Expand Down
2 changes: 1 addition & 1 deletion Client/Impl/Commands/TopologyRequestCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class TopologyRequestCommand : ITopologyRequestStep1

public TopologyRequestCommand(Gateway.GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy)
{

gatewayClient = client;
this.asyncRetryStrategy = asyncRetryStrategy;
}

public async Task<ITopology> Send(TimeSpan? timeout = null, CancellationToken token = default)
Expand Down
8 changes: 4 additions & 4 deletions Client/ZeebeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal ZeebeClient(string address,
/// <summary>
/// Adds keepAlive options to the channel options.
/// </summary>
/// <param name="channelOptions">the current existing channel options</param>
/// <param name="channelOptions">the current existing channel options.</param>
private void AddKeepAliveToChannelOptions(List<ChannelOption> channelOptions, TimeSpan? keepAlive)
{
// GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS
Expand Down Expand Up @@ -130,7 +130,7 @@ public IUpdateRetriesCommandStep1 NewUpdateRetriesCommand(long jobKey)

public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey)
{
return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy,jobKey);
return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy, jobKey);
}

////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -164,7 +164,7 @@ public IResolveIncidentCommandStep1 NewResolveIncidentCommand(long incidentKey)

public IPublishMessageCommandStep1 NewPublishMessageCommand()
{
return new PublishMessageCommand(gatewayClient);
return new PublishMessageCommand(gatewayClient, asyncRetryStrategy);
}

public ITopologyRequestStep1 TopologyRequest() => new TopologyRequestCommand(gatewayClient, asyncRetryStrategy);
Expand All @@ -184,7 +184,7 @@ public void Dispose()
/// Creates an new IZeebeClientBuilder. This builder need to be used to construct
/// a ZeebeClient.
/// </summary>
/// <returns>an builder to construct an ZeebeClient</returns>
/// <returns>an builder to construct an ZeebeClient.</returns>
public static IZeebeClientBuilder Builder()
{
return new ZeebeClientBuilder();
Expand Down

0 comments on commit e2faa57

Please sign in to comment.