This project is an extension of the C# Zeebe client project. Zeebe Workers are automatically recognized and bootstrapped via a .NET HostedService.
Read the Zeebe documentation for more information about the Zeebe project.
The basic idea and implementation came from https://github.com/camunda-community-hub/zeebe-client-csharp-bootstrap. We loved the idea, but in some areas we had our own preferences for defaults, behaviour and separation of concerns. So this is our version of a good Bootstrap Extension for the C# Zeebe Client. Credit for the base work still belongs to https://github.com/arjangeertsema.
- .NET Standard 2.0 or higher, which means
  - .NET Core 2.1 or higher
  - or .NET Framework 4.7.1 or higher
- latest C# Zeebe client release
- latest Zeebe release
The Zeebe C# client bootstrap extension is available via NuGet (https://www.nuget.org/packages/zb-client-accelerator/).
Recommendation: a complete sample project using this extension can be found in examples.
All classes which implement IZeebeWorker, IAsyncZeebeWorker, IZeebeWorkerWithResult or IAsyncZeebeWorkerWithResult are automatically added to the service collection and autowired to Zeebe when you register this bootstrap project with the IServiceCollection.BootstrapZeebe() extension method.
Further functionality becomes available with using global::Zeebe.Client.Accelerator.Extensions; which provides additional extension methods for IHost, IZeebeClient etc. in order to deploy processes or create one-time message receivers.
The BootstrapZeebe method has two parameters:
- ZeebeBootstrapOptions via configuration, action delegate or both.
- An array with assemblies which will be scanned for job handlers.
ConfigureServices((hostContext, services) => {
    services.BootstrapZeebe(
        hostContext.Configuration.GetSection("ZeebeConfiguration"),
        this.GetType().Assembly
    );
})
Example Web Application:
// Start building my WebApplication
var builder = WebApplication.CreateBuilder(args);
// Bootstrap Zeebe Integration
builder.Services.BootstrapZeebe(
    builder.Configuration.GetSection("ZeebeConfiguration"),
    typeof(Program).Assembly);
The configuration could look as follows, for example:
{
  "ZeebeConfiguration": {
    "Client": {
      "GatewayAddress": "127.0.0.1:26500"
    },
    "Worker": {
      "MaxJobsActive": 5,
      "TimeoutInMilliseconds": 500,
      "PollIntervalInMilliseconds": 50,
      "PollingTimeoutInMilliseconds": 1000,
      "RetryTimeoutInMilliseconds": 1000
    }
  }
}
If we want to deploy some processes right before the final startup of our application, we create a deployment using the extension for IHost or IServiceProvider as follows:
var app = builder.Build();
...
// Deploy all process resources
app.CreateZeebeDeployment()
    .UsingDirectory("Resources")
    .AddResource("insurance_application.bpmn")
    .AddResource("document_request.bpmn")
    .AddResource("risk_check.dmn")
    .Deploy();
// Now run the application
app.Run();
A Zeebe Worker is an implementation of IZeebeWorker, IAsyncZeebeWorker, IZeebeWorkerWithResult or IAsyncZeebeWorkerWithResult. Zeebe Workers are automatically added to the DI container, so you can use dependency injection inside them. The default worker configuration can be overridden with AbstractWorkerAttribute implementations; see attributes for more information.
[JobType("doSomeWork")]
public class SomeWorker : IAsyncZeebeWorker
{
    private readonly MyApiService _myApiService;

    // The constructor name must match the class name; dependencies are injected by the DI container.
    public SomeWorker(MyApiService myApiService)
    {
        _myApiService = myApiService;
    }

    /// <summary>
    /// Handles the job "doSomeWork".
    /// </summary>
    /// <param name="job">the Zeebe job</param>
    /// <param name="cancellationToken">cancellation token</param>
    public async Task HandleJob(ZeebeJob job, CancellationToken cancellationToken)
    {
        // execute business service etc.
        await _myApiService.DoSomethingAsync(cancellationToken);
    }
}
You can also access process variables and return a result, for example:
[JobType("doAwesomeWork")]
public class AwesomeWorker : IAsyncZeebeWorker<SimpleJobPayload, SimpleResponse>
{
    ...

    public async Task<SimpleResponse> HandleJob(ZeebeJob<SimpleJobPayload> job, CancellationToken cancellationToken)
    {
        // get variables as declared (SimpleJobPayload)
        var variables = job.getVariables();

        // execute business service etc.
        var result = await _myApiService.DoSomethingAsync(variables.CustomerNo, cancellationToken);
        return new SimpleResponse(result);
    }

    class SimpleJobPayload
    {
        public string CustomerNo { get; set; }
    }
}
The above code will fetch exactly the variables defined as properties in SimpleJobPayload from the process.
And there are more options, including the option to access custom headers configured in the process model:
[JobType("doComplexWork")]
public class ComplexWorker : IAsyncZeebeWorker
{
    ...

    public async Task HandleJob(ZeebeJob job, CancellationToken cancellationToken)
    {
        // get all variables (and deserialize to a given type)
        ProcessVariables variables = job.getVariables<ProcessVariables>();
        // get custom headers (and deserialize to a given type)
        MyCustomHeaders headers = job.getCustomHeaders<MyCustomHeaders>();

        // execute business service etc.
        await _myApiService.DoSomethingComplex(variables.Customer, headers.SomeConfiguration, cancellationToken);
        ...
    }

    class ProcessVariables
    {
        public string? BusinessKey { get; set; }
        public CustomerData Customer { get; set; }
        public string? AccountName { get; set; }
        ...
    }

    class MyCustomHeaders
    {
        public string SomeConfiguration { get; set; }
    }
}
The following table gives you an overview of the available options:
| Interface | Description | Fetched Variables |
|---|---|---|
| IAsyncZeebeWorker | Asynchronous worker without specific input and no response | Default is to fetch all process variables. Use FetchVariables attribute for restrictions. |
| IAsyncZeebeWorker<TInput> | Asynchronous worker with specific input and no response | Fetches exactly the variables defined as properties in TInput. |
| IAsyncZeebeWorker<TInput, TResponse> | Asynchronous worker with specific input and specific response | Fetches exactly the variables defined as properties in TInput. |
| IAsyncZeebeWorkerWithResult<TResponse> | Asynchronous worker without specific input but a specific response | Default is to fetch all process variables. Use FetchVariables attribute for restrictions. |
| IZeebeWorker | Synchronous worker without specific input and no response | Default is to fetch all process variables. Use FetchVariables attribute for restrictions. |
| IZeebeWorker<TInput> | Synchronous worker with specific input and no response | Fetches exactly the variables defined as properties in TInput. |
| IZeebeWorker<TInput, TResponse> | Synchronous worker with specific input and specific response | Fetches exactly the variables defined as properties in TInput. |
| IZeebeWorkerWithResult<TResponse> | Synchronous worker without specific input but a specific response | Default is to fetch all process variables. Use FetchVariables attribute for restrictions. |
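To make the typed-input rows of the table more concrete, here is a minimal, self-contained sketch of how variable names could be derived from the public properties of an input type. It only uses reflection and System.Text.Json and assumes .NET 6 or later; the helper VariableNameSketch.FetchVariableNames and the type DemoPayload are hypothetical names for illustration, and the accelerator's actual implementation may work differently.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Print the variable names derived from the hypothetical DemoPayload type.
Console.WriteLine(string.Join(", ", VariableNameSketch.FetchVariableNames<DemoPayload>()));
// prints: businessKey, customerNo

// Hypothetical helper, NOT part of the accelerator API.
public static class VariableNameSketch
{
    // Maps each public property of TInput to a camelCase variable name
    // and sorts the names so the result is deterministic.
    public static string[] FetchVariableNames<TInput>() =>
        typeof(TInput)
            .GetProperties()
            .Select(p => JsonNamingPolicy.CamelCase.ConvertName(p.Name))
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToArray();
}

// Hypothetical input type standing in for a TInput payload class.
public class DemoPayload
{
    public string CustomerNo { get; set; } = "";
    public string BusinessKey { get; set; } = "";
}
```

Under this reading, adding a property to the input type is what widens the set of variables requested for the job, so small payload classes keep the fetched data small.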
If you would like to explicitly restrict the variables fetched from Zeebe, you have the following additional option:
[JobType("doComplexWork")]
[FetchVariables("businessKey", "applicantName")]
public class SimpleWorker : IAsyncZeebeWorker
{
...
}
In case you do not want to fetch any variables at all from Zeebe, use [FetchVariables(none: true)]:
[JobType("doSimpleWork")]
[FetchVariables(none: true)]
class SimpleWorker : IZeebeWorker
{
...
}
A handled job has three outcomes:
- The job has been handled without exceptions: this will automatically result in a JobCompletedCommand being sent to the broker. The optional TResponse is automatically serialized and added to the JobCompletedCommand.
- A BpmnErrorException has been thrown while handling the job: this will automatically result in a ThrowErrorCommand being sent to the broker, triggering Error Boundary Events in the process.
- Any other unexpected exception will automatically result in a FailCommand being sent to the broker, including message details and reducing the number of retries.
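The three outcomes above can be modelled as a simple try/catch dispatch. The following self-contained sketch is only an illustration of that decision logic and assumes .NET 6 or later: DemoBpmnErrorException is a local stand-in for the library's BpmnErrorException, OutcomeSketch.Resolve is a hypothetical helper, and the returned strings merely name the command that would be sent. It does not talk to a broker and is not the accelerator's implementation.

```csharp
using System;

// Simulate the three cases with plain delegates.
Console.WriteLine(OutcomeSketch.Resolve(() => { }));
// prints: JobCompletedCommand
Console.WriteLine(OutcomeSketch.Resolve(() => throw new DemoBpmnErrorException("not-insurable")));
// prints: ThrowErrorCommand
Console.WriteLine(OutcomeSketch.Resolve(() => throw new InvalidOperationException("boom")));
// prints: FailCommand

// Local stand-in for the library's BpmnErrorException (assumption, for illustration only).
public class DemoBpmnErrorException : Exception
{
    public string Code { get; }

    public DemoBpmnErrorException(string code) : base($"BPMN error {code}") => Code = code;
}

// Hypothetical helper, NOT part of the accelerator API.
public static class OutcomeSketch
{
    // Runs the handler and returns the name of the command that the outcome maps to.
    public static string Resolve(Action handler)
    {
        try
        {
            handler();
            return "JobCompletedCommand";   // handled without exceptions
        }
        catch (DemoBpmnErrorException)
        {
            return "ThrowErrorCommand";     // business error, triggers error boundary events
        }
        catch (Exception)
        {
            return "FailCommand";           // unexpected technical failure, retries are reduced
        }
    }
}
```

The order of the catch clauses matters: the specific business exception has to be caught before the general Exception, otherwise every failure would be reported as a technical one.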
See Example for synchronous responses from processes for a description of the scenario.
You can create a one-time job handler for receiving a message for a dynamic job type "received_" + number as follows:
try
{
    string jsonContent = _zeebeClient.ReceiveMessage("received_" + number, TimeSpan.FromSeconds(5), "someVariable1", "someVariable2");
    ...
}
catch (MessageTimeoutException)
{
    // nothing received
    ...
}
It is also possible to use a typed response, which will automatically fetch and deserialize all variables defined as properties in the given type:
MyVariables typedContent = _zeebeClient.ReceiveMessage<MyVariables>("received_" + number, TimeSpan.FromSeconds(3));
Simply waiting without receiving any variables:
bool messageReceived = _zeebeClient.ReceiveMessage("received_" + number, TimeSpan.FromSeconds(3));
The one-time job handler will be destroyed after ReceiveMessage returns.
- By default the workers are added to the DI container with a Transient service lifetime. This can be overridden by adding the ServiceLifetimeAttribute to the worker; see attributes for more information.
- By default the ZeebeVariablesSerializer is registered as the implementation for IZeebeVariablesSerializer, which uses System.Text.Json.JsonSerializer. Serialization / Deserialization always uses CamelCase as naming policy!
- The default job type of a worker is the class name of the worker. This can be overridden by adding the JobTypeAttribute to the worker, e.g. [JobType("myJobName")].
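The practical effect of the CamelCase naming policy is that a C# property such as CustomerNo appears as the process variable customerNo. The following self-contained sketch shows this with plain System.Text.Json, assuming .NET 6 or later; DemoResponse is a hypothetical type, and any further serializer options the accelerator may configure are not covered here.

```csharp
using System;
using System.Text.Json;

// Same naming policy as described above, configured on plain System.Text.Json.
var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

// PascalCase property names are written as camelCase JSON keys.
string json = JsonSerializer.Serialize(new DemoResponse { CustomerNo = "4711" }, options);
Console.WriteLine(json);
// prints: {"customerNo":"4711"}

// With the same options, camelCase keys are mapped back to the PascalCase properties.
DemoResponse roundTrip = JsonSerializer.Deserialize<DemoResponse>(json, options)!;
Console.WriteLine(roundTrip.CustomerNo);
// prints: 4711

// Hypothetical response type, for illustration only.
public class DemoResponse
{
    public string CustomerNo { get; set; } = "";
}
```

When modelling a process, it therefore helps to name variables in camelCase from the start, so that they line up with the properties of the worker's input and response types.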
Run dotnet build Zeebe.Client.Accelerator.sln
Run dotnet test Zeebe.Client.Accelerator.sln