Skip to content

Commit

Permalink
Merge pull request #312 from camunda-community-hub/zell-fix-activation
Browse files Browse the repository at this point in the history
Fix job worker activation and handling
  • Loading branch information
ChrisKujawa authored Aug 20, 2021
2 parents 754de93 + 2ee7323 commit 8fe6ad2
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 46 deletions.
1 change: 1 addition & 0 deletions Client.IntegrationTests/Client.IntegrationTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="DotNet.Testcontainers" Version="1.5.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.14" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="NLog" Version="4.7.11" />
Expand Down
100 changes: 100 additions & 0 deletions Client.IntegrationTests/JobWorkerMultiPartitionTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Zeebe.Client;
using Zeebe.Client.Api.Responses;

namespace Client.IntegrationTests
{
[TestFixture]
public class JobWorkerMultiPartitionTest
{
private static readonly string DemoProcessPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Resources", "oneTaskProcess.bpmn");

private readonly ZeebeIntegrationTestHelper testHelper = ZeebeIntegrationTestHelper.Latest().withPartitionCount(3);
private IZeebeClient zeebeClient;
private long processDefinitionKey;

[OneTimeSetUp]
public async Task Setup()
{
zeebeClient = await testHelper.SetupIntegrationTest();
var deployResponse = await zeebeClient.NewDeployCommand()
.AddResourceFile(DemoProcessPath)
.Send();
processDefinitionKey = deployResponse.Processes[0].ProcessDefinitionKey;
}

[OneTimeTearDown]
public async Task Stop()
{
await testHelper.TearDownIntegrationTest();
}

[Test]
public async Task ShouldHandleAllJobs()
{
// given
var handledJobs = new List<IJob>();
foreach (int i in Enumerable.Range(1, 3))
{
await zeebeClient.NewCreateProcessInstanceCommand()
.ProcessDefinitionKey(processDefinitionKey)
.Send();
}

// when
using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset))
{
using (zeebeClient.NewWorker()
.JobType("oneTask")
.Handler(async (jobClient, job) =>
{
await jobClient.NewCompleteJobCommand(job).Send();
handledJobs.Add(job);
if (handledJobs.Count >= 3)
{
signal.Set();
}
})
.MaxJobsActive(5)
.Name("csharpWorker")
.Timeout(TimeSpan.FromHours(10))
.PollInterval(TimeSpan.FromSeconds(5))
.Open())
{
signal.WaitOne(TimeSpan.FromSeconds(5));
}
}

Assert.AreEqual(3, handledJobs.Count);
}


