Skip to content

Commit 9f3d7dd

Browse files
committed
Merge #246 [backport25] Multiple cron background jobs based on class
2 parents 6cbdd3f + 5f3310f commit 9f3d7dd

File tree

7 files changed

+302
-67
lines changed

7 files changed

+302
-67
lines changed

core/Command/Background/Job.php

+5-61
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,17 @@
2525

2626
namespace OC\Core\Command\Background;
2727

28-
use OCP\BackgroundJob\IJob;
2928
use OCP\BackgroundJob\IJobList;
30-
use OCP\ILogger;
31-
use Symfony\Component\Console\Command\Command;
29+
use Psr\Log\LoggerInterface;
3230
use Symfony\Component\Console\Input\InputArgument;
3331
use Symfony\Component\Console\Input\InputInterface;
3432
use Symfony\Component\Console\Input\InputOption;
3533
use Symfony\Component\Console\Output\OutputInterface;
3634

37-
class Job extends Command {
38-
protected IJobList $jobList;
39-
protected ILogger $logger;
40-
35+
class Job extends JobBase {
4136
public function __construct(IJobList $jobList,
42-
ILogger $logger) {
43-
parent::__construct();
44-
$this->jobList = $jobList;
45-
$this->logger = $logger;
37+
LoggerInterface $logger) {
38+
parent::__construct($jobList, $logger);
4639
}
4740

4841
protected function configure(): void {
@@ -89,7 +82,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
8982
$output->writeln('<error>Something went wrong when trying to retrieve Job with ID ' . $jobId . ' from database</error>');
9083
return 1;
9184
}
92-
$job->execute($this->jobList, $this->logger);
85+
$job->execute($this->jobList, \OC::$server->getLogger());
9386
$job = $this->jobList->getById($jobId);
9487

9588
if (($job === null) || ($lastRun !== $job->getLastRun())) {
@@ -106,53 +99,4 @@ protected function execute(InputInterface $input, OutputInterface $output): int
10699

107100
return 0;
108101
}
109-
110-
protected function printJobInfo(int $jobId, IJob $job, OutputInterface$output): void {
111-
$row = $this->jobList->getDetailsById($jobId);
112-
113-
$lastRun = new \DateTime();
114-
$lastRun->setTimestamp((int) $row['last_run']);
115-
$lastChecked = new \DateTime();
116-
$lastChecked->setTimestamp((int) $row['last_checked']);
117-
$reservedAt = new \DateTime();
118-
$reservedAt->setTimestamp((int) $row['reserved_at']);
119-
120-
$output->writeln('Job class: ' . get_class($job));
121-
$output->writeln('Arguments: ' . json_encode($job->getArgument()));
122-
123-
$isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob;
124-
if ($isTimedJob) {
125-
$output->writeln('Type: timed');
126-
} elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) {
127-
$output->writeln('Type: queued');
128-
} else {
129-
$output->writeln('Type: job');
130-
}
131-
132-
$output->writeln('');
133-
$output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM));
134-
if ((int) $row['reserved_at'] === 0) {
135-
$output->writeln('Reserved at: -');
136-
} else {
137-
$output->writeln('Reserved at: <comment>' . $reservedAt->format(\DateTimeInterface::ATOM) . '</comment>');
138-
}
139-
$output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM));
140-
$output->writeln('Last duration: ' . $row['execution_duration']);
141-
142-
if ($isTimedJob) {
143-
$reflection = new \ReflectionClass($job);
144-
$intervalProperty = $reflection->getProperty('interval');
145-
$intervalProperty->setAccessible(true);
146-
$interval = $intervalProperty->getValue($job);
147-
148-
$nextRun = new \DateTime();
149-
$nextRun->setTimestamp($row['last_run'] + $interval);
150-
151-
if ($nextRun > new \DateTime()) {
152-
$output->writeln('Next execution: <comment>' . $nextRun->format(\DateTimeInterface::ATOM) . '</comment>');
153-
} else {
154-
$output->writeln('Next execution: <info>' . $nextRun->format(\DateTimeInterface::ATOM) . '</info>');
155-
}
156-
}
157-
}
158102
}

