Skip to content

Commit

Permalink
Merge pull request #891 from LockerProject/vm
Browse files Browse the repository at this point in the history
Vhymn
  • Loading branch information
quartzjer committed Mar 2, 2012
2 parents 7bc37d4 + 79d6562 commit 3602c33
Show file tree
Hide file tree
Showing 14 changed files with 169 additions and 35 deletions.
8 changes: 6 additions & 2 deletions Common/node/levents.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,13 @@ BatchSendQueue.prototype.push = function(item) {
// Every push is added and we see if we can send more
this.items.push(item);
var self = this;
if(!this.running) setTimeout(function(){ self.run(); }, 100); // give a chance to queue up if coming in fast
if(!this.running) {
if(this.timer) clearTimeout(this.timer);
this.timer = setTimeout(function(){ self.run(); }, 1000); // give a chance to queue up if coming in fast
}
};
BatchSendQueue.prototype.run = function() {
if(this.timer) delete this.timer;
if (this.running) return;
if (this.items.length == 0) return;

Expand All @@ -155,7 +159,7 @@ BatchSendQueue.prototype.run = function() {
logger.verbose("Done sending batched events to " + self.url);
// If more stuff came in we run again, otherwise push will get it next time
self.running = false;
if (self.items.length > 0) setTimeout(function(){ self.run(); }, 100); // breather
if (self.items.length > 0) setTimeout(function(){ self.run(); }, 1000); // breather
});
async.forEachSeries(sendingItems, function(item, sendCb) {
req.write(JSON.stringify(item) + "\n");
Expand Down
126 changes: 120 additions & 6 deletions Common/node/lsyncmanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,60 @@ var fs = require('fs')
, logger = require("./logger.js")
, dispatcher = require('./instrument.js').StatsdDispatcher
, stats = new dispatcher(lconfig.stats);
var vm = require("vm");
var util = require("util");

var runningContexts = {}; // Map of a synclet to a running context

function LockerInterface(synclet, info) {
EventEmitter.call(this);
this.synclet = synclet;
this.info = info;
this.srcdir = path.join(lconfig.lockerDir, info.srcdir);
this.workingDirectory = path.join(lconfig.lockerDir, lconfig.me, info.id);
this.processing = false; // If we're processing events
this.events = [];
}
util.inherits(LockerInterface, EventEmitter);
LockerInterface.prototype.error = function(message) {
logger.error("Error from synclet " + this.synclet.name + "/" + this.info.id + ": " + message);
};
// Fire an event from the synclet
LockerInterface.prototype.event = function(action, lockerType, obj) {
this.events.push({action:action, lockerType:lockerType, obj:obj});
this.emit("event");
this.processEvents();
}
LockerInterface.prototype.processEvents = function() {
if (this.processing) return;
// Process the events we have
this.processing = true;
var self = this;
var curEvents = this.events;
this.events = [];
async.forEachSeries(curEvents, function(event, cb) {
processData([], self.info, self.synclet, event.lockerType, [event], cb);
}, function(error) {
self.processing = false;
if (self.events.length == 0) {
self.emit("drain");
} else {
process.nextTick(function() {
self.processEvents();
});
}
});
}
// Signals that the synclet context is complete and may be cleaned up
LockerInterface.prototype.end = function() {
if (this.events.length > 0) {
this.once("drain", function() {
this.emit("end");
});
} else {
this.emit("end");
}
}

// this works, but feels like it should be a cleaner abstraction layer on top of the datastore instead of this garbage
datastore.init = function (callback) {
Expand Down Expand Up @@ -56,6 +110,10 @@ exports.syncNow = function (serviceId, syncletId, post, callback) {
var js = serviceManager.map(serviceId);
if (!js || !js.synclets) return callback("no synclets like that installed");
async.forEachSeries(js.synclets, function (synclet, cb) {
if (!synclet) {
logger.error("Unknown synclet info in syncNow");
cb();
}
if(syncletId && synclet.name != syncletId) return cb();
if(post)
{
Expand Down Expand Up @@ -155,7 +213,7 @@ function executeSynclet(info, synclet, callback, force) {
return callback();
}
// if another synclet is running, come back a little later, don't overlap!
if (info.status == 'running') {
if (info.status == 'running' || runningContexts[info.id + "/" + synclet.name]) {
logger.verbose("delaying "+synclet.name);
setTimeout(function() {
executeSynclet(info, synclet, callback, force);
Expand All @@ -164,6 +222,67 @@ function executeSynclet(info, synclet, callback, force) {
}
logger.info("Synclet "+synclet.name+" starting for "+info.id);
info.status = synclet.status = "running";
var tstart = Date.now();
stats.increment(info.id + '.' + synclet.name + '.start');

if (info.vm || synclet.vm) {
// Go ahead and create a context immediately so we get it listed as
// running and dont' start mulitple ones
var sandbox = {
// XXX: This could be a problem later and need a cacheing layer to
// remove anything that they add, but for now we'll allow it
// direct and see the impact
require:require,
console:console,
exports:{}
};
var context = vm.createContext(sandbox);
runningContexts[info.id + "/" + synclet.name] = context;
// Let's get the code loaded
var fname = path.join(info.srcdir, synclet.name + ".js");
fs.readFile(fname, function(err, code) {
if (err) {
logger.error("Unable to load synclet " + synclet.name + "/" + info.id + ": " + err);
return callback(err);
}
try {
synclet.deleted = synclet.added = synclet.updated = 0;
vm.runInContext(code, context, fname);

if (!info.config) info.config = {};

if (!info.fullsrcdir) info.absoluteSrcdir = path.join(lconfig.lockerDir, info.srcdir);
info.syncletToRun = synclet;
info.syncletToRun.workingDirectory = path.join(lconfig.lockerDir, lconfig.me, info.id);
info.lockerUrl = lconfig.lockerBase;
sandbox.exports.sync(info, function(syncErr, response) {
delete runningContexts[info.id + "/" + synclet.name];
if (syncErr) {
logger.error(synclet.name+" error: "+util.inspect(syncErr));
info.status = synclet.status = 'failed';
return callback(syncErr);
}
logger.info("Synclet "+synclet.name+" finished for "+info.id+" timing "+(Date.now() - tstart));
info.status = synclet.status = 'processing data';
var deleteIDs = compareIDs(info.config, response.config);
info.auth = lutil.extend(true, info.auth, response.auth); // for refresh tokens and profiles
info.config = lutil.extend(true, info.config, response.config);
exports.scheduleRun(info, synclet);
serviceManager.mapDirty(info.id); // save out to disk
processResponse(deleteIDs, info, synclet, response, function(processErr) {
info.status = 'waiting';
callback(processErr);
});
});
} catch (E) {
logger.error("Error running " + synclet.name + "/" + info.id + " in a vm context: " + E);
return callback(E);
}
});
if(synclet.posts) synclet.posts = []; // they're serialized, empty the queue
delete info.syncletToRun;
return;
}
var run;
var env = process.env;
if (!synclet.run) {
Expand Down Expand Up @@ -192,12 +311,7 @@ function executeSynclet(info, synclet, callback, force) {
localError(info.title+" "+synclet.name + " error:",data.toString());
});

var tstart;
app.stdout.on('data',function (data) {
if(!tstart) {
tstart = Date.now();
stats.increment(info.id + '.' + synclet.name + '.start');
}
dataResponse += data;
});

Expand Down
9 changes: 5 additions & 4 deletions Connectors/Twitter/friends.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
*
*/

var tw = require('./lib.js')
, contacts = []
;
var path = require('path');
var tw;

var contacts = [];
exports.sync = function(processInfo, cb) {
tw.init(processInfo.auth);
tw = require(path.join(processInfo.absoluteSrcdir, 'lib.js'));
tw.init(processInfo.auth, processInfo.absoluteSrcdir);
exports.syncFriends(function(err) {
if (err) console.error(err);
var responseObj = {data : {}};
Expand Down
20 changes: 11 additions & 9 deletions Connectors/Twitter/lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,24 @@ var fs = require('fs'),
request = require('request'),
async = require('async'),
url = require('url');

var path = require('path');

var tw;
var auth;
var base;

exports.init = function(theAuth) {
exports.init = function(theAuth, theBase) {
auth = theAuth;
tw = require('./twitter_client')(auth.consumerKey, auth.consumerSecret);
base = theBase;
tw = require(path.join(base,'twitter_client'))(auth.consumerKey, auth.consumerSecret);
try {
fs.mkdirSync('friends', 0755);
fs.mkdirSync(path.join(base,'friends'), 0755);
} catch(e) {};
};

exports.getMe = function(arg, cbEach, cbDone) {
arg.path = '/account/verify_credentials.json';
fs.readFile('profile.json', function(err, data) {
fs.readFile(path.join(base,'profile.json'), function(err, data) {
var me;
try {
if(err) throw "na";
Expand All @@ -36,7 +38,7 @@ exports.getMe = function(arg, cbEach, cbDone) {
return getOne(arg,function(err,me){
if(!err)
{
fs.writeFile('profile.json', JSON.stringify(me));
fs.writeFile(path.join(base,'profile.json'), JSON.stringify(me));
cbEach(me);
}
cbDone(err);
Expand All @@ -62,18 +64,18 @@ exports.getMyFriends = function(arg, cbEach, cbDone) {
// load orig if any
var orig;
try {
orig = JSON.parse(fs.readFileSync('friends/'+friend.id_str+'.json'));
orig = JSON.parse(fs.readFileSync(path.join(base,'friends/'+friend.id_str+'.json')));
}catch(E){}
// background cache pic if it's new or changed
if(!orig || orig.profile_image_url != friend.profile_image_url)
{
request.get({uri:friend.profile_image_url, encoding:'binary'}, function(err, resp, body) {
var photoExt = friend.profile_image_url.substring(friend.profile_image_url.lastIndexOf('.'));
fs.writeFile('friends/' + friend.id_str + photoExt, body, 'binary');
fs.writeFile(path.join(base,'friends/' + friend.id_str + photoExt, body, 'binary'));
});
}
// background cache data
fs.writeFile('friends/'+friend.id_str+'.json', JSON.stringify(friend));
fs.writeFile(path.join(base,'friends/'+friend.id_str+'.json', JSON.stringify(friend)));
cbEach(friend);
},cbDone);
});
Expand Down
6 changes: 4 additions & 2 deletions Connectors/Twitter/mentions.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
*
*/

var tw = require('./lib.js');
var path = require('path');
var tw;

exports.sync = function(processInfo, cb) {
tw.init(processInfo.auth);
tw = require(path.join(processInfo.absoluteSrcdir, 'lib.js'));
tw.init(processInfo.auth, processInfo.absoluteSrcdir);
var me;
var responseObj = {data : {}, config:{}};
var since=1;
Expand Down
1 change: 1 addition & 0 deletions Connectors/Twitter/profile.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"id_str":"1704421","id":1704421,"profile_text_color":"000000","created_at":"Wed Mar 21 02:44:51 +0000 2007","contributors_enabled":false,"follow_request_sent":false,"lang":"en","listed_count":138,"profile_sidebar_border_color":"000000","show_all_inline_media":true,"friends_count":177,"utc_offset":-21600,"location":"Cascade, IA","name":"Jeremie Miller","profile_background_tile":false,"profile_sidebar_fill_color":"ffffff","profile_image_url_https":"https://si0.twimg.com/profile_images/565991047/jer_normal.jpg","protected":false,"geo_enabled":true,"following":false,"default_profile_image":false,"statuses_count":1096,"is_translator":false,"favourites_count":10,"profile_background_color":"ffffff","description":"Currently building the Locker Project, TeleHash, and Singly with a focus on personal data + distributed protocols. Helped found Jabber/XMPP, open platforms FTW!","time_zone":"Central Time (US & Canada)","screen_name":"jeremie","profile_background_image_url":"http://a0.twimg.com/images/themes/theme1/bg.png","profile_image_url":"http://a0.twimg.com/profile_images/565991047/jer_normal.jpg","profile_link_color":"0000ff","profile_background_image_url_https":"https://si0.twimg.com/images/themes/theme1/bg.png","followers_count":1505,"status":{"in_reply_to_status_id_str":"173876512158781440","in_reply_to_user_id_str":"3099501","retweeted":false,"coordinates":null,"in_reply_to_screen_name":"mpesce","created_at":"Sun Feb 26 21:07:00 +0000 2012","contributors":null,"in_reply_to_status_id":173876512158781440,"entities":{"urls":[],"user_mentions":[{"name":"Mark Pesce","screen_name":"mpesce","indices":[0,7],"id_str":"3099501","id":3099501}],"hashtags":[]},"geo":null,"in_reply_to_user_id":3099501,"place":null,"favorited":false,"truncated":false,"source":"<a href=\"http://itunes.apple.com/us/app/twitter/id409789998?mt=12\" rel=\"nofollow\">Twitter for Mac</a>","id_str":"173876742518345729","id":173876742518345730,"retweet_count":0,"text":"@mpesce metaphorically, yes, in my code."},"default_profile":false,"notifications":false,"url":"http://jeremie.com/-","profile_use_background_image":false,"verified":false}
6 changes: 4 additions & 2 deletions Connectors/Twitter/related.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
*
*/

var tw = require('./lib.js');
var path = require('path');
var tw;
var async = require('async');

exports.sync = function(processInfo, cb) {
tw.init(processInfo.auth);
tw = require(path.join(processInfo.absoluteSrcdir, 'lib.js'));
tw.init(processInfo.auth, processInfo.absoluteSrcdir);
var me;
var responseObj = {data : {}};
tw.getMe({},function(js){me=js}, function(err){
Expand Down
6 changes: 4 additions & 2 deletions Connectors/Twitter/self.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
*
*/

var tw = require('./lib.js');
var path = require('path');
var tw;

exports.sync = function(processInfo, cb) {
tw.init(processInfo.auth);
tw = require(path.join(processInfo.absoluteSrcdir, 'lib.js'));
tw.init(processInfo.auth, processInfo.absoluteSrcdir);
var self;
tw.getMe({}, function(js){ self = js; }, function(err) {
if (err) return cb(err);
Expand Down
1 change: 1 addition & 0 deletions Connectors/Twitter/synclets.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"strip" : {
"contact":["status"]
},
"vm":true,
"synclets":[
{"name": "self", "frequency": 7200},
{"name": "friends", "frequency": 3600},
Expand Down
6 changes: 4 additions & 2 deletions Connectors/Twitter/timeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
*
*/

var tw = require('./lib.js');
var path = require('path');
var tw;

exports.sync = function(processInfo, cb) {
tw.init(processInfo.auth);
tw = require(path.join(processInfo.absoluteSrcdir, 'lib.js'));
tw.init(processInfo.auth, processInfo.absoluteSrcdir);
var me;
var responseObj = {data : {}, config:{}};
var since=1;
Expand Down
6 changes: 4 additions & 2 deletions Connectors/Twitter/tweet.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@
*
*/

var tw = require('./lib.js');
var path = require('path');
var tw;
var async = require('async');

exports.sync = function(processInfo, cb) {
tw = require(path.join(processInfo.absoluteSrcdir, 'lib.js'));
tw.init(processInfo.auth, processInfo.absoluteSrcdir);
var posts = processInfo.syncletToRun.posts;
var ret = {data: { tweets: [] } };
if(!Array.isArray(posts) || posts.length == 0) return cb(undefined, ret);
tw.init(processInfo.auth);
async.forEachSeries(posts, function(post, cb){
tw.update(post, function(tweet){ ret.data.tweets.push(tweet); }, function(err){
if(err) console.error("got "+err+" while posting "+JSON.stringify(post));
Expand Down
6 changes: 4 additions & 2 deletions Connectors/Twitter/tweets.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
*
*/

var tw = require('./lib.js');
var path = require('path');
var tw;

exports.sync = function(processInfo, cb) {
tw.init(processInfo.auth);
tw = require(path.join(processInfo.absoluteSrcdir, 'lib.js'));
tw.init(processInfo.auth, processInfo.absoluteSrcdir);
var me;
var responseObj = {data : {}, config:{}};
var since=1;
Expand Down
2 changes: 1 addition & 1 deletion Ops/registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ exports.sync = function (callback, force) {
function finish(err) {
logger.info("registry sync finished");
if(err) logger.error(err);
syncTimer = setTimeout(exports.sync, lconfig.registryUpdateInterval * 1000);
syncTimer = setTimeout(exports.sync, lconfig.registryUpdateInterval * (1000 + ((Math.random() - 0.5) * 200)));
syncCallbacks.forEach(function (cb) { cb(err); });
syncCallbacks = [];
logger.info("registry callbacks completed");
Expand Down
1 change: 0 additions & 1 deletion tests/Config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"core-lib" : [
"lpushmanager-test-local.js"],
"synclets" : ["synclet-local-foursquare.js",
"synclet-local-twitter.js",
"synclet-local-facebook.js",
"synclet-local-flickr.js",
"synclet-local-soundcloud.js",
Expand Down

0 comments on commit 3602c33

Please sign in to comment.