[Test]
public async Task ShouldActivateAllJobs()
{
// given
foreach (int i in Enumerable.Range(1, 3))
{
await zeebeClient.NewCreateProcessInstanceCommand()
.ProcessDefinitionKey(processDefinitionKey)
.Send();
}

// when
var activateJobsResponse = await zeebeClient.NewActivateJobsCommand()
.JobType("oneTask")
.MaxJobsToActivate(5)
.WorkerName("csharpWorker")
.Timeout(TimeSpan.FromHours(10))
.Send();

Assert.AreEqual(3, activateJobsResponse.Jobs.Count);
}
}
}
2 changes: 1 addition & 1 deletion Client.IntegrationTests/JobWorkerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,4 @@ public async Task ShouldCompleteProcessWithJobAutoCompletion()
}
}
}
}
}
29 changes: 20 additions & 9 deletions Client.IntegrationTests/ZeebeIntegrationTestHelper.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;
using DotNet.Testcontainers.Containers.Builders;
using DotNet.Testcontainers.Containers.Configurations;
using DotNet.Testcontainers.Containers.Modules;
using Microsoft.Extensions.Logging;
using NLog.Extensions.Logging;
using TestContainers.Core.Builders;
Expand All @@ -13,10 +17,11 @@ public class ZeebeIntegrationTestHelper
{
public const string LatestVersion = "1.0.0";

private Container container;
private TestcontainersContainer container;
private IZeebeClient client;

private readonly string version;
private int count = 1;

private ZeebeIntegrationTestHelper(string version)
{
Expand All @@ -28,6 +33,12 @@ public static ZeebeIntegrationTestHelper Latest()
return new ZeebeIntegrationTestHelper(LatestVersion);
}

public ZeebeIntegrationTestHelper withPartitionCount(int count)
{
this.count = count;
return this;
}

public static ZeebeIntegrationTestHelper OfVersion(string version)
{
return new ZeebeIntegrationTestHelper(version);
Expand All @@ -36,7 +47,7 @@ public static ZeebeIntegrationTestHelper OfVersion(string version)
public async Task<IZeebeClient> SetupIntegrationTest()
{
container = CreateZeebeContainer();
await container.Start();
await container.StartAsync();

client = CreateZeebeClient();
await AwaitBrokerReadiness();
Expand All @@ -47,16 +58,16 @@ public async Task TearDownIntegrationTest()
{
client.Dispose();
client = null;
await container.Stop();
await container.StopAsync();
container = null;
}

private Container CreateZeebeContainer()
private TestcontainersContainer CreateZeebeContainer()
{
return new GenericContainerBuilder<Container>()
.Begin()
return new TestcontainersBuilder<TestcontainersContainer>()
.WithImage("camunda/zeebe:" + version)
.WithExposedPorts(26500)
.WithPortBinding(26500)
.WithEnvironment("ZEEBE_BROKER_CLUSTER_PARTITIONSCOUNT", count.ToString())
.Build();
}

Expand All @@ -71,7 +82,7 @@ private IZeebeClient CreateZeebeClient()
loggingBuilder.AddNLog(path);
});

var host = "0.0.0.0:" + container.GetMappedPort(26500);
var host = "0.0.0.0:" + container.GetMappedPublicPort(26500);

return ZeebeClient.Builder()
.UseLoggerFactory(loggerFactory)
Expand All @@ -88,7 +99,7 @@ private async Task AwaitBrokerReadiness()
try
{
var topology = await client.TopologyRequest().Send();
ready = topology.Brokers[0].Partitions.Count == 1;
ready = topology.Brokers[0].Partitions.Count >= count;
}
catch (Exception)
{
Expand Down
4 changes: 3 additions & 1 deletion Client/Impl/Commands/ActivateJobsCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public IActivateJobsCommandStep3 WorkerName(string workerName)

public async Task<IActivateJobsResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), token);
var activateJobsResponses = new Responses.ActivateJobsResponses();
await activator.SendActivateRequest(Request, response => Task.Run(() => activateJobsResponses.Add(response), token), timeout?.FromUtcNow(), token);
return activateJobsResponses;
}

public async Task<IActivateJobsResponse> Send(CancellationToken cancellationToken)
Expand Down
18 changes: 9 additions & 9 deletions Client/Impl/Commands/JobActivator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace Zeebe.Client.Impl.Commands
{
public delegate Task ConsumeJob(IActivateJobsResponse response);
internal class JobActivator
{
private readonly GatewayClient client;
Expand All @@ -18,20 +19,19 @@ public JobActivator(GatewayClient client)
this.client = client;
}

public async Task<IActivateJobsResponse> SendActivateRequest(ActivateJobsRequest request, DateTime? requestTimeout = null, CancellationToken? cancellationToken = null)
public async Task SendActivateRequest(ActivateJobsRequest request, ConsumeJob consumer, DateTime? requestTimeout = null, CancellationToken? cancellationToken = null)
{
DateTime activateRequestTimeout = requestTimeout ?? CalculateRequestTimeout(request);
var activateRequestTimeout = requestTimeout ?? CalculateRequestTimeout(request);
using (var stream = client.ActivateJobs(request, deadline: activateRequestTimeout))
{
var responseStream = stream.ResponseStream;
if (await MoveNext(responseStream, cancellationToken))

while (await MoveNext(responseStream, cancellationToken))
{
var response = responseStream.Current;
return new ActivateJobsResponses(response);
var currentResponse = responseStream.Current;
var response = new ActivateJobsResponses(currentResponse);
await consumer.Invoke(response);
}

// empty response
return new ActivateJobsResponses();
}
}

Expand All @@ -44,7 +44,7 @@ private static DateTime CalculateRequestTimeout(ActivateJobsRequest request)
: TimeSpan.FromSeconds((longPollingTimeout / 1000f) + 10).FromUtcNow();
}

