Skip to content

Commit

Permalink
Merge pull request #48 from clue-labs/blocking
Browse files Browse the repository at this point in the history
Support using blocking SQLite adapter when using an empty binary path
  • Loading branch information
SimonFrings authored Nov 12, 2021
2 parents 9d16848 + 34d55c3 commit e94c9f0
Show file tree
Hide file tree
Showing 6 changed files with 694 additions and 234 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ environment than your parent process.
$factory = new Clue\React\SQLite\Factory(null, '/usr/bin/php6.0');
```

Or you may use this parameter to pass an empty PHP binary path which will
cause this project to not spawn a PHP child process for any database
interactions at all. In this case, using SQLite will block the main
process, but continues to provide the exact same async API. This can be
useful if concurrent execution is not needed, especially when running
behind a traditional web server (non-CLI SAPI).

```php
// advanced usage: empty binary path runs blocking SQLite in same process
$factory = new Clue\React\SQLite\Factory(null, '');
```

#### open()

The `open(string $filename, int $flags = null): PromiseInterface<DatabaseInterface>` method can be used to
Expand Down
131 changes: 36 additions & 95 deletions res/sqlite-worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

use Clue\React\NDJson\Decoder;
use Clue\React\NDJson\Encoder;
use Clue\React\SQLite\Io\BlockingDatabase;
use Clue\React\SQLite\Result;
use React\EventLoop\Factory;
use React\Stream\DuplexResourceStream;
use React\Stream\ReadableResourceStream;
Expand Down Expand Up @@ -74,35 +76,10 @@
return;
}

if ($data->method === 'open' && \count($data->params) === 1 && \is_string($data->params[0])) {
// open database with one parameter: $filename
try {
$db = new SQLite3(
$data->params[0]
);

$out->write(array(
'id' => $data->id,
'result' => true
));
} catch (Exception $e) {
$out->write(array(
'id' => $data->id,
'error' => array('message' => $e->getMessage())
));
} catch (Error $e) {
$out->write(array(
'id' => $data->id,
'error' => array('message' => $e->getMessage())
));
}
} elseif ($data->method === 'open' && \count($data->params) === 2 && \is_string($data->params[0]) && \is_int($data->params[1])) {
if ($data->method === 'open' && \count($data->params) === 2 && \is_string($data->params[0]) && ($data->params[1] === null || \is_int($data->params[1]))) {
// open database with two parameters: $filename, $flags
try {
$db = new SQLite3(
$data->params[0],
$data->params[1]
);
$db = new BlockingDatabase($data->params[0], $data->params[1]);

$out->write(array(
'id' => $data->id,
Expand All @@ -120,78 +97,40 @@
));
}
} elseif ($data->method === 'exec' && $db !== null && \count($data->params) === 1 && \is_string($data->params[0])) {
// execute statement and suppress PHP warnings
$ret = @$db->exec($data->params[0]);

if ($ret === false) {
// execute statement: $db->exec($sql)
$db->exec($data->params[0])->then(function (Result $result) use ($data, $out) {
$out->write(array(
'id' => $data->id,
'error' => array('message' => $db->lastErrorMsg())
'result' => array(
'insertId' => $result->insertId,
'changed' => $result->changed
)
));
} else {
}, function (Exception $e) use ($data, $out) {
$out->write(array(
'id' => $data->id,
'result' => array(
'insertId' => $db->lastInsertRowID(),
'changed' => $db->changes()
)
'error' => array('message' => $e->getMessage())
));
}
});
} elseif ($data->method === 'query' && $db !== null && \count($data->params) === 2 && \is_string($data->params[0]) && (\is_array($data->params[1]) || \is_object($data->params[1]))) {
// execute statement and suppress PHP warnings
if ($data->params[1] === []) {
$result = @$db->query($data->params[0]);
} else {
$statement = @$db->prepare($data->params[0]);
if ($statement === false) {
$result = false;
// execute statement: $db->query($sql, $params)
$params = [];
foreach ($data->params[1] as $index => $value) {
if (isset($value->float)) {
$params[$index] = (float)$value->float;
} elseif (isset($value->base64)) {
// base64-decode string parameters as BLOB
$params[$index] = \base64_decode($value->base64);
} else {
foreach ($data->params[1] as $index => $value) {
if ($value === null) {
$type = \SQLITE3_NULL;
} elseif ($value === true || $value === false) {
// explicitly cast bool to int because SQLite does not have a native boolean
$type = \SQLITE3_INTEGER;
$value = (int)$value;
} elseif (\is_int($value)) {
$type = \SQLITE3_INTEGER;
} elseif (isset($value->float)) {
$type = \SQLITE3_FLOAT;
$value = (float)$value->float;
} elseif (isset($value->base64)) {
// base64-decode string parameters as BLOB
$type = \SQLITE3_BLOB;
$value = \base64_decode($value->base64);
} else {
$type = \SQLITE3_TEXT;
}

$statement->bindValue(
\is_int($index) ? $index + 1 : $index,
$value,
$type
);
}
$result = @$statement->execute();
$params[$index] = $value;
}
}

if ($result === false) {
$out->write(array(
'id' => $data->id,
'error' => array('message' => $db->lastErrorMsg())
));
} else {
if ($result->numColumns() !== 0) {
// Fetch all rows only if this result set has any columns.
// INSERT/UPDATE/DELETE etc. do not return any columns, trying
// to fetch the results here will issue the same query again.
$rows = $columns = [];
for ($i = 0, $n = $result->numColumns(); $i < $n; ++$i) {
$columns[] = $result->columnName($i);
}

while (($row = $result->fetchArray(\SQLITE3_ASSOC)) !== false) {
$db->query($data->params[0], $params)->then(function (Result $result) use ($data, $out) {
$rows = null;
if ($result->rows !== null) {
$rows = [];
foreach ($result->rows as $row) {
// base64-encode any string that is not valid UTF-8 without control characters (BLOB)
foreach ($row as &$value) {
if (\is_string($value) && \preg_match('/[\x00-\x08\x11\x12\x14-\x1f\x7f]/u', $value) !== 0) {
Expand All @@ -202,21 +141,23 @@
}
$rows[] = $row;
}
} else {
$rows = $columns = null;
}
$result->finalize();

$out->write(array(
'id' => $data->id,
'result' => array(
'columns' => $columns,
'columns' => $result->columns,
'rows' => $rows,
'insertId' => $db->lastInsertRowID(),
'changed' => $db->changes()
'insertId' => $result->insertId,
'changed' => $result->changed
)
));
}
}, function (Exception $e) use ($data, $out) {
$out->write(array(
'id' => $data->id,
'error' => array('message' => $e->getMessage())
));
});
} elseif ($data->method === 'close' && $db !== null && \count($data->params) === 0) {
// close database and remove reference
$db->close();
Expand Down
34 changes: 26 additions & 8 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Clue\React\SQLite;

use Clue\React\SQLite\Io\BlockingDatabase;
use Clue\React\SQLite\Io\LazyDatabase;
use Clue\React\SQLite\Io\ProcessIoDatabase;
use React\ChildProcess\Process;
Expand Down Expand Up @@ -46,6 +47,18 @@ class Factory
* $factory = new Clue\React\SQLite\Factory(null, '/usr/bin/php6.0');
* ```
*
* Or you may use this parameter to pass an empty PHP binary path which will
* cause this project to not spawn a PHP child process for any database
* interactions at all. In this case, using SQLite will block the main
* process, but continues to provide the exact same async API. This can be
* useful if concurrent execution is not needed, especially when running
* behind a traditional web server (non-CLI SAPI).
*
* ```php
* // advanced usage: empty binary path runs blocking SQLite in same process
* $factory = new Clue\React\SQLite\Factory(null, '');
* ```
*
* @param ?LoopInterface $loop
* @param ?string $binary
*/
Expand Down Expand Up @@ -109,6 +122,17 @@ public function __construct(LoopInterface $loop = null, $binary = null)
public function open($filename, $flags = null)
{
$filename = $this->resolve($filename);

if ($this->bin === '') {
try {
return \React\Promise\resolve(new BlockingDatabase($filename, $flags));
} catch (\Exception $e) {
return \React\Promise\reject(new \RuntimeException($e->getMessage()) );
} catch (\Error $e) {
return \React\Promise\reject(new \RuntimeException($e->getMessage()));
}
}

return $this->useSocket ? $this->openSocketIo($filename, $flags) : $this->openProcessIo($filename, $flags);
}

Expand Down Expand Up @@ -248,10 +272,7 @@ private function openProcessIo($filename, $flags = null)
$process->start($this->loop);

$db = new ProcessIoDatabase($process);
$args = array($filename);
if ($flags !== null) {
$args[] = $flags;
}
$args = array($filename, $flags);

return $db->send('open', $args)->then(function () use ($db) {
return $db;
Expand Down Expand Up @@ -333,10 +354,7 @@ private function openSocketIo($filename, $flags = null)
});

$db = new ProcessIoDatabase($process);
$args = array($filename);
if ($flags !== null) {
$args[] = $flags;
}
$args = array($filename, $flags);

$db->send('open', $args)->then(function () use ($deferred, $db) {
$deferred->resolve($db);
Expand Down
Loading

0 comments on commit e94c9f0

Please sign in to comment.