Skip to content

Commit

Permalink
Adding background:queue commands: worker, status and delete
Browse files Browse the repository at this point in the history
  • Loading branch information
DeepDiver1975 committed Jun 4, 2018
1 parent 2038c0b commit 3929f66
Show file tree
Hide file tree
Showing 10 changed files with 538 additions and 5 deletions.
45 changes: 45 additions & 0 deletions core/Command/Background/Queue/Delete.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

namespace OC\Core\Command\Background\Queue;

use OCP\BackgroundJob\IJobList;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class Delete extends Command {

/** @var \OCP\BackgroundJob\IJobList */
private $jobList;

public function __construct(IJobList $jobList) {
$this->jobList = $jobList;
parent::__construct();
}

protected function configure() {
$this
->setName('background:queue:delete')
->setDescription('Delete a job from the queue')
->addArgument('id', InputArgument::REQUIRED, 'id of the job to be deleted');
}

/**
* @param InputInterface $input
* @param OutputInterface $output
* @return void
*/
protected function execute(InputInterface $input, OutputInterface $output) {
$id = $input->getArgument('id');

$job = $this->jobList->getById($id);
if ($job === null) {
$output->writeln("Job with id <$id> is not known.");
return;
}

$this->jobList->removeById($id);
$output->writeln('Job has been deleted.');
}
}
41 changes: 41 additions & 0 deletions core/Command/Background/Queue/Status.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace OC\Core\Command\Background\Queue;

use OCP\BackgroundJob\IJob;
use OCP\BackgroundJob\IJobList;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\Table;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class Status extends Command {

/** @var \OCP\BackgroundJob\IJobList */
private $jobList;

public function __construct(IJobList $jobList) {
$this->jobList = $jobList;
parent::__construct();
}

protected function configure() {
$this
->setName('background:queue:status')
->setDescription('List queue status');
}

/**
* @param InputInterface $input
* @param OutputInterface $output
* @return void
*/
protected function execute(InputInterface $input, OutputInterface $output) {
$t = new Table($output);
$t->setHeaders(['Id', 'Job', 'Last run', 'Arguments']);
$this->jobList->listJobs(function (IJob $job) use ($t) {
$t->addRow([$job->getId(), \get_class($job), \date('c', $job->getLastRun()), $job->getArgument()]);
});
$t->render();
}
}
65 changes: 65 additions & 0 deletions core/Command/Background/Queue/Worker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

namespace OC\Core\Command\Background\Queue;

use OC\Console\CommandLogger;
use OC\Core\Command\Base;
use OCP\ILogger;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class Worker extends Base {

/** @var \OCP\BackgroundJob\IJobList */
private $jobList;
/** @var ILogger */
private $logger;

public function __construct() {
$this->jobList = \OC::$server->getJobList();
parent::__construct();
}

protected function configure() {
$this
->setName("background:queue:worker")
->setDescription("Listen to the background job queue and execute the jobs")
->addOption('sleep', null, InputOption::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3);
}

/**
* @param InputInterface $input
* @param OutputInterface $output
*/
protected function execute(InputInterface $input, OutputInterface $output) {
$this->logger = new CommandLogger($output);

$waitTime = \max(1, $input->getOption('sleep'));
while (true) {
if ($this->hasBeenInterrupted()) {
break;
}
if ($this->executeNext() === null) {
\sleep($waitTime);
}
}
}

private function executeNext() {
$job = $this->jobList->getNext();
if ($job === null) {
return null;
}
$jobId = $job->getId();
$this->logger->debug('Run job with ID ' . $job->getId(), ['app' => 'cron']);
$job->execute($this->jobList, $this->logger);
$this->logger->debug('Finished job with ID ' . $job->getId(), ['app' => 'cron']);

$this->jobList->setLastJob($job);
unset($job);

return $jobId;
}
}
3 changes: 3 additions & 0 deletions core/register_command.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@
$application->add(new OC\Core\Command\Background\Cron(\OC::$server->getConfig()));
$application->add(new OC\Core\Command\Background\WebCron(\OC::$server->getConfig()));
$application->add(new OC\Core\Command\Background\Ajax(\OC::$server->getConfig()));
$application->add(new OC\Core\Command\Background\Queue\Worker());
$application->add(new OC\Core\Command\Background\Queue\Status(\OC::$server->getJobList()));
$application->add(new OC\Core\Command\Background\Queue\Delete(\OC::$server->getJobList()));

$application->add(new OC\Core\Command\Config\App\DeleteConfig(\OC::$server->getConfig()));
$application->add(new OC\Core\Command\Config\App\GetConfig(\OC::$server->getConfig()));
Expand Down
1 change: 0 additions & 1 deletion db_structure.xml
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@
<default></default>
<notnull>false</notnull>
</field>

<field>
<!-- timestamp when the job was checked if it needs execution the last time -->
<name>last_checked</name>
Expand Down
26 changes: 22 additions & 4 deletions lib/private/BackgroundJob/JobList.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public function remove($job, $argument = null) {
/**
* @param int $id
*/
protected function removeById($id) {
public function removeById($id) {
$query = $this->connection->getQueryBuilder();
$query->delete('jobs')
->where($query->expr()->eq('id', $query->createNamedParameter($id, IQueryBuilder::PARAM_INT)));
Expand Down Expand Up @@ -305,9 +305,7 @@ public function getLastJob() {
}

/**
* set the lastRun of $job to now
*
* @param IJob $job
* @inheritdoc
*/
public function setLastRun($job) {
$query = $this->connection->getQueryBuilder();
Expand All @@ -324,4 +322,24 @@ public function setExecutionTime($job, $timeTaken) {
->where($query->expr()->eq('id', $query->createNamedParameter($job->getId(), IQueryBuilder::PARAM_INT)));
$query->execute();
}

/**
* @inheritdoc
*/
public function listJobs(\Closure $callback) {
$query = $this->connection->getQueryBuilder();
$query->select('*')
->from('jobs');
$result = $query->execute();

while ($row = $result->fetch()) {
$job = $this->buildJob($row);
if ($job) {
if ($callback($job) === false) {
break;
}
}
}
$result->closeCursor();
}
}
Loading

0 comments on commit 3929f66

Please sign in to comment.