diff --git a/src/Store/Criteria/StreamCriterion.php b/src/Store/Criteria/StreamCriterion.php index 070c1c83..bbe5eecd 100644 --- a/src/Store/Criteria/StreamCriterion.php +++ b/src/Store/Criteria/StreamCriterion.php @@ -4,12 +4,19 @@ namespace Patchlevel\EventSourcing\Store\Criteria; +use Patchlevel\EventSourcing\Store\InvalidStreamName; + +use function preg_match; + /** @experimental */ final class StreamCriterion { public function __construct( public readonly string $streamName, ) { + if (!preg_match('/^[^*]*\*?$/', $this->streamName)) { + throw new InvalidStreamName($this->streamName); + } } public static function startWith(string $streamName): self diff --git a/src/Store/InMemoryStore.php b/src/Store/InMemoryStore.php new file mode 100644 index 00000000..1c1f4249 --- /dev/null +++ b/src/Store/InMemoryStore.php @@ -0,0 +1,230 @@ + $messages */ + public function __construct( + private array $messages = [], + ) { + } + + public function load( + Criteria|null $criteria = null, + int|null $limit = null, + int|null $offset = null, + bool $backwards = false, + ): ArrayStream { + $messages = $this->filter($criteria); + + if ($backwards) { + $messages = array_reverse($messages); + } + + if ($offset !== null) { + $messages = array_slice($messages, $offset); + } + + if ($limit !== null) { + $messages = array_slice($messages, 0, $limit); + } + + return new ArrayStream($messages); + } + + public function count(Criteria|null $criteria = null): int + { + return count($this->filter($criteria)); + } + + public function save(Message ...$messages): void + { + array_push($this->messages, ...$messages); + } + + /** + * @param Closure():ClosureReturn $function + * + * @template ClosureReturn + */ + public function transactional(Closure $function): void + { + $function(); + } + + /** @return list */ + public function streams(): array + { + return array_values( + array_unique( + array_filter( + array_map( + static function (Message $message): string|null { + try { + return $message->header(AggregateHeader::class)->streamName(); + } catch (HeaderNotFound) { + try { + return $message->header(StreamHeader::class)->streamName; + } catch (HeaderNotFound) { + return null; + } + } + }, + $this->messages, + ), + static fn (string|null $streamName): bool => $streamName !== null, + ), + ), + ); + } + + public function remove(string $streamName): void + { + $this->messages = array_values( + array_filter( + $this->messages, + static function (Message $message) use ($streamName): bool { + try { + return $message->header(AggregateHeader::class)->streamName() !== $streamName; + } catch (HeaderNotFound) { + try { + return $message->header(StreamHeader::class)->streamName !== $streamName; + } catch (HeaderNotFound) { + return true; + } + } + }, + ), + ); + } + + /** @return array */ + private function filter(Criteria|null $criteria): array + { + if (!$criteria) { + return $this->messages; + } + + return array_filter( + $this->messages, + static function (Message $message, int $index) use ($criteria): bool { + foreach ($criteria->all() as $criterion) { + switch ($criterion::class) { + case AggregateIdCriterion::class: + try { + if ($message->header(AggregateHeader::class)->aggregateId !== $criterion->aggregateId) { + return false; + } + } catch (HeaderNotFound) { + return false; + } + + break; + case AggregateNameCriterion::class: + try { + if ($message->header(AggregateHeader::class)->aggregateName !== $criterion->aggregateName) { + return false; + } + } catch (HeaderNotFound) { + return false; + } + + break; + case StreamCriterion::class: + if ($criterion->streamName === '*') { + break; + } + + try { + $messageStreamName = $message->header(AggregateHeader::class)->streamName(); + } catch (HeaderNotFound) { + try { + $messageStreamName = $message->header(StreamHeader::class)->streamName; + } catch (HeaderNotFound) { + return false; + } + } + + if (str_ends_with($criterion->streamName, '*')) { + if (!str_starts_with($messageStreamName, mb_substr($criterion->streamName, 0, -1))) { + return false; + } + + break; + } + + if ($messageStreamName !== $criterion->streamName) { + return false; + } + + break; + case FromPlayheadCriterion::class: + $playhead = null; + + try { + $playhead = $message->header(AggregateHeader::class)->playhead; + } catch (HeaderNotFound) { + try { + $playhead = $message->header(StreamHeader::class)->playhead; + } catch (HeaderNotFound) { + return false; + } + } + + if ($playhead < $criterion->fromPlayhead) { + return false; + } + + break; + case ArchivedCriterion::class: + if (!$message->hasHeader(ArchivedHeader::class) === $criterion->archived) { + return false; + } + + break; + case FromIndexCriterion::class: + if ($index < $criterion->fromIndex) { + return false; + } + + break; + default: + throw new UnsupportedCriterion($criterion::class); + } + } + + return true; + }, + ARRAY_FILTER_USE_BOTH, + ); + } +} diff --git a/src/Store/StreamDoctrineDbalStore.php b/src/Store/StreamDoctrineDbalStore.php index e9fa93b4..d9c1cd16 100644 --- a/src/Store/StreamDoctrineDbalStore.php +++ b/src/Store/StreamDoctrineDbalStore.php @@ -44,7 +44,6 @@ use function is_string; use function mb_substr; use function sprintf; -use function str_contains; use function str_ends_with; /** @experimental */ @@ -149,14 +148,8 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void } if (str_ends_with($criterion->streamName, '*')) { - $streamName = mb_substr($criterion->streamName, 0, -1); - - if (str_contains($streamName, '*')) { - throw new InvalidStreamName($criterion->streamName); - } - $builder->andWhere('stream LIKE :stream'); - $builder->setParameter('stream', $streamName . '%'); + $builder->setParameter('stream', mb_substr($criterion->streamName, 0, -1) . '%'); break; } diff --git a/tests/Unit/Store/InMemoryStoreTest.php b/tests/Unit/Store/InMemoryStoreTest.php new file mode 100644 index 00000000..626918ba --- /dev/null +++ b/tests/Unit/Store/InMemoryStoreTest.php @@ -0,0 +1,350 @@ +load(); + + self::assertCount(0, $stream); + } + + public function testLoadMessages(): void + { + $expected = [ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]; + + $store = new InMemoryStore($expected); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame($expected, $messages); + } + + public function testLoadByAggregateId(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('profile', '2', 1, new DateTimeImmutable())); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new AggregateIdCriterion('2'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadByAggregateName(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('foo', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('bar', '2', 1, new DateTimeImmutable())); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new AggregateNameCriterion('bar'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadByStreamName(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar')); + $message3 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new StreamCriterion('bar'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadByStreamNameWithLike(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo-3')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar-1')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar-2')); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new StreamCriterion('bar-*'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2, $message3], $messages); + } + + public function testLoadFromPlayhead(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('foo', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('foo', '1', 2, new DateTimeImmutable())); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('foo-1', 3, new DateTimeImmutable())); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + $stream = $store->load(new Criteria(new FromPlayheadCriterion(2))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2, $message3], $messages); + } + + public function testLoadFromIndex(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new AggregateHeader('foo', '1', 1, new DateTimeImmutable())); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new AggregateHeader('foo', '1', 2, new DateTimeImmutable())); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('foo-1', 3, new DateTimeImmutable())); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + $stream = $store->load(new Criteria(new FromIndexCriterion(2))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message3, $message4], $messages); + } + + public function testLoadByStreamNameWithLikeAll(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo-3')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar-1')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar-2')); + + $store = new InMemoryStore([$message1, $message2, $message3]); + + $stream = $store->load(new Criteria(new StreamCriterion('*'))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1, $message2, $message3], $messages); + } + + public function testLoadArchived(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new ArchivedHeader()); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(new Criteria(new ArchivedCriterion(true))); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1], $messages); + } + + public function testLoadUnsupportedCriterion(): void + { + $store = new InMemoryStore([ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]); + + $this->expectException(UnsupportedCriterion::class); + + $store->load(new Criteria(new stdClass())); + } + + public function testLoadLimit(): void + { + $message1 = new Message(new ProfileVisited(ProfileId::fromString('1'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('2'))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(null, 1); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1], $messages); + } + + public function testLoadOffset(): void + { + $message1 = new Message(new ProfileVisited(ProfileId::fromString('1'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('2'))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(null, null, 1); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2], $messages); + } + + public function testLoadBackwards(): void + { + $message1 = new Message(new ProfileVisited(ProfileId::fromString('1'))); + $message2 = new Message(new ProfileVisited(ProfileId::fromString('2'))); + + $store = new InMemoryStore([$message1, $message2]); + + $stream = $store->load(null, null, null, true); + + $messages = iterator_to_array($stream); + + self::assertSame([$message2, $message1], $messages); + } + + public function testCount(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new ArchivedHeader()); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))); + + $store = new InMemoryStore([$message1, $message2]); + + self::assertSame(1, $store->count(new Criteria(new ArchivedCriterion(true)))); + } + + public function testSaveEmpty(): void + { + $expected = [ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]; + + $store = new InMemoryStore([]); + + $store->save(...$expected); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame($expected, $messages); + } + + public function testSaveWithExistingMessages(): void + { + $startMessages = [ + new Message(new ProfileVisited(ProfileId::fromString('1'))), + new Message(new ProfileVisited(ProfileId::fromString('2'))), + ]; + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('3'))); + + $store = new InMemoryStore($startMessages); + + $store->save($message1); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame([...$startMessages, $message1], $messages); + } + + public function testStreams(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar')); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + self::assertSame(['foo', 'bar'], $store->streams()); + } + + public function testRemove(): void + { + $message1 = (new Message(new ProfileVisited(ProfileId::fromString('1')))) + ->withHeader(new StreamHeader('foo')); + $message2 = (new Message(new ProfileVisited(ProfileId::fromString('2')))) + ->withHeader(new StreamHeader('bar')); + $message3 = (new Message(new ProfileVisited(ProfileId::fromString('3')))) + ->withHeader(new StreamHeader('bar')); + $message4 = (new Message(new ProfileVisited(ProfileId::fromString('3')))); + + $store = new InMemoryStore([$message1, $message2, $message3, $message4]); + + $store->remove('bar'); + + $stream = $store->load(); + + $messages = iterator_to_array($stream); + + self::assertSame([$message1, $message4], $messages); + } + + public function testTransactional(): void + { + $called = false; + + $store = new InMemoryStore(); + $store->transactional( + static function () use (&$called): void { + $called = true; + }, + ); + + self::assertTrue($called); + } +}