Skip to content

Commit

Permalink
add a limit to the number of concurrent processes (#9)
Browse files Browse the repository at this point in the history
* limit the number of concurrent processes

* use 0.6.1 so table renders nicely

* clear summary on completion
  • Loading branch information
Harry Bragg authored Jun 30, 2017
1 parent 33b2da2 commit 23d19d9
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 82 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"squizlabs/php_codesniffer": "^2.9",
"graze/standards": "^1.0",
"symfony/console": "^3.1",
"graze/console-diff-renderer": "^0.6",
"graze/console-diff-renderer": "^0.6.1",
"mockery/mockery": "^0.9.9"
},
"suggest": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

use RuntimeException;

class AlreadyRunningException extends RuntimeException
class NotRunningException extends RuntimeException
{
}
114 changes: 100 additions & 14 deletions src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,29 @@
namespace Graze\ParallelProcess;

use Graze\DataStructure\Collection\Collection;
use Graze\ParallelProcess\Exceptions\AlreadyRunningException;
use Graze\ParallelProcess\Exceptions\NotRunningException;
use InvalidArgumentException;
use Symfony\Component\Process\Process;

class Pool extends Collection implements RunInterface
{
const CHECK_INTERVAL = 0.1;
const NO_MAX = -1;

/** @var RunInterface[] */
protected $items = [];
/** @var RunInterface[] */
protected $running = [];
/** @var RunInterface[] */
protected $waiting = [];
/** @var callable|null */
protected $onSuccess;
/** @var callable|null */
protected $onFailure;
/** @var callable|null */
protected $onProgress;
/** @var int */
private $maxSimultaneous = -1;

/**
* Pool constructor.
Expand All @@ -42,18 +47,21 @@ class Pool extends Collection implements RunInterface
* @param callable|null $onSuccess function (Process $process, float $duration, string $last) : void
* @param callable|null $onFailure function (Process $process, float $duration, string $last) : void
* @param callable|null $onProgress function (Process $process, float $duration, string $last) : void
* @param int $maxSimultaneous
*/
public function __construct(
array $items = [],
callable $onSuccess = null,
callable $onFailure = null,
callable $onProgress = null
callable $onProgress = null,
$maxSimultaneous = self::NO_MAX
) {
parent::__construct($items);

$this->onSuccess = $onSuccess;
$this->onFailure = $onFailure;
$this->onProgress = $onProgress;
$this->maxSimultaneous = $maxSimultaneous;
}

/**
Expand Down Expand Up @@ -106,15 +114,14 @@ public function add($item)
throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
}

$itemRunning = $item->isRunning();
if ((count($this->running) > 0) && !$itemRunning) {
throw new AlreadyRunningException("add: unable to add an item when the pool is currently running");
if (!$this->isRunning() && $item->isRunning()) {
throw new NotRunningException("add: unable to add a running item when the pool has not started");
}

parent::add($item);

if ($itemRunning) {
$this->running[] = $item;
if ($this->isRunning()) {
$this->startRun($item);
}

return $this;
Expand Down Expand Up @@ -145,14 +152,27 @@ protected function addProcess(Process $process)
public function start()
{
foreach ($this->items as $run) {
$run->start();
$this->startRun($run);
}

$this->running = $this->items;

return $this;
}

/**
* Start a run (or queue it if we are running the maximum number of processes already)
*
* @param RunInterface $run
*/
private function startRun(RunInterface $run)
{
if ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous) {
$run->start();
$this->running[] = $run;
} else {
$this->waiting[] = $run;
}
}

/**
* Blocking call to run processes;
*
Expand All @@ -163,14 +183,31 @@ public function start()
public function run($checkInterval = self::CHECK_INTERVAL)
{
$this->start();
$interval = $checkInterval * 1000000;

while ($this->isRunning()) {
usleep($checkInterval * 1000000);
while ($this->poll()) {
usleep($interval);
}

return $this->isSuccessful();
}

/**
* Check when a run has finished, if there are processes waiting, start them
*/
private function checkFinished()
{
if ($this->maxSimultaneous !== static::NO_MAX
&& count($this->waiting) > 0
&& count($this->running) < $this->maxSimultaneous) {
for ($i = count($this->running); $i < $this->maxSimultaneous && count($this->waiting) > 0; $i++) {
$run = array_shift($this->waiting);
$run->start();
$this->running[] = $run;
}
}
}

/**
* Determine if any item has run
*
Expand All @@ -191,13 +228,23 @@ public function hasStarted()
*
* @return bool
*/
public function isRunning()
public function poll()
{
/** @var Run[] $running */
$this->running = array_filter($this->running, function (RunInterface $run) {
return $run->isRunning();
return $run->poll();
});

$this->checkFinished();

return $this->isRunning();
}

/**
* @return bool
*/
public function isRunning()
{
return count($this->running) > 0;
}

Expand All @@ -220,4 +267,43 @@ public function isSuccessful()

return true;
}

/**
* Get a list of all the currently running runs
*
* @return RunInterface[]
*/
public function getRunning()
{
return $this->running;
}

/**
* Get a list of all the current waiting runs
*
* @return RunInterface[]
*/
public function getWaiting()
{
return $this->waiting;
}

/**
* @return int
*/
public function getMaxSimultaneous()
{
return $this->maxSimultaneous;
}

/**
* @param int $maxSimultaneous
*
* @return $this
*/
public function setMaxSimultaneous($maxSimultaneous)
{
$this->maxSimultaneous = $maxSimultaneous;
return $this;
}
}
15 changes: 11 additions & 4 deletions src/Run.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Graze\ParallelProcess;

use Graze\ParallelProcess\Exceptions\AlreadyRunningException;
use Symfony\Component\Process\Process;

class Run implements RunInterface
Expand Down Expand Up @@ -61,8 +60,6 @@ public function start()
});
$this->started = microtime(true);
$this->completed = false;
} else {
throw new AlreadyRunningException("start: this run is already running");
}

return $this;
Expand All @@ -73,7 +70,7 @@ public function start()
*
* @return bool true if the process is currently running (started and not terminated)
*/
public function isRunning()
public function poll()
{
if ($this->completed || !$this->hasStarted()) {
return false;
Expand All @@ -95,6 +92,16 @@ public function isRunning()
return false;
}

/**
* Return if the underlying process is running
*
* @return bool
*/
public function isRunning()
{
return $this->process->isRunning();
}

/**
* Call an event callback
*
Expand Down
11 changes: 9 additions & 2 deletions src/RunInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function hasStarted();
*
* @return $this
*
* @throws \Graze\ParallelProcess\Exceptions\AlreadyRunningException
* @throws \Graze\ParallelProcess\Exceptions\NotRunningException
*/
public function start();

Expand All @@ -30,9 +30,16 @@ public function start();
public function isSuccessful();

/**
* Polls to see if this process is running
* We think this is running
*
* @return bool
*/
public function isRunning();

/**
* Pools to see if this process is running
*
* @return bool
*/
public function poll();
}
47 changes: 46 additions & 1 deletion src/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class Table
private $terminal;
/** @var bool */
private $showOutput = true;
/** @var bool */
private $showSummary = true;

/**
* Table constructor.
Expand Down Expand Up @@ -141,6 +143,27 @@ function ($process, $duration, $last) use ($index, $data) {
$this->updateRowKeyLengths($data);
}

/**
* @return string
*/
private function getSummary()
{
if ($this->processPool->hasStarted()) {
if ($this->processPool->isRunning()) {
return sprintf(
'<comment>Total</comment>: %2d, <comment>Running</comment>: %2d, <comment>Waiting</comment>: %2d',
$this->processPool->count(),
count($this->processPool->getRunning()),
count($this->processPool->getWaiting())
);
} else {
return '';
}
} else {
return 'waiting...';
}
}

/**
* Render a specific row
*
Expand All @@ -149,7 +172,8 @@ function ($process, $duration, $last) use ($index, $data) {
private function render($row = 0)
{
if ($this->output->getVerbosity() >= OutputInterface::VERBOSITY_VERBOSE) {
$this->output->reWrite($this->rows, true);
$rows = ($this->showSummary ? array_merge($this->rows, [$this->getSummary()]) : $this->rows);
$this->output->reWrite($rows, !$this->showSummary);
} else {
$this->output->writeln($this->rows[$row]);
}
Expand All @@ -167,6 +191,9 @@ public function run($checkInterval = Pool::CHECK_INTERVAL)
$this->render();
}
$output = $this->processPool->run($checkInterval);
if ($this->output->getVerbosity() >= OutputInterface::VERBOSITY_VERBOSE && $this->showSummary) {
$this->render();
}

if (count($this->exceptions) > 0) {
foreach ($this->exceptions as $exception) {
Expand Down Expand Up @@ -195,7 +222,25 @@ public function isShowOutput()
public function setShowOutput($showOutput)
{
$this->showOutput = $showOutput;
return $this;
}

/**
* @return bool
*/
public function isShowSummary()
{
return $this->showSummary;
}

/**
* @param bool $showSummary
*
* @return $this
*/
public function setShowSummary($showSummary)
{
$this->showSummary = $showSummary;
return $this;
}
}
4 changes: 3 additions & 1 deletion tests/example/app.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

$output = new ConsoleOutput(ConsoleOutput::VERBOSITY_VERY_VERBOSE);

$table = new Table($output);
$pool = new \Graze\ParallelProcess\Pool();
$pool->setMaxSimultaneous(3);
$table = new Table($output, $pool);
for ($i = 0; $i < 5; $i++) {
$time = $i + 5;
$table->add(new Process(sprintf('for i in `seq 1 %d` ; do date ; sleep 1 ; done', $time)), ['sleep' => $time]);
Expand Down
Loading

0 comments on commit 23d19d9

Please sign in to comment.