Skip to content

Commit

Permalink
Merge pull request #536 from patchlevel/improve-deployment
Browse files Browse the repository at this point in the history
worker should not discover new subscriptions in running process
  • Loading branch information
DavidBadura authored Mar 13, 2024
2 parents 1e013ed + 025abe4 commit 1636023
Show file tree
Hide file tree
Showing 6 changed files with 472 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/Console/Command/SubscriptionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$setup = InputHelper::bool($input->getOption('setup'));

$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria);

if ($setup) {
$this->engine->setup($criteria);
Expand Down
15 changes: 15 additions & 0 deletions src/Console/Command/SubscriptionCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;

use function array_map;

/** @interal */
abstract class SubscriptionCommand extends Command
{
Expand Down Expand Up @@ -44,4 +46,17 @@ protected function subscriptionEngineCriteria(InputInterface $input): Subscripti
InputHelper::nullableStringList($input->getOption('group')),
);
}

protected function resolveCriteriaIntoCriteriaWithOnlyIds(
SubscriptionEngineCriteria $criteria,
): SubscriptionEngineCriteria {
$subscriptions = $this->engine->subscriptions($criteria);

return new SubscriptionEngineCriteria(
array_map(
static fn ($subscription) => $subscription->id(),
$subscriptions,
),
);
}
}
1 change: 1 addition & 0 deletions src/Console/Command/SubscriptionRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$rebuild = InputHelper::bool($input->getOption('rebuild'));

$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria);

$logger = new ConsoleLogger($output);

Expand Down
3 changes: 2 additions & 1 deletion src/Subscription/Store/DoctrineSubscriptionStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public function find(SubscriptionCriteria|null $criteria = null): array
{
$qb = $this->connection->createQueryBuilder()
->select('*')
->from($this->tableName);
->from($this->tableName)
->orderBy('id');

if (!$this->connection->getDatabasePlatform() instanceof SQLitePlatform) {
$qb->forUpdate();
Expand Down
66 changes: 66 additions & 0 deletions tests/Integration/Subscription/Subscriber/ProfileNewProjection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Table;
use Patchlevel\EventSourcing\Attribute\Projector;
use Patchlevel\EventSourcing\Attribute\Setup;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Attribute\Teardown;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil;
use Patchlevel\EventSourcing\Tests\Integration\Subscription\Events\ProfileCreated;

use function assert;

#[Projector('profile_2')]
final class ProfileNewProjection
{
use SubscriberUtil;

public function __construct(
private Connection $connection,
) {
}

#[Setup]
public function create(): void
{
$table = new Table($this->tableName());
$table->addColumn('id', 'string')->setLength(36);
$table->addColumn('firstname', 'string')->setLength(255);
$table->setPrimaryKey(['id']);

$this->connection->createSchemaManager()->createTable($table);
}

#[Teardown]
public function drop(): void
{
$this->connection->createSchemaManager()->dropTable($this->tableName());
}

#[Subscribe(ProfileCreated::class)]
public function handleProfileCreated(Message $message): void
{
$profileCreated = $message->event();

assert($profileCreated instanceof ProfileCreated);

$this->connection->executeStatement(
'INSERT INTO ' . $this->tableName() . ' (id, firstname) VALUES(:id, :firstname);',
[
'id' => $profileCreated->profileId->toString(),
'firstname' => $profileCreated->name,
],
);
}

private function tableName(): string
{
return 'projection_' . $this->subscriberId();
}
}
Loading

0 comments on commit 1636023

Please sign in to comment.