Skip to content

Commit

Permalink
Make sure timers is cleared on exit
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Feb 2, 2022
1 parent bf3bec7 commit 6e23e0c
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 29 deletions.
40 changes: 21 additions & 19 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Cluster extends EventEmitter {
private isRefreshing = false;
public isCluster = true;
private _autoPipelines: Map<string, typeof Pipeline> = new Map();
private _groupsIds: {[key: string]: number} = {};
private _groupsIds: { [key: string]: number } = {};
private _groupsBySlot: number[] = Array(16384);
private _runningAutoPipelines: Set<string> = new Set();
private _readyDelayedCallbacks: CallbackFunction[] = [];
Expand Down Expand Up @@ -190,8 +190,10 @@ class Cluster extends EventEmitter {
return;
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);
if (this._addedScriptHashesCleanInterval) {
// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);
}

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
Expand Down Expand Up @@ -272,12 +274,12 @@ class Cluster extends EventEmitter {
this.once("close", this.handleCloseEvent.bind(this));

this.refreshSlotsCache(
function (err) {
if (err && err.message === "Failed to refresh slots cache.") {
Redis.prototype.silentEmit.call(this, "error", err);
this.connectionPool.reset([]);
}
}.bind(this)
function (err) {
if (err && err.message === "Failed to refresh slots cache.") {
Redis.prototype.silentEmit.call(this, "error", err);
this.connectionPool.reset([]);
}
}.bind(this)
);
this.subscriber.start();
})
Expand All @@ -300,6 +302,11 @@ class Cluster extends EventEmitter {
if (reason) {
debug("closed because %s", reason);
}

if (this._addedScriptHashesCleanInterval) {
clearInterval(this._addedScriptHashesCleanInterval);
}

let retryDelay;
if (
!this.manuallyClosing &&
Expand Down Expand Up @@ -339,9 +346,6 @@ class Cluster extends EventEmitter {
const status = this.status;
this.setStatus("disconnecting");

clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;

if (!reconnect) {
this.manuallyClosing = true;
}
Expand Down Expand Up @@ -372,9 +376,6 @@ class Cluster extends EventEmitter {
const status = this.status;
this.setStatus("disconnecting");

clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;

this.manuallyClosing = true;

if (this.reconnectTimeout) {
Expand Down Expand Up @@ -632,7 +633,8 @@ class Cluster extends EventEmitter {
} else {
_this.slots[slot] = [key];
}
_this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')];
_this._groupsBySlot[slot] =
_this._groupsIds[_this.slots[slot].join(";")];
_this.connectionPool.findOrCreate(_this.natMapper(key));
tryConnection();
debug("refreshing slot caches... (triggered by MOVED error)");
Expand Down Expand Up @@ -867,14 +869,14 @@ class Cluster extends EventEmitter {
}

// Assign to each node keys a numeric value to make autopipeline comparison faster.
this._groupsIds = Object.create(null);
this._groupsIds = Object.create(null);
let j = 0;
for (let i = 0; i < 16384; i++) {
const target = (this.slots[i] || []).join(';');
const target = (this.slots[i] || []).join(";");

if (!target.length) {
this._groupsBySlot[i] = undefined;
continue;
continue;
}

if (!this._groupsIds[target]) {
Expand Down
5 changes: 5 additions & 0 deletions lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ export function closeHandler(self) {
abortTransactionFragments(self.offlineQueue);
}

if (self._addedScriptHashesCleanInterval) {
clearInterval(self._addedScriptHashesCleanInterval);
self._addedScriptHashesCleanInterval = null;
}

if (self.manuallyClosing) {
self.manuallyClosing = false;
debug("skip reconnecting since the connection is manually closed.");
Expand Down
14 changes: 4 additions & 10 deletions lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,10 @@ Redis.prototype.connect = function (callback) {
return;
}

// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);
if (this._addedScriptHashesCleanInterval) {
// Make sure only one timer is active at a time
clearInterval(this._addedScriptHashesCleanInterval);
}

// Start the script cache cleaning
this._addedScriptHashesCleanInterval = setInterval(() => {
Expand Down Expand Up @@ -436,9 +438,6 @@ Redis.prototype.connect = function (callback) {
* @public
*/
Redis.prototype.disconnect = function (reconnect) {
clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;

if (!reconnect) {
this.manuallyClosing = true;
}
Expand Down Expand Up @@ -741,11 +740,6 @@ Redis.prototype.sendCommand = function (command: Command, stream: NetStream) {
command.setTimeout(this.options.commandTimeout);
}

if (command.name === "quit") {
clearInterval(this._addedScriptHashesCleanInterval);
this._addedScriptHashesCleanInterval = null;
}

let writable =
this.status === "ready" ||
(!stream &&
Expand Down
51 changes: 51 additions & 0 deletions test/functional/cluster/disconnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import Redis from "../../../lib/redis";
import * as sinon from "sinon";
import { expect } from "chai";
import { Cluster } from "../../../lib";
import MockServer from "../../helpers/mock_server";

describe("disconnection", function () {
afterEach(() => {
sinon.restore();
});

it("should clear all timers on disconnect", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const cluster = new Cluster([{ host: "127.0.0.1", port: "30000" }]);
cluster.on("connect", function () {
cluster.disconnect();
});

cluster.on("end", function () {
setTimeout(() => {
// wait for disconnect with refresher.
expect(setIntervalCalls.callCount).to.equal(
clearIntervalCalls.callCount
);
server.disconnect();
done();
}, 500);
});
});

it("should clear all timers on server exits", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const cluster = new Cluster([{ host: "127.0.0.1", port: "30000" }], {
clusterRetryStrategy: null,
});
cluster.on("end", function () {
expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount);
done();
});

server.disconnect();
});
});
46 changes: 46 additions & 0 deletions test/functional/disconnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import Redis from "../../lib/redis";
import * as sinon from "sinon";
import { expect } from "chai";
import MockServer from "../helpers/mock_server";

describe("disconnection", function () {
afterEach(() => {
sinon.restore();
});

it("should clear all timers on disconnect", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const redis = new Redis({});
redis.on("connect", function () {
redis.disconnect();
});

redis.on("end", function () {
expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount);
server.disconnect();
done();
});
});

it("should clear all timers on server exits", function (done) {
const server = new MockServer(30000);

const setIntervalCalls = sinon.spy(global, "setInterval");
const clearIntervalCalls = sinon.spy(global, "clearInterval");

const redis = new Redis({
port: 30000,
retryStrategy: null,
});
redis.on("end", function () {
expect(setIntervalCalls.callCount).to.equal(clearIntervalCalls.callCount);
done();
});

server.disconnect();
});
});

0 comments on commit 6e23e0c

Please sign in to comment.