Question: how can I consume this array of Streams? #639
Comments
I assume it's important to pipe the responses in order? First of all, you'll want to implement getUser so that it returns a Highland stream:
const getUser = (userId) => _(request(`https://www.myapi.com/users?id=${userId}`))
app.get('/', (req, res) => {
  _(users).flatMap(getUser).pipe(res);
})
or, if you want to do the requests in parallel,
app.get('/', (req, res) => {
  // Replace 2 with your desired parallelism factor
  _(users).map(getUser).parallel(2).pipe(res);
})
I'm honestly not sure why you're getting the "already consumed" errors. That check is there because streams are one-time use, but I don't see you reusing any streams, so I'm not sure what's wrong.
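To make "in order" concrete, here is a minimal sketch that uses only Node's built-in stream module rather than Highland, so it is an illustration of the idea and not what flatMap does internally. The names fakeGetUser, collector, and pipeInOrder are hypothetical stand-ins: fakeGetUser plays the role of getUser, and the collector's dest plays the role of res.

```javascript
const { Readable, Writable } = require('stream');
const { once } = require('events');

// Hypothetical stand-in for getUser: a Readable that emits one chunk per user.
const fakeGetUser = (userId) => Readable.from([`user:${userId}\n`]);

// Hypothetical stand-in for `res`: a Writable that records what it receives.
function collector() {
  const chunks = [];
  const dest = new Writable({
    write(chunk, _encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });
  return { dest, chunks };
}

// Drain each source fully before opening the next one, then end the destination once.
async function pipeInOrder(ids, makeStream, dest) {
  for (const id of ids) {
    for await (const chunk of makeStream(id)) {
      // Respect backpressure: wait for 'drain' when the destination is full.
      if (!dest.write(chunk)) await once(dest, 'drain');
    }
  }
  dest.end();
}
```

Each source is created only when its turn comes and is read exactly once, which also sidesteps the one-time-use restriction mentioned above.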
Thank you, @vqvu! Here's what I tried (it is the actual code):
const express = require('express')
const request = require('request')
const _ = require('highland')
const app = express()
const api = (id) => request(`https://www.myapi.com/users?id=${id}`)
const parseJSON = (buffer) => JSON.parse(buffer.toString())
const toBuffer = (obj) => {
  const string = JSON.stringify(obj)
  const buffer = new Buffer(string)
  return buffer
}
const fetchEntries = (data) => {
  const { submitted } = data
  return submitted.filter((e, i) => i < 3)
}
const filterTitles = (entries) => {
  return _(entries).flatMap(api)
}
const filter = _.pipeline(
  _.map(parseJSON),
  _.map(fetchEntries),
  _.map(filterTitles),
  _.map(toBuffer)
)
app.get('/', async (req, res) => {
  _(request('initialRequest')).pipe(filter).pipe(res)
})
app.listen(3000, () => l('http://localhost:3000'))
And the response:
{
"domain": null,
"_events": {},
"_eventsCount": 2,
"__HighlandStream__": true,
"id": "766078",
"paused": true,
"_incoming": [],
"_outgoing": [],
"_consumers": [],
"_observers": [],
"_destructors": [],
"_send_events": false,
"_nil_pushed": false,
"_delegate": null,
"_is_observer": false,
"_in_consume_cb": false,
"_repeat_resume": false,
"_consume_waiting_for_next": false,
"source": null,
"writable": true
}
In fact, it's not working, because I expected the responses from the API.
There are a few things wrong here, so I'll just start at the top.
const filter = (stream) => {
  return stream.map(parseJSON)
    .map(filterTitles)
    .flatMap(fetchEntries)
}
app.get('/', async (req, res) => {
  _(requestPromise('initialRequest')).through(filter).pipe(res)
})
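The response posted earlier appears to be a Highland stream object serialized as JSON, which is what you would expect if a mapped function returns a stream and nothing flattens it. As an analogy only (plain arrays, not Highland streams), the difference between map and flatMap looks like this; lookup is a hypothetical function that returns a collection per input:

```javascript
// Hypothetical lookup that returns a collection per id,
// in the same way a request function returns a stream per id.
const lookup = (id) => [`${id}-a`, `${id}-b`];

const nested = [1, 2].map(lookup);   // the inner arrays stay nested
const flat = [1, 2].flatMap(lookup); // the inner arrays are flattened into the result

console.log(JSON.stringify(nested)); // [["1-a","1-b"],["2-a","2-b"]]
console.log(JSON.stringify(flat));   // ["1-a","1-b","2-a","2-b"]
```

With map, the wrapper itself ends up downstream; with flatMap, its contents do.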
I had to pass that data to a
const express = require('express')
const rp = require('request-promise')
const _ = require('highland')
const app = express()
const parseJSON = (buffer) => JSON.parse(buffer.toString())
const filterSubmissions = (data) => {
  const { submitted } = data
  return submitted.filter((e, i) => i < 3) // returns the first 3 entries
}
const filter = (stream) => stream.map(parseJSON).map(filterSubmissions)
const getUser = (id) => `https://hacker-news.firebaseio.com/v0/user/${id}.json?print=pretty`
app.get('/', (req, res) => {
  _(rp(getUser('user'))).through(filter).pipe(res)
})
app.listen(3000, () => console.log('http://localhost:3000'))
The reason why I said you didn't need
Change
const filter = (stream) => stream.map(parseJSON).map(filterSubmissions).map(JSON.stringify);
The server outputs when fetching the test
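The extra .map(JSON.stringify) step matters because an HTTP response can only be written strings or Buffers, not plain arrays or objects. A minimal sketch of the same three steps as plain function calls, without Highland or a server; the body payload is made-up sample data, not real API output:

```javascript
// Same helpers as in the thread.
const parseJSON = (buffer) => JSON.parse(buffer.toString());
const filterSubmissions = (data) => data.submitted.filter((e, i) => i < 3);

// Made-up payload standing in for the API response body.
const body = Buffer.from(JSON.stringify({ id: 'user', submitted: [11, 22, 33, 44, 55] }));

const entries = filterSubmissions(parseJSON(body)); // an array: not writable to res as-is
const payload = JSON.stringify(entries);            // a string: safe to write to res

console.log(typeof payload, payload); // string [11,22,33]
```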
Hello, nice folks!
I have a function that makes a request and returns a stream.
I have an array of users' IDs and I want to call getUser on each one of them:
*Most of the time I don't know how many users I will have in my array.
Now I want to create an array of streams:
What I am trying to do is iterate over requests and pipe each response to http.ServerResponse:
When I do this I get an error:
Stream already being consumed, you must either fork() or observe()
What I tried is something like _(requests.fork()).pipe(res), but it obviously didn't work.
It might be something silly that I'm missing, but if anyone could point out what I am doing wrong, or has any hint, I'd really appreciate it.
Thank you!