Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ This example runs a simple `SELECT` query and dumps all the records from a `book

```php
$factory = new React\MySQL\Factory();

$uri = 'test:test@localhost/test';
$connection = $factory->createLazyConnection($uri);
$connection = $factory->createLazyConnection('user:pass@localhost/bookstore');

$connection->query('SELECT * FROM book')->then(
function (QueryResult $command) {
Expand Down
9 changes: 4 additions & 5 deletions examples/01-query.php
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
<?php

// $ php examples/01-query.php
// $ MYSQL_URI=test:test@localhost/test php examples/01-query.php "SELECT * FROM book"

use React\MySQL\Factory;
use React\MySQL\QueryResult;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$connection = $factory->createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$uri = 'test:test@localhost/test';
$query = isset($argv[1]) ? $argv[1] : 'select * from book';

//create a lazy mysql connection for executing query
$connection = $factory->createLazyConnection($uri);

$connection->query($query)->then(function (QueryResult $command) {
if (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
Expand Down
7 changes: 2 additions & 5 deletions examples/02-query-stream.php
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
<?php

// $ php examples/02-query-stream.php "SHOW VARIABLES"
// $ MYSQL_URI=test:test@localhost/test php examples/02-query-stream.php "SELECT * FROM book"

use React\MySQL\Factory;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$connection = $factory->createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$uri = 'test:test@localhost/test';
$query = isset($argv[1]) ? $argv[1] : 'select * from book';

//create a lazy mysql connection for executing query
$connection = $factory->createLazyConnection($uri);

$stream = $connection->queryStream($query);

$stream->on('data', function ($row) {
Expand Down
6 changes: 4 additions & 2 deletions examples/11-interactive.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?php

// $ php examples/11-interactive.php
// $ MYSQL_URI=test:test@localhost/test php examples/11-interactive.php

use React\MySQL\ConnectionInterface;
use React\MySQL\QueryResult;
use React\MySQL\Factory;
Expand All @@ -8,8 +11,7 @@
require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();

$uri = 'test:test@localhost/test';
$uri = getenv('MYSQL_URI') ?: 'test:test@localhost/test';

// open a STDIN stream to read keyboard input (not supported on Windows)
$stdin = new ReadableResourceStream(STDIN);
Expand Down
3 changes: 2 additions & 1 deletion examples/12-slow-stream.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php

// $ php examples/12-slow-stream.php "SHOW VARIABLES"
// $ MYSQL_URI=test:test@localhost/test php examples/12-slow-stream.php "SELECT * FROM book"

use React\EventLoop\Loop;
use React\MySQL\ConnectionInterface;
Expand All @@ -9,8 +10,8 @@
require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$uri = getenv('MYSQL_URI') ?: 'test:test@localhost/test';

$uri = 'test:test@localhost/test';
$query = isset($argv[1]) ? $argv[1] : 'select * from book';

//create a mysql connection for executing query
Expand Down
43 changes: 33 additions & 10 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,12 @@ public function createConnection($uri)
}

$parts = parse_url($uri);
$uri = preg_replace('#:[^:/]*@#', ':***@', $uri);
if (!isset($parts['scheme'], $parts['host']) || $parts['scheme'] !== 'mysql') {
return \React\Promise\reject(new \InvalidArgumentException('Invalid connect uri given'));
return \React\Promise\reject(new \InvalidArgumentException(
'Invalid MySQL URI given (EINVAL)',
\defined('SOCKET_EINVAL') ? \SOCKET_EINVAL : 22
));
}

$args = [];
Expand All @@ -187,9 +191,12 @@ public function createConnection($uri)
$parts['host'] . ':' . (isset($parts['port']) ? $parts['port'] : 3306)
);

$deferred = new Deferred(function ($_, $reject) use ($connecting) {
$deferred = new Deferred(function ($_, $reject) use ($connecting, $uri) {
// connection cancelled, start with rejecting attempt, then clean up
$reject(new \RuntimeException('Connection to database server cancelled'));
$reject(new \RuntimeException(
'Connection to ' . $uri . ' cancelled (ECONNABORTED)',
\defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103
));

// either close successful connection or cancel pending connection attempt
$connecting->then(function (SocketConnectionInterface $connection) {
Expand All @@ -198,7 +205,7 @@ public function createConnection($uri)
$connecting->cancel();
});

$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred) {
$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri) {
$executor = new Executor();
$parser = new Parser($stream, $executor);

Expand All @@ -209,12 +216,27 @@ public function createConnection($uri)
$command->on('success', function () use ($deferred, $connection) {
$deferred->resolve($connection);
});
$command->on('error', function ($error) use ($deferred, $stream) {
$deferred->reject($error);
$command->on('error', function (\Exception $error) use ($deferred, $stream, $uri) {
$const = '';
$errno = $error->getCode();
if ($error instanceof Exception) {
$const = ' (EACCES)';
$errno = \defined('SOCKET_EACCES') ? \SOCKET_EACCES : 13;
}

$deferred->reject(new \RuntimeException(
'Connection to ' . $uri . ' failed during authentication: ' . $error->getMessage() . $const,
$errno,
$error
));
$stream->close();
});
}, function ($error) use ($deferred) {
$deferred->reject(new \RuntimeException('Unable to connect to database server', 0, $error));
}, function (\Exception $error) use ($deferred, $uri) {
$deferred->reject(new \RuntimeException(
'Connection to ' . $uri . ' failed: ' . $error->getMessage(),
$error->getCode(),
$error
));
});

// use timeout from explicit ?timeout=x parameter or default to PHP's default_socket_timeout (60)
Expand All @@ -223,10 +245,11 @@ public function createConnection($uri)
return $deferred->promise();
}

return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) {
return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) {
if ($e instanceof TimeoutException) {
throw new \RuntimeException(
'Connection to database server timed out after ' . $e->getTimeout() . ' seconds'
'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)',
\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110
);
}
throw $e;
Expand Down
33 changes: 25 additions & 8 deletions src/Io/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,25 @@ public function close()
}

$this->state = self::STATE_CLOSED;
$remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false;
$this->stream->close();

// reject all pending commands if connection is closed
while (!$this->executor->isIdle()) {
$command = $this->executor->dequeue();
$command->emit('error', [
new \RuntimeException('Connection lost')
]);
assert($command instanceof CommandInterface);

if ($remoteClosed) {
$command->emit('error', [new \RuntimeException(
'Connection closed by peer (ECONNRESET)',
\defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104
)]);
} else {
$command->emit('error', [new \RuntimeException(
'Connection closing (ECONNABORTED)',
\defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103
)]);
}
}

$this->emit('close');
Expand All @@ -189,7 +200,10 @@ public function handleConnectionError($err)
public function handleConnectionClosed()
{
if ($this->state < self::STATE_CLOSEING) {
$this->emit('error', [new \RuntimeException('mysql server has gone away'), $this]);
$this->emit('error', [new \RuntimeException(
'Connection closed by peer (ECONNRESET)',
\defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104
)]);
}

$this->close();
Expand All @@ -202,10 +216,13 @@ public function handleConnectionClosed()
*/
protected function _doCommand(CommandInterface $command)
{
if ($this->state === self::STATE_AUTHENTICATED) {
return $this->executor->enqueue($command);
} else {
throw new Exception("Can't send command");
if ($this->state !== self::STATE_AUTHENTICATED) {
throw new \RuntimeException(
'Connection ' . ($this->state === self::STATE_CLOSED ? 'closed' : 'closing'). ' (ENOTCONN)',
\defined('SOCKET_ENOTCONN') ? \SOCKET_ENOTCONN : 107
);
}

return $this->executor->enqueue($command);
}
}
9 changes: 5 additions & 4 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ protected function onSuccess()
if ($command instanceof QueryCommand) {
$command->affectedRows = $this->affectedRows;
$command->insertId = $this->insertId;
$command->warningCount = $this->warningCount;
$command->warningCount = $this->warningCount;
$command->message = $this->message;
}
$command->emit('success');
Expand All @@ -322,9 +322,10 @@ public function onClose()
if ($command instanceof QuitCommand) {
$command->emit('success');
} else {
$command->emit('error', [
new \RuntimeException('Connection lost')
]);
$command->emit('error', [new \RuntimeException(
'Connection closing (ECONNABORTED)',
\defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103
)]);
}
}
}
Expand Down
Loading