diff --git a/Client.UnitTests/CreateProcessInstanceTest.cs b/Client.UnitTests/CreateProcessInstanceTest.cs index b0b56f03..b19d1495 100644 --- a/Client.UnitTests/CreateProcessInstanceTest.cs +++ b/Client.UnitTests/CreateProcessInstanceTest.cs @@ -149,6 +149,27 @@ await ZeebeClient.NewCreateProcessInstanceCommand() Assert.AreEqual(expectedRequest, request); } + [Test] + public async Task ShouldSendRequestWithTenantIdAsExpected() + { + // given + var expectedRequest = new CreateProcessInstanceRequest + { + ProcessDefinitionKey = 1, + TenantId = "tenant1" + }; + + // when + await ZeebeClient.NewCreateProcessInstanceCommand() + .ProcessDefinitionKey(1) + .AddTenantId("tenant1") + .Send(); + + // then + var request = TestService.Requests[typeof(CreateProcessInstanceRequest)][0]; + Assert.AreEqual(expectedRequest, request); + } + [Test] public async Task ShouldReceiveResponseAsExpected() { diff --git a/Client.UnitTests/CreateProcessInstanceWithResultTest.cs b/Client.UnitTests/CreateProcessInstanceWithResultTest.cs index 7593b26d..1c4e1e48 100644 --- a/Client.UnitTests/CreateProcessInstanceWithResultTest.cs +++ b/Client.UnitTests/CreateProcessInstanceWithResultTest.cs @@ -1,12 +1,10 @@ using System; -using System.Collections; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using GatewayProtocol; using Grpc.Core; using NUnit.Framework; -using Type = Google.Protobuf.WellKnownTypes.Type; namespace Zeebe.Client { @@ -215,6 +213,32 @@ await ZeebeClient.NewCreateProcessInstanceCommand() Assert.AreEqual(expectedRequest, request); } + [Test] + public async Task ShouldSendRequestWithTenantIdAsExpected() + { + // given + var expectedRequest = new CreateProcessInstanceWithResultRequest + { + Request = new CreateProcessInstanceRequest + { + ProcessDefinitionKey = 1, + TenantId = "tenant1" + }, + RequestTimeout = 20 * 1000 + }; + + // when + await ZeebeClient.NewCreateProcessInstanceCommand() + .ProcessDefinitionKey(1) + .AddTenantId("tenant1") + .WithResult() + .Send(); + + // then + var request = TestService.Requests[typeof(CreateProcessInstanceWithResultRequest)][0]; + Assert.AreEqual(expectedRequest, request); + } + [Test] public async Task ShouldSendRequestWithFetchVariables() { diff --git a/Client.UnitTests/JobWorkerTest.cs b/Client.UnitTests/JobWorkerTest.cs index a86ca110..fb3df507 100644 --- a/Client.UnitTests/JobWorkerTest.cs +++ b/Client.UnitTests/JobWorkerTest.cs @@ -80,6 +80,59 @@ public void ShouldSendRequestReceiveResponseAsExpected() AssertJob(receivedJobs[2], 3); } + [Test] + public void ShouldSendRequestWithTenantIdsListReceiveResponseAsExpected() + { + // given + var expectedRequest = new ActivateJobsRequest + { + Timeout = 123_000L, + MaxJobsToActivate = 3, + Type = "foo", + Worker = "jobWorker", + RequestTimeout = 5_000L, + TenantIds = { "1234", "5678", "91011" } + }; + + TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse()); + + // when + var signal = new EventWaitHandle(false, EventResetMode.AutoReset); + var receivedJobs = new List(); + using (var jobWorker = ZeebeClient.NewWorker() + .JobType("foo") + .Handler((_, job) => + { + receivedJobs.Add(job); + if (receivedJobs.Count == 3) + { + signal.Set(); + } + }) + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(123L)) + .PollInterval(TimeSpan.FromMilliseconds(100)) + .PollingTimeout(TimeSpan.FromSeconds(5L)) + .TenantIds("1234", "5678") + .TenantIds("91011") + .Open()) + { + Assert.True(jobWorker.IsOpen()); + signal.WaitOne(); + } + + // then + var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][0]; + Assert.AreEqual(expectedRequest, actualRequest); + + Assert.AreEqual(receivedJobs.Count, 3); + + AssertJob(receivedJobs[0], 1); + AssertJob(receivedJobs[1], 2); + AssertJob(receivedJobs[2], 3); + } + [Test] public void ShouldFailWithZeroThreadCount() { diff --git a/Client/Api/Commands/ICreateProcessInstanceCommandStep1.cs b/Client/Api/Commands/ICreateProcessInstanceCommandStep1.cs index 2f4d31af..1624b452 100644 --- a/Client/Api/Commands/ICreateProcessInstanceCommandStep1.cs +++ b/Client/Api/Commands/ICreateProcessInstanceCommandStep1.cs @@ -43,7 +43,7 @@ public interface ICreateProcessInstanceCommandStep2 ICreateProcessInstanceCommandStep3 LatestVersion(); } - public interface ICreateProcessInstanceCommandStep3 : IFinalCommandWithRetryStep + public interface ICreateProcessInstanceCommandStep3 : IFinalCommandWithRetryStep, ITenantIdCommandStep { /// /// Set the initial variables of the process instance. diff --git a/Client/Api/Responses/IJob.cs b/Client/Api/Responses/IJob.cs index 88d7f9d4..0dbc338c 100644 --- a/Client/Api/Responses/IJob.cs +++ b/Client/Api/Responses/IJob.cs @@ -61,5 +61,8 @@ public interface IJob /// JSON-formatted Custom Headers string CustomHeaders { get; } + + /// tenant ID of the process + string TenantId { get; } } } diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs index 265f3a07..ebf5ac8a 100644 --- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs +++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs @@ -15,6 +15,7 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; +using Zeebe.Client.Api.Commands; using Zeebe.Client.Api.Responses; namespace Zeebe.Client.Api.Worker @@ -103,7 +104,7 @@ public interface IJobWorkerBuilderStep2 IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler); } - public interface IJobWorkerBuilderStep3 + public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep { /// /// Set the time for how long a job is exclusively assigned for this worker. diff --git a/Client/Impl/Commands/CreateProcessInstanceCommand.cs b/Client/Impl/Commands/CreateProcessInstanceCommand.cs index 8987b1a7..f0a30bc0 100644 --- a/Client/Impl/Commands/CreateProcessInstanceCommand.cs +++ b/Client/Impl/Commands/CreateProcessInstanceCommand.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using GatewayProtocol; @@ -58,6 +57,12 @@ public ICreateProcessInstanceCommandStep3 Variables(string variables) return this; } + public ICreateProcessInstanceCommandStep3 AddTenantId(string tenantId) + { + request.TenantId = tenantId; + return this; + } + public ICreateProcessInstanceWithResultCommandStep1 WithResult() { return new CreateProcessInstanceCommandWithResult(client, asyncRetryStrategy, request); diff --git a/Client/Impl/Responses/ActivatedJob.cs b/Client/Impl/Responses/ActivatedJob.cs index 74d04797..0e0b5c1e 100644 --- a/Client/Impl/Responses/ActivatedJob.cs +++ b/Client/Impl/Responses/ActivatedJob.cs @@ -42,6 +42,7 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob) Deadline = FromUTCTimestamp(activatedJob.Deadline); Variables = activatedJob.Variables; CustomHeaders = activatedJob.CustomHeaders; + TenantId = activatedJob.TenantId; } public long Key { get; } @@ -70,15 +71,17 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob) public string CustomHeaders { get; } + public string TenantId { get; } + public override string ToString() { return - $"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(ProcessInstanceKey)}: {ProcessInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(ProcessDefinitionVersion)}: {ProcessDefinitionVersion}, {nameof(ProcessDefinitionKey)}: {ProcessDefinitionKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}"; + $"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(TenantId)}: {TenantId}, {nameof(ProcessInstanceKey)}: {ProcessInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(ProcessDefinitionVersion)}: {ProcessDefinitionVersion}, {nameof(ProcessDefinitionKey)}: {ProcessDefinitionKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}"; } protected bool Equals(ActivatedJob other) { - return Key == other.Key && Type == other.Type && ProcessInstanceKey == other.ProcessInstanceKey && + return Key == other.Key && Type == other.Type && ProcessInstanceKey == other.ProcessInstanceKey && TenantId == other.TenantId && BpmnProcessId == other.BpmnProcessId && ProcessDefinitionVersion == other.ProcessDefinitionVersion && ProcessDefinitionKey == other.ProcessDefinitionKey && ElementId == other.ElementId && ElementInstanceKey == other.ElementInstanceKey && @@ -112,6 +115,7 @@ public override int GetHashCode() { var hashCode = Key.GetHashCode(); hashCode = (hashCode * 397) ^ (Type != null ? Type.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ (TenantId != null ? TenantId.GetHashCode() : 0); hashCode = (hashCode * 397) ^ ProcessInstanceKey.GetHashCode(); hashCode = (hashCode * 397) ^ (BpmnProcessId != null ? BpmnProcessId.GetHashCode() : 0); hashCode = (hashCode * 397) ^ ProcessDefinitionVersion; diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs index 61c027a0..fe0996de 100644 --- a/Client/Impl/Worker/JobWorkerBuilder.cs +++ b/Client/Impl/Worker/JobWorkerBuilder.cs @@ -15,6 +15,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using GatewayProtocol; using Microsoft.Extensions.Logging; @@ -64,6 +65,17 @@ public IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler) return this; } + public IJobWorkerBuilderStep3 TenantIds(IList tenantIds) + { + Request.TenantIds.AddRange(tenantIds); + return this; + } + + public IJobWorkerBuilderStep3 TenantIds(params string[] tenantIds) + { + return TenantIds(tenantIds.ToList()); + } + internal AsyncJobHandler Handler() { return asyncJobHandler;