Resume-Pause in stream is not working as expected #832

Closed
StarkDevelopers opened this issue Mar 12, 2019 · 7 comments · Fixed by #838

Comments

StarkDevelopers commented Mar 12, 2019

I am trying to export a database table with 3 million records to a CSV file. Writing to the CSV file is slower than reading from the database, so rows kept being fetched and held in memory, which caused memory exhaustion. I therefore tried the pause/resume feature released in the latest version. I had high hopes for it, but it didn't work :(

Here is the code I tried:

	const sql = require('mssql')
	const fs = require('fs')
	const { Writable, Readable } = require('stream')

	let request, pool1
	let count = 0;
	let write = 0;
	let isRequestPaused = false;

	let fsWrite = fs.createWriteStream('./output', {flags: 'a'});

	let fields;
	let opts;

	console.log('Start => ', new Date());

	const config = {
	    user: '...',
	    password: '...',
	    server: '...',
	    database: '...',
	    options: {
		encrypt: true
	    }
	}

	class MyReadableStream extends Readable {
	    constructor(options) {
		super(options);

		this._request = getRequest();

		this._request.on('row', row => {
		    count++;
		    if (count % 100000 === 0) {
		        console.log('Now => ', new Date());
		        console.log('Row => ', count);
		    }
		    if (!this.push(row)) {
		        isRequestPaused = true;
		        console.log('Request Paused =>');
		        this._request.pause();
		    }
		})
	    
		this._request.on('done', result => {
		    console.log('Done');
		    this.push(null);
		    console.log('End => ', new Date());
		})
	    }

	    _read () {
		if (count % 100000 === 0) {
		    console.log('Read row =>');
		}
		if (isRequestPaused) {
		    console.log('Request Resumed =>');
		    this._request.resume();
		    isRequestPaused = false;
		}
	    }
	}

	class MyWritableStream extends Writable {
	    constructor(options) {
		super(options);
	    }

	    _write (chunk, encoding, callback) {
		write++
		if (write % 100000 === 0) {
		    console.log('Write Row => ', write);
		}
		fsWrite.write(JSON.stringify(chunk), null, callback);
	    }

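	    _final (callback) {
		console.log('Done writing');
		// Close the file stream, then signal that this writable has finished.
		fsWrite.end(callback);
	    }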
	}

	function getRequest () {
	    request = pool1.request()
	    
	    request.stream = true

	    request.query('SELECT * FROM largeTable')

	    return request
	}

	async function main () {
	    pool1 = await new sql.ConnectionPool(config).connect()
	    
	    pool1.on('error', error => {
		console.error('Error in connection pool => ', error)
	    })

	    const myReadableStream = new MyReadableStream({
		objectMode: true
	    });
	    const myWritableStream = new MyWritableStream({
		objectMode: true
	    });

	    myReadableStream.pipe(myWritableStream);
	}

	main();

This is what I expected:

Whenever this.push() returns false, the request should pause, and no more data should be fetched until request.resume() has been called.

But this is what I observed:

After some data was fetched, request.pause() was called, and after that request.resume() was called at regular intervals, but data kept being fetched even while the request was paused.
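The expected contract can be sketched in isolation, without a database. This is only a minimal sketch: `FakeRequest` and `RowStream` are hypothetical names, and `FakeRequest` is a stand-in that merely mimics the `row`/`done` events and `pause()`/`resume()` methods used in the code above. It is not the mssql implementation. The paused flag is kept on the stream instance instead of in a module-level variable.

```javascript
const { Readable } = require('stream')
const { EventEmitter } = require('events')

// Hypothetical stand-in for a streaming request. It emits 'row' events
// synchronously until pause() is called, then 'done' once exhausted.
class FakeRequest extends EventEmitter {
  constructor (total) {
    super()
    this.total = total
    this.emitted = 0
    this.paused = true // idle until the first resume()
    this.finished = false
  }

  pause () {
    this.paused = true
  }

  resume () {
    if (!this.paused) return
    this.paused = false
    while (!this.paused && this.emitted < this.total) {
      this.emitted++
      this.emit('row', { id: this.emitted })
    }
    if (!this.paused && !this.finished) {
      this.finished = true
      this.emit('done')
    }
  }
}

// Readable wrapper that pauses the source whenever push() reports a full buffer.
class RowStream extends Readable {
  constructor (request, options) {
    super({ ...options, objectMode: true })
    this._request = request
    // The fake starts idle; a request that starts producing rows
    // immediately would begin with this flag set to false.
    this._sourcePaused = true

    request.on('row', row => {
      if (!this.push(row)) {
        this._sourcePaused = true
        request.pause()
      }
    })
    request.on('done', () => this.push(null))
  }

  _read () {
    // The consumer wants more data: let the source produce again.
    if (this._sourcePaused) {
      this._sourcePaused = false
      this._request.resume()
    }
  }
}

const demoRequest = new FakeRequest(10)
const demoStream = new RowStream(demoRequest, { highWaterMark: 3 })
demoStream.read() // the first read starts the source
console.log('rows produced so far:', demoRequest.emitted, 'of', demoRequest.total)
```

With a source that really stops producing on `pause()`, the number of rows produced stays close to the number consumed plus the stream's `highWaterMark`. That bound is what I expected from the real request.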

Here is a video of my program running:
https://filebin.net/yuwq0j27ki05ajwz

You can see that request.pause() and request.resume() are triggered at regular intervals. At the start, the network speed varied between 100 Mbps and 500 Mbps, so a large number of records were fetched and memory utilisation rose rapidly. Later the network speed dropped to around 1 Mbps, and by that point the Node process was using around 2 GB of memory. So even though I had called request.pause(), it continued to fetch records and store them in memory.
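To put numbers on the memory growth instead of relying on the video, a small sampler along these lines could run next to the export. This is a sketch: `memorySnapshot` and `startMemoryLog` are hypothetical helper names, and the only Node API used is `process.memoryUsage()`.

```javascript
// Hypothetical helper: snapshot heap and resident memory next to a row counter.
function memorySnapshot (rowsSeen) {
  const { heapUsed, rss } = process.memoryUsage()
  const toMb = bytes => Math.round(bytes / 1024 / 1024)
  return { rowsSeen, heapUsedMb: toMb(heapUsed), rssMb: toMb(rss) }
}

// Logs a snapshot every intervalMs and returns a function that stops the log.
function startMemoryLog (getRowCount, intervalMs = 5000) {
  const timer = setInterval(() => {
    console.log(memorySnapshot(getRowCount()))
  }, intervalMs)
  timer.unref() // do not keep the process alive just for logging
  return () => clearInterval(timer)
}
```

In the program above this could be started with `startMemoryLog(() => count)`. If the logged memory keeps climbing while the request is paused, rows are still being buffered somewhere.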

So my memory exhaustion problem still persists.

Can anyone please help?
Thanks in advance.

Collaborator

dhensby commented Mar 12, 2019

There is an issue template for a reason; please use it.

Collaborator

dhensby commented Mar 12, 2019

I've opened #833 as proof that the pause/resume functionality works as expected.

I'm closing this due to lack of information; feel free to add the missing information and I'll look into this issue further.

dhensby closed this as completed Mar 12, 2019

@StarkDevelopers
Author

Hi @dhensby, thanks for the reply. I have tried pausing the request right after receiving the first row, like this:

	this._request = getRequest();

	this._request.on('row', row => {
		count++;
		this._request.pause();
		if (count % 100000 === 0) {
		        console.log('Now => ', new Date());
		        console.log('Row => ', count);
		}
		if (!this.push(row)) {
		        isRequestPaused = true;
		        console.log('Request Paused =>');
		        this._request.pause();
		}
	})

Logically it should then stop fetching rows, but the memory used by the Node process kept increasing, which means rows were still being fetched from the database even in the paused state. I think that in the paused state it only stops emitting events. So the memory exhaustion problem is still there. Correct me if I am wrong.
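The suspicion above can be illustrated with a toy model. This is an assumption-laden sketch, not the actual internals of mssql or tedious: `FakeSocket`, `ModelRequest`, and `run` are hypothetical names, and the model only contrasts a pause that stops event emission with a pause that also stops the underlying source.

```javascript
const { EventEmitter } = require('events')

// Hypothetical network layer: delivers up to n rows unless it has been paused.
class FakeSocket {
  constructor () {
    this.paused = false
    this.delivered = 0
  }

  pause () {
    this.paused = true
  }

  deliver (n, onRow) {
    for (let i = 0; i < n && !this.paused; i++) {
      onRow({ id: ++this.delivered })
    }
  }
}

// Hypothetical request: buffers rows that arrive while it is paused.
class ModelRequest extends EventEmitter {
  constructor (socket, propagatePause) {
    super()
    this.socket = socket
    this.propagatePause = propagatePause
    this.paused = false
    this.buffered = []
  }

  pause () {
    this.paused = true
    // Only a pause that reaches the source stops rows from arriving.
    if (this.propagatePause) this.socket.pause()
  }

  receive (row) {
    if (this.paused) this.buffered.push(row) // memory grows while paused
    else this.emit('row', row)
  }
}

// Pauses after the first row and reports how many rows piled up in memory.
function run (propagatePause) {
  const socket = new FakeSocket()
  const request = new ModelRequest(socket, propagatePause)
  request.on('row', () => request.pause())
  socket.deliver(1000, row => request.receive(row))
  return request.buffered.length
}

console.log(run(false), run(true)) // prints "999 0"
```

In the first case the pause only silences the events, so 999 rows accumulate in the buffer. In the second case the pause reaches the source and nothing accumulates.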

@arthurschreiber
Contributor

This was fixed in tedious via tediousjs/tedious#878.

@arthurschreiber
Contributor

The fix for this is part of the tedious@6.0.1 release. 🎉
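To check whether an installed tedious version is at or above that release, a plain version comparison is enough. This is a sketch: `includesPauseFix` is a hypothetical helper, it ignores pre-release tags, and it assumes you obtain the version string yourself, for example from the output of `npm ls tedious`.

```javascript
// Hypothetical helper: true when `version` is 6.0.1 or later.
// Pre-release suffixes such as "-beta.1" are ignored by this sketch.
function includesPauseFix (version) {
  const fixed = [6, 0, 1]
  const actual = version.split('.').slice(0, 3).map(part => parseInt(part, 10))
  for (let i = 0; i < 3; i++) {
    if (actual[i] !== fixed[i]) return actual[i] > fixed[i]
  }
  return true
}

console.log(includesPauseFix('6.0.1'))
```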

dhensby mentioned this issue Mar 16, 2019

@kuldeep-argusoft

@dhensby

I am waiting for this release. When should I expect a stable release?

Collaborator

dhensby commented May 7, 2019

There's no timeline currently defined for moving 6.0 into stable.

I could move it into beta in the coming days.
