Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include buffer logic to avoid dependency on reactphp/promise-stream #482

Merged
merged 1 commit into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
"psr/http-message": "^1.0",
"react/event-loop": "^1.2",
"react/promise": "^3 || ^2.3 || ^1.2.1",
"react/promise-stream": "^1.4",
"react/socket": "^1.12",
"react/stream": "^1.2",
"ringcentral/psr7": "^1.2"
Expand All @@ -43,6 +42,7 @@
"clue/socks-react": "^1.4",
"phpunit/phpunit": "^9.5 || ^5.7 || ^4.8.35",
"react/async": "^4 || ^3 || ^2",
"react/promise-stream": "^1.4",
"react/promise-timer": "^1.9"
},
"autoload": {
Expand Down
68 changes: 45 additions & 23 deletions src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use React\Http\Message\Response;
use React\Http\Message\ResponseException;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;
use RingCentral\Psr7\Uri;
Expand Down Expand Up @@ -165,46 +166,67 @@ function (ResponseInterface $response) use ($request, $that, $deferred, $state)
*/
public function bufferResponse(ResponseInterface $response, Deferred $deferred, ClientRequestState $state)
{
$stream = $response->getBody();
$body = $response->getBody();
$size = $body->getSize();

$size = $stream->getSize();
if ($size !== null && $size > $this->maximumSize) {
$stream->close();
$body->close();
return \React\Promise\reject(new \OverflowException(
'Response body size of ' . $size . ' bytes exceeds maximum of ' . $this->maximumSize . ' bytes',
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 0
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
));
}

// body is not streaming => already buffered
if (!$stream instanceof ReadableStreamInterface) {
if (!$body instanceof ReadableStreamInterface) {
return \React\Promise\resolve($response);
}

// buffer stream and resolve with buffered body
/** @var ?\Closure $closer */
$closer = null;
$maximumSize = $this->maximumSize;
$promise = \React\Promise\Stream\buffer($stream, $maximumSize)->then(
function ($body) use ($response) {
return $response->withBody(new BufferedBody($body));
},
function ($e) use ($stream, $maximumSize) {
// try to close stream if buffering fails (or is cancelled)
$stream->close();

if ($e instanceof \OverflowException) {
$e = new \OverflowException(
return $state->pending = new Promise(function ($resolve, $reject) use ($body, $maximumSize, $response, &$closer) {
// resolve with current buffer when stream closes successfully
$buffer = '';
$body->on('close', $closer = function () use (&$buffer, $response, $maximumSize, $resolve, $reject) {
$resolve($response->withBody(new BufferedBody($buffer)));
});

// buffer response body data in memory
$body->on('data', function ($data) use (&$buffer, $maximumSize, $body, $closer, $reject) {
$buffer .= $data;

// close stream and reject promise if limit is exceeded
if (isset($buffer[$maximumSize])) {
$buffer = '';
assert($closer instanceof \Closure);
$body->removeListener('close', $closer);
$body->close();

$reject(new \OverflowException(
'Response body size exceeds maximum of ' . $maximumSize . ' bytes',
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 0
);
\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90
));
}
});

throw $e;
}
);

$state->pending = $promise;
// reject buffering if body emits error
$body->on('error', function (\Exception $e) use ($reject) {
$reject(new \RuntimeException(
'Error while buffering response body: ' . $e->getMessage(),
$e->getCode(),
$e
));
});
}, function () use ($body, &$closer) {
// cancelled buffering: remove close handler to avoid resolving, then close and reject
assert($closer instanceof \Closure);
$body->removeListener('close', $closer);
$body->close();

return $promise;
throw new \RuntimeException('Cancelled buffering response body');
});
}

/**
Expand Down
75 changes: 57 additions & 18 deletions src/Middleware/RequestBodyBufferMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Io\BufferedBody;
use React\Http\Io\IniUtil;
use React\Promise\Stream;
use React\Promise\Promise;
use React\Stream\ReadableStreamInterface;

final class RequestBodyBufferMiddleware
Expand All @@ -29,19 +29,19 @@ public function __construct($sizeLimit = null)
$this->sizeLimit = IniUtil::iniSizeToBytes($sizeLimit);
}

public function __invoke(ServerRequestInterface $request, $stack)
public function __invoke(ServerRequestInterface $request, $next)
{
$body = $request->getBody();
$size = $body->getSize();

// happy path: skip if body is known to be empty (or is already buffered)
if ($size === 0 || !$body instanceof ReadableStreamInterface) {
if ($size === 0 || !$body instanceof ReadableStreamInterface || !$body->isReadable()) {
// replace with empty body if body is streaming (or buffered size exceeds limit)
if ($body instanceof ReadableStreamInterface || $size > $this->sizeLimit) {
$request = $request->withBody(new BufferedBody(''));
}

return $stack($request);
return $next($request);
}

// request body of known size exceeding limit
Expand All @@ -50,21 +50,60 @@ public function __invoke(ServerRequestInterface $request, $stack)
$sizeLimit = 0;
}

return Stream\buffer($body, $sizeLimit)->then(function ($buffer) use ($request, $stack) {
$request = $request->withBody(new BufferedBody($buffer));

return $stack($request);
}, function ($error) use ($stack, $request, $body) {
// On buffer overflow keep the request body stream in,
// but ignore the contents and wait for the close event
// before passing the request on to the next middleware.
if ($error instanceof OverflowException) {
return Stream\first($body, 'close')->then(function () use ($stack, $request) {
return $stack($request);
});
}
/** @var ?\Closure $closer */
$closer = null;

return new Promise(function ($resolve, $reject) use ($body, &$closer, $sizeLimit, $request, $next) {
// buffer request body data in memory, discard but keep buffering if limit is reached
$buffer = '';
$bufferer = null;
$body->on('data', $bufferer = function ($data) use (&$buffer, $sizeLimit, $body, &$bufferer) {
$buffer .= $data;

// On buffer overflow keep the request body stream in,
// but ignore the contents and wait for the close event
// before passing the request on to the next middleware.
if (isset($buffer[$sizeLimit])) {
assert($bufferer instanceof \Closure);
$body->removeListener('data', $bufferer);
$bufferer = null;
$buffer = '';
}
});

// call $next with current buffer and resolve or reject with its results
$body->on('close', $closer = function () use (&$buffer, $request, $resolve, $reject, $next) {
try {
// resolve with result of next handler
$resolve($next($request->withBody(new BufferedBody($buffer))));
} catch (\Exception $e) {
$reject($e);
} catch (\Throwable $e) { // @codeCoverageIgnoreStart
// reject Errors just like Exceptions (PHP 7+)
$reject($e); // @codeCoverageIgnoreEnd
}
});

// reject buffering if body emits error
$body->on('error', function (\Exception $e) use ($reject, $body, $closer) {
// remove close handler to avoid resolving, then close and reject
assert($closer instanceof \Closure);
$body->removeListener('close', $closer);
$body->close();

$reject(new \RuntimeException(
'Error while buffering request body: ' . $e->getMessage(),
$e->getCode(),
$e
));
});
}, function () use ($body, &$closer) {
// cancelled buffering: remove close handler to avoid resolving, then close and reject
assert($closer instanceof \Closure);
$body->removeListener('close', $closer);
$body->close();

throw $error;
throw new \RuntimeException('Cancelled buffering request body');
});
}
}
94 changes: 89 additions & 5 deletions tests/Io/TransactionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public function testReceivingStreamingBodyWillResolveWithBufferedResponseByDefau
$this->assertEquals('hello world', (string)$response->getBody());
}

public function testReceivingStreamingBodyWithSizeExceedingMaximumResponseBufferWillRejectAndCloseResponseStream()
public function testReceivingStreamingBodyWithContentLengthExceedingMaximumResponseBufferWillRejectAndCloseResponseStreamImmediately()
{
$stream = new ThroughStream();
$stream->on('close', $this->expectCallableOnce());
Expand All @@ -419,11 +419,87 @@ public function testReceivingStreamingBodyWithSizeExceedingMaximumResponseBuffer
$sender = $this->makeSenderMock();
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));

$transaction = new Transaction($sender, Loop::get());

$promise = $transaction->send($request);

$exception = null;
$promise->then(null, function ($e) use (&$exception) {
$exception = $e;
});

$this->assertFalse($stream->isWritable());

assert($exception instanceof \OverflowException);
$this->assertInstanceOf('OverflowException', $exception);
$this->assertEquals('Response body size of 100000000 bytes exceeds maximum of 16777216 bytes', $exception->getMessage());
$this->assertEquals(defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90, $exception->getCode());
$this->assertNull($exception->getPrevious());
}

public function testReceivingStreamingBodyWithContentsExceedingMaximumResponseBufferWillRejectAndCloseResponseStreamWhenBufferExceedsLimit()
{
$stream = new ThroughStream();
$stream->on('close', $this->expectCallableOnce());

$request = $this->getMockBuilder('Psr\Http\Message\RequestInterface')->getMock();

$response = new Response(200, array(), new ReadableBodyStream($stream));

// mock sender to resolve promise with the given $response in response to the given $request
$sender = $this->makeSenderMock();
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));

