-
Notifications
You must be signed in to change notification settings - Fork 439
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implements Request.pause() / resume() #518
Conversation
I tried you branch, and I have the following error using this code: const Connection = require('tedious').Connection;
const Request = require('tedious').Request;
const connection = new Connection({
server: '???',
userName: '???',
password: '???',
options: {
database: '???'
}
});
connection.on('connect', (err) => {
if (err) {
throw err;
}
const request = new Request("select * from RANDOM_TABLE", (err2, rowCount) => {
if (err2) {
throw err2;
}
console.log(rowCount + ' rows');
connection.close();
});
request.on('row', (columns) => {
console.log(columns.length);
request.pause();
setTimeout(() => {
request.resume();
}, 500);
});
connection.execSql(request);
}); Error:
The table contains 20 rows. I tried to pause the stream every 5 rows, but I had the same error. I'm using node 7.6 on Ubuntu x64.
Am I misusing the feature ? Don't hesitate if you wan any more feedback / test. |
@Congelli501 Thanks for your bug report. I will analyze the problem and provide a fix. |
@Congelli501 I have fixed the problem. It happened when there were only a few records left when pause() was called. I have added another test case to verify that this problem is solved. Could you please test again? |
Hey @chdh, Thank you very much for this PR - it definitely shows that pausing and resuming requests is possible in But as you mentioned over in #512:
I think so as well. Tedious internals should be a bunch of streams that are piped into each other, so that Would you be interested in helping to transform the tedious internals for this? I have played with a few ideas for this, and we could open an issue to discuss this further. If not, I believe these changes could be merged as they are, I'm just concerned that doing so might make tedious internals even more complex (and fragile) than they already are. 🤔 I'm scheduling this for the release after the planned 1.15.0 release, as I don't want to delay the 1.15.0 release even further. /cc @tvrprasad What do you think? |
@arthurschreiber, thanks for your comment. I will try to simplify the PR. |
I have simplified the PR to apply backpressure (pause/resume) only to the last stream (the token stream parser transform). The current solution bridges the backpressure gap between the token stream parser transform and the packet stream transform. This bridging is only a few lines and can later be replaced by Stream.pipe(). That way we have a "minimal invasive" patch with the data flow control concept we want in the long term. |
@chdh pause's great, but what do you think about timeout? should it pause too? |
@ElfenLiedGH Is it the request timeout? The default value for options.requestTimeout is 15 seconds. For a large table export or a complex query, this is too short anyway, whether pause/resume is used or not. It would be good if we could override the options.requestTimeout value for a single request, maybe with a new method Request.setTimeout()? |
From the "Client Request Timer" section of the TDS specification:
This means that as soon as we received the first packet from the response message for a query, we need to stop the request timer. If the user wants some sort of time out on the processing of the data, it's up to them to define a more suitable timer. |
You are right, but according to the documentation of SqlCommand.CommandTimeout in the .NET API, the timeout can also occur between two consecutive rows.
I don't know how we could implement this correctly. We probably don't want to start a timer after each 'row' event. |
Weird. This is not how |
@ElfenLiedGH please have a look at the new version. Does it solve your problem? |
@chdh @arthurschreiber None of the Microsoft implemented drivers implement the notion of pause/resume. The reasoning as I understand is that you can't really slow down how fast the server sends data as there is no support this throttling in TDS specification. So the memory pressure is going to build one place or the other in the system. Is there something different about NodeJS that it makes sense here? Curious to know. @arthurschreiber Agree with not delaying release of 1.15. I'll need to read up on streaming in NodeJS before I can share thoughts on it :-) |
src/connection.js
Outdated
@@ -522,7 +534,15 @@ class Connection extends EventEmitter { | |||
this.socket.on('close', this.socketClose); | |||
this.socket.on('end', this.socketEnd); | |||
this.messageIo = new MessageIO(this.socket, this.config.options.packetSize, this.debug); | |||
this.messageIo.on('data', (data) => { this.dispatchEvent('data', data); }); | |||
this.messageIo.on('data', (data) => { | |||
this.clearRequestTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you do a separate PR for this bug fix? Though this got caught as part of this work, I think we can call it unrelated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I could put this into a separate PR, but it's not really a problem as long as you don't have a large result set or pause/resume. There was a real bug with clearRequestTimer() which I already fixed in PR #527.
@arthurschreiber what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A separate PR would be nice, so this can be part of the next release. ❤
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, it's in #530 now.
src/request.js
Outdated
// Returns true if this request is the currently active request of the connection. | ||
isActive() { | ||
return this.connection && this.connection.request === this && this.connection.state === this.connection.STATE.SENT_CLIENT_REQUEST; | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This introduces pretty deep coupling between the Connection class and Request class. Perhaps this suggests pause/resume methods should be on the Connection class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have multiple active result sets (MARS) in the future, we need pause/resume per request. But you are right, the deep coupling is not clean. I will move this logic into the Connection class, but keep the pause/resume methods in the Request class.
It's a general problem that we cannot indicate public/protected/private with class methods and properties. Therefore I suggest to use TypeScript (see #512).
@tvrprasad The need for pause/resume only arises in an asynchronous environment like Node.js. In the traditional programming environments with synchronous read-next calls, the data flow control is implicit. All Microsoft database drivers implement this implicit data flow control. Example with MS-Access / VBA:
In this example with MS-Access, the memory is not filled up, even if HugeTable has tens of millions of records. If you do the same thing with the current Tedious release, the memory fills up and after a few seconds the Node.js runtime will crash. Without pause/resume there is no way to control the data flow. The only solution is using SQL queries that don't produce a lot of data. |
src/connection.js
Outdated
@@ -522,7 +534,15 @@ class Connection extends EventEmitter { | |||
this.socket.on('close', this.socketClose); | |||
this.socket.on('end', this.socketEnd); | |||
this.messageIo = new MessageIO(this.socket, this.config.options.packetSize, this.debug); | |||
this.messageIo.on('data', (data) => { this.dispatchEvent('data', data); }); | |||
this.messageIo.on('data', (data) => { | |||
this.clearRequestTimer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A separate PR would be nice, so this can be part of the next release. ❤
let paused = false; | ||
openConnection(); | ||
|
||
function openConnection() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you refactor these test cases to make use of common 'setUp' and 'tearDown' functions? See the nodeunit documentation of you need more information for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I have refactored the pause/resume test cases with setUp/tearDown.
Isn't the memory being filled up at the network layer (perhaps the TCP buffers), in that case? I assume the server is still sending data as fast as it can. |
TCP has built in flow control and the sender will slow down sending data to
the receiver automatically.
|
Right, thought about that but it was not clear to me how that'd kick in here. The TCP layer on the client would have to stop sending ACKs while it's in the pause mode to throttle the server sending data. But then I thought that would cause TCP timeouts, retransmissions and ultimately errors on the server side. Now I see that the pause has to be really long for that to happen. Presumably server has mechanisms to handle slow clients. Makes sense now. Thanks :-) Microsoft drivers like ADO.net support Async mode API but don't have pause/resume. So they'd be running into this issue, right? I wonder why there is not a demand for that support in those drivers. |
It's another concept. In the .NET API, you have to call SqlDataReader.ReadAsync() for each row. When you stop calling ReadAsync(), the row stream is paused implicitly. With Tedious and other Node libraries, the rows are continuously delivered via 'row' events and you have to call pause() explicitly. |
@chdh Thanks for the clarification! Certainly learnt some stuff from this thread :-) I'll send you a few more comments on this PR. |
src/token/token-stream-parser.js
Outdated
addBuffer(buffer) { | ||
return this.parser.write(buffer); | ||
} | ||
|
||
// Writes an end-of-message (EOM) marker into the parser transform input | ||
// queue. StreamParser will emit a 'data' event with an 'endOfMessage' | ||
// pseudo token when the EON marker has passed through the transform stream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EON => EOM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
np :-)
src/connection.js
Outdated
this.messageIo.on('data', (data) => { | ||
this.clearRequestTimer(); | ||
const ret = this.dispatchEvent('data', data); | ||
if (ret === false && this.state === this.STATE.SENT_CLIENT_REQUEST) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just move this chunk of code under SENT_CLIENT_REQUEST.data so we don't have to do these comparisons? The reliance on return value from dispatchEvent feels especially fragile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I will move it to SENT_CLIENT_REQUEST.data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
test.ok(!socketRs.flowing, | ||
'Socket is not paused.'); | ||
} | ||
test.ok(socketRs.length >= Math.min(socketRs.highWaterMark - 512, 0x4000), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please define consts for these numbers to describe what they represent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, but it will not make it much clearer, because these are more heuristic values, not exact science. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A brief comment would be helpful.
const packetTransformRs = connection.messageIo.packetStream._readableState; | ||
test.ok(!packetTransformRs.flowing, | ||
'Packet transform is not paused.'); | ||
test.ok(packetTransformWs.length <= packetTransformWs.highWaterMark && packetTransformRs.length <= packetTransformRs.highWaterMark, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line too long. Perhaps you can break them into two assertions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
|
||
function fail(msg) { | ||
if (failed) { | ||
return; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Formatting - move } to new line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I wonder why the linter didn't see that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No idea. @arthurschreiber ?
} | ||
|
||
function processRow(columns) { | ||
if (canceled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
line:193 validates rowCount for cancelled request but rowCount is not updated here in canceled state. Should it be? What's the expectation in terms of getting 'row' events after canceling a paused request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API documentation does not specify whether rows are emitted after cancel(). The current implementation does this, but it's not specified as part of the API. It also depends on what the server does when it receives the ATTENTION message. The current test case assumes that the reception of rows after cancel() is undefined and ignores them.
I don't know what to do in the special case when a request is paused at the time of the cancel. The current implementation releases the pause when the driver switches to the SENT_ATTENTION state. This has the effect that the remaining rows are emitted, until the server stops sending more rows and terminates the active request. We could change that behavior and block the remaining rows from being emitted, when the request was in a paused state at the time of the cancel.
@arthurschreiber what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From TDS spec:
"The client can interrupt and cancel the current request by sending an Attention message. This is also known as out-of-band data, but any TDS packet that is currently being sent MUST be finished before sending the Attention message. After the client sends an Attention message, the client MUST read until it receives an Attention acknowledgment. "
Either way, blocking or emitting remaining rows, seems consistent with the spec as long as we emit all the rows before sending the ECANCEL and the code seems to do that correctly. As far as I'm concerned we can keep the current behavior.
I would modify the test though to validate the current implementation. I think it's best to be intentional if we decide to change the implementation in future, and make the correspond test modification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After some thoughts, I think it's better to suppress further 'row' events after a paused request has been canceled or timed out. This is what the application expects. If the application explicitly wants to receive the remaining rows on a paused request, it can call resume() before calling cancel().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, this is a better approach. This prevents the possibility of an application bug where it chokes on 'row' events coming in after 'cancel'. Making it difficult to write bugs is good :-)
// This test reads only a few rows and makes a short pause after each row. | ||
// The test verifies that: | ||
// - Pause/resume works correctly when applied after the last packet of a TDS | ||
// message has already been dispatched by MessageIO.ReadablePacketStream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a pause/resume after each row. Which part of the code validates this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is implicitly tested since there is a pause after the last row...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this test is a reaction to the problem reported by Congelli501 (second message in #518),
The problem was that the last packet (with the EOM mark set) has already been received and processed when there are still rows in the queue of the token parser transform. With the first test case which uses 200,000 rows and stops only once after 50'000 rows, the error could not be detected, because the end of the TDS message was still far away when stop() was called. In this second test case, all the rows probably fit within a single packet. After the first 100ms delay, the 'message' event has surely been emitted by the MessageIO class, because it has already detected the reception of the last packet of the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Thanks for the explanation.
|
||
// Temporarily suspends the flow of data from the database. | ||
// No more 'row' events will be emitted until resume() is called. | ||
pause() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove pause/resume on Request class and leave them on Connection class only? These seem roughly equivalent of cancel() which is on Connection class only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. From the API point of view, cancel() should be a member of the Request class. It's the request that is beeing canceled, not the connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds reasonable. Opened #534 to add 'cancel' on Request and deprecate it on Connection.
function onRequestCompletion(err) { | ||
requestCount++; | ||
if (requestCount == requestToCancel) { | ||
test.ok(err && err.code == 'ECANCEL'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change '==' to '===' and '!=' to '!==' in the file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.
src/connection.js
Outdated
sendDataToTokenStreamParser(data) { | ||
return this.tokenStreamParser.addBuffer(data); | ||
} | ||
|
||
pauseRequest(request) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'request' parameter is not needed on pause/resume as there can only be one active request at a time on connection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only an internal method that is called from Request.pause(). It has to check whether this Request object represents the currently active request. I will add a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
// The test verifies that: | ||
// - Pause/resume works correctly when applied after the last packet of a TDS | ||
// message has already been dispatched by MessageIO.ReadablePacketStream. | ||
// (EOM / packet.isLast() has already been detected when pause() is called. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: (EOM / packet.isLast() => (EOM / packet.isLast())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the end-bracket it at the end of the paragraph. But I will remove the brackets and reformulate the sentences.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. Thanks.
// This test reads only a few rows and makes a short pause after each row. | ||
// The test verifies that: | ||
// - Pause/resume works correctly when applied after the last packet of a TDS | ||
// message has already been dispatched by MessageIO.ReadablePacketStream. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this is implicitly tested since there is a pause after the last row...
Only the two new methods Request.pause() and Request.resume() are intended to be part of the public Tedious API. All other changes are internal. The reasons that these two methods are members of the Request class and not of the Connection class are:
It's true that with the current implementation only a single Request object can be active at a time. But thats the internal view and we always have to think both sides, the internal implementation side and the external API user / application side. The application might still have old instances of Request objects that are no longer active. When pause/resume is called accidentally on an inactive Request object, the implementation must detect this and ignore the call (or throw an exception). This is the reason why the pause/resume methods check whether the associated Request object represents the currently active request. I hope you understand the reasons for these design principles. I would like to also write a first version of a streaming BulkLoad implementation (#523), because I need that for my customer projects and I need it soon. But I don't have time to explain and justify everything in this detail. |
@chdh Thanks for the detailed design rationale, makes sense and sounds good. [rant:begin] It's too bad that we can't really stop anyone from using the internal pauseRequest/resumeRequest on the connection object. I guess that's life with JavaScript. [rant:end] |
You can prepend protected methods by an underscore. |
@Congelli501 Glad I ranted :-) Opened issue #536 to track this. Thanks! |
Also, let's not access Node.js internal APIs inside the tests.
Heya @chdh, thank you so much for this contribution! I cleaned up the test cases a bit and fixed one tiny issue with paused requests and connection closing, but I'm really happy to have these changes be finally part of I'll review the other contributions that were made since the last release and push a new version including these changes as soon as possible. I'll also try to get streaming bulk data loads into another release shortly after. Again, thank you so much for this contribution and sorry that it has taken so long to get this merged. ❤️ |
@arthurschreiber |
Hey @arthurschreiber. Any word on when you'll release to npm? |
@chdh |
Here is the stack
|
@v-suhame this is actually my fault. 😥 I’ll open a pull request to fix the flaky tests soon. |
@arthurschreiber @v-suhame I had similar effects with my version of the tests. I suggest to repeat the isPaused() test for a couple of times, with a delay for each retry. |
Is there a way for my problem? |
@hosseinGanjyar If pause/resume was implemented in node-mssql, you could call |
@chdh I am using the upper layer library |
@gsamal The pause mechanism works by applying back-pressure on the internal chain of streams. Each stream continues to buffer data until the high water mark is reached. Then it applies back-pressure to the next preceding stream in the chain. Could you open a new issue and supply some more detailed information about the effect you are investigating? How much does the memory grow after pause() was called. The best way would be to provide an isolated test case that allows us to reproduce the problem, |
@chdh Okay. But it seems to be keep adding records in a buffer somewhere, only pause the internal chain of streams. I can see a similar issue reported here - node-mssql#832. Can you please take a look at it? In a simpler sense, if I pause the request after getting n records, then I monitor the network and memory, it keeps increasing. |
The fix for this is part of the |
This pull request adds the two methods Request.pause() and Request.resume() to the public Tedious API. They can be used to pause and resume the flow of data rows from a query result. A test case is included.
See #181 for the reasons why this extension is important.