diff --git a/src/proposer.js b/src/proposer.js new file mode 100644 index 0000000..5634d87 --- /dev/null +++ b/src/proposer.js @@ -0,0 +1,82 @@ +// @ts-check + +import { nodeFetch } from './curl'; +import { RNode } from './rnode'; + +const { freeze } = Object; + +/** + * @param {{ admin?: string, boot: string, read: string }} api + * @param {ReturnType} rnode + * @param {SchedulerAccess} sched + * @param {number=} period + * + * @typedef { { + * setInterval: typeof setInterval, + * clearInterval: typeof clearInterval, + * } } SchedulerAccess + */ + +export function proposer(api, rnode, sched, period = 2 * 1000) { + let proposing = false; + let waiters = 0; + let pid; + + return freeze({ + startProposing() { + if (!api.admin) return; + const node = rnode.admin(api.admin); + waiters += 1; + if (typeof pid !== 'undefined') { + return; + } + pid = sched.setInterval(() => { + if (!proposing) { + proposing = true; + node + .propose() + .then(() => { + console.log('proposed', { waiters }); + proposing = false; + }) + .catch((err) => { + console.log('propose failed', { waiters, err: err.message }); + proposing = false; + }); + } + }, period); + }, + stopProposing() { + if (waiters <= 0) { + return; + } + waiters -= 1; + sched.clearInterval(pid); + pid = undefined; + }, + }); +} + +/** + * @param {string[]} args + * @param {{ http: typeof import('http') }} io + * @param {SchedulerAccess} sched + */ +function main(args, { http }, sched) { + const [url] = args.length ? args : ['http://localhost:40404']; + const fetch = nodeFetch({ http }); + const rnode = RNode(fetch); + const node = proposer({ admin: url, boot: 'N/A', read: 'N/A' }, rnode, sched); + node.startProposing(); +} + +if (require.main === module) { + main( + process.argv.slice(2), + { + // eslint-disable-next-line global-require + http: require('http'), + }, + { setInterval, clearInterval }, + ); +} diff --git a/src/rhopm.js b/src/rhopm.js index 246133b..c5035b0 100644 --- a/src/rhopm.js +++ b/src/rhopm.js @@ -3,13 +3,17 @@ import { nodeFetch } from './curl'; import { RNode } from './rnode'; import { startTerm, listenAtDeployId } from './proxy'; +import { proposer } from './proposer'; // @ts-ignore const { keys, freeze, fromEntries } = Object; -export const rhoDir = 'rho_modules'; -export const rhoInfoPath = (src) => - `${rhoDir}/${src.replace(/\.rho$/, '.json')}`; +/* TODO: handle non-posix paths? */ +/** @type {(net: string) => string} */ +export const rhoDir = (net) => `rho_modules/${net}`; +/** @type {(src: string, net: string) => string} */ +export const rhoInfoPath = (src, net) => + `${rhoDir(net)}/${src.replace(/\.rho$/, '.json')}`; export const importPattern = /match\s*\("import",\s*"(?[^"]+)",\s*`(?rho:id:[^`]*)`\)/g; function log(...args) { @@ -215,65 +219,43 @@ function parseEnv(txt) { } /** - * - * @param {string} envText + * @param {Record} env + * @param {{ admin?: string, boot: string, read: string }} api * @param {typeof import('http')} http * @param {SchedulerAccess} sched - * @param {number} period + * @param {number=} period * - * @typedef { { - * setInterval: typeof setInterval, - * clearInterval: typeof clearInterval, - * } } SchedulerAccess + * @typedef {import('./proposer').SchedulerAccess} SchedulerAccess */ -export function shardIO(envText, http, sched, period = 2 * 1000) { +export function shardAccess(env, api, http, sched, period = 2 * 1000) { const fetch = nodeFetch({ http }); - const env = parseEnv(envText); - const api = { - admin: `http://${env.MY_NET_IP}:40405`, - boot: `http://${env.MY_NET_IP}:40403`, - read: `http://${env.MY_NET_IP}:40413`, - }; - const rnode = RNode(fetch); - const proposer = rnode.admin(api.admin); - let proposing = false; - let waiters = 0; - let pid; + const rnode = RNode(fetch); + const blockMaker = proposer(api, rnode, sched, period); return freeze({ env, ...api, validator: rnode.validator(api.boot), observer: rnode.observer(api.read), - startProposing() { - waiters += 1; - if (typeof pid !== 'undefined') { - return; - } - pid = sched.setInterval(() => { - if (!proposing) { - proposing = true; - proposer - .propose() - .then(() => { - console.log('proposed', { waiters }); - proposing = false; - }) - .catch((err) => { - console.log('propose failed', { waiters, err: err.message }); - proposing = false; - }); - } - }, period); - }, - stopProposing() { - if (waiters <= 0) { - return; - } - waiters -= 1; - sched.clearInterval(pid); - pid = undefined; - }, + ...blockMaker, }); } + +/** + * Local shard I/O + * + * @param {string} envText + * @param {typeof import('http')} http + * @param {SchedulerAccess} sched + * @param {number=} period + */ +export function shardIO(envText, http, sched, period = 2 * 1000) { + const env = parseEnv(envText); + const api = { + admin: `http://${env.MY_NET_IP}:40405`, + boot: `http://${env.MY_NET_IP}:40403`, + read: `http://${env.MY_NET_IP}:40413`, + }; + return shardAccess(env, api, http, sched, period); +}