Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Mediator to Brighter #3370

Draft
wants to merge 45 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f52c18d
chore: save work in progress
iancooper Oct 28, 2024
36890f5
feat: add ADR for a mediator and assembly
iancooper Oct 30, 2024
d92cb84
feat: required workflow classes to write setup of rist mediator test
iancooper Oct 30, 2024
556bb7a
chore: safety check whilst releasing V9 and V10; does not build
iancooper Oct 31, 2024
3ed0e7e
chore: safety checkin
iancooper Nov 1, 2024
e0c9c85
feat: add fire and forget action
iancooper Nov 1, 2024
f8609b8
feat: add requestreply outline
iancooper Nov 1, 2024
255d086
fix: folder name causing issues on MacOS which believes it is an appl…
iancooper Nov 2, 2024
a321e30
feat: modifications to step and workflow responsibility
iancooper Nov 4, 2024
7a879e6
feat: add workflow data, over just using the bag
iancooper Nov 5, 2024
b8eded1
feat: need correlation id on event and command to support workflow
iancooper Nov 6, 2024
5298d38
chore: check in to allow merging of master
iancooper Nov 6, 2024
ab16834
chore: merge branch 'master' into mediator
iancooper Nov 6, 2024
92e344d
feat: move completed workflows to the done state
iancooper Nov 9, 2024
6840f04
feat: add an ADR for adding the specification pattern
iancooper Nov 9, 2024
9e63244
feat: add the specification pattern
iancooper Nov 9, 2024
f7cf152
fix: typo in filename
iancooper Nov 9, 2024
b09c3a6
chore: safety dance
iancooper Nov 9, 2024
5313018
chore: safety dance
iancooper Nov 9, 2024
c41547a
feat: add a choice workflow action
iancooper Nov 10, 2024
c42b6b2
fix: shared fixture problems
iancooper Nov 10, 2024
40bf2b4
feat: add first version of robust flow
iancooper Nov 10, 2024
2b11f6a
chore: ~Merge branch 'master' into mediator
iancooper Nov 10, 2024
56f577e
fix: remove IAmTheWorkflowData as unnecessary abstraction.
iancooper Nov 11, 2024
3cb3c4b
fix: make choice about choosing the next step from the workflow data
iancooper Nov 11, 2024
bd6d2e1
fix: tests not checking all paths
iancooper Nov 11, 2024
12a7ecd
fix: add workflow patterns to ADR
iancooper Nov 12, 2024
f24fac2
feat: move to workflow patterns style, step and task; some behaviours…
iancooper Nov 13, 2024
991e4f2
feat: first pass at Parallel; requires Scheduler-Runner split to Medi…
iancooper Nov 17, 2024
1373583
chore: safety check-in during scheduler/runner split work
iancooper Nov 18, 2024
6067bd4
fix: refactor relationship between job and step to be more explicit. …
iancooper Nov 19, 2024
8b4e5e4
fix: step advancement manages job state
iancooper Nov 19, 2024
e5a6639
fix: add cancellation token interrupt of runner to all tests
iancooper Nov 19, 2024
acef39d
chore: safety check in; fixing failing tests
iancooper Nov 19, 2024
aa8ca95
fix: get the steps to save state, when they modify the job, not the r…
iancooper Nov 20, 2024
1cea4ea
fix: add fault version of robust request-reply
iancooper Nov 20, 2024
08b2e18
fix: add multi-threading support to Job
iancooper Nov 20, 2024
67f3376
Update ASB Samples to use the Emulator (#3391)
preardon Nov 21, 2024
e118650
feat: adding a Wait step.
iancooper Nov 23, 2024
ca229f0
fix: don't try to await a thread; ensure we leave time for scheduler …
iancooper Nov 23, 2024
059fff0
chore: merge from master
iancooper Nov 25, 2024
a7019a2
chore: merge with master
iancooper Dec 2, 2024
f35d999
chore: safety check in; failing test on parallel split still
iancooper Dec 2, 2024
1ef0839
fix: parallel split was not terminating
iancooper Dec 4, 2024
392dd4d
fix: we should pass data to the callbacks; was capturing variable in …
iancooper Dec 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Salutation_Sweeper", "sampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.MsSql", "src\Paramore.Brighter.Locking.MsSql\Paramore.Brighter.Locking.MsSql.csproj", "{758EE237-C722-4A0A-908C-2D08C1E59025}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Mediator", "src\Paramore.Brighter.Mediator\Paramore.Brighter.Mediator.csproj", "{F00B137A-C187-4C33-A37B-22AD40B71600}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1765,6 +1767,18 @@ Global
{758EE237-C722-4A0A-908C-2D08C1E59025}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{758EE237-C722-4A0A-908C-2D08C1E59025}.Release|x86.ActiveCfg = Release|Any CPU
{758EE237-C722-4A0A-908C-2D08C1E59025}.Release|x86.Build.0 = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|x86.ActiveCfg = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Debug|x86.Build.0 = Debug|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Release|Any CPU.Build.0 = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Release|x86.ActiveCfg = Release|Any CPU
{F00B137A-C187-4C33-A37B-22AD40B71600}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageVersion Include="System.Reflection.TypeExtensions" Version="4.7.0" />
<PackageVersion Include="System.Text.Json" Version="8.0.5" />
<PackageVersion Include="System.Threading.Channels" Version="9.0.0" />
<PackageVersion Include="xunit" Version="2.9.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2">
<PrivateAssets>all</PrivateAssets>
Expand Down
2 changes: 1 addition & 1 deletion docs/adr/0020-reduce-esb-complexity.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 20. Reduce External Service Bus Complexity

Date: 2019-08-01
Date: 2024-08-01

## Status

Expand Down
58 changes: 58 additions & 0 deletions docs/adr/0022-add-a-mediator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# 22. Add a Mediator to Brighter

Date: 2024-10-22

## Status

Proposed

## Context
We have two approaches to a workflow: orchestration and choreography. In choreography the workflow emerges from the interaction of the participants. In orchestration, one participant executes the workflow, calling other participants as needed. Whilst choreography has low-coupling, it also has low-cohesion. At scale this can lead to the Pinball anti-pattern, where it is difficult to maintain the workflow.

The [Mediator](https://www.oodesign.com/mediator-pattern) pattern provides an orchestrator that manages a workflow that involves multiple objects. In its simplest form, instead of talking to each other, objects talk to the mediator, which then calls other objects as required to execute the workflow.

Brighter provides `IHandleRequests<>` to provide a handler for an individual request, either a command or an event. It is possible to have an emergent workflow, within Brighter, through the choreography of these handlers. However, Brighter provides no model for an orchestrator that manages a workflow that involves multiple handlers. In particular, Brighter does not support a class that can listen to multiple requests and then call other handlers as required to execute the workflow.

In principle, nothing stops an end user from implementing a `Mediator` class that listens to multiple requests and then calls other handlers as required to execute the workflow. So orchestration has always been viable, but left as an exercise to the user. However, competing OSS projects provide popular workflow functionality, suggesting there is demand for an off-the-shelf solution.

Other dotnet messaging platforms erroneously conflate the Saga and Mediator patterns. A Saga is a long-running transaction that spans multiple services. A Mediator is an orchestrator that manages a workflow that involves multiple objects. One aspect of those implementations is typically the ability to store workflow state.

There is a pattern catalogue associated with workflows. [Workflow Patterns](http://www.workflowpatterns.com/patterns/control/index.php) describes both basic and advanced patterns for workflows. We intend to use these patters as guidance for our offering, over traditional .NET workflow offerings in competing products such as Mass Transit and NServicBus, which have tended to be ersatz in design.

A particular reference for the requirements for this work is [AWS step functions](https://states-language.net/spec.html). AWS Step functions provide a state machine that mediates calls to AWS Lambda functions. When thinking about Brighter's `IHandleRequests` it is attractive to compare them to Lambda functions in the Step functions model :

1. The AWS Step funcions state machine does not hold the business logic, that is located in the functions called; the Step function handles calling the Lambda functions and state transitions (as well as error paths)
2. We want to use the Mediator to orchestrate both internal bus and external bus hosted workflows. Step functions provide a useful model of requirements for the latter.

This approach is intended to enable flexible, event-driven workflows that can handle various business processes and requirements, including asynchronous event handling and conditional branching.

Our experience has been that many teams adopt Step Functions to gain access to it as a workflow engine. But this forces them into Lambda Pinball architectures. We believe that Brighter could offer a compelling alternative.

## Decision

We will add a `Mediator` class to Brighter that will:

1. Manages and tracks a WorkflowState object representing the current step in the workflow.
2. Support multiple steps: sequence, choice, parallel, wait.
3. Supports multiple tasks, mapped to typical ws-messaging patterns including:
• FireAndForget: Dispatches a `Command` and immediately advances to the next state.
• RequestReaction: Dispatches a `Command` and waits for an event response before advancing.
• RobustRequestReaction: Reaction event can kick off an error flow.
4. Uses a CommandProcessor for routing commands and events to appropriate handlers.
5. Work is handled within Brighter handlers. They use glue code to call back to the workflow where necessary
6. Can be passed events, and uses the correlation IDs to match events to specific workflow instances and advance the workflow accordingly.

The Specification Pattern in a Choice steo will allow flexible conditional logic by combining specifications with And and Or conditions, enabling complex branching decisions within the workflow.

We assume that the initial V10 of Brighter will contain a minimum viable product version of the `Mediator`. Additional functionality, workflows, etc. will be a feature of later releases. Broady our goal within V10 would be to ensure that from [Workflow Patterns](http://www.workflowpatterns.com/patterns/control/index.php) we can deliver the Basic Control Flow patterns. A stretch goal would be to offer some Iteration and Cnacellation patterns.

## Consequences

Positive Consequences

1. Simplicity: Providing orchestration for a workflow, which is easier to understand
2. Modularity: It is possible to extend the `Mediator' relativey easy by adding new process states.

Negative Consequences

1. Increased Brighter scope: Previously we had assumed that developers would use an off-the-shelf workflow solution like [Stateless](https://github.com/nblumhardt/stateless) or [Workflow Core]. The decision to provide our own workflow, to orchestrate via CommandProcessor means that we increase our scope to include the complexity of workflow management.
25 changes: 25 additions & 0 deletions docs/adr/0023-add-the_specification-pattern.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# 22. Add the Specification Pattern

Date: 2024-11-09

## Status

Proposed

## Context

The Specification Pattern is a software design pattern that is used to define business rules that can be combined to create complex rules. It is used to encapsulate business rules that can be used to determine if an object meets a certain criteria. The pattern was described by Eric Evans and Martin Fowler in [this article](https://martinfowler.com/apsupp/spec.pdf).

Brighter needs the addition of the specification pattern, for two reasons:

1. For use with its Mediator. The Mediator allows Brighter to execute a workflow that has a branching condition. The Specification Pattern can be used to define the branching conditions. See [ADR-0022](0022-use-the-mediator-pattern.md).
2. For use when implementing the [Agreement Dispatcher](https://martinfowler.com/eaaDev/AgreementDispatcher.html) pattern from Martin Fowler. The Agreement Dispatcher pattern is used to dispatch a message to a handler based on a set of criteria. The Specification Pattern can be used to define the criteria.

## Decision
Add the Specification Pattern to Brighter. We could have taken a dependency on an off-the-shelf implementation. Many of the Brighter team worked at Huddle Engineering, and worked on [this](https://github.com/HuddleEng/Specification) implementation of the Specification Pattern. However, this forces Brighter to take a dependency on another project, and we would like to keep Brighter as self-contained as possible. So, whilst we may be inspired by Huddle's implementation, we will write our own.

In this version, we don't need some of the complexity of Huddle's usage of the Visitor pattern, as we only need to control branching. In addition, Huddle's version was written before the wide usage of lambda expressions via delegates in C#, so we can simplify the implementation.

## Consequences

Brighter will provide an implementation of the Specification pattern.
57 changes: 57 additions & 0 deletions docs/adr/0024-add-parallel-split-to-mediator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# ADR: Implementing Parallel Split Step for Concurrent Workflow Execution

## Context

Our workflow currently supports sequential steps executed in a single thread of control. Each step in the workflow proceeds one after another, and the Mediator has been designed with this single-threaded assumption.

To support more advanced control flow, we want to introduce a Parallel Split Step based on the Workflow Patterns Basic Control Flow Patterns. The Parallel Split Step is defined as “the divergence of a branch into two or more parallel branches, each of which execute concurrently.” This will enable the workflow to branch into parallel paths, executing multiple threads of control simultaneously. Each branch will operate independently of the others, continuing the workflow until either completion or a synchronization step (such as a Simple Merge) later in the process.

We would expect a some point to implement the Simple Merge step to allow parallel branches to converge back into a single thread of control. However, this ADR will focus on the Parallel Split Step implementation, with the understanding that future steps will be added to support synchronization.

### Key Requirements
1. Parallel Execution:
* Parallel Split Step must initiate two or more parallel branches within the workflow.
* Each branch should proceed as a separate thread of control, executing steps independently.
2. Concurrency Handling in the Mediator:
* The Mediator needs to manage multiple threads of execution rather than assuming a single-threaded flow.
* It must be able to initiate and track multiple branches for each Parallel Split Step within the workflow.
3. State Persistence for Parallel Branches:
* Workflow state management and persistence will need to be adapted to track the branches of the flow.
* In the case of a crash, each branch should be able to resume from its last saved state.
4. Integration with Future Synchronization Steps:
* The Parallel Split Step should integrate seamlessly with a future Simple Merge step, which will allow parallel branches to converge back into a single thread.

## Decision
1. Parallel Split Step Implementation:
* Introduce a new class, ParallelSplitStep<TData>, derived from Step<TData>.
* Ths class will define multiple branches by specifying two or more independent workflow sequences to be executed in parallel.
2. Producer and Consumer Model for Parallel Execution
* The Mediator will now consist of two classes: a producer (Scheduler) and a consumer (Runner).
* Scheduling a workflow via the Scheduler causes it to send a job to a shared channel or blocking collection.
* The Runner class will act as a consumer, reading workflow jobs from the channel and executing them.
* The Runner is single-threaded, and runs a message pump to process jobs sequentially.
* The job queue is bounded to prevent excessive memory usage and ensure fair scheduling.
* The user can configure the job scheduler for backpressure (producer stalls) or load shedding (dropping jobs).
* The user configures the number of Runners; we don't just pull them from the thread pool. This allows users to control how many threads are used to process jobs. For example, a user could configure a single Runner for a single-threaded workflow, or multiple Runners for parallel execution.
3. In the In-Memory version the job channels will be implemented using a BlockingCollection<T> with a bounded capacity.
* We won't separately store workflow data in a database; the job channel is the storage for work to be done, or in flight
* When we branch, we schedule onto the same channel; this means a Runner has a dependency on the Mediator
4. For resilience, we will need to use a persistent queue for the workflow channels.
* We assume that workflow will become unlocked when their owning Runner crashes, allowing another runner to pick them up
* We will use one derived from a database, not a message queue.
* This will be covered in a later ADR, and likely create some changes

## Consequences

### Positive Consequences
* Concurrency and Flexibility: The addition of Parallel Split allows workflows to handle concurrent tasks and enables more complex control flows.
* Scalability: Running parallel branches improves throughput, as tasks that are independent of each other can execute simultaneously.
* Adaptability for Future Steps: Implementing parallel branching prepares the workflow for synchronization steps (e.g., Simple Merge), allowing flexible convergence of parallel tasks.
* Resilience:

### Negative Consequences
* Increased Complexity in State Management: Tracking multiple branches requires more complex state management to ensure each branch persists and resumes accurately.
* Concurrency Overhead in the Mediator: Managing multiple threads of control adds overhead. We now have both a Runner and a Scheduler.

### Related ADRs
* Future ADR for implementing Simple Merge Step for synchronization of parallel branches.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ namespace Greetings.Ports.Events
{
public class GreetingAsyncEvent : Event
{
public GreetingAsyncEvent() : base(Guid.NewGuid()) { }
public GreetingAsyncEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingAsyncEvent(string greeting) : base(Guid.NewGuid())
public GreetingAsyncEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}

public string Greeting { get; set; }
public string? Greeting { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ namespace Greetings.Ports.Events
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}

public string Greeting { get; set; }
public string? Greeting { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ namespace Greetings.Ports.Commands
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ namespace Greetings.Ports.Commands
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ namespace Greetings.Ports.Commands
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ namespace Events.Ports.Commands
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,8 @@
namespace Greetings.Ports.Commands
{
[MessagePackObject(keyAsPropertyName: true)]
public class FarewellEvent : Event
public class FarewellEvent(string farewell) : Event(Guid.NewGuid().ToString())
{
public FarewellEvent() : base(Guid.NewGuid())
{
}

public FarewellEvent(string farewell) : base(Guid.NewGuid())
{
Farewell = farewell;
}

public string Farewell { get; set; }
public string Farewell { get; set; } = farewell;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ namespace Greetings.Ports.Commands
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}

public string Greeting { get; set; }
public string? Greeting { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ namespace Greetings.Ports.Events
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ namespace Greetings.Ports.Commands
{
public class GreetingEvent : Event
{
public GreetingEvent() : base(Guid.NewGuid()) { }
public GreetingEvent() : base(Guid.NewGuid().ToString()) { }

public GreetingEvent(string greeting) : base(Guid.NewGuid())
public GreetingEvent(string greeting) : base(Guid.NewGuid().ToString())
{
Greeting = greeting;
}
Expand Down
Loading
Loading