diff --git a/Client.IntegrationTests/JobWorkerMultiPartitionTest.cs b/Client.IntegrationTests/JobWorkerMultiPartitionTest.cs index d3222074..c7a3d776 100644 --- a/Client.IntegrationTests/JobWorkerMultiPartitionTest.cs +++ b/Client.IntegrationTests/JobWorkerMultiPartitionTest.cs @@ -15,7 +15,7 @@ public class JobWorkerMultiPartitionTest { private static readonly string DemoProcessPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Resources", "oneTaskProcess.bpmn"); - private readonly ZeebeIntegrationTestHelper testHelper = ZeebeIntegrationTestHelper.Latest().withPartitionCount(3); + private readonly ZeebeIntegrationTestHelper testHelper = ZeebeIntegrationTestHelper.Latest().WithPartitionCount(3); private IZeebeClient zeebeClient; private long processDefinitionKey; @@ -74,7 +74,6 @@ await zeebeClient.NewCreateProcessInstanceCommand() Assert.AreEqual(3, handledJobs.Count); } - [Test] public async Task ShouldActivateAllJobs() { diff --git a/Client.IntegrationTests/ZeebeIntegrationTestHelper.cs b/Client.IntegrationTests/ZeebeIntegrationTestHelper.cs index 005ac4f2..9e8a1a01 100644 --- a/Client.IntegrationTests/ZeebeIntegrationTestHelper.cs +++ b/Client.IntegrationTests/ZeebeIntegrationTestHelper.cs @@ -33,7 +33,7 @@ public static ZeebeIntegrationTestHelper Latest() return new ZeebeIntegrationTestHelper(LatestVersion); } - public ZeebeIntegrationTestHelper withPartitionCount(int count) + public ZeebeIntegrationTestHelper WithPartitionCount(int count) { this.count = count; return this; diff --git a/Client.UnitTests/TestDataProvider.cs b/Client.UnitTests/TestDataProvider.cs index 5d12ffa2..146a2807 100644 --- a/Client.UnitTests/TestDataProvider.cs +++ b/Client.UnitTests/TestDataProvider.cs @@ -60,6 +60,11 @@ public static IEnumerable Provider() }, new GatewayProtocol.ThrowErrorResponse(), (RequestCreator) (zeebeClient => zeebeClient.NewThrowErrorCommand(12113).ErrorCode("Code 13").ErrorMessage("This is a business error!"))); + yield return new TestCaseData( + new PublishMessageRequest(), + new GatewayProtocol.PublishMessageResponse(), + (RequestCreator) + (zeebeClient => zeebeClient.NewPublishMessageCommand().MessageName("messageName").CorrelationKey("p-1"))); yield return new TestCaseData( new ResolveIncidentRequest { diff --git a/Client/Api/Commands/IPublishMessageCommandStep1.cs b/Client/Api/Commands/IPublishMessageCommandStep1.cs index f298cbc7..30c56faa 100644 --- a/Client/Api/Commands/IPublishMessageCommandStep1.cs +++ b/Client/Api/Commands/IPublishMessageCommandStep1.cs @@ -23,8 +23,8 @@ public interface IPublishMessageCommandStep1 /// /// Set the name of the message. /// - /// the name of the message - /// the builder for this command + /// the name of the message. + /// the builder for this command. IPublishMessageCommandStep2 MessageName(string messageName); } @@ -36,19 +36,19 @@ public interface IPublishMessageCommandStep2 /// This value will be used together with the message name /// to find matching message subscriptions. /// - /// the correlation key value of the message + /// the correlation key value of the message. /// the builder for this command IPublishMessageCommandStep3 CorrelationKey(string correlationKey); } - public interface IPublishMessageCommandStep3 : IFinalCommandStep + public interface IPublishMessageCommandStep3 : IFinalCommandWithRetryStep { /// /// Set the id of the message. The message is rejected if another message is already published /// with the same id, name and correlation-key. /// - /// the id of the message - /// the builder for this command. Call to complete the command and send + /// the id of the message. + /// the builder for this command. Call to complete the command and send /// it to the broker. IPublishMessageCommandStep3 MessageId(string messageId); @@ -70,7 +70,7 @@ public interface IPublishMessageCommandStep3 : IFinalCommandStep /// - /// the variables (JSON) as String + /// the variables (JSON) as String. /// the builder for this command. Call to complete the command and send /// it to the broker. IPublishMessageCommandStep3 Variables(string variables); diff --git a/Client/Api/Commands/IUpdateRetriesCommandStep1.cs b/Client/Api/Commands/IUpdateRetriesCommandStep1.cs index 0c4346c3..cd7e1bf6 100644 --- a/Client/Api/Commands/IUpdateRetriesCommandStep1.cs +++ b/Client/Api/Commands/IUpdateRetriesCommandStep1.cs @@ -12,7 +12,7 @@ public interface IUpdateRetriesCommandStep1 /// If the given retries are greater than zero then this job will be picked up again by a job /// subscription and a related incident will be marked as resolved. /// - /// retries the retries of this job + /// retries the retries of this job. /// /// the builder for this command. Call to complete the command and send it to the broker. /// @@ -21,6 +21,6 @@ public interface IUpdateRetriesCommandStep1 public interface IUpdateRetriesCommandStep2 : IFinalCommandWithRetryStep { - // the place for new optional parameters + // the place for new optional parameters } } \ No newline at end of file diff --git a/Client/Impl/Commands/PublishMessageCommand.cs b/Client/Impl/Commands/PublishMessageCommand.cs index 546c88f2..56b3a36a 100644 --- a/Client/Impl/Commands/PublishMessageCommand.cs +++ b/Client/Impl/Commands/PublishMessageCommand.cs @@ -18,6 +18,7 @@ using System.Threading.Tasks; using GatewayProtocol; using Zeebe.Client.Api.Commands; +using Zeebe.Client.Api.Misc; using Zeebe.Client.Api.Responses; using static GatewayProtocol.Gateway; using PublishMessageResponse = Zeebe.Client.Impl.Responses.PublishMessageResponse; @@ -28,11 +29,13 @@ public class PublishMessageCommand : IPublishMessageCommandStep1, IPublishMessag { private readonly PublishMessageRequest request; private readonly GatewayClient gatewayClient; + private readonly IAsyncRetryStrategy asyncRetryStrategy; - public PublishMessageCommand(GatewayClient client) + public PublishMessageCommand(GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy) { gatewayClient = client; request = new PublishMessageRequest(); + this.asyncRetryStrategy = asyncRetryStrategy; } public IPublishMessageCommandStep3 CorrelationKey(string correlationKey) @@ -76,5 +79,10 @@ public async Task Send(CancellationToken cancellationTo { return await Send(token: cancellationToken); } + + public async Task SendWithRetry(TimeSpan? timespan = null, CancellationToken token = default) + { + return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, token)); + } } } diff --git a/Client/Impl/Commands/ThrowErrorCommand.cs b/Client/Impl/Commands/ThrowErrorCommand.cs index bb903443..07a3b22b 100644 --- a/Client/Impl/Commands/ThrowErrorCommand.cs +++ b/Client/Impl/Commands/ThrowErrorCommand.cs @@ -29,7 +29,7 @@ public class ThrowErrorCommand : IThrowErrorCommandStep1, IThrowErrorCommandStep private readonly ThrowErrorRequest request; private readonly GatewayClient gatewayClient; private readonly IAsyncRetryStrategy asyncRetryStrategy; - + public ThrowErrorCommand(GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy, long jobKey) { gatewayClient = client; @@ -63,7 +63,7 @@ public async Task Send(CancellationToken cancellationToken) { return await Send(token: cancellationToken); } - + public async Task SendWithRetry(TimeSpan? timespan = null, CancellationToken token = default) { return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, token)); diff --git a/Client/Impl/Commands/TopologyRequestCommand.cs b/Client/Impl/Commands/TopologyRequestCommand.cs index 20993e8d..1f0294ae 100644 --- a/Client/Impl/Commands/TopologyRequestCommand.cs +++ b/Client/Impl/Commands/TopologyRequestCommand.cs @@ -32,8 +32,8 @@ public class TopologyRequestCommand : ITopologyRequestStep1 public TopologyRequestCommand(Gateway.GatewayClient client, IAsyncRetryStrategy asyncRetryStrategy) { - gatewayClient = client; + this.asyncRetryStrategy = asyncRetryStrategy; } public async Task Send(TimeSpan? timeout = null, CancellationToken token = default) diff --git a/Client/ZeebeClient.cs b/Client/ZeebeClient.cs index 863ad018..e7cd9a4c 100644 --- a/Client/ZeebeClient.cs +++ b/Client/ZeebeClient.cs @@ -78,7 +78,7 @@ internal ZeebeClient(string address, /// /// Adds keepAlive options to the channel options. /// - /// the current existing channel options + /// the current existing channel options. private void AddKeepAliveToChannelOptions(List channelOptions, TimeSpan? keepAlive) { // GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS @@ -130,7 +130,7 @@ public IUpdateRetriesCommandStep1 NewUpdateRetriesCommand(long jobKey) public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey) { - return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy,jobKey); + return new ThrowErrorCommand(gatewayClient, asyncRetryStrategy, jobKey); } //////////////////////////////////////////////////////////////////////// @@ -164,7 +164,7 @@ public IResolveIncidentCommandStep1 NewResolveIncidentCommand(long incidentKey) public IPublishMessageCommandStep1 NewPublishMessageCommand() { - return new PublishMessageCommand(gatewayClient); + return new PublishMessageCommand(gatewayClient, asyncRetryStrategy); } public ITopologyRequestStep1 TopologyRequest() => new TopologyRequestCommand(gatewayClient, asyncRetryStrategy); @@ -184,7 +184,7 @@ public void Dispose() /// Creates an new IZeebeClientBuilder. This builder need to be used to construct /// a ZeebeClient. /// - /// an builder to construct an ZeebeClient + /// an builder to construct an ZeebeClient. public static IZeebeClientBuilder Builder() { return new ZeebeClientBuilder();