diff --git a/lib/client.js b/lib/client.js index dfce3937f..53ac81035 100644 --- a/lib/client.js +++ b/lib/client.js @@ -27,6 +27,8 @@ var Client = function(config) { ssl: this.connectionParameters.ssl }); this.queryQueue = []; + this.sentQueryQueue = []; + this.sendImmediately = true; this.binary = c.binary || defaults.binary; this.encoding = 'utf8'; this.processID = null; @@ -154,6 +156,7 @@ Client.prototype.connect = function(callback) { var activeQuery = self.activeQuery; self.activeQuery = null; self.readyForQuery = true; + self.handshakeDone = true; self._pulseQueryQueue(); if(activeQuery) { activeQuery.handleReadyForQuery(); @@ -279,12 +282,24 @@ Client.prototype.escapeLiteral = function(str) { }; Client.prototype._pulseQueryQueue = function() { + if(!this.handshakeDone) + return; + + while((this.sendImmediately && !this.blocked) || (this.activeQuery === null && this.sentQueryQueue.length === 0)) { + var query = this.queryQueue.shift(); + if(!query) + break; + + query.submit(this.connection); + this.blocked = query.blocking; + this.sentQueryQueue.push(query); + } + if(this.readyForQuery===true) { - this.activeQuery = this.queryQueue.shift(); + this.activeQuery = this.sentQueryQueue.shift(); if(this.activeQuery) { this.readyForQuery = false; this.hasExecuted = true; - this.activeQuery.submit(this.connection); } else if(this.hasExecuted) { this.activeQuery = null; this.emit('drain'); @@ -292,7 +307,7 @@ Client.prototype._pulseQueryQueue = function() { } }; -Client.prototype._copy = function (text, stream) { +Client.prototype._copy = function (text, stream, blocking) { var config = {}; config.text = text; config.stream = stream; @@ -304,14 +319,14 @@ Client.prototype._copy = function (text, stream) { } }; var query = new Query(config); + query.blocking = blocking; this.queryQueue.push(query); this._pulseQueryQueue(); return config.stream; - }; Client.prototype.copyFrom = function (text) { - return this._copy(text, new CopyFromStream()); + return this._copy(text, new CopyFromStream(), true); }; Client.prototype.copyTo = function (text) { diff --git a/test/unit/client/simple-query-tests.js b/test/unit/client/simple-query-tests.js index 19f05c186..220916a5e 100644 --- a/test/unit/client/simple-query-tests.js +++ b/test/unit/client/simple-query-tests.js @@ -31,6 +31,7 @@ test('executing query', function() { test("multiple in the queue", function() { var client = helper.client(); + client.sendImmediately = false; var connection = client.connection; var queries = connection.queries; client.query('one'); @@ -59,6 +60,38 @@ test('executing query', function() { assert.equal(queries[2], 'three'); }); }); + + test("multiple in the queue, sending immediately", function() { + var client = helper.client(); + client.sendImmediately = true; + var connection = client.connection; + var queries = connection.queries; + client.query('one'); + client.query('two'); + client.query('three'); + assert.empty(queries); + + test("after one ready for query",function() { + connection.emit('readyForQuery'); + assert.lengthIs(queries, 3); + assert.equal(queries[0], "one"); + }); + + test('after two ready for query', function() { + connection.emit('readyForQuery'); + assert.lengthIs(queries, 3); + }); + + test("after a bunch more", function() { + connection.emit('readyForQuery'); + connection.emit('readyForQuery'); + connection.emit('readyForQuery'); + assert.lengthIs(queries, 3); + assert.equal(queries[0], "one"); + assert.equal(queries[1], 'two'); + assert.equal(queries[2], 'three'); + }); + }); }); test("query event binding and flow", function() {