From cf8e22b8e325493ab188cc3195fc023b7f8a5767 Mon Sep 17 00:00:00 2001 From: Pim Elshoff Date: Mon, 30 Jan 2012 23:40:06 +0100 Subject: [PATCH 1/2] Changed execution cycle to execute new workers on callback --- Manager.php | 94 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 31 deletions(-) diff --git a/Manager.php b/Manager.php index 6fe7110..e4e1dfc 100644 --- a/Manager.php +++ b/Manager.php @@ -1,4 +1,5 @@ startExecution(); - /** @var DocBlox_Parallel_Worker $worker */ - foreach ($this as $worker) { - - // if requirements are not met, execute workers in series. - if (!$this->checkRequirements()) { + // if requirements are not met, execute workers in series. + if (!$this->checkRequirements()) { + /** @var DocBlox_Parallel_Worker $worker */ + foreach ($this as $worker) { $worker->execute(); - continue; } + } else { + // Register signalling callback + pcntl_signal(SIGUSR1, array($this, 'startNextWorker')); - $this->forkAndRun($worker, $processes); + // Register a copy of the workers to shift from + $this->workersToDo = $this->getArrayCopy(); + + // Start as many workers as we can + for ($i = 0; $i < $this->getProcessLimit() && $i < count($this); $i++) { + $this->startNextWorker(); + } + + // Listen for signals from child processes + while (true) { + $pid = pcntl_waitpid(-1, $status); + if (isset($this->processes[$pid])) { + unset($this->processes[$pid]); + } + if (empty($this->workersToDo) && empty($this->processes)) { + break; + } + } } - $this->stopExecution($processes); + $this->stopExecution(); + } + + /** + * Forks the current process and calls the current Worker's execute method + * OR handles the parent process' execution. + * + * This is the really tricky part of the forking mechanism. Here we invoke + * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork} + * and either execute the forked process or deal with the parent's process + * based on in which process we are. + * + * @throws RuntimeException + */ + protected function startNextWorker() + { + $worker = array_shift($this->workersToDo); + if (empty($worker)) { + return; + } + $this->forkAndRun($worker); } /** @@ -218,21 +263,14 @@ protected function startExecution() * Waits for all processes to have finished and notifies the manager that * execution has stopped. * - * @param int[] &$processes List of running processes. - * * @return void */ - protected function stopExecution(array &$processes) + protected function stopExecution() { - // starting of processes has ended but some processes might still be - // running wait for them to finish - while (!empty($processes)) { - pcntl_waitpid(array_shift($processes), $status); - } - /** @var DocBlox_Parallel_Worker $worker */ foreach ($this as $worker) { - $worker->pipe->push(); + if (isset($worker->pipe)) + $worker->pipe->push(); } $this->is_running = false; @@ -252,22 +290,17 @@ protected function stopExecution(array &$processes) * {@link http://www.php.net/manual/en/function.pcntl-fork.php pcntl_fork} * and associated articles. * - * If there are more workers than may be ran simultaneously then this method - * will wait until a slot becomes available and then starts the next worker. - * * @param DocBlox_Parallel_Worker $worker The worker to process. - * @param int[] &$processes The list of running processes. * * @throws RuntimeException if we are unable to fork. * * @return void */ - protected function forkAndRun( - DocBlox_Parallel_Worker $worker, array &$processes - ) { + protected function forkAndRun(DocBlox_Parallel_Worker $worker) { $worker->pipe = new DocBlox_Parallel_WorkerPipe($worker); // fork the process and register the PID + $parentPid = getmypid(); $pid = pcntl_fork(); switch ($pid) { @@ -278,16 +311,15 @@ protected function forkAndRun( $worker->pipe->pull(); + // Signal the parent the child is ready + posix_kill($parentPid, SIGUSR1); + // Kill -9 this process to prevent closing of shared file handlers. // Not doing this causes, for example, MySQL connections to be cleaned. posix_kill(getmypid(), SIGKILL); default: // Parent process - // Keep track if the worker children - $processes[] = $pid; - - if (count($processes) >= $this->getProcessLimit()) { - pcntl_waitpid(array_shift($processes), $status); - } + // Keep track of the worker children + $this->processes[$pid] = true; break; } } From a87af4c21e8a5055480fcb15f083929e9661fedc Mon Sep 17 00:00:00 2001 From: Pim Elshoff Date: Mon, 30 Jan 2012 23:44:49 +0100 Subject: [PATCH 2/2] Added an extra test for timing demonstration --- example.php | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/example.php b/example.php index 3bdcd40..b08b461 100644 --- a/example.php +++ b/example.php @@ -55,3 +55,27 @@ foreach ($mgr as $worker) { var_dump($worker->getResult()); } + +// ----------------------------------------------------------------------------- +// Extra test: demonstrating timing with callbacks +// This should finish in 8 seconds without callback, and 5 seconds with +// (Tested on quad core) +// ----------------------------------------------------------------------------- + +$time = microtime(true); + +$mgr = new DocBlox_Parallel_Manager(); +$mgr + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(5); return 'k'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(4); return 'l'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(4); return 'm'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(2); return 'n'; })) + ->addWorker(new DocBlox_Parallel_Worker(function() { sleep(3); return 'o'; })) + ->execute(); + +/** @var DocBlox_Parallel_Worker $worker */ +foreach ($mgr as $worker) { + var_dump($worker->getResult()); +} + +echo 'Time: ' . (microtime(true) - $time) . PHP_EOL; \ No newline at end of file