Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queryStream() method to stream result set rows and remove undocumented "results" event #57

Merged
merged 2 commits into from
Jun 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ It is written in pure PHP and does not require any extensions.
* [Usage](#usage)
* [Connection](#connection)
* [connect()](#connect)
* [query()](#query)
* [queryStream()](#querystream)
* [Install](#install)
* [Tests](#tests)
* [License](#license)
Expand Down Expand Up @@ -118,6 +120,102 @@ invoking this method without having to await its resolution first.
This method throws an `Exception` if the connection is already initialized,
i.e. it MUST NOT be called more than once.

#### query()

The `query(string $query, callable|null $callback, mixed ...$params): QueryCommand|null` method can be used to
perform an async query.

If this SQL statement returns a result set (such as from a `SELECT`
statement), this method will buffer everything in memory until the result
set is completed and will then invoke the `$callback` function. This is
the preferred method if you know your result set to not exceed a few
dozens or hundreds of rows. If the size of your result set is either
unknown or known to be too large to fit into memory, you should use the
[`queryStream()`](#querystream) method instead.

```php
$connection->query($query, function (QueryCommand $command) {
if ($command->hasError()) {
// test whether the query was executed successfully
// get the error object, instance of Exception.
$error = $command->getError();
echo 'Error: ' . $error->getMessage() . PHP_EOL;
} elseif (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
print_r($command->resultFields);
print_r($command->resultRows);
echo count($command->resultRows) . ' row(s) in set' . PHP_EOL;
} else {
// this is an OK message in response to an UPDATE etc.
if ($command->insertId !== 0) {
var_dump('last insert ID', $command->insertId);
}
echo 'Query OK, ' . $command->affectedRows . ' row(s) affected' . PHP_EOL;
}
});
```

You can optionally pass any number of `$params` that will be bound to the
query like this:

```php
$connection->query('SELECT * FROM user WHERE id > ?', $fn, $id);
```

The given `$sql` parameter MUST contain a single statement. Support
for multiple statements is disabled for security reasons because it
could allow for possible SQL injection attacks and this API is not
suited for exposing multiple possible results.

#### queryStream()

The `queryStream(string $sql, array $params = array()): ReadableStreamInterface` method can be used to
perform an async query and stream the rows of the result set.

This method returns a readable stream that will emit each row of the
result set as a `data` event. It will only buffer data to complete a
single row in memory and will not store the whole result set. This allows
you to process result sets of unlimited size that would not otherwise fit
into memory. If you know your result set to not exceed a few dozens or
hundreds of rows, you may want to use the [`query()`](#query) method instead.

```php
$stream = $connection->queryStream('SELECT * FROM user');
$stream->on('data', function ($row) {
echo $row['name'] . PHP_EOL;
});
$stream->on('end', function () {
echo 'Completed.';
});
```

You can optionally pass an array of `$params` that will be bound to the
query like this:

```php
$stream = $connection->queryStream('SELECT * FROM user WHERE id > ?', [$id]);
```

This method is specifically designed for queries that return a result set
(such as from a `SELECT` or `EXPLAIN` statement). Queries that do not
return a result set (such as a `UPDATE` or `INSERT` statement) will not
emit any `data` events.

See also [`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
for more details about how readable streams can be used in ReactPHP. For
example, you can also use its `pipe()` method to forward the result set
rows to a [`WritableStreamInterface`](https://github.com/reactphp/stream#writablestreaminterface)
like this:

```php
$connection->queryStream('SELECT * FROM user')->pipe($formatter)->pipe($logger);
```

The given `$sql` parameter MUST contain a single statement. Support
for multiple statements is disabled for security reasons because it
could allow for possible SQL injection attacks and this API is not
suited for exposing multiple possible results.

## Install

The recommended way to install this library is [through Composer](https://getcomposer.org).
Expand Down
34 changes: 34 additions & 0 deletions examples/02-query-stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

require __DIR__ . '/../vendor/autoload.php';

//create the main loop
$loop = React\EventLoop\Factory::create();

//create a mysql connection for executing queries
$connection = new React\MySQL\Connection($loop, array(
'dbname' => 'test',
'user' => 'test',
'passwd' => 'test',
));

$connection->connect(function () {});

$sql = isset($argv[1]) ? $argv[1] : 'select * from book';

$stream = $connection->queryStream($sql);
$stream->on('data', function ($row) {
var_dump($row);
});

$stream->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$stream->on('close', function () {
echo 'CLOSED' . PHP_EOL;
});

$connection->close();

$loop->run();
47 changes: 44 additions & 3 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use React\Socket\ConnectionInterface as SocketConnectionInterface;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;

use React\Stream\ThroughStream;

/**
* Class Connection
Expand Down Expand Up @@ -116,17 +116,58 @@ public function query($sql, $callback = null, $params = null)
}
$this->_doCommand($command);

$command->on('results', function ($rows, $command) use ($callback) {
// store all result set rows until result set end
$rows = array();
$command->on('result', function ($row) use (&$rows) {
$rows[] = $row;
});
$command->on('end', function ($command) use ($callback, &$rows) {
$command->resultRows = $rows;
$rows = array();
$callback($command, $this);
});

// resolve / reject status reply (response without result set)
$command->on('error', function ($err, $command) use ($callback) {
$callback($command, $this);
});
$command->on('success', function ($command) use ($callback) {
$callback($command, $this);
});
}

public function queryStream($sql, $params = array())
{
$query = new Query($sql);

if ($params) {
$query->bindParamsFromArray($params);
}

$command = new QueryCommand($this);
$command->setQuery($query);
$this->_doCommand($command);

$stream = new ThroughStream();

// forward result set rows until result set end
$command->on('result', function ($row) use ($stream) {
$stream->write($row);
});
$command->on('end', function () use ($stream) {
$stream->end();
});

// status reply (response without result set) ends stream without data
$command->on('success', function () use ($stream) {
$stream->end();
});
$command->on('error', function ($err) use ($stream) {
$stream->emit('error', array($err));
$stream->close();
});

return null;
return $stream;
}

/**
Expand Down
103 changes: 94 additions & 9 deletions src/ConnectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace React\MySQL;

use React\MySQL\Commands\QueryCommand;
use React\Stream\ReadableStreamInterface;

/**
* Interface ConnectionInterface
Expand All @@ -11,7 +12,6 @@
*/
interface ConnectionInterface
{

const STATE_INIT = 0;
const STATE_CONNECT_FAILED = 1;
const STATE_AUTHENTICATE_FAILED = 2;
Expand All @@ -22,25 +22,110 @@ interface ConnectionInterface
const STATE_CLOSED = 7;

/**
* Do a async query.
* Performs an async query.
*
* If this SQL statement returns a result set (such as from a `SELECT`
* statement), this method will buffer everything in memory until the result
* set is completed and will then invoke the `$callback` function. This is
* the preferred method if you know your result set to not exceed a few
* dozens or hundreds of rows. If the size of your result set is either
* unknown or known to be too large to fit into memory, you should use the
* [`queryStream()`](#querystream) method instead.
*
* ```php
* $connection->query($query, function (QueryCommand $command) {
* if ($command->hasError()) {
* // test whether the query was executed successfully
* // get the error object, instance of Exception.
* $error = $command->getError();
* echo 'Error: ' . $error->getMessage() . PHP_EOL;
* } elseif (isset($command->resultRows)) {
* // this is a response to a SELECT etc. with some rows (0+)
* print_r($command->resultFields);
* print_r($command->resultRows);
* echo count($command->resultRows) . ' row(s) in set' . PHP_EOL;
* } else {
* // this is an OK message in response to an UPDATE etc.
* if ($command->insertId !== 0) {
* var_dump('last insert ID', $command->insertId);
* }
* echo 'Query OK, ' . $command->affectedRows . ' row(s) affected' . PHP_EOL;
* }
* });
* ```
*
* You can optionally pass any number of `$params` that will be bound to the
* query like this:
*
* ```php
* $connection->query('SELECT * FROM user WHERE id > ?', $fn, $id);
* ```
*
* The given `$sql` parameter MUST contain a single statement. Support
* for multiple statements is disabled for security reasons because it
* could allow for possible SQL injection attacks and this API is not
* suited for exposing multiple possible results.
*
* @param string $sql MySQL sql statement.
* @param callable|null $callback Query result handler callback.
* @param mixed $params,... Parameters which should bind to query.
*
* $callback signature:
*
* function (QueryCommand $cmd, ConnectionInterface $conn): void
* @return QueryCommand|null Return QueryCommand if $callback not specified.
* @throws Exception if the connection is not initialized or already closed/closing
*/
public function query($sql, $callback = null, $params = null);

/**
* Performs an async query and streams the rows of the result set.
*
* This method returns a readable stream that will emit each row of the
* result set as a `data` event. It will only buffer data to complete a
* single row in memory and will not store the whole result set. This allows
* you to process result sets of unlimited size that would not otherwise fit
* into memory. If you know your result set to not exceed a few dozens or
* hundreds of rows, you may want to use the [`query()`](#query) method instead.
*
* ```php
* $stream = $connection->queryStream('SELECT * FROM user');
* $stream->on('data', function ($row) {
* echo $row['name'] . PHP_EOL;
* });
* $stream->on('end', function () {
* echo 'Completed.';
* });
* ```
*
* You can optionally pass an array of `$params` that will be bound to the
* query like this:
*
* ```php
* $stream = $connection->queryStream('SELECT * FROM user WHERE id > ?', [$id]);
* ```
*
* This method is specifically designed for queries that return a result set
* (such as from a `SELECT` or `EXPLAIN` statement). Queries that do not
* return a result set (such as a `UPDATE` or `INSERT` statement) will not
* emit any `data` events.
*
* See also [`ReadableStreamInterface`](https://github.com/reactphp/stream#readablestreaminterface)
* for more details about how readable streams can be used in ReactPHP. For
* example, you can also use its `pipe()` method to forward the result set
* rows to a [`WritableStreamInterface`](https://github.com/reactphp/stream#writablestreaminterface)
* like this:
*
* ```php
* $connection->queryStream('SELECT * FROM user')->pipe($formatter)->pipe($logger);
* ```
*
* The given `$sql` parameter MUST contain a single statement. Support
* for multiple statements is disabled for security reasons because it
* could allow for possible SQL injection attacks and this API is not
* suited for exposing multiple possible results.
*
* @return QueryCommand|null Return QueryCommand if $callback not specified.
* @throws Exception if the connection is not initialized or already closed/closing
* @param string $sql SQL statement
* @param array $params Parameters which should be bound to query
* @return ReadableStreamInterface
*/
public function query($sql, $callback = null, $params = null);
public function queryStream($sql, $params = array());

/**
* Checks that connection is alive.
Expand Down
7 changes: 1 addition & 6 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ class Parser extends EventEmitter

protected $rsState = 0;
protected $pctSize = 0;
protected $resultRows = [];
protected $resultFields = [];

protected $insertId;
Expand Down Expand Up @@ -179,7 +178,6 @@ public function parse($data)

$this->rsState = self::RS_STATE_HEADER;
$this->resultFields = [];
$this->resultRows = [];
if ($this->phase === self::PHASE_AUTH_SENT || $this->phase === self::PHASE_GOT_INIT) {
$this->phase = self::PHASE_AUTH_ERR;
}
Expand Down Expand Up @@ -301,7 +299,6 @@ public function parse($data)
private function onResultRow($row)
{
// $this->debug('row data: ' . json_encode($row));
$this->resultRows[] = $row;
$command = $this->currCommand;
$command->emit('result', array($row, $command, $command->getConnection()));
}
Expand All @@ -323,13 +320,11 @@ protected function onResultDone()
$command = $this->currCommand;
$this->currCommand = null;

$command->resultRows = $this->resultRows;
$command->resultFields = $this->resultFields;
$command->emit('results', array($this->resultRows, $command, $command->getConnection()));
$command->emit('end', array($command, $command->getConnection()));

$this->rsState = self::RS_STATE_HEADER;
$this->resultRows = $this->resultFields = [];
$this->resultFields = [];
}

protected function onSuccess()
Expand Down
Loading