Rework CatchUp mechanism #4746

Open
bwaidelich opened this issue Nov 13, 2023 · 5 comments

@bwaidelich
Member

bwaidelich commented Nov 13, 2023

The CatchUp mechanism was overhauled with #4289 but the current implementation still has some issues:

By default we use Scripts::executeCommandAsync() (using the SubprocessProjectionCatchUpTrigger) to trigger catch ups.
This can lead to many sub requests if there are a lot of commands (e.g. multiple editors creating content simultaneously).

Apart from the obvious performance issue this can bring, those sub requests might be killed if the parent request ends (or due to process limits).

Update: After going back and forth I now suggest:

  • Switching to a synchronous catch up mechanism
    • That basically means that ContentRepository::handle() will always block until all projections are up to date
    • All projections will always be updated even if they don't act on an event
    • The EventPersister will simply trigger ProjectionInterface::apply() for every new published event
      • it will acquire a mutex to prevent race conditions if it is called in parallel from a separate process
    • This will simplify checkpoint management – projections can now just update their own version and control transactions
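
To make the bullets above a bit more concrete, here is a minimal sketch of the synchronous flow in Python (not the actual PHP implementation). The `Event`, `Projection` and `EventPersister` classes are hypothetical stand-ins, the event store is a plain list, and `threading.Lock` stands in for whatever cross-process mutex would really be used:

```python
import threading
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Event:
    sequence_number: int
    type: str


@dataclass
class Projection:
    """Hypothetical stand-in for ProjectionInterface; it tracks its own position."""
    name: str
    position: int = 0
    applied: list = field(default_factory=list)

    def apply(self, event: Event) -> None:
        if event.sequence_number <= self.position:
            return  # already applied, e.g. by a parallel caller
        self.applied.append(event.type)
        self.position = event.sequence_number


class EventPersister:
    def __init__(self, projections):
        self._events = []  # in-memory stand-in for the event store
        self._projections = list(projections)
        self._store_lock = threading.Lock()
        self._catch_up_lock = threading.Lock()  # assumption: a real setup needs a cross-process mutex

    def publish_events(self, event_types):
        # Commit the new events first ...
        with self._store_lock:
            for event_type in event_types:
                self._events.append(Event(len(self._events) + 1, event_type))
            highest = len(self._events)
        # ... then block until every projection has been offered every new event.
        with self._catch_up_lock:
            for projection in self._projections:
                for event in self._events[projection.position:]:
                    projection.apply(event)
        return highest
```

Because `publish_events()` only returns after the catch-up loop, a caller can read from any projection right after the call and see the effect of its own command.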

Considerations

  • for larger scale a different ProjectionCatchUpTriggerInterface could interact with a background worker (e.g. via socket) but the catch up would still be blocking
  • we could always add some ContentRepository::handleAsync() later if we find that we really need it (but my feeling is that this only makes sense when we introduce a proper write side model for constraint checks etc)

Current architecture

sequenceDiagram
    actor Client
    Client->>CR: handle(cmd)
    activate CR
    CR->>EventPersister: publishEvents()
    activate EventPersister
    EventPersister->>EventStore: commit()
    activate EventStore
    EventStore-->>EventPersister: CommitResult
    deactivate EventStore
    loop
      EventPersister->>CatchUpTrigger: triggerCatchUp
      activate CatchUpTrigger
      rect rgba(200, 200, 200, .1)
        Note right of CatchUpTrigger: separate process
        CatchUpTrigger--)SubProcessCatchupCommandController: catchupCommand()
        activate SubProcessCatchupCommandController
        deactivate CatchUpTrigger
        SubProcessCatchupCommandController->>CR Registry: get()
        CR Registry-->>SubProcessCatchupCommandController: CR
        participant CR2 as CR
        SubProcessCatchupCommandController->>CR2: catchUpProjection()
        deactivate SubProcessCatchupCommandController
      end
    end
    deactivate EventPersister
    EventPersister-->>CR: CommandResult
    CR-->>Client: CommandResult
    deactivate CR
sequenceDiagram
    actor Client
    Client->>CR: catchUpProjection()
    activate CR
    CR->>EventStore: load()
    activate EventStore
    EventStore-->>CR: EventStream
    deactivate EventStore
    CR->>CheckpointStorage: acquireLock()
    activate CheckpointStorage
    CheckpointStorage-->>CR: SequenceNumber
    loop
      CR->>Projection: apply()
      CR->>CheckpointStorage: updateAndReleaseLock()
      CR->>CheckpointStorage: acquireLock()
      CheckpointStorage-->>CR: SequenceNumber
    end
    deactivate CheckpointStorage

Suggested architecture

sequenceDiagram
    actor Client
    Client->>CR: handle(cmd)
    activate CR
    CR->>EventPersister: publishEvents()
    activate EventPersister
    EventPersister->>EventStore: commit()
    activate EventStore
    deactivate EventStore
    EventPersister->>SubscriptionEngine: run()
    activate SubscriptionEngine
    Note over SubscriptionEngine: mark new/removed/retrying subscriptions
    SubscriptionEngine->>EventStore: load()
    activate EventStore
    loop
      EventStore-->>SubscriptionEngine: EventStream
      SubscriptionEngine->>Subscriber (Projection): apply()
    end
    deactivate EventStore
    deactivate SubscriptionEngine
    deactivate EventPersister

The original ideas I wrote down in November 2023 when creating this issue:

sequenceDiagram
    actor Client
    Client->>CR: handle(cmd)
    activate CR
    CR->>EventPersister: publishEvents()
    activate EventPersister
    EventPersister->>EventStore: commit()
    activate EventStore
    EventStore-->>EventPersister: CommitResult
    deactivate EventStore
    EventPersister->>EventStore: load()
    activate EventStore
    EventStore-->>EventPersister: EventStream
    deactivate EventStore
    Note over EventPersister: acquire catch up lock
    loop
      EventPersister->>Projection: apply()
    end
    Note over EventPersister: release catch up lock
    deactivate EventPersister

Options

Switch to synchronous

It might be an option to switch to a synchronous catch-up mechanism, but that carries a significant risk: it might lead to code that relies on immediately consistent behavior and thus breaks when scaled up to be used asynchronously.

=> Potential optimization for local dev and smaller projects, but dangerous and probably no general solution

Fork process

Instead of creating a fully fledged sub request, we could use PCNTL functions to fork the current process (along the lines of spatie/fork).
Apparently this allows the scripts to stay alive even if the parent request was killed – but it works only on the CLI so we would still need at least one coordinating sub request.

=> Potential (optional!) performance improvement, but not a solution on its own

Expose locking state of CheckpointStorage and prevent needless catch-ups

The CheckpointStorageInterface provides means to acquire an exclusive lock in order to prevent events from being applied multiple times to the same subscriber.
If we could expose the locking state, we could skip catch-ups that are already being processed.

Some considerations

How to expose lock state?

To ensure that events are only applied once, we use an exclusive write lock on the *_checkpoints tables (see DoctrineCheckpointStorage).
AFAIK it is not possible to determine the lock state without acquiring it, though.

Maybe we can greatly simplify the locking mechanism by using a kind of 2-phase-commit like Laravel does (see DatabaseLock for example).
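
As a rough illustration of that idea, here is a row-based lock sketched in Python with `sqlite3`. It is only loosely modeled on the insert-or-take-over-if-expired approach and is not Laravel's actual code; the `locks` table, its columns and all function names are made up for this example. The point is that the lock state becomes a plain row that can be inspected without acquiring the lock:

```python
import sqlite3
import time


def create_lock_table(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS locks ("
        "name TEXT PRIMARY KEY, owner TEXT NOT NULL, expires_at REAL NOT NULL)"
    )


def acquire(conn, name, owner, ttl=10.0, now=None) -> bool:
    now = time.time() if now is None else now
    try:
        # Phase 1: try to create the lock row; the primary key makes this atomic.
        with conn:
            conn.execute(
                "INSERT INTO locks (name, owner, expires_at) VALUES (?, ?, ?)",
                (name, owner, now + ttl),
            )
        return True
    except sqlite3.IntegrityError:
        # Phase 2: the row exists, so take it over only if it has expired.
        with conn:
            cursor = conn.execute(
                "UPDATE locks SET owner = ?, expires_at = ? "
                "WHERE name = ? AND expires_at <= ?",
                (owner, now + ttl, name, now),
            )
        return cursor.rowcount == 1


def is_locked(conn, name, now=None) -> bool:
    # Reading the state does not require acquiring the lock.
    now = time.time() if now is None else now
    row = conn.execute(
        "SELECT 1 FROM locks WHERE name = ? AND expires_at > ?", (name, now)
    ).fetchone()
    return row is not None


def release(conn, name, owner) -> None:
    # Only the current owner may remove the lock row.
    with conn:
        conn.execute("DELETE FROM locks WHERE name = ? AND owner = ?", (name, owner))
```

The expiry timestamp is what keeps a crashed process from blocking catch-ups forever, at the cost of having to pick a sensible timeout.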

"Queue" CatchUps

It won't suffice to skip a catch-up if it's already running, because the running catch-up might have read the event stream already and would then miss the newly published events.
Instead the lock should consist of three states:

  • not acquired (= no catch-up in progress)
    • if this is the state, the catch-up should be invoked
  • acquired (= catch-up in progress)
    • if this is the state, the catch-up should be queued and invoked as soon as the lock is released
  • acquired & queued (= catch-up in progress, and queued for re-run)
    • if this is the state, we can safely skip the catch-up for the projection in question because it will be caught up anyways
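
The three states above could be sketched like this in Python. It is an in-process illustration only: `QueueingCatchUp` and its `trigger()` method are hypothetical names, and the choice that the current lock holder runs the queued catch-up itself is an assumption, not something decided in this issue:

```python
import threading
from enum import Enum


class CatchUpState(Enum):
    NOT_ACQUIRED = "not acquired"
    ACQUIRED = "acquired"
    ACQUIRED_AND_QUEUED = "acquired & queued"


class QueueingCatchUp:
    """Hypothetical coordinator; `catch_up` is any callable that catches up one projection."""

    def __init__(self, catch_up):
        self._catch_up = catch_up
        self._state = CatchUpState.NOT_ACQUIRED
        self._guard = threading.Lock()  # protects state transitions only

    def trigger(self) -> str:
        with self._guard:
            if self._state is CatchUpState.ACQUIRED_AND_QUEUED:
                return "skipped"  # a re-run is already queued
            if self._state is CatchUpState.ACQUIRED:
                self._state = CatchUpState.ACQUIRED_AND_QUEUED
                return "queued"
            self._state = CatchUpState.ACQUIRED
        while True:
            self._catch_up()
            with self._guard:
                if self._state is CatchUpState.ACQUIRED_AND_QUEUED:
                    # Someone asked for a catch-up while we were running: go again.
                    self._state = CatchUpState.ACQUIRED
                    continue
                self._state = CatchUpState.NOT_ACQUIRED
                return "ran"
```

With this shape, any number of triggers that arrive during a running catch-up collapse into exactly one re-run.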

Compare checkpoint sequence number

In some cases we could compare CommitResult::highestCommittedSequenceNumber with CheckpointStorageInterface::getHighestAppliedSequenceNumber() to skip catch-ups if the projection is already up-to-date.
But this has to be handled with care because other commits might have been skipped under the assumption that the catch-up is still queued!

Related: #4388

@bwaidelich
Member Author

A sequence diagram of the current flow for

ContentRepository::handle():

ESCR_catchup_01

and ContentRepository::catchUpProjection() that is triggered in a separate process:

ESCR_catchup_02

@robertlemke
Member

Thanks for the write-up! I didn't think your suggestion through yet, but wanted to leave one thought already. Using the fork approach might be tricky, as I assume that most hosting providers will not have the pcntl extension enabled. So, if that's a hard requirement, people will have trouble using Neos at a vanilla hosting provider.

@kitsunet
Member

Thanks for the write-up! I didn't think your suggestion through yet, but wanted to leave one thought already. Using the fork approach might be tricky, as I assume that most hosting providers will not have the pcntl extension enabled. So, if that's a hard requirement, people will have trouble using Neos at a vanilla hosting provider.

Funnily we do have it :D But yes, this is a separate issue and in the end doesn't really fix much for now, so we can probably think about it later.

@bwaidelich
Member Author

From today's Weekly:

  • Schnapsidee (German for "crazy idea") from @skurfuerst : Invalidate projections until they are explicitly blocked (in a single request)
  • we change the default from async => sync (NO SUBPROCESSES anymore in this) 🟢 general agreement, if combined with "Schnapsidee" :-D
    • for big scale → BG worker. 🟢 general agreement
    • ReactPHP / Promise / ... ? - 🟡 unsure, @bwaidelich will experiment.
    • Async case: test via behat. (every night) 🟢 general agreement
    • specify on which projections we block. 🟢 general agreement

@bwaidelich
Member Author

bwaidelich commented Oct 19, 2024

Note: I updated the suggested architecture in the issue description:
The idea is to introduce the notion of subscriptions inspired by https://event-sourcing.patchlevel.io/latest/subscription/ with the following concepts:

Subscriber

The actual event handler – in our case basically a closure around ProjectionInterface::apply()

Subscription

The persisted state of a subscriber with

  • a unique id
  • a status (see below)
  • the position in the event stream (i.e. SequenceNumber)
  • some more internal details for debugging and error resolving

A subscription is basically a state machine with the states

  • new => newly discovered subscriber that wasn't booted yet
  • booting => initial catchup
  • active => default state, new events will be applied
  • paused => explicitly paused subscription, new events won't be applied (we won't need this)
  • finished => finished one-time-subscription, new events won't be applied (we won't need this)
  • detached => subscription without a corresponding subscriber (i.e. code was removed)
  • error => an error occurred while applying an event to this subscription

The state transitions are:
(image: state transition diagram)

But we can probably simplify it to something like:
(image: simplified state transition diagram)

Subscription Engine

The central authority to manage subscription states and invoke subscribers.
It allows booting subscriptions, but its main use is to run it whenever events were published (and when replaying projections).
The process of the run-call is:

  1. discoverNewSubscriptions - create a new subscription record with state new for every subscriber that has no such record yet
  2. markDetachedSubscriptions - the counter-part: mark subscription records detached if no corresponding subscriber exists
  3. retrySubscriptions - reset subscriptions with state error to the state they had before the error if some "retry-strategy" allows (we can skip this part for now)
  4. load all active subscriptions and lock their records (...FOR UPDATE)
  5. find the active subscription with the lowest position
  6. load events starting from that position
  7. iterate all active subscriptions (skip if they are already further than the current position)
  8. apply the event (with try/catch and put the subscription in error state if that failed)

For reference:

The original implementation: https://github.com/patchlevel/event-sourcing/blob/3.4.x/src/Subscription/Engine/DefaultSubscriptionEngine.php

and a WIP version of that from me: https://github.com/bwaidelich/dcb-library/blob/main/src/Subscription/Engine/SubscriptionEngine.php (IMO a bit easier to read but the DB lock is not nice yet)
