Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ const users = await sql`

```

## Stream ```sql` `.stream(fn) -> Promise```
## Streams

### ```sql` `.stream(fn) -> Promise```

If you want to handle rows returned by a query one by one you can use `.stream` which returns a promise that resolves once there are no more rows.
```js
Expand All @@ -121,6 +123,21 @@ await sql`

```

### With `for-await`

If you are using a recent version of Node.JS (> v10), can use the new `for-await` to consume streams properly.
```js

for await (const row of await sql`
select created_at, name from events
`) {
// row = { created_at: '2019-11-22T14:22:00Z', name: 'connected' }
}

// No more rows

```

## Listen and notify

When you call listen, a dedicated connection will automatically be made to ensure that you receive notifications in realtime. This connection will be used for any further calls to listen.
Expand Down
19 changes: 19 additions & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,25 @@ function Postgres(url, options) {
})

promise.stream = (fn) => (query.stream = fn, promise)

if (Symbol.asyncIterator)
promise[Symbol.asyncIterator] = () => {
let locked = [], resolved = [];
query.stream = row => locked.length ? locked.pop()({ done: false, value: row }) : resolved.push(Promise.resolve({ done: false, value: row }));
promise.then(() => locked.length ? locked.pop()({ done: true }) : resolved.push(Promise.resolve({ done: true })))
return {
return() {
// TODO Abort the query properly
return Promise.resolve({ done: true });
},
next() {
return resolved.length ? resolved.pop() : new Promise(res => locked.push(res));
},
throw() {
return Promise.resolve({ done: true });
}
}
};

return promise
}
Expand Down
24 changes: 24 additions & 0 deletions tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,30 @@ t('Stream works', async() => {
return [1, result]
})

t('Stream works with for-await', async() => {
let result
for await (const { x } of sql`select 1 as x`)
result = x
return [1, result]
})

t('Stream works with for-await with big results', async() => {
let result = 0
for await (const { generate_series: x } of sql`select * from generate_series(1, 100000)`)
result += x;
return [4699408878, result]
})

t('Stream works with for-await with interruption', async() => {
let result = 0
for await (const { generate_series: x } of sql`select * from generate_series(1, 100000)`) {
result += x;
if (result > 10000)
break;
}
return [10450, result]
})

t('Stream returns empty array', async() => {
return [0, (await sql`select 1 as x`.stream(x => {})).length]
})
Expand Down