From 3f6acdddbe4cf30e0b5caf3a8402ff1a79f5af46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 7 Aug 2019 10:19:36 +0200 Subject: [PATCH 1/2] Add SelectiveTransportExecutor to retry with TCP if UDP is truncated --- README.md | 49 ++++ composer.json | 2 +- src/Query/SelectiveTransportExecutor.php | 85 +++++++ src/Query/UdpTransportExecutor.php | 8 +- .../Query/SelectiveTransportExecutorTest.php | 220 ++++++++++++++++++ tests/Query/UdpTransportExecutorTest.php | 27 +-- 6 files changed, 372 insertions(+), 19 deletions(-) create mode 100644 src/Query/SelectiveTransportExecutor.php create mode 100644 tests/Query/SelectiveTransportExecutorTest.php diff --git a/README.md b/README.md index ed329a22..108732d1 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ easily be used to create a DNS server. * [Advanced usage](#advanced-usage) * [UdpTransportExecutor](#udptransportexecutor) * [TcpTransportExecutor](#tcptransportexecutor) + * [SelectiveTransportExecutor](#selectivetransportexecutor) * [HostsFileExecutor](#hostsfileexecutor) * [Install](#install) * [Tests](#tests) @@ -350,6 +351,54 @@ $executor = new CoopExecutor( packages. Higher-level components should take advantage of the Socket component instead of reimplementing this socket logic from scratch. +### SelectiveTransportExecutor + +The `SelectiveTransportExecutor` class can be used to +Send DNS queries over a UDP or TCP/IP stream transport. + +This class will automatically choose the correct transport protocol to send +a DNS query to your DNS server. It will always try to send it over the more +efficient UDP transport first. If this query yields a size related issue +(truncated messages), it will retry over a streaming TCP/IP transport. + +For more advanced usages one can utilize this class directly. +The following example looks up the `IPv6` address for `reactphp.org`. + +```php +$executor = new SelectiveTransportExecutor($udpExecutor, $tcpExecutor); + +$executor->query( + new Query($name, Message::TYPE_AAAA, Message::CLASS_IN) +)->then(function (Message $message) { + foreach ($message->answers as $answer) { + echo 'IPv6: ' . $answer->data . PHP_EOL; + } +}, 'printf'); +``` + +Note that this executor only implements the logic to select the correct +transport for the given DNS query. Implementing the correct transport logic, +implementing timeouts and any retry logic is left up to the given executors, +see also [`UdpTransportExecutor`](#udptransportexecutor) and +[`TcpTransportExecutor`](#tcptransportexecutor) for more details. + +Note that this executor is entirely async and as such allows you to execute +any number of queries concurrently. You should probably limit the number of +concurrent queries in your application or you're very likely going to face +rate limitations and bans on the resolver end. For many common applications, +you may want to avoid sending the same query multiple times when the first +one is still pending, so you will likely want to use this in combination with +a `CoopExecutor` like this: + +```php +$executor = new CoopExecutor( + new SelectiveTransportExecutor( + $datagramExecutor, + $streamExecutor + ) +); +``` + ### HostsFileExecutor Note that the above `UdpTransportExecutor` class always performs an actual DNS query. diff --git a/composer.json b/composer.json index 572baff7..5c0e47a9 100644 --- a/composer.json +++ b/composer.json @@ -7,7 +7,7 @@ "php": ">=5.3.0", "react/cache": "^1.0 || ^0.6 || ^0.5", "react/event-loop": "^1.0 || ^0.5", - "react/promise": "^2.1 || ^1.2.1", + "react/promise": "^2.7 || ^1.2.1", "react/promise-timer": "^1.2" }, "require-dev": { diff --git a/src/Query/SelectiveTransportExecutor.php b/src/Query/SelectiveTransportExecutor.php new file mode 100644 index 00000000..0f0ca5d0 --- /dev/null +++ b/src/Query/SelectiveTransportExecutor.php @@ -0,0 +1,85 @@ +query( + * new Query($name, Message::TYPE_AAAA, Message::CLASS_IN) + * )->then(function (Message $message) { + * foreach ($message->answers as $answer) { + * echo 'IPv6: ' . $answer->data . PHP_EOL; + * } + * }, 'printf'); + * ``` + * + * Note that this executor only implements the logic to select the correct + * transport for the given DNS query. Implementing the correct transport logic, + * implementing timeouts and any retry logic is left up to the given executors, + * see also [`UdpTransportExecutor`](#udptransportexecutor) and + * [`TcpTransportExecutor`](#tcptransportexecutor) for more details. + * + * Note that this executor is entirely async and as such allows you to execute + * any number of queries concurrently. You should probably limit the number of + * concurrent queries in your application or you're very likely going to face + * rate limitations and bans on the resolver end. For many common applications, + * you may want to avoid sending the same query multiple times when the first + * one is still pending, so you will likely want to use this in combination with + * a `CoopExecutor` like this: + * + * ```php + * $executor = new CoopExecutor( + * new SelectiveTransportExecutor( + * $datagramExecutor, + * $streamExecutor + * ) + * ); + * ``` + */ +class SelectiveTransportExecutor implements ExecutorInterface +{ + private $datagramExecutor; + private $streamExecutor; + + public function __construct(ExecutorInterface $datagramExecutor, ExecutorInterface $streamExecutor) + { + $this->datagramExecutor = $datagramExecutor; + $this->streamExecutor = $streamExecutor; + } + + public function query(Query $query) + { + $stream = $this->streamExecutor; + $pending = $this->datagramExecutor->query($query); + + return new Promise(function ($resolve, $reject) use (&$pending, $stream, $query) { + $pending->then( + $resolve, + function ($e) use (&$pending, $stream, $query, $resolve, $reject) { + if ($e->getCode() === (\defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90)) { + $pending = $stream->query($query)->then($resolve, $reject); + } else { + $reject($e); + } + } + ); + }, function () use (&$pending) { + $pending->cancel(); + $pending = null; + }); + } +} diff --git a/src/Query/UdpTransportExecutor.php b/src/Query/UdpTransportExecutor.php index d1b4d215..62ac2183 100644 --- a/src/Query/UdpTransportExecutor.php +++ b/src/Query/UdpTransportExecutor.php @@ -121,7 +121,8 @@ public function query(Query $query) $queryData = $this->dumper->toBinary($request); if (isset($queryData[512])) { return \React\Promise\reject(new \RuntimeException( - 'DNS query for ' . $query->name . ' failed: Query too large for UDP transport' + 'DNS query for ' . $query->name . ' failed: Query too large for UDP transport', + \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90 )); } @@ -172,7 +173,10 @@ public function query(Query $query) \fclose($socket); if ($response->tc) { - $deferred->reject(new \RuntimeException('DNS query for ' . $query->name . ' failed: The server returned a truncated result for a UDP query, but retrying via TCP is currently not supported')); + $deferred->reject(new \RuntimeException( + 'DNS query for ' . $query->name . ' failed: The server returned a truncated result for a UDP query', + \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90 + )); return; } diff --git a/tests/Query/SelectiveTransportExecutorTest.php b/tests/Query/SelectiveTransportExecutorTest.php new file mode 100644 index 00000000..d9d22b20 --- /dev/null +++ b/tests/Query/SelectiveTransportExecutorTest.php @@ -0,0 +1,220 @@ +datagram = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock(); + $this->stream = $this->getMockBuilder('React\Dns\Query\ExecutorInterface')->getMock(); + + $this->executor = new SelectiveTransportExecutor($this->datagram, $this->stream); + } + + public function testQueryResolvesWhenDatagramTransportResolvesWithoutUsingStreamTransport() + { + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $response = new Message(); + + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\resolve($response)); + + $this->stream + ->expects($this->never()) + ->method('query'); + + $promise = $this->executor->query($query); + + $promise->then($this->expectCallableOnceWith($response)); + } + + public function testQueryResolvesWhenStreamTransportResolvesAfterDatagramTransportRejectsWithSizeError() + { + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $response = new Message(); + + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\reject(new \RuntimeException('', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90))); + + $this->stream + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\resolve($response)); + + $promise = $this->executor->query($query); + + $promise->then($this->expectCallableOnceWith($response)); + } + + public function testQueryRejectsWhenDatagramTransportRejectsWithRuntimeExceptionWithoutUsingStreamTransport() + { + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\reject(new \RuntimeException())); + + $this->stream + ->expects($this->never()) + ->method('query'); + + $promise = $this->executor->query($query); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testQueryRejectsWhenStreamTransportRejectsAfterDatagramTransportRejectsWithSizeError() + { + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\reject(new \RuntimeException('', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90))); + + $this->stream + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\reject(new \RuntimeException())); + + $promise = $this->executor->query($query); + + $promise->then(null, $this->expectCallableOnce()); + } + + public function testCancelPromiseWillCancelPromiseFromDatagramExecutor() + { + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(new Promise(function () {}, $this->expectCallableOnce())); + + $promise = $this->executor->query($query); + $promise->cancel(); + } + + public function testCancelPromiseWillCancelPromiseFromStreamExecutorWhenDatagramExecutorRejectedWithTruncatedResponse() + { + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $deferred = new Deferred(); + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn($deferred->promise()); + + $this->stream + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(new Promise(function () {}, $this->expectCallableOnce())); + + $promise = $this->executor->query($query); + $deferred->reject(new \RuntimeException('', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90)); + $promise->cancel(); + } + + public function testCancelPromiseShouldNotCreateAnyGarbageReferences() + { + if (class_exists('React\Promise\When')) { + $this->markTestSkipped('Not supported on legacy Promise v1 API'); + } + + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(new Promise(function () {}, function () { + throw new \RuntimeException('Cancelled'); + })); + + gc_collect_cycles(); + $promise = $this->executor->query($query); + $promise->cancel(); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } + + public function testCancelPromiseAfterTruncatedResponseShouldNotCreateAnyGarbageReferences() + { + if (class_exists('React\Promise\When')) { + $this->markTestSkipped('Not supported on legacy Promise v1 API'); + } + + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $deferred = new Deferred(); + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn($deferred->promise()); + + $this->stream + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(new Promise(function () {}, function () { + throw new \RuntimeException('Cancelled'); + })); + + gc_collect_cycles(); + $promise = $this->executor->query($query); + $deferred->reject(new \RuntimeException('', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90)); + $promise->cancel(); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } + + public function testRejectedPromiseAfterTruncatedResponseShouldNotCreateAnyGarbageReferences() + { + $query = new Query('igor.io', Message::TYPE_A, Message::CLASS_IN); + + $this->datagram + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\reject(new \RuntimeException('', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90))); + + $this->stream + ->expects($this->once()) + ->method('query') + ->with($query) + ->willReturn(\React\Promise\reject(new \RuntimeException())); + + gc_collect_cycles(); + $promise = $this->executor->query($query); + unset($promise); + + $this->assertEquals(0, gc_collect_cycles()); + } +} diff --git a/tests/Query/UdpTransportExecutorTest.php b/tests/Query/UdpTransportExecutorTest.php index 44ff7f4e..d01ab3ee 100644 --- a/tests/Query/UdpTransportExecutorTest.php +++ b/tests/Query/UdpTransportExecutorTest.php @@ -101,7 +101,14 @@ public function testQueryRejectsIfMessageExceedsUdpSize() $promise = $executor->query($query); $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); - $promise->then(null, $this->expectCallableOnce()); + + $exception = null; + $promise->then(null, function ($reason) use (&$exception) { + $exception = $reason; + }); + + $this->setExpectedException('RuntimeException', '', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90); + throw $exception; } public function testQueryRejectsIfServerConnectionFails() @@ -247,22 +254,10 @@ public function testQueryRejectsIfServerSendsTruncatedResponse() $query = new Query('google.com', Message::TYPE_A, Message::CLASS_IN); - $wait = true; - $promise = $executor->query($query)->then( - null, - function ($e) use (&$wait) { - $wait = false; - throw $e; - } - ); - - // run loop for short period to ensure we detect connection ICMP rejection error - \Clue\React\Block\sleep(0.01, $loop); - if ($wait) { - \Clue\React\Block\sleep(0.2, $loop); - } + $promise = $executor->query($query); - $this->assertFalse($wait); + $this->setExpectedException('RuntimeException', '', defined('SOCKET_EMSGSIZE') ? SOCKET_EMSGSIZE : 90); + \Clue\React\Block\await($promise, $loop, 0.1); } public function testQueryResolvesIfServerSendsValidResponse() From 994b019c2083a54e7b1e2d84e9edcc540126864a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sat, 10 Aug 2019 13:18:56 +0200 Subject: [PATCH 2/2] Automatically select transport protocol w/o explicit scheme in Factory --- src/Resolver/Factory.php | 44 +++++++++++++++++++++++----------- tests/Resolver/FactoryTest.php | 26 ++++++++++++++++++-- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/src/Resolver/Factory.php b/src/Resolver/Factory.php index 2cec9492..1750aa86 100644 --- a/src/Resolver/Factory.php +++ b/src/Resolver/Factory.php @@ -10,6 +10,7 @@ use React\Dns\Query\ExecutorInterface; use React\Dns\Query\HostsFileExecutor; use React\Dns\Query\RetryExecutor; +use React\Dns\Query\SelectiveTransportExecutor; use React\Dns\Query\TcpTransportExecutor; use React\Dns\Query\TimeoutExecutor; use React\Dns\Query\UdpTransportExecutor; @@ -84,24 +85,39 @@ private function createExecutor($nameserver, LoopInterface $loop) $parts = \parse_url($nameserver); if (isset($parts['scheme']) && $parts['scheme'] === 'tcp') { - $executor = new TimeoutExecutor( - new TcpTransportExecutor($nameserver, $loop), - 5.0, - $loop - ); + $executor = $this->createTcpExecutor($nameserver, $loop); + } elseif (isset($parts['scheme']) && $parts['scheme'] === 'udp') { + $executor = $this->createUdpExecutor($nameserver, $loop); } else { - $executor = new RetryExecutor( - new TimeoutExecutor( - new UdpTransportExecutor( - $nameserver, - $loop - ), - 5.0, - $loop - ) + $executor = new SelectiveTransportExecutor( + $this->createUdpExecutor($nameserver, $loop), + $this->createTcpExecutor($nameserver, $loop) ); } return new CoopExecutor($executor); } + + private function createTcpExecutor($nameserver, LoopInterface $loop) + { + return new TimeoutExecutor( + new TcpTransportExecutor($nameserver, $loop), + 5.0, + $loop + ); + } + + private function createUdpExecutor($nameserver, LoopInterface $loop) + { + return new RetryExecutor( + new TimeoutExecutor( + new UdpTransportExecutor( + $nameserver, + $loop + ), + 5.0, + $loop + ) + ); + } } diff --git a/tests/Resolver/FactoryTest.php b/tests/Resolver/FactoryTest.php index b7027827..1f201bf2 100644 --- a/tests/Resolver/FactoryTest.php +++ b/tests/Resolver/FactoryTest.php @@ -21,7 +21,7 @@ public function createShouldCreateResolver() /** @test */ - public function createWithoutSchemeShouldCreateResolverWithUdpExecutorStack() + public function createWithoutSchemeShouldCreateResolverWithSelectiveUdpAndTcpExecutorStack() { $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); @@ -36,7 +36,15 @@ public function createWithoutSchemeShouldCreateResolverWithUdpExecutorStack() $ref = new \ReflectionProperty($coopExecutor, 'executor'); $ref->setAccessible(true); - $retryExecutor = $ref->getValue($coopExecutor); + $selectiveExecutor = $ref->getValue($coopExecutor); + + $this->assertInstanceOf('React\Dns\Query\SelectiveTransportExecutor', $selectiveExecutor); + + // udp below: + + $ref = new \ReflectionProperty($selectiveExecutor, 'datagramExecutor'); + $ref->setAccessible(true); + $retryExecutor = $ref->getValue($selectiveExecutor); $this->assertInstanceOf('React\Dns\Query\RetryExecutor', $retryExecutor); @@ -51,6 +59,20 @@ public function createWithoutSchemeShouldCreateResolverWithUdpExecutorStack() $udpExecutor = $ref->getValue($timeoutExecutor); $this->assertInstanceOf('React\Dns\Query\UdpTransportExecutor', $udpExecutor); + + // tcp below: + + $ref = new \ReflectionProperty($selectiveExecutor, 'streamExecutor'); + $ref->setAccessible(true); + $timeoutExecutor = $ref->getValue($selectiveExecutor); + + $this->assertInstanceOf('React\Dns\Query\TimeoutExecutor', $timeoutExecutor); + + $ref = new \ReflectionProperty($timeoutExecutor, 'executor'); + $ref->setAccessible(true); + $tcpExecutor = $ref->getValue($timeoutExecutor); + + $this->assertInstanceOf('React\Dns\Query\TcpTransportExecutor', $tcpExecutor); } /** @test */