Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve promise cancellation and clean up any garbage references #23

Merged
merged 1 commit into from
Oct 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"require": {
"php": ">=5.3",
"react/promise": " ^2.1 || ^1.2.1",
"react/socket": "^1.0 || ^0.8.4",
"react/socket": "^1.1",
"ringcentral/psr7": "^1.2"
},
"require-dev": {
Expand Down
62 changes: 45 additions & 17 deletions src/ProxyConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,34 @@ public function connect($uri)
$proxyUri .= '#' . $parts['fragment'];
}

$auth = $this->proxyAuth;
$connecting = $this->connector->connect($proxyUri);

$deferred = new Deferred(function ($_, $reject) use ($connecting) {
$reject(new RuntimeException(
'Connection cancelled while waiting for proxy (ECONNABORTED)',
defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103
));

return $this->connector->connect($proxyUri)->then(function (ConnectionInterface $stream) use ($target, $auth) {
$deferred = new Deferred(function ($_, $reject) use ($stream) {
$reject(new RuntimeException('Connection canceled while waiting for response from proxy (ECONNABORTED)', defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103));
// either close active connection or cancel pending connection attempt
$connecting->then(function (ConnectionInterface $stream) {
$stream->close();
});
$connecting->cancel();
});

$auth = $this->proxyAuth;
$connecting->then(function (ConnectionInterface $stream) use ($target, $auth, $deferred) {
// keep buffering data until headers are complete
$buffer = '';
$fn = function ($chunk) use (&$buffer, $deferred, $stream) {
$stream->on('data', $fn = function ($chunk) use (&$buffer, $deferred, $stream, &$fn) {
$buffer .= $chunk;

$pos = strpos($buffer, "\r\n\r\n");
if ($pos !== false) {
// end of headers received => stop buffering
$stream->removeListener('data', $fn);
$fn = null;

// try to parse headers as response message
try {
$response = Psr7\parse_response(substr($buffer, 0, $pos));
Expand All @@ -163,11 +176,13 @@ public function connect($uri)
if ($response->getStatusCode() === 407) {
// map status code 407 (Proxy Authentication Required) to EACCES
$deferred->reject(new RuntimeException('Proxy denied connection due to invalid authentication ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (EACCES)', defined('SOCKET_EACCES') ? SOCKET_EACCES : 13));
return $stream->close();
$stream->close();
return;
} elseif ($response->getStatusCode() < 200 || $response->getStatusCode() >= 300) {
// map non-2xx status code to ECONNREFUSED
$deferred->reject(new RuntimeException('Proxy refused connection with HTTP error code ' . $response->getStatusCode() . ' (' . $response->getReasonPhrase() . ') (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111));
return $stream->close();
$stream->close();
return;
}

// all okay, resolve with stream instance
Expand All @@ -187,8 +202,7 @@ public function connect($uri)
$deferred->reject(new RuntimeException('Proxy must not send more than 8 KiB of headers (EMSGSIZE)', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90));
$stream->close();
}
};
$stream->on('data', $fn);
});

$stream->on('error', function (Exception $e) use ($deferred) {
$deferred->reject(new RuntimeException('Stream error while waiting for response from proxy (EIO)', defined('SOCKET_EIO') ? SOCKET_EIO : 5, $e));
Expand All @@ -199,14 +213,28 @@ public function connect($uri)
});

$stream->write("CONNECT " . $target . " HTTP/1.1\r\nHost: " . $target . "\r\n" . $auth . "\r\n");

return $deferred->promise()->then(function (ConnectionInterface $stream) use ($fn) {
// Stop buffering when connection has been established.
$stream->removeListener('data', $fn);
return new Promise\FulfilledPromise($stream);
});
}, function (Exception $e) use ($proxyUri) {
throw new RuntimeException('Unable to connect to proxy (ECONNREFUSED)', defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111, $e);
}, function (Exception $e) use ($deferred) {
$deferred->reject($e = new RuntimeException(
'Unable to connect to proxy (ECONNREFUSED)',
defined('SOCKET_ECONNREFUSED') ? SOCKET_ECONNREFUSED : 111,
$e
));

// avoid garbage references by replacing all closures in call stack.
// what a lovely piece of code!
$r = new \ReflectionProperty('Exception', 'trace');
$r->setAccessible(true);
$trace = $r->getValue($e);
foreach ($trace as &$one) {
foreach ($one['args'] as &$arg) {
if ($arg instanceof \Closure) {
$arg = 'Object(' . get_class($arg) . ')';
}
}
}
$r->setValue($e, $trace);
});

return $deferred->promise();
}
}
9 changes: 6 additions & 3 deletions tests/AbstractTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ protected function expectCallableOnceWithExceptionCode($code)
$mock
->expects($this->once())
->method('__invoke')
->with($this->callback(function ($e) use ($code) {
return $e->getCode() === $code;
}));
->with($this->logicalAnd(
$this->isInstanceOf('Exception'),
$this->callback(function ($e) use ($code) {
return $e->getCode() === $code;
})
));

return $mock;
}
Expand Down
16 changes: 16 additions & 0 deletions tests/FunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,20 @@ public function testSecureGoogleDoesNotAcceptPlainStream()
$this->setExpectedException('RuntimeException', 'Connection to proxy lost', SOCKET_ECONNRESET);
Block\await($promise, $this->loop, 3.0);
}

/**
* @requires PHP 7
*/
public function testCancelWhileConnectingShouldNotCreateGarbageCycles()
{
$proxy = new ProxyConnector('google.com', $this->dnsConnector);

gc_collect_cycles();

$promise = $proxy->connect('google.com:80');
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}
}
69 changes: 66 additions & 3 deletions tests/ProxyConnectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Clue\React\HttpProxy\ProxyConnector;
use React\Promise\Promise;
use React\Socket\ConnectionInterface;
use React\Promise\Deferred;

class ProxyConnectorTest extends AbstractTestCase
{
Expand Down Expand Up @@ -355,22 +356,84 @@ public function testResolvesIfStreamReturnsSuccessAndEmitsExcessiveData()
$stream->emit('data', array("HTTP/1.1 200 OK\r\n\r\nhello!"));
}

public function testCancelPromiseWillCloseOpenConnectionAndReject()
public function testCancelPromiseWhileConnectionIsReadyWillCloseOpenConnectionAndReject()
{
$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock();
$stream->expects($this->once())->method('close');

$promise = \React\Promise\resolve($stream);
$this->connector->expects($this->once())->method('connect')->willReturn($promise);
$deferred = new Deferred();

$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');

$deferred->resolve($stream);

$this->assertInstanceOf('React\Promise\CancellablePromiseInterface', $promise);

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWithExceptionCode(SOCKET_ECONNABORTED));
}

public function testCancelPromiseDuringConnectionShouldNotCreateGarbageCycles()
{
$pending = new Promise(function () { });
$this->connector->expects($this->once())->method('connect')->willReturn($pending);

gc_collect_cycles();

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}

public function testCancelPromiseWhileConnectionIsReadyShouldNotCreateGarbageCycles()
{
if (class_exists('React\Promise\When')) {
$this->markTestSkipped('Not supported on legacy Promise v1 API');
}

$stream = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close', 'write'))->getMock();

$deferred = new Deferred();

$this->connector->expects($this->once())->method('connect')->willReturn($deferred->promise());

gc_collect_cycles();

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');
$deferred->resolve($stream);
$promise->cancel();
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}

public function testRejectedConnectionShouldNotCreateGarbageCycles()
{
if (class_exists('React\Promise\When')) {
$this->markTestSkipped('Not supported on legacy Promise v1 API');
}

$rejected = \React\Promise\reject(new \RuntimeException());
$this->connector->expects($this->once())->method('connect')->willReturn($rejected);

gc_collect_cycles();

$proxy = new ProxyConnector('proxy.example.com', $this->connector);

$promise = $proxy->connect('google.com:80');
unset($promise);

$this->assertEquals(0, gc_collect_cycles());
}
}