diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 58b7c31e..20d1012e 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -69,7 +69,7 @@ class Cluster extends EventEmitter { private isRefreshing = false; public isCluster = true; private _autoPipelines: Map = new Map(); - private _groupsIds: {[key: string]: number} = {}; + private _groupsIds: { [key: string]: number } = {}; private _groupsBySlot: number[] = Array(16384); private _runningAutoPipelines: Set = new Set(); private _readyDelayedCallbacks: CallbackFunction[] = []; @@ -154,6 +154,13 @@ class Cluster extends EventEmitter { } } + clearAddedScriptHashesCleanInterval() { + if (this._addedScriptHashesCleanInterval) { + clearInterval(this._addedScriptHashesCleanInterval); + this._addedScriptHashesCleanInterval = null; + } + } + resetNodesRefreshInterval() { if (this.slotsTimer) { return; @@ -191,7 +198,7 @@ class Cluster extends EventEmitter { } // Make sure only one timer is active at a time - clearInterval(this._addedScriptHashesCleanInterval); + this.clearAddedScriptHashesCleanInterval(); // Start the script cache cleaning this._addedScriptHashesCleanInterval = setInterval(() => { @@ -272,12 +279,12 @@ class Cluster extends EventEmitter { this.once("close", this.handleCloseEvent.bind(this)); this.refreshSlotsCache( - function (err) { - if (err && err.message === "Failed to refresh slots cache.") { - Redis.prototype.silentEmit.call(this, "error", err); - this.connectionPool.reset([]); - } - }.bind(this) + function (err) { + if (err && err.message === "Failed to refresh slots cache.") { + Redis.prototype.silentEmit.call(this, "error", err); + this.connectionPool.reset([]); + } + }.bind(this) ); this.subscriber.start(); }) @@ -300,6 +307,9 @@ class Cluster extends EventEmitter { if (reason) { debug("closed because %s", reason); } + + this.clearAddedScriptHashesCleanInterval(); + let retryDelay; if ( !this.manuallyClosing && @@ -339,8 +349,7 @@ class Cluster extends EventEmitter { const status = this.status; this.setStatus("disconnecting"); - clearInterval(this._addedScriptHashesCleanInterval); - this._addedScriptHashesCleanInterval = null; + this.clearAddedScriptHashesCleanInterval(); if (!reconnect) { this.manuallyClosing = true; @@ -372,8 +381,7 @@ class Cluster extends EventEmitter { const status = this.status; this.setStatus("disconnecting"); - clearInterval(this._addedScriptHashesCleanInterval); - this._addedScriptHashesCleanInterval = null; + this.clearAddedScriptHashesCleanInterval(); this.manuallyClosing = true; @@ -632,7 +640,8 @@ class Cluster extends EventEmitter { } else { _this.slots[slot] = [key]; } - _this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')]; + _this._groupsBySlot[slot] = + _this._groupsIds[_this.slots[slot].join(";")]; _this.connectionPool.findOrCreate(_this.natMapper(key)); tryConnection(); debug("refreshing slot caches... (triggered by MOVED error)"); @@ -867,14 +876,14 @@ class Cluster extends EventEmitter { } // Assign to each node keys a numeric value to make autopipeline comparison faster. - this._groupsIds = Object.create(null); + this._groupsIds = Object.create(null); let j = 0; for (let i = 0; i < 16384; i++) { - const target = (this.slots[i] || []).join(';'); + const target = (this.slots[i] || []).join(";"); if (!target.length) { this._groupsBySlot[i] = undefined; - continue; + continue; } if (!this._groupsIds[target]) { diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 618d8aa2..68e1bc1b 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -170,6 +170,8 @@ export function closeHandler(self) { abortTransactionFragments(self.offlineQueue); } + self.clearAddedScriptHashesCleanInterval(); + if (self.manuallyClosing) { self.manuallyClosing = false; debug("skip reconnecting since the connection is manually closed."); diff --git a/lib/redis/index.ts b/lib/redis/index.ts index ac3dd1d3..84bb672b 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -290,6 +290,13 @@ Redis.prototype.setStatus = function (status, arg) { process.nextTick(this.emit.bind(this, status, arg)); }; +Redis.prototype.clearAddedScriptHashesCleanInterval = function () { + if (this._addedScriptHashesCleanInterval) { + clearInterval(this._addedScriptHashesCleanInterval); + this._addedScriptHashesCleanInterval = null; + } +}; + /** * Create a connection to Redis. * This method will be invoked automatically when creating a new Redis instance @@ -314,7 +321,7 @@ Redis.prototype.connect = function (callback) { } // Make sure only one timer is active at a time - clearInterval(this._addedScriptHashesCleanInterval); + this.clearAddedScriptHashesCleanInterval(); // Start the script cache cleaning this._addedScriptHashesCleanInterval = setInterval(() => { @@ -436,8 +443,7 @@ Redis.prototype.connect = function (callback) { * @public */ Redis.prototype.disconnect = function (reconnect) { - clearInterval(this._addedScriptHashesCleanInterval); - this._addedScriptHashesCleanInterval = null; + this.clearAddedScriptHashesCleanInterval(); if (!reconnect) { this.manuallyClosing = true; @@ -742,8 +748,7 @@ Redis.prototype.sendCommand = function (command: Command, stream: NetStream) { } if (command.name === "quit") { - clearInterval(this._addedScriptHashesCleanInterval); - this._addedScriptHashesCleanInterval = null; + this.clearAddedScriptHashesCleanInterval(); } let writable = diff --git a/test/functional/autopipelining.ts b/test/functional/autopipelining.ts index 60775281..648ac6ed 100644 --- a/test/functional/autopipelining.ts +++ b/test/functional/autopipelining.ts @@ -181,7 +181,7 @@ describe("autoPipelining for single node", function () { expect(redis.autoPipelineQueueSize).to.eql(1); redis.set("foo2", (err) => { - expect(err.message).to.eql( + expect(err.message).to.include( "ERR wrong number of arguments for 'set' command" ); done(); diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index be3a8a33..f9e52ca9 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -355,7 +355,7 @@ describe("autoPipelining for cluster", function () { expect(cluster.autoPipelineQueueSize).to.eql(1); cluster.set("foo5", (err) => { - expect(err.message).to.eql( + expect(err.message).to.include( "ERR wrong number of arguments for 'set' command" ); diff --git a/test/functional/cluster/disconnection.ts b/test/functional/cluster/disconnection.ts new file mode 100644 index 00000000..5ae47abc --- /dev/null +++ b/test/functional/cluster/disconnection.ts @@ -0,0 +1,51 @@ +import Redis from "../../../lib/redis"; +import * as sinon from "sinon"; +import { expect } from "chai"; +import { Cluster } from "../../../lib"; +import MockServer from "../../helpers/mock_server"; + +describe("disconnection", function () { + afterEach(() => { + sinon.restore(); + }); + + it("should clear all timers on disconnect", function (done) { + const server = new MockServer(30000); + + const setIntervalCalls = sinon.spy(global, "setInterval"); + const clearIntervalCalls = sinon.spy(global, "clearInterval"); + + const cluster = new Cluster([{ host: "127.0.0.1", port: "30000" }]); + cluster.on("connect", function () { + cluster.disconnect(); + }); + + cluster.on("end", function () { + setTimeout(() => { + // wait for disconnect with refresher. + expect(setIntervalCalls.callCount).to.equal( + clearIntervalCalls.callCount + ); + server.disconnect(); + done(); + }, 500); + }); + }); + + it("should clear all timers on server exits", function (done) { + const server = new MockServer(30000); + + const setIntervalCalls = sinon.spy(global, "setInterval"); + const clearIntervalCalls = sinon.spy(global, "clearInterval"); + + const cluster = new Cluster([{ host: "127.0.0.1", port: "30000" }], { + clusterRetryStrategy: null, + }); + cluster.on("end", function () { + expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount); + done(); + }); + + server.disconnect(); + }); +}); diff --git a/test/functional/disconnection.ts b/test/functional/disconnection.ts new file mode 100644 index 00000000..65905d06 --- /dev/null +++ b/test/functional/disconnection.ts @@ -0,0 +1,46 @@ +import Redis from "../../lib/redis"; +import * as sinon from "sinon"; +import { expect } from "chai"; +import MockServer from "../helpers/mock_server"; + +describe("disconnection", function () { + afterEach(() => { + sinon.restore(); + }); + + it("should clear all timers on disconnect", function (done) { + const server = new MockServer(30000); + + const setIntervalCalls = sinon.spy(global, "setInterval"); + const clearIntervalCalls = sinon.spy(global, "clearInterval"); + + const redis = new Redis({}); + redis.on("connect", function () { + redis.disconnect(); + }); + + redis.on("end", function () { + expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount); + server.disconnect(); + done(); + }); + }); + + it("should clear all timers on server exits", function (done) { + const server = new MockServer(30000); + + const setIntervalCalls = sinon.spy(global, "setInterval"); + const clearIntervalCalls = sinon.spy(global, "clearInterval"); + + const redis = new Redis({ + port: 30000, + retryStrategy: null, + }); + redis.on("end", function () { + expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount); + done(); + }); + + server.disconnect(); + }); +});