Skip to content

Commit d3ef8da

Browse files
author
Parsoolak
committed
feat: supporting stream in easiest way
1 parent cdfe222 commit d3ef8da

File tree

5 files changed

+182
-0
lines changed

5 files changed

+182
-0
lines changed

example/stream.php

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php
2+
3+
use Dotenv\Dotenv;
4+
use React\EventLoop\Loop;
5+
use Saraf\QB\QueryBuilder\Core\DBFactory;
6+
use Saraf\QB\QueryBuilder\Enums\OrderDirection;
7+
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;
8+
9+
include "vendor/autoload.php";
10+
11+
// Loop
12+
$loop = Loop::get();
13+
14+
// Environments
15+
$env = Dotenv::createImmutable(__DIR__ . "/../");
16+
$env->load();
17+
18+
// Env Loader
19+
$DB_NAME = $_ENV['DB_NAME'];
20+
$DB_USER = $_ENV['DB_USER'];
21+
$DB_PASS = $_ENV['DB_PASS'];
22+
$DB_HOST = $_ENV['DB_HOST'];
23+
$DB_PORT_READ = $_ENV['DB_PORT_READ'];
24+
$DB_PORT_WRITE = $_ENV['DB_PORT_WRITE'];
25+
26+
27+
try {
28+
$dbFactory = new DBFactory(
29+
$loop,
30+
$DB_HOST,
31+
$DB_NAME,
32+
$DB_USER,
33+
$DB_PASS,
34+
$DB_PORT_WRITE,
35+
$DB_PORT_READ,
36+
5,
37+
5,
38+
2,
39+
2
40+
);
41+
} catch (DBFactoryException $e) {
42+
echo $e->getMessage();
43+
exit(1);
44+
}
45+
46+
// Without QB
47+
$dbFactory->getQueryBuilder()
48+
->select()
49+
->from("Users")
50+
->addColumn("id")
51+
->whereGreater("id", 1)
52+
->compile()
53+
->stream()
54+
->onError(function (Exception $result) {
55+
echo "Error " . $result->getMessage() . PHP_EOL;
56+
})
57+
->onData(function ($result) {
58+
echo "New Row Data:" . json_encode($result) . PHP_EOL;
59+
})
60+
->onClosed(function () {
61+
echo "Task Finished";
62+
})
63+
->run();
64+
65+
// Without QueryBuilder
66+
$dbFactory->streamQuery("select id from Users where id > 1")
67+
->onError(function (Exception $result) {
68+
echo "Error " . $result->getMessage() . PHP_EOL;
69+
})
70+
->onData(function ($result) {
71+
echo "New Row Data:" . json_encode($result) . PHP_EOL;
72+
})
73+
->onClosed(function () {
74+
echo "Task Finished";
75+
})
76+
->run();
77+
78+
$loop->addPeriodicTimer(1, function () {
79+
memory();
80+
});
81+
82+
function memory()
83+
{
84+
echo "Memory Stat: " . round(memory_get_usage() / 1_000_000, 2) . " / " . round(memory_get_usage(true) / 1_000_000, 2) . " MB" .
85+
" | Peak: " . round(memory_get_peak_usage() / 1_000_000, 2) . " / " . round(memory_get_peak_usage(true) / 1_000_000, 2) . " MB" . PHP_EOL;
86+
}
87+
88+
89+
$loop->run();

src/QueryBuilder/Core/DBFactory.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React\EventLoop\LoopInterface;
66
use React\MySQL\Factory;
77
use React\Promise\PromiseInterface;
8+
use React\Stream\ReadableStreamInterface;
89
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;
910
use Saraf\QB\QueryBuilder\Helpers\QBHelper;
1011
use Saraf\QB\QueryBuilder\QueryBuilder;
@@ -160,6 +161,28 @@ public function query(string $query): PromiseInterface
160161
});
161162
}
162163

164+
/**
165+
* @throws DBFactoryException
166+
*/
167+
public function streamQuery(string $query): StreamEventHandler
168+
{
169+
$isWrite = true;
170+
if (str_starts_with(strtolower($query), "select")
171+
|| str_starts_with(strtolower($query), "show")
172+
) $isWrite = false;
173+
174+
$bestConnections = $this->getBestConnection();
175+
176+
$connection = $isWrite
177+
? $this->writeConnections[$bestConnections['write']]
178+
: $this->readConnections[$bestConnections['read']];
179+
180+
if (!($connection instanceof DBWorker))
181+
throw new DBFactoryException("Connections Not Instance of Worker / Restart App");
182+
183+
return $connection->streamQuery($query);
184+
}
185+
163186
/**
164187
* @throws DBFactoryException
165188
*/

src/QueryBuilder/Core/DBWorker.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use React\MySQL\ConnectionInterface;
66
use React\MySQL\QueryResult;
77
use React\Promise\PromiseInterface;
8+
use React\Stream\ReadableStreamInterface;
89

910
class DBWorker
1011
{
@@ -31,6 +32,14 @@ public function query(string $query): PromiseInterface
3132
});
3233
}
3334

35+
public function streamQuery(string $query): StreamEventHandler
36+
{
37+
$this->startJob();
38+
return (new StreamEventHandler($this->getConnection(), $query, function () {
39+
$this->endJob();
40+
}));
41+
}
42+
3443
protected function handleResult(QueryResult $result): array
3544
{
3645
if (!is_null($result->resultRows)) {

src/QueryBuilder/Core/EQuery.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use React\Promise\Promise;
66
use React\Promise\PromiseInterface;
7+
use React\Stream\ReadableStreamInterface;
78
use Saraf\QB\QueryBuilder\Exceptions\DBFactoryException;
89

910
final class EQuery
@@ -30,6 +31,11 @@ public function commit(): PromiseInterface
3031
}
3132
}
3233

34+
public function stream(): StreamEventHandler
35+
{
36+
return $this->factory->streamQuery($this->query);
37+
}
38+
3339
public function getQuery(): Promise
3440
{
3541
return new Promise(function (callable $resolve) {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
namespace Saraf\QB\QueryBuilder\Core;
4+
5+
use React\MySQL\ConnectionInterface;
6+
7+
class StreamEventHandler
8+
{
9+
protected $onError = null;
10+
protected $onData = null;
11+
protected $onClosed = null;
12+
13+
public function __construct(
14+
protected ConnectionInterface $connection,
15+
protected string $query,
16+
protected ?\Closure $onClosedWorker = null
17+
)
18+
{
19+
}
20+
21+
public function onError(callable $onError): StreamEventHandler
22+
{
23+
$this->onError = $onError;
24+
return $this;
25+
}
26+
27+
public function onData(callable $onData): StreamEventHandler
28+
{
29+
$this->onData = $onData;
30+
return $this;
31+
}
32+
33+
public function onClosed(callable $onClosed): StreamEventHandler
34+
{
35+
$this->onClosed = $onClosed;
36+
return $this;
37+
}
38+
39+
public function run(): void
40+
{
41+
$stream = $this->connection->queryStream($this->query);
42+
43+
if ($this->onError != null)
44+
$stream->on("error", $this->onError);
45+
if ($this->onData != null)
46+
$stream->on("data", $this->onData);
47+
if ($this->onClosed != null)
48+
$stream->on("close", $this->onClosed);
49+
50+
// For Handling Inner Queue
51+
if ($this->onClosedWorker != null)
52+
$stream->on("close", $this->onClosedWorker);
53+
}
54+
55+
}

0 commit comments

Comments
 (0)