diff --git a/src/Projection/Projectionist/RunProjectionistEventBusWrapper.php b/src/Projection/Projectionist/SyncProjectionistEventBusWrapper.php similarity index 66% rename from src/Projection/Projectionist/RunProjectionistEventBusWrapper.php rename to src/Projection/Projectionist/SyncProjectionistEventBusWrapper.php index f718b971..1f89db4f 100644 --- a/src/Projection/Projectionist/RunProjectionistEventBusWrapper.php +++ b/src/Projection/Projectionist/SyncProjectionistEventBusWrapper.php @@ -7,8 +7,9 @@ use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\EventBus\Message; use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\FlockStore; -final class RunProjectionistEventBusWrapper implements EventBus +final class SyncProjectionistEventBusWrapper implements EventBus { public function __construct( private readonly EventBus $parentEventBus, @@ -33,4 +34,15 @@ public function dispatch(Message ...$messages): void $lock->release(); } } + + public static function createWithDefaultLockStrategy(EventBus $parentEventBus, Projectionist $projectionist): self + { + return new self( + $parentEventBus, + $projectionist, + new LockFactory( + new FlockStore(), + ), + ); + } } diff --git a/src/Projection/Projector/SyncProjectorListener.php b/src/Projection/Projector/SyncProjectorListener.php deleted file mode 100644 index db8f056c..00000000 --- a/src/Projection/Projector/SyncProjectorListener.php +++ /dev/null @@ -1,23 +0,0 @@ -projectorResolver)) - ->handleMessage($message, ...$this->projectorRepository->projectors()); - } -} diff --git a/tests/Benchmark/WriteEventsBench.php b/tests/Benchmark/WriteEventsBench.php index aec9e543..33fde26d 100644 --- a/tests/Benchmark/WriteEventsBench.php +++ b/tests/Benchmark/WriteEventsBench.php @@ -9,8 +9,10 @@ use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory; +use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; +use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; +use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; -use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -22,6 +24,8 @@ use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\ProfileId; use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Projection\ProfileProjector; use PhpBench\Attributes as Bench; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\InMemoryStore as LockInMemoryStore; use function file_exists; use function unlink; @@ -47,20 +51,33 @@ public function setUp(): void 'path' => self::DB_PATH, ]); + $this->store = new DoctrineDbalStore( + $connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), + (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/BasicImplementation/Aggregate']), + 'eventstore', + ); + $profileProjection = new ProfileProjector($connection); $projectionRepository = new InMemoryProjectorRepository( [$profileProjection], ); - $this->bus = new DefaultEventBus(); - $this->bus->addListener(new SyncProjectorListener($projectionRepository)); - $this->bus->addListener(new SendEmailProcessor()); + $projectionist = new DefaultProjectionist( + $this->store, + new InMemoryStore(), + $projectionRepository, + ); - $this->store = new DoctrineDbalStore( - $connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), - (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/BasicImplementation/Aggregate']), - 'eventstore', + $innerEventStream = new DefaultEventBus(); + $innerEventStream->addListener(new SendEmailProcessor()); + + $this->bus = new SyncProjectionistEventBusWrapper( + $innerEventStream, + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ); $this->repository = new DefaultRepository($this->store, $this->bus, Profile::metadata()); @@ -71,7 +88,7 @@ public function setUp(): void ); $schemaDirector->create(); - $profileProjection->create(); + $projectionist->boot(); $this->profile = Profile::create(ProfileId::fromString('1'), 'Peter'); $this->repository->save($this->profile); diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php index dd4c2366..734230f4 100644 --- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php +++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php @@ -11,8 +11,10 @@ use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory; use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; +use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; +use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; +use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; -use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; @@ -24,6 +26,8 @@ use Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream\Projection\BankAccountProjection; use Patchlevel\EventSourcing\Tests\Integration\DbalManager; use PHPUnit\Framework\TestCase; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\InMemoryStore as LockInMemoryStore; use function count; @@ -42,14 +46,8 @@ public function tearDown(): void $this->connection->close(); } - public function testSingleTableSuccessful(): void + public function testSuccessful(): void { - $bankAccountProjection = new BankAccountProjection($this->connection); - $projectionRepository = new InMemoryProjectorRepository([$bankAccountProjection]); - - $eventStream = new DefaultEventBus(); - $eventStream->addListener(new SyncProjectorListener($projectionRepository)); - $store = new DoctrineDbalStore( $this->connection, DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), @@ -57,6 +55,23 @@ public function testSingleTableSuccessful(): void 'eventstore', ); + $bankAccountProjection = new BankAccountProjection($this->connection); + $projectionRepository = new InMemoryProjectorRepository([$bankAccountProjection]); + + $projectionist = new DefaultProjectionist( + $store, + new InMemoryStore(), + $projectionRepository, + ); + + $eventStream = new SyncProjectionistEventBusWrapper( + new DefaultEventBus(), + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), + ); + $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), $store, @@ -74,7 +89,7 @@ public function testSingleTableSuccessful(): void ); $schemaDirector->create(); - $bankAccountProjection->create(); + $projectionist->boot(); $bankAccount = BankAccount::create(AccountId::fromString('1'), 'John'); $bankAccount->addBalance(100); diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index 45ff247a..61c8f8e8 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -9,8 +9,10 @@ use Patchlevel\EventSourcing\EventBus\SymfonyEventBus; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory; +use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; +use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; +use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; -use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; @@ -23,6 +25,8 @@ use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Projection\ProfileProjection; use Patchlevel\EventSourcing\Tests\Integration\DbalManager; use PHPUnit\Framework\TestCase; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\InMemoryStore as LockInMemoryStore; /** @coversNothing */ final class BasicIntegrationTest extends TestCase @@ -42,20 +46,33 @@ public function tearDown(): void public function testSuccessful(): void { + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']), + 'eventstore', + ); + $profileProjection = new ProfileProjection($this->connection); $projectorRepository = new InMemoryProjectorRepository( [$profileProjection], ); - $eventStream = new DefaultEventBus(); - $eventStream->addListener(new SyncProjectorListener($projectorRepository)); - $eventStream->addListener(new SendEmailProcessor()); + $projectionist = new DefaultProjectionist( + $store, + new InMemoryStore(), + $projectorRepository, + ); - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']), - 'eventstore', + $innerEventStream = new DefaultEventBus(); + $innerEventStream->addListener(new SendEmailProcessor()); + + $eventStream = new SyncProjectionistEventBusWrapper( + $innerEventStream, + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ); $manager = new DefaultRepositoryManager( @@ -73,7 +90,7 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $profileProjection->create(); + $projectionist->boot(); $profile = Profile::create(ProfileId::fromString('1'), 'John'); $repository->save($profile); @@ -102,21 +119,34 @@ public function testSuccessful(): void public function testWithSymfonySuccessful(): void { + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']), + 'eventstore', + ); + $profileProjection = new ProfileProjection($this->connection); $projectorRepository = new InMemoryProjectorRepository( [$profileProjection], ); - $eventStream = SymfonyEventBus::create([ - new SyncProjectorListener($projectorRepository), + $projectionist = new DefaultProjectionist( + $store, + new InMemoryStore(), + $projectorRepository, + ); + + $innerEventStream = SymfonyEventBus::create([ new SendEmailProcessor(), ]); - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']), - 'eventstore', + $eventStream = new SyncProjectionistEventBusWrapper( + $innerEventStream, + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ); $manager = new DefaultRepositoryManager( @@ -124,6 +154,7 @@ public function testWithSymfonySuccessful(): void $store, $eventStream, ); + $repository = $manager->get(Profile::class); $schemaDirector = new DoctrineSchemaDirector( @@ -132,7 +163,7 @@ public function testWithSymfonySuccessful(): void ); $schemaDirector->create(); - $profileProjection->create(); + $projectionist->boot(); $profile = Profile::create(ProfileId::fromString('1'), 'John'); $repository->save($profile); @@ -164,20 +195,33 @@ public function testWithSymfonySuccessful(): void public function testSnapshot(): void { + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']), + 'eventstore', + ); + $profileProjection = new ProfileProjection($this->connection); $projectorRepository = new InMemoryProjectorRepository( [$profileProjection], ); - $eventStream = new DefaultEventBus(); - $eventStream->addListener(new SyncProjectorListener($projectorRepository)); - $eventStream->addListener(new SendEmailProcessor()); + $projectionist = new DefaultProjectionist( + $store, + new InMemoryStore(), + $projectorRepository, + ); - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']), - 'eventstore', + $innerEventStream = new DefaultEventBus(); + $innerEventStream->addListener(new SendEmailProcessor()); + + $eventStream = new SyncProjectionistEventBusWrapper( + $innerEventStream, + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), ); $manager = new DefaultRepositoryManager( @@ -195,7 +239,7 @@ public function testSnapshot(): void ); $schemaDirector->create(); - $profileProjection->create(); + $projectionist->boot(); $profile = Profile::create(ProfileId::fromString('1'), 'John'); $repository->save($profile); diff --git a/tests/Integration/Outbox/OutboxTest.php b/tests/Integration/Outbox/OutboxTest.php index 1b9a1c1b..44ac1d23 100644 --- a/tests/Integration/Outbox/OutboxTest.php +++ b/tests/Integration/Outbox/OutboxTest.php @@ -10,8 +10,10 @@ use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore; use Patchlevel\EventSourcing\Outbox\OutboxEventBus; use Patchlevel\EventSourcing\Outbox\StoreOutboxConsumer; +use Patchlevel\EventSourcing\Projection\Projection\Store\InMemoryStore; +use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; +use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; -use Patchlevel\EventSourcing\Projection\Projector\SyncProjectorListener; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; @@ -23,6 +25,8 @@ use Patchlevel\EventSourcing\Tests\Integration\Outbox\Processor\SendEmailProcessor; use Patchlevel\EventSourcing\Tests\Integration\Outbox\Projection\ProfileProjection; use PHPUnit\Framework\TestCase; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\InMemoryStore as LockInMemoryStore; /** @coversNothing */ final class OutboxTest extends TestCase @@ -42,11 +46,6 @@ public function tearDown(): void public function testSuccessful(): void { - $profileProjection = new ProfileProjection($this->connection); - $projectionRepository = new InMemoryProjectorRepository( - [$profileProjection], - ); - $serializer = DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']); $registry = (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']); @@ -65,11 +64,33 @@ public function testSuccessful(): void ); $realEventBus = new DefaultEventBus(); - $realEventBus->addListener(new SyncProjectorListener($projectionRepository)); $realEventBus->addListener(new SendEmailProcessor()); $outboxEventBus = new OutboxEventBus($outboxStore); - $repository = new DefaultRepository($store, $outboxEventBus, Profile::metadata()); + + $profileProjection = new ProfileProjection($this->connection); + $projectorRepository = new InMemoryProjectorRepository( + [$profileProjection], + ); + + $projectionist = new DefaultProjectionist( + $store, + new InMemoryStore(), + $projectorRepository, + ); + + $realEventBus = new DefaultEventBus(); + $realEventBus->addListener(new SendEmailProcessor()); + + $eventStream = new SyncProjectionistEventBusWrapper( + $outboxEventBus, + $projectionist, + new LockFactory( + new LockInMemoryStore(), + ), + ); + + $repository = new DefaultRepository($store, $eventStream, Profile::metadata()); $schemaDirector = new DoctrineSchemaDirector( $this->connection, @@ -80,7 +101,7 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $profileProjection->create(); + $projectionist->boot(); $profile = Profile::create(ProfileId::fromString('1'), 'John'); $repository->save($profile); diff --git a/tests/Integration/Projectionist/ProjectionistTest.php b/tests/Integration/Projectionist/ProjectionistTest.php index 77bc3fd2..578efd4f 100644 --- a/tests/Integration/Projectionist/ProjectionistTest.php +++ b/tests/Integration/Projectionist/ProjectionistTest.php @@ -11,7 +11,7 @@ use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory; use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore; use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\RunProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; use Patchlevel\EventSourcing\Projection\Projector\InMemoryProjectorRepository; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Schema\ChainSchemaConfigurator; @@ -118,7 +118,7 @@ public function testSync(): void $manager = new DefaultRepositoryManager( $aggregateRegistry, $store, - new RunProjectionistEventBusWrapper( + new SyncProjectionistEventBusWrapper( new DefaultEventBus(), $projectionist, new LockFactory($lockStore), diff --git a/tests/Unit/Projection/Projectionist/RunProjectionistEventBusWrapperTest.php b/tests/Unit/Projection/Projectionist/SyncProjectionistEventBusWrapperTest.php similarity index 80% rename from tests/Unit/Projection/Projectionist/RunProjectionistEventBusWrapperTest.php rename to tests/Unit/Projection/Projectionist/SyncProjectionistEventBusWrapperTest.php index 28c19a68..99d3224a 100644 --- a/tests/Unit/Projection/Projectionist/RunProjectionistEventBusWrapperTest.php +++ b/tests/Unit/Projection/Projectionist/SyncProjectionistEventBusWrapperTest.php @@ -7,14 +7,14 @@ use Patchlevel\EventSourcing\EventBus\EventBus; use Patchlevel\EventSourcing\EventBus\Message; use Patchlevel\EventSourcing\Projection\Projectionist\Projectionist; -use Patchlevel\EventSourcing\Projection\Projectionist\RunProjectionistEventBusWrapper; +use Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\TestCase; use Prophecy\PhpUnit\ProphecyTrait; -/** @covers \Patchlevel\EventSourcing\Projection\Projectionist\RunProjectionistEventBusWrapper */ -final class RunProjectionistEventBusWrapperTest extends TestCase +/** @covers \Patchlevel\EventSourcing\Projection\Projectionist\SyncProjectionistEventBusWrapper */ +final class SyncProjectionistEventBusWrapperTest extends TestCase { use ProphecyTrait; @@ -35,7 +35,7 @@ public function testDispatch(): void $projectionist->run()->shouldBeCalledOnce(); $projectionist->reveal(); - $eventBus = new RunProjectionistEventBusWrapper( + $eventBus = new SyncProjectionistEventBusWrapper( $parentEventBus->reveal(), $projectionist->reveal(), ); diff --git a/tests/Unit/Projection/Projector/SyncProjectorListenerTest.php b/tests/Unit/Projection/Projector/SyncProjectorListenerTest.php deleted file mode 100644 index 113f0e9a..00000000 --- a/tests/Unit/Projection/Projector/SyncProjectorListenerTest.php +++ /dev/null @@ -1,101 +0,0 @@ -message = $message; - } - }; - - $message = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('foo@bar.com'), - ), - ); - - $projectorRepository = $this->prophesize(ProjectorRepository::class); - $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce(); - - $resolver = $this->prophesize(ProjectorResolver::class); - $resolver->resolveHandleMethod($projector, $message)->willReturn($projector->handleProfileCreated(...))->shouldBeCalledOnce(); - - $projectionListener = new SyncProjectorListener( - $projectorRepository->reveal(), - $resolver->reveal(), - ); - - $projectionListener($message); - - self::assertSame($message, $projector->message); - } - - public function testNoMethod(): void - { - $projector = new class implements Projector { - public Message|null $message = null; - - public function targetProjection(): ProjectionId - { - return new ProjectionId('dummy', 1); - } - - public function handleProfileCreated(Message $message): void - { - $this->message = $message; - } - }; - - $message = new Message( - new ProfileCreated( - ProfileId::fromString('1'), - Email::fromString('foo@bar.com'), - ), - ); - - $projectorRepository = $this->prophesize(ProjectorRepository::class); - $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce(); - - $resolver = $this->prophesize(ProjectorResolver::class); - $resolver->resolveHandleMethod($projector, $message)->willReturn(null)->shouldBeCalledOnce(); - - $projectionListener = new SyncProjectorListener( - $projectorRepository->reveal(), - $resolver->reveal(), - ); - - $projectionListener($message); - - self::assertNull($projector->message); - } -}