Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add fail safe intervals to restart watches as necessary #236

Merged
merged 12 commits into from
Jan 26, 2022
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,4 @@ typings/

########### Custom ignores ###########
dev/
.npmrc
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
language: node_js

node_js:
- "11"
- "lts/*"

before_install:
- echo "$DOCKERHUB_TOKEN" | docker login -u "icdevops" --password-stdin

script:
- if [ "${TRAVIS_PULL_REQUEST}" != "false" ]; then npm audit; else npm audit || true; fi
- npm run lint
Expand Down
33 changes: 32 additions & 1 deletion lib/EventHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
const merge = require('deepmerge');
const touch = require('touch');
const watchman = require('./Watchman');
const objectPath = require('object-path');


module.exports = class EventHandler {
Expand All @@ -31,6 +32,17 @@ module.exports = class EventHandler {
this._livenessInterval = 60000; //One minute
}
}
this._livenessEndHours = params.restartPod; // time in hours when we should stop updating the liveness file, which should tell kube to restart our pod. If 0 is defined, pod will not be told to restart. Default: 0.
if (!Number.isInteger(this._livenessEndHours)) {
const livenessEndHours = Number.parseInt(this._livenessEndHours);
this._livenessEndHours = (livenessEndHours >= 0) ? livenessEndHours : 0;
}
this._watchTimeoutSeconds = objectPath.get(params, 'requestOptions.qs.timeoutSeconds', 300); // time in seconds when the watch should be re-initialized. default: 300s
if (!Number.isInteger(this._watchTimeoutSeconds)) {
const watchTimeoutSeconds = Number.parseInt(this._watchTimeoutSeconds);
this._watchTimeoutSeconds = (watchTimeoutSeconds > 0) ? watchTimeoutSeconds : 300;
this._watchTimeoutMilliseconds = this._watchTimeoutSeconds * 1000;
}

this._logger = params.logger || require('../bunyan-api').createLogger('EventHandler');

Expand All @@ -55,14 +67,33 @@ module.exports = class EventHandler {
};
this._wm = new watchman(opt, (data) => this.eventHandler(data));
this._wm.watch();

if (this._livenessInterval) {
touch.sync('/tmp/liveness');
let livenessStartTime = Date.now(); // time liveness was first initialized
setInterval(() => {
if (this._wm.watching) {
const now = Date.now();
const millisecondsSinceLivenessStart = now - livenessStartTime;
const hoursSinceLivenessStart = millisecondsSinceLivenessStart / 1000 / 60 / 60;
const restartThresholdHours = this._livenessEndHours; // stop touching liveness file after the specified number of hours, which should tell kube to restart the pod
if (this._wm.watching && (restartThresholdHours == 0 || hoursSinceLivenessStart < restartThresholdHours)) { // (watching && restartThresholdHours not defined) || (watching && hoursSinceLivenessStart < restartThresholdHours)
touch.sync('/tmp/liveness');
}
}, this._livenessInterval);
}

// every cycle, check if the watch is restarting in the appropriate amount of time
setInterval(() => {
const now = Date.now();
const millisecondsSinceLastStart = now - this._wm.watchStart;
const resetThresholdMilliseconds = (this._watchTimeoutMilliseconds + 10000); // interval time and kube timing is not exact, so adding 10 seconds to allow for variance without having to wait for an entire extra cycle
this._logger.debug(`Time since last start: ${millisecondsSinceLastStart / 1000}s | WatchTimeoutSeconds: ${resetThresholdMilliseconds / 1000}s.`);
// if time since last start is more than the time is should have been, restart the watch here instead of waiting on kube to restart it for us
if (millisecondsSinceLastStart > resetThresholdMilliseconds) {
this._logger.debug(`Time since last start: ${millisecondsSinceLastStart / 1000}s > ${resetThresholdMilliseconds / 1000}s ... reseting watch via eventHandler.`);
this._wm.watch();
}
}, (this._watchTimeoutMilliseconds));
}

// Event Handler
Expand Down
17 changes: 14 additions & 3 deletions lib/Watchman.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ module.exports = class Watchman {
throw `uri '${this._requestOptions.baseUrl}${this._requestOptions.uri}' not valid watch uri.`;
}

this._rewatchOnTimeout = options.rewatchOnTimeout || true;
this._rewatchOnTimeout = typeof options.rewatchOnTimeout === 'boolean' ? options.rewatchOnTimeout : true;
this._requestStream = undefined;
this._jsonStream = undefined;
this._errors = 0;
Expand All @@ -63,6 +63,10 @@ module.exports = class Watchman {
get watching() {
return this._watching;
}
get watchStart() {
// Returns the numeric value corresponding to when the watch started — the number of milliseconds elapsed since January 1, 1970 00:00:00 UTC
return this._watchStart;
}

_watchError() {
this._errors++;
Expand All @@ -75,7 +79,9 @@ module.exports = class Watchman {

// public methods
watch() {
this._logger.debug('Watchman: initializing watch');
this.end(this._rewatchOnTimeout);
this._logger.debug('Watchman: attempting new watch ');
this._requestStream = request(this._requestOptions)
.on('response', (response) => {
if (response.statusCode !== 200) {
Expand All @@ -84,6 +90,8 @@ module.exports = class Watchman {
}
this._watchError();
} else {
this._logger.debug('Watchman: watch started');
this._watchStart = Date.now();
this._watching = true;
this._errors = 0;
}
Expand All @@ -96,11 +104,13 @@ module.exports = class Watchman {
})
.on('close', () => {
this._watching = false;
if (this._logger) {
this._logger.info(`GET ${this._requestOptions.uri} closed. rewatchOnTimeout: ${this._rewatchOnTimeout}, errors: ${this._errors}`);
}
if (this._rewatchOnTimeout && this._errors == 0) {
this.watch();
}
});

var parser = JSONStream.parse(true);
parser.on('data', (data) => {
if (data.type === 'ERROR') {
Expand All @@ -122,10 +132,11 @@ module.exports = class Watchman {
}

end(rewatchOnTimeout = false) {
this._logger.debug('Watchman: ending previous watch');
this._watching = false;
this._rewatchOnTimeout = rewatchOnTimeout;
if (this._requestStream) {
this._requestStream.abort();
this._requestStream.destroy();
}
this._requestStream = undefined;
this._jsonStream = undefined;
Expand Down
Loading