Skip to content

Commit

Permalink
chore(client): rewrote job worker completely
Browse files Browse the repository at this point in the history
The worker now allows to handle jobs concurrently. Via a new configuration `HandlerThreads` you can set the thread count, which are used to handle activated jobs. Make sure your given Handler implementation is thread-safe. Per default the HandlerThread is set to 1, to be backwards compatible.

Fixed some concurrency issues in activating and job handling, which caused that either for a longer time no jobs are polled or not handled or polled to fast. Poll interval are now respected better. It is rewritten in a way that it is now also better testable.

Introducing an threshold activation mechanism, which means that new jobs are only activated after the working queue reaches a threshold of 0.6.

**Example:**

maxActivationCount = 32
threshold = Ceil(32 * 0.6) = 20

If the maxActivationCount is set to 32, then for first twelve jobs no new jobs are activated. After going under 20 jobs it will poll again for new jobs, based on the work queue (32 - work queue count). This allows to batch better job activation it will reduce the load on the gateway and broker and avoids doing to much requests for a single job activation.

closes #133
closes #149
  • Loading branch information
ChrisKujawa committed Aug 15, 2020
1 parent 67d6d47 commit ab2c670
Show file tree
Hide file tree
Showing 16 changed files with 1,005 additions and 240 deletions.
21 changes: 18 additions & 3 deletions Client.UnitTests/BaseZeebeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;
Expand All @@ -28,8 +30,8 @@ public class BaseZeebeTest
private IZeebeClient client;

public Server Server => server;
public GatewayTestService TestService => testService;
public IZeebeClient ZeebeClient => client;
protected GatewayTestService TestService => testService;
protected IZeebeClient ZeebeClient => client;

[SetUp]
public void Init()
Expand All @@ -43,17 +45,30 @@ public void Init()
server.Services.Add(serviceDefinition);
server.Start();

client = Client.ZeebeClient.Builder().UseGatewayAddress("localhost:26500").UsePlainText().Build();
client = Client.ZeebeClient
.Builder()
.UseGatewayAddress("localhost:26500")
.UsePlainText()
.Build();
}

[TearDown]
public void Stop()
{
client.Dispose();
server.ShutdownAsync().Wait();
testService.Requests.Clear();
testService = null;
server = null;
client = null;
}

public void AwaitRequestCount(Type type, int requestCount)
{
while (TestService.Requests[type].Count < requestCount)
{
Thread.Sleep(TimeSpan.FromMilliseconds(100));
}
}
}
}
8 changes: 8 additions & 0 deletions Client.UnitTests/Client.UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
<RootNamespace>Zeebe.Client</RootNamespace>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>

<ItemGroup>
<None Update="Resources\**" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>
Expand Down
173 changes: 173 additions & 0 deletions Client.UnitTests/JobHandlerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Microsoft.Extensions.Logging;
using NLog;
using NUnit.Framework;
using Zeebe.Client.Api.Responses;

namespace Zeebe.Client.Impl.Worker
{
[TestFixture]
public class JobHandlerTest : BaseZeebeTest
{
private ConcurrentQueue<IJob> workItems;
private ConcurrentQueue<IJob> seenJobs;
private JobWorkerSignal jobWorkerSignal;
private JobHandlerExecutor jobHandler;
private CancellationTokenSource tokenSource;

[SetUp]
public void SetupTest()
{
workItems = new ConcurrentQueue<IJob>();
seenJobs = new ConcurrentQueue<IJob>();
var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient);
jobWorkerSignal = new JobWorkerSignal();

jobWorkerBuilder
.JobType("foo")
.Handler((jobClient, job) => { seenJobs.Enqueue(job); })
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(123L))
.PollInterval(TimeSpan.FromMilliseconds(100))
.PollingTimeout(TimeSpan.FromSeconds(5L));
jobHandler = new JobHandlerExecutor(jobWorkerBuilder, workItems, jobWorkerSignal);
tokenSource = new CancellationTokenSource();
}

[TearDown]
public async Task CleanUp()
{
tokenSource.Cancel();
// delay disposing, since poll and handler take some time to close
await Task.Delay(TimeSpan.FromMilliseconds(200))
.ContinueWith(t => { tokenSource.Dispose(); });

tokenSource = null;
jobHandler = null;
seenJobs = null;
workItems = null;
jobWorkerSignal = null;
}

[Test]
public void ShouldHandleJob()
{
// given
var expectedJob = CreateActivatedJob(1);
workItems.Enqueue(CreateActivatedJob(1));

// when
ScheduleHandling();

// then
var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));
Assert.IsTrue(hasJobHandled);
AwaitJobsHaveSeen(1);

Assert.AreEqual(1, seenJobs.Count);
Assert.IsTrue(seenJobs.TryDequeue(out var actualJob));
Assert.AreEqual(expectedJob, actualJob);
}

[Test]
public void ShouldTriggerJobHandling()
{
// given
var expectedJob = CreateActivatedJob(1);
ScheduleHandling();
jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));

// when
workItems.Enqueue(CreateActivatedJob(1));
jobWorkerSignal.SignalJobPolled();

// then
var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));
Assert.IsTrue(hasJobHandled);
AwaitJobsHaveSeen(1);

Assert.AreEqual(1, seenJobs.Count);
Assert.IsTrue(seenJobs.TryDequeue(out var actualJob));
Assert.AreEqual(expectedJob, actualJob);
}

[Test]
public void ShouldHandleJobsInOrder()
{
// given
workItems.Enqueue(CreateActivatedJob(1));
workItems.Enqueue(CreateActivatedJob(2));
workItems.Enqueue(CreateActivatedJob(3));

// when
ScheduleHandling();

// then
AwaitJobsHaveSeen(3);

IJob actualJob;
Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
Assert.AreEqual(1, actualJob.Key);
Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
Assert.AreEqual(2, actualJob.Key);
Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
Assert.AreEqual(3, actualJob.Key);
}

[Test]
public void ShouldNotHandleDuplicateOnConcurrentHandlers()
{
// given
workItems.Enqueue(CreateActivatedJob(1));
workItems.Enqueue(CreateActivatedJob(2));
workItems.Enqueue(CreateActivatedJob(3));

// when
ScheduleHandling();
ScheduleHandling();

// then
AwaitJobsHaveSeen(3);
CollectionAssert.AllItemsAreUnique(seenJobs);
}

private async void AwaitJobsHaveSeen(int expectedCount)
{
while (!tokenSource.IsCancellationRequested && seenJobs.Count < expectedCount)
{
await Task.Delay(25);
}
}

private void ScheduleHandling()
{
Task.Run(() => jobHandler.HandleActivatedJobs(tokenSource.Token), tokenSource.Token);
}

private static Responses.ActivatedJob CreateActivatedJob(long key)
{
return new Responses.ActivatedJob(new ActivatedJob
{
Key = key,
Worker = "jobWorker",
Type = "foo",
Variables = "{\"foo\":1}",
CustomHeaders = "{\"customFoo\":\"1\"}",
Retries = 3,
Deadline = 123932,
BpmnProcessId = "process",
ElementId = "job1",
ElementInstanceKey = 23,
WorkflowInstanceKey = 29,
WorkflowDefinitionVersion = 3,
WorkflowKey = 21
});
}
}
}
Loading

0 comments on commit ab2c670

Please sign in to comment.