Skip to content

Commit

Permalink
Merge pull request #13 from pipedrive/listeners
Browse files Browse the repository at this point in the history
Add alpha version of channel support for live listeners
  • Loading branch information
martintajur committed Jun 12, 2015
2 parents 756e8d5 + 5a1f388 commit 5f8c50a
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
node_modules
.project
.idea
.DS_Store
npm-debug*
test
21 changes: 21 additions & 0 deletions examples/live-updates.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
if (!process.argv[2]) {
process.stderr.write('Please provide API token!' + "\n");
process.exit();
}

var Pipedrive = require(__dirname + '/../index');
var pipedrive = new Pipedrive.Client(process.argv[2], { strictMode: true });
var _ = require('lodash');


var start = Date.now();

pipedrive.on('deal.added', function(event, data) {
console.log('Deal ' + event.meta.id + ' was added ('+data.current.title+', worth '+data.current.value+' '+data.current.currency+')');
pipedrive.removeAllListeners();
process.exit();
});

pipedrive.on('connect', function() {
pipedrive.Deals.add({ title: 'Live deal', value: 10000, currency: 'EUR' });
});
20 changes: 17 additions & 3 deletions lib/Pipedrive.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var _ = require('lodash'),
qs = require('qs'),
rest = require('./restler'),
inflection = require('./inflection'),
Channel = require('./channel'),
protocol = process.env.PIPEDRIVE_API_PROTOCOL || 'https',
host = process.env.PIPEDRIVE_API_HOST || 'api.pipedrive.com',
version = process.env.PIPEDRIVE_API_VERSION || 'v1',
Expand Down Expand Up @@ -156,13 +157,19 @@ exports.authenticate = function(auth, callback) {
}, false);
};

exports.Client = function(apiToken, strictMode) {
exports.Client = function(apiToken, options) {
if (!apiToken) {
throw new Error('Could not instantiate Pipedrive API Client - apiToken not given.');
}

var that = this;
strict = !!strictMode;
if (!options) {
options = { strictMode: false };
}

var that = this,
strict = !!options.strictMode;

var listener = new Channel(apiToken);

_.each(apiObjects, function(item) {
that[item.substr(0,1).toUpperCase() + item.substr(1)] = new Collection(item, apiToken);
Expand All @@ -187,6 +194,13 @@ exports.Client = function(apiToken, strictMode) {
return returnVal;
};

if (strict) {
// in strict mode, we'll expose the event hub integration methods
this.on = listener.on;
this.removeListener = listener.removeListener;
this.removeAllListeners = listener.removeAllListeners;
}

return this;
};

Expand Down
150 changes: 150 additions & 0 deletions lib/channel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
'use strict';

var SockJS = require('sockjs-client-node'),
fetch = require('fetch'),
_ = require('lodash');

module.exports = Channel;

function Channel(apiToken) {

var handlers = {},
self = this,
client = null,
clientStarted = false,
clientClosed = false;

this.startClient = function() {
if (clientStarted) {
return;
}
clientStarted = true;
clientClosed = false;

client = new SockJS((process.env.PIPEDRIVE_API_PROTOCOL || 'https')+'://'+(process.env.PIPEDRIVE_CHANNEL_HOST || 'channel.pipedrive.com')+'/sockjs');

client.onopen = function () {
var options = {
rejectUnauthorized: false
};
fetch.fetchUrl((process.env.PIPEDRIVE_API_PROTOCOL || 'https')+'://'+(process.env.PIPEDRIVE_API_HOST || 'api.pipedrive.com')+'/v1/authorizations/nonce?api_token=' + encodeURIComponent(apiToken), options, function(error, meta, body) {
var data = {};
try {
data = JSON.parse(body);
if (data.data) {
data = data.data;
}
}
catch (e) {
throw new Error('Could not parse API response');
}

if (data && data.nonce) {
client.send(JSON.stringify({
company_id: data.company_id,
user_id: data.user_id,
user_name: 'client-nodejs-user',
host: 'app.pipedrive.com',
timestamp: Math.round(new Date().getTime() / 1000),
nonce: data.nonce
}));
}
else {
throw new Error('Authorization failed');
}
});
};
client.onmessage = function (msg) {
if (msg && msg.type === 'message') {
var data = {},
eventPatterns = [];

try {
data = JSON.parse(msg.data);
}
catch (e) {
throw new Error('Malformed JSON received from socket');
}

if (data && data.meta && data.meta.v === 1) {

eventPatterns = [
data.meta.object + '.' + data.meta.action,
'*.' + data.meta.action,
data.meta.object + '.*',
'*.*'
];

_.each(eventPatterns, function(pattern) {
if (handlers[pattern]) {
_.each(handlers[pattern], function(handler) {
handler(data, data.data);
});
}
});
}

if (data.rabbitStateChange === 'open') {
if (handlers['connect']) {
_.each(handlers['connect'], function(handler) {
handler();
});
}
}
}
};
client.onclose = function (e) {
if (!clientClosed) {
// not closed by user - we have some connection error.
self.restartClient();
return;
}

clientStarted = false;
if (handlers['close']) {
_.each(handlers['close'], function(handler) {
handler(e);
});
}
};
};

this.restartClient = function() {
client.onopen = null;
client.onclose = null;
client.onmessage = null;
client = null;

clientStarted = false;

setTimeout(self.startClient, (1+Math.random()*4)*1000);
};

this.on = function(method, handler) {
if (!clientStarted) {
self.startClient();
}
handlers[method] = handlers[method] || [];
handlers[method].push(handler);
};

this.removeListener = function(method, handler) {
var index = handlers[method].indexOf(handler);
if (index > -1) {
handlers[method].splice(index, 1);
}
if (!_.keys(handlers).length) {
this.removeAllListeners();
}
};

this.removeAllListeners = function() {
handlers = {};
clientClosed = true;

if (client && client.close) {
client.close();
}
}

}
50 changes: 30 additions & 20 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
{
"name": "pipedrive",
"version": "1.7.3",
"description": "Pipedrive REST client for NodeJS",
"keywords":["pipedrive","CRM","sales","contacts","customers","deals","pipeline","sales pipeline"],
"homepage": "https://github.com/pipedrive/client-nodejs",
"main":"./lib/Pipedrive",
"engines": {
"node": ">= 0.8.x"
},
"repository": {
"type": "git",
"url": "http://github.com/pipedrive/client-nodejs.git"
},
"dependencies": {
"qs": "~2.1.0",
"lodash": "~2.4.1",
"fetch": "~0.3.6",
"mime": "~1.2.11",
"async": "~0.9.0"
}
"name": "pipedrive",
"version": "2.0.0",
"description": "Pipedrive REST client for NodeJS",
"keywords": [
"pipedrive",
"CRM",
"sales",
"contacts",
"customers",
"deals",
"pipeline",
"sales pipeline"
],
"homepage": "https://github.com/pipedrive/client-nodejs",
"main": "./lib/Pipedrive",
"engines": {
"node": ">= 0.8.x"
},
"repository": {
"type": "git",
"url": "http://github.com/pipedrive/client-nodejs.git"
},
"dependencies": {
"async": "~0.9.0",
"fetch": "~0.3.6",
"lodash": "~2.4.1",
"mime": "~1.2.11",
"qs": "~2.1.0",
"sockjs-client-node": "^0.2.1"
}
}

0 comments on commit 5f8c50a

Please sign in to comment.