Skip to content

Commit

Permalink
Merge pull request #65 from clue-labs/custom-pipes
Browse files Browse the repository at this point in the history
Support passing custom pipes and file descriptors to child process
  • Loading branch information
jsor authored Dec 3, 2018
2 parents 02c560e + 53f3920 commit 3534ecc
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 29 deletions.
67 changes: 63 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ as [Streams](https://github.com/reactphp/stream).
* [Stream Properties](#stream-properties)
* [Command](#command)
* [Termination](#termination)
* [Custom pipes](#custom-pipes)
* [Sigchild Compatibility](#sigchild-compatibility)
* [Windows Compatibility](#windows-compatibility)
* [Install](#install)
Expand Down Expand Up @@ -55,21 +56,31 @@ Once a process is started, its I/O streams will be constructed as instances of
Before `start()` is called, these properties are not set. Once a process terminates,
the streams will become closed but not unset.

Following common Unix conventions, this library will start each child process
with the three pipes matching the standard I/O streams as given below by default.
You can use the named references for common use cases or access these as an
array with all three pipes.

* `$stdin` or `$pipes[0]` is a `WritableStreamInterface`
* `$stdout` or `$pipes[1]` is a `ReadableStreamInterface`
* `$stderr` or `$pipes[2]` is a `ReadableStreamInterface`

Following common Unix conventions, this library will always start each child
process with the three pipes matching the standard I/O streams as given above.
You can use the named references for common use cases or access these as an
array with all three pipes.
Note that this default configuration may be overridden by explicitly passing
[custom pipes](#custom-pipes), in which case they may not be set or be assigned
different values. The `$pipes` array will always contain references to all pipes
as configured and the standard I/O references will always be set to reference
the pipes matching the above conventions. See [custom pipes](#custom-pipes) for
more details.

Because each of these implement the underlying
[`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface) or
[`WritableStreamInterface`](https://github.com/reactphp/stream#writablestreaminterface),
you can use any of their events and methods as usual:

```php
$process = new Process($command);
$process->start($loop);

$process->stdout->on('data', function ($chunk) {
echo $chunk;
});
Expand Down Expand Up @@ -298,6 +309,54 @@ properties actually allow some fine grained control over process termination,
such as first trying a soft-close and then applying a force-close after a
timeout.

### Custom pipes

Following common Unix conventions, this library will start each child process
with the three pipes matching the standard I/O streams by default. For more
advanced use cases it may be useful to pass in custom pipes, such as explicitly
passing additional file descriptors (FDs) or overriding default process pipes.

Note that passing custom pipes is considered advanced usage and requires a
more in-depth understanding of Unix file descriptors and how they are inherited
to child processes and shared in multi-processing applications.

If you do not want to use the default standard I/O pipes, you can explicitly
pass an array containing the file descriptor specification to the constructor
like this:

```php
$fds = array(
// standard I/O pipes for stdin/stdout/stderr
0 => array('pipe', 'r'),
1 => array('pipe', 'w'),
2 => array('pipe', 'w'),

// example FDs for files or open resources
4 => array('file', '/dev/null', 'r'),
6 => fopen('log.txt','a'),
8 => STDERR,

// example FDs for sockets
10 => fsockopen('localhost', 8080),
12 => stream_socket_server('tcp://0.0.0.0:4711')
);

$process = new Process($cmd, null, null, $fds);
$process->start($loop);
```

Unless your use case has special requirements that demand otherwise, you're
highly recommended to (at least) pass in the standard I/O pipes as given above.
The file descriptor specification accepts arguments in the exact same format
as the underlying [`proc_open()`](http://php.net/proc_open) function.

Once the process is started, the `$pipes` array will always contain references to
all pipes as configured and the standard I/O references will always be set to
reference the pipes matching common Unix conventions. This library supports any
number of pipes and additional file descriptors, but many common applications
being run as a child process will expect that the parent process properly
assigns these file descriptors.

### Sigchild Compatibility

Internally, this project uses a work-around to improve compatibility when PHP
Expand Down
23 changes: 23 additions & 0 deletions examples/21-fds.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

use React\EventLoop\Factory;
use React\ChildProcess\Process;

require __DIR__ . '/../vendor/autoload.php';

$loop = Factory::create();

$process = new Process('exec 0>&- 2>&-;exec ls -la /proc/self/fd', null, null, array(
1 => array('pipe', 'w')
));
$process->start($loop);

$process->stdout->on('data', function ($chunk) {
echo $chunk;
});

$process->on('exit', function ($code) {
echo 'EXIT with code ' . $code . PHP_EOL;
});

$loop->run();
77 changes: 52 additions & 25 deletions src/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,25 @@
class Process extends EventEmitter
{
/**
* @var ?WritableStreamInterface
* @var WritableStreamInterface|null|ReadableStreamInterface
*/
public $stdin;

/**
* @var ?ReadableStreamInterface
* @var ReadableStreamInterface|null|WritableStreamInterface
*/
public $stdout;

/**
* @var ?ReadableStreamInterface
* @var ReadableStreamInterface|null|WritableStreamInterface
*/
public $stderr;

/**
* Array with all process pipes (once started)
*
* Unless explicitly configured otherwise during construction, the following
* standard I/O pipes will be assigned by default:
* - 0: STDIN (`WritableStreamInterface`)
* - 1: STDOUT (`ReadableStreamInterface`)
* - 2: STDERR (`ReadableStreamInterface`)
Expand All @@ -47,6 +50,8 @@ class Process extends EventEmitter
private $cmd;
private $cwd;
private $env;
private $fds;

private $enhanceSigchildCompatibility;
private $sigchildPipe;

Expand All @@ -65,9 +70,10 @@ class Process extends EventEmitter
* @param string $cmd Command line to run
* @param null|string $cwd Current working directory or null to inherit
* @param null|array $env Environment variables or null to inherit
* @param null|array $fds File descriptors to allocate for this process (or null = default STDIO streams)
* @throws \LogicException On windows or when proc_open() is not installed
*/
public function __construct($cmd, $cwd = null, array $env = null)
public function __construct($cmd, $cwd = null, array $env = null, array $fds = null)
{
if (substr(strtolower(PHP_OS), 0, 3) === 'win') {
throw new \LogicException('Windows isn\'t supported due to the blocking nature of STDIN/STDOUT/STDERR pipes.');
Expand All @@ -87,18 +93,27 @@ public function __construct($cmd, $cwd = null, array $env = null)
}
}

if ($fds === null) {
$fds = array(
array('pipe', 'r'), // stdin
array('pipe', 'w'), // stdout
array('pipe', 'w'), // stderr
);
}

$this->fds = $fds;
$this->enhanceSigchildCompatibility = self::isSigchildEnabled();
}

/**
* Start the process.
*
* After the process is started, the standard IO streams will be constructed
* and available via public properties. STDIN will be paused upon creation.
* After the process is started, the standard I/O streams will be constructed
* and available via public properties.
*
* @param LoopInterface $loop Loop interface for stream construction
* @param float $interval Interval to periodically monitor process state (seconds)
* @throws RuntimeException If the process is already running or fails to start
* @throws \RuntimeException If the process is already running or fails to start
*/
public function start(LoopInterface $loop, $interval = 0.1)
{
Expand All @@ -107,17 +122,22 @@ public function start(LoopInterface $loop, $interval = 0.1)
}

$cmd = $this->cmd;
$fdSpec = array(
array('pipe', 'r'), // stdin
array('pipe', 'w'), // stdout
array('pipe', 'w'), // stderr
);

$fdSpec = $this->fds;
$sigchild = null;

// Read exit code through fourth pipe to work around --enable-sigchild
if ($this->enhanceSigchildCompatibility) {
$fdSpec[] = array('pipe', 'w');
$sigchild = 3;
\end($fdSpec);
$sigchild = \key($fdSpec);

// make sure this is fourth or higher (do not mess with STDIO)
if ($sigchild < 3) {
$fdSpec[3] = $fdSpec[$sigchild];
unset($fdSpec[$sigchild]);
$sigchild = 3;
}

$cmd = sprintf('(%s) ' . $sigchild . '>/dev/null; code=$?; echo $code >&' . $sigchild . '; exit $code', $cmd);
}

Expand All @@ -127,13 +147,13 @@ public function start(LoopInterface $loop, $interval = 0.1)
throw new \RuntimeException('Unable to launch a new process.');
}

$closeCount = 0;

// count open process pipes and await close event for each to drain buffers before detecting exit
$that = $this;
$closeCount = 0;
$streamCloseHandler = function () use (&$closeCount, $loop, $interval, $that) {
$closeCount++;
$closeCount--;

if ($closeCount < 2) {
if ($closeCount > 0) {
return;
}

Expand All @@ -160,18 +180,25 @@ public function start(LoopInterface $loop, $interval = 0.1)
}

foreach ($pipes as $n => $fd) {
if ($n === 0) {
$meta = \stream_get_meta_data($fd);
if (\strpos($meta['mode'], 'w') !== false) {
$stream = new WritableResourceStream($fd, $loop);
} else {
$stream = new ReadableResourceStream($fd, $loop);
$stream->on('close', $streamCloseHandler);
$closeCount++;
}
$this->pipes[$n] = $stream;
}

$this->stdin = $this->pipes[0];
$this->stdout = $this->pipes[1];
$this->stderr = $this->pipes[2];
$this->stdin = isset($this->pipes[0]) ? $this->pipes[0] : null;
$this->stdout = isset($this->pipes[1]) ? $this->pipes[1] : null;
$this->stderr = isset($this->pipes[2]) ? $this->pipes[2] : null;

// immediately start checking for process exit when started without any I/O pipes
if (!$closeCount) {
$streamCloseHandler();
}
}

/**
Expand All @@ -186,9 +213,9 @@ public function close()
return;
}

$this->stdin->close();
$this->stdout->close();
$this->stderr->close();
foreach ($this->pipes as $pipe) {
$pipe->close();
}

if ($this->enhanceSigchildCompatibility) {
$this->pollExitCodePipe();
Expand Down
42 changes: 42 additions & 0 deletions tests/AbstractProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,33 @@ public function testStartWillAssignPipes()
$this->assertSame($process->stderr, $process->pipes[2]);
}

public function testStartWithoutAnyPipesWillNotAssignPipes()
{
$process = new Process('exit 0', null, null, array());
$process->start($this->createLoop());

$this->assertNull($process->stdin);
$this->assertNull($process->stdout);
$this->assertNull($process->stderr);
$this->assertEquals(array(), $process->pipes);
}

public function testStartWithCustomPipesWillAssignPipes()
{
$process = new Process('exit 0', null, null, array(
0 => array('pipe', 'w'),
3 => array('pipe', 'r')
));
$process->start($this->createLoop());

$this->assertInstanceOf('React\Stream\ReadableStreamInterface', $process->stdin);
$this->assertNull($process->stdout);
$this->assertNull($process->stderr);
$this->assertCount(2, $process->pipes);
$this->assertSame($process->stdin, $process->pipes[0]);
$this->assertInstanceOf('React\Stream\WritableStreamInterface', $process->pipes[3]);
}

public function testIsRunning()
{
$process = new Process('sleep 1');
Expand Down Expand Up @@ -333,6 +360,21 @@ public function testDetectsClosingProcessEvenWhenAllStdioPipesHaveBeenClosed()
$time = microtime(true) - $time;

$this->assertLessThan(0.1, $time);
$this->assertSame(0, $process->getExitCode());
}

public function testDetectsClosingProcessEvenWhenStartedWithoutPipes()
{
$loop = $this->createLoop();
$process = new Process('exit 0', null, null, array());
$process->start($loop, 0.001);

$time = microtime(true);
$loop->run();
$time = microtime(true) - $time;

$this->assertLessThan(0.1, $time);
$this->assertSame(0, $process->getExitCode());
}

public function testStartInvalidProcess()
Expand Down

0 comments on commit 3534ecc

Please sign in to comment.