private async Task<bool> MoveNext(IAsyncStreamReader<ActivateJobsResponse> stream, CancellationToken? cancellationToken = null)
private static async Task<bool> MoveNext(IAsyncStreamReader<ActivateJobsResponse> stream, CancellationToken? cancellationToken = null)
{
if (cancellationToken.HasValue)
{
Expand Down
13 changes: 12 additions & 1 deletion Client/Impl/Responses/ActivateJobsResponses.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using GatewayProtocol;
using Zeebe.Client.Api.Responses;

namespace Zeebe.Client.Impl.Responses
Expand All @@ -15,7 +16,17 @@ public ActivateJobsResponses()

public ActivateJobsResponses(GatewayProtocol.ActivateJobsResponse jobsResponse)
{
Jobs = jobsResponse.Jobs
Jobs = ConvertToList(jobsResponse);
}

public void Add(IActivateJobsResponse jobsResponse)
{
((List<IJob>) Jobs).AddRange(jobsResponse.Jobs);
}

private static List<IJob> ConvertToList(ActivateJobsResponse jobsResponse)
{
return jobsResponse.Jobs
.Select(job => new ActivatedJob(job))
.Cast<IJob>()
.ToList();
Expand Down
27 changes: 17 additions & 10 deletions Client/Impl/Worker/JobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using GatewayProtocol;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Zeebe.Client.Api.Misc;
using Zeebe.Client.Api.Responses;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Commands;
Expand All @@ -34,7 +36,8 @@ public class JobWorker : IJobWorker
private readonly CancellationTokenSource source;
private readonly ILogger<JobWorker> logger;
private readonly JobWorkerBuilder jobWorkerBuilder;
private readonly ActivateJobsCommand activateJobsCommand;
private readonly ActivateJobsRequest activateJobsRequest;
private readonly JobActivator jobActivator;
private readonly int maxJobsActive;
private readonly AsyncJobHandler jobHandler;
private readonly bool autoCompletion;
Expand All @@ -52,8 +55,9 @@ internal JobWorker(JobWorkerBuilder builder)
this.jobHandler = jobWorkerBuilder.Handler();
this.autoCompletion = builder.AutoCompletionEnabled();
this.pollInterval = jobWorkerBuilder.PollInterval();
this.activateJobsCommand = jobWorkerBuilder.Command;
this.maxJobsActive = jobWorkerBuilder.Command.Request.MaxJobsToActivate;
this.activateJobsRequest = jobWorkerBuilder.Request;
jobActivator = jobWorkerBuilder.Activator;
this.maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate;
this.thresholdJobsActivation = maxJobsActive * 0.6;
}

Expand Down Expand Up @@ -114,8 +118,8 @@ internal void Open()

logger?.LogDebug(
"Job worker ({worker}) for job type {type} has been opened.",
activateJobsCommand.Request.Worker,
activateJobsCommand.Request.Type);
activateJobsRequest.Worker,
activateJobsRequest.Type);
}

private ExecutionDataflowBlockOptions CreateExecutionOptions(CancellationToken cancellationToken)
Expand Down Expand Up @@ -145,12 +149,14 @@ private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancella
if (currentJobs < thresholdJobsActivation)
{
var jobCount = maxJobsActive - currentJobs;
activateJobsCommand.MaxJobsToActivate(jobCount);
activateJobsRequest.MaxJobsToActivate = jobCount;

try
{
var response = await activateJobsCommand.SendWithRetry(null, cancellationToken);
await HandleActivationResponse(input, response, jobCount);
await jobActivator.SendActivateRequest(activateJobsRequest,
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount),
null,
cancellationToken);
}
catch (RpcException rpcException)
{
Expand All @@ -168,7 +174,7 @@ private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJ
{
logger?.LogDebug(
"Job worker ({worker}) activated {activatedCount} of {requestCount} successfully.",
activateJobsCommand.Request.Worker,
activateJobsRequest.Worker,
response.Jobs.Count,
jobCount);

Expand Down Expand Up @@ -207,6 +213,7 @@ private void LogRpcException(RpcException rpcException)
{
case StatusCode.DeadlineExceeded:
case StatusCode.Cancelled:
case StatusCode.ResourceExhausted:
logLevel = LogLevel.Trace;
break;
default:
Expand All @@ -224,7 +231,7 @@ private async Task TryToAutoCompleteJob(JobClientWrapper jobClient, IJob activat
{
logger?.LogDebug(
"Job worker ({worker}) will auto complete job with key '{key}'",
activateJobsCommand.Request.Worker,
activateJobsRequest.Worker,
activatedJob.Key);
await jobClient.NewCompleteJobCommand(activatedJob)
.Send(cancellationToken);
Expand Down
Loading

0 comments on commit 8fe6ad2

Please sign in to comment.