From b73269c1335abac2987be4d96253a7698eea756e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julius=20H=C3=A4rtl?= Date: Tue, 21 Dec 2021 08:57:53 +0100 Subject: [PATCH 1/3] Allow calling cron.php with a background job class MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julius Härtl --- cron.php | 3 ++- lib/private/BackgroundJob/JobList.php | 10 +++++++--- tests/lib/BackgroundJob/DummyJobList.php | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cron.php b/cron.php index 7d661621ed090..4268b2ef36657 100644 --- a/cron.php +++ b/cron.php @@ -142,7 +142,8 @@ $endTime = time() + 14 * 60; $executedJobs = []; - while ($job = $jobList->getNext($onlyTimeSensitive)) { + $jobClass = isset($argv[1]) ? $argv[1] : null; + while ($job = $jobList->getNext($onlyTimeSensitive, $jobClass)) { if (isset($executedJobs[$job->getId()])) { $jobList->unlockJob($job); break; diff --git a/lib/private/BackgroundJob/JobList.php b/lib/private/BackgroundJob/JobList.php index 20176e451251d..e4e533c78b1e0 100644 --- a/lib/private/BackgroundJob/JobList.php +++ b/lib/private/BackgroundJob/JobList.php @@ -205,7 +205,7 @@ public function getJobs($job, ?int $limit, int $offset): array { /** * get the next job in the list */ - public function getNext(bool $onlyTimeSensitive = false): ?IJob { + public function getNext(bool $onlyTimeSensitive = false, string $jobClass = null): ?IJob { $query = $this->connection->getQueryBuilder(); $query->select('*') ->from('jobs') @@ -218,6 +218,10 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { $query->andWhere($query->expr()->eq('time_sensitive', $query->createNamedParameter(IJob::TIME_SENSITIVE, IQueryBuilder::PARAM_INT))); } + if ($jobClass) { + $query->andWhere($query->expr()->eq('class', $query->createNamedParameter($jobClass))); + } + $update = $this->connection->getQueryBuilder(); $update->update('jobs') ->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime())) @@ -238,7 +242,7 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { if ($count === 0) { // Background job already executed elsewhere, try again. - return $this->getNext($onlyTimeSensitive); + return $this->getNext($onlyTimeSensitive, $jobClass); } $job = $this->buildJob($row); @@ -252,7 +256,7 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { $reset->executeStatement(); // Background job from disabled app, try again. - return $this->getNext($onlyTimeSensitive); + return $this->getNext($onlyTimeSensitive, $jobClass); } return $job; diff --git a/tests/lib/BackgroundJob/DummyJobList.php b/tests/lib/BackgroundJob/DummyJobList.php index 4d14ed9e7db43..d0ba68b60e1b5 100644 --- a/tests/lib/BackgroundJob/DummyJobList.php +++ b/tests/lib/BackgroundJob/DummyJobList.php @@ -91,7 +91,7 @@ public function getJobs($job, ?int $limit, int $offset): array { /** * get the next job in the list */ - public function getNext(bool $onlyTimeSensitive = false): ?IJob { + public function getNext(bool $onlyTimeSensitive = false, string $jobClass = null): ?IJob { if (count($this->jobs) > 0) { if ($this->last < (count($this->jobs) - 1)) { $i = $this->last + 1; From 1e394484492eaa3a3c1c2ef7a7488134ea553d1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julius=20H=C3=A4rtl?= Date: Thu, 14 Jul 2022 14:50:12 +0200 Subject: [PATCH 2/3] Add background worker occ command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julius Härtl --- core/Command/Background/Worker.php | 206 ++++++++++++++++++++++++++ core/register_command.php | 1 + lib/private/BackgroundJob/JobList.php | 19 +++ 3 files changed, 226 insertions(+) create mode 100644 core/Command/Background/Worker.php diff --git a/core/Command/Background/Worker.php b/core/Command/Background/Worker.php new file mode 100644 index 0000000000000..61f8fbc495edd --- /dev/null +++ b/core/Command/Background/Worker.php @@ -0,0 +1,206 @@ + + * + * @author Joas Schilling + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OC\Core\Command\Background; + +use OCP\BackgroundJob\IJob; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; + +class Worker extends Command { + protected IJobList $jobList; + protected LoggerInterface $logger; + + const DEFAULT_INTERVAL = 5; + + public function __construct(IJobList $jobList, + LoggerInterface $logger) { + parent::__construct(); + $this->jobList = $jobList; + $this->logger = $logger; + } + + protected function configure(): void { + $this + ->setName('background-job:worker') + ->setDescription('Run a background job worker') + ->addArgument( + 'job-class', + InputArgument::OPTIONAL, + 'The class of the job in the database' + ) + ->addOption( + 'once', + null, + InputOption::VALUE_NONE, + 'Only execute the worker once (as a regular cron execution would do it)' + ) + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + $jobClass = $input->getArgument('job-class'); + + $executedJobs = []; + + $ended = false; + pcntl_signal(SIGINT, function () use (&$ended, $output, $executedJobs) { + $output->writeln('SIGINT'); + if ($ended) { + foreach ($executedJobs as $id => $time) { + unset($executedJobs[$id]); + $job = $this->jobList->getById($id); + $this->jobList->unlockJob($job); + } + $output->writeln('Killed'); + exit(1); + } + $ended = true; + $output->writeln('Waiting for job to finish. Press Ctrl-C again to kill, but this may have unexpected side effects.'); + }); + + while (true) { + if ($ended) { + break; + } + $count = 0; + $total = 0; + foreach($this->jobList->countByClass() as $row) { + if ((int)$row['count'] === 1) { + $count++; + } else { + $output->writeln($row['class'] . " " . $row['count']); + } + $total += $row['count']; + } + $output->writeln("Other jobs " . $count); + $output->writeln("Total jobs " . $count); + + + + foreach ($executedJobs as $id => $time) { + if ($time < time() - self::DEFAULT_INTERVAL) { + unset($executedJobs[$id]); + $job = $this->jobList->getById($id); + $this->jobList->unlockJob($job); + } + } + + $job = $this->jobList->getNext(false, $jobClass); + if (!$job) { + $output->writeln("Waiting for new jobs to be queued"); + sleep(1); + continue; + } + + + if (isset($executedJobs[$job->getId()])) { + continue; + } + + $output->writeln("- Running job " . get_class($job) . " " . $job->getId()); + + if ($output->isVerbose()) { + $this->printJobInfo($job->getId(), $job, $output); + } + + $job->execute($this->jobList, \OC::$server->getLogger()); + + // clean up after unclean jobs + \OC_Util::tearDownFS(); + \OC::$server->getTempManager()->clean(); + + $this->jobList->setLastJob($job); + $executedJobs[$job->getId()] = time(); + unset($job); + + if ($input->getOption('once')) { + break; + } + } + + foreach ($executedJobs as $id => $time) { + unset($executedJobs[$id]); + $job = $this->jobList->getById($id); + $this->jobList->unlockJob($job); + } + + return 0; + } + + protected function printJobInfo(int $jobId, IJob $job, OutputInterface$output): void { + $row = $this->jobList->getDetailsById($jobId); + + $lastRun = new \DateTime(); + $lastRun->setTimestamp((int) $row['last_run']); + $lastChecked = new \DateTime(); + $lastChecked->setTimestamp((int) $row['last_checked']); + $reservedAt = new \DateTime(); + $reservedAt->setTimestamp((int) $row['reserved_at']); + + $output->writeln('Job class: ' . get_class($job)); + $output->writeln('Arguments: ' . json_encode($job->getArgument())); + + $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; + if ($isTimedJob) { + $output->writeln('Type: timed'); + } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { + $output->writeln('Type: queued'); + } else { + $output->writeln('Type: job'); + } + + $output->writeln(''); + $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); + if ((int) $row['reserved_at'] === 0) { + $output->writeln('Reserved at: -'); + } else { + $output->writeln('Reserved at: ' . $reservedAt->format(\DateTimeInterface::ATOM) . ''); + } + $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); + $output->writeln('Last duration: ' . $row['execution_duration']); + + if ($isTimedJob) { + $reflection = new \ReflectionClass($job); + $intervalProperty = $reflection->getProperty('interval'); + $intervalProperty->setAccessible(true); + $interval = $intervalProperty->getValue($job); + + $nextRun = new \DateTime(); + $nextRun->setTimestamp($row['last_run'] + $interval); + + if ($nextRun > new \DateTime()) { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } else { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } + } + } +} diff --git a/core/register_command.php b/core/register_command.php index b6da0a6d44d01..298748d985327 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -90,6 +90,7 @@ $application->add(new OC\Core\Command\Background\WebCron(\OC::$server->getConfig())); $application->add(new OC\Core\Command\Background\Ajax(\OC::$server->getConfig())); $application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->getLogger())); + $application->add(new OC\Core\Command\Background\Worker(\OC::$server->getJobList(), \OC::$server->get(LoggerInterface::class))); $application->add(new OC\Core\Command\Background\ListCommand(\OC::$server->getJobList())); $application->add(\OC::$server->query(\OC\Core\Command\Broadcast\Test::class)); diff --git a/lib/private/BackgroundJob/JobList.php b/lib/private/BackgroundJob/JobList.php index e4e533c78b1e0..3b1e2e91c719c 100644 --- a/lib/private/BackgroundJob/JobList.php +++ b/lib/private/BackgroundJob/JobList.php @@ -386,4 +386,23 @@ public function resetBackgroundJob(IJob $job): void { ->where($query->expr()->eq('id', $query->createNamedParameter($job->getId()), IQueryBuilder::PARAM_INT)); $query->executeStatement(); } + + public function countByClass(): array { + $query = $this->connection->getQueryBuilder(); + $query->select('class') + ->selectAlias($query->func()->count('id'), 'count') + ->from('jobs') + ->orderBy('count') + ->groupBy('class'); + + $result = $query->executeQuery(); + + $jobs = []; + while ($row = $result->fetch()) { + $jobs[] = $row; + } + + return $jobs; + + } } From 5f3310f2024f5c38fdcf2847072d5369d0872f9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julius=20H=C3=A4rtl?= Date: Thu, 1 Sep 2022 12:13:05 +0200 Subject: [PATCH 3/3] Fix running once when no job was scheduled MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Julius Härtl --- core/Command/Background/Job.php | 66 +-------- core/Command/Background/JobBase.php | 93 ++++++++++++ core/Command/Background/JobWorker.php | 173 +++++++++++++++++++++ core/Command/Background/Worker.php | 206 -------------------------- core/register_command.php | 4 +- 5 files changed, 273 insertions(+), 269 deletions(-) create mode 100644 core/Command/Background/JobBase.php create mode 100644 core/Command/Background/JobWorker.php delete mode 100644 core/Command/Background/Worker.php diff --git a/core/Command/Background/Job.php b/core/Command/Background/Job.php index 823498cf8ca6c..87c06be48da4d 100644 --- a/core/Command/Background/Job.php +++ b/core/Command/Background/Job.php @@ -25,24 +25,17 @@ namespace OC\Core\Command\Background; -use OCP\BackgroundJob\IJob; use OCP\BackgroundJob\IJobList; -use OCP\ILogger; -use Symfony\Component\Console\Command\Command; +use Psr\Log\LoggerInterface; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -class Job extends Command { - protected IJobList $jobList; - protected ILogger $logger; - +class Job extends JobBase { public function __construct(IJobList $jobList, - ILogger $logger) { - parent::__construct(); - $this->jobList = $jobList; - $this->logger = $logger; + LoggerInterface $logger) { + parent::__construct($jobList, $logger); } protected function configure(): void { @@ -89,7 +82,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $output->writeln('Something went wrong when trying to retrieve Job with ID ' . $jobId . ' from database'); return 1; } - $job->execute($this->jobList, $this->logger); + $job->execute($this->jobList, \OC::$server->getLogger()); $job = $this->jobList->getById($jobId); if (($job === null) || ($lastRun !== $job->getLastRun())) { @@ -106,53 +99,4 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 0; } - - protected function printJobInfo(int $jobId, IJob $job, OutputInterface$output): void { - $row = $this->jobList->getDetailsById($jobId); - - $lastRun = new \DateTime(); - $lastRun->setTimestamp((int) $row['last_run']); - $lastChecked = new \DateTime(); - $lastChecked->setTimestamp((int) $row['last_checked']); - $reservedAt = new \DateTime(); - $reservedAt->setTimestamp((int) $row['reserved_at']); - - $output->writeln('Job class: ' . get_class($job)); - $output->writeln('Arguments: ' . json_encode($job->getArgument())); - - $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; - if ($isTimedJob) { - $output->writeln('Type: timed'); - } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { - $output->writeln('Type: queued'); - } else { - $output->writeln('Type: job'); - } - - $output->writeln(''); - $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); - if ((int) $row['reserved_at'] === 0) { - $output->writeln('Reserved at: -'); - } else { - $output->writeln('Reserved at: ' . $reservedAt->format(\DateTimeInterface::ATOM) . ''); - } - $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); - $output->writeln('Last duration: ' . $row['execution_duration']); - - if ($isTimedJob) { - $reflection = new \ReflectionClass($job); - $intervalProperty = $reflection->getProperty('interval'); - $intervalProperty->setAccessible(true); - $interval = $intervalProperty->getValue($job); - - $nextRun = new \DateTime(); - $nextRun->setTimestamp($row['last_run'] + $interval); - - if ($nextRun > new \DateTime()) { - $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); - } else { - $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); - } - } - } } diff --git a/core/Command/Background/JobBase.php b/core/Command/Background/JobBase.php new file mode 100644 index 0000000000000..fe2880c0988a0 --- /dev/null +++ b/core/Command/Background/JobBase.php @@ -0,0 +1,93 @@ + + * + * @author Julius Härtl + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + + +namespace OC\Core\Command\Background; + +use OCP\BackgroundJob\IJob; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Output\OutputInterface; + +abstract class JobBase extends \OC\Core\Command\Base { + protected IJobList $jobList; + protected LoggerInterface $logger; + + public function __construct(IJobList $jobList, + LoggerInterface $logger) { + parent::__construct(); + $this->jobList = $jobList; + $this->logger = $logger; + } + + protected function printJobInfo(int $jobId, IJob $job, OutputInterface $output): void { + $row = $this->jobList->getDetailsById($jobId); + + $lastRun = new \DateTime(); + $lastRun->setTimestamp((int) $row['last_run']); + $lastChecked = new \DateTime(); + $lastChecked->setTimestamp((int) $row['last_checked']); + $reservedAt = new \DateTime(); + $reservedAt->setTimestamp((int) $row['reserved_at']); + + $output->writeln('Job class: ' . get_class($job)); + $output->writeln('Arguments: ' . json_encode($job->getArgument())); + + $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; + if ($isTimedJob) { + $output->writeln('Type: timed'); + } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { + $output->writeln('Type: queued'); + } else { + $output->writeln('Type: job'); + } + + $output->writeln(''); + $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); + if ((int) $row['reserved_at'] === 0) { + $output->writeln('Reserved at: -'); + } else { + $output->writeln('Reserved at: ' . $reservedAt->format(\DateTimeInterface::ATOM) . ''); + } + $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); + $output->writeln('Last duration: ' . $row['execution_duration']); + + if ($isTimedJob) { + $reflection = new \ReflectionClass($job); + $intervalProperty = $reflection->getProperty('interval'); + $intervalProperty->setAccessible(true); + $interval = $intervalProperty->getValue($job); + + $nextRun = new \DateTime(); + $nextRun->setTimestamp($row['last_run'] + $interval); + + if ($nextRun > new \DateTime()) { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } else { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } + } + } +} diff --git a/core/Command/Background/JobWorker.php b/core/Command/Background/JobWorker.php new file mode 100644 index 0000000000000..2ca4af73474b1 --- /dev/null +++ b/core/Command/Background/JobWorker.php @@ -0,0 +1,173 @@ + + * + * @author Joas Schilling + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OC\Core\Command\Background; + +use OC\Core\Command\InterruptedException; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; + +class JobWorker extends JobBase { + private array $executedJobs = []; + + public function __construct(IJobList $jobList, + LoggerInterface $logger) { + parent::__construct($jobList, $logger); + } + + protected function configure(): void { + parent::configure(); + + $this + ->setName('background-job:worker') + ->setDescription('Run a background job worker') + ->addArgument( + 'job-class', + InputArgument::OPTIONAL, + 'The class of the job in the database' + ) + ->addOption( + 'once', + null, + InputOption::VALUE_NONE, + 'Only execute the worker once (as a regular cron execution would do it)' + ) + ->addOption( + 'interval', + 'i', + InputOption::VALUE_OPTIONAL, + 'Interval in seconds in which the worker should repeat already processed jobs (set to 0 for no repeat)', + 5 + ) + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + $jobClass = $input->getArgument('job-class'); + + if ($jobClass && !class_exists($jobClass)) { + $output->writeln('Invalid job class'); + return 1; + } + + while (true) { + // Handle canceling of the process + try { + $this->abortIfInterrupted(); + } catch (InterruptedException $e) { + $output->writeln('Cleaning up before quitting. Press Ctrl-C again to kill, but this may have unexpected side effects.'); + $this->unlockExecuted(); + $output->writeln('Background job worker stopped'); + break; + } + + $this->printSummary($input, $output); + + $interval = (int)($input->getOption('interval') ?? 5); + + // Unlock jobs that should be executed again after the interval + // Alternative could be to set last_checked to interval in the future to avoid the extra locks + foreach ($this->executedJobs as $id => $time) { + if ($time <= time() - $interval) { + unset($this->executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + } + + usleep(50000); + $job = $this->jobList->getNext(false, $jobClass); + if (!$job) { + if ($input->getOption('once') === true || $interval === 0) { + break; + } + + $output->writeln("Waiting for new jobs to be queued", OutputInterface::VERBOSITY_VERBOSE); + // Re-check interval for new jobs + sleep(1); + continue; + } + + + if (isset($this->executedJobs[$job->getId()]) && ($this->executedJobs[$job->getId()] + $interval > time())) { + $output->writeln("Job already executed within timeframe " . get_class($job) . " " . $job->getId() . '', OutputInterface::VERBOSITY_VERBOSE); + continue; + } + + $output->writeln("Running job " . get_class($job) . " with ID " . $job->getId()); + + if ($output->isVerbose()) { + $this->printJobInfo($job->getId(), $job, $output); + } + + $job->execute($this->jobList, \OC::$server->getLogger()); + + // clean up after unclean jobs + \OC_Util::tearDownFS(); + \OC::$server->getTempManager()->clean(); + + $this->jobList->setLastJob($job); + $this->jobList->unlockJob($job); + $this->executedJobs[$job->getId()] = time(); + + if ($input->getOption('once') === true) { + break; + } + } + + $this->unlockExecuted(); + + return 0; + } + + private function printSummary(InputInterface $input, OutputInterface $output): void { + if (!$output->isVeryVerbose()) { + return; + } + $output->writeln("Summary"); + + $counts = []; + foreach ($this->jobList->countByClass() as $row) { + $counts[] = $row; + } + $this->writeTableInOutputFormat($input, $output, $counts); + } + + private function unlockExecuted() { + foreach ($this->executedJobs as $id => $time) { + unset($this->executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + } +} diff --git a/core/Command/Background/Worker.php b/core/Command/Background/Worker.php deleted file mode 100644 index 61f8fbc495edd..0000000000000 --- a/core/Command/Background/Worker.php +++ /dev/null @@ -1,206 +0,0 @@ - - * - * @author Joas Schilling - * - * @license GNU AGPL version 3 or any later version - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -namespace OC\Core\Command\Background; - -use OCP\BackgroundJob\IJob; -use OCP\BackgroundJob\IJobList; -use Psr\Log\LoggerInterface; -use Symfony\Component\Console\Command\Command; -use Symfony\Component\Console\Input\InputArgument; -use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Output\OutputInterface; - -class Worker extends Command { - protected IJobList $jobList; - protected LoggerInterface $logger; - - const DEFAULT_INTERVAL = 5; - - public function __construct(IJobList $jobList, - LoggerInterface $logger) { - parent::__construct(); - $this->jobList = $jobList; - $this->logger = $logger; - } - - protected function configure(): void { - $this - ->setName('background-job:worker') - ->setDescription('Run a background job worker') - ->addArgument( - 'job-class', - InputArgument::OPTIONAL, - 'The class of the job in the database' - ) - ->addOption( - 'once', - null, - InputOption::VALUE_NONE, - 'Only execute the worker once (as a regular cron execution would do it)' - ) - ; - } - - protected function execute(InputInterface $input, OutputInterface $output): int { - $jobClass = $input->getArgument('job-class'); - - $executedJobs = []; - - $ended = false; - pcntl_signal(SIGINT, function () use (&$ended, $output, $executedJobs) { - $output->writeln('SIGINT'); - if ($ended) { - foreach ($executedJobs as $id => $time) { - unset($executedJobs[$id]); - $job = $this->jobList->getById($id); - $this->jobList->unlockJob($job); - } - $output->writeln('Killed'); - exit(1); - } - $ended = true; - $output->writeln('Waiting for job to finish. Press Ctrl-C again to kill, but this may have unexpected side effects.'); - }); - - while (true) { - if ($ended) { - break; - } - $count = 0; - $total = 0; - foreach($this->jobList->countByClass() as $row) { - if ((int)$row['count'] === 1) { - $count++; - } else { - $output->writeln($row['class'] . " " . $row['count']); - } - $total += $row['count']; - } - $output->writeln("Other jobs " . $count); - $output->writeln("Total jobs " . $count); - - - - foreach ($executedJobs as $id => $time) { - if ($time < time() - self::DEFAULT_INTERVAL) { - unset($executedJobs[$id]); - $job = $this->jobList->getById($id); - $this->jobList->unlockJob($job); - } - } - - $job = $this->jobList->getNext(false, $jobClass); - if (!$job) { - $output->writeln("Waiting for new jobs to be queued"); - sleep(1); - continue; - } - - - if (isset($executedJobs[$job->getId()])) { - continue; - } - - $output->writeln("- Running job " . get_class($job) . " " . $job->getId()); - - if ($output->isVerbose()) { - $this->printJobInfo($job->getId(), $job, $output); - } - - $job->execute($this->jobList, \OC::$server->getLogger()); - - // clean up after unclean jobs - \OC_Util::tearDownFS(); - \OC::$server->getTempManager()->clean(); - - $this->jobList->setLastJob($job); - $executedJobs[$job->getId()] = time(); - unset($job); - - if ($input->getOption('once')) { - break; - } - } - - foreach ($executedJobs as $id => $time) { - unset($executedJobs[$id]); - $job = $this->jobList->getById($id); - $this->jobList->unlockJob($job); - } - - return 0; - } - - protected function printJobInfo(int $jobId, IJob $job, OutputInterface$output): void { - $row = $this->jobList->getDetailsById($jobId); - - $lastRun = new \DateTime(); - $lastRun->setTimestamp((int) $row['last_run']); - $lastChecked = new \DateTime(); - $lastChecked->setTimestamp((int) $row['last_checked']); - $reservedAt = new \DateTime(); - $reservedAt->setTimestamp((int) $row['reserved_at']); - - $output->writeln('Job class: ' . get_class($job)); - $output->writeln('Arguments: ' . json_encode($job->getArgument())); - - $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; - if ($isTimedJob) { - $output->writeln('Type: timed'); - } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { - $output->writeln('Type: queued'); - } else { - $output->writeln('Type: job'); - } - - $output->writeln(''); - $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); - if ((int) $row['reserved_at'] === 0) { - $output->writeln('Reserved at: -'); - } else { - $output->writeln('Reserved at: ' . $reservedAt->format(\DateTimeInterface::ATOM) . ''); - } - $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); - $output->writeln('Last duration: ' . $row['execution_duration']); - - if ($isTimedJob) { - $reflection = new \ReflectionClass($job); - $intervalProperty = $reflection->getProperty('interval'); - $intervalProperty->setAccessible(true); - $interval = $intervalProperty->getValue($job); - - $nextRun = new \DateTime(); - $nextRun->setTimestamp($row['last_run'] + $interval); - - if ($nextRun > new \DateTime()) { - $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); - } else { - $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); - } - } - } -} diff --git a/core/register_command.php b/core/register_command.php index 298748d985327..f6790c46d961d 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -89,8 +89,8 @@ $application->add(new OC\Core\Command\Background\Cron(\OC::$server->getConfig())); $application->add(new OC\Core\Command\Background\WebCron(\OC::$server->getConfig())); $application->add(new OC\Core\Command\Background\Ajax(\OC::$server->getConfig())); - $application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->getLogger())); - $application->add(new OC\Core\Command\Background\Worker(\OC::$server->getJobList(), \OC::$server->get(LoggerInterface::class))); + $application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->get(LoggerInterface::class))); + $application->add(\OCP\Server::get(OC\Core\Command\Background\JobWorker::class)); $application->add(new OC\Core\Command\Background\ListCommand(\OC::$server->getJobList())); $application->add(\OC::$server->query(\OC\Core\Command\Broadcast\Test::class));