Skip to content

Commit

Permalink
Added live tail methods to signal flow client
Browse files Browse the repository at this point in the history
Add register live tail method

Dont use streamComputationWebsocket

revert style changes

one more minor style revert

removed unrequired method

cleaned up register live tail

Added un register live tail and more cleanup

Address feedback

fix typo

Fix stop live tail method

Added streamController and error handler

address Max's feedback

Add binary msg types and fix the failing test

cleanup and docs

remove trailing commas

Fix the trailing space

minor change to stop

retuen requestId from register live tail

cleanup callbacks and other minor fixes

bump version

remove older version checks from travis.yml

update to 7.1.0 instead
  • Loading branch information
ssingamneni-sfx committed Aug 26, 2020
1 parent 6f47885 commit 4762716
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 8 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,3 @@ language: node_js
node_js:
- "9.11"
- "8.11"
- "4.1"
- "4.0"
39 changes: 38 additions & 1 deletion lib/client/signalflow/request_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,48 @@ function RequestManager(options) {
//destroy all open SSE connections
}


function registerLiveTail(params, callback, overrideRequestId) {
var requestId = overrideRequestId || getRequestIdAndIncrement();
knownComputations[requestId] = {
onMessage: callback,
transport: transports.WEBSOCKET,
requestId: requestId,
streamController: {
retry: function () {
registerLiveTail(params, callback, overrideRequestId);
},
stop: function () {
stopLiveTail(requestId);
}
}
};
activeSocket.send(JSON.stringify({
type: 'registerlivetail',
channel: requestId,
query: params.query,
throttleOptions: params.throttleOptions
}));
return requestId;
}

function stopLiveTail(requestId) {
if (removeComputation(requestId)) {
activeSocket.send(JSON.stringify({
type: 'stoplivetail',
channel: requestId,
reason: 'Close live tail connection from FE'
}));
}
}

return {
authenticate: authenticate,
execute: execute,
stop: stop,
disconnect: disconnect
disconnect: disconnect,
registerLiveTail: registerLiveTail,
stopLiveTail: stopLiveTail
};
}

Expand Down
50 changes: 49 additions & 1 deletion lib/client/signalflow/signalflow_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,54 @@ function SignalflowClient(apiToken, options) {
}
};
}
/**
* This is a method on SignalflowClient that registers a live tail and exposes the stream and close methos to
* get live tail data and to stop a live tail.
* @param opts - {
* query: {
* matcher: {
* params: {
* op: string,
* args: object
* },
* }
* },
* throttleOptions: { rate: number }
* }
*/
function liveTailRequest(opts) {
var msgBuffer = [];
var callback = null;

function resolveMessage(msg) {
if (!callback) {
msgBuffer.push(msg);
return;
}
if (msg.type === 'error') {
callback(msg, null);
} else {
callback(null, msg);
}
}

var requestId = rm.registerLiveTail(opts, resolveMessage, null);

return {
close: function () {
return rm.stopLiveTail(requestId);
},
stream: function (fn) {
if (typeof fn !== 'function') {
return false;
}
callback = fn;
msgBuffer.forEach(resolveMessage);
msgBuffer = [];
return true;
}
};
}
return {
disconnect: disconnect,
execute: function (opts) {
Expand All @@ -84,7 +131,8 @@ function SignalflowClient(apiToken, options) {
},
preflight: function (opts) {
return signalflowRequest(opts, 'preflight');
}
},
livetail: liveTailRequest
};
}

Expand Down
4 changes: 3 additions & 1 deletion lib/client/signalflow/websocket_message_parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ var binaryMessageTypes = {
7: 'authenticated',
8: 'computation-started',
9: 'estimation',
10: 'expired-tsid'
10: 'expired-tsid',
11: 'log-data',
12: 'livetail-started'
};

var binaryDataMessageFormats = {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "signalfx",
"version": "7.0.1",
"version": "7.1.0",
"description": "Node.js client library for SignalFx",
"homepage": "https://signalfx.com",
"repository": "https://github.com/signalfx/signalfx-nodejs",
Expand Down
6 changes: 4 additions & 2 deletions test/signalflow/request_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ var expect = require('chai').expect;
describe('The SignalFlow Request Manager', function () {
//TODO : re-enable this test once we figure out why it causes a hang
var client = sflowclient('AUTHTOKEN');
it('should initialize a request manager with three methods, execute, stop, authenticate', function (done) {
it('should initialize a request manager with six methods, execute, stop, authenticate, disconnect, registerLiveTail and stopLiveTail', function (done) {
var exposedFns = Object.keys(client);
expect(exposedFns.length).to.equal(4);
expect(exposedFns.length).to.equal(6);
expect(exposedFns.indexOf('execute')).to.not.be.equal(-1);
expect(exposedFns.indexOf('stop')).to.not.be.equal(-1);
expect(exposedFns.indexOf('authenticate')).to.not.be.equal(-1);
expect(exposedFns.indexOf('disconnect')).to.not.be.equal(-1);
expect(exposedFns.indexOf('registerLiveTail')).to.not.be.equal(-1);
expect(exposedFns.indexOf('stopLiveTail')).to.not.be.equal(-1);
done();
});

Expand Down

0 comments on commit 4762716

Please sign in to comment.