Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove watch server & use event store for watching #508

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve
* Versioned and managed lifecycle of [projections](https://patchlevel.github.io/event-sourcing-docs/latest/projection/)
* Smooth [upcasting](https://patchlevel.github.io/event-sourcing-docs/latest/upcasting/) of old events
* Simple setup with [scheme management](https://patchlevel.github.io/event-sourcing-docs/latest/store/) and [doctrine migration](https://patchlevel.github.io/event-sourcing-docs/latest/migration/)
* Dev [tools](https://patchlevel.github.io/event-sourcing-docs/latest/watch_server/) like realtime event watcher
* Built in [cli commands](https://patchlevel.github.io/event-sourcing-docs/latest/cli/) with [symfony](https://symfony.com/)
* and much more...

Expand Down
2 changes: 0 additions & 2 deletions docs/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,4 @@ nav:
- Split Stream: split_stream.md
- Time / Clock: clock.md
- Testing: testing.md
- Other / Tools:
- CLI: cli.md
- Watch Server: watch_server.md
8 changes: 8 additions & 0 deletions docs/pages/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ Interacting with the outbox store is also possible via the cli.

You can find out more about outbox [here](outbox.md).

## Inspector commands

The inspector is a tool to inspect the event streams.

* ShowCommand: `event-sourcing:show`
* ShowAggregateCommand: `event-sourcing:show-aggregate`
* WatchCommand: `event-sourcing:watch`

## CLI example

A cli php file can look like this:
Expand Down
1 change: 0 additions & 1 deletion docs/pages/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ A lightweight but also all-inclusive event sourcing library with a focus on deve
* Versioned and managed lifecycle of [projections](projection.md)
* Smooth [upcasting](upcasting.md) of old events
* Simple setup with [scheme management](store.md) and [doctrine migration](migration.md)
* Dev [tools](watch_server.md) like realtime event watcher
* Built in [cli commands](cli.md) with [symfony](https://symfony.com/)
* and much more...

Expand Down
75 changes: 0 additions & 75 deletions docs/pages/watch_server.md

This file was deleted.

5 changes: 0 additions & 5 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,3 @@ parameters:
message: "#^Ternary operator condition is always true\\.$#"
count: 1
path: src/Store/DoctrineDbalStoreStream.php

-
message: "#^While loop condition is always true\\.$#"
count: 1
path: src/WatchServer/SocketWatchServer.php
79 changes: 68 additions & 11 deletions src/Console/Command/WatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,102 @@

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\OutputStyle;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\WatchServer\WatchServer;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

use function sprintf;

#[AsCommand(
'event-sourcing:watch',
'live stream of all aggregate events',
)]
final class WatchCommand extends Command
{
public function __construct(

Check failure on line 25 in src/Console/Command/WatchCommand.php

View workflow job for this annotation

GitHub Actions / Backward Compatibility Check (locked, 8.2, ubuntu-latest)

The parameter $server of Patchlevel\EventSourcing\Console\Command\WatchCommand#__construct() changed from Patchlevel\EventSourcing\WatchServer\WatchServer to a non-contravariant Patchlevel\EventSourcing\Store\Store
private readonly WatchServer $server,
private readonly Store $store,
private readonly EventSerializer $serializer,
) {
parent::__construct();
}

protected function configure(): void
{
$this
->addOption(
'sleep',
null,
InputOption::VALUE_REQUIRED,
'How much time should elapse before the next job is executed in milliseconds',
1000,
)
->addOption(
'aggregate',
null,
InputOption::VALUE_REQUIRED,
'filter aggregate name',
)
->addOption(
'aggregate-id',
null,
InputOption::VALUE_REQUIRED,
'filter aggregate id',
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$console = new OutputStyle($input, $output);

$this->server->start();
$sleep = InputHelper::positiveIntOrZero($input->getOption('sleep'));
$aggregate = InputHelper::nullableString($input->getOption('aggregate'));
$aggregateId = InputHelper::nullableString($input->getOption('aggregate-id'));

$console->success(sprintf('Server listening on %s', $this->server->host()));
$console->comment('Quit the server with CONTROL-C.');
$index = $this->currentIndex();

$this->server->listen(
function (Message $message) use ($console): void {
$console->message($this->serializer, $message);
$worker = DefaultWorker::create(
function () use ($console, &$index, $aggregate, $aggregateId): void {
$stream = $this->store->load(
new Criteria(
$aggregate,
$aggregateId,
$index,
),
);

foreach ($stream as $message) {
$console->message($this->serializer, $message);
}

$index = $stream->index();

$stream->close();
},
[],
);

$worker->run($sleep);

return 0;
}

private function currentIndex(): int
{
$stream = $this->store->load(
limit: 1,
backwards: true,
);

$index = $stream->index() ?? 0;

$stream->close();

return $index;
}
}
11 changes: 0 additions & 11 deletions src/WatchServer/SendingFailed.php

This file was deleted.

130 changes: 0 additions & 130 deletions src/WatchServer/SocketWatchServer.php

This file was deleted.

Loading
Loading