Skip to content

Conversation

@Parsoolak
Copy link

This PR introduces streaming support to saraf/qb, allowing you to handle large result sets efficiently with a simple, chainable API. Instead of fetching all results at once, you can now stream rows one by one.

🆕 Example Usage

$dbFactory->getQueryBuilder()
    ->select()
    ->from("Users")
    ->addColumn("id")
    ->whereGreater("id", 1)
    ->compile()
    ->stream()
    ->onError(function (Exception $result) {
        echo "Error " . $result->getMessage() . PHP_EOL;
    })
    ->onData(function ($result) {
        echo "New Row Data:" . json_encode($result) . PHP_EOL;
    })
    ->onClosed(function () {
        echo "Task Finished";
    })
    ->run();

✅ Supports:

1. onData(callable $row) for each row
2. onError(callable $e) for error handling
3. onClosed(callable) when stream ends

🛠️ Implementation Notes

Built on top of ReadableStreamInterface pattern
Stream object handles query execution internally, emitting rows as they come
Easy to integrate into existing query chains

💡 Ideal for cases where:

You need to process a large number of rows without loading everything into memory
You want real-time or async-style processing of DB data
You’re building long-running CLI tools or daemons

@hasanparasteh
Copy link
Contributor

This PR introduces streaming support to saraf/qb, allowing you to handle large result sets efficiently with a simple, chainable API. Instead of fetching all results at once, you can now stream rows one by one.

🆕 Example Usage

$dbFactory->getQueryBuilder()
    ->select()
    ->from("Users")
    ->addColumn("id")
    ->whereGreater("id", 1)
    ->compile()
    ->stream()
    ->onError(function (Exception $result) {
        echo "Error " . $result->getMessage() . PHP_EOL;
    })
    ->onData(function ($result) {
        echo "New Row Data:" . json_encode($result) . PHP_EOL;
    })
    ->onClosed(function () {
        echo "Task Finished";
    })
    ->run();

✅ Supports:

1. onData(callable $row) for each row
2. onError(callable $e) for error handling
3. onClosed(callable) when stream ends

🛠️ Implementation Notes

Built on top of ReadableStreamInterface pattern
Stream object handles query execution internally, emitting rows as they come
Easy to integrate into existing query chains

💡 Ideal for cases where:

You need to process a large number of rows without loading everything into memory
You want real-time or async-style processing of DB data
You’re building long-running CLI tools or daemons

I just changed run method to return Deferred promise! This way we maintain pattern across streams and normal queries. Also onClosed and onError removed from user api to avoid bad usage

for example:

$dbFactory->getQueryBuilder()
    ->select()
    ->from("Users")
    ->addColumn("id")
    ->whereGreater("id", 1)
    ->compile()
    ->stream()
    ->onData(function ($result, $carry) {
        echo "New Row Data:" . json_encode($result) . PHP_EOL;
        // process data and return $carry
        return $carry;
    })
    ->run(0); // run accepts $initialValue for the $carry

@hasanparasteh hasanparasteh merged commit 9493104 into main Apr 5, 2025
1 check passed
@hasanparasteh hasanparasteh deleted the stream branch April 5, 2025 09:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants