Skip to content

Commit

Permalink
Use socket error codes (errnos) for connection rejections
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Aug 30, 2021
1 parent a729faa commit 705737b
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 53 deletions.
22 changes: 17 additions & 5 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ public function createConnection($uri)
$parts = parse_url($uri);
$uri = preg_replace('#:[^:/]*@#', ':***@', $uri);
if (!isset($parts['scheme'], $parts['host']) || $parts['scheme'] !== 'mysql') {
return \React\Promise\reject(new \InvalidArgumentException('Invalid MySQL URI given'));
return \React\Promise\reject(new \InvalidArgumentException(
'Invalid MySQL URI given (EINVAL)',
\defined('SOCKET_EINVAL') ? \SOCKET_EINVAL : 22
));
}

$args = [];
Expand All @@ -191,7 +194,8 @@ public function createConnection($uri)
$deferred = new Deferred(function ($_, $reject) use ($connecting, $uri) {
// connection cancelled, start with rejecting attempt, then clean up
$reject(new \RuntimeException(
'Connection to ' . $uri . ' cancelled'
'Connection to ' . $uri . ' cancelled (ECONNABORTED)',
\defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103
));

// either close successful connection or cancel pending connection attempt
Expand All @@ -213,9 +217,16 @@ public function createConnection($uri)
$deferred->resolve($connection);
});
$command->on('error', function (\Exception $error) use ($deferred, $stream, $uri) {
$const = '';
$errno = $error->getCode();
if ($error instanceof Exception) {
$const = ' (EACCES)';
$errno = \defined('SOCKET_EACCES') ? \SOCKET_EACCES : 13;
}

$deferred->reject(new \RuntimeException(
'Connection to ' . $uri . ' failed during authentication: ' . $error->getMessage(),
$error->getCode(),
'Connection to ' . $uri . ' failed during authentication: ' . $error->getMessage() . $const,
$errno,
$error
));
$stream->close();
Expand All @@ -237,7 +248,8 @@ public function createConnection($uri)
return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) {
if ($e instanceof TimeoutException) {
throw new \RuntimeException(
'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds'
'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)',
\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110
);
}
throw $e;
Expand Down
33 changes: 25 additions & 8 deletions src/Io/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,25 @@ public function close()
}

$this->state = self::STATE_CLOSED;
$remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false;
$this->stream->close();

// reject all pending commands if connection is closed
while (!$this->executor->isIdle()) {
$command = $this->executor->dequeue();
$command->emit('error', [
new \RuntimeException('Connection lost')
]);
assert($command instanceof CommandInterface);

if ($remoteClosed) {
$command->emit('error', [new \RuntimeException(
'Connection closed by peer (ECONNRESET)',
\defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104
)]);
} else {
$command->emit('error', [new \RuntimeException(
'Connection closing (ECONNABORTED)',
\defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103
)]);
}
}

$this->emit('close');
Expand All @@ -189,7 +200,10 @@ public function handleConnectionError($err)
public function handleConnectionClosed()
{
if ($this->state < self::STATE_CLOSEING) {
$this->emit('error', [new \RuntimeException('mysql server has gone away'), $this]);
$this->emit('error', [new \RuntimeException(
'Connection closed by peer (ECONNRESET)',
\defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104
)]);
}

$this->close();
Expand All @@ -202,10 +216,13 @@ public function handleConnectionClosed()
*/
protected function _doCommand(CommandInterface $command)
{
if ($this->state === self::STATE_AUTHENTICATED) {
return $this->executor->enqueue($command);
} else {
throw new Exception("Can't send command");
if ($this->state !== self::STATE_AUTHENTICATED) {
throw new \RuntimeException(
'Connection ' . ($this->state === self::STATE_CLOSED ? 'closed' : 'closing'). ' (ENOTCONN)',
\defined('SOCKET_ENOTCONN') ? \SOCKET_ENOTCONN : 107
);
}

return $this->executor->enqueue($command);
}
}
9 changes: 5 additions & 4 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ protected function onSuccess()
if ($command instanceof QueryCommand) {
$command->affectedRows = $this->affectedRows;
$command->insertId = $this->insertId;
$command->warningCount = $this->warningCount;
$command->warningCount = $this->warningCount;
$command->message = $this->message;
}
$command->emit('success');
Expand All @@ -322,9 +322,10 @@ public function onClose()
if ($command instanceof QuitCommand) {
$command->emit('success');
} else {
$command->emit('error', [
new \RuntimeException('Connection lost')
]);
$command->emit('error', [new \RuntimeException(
'Connection closing (ECONNABORTED)',
\defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103
)]);
}
}
}
Expand Down
117 changes: 88 additions & 29 deletions tests/FactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,17 @@ public function testConnectWillRejectWhenGivenInvalidScheme()

$promise = $factory->createConnection('foo://127.0.0.1');

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('InvalidArgumentException')));
$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('InvalidArgumentException'),
$this->callback(function (\InvalidArgumentException $e) {
return $e->getMessage() === 'Invalid MySQL URI given (EINVAL)';
}),
$this->callback(function (\InvalidArgumentException $e) {
return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22);
})
)
));
}

public function testConnectWillUseGivenHostAndGivenPort()
Expand Down Expand Up @@ -113,7 +123,10 @@ public function testConnectWithInvalidUriWillRejectWithoutConnecting()
$this->logicalAnd(
$this->isInstanceOf('InvalidArgumentException'),
$this->callback(function (\InvalidArgumentException $e) {
return $e->getMessage() === 'Invalid MySQL URI given';
return $e->getMessage() === 'Invalid MySQL URI given (EINVAL)';
}),
$this->callback(function (\InvalidArgumentException $e) {
return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22);
})
)
));
Expand Down Expand Up @@ -153,9 +166,15 @@ public function testConnectWithInvalidPassRejectsWithAuthenticationError()

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('Exception'),
$this->callback(function (\Exception $e) {
return !!preg_match("/^Connection to mysql:\/\/[^ ]* failed during authentication: Access denied for user '.*?'@'.*?' \(using password: YES\)$/", $e->getMessage());
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) {
return !!preg_match("/^Connection to mysql:\/\/[^ ]* failed during authentication: Access denied for user '.*?'@'.*?' \(using password: YES\) \(EACCES\)$/", $e->getMessage());
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13);
}),
$this->callback(function (\RuntimeException $e) {
return !!preg_match("/^Access denied for user '.*?'@'.*?' \(using password: YES\)$/", $e->getPrevious()->getMessage());
})
)
));
Expand All @@ -177,7 +196,20 @@ public function testConnectWillRejectWhenServerClosesConnection()
$uri = $this->getConnectionString(['host' => $parts['host'], 'port' => $parts['port']]);

$promise = $factory->createConnection($uri);
$promise->then(null, $this->expectCallableOnce());

$uri = preg_replace('/:[^:]*@/', ':***@', $uri);

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) use ($uri) {
return $e->getMessage() === 'Connection to mysql://' . $uri . ' failed during authentication: Connection closed by peer (ECONNRESET)';
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104);
})
)
));

Loop::run();
}
Expand All @@ -194,9 +226,12 @@ public function testConnectWillRejectOnExplicitTimeoutDespiteValidAuth()

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('Exception'),
$this->callback(function (\Exception $e) use ($uri) {
return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds';
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) use ($uri) {
return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds (ETIMEDOUT)';
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === (defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110);
})
)
));
Expand All @@ -219,9 +254,12 @@ public function testConnectWillRejectOnDefaultTimeoutFromIniDespiteValidAuth()

$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('Exception'),
$this->callback(function (\Exception $e) use ($uri) {
return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds';
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) use ($uri) {
return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds (ETIMEDOUT)';
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === (defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110);
})
)
));
Expand Down Expand Up @@ -366,7 +404,7 @@ public function testConnectWithValidAuthCanCloseOnlyOnce()

public function testConnectWithValidAuthCanCloseAndAbortPing()
{
$this->expectOutputString('connected.aborted pending (Connection lost).aborted queued (Connection lost).closed.');
$this->expectOutputString('connected.aborted pending (Connection closing (ECONNABORTED)).aborted queued (Connection closing (ECONNABORTED)).closed.');

$factory = new Factory();

Expand Down Expand Up @@ -401,13 +439,17 @@ public function testlConnectWillRejectWhenUnderlyingConnectorRejects()
$factory = new Factory($loop, $connector);
$promise = $factory->createConnection('user:secret@127.0.0.1');

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
$promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) {
return ($e->getMessage() === 'Connection to mysql://user:***@127.0.0.1 failed: Failed');
})));
$promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) {
return ($e->getCode() === 123);
})));
$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) {
return $e->getMessage() === 'Connection to mysql://user:***@127.0.0.1 failed: Failed';
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === 123;
})
)
));
}

public function provideUris()
Expand Down Expand Up @@ -457,7 +499,10 @@ public function testCancelConnectWillCancelPendingConnection($uri, $safe)
$this->logicalAnd(
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) use ($safe) {
return $e->getMessage() === 'Connection to ' . $safe . ' cancelled';
return $e->getMessage() === 'Connection to ' . $safe . ' cancelled (ECONNABORTED)';
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103);
})
)
));
Expand All @@ -477,10 +522,17 @@ public function testCancelConnectWillCancelPendingConnectionWithRuntimeException

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
$promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) {
return ($e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled');
})));
$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) {
return $e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled (ECONNABORTED)';
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103);
})
)
));
}

public function testCancelConnectDuringAuthenticationWillCloseConnection()
Expand All @@ -497,10 +549,17 @@ public function testCancelConnectDuringAuthenticationWillCloseConnection()

$promise->cancel();

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));
$promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) {
return ($e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled');
})));
$promise->then(null, $this->expectCallableOnceWith(
$this->logicalAnd(
$this->isInstanceOf('RuntimeException'),
$this->callback(function (\RuntimeException $e) {
return $e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled (ECONNABORTED)';
}),
$this->callback(function (\RuntimeException $e) {
return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103);
})
)
));
}

public function testConnectLazyWithAnyAuthWillQuitWithoutRunning()
Expand Down
Loading

0 comments on commit 705737b

Please sign in to comment.