I'm looking for a way to combine WritableStream with async await. I saw some old issues in this repository related to this problem and didn't find any relevant solution.
What I'm trying to do:
Get file stream from S3
Parse file with streams using htmlparser2
During parsing, insert chunks of data into Kafka
Example code:
const readable = getFileStreamFromS3()
let buffer = []
const parser = new WStream(
  {
    onopentag(name, attributes) {
      // ... collect data on open tag
    },
    ontext(text) {
      // collect text on text
    },
    async onclosetag(tagname) {
      buffer.push(data)
      if (buffer.length >= 100) {
        await pushToKafka(buffer)
        buffer = []
      }
    },
  },
  { xmlMode: true },
)
return new Promise(resolve => readable.pipe(writableStream).on('finish', resolve))
However, it doesn't seem to work. Any ideas on how to implement this feature?
To combine a writable stream with async/await, do the asynchronous work inside the stream's write method and call its callback only once that work has finished. Your current code awaits inside the onclosetag event handler, which won't work as expected: the parser calls its handlers synchronously and does not wait for the promise an async handler returns, so parsing carries on while pushToKafka is still in flight and nothing slows down the readable. The write callback is what tells the stream that a chunk has been fully processed. Here's how you can refactor your code to achieve this:
const { Writable } = require('stream');
const { getFileStreamFromS3, pushToKafka } = require('./your-utils');

const readable = getFileStreamFromS3();

const parser = new Writable({
  write(chunk, encoding, callback) {
    // Parse chunk using htmlparser2
    // For example, assuming parseChunk is a function that parses the chunk
    parseChunk(chunk).then(async (parsedData) => {
      // Insert parsed data into Kafka
      await pushToKafka(parsedData);
      callback(); // Call the callback to indicate that the chunk has been processed
    }).catch(err => {
      callback(err); // If an error occurs, pass it to the callback
    });
  }
});

// Pipe the readable stream to the parser
readable.pipe(parser);

// Handle finish event to resolve the promise when parsing is done
return new Promise((resolve, reject) => {
  parser.on('finish', resolve);
  parser.on('error', reject); // Handle any errors that occur during parsing
});
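If you want to keep the batching of 100 records from the original question, one possible variation is sketched below. It keeps the parser handlers synchronous (they only collect records) and moves every await into write and final. The names createBatchingWritable, feed, flush and batchSize are made up for illustration and are not part of htmlparser2 or any Kafka client. feed is assumed to be a synchronous function that parses one chunk and reports each finished record through emit, and flush is assumed to return a promise, as pushToKafka does in the question.

```javascript
const { Writable } = require('stream');

// Hypothetical helper, not a library API: wraps a synchronous parser step
// (`feed`) and an async sink (`flush`) in a Writable that respects backpressure.
function createBatchingWritable({ feed, flush, batchSize = 100 }) {
  let buffer = [];
  const emit = (record) => buffer.push(record);

  // Send full batches to the sink; when `force` is true, also send the remainder.
  async function drain(force) {
    while (buffer.length >= batchSize || (force && buffer.length > 0)) {
      const batch = buffer.splice(0, batchSize);
      await flush(batch);
    }
  }

  return new Writable({
    write(chunk, encoding, callback) {
      try {
        feed(chunk, emit); // synchronous parsing, records land in `buffer`
      } catch (err) {
        callback(err);
        return;
      }
      // The callback fires only after pending batches are flushed, so the
      // readable is not asked for more data until the sink has caught up.
      drain(false).then(() => callback(), callback);
    },
    final(callback) {
      // Flush whatever is left before 'finish' is emitted.
      drain(true).then(() => callback(), callback);
    },
  });
}
```

With htmlparser2, feed would presumably be something like (chunk) => parser.write(chunk.toString()) on a Parser instance whose onclosetag handler calls emit, and flush would be pushToKafka. Using pipeline from stream/promises instead of pipe also propagates errors from the readable, which the finish/error listeners above do not cover.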