Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client/c#): update PublishMessageCommand to run with retries. Also update… #335

Merged
merged 2 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Client.IntegrationTests/JobWorkerMultiPartitionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class JobWorkerMultiPartitionTest
{
private static readonly string DemoProcessPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Resources", "oneTaskProcess.bpmn");

private readonly ZeebeIntegrationTestHelper testHelper = ZeebeIntegrationTestHelper.Latest().withPartitionCount(3);
private readonly ZeebeIntegrationTestHelper testHelper = ZeebeIntegrationTestHelper.Latest().WithPartitionCount(3);
private IZeebeClient zeebeClient;
private long processDefinitionKey;

Expand Down Expand Up @@ -74,7 +74,6 @@ await zeebeClient.NewCreateProcessInstanceCommand()
Assert.AreEqual(3, handledJobs.Count);
}


[Test]
public async Task ShouldActivateAllJobs()
{
Expand Down
2 changes: 1 addition & 1 deletion Client.IntegrationTests/ZeebeIntegrationTestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static ZeebeIntegrationTestHelper Latest()
return new ZeebeIntegrationTestHelper(LatestVersion);
}

public ZeebeIntegrationTestHelper withPartitionCount(int count)
public ZeebeIntegrationTestHelper WithPartitionCount(int count)
geekusa33 marked this conversation as resolved.
Show resolved Hide resolved
{
this.count = count;
return this;
Expand Down
5 changes: 5 additions & 0 deletions Client.UnitTests/TestDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public static IEnumerable<TestCaseData> Provider()
}, new GatewayProtocol.ThrowErrorResponse(),
(RequestCreator<IThrowErrorResponse>)
(zeebeClient => zeebeClient.NewThrowErrorCommand(12113).ErrorCode("Code 13").ErrorMessage("This is a business error!")));
yield return new TestCaseData(
new PublishMessageRequest(),
new GatewayProtocol.PublishMessageResponse(),
(RequestCreator<IPublishMessageResponse>)
(zeebeClient => zeebeClient.NewPublishMessageCommand().MessageName("messageName").CorrelationKey("p-1")));
yield return new TestCaseData(
new ResolveIncidentRequest
{
Expand Down
14 changes: 7 additions & 7 deletions Client/Api/Commands/IPublishMessageCommandStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public interface IPublishMessageCommandStep1
/// <summary>
/// Set the name of the message.
/// </summary>
/// <param name="messageName"> the name of the message</param>
/// <returns>the builder for this command</returns>
/// <param name="messageName"> the name of the message.</param>
/// <returns>the builder for this command.</returns>
IPublishMessageCommandStep2 MessageName(string messageName);
}

Expand All @@ -36,19 +36,19 @@ public interface IPublishMessageCommandStep2
/// This value will be used together with the message name
/// to find matching message subscriptions.
/// </summary>
/// <param name="correlationKey">the correlation key value of the message</param>
/// <param name="correlationKey">the correlation key value of the message.</param>
/// <returns>the builder for this command</returns>
IPublishMessageCommandStep3 CorrelationKey(string correlationKey);
}

public interface IPublishMessageCommandStep3 : IFinalCommandStep<IPublishMessageResponse>
public interface IPublishMessageCommandStep3 : IFinalCommandWithRetryStep<IPublishMessageResponse>
{
/// <summary>
/// Set the id of the message. The message is rejected if another message is already published
/// with the same id, name and correlation-key.
/// </summary>
/// <param name="messageId">the id of the message</param>
/// <returns>the builder for this command. Call <see cref="IFinalCommandStep{T}.Send"/> to complete the command and send
/// <param name="messageId">the id of the message.</param>
/// <returns>the builder for this command. Call <see cref="IFinalCommandWithRetryStep{T}.Send"/> to complete the command and send
/// it to the broker.</returns>
IPublishMessageCommandStep3 MessageId(string messageId);

Expand All @@ -70,7 +70,7 @@ public interface IPublishMessageCommandStep3 : IFinalCommandStep<IPublishMessage
/// Set the variables of the message.
/// </summary>
///
/// <param name="variables">the variables (JSON) as String</param>
/// <param name="variables">the variables (JSON) as String.</param>
/// <returns>the builder for this command. Call <see cref="IFinalCommandStep{T}.Send"/> to complete the command and send
/// it to the broker.</returns>
IPublishMessageCommandStep3 Variables(string variables);
Expand Down
4 changes: 2 additions & 2 deletions Client/Api/Commands/IUpdateRetriesCommandStep1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public interface IUpdateRetriesCommandStep1
/// If the given retries are greater than zero then this job will be picked up again by a job
/// subscription and a related incident will be marked as resolved.
/// </para>
/// <param name="retries">retries the retries of this job</param>
/// <param name="retries">retries the retries of this job.</param>
/// <returns>
/// the builder for this command. Call <see cref="IFinalCommandStep{T}.Send"/> to complete the command and send it to the broker.
/// </returns>
Expand All @@ -21,6 +21,6 @@ public interface IUpdateRetriesCommandStep1

public interface IUpdateRetriesCommandStep2 : IFinalCommandWithRetryStep<IUpdateRetriesResponse>
{
// the place for new optional parameters
// the place for new optional parameters
}
}
10 changes: 9 additions & 1 deletion Client/Impl/Commands/PublishMessageCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Threading.Tasks;
using GatewayProtocol;
using Zeebe.Client.Api.Commands;
using Zeebe.Client.Api.Misc;
using Zeebe.Client.Api.Responses;
using static GatewayProtocol.Gateway;
using PublishMessageResponse = Zeebe.Client.Impl.Responses.PublishMessageResponse;
Expand All @@ -28,11 +29,13 @@ public class PublishMessageCommand : IPublishMessageCommandStep1, IPublishMessag
{
private readonly PublishMessageRequest request;
private readonly GatewayClient gatewayClient;
private readonly IAsyncRetryStrategy asyncRetryStrategy;

public PublishMessageCommand(GatewayClient client)
public PublishMessageCommand(GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy)
{
gatewayClient = client;
request = new PublishMessageRequest();
this.asyncRetryStrategy = asyncRetryStrategy;
}

public IPublishMessageCommandStep3 CorrelationKey(string correlationKey)
Expand Down Expand Up @@ -76,5 +79,10 @@ public async Task<IPublishMessageResponse> Send(CancellationToken cancellationTo
{
return await Send(token: cancellationToken);
}

public async Task<IPublishMessageResponse> SendWithRetry(TimeSpan? timespan = null, CancellationToken token = default)
{
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, token));
}
}
}
4 changes: 2 additions & 2 deletions Client/Impl/Commands/ThrowErrorCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class ThrowErrorCommand : IThrowErrorCommandStep1, IThrowErrorCommandStep
private readonly ThrowErrorRequest request;
private readonly GatewayClient gatewayClient;
private readonly IAsyncRetryStrategy asyncRetryStrategy;

public ThrowErrorCommand(GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy, long jobKey)
{
gatewayClient = client;
Expand Down Expand Up @@ -63,7 +63,7 @@ public async Task<IThrowErrorResponse> Send(CancellationToken cancellationToken)
{
return await Send(token: cancellationToken);
}

public async Task<IThrowErrorResponse> SendWithRetry(TimeSpan? timespan = null, CancellationToken token = default)
{
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, token));
Expand Down
2 changes: 1 addition & 1 deletion Client/Impl/Commands/TopologyRequestCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class TopologyRequestCommand : ITopologyRequestStep1

public TopologyRequestCommand(Gateway.GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy)
{

gatewayClient = client;
this.asyncRetryStrategy = asyncRetryStrategy;
}

public async Task<ITopology> Send(TimeSpan? timeout = null, CancellationToken token = default)
Expand Down
8 changes: 4 additions & 4 deletions Client/ZeebeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal ZeebeClient(string address,
/// <summary>
/// Adds keepAlive options to the channel options.
/// </summary>
/// <param name="channelOptions">the current existing channel options</param>
/// <param name="channelOptions">the current existing channel options.</param>
private void AddKeepAliveToChannelOptions(List<ChannelOption> channelOptions, TimeSpan? keepAlive)
{
// GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS
Expand Down Expand Up @@ -130,7 +130,7 @@ public IUpdateRetriesCommandStep1 NewUpdateRetriesCommand(long jobKey)

public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey)
{
return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy,jobKey);
return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy, jobKey);
}

////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -164,7 +164,7 @@ public IResolveIncidentCommandStep1 NewResolveIncidentCommand(long incidentKey)

public IPublishMessageCommandStep1 NewPublishMessageCommand()
{
return new PublishMessageCommand(gatewayClient);
return new PublishMessageCommand(gatewayClient, asyncRetryStrategy);
}

public ITopologyRequestStep1 TopologyRequest() => new TopologyRequestCommand(gatewayClient, asyncRetryStrategy);
Expand All @@ -184,7 +184,7 @@ public void Dispose()
/// Creates an new IZeebeClientBuilder. This builder need to be used to construct
/// a ZeebeClient.
/// </summary>
/// <returns>an builder to construct an ZeebeClient</returns>
/// <returns>an builder to construct an ZeebeClient.</returns>
public static IZeebeClientBuilder Builder()
{
return new ZeebeClientBuilder();
Expand Down