Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add tenant id to job / create process command #683

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Client.UnitTests/CreateProcessInstanceTest.cs
Original file line number Diff line number Diff line change
@@ -149,6 +149,27 @@ await ZeebeClient.NewCreateProcessInstanceCommand()
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldSendRequestWithTenantIdAsExpected()
{
// given
var expectedRequest = new CreateProcessInstanceRequest
{
ProcessDefinitionKey = 1,
TenantId = "tenant1"
};

// when
await ZeebeClient.NewCreateProcessInstanceCommand()
.ProcessDefinitionKey(1)
.AddTenantId("tenant1")
.Send();

// then
var request = TestService.Requests[typeof(CreateProcessInstanceRequest)][0];
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldReceiveResponseAsExpected()
{
28 changes: 26 additions & 2 deletions Client.UnitTests/CreateProcessInstanceWithResultTest.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;
using NUnit.Framework;
using Type = Google.Protobuf.WellKnownTypes.Type;

namespace Zeebe.Client
{
@@ -215,6 +213,32 @@ await ZeebeClient.NewCreateProcessInstanceCommand()
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldSendRequestWithTenantIdAsExpected()
{
// given
var expectedRequest = new CreateProcessInstanceWithResultRequest
{
Request = new CreateProcessInstanceRequest
{
ProcessDefinitionKey = 1,
TenantId = "tenant1"
},
RequestTimeout = 20 * 1000
};

// when
await ZeebeClient.NewCreateProcessInstanceCommand()
.ProcessDefinitionKey(1)
.AddTenantId("tenant1")
.WithResult()
.Send();

// then
var request = TestService.Requests[typeof(CreateProcessInstanceWithResultRequest)][0];
Assert.AreEqual(expectedRequest, request);
}

[Test]
public async Task ShouldSendRequestWithFetchVariables()
{
53 changes: 53 additions & 0 deletions Client.UnitTests/JobWorkerTest.cs
Original file line number Diff line number Diff line change
@@ -80,6 +80,59 @@ public void ShouldSendRequestReceiveResponseAsExpected()
AssertJob(receivedJobs[2], 3);
}

[Test]
public void ShouldSendRequestWithTenantIdsListReceiveResponseAsExpected()
{
// given
var expectedRequest = new ActivateJobsRequest
{
Timeout = 123_000L,
MaxJobsToActivate = 3,
Type = "foo",
Worker = "jobWorker",
RequestTimeout = 5_000L,
TenantIds = { "1234", "5678", "91011" }
};

TestService.AddRequestHandler(typeof(ActivateJobsRequest), _ => CreateExpectedResponse());

// when
var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
var receivedJobs = new List<IJob>();
using (var jobWorker = ZeebeClient.NewWorker()
.JobType("foo")
.Handler((_, job) =>
{
receivedJobs.Add(job);
if (receivedJobs.Count == 3)
{
signal.Set();
}
})
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(123L))
.PollInterval(TimeSpan.FromMilliseconds(100))
.PollingTimeout(TimeSpan.FromSeconds(5L))
.TenantIds("1234", "5678")
.TenantIds("91011")
.Open())
{
Assert.True(jobWorker.IsOpen());
signal.WaitOne();
}

// then
var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][0];
Assert.AreEqual(expectedRequest, actualRequest);

Assert.AreEqual(receivedJobs.Count, 3);

AssertJob(receivedJobs[0], 1);
AssertJob(receivedJobs[1], 2);
AssertJob(receivedJobs[2], 3);
}

[Test]
public void ShouldFailWithZeroThreadCount()
{
2 changes: 1 addition & 1 deletion Client/Api/Commands/ICreateProcessInstanceCommandStep1.cs
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ public interface ICreateProcessInstanceCommandStep2
ICreateProcessInstanceCommandStep3 LatestVersion();
}

public interface ICreateProcessInstanceCommandStep3 : IFinalCommandWithRetryStep<IProcessInstanceResponse>
public interface ICreateProcessInstanceCommandStep3 : IFinalCommandWithRetryStep<IProcessInstanceResponse>, ITenantIdCommandStep<ICreateProcessInstanceCommandStep3>
{
/// <summary>
/// Set the initial variables of the process instance.
3 changes: 3 additions & 0 deletions Client/Api/Responses/IJob.cs
Original file line number Diff line number Diff line change
@@ -61,5 +61,8 @@ public interface IJob

/// <returns> JSON-formatted Custom Headers </returns>
string CustomHeaders { get; }

/// <returns> tenant ID of the process </returns>
string TenantId { get; }
}
}
3 changes: 2 additions & 1 deletion Client/Api/Worker/IJobWorkerBuilderStep1.cs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Zeebe.Client.Api.Commands;
using Zeebe.Client.Api.Responses;

namespace Zeebe.Client.Api.Worker
@@ -103,7 +104,7 @@ public interface IJobWorkerBuilderStep2
IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler);
}

public interface IJobWorkerBuilderStep3
public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilderStep3>
{
/// <summary>
/// Set the time for how long a job is exclusively assigned for this worker.
7 changes: 6 additions & 1 deletion Client/Impl/Commands/CreateProcessInstanceCommand.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
@@ -58,6 +57,12 @@ public ICreateProcessInstanceCommandStep3 Variables(string variables)
return this;
}

public ICreateProcessInstanceCommandStep3 AddTenantId(string tenantId)
{
request.TenantId = tenantId;
return this;
}

public ICreateProcessInstanceWithResultCommandStep1 WithResult()
{
return new CreateProcessInstanceCommandWithResult(client, asyncRetryStrategy, request);
8 changes: 6 additions & 2 deletions Client/Impl/Responses/ActivatedJob.cs
Original file line number Diff line number Diff line change
@@ -42,6 +42,7 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob)
Deadline = FromUTCTimestamp(activatedJob.Deadline);
Variables = activatedJob.Variables;
CustomHeaders = activatedJob.CustomHeaders;
TenantId = activatedJob.TenantId;
}

public long Key { get; }
@@ -70,15 +71,17 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob)

public string CustomHeaders { get; }

public string TenantId { get; }

public override string ToString()
{
return
$"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(ProcessInstanceKey)}: {ProcessInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(ProcessDefinitionVersion)}: {ProcessDefinitionVersion}, {nameof(ProcessDefinitionKey)}: {ProcessDefinitionKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}";
$"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(TenantId)}: {TenantId}, {nameof(ProcessInstanceKey)}: {ProcessInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(ProcessDefinitionVersion)}: {ProcessDefinitionVersion}, {nameof(ProcessDefinitionKey)}: {ProcessDefinitionKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}";
}

protected bool Equals(ActivatedJob other)
{
return Key == other.Key && Type == other.Type && ProcessInstanceKey == other.ProcessInstanceKey &&
return Key == other.Key && Type == other.Type && ProcessInstanceKey == other.ProcessInstanceKey && TenantId == other.TenantId &&
BpmnProcessId == other.BpmnProcessId &&
ProcessDefinitionVersion == other.ProcessDefinitionVersion && ProcessDefinitionKey == other.ProcessDefinitionKey &&
ElementId == other.ElementId && ElementInstanceKey == other.ElementInstanceKey &&
@@ -112,6 +115,7 @@ public override int GetHashCode()
{
var hashCode = Key.GetHashCode();
hashCode = (hashCode * 397) ^ (Type != null ? Type.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ (TenantId != null ? TenantId.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ ProcessInstanceKey.GetHashCode();
hashCode = (hashCode * 397) ^ (BpmnProcessId != null ? BpmnProcessId.GetHashCode() : 0);
hashCode = (hashCode * 397) ^ ProcessDefinitionVersion;
12 changes: 12 additions & 0 deletions Client/Impl/Worker/JobWorkerBuilder.cs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using GatewayProtocol;
using Microsoft.Extensions.Logging;
@@ -64,6 +65,17 @@ public IJobWorkerBuilderStep3 Handler(AsyncJobHandler handler)
return this;
}

public IJobWorkerBuilderStep3 TenantIds(IList<string> tenantIds)
{
Request.TenantIds.AddRange(tenantIds);
return this;
}

public IJobWorkerBuilderStep3 TenantIds(params string[] tenantIds)
{
return TenantIds(tenantIds.ToList());
}

internal AsyncJobHandler Handler()
{
return asyncJobHandler;
Loading