Skip to content

Commit

Permalink
Merge pull request #17 from haraka/v1.0.9
Browse files Browse the repository at this point in the history
improve connection failure handling
  • Loading branch information
msimerson authored Feb 22, 2019
2 parents 72989ea + 1e7750b commit d3d8a04
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .codeclimate.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
engines:
engines:
eslint:
enabled: true
channel: "eslint-3"
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ jspm_packages

# Optional REPL history
.node_repl_history

package-lock.json
9 changes: 8 additions & 1 deletion Changes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@

# 1.0.8 - 2017-12-30
# 1.0.9 - 2019-02-19

- bump redis version to 2.8.0
- emit error message if redis connection fails
- add 3s timeout for subscribe connects: minimize connections stalls
- add es6 template literals

# 1.0.8 - 2018-01-03

- upon punsubscribe, `quit()` (disconnect) redis client

Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: 1.0.{build}

environment:
nodejs_version: "6"
nodejs_version: "8"

# Install scripts. (runs after repo cloning)
install:
Expand Down
111 changes: 74 additions & 37 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ exports.register = function () {

plugin.load_redis_ini();

// some other plugin doing: inherits('haraka-plugin-redis')
// another plugin doing: inherits('haraka-plugin-redis')
if (plugin.name !== 'redis') return;

// do register these when 'redis' is declared in config/plugins
plugin.register_hook('init_master', 'init_redis_shared');
plugin.register_hook('init_child', 'init_redis_shared');
};
}

exports.load_redis_ini = function () {
const plugin = this;
Expand All @@ -40,7 +40,7 @@ exports.load_redis_ini = function () {
Object.keys(plugin.redisCfg.opts).forEach(opt => {
if (ps[opt] === undefined) ps[opt] = plugin.redisCfg.opts[opt];
});
};
}

exports.merge_redis_ini = function () {
const plugin = this;
Expand All @@ -64,7 +64,7 @@ exports.init_redis_shared = function (next, server) {

let calledNext = false;
function nextOnce (e) {
if (e) plugin.logerror('Redis error: ' + e.message);
if (e) plugin.logerror(`Redis error: ${e.message}`);
if (calledNext) return;
calledNext = true;
next();
Expand All @@ -86,7 +86,7 @@ exports.init_redis_shared = function (next, server) {
opts.port = plugin.redisCfg.server.port;
server.notes.redis = plugin.get_redis_client(opts, nextOnce);
}
};
}

exports.init_redis_plugin = function (next, server) {
const plugin = this;
Expand Down Expand Up @@ -115,7 +115,7 @@ exports.init_redis_plugin = function (next, server) {
}

plugin.db = plugin.get_redis_client(plugin.cfg.redis, nextOnce);
};
}

