diff --git a/Client.UnitTests/BaseZeebeTest.cs b/Client.UnitTests/BaseZeebeTest.cs
index 75d418f4..e2172b4a 100644
--- a/Client.UnitTests/BaseZeebeTest.cs
+++ b/Client.UnitTests/BaseZeebeTest.cs
@@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+using System;
+using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;
@@ -28,8 +30,8 @@ public class BaseZeebeTest
private IZeebeClient client;
public Server Server => server;
- public GatewayTestService TestService => testService;
- public IZeebeClient ZeebeClient => client;
+ protected GatewayTestService TestService => testService;
+ protected IZeebeClient ZeebeClient => client;
[SetUp]
public void Init()
@@ -43,7 +45,11 @@ public void Init()
server.Services.Add(serviceDefinition);
server.Start();
- client = Client.ZeebeClient.Builder().UseGatewayAddress("localhost:26500").UsePlainText().Build();
+ client = Client.ZeebeClient
+ .Builder()
+ .UseGatewayAddress("localhost:26500")
+ .UsePlainText()
+ .Build();
}
[TearDown]
@@ -51,9 +57,18 @@ public void Stop()
{
client.Dispose();
server.ShutdownAsync().Wait();
+ testService.Requests.Clear();
testService = null;
server = null;
client = null;
}
+
+ public void AwaitRequestCount(Type type, int requestCount)
+ {
+ while (TestService.Requests[type].Count < requestCount)
+ {
+ Thread.Sleep(TimeSpan.FromMilliseconds(100));
+ }
+ }
}
}
diff --git a/Client.UnitTests/Client.UnitTests.csproj b/Client.UnitTests/Client.UnitTests.csproj
index cefe857f..929b82fa 100644
--- a/Client.UnitTests/Client.UnitTests.csproj
+++ b/Client.UnitTests/Client.UnitTests.csproj
@@ -5,6 +5,14 @@
Zeebe.Client
+
+ x64
+
+
+
+ x64
+
+
diff --git a/Client.UnitTests/JobHandlerTest.cs b/Client.UnitTests/JobHandlerTest.cs
new file mode 100644
index 00000000..276a2902
--- /dev/null
+++ b/Client.UnitTests/JobHandlerTest.cs
@@ -0,0 +1,173 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using GatewayProtocol;
+using Microsoft.Extensions.Logging;
+using NLog;
+using NUnit.Framework;
+using Zeebe.Client.Api.Responses;
+
+namespace Zeebe.Client.Impl.Worker
+{
+ [TestFixture]
+ public class JobHandlerTest : BaseZeebeTest
+ {
+ private ConcurrentQueue workItems;
+ private ConcurrentQueue seenJobs;
+ private JobWorkerSignal jobWorkerSignal;
+ private JobHandlerExecutor jobHandler;
+ private CancellationTokenSource tokenSource;
+
+ [SetUp]
+ public void SetupTest()
+ {
+ workItems = new ConcurrentQueue();
+ seenJobs = new ConcurrentQueue();
+ var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient);
+ jobWorkerSignal = new JobWorkerSignal();
+
+ jobWorkerBuilder
+ .JobType("foo")
+ .Handler((jobClient, job) => { seenJobs.Enqueue(job); })
+ .MaxJobsActive(3)
+ .Name("jobWorker")
+ .Timeout(TimeSpan.FromSeconds(123L))
+ .PollInterval(TimeSpan.FromMilliseconds(100))
+ .PollingTimeout(TimeSpan.FromSeconds(5L));
+ jobHandler = new JobHandlerExecutor(jobWorkerBuilder, workItems, jobWorkerSignal);
+ tokenSource = new CancellationTokenSource();
+ }
+
+ [TearDown]
+ public async Task CleanUp()
+ {
+ tokenSource.Cancel();
+ // delay disposing, since poll and handler take some time to close
+ await Task.Delay(TimeSpan.FromMilliseconds(200))
+ .ContinueWith(t => { tokenSource.Dispose(); });
+
+ tokenSource = null;
+ jobHandler = null;
+ seenJobs = null;
+ workItems = null;
+ jobWorkerSignal = null;
+ }
+
+ [Test]
+ public void ShouldHandleJob()
+ {
+ // given
+ var expectedJob = CreateActivatedJob(1);
+ workItems.Enqueue(CreateActivatedJob(1));
+
+ // when
+ ScheduleHandling();
+
+ // then
+ var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));
+ Assert.IsTrue(hasJobHandled);
+ AwaitJobsHaveSeen(1);
+
+ Assert.AreEqual(1, seenJobs.Count);
+ Assert.IsTrue(seenJobs.TryDequeue(out var actualJob));
+ Assert.AreEqual(expectedJob, actualJob);
+ }
+
+ [Test]
+ public void ShouldTriggerJobHandling()
+ {
+ // given
+ var expectedJob = CreateActivatedJob(1);
+ ScheduleHandling();
+ jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));
+
+ // when
+ workItems.Enqueue(CreateActivatedJob(1));
+ jobWorkerSignal.SignalJobPolled();
+
+ // then
+ var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));
+ Assert.IsTrue(hasJobHandled);
+ AwaitJobsHaveSeen(1);
+
+ Assert.AreEqual(1, seenJobs.Count);
+ Assert.IsTrue(seenJobs.TryDequeue(out var actualJob));
+ Assert.AreEqual(expectedJob, actualJob);
+ }
+
+ [Test]
+ public void ShouldHandleJobsInOrder()
+ {
+ // given
+ workItems.Enqueue(CreateActivatedJob(1));
+ workItems.Enqueue(CreateActivatedJob(2));
+ workItems.Enqueue(CreateActivatedJob(3));
+
+ // when
+ ScheduleHandling();
+
+ // then
+ AwaitJobsHaveSeen(3);
+
+ IJob actualJob;
+ Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
+ Assert.AreEqual(1, actualJob.Key);
+ Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
+ Assert.AreEqual(2, actualJob.Key);
+ Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
+ Assert.AreEqual(3, actualJob.Key);
+ }
+
+ [Test]
+ public void ShouldNotHandleDuplicateOnConcurrentHandlers()
+ {
+ // given
+ workItems.Enqueue(CreateActivatedJob(1));
+ workItems.Enqueue(CreateActivatedJob(2));
+ workItems.Enqueue(CreateActivatedJob(3));
+
+ // when
+ ScheduleHandling();
+ ScheduleHandling();
+
+ // then
+ AwaitJobsHaveSeen(3);
+ CollectionAssert.AllItemsAreUnique(seenJobs);
+ }
+
+ private async void AwaitJobsHaveSeen(int expectedCount)
+ {
+ while (!tokenSource.IsCancellationRequested && seenJobs.Count < expectedCount)
+ {
+ await Task.Delay(25);
+ }
+ }
+
+ private void ScheduleHandling()
+ {
+ Task.Run(() => jobHandler.HandleActivatedJobs(tokenSource.Token), tokenSource.Token);
+ }
+
+ private static Responses.ActivatedJob CreateActivatedJob(long key)
+ {
+ return new Responses.ActivatedJob(new ActivatedJob
+ {
+ Key = key,
+ Worker = "jobWorker",
+ Type = "foo",
+ Variables = "{\"foo\":1}",
+ CustomHeaders = "{\"customFoo\":\"1\"}",
+ Retries = 3,
+ Deadline = 123932,
+ BpmnProcessId = "process",
+ ElementId = "job1",
+ ElementInstanceKey = 23,
+ WorkflowInstanceKey = 29,
+ WorkflowDefinitionVersion = 3,
+ WorkflowKey = 21
+ });
+ }
+ }
+}
\ No newline at end of file
diff --git a/Client.UnitTests/JobPollerTest.cs b/Client.UnitTests/JobPollerTest.cs
new file mode 100644
index 00000000..40934c26
--- /dev/null
+++ b/Client.UnitTests/JobPollerTest.cs
@@ -0,0 +1,283 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using GatewayProtocol;
+using Microsoft.Extensions.Logging;
+using NLog;
+using NUnit.Framework;
+using Zeebe.Client.Api.Responses;
+
+namespace Zeebe.Client.Impl.Worker
+{
+ [TestFixture]
+ public class JobPollerTest : BaseZeebeTest
+ {
+ private ConcurrentQueue workItems;
+ private JobWorkerSignal jobWorkerSignal;
+ private JobPoller jobPoller;
+ private CancellationTokenSource tokenSource;
+
+ [SetUp]
+ public void SetupTest()
+ {
+ workItems = new ConcurrentQueue();
+ var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient);
+ jobWorkerSignal = new JobWorkerSignal();
+ jobWorkerBuilder
+ .JobType("foo")
+ .Handler((jobClient, job) => { })
+ .MaxJobsActive(3)
+ .Name("jobWorker")
+ .Timeout(TimeSpan.FromSeconds(123L))
+ .PollInterval(TimeSpan.FromMilliseconds(100))
+ .PollingTimeout(TimeSpan.FromSeconds(5L));
+ jobPoller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal);
+ tokenSource = new CancellationTokenSource();
+ }
+
+ [TearDown]
+ public async Task CleanUp()
+ {
+ tokenSource.Cancel();
+ // delay disposing, since poll and handler take some time to close
+ await Task.Delay(TimeSpan.FromMilliseconds(200))
+ .ContinueWith(t => { tokenSource.Dispose(); });
+
+ tokenSource = null;
+ jobWorkerSignal = null;
+ workItems = null;
+ jobPoller = null;
+ }
+
+ [Test]
+ public void ShouldSendRequests()
+ {
+ // given
+ var expectedRequest = new ActivateJobsRequest
+ {
+ Timeout = 123_000L,
+ MaxJobsToActivate = 3,
+ Type = "foo",
+ Worker = "jobWorker",
+ RequestTimeout = 5_000L
+ };
+
+ // when
+ SchedulePolling();
+
+ // then
+ var hasPolled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+ Assert.IsTrue(hasPolled);
+ var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][0];
+ Assert.AreEqual(expectedRequest, actualRequest);
+ }
+
+ [Test]
+ public void ShouldSendRequestsImmediatelyAfterEmptyResponse()
+ {
+ // given
+ var expectedRequest = new ActivateJobsRequest
+ {
+ Timeout = 123_000L,
+ MaxJobsToActivate = 3,
+ Type = "foo",
+ Worker = "jobWorker",
+ RequestTimeout = 5_000L
+ };
+ SchedulePolling();
+
+ // when
+ var hasPolled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(5));
+ var hasPolledSecondTime = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(5));
+
+ // then
+ Assert.IsTrue(hasPolled);
+ Assert.IsTrue(hasPolledSecondTime);
+
+ Assert.GreaterOrEqual(TestService.Requests[typeof(ActivateJobsRequest)].Count, 2);
+
+ var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][0];
+ Assert.AreEqual(expectedRequest, actualRequest);
+
+ actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][1];
+ Assert.AreEqual(expectedRequest, actualRequest);
+ }
+
+ [Test]
+ public void ShouldPutActivatedJobsIntoQueue()
+ {
+ // given
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse());
+
+ // when
+ SchedulePolling();
+
+ // then
+ var hasPolled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+ Assert.IsTrue(hasPolled);
+ Assert.AreEqual(workItems.Count, 3);
+ }
+
+ [Test]
+ public void ShouldNotPollNewJobsWhenQueueIsFull()
+ {
+ // given
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse());
+ SchedulePolling();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+
+ // when
+ jobWorkerSignal.SignalJobHandled();
+
+ // then
+ Assert.AreEqual(TestService.Requests[typeof(ActivateJobsRequest)].Count, 1);
+ }
+
+ [Test]
+ public void ShouldNotPollNewJobsWhenThresholdIsNotMet()
+ {
+ // given
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse());
+ SchedulePolling();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+ workItems.TryDequeue(out _);
+
+ // when
+ jobWorkerSignal.SignalJobHandled();
+
+ // then
+ Assert.AreEqual(TestService.Requests[typeof(ActivateJobsRequest)].Count, 1);
+ }
+
+ [Test]
+ public void ShouldPollNewJobsWhenThresholdIsMet()
+ {
+ // given
+ var expectedSecondRequest = new ActivateJobsRequest
+ {
+ Timeout = 123_000L,
+ MaxJobsToActivate = 2,
+ Type = "foo",
+ Worker = "jobWorker",
+ RequestTimeout = 5_000L
+ };
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse());
+ SchedulePolling();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+ workItems.TryDequeue(out _);
+ workItems.TryDequeue(out _);
+
+ // when
+ jobWorkerSignal.SignalJobHandled();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+
+ // then
+ Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count);
+ var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][1];
+ Assert.AreEqual(expectedSecondRequest, actualRequest);
+ }
+
+ [Test]
+ public void ShouldPollNewJobsAfterQueueIsCleared()
+ {
+ // given
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse());
+ SchedulePolling();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+
+ // when
+ workItems.Clear();
+ jobWorkerSignal.SignalJobHandled();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+
+ // then
+ Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count);
+ Assert.AreEqual(3, workItems.Count);
+ }
+
+ [Test]
+ public void ShouldPollNewJobsAfterQueueIsClearedAndPollIntervalIsDue()
+ {
+ // given
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse());
+ SchedulePolling();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+
+ // when
+ workItems.Clear();
+ jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+
+ // then
+ AwaitRequestCount(typeof(ActivateJobsRequest), 2);
+ Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count);
+ Assert.AreEqual(3, workItems.Count);
+ }
+
+ private void SchedulePolling()
+ {
+ Task.Run(() => jobPoller.Poll(tokenSource.Token), tokenSource.Token);
+ }
+
+ [Test]
+ public void ShouldTimeoutAndRetry()
+ {
+ // given
+ var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient);
+ jobWorkerBuilder
+ .JobType("foo")
+ .Handler((jobClient, job) => { })
+ .MaxJobsActive(3)
+ .Name("jobWorker")
+ .Timeout(TimeSpan.FromSeconds(123L))
+ .PollInterval(TimeSpan.FromMilliseconds(100))
+ // timeout will be + 10 seconds
+ .PollingTimeout(TimeSpan.FromMilliseconds(5L));
+ jobPoller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal);
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request =>
+ {
+ // doesn't send response back before timeout
+ jobWorkerSignal.AwaitJobHandling(TimeSpan.FromMinutes(1));
+ return JobWorkerTest.CreateExpectedResponse();
+ });
+ SchedulePolling();
+
+ // when
+ var polled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(15));
+
+ // then
+ Assert.IsFalse(polled);
+ Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count);
+ Assert.AreEqual(0, workItems.Count);
+ }
+
+ [Test]
+ public void ShouldImmediatelyRetryOnServerException()
+ {
+ // given
+ var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient);
+ jobWorkerBuilder
+ .JobType("foo")
+ .Handler((jobClient, job) => { })
+ .MaxJobsActive(3)
+ .Name("jobWorker")
+ .Timeout(TimeSpan.FromSeconds(123L))
+ .PollInterval(TimeSpan.FromMilliseconds(100))
+ .PollingTimeout(TimeSpan.FromMilliseconds(5L));
+ jobPoller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal);
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request =>
+ {
+ throw new Exception("Server dies.");
+ });
+ SchedulePolling();
+
+ // when
+ var polled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1));
+
+ // then
+ Assert.IsFalse(polled);
+ Assert.GreaterOrEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count);
+ Assert.AreEqual(0, workItems.Count);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Client.UnitTests/JobWorkerTest.cs b/Client.UnitTests/JobWorkerTest.cs
index 1bee77e1..c8b74a6b 100644
--- a/Client.UnitTests/JobWorkerTest.cs
+++ b/Client.UnitTests/JobWorkerTest.cs
@@ -14,6 +14,8 @@
// limitations under the License.
using System;
+using System.Collections;
+using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@@ -79,6 +81,21 @@ public void ShouldSendRequestReceiveResponseAsExpected()
AssertJob(receivedJobs[2], 3);
}
+ [Test]
+ public void ShouldFailWithZeroThreadCount()
+ {
+ // expected
+ var aggregateException = Assert.Throws(
+ () =>
+ {
+ ZeebeClient.NewWorker()
+ .JobType("foo")
+ .Handler((jobClient, job) => { })
+ .HandlerThreads(0);
+ });
+ StringAssert.Contains("Expected an handler thread count larger then zero, but got 0.", aggregateException.Message);
+ }
+
[Test]
public void ShouldSendAsyncCompleteInHandler()
{
@@ -113,6 +130,7 @@ public void ShouldSendAsyncCompleteInHandler()
.Timeout(TimeSpan.FromSeconds(123L))
.PollInterval(TimeSpan.FromMilliseconds(100))
.PollingTimeout(TimeSpan.FromSeconds(5L))
+ .HandlerThreads(1)
.Open())
{
Assert.True(jobWorker.IsOpen());
@@ -131,6 +149,57 @@ public void ShouldSendAsyncCompleteInHandler()
AssertJob(completedJobs[2], 3);
}
+ [Test]
+ public void ShouldUseMultipleHandlerThreads()
+ {
+ // given
+ var expectedRequest = new ActivateJobsRequest
+ {
+ Timeout = 123_000L,
+ MaxJobsToActivate = 3,
+ Type = "foo",
+ Worker = "jobWorker",
+ RequestTimeout = 5_000L
+ };
+
+ TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse());
+
+ // when
+ var signal = new EventWaitHandle(false, EventResetMode.AutoReset);
+ var completedJobs = new ConcurrentDictionary();
+ using (var jobWorker = ZeebeClient.NewWorker()
+ .JobType("foo")
+ .Handler(async (jobClient, job) =>
+ {
+ await jobClient.NewCompleteJobCommand(job).Send();
+ completedJobs.TryAdd(job.Key, job);
+ if (completedJobs.Count == 3)
+ {
+ signal.Set();
+ }
+ })
+ .MaxJobsActive(3)
+ .Name("jobWorker")
+ .Timeout(TimeSpan.FromSeconds(123L))
+ .PollInterval(TimeSpan.FromMilliseconds(100))
+ .PollingTimeout(TimeSpan.FromSeconds(5L))
+ .HandlerThreads(3)
+ .Open())
+ {
+ Assert.True(jobWorker.IsOpen());
+ signal.WaitOne();
+ }
+
+ // then
+ var actualActivateRequest = TestService.Requests[typeof(ActivateJobsRequest)][0];
+ Assert.AreEqual(expectedRequest, actualActivateRequest);
+
+ var completeRequests = TestService.Requests[typeof(CompleteJobRequest)];
+ Assert.GreaterOrEqual(completeRequests.Count, 3);
+ Assert.GreaterOrEqual(completedJobs.Count, 3);
+ CollectionAssert.AreEquivalent(new List { 1, 2, 3 }, completedJobs.Keys);
+ }
+
[Test]
public void ShouldSendCompleteInHandler()
{
@@ -205,8 +274,7 @@ public void ShouldSendRequestsWithDifferentAmounts()
var expectedSecondRequest = new ActivateJobsRequest
{
Timeout = 123_000L,
- MaxJobsToActivate = 2, // first response contains 3 jobs and one is handled (blocking) so 2 jobs remain in queue
- // so we can try to activate 2 new jobs
+ MaxJobsToActivate = 2,
Type = "foo",
Worker = "jobWorker",
RequestTimeout = 5_000L
diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs
index a338a142..58427ac6 100644
--- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs
+++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs
@@ -228,6 +228,21 @@ public interface IJobWorkerBuilderStep3
/// the builder for this worker
IJobWorkerBuilderStep3 AutoCompletion();
+ ///
+ /// Specifies how many handler threads are used by this job worker.
+ ///
+ ///
+ ///
+ /// The previous defined job handler can be called by multiple threads, to execute more jobs concurrently.
+ /// Per default three job handler threads are used by an job worker.
+ /// This means the job handler implementation needs to be thread safe.
+ ///
+ ///
+ /// Note: Job polling is done by a separate thread.
+ /// handler thread count, needs to be larger then zero
+ /// the builder for this worker
+ IJobWorkerBuilderStep3 HandlerThreads(byte threadCount);
+
///
/// Open the worker and start to work on available tasks.
///
diff --git a/Client/Client.csproj b/Client/Client.csproj
index aa8594ef..940eff53 100644
--- a/Client/Client.csproj
+++ b/Client/Client.csproj
@@ -59,4 +59,15 @@ Fixes:
+
+
+ true
+
+
+
+
+ <_Parameter1>Client.UnitTests
+
+
+
diff --git a/Client/Impl/Commands/ActivateJobsCommand.cs b/Client/Impl/Commands/ActivateJobsCommand.cs
index fb2b70a8..dd9bec96 100644
--- a/Client/Impl/Commands/ActivateJobsCommand.cs
+++ b/Client/Impl/Commands/ActivateJobsCommand.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Zeebe.Client.Api.Commands;
@@ -10,60 +11,65 @@ namespace Zeebe.Client.Impl.Commands
{
internal class ActivateJobsCommand : IActivateJobsCommandStep1, IActivateJobsCommandStep2, IActivateJobsCommandStep3
{
- private readonly ActivateJobsRequest request;
private readonly JobActivator activator;
+ public ActivateJobsRequest Request { get; }
public ActivateJobsCommand(GatewayClient client)
{
activator = new JobActivator(client);
- request = new ActivateJobsRequest();
+ Request = new ActivateJobsRequest();
}
public IActivateJobsCommandStep2 JobType(string jobType)
{
- request.Type = jobType;
+ Request.Type = jobType;
return this;
}
public IActivateJobsCommandStep3 MaxJobsToActivate(int maxJobsToActivate)
{
- request.MaxJobsToActivate = maxJobsToActivate;
+ Request.MaxJobsToActivate = maxJobsToActivate;
return this;
}
public IActivateJobsCommandStep3 FetchVariables(IList fetchVariables)
{
- request.FetchVariable.AddRange(fetchVariables);
+ Request.FetchVariable.AddRange(fetchVariables);
return this;
}
public IActivateJobsCommandStep3 FetchVariables(params string[] fetchVariables)
{
- request.FetchVariable.AddRange(fetchVariables);
+ Request.FetchVariable.AddRange(fetchVariables);
return this;
}
public IActivateJobsCommandStep3 Timeout(TimeSpan timeout)
{
- request.Timeout = (long)timeout.TotalMilliseconds;
+ Request.Timeout = (long)timeout.TotalMilliseconds;
return this;
}
public IActivateJobsCommandStep3 PollingTimeout(TimeSpan pollingTimeout)
{
- request.RequestTimeout = (long)pollingTimeout.TotalMilliseconds;
+ Request.RequestTimeout = (long)pollingTimeout.TotalMilliseconds;
return this;
}
public IActivateJobsCommandStep3 WorkerName(string workerName)
{
- request.Worker = workerName;
+ Request.Worker = workerName;
return this;
}
public async Task Send(TimeSpan? timeout = null)
{
- return await activator.SendActivateRequest(request, timeout?.FromUtcNow());
+ return await Send(timeout, null);
+ }
+
+ public async Task Send(TimeSpan? timeout, CancellationToken? cancelationToken)
+ {
+ return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), cancelationToken);
}
}
}
diff --git a/Client/Impl/Responses/ActivatedJob.cs b/Client/Impl/Responses/ActivatedJob.cs
index 0a3225db..8ef4b295 100644
--- a/Client/Impl/Responses/ActivatedJob.cs
+++ b/Client/Impl/Responses/ActivatedJob.cs
@@ -72,7 +72,59 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob)
public override string ToString()
{
- return $"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(WorkflowInstanceKey)}: {WorkflowInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(WorkflowDefinitionVersion)}: {WorkflowDefinitionVersion}, {nameof(WorkflowKey)}: {WorkflowKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}";
+ return
+ $"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(WorkflowInstanceKey)}: {WorkflowInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(WorkflowDefinitionVersion)}: {WorkflowDefinitionVersion}, {nameof(WorkflowKey)}: {WorkflowKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}";
+ }
+
+ protected bool Equals(ActivatedJob other)
+ {
+ return Key == other.Key && Type == other.Type && WorkflowInstanceKey == other.WorkflowInstanceKey &&
+ BpmnProcessId == other.BpmnProcessId &&
+ WorkflowDefinitionVersion == other.WorkflowDefinitionVersion && WorkflowKey == other.WorkflowKey &&
+ ElementId == other.ElementId && ElementInstanceKey == other.ElementInstanceKey &&
+ Worker == other.Worker && Retries == other.Retries && Deadline.Equals(other.Deadline) &&
+ Variables == other.Variables && CustomHeaders == other.CustomHeaders;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj))
+ {
+ return false;
+ }
+
+ if (ReferenceEquals(this, obj))
+ {
+ return true;
+ }
+
+ if (obj.GetType() != this.GetType())
+ {
+ return false;
+ }
+
+ return Equals((ActivatedJob) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ var hashCode = Key.GetHashCode();
+ hashCode = (hashCode * 397) ^ (Type != null ? Type.GetHashCode() : 0);
+ hashCode = (hashCode * 397) ^ WorkflowInstanceKey.GetHashCode();
+ hashCode = (hashCode * 397) ^ (BpmnProcessId != null ? BpmnProcessId.GetHashCode() : 0);
+ hashCode = (hashCode * 397) ^ WorkflowDefinitionVersion;
+ hashCode = (hashCode * 397) ^ WorkflowKey.GetHashCode();
+ hashCode = (hashCode * 397) ^ (ElementId != null ? ElementId.GetHashCode() : 0);
+ hashCode = (hashCode * 397) ^ ElementInstanceKey.GetHashCode();
+ hashCode = (hashCode * 397) ^ (Worker != null ? Worker.GetHashCode() : 0);
+ hashCode = (hashCode * 397) ^ Retries;
+ hashCode = (hashCode * 397) ^ Deadline.GetHashCode();
+ hashCode = (hashCode * 397) ^ (Variables != null ? Variables.GetHashCode() : 0);
+ hashCode = (hashCode * 397) ^ (CustomHeaders != null ? CustomHeaders.GetHashCode() : 0);
+ return hashCode;
+ }
}
}
-}
+}
\ No newline at end of file
diff --git a/Client/Impl/Worker/JobClientWrapper.cs b/Client/Impl/Worker/JobClientWrapper.cs
new file mode 100644
index 00000000..e5ec55cb
--- /dev/null
+++ b/Client/Impl/Worker/JobClientWrapper.cs
@@ -0,0 +1,53 @@
+using Zeebe.Client.Api.Commands;
+using Zeebe.Client.Api.Responses;
+using Zeebe.Client.Api.Worker;
+
+namespace Zeebe.Client.Impl.Worker
+{
+ internal class JobClientWrapper : IJobClient
+ {
+ public static JobClientWrapper Wrap(IJobClient client)
+ {
+ return new JobClientWrapper(client);
+ }
+
+ public bool ClientWasUsed { get; private set; }
+
+ private IJobClient Client { get; }
+
+ private JobClientWrapper(IJobClient client)
+ {
+ Client = client;
+ ClientWasUsed = false;
+ }
+
+ public ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey)
+ {
+ ClientWasUsed = true;
+ return Client.NewCompleteJobCommand(jobKey);
+ }
+
+ public IFailJobCommandStep1 NewFailCommand(long jobKey)
+ {
+ ClientWasUsed = true;
+ return Client.NewFailCommand(jobKey);
+ }
+
+ public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey)
+ {
+ ClientWasUsed = true;
+ return Client.NewThrowErrorCommand(jobKey);
+ }
+
+ public void Reset()
+ {
+ ClientWasUsed = false;
+ }
+
+ public ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob)
+ {
+ ClientWasUsed = true;
+ return Client.NewCompleteJobCommand(activatedJob);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Client/Impl/Worker/JobHandlerExecutor.cs b/Client/Impl/Worker/JobHandlerExecutor.cs
new file mode 100644
index 00000000..b372c845
--- /dev/null
+++ b/Client/Impl/Worker/JobHandlerExecutor.cs
@@ -0,0 +1,116 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Zeebe.Client.Api.Responses;
+using Zeebe.Client.Api.Worker;
+using Zeebe.Client.Impl.Commands;
+
+namespace Zeebe.Client.Impl.Worker
+{
+ internal class JobHandlerExecutor
+ {
+ private const string JobFailMessage =
+ "Job worker '{0}' tried to handle job of type '{1}', but exception occured '{2}'";
+
+ private readonly ConcurrentQueue workItems;
+ private readonly JobClientWrapper jobClient;
+ private readonly ILogger logger;
+ private readonly JobWorkerSignal jobWorkerSignal;
+ private readonly TimeSpan pollInterval;
+ private readonly AsyncJobHandler jobHandler;
+ private readonly bool autoCompletion;
+ private readonly ActivateJobsCommand activateJobsCommand;
+
+ public JobHandlerExecutor(JobWorkerBuilder builder,
+ ConcurrentQueue workItems,
+ JobWorkerSignal jobWorkerSignal)
+ {
+ this.jobClient = JobClientWrapper.Wrap(builder.JobClient);
+ this.workItems = workItems;
+ this.jobWorkerSignal = jobWorkerSignal;
+ this.activateJobsCommand = builder.Command;
+ this.pollInterval = builder.PollInterval();
+ this.jobHandler = builder.Handler();
+ this.autoCompletion = builder.AutoCompletionEnabled();
+ this.logger = builder.LoggerFactory?.CreateLogger();
+ }
+
+ public async Task HandleActivatedJobs(CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ if (!workItems.IsEmpty)
+ {
+ bool success = workItems.TryDequeue(out IJob activatedJob);
+
+ if (success)
+ {
+ await HandleActivatedJob(cancellationToken, activatedJob);
+ }
+
+ jobWorkerSignal.SignalJobHandled();
+ }
+ else
+ {
+ jobWorkerSignal.SignalJobHandled();
+ jobWorkerSignal.AwaitNewJobPolled(pollInterval);
+ }
+ }
+ }
+
+ private async Task HandleActivatedJob(CancellationToken cancellationToken, IJob activatedJob)
+ {
+ try
+ {
+ await jobHandler(jobClient, activatedJob);
+ await TryToAutoCompleteJob(activatedJob);
+ }
+ catch (Exception exception)
+ {
+ await FailActivatedJob(activatedJob, cancellationToken, exception);
+ }
+ finally
+ {
+ jobClient.Reset();
+ }
+ }
+
+ private async Task TryToAutoCompleteJob(IJob activatedJob)
+ {
+ if (!jobClient.ClientWasUsed && autoCompletion)
+ {
+ logger?.LogDebug(
+ "Job worker ({worker}) will auto complete job with key '{key}'",
+ activateJobsCommand.Request.Worker,
+ activatedJob.Key);
+ await jobClient.NewCompleteJobCommand(activatedJob)
+ .Send();
+ }
+ }
+
+ private Task FailActivatedJob(IJob activatedJob, CancellationToken cancellationToken, Exception exception)
+ {
+ var errorMessage = string.Format(
+ JobFailMessage,
+ activatedJob.Worker,
+ activatedJob.Type,
+ exception.Message);
+ logger?.LogError(exception, errorMessage);
+
+ return jobClient.NewFailCommand(activatedJob.Key)
+ .Retries(activatedJob.Retries - 1)
+ .ErrorMessage(errorMessage)
+ .Send()
+ .ContinueWith(
+ task =>
+ {
+ if (task.IsFaulted)
+ {
+ logger?.LogError("Problem on failing job occured.", task.Exception);
+ }
+ }, cancellationToken);
+ }
+ }
+}
\ No newline at end of file
diff --git a/Client/Impl/Worker/JobPoller.cs b/Client/Impl/Worker/JobPoller.cs
new file mode 100644
index 00000000..b217a442
--- /dev/null
+++ b/Client/Impl/Worker/JobPoller.cs
@@ -0,0 +1,92 @@
+using System;
+using System.Collections.Concurrent;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.Extensions.Logging;
+using Zeebe.Client.Api.Responses;
+using Zeebe.Client.Impl.Commands;
+
+namespace Zeebe.Client.Impl.Worker
+{
+ internal class JobPoller
+ {
+ private readonly ConcurrentQueue workItems;
+ private readonly int maxJobsActive;
+ private readonly ILogger logger;
+ private readonly JobWorkerSignal jobWorkerSignal;
+ private readonly TimeSpan pollInterval;
+ private readonly ActivateJobsCommand activateJobsCommand;
+ private int threshold;
+
+ public JobPoller(JobWorkerBuilder builder,
+ ConcurrentQueue workItems,
+ JobWorkerSignal jobWorkerSignal)
+ {
+ this.activateJobsCommand = builder.Command;
+ this.threshold = (int) Math.Ceiling(activateJobsCommand.Request.MaxJobsToActivate * 0.6f);
+ this.maxJobsActive = activateJobsCommand.Request.MaxJobsToActivate;
+ this.workItems = workItems;
+ this.pollInterval = builder.PollInterval();
+ this.logger = builder.LoggerFactory?.CreateLogger();
+ this.jobWorkerSignal = jobWorkerSignal;
+ }
+
+ internal async Task Poll(CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ while (workItems.Count < threshold)
+ {
+ try
+ {
+ await PollJobs(cancellationToken);
+ }
+ catch (RpcException rpcException)
+ {
+ LogRpcException(rpcException);
+ }
+ }
+
+ jobWorkerSignal.AwaitJobHandling(pollInterval);
+ }
+ }
+
+ private void LogRpcException(RpcException rpcException)
+ {
+ LogLevel logLevel;
+ switch (rpcException.StatusCode)
+ {
+ case StatusCode.DeadlineExceeded:
+ case StatusCode.Cancelled:
+ logLevel = LogLevel.Trace;
+ break;
+ default:
+ logLevel = LogLevel.Error;
+ break;
+ }
+
+ logger?.Log(logLevel, rpcException, "Unexpected RpcException on polling new jobs.");
+ }
+
+ private async Task PollJobs(CancellationToken cancellationToken)
+ {
+ var jobCount = maxJobsActive - workItems.Count;
+ activateJobsCommand.MaxJobsToActivate(jobCount);
+
+ var response = await activateJobsCommand.Send(null, cancellationToken);
+
+ logger?.LogDebug(
+ "Job worker ({worker}) activated {activatedCount} of {requestCount} successfully.",
+ activateJobsCommand.Request.Worker,
+ response.Jobs.Count,
+ jobCount);
+ foreach (var job in response.Jobs)
+ {
+ workItems.Enqueue(job);
+ }
+
+ jobWorkerSignal.SignalJobPolled();
+ }
+ }
+}
\ No newline at end of file
diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs
index 272ff534..30a8e063 100644
--- a/Client/Impl/Worker/JobWorker.cs
+++ b/Client/Impl/Worker/JobWorker.cs
@@ -20,7 +20,6 @@
using GatewayProtocol;
using Grpc.Core;
using Microsoft.Extensions.Logging;
-using Zeebe.Client.Api.Commands;
using Zeebe.Client.Api.Responses;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Commands;
@@ -29,45 +28,28 @@ namespace Zeebe.Client.Impl.Worker
{
public class JobWorker : IJobWorker
{
- private const string JobFailMessage =
- "Job worker '{0}' tried to handle job of type '{1}', but exception occured '{2}'";
-
- private readonly int maxJobsActive;
private readonly ConcurrentQueue workItems = new ConcurrentQueue();
-
- private readonly ActivateJobsRequest activeRequest;
- private readonly JobActivator activator;
- private readonly AsyncJobHandler jobHandler;
- private readonly JobClientWrapper jobClient;
- private readonly bool autoCompletion;
- private readonly TimeSpan pollInterval;
private readonly CancellationTokenSource source;
-
- private readonly EventWaitHandle handleSignal = new EventWaitHandle(false, EventResetMode.AutoReset);
- private readonly EventWaitHandle pollSignal = new EventWaitHandle(false, EventResetMode.AutoReset);
private readonly ILogger logger;
+ private readonly JobWorkerBuilder jobWorkerBuilder;
+
private volatile bool isRunning;
internal JobWorker(JobWorkerBuilder builder)
{
- source = new CancellationTokenSource();
- activator = new JobActivator(builder.Client);
- activeRequest = builder.Request;
- maxJobsActive = activeRequest.MaxJobsToActivate;
- pollInterval = builder.PollInterval();
- jobClient = JobClientWrapper.Wrap(builder.JobClient);
- jobHandler = builder.Handler();
- autoCompletion = builder.AutoCompletionEnabled();
- logger = builder.LoggerFactory?.CreateLogger();
+ this.jobWorkerBuilder = builder;
+ this.source = new CancellationTokenSource();
+ this.logger = builder.LoggerFactory?.CreateLogger();
}
///
public void Dispose()
{
source.Cancel();
+ var pollInterval = jobWorkerBuilder.PollInterval();
// delay disposing, since poll and handler take some time to close
Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2))
- .ContinueWith((t) =>
+ .ContinueWith(t =>
{
logger?.LogError("Dispose source");
source.Dispose();
@@ -95,201 +77,44 @@ internal void Open()
{
isRunning = true;
var cancellationToken = source.Token;
+ var jobWorkerSignal = new JobWorkerSignal();
+
+ StartPollerThread(jobWorkerSignal, cancellationToken);
+ StartHandlerThreads(jobWorkerSignal, cancellationToken);
+ var command = jobWorkerBuilder.Command;
+ logger?.LogDebug(
+ "Job worker ({worker}) for job type {type} has been opened.",
+ command.Request.Worker,
+ command.Request.Type);
+ }
+
+ private void StartPollerThread(JobWorkerSignal jobWorkerSignal, CancellationToken cancellationToken)
+ {
+ var poller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal);
Task.Run(
async () =>
- await Poll(cancellationToken)
+ await poller.Poll(cancellationToken)
.ContinueWith(
t => logger?.LogError(t.Exception, "Job polling failed."),
TaskContinuationOptions.OnlyOnFaulted), cancellationToken)
.ContinueWith(
t => logger?.LogError(t.Exception, "Job polling failed."),
TaskContinuationOptions.OnlyOnFaulted);
-
- Task.Run(async () => await HandleActivatedJobs(cancellationToken), cancellationToken)
- .ContinueWith(
- t => logger?.LogError(t.Exception, "Job handling failed."),
- TaskContinuationOptions.OnlyOnFaulted);
-
- logger?.LogDebug(
- "Job worker ({worker}) for job type {type} has been opened.",
- activeRequest.Worker,
- activeRequest.Type);
- }
-
- private async Task HandleActivatedJobs(CancellationToken cancellationToken)
- {
- while (!cancellationToken.IsCancellationRequested)
- {
- if (!workItems.IsEmpty)
- {
- bool success = workItems.TryDequeue(out IJob activatedJob);
-
- if (success)
- {
- await HandleActivatedJob(cancellationToken, activatedJob);
- }
- else
- {
- pollSignal.Set();
- }
- }
- else
- {
- handleSignal.WaitOne(pollInterval);
- }
- }
- }
-
- private async Task HandleActivatedJob(CancellationToken cancellationToken, IJob activatedJob)
- {
- try
- {
- await jobHandler(jobClient, activatedJob);
- await TryToAutoCompleteJob(activatedJob);
- }
- catch (Exception exception)
- {
- await FailActivatedJob(activatedJob, cancellationToken, exception);
- }
- finally
- {
- jobClient.Reset();
- }
}
- private async Task TryToAutoCompleteJob(IJob activatedJob)
+ private void StartHandlerThreads(JobWorkerSignal jobWorkerSignal, CancellationToken cancellationToken)
{
- if (!jobClient.ClientWasUsed && autoCompletion)
+ var threadCount = jobWorkerBuilder.ThreadCount;
+ for (var i = 0; i < threadCount; i++)
{
- logger?.LogDebug(
- "Job worker ({worker}) will auto complete job with key '{key}'",
- activeRequest.Worker,
- activatedJob.Key);
- await jobClient.NewCompleteJobCommand(activatedJob)
- .Send();
- }
- }
-
- private Task FailActivatedJob(IJob activatedJob, CancellationToken cancellationToken, Exception exception)
- {
- var errorMessage = string.Format(
- JobFailMessage,
- activatedJob.Worker,
- activatedJob.Type,
- exception.Message);
- logger?.LogError(exception, errorMessage);
-
- return jobClient.NewFailCommand(activatedJob.Key)
- .Retries(activatedJob.Retries - 1)
- .ErrorMessage(errorMessage)
- .Send()
- .ContinueWith(
- task =>
- {
- if (task.IsFaulted)
- {
- logger?.LogError("Problem on failing job occured.", task.Exception);
- }
- }, cancellationToken);
- }
+ logger?.LogDebug("Start handler {index} thread", i);
- private async Task Poll(CancellationToken cancellationToken)
- {
- while (!cancellationToken.IsCancellationRequested)
- {
- if (workItems.Count < maxJobsActive)
- {
- try
- {
- await PollJobs(cancellationToken);
- }
- catch (RpcException rpcException)
- {
- LogLevel logLevel;
- switch (rpcException.StatusCode)
- {
- case StatusCode.DeadlineExceeded:
- case StatusCode.Cancelled:
- logLevel = LogLevel.Debug;
- break;
- default:
- logLevel = LogLevel.Error;
- break;
- }
-
- logger?.Log(logLevel, rpcException, "Unexpected RpcException on polling new jobs.");
- }
- }
-
- pollSignal.WaitOne(pollInterval);
- }
- }
-
- private async Task PollJobs(CancellationToken cancellationToken)
- {
- var jobCount = maxJobsActive - workItems.Count;
- activeRequest.MaxJobsToActivate = jobCount;
-
- var response = await activator.SendActivateRequest(activeRequest, null, cancellationToken);
-
- logger?.LogDebug(
- "Job worker ({worker}) activated {activatedCount} of {requestCount} successfully.",
- activeRequest.Worker,
- response.Jobs.Count,
- jobCount);
- foreach (var job in response.Jobs)
- {
- workItems.Enqueue(job);
- }
-
- handleSignal.Set();
- }
-
- private class JobClientWrapper : IJobClient
- {
- public static JobClientWrapper Wrap(IJobClient client)
- {
- return new JobClientWrapper(client);
- }
-
- public bool ClientWasUsed { get; private set; }
-
- private IJobClient Client { get; }
-
- private JobClientWrapper(IJobClient client)
- {
- Client = client;
- ClientWasUsed = false;
- }
-
- public ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey)
- {
- ClientWasUsed = true;
- return Client.NewCompleteJobCommand(jobKey);
- }
-
- public IFailJobCommandStep1 NewFailCommand(long jobKey)
- {
- ClientWasUsed = true;
- return Client.NewFailCommand(jobKey);
- }
-
- public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey)
- {
- ClientWasUsed = true;
- return Client.NewThrowErrorCommand(jobKey);
- }
-
- public void Reset()
- {
- ClientWasUsed = false;
- }
-
- public ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob)
- {
- ClientWasUsed = true;
- return Client.NewCompleteJobCommand(activatedJob);
+ var jobHandlerExecutor = new JobHandlerExecutor(jobWorkerBuilder, workItems, jobWorkerSignal);
+ Task.Run(async () => await jobHandlerExecutor.HandleActivatedJobs(cancellationToken), cancellationToken)
+ .ContinueWith(
+ t => logger?.LogError(t.Exception, "Job handling failed."),
+ TaskContinuationOptions.OnlyOnFaulted);
}
}
}
diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs
index 915a64e4..7fd0611f 100644
--- a/Client/Impl/Worker/JobWorkerBuilder.cs
+++ b/Client/Impl/Worker/JobWorkerBuilder.cs
@@ -15,11 +15,11 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics.Contracts;
using System.Threading.Tasks;
-using GatewayProtocol;
using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Logging.Abstractions;
using Zeebe.Client.Api.Worker;
+using Zeebe.Client.Impl.Commands;
namespace Zeebe.Client.Impl.Worker
{
@@ -28,25 +28,26 @@ public class JobWorkerBuilder : IJobWorkerBuilderStep1, IJobWorkerBuilderStep2,
private TimeSpan pollInterval;
private AsyncJobHandler handler;
private bool autoCompletion;
-
+ internal byte ThreadCount { get; set; }
internal ILoggerFactory LoggerFactory { get; }
- internal Gateway.GatewayClient Client { get; }
- internal ActivateJobsRequest Request { get; } = new ActivateJobsRequest();
+ internal ActivateJobsCommand Command { get; }
internal IJobClient JobClient { get; }
public JobWorkerBuilder(
- Gateway.GatewayClient client,
- IJobClient jobClient,
+ IZeebeClient zeebeClient,
ILoggerFactory loggerFactory = null)
{
LoggerFactory = loggerFactory;
- Client = client;
- JobClient = jobClient;
+ Command = (ActivateJobsCommand) zeebeClient.NewActivateJobsCommand();
+ JobClient = zeebeClient;
+ ThreadCount = 1;
+
+ zeebeClient.NewActivateJobsCommand();
}
public IJobWorkerBuilderStep2 JobType(string type)
{
- Request.Type = type;
+ Command.JobType(type);
return this;
}
@@ -69,31 +70,31 @@ internal AsyncJobHandler Handler()
public IJobWorkerBuilderStep3 Timeout(TimeSpan timeout)
{
- Request.Timeout = (long)timeout.TotalMilliseconds;
+ Command.Timeout(timeout);
return this;
}
public IJobWorkerBuilderStep3 Name(string workerName)
{
- Request.Worker = workerName;
+ Command.WorkerName(workerName);
return this;
}
public IJobWorkerBuilderStep3 MaxJobsActive(int maxJobsActive)
{
- Request.MaxJobsToActivate = maxJobsActive;
+ Command.MaxJobsToActivate(maxJobsActive);
return this;
}
public IJobWorkerBuilderStep3 FetchVariables(IList fetchVariables)
{
- Request.FetchVariable.AddRange(fetchVariables);
+ Command.FetchVariables(fetchVariables);
return this;
}
public IJobWorkerBuilderStep3 FetchVariables(params string[] fetchVariables)
{
- Request.FetchVariable.AddRange(fetchVariables);
+ Command.FetchVariables(fetchVariables);
return this;
}
@@ -110,7 +111,7 @@ internal TimeSpan PollInterval()
public IJobWorkerBuilderStep3 PollingTimeout(TimeSpan pollingTimeout)
{
- Request.RequestTimeout = (long)pollingTimeout.TotalMilliseconds;
+ Command.PollingTimeout(pollingTimeout);
return this;
}
@@ -120,6 +121,18 @@ public IJobWorkerBuilderStep3 AutoCompletion()
return this;
}
+ public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount)
+ {
+ if (threadCount <= 0)
+ {
+ var errorMsg = $"Expected an handler thread count larger then zero, but got {threadCount}.";
+ throw new ArgumentOutOfRangeException(errorMsg);
+ }
+
+ this.ThreadCount = threadCount;
+ return this;
+ }
+
internal bool AutoCompletionEnabled()
{
return autoCompletion;
diff --git a/Client/Impl/Worker/JobWorkerSignal.cs b/Client/Impl/Worker/JobWorkerSignal.cs
new file mode 100644
index 00000000..21081116
--- /dev/null
+++ b/Client/Impl/Worker/JobWorkerSignal.cs
@@ -0,0 +1,35 @@
+using System;
+using System.Threading;
+
+namespace Zeebe.Client.Impl.Worker
+{
+ public class JobWorkerSignal
+ {
+ private readonly EventWaitHandle handleSignal = new EventWaitHandle(false, EventResetMode.AutoReset);
+ private readonly EventWaitHandle pollSignal = new EventWaitHandle(false, EventResetMode.AutoReset);
+
+ internal JobWorkerSignal()
+ {
+ }
+
+ public bool AwaitJobHandling(TimeSpan timeSpan)
+ {
+ return pollSignal.WaitOne(timeSpan);
+ }
+
+ public bool AwaitNewJobPolled(TimeSpan timeSpan)
+ {
+ return handleSignal.WaitOne(timeSpan);
+ }
+
+ public void SignalJobHandled()
+ {
+ pollSignal.Set();
+ }
+
+ public void SignalJobPolled()
+ {
+ handleSignal.Set();
+ }
+ }
+}
\ No newline at end of file
diff --git a/Client/ZeebeClient.cs b/Client/ZeebeClient.cs
index cbfea2c5..4d1cb3e2 100644
--- a/Client/ZeebeClient.cs
+++ b/Client/ZeebeClient.cs
@@ -87,7 +87,7 @@ private void AddKeepAliveToChannelOptions(List channelOptions, Ti
public IJobWorkerBuilderStep1 NewWorker()
{
- return new JobWorkerBuilder(gatewayClient, this, loggerFactory);
+ return new JobWorkerBuilder(this, loggerFactory);
}
public IActivateJobsCommandStep1 NewActivateJobsCommand()