-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: opportunity to realize graceful stopping IJobWorker instances #624
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably something else )
To launch a worker, the following construction is usually used // open job worker
using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset))
{
client.NewWorker()
.JobType(JobType)
.Handler(HandleJob)
.MaxJobsActive(120)
.Name(WorkerName)
.AutoCompletion()
.PollInterval(TimeSpan.FromMilliseconds(100))
.Timeout(TimeSpan.FromSeconds(10))
.PollingTimeout(TimeSpan.FromSeconds(30))
.Open();
// blocks main thread, so that worker can run
signal.WaitOne();
} Worker handler is something like this private static void HandleJob(IJobClient jobClient, IJob job)
{
Logger.Debug("Handle job {job}", job.Key);
jobClient.NewCompleteJobCommand(job).Send();
} When I use this in ASP.NET Core service, I wrap it in Background service (IHostedService, example) To control IHostedService the asp.net core engine use StartAsync & StopAsync methods. Respectively, in StartAsync method I place code block "client...Open()....WaitHandler.WaitOne", in StopAsync method I reset WaitHandler. Let's back to code 1 // open job worker
2 using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset))
3 {
4 client.NewWorker()
5 .JobType(JobType)
6 .Handler(HandleJob)
..
10 .Open();
11
12 // blocks main thread, so that worker can run
13 signal.WaitOne();
15 }
16 1 private static void HandleJob(IJobClient jobClient, IJob job)
2 {
3 Logger.Debug("Handle job {job}", job.Key);
4
5 // some worker job, 1..10 seconds duration
6
7 jobClient.NewCompleteJobCommand(job).Send();
8 } Imagine a situation:
It dose not allow to realize to graceful stop service. In my case, the workload is interaction with external service. Not every operation is idempotent. The worker completed his work but did not have time to report to zeebe, zebbe redirect this job to another worker service and it call extrenal service again. The first solution was wait when all handlers complete before worker dispose. 1 // open job worker
2 using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset))
3 {
4 client.NewWorker()
5 .JobType(JobType)
6 .Handler(HandleJob)
..
10 .Open();
11
12 // blocks main thread, so that worker can run
13 signal.WaitOne();
14
15 while(handlerCount > 0) Thread.Sleep(10);
16 }
17 1 private static void HandleJob(IJobClient jobClient, IJob job)
2 {
3 handlerCount++;
4 Logger.Debug("Handle job {job}", job.Key);
5
6 // some worker job, 1..10 seconds duration
7
8 jobClient.NewCompleteJobCommand(job).Send();
9 handlerCount--;
10 } But problem in this solution that while code wait active handlers ending (line 15), worker continues pooling new task form zeebe and start new handlers. Next step, we fork (locally) zeebe-client-csharp and add method StopPooling() to worker public void StopPooling() => source.Cancel(); It's stop new job pooling private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancellationToken)
{
while (!source.IsCancellationRequested)
{ Now I can do this 1 // open job worker
2 using (var signal = new EventWaitHandle(false, EventResetMode.AutoReset))
3 {
4 var worker = client.NewWorker()
5 .JobType(JobType)
6 .Handler(HandleJob)
..
10 .Open();
11
12 // blocks main thread, so that worker can run
13 signal.WaitOne();
14 worker.StopPooling();
15 while(handlerCount > 0) Thread.Sleep(10);
16 }
17 Before wait for active handlers stopping I call StopPoling(). As result, when the service is stopping:
It's work well. Furthermore, I can extend service stop timeout and wait for not only report to zeebe, I can wait for worker comlete interaction with external service. |
Sorry to intervene in this discussion but why do you use a WaitEventHandle? We also use hosted services for our workers and create them in the Start method, but do not block anything because it's not required: once opened, the worker will poll for jobs until disposed. Now about the problem you mention, I'm afraid that the main question is about the idempotency. It's not because you try to shutdown gracefully that you have any guaranty that you will be able to report the completion of the job to Zeebe. What I mean is that if your hosted service is stopping, it may be because your node is down and you may have already lost any communication means. Just my 2 cents. |
Good question :)
Hm... I have not looked at this problem from this point of view. It works well. The graceful stoping works. public Task StartAsync(CancellationToken cancellationToken)
{
workerInstance = _workerManager.StartWorker(_jobType, WorkTaskAsync, cancellationToken);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
workerInstance.JobWorker.Dispose();
await _workerHandlerCounter.WaitForActiveHandlersAsync();
} public WorkerInstance StartWorker(
string jobType,
Func<IJob, ICompleteJobCommandStep1, CancellationToken, Task> handleJobAsync,
CancellationToken cancellationToken)
{
var result = new WorkerInstance();
var scope = _serviceProvider.CreateScope();
result.ZeebeClient = scope.ServiceProvider.GetRequiredService<IZeebeClient>();
result.JobWorker = result.ZeebeClient
.NewWorker()
.JobType(jobType)
.Handler((client, job) => HandleJobAsync(client, job, handleJobAsync, cancellationToken))
.MaxJobsActive(StaticSettings.MaxJobsActive)
.Name($"{jobType}[{Environment.MachineName}][{Environment.CurrentManagedThreadId}]")
.AutoCompletion()
.PollingTimeout(TimeSpan.FromSeconds(StaticSettings.PollingTimeoutSec))
.PollInterval(TimeSpan.FromSeconds(StaticSettings.PollIntervalSec))
.Timeout(TimeSpan.FromSeconds(StaticSettings.TimeoutSec))
.HandlerThreads(StaticSettings.HandlerThreads)
.Open();
return result;
} The fact that it works looks like using an undocumented feature :) But if @Zelldon, as inspirer of the project, thinks this option is better then add JobWorker.StopPulling() method then ok.
This is cool idea!
Loss of connection is a rare event. But service restart for updates happen frequently. Even if this covers 20...30% of such errors - the effort will be worth it. |
We host multiple workers in a single application using hosted services. But they all share the same Zeebe client. In a distributed system, anything can happen and you must be ready for any kind of failure (that includes remote ITs doing crazy things as we have already seen): so I agree that your use case is right but is not less likely than a loss of communication while you were trying to ack Zeebe about the completion of the job. This is why, I personnaly think that you must be ready to restart in any kind of situation and recover. That being said, it's useless trying to shutdown gracefully, just be ready to restart gracefully. |
Problem:
System.ObjectDisposedException: Cannot access a disposed object.\nObject name: 'ZeebeClient was already disposed.'.\n
at AsyncUnaryCall GatewayProtocol.ClosedGatewayClient.CompleteJobAsync(CompleteJobRequest request, Metadata headers, DateTime? deadline, CancellationToken cancellationToken)\n
at async Task Zeebe.Client.Impl.Commands.CompleteJobCommand.Send(TimeSpan? timeout, CancellationToken token) x 2\n
Really, the worker's task has already been done, but zeebe thinks that it is not correct and after a few time transfers it to another worker. There will be problems:
If it is possible to give a few seconds for the worker to finish the current work and report it to zeebe, there will be fewer time delays required to transfer tasks to other pods.
To implement such a strategy with zeebe-client, it is necessary possibility to stop pushing new tasks, while it must be able to report of complete on current ones; zeebeClient should not be disposed of before reporting.
A simplified implementation example is here
https://github.com/dimasm61/zeebe-client-csharp/tree/feat2_example/Client.GracefulStopping.Example