diff --git a/core/Command/Background/Worker.php b/core/Command/Background/Worker.php new file mode 100644 index 0000000000000..abefd520211ac --- /dev/null +++ b/core/Command/Background/Worker.php @@ -0,0 +1,221 @@ + + * + * @author Joas Schilling + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OC\Core\Command\Background; + +use OCP\BackgroundJob\IJob; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Command\Command; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; + +class Worker extends Command +{ + protected IJobList $jobList; + protected LoggerInterface $logger; + + public const DEFAULT_INTERVAL = 5; + + public function __construct( + IJobList $jobList, + LoggerInterface $logger + ) { + parent::__construct(); + $this->jobList = $jobList; + $this->logger = $logger; + } + + protected function configure(): void + { + $this + ->setName('background-job:worker') + ->setDescription('Run a background job worker') + ->addArgument( + 'job-class', + InputArgument::OPTIONAL, + 'The class of the job in the database' + ) + ->addOption( + 'once', + null, + InputOption::VALUE_NONE, + 'Only execute the worker once (as a regular cron execution would do it)' + ); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $jobClass = $input->getArgument('job-class'); + + $executedJobs = []; + + $ended = false; + pcntl_signal(SIGINT, function () use (&$ended, $output, &$executedJobs) { + $output->writeln('SIGINT'); + if ($ended) { + foreach ($executedJobs as $id => $time) { + unset($executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + $output->writeln('Killed'); + exit(1); + } + $ended = true; + $output->writeln('Waiting for job to finish. Press Ctrl-C again to kill, but this may have unexpected side effects.'); + }); + + while (true) { + if ($ended) { + break; + } + $count = 0; + $total = 0; + foreach ($this->jobList->countByClass() as $row) { + if ((int)$row['count'] === 1) { + $count++; + } else { + $output->writeln($row['class'] . " " . $row['count']); + } + $total += $row['count']; + } + $output->writeln("Other jobs " . $count); + $output->writeln("Total jobs " . $count); + + + + foreach ($executedJobs as $id => $time) { + if ($time < time() - self::DEFAULT_INTERVAL) { + unset($executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + } + + if ($input->getOption('once')) { + break; + } + + $job = $this->jobList->getNext(false, $jobClass); + if (!$job) { + $output->writeln("Waiting for new jobs to be queued"); + sleep(1); + continue; + } + + + if (isset($executedJobs[$job->getId()])) { + continue; + } + + $output->writeln("- Running job " . get_class($job) . " " . $job->getId()); + + if ($output->isVerbose()) { + $this->printJobInfo($job->getId(), $job, $output); + } + + $job->execute($this->jobList, \OC::$server->getLogger()); + + // clean up after unclean jobs + \OC_Util::tearDownFS(); + \OC::$server->getTempManager()->clean(); + + $this->jobList->setLastJob($job); + $executedJobs[$job->getId()] = time(); + unset($job); + + if ($input->getOption('once')) { + break; + } + } + + foreach ($executedJobs as $id => $time) { + unset($executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + + return 0; + } + + protected function printJobInfo(int $jobId, IJob $job, OutputInterface $output): void + { + $row = $this->jobList->getDetailsById($jobId); + + $lastRun = new \DateTime(); + $lastRun->setTimestamp((int) $row['last_run']); + $lastChecked = new \DateTime(); + $lastChecked->setTimestamp((int) $row['last_checked']); + $reservedAt = new \DateTime(); + $reservedAt->setTimestamp((int) $row['reserved_at']); + + $output->writeln('Job class: ' . get_class($job)); + $output->writeln('Arguments: ' . json_encode($job->getArgument())); + + $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; + if ($isTimedJob) { + $output->writeln('Type: timed'); + } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { + $output->writeln('Type: queued'); + } else { + $output->writeln('Type: job'); + } + + $output->writeln(''); + $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); + if ((int) $row['reserved_at'] === 0) { + $output->writeln('Reserved at: -'); + } else { + $output->writeln('Reserved at: ' . $reservedAt->format(\DateTimeInterface::ATOM) . ''); + } + $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); + $output->writeln('Last duration: ' . $row['execution_duration']); + + if ($isTimedJob) { + $reflection = new \ReflectionClass($job); + $intervalProperty = $reflection->getProperty('interval'); + $intervalProperty->setAccessible(true); + $interval = $intervalProperty->getValue($job); + + $nextRun = new \DateTime(); + $nextRun->setTimestamp($row['last_run'] + $interval); + + if ($nextRun > new \DateTime()) { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } else { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } + } + } +} diff --git a/core/register_command.php b/core/register_command.php index 8f600d7b89488..a73572d665f27 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -92,6 +92,8 @@ $application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->getLogger())); $application->add(new OC\Core\Command\Background\ListCommand(\OC::$server->getJobList())); + $application->add(new OC\Core\Command\Background\Worker(\OC::$server->getJobList(), \OC::$server->get(LoggerInterface::class))); + $application->add(\OC::$server->query(\OC\Core\Command\Broadcast\Test::class)); $application->add(new OC\Core\Command\Config\App\DeleteConfig(\OC::$server->getConfig())); diff --git a/cron.php b/cron.php index 7d661621ed090..4268b2ef36657 100644 --- a/cron.php +++ b/cron.php @@ -142,7 +142,8 @@ $endTime = time() + 14 * 60; $executedJobs = []; - while ($job = $jobList->getNext($onlyTimeSensitive)) { + $jobClass = isset($argv[1]) ? $argv[1] : null; + while ($job = $jobList->getNext($onlyTimeSensitive, $jobClass)) { if (isset($executedJobs[$job->getId()])) { $jobList->unlockJob($job); break; diff --git a/lib/private/BackgroundJob/JobList.php b/lib/private/BackgroundJob/JobList.php index 3cdfee5113890..b3d6d9a50d64c 100644 --- a/lib/private/BackgroundJob/JobList.php +++ b/lib/private/BackgroundJob/JobList.php @@ -205,7 +205,7 @@ public function getJobsIterator($job, ?int $limit, int $offset): iterable { * Get the next job in the list * @return ?IJob the next job to run. Beware that this object may be a singleton and may be modified by the next call to buildJob. */ - public function getNext(bool $onlyTimeSensitive = false): ?IJob { + public function getNext(bool $onlyTimeSensitive = false, ?string $jobClass = null): ?IJob { $query = $this->connection->getQueryBuilder(); $query->select('*') ->from('jobs') @@ -218,6 +218,10 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { $query->andWhere($query->expr()->eq('time_sensitive', $query->createNamedParameter(IJob::TIME_SENSITIVE, IQueryBuilder::PARAM_INT))); } + if ($jobClass) { + $query->andWhere($query->expr()->eq('class', $query->createNamedParameter($jobClass))); + } + $update = $this->connection->getQueryBuilder(); $update->update('jobs') ->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime())) @@ -238,7 +242,7 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { if ($count === 0) { // Background job already executed elsewhere, try again. - return $this->getNext($onlyTimeSensitive); + return $this->getNext($onlyTimeSensitive, $jobClass); } $job = $this->buildJob($row); @@ -252,7 +256,7 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { $reset->executeStatement(); // Background job from disabled app, try again. - return $this->getNext($onlyTimeSensitive); + return $this->getNext($onlyTimeSensitive, $jobClass); } return $job; @@ -374,6 +378,24 @@ public function setExecutionTime(IJob $job, $timeTaken): void { $query->executeStatement(); } + public function countByClass(): array { + $query = $this->connection->getQueryBuilder(); + $query->select('class') + ->selectAlias($query->func()->count('id'), 'count') + ->from('jobs') + ->orderBy('count') + ->groupBy('class'); + + $result = $query->executeQuery(); + + $jobs = []; + while ($row = $result->fetch()) { + $jobs[] = $row; + } + + return $jobs; + } + /** * Reset the $job so it executes on the next trigger * diff --git a/lib/public/BackgroundJob/IJobList.php b/lib/public/BackgroundJob/IJobList.php index 65e2f5b6250e1..f50e4ec9e49e1 100644 --- a/lib/public/BackgroundJob/IJobList.php +++ b/lib/public/BackgroundJob/IJobList.php @@ -99,7 +99,7 @@ public function getJobsIterator($job, ?int $limit, int $offset): iterable; * * @since 7.0.0 - In 24.0.0 parameter $onlyTimeSensitive got added */ - public function getNext(bool $onlyTimeSensitive = false): ?IJob; + public function getNext(bool $onlyTimeSensitive = false, ?string $jobClass = null): ?IJob; /** * @since 7.0.0 @@ -146,6 +146,14 @@ public function setExecutionTime(IJob $job, int $timeTaken): void; */ public function resetBackgroundJob(IJob $job): void; + /** + * Count the number of job by class name + * + * @array{class-string, int} + * @since 25.0.0 + */ + public function countByClass(): array; + /** * Checks whether a job of the passed class is reserved to run *