From ab2c670a47f9a9b12bc50a40d9fc313e8ade72a1 Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Tue, 16 Jun 2020 07:45:11 +0200 Subject: [PATCH] chore(client): rewrote job worker completely The worker now allows to handle jobs concurrently. Via a new configuration `HandlerThreads` you can set the thread count, which are used to handle activated jobs. Make sure your given Handler implementation is thread-safe. Per default the HandlerThread is set to 1, to be backwards compatible. Fixed some concurrency issues in activating and job handling, which caused that either for a longer time no jobs are polled or not handled or polled to fast. Poll interval are now respected better. It is rewritten in a way that it is now also better testable. Introducing an threshold activation mechanism, which means that new jobs are only activated after the working queue reaches a threshold of 0.6. **Example:** maxActivationCount = 32 threshold = Ceil(32 * 0.6) = 20 If the maxActivationCount is set to 32, then for first twelve jobs no new jobs are activated. After going under 20 jobs it will poll again for new jobs, based on the work queue (32 - work queue count). This allows to batch better job activation it will reduce the load on the gateway and broker and avoids doing to much requests for a single job activation. closes #133 closes #149 --- Client.UnitTests/BaseZeebeTest.cs | 21 +- Client.UnitTests/Client.UnitTests.csproj | 8 + Client.UnitTests/JobHandlerTest.cs | 173 ++++++++++++ Client.UnitTests/JobPollerTest.cs | 283 ++++++++++++++++++++ Client.UnitTests/JobWorkerTest.cs | 72 ++++- Client/Api/Worker/IJobWorkerBuilderStep1.cs | 15 ++ Client/Client.csproj | 11 + Client/Impl/Commands/ActivateJobsCommand.cs | 26 +- Client/Impl/Responses/ActivatedJob.cs | 56 +++- Client/Impl/Worker/JobClientWrapper.cs | 53 ++++ Client/Impl/Worker/JobHandlerExecutor.cs | 116 ++++++++ Client/Impl/Worker/JobPoller.cs | 92 +++++++ Client/Impl/Worker/JobWorker.cs | 237 +++------------- Client/Impl/Worker/JobWorkerBuilder.cs | 45 ++-- Client/Impl/Worker/JobWorkerSignal.cs | 35 +++ Client/ZeebeClient.cs | 2 +- 16 files changed, 1005 insertions(+), 240 deletions(-) create mode 100644 Client.UnitTests/JobHandlerTest.cs create mode 100644 Client.UnitTests/JobPollerTest.cs create mode 100644 Client/Impl/Worker/JobClientWrapper.cs create mode 100644 Client/Impl/Worker/JobHandlerExecutor.cs create mode 100644 Client/Impl/Worker/JobPoller.cs create mode 100644 Client/Impl/Worker/JobWorkerSignal.cs diff --git a/Client.UnitTests/BaseZeebeTest.cs b/Client.UnitTests/BaseZeebeTest.cs index 75d418f4..e2172b4a 100644 --- a/Client.UnitTests/BaseZeebeTest.cs +++ b/Client.UnitTests/BaseZeebeTest.cs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System; +using System.Threading; using System.Threading.Tasks; using GatewayProtocol; using Grpc.Core; @@ -28,8 +30,8 @@ public class BaseZeebeTest private IZeebeClient client; public Server Server => server; - public GatewayTestService TestService => testService; - public IZeebeClient ZeebeClient => client; + protected GatewayTestService TestService => testService; + protected IZeebeClient ZeebeClient => client; [SetUp] public void Init() @@ -43,7 +45,11 @@ public void Init() server.Services.Add(serviceDefinition); server.Start(); - client = Client.ZeebeClient.Builder().UseGatewayAddress("localhost:26500").UsePlainText().Build(); + client = Client.ZeebeClient + .Builder() + .UseGatewayAddress("localhost:26500") + .UsePlainText() + .Build(); } [TearDown] @@ -51,9 +57,18 @@ public void Stop() { client.Dispose(); server.ShutdownAsync().Wait(); + testService.Requests.Clear(); testService = null; server = null; client = null; } + + public void AwaitRequestCount(Type type, int requestCount) + { + while (TestService.Requests[type].Count < requestCount) + { + Thread.Sleep(TimeSpan.FromMilliseconds(100)); + } + } } } diff --git a/Client.UnitTests/Client.UnitTests.csproj b/Client.UnitTests/Client.UnitTests.csproj index 31ab19d7..e6e5f3c5 100644 --- a/Client.UnitTests/Client.UnitTests.csproj +++ b/Client.UnitTests/Client.UnitTests.csproj @@ -5,6 +5,14 @@ Zeebe.Client + + x64 + + + + x64 + + diff --git a/Client.UnitTests/JobHandlerTest.cs b/Client.UnitTests/JobHandlerTest.cs new file mode 100644 index 00000000..276a2902 --- /dev/null +++ b/Client.UnitTests/JobHandlerTest.cs @@ -0,0 +1,173 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using GatewayProtocol; +using Microsoft.Extensions.Logging; +using NLog; +using NUnit.Framework; +using Zeebe.Client.Api.Responses; + +namespace Zeebe.Client.Impl.Worker +{ + [TestFixture] + public class JobHandlerTest : BaseZeebeTest + { + private ConcurrentQueue workItems; + private ConcurrentQueue seenJobs; + private JobWorkerSignal jobWorkerSignal; + private JobHandlerExecutor jobHandler; + private CancellationTokenSource tokenSource; + + [SetUp] + public void SetupTest() + { + workItems = new ConcurrentQueue(); + seenJobs = new ConcurrentQueue(); + var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient); + jobWorkerSignal = new JobWorkerSignal(); + + jobWorkerBuilder + .JobType("foo") + .Handler((jobClient, job) => { seenJobs.Enqueue(job); }) + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(123L)) + .PollInterval(TimeSpan.FromMilliseconds(100)) + .PollingTimeout(TimeSpan.FromSeconds(5L)); + jobHandler = new JobHandlerExecutor(jobWorkerBuilder, workItems, jobWorkerSignal); + tokenSource = new CancellationTokenSource(); + } + + [TearDown] + public async Task CleanUp() + { + tokenSource.Cancel(); + // delay disposing, since poll and handler take some time to close + await Task.Delay(TimeSpan.FromMilliseconds(200)) + .ContinueWith(t => { tokenSource.Dispose(); }); + + tokenSource = null; + jobHandler = null; + seenJobs = null; + workItems = null; + jobWorkerSignal = null; + } + + [Test] + public void ShouldHandleJob() + { + // given + var expectedJob = CreateActivatedJob(1); + workItems.Enqueue(CreateActivatedJob(1)); + + // when + ScheduleHandling(); + + // then + var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1)); + Assert.IsTrue(hasJobHandled); + AwaitJobsHaveSeen(1); + + Assert.AreEqual(1, seenJobs.Count); + Assert.IsTrue(seenJobs.TryDequeue(out var actualJob)); + Assert.AreEqual(expectedJob, actualJob); + } + + [Test] + public void ShouldTriggerJobHandling() + { + // given + var expectedJob = CreateActivatedJob(1); + ScheduleHandling(); + jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1)); + + // when + workItems.Enqueue(CreateActivatedJob(1)); + jobWorkerSignal.SignalJobPolled(); + + // then + var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1)); + Assert.IsTrue(hasJobHandled); + AwaitJobsHaveSeen(1); + + Assert.AreEqual(1, seenJobs.Count); + Assert.IsTrue(seenJobs.TryDequeue(out var actualJob)); + Assert.AreEqual(expectedJob, actualJob); + } + + [Test] + public void ShouldHandleJobsInOrder() + { + // given + workItems.Enqueue(CreateActivatedJob(1)); + workItems.Enqueue(CreateActivatedJob(2)); + workItems.Enqueue(CreateActivatedJob(3)); + + // when + ScheduleHandling(); + + // then + AwaitJobsHaveSeen(3); + + IJob actualJob; + Assert.IsTrue(seenJobs.TryDequeue(out actualJob)); + Assert.AreEqual(1, actualJob.Key); + Assert.IsTrue(seenJobs.TryDequeue(out actualJob)); + Assert.AreEqual(2, actualJob.Key); + Assert.IsTrue(seenJobs.TryDequeue(out actualJob)); + Assert.AreEqual(3, actualJob.Key); + } + + [Test] + public void ShouldNotHandleDuplicateOnConcurrentHandlers() + { + // given + workItems.Enqueue(CreateActivatedJob(1)); + workItems.Enqueue(CreateActivatedJob(2)); + workItems.Enqueue(CreateActivatedJob(3)); + + // when + ScheduleHandling(); + ScheduleHandling(); + + // then + AwaitJobsHaveSeen(3); + CollectionAssert.AllItemsAreUnique(seenJobs); + } + + private async void AwaitJobsHaveSeen(int expectedCount) + { + while (!tokenSource.IsCancellationRequested && seenJobs.Count < expectedCount) + { + await Task.Delay(25); + } + } + + private void ScheduleHandling() + { + Task.Run(() => jobHandler.HandleActivatedJobs(tokenSource.Token), tokenSource.Token); + } + + private static Responses.ActivatedJob CreateActivatedJob(long key) + { + return new Responses.ActivatedJob(new ActivatedJob + { + Key = key, + Worker = "jobWorker", + Type = "foo", + Variables = "{\"foo\":1}", + CustomHeaders = "{\"customFoo\":\"1\"}", + Retries = 3, + Deadline = 123932, + BpmnProcessId = "process", + ElementId = "job1", + ElementInstanceKey = 23, + WorkflowInstanceKey = 29, + WorkflowDefinitionVersion = 3, + WorkflowKey = 21 + }); + } + } +} \ No newline at end of file diff --git a/Client.UnitTests/JobPollerTest.cs b/Client.UnitTests/JobPollerTest.cs new file mode 100644 index 00000000..40934c26 --- /dev/null +++ b/Client.UnitTests/JobPollerTest.cs @@ -0,0 +1,283 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using GatewayProtocol; +using Microsoft.Extensions.Logging; +using NLog; +using NUnit.Framework; +using Zeebe.Client.Api.Responses; + +namespace Zeebe.Client.Impl.Worker +{ + [TestFixture] + public class JobPollerTest : BaseZeebeTest + { + private ConcurrentQueue workItems; + private JobWorkerSignal jobWorkerSignal; + private JobPoller jobPoller; + private CancellationTokenSource tokenSource; + + [SetUp] + public void SetupTest() + { + workItems = new ConcurrentQueue(); + var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient); + jobWorkerSignal = new JobWorkerSignal(); + jobWorkerBuilder + .JobType("foo") + .Handler((jobClient, job) => { }) + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(123L)) + .PollInterval(TimeSpan.FromMilliseconds(100)) + .PollingTimeout(TimeSpan.FromSeconds(5L)); + jobPoller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal); + tokenSource = new CancellationTokenSource(); + } + + [TearDown] + public async Task CleanUp() + { + tokenSource.Cancel(); + // delay disposing, since poll and handler take some time to close + await Task.Delay(TimeSpan.FromMilliseconds(200)) + .ContinueWith(t => { tokenSource.Dispose(); }); + + tokenSource = null; + jobWorkerSignal = null; + workItems = null; + jobPoller = null; + } + + [Test] + public void ShouldSendRequests() + { + // given + var expectedRequest = new ActivateJobsRequest + { + Timeout = 123_000L, + MaxJobsToActivate = 3, + Type = "foo", + Worker = "jobWorker", + RequestTimeout = 5_000L + }; + + // when + SchedulePolling(); + + // then + var hasPolled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + Assert.IsTrue(hasPolled); + var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][0]; + Assert.AreEqual(expectedRequest, actualRequest); + } + + [Test] + public void ShouldSendRequestsImmediatelyAfterEmptyResponse() + { + // given + var expectedRequest = new ActivateJobsRequest + { + Timeout = 123_000L, + MaxJobsToActivate = 3, + Type = "foo", + Worker = "jobWorker", + RequestTimeout = 5_000L + }; + SchedulePolling(); + + // when + var hasPolled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(5)); + var hasPolledSecondTime = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(5)); + + // then + Assert.IsTrue(hasPolled); + Assert.IsTrue(hasPolledSecondTime); + + Assert.GreaterOrEqual(TestService.Requests[typeof(ActivateJobsRequest)].Count, 2); + + var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][0]; + Assert.AreEqual(expectedRequest, actualRequest); + + actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][1]; + Assert.AreEqual(expectedRequest, actualRequest); + } + + [Test] + public void ShouldPutActivatedJobsIntoQueue() + { + // given + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse()); + + // when + SchedulePolling(); + + // then + var hasPolled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + Assert.IsTrue(hasPolled); + Assert.AreEqual(workItems.Count, 3); + } + + [Test] + public void ShouldNotPollNewJobsWhenQueueIsFull() + { + // given + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse()); + SchedulePolling(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + + // when + jobWorkerSignal.SignalJobHandled(); + + // then + Assert.AreEqual(TestService.Requests[typeof(ActivateJobsRequest)].Count, 1); + } + + [Test] + public void ShouldNotPollNewJobsWhenThresholdIsNotMet() + { + // given + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse()); + SchedulePolling(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + workItems.TryDequeue(out _); + + // when + jobWorkerSignal.SignalJobHandled(); + + // then + Assert.AreEqual(TestService.Requests[typeof(ActivateJobsRequest)].Count, 1); + } + + [Test] + public void ShouldPollNewJobsWhenThresholdIsMet() + { + // given + var expectedSecondRequest = new ActivateJobsRequest + { + Timeout = 123_000L, + MaxJobsToActivate = 2, + Type = "foo", + Worker = "jobWorker", + RequestTimeout = 5_000L + }; + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse()); + SchedulePolling(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + workItems.TryDequeue(out _); + workItems.TryDequeue(out _); + + // when + jobWorkerSignal.SignalJobHandled(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + + // then + Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count); + var actualRequest = TestService.Requests[typeof(ActivateJobsRequest)][1]; + Assert.AreEqual(expectedSecondRequest, actualRequest); + } + + [Test] + public void ShouldPollNewJobsAfterQueueIsCleared() + { + // given + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse()); + SchedulePolling(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + + // when + workItems.Clear(); + jobWorkerSignal.SignalJobHandled(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + + // then + Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count); + Assert.AreEqual(3, workItems.Count); + } + + [Test] + public void ShouldPollNewJobsAfterQueueIsClearedAndPollIntervalIsDue() + { + // given + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => JobWorkerTest.CreateExpectedResponse()); + SchedulePolling(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + + // when + workItems.Clear(); + jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + + // then + AwaitRequestCount(typeof(ActivateJobsRequest), 2); + Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count); + Assert.AreEqual(3, workItems.Count); + } + + private void SchedulePolling() + { + Task.Run(() => jobPoller.Poll(tokenSource.Token), tokenSource.Token); + } + + [Test] + public void ShouldTimeoutAndRetry() + { + // given + var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient); + jobWorkerBuilder + .JobType("foo") + .Handler((jobClient, job) => { }) + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(123L)) + .PollInterval(TimeSpan.FromMilliseconds(100)) + // timeout will be + 10 seconds + .PollingTimeout(TimeSpan.FromMilliseconds(5L)); + jobPoller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => + { + // doesn't send response back before timeout + jobWorkerSignal.AwaitJobHandling(TimeSpan.FromMinutes(1)); + return JobWorkerTest.CreateExpectedResponse(); + }); + SchedulePolling(); + + // when + var polled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(15)); + + // then + Assert.IsFalse(polled); + Assert.AreEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count); + Assert.AreEqual(0, workItems.Count); + } + + [Test] + public void ShouldImmediatelyRetryOnServerException() + { + // given + var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient); + jobWorkerBuilder + .JobType("foo") + .Handler((jobClient, job) => { }) + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(123L)) + .PollInterval(TimeSpan.FromMilliseconds(100)) + .PollingTimeout(TimeSpan.FromMilliseconds(5L)); + jobPoller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal); + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => + { + throw new Exception("Server dies."); + }); + SchedulePolling(); + + // when + var polled = jobWorkerSignal.AwaitNewJobPolled(TimeSpan.FromSeconds(1)); + + // then + Assert.IsFalse(polled); + Assert.GreaterOrEqual(2, TestService.Requests[typeof(ActivateJobsRequest)].Count); + Assert.AreEqual(0, workItems.Count); + } + } +} \ No newline at end of file diff --git a/Client.UnitTests/JobWorkerTest.cs b/Client.UnitTests/JobWorkerTest.cs index 1bee77e1..c8b74a6b 100644 --- a/Client.UnitTests/JobWorkerTest.cs +++ b/Client.UnitTests/JobWorkerTest.cs @@ -14,6 +14,8 @@ // limitations under the License. using System; +using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -79,6 +81,21 @@ public void ShouldSendRequestReceiveResponseAsExpected() AssertJob(receivedJobs[2], 3); } + [Test] + public void ShouldFailWithZeroThreadCount() + { + // expected + var aggregateException = Assert.Throws( + () => + { + ZeebeClient.NewWorker() + .JobType("foo") + .Handler((jobClient, job) => { }) + .HandlerThreads(0); + }); + StringAssert.Contains("Expected an handler thread count larger then zero, but got 0.", aggregateException.Message); + } + [Test] public void ShouldSendAsyncCompleteInHandler() { @@ -113,6 +130,7 @@ public void ShouldSendAsyncCompleteInHandler() .Timeout(TimeSpan.FromSeconds(123L)) .PollInterval(TimeSpan.FromMilliseconds(100)) .PollingTimeout(TimeSpan.FromSeconds(5L)) + .HandlerThreads(1) .Open()) { Assert.True(jobWorker.IsOpen()); @@ -131,6 +149,57 @@ public void ShouldSendAsyncCompleteInHandler() AssertJob(completedJobs[2], 3); } + [Test] + public void ShouldUseMultipleHandlerThreads() + { + // given + var expectedRequest = new ActivateJobsRequest + { + Timeout = 123_000L, + MaxJobsToActivate = 3, + Type = "foo", + Worker = "jobWorker", + RequestTimeout = 5_000L + }; + + TestService.AddRequestHandler(typeof(ActivateJobsRequest), request => CreateExpectedResponse()); + + // when + var signal = new EventWaitHandle(false, EventResetMode.AutoReset); + var completedJobs = new ConcurrentDictionary(); + using (var jobWorker = ZeebeClient.NewWorker() + .JobType("foo") + .Handler(async (jobClient, job) => + { + await jobClient.NewCompleteJobCommand(job).Send(); + completedJobs.TryAdd(job.Key, job); + if (completedJobs.Count == 3) + { + signal.Set(); + } + }) + .MaxJobsActive(3) + .Name("jobWorker") + .Timeout(TimeSpan.FromSeconds(123L)) + .PollInterval(TimeSpan.FromMilliseconds(100)) + .PollingTimeout(TimeSpan.FromSeconds(5L)) + .HandlerThreads(3) + .Open()) + { + Assert.True(jobWorker.IsOpen()); + signal.WaitOne(); + } + + // then + var actualActivateRequest = TestService.Requests[typeof(ActivateJobsRequest)][0]; + Assert.AreEqual(expectedRequest, actualActivateRequest); + + var completeRequests = TestService.Requests[typeof(CompleteJobRequest)]; + Assert.GreaterOrEqual(completeRequests.Count, 3); + Assert.GreaterOrEqual(completedJobs.Count, 3); + CollectionAssert.AreEquivalent(new List { 1, 2, 3 }, completedJobs.Keys); + } + [Test] public void ShouldSendCompleteInHandler() { @@ -205,8 +274,7 @@ public void ShouldSendRequestsWithDifferentAmounts() var expectedSecondRequest = new ActivateJobsRequest { Timeout = 123_000L, - MaxJobsToActivate = 2, // first response contains 3 jobs and one is handled (blocking) so 2 jobs remain in queue - // so we can try to activate 2 new jobs + MaxJobsToActivate = 2, Type = "foo", Worker = "jobWorker", RequestTimeout = 5_000L diff --git a/Client/Api/Worker/IJobWorkerBuilderStep1.cs b/Client/Api/Worker/IJobWorkerBuilderStep1.cs index a338a142..58427ac6 100644 --- a/Client/Api/Worker/IJobWorkerBuilderStep1.cs +++ b/Client/Api/Worker/IJobWorkerBuilderStep1.cs @@ -228,6 +228,21 @@ public interface IJobWorkerBuilderStep3 /// the builder for this worker IJobWorkerBuilderStep3 AutoCompletion(); + /// + /// Specifies how many handler threads are used by this job worker. + /// + /// + /// + /// The previous defined job handler can be called by multiple threads, to execute more jobs concurrently. + /// Per default three job handler threads are used by an job worker. + /// This means the job handler implementation needs to be thread safe. + /// + /// + /// Note: Job polling is done by a separate thread. + /// handler thread count, needs to be larger then zero + /// the builder for this worker + IJobWorkerBuilderStep3 HandlerThreads(byte threadCount); + /// /// Open the worker and start to work on available tasks. /// diff --git a/Client/Client.csproj b/Client/Client.csproj index 7cc4eae4..37b10f3e 100644 --- a/Client/Client.csproj +++ b/Client/Client.csproj @@ -59,4 +59,15 @@ Fixes: + + + true + + + + + <_Parameter1>Client.UnitTests + + + diff --git a/Client/Impl/Commands/ActivateJobsCommand.cs b/Client/Impl/Commands/ActivateJobsCommand.cs index fb2b70a8..dd9bec96 100644 --- a/Client/Impl/Commands/ActivateJobsCommand.cs +++ b/Client/Impl/Commands/ActivateJobsCommand.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using GatewayProtocol; using Zeebe.Client.Api.Commands; @@ -10,60 +11,65 @@ namespace Zeebe.Client.Impl.Commands { internal class ActivateJobsCommand : IActivateJobsCommandStep1, IActivateJobsCommandStep2, IActivateJobsCommandStep3 { - private readonly ActivateJobsRequest request; private readonly JobActivator activator; + public ActivateJobsRequest Request { get; } public ActivateJobsCommand(GatewayClient client) { activator = new JobActivator(client); - request = new ActivateJobsRequest(); + Request = new ActivateJobsRequest(); } public IActivateJobsCommandStep2 JobType(string jobType) { - request.Type = jobType; + Request.Type = jobType; return this; } public IActivateJobsCommandStep3 MaxJobsToActivate(int maxJobsToActivate) { - request.MaxJobsToActivate = maxJobsToActivate; + Request.MaxJobsToActivate = maxJobsToActivate; return this; } public IActivateJobsCommandStep3 FetchVariables(IList fetchVariables) { - request.FetchVariable.AddRange(fetchVariables); + Request.FetchVariable.AddRange(fetchVariables); return this; } public IActivateJobsCommandStep3 FetchVariables(params string[] fetchVariables) { - request.FetchVariable.AddRange(fetchVariables); + Request.FetchVariable.AddRange(fetchVariables); return this; } public IActivateJobsCommandStep3 Timeout(TimeSpan timeout) { - request.Timeout = (long)timeout.TotalMilliseconds; + Request.Timeout = (long)timeout.TotalMilliseconds; return this; } public IActivateJobsCommandStep3 PollingTimeout(TimeSpan pollingTimeout) { - request.RequestTimeout = (long)pollingTimeout.TotalMilliseconds; + Request.RequestTimeout = (long)pollingTimeout.TotalMilliseconds; return this; } public IActivateJobsCommandStep3 WorkerName(string workerName) { - request.Worker = workerName; + Request.Worker = workerName; return this; } public async Task Send(TimeSpan? timeout = null) { - return await activator.SendActivateRequest(request, timeout?.FromUtcNow()); + return await Send(timeout, null); + } + + public async Task Send(TimeSpan? timeout, CancellationToken? cancelationToken) + { + return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), cancelationToken); } } } diff --git a/Client/Impl/Responses/ActivatedJob.cs b/Client/Impl/Responses/ActivatedJob.cs index 0a3225db..8ef4b295 100644 --- a/Client/Impl/Responses/ActivatedJob.cs +++ b/Client/Impl/Responses/ActivatedJob.cs @@ -72,7 +72,59 @@ public ActivatedJob(GatewayProtocol.ActivatedJob activatedJob) public override string ToString() { - return $"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(WorkflowInstanceKey)}: {WorkflowInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(WorkflowDefinitionVersion)}: {WorkflowDefinitionVersion}, {nameof(WorkflowKey)}: {WorkflowKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}"; + return + $"{nameof(Key)}: {Key}, {nameof(Type)}: {Type}, {nameof(WorkflowInstanceKey)}: {WorkflowInstanceKey}, {nameof(BpmnProcessId)}: {BpmnProcessId}, {nameof(WorkflowDefinitionVersion)}: {WorkflowDefinitionVersion}, {nameof(WorkflowKey)}: {WorkflowKey}, {nameof(ElementId)}: {ElementId}, {nameof(ElementInstanceKey)}: {ElementInstanceKey}, {nameof(Worker)}: {Worker}, {nameof(Retries)}: {Retries}, {nameof(Deadline)}: {Deadline}, {nameof(Variables)}: {Variables}, {nameof(CustomHeaders)}: {CustomHeaders}"; + } + + protected bool Equals(ActivatedJob other) + { + return Key == other.Key && Type == other.Type && WorkflowInstanceKey == other.WorkflowInstanceKey && + BpmnProcessId == other.BpmnProcessId && + WorkflowDefinitionVersion == other.WorkflowDefinitionVersion && WorkflowKey == other.WorkflowKey && + ElementId == other.ElementId && ElementInstanceKey == other.ElementInstanceKey && + Worker == other.Worker && Retries == other.Retries && Deadline.Equals(other.Deadline) && + Variables == other.Variables && CustomHeaders == other.CustomHeaders; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != this.GetType()) + { + return false; + } + + return Equals((ActivatedJob) obj); + } + + public override int GetHashCode() + { + unchecked + { + var hashCode = Key.GetHashCode(); + hashCode = (hashCode * 397) ^ (Type != null ? Type.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ WorkflowInstanceKey.GetHashCode(); + hashCode = (hashCode * 397) ^ (BpmnProcessId != null ? BpmnProcessId.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ WorkflowDefinitionVersion; + hashCode = (hashCode * 397) ^ WorkflowKey.GetHashCode(); + hashCode = (hashCode * 397) ^ (ElementId != null ? ElementId.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ ElementInstanceKey.GetHashCode(); + hashCode = (hashCode * 397) ^ (Worker != null ? Worker.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ Retries; + hashCode = (hashCode * 397) ^ Deadline.GetHashCode(); + hashCode = (hashCode * 397) ^ (Variables != null ? Variables.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ (CustomHeaders != null ? CustomHeaders.GetHashCode() : 0); + return hashCode; + } } } -} +} \ No newline at end of file diff --git a/Client/Impl/Worker/JobClientWrapper.cs b/Client/Impl/Worker/JobClientWrapper.cs new file mode 100644 index 00000000..e5ec55cb --- /dev/null +++ b/Client/Impl/Worker/JobClientWrapper.cs @@ -0,0 +1,53 @@ +using Zeebe.Client.Api.Commands; +using Zeebe.Client.Api.Responses; +using Zeebe.Client.Api.Worker; + +namespace Zeebe.Client.Impl.Worker +{ + internal class JobClientWrapper : IJobClient + { + public static JobClientWrapper Wrap(IJobClient client) + { + return new JobClientWrapper(client); + } + + public bool ClientWasUsed { get; private set; } + + private IJobClient Client { get; } + + private JobClientWrapper(IJobClient client) + { + Client = client; + ClientWasUsed = false; + } + + public ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey) + { + ClientWasUsed = true; + return Client.NewCompleteJobCommand(jobKey); + } + + public IFailJobCommandStep1 NewFailCommand(long jobKey) + { + ClientWasUsed = true; + return Client.NewFailCommand(jobKey); + } + + public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey) + { + ClientWasUsed = true; + return Client.NewThrowErrorCommand(jobKey); + } + + public void Reset() + { + ClientWasUsed = false; + } + + public ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob) + { + ClientWasUsed = true; + return Client.NewCompleteJobCommand(activatedJob); + } + } +} \ No newline at end of file diff --git a/Client/Impl/Worker/JobHandlerExecutor.cs b/Client/Impl/Worker/JobHandlerExecutor.cs new file mode 100644 index 00000000..b372c845 --- /dev/null +++ b/Client/Impl/Worker/JobHandlerExecutor.cs @@ -0,0 +1,116 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Zeebe.Client.Api.Responses; +using Zeebe.Client.Api.Worker; +using Zeebe.Client.Impl.Commands; + +namespace Zeebe.Client.Impl.Worker +{ + internal class JobHandlerExecutor + { + private const string JobFailMessage = + "Job worker '{0}' tried to handle job of type '{1}', but exception occured '{2}'"; + + private readonly ConcurrentQueue workItems; + private readonly JobClientWrapper jobClient; + private readonly ILogger logger; + private readonly JobWorkerSignal jobWorkerSignal; + private readonly TimeSpan pollInterval; + private readonly AsyncJobHandler jobHandler; + private readonly bool autoCompletion; + private readonly ActivateJobsCommand activateJobsCommand; + + public JobHandlerExecutor(JobWorkerBuilder builder, + ConcurrentQueue workItems, + JobWorkerSignal jobWorkerSignal) + { + this.jobClient = JobClientWrapper.Wrap(builder.JobClient); + this.workItems = workItems; + this.jobWorkerSignal = jobWorkerSignal; + this.activateJobsCommand = builder.Command; + this.pollInterval = builder.PollInterval(); + this.jobHandler = builder.Handler(); + this.autoCompletion = builder.AutoCompletionEnabled(); + this.logger = builder.LoggerFactory?.CreateLogger(); + } + + public async Task HandleActivatedJobs(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!workItems.IsEmpty) + { + bool success = workItems.TryDequeue(out IJob activatedJob); + + if (success) + { + await HandleActivatedJob(cancellationToken, activatedJob); + } + + jobWorkerSignal.SignalJobHandled(); + } + else + { + jobWorkerSignal.SignalJobHandled(); + jobWorkerSignal.AwaitNewJobPolled(pollInterval); + } + } + } + + private async Task HandleActivatedJob(CancellationToken cancellationToken, IJob activatedJob) + { + try + { + await jobHandler(jobClient, activatedJob); + await TryToAutoCompleteJob(activatedJob); + } + catch (Exception exception) + { + await FailActivatedJob(activatedJob, cancellationToken, exception); + } + finally + { + jobClient.Reset(); + } + } + + private async Task TryToAutoCompleteJob(IJob activatedJob) + { + if (!jobClient.ClientWasUsed && autoCompletion) + { + logger?.LogDebug( + "Job worker ({worker}) will auto complete job with key '{key}'", + activateJobsCommand.Request.Worker, + activatedJob.Key); + await jobClient.NewCompleteJobCommand(activatedJob) + .Send(); + } + } + + private Task FailActivatedJob(IJob activatedJob, CancellationToken cancellationToken, Exception exception) + { + var errorMessage = string.Format( + JobFailMessage, + activatedJob.Worker, + activatedJob.Type, + exception.Message); + logger?.LogError(exception, errorMessage); + + return jobClient.NewFailCommand(activatedJob.Key) + .Retries(activatedJob.Retries - 1) + .ErrorMessage(errorMessage) + .Send() + .ContinueWith( + task => + { + if (task.IsFaulted) + { + logger?.LogError("Problem on failing job occured.", task.Exception); + } + }, cancellationToken); + } + } +} \ No newline at end of file diff --git a/Client/Impl/Worker/JobPoller.cs b/Client/Impl/Worker/JobPoller.cs new file mode 100644 index 00000000..b217a442 --- /dev/null +++ b/Client/Impl/Worker/JobPoller.cs @@ -0,0 +1,92 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Zeebe.Client.Api.Responses; +using Zeebe.Client.Impl.Commands; + +namespace Zeebe.Client.Impl.Worker +{ + internal class JobPoller + { + private readonly ConcurrentQueue workItems; + private readonly int maxJobsActive; + private readonly ILogger logger; + private readonly JobWorkerSignal jobWorkerSignal; + private readonly TimeSpan pollInterval; + private readonly ActivateJobsCommand activateJobsCommand; + private int threshold; + + public JobPoller(JobWorkerBuilder builder, + ConcurrentQueue workItems, + JobWorkerSignal jobWorkerSignal) + { + this.activateJobsCommand = builder.Command; + this.threshold = (int) Math.Ceiling(activateJobsCommand.Request.MaxJobsToActivate * 0.6f); + this.maxJobsActive = activateJobsCommand.Request.MaxJobsToActivate; + this.workItems = workItems; + this.pollInterval = builder.PollInterval(); + this.logger = builder.LoggerFactory?.CreateLogger(); + this.jobWorkerSignal = jobWorkerSignal; + } + + internal async Task Poll(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + while (workItems.Count < threshold) + { + try + { + await PollJobs(cancellationToken); + } + catch (RpcException rpcException) + { + LogRpcException(rpcException); + } + } + + jobWorkerSignal.AwaitJobHandling(pollInterval); + } + } + + private void LogRpcException(RpcException rpcException) + { + LogLevel logLevel; + switch (rpcException.StatusCode) + { + case StatusCode.DeadlineExceeded: + case StatusCode.Cancelled: + logLevel = LogLevel.Trace; + break; + default: + logLevel = LogLevel.Error; + break; + } + + logger?.Log(logLevel, rpcException, "Unexpected RpcException on polling new jobs."); + } + + private async Task PollJobs(CancellationToken cancellationToken) + { + var jobCount = maxJobsActive - workItems.Count; + activateJobsCommand.MaxJobsToActivate(jobCount); + + var response = await activateJobsCommand.Send(null, cancellationToken); + + logger?.LogDebug( + "Job worker ({worker}) activated {activatedCount} of {requestCount} successfully.", + activateJobsCommand.Request.Worker, + response.Jobs.Count, + jobCount); + foreach (var job in response.Jobs) + { + workItems.Enqueue(job); + } + + jobWorkerSignal.SignalJobPolled(); + } + } +} \ No newline at end of file diff --git a/Client/Impl/Worker/JobWorker.cs b/Client/Impl/Worker/JobWorker.cs index 272ff534..30a8e063 100644 --- a/Client/Impl/Worker/JobWorker.cs +++ b/Client/Impl/Worker/JobWorker.cs @@ -20,7 +20,6 @@ using GatewayProtocol; using Grpc.Core; using Microsoft.Extensions.Logging; -using Zeebe.Client.Api.Commands; using Zeebe.Client.Api.Responses; using Zeebe.Client.Api.Worker; using Zeebe.Client.Impl.Commands; @@ -29,45 +28,28 @@ namespace Zeebe.Client.Impl.Worker { public class JobWorker : IJobWorker { - private const string JobFailMessage = - "Job worker '{0}' tried to handle job of type '{1}', but exception occured '{2}'"; - - private readonly int maxJobsActive; private readonly ConcurrentQueue workItems = new ConcurrentQueue(); - - private readonly ActivateJobsRequest activeRequest; - private readonly JobActivator activator; - private readonly AsyncJobHandler jobHandler; - private readonly JobClientWrapper jobClient; - private readonly bool autoCompletion; - private readonly TimeSpan pollInterval; private readonly CancellationTokenSource source; - - private readonly EventWaitHandle handleSignal = new EventWaitHandle(false, EventResetMode.AutoReset); - private readonly EventWaitHandle pollSignal = new EventWaitHandle(false, EventResetMode.AutoReset); private readonly ILogger logger; + private readonly JobWorkerBuilder jobWorkerBuilder; + private volatile bool isRunning; internal JobWorker(JobWorkerBuilder builder) { - source = new CancellationTokenSource(); - activator = new JobActivator(builder.Client); - activeRequest = builder.Request; - maxJobsActive = activeRequest.MaxJobsToActivate; - pollInterval = builder.PollInterval(); - jobClient = JobClientWrapper.Wrap(builder.JobClient); - jobHandler = builder.Handler(); - autoCompletion = builder.AutoCompletionEnabled(); - logger = builder.LoggerFactory?.CreateLogger(); + this.jobWorkerBuilder = builder; + this.source = new CancellationTokenSource(); + this.logger = builder.LoggerFactory?.CreateLogger(); } /// public void Dispose() { source.Cancel(); + var pollInterval = jobWorkerBuilder.PollInterval(); // delay disposing, since poll and handler take some time to close Task.Delay(TimeSpan.FromMilliseconds(pollInterval.TotalMilliseconds * 2)) - .ContinueWith((t) => + .ContinueWith(t => { logger?.LogError("Dispose source"); source.Dispose(); @@ -95,201 +77,44 @@ internal void Open() { isRunning = true; var cancellationToken = source.Token; + var jobWorkerSignal = new JobWorkerSignal(); + + StartPollerThread(jobWorkerSignal, cancellationToken); + StartHandlerThreads(jobWorkerSignal, cancellationToken); + var command = jobWorkerBuilder.Command; + logger?.LogDebug( + "Job worker ({worker}) for job type {type} has been opened.", + command.Request.Worker, + command.Request.Type); + } + + private void StartPollerThread(JobWorkerSignal jobWorkerSignal, CancellationToken cancellationToken) + { + var poller = new JobPoller(jobWorkerBuilder, workItems, jobWorkerSignal); Task.Run( async () => - await Poll(cancellationToken) + await poller.Poll(cancellationToken) .ContinueWith( t => logger?.LogError(t.Exception, "Job polling failed."), TaskContinuationOptions.OnlyOnFaulted), cancellationToken) .ContinueWith( t => logger?.LogError(t.Exception, "Job polling failed."), TaskContinuationOptions.OnlyOnFaulted); - - Task.Run(async () => await HandleActivatedJobs(cancellationToken), cancellationToken) - .ContinueWith( - t => logger?.LogError(t.Exception, "Job handling failed."), - TaskContinuationOptions.OnlyOnFaulted); - - logger?.LogDebug( - "Job worker ({worker}) for job type {type} has been opened.", - activeRequest.Worker, - activeRequest.Type); - } - - private async Task HandleActivatedJobs(CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested) - { - if (!workItems.IsEmpty) - { - bool success = workItems.TryDequeue(out IJob activatedJob); - - if (success) - { - await HandleActivatedJob(cancellationToken, activatedJob); - } - else - { - pollSignal.Set(); - } - } - else - { - handleSignal.WaitOne(pollInterval); - } - } - } - - private async Task HandleActivatedJob(CancellationToken cancellationToken, IJob activatedJob) - { - try - { - await jobHandler(jobClient, activatedJob); - await TryToAutoCompleteJob(activatedJob); - } - catch (Exception exception) - { - await FailActivatedJob(activatedJob, cancellationToken, exception); - } - finally - { - jobClient.Reset(); - } } - private async Task TryToAutoCompleteJob(IJob activatedJob) + private void StartHandlerThreads(JobWorkerSignal jobWorkerSignal, CancellationToken cancellationToken) { - if (!jobClient.ClientWasUsed && autoCompletion) + var threadCount = jobWorkerBuilder.ThreadCount; + for (var i = 0; i < threadCount; i++) { - logger?.LogDebug( - "Job worker ({worker}) will auto complete job with key '{key}'", - activeRequest.Worker, - activatedJob.Key); - await jobClient.NewCompleteJobCommand(activatedJob) - .Send(); - } - } - - private Task FailActivatedJob(IJob activatedJob, CancellationToken cancellationToken, Exception exception) - { - var errorMessage = string.Format( - JobFailMessage, - activatedJob.Worker, - activatedJob.Type, - exception.Message); - logger?.LogError(exception, errorMessage); - - return jobClient.NewFailCommand(activatedJob.Key) - .Retries(activatedJob.Retries - 1) - .ErrorMessage(errorMessage) - .Send() - .ContinueWith( - task => - { - if (task.IsFaulted) - { - logger?.LogError("Problem on failing job occured.", task.Exception); - } - }, cancellationToken); - } + logger?.LogDebug("Start handler {index} thread", i); - private async Task Poll(CancellationToken cancellationToken) - { - while (!cancellationToken.IsCancellationRequested) - { - if (workItems.Count < maxJobsActive) - { - try - { - await PollJobs(cancellationToken); - } - catch (RpcException rpcException) - { - LogLevel logLevel; - switch (rpcException.StatusCode) - { - case StatusCode.DeadlineExceeded: - case StatusCode.Cancelled: - logLevel = LogLevel.Debug; - break; - default: - logLevel = LogLevel.Error; - break; - } - - logger?.Log(logLevel, rpcException, "Unexpected RpcException on polling new jobs."); - } - } - - pollSignal.WaitOne(pollInterval); - } - } - - private async Task PollJobs(CancellationToken cancellationToken) - { - var jobCount = maxJobsActive - workItems.Count; - activeRequest.MaxJobsToActivate = jobCount; - - var response = await activator.SendActivateRequest(activeRequest, null, cancellationToken); - - logger?.LogDebug( - "Job worker ({worker}) activated {activatedCount} of {requestCount} successfully.", - activeRequest.Worker, - response.Jobs.Count, - jobCount); - foreach (var job in response.Jobs) - { - workItems.Enqueue(job); - } - - handleSignal.Set(); - } - - private class JobClientWrapper : IJobClient - { - public static JobClientWrapper Wrap(IJobClient client) - { - return new JobClientWrapper(client); - } - - public bool ClientWasUsed { get; private set; } - - private IJobClient Client { get; } - - private JobClientWrapper(IJobClient client) - { - Client = client; - ClientWasUsed = false; - } - - public ICompleteJobCommandStep1 NewCompleteJobCommand(long jobKey) - { - ClientWasUsed = true; - return Client.NewCompleteJobCommand(jobKey); - } - - public IFailJobCommandStep1 NewFailCommand(long jobKey) - { - ClientWasUsed = true; - return Client.NewFailCommand(jobKey); - } - - public IThrowErrorCommandStep1 NewThrowErrorCommand(long jobKey) - { - ClientWasUsed = true; - return Client.NewThrowErrorCommand(jobKey); - } - - public void Reset() - { - ClientWasUsed = false; - } - - public ICompleteJobCommandStep1 NewCompleteJobCommand(IJob activatedJob) - { - ClientWasUsed = true; - return Client.NewCompleteJobCommand(activatedJob); + var jobHandlerExecutor = new JobHandlerExecutor(jobWorkerBuilder, workItems, jobWorkerSignal); + Task.Run(async () => await jobHandlerExecutor.HandleActivatedJobs(cancellationToken), cancellationToken) + .ContinueWith( + t => logger?.LogError(t.Exception, "Job handling failed."), + TaskContinuationOptions.OnlyOnFaulted); } } } diff --git a/Client/Impl/Worker/JobWorkerBuilder.cs b/Client/Impl/Worker/JobWorkerBuilder.cs index 915a64e4..7fd0611f 100644 --- a/Client/Impl/Worker/JobWorkerBuilder.cs +++ b/Client/Impl/Worker/JobWorkerBuilder.cs @@ -15,11 +15,11 @@ using System; using System.Collections.Generic; +using System.Diagnostics.Contracts; using System.Threading.Tasks; -using GatewayProtocol; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; using Zeebe.Client.Api.Worker; +using Zeebe.Client.Impl.Commands; namespace Zeebe.Client.Impl.Worker { @@ -28,25 +28,26 @@ public class JobWorkerBuilder : IJobWorkerBuilderStep1, IJobWorkerBuilderStep2, private TimeSpan pollInterval; private AsyncJobHandler handler; private bool autoCompletion; - + internal byte ThreadCount { get; set; } internal ILoggerFactory LoggerFactory { get; } - internal Gateway.GatewayClient Client { get; } - internal ActivateJobsRequest Request { get; } = new ActivateJobsRequest(); + internal ActivateJobsCommand Command { get; } internal IJobClient JobClient { get; } public JobWorkerBuilder( - Gateway.GatewayClient client, - IJobClient jobClient, + IZeebeClient zeebeClient, ILoggerFactory loggerFactory = null) { LoggerFactory = loggerFactory; - Client = client; - JobClient = jobClient; + Command = (ActivateJobsCommand) zeebeClient.NewActivateJobsCommand(); + JobClient = zeebeClient; + ThreadCount = 1; + + zeebeClient.NewActivateJobsCommand(); } public IJobWorkerBuilderStep2 JobType(string type) { - Request.Type = type; + Command.JobType(type); return this; } @@ -69,31 +70,31 @@ internal AsyncJobHandler Handler() public IJobWorkerBuilderStep3 Timeout(TimeSpan timeout) { - Request.Timeout = (long)timeout.TotalMilliseconds; + Command.Timeout(timeout); return this; } public IJobWorkerBuilderStep3 Name(string workerName) { - Request.Worker = workerName; + Command.WorkerName(workerName); return this; } public IJobWorkerBuilderStep3 MaxJobsActive(int maxJobsActive) { - Request.MaxJobsToActivate = maxJobsActive; + Command.MaxJobsToActivate(maxJobsActive); return this; } public IJobWorkerBuilderStep3 FetchVariables(IList fetchVariables) { - Request.FetchVariable.AddRange(fetchVariables); + Command.FetchVariables(fetchVariables); return this; } public IJobWorkerBuilderStep3 FetchVariables(params string[] fetchVariables) { - Request.FetchVariable.AddRange(fetchVariables); + Command.FetchVariables(fetchVariables); return this; } @@ -110,7 +111,7 @@ internal TimeSpan PollInterval() public IJobWorkerBuilderStep3 PollingTimeout(TimeSpan pollingTimeout) { - Request.RequestTimeout = (long)pollingTimeout.TotalMilliseconds; + Command.PollingTimeout(pollingTimeout); return this; } @@ -120,6 +121,18 @@ public IJobWorkerBuilderStep3 AutoCompletion() return this; } + public IJobWorkerBuilderStep3 HandlerThreads(byte threadCount) + { + if (threadCount <= 0) + { + var errorMsg = $"Expected an handler thread count larger then zero, but got {threadCount}."; + throw new ArgumentOutOfRangeException(errorMsg); + } + + this.ThreadCount = threadCount; + return this; + } + internal bool AutoCompletionEnabled() { return autoCompletion; diff --git a/Client/Impl/Worker/JobWorkerSignal.cs b/Client/Impl/Worker/JobWorkerSignal.cs new file mode 100644 index 00000000..21081116 --- /dev/null +++ b/Client/Impl/Worker/JobWorkerSignal.cs @@ -0,0 +1,35 @@ +using System; +using System.Threading; + +namespace Zeebe.Client.Impl.Worker +{ + public class JobWorkerSignal + { + private readonly EventWaitHandle handleSignal = new EventWaitHandle(false, EventResetMode.AutoReset); + private readonly EventWaitHandle pollSignal = new EventWaitHandle(false, EventResetMode.AutoReset); + + internal JobWorkerSignal() + { + } + + public bool AwaitJobHandling(TimeSpan timeSpan) + { + return pollSignal.WaitOne(timeSpan); + } + + public bool AwaitNewJobPolled(TimeSpan timeSpan) + { + return handleSignal.WaitOne(timeSpan); + } + + public void SignalJobHandled() + { + pollSignal.Set(); + } + + public void SignalJobPolled() + { + handleSignal.Set(); + } + } +} \ No newline at end of file diff --git a/Client/ZeebeClient.cs b/Client/ZeebeClient.cs index cbfea2c5..4d1cb3e2 100644 --- a/Client/ZeebeClient.cs +++ b/Client/ZeebeClient.cs @@ -87,7 +87,7 @@ private void AddKeepAliveToChannelOptions(List channelOptions, Ti public IJobWorkerBuilderStep1 NewWorker() { - return new JobWorkerBuilder(gatewayClient, this, loggerFactory); + return new JobWorkerBuilder(this, loggerFactory); } public IActivateJobsCommandStep1 NewActivateJobsCommand()