Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Demultiplex execStart() and execStartStream() streaming APIs #40

Merged
merged 5 commits into from
May 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,23 @@ $stream->on('close', function () {
});
```

Note that by default the output of both STDOUT and STDERR will be emitted
as normal `data` events. You can optionally pass a custom event name which
will be used to emit STDERR data so that it can be handled separately.
Note that the normal streaming primitives likely do not know about this
event, so special care may have to be taken.
Also note that this option has no effect if you execute with a TTY.

```php
$stream = $client->execStartStream($exec, $tty, 'stderr');
$stream->on('data', function ($data) {
echo 'STDOUT data: ' . $data;
});
$stream->on('stderr', function ($data) {
echo 'STDERR data: ' . $data;
});
```

See also the [streaming exec example](examples/exec-stream.php) and the [exec benchmark example](examples/benchmark-exec.php).

The TTY mode should be set depending on whether your command needs a TTY
Expand Down
4 changes: 2 additions & 2 deletions examples/exec-inspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
$factory = new Factory($loop);
$client = $factory->createClient();

$client->execCreate($container, $cmd, true)->then(function ($info) use ($client) {
$client->execCreate($container, $cmd)->then(function ($info) use ($client) {
echo 'Created with info: ' . json_encode($info) . PHP_EOL;

return $client->execInspect($info['Id']);
})->then(function ($info) use ($client) {
echo 'Inspected after creation: ' . json_encode($info, JSON_PRETTY_PRINT) . PHP_EOL;

return $client->execStart($info['ID'], true)->then(function ($out) use ($client, $info) {
return $client->execStart($info['ID'])->then(function ($out) use ($client, $info) {
echo 'Starting returned: ';
var_dump($out);

Expand Down
30 changes: 24 additions & 6 deletions examples/exec-stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,36 @@
$out = new Stream(STDOUT, $loop);
$out->pause();

$client->execCreate($container, $cmd, true)->then(function ($info) use ($client, $out) {
$stream = $client->execStartStream($info['Id'], true);
$stderr = new Stream(STDERR, $loop);
$stderr->pause();

// unkown exit code by default
$exit = 1;

$client->execCreate($container, $cmd)->then(function ($info) use ($client, $out, $stderr, &$exit) {
$stream = $client->execStartStream($info['Id'], false, 'stderr');
$stream->pipe($out);

// forward custom stderr event to STDERR stream
$stream->on('stderr', function ($data) use ($stderr, $stream) {
if ($stderr->write($data) === false) {
$stream->pause();
$stderr->once('drain', function () use ($stream) {
$stream->resume();
});
}
});

$stream->on('error', 'printf');

// exit with error code of executed command once it closes
$stream->on('close', function () use ($client, $info) {
$client->execInspect($info['Id'])->then(function ($info) {
exit($info['ExitCode']);
// remember exit code of executed command once it closes
$stream->on('close', function () use ($client, $info, &$exit) {
$client->execInspect($info['Id'])->then(function ($info) use (&$exit) {
$exit = $info['ExitCode'];
}, 'printf');
});
}, 'printf');

$loop->run();

exit($exit);
23 changes: 19 additions & 4 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -1008,16 +1008,24 @@ public function execStartDetached($exec, $tty = false)
* This works for command output of any size as only small chunks have to
* be kept in memory.
*
* @param string $exec exec ID
* @param boolean $tty tty mode
* Note that by default the output of both STDOUT and STDERR will be emitted
* as normal "data" events. You can optionally pass a custom event name which
* will be used to emit STDERR data so that it can be handled separately.
* Note that the normal streaming primitives likely do not know about this
* event, so special care may have to be taken.
* Also note that this option has no effect if you execute with a TTY.
*
* @param string $exec exec ID
* @param boolean $tty tty mode
* @param string $stderrEvent custom event to emit for STDERR data (otherwise emits as "data")
* @return ReadableStreamInterface stream of exec data
* @link https://docs.docker.com/reference/api/docker_remote_api_v1.15/#exec-start
* @see self::execStart()
* @see self::execStartDetached()
*/
public function execStartStream($exec, $tty = false)
public function execStartStream($exec, $tty = false, $stderrEvent = null)
{
return $this->streamingParser->parsePlainStream(
$stream = $this->streamingParser->parsePlainStream(
$this->browser->withOptions(array('streaming' => true))->post(
$this->uri->expand(
'/exec/{exec}/start',
Expand All @@ -1033,6 +1041,13 @@ public function execStartStream($exec, $tty = false)
))
)
);

// this is a multiplexed stream unless this is started with a TTY
if (!$tty) {
$stream = $this->streamingParser->demultiplexStream($stream, $stderrEvent);
}

return $stream;
}

/**
Expand Down
136 changes: 136 additions & 0 deletions src/Io/ReadableDemultiplexStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
<?php

namespace Clue\React\Docker\Io;

use React\Stream\ReadableStreamInterface;
use Evenement\EventEmitter;
use React\Stream\WritableStreamInterface;
use React\Stream\Util;
/**
* Parser for Docker's own frame format used for bidrectional frames
*
* Each frame consists of a simple header containing the stream identifier and the payload length
* plus the actual payload string.
*
* @internal
* @link https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
*/
class ReadableDemultiplexStream extends EventEmitter implements ReadableStreamInterface
{
private $buffer = '';
private $closed = false;
private $multiplexed;
private $stderrEvent;

public function __construct(ReadableStreamInterface $multiplexed, $stderrEvent = null)
{
$this->multiplexed = $multiplexed;

if ($stderrEvent === null) {
$stderrEvent = 'data';
}

$this->stderrEvent = $stderrEvent;

$out = $this;
$buffer =& $this->buffer;
$closed =& $this->closed;

// pass all input data chunks through the parser
$multiplexed->on('data', array($out, 'push'));

// forward end event to output (unless parsing is still in progress)
$multiplexed->on('end', function () use (&$buffer, $out, &$closed) {
// ignore duplicate end events
if ($closed) {
return;
}

// buffer must be empty on end, otherwise this is an error situation
if ($buffer === '') {
$out->emit('end', array());
} else {
$out->emit('error', array(new \RuntimeException('Stream ended within incomplete multiplexed chunk')));
}
$out->close();
});

// forward error event to output
$multiplexed->on('error', function ($error) use ($out) {
$out->emit('error', array($error));
$out->close();
});

// forward close event to output
$multiplexed->on('close', function ($error) use ($out) {
$out->close();
});
}

/**
* push the given stream chunk into the parser buffer and try to extract all frames
*
* @internal
* @param string $chunk
*/
public function push($chunk)
{
$this->buffer .= $chunk;

while ($this->buffer !== '') {
if (!isset($this->buffer[7])) {
// last header byte not set => no complete header in buffer
break;
}

$header = unpack('Cstream/x/x/x/Nlength', substr($this->buffer, 0, 8));

if (!isset($this->buffer[7 + $header['length']])) {
// last payload byte not set => message payload is incomplete
break;
}

$payload = substr($this->buffer, 8, $header['length']);
$this->buffer = (string)substr($this->buffer, 8 + $header['length']);

$this->emit(
($header['stream'] === 2) ? $this->stderrEvent : 'data',
array($payload)
);
}
}

public function pause()
{
$this->multiplexed->pause();
}

public function resume()
{
$this->multiplexed->resume();
}

public function isReadable()
{
return $this->multiplexed->isReadable();
}

public function pipe(WritableStreamInterface $dest, array $options = array())
{
return Util::pipe($this, $dest, $options);
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;

// closing output stream closes input stream
$this->multiplexed->close();

$this->emit('close', array());
}
}
12 changes: 12 additions & 0 deletions src/Io/StreamingParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ public function parsePlainStream(PromiseInterface $promise)
}));
}

/**
* Returns a readable plain text stream for the given multiplexed stream using Docker's "attach multiplexing protocol"
*
* @param ReadableStreamInterface $input
* @param string $stderrEvent
* @return ReadableStreamInterface
*/
public function demultiplexStream(ReadableStreamInterface $input, $stderrEvent = null)
{
return new ReadableDemultiplexStream($input, $stderrEvent);
}

/**
* Returns a promise which resolves with the buffered stream contents of the given stream
*
Expand Down
28 changes: 27 additions & 1 deletion tests/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -408,22 +408,48 @@ public function testExecStart()

$this->expectRequest('POST', '/exec/123/start', $this->createResponse($data));
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream);
$this->streamingParser->expects($this->once())->method('bufferedStream')->with($this->equalTo($stream))->willReturn(Promise\resolve($data));

$this->expectPromiseResolveWith($data, $this->client->execStart(123, $config));
}

public function testExecStartStream()
public function testExecStartStreamWithoutTtyWillDemultiplex()
{
$config = array();
$stream = $this->getMock('React\Stream\ReadableStreamInterface');

$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream)->willReturn($stream);

$this->assertSame($stream, $this->client->execStartStream(123, $config));
}

public function testExecStartStreamWithTtyWillNotDemultiplex()
{
$config = array('Tty' => true);
$stream = $this->getMock('React\Stream\ReadableStreamInterface');

$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
$this->streamingParser->expects($this->never())->method('demultiplexStream');

$this->assertSame($stream, $this->client->execStartStream(123, $config));
}

public function testExecStartStreamWithCustomStderrEvent()
{
$config = array();
$stream = $this->getMock('React\Stream\ReadableStreamInterface');

$this->expectRequest('POST', '/exec/123/start', $this->createResponse());
$this->streamingParser->expects($this->once())->method('parsePlainStream')->will($this->returnValue($stream));
$this->streamingParser->expects($this->once())->method('demultiplexStream')->with($stream, 'stderr')->willReturn($stream);

$this->assertSame($stream, $this->client->execStartStream(123, $config, 'stderr'));
}

public function testExecResize()
{
$this->expectRequestFlow('POST', '/exec/123/resize?w=800&h=600', $this->createResponse(), 'expectEmpty');
Expand Down
Loading