Skip to content

Commit

Permalink
add in memory store
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Jul 30, 2024
1 parent fe23cbc commit da6cd24
Show file tree
Hide file tree
Showing 4 changed files with 588 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/Store/Criteria/StreamCriterion.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@

namespace Patchlevel\EventSourcing\Store\Criteria;

use Patchlevel\EventSourcing\Store\InvalidStreamName;

use function preg_match;

/** @experimental */
final class StreamCriterion
{
public function __construct(
public readonly string $streamName,
) {
if (!preg_match('/^[^*]*\*?$/', $this->streamName)) {
throw new InvalidStreamName($this->streamName);
}
}

public static function startWith(string $streamName): self
Expand Down
230 changes: 230 additions & 0 deletions src/Store/InMemoryStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

use Closure;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
use Patchlevel\EventSourcing\Store\Criteria\AggregateNameCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;

use function array_filter;
use function array_map;
use function array_merge;
use function array_reverse;
use function array_slice;
use function array_unique;
use function array_values;
use function count;
use function mb_substr;
use function str_ends_with;
use function str_starts_with;

use const ARRAY_FILTER_USE_BOTH;

final class InMemoryStore implements StreamStore
{
/** @param list<Message> $messages */
public function __construct(
private array $messages = [],
) {
}

public function load(
Criteria|null $criteria = null,
int|null $limit = null,
int|null $offset = null,
bool $backwards = false,
): ArrayStream {
$messages = $this->filter($criteria);

if ($backwards) {
$messages = array_reverse($messages);
}

if ($offset !== null) {
$messages = array_slice($messages, $offset);
}

if ($limit !== null) {
$messages = array_slice($messages, 0, $limit);
}

return new ArrayStream($messages);

Check failure on line 61 in src/Store/InMemoryStore.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Parameter #1 $messages of class Patchlevel\EventSourcing\Store\ArrayStream constructor expects array<int<0, max>, Patchlevel\EventSourcing\Message\Message>, array<int, Patchlevel\EventSourcing\Message\Message> given.

Check failure on line 61 in src/Store/InMemoryStore.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

ArgumentTypeCoercion

src/Store/InMemoryStore.php:61:32: ArgumentTypeCoercion: Argument 1 of Patchlevel\EventSourcing\Store\ArrayStream::__construct expects array<int<0, max>, Patchlevel\EventSourcing\Message\Message<object>>, but parent type array<int, Patchlevel\EventSourcing\Message\Message<object>> provided (see https://psalm.dev/193)
}

public function count(Criteria|null $criteria = null): int
{
return count($this->filter($criteria));
}

public function save(Message ...$messages): void
{
array_push($this->messages, ...$messages);

Check failure on line 71 in src/Store/InMemoryStore.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

PropertyTypeCoercion

src/Store/InMemoryStore.php:71:20: PropertyTypeCoercion: $this->messages expects 'list<Patchlevel\EventSourcing\Message\Message>', parent type 'array<array-key, Patchlevel\EventSourcing\Message\Message|Patchlevel\EventSourcing\Message\Message<object>>' provided (see https://psalm.dev/198)
}

/**
* @param Closure():ClosureReturn $function
*
* @template ClosureReturn
*/
public function transactional(Closure $function): void
{
$function();
}

/** @return list<string> */
public function streams(): array
{
return array_values(
array_unique(
array_filter(
array_map(
static function (Message $message): string|null {
try {
return $message->header(AggregateHeader::class)->streamName();
} catch (HeaderNotFound) {
try {
return $message->header(StreamHeader::class)->streamName;
} catch (HeaderNotFound) {
return null;
}
}
},
$this->messages,
),
static fn (string|null $streamName): bool => $streamName !== null,
),
),
);
}

public function remove(string $streamName): void
{
$this->messages = array_values(
array_filter(
$this->messages,
static function (Message $message) use ($streamName): bool {
try {
return $message->header(AggregateHeader::class)->streamName() !== $streamName;
} catch (HeaderNotFound) {
try {
return $message->header(StreamHeader::class)->streamName !== $streamName;
} catch (HeaderNotFound) {
return true;
}
}
},
),
);
}

/** @return array<int, Message> */
private function filter(Criteria|null $criteria): array
{
if (!$criteria) {
return $this->messages;
}

return array_filter(
$this->messages,
static function (Message $message, int $index) use ($criteria): bool {
foreach ($criteria->all() as $criterion) {
switch ($criterion::class) {
case AggregateIdCriterion::class:
try {
if ($message->header(AggregateHeader::class)->aggregateId !== $criterion->aggregateId) {
return false;
}
} catch (HeaderNotFound) {
return false;
}

break;
case AggregateNameCriterion::class:
try {
if ($message->header(AggregateHeader::class)->aggregateName !== $criterion->aggregateName) {
return false;
}
} catch (HeaderNotFound) {
return false;
}

break;
case StreamCriterion::class:
if ($criterion->streamName === '*') {
break;
}

try {
$messageStreamName = $message->header(AggregateHeader::class)->streamName();
} catch (HeaderNotFound) {
try {
$messageStreamName = $message->header(StreamHeader::class)->streamName;
} catch (HeaderNotFound) {
return false;
}
}

if (str_ends_with($criterion->streamName, '*')) {
if (!str_starts_with($messageStreamName, mb_substr($criterion->streamName, 0, -1))) {
return false;
}

break;
}

if ($messageStreamName !== $criterion->streamName) {
return false;
}

break;
case FromPlayheadCriterion::class:
$playhead = null;

try {
$playhead = $message->header(AggregateHeader::class)->playhead;
} catch (HeaderNotFound) {
try {
$playhead = $message->header(StreamHeader::class)->playhead;
} catch (HeaderNotFound) {
return false;
}
}

if ($playhead < $criterion->fromPlayhead) {
return false;
}

break;
case ArchivedCriterion::class:
if (!$message->hasHeader(ArchivedHeader::class) === $criterion->archived) {
return false;
}

break;
case FromIndexCriterion::class:
if ($index < $criterion->fromIndex) {
return false;
}

break;
default:
throw new UnsupportedCriterion($criterion::class);
}
}

return true;
},
ARRAY_FILTER_USE_BOTH,
);
}
}
9 changes: 1 addition & 8 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
use function is_string;
use function mb_substr;
use function sprintf;
use function str_contains;
use function str_ends_with;

/** @experimental */
Expand Down Expand Up @@ -149,14 +148,8 @@ private function applyCriteria(QueryBuilder $builder, Criteria $criteria): void
}

if (str_ends_with($criterion->streamName, '*')) {
$streamName = mb_substr($criterion->streamName, 0, -1);

if (str_contains($streamName, '*')) {
throw new InvalidStreamName($criterion->streamName);
}

$builder->andWhere('stream LIKE :stream');
$builder->setParameter('stream', $streamName . '%');
$builder->setParameter('stream', mb_substr($criterion->streamName, 0, -1) . '%');

Check warning on line 152 in src/Store/StreamDoctrineDbalStore.php

View workflow job for this annotation

GitHub Actions / Mutation tests on diff (locked, 8.3, ubuntu-latest)

Escaped Mutant for Mutator "MBString": --- Original +++ New @@ @@ } if (str_ends_with($criterion->streamName, '*')) { $builder->andWhere('stream LIKE :stream'); - $builder->setParameter('stream', mb_substr($criterion->streamName, 0, -1) . '%'); + $builder->setParameter('stream', substr($criterion->streamName, 0, -1) . '%'); break; } $builder->andWhere('stream = :stream');

break;
}
Expand Down
Loading

0 comments on commit da6cd24

Please sign in to comment.