diff --git a/src/Factory.php b/src/Factory.php index 8fe27e0..351ffb6 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -165,7 +165,10 @@ public function createConnection($uri) $parts = parse_url($uri); $uri = preg_replace('#:[^:/]*@#', ':***@', $uri); if (!isset($parts['scheme'], $parts['host']) || $parts['scheme'] !== 'mysql') { - return \React\Promise\reject(new \InvalidArgumentException('Invalid MySQL URI given')); + return \React\Promise\reject(new \InvalidArgumentException( + 'Invalid MySQL URI given (EINVAL)', + \defined('SOCKET_EINVAL') ? \SOCKET_EINVAL : 22 + )); } $args = []; @@ -191,7 +194,8 @@ public function createConnection($uri) $deferred = new Deferred(function ($_, $reject) use ($connecting, $uri) { // connection cancelled, start with rejecting attempt, then clean up $reject(new \RuntimeException( - 'Connection to ' . $uri . ' cancelled' + 'Connection to ' . $uri . ' cancelled (ECONNABORTED)', + \defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103 )); // either close successful connection or cancel pending connection attempt @@ -213,9 +217,16 @@ public function createConnection($uri) $deferred->resolve($connection); }); $command->on('error', function (\Exception $error) use ($deferred, $stream, $uri) { + $const = ''; + $errno = $error->getCode(); + if ($error instanceof Exception) { + $const = ' (EACCES)'; + $errno = \defined('SOCKET_EACCES') ? \SOCKET_EACCES : 13; + } + $deferred->reject(new \RuntimeException( - 'Connection to ' . $uri . ' failed during authentication: ' . $error->getMessage(), - $error->getCode(), + 'Connection to ' . $uri . ' failed during authentication: ' . $error->getMessage() . $const, + $errno, $error )); $stream->close(); @@ -237,7 +248,8 @@ public function createConnection($uri) return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) { if ($e instanceof TimeoutException) { throw new \RuntimeException( - 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds' + 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)', + \defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110 ); } throw $e; diff --git a/src/Io/Connection.php b/src/Io/Connection.php index 6b6dd1c..20810d2 100644 --- a/src/Io/Connection.php +++ b/src/Io/Connection.php @@ -157,14 +157,25 @@ public function close() } $this->state = self::STATE_CLOSED; + $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false; $this->stream->close(); // reject all pending commands if connection is closed while (!$this->executor->isIdle()) { $command = $this->executor->dequeue(); - $command->emit('error', [ - new \RuntimeException('Connection lost') - ]); + assert($command instanceof CommandInterface); + + if ($remoteClosed) { + $command->emit('error', [new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + \defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104 + )]); + } else { + $command->emit('error', [new \RuntimeException( + 'Connection closing (ECONNABORTED)', + \defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103 + )]); + } } $this->emit('close'); @@ -189,7 +200,10 @@ public function handleConnectionError($err) public function handleConnectionClosed() { if ($this->state < self::STATE_CLOSEING) { - $this->emit('error', [new \RuntimeException('mysql server has gone away'), $this]); + $this->emit('error', [new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + \defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104 + )]); } $this->close(); @@ -202,10 +216,13 @@ public function handleConnectionClosed() */ protected function _doCommand(CommandInterface $command) { - if ($this->state === self::STATE_AUTHENTICATED) { - return $this->executor->enqueue($command); - } else { - throw new Exception("Can't send command"); + if ($this->state !== self::STATE_AUTHENTICATED) { + throw new \RuntimeException( + 'Connection ' . ($this->state === self::STATE_CLOSED ? 'closed' : 'closing'). ' (ENOTCONN)', + \defined('SOCKET_ENOTCONN') ? \SOCKET_ENOTCONN : 107 + ); } + + return $this->executor->enqueue($command); } } diff --git a/src/Io/Parser.php b/src/Io/Parser.php index 9e9399e..0085a84 100644 --- a/src/Io/Parser.php +++ b/src/Io/Parser.php @@ -307,7 +307,7 @@ protected function onSuccess() if ($command instanceof QueryCommand) { $command->affectedRows = $this->affectedRows; $command->insertId = $this->insertId; - $command->warningCount = $this->warningCount; + $command->warningCount = $this->warningCount; $command->message = $this->message; } $command->emit('success'); @@ -322,9 +322,10 @@ public function onClose() if ($command instanceof QuitCommand) { $command->emit('success'); } else { - $command->emit('error', [ - new \RuntimeException('Connection lost') - ]); + $command->emit('error', [new \RuntimeException( + 'Connection closing (ECONNABORTED)', + \defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103 + )]); } } } diff --git a/tests/FactoryTest.php b/tests/FactoryTest.php index 14fd856..bee52ee 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryTest.php @@ -52,7 +52,17 @@ public function testConnectWillRejectWhenGivenInvalidScheme() $promise = $factory->createConnection('foo://127.0.0.1'); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('InvalidArgumentException'))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('InvalidArgumentException'), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getMessage() === 'Invalid MySQL URI given (EINVAL)'; + }), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22); + }) + ) + )); } public function testConnectWillUseGivenHostAndGivenPort() @@ -113,7 +123,10 @@ public function testConnectWithInvalidUriWillRejectWithoutConnecting() $this->logicalAnd( $this->isInstanceOf('InvalidArgumentException'), $this->callback(function (\InvalidArgumentException $e) { - return $e->getMessage() === 'Invalid MySQL URI given'; + return $e->getMessage() === 'Invalid MySQL URI given (EINVAL)'; + }), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22); }) ) )); @@ -153,9 +166,15 @@ public function testConnectWithInvalidPassRejectsWithAuthenticationError() $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( - $this->isInstanceOf('Exception'), - $this->callback(function (\Exception $e) { - return !!preg_match("/^Connection to mysql:\/\/[^ ]* failed during authentication: Access denied for user '.*?'@'.*?' \(using password: YES\)$/", $e->getMessage()); + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return !!preg_match("/^Connection to mysql:\/\/[^ ]* failed during authentication: Access denied for user '.*?'@'.*?' \(using password: YES\) \(EACCES\)$/", $e->getMessage()); + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13); + }), + $this->callback(function (\RuntimeException $e) { + return !!preg_match("/^Access denied for user '.*?'@'.*?' \(using password: YES\)$/", $e->getPrevious()->getMessage()); }) ) )); @@ -177,7 +196,20 @@ public function testConnectWillRejectWhenServerClosesConnection() $uri = $this->getConnectionString(['host' => $parts['host'], 'port' => $parts['port']]); $promise = $factory->createConnection($uri); - $promise->then(null, $this->expectCallableOnce()); + + $uri = preg_replace('/:[^:]*@/', ':***@', $uri); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) use ($uri) { + return $e->getMessage() === 'Connection to mysql://' . $uri . ' failed during authentication: Connection closed by peer (ECONNRESET)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104); + }) + ) + )); Loop::run(); } @@ -194,9 +226,12 @@ public function testConnectWillRejectOnExplicitTimeoutDespiteValidAuth() $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( - $this->isInstanceOf('Exception'), - $this->callback(function (\Exception $e) use ($uri) { - return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds'; + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) use ($uri) { + return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds (ETIMEDOUT)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110); }) ) )); @@ -219,9 +254,12 @@ public function testConnectWillRejectOnDefaultTimeoutFromIniDespiteValidAuth() $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( - $this->isInstanceOf('Exception'), - $this->callback(function (\Exception $e) use ($uri) { - return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds'; + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) use ($uri) { + return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds (ETIMEDOUT)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110); }) ) )); @@ -366,7 +404,7 @@ public function testConnectWithValidAuthCanCloseOnlyOnce() public function testConnectWithValidAuthCanCloseAndAbortPing() { - $this->expectOutputString('connected.aborted pending (Connection lost).aborted queued (Connection lost).closed.'); + $this->expectOutputString('connected.aborted pending (Connection closing (ECONNABORTED)).aborted queued (Connection closing (ECONNABORTED)).closed.'); $factory = new Factory(); @@ -401,13 +439,17 @@ public function testlConnectWillRejectWhenUnderlyingConnectorRejects() $factory = new Factory($loop, $connector); $promise = $factory->createConnection('user:secret@127.0.0.1'); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) { - return ($e->getMessage() === 'Connection to mysql://user:***@127.0.0.1 failed: Failed'); - }))); - $promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) { - return ($e->getCode() === 123); - }))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to mysql://user:***@127.0.0.1 failed: Failed'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === 123; + }) + ) + )); } public function provideUris() @@ -457,7 +499,10 @@ public function testCancelConnectWillCancelPendingConnection($uri, $safe) $this->logicalAnd( $this->isInstanceOf('RuntimeException'), $this->callback(function (\RuntimeException $e) use ($safe) { - return $e->getMessage() === 'Connection to ' . $safe . ' cancelled'; + return $e->getMessage() === 'Connection to ' . $safe . ' cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); }) ) )); @@ -477,10 +522,17 @@ public function testCancelConnectWillCancelPendingConnectionWithRuntimeException $promise->cancel(); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) { - return ($e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled'); - }))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); + }) + ) + )); } public function testCancelConnectDuringAuthenticationWillCloseConnection() @@ -497,10 +549,17 @@ public function testCancelConnectDuringAuthenticationWillCloseConnection() $promise->cancel(); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) { - return ($e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled'); - }))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); + }) + ) + )); } public function testConnectLazyWithAnyAuthWillQuitWithoutRunning() diff --git a/tests/Io/ConnectionTest.php b/tests/Io/ConnectionTest.php index 0241354..c9503c7 100644 --- a/tests/Io/ConnectionTest.php +++ b/tests/Io/ConnectionTest.php @@ -25,12 +25,44 @@ public function testQueryAfterQuitRejectsImmediately() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->query('SELECT 1')->then(null, $this->expectCallableOnce()); + $promise = $conn->query('SELECT 1'); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); + } + + public function testQueryAfterCloseRejectsImmediately() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $executor = $this->getMockBuilder('React\MySQL\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->never())->method('enqueue'); + + $conn = new Connection($stream, $executor); + $conn->close(); + $promise = $conn->query('SELECT 1'); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closed (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); } - /** - * @expectedException React\MySQL\Exception - */ public function testQueryStreamAfterQuitThrows() { $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); @@ -39,7 +71,13 @@ public function testQueryStreamAfterQuitThrows() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->queryStream('SELECT 1'); + + try { + $conn->queryStream('SELECT 1'); + } catch (\RuntimeException $e) { + $this->assertEquals('Connection closing (ENOTCONN)', $e->getMessage()); + $this->assertEquals(defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107, $e->getCode()); + } } public function testPingAfterQuitRejectsImmediately() @@ -50,7 +88,19 @@ public function testPingAfterQuitRejectsImmediately() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->ping()->then(null, $this->expectCallableOnce()); + $promise = $conn->ping(); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); } public function testQuitAfterQuitRejectsImmediately() @@ -61,6 +111,49 @@ public function testQuitAfterQuitRejectsImmediately() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->quit()->then(null, $this->expectCallableOnce()); + $promise = $conn->quit(); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); + } + + public function testCloseStreamEmitsErrorEvent() + { + $closeHandler = null; + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->exactly(2))->method('on')->withConsecutive( + array('error', $this->anything()), + array('close', $this->callback(function ($arg) use (&$closeHandler) { + $closeHandler = $arg; + return true; + })) + ); + $executor = $this->getMockBuilder('React\MySQL\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->never())->method('enqueue'); + + $conn = new Connection($stream, $executor); + $conn->on('error', $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closed by peer (ECONNRESET)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104); + }) + ) + )); + + $this->assertNotNull($closeHandler); + $closeHandler(); } } diff --git a/tests/Io/ParserTest.php b/tests/Io/ParserTest.php index 45460bb..b96c9e9 100644 --- a/tests/Io/ParserTest.php +++ b/tests/Io/ParserTest.php @@ -22,12 +22,23 @@ public function testClosingStreamEmitsErrorForCurrentCommand() $command = new QueryCommand(); $command->on('error', $this->expectCallableOnce()); + $error = null; + $command->on('error', function ($e) use (&$error) { + $error = $e; + }); + // hack to inject command as current command $ref = new \ReflectionProperty($parser, 'currCommand'); $ref->setAccessible(true); $ref->setValue($parser, $command); $stream->close(); + + $this->assertInstanceOf('RuntimeException', $error); + assert($error instanceof \RuntimeException); + + $this->assertEquals('Connection closing (ECONNABORTED)', $error->getMessage()); + $this->assertEquals(defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103, $error->getCode()); } public function testUnexpectedErrorWithoutCurrentCommandWillBeIgnored()