Skip to content

Commit

Permalink
feat: Global poller for updating worker status
Browse files Browse the repository at this point in the history
Uses a shared poller to fetch and update the status of workers; helps reduce the no. of requests made to BrowserStack.
Ref karma-runner#30
  • Loading branch information
shirish87 committed Oct 4, 2015
1 parent 9995615 commit f9d29e1
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 37 deletions.
94 changes: 57 additions & 37 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ var q = require('q')
var api = require('browserstack')
var BrowserStackTunnel = require('browserstacktunnel-wrapper')

var Worker = require('./worker').Worker
var WorkerManager = require('./worker').WorkerManager


var createBrowserStackTunnel = function (logger, config, emitter) {
var log = logger.create('launcher.browserstack')
var bsConfig = config.browserStack || {}
Expand Down Expand Up @@ -50,6 +54,11 @@ var createBrowserStackTunnel = function (logger, config, emitter) {
log.error(error)
}

var workerManager = WorkerManager.getInstance();
if (workerManager.isPolling) {
workerManager.stopPolling()
}

done()
})
})
Expand All @@ -61,10 +70,28 @@ var createBrowserStackClient = function (/* config.browserStack */ config) {
var env = process.env

// TODO(vojta): handle no username/pwd
return api.createClient({
var client = api.createClient({
username: env.BROWSER_STACK_USERNAME || config.username,
password: env.BROWSER_STACK_ACCESS_KEY || config.accessKey
})

var pollingTimeout = config.pollingTimeout || 1000

var workerManager = WorkerManager.getInstance()
if (!workerManager.isPolling) {
workerManager.startPolling(client, pollingTimeout, function (err) {
if (err) {
console.error(err)
}
})
}

var browserStackClient = {
client: client,
workerManager: workerManager
}

return browserStackClient
}

var formatError = function (error) {
Expand All @@ -78,8 +105,11 @@ var formatError = function (error) {

var BrowserStackBrowser = function (id, emitter, args, logger,
/* config */ config,
/* browserStackTunnel */ tunnel, /* browserStackClient */ client) {
/* browserStackTunnel */ tunnel, /* browserStackClient */ browserStackClient) {
var self = this
var client = browserStackClient.client
var workerManager = browserStackClient.workerManager

var workerId = null
var captured = false
var alreadyKilling = null
Expand All @@ -94,7 +124,6 @@ var BrowserStackBrowser = function (id, emitter, args, logger,
var captureTimeout = config.captureTimeout || 0
var captureTimeoutId
var retryLimit = bsConfig.retryLimit || 3
var pollingTimeout = bsConfig.pollingTimeout || 1000

this.start = function (url) {
// TODO(vojta): handle non os/browser/version
Expand Down Expand Up @@ -132,43 +161,34 @@ var BrowserStackBrowser = function (id, emitter, args, logger,
workerId = worker.id
alreadyKilling = null

var whenRunning = function () {
log.debug('%s job started with id %s', browserName, workerId)

if (captureTimeout) {
captureTimeoutId = setTimeout(self._onTimeout, captureTimeout)
worker = workerManager.registerWorker(worker)
worker.on('status', function (status) {
// TODO(vojta): show immediately in createClient callback once this gets fixed:
// https://github.com/browserstack/api/issues/10
if (!sessionUrlShowed) {
log.info('%s session at %s', browserName, worker.browser_url)
sessionUrlShowed = true
}
}

var waitForWorkerRunning = function () {
client.getWorker(workerId, function (error, w) {
if (error) {
log.error('Can not get worker %s status %s\n %s', workerId, browserName, formatError(error))
return emitter.emit('browser_process_failure', self)
}

// TODO(vojta): show immediately in createClient callback once this gets fixed:
// https://github.com/browserstack/api/issues/10
if (!sessionUrlShowed) {
log.info('%s session at %s', browserName, w.browser_url)
sessionUrlShowed = true
}

if (w.status === 'running') {
whenRunning()
} else {
log.debug('%s job with id %s still in queue.', browserName, workerId)
setTimeout(waitForWorkerRunning, pollingTimeout)
}
})
}
switch (status) {
case 'running':
log.debug('%s job started with id %s', browserName, workerId)

if (captureTimeout) {
captureTimeoutId = setTimeout(self._onTimeout, captureTimeout)
}
break;

case 'queue':
log.debug('%s job with id %s in queue.', browserName, workerId)
break;

case 'delete':
log.debug('%s job with id %s has been deleted.', browserName, workerId)
break;
}
})

if (worker.status === 'running') {
whenRunning()
} else {
log.debug('%s job queued with id %s.', browserName, workerId)
setTimeout(waitForWorkerRunning, pollingTimeout)
}
})
}, function () {
emitter.emit('browser_process_failure', self)
Expand Down
141 changes: 141 additions & 0 deletions worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
'use strict';

var EventEmitter = require('events').EventEmitter;
var util = require('util');


function Worker(data) {
EventEmitter.call(this);

if (typeof data === 'object' && !Array.isArray(data)) {
var self = this;

Object.keys(data).forEach(function (k) {
self[k] = data[k];
});
}
}

util.inherits(Worker, EventEmitter);


/**
* Tracks worker state across runs.
*/
function WorkerManager() {
this._pollHandle = null;

this.workers = {};
this.isPolling = false;
}


WorkerManager.prototype.registerWorker = function registerWorker(workerData) {
if (this.workers[workerData.id]) {
this.unregisterWorker(this.workers[workerData.id]);
}

var worker = new Worker(workerData);
worker.emit('status', worker.status);

this.workers[workerData.id] = worker;
return worker;
};


WorkerManager.prototype.unregisterWorker = function unregisterWorker(worker) {
worker.emit('delete', worker);
worker.removeAllListeners();

delete this.workers[worker.id];
return worker;
};


WorkerManager.prototype.updateWorker = function updateWorker(workerData) {
var workers = this.workers;

if (workers[workerData.id]) {
var worker = workers[workerData.id];
var prevStatus = worker.status;

Object.keys(workerData).forEach(function (k) {
worker[k] = workerData[k];
});

if (worker.status !== prevStatus) {
worker.emit('status', worker.status);
}

return worker;
}

// will end up including workers that don't belong to current run
// return WorkerManager.registerWorker(workerData);
};


WorkerManager.prototype.startPolling = function startPolling(client, pollingTimeout, callback) {
if (this.isPolling) {
return;
}

var self = this;
this.isPolling = true;

client.getWorkers(function (err, updatedWorkers) {
if (err) {
self.isPolling = false;
return (callback ? callback(err) : null);
}

updatedWorkers = updatedWorkers || [];
var activeWorkerIds = updatedWorkers.map(function (worker) {
return worker.id;
});

// process deletions
for (var i = 0, l = self.workers.length; i < l; i++) {
var worker = self.workers[i];
if (activeWorkerIds.indexOf(worker.id) === -1) {
self.unregisterWorker(worker);
}
}

// process updates
var existingWorkerIds = Object.keys(self.workers);
updatedWorkers.forEach(function (workerData) {
self.updateWorker(workerData);
});

self._pollHandle = setTimeout(function () {
self.isPolling = false;
self.startPolling(client, pollingTimeout, callback);
}, pollingTimeout);

});
};


WorkerManager.prototype.stopPolling = function stopPolling() {
if (this._pollHandle) {
clearTimeout(this._pollHandle);
this._pollHandle = null;
}

this.isPolling = false;
};


// expose a single, shared instance of WorkerManager
var workerManager = new WorkerManager();

module.exports = {
Worker: Worker,

WorkerManager: {
getInstance: function () {
return workerManager;
}
}
};

0 comments on commit f9d29e1

Please sign in to comment.