$transaction = new Transaction($sender, Loop::get());
$transaction = $transaction->withOptions(array('maximumSize' => 10));
$promise = $transaction->send($request);

$exception = null;
$promise->then(null, function ($e) use (&$exception) {
$exception = $e;
});

$this->assertTrue($stream->isWritable());
$stream->write('hello wörld');
$this->assertFalse($stream->isWritable());

assert($exception instanceof \OverflowException);
$this->assertInstanceOf('OverflowException', $exception);
$this->assertEquals('Response body size exceeds maximum of 10 bytes', $exception->getMessage());
$this->assertEquals(defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90, $exception->getCode());
$this->assertNull($exception->getPrevious());
}

public function testReceivingStreamingBodyWillRejectWhenStreamEmitsError()
{
$stream = new ThroughStream(function ($data) {
throw new \UnexpectedValueException('Unexpected ' . $data, 42);
});

$request = $this->getMockBuilder('Psr\Http\Message\RequestInterface')->getMock();
$response = new Response(200, array(), new ReadableBodyStream($stream));

// mock sender to resolve promise with the given $response in response to the given $request
$sender = $this->makeSenderMock();
$sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response));

$transaction = new Transaction($sender, Loop::get());
$promise = $transaction->send($request);

$this->setExpectedException('OverflowException');
\React\Async\await(\React\Promise\Timer\timeout($promise, 0.001));
$exception = null;
$promise->then(null, function ($e) use (&$exception) {
$exception = $e;
});

$this->assertTrue($stream->isWritable());
$stream->write('Foo');
$this->assertFalse($stream->isWritable());

assert($exception instanceof \RuntimeException);
$this->assertInstanceOf('RuntimeException', $exception);
$this->assertEquals('Error while buffering response body: Unexpected Foo', $exception->getMessage());
$this->assertEquals(42, $exception->getCode());
$this->assertInstanceOf('UnexpectedValueException', $exception->getPrevious());
}

public function testCancelBufferingResponseWillCloseStreamAndReject()
Expand All @@ -446,8 +522,16 @@ public function testCancelBufferingResponseWillCloseStreamAndReject()
$deferred->resolve($response);
$promise->cancel();

$this->setExpectedException('RuntimeException');
\React\Async\await(\React\Promise\Timer\timeout($promise, 0.001));
$exception = null;
$promise->then(null, function ($e) use (&$exception) {
$exception = $e;
});

assert($exception instanceof \RuntimeException);
$this->assertInstanceOf('RuntimeException', $exception);
$this->assertEquals('Cancelled buffering response body', $exception->getMessage());
$this->assertEquals(0, $exception->getCode());
$this->assertNull($exception->getPrevious());
}

public function testReceivingStreamingBodyWillResolveWithStreamingResponseIfStreamingIsEnabled()
Expand Down
Loading