Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle nsqd disconnects #16

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ Events:

- `message` (msg) incoming message
- `discard` (msg) discarded message
- `error lookup` (err) response from nsqlookup
- `error response` (err) response from nsq
- `error` (err)
- `subscribed` (topic) name of the subscribed topic
Expand Down
11 changes: 6 additions & 5 deletions lib/mixins/reconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ module.exports = (conn, max) => {

function reconnect() {
if (backoff.attempts >= max) {
return debug('%s - reconnection attempts exceeded', conn.addr);
debug('%s - reconnection attempts exceeded', conn.addr);

conn.emit('disconnect');

return;
}

const ms = backoff.duration();

debug('%s - reconnection attempt (%s/%s) in %dms', conn.addr, backoff.attempts, max, ms);

setTimeout(() => {
conn.connect();
conn.emit('reconnect', conn);
}, ms);
setTimeout(() => conn.connect(), ms);
}
};
24 changes: 9 additions & 15 deletions lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Reader extends EventEmitter {

// Initialize properties.
this.trace = options.trace || function() {};
this.maxConnectionAttempts = options.maxConnectionAttempts || Infinity;
this.maxConnectionAttempts = options.maxConnectionAttempts ?? Infinity;
this.pollInterval = options.pollInterval || 20000;
this.maxAttempts = options.maxAttempts || Infinity;
this.maxInFlight = options.maxInFlight || 10;
Expand Down Expand Up @@ -110,30 +110,18 @@ class Reader extends EventEmitter {
debug('polling every %dms', this.pollInterval);

this.timer = setInterval(() => {
this.lookup((errors, nodes) => {
if (!nodes) {
return debug('no nodes returned');
}

this.lookup((errors, nodes = []) => {
if (errors) {
debug('errors %j', errors);

for (const error of errors) {
this.emit('error', error);
this.emit('error lookup', error);
}
}

for (const node of nodes) {
this.connectTo(node);
}

for (const conn of this.conns) {
if (nodes.includes(conn.addr)) {
debug('conn valid', conn.addr);
} else {
debug('conn invalid', conn.addr);
}
}
});
}, this.pollInterval);
}
Expand Down Expand Up @@ -210,6 +198,12 @@ class Reader extends EventEmitter {
}
});

// Handle disconnection.
conn.on('disconnect', () => {
this.remove(conn);
this.distributeMaxInFlight();
});

// Connect to the nsqd node.
conn.connect(err => {
if (err) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"eslint": "^8.56.0",
"jstrace": "^0.3.0",
"mocha": "^3.1.0",
"sinon": "^1.17.7",
"sinon": "^17.0.1",
"superagent": "~3.8.3",
"uid": "^0.0.2"
},
Expand Down
97 changes: 74 additions & 23 deletions test/acceptance/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
*/

const assert = require('node:assert');
const nsq = require('../..');
const Reader = require('../../lib/reader');
const Writer = require('../../lib/writer');
const sinon = require('sinon');
const uid = require('uid');
const utils = require('../utils');

Expand All @@ -24,8 +26,8 @@ describe('Acceptance: Reader', () => {
describe('constructor()', () => {
describe('with nsqd addresses', () => {
it('should subscribe to messages', done => {
const pub = nsq.writer();
const sub = nsq.reader({
const pub = new Writer();
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150']
Expand All @@ -36,7 +38,7 @@ describe('Acceptance: Reader', () => {
});

it('should connect after event handlers are added', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150']
Expand All @@ -50,8 +52,8 @@ describe('Acceptance: Reader', () => {

describe('with nsqlookupd addresses', () => {
it('should subscribe to messages', done => {
const pub = nsq.writer();
const sub = nsq.reader({
const pub = new Writer();
const sub = new Reader({
topic,
channel: 'reader',
nsqlookupd: ['127.0.0.1:4161'],
Expand All @@ -63,7 +65,7 @@ describe('Acceptance: Reader', () => {
});

it('should connect after event handlers are added', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqlookupd: ['127.0.0.1:4161']
Expand All @@ -75,7 +77,7 @@ describe('Acceptance: Reader', () => {
});

it('should set timer attribute for lookup polling', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqlookupd: ['127.0.0.1:4161']
Expand All @@ -89,8 +91,8 @@ describe('Acceptance: Reader', () => {
});

it('should discard messages after the max attempts', done => {
const pub = nsq.writer();
const sub = nsq.reader({
const pub = new Writer();
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150'],
Expand All @@ -115,8 +117,8 @@ describe('Acceptance: Reader', () => {
});

it('should re-receive the message after calling requeue', done => {
const pub = nsq.writer();
const sub = nsq.reader({
const pub = new Writer();
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150'],
Expand Down Expand Up @@ -146,8 +148,8 @@ describe('Acceptance: Reader', () => {
});

it('should wait for in-flight messages and emit "close"', done => {
const pub = nsq.writer();
const sub = nsq.reader({
const pub = new Writer();
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150'],
Expand Down Expand Up @@ -177,8 +179,8 @@ describe('Acceptance: Reader', () => {
});

it('should wait for pending messages and invoke the callback', done => {
const pub = nsq.writer();
const sub = nsq.reader({
const pub = new Writer();
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150'],
Expand Down Expand Up @@ -208,7 +210,7 @@ describe('Acceptance: Reader', () => {
});

it('should close if there are no in-flight messages', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150'],
Expand All @@ -219,7 +221,7 @@ describe('Acceptance: Reader', () => {
});

it('should stop polling nsqlookupd if reader had been closed', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqlookupd: ['127.0.0.1:4161'],
Expand All @@ -239,7 +241,7 @@ describe('Acceptance: Reader', () => {

describe('end()', () => {
it('should end if there are no connections', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqd: [],
Expand All @@ -249,7 +251,7 @@ describe('Acceptance: Reader', () => {
});

it('should end all connections', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150']
Expand All @@ -265,9 +267,9 @@ describe('Acceptance: Reader', () => {
});

it('should end all connections even if there are in-flight messages', done => {
const pub = nsq.writer();
const pub = new Writer();

const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150'],
Expand All @@ -294,7 +296,7 @@ describe('Acceptance: Reader', () => {
});

it('should stop polling nsqlookupd if reader had been ended', done => {
const sub = nsq.reader({
const sub = new Reader({
topic,
channel: 'reader',
nsqlookupd: ['127.0.0.1:4161'],
Expand All @@ -311,5 +313,54 @@ describe('Acceptance: Reader', () => {
setTimeout(done, 100);
});
});

describe('reader lifecycle', () => {
beforeEach(done => {
utils.createTopic(topic, done);
});

it('should remove connections that disconnect', done => {
const sub = new Reader({
topic,
channel: 'reader',
nsqd: ['127.0.0.1:4150'],
maxConnectionAttempts: 0
});

sub.on('subscribed', () => {
assert.equal(sub.conns.size, 1);

sub.conns.forEach(conn => {
conn.on('disconnect', () => {
assert.equal(sub.conns.size, 0);

done();
});

conn.sock.end();
});
});
});

it('should emit lookup errors', done => {
const sub = new Reader({
topic,
channel: 'reader',
nsqlookupd: ['127.0.0.1:4161'],
pollInterval: 10
});

sinon.stub(sub, 'lookup').onSecondCall().callsFake(fn => {
fn([new Error('foo')]);
});

sub.on('error lookup', error => {
assert.equal(error.message, 'foo');

done();
});

});
});
});
});
35 changes: 13 additions & 22 deletions test/unit/reconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,6 @@ const assert = require('node:assert');
const reconnect = require('../../lib/mixins/reconnect');

describe('reconnect()', () => {
it('should emit "reconnect"', done => {
const conn = new EventEmitter();
let called = false;

conn.connect = () => {
called = true;
};

reconnect(conn);

conn.emit('connect');
conn.emit('close');

conn.on('reconnect', c => {
assert.equal(called, true);
assert.deepEqual(c, conn);
done();
});

conn.emit('connect');
});

it('should reconnect on close', done => {
const conn = new EventEmitter();

Expand All @@ -54,4 +32,17 @@ describe('reconnect()', () => {

process.nextTick(done);
});

it('should emit "disconnect" if the max number of reconnection attempts is reached', done => {
const conn = new EventEmitter();

conn.connect = () => {};

reconnect(conn, 1);

conn.on('disconnect', done);

conn.emit('close');
conn.emit('close');
});
});
Loading
Loading