From 105e8d0e776479d09070854ce5064c7ee046debf Mon Sep 17 00:00:00 2001 From: Minigugus <43109623+Minigugus@users.noreply.github.com> Date: Wed, 8 Jan 2020 18:18:25 +0100 Subject: [PATCH] Add for-await support for streaming --- README.md | 19 ++++++++++++++++++- lib/index.js | 19 +++++++++++++++++++ tests/index.js | 24 ++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d0d2f32c..8da39460 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,9 @@ const users = await sql` ``` -## Stream ```sql` `.stream(fn) -> Promise``` +## Streams + +### ```sql` `.stream(fn) -> Promise``` If you want to handle rows returned by a query one by one you can use `.stream` which returns a promise that resolves once there are no more rows. ```js @@ -121,6 +123,21 @@ await sql` ``` +### With `for-await` + +If you are using a recent version of Node.JS (> v10), can use the new `for-await` to consume streams properly. +```js + +for await (const row of await sql` + select created_at, name from events +`) { + // row = { created_at: '2019-11-22T14:22:00Z', name: 'connected' } +} + +// No more rows + +``` + ## Listen and notify When you call listen, a dedicated connection will automatically be made to ensure that you receive notifications in realtime. This connection will be used for any further calls to listen. diff --git a/lib/index.js b/lib/index.js index cb0994c6..22191a96 100644 --- a/lib/index.js +++ b/lib/index.js @@ -169,6 +169,25 @@ function Postgres(url, options) { }) promise.stream = (fn) => (query.stream = fn, promise) + + if (Symbol.asyncIterator) + promise[Symbol.asyncIterator] = () => { + let locked = [], resolved = []; + query.stream = row => locked.length ? locked.pop()({ done: false, value: row }) : resolved.push(Promise.resolve({ done: false, value: row })); + promise.then(() => locked.length ? locked.pop()({ done: true }) : resolved.push(Promise.resolve({ done: true }))) + return { + return() { + // TODO Abort the query properly + return Promise.resolve({ done: true }); + }, + next() { + return resolved.length ? resolved.pop() : new Promise(res => locked.push(res)); + }, + throw() { + return Promise.resolve({ done: true }); + } + } + }; return promise } diff --git a/tests/index.js b/tests/index.js index 7564c5a3..30b51730 100644 --- a/tests/index.js +++ b/tests/index.js @@ -765,6 +765,30 @@ t('Stream works', async() => { return [1, result] }) +t('Stream works with for-await', async() => { + let result + for await (const { x } of sql`select 1 as x`) + result = x + return [1, result] +}) + +t('Stream works with for-await with big results', async() => { + let result = 0 + for await (const { generate_series: x } of sql`select * from generate_series(1, 100000)`) + result += x; + return [4699408878, result] +}) + +t('Stream works with for-await with interruption', async() => { + let result = 0 + for await (const { generate_series: x } of sql`select * from generate_series(1, 100000)`) { + result += x; + if (result > 10000) + break; + } + return [10450, result] +}) + t('Stream returns empty array', async() => { return [0, (await sql`select 1 as x`.stream(x => {})).length] })