core/Command/Background/JobBase.php

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/**
6+
* @copyright Copyright (c) 2022 Julius Härtl <jus@bitgrid.net>
7+
*
8+
* @author Julius Härtl <jus@bitgrid.net>
9+
*
10+
* @license GNU AGPL version 3 or any later version
11+
*
12+
* This program is free software: you can redistribute it and/or modify
13+
* it under the terms of the GNU Affero General Public License as
14+
* published by the Free Software Foundation, either version 3 of the
15+
* License, or (at your option) any later version.
16+
*
17+
* This program is distributed in the hope that it will be useful,
18+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
19+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20+
* GNU Affero General Public License for more details.
21+
*
22+
* You should have received a copy of the GNU Affero General Public License
23+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
24+
*/
25+
26+
27+
namespace OC\Core\Command\Background;
28+
29+
use OCP\BackgroundJob\IJob;
30+
use OCP\BackgroundJob\IJobList;
31+
use Psr\Log\LoggerInterface;
32+
use Symfony\Component\Console\Output\OutputInterface;
33+
34+
abstract class JobBase extends \OC\Core\Command\Base {
35+
protected IJobList $jobList;
36+
protected LoggerInterface $logger;
37+
38+
public function __construct(IJobList $jobList,
39+
LoggerInterface $logger) {
40+
parent::__construct();
41+
$this->jobList = $jobList;
42+
$this->logger = $logger;
43+
}
44+
45+
protected function printJobInfo(int $jobId, IJob $job, OutputInterface $output): void {
46+
$row = $this->jobList->getDetailsById($jobId);
47+
48+
$lastRun = new \DateTime();
49+
$lastRun->setTimestamp((int) $row['last_run']);
50+
$lastChecked = new \DateTime();
51+
$lastChecked->setTimestamp((int) $row['last_checked']);
52+
$reservedAt = new \DateTime();
53+
$reservedAt->setTimestamp((int) $row['reserved_at']);
54+
55+
$output->writeln('Job class: ' . get_class($job));
56+
$output->writeln('Arguments: ' . json_encode($job->getArgument()));
57+
58+
$isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob;
59+
if ($isTimedJob) {
60+
$output->writeln('Type: timed');
61+
} elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) {
62+
$output->writeln('Type: queued');
63+
} else {
64+
$output->writeln('Type: job');
65+
}
66+
67+
$output->writeln('');
68+
$output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM));
69+
if ((int) $row['reserved_at'] === 0) {
70+
$output->writeln('Reserved at: -');
71+
} else {
72+
$output->writeln('Reserved at: <comment>' . $reservedAt->format(\DateTimeInterface::ATOM) . '</comment>');
73+
}
74+
$output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM));
75+
$output->writeln('Last duration: ' . $row['execution_duration']);
76+
77+
if ($isTimedJob) {
78+
$reflection = new \ReflectionClass($job);
79+
$intervalProperty = $reflection->getProperty('interval');
80+
$intervalProperty->setAccessible(true);
81+
$interval = $intervalProperty->getValue($job);
82+
83+
$nextRun = new \DateTime();
84+
$nextRun->setTimestamp($row['last_run'] + $interval);
85+
86+
if ($nextRun > new \DateTime()) {
87+
$output->writeln('Next execution: <comment>' . $nextRun->format(\DateTimeInterface::ATOM) . '</comment>');
88+
} else {
89+
$output->writeln('Next execution: <info>' . $nextRun->format(\DateTimeInterface::ATOM) . '</info>');
90+
}
91+
}
92+
}
93+
}

