-
Notifications
You must be signed in to change notification settings - Fork 57
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(client): rewrote job worker completely
The worker now allows to handle jobs concurrently. Via a new configuration `HandlerThreads` you can set the thread count, which are used to handle activated jobs. Make sure your given Handler implementation is thread-safe. Per default the HandlerThread is set to 1, to be backwards compatible. Fixed some concurrency issues in activating and job handling, which caused that either for a longer time no jobs are polled or not handled or polled to fast. Poll interval are now respected better. It is rewritten in a way that it is now also better testable. Introducing an threshold activation mechanism, which means that new jobs are only activated after the working queue reaches a threshold of 0.6. **Example:** maxActivationCount = 32 threshold = Ceil(32 * 0.6) = 20 If the maxActivationCount is set to 32, then for first twelve jobs no new jobs are activated. After going under 20 jobs it will poll again for new jobs, based on the work queue (32 - work queue count). This allows to batch better job activation it will reduce the load on the gateway and broker and avoids doing to much requests for a single job activation. closes #133 closes #149
- Loading branch information
1 parent
e74165e
commit 17ac318
Showing
16 changed files
with
1,005 additions
and
240 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using GatewayProtocol; | ||
using Microsoft.Extensions.Logging; | ||
using NLog; | ||
using NUnit.Framework; | ||
using Zeebe.Client.Api.Responses; | ||
|
||
namespace Zeebe.Client.Impl.Worker | ||
{ | ||
[TestFixture] | ||
public class JobHandlerTest : BaseZeebeTest | ||
{ | ||
private ConcurrentQueue<IJob> workItems; | ||
private ConcurrentQueue<IJob> seenJobs; | ||
private JobWorkerSignal jobWorkerSignal; | ||
private JobHandlerExecutor jobHandler; | ||
private CancellationTokenSource tokenSource; | ||
|
||
[SetUp] | ||
public void SetupTest() | ||
{ | ||
workItems = new ConcurrentQueue<IJob>(); | ||
seenJobs = new ConcurrentQueue<IJob>(); | ||
var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient); | ||
jobWorkerSignal = new JobWorkerSignal(); | ||
|
||
jobWorkerBuilder | ||
.JobType("foo") | ||
.Handler((jobClient, job) => { seenJobs.Enqueue(job); }) | ||
.MaxJobsActive(3) | ||
.Name("jobWorker") | ||
.Timeout(TimeSpan.FromSeconds(123L)) | ||
.PollInterval(TimeSpan.FromMilliseconds(100)) | ||
.PollingTimeout(TimeSpan.FromSeconds(5L)); | ||
jobHandler = new JobHandlerExecutor(jobWorkerBuilder, workItems, jobWorkerSignal); | ||
tokenSource = new CancellationTokenSource(); | ||
} | ||
|
||
[TearDown] | ||
public async Task CleanUp() | ||
{ | ||
tokenSource.Cancel(); | ||
// delay disposing, since poll and handler take some time to close | ||
await Task.Delay(TimeSpan.FromMilliseconds(200)) | ||
.ContinueWith(t => { tokenSource.Dispose(); }); | ||
|
||
tokenSource = null; | ||
jobHandler = null; | ||
seenJobs = null; | ||
workItems = null; | ||
jobWorkerSignal = null; | ||
} | ||
|
||
[Test] | ||
public void ShouldHandleJob() | ||
{ | ||
// given | ||
var expectedJob = CreateActivatedJob(1); | ||
workItems.Enqueue(CreateActivatedJob(1)); | ||
|
||
// when | ||
ScheduleHandling(); | ||
|
||
// then | ||
var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1)); | ||
Assert.IsTrue(hasJobHandled); | ||
AwaitJobsHaveSeen(1); | ||
|
||
Assert.AreEqual(1, seenJobs.Count); | ||
Assert.IsTrue(seenJobs.TryDequeue(out var actualJob)); | ||
Assert.AreEqual(expectedJob, actualJob); | ||
} | ||
|
||
[Test] | ||
public void ShouldTriggerJobHandling() | ||
{ | ||
// given | ||
var expectedJob = CreateActivatedJob(1); | ||
ScheduleHandling(); | ||
jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1)); | ||
|
||
// when | ||
workItems.Enqueue(CreateActivatedJob(1)); | ||
jobWorkerSignal.SignalJobPolled(); | ||
|
||
// then | ||
var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1)); | ||
Assert.IsTrue(hasJobHandled); | ||
AwaitJobsHaveSeen(1); | ||
|
||
Assert.AreEqual(1, seenJobs.Count); | ||
Assert.IsTrue(seenJobs.TryDequeue(out var actualJob)); | ||
Assert.AreEqual(expectedJob, actualJob); | ||
} | ||
|
||
[Test] | ||
public void ShouldHandleJobsInOrder() | ||
{ | ||
// given | ||
workItems.Enqueue(CreateActivatedJob(1)); | ||
workItems.Enqueue(CreateActivatedJob(2)); | ||
workItems.Enqueue(CreateActivatedJob(3)); | ||
|
||
// when | ||
ScheduleHandling(); | ||
|
||
// then | ||
AwaitJobsHaveSeen(3); | ||
|
||
IJob actualJob; | ||
Assert.IsTrue(seenJobs.TryDequeue(out actualJob)); | ||
Assert.AreEqual(1, actualJob.Key); | ||
Assert.IsTrue(seenJobs.TryDequeue(out actualJob)); | ||
Assert.AreEqual(2, actualJob.Key); | ||
Assert.IsTrue(seenJobs.TryDequeue(out actualJob)); | ||
Assert.AreEqual(3, actualJob.Key); | ||
} | ||
|
||
[Test] | ||
public void ShouldNotHandleDuplicateOnConcurrentHandlers() | ||
{ | ||
// given | ||
workItems.Enqueue(CreateActivatedJob(1)); | ||
workItems.Enqueue(CreateActivatedJob(2)); | ||
workItems.Enqueue(CreateActivatedJob(3)); | ||
|
||
// when | ||
ScheduleHandling(); | ||
ScheduleHandling(); | ||
|
||
// then | ||
AwaitJobsHaveSeen(3); | ||
CollectionAssert.AllItemsAreUnique(seenJobs); | ||
} | ||
|
||
private async void AwaitJobsHaveSeen(int expectedCount) | ||
{ | ||
while (!tokenSource.IsCancellationRequested && seenJobs.Count < expectedCount) | ||
{ | ||
await Task.Delay(25); | ||
} | ||
} | ||
|
||
private void ScheduleHandling() | ||
{ | ||
Task.Run(() => jobHandler.HandleActivatedJobs(tokenSource.Token), tokenSource.Token); | ||
} | ||
|
||
private static Responses.ActivatedJob CreateActivatedJob(long key) | ||
{ | ||
return new Responses.ActivatedJob(new ActivatedJob | ||
{ | ||
Key = key, | ||
Worker = "jobWorker", | ||
Type = "foo", | ||
Variables = "{\"foo\":1}", | ||
CustomHeaders = "{\"customFoo\":\"1\"}", | ||
Retries = 3, | ||
Deadline = 123932, | ||
BpmnProcessId = "process", | ||
ElementId = "job1", | ||
ElementInstanceKey = 23, | ||
WorkflowInstanceKey = 29, | ||
WorkflowDefinitionVersion = 3, | ||
WorkflowKey = 21 | ||
}); | ||
} | ||
} | ||
} |
Oops, something went wrong.