exports.shutdown = function () {
if (this.db) this.db.quit();
Expand All @@ -127,96 +127,133 @@ exports.shutdown = function () {

exports.redis_ping = function (done) {
const plugin = this;
const nope = function (err) {

function nope (err) {
if (err) plugin.logerror(err.message);
plugin.redis_pings=false;
done(err);
};
}

if (!plugin.db) {
return nope(new Error('redis not initialized'));
}

plugin.db.ping(function (err, res) {
plugin.db.ping((err, res) => {
if (err) return nope(err);
if (res !== 'PONG') return nope(new Error('not PONG'));
plugin.redis_pings=true;
done(err, true);
});
};
}

function getUriStr (client, opts) {
let msg = `redis://${opts.host}:${opts.port}`;
if (opts.db) msg += `/${opts.db}`;
if (client && client.server_info && client.server_info.redis_version) {
msg += ` v${client.server_info.redis_version}`;
}
return msg;
}

exports.get_redis_client = function (opts, next) {
const plugin = this;

const client = redis.createClient(opts)
.on('error', (error) => {
next(error);
if (!opts.host) opts.host = 'localhost'
if (!opts.port) opts.port = '6379'

const client = redis.createClient(opts);
const urlStr = getUriStr(client, opts);

client
.on('error', (err) => {
plugin.logerror(err.message);
next(err);
})
.on('ready', () => {
let msg = 'connected to redis://' + opts.host + ':' + opts.port;
if (opts.db) msg += '/' + opts.db;
if (client.server_info && client.server_info.redis_version) {
msg += ' v' + client.server_info.redis_version;
}
plugin.loginfo(plugin, msg);
plugin.loginfo(plugin, `connected to ${urlStr}`);
next();
})
.on('end', () => {
if (arguments.length) console.log(arguments);
// plugin.logdebug('Redis client ended');
next();
plugin.loginfo(`Disconnected from ${urlStr}`);
});

return client;
};
}

exports.get_redis_pub_channel = function (conn) {
return 'result-' + conn.transaction ? conn.transaction.uuid : conn.uuid;
};
return `result-${conn.transaction ? conn.transaction.uuid : conn.uuid}`;
}

exports.get_redis_sub_channel = function (conn) {
return 'result-' + conn.uuid + '*';
};
return `result-${conn.uuid}*`;
}

exports.redis_subscribe_pattern = function (pattern, next) {
const plugin = this;

if (plugin.redis) return next(); // already subscribed?

plugin.redis = require('redis').createClient(plugin.redisCfg.pubsub)
.on('error', function (err) {
next(err.message);
})
.on('psubscribe', function (pattern2, count) {
plugin.logdebug(plugin, 'psubscribed to ' + pattern2);
plugin.logdebug(plugin, `psubscribed to ${pattern2}`);
next();
})
.on('punsubscribe', function (pattern3, count) {
plugin.logdebug(plugin, 'unsubsubscribed from ' + pattern3);
plugin.logdebug(plugin, `unsubsubscribed from ${pattern3}`);
connection.notes.redis.quit();
});

plugin.redis.psubscribe(pattern);
};
}

exports.redis_subscribe = function (connection, next) {
const plugin = this;

if (connection.notes.redis) {
connection.logdebug(plugin, `redis already subscribed`);
// another plugin has already called this. Do nothing
return next();
}

let calledNext = false;
function nextOnce (errMsg) {
if (calledNext) return;
calledNext = true;
if (errMsg && connection) connection.logerror(plugin, errMsg);
next();
}

const timer = setTimeout(() => {
nextOnce('redis psubscribe timed out');
}, 3 * 1000);

connection.notes.redis = require('redis').createClient(plugin.redisCfg.pubsub)
.on('error', (err) => {
clearTimeout(timer);
nextOnce(err.message);
})
.on('psubscribe', function (pattern, count) {
connection.logdebug(plugin, 'psubscribed to ' + pattern);
next();
clearTimeout(timer);
connection.logdebug(plugin, `psubscribed to ${pattern}`);
nextOnce();
})
.on('punsubscribe', function (pattern, count) {
connection.logdebug(plugin, 'unsubsubscribed from ' + pattern);
connection.logdebug(plugin, `unsubsubscribed from ${pattern}`);
connection.notes.redis.quit();
});

connection.notes.redis.psubscribe(plugin.get_redis_sub_channel(connection));
};
}

exports.redis_unsubscribe = function (connection) {
if (!connection.notes.redis) return;
connection.notes.redis.punsubscribe(this.get_redis_sub_channel(connection));
};
const plugin = this;

if (!connection.notes.redis) {
connection.logerror(plugin, `redis_unsubscribe called when no redis`)
return;
}
connection.notes.redis.punsubscribe(plugin.get_redis_sub_channel(connection));
}
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"name": "haraka-plugin-redis",
"version": "1.0.8",
"version": "1.0.9",
"description": "Redis plugin for Haraka & other plugins to inherit from",
"main": "index.js",
"directories": {
"test": "test"
},
"dependencies": {
"redis": "^2.7.1"
"redis": "^2.8.0"
},
"devDependencies": {
"eslint": "*",
Expand Down

0 comments on commit d3d8a04

Please sign in to comment.