core/Command/Background/JobWorker.php

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
/**
5+
* @copyright Copyright (c) 2021, Joas Schilling <coding@schilljs.com>
6+
*
7+
* @author Joas Schilling <coding@schilljs.com>
8+
*
9+
* @license GNU AGPL version 3 or any later version
10+
*
11+
* This program is free software: you can redistribute it and/or modify
12+
* it under the terms of the GNU Affero General Public License as
13+
* published by the Free Software Foundation, either version 3 of the
14+
* License, or (at your option) any later version.
15+
*
16+
* This program is distributed in the hope that it will be useful,
17+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
18+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19+
* GNU Affero General Public License for more details.
20+
*
21+
* You should have received a copy of the GNU Affero General Public License
22+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
23+
*
24+
*/
25+
26+
namespace OC\Core\Command\Background;
27+
28+
use OC\Core\Command\InterruptedException;
29+
use OCP\BackgroundJob\IJobList;
30+
use Psr\Log\LoggerInterface;
31+
use Symfony\Component\Console\Input\InputArgument;
32+
use Symfony\Component\Console\Input\InputInterface;
33+
use Symfony\Component\Console\Input\InputOption;
34+
use Symfony\Component\Console\Output\OutputInterface;
35+
36+
class JobWorker extends JobBase {
37+
private array $executedJobs = [];
38+
39+
public function __construct(IJobList $jobList,
40+
LoggerInterface $logger) {
41+
parent::__construct($jobList, $logger);
42+
}
43+
44+
protected function configure(): void {
45+
parent::configure();
46+
47+
$this
48+
->setName('background-job:worker')
49+
->setDescription('Run a background job worker')
50+
->addArgument(
51+
'job-class',
52+
InputArgument::OPTIONAL,
53+
'The class of the job in the database'
54+
)
55+
->addOption(
56+
'once',
57+
null,
58+
InputOption::VALUE_NONE,
59+
'Only execute the worker once (as a regular cron execution would do it)'
60+
)
61+
->addOption(
62+
'interval',
63+
'i',
64+
InputOption::VALUE_OPTIONAL,
65+
'Interval in seconds in which the worker should repeat already processed jobs (set to 0 for no repeat)',
66+
5
67+
)
68+
;
69+
}
70+
71+
protected function execute(InputInterface $input, OutputInterface $output): int {
72+
$jobClass = $input->getArgument('job-class');
73+
74+
if ($jobClass && !class_exists($jobClass)) {
75+
$output->writeln('<error>Invalid job class</error>');
76+
return 1;
77+
}
78+
79+
while (true) {
80+
// Handle canceling of the process
81+
try {
82+
$this->abortIfInterrupted();
83+
} catch (InterruptedException $e) {
84+
$output->writeln('<comment>Cleaning up before quitting. Press Ctrl-C again to kill, but this may have unexpected side effects.</comment>');
85+
$this->unlockExecuted();
86+
$output->writeln('<info>Background job worker stopped</info>');
87+
break;
88+
}
89+
90+
$this->printSummary($input, $output);
91+
92+
$interval = (int)($input->getOption('interval') ?? 5);
93+
94+
// Unlock jobs that should be executed again after the interval
95+
// Alternative could be to set last_checked to interval in the future to avoid the extra locks
96+
foreach ($this->executedJobs as $id => $time) {
97+
if ($time <= time() - $interval) {
98+
unset($this->executedJobs[$id]);
99+
$job = $this->jobList->getById($id);
100+
if ($job !== null) {
101+
$this->jobList->unlockJob($job);
102+
}
103+
}
104+
}
105+
106+
usleep(50000);
107+
$job = $this->jobList->getNext(false, $jobClass);
108+
if (!$job) {
109+
if ($input->getOption('once') === true || $interval === 0) {
110+
break;
111+
}
112+
113+
$output->writeln("Waiting for new jobs to be queued", OutputInterface::VERBOSITY_VERBOSE);
114+
// Re-check interval for new jobs
115+
sleep(1);
116+
continue;
117+
}
118+
119+
120+
if (isset($this->executedJobs[$job->getId()]) && ($this->executedJobs[$job->getId()] + $interval > time())) {
121+
$output->writeln("<comment>Job already executed within timeframe " . get_class($job) . " " . $job->getId() . '</comment>', OutputInterface::VERBOSITY_VERBOSE);
122+
continue;
123+
}
124+
125+
$output->writeln("Running job " . get_class($job) . " with ID " . $job->getId());
126+
127+
if ($output->isVerbose()) {
128+
$this->printJobInfo($job->getId(), $job, $output);
129+
}
130+
131+
$job->execute($this->jobList, \OC::$server->getLogger());
132+
133+
// clean up after unclean jobs
134+
\OC_Util::tearDownFS();
135+
\OC::$server->getTempManager()->clean();
136+
137+
$this->jobList->setLastJob($job);
138+
$this->jobList->unlockJob($job);
139+
$this->executedJobs[$job->getId()] = time();
140+
141+
if ($input->getOption('once') === true) {
142+
break;
143+
}
144+
}
145+
146+
$this->unlockExecuted();
147+
148+
return 0;
149+
}
150+
151+
private function printSummary(InputInterface $input, OutputInterface $output): void {
152+
if (!$output->isVeryVerbose()) {
153+
return;
154+
}
155+
$output->writeln("<comment>Summary</comment>");
156+
157+
$counts = [];
158+
foreach ($this->jobList->countByClass() as $row) {
159+
$counts[] = $row;
160+
}
161+
$this->writeTableInOutputFormat($input, $output, $counts);
162+
}
163+
164+
private function unlockExecuted() {
165+
foreach ($this->executedJobs as $id => $time) {
166+
unset($this->executedJobs[$id]);
167+
$job = $this->jobList->getById($id);
168+
if ($job !== null) {
169+
$this->jobList->unlockJob($job);
170+
}
171+
}
172+
}
173+
}

core/register_command.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@
8989
$application->add(new OC\Core\Command\Background\Cron(\OC::$server->getConfig()));
9090
$application->add(new OC\Core\Command\Background\WebCron(\OC::$server->getConfig()));
9191
$application->add(new OC\Core\Command\Background\Ajax(\OC::$server->getConfig()));
92-
$application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->getLogger()));
92+
$application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->get(LoggerInterface::class)));
93+
$application->add(\OCP\Server::get(OC\Core\Command\Background\JobWorker::class));
9394
$application->add(new OC\Core\Command\Background\ListCommand(\OC::$server->getJobList()));
9495

9596
$application->add(\OC::$server->query(\OC\Core\Command\Broadcast\Test::class));

cron.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@
142142
$endTime = time() + 14 * 60;
143143

144144
$executedJobs = [];
145-
while ($job = $jobList->getNext($onlyTimeSensitive)) {
145+
$jobClass = isset($argv[1]) ? $argv[1] : null;
146+
while ($job = $jobList->getNext($onlyTimeSensitive, $jobClass)) {
146147
if (isset($executedJobs[$job->getId()])) {
147148
$jobList->unlockJob($job);
148149
break;

0 commit comments

Comments
 (0)