Skip to content

Commit

Permalink
feat: Allow requests to be paused and resumed.
Browse files Browse the repository at this point in the history
This adds two new methods to the `Request`s, `.pause` and `.resume`, that allow
pausing and resuming a request. A paused request will stop emitting `row` events,
which allows better control of memory usage when handling very large responses from SQL Server.
  • Loading branch information
chdh authored and arthurschreiber committed Oct 12, 2017
1 parent 7727b5a commit 9ef0d58
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 7 deletions.
66 changes: 61 additions & 5 deletions src/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,9 @@ class Connection extends EventEmitter {
if (this.config.options.rowCollectionOnDone) {
this.request.rst.push(token.columns);
}
return this.request.emit('row', token.columns);
if (!(this.state === this.STATE.SENT_ATTENTION && this.request.paused)) {
this.request.emit('row', token.columns);
}
} else {
this.emit('error', new Error("Received 'row' when no sqlRequest is in progress"));
return this.close();
Expand Down Expand Up @@ -641,6 +643,12 @@ class Connection extends EventEmitter {
}
});

this.tokenStreamParser.on('endOfMessage', () => { // EOM pseudo token received
if (this.state === this.STATE.SENT_CLIENT_REQUEST) {
this.dispatchEvent('endOfMessageMarkerReceived');
}
});

this.tokenStreamParser.on('resetConnection', () => {
return this.emit('resetConnection');
});
Expand All @@ -649,6 +657,12 @@ class Connection extends EventEmitter {
this.emit('error', error);
return this.close();
});

this.tokenStreamParser.on('drain', () => {
// Bridge the release of backpressure from the token stream parser
// transform to the packet stream transform.
this.messageIo.resume();
});
}

connect() {
Expand Down Expand Up @@ -766,7 +780,7 @@ class Connection extends EventEmitter {
}

if (this.state && this.state.exit) {
this.state.exit.apply(this);
this.state.exit.call(this, newState);
}

this.debug.log('State change: ' + (this.state ? this.state.name : undefined) + ' -> ' + newState.name);
Expand Down Expand Up @@ -973,10 +987,33 @@ class Connection extends EventEmitter {
}
}

// Returns false to apply backpressure.
sendDataToTokenStreamParser(data) {
return this.tokenStreamParser.addBuffer(data);
}

// This is an internal method that is called from Request.pause().
// It has to check whether the passed Request object represents the currently
// active request, because the application might have called Request.pause()
// on an old inactive Request object.
pauseRequest(request) {
if (this.isRequestActive(request)) {
this.tokenStreamParser.pause();
}
}

// This is an internal method that is called from Request.resume().
resumeRequest(request) {
if (this.isRequestActive(request)) {
this.tokenStreamParser.resume();
}
}

// Returns true if the passed request is the currently active request of the connection.
isRequestActive(request) {
return request === this.request && this.state === this.STATE.SENT_CLIENT_REQUEST;
}

sendInitialSql() {
const payload = new SqlBatchPayload(this.getInitialSql(), this.currentTransactionDescriptor(), this.config.options);
return this.messageIo.sendMessage(TYPE.SQL_BATCH, payload.data);
Expand Down Expand Up @@ -1266,6 +1303,7 @@ class Connection extends EventEmitter {
}

this.request = request;
this.request.connection = this;
this.request.rowCount = 0;
this.request.rows = [];
this.request.rst = [];
Expand All @@ -1275,7 +1313,10 @@ class Connection extends EventEmitter {
this.debug.payload(function() {
return payload.toString(' ');
});
return this.transitionTo(this.STATE.SENT_CLIENT_REQUEST);
this.transitionTo(this.STATE.SENT_CLIENT_REQUEST);
if (request.paused) { // Request.pause() has been called before the request was started
this.pauseRequest(request);
}
}
}

