Skip to content

Commit

Permalink
Merge pull request #519 from patchlevel/save-10000-events
Browse files Browse the repository at this point in the history
save 10000 events
  • Loading branch information
DavidBadura authored Mar 3, 2024
2 parents d522673 + 2671b32 commit 569fcc1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
47 changes: 39 additions & 8 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

use function array_fill;
use function count;
use function floor;
use function implode;
use function is_int;
use function is_string;
use function sprintf;

final class DoctrineDbalStore implements Store, ArchivableStore, SchemaConfigurator
{
/**
* PostgreSQL has a limit of 65535 parameters in a single query.
*/
private const MAX_UNSIGNED_SMALL_INT = 65_535;

public function __construct(
private readonly Connection $connection,
private readonly EventSerializer $serializer,
Expand Down Expand Up @@ -123,12 +129,6 @@ function (Connection $connection) use ($messages): void {
$jsonType = Type::getType(Types::JSON);
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

$parameters = [];
$placeholders = [];

/** @var array<int<0, max>, Type> $types */
$types = [];

$columns = [
'aggregate',
'aggregate_id',
Expand All @@ -142,11 +142,17 @@ function (Connection $connection) use ($messages): void {
];

$columnsLength = count($columns);
$batchSize = (int)floor(self::MAX_UNSIGNED_SMALL_INT / $columnsLength);
$placeholder = implode(', ', array_fill(0, $columnsLength, '?'));

foreach ($messages as $position => $message) {
$parameters = [];
$placeholders = [];
/** @var array<int<0, max>, Type> $types */
$types = [];
$position = 0;
foreach ($messages as $message) {
/** @var int<0, max> $offset */
$offset = (int)$position * $columnsLength;
$offset = $position * $columnsLength;
$placeholders[] = $placeholder;

$data = $this->serializer->serialize($message->event());
Expand Down Expand Up @@ -184,6 +190,31 @@ function (Connection $connection) use ($messages): void {

$parameters[] = $message->customHeaders();
$types[$offset + 8] = $jsonType;

$position++;

if ($position !== $batchSize) {
continue;
}

$query = sprintf(
"INSERT INTO %s (%s) VALUES\n(%s)",
$this->storeTableName,
implode(', ', $columns),
implode("),\n(", $placeholders),
);

$connection->executeStatement($query, $parameters, $types);

$parameters = [];
$placeholders = [];
$types = [];

$position = 0;
}

if ($position === 0) {
return;
}

$query = sprintf(
Expand Down
20 changes: 20 additions & 0 deletions tests/Integration/Store/StoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ public function testSave(): void
self::assertEquals(['profileId' => 'test', 'name' => 'test'], json_decode($result1['payload'], true));
}

public function testSave10000Messages(): void
{
$messages = [];

for ($i = 1; $i <= 10000; $i++) {
$messages[] = Message::create(new ProfileCreated(ProfileId::fromString('test'), 'test'))
->withAggregateName('profile')
->withAggregateId('test')
->withPlayhead($i)
->withRecordedOn(new DateTimeImmutable('2020-01-01 00:00:00'));
}

$this->store->save(...$messages);

/** @var int $result */
$result = $this->connection->fetchFirstColumn('SELECT COUNT(*) FROM eventstore')[0];

self::assertEquals(10000, $result);
}

public function testLoad(): void
{
$message = Message::create(new ProfileCreated(ProfileId::fromString('test'), 'test'))
Expand Down

0 comments on commit 569fcc1

Please sign in to comment.