Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using connection.query(...).stream() with promise wrapper #677

Open
jomel opened this issue Nov 13, 2017 · 14 comments
Open

using connection.query(...).stream() with promise wrapper #677

jomel opened this issue Nov 13, 2017 · 14 comments

Comments

@jomel
Copy link

jomel commented Nov 13, 2017

why can't I use connection.query(...).stream() with promise wrapper?
(you get error: TypeError: connection.query(...).stream is not a function)

I'd quite like to be able to do:
let q = await connection.query(someSQL);
as well as

const s1 = connection.query(veryBiqSqlResult).stream();
s1.on('result', async function(row) {...})

in the same script, but atm I need promise api for the first and standard api for the second...

@sidorares
Copy link
Owner

sidorares commented Nov 19, 2017

maybe we need to wait for async generators to land: http://node.green/#ESNEXT-candidate--stage-3--Asynchronous-Iterators-for-await-of-loops

for await (const row of connection.query(veryBiqSqlResult).stream()) {
  console.log(row);
}

( not sure if I can mix buffering .query() => Promise version with non-buffering detecting that .stream() is added, maybe we need another name: `for await (const row of connection.queryStream(veryBiqSqlResult)) {})

You can access non-promise api from wrapper as connection.connection - see

this.connection = connection;

const [rows] = await connection.query(smallSqlResult);
await new Promise((accept, reject) => {
  const s1 = connection.connection.query(veryBiqSqlResult);
  s1.on('result', function(row) {...})
  s1.on('end', accept);
  s1.on('error', reject);
})

@derN3rd
Copy link
Contributor

derN3rd commented Mar 25, 2019

One note to the connection.connection.query(...).stream() part:
I had problems with it, until I saw the comment #770 (comment)
After removing .stream() it worked perfectly

@sidorares
Copy link
Owner

Oh, thanks @derN3rd I'll update my example here

@sidorares
Copy link
Owner

yes, stream() is just to wrap into pipeable object, and not to enable 'result' events, they are emitted anyway

@terrisgit
Copy link

terrisgit commented Feb 19, 2020

If you shut down the database while processing a large rowset after the 'result' handler is called at least once, neither end() nor error() are ever called. I'll try to create a test program.

@sidorares
Copy link
Owner

@terrisgit try setting enableKeepAlive option to true - see discussion in #1081

Maybe we should change that to be true by default, but not sure if there might be some undesired side effects

@terrisgit
Copy link

terrisgit commented Feb 29, 2020

This promisified example writes a result set to a CSV file. It handles all possible errors (mysql connection, SQL, stream write) without crashing or hanging. Feedback wanted.

const csvstringify = require('csv-stringify');
const fs = require('fs');

const outputStream = fs.createWriteStream('output.csv', {encoding: 'utf8'});

const finishedWriting = new Promise((resolve, reject)=>
  outputStream.on('finished', resolve).on('error', reject));

const BOM = '\ufeff'; // Microsoft Excel needs this
outputStream.write(BOM);

const connection = __Create a mysql2 Connection object here__
const generator = connection.connection.query('SELECT...');
let recordsProcessed = 0;

try {
  await new Promise((resolve, reject) => {
    // When using a connection pool, the 'error' connection event is called only when
    // enableKeepAlive is true. See:
    // https://github.com/sidorares/node-mysql2/issues/677#issuecomment-588530194
    // Without this handler, this code will hang if the database connection is broken
    // while reading the result set.
    connection.on('error', reject);

    generator
      .on('result', row => ++recordsProcessed) // Counting rows just as an example
      .stream({highWaterMark: 10})
      .on('error', reject)
      .on('end', resolve)
      .pipe(csvstringify({header: true}))
      .pipe(outputStream)
      .on('error', error => {
        // Handle stream write error
        // See also https://github.com/sidorares/node-mysql2/issues/664
        // Data is being sent from server to client; without calling destroy, the
        // connection will go back to the pool and will be unusable. The
        // callback provided to destroy() is never called.
        connection.destroy(); // This appears to cause file descriptor leaks
        reject(error);
      });
  });
}
finally {
  connection.on('error', error => console.log(error)); // Remove the handler. Is there a better way?
}

await finishedWriting;

@terrisgit
Copy link

terrisgit commented Mar 2, 2020

I've been doing a lot of error testing. The above code can be forced to fail in many ways but I've been focusing on write errors which you can force via outputStream.close() after you create it. It seems to leak memory if you run the above code in a loop when closing the outputStream. It's not a typical runtime error anyway so it's not a concern.

@eyalroth
Copy link

eyalroth commented Jul 19, 2021

Update: Do not use the following workaround -- it is unsafe. See nodejs/node#39722


I ended up with this workaround, if someone's interested:

const unpromisedConnection = connection.connection;
const queryUnpromised = unpromisedConnection.query.bind(unpromisedConnection);

connection.streamQuery = async (sql, values, streamOptions) =>
  new Promise((resolve, reject) => {
    const query = queryUnpromised(sql, values);
    const stream = query.stream(streamOptions);

    stream.once('error', reject);
    query.once('fields', () => {
      stream.removeListener('error', reject);
      resolve(stream);
    });
    query.once('error', reject);
  });

This adds the method streamQuery to the connection which accepts the same arguments as connection.query(sql, values).stream(streamOptions), and returns a promise that is resolved with the stream once the result set returns from the connection before the first result arrives, or rejected if the query fails.

The resolved stream may still emit error events IIUC, so those should still be handled by the caller:

connection.streamQuery('SELECT blabla malformed sql'); // promise reject

const stream = await connection.streamQuery('SELECT name FROM employees');
stream.on('error', console.log);
stream.pipe(...);

@eyalroth
Copy link

Following problems with error handling that I've encountered while trying to use queue composition and promises/async functions -- described in nodejs/node#39722 -- I feel like it would be unsafe to add any kind of support for promisified queue API, other than perhaps injecting the queue method to the promise returned from the query method if that's possible.

@marnixhoh
Copy link

marnixhoh commented Oct 1, 2021

Just a note for anyone running into this issue. I had to actually include .stream() to make it work, as opposed to not calling it as suggested in this comment:

One note to the connection.connection.query(...).stream() part: I had problems with it, until I saw the comment #770 (comment) After removing .stream() it worked perfectly

My code ended up looking something like this:

connection.connection.query('SELECT * FROM my_table')
    .stream()
    .pipe(someStream)

note:
connection.connection is used as opposed to just connection as this accesses the non-promise API.

@eyalroth
Copy link

eyalroth commented Oct 2, 2021

  1. It is not possible to stream individual columns, therefore you must use * to get all of them. For more info: see the docs

Interesting. I wonder whether this works:

connection.connection.query('SELECT * FROM (SELECT column_a FROM my_table)')
    .stream()
    .pipe(someStream)

@marnixhoh
Copy link

@eyalroth I have no idea... I think the docs are a little unclear on this. In case you're interested, I have asked a somewhat similar question here: mysqljs/mysql#2523

@marnixhoh
Copy link

@eyalroth I just wanted to let you (and others) know that I misinterpreted the docs. I have edited my comment accordingly.

It is indeed possible to stream individual columns. However, it is not possible to stream individual fields. So a large field/entry can not be streamed and will be buffered in its entirety. I hope this clarifies it. More info

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants