From 6879f7231a4e4779faf0908bcdd183548d16fac1 Mon Sep 17 00:00:00 2001 From: Dries Vints Date: Tue, 29 Jan 2019 16:41:22 +0100 Subject: [PATCH] Expire monitored jobs This commit fixes the problem where completed jobs which were monitored by a tag are stored indefinitely and end up filling up the DB and cause the system to run out of memory. The main problem at hand was that these jobs were never cleaned up. By adding the max TTL and TrimMonitoredJobs listener the monitored jobs will be cleaned up after the given amount of time. By default this is set to 7 days. The listener itself will run one time each day. Fixes https://github.com/laravel/horizon/issues/271 --- config/horizon.php | 1 + src/Contracts/JobRepository.php | 7 ++++ src/EventMap.php | 1 + src/Listeners/TrimMonitoredJobs.php | 47 +++++++++++++++++++++++++ src/Repositories/RedisJobRepository.php | 40 +++++++++++++++++++-- src/ServiceBindings.php | 1 + tests/Feature/MonitoringTest.php | 2 +- tests/Feature/TrimMonitoredJobsTest.php | 35 ++++++++++++++++++ 8 files changed, 131 insertions(+), 3 deletions(-) create mode 100644 src/Listeners/TrimMonitoredJobs.php create mode 100644 tests/Feature/TrimMonitoredJobsTest.php diff --git a/config/horizon.php b/config/horizon.php index a12ea39e..29a04bb5 100644 --- a/config/horizon.php +++ b/config/horizon.php @@ -70,6 +70,7 @@ 'trim' => [ 'recent' => 60, 'failed' => 10080, + 'monitored' => 10080, ], /* diff --git a/src/Contracts/JobRepository.php b/src/Contracts/JobRepository.php index 2b44dc8e..ed3687b8 100644 --- a/src/Contracts/JobRepository.php +++ b/src/Contracts/JobRepository.php @@ -155,6 +155,13 @@ public function trimRecentJobs(); */ public function trimFailedJobs(); + /** + * Trim the monitored job list. + * + * @return void + */ + public function trimMonitoredJobs(); + /** * Find a failed job by ID. * diff --git a/src/EventMap.php b/src/EventMap.php index 9a7eee6c..4ad26978 100644 --- a/src/EventMap.php +++ b/src/EventMap.php @@ -45,6 +45,7 @@ trait EventMap Events\MasterSupervisorLooped::class => [ Listeners\TrimRecentJobs::class, Listeners\TrimFailedJobs::class, + Listeners\TrimMonitoredJobs::class, Listeners\ExpireSupervisors::class, Listeners\MonitorMasterSupervisorMemory::class, ], diff --git a/src/Listeners/TrimMonitoredJobs.php b/src/Listeners/TrimMonitoredJobs.php new file mode 100644 index 00000000..3bc669a7 --- /dev/null +++ b/src/Listeners/TrimMonitoredJobs.php @@ -0,0 +1,47 @@ +lastTrimmed)) { + $this->frequency = max(1, intdiv( + config('horizon.trim.monitored', 10080), 12 + )); + + $this->lastTrimmed = Chronos::now()->subMinutes($this->frequency + 1); + } + + if ($this->lastTrimmed->lte(Chronos::now()->subMinutes($this->frequency))) { + app(JobRepository::class)->trimMonitoredJobs(); + + $this->lastTrimmed = Chronos::now(); + } + } +} diff --git a/src/Repositories/RedisJobRepository.php b/src/Repositories/RedisJobRepository.php index ab310bbd..9a77cbaa 100644 --- a/src/Repositories/RedisJobRepository.php +++ b/src/Repositories/RedisJobRepository.php @@ -42,6 +42,13 @@ class RedisJobRepository implements JobRepository */ public $failedJobExpires; + /** + * The number of minutes until monitored jobs should be purged. + * + * @var int + */ + public $monitoredJobExpires; + /** * Create a new repository instance. * @@ -53,6 +60,7 @@ public function __construct(RedisFactory $redis) $this->redis = $redis; $this->recentJobExpires = config('horizon.trim.recent', 60); $this->failedJobExpires = config('horizon.trim.failed', 10080); + $this->monitoredJobExpires = config('horizon.trim.monitored', 10080); } /** @@ -293,7 +301,7 @@ public function released($connection, $queue, JobPayload $payload) } /** - * Store a monitored job. + * Mark the job as completed and monitored. * * @param string $connection * @param string $queue @@ -303,6 +311,8 @@ public function released($connection, $queue, JobPayload $payload) public function remember($connection, $queue, JobPayload $payload) { $this->connection()->pipeline(function ($pipe) use ($connection, $queue, $payload) { + $this->storeMonitoredReferences($pipe, $payload->id()); + $pipe->hmset( $payload->id(), [ 'id' => $payload->id(), @@ -315,10 +325,24 @@ public function remember($connection, $queue, JobPayload $payload) ] ); - $pipe->persist($payload->id()); + $pipe->expireat( + $payload->id(), Chronos::now()->addMinutes($this->monitoredJobExpires)->getTimestamp() + ); }); } + /** + * Store the look-up references for a monitored job. + * + * @param mixed $pipe + * @param string $id + * @return void + */ + protected function storeMonitoredReferences($pipe, $id) + { + $pipe->zadd('monitored_jobs', str_replace(',', '.', microtime(true) * -1), $id); + } + /** * Mark the given jobs as released / pending. * @@ -457,6 +481,18 @@ public function trimFailedJobs() ); } + /** + * Trim the monitored job list. + * + * @return void + */ + public function trimMonitoredJobs() + { + $this->connection()->zremrangebyscore( + 'monitored_jobs', Chronos::now()->subMinutes($this->monitoredJobExpires)->getTimestamp() * -1, '+inf' + ); + } + /** * Find a failed job by ID. * diff --git a/src/ServiceBindings.php b/src/ServiceBindings.php index c3c61883..cff53682 100644 --- a/src/ServiceBindings.php +++ b/src/ServiceBindings.php @@ -15,6 +15,7 @@ trait ServiceBindings Contracts\HorizonCommandQueue::class => RedisHorizonCommandQueue::class, Listeners\TrimRecentJobs::class, Listeners\TrimFailedJobs::class, + Listeners\TrimMonitoredJobs::class, Lock::class, Stopwatch::class, diff --git a/tests/Feature/MonitoringTest.php b/tests/Feature/MonitoringTest.php index 9674f7ac..6fa45933 100644 --- a/tests/Feature/MonitoringTest.php +++ b/tests/Feature/MonitoringTest.php @@ -53,7 +53,7 @@ public function test_completed_jobs_are_stored_in_database_when_one_of_their_tag $id = Queue::push(new Jobs\BasicJob); $this->work(); $this->assertEquals(1, $this->monitoredJobs('first')); - $this->assertEquals(-1, Redis::connection('horizon')->ttl($id)); + $this->assertGreaterThan(0, Redis::connection('horizon')->ttl($id)); } public function test_completed_jobs_are_removed_from_database_when_their_tag_is_no_longer_monitored() diff --git a/tests/Feature/TrimMonitoredJobsTest.php b/tests/Feature/TrimMonitoredJobsTest.php new file mode 100644 index 00000000..a6633fa4 --- /dev/null +++ b/tests/Feature/TrimMonitoredJobsTest.php @@ -0,0 +1,35 @@ +shouldReceive('trimMonitoredJobs')->twice(); + $this->app->instance(JobRepository::class, $repository); + + // Should not be called first time since date is initialized... + $trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class))); + + Chronos::setTestNow(Chronos::now()->addMinutes(1600)); + + // Should only be called twice... + $trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class))); + $trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class))); + $trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class))); + + Chronos::setTestNow(); + } +}