Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
timacdonald committed Sep 1, 2023
1 parent 10a8869 commit b855cb2
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 11 deletions.
2 changes: 2 additions & 0 deletions config/pulse.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

'database' => [
'connection' => env('PULSE_DB_CONNECTION', null),
'chunk' => 1000,
],
],

Expand All @@ -35,6 +36,7 @@

'redis' => [
'connection' => env('PULSE_REDIS_CONNECTION'),
'chunk' => 1000,
],
],

Expand Down
3 changes: 1 addition & 2 deletions src/Commands/WorkCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public function handle(
$lastTrimmedStorageAt = $now;
}

// TODO: chunking?
$processed = $ingest->store($storage, 1000);
$processed = $ingest->store($storage);

if ($processed === 0) {
Sleep::for(1)->second();
Expand Down
2 changes: 1 addition & 1 deletion src/Contracts/Ingest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ public function trim(): void;
/**
* Store the ingested entries.
*/
public function store(Storage $storage, int $count): int;
public function store(Storage $storage): int;
}
6 changes: 3 additions & 3 deletions src/Ingests/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public function trim(): void
/**
* Store the ingested entries.
*/
public function store(Storage $storage, int $count): int
public function store(Storage $storage): int
{
$entries = collect($this->connection()->xrange($this->stream, '-', '+', $count));
$entries = collect($this->connection()->xrange($this->stream, '-', '+', $this->config->get('pulse.ingest.redis.chunk')));

if ($entries->isEmpty()) {
return 0;
Expand All @@ -84,7 +84,7 @@ public function store(Storage $storage, int $count): int
*/
protected function trimAfter(): Interval
{
return new Interval($this->config->get('pulse.retain') ?? 'P7D');
return new Interval($this->config->get('pulse.retain'));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Ingests/Storage.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function trim(): void
/**
* Store the ingested entries.
*/
public function store(StorageContract $store, int $count): int
public function store(StorageContract $store): int
{
throw new RuntimeException('The storage ingest driver does not need to process entries.');
}
Expand Down
4 changes: 2 additions & 2 deletions src/Storage/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function store(Collection $items): void

$this->connection()->transaction(function () use ($inserts, $updates) {
$inserts->groupBy('table')
->each(fn (Collection $rows, string $table) => $rows->chunk(1000)
->each(fn (Collection $rows, string $table) => $rows->chunk($this->config->get('pulse.storage.database.chunk'))
->map(fn (Collection $inserts) => $inserts->pluck('attributes')->all())
->each($this->connection()->table($table)->insert(...)));

Expand All @@ -63,7 +63,7 @@ public function trim(Collection $tables): void
*/
protected function trimAfter(): Interval
{
return new Interval($this->config->get('pulse.retain') ?? 'P7D');
return new Interval($this->config->get('pulse.retain'));
}

/**
Expand Down
1 change: 0 additions & 1 deletion tests/Feature/HttpRequestsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use Illuminate\Support\Facades\Route;
use Laravel\Pulse\Facades\Pulse;
use Laravel\Pulse\Pulse as PulseInstance;

use function Pest\Laravel\actingAs;
use function Pest\Laravel\get;
use function Pest\Laravel\post;
Expand Down
3 changes: 2 additions & 1 deletion tests/Feature/RedisTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

it('runs the same commands while storing', function ($driver) {
Config::set('database.redis.client', $driver);
Config::set('pulse.ingest.redis.chunk', 567);
Date::setTestNow(Date::parse('2000-01-02 03:04:05')->startOfSecond());
$ingest = App::make(Redis::class);
$ingest->ingest(collect([
Expand All @@ -44,7 +45,7 @@
->output();
[$firstEntryKey, $lastEntryKey] = collect(explode("\n", $output))->only([17, 21])->values();

$commands = captureRedisCommands(fn () => $ingest->store(new NullStorage, 567));
$commands = captureRedisCommands(fn () => $ingest->store(new NullStorage));

expect($commands)->toContain('"XRANGE" "laravel_database_laravel:pulse:entries" "-" "+" "COUNT" "567"');
expect($commands)->toContain('"XDEL" "laravel_database_laravel:pulse:entries" "'.$firstEntryKey.'" "'.$lastEntryKey.'"');
Expand Down

0 comments on commit b855cb2

Please sign in to comment.