Skip to content

Commit 1f7987a

Browse files
Merge pull request #44 from dowjones/feature/DNASNS-2660/nodejs-stream-client-maintenance
[DNASNS-2660] NodeJS stream client maintenance
2 parents e39f2b2 + dcee414 commit 1f7987a

21 files changed

+2397
-5152
lines changed

.eslintrc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
2-
"extends": ["airbnb/base"],
2+
"extends": ["eslint:recommended", "prettier"],
33
"rules": {
44
"no-console": "off",
55
"max-len": [2, {"code": 205, "tabWidth": 4, "ignoreUrls": true}],

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ node_modules
44
/DowJonesDNA.json
55
/r-d.sh
66
/fix_git.sh
7-
7+
.envrc
88
.vscode/**
9+
.DS_Store

Listener.js

+29-18
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ class Listener {
4040
* Leave as null or undefined if you
4141
* want to use the default.
4242
*/
43-
listen(onMessageCallback, subscription, userErrorHandling = false) {
43+
listen(onMessageCallback, subscription, usingAsyncFunction = true) {
4444
return this.apiService.getStreamingCredentials().then((credentials) => {
4545
this.initialize(credentials);
46-
this.readyListener(onMessageCallback, subscription, userErrorHandling);
46+
this.readyListener(onMessageCallback, subscription, usingAsyncFunction);
4747
return true;
4848
}).catch((err) => {
4949
if (err.message) {
@@ -55,7 +55,7 @@ class Listener {
5555
});
5656
}
5757

58-
readyListener(onMessageCallback, subscriptionId, userErrorHandling) {
58+
readyListener(onMessageCallback, subscriptionId, usingAsyncFunction) {
5959
const sub = subscriptionId || this.defaultSubscriptionId;
6060

6161
if (!sub || sub.length <= 0) {
@@ -66,37 +66,36 @@ class Listener {
6666

6767
console.log(`Listening to subscription: ${subscriptionFullName}`);
6868

69-
const onMessageTryCatch = (msg) => {
70-
try {
71-
onMessageCallback(msg);
72-
msg.ack();
73-
} catch (err) {
74-
console.error(`Error from callback: ${err}\n`);
75-
msg.nack();
76-
throw err;
77-
}
78-
};
69+
const onMessagePromise = (msg) => {
70+
onMessageCallback(msg)
71+
.then(() => {
72+
msg.ack();
73+
})
74+
.catch(err => {
75+
console.error(`On callback ${err.message}`);
76+
msg.nack();
77+
});
78+
}
7979

8080
const onMessageUserHandling = (msg) => {
8181
onMessageCallback(msg, err => {
8282
if (err) {
8383
console.error(`Error from callback: ${err}`);
8484
msg.nack();
85-
throw err;
8685
} else {
8786
msg.ack();
8887
}
8988
});
9089
}
9190

92-
const onMessage = (userErrorHandling) ? onMessageUserHandling : onMessageTryCatch;
91+
const onMessage = (usingAsyncFunction) ? onMessagePromise : onMessageUserHandling;
9392

94-
const pubsubSubscription = this.pubsubClient.subscription(subscriptionFullName);
93+
this.pubsubSubscription = this.pubsubClient.subscription(subscriptionFullName);
9594

9695
this.apiService.getAccountInfo().then(accountInfo =>
9796
this.checkDocCountExceeded(sub, accountInfo.max_allowed_document_extracts));
9897

99-
pubsubSubscription.get().then((data) => {
98+
this.pubsubSubscription.get().then((data) => {
10099
const pubsubSub = data[0];
101100
pubsubSub.on('message', onMessage);
102101
pubsubSub.on('error', (subErr) => {
@@ -118,7 +117,6 @@ class Listener {
118117
'However, you won\'t lose access to any documents that have already been added to the queue.\n' +
119118
'These will continue to be streamed to you.\n' +
120119
'Contact your account administrator with any questions or to upgrade your account limits.';
121-
const interval = 300000;
122120
this.apiService.isStreamDisabled(subscriptionId).then((isDisabled) => {
123121
if (isDisabled) {
124122
console.error(streamDisabledMsg);
@@ -127,6 +125,19 @@ class Listener {
127125
console.error(err);
128126
});
129127
}
128+
129+
closeListener() {
130+
if (this.pubsubSubscription) {
131+
if (this.pubsubSubscription.isOpen) {
132+
this.pubsubSubscription.close();
133+
console.log('Closing the listener.')
134+
} else {
135+
console.warn("The listener is already closed.");
136+
}
137+
} else {
138+
console.warn("There's no opened subscription to close. Call listen method first.");
139+
}
140+
}
130141
}
131142

132143
module.exports = Listener;

README.md

+135-44
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
# dj-dna-streaming-javascript
2+
23
DNA Streaming Client - written in Javascript.
34

45
## How To Use
56

6-
#### Installing
7+
### Installing
78

89
This project is an NPM module. That means it can be installed as a kind of library for your main project. To do this go to your main project's root. At the command line execute the following:
910

@@ -13,28 +14,28 @@ npm install git+https://git@github.com/dowjones/dj-dna-streams-javascript.git --
1314

1415
Alternatively you can simply check out this project from Git.
1516

16-
#### Authentication
17+
### Authentication
1718

1819
User Key
1920

20-
#### Configuring The App
21+
### Configuring The App
2122

2223
There are three ways to pass configuration variables to the app. Please note that environment variables (Option 1) will override values provided in the `customerConfig.json` file (Option 2).
2324
They will not override values passed directly to the `Listener` constructor (Option 3).
2425

25-
Option 1. Set environment variables.
26+
#### Option 1. Set environment variables.
2627

27-
###### User Key
28+
Export the following envirnment variables:
2829

2930
**USER_KEY**
3031
Dow Jones provided user key.
3132

3233
**SUBSCRIPTION_ID**
33-
This environment variable holds the subscription ID.
34-
35-
Option 2. Modify the 'customerConfig.json' file. In this project's root you will find the 'customerConfig.json' file. Add your credentials and subscription ID. Ensure your additions follow the JSON data format conventions.
34+
This environment variable holds the subscription ID.
3635

37-
###### User Key
36+
#### Option 2. Modifying `customerConfig.json`
37+
38+
In this project's root you will find the `customerConfig.json` file. Add your credentials and subscription ID. Ensure your additions follow the JSON data format conventions.
3839

3940
```
4041
{
@@ -43,14 +44,14 @@ Option 2. Modify the 'customerConfig.json' file. In this project's root you will
4344
}
4445
```
4546

46-
or
47+
#### Option 3. Passing values as function arguments.
4748

48-
Option 3: Passing values as function arguments. Specifically you can pass either the service account credentials and/or subscription ID. When you start a listener you can pass the service account crendentials to the Listener constructor as an object with the field "user_key", like so:
49+
Specifically you can pass either the service account credentials and/or subscription ID. When you start a listener you can pass the service account crendentials to the Listener constructor as an object with the field "user_key", like so:
4950

5051
~~~~
51-
var Listener = require('dj-dna-streaming-javascript').Listener;
52+
const Listener = require('dj-dna-streaming-javascript').Listener;
5253
53-
var onMessageCallback = function(msg) {
54+
const onMessageCallback = async function(msg) {
5455
console.log('One incoming message:' + JSON.stringify(msg.data));
5556
};
5657
@@ -68,28 +69,28 @@ This will override both the environment variable and the configuration file serv
6869
If you want to pass the subscription ID via function arguments, take a look at the following code:
6970

7071
~~~~
71-
var Listener = require('dj-dna-streaming-javascript').Listener;
72+
const Listener = require('dj-dna-streaming-javascript').Listener;
7273
73-
var onMessageCallback = function(msg) {
74+
const onMessageCallback = async function(msg) {
7475
console.log('One incoming message:' + JSON.stringify(msg.data));
7576
};
7677
77-
var subscriptionId = 'abcdefghi123';
78+
const subscriptionId = 'abcdefghi123';
7879
7980
const listener = new Listener();
8081
listener.listen(onMessageCallback, subscriptionId);
8182
~~~~
8283

8384

84-
#### Running the Demo Code
85+
### Running the Demo Code
8586

8687
This modules comes with demonstration code. To execute the demo code, configure your app (See _Configuring the App_ section above). Then execute the following:
8788

8889
~~~
8990
npm run demo
9091
~~~
9192

92-
##### Docker Demo
93+
### Docker Demo
9394

9495
To execute the demo code in a Docker container, perform the following steps.
9596

@@ -101,58 +102,148 @@ Step 1: Build the docker image. Execute the following command line:
101102

102103
Step 2: Run the docker image
103104

104-
###### User Key
105-
106105
~~~
107106
docker run -it \
108107
-e USER_KEY="<your user key>" \
109108
-e SUBSCRIPTION_ID="<your subscription ID>" \
110109
dj-dna-streaming-javascript
111110
~~~
112111

113-
###### Client Credentials
114-
~~~
115-
docker run -it \
116-
-e USER_ID="<your user ID>" \
117-
-e CLIENT_ID="<your client ID>" \
118-
-e PASSWORD="<your password>" \
119-
-e SUBSCRIPTION_ID="<your subscription ID>" \
120-
dj-dna-streaming-javascript
121-
~~~
112+
## Writing Your Own Code
113+
114+
The following is some very basic code. Use it to listen to a DNA subscription. It assumes you have correctly configured the app. (See the *Configuring The App* section above).
122115

116+
You can use two patterns to consume the messages from the subscription. The first option is using an async function or function returning a Promise or theanable:
123117

124-
#### Writing Your Own Code
118+
### Async Function, Promise or Theanable
125119

126-
The following is some very basic code. Use it to listen to a DNA subscription. It assumes you have configured the app correct. (See the *Configuring The App* section above).
120+
Write an async function or function returning a Promise/Theanable processing the messages. When the
121+
promise is resolved the message is acknowledged, in case the promise is rejected the message will be
122+
not acknowledged, so it can be processed again.
127123

128124
~~~~
129-
var Listener = require('dj-dna-streaming-javascript').Listener;
125+
const Listener = require('dj-dna-streaming-javascript').Listener;
130126
131-
var onMessageCallback = function(msg) {
127+
const onMessageCallback = async function(msg) {
132128
console.log('One incoming message:' + JSON.stringify(msg.data));
133129
};
134130
135131
const listener = new Listener();
136132
listener.listen(onMessageCallback);
137133
~~~~
138134

139-
###### Error Handling
135+
### Callback pattern
140136

141-
If your callback fails, the message will be nack'd and the listener will rethrow the error. If you wish to write your own error handling for callbacks then set the `userErrorHandling` parameter to true. This allows you to use an error handler callback to force the callback handler to nack messages. The following is a very basic example illustrating how this may work.
137+
Write a callback function with two parameters, the first one being the message. The second one is a function which must be called with null as a parameter if the message is correctly processed or an error. If the parameter is null when calling handleErr, the message will be acknowledged, if not, the message will be not acknowledged.
142138

143139
~~~~
144-
var Listener = require('dj-dna-streaming-javascript').Listener;
145-
146-
var onMessageCallback = function((msg, handleErr) {
147-
let err = null;
148-
try {
140+
const Listener = require('dj-dna-streaming-javascript').Listener;
141+
142+
const onMessageCallback = (msg, handleErr) => {
143+
let err = null;
144+
try {
149145
console.log('One incoming message:' + JSON.stringify(msg.data));
150-
} catch (e) {
151-
err = e
152-
};
146+
} catch (e) {
147+
err = e
148+
};
149+
153150
handleErr(err)
154151
};
152+
153+
const listener = new Listener();
154+
listener.listen(onMessageCallback, null, false);
155+
~~~~
156+
157+
### Shutting down the listener
158+
159+
The listener have a method to stop receiving new messages. After a set time you can call this method or you can add a termination signal listener to call it and close cleanly the listener.
160+
161+
#### Waiting a set time
162+
163+
~~~~
164+
const Listener = require('dj-dna-streaming-javascript').Listener;
165+
166+
const onMessageCallback = async function(msg) {
167+
console.log('One incoming message:' + JSON.stringify(msg.data));
168+
};
155169
156170
const listener = new Listener();
157-
listener.listen(onMessageCallback, my_subscription_id, true);
171+
listener.listen(onMessageCallback);
172+
173+
// Listen to messages for 10 seconds
174+
175+
setTimeout(() => {
176+
listener.closeListener();
177+
}, 10000);
178+
~~~~
179+
180+
#### Ading a listener to the SIGTERM or SIGINT signals
181+
182+
~~~~
183+
const Listener = require('dj-dna-streaming-javascript').Listener;
184+
185+
const onMessageCallback = async function(msg) {
186+
console.log('One incoming message:' + JSON.stringify(msg.data));
187+
};
188+
189+
const listener = new Listener();
190+
listener.listen(onMessageCallback);
191+
192+
// This method calls the listener's closeListener method.
193+
const terminationHandler = () => {
194+
listener.closeListener();
195+
};
196+
197+
// Adding this method as a handler of SIGTERM or SIGINT
198+
process.on('SIGINT', terminationHandler);
199+
process.on('SIGTERM', terminationHandler);
200+
~~~~
201+
202+
### Migrating from a synchronous callback function
203+
204+
The latest version of the client require the callback function to conform to the callback pattern or using a function returning a Promise or Theanable.
205+
206+
When having a synchronous callback function as the following:
207+
~~~~~
208+
const oldSynchronousCallback = (msg) => {
209+
console.log('One incoming message:' + JSON.stringify(msg.data));
210+
}
211+
~~~~~
212+
213+
#### Create Promise from the old callback function:
214+
215+
~~~~
216+
const Listener = require('dj-dna-streaming-javascript').Listener;
217+
218+
const newAsyncCallback = (msg) => {
219+
return new Promise((resolve, reject) => {
220+
try {
221+
resolve(oldSynchronousCallback(msg));
222+
} catch (e) {
223+
reject(e);
224+
}
225+
});
226+
};
227+
228+
const listener = new Listener();
229+
listener.listen(newAsyncCallback);
230+
~~~~
231+
232+
#### Use the callback pattern:
233+
~~~~
234+
const Listener = require('dj-dna-streaming-javascript').Listener;
235+
236+
const onMessageCallback = (msg, handleErr) => {
237+
let err = null;
238+
try {
239+
oldSynchronousCallback(msg);
240+
} catch (e) {
241+
err = e
242+
};
243+
244+
handleErr(err)
245+
};
246+
247+
const listener = new Listener();
248+
listener.listen(onMessageCallback, null, false);
158249
~~~~

0 commit comments

Comments
 (0)