Expand Down Expand Up @@ -1561,8 +1602,12 @@ Connection.prototype.STATE = {
},
SENT_CLIENT_REQUEST: {
name: 'SentClientRequest',
exit: function() {
exit: function(nextState) {
this.clearRequestTimer();

if (nextState !== this.STATE.FINAL) {
this.tokenStreamParser.resume();
}
},
events: {
socketError: function(err) {
Expand All @@ -1573,9 +1618,20 @@ Connection.prototype.STATE = {
},
data: function(data) {
this.clearRequestTimer(); // request timer is stopped on first data package
return this.sendDataToTokenStreamParser(data);
const ret = this.sendDataToTokenStreamParser(data);
if (ret === false) {
// Bridge backpressure from the token stream parser transform to the
// packet stream transform.
this.messageIo.pause();
}
},
message: function() {
// We have to channel the 'message' (EOM) event through the token stream
// parser transform, to keep it in line with the flow of the tokens, when
// the incoming data flow is paused and resumed.
return this.tokenStreamParser.addEndOfMessageMarker();
},
endOfMessageMarkerReceived: function() {
this.transitionTo(this.STATE.LOGGED_IN);
const sqlRequest = this.request;
this.request = undefined;
Expand Down
10 changes: 10 additions & 0 deletions src/message-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,14 @@ module.exports = class MessageIO extends EventEmitter {
this.debug.packet(direction, packet);
return this.debug.data(packet);
}

// Temporarily suspends the flow of incoming packets.
pause() {
this.packetStream.pause();
}

// Resumes the flow of incoming packets.
resume() {
this.packetStream.resume();
}
};
28 changes: 26 additions & 2 deletions src/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ module.exports = class Request extends EventEmitter {
super();

this.sqlTextOrProcedure = sqlTextOrProcedure;
this.callback = callback;
this.parameters = [];
this.parametersByName = {};
this.userCallback = this.callback;
this.canceled = false;
this.paused = false;
this.userCallback = callback;
this.callback = function() {
if (this.preparing) {
this.emit('prepared');
Expand Down Expand Up @@ -134,4 +135,27 @@ module.exports = class Request extends EventEmitter {
}
return null;
}

// Temporarily suspends the flow of data from the database.
// No more 'row' events will be emitted until resume() is called.
pause() {
if (this.paused) {
return;
}
this.paused = true;
if (this.connection) {
this.connection.pauseRequest(this);
}
}

// Resumes the flow of data from the database.
resume() {
if (!this.paused) {
return;
}
this.paused = false;
if (this.connection) {
this.connection.resumeRequest(this);
}
}
};
8 changes: 8 additions & 0 deletions src/token/stream-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ module.exports = class Parser extends Transform {
this.debug = debug;
this.colMetadata = colMetadata;
this.options = options;
this.endOfMessageMarker = {};

this.buffer = new Buffer(0);
this.position = 0;
Expand All @@ -33,6 +34,13 @@ module.exports = class Parser extends Transform {
}

_transform(input, encoding, done) {
if (input === this.endOfMessageMarker) {
done(null, { // generate endOfMessage pseudo token
name: 'EOM',
event: 'endOfMessage'
});
return;
}
if (this.position === this.buffer.length) {
this.buffer = input;
} else {
Expand Down
22 changes: 22 additions & 0 deletions src/token/token-stream-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,36 @@ class Parser extends EventEmitter {
this.emit(token.event, token);
}
});
this.parser.on('drain', () => {
this.emit('drain');
});
}

// Returns false to apply backpressure.
addBuffer(buffer) {
return this.parser.write(buffer);
}

// Writes an end-of-message (EOM) marker into the parser transform input
// queue. StreamParser will emit a 'data' event with an 'endOfMessage'
// pseudo token when the EOM marker has passed through the transform stream.
// Returns false to apply backpressure.
addEndOfMessageMarker() {
return this.parser.write(this.parser.endOfMessageMarker);
}

isEnd() {
return this.parser.buffer.length === this.parser.position;
}

// Temporarily suspends the token stream parser transform from emitting events.
pause() {
this.parser.pause();
}

// Resumes the token stream parser transform.
resume() {
this.parser.resume();
}
}
module.exports.Parser = Parser;
Loading

0 comments on commit 9ef0d58

Please sign in to comment.