Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions example/stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

use Dotenv\Dotenv;
use React\EventLoop\Loop;
use Saraf\QB\QueryBuilder\Core\DBFactory;
use Saraf\QB\QueryBuilder\Enums\OrderDirection;
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;

include "vendor/autoload.php";

// Loop
$loop = Loop::get();

// Environments
$env = Dotenv::createImmutable(__DIR__ . "/../");
$env->load();

// Env Loader
$DB_NAME = $_ENV['DB_NAME'];
$DB_USER = $_ENV['DB_USER'];
$DB_PASS = $_ENV['DB_PASS'];
$DB_HOST = $_ENV['DB_HOST'];
$DB_PORT_READ = $_ENV['DB_PORT_READ'];
$DB_PORT_WRITE = $_ENV['DB_PORT_WRITE'];


try {
$dbFactory = new DBFactory(
$loop,
$DB_HOST,
$DB_NAME,
$DB_USER,
$DB_PASS,
$DB_PORT_WRITE,
$DB_PORT_READ,
5,
5,
2,
2
);
} catch (DBFactoryException $e) {
echo $e->getMessage();
exit(1);
}

// Without QB
$dbFactory->getQueryBuilder()
->select()
->from("Users")
->addColumn("id")
->whereGreater("id", 1)
->compile()
->stream()
->onData(function ($result) {
echo "New Row Data:" . json_encode($result) . PHP_EOL;
})
->run();

// Without QueryBuilder
$dbFactory->streamQuery("select id from Users where id > 1")
->onData(function ($result) {
echo "New Row Data:" . json_encode($result) . PHP_EOL;
})
->run();

$loop->addPeriodicTimer(1, function () {
memory();
});

function memory()
{
echo "Memory Stat: " . round(memory_get_usage() / 1_000_000, 2) . " / " . round(memory_get_usage(true) / 1_000_000, 2) . " MB" .
" | Peak: " . round(memory_get_peak_usage() / 1_000_000, 2) . " / " . round(memory_get_peak_usage(true) / 1_000_000, 2) . " MB" . PHP_EOL;
}


$loop->run();
23 changes: 23 additions & 0 deletions src/QueryBuilder/Core/DBFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use React\EventLoop\LoopInterface;
use React\MySQL\Factory;
use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;
use Saraf\QB\QueryBuilder\Helpers\QBHelper;
use Saraf\QB\QueryBuilder\QueryBuilder;
Expand Down Expand Up @@ -160,6 +161,28 @@ public function query(string $query): PromiseInterface
});
}

/**
* @throws DBFactoryException
*/
public function streamQuery(string $query): StreamEventHandler
{
$isWrite = true;
if (str_starts_with(strtolower($query), "select")
|| str_starts_with(strtolower($query), "show")
) $isWrite = false;

$bestConnections = $this->getBestConnection();

$connection = $isWrite
? $this->writeConnections[$bestConnections['write']]
: $this->readConnections[$bestConnections['read']];

if (!($connection instanceof DBWorker))
throw new DBFactoryException("Connections Not Instance of Worker / Restart App");

return $connection->streamQuery($query);
}

/**
* @throws DBFactoryException
*/
Expand Down
9 changes: 9 additions & 0 deletions src/QueryBuilder/Core/DBWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use React\MySQL\ConnectionInterface;
use React\MySQL\QueryResult;
use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;

class DBWorker
{
Expand All @@ -31,6 +32,14 @@ public function query(string $query): PromiseInterface
});
}

public function streamQuery(string $query): StreamEventHandler
{
$this->startJob();
return (new StreamEventHandler($this->getConnection(), $query, function () {
$this->endJob();
}));
}

protected function handleResult(QueryResult $result): array
{
if (!is_null($result->resultRows)) {
Expand Down
6 changes: 6 additions & 0 deletions src/QueryBuilder/Core/EQuery.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use React\Promise\Promise;
use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;

final class EQuery
Expand All @@ -30,6 +31,11 @@ public function commit(): PromiseInterface
}
}

public function stream(): StreamEventHandler
{
return $this->factory->streamQuery($this->query);
}

public function getQuery(): Promise
{
return new Promise(function (callable $resolve) {
Expand Down
66 changes: 66 additions & 0 deletions src/QueryBuilder/Core/StreamEventHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

namespace Saraf\QB\QueryBuilder\Core;

use React\MySQL\ConnectionInterface;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;

class StreamEventHandler
{
protected \Closure|null $onDataFn = null;
protected Deferred $promise;

public function __construct(
protected ConnectionInterface $connection,
protected string $query,
protected ?\Closure $onClosedWorker = null
)
{
}

public function onData(\Closure $onDataFn): StreamEventHandler
{
$this->onDataFn = $onDataFn;
return $this;
}

/**
* @throws \Exception
*/
public function run(mixed $initialValue = null): PromiseInterface
{
$promise = new Deferred();

if ($this->onDataFn == null) {
throw new \Exception("onData is required");
}


$stream = $this->connection->queryStream($this->query);

$stream->on("data", function ($row) use (&$initialValue) {
$initialValue = ($this->onDataFn)($row, $initialValue);
});

$stream->on("error", function (\Throwable $error) use ($promise) {
$promise->resolve([
'result' => false,
'error' => $error->getMessage(),
]);
});
$stream->on("close", function () use (&$initialValue, $promise) {
$promise->resolve([
'result' => true,
'data' => $initialValue
]);
});

// For Handling Inner Queue
if ($this->onClosedWorker != null)
$stream->on("close", $this->onClosedWorker);

return $promise->promise();
}

}