Skip to content

Bulk helper semaphore handling results in hanging await for long-running requests (exceeding flushInterval) #1562

Closed
@brentmjohnson

Description

@brentmjohnson

🐛 Bug Report

Bulk helper hangs forever when flushInterval is exceeded while iterator is already awaiting semaphore.

To Reproduce

Steps to reproduce the behavior:

  1. Run below code against a test cluster - should complete successfully
  2. Simulate long running server-side operation exceeding configured flushInterval. Multiple ways to do this but one way is to modify the compiled Helpers.js with the following:
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions, async (err, { body }) => {
await new Promise(resolve => setTimeout(resolve, flushInterval));
if (err) return callback(err, null)
  1. Re-run below code and watch the hanging await caused by onFlushTimeout() invoked on a payload already awaiting semaphore()

Paste your code here:

'use strict'
const util = require("util")
const { Readable } = require('stream');
const { Client } = require('@elastic/elasticsearch');

async function* generator() {
    let i = 0
    while (i < 10) {
        await new Promise(resolve => setTimeout(resolve, 1 * 1000));
        yield { i: i++ };
    }
};

const readableStream = Readable.from(generator());

const elasticClient = new Client({
    [TESTCLUSTER]
});

(async () => {
    const bulkHelper = elasticClient.helpers.bulk({
        flushBytes: 43,
        concurrency: 1,
        datasource: readableStream,
        onDocument(doc) {
            console.log(doc)
            return {
                index: { _index: 'semaphoretest' }
            }
        },
        onDrop(doc) {
            console.log(doc);
        }
    }).catch((err) => {
        console.error(err);
    });

    while (util.inspect(bulkHelper).includes('pending')) {
        await new Promise(resolve => setTimeout(resolve, 1 * 1000));
        console.log('...waiting');
    }

    console.log(await bulkHelper);
})();

Expected behavior

Bulk helper awaits gracefully for queued requests to complete, error, or timeout.

Paste the results here:

{ i: 0 }
...waiting
{ i: 1 }
...waiting
...waiting
...waiting
...[forever]

Your Environment

  • node version: v14.17.6
  • @elastic/elasticsearch version: >=7.15.0
  • os: Linux

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions