WIP #1245 Updated worker cache (restoring useBlob fn-ality)
brollb committed Oct 10, 2019
1 parent 4154330 commit b6c5a25
Showing 3 changed files with 87 additions and 62 deletions.
7 changes: 0 additions & 7 deletions bin/deepforge
@@ -102,13 +102,6 @@ var storeConfig = function(id, value) {
         var cKey = envToConf[env];
         process.env[env] = process.env[env] || p(getConfigValue(cKey));
     });
-
-    // Special cases
-    if (process.env.DEEPFORGE_WORKER_USE_BLOB === 'true' &&
-        exists.sync(process.env.DEEPFORGE_BLOB_DIR)) {
-
-        process.env.DEEPFORGE_WORKER_CACHE = process.env.DEEPFORGE_BLOB_DIR + '/wg-content';
-    }
 })();
 
 program
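
This deletion removes the CLI-side special case that made the worker cache directory be the blob's wg-content directory. Under the new scheme (see bin/start-worker.js below) the cache keeps its own root and only its gme subdirectory is linked into the blob. A minimal sketch of the two layouts, using hypothetical paths /data/blob and /tmp/worker-cache for illustration:

const path = require('path');

// Old: DEEPFORGE_WORKER_CACHE was rewritten to point inside the blob.
const oldCache = path.join('/data/blob', 'wg-content');   // '/data/blob/wg-content'

// New: the cache keeps its own root; only cache/gme is shared,
// via a symlink created at worker startup.
const newCache = '/tmp/worker-cache';
const sharedGmeDir = path.join(newCache, 'gme');           // -> /data/blob/wg-content

console.log(oldCache, newCache, sharedGmeDir);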
40 changes: 30 additions & 10 deletions bin/start-worker.js
@@ -16,6 +16,9 @@ var path = require('path'),
     config = {};
 
 var createDir = function(dir) {
+    if (path.dirname(dir) !== dir) {
+        createDir(path.dirname(dir));
+    }
     try {
         fs.statSync(dir);
     } catch (e) {
@@ -25,18 +28,20 @@ var createDir = function(dir) {
     }
     return false;
 };
-createDir(workerRootPath);
-createDir(workerPath);
 
+const symlink = function(origin, link) {
+    try {
+        fs.statSync(link);
+    } catch (e) {
+        childProcess.spawnSync('ln', ['-s', origin, link]);
+    }
+};
+
+createDir(workerTmp);
 
-// Create sym link to the node_modules
-var modules = path.join(workerRootPath, 'node_modules');
-try {
-    fs.statSync(modules);
-} catch (e) {
-    // Create dir
-    childProcess.spawnSync('ln', ['-s', `${__dirname}/../node_modules`, modules]);
-}
+// Create sym link to the node_modules and to deepforge
+const modules = path.join(workerRootPath, 'node_modules');
+symlink(`${__dirname}/../node_modules`, modules);
 
 var cleanUp = function() {
     console.log('removing worker directory ', workerPath);
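
Two small helpers carry this hunk: createDir now recurses with path.dirname until it reaches the filesystem root (where dirname(dir) === dir), so missing parents are created first, and symlink skips creation when the link path already stats. A sketch of equivalent behavior using only the fs API, which avoids spawning ln; this is an alternative for comparison, not what the commit ships, and the paths are hypothetical:

const fs = require('fs');
const path = require('path');

// Recursive directory creation, as createDir does by hand.
// Built into Node >= 10.12:
fs.mkdirSync('/tmp/worker-cache/nested', { recursive: true });

// Idempotent symlink without shelling out to `ln -s`:
const ensureSymlink = function(origin, link) {
    try {
        // statSync follows links, so a dangling link still throws here,
        // the same caveat the spawned-ln version has.
        fs.statSync(link);
    } catch (e) {
        fs.symlinkSync(origin, link);
    }
};

ensureSymlink(path.join(__dirname, '..', 'node_modules'), '/tmp/worker/node_modules');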
@@ -47,6 +52,21 @@ var startExecutor = function() {
 process.on('SIGINT', cleanUp);
 process.on('uncaughtException', cleanUp);
 
+// Configure the cache
+const blobDir = process.env.DEEPFORGE_BLOB_DIR;
+const isSharingBlob = process.env.DEEPFORGE_WORKER_USE_BLOB === 'true' &&
+    !!blobDir;
+
+if (process.env.DEEPFORGE_WORKER_CACHE && isSharingBlob) {
+    // Create the cache directory and symlink the blob in cache/gme
+    createDir(process.env.DEEPFORGE_WORKER_CACHE);
+
+    const blobContentDir = path.join(blobDir, 'wg-content');
+    const gmeStorageCache = path.join(process.env.DEEPFORGE_WORKER_CACHE, 'gme');
+    rm_rf.sync(gmeStorageCache);
+    symlink(blobContentDir, gmeStorageCache);
+}
+
 // Start the executor
 const env = Object.assign({}, process.env);
 env.DEEPFORGE_ROOT = path.join(__dirname, '..');
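
With this block in place, blob sharing is driven entirely by environment variables at worker startup. A usage sketch with hypothetical values (the paths are illustrative, not defaults):

const path = require('path');

// Hypothetical launch environment for a worker sharing the server's blob:
process.env.DEEPFORGE_WORKER_USE_BLOB = 'true';
process.env.DEEPFORGE_BLOB_DIR = '/data/blob';
process.env.DEEPFORGE_WORKER_CACHE = '/tmp/worker-cache';

// The block above then wires the cache like this:
const gmeStorageCache = path.join(process.env.DEEPFORGE_WORKER_CACHE, 'gme');
const blobContentDir = path.join(process.env.DEEPFORGE_BLOB_DIR, 'wg-content');
console.log(`${gmeStorageCache} -> ${blobContentDir}`);
// /tmp/worker-cache/gme -> /data/blob/wg-content
// (any stale gme directory is removed with rm_rf.sync before the symlink is made)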
102 changes: 57 additions & 45 deletions src/plugins/GenerateJob/templates/start.ejs
@@ -1,21 +1,21 @@
-// A wrapper for the torch script which:
+// A wrapper for the worker script which:
 // - merges stdout, stderr
 // - receives some commands and uploads intermediate data
-var spawn = require('child_process').spawn,
-    fs = require('fs'),
-    path = require('path'),
-    log = console.error,
-    logger = {};
+const spawn = require('child_process').spawn;
+const fs = require('fs');
+const rm_rf = require('rimraf');
+const path = require('path');
+const COMMAND_PREFIX = '<%= CONSTANTS.START_CMD %>';
+const IMAGE = '<%= CONSTANTS.IMAGE.PREFIX %>';
+const log = console.error;
+const logger = {};
 
 // Create the stderr only logger
 ['error', 'warn', 'info', 'log', 'debug'].forEach(method => logger[method] = log);
 logger.fork = () => logger;
 
-// Get the BlobClient...
-var COMMAND_PREFIX = '<%= CONSTANTS.START_CMD %>',
-    IMAGE = '<%= CONSTANTS.IMAGE.PREFIX %>',
-    remainingImageCount = 0,
-    exitCode;
+let remainingImageCount = 0;
+let exitCode;
 
 const assert = require('assert');
 const cwd = process.cwd();
@@ -39,7 +39,7 @@ requirejs([
     Storage,
 ) {
     var url = process.env.ORIGIN_URL || 'http://127.0.0.1:8888',
-        CACHE_DIR = process.env.DEEPFORGE_WORKER_CACHE || './worker-cache',
+        workerCacheDir = process.env.DEEPFORGE_WORKER_CACHE,
         protocol = url.split('://').shift(),
        address,
         port = (url.split(':') || ['80']).pop();
@@ -48,14 +48,24 @@ requirejs([
     address = url.replace(protocol + '://', '')
         .replace(':' + port, '');
 
-    // Create CACHE_DIR if it doesn't exist
-    var prepareCache = function() {
-        var dirs = CACHE_DIR.replace(/\/$/, '').split('/'),
-            cacheParent;
+    // Create workerCacheDir if it doesn't exist
+    var prepareCache = async function() {
+        if (!workerCacheDir) {
+            workerCacheDir = './worker-cache';
+            const blobDir = process.env.DEEPFORGE_BLOB_DIR;
+            const isSharingBlob = process.env.DEEPFORGE_WORKER_USE_BLOB === 'true';
+            if (isSharingBlob && blobDir) {
+                await makeIfNeeded(workerCacheDir);
+
+                const blobContentDir = path.join(blobDir, 'wg-content');
+                const gmeStorageCache = path.join(workerCacheDir, 'gme');
+                rm_rf.sync(path.join(workerCacheDir, 'gme'));
+                await symlink(blobContentDir, gmeStorageCache);
+            }
+        } else {
+            await makeIfNeeded(workerCacheDir);
+        }
 
-        dirs.pop();
-        cacheParent = dirs.join('/');
-        return makeIfNeeded(cacheParent).then(() => makeIfNeeded(CACHE_DIR));
     };
 
     var prepareInputsOutputs = function() {
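
prepareCache awaits a makeIfNeeded helper that is defined elsewhere in this template and not shown in the diff; since it is awaited, it presumably resolves once the directory exists. A plausible shape, using Q (which the template's symlink helper below already uses) and Node's recursive mkdir; this is an assumption about the helper, not the template's actual body:

    // Hypothetical sketch of makeIfNeeded: resolve once `dir` exists,
    // creating missing parent directories first (Node >= 10.12).
    var makeIfNeeded = function(dir) {
        var deferred = Q.defer();
        fs.mkdir(dir, {recursive: true}, err => {
            if (err) {
                deferred.reject(err);
                return;
            }
            deferred.resolve();
        });
        return deferred.promise;
    };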
Expand Down Expand Up @@ -162,30 +172,7 @@ requirejs([

var dataCachePath = async function(dataInfo) {
const relPath = await Storage.getCachePath(logger, dataInfo);
return `${CACHE_DIR}/${relPath}`;
};

var makeSymLink = function(target, src) {
var deferred = Q.defer(),
job;

src = path.resolve(src);
target = path.resolve(target);
fs.stat(src, err => {
if (err && err.code === 'ENOENT') {
logger.debug(`creating symlink "ln -s ${target} ${src}"`);
job = spawn('ln', ['-s', target, src || '.']);
job.on('exit', code => {
if (code) {
deferred.reject(`Could not create symlink ${target} -> ${src||'.'}`);
return;
}
deferred.resolve();
});
}
deferred.resolve();
});
return deferred.promise;
return `${workerCacheDir}/${relPath}`;
};

var getData = async function(ipath, dataInfo) {
@@ -200,7 +187,7 @@ requirejs([
         // Check if the data exists in the cache
         if (!err && cacheStats.isFile()) {
             logger.info(`${inputName} already cached. Skipping retrieval from blob`);
-            return makeSymLink(cachePath, ipath).then(deferred.resolve);
+            return symlink(cachePath, ipath).then(deferred.resolve);
         }
 
         createCacheDir(cachePath)
@@ -212,7 +199,7 @@ requirejs([
             }
             // Create the symlink
             logger.info('Retrieved ' + ipath);
-            return makeSymLink(cachePath, ipath).then(deferred.resolve);
+            return symlink(cachePath, ipath).then(deferred.resolve);
         }))
         .catch(err => deferred.reject(`Could not retrieve "${inputName}" (${err})`));
     });
@@ -300,4 +287,29 @@ requirejs([
         console.log(`Data retrieval failed: ${err}`);
         process.exit(1);
     });
+
+    function symlink(target, src) {
+        var deferred = Q.defer(),
+            job;
+
+        src = path.resolve(src);
+        target = path.resolve(target);
+        fs.stat(src, err => {
+            if (err && err.code === 'ENOENT') {
+                logger.debug(`creating symlink "ln -s ${target} ${src}"`);
+                console.log(`creating symlink "ln -s ${target} ${src}"`);
+                job = spawn('ln', ['-s', target, src || '.']);
+                job.on('exit', code => {
+                    if (code) {
+                        deferred.reject(`Could not create symlink ${target} -> ${src||'.'}`);
+                        return;
+                    }
+                    deferred.resolve();
+                });
+            }
+            deferred.resolve();
+        });
+        return deferred.promise;
+    };
 
 });

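One behavior of this new symlink function worth flagging: the final deferred.resolve() in the stat callback runs unconditionally, so when the link is missing the promise can settle before ln has actually exited (Q simply ignores the later resolve or reject). A caller that immediately reads through the link could race the spawn. A tightened sketch that only settles on the exit event, reusing the template's Q, fs, path, and spawn; this is a suggested variant, not what the commit ships:

    function symlinkWhenMissing(target, src) {
        const deferred = Q.defer();

        src = path.resolve(src);
        target = path.resolve(target);
        fs.stat(src, err => {
            if (err && err.code === 'ENOENT') {
                const job = spawn('ln', ['-s', target, src]);
                job.on('exit', code => {
                    if (code) {
                        deferred.reject(`Could not create symlink ${target} -> ${src}`);
                    } else {
                        deferred.resolve();
                    }
                });
            } else {
                // Path already exists (or stat failed another way): nothing to do.
                deferred.resolve();
            }
        });
        return deferred.promise;
    }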