Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rhopm: net choice #80

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/proposer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// @ts-check

import { nodeFetch } from './curl';
import { RNode } from './rnode';

const { freeze } = Object;

/**
* @param {{ admin?: string, boot: string, read: string }} api
* @param {ReturnType<import('./rnode').RNode>} rnode
* @param {SchedulerAccess} sched
* @param {number=} period
*
* @typedef { {
* setInterval: typeof setInterval,
* clearInterval: typeof clearInterval,
* } } SchedulerAccess
*/

export function proposer(api, rnode, sched, period = 2 * 1000) {
let proposing = false;
let waiters = 0;
let pid;

return freeze({
startProposing() {
if (!api.admin) return;
const node = rnode.admin(api.admin);
waiters += 1;
if (typeof pid !== 'undefined') {
return;
}
pid = sched.setInterval(() => {
if (!proposing) {
proposing = true;
node
.propose()
.then(() => {
console.log('proposed', { waiters });
proposing = false;
})
.catch((err) => {
console.log('propose failed', { waiters, err: err.message });
proposing = false;
});
}
}, period);
},
stopProposing() {
if (waiters <= 0) {
return;
}
waiters -= 1;
sched.clearInterval(pid);
pid = undefined;
},
});
}

/**
* @param {string[]} args
* @param {{ http: typeof import('http') }} io
* @param {SchedulerAccess} sched
*/
function main(args, { http }, sched) {
const [url] = args.length ? args : ['http://localhost:40404'];
const fetch = nodeFetch({ http });
const rnode = RNode(fetch);
const node = proposer({ admin: url, boot: 'N/A', read: 'N/A' }, rnode, sched);
node.startProposing();
}

if (require.main === module) {
main(
process.argv.slice(2),
{
// eslint-disable-next-line global-require
http: require('http'),
},
{ setInterval, clearInterval },
);
}
84 changes: 33 additions & 51 deletions src/rhopm.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import { nodeFetch } from './curl';
import { RNode } from './rnode';
import { startTerm, listenAtDeployId } from './proxy';
import { proposer } from './proposer';

// @ts-ignore
const { keys, freeze, fromEntries } = Object;

export const rhoDir = 'rho_modules';
export const rhoInfoPath = (src) =>
`${rhoDir}/${src.replace(/\.rho$/, '.json')}`;
/* TODO: handle non-posix paths? */
/** @type {(net: string) => string} */
export const rhoDir = (net) => `rho_modules/${net}`;
/** @type {(src: string, net: string) => string} */
export const rhoInfoPath = (src, net) =>
`${rhoDir(net)}/${src.replace(/\.rho$/, '.json')}`;
export const importPattern = /match\s*\("import",\s*"(?<specifier>[^"]+)",\s*`(?<uri>rho:id:[^`]*)`\)/g;

function log(...args) {
Expand Down Expand Up @@ -215,65 +219,43 @@ function parseEnv(txt) {
}

/**
*
* @param {string} envText
* @param {Record<string, string | undefined>} env
* @param {{ admin?: string, boot: string, read: string }} api
* @param {typeof import('http')} http
* @param {SchedulerAccess} sched
* @param {number} period
* @param {number=} period
*
* @typedef { {
* setInterval: typeof setInterval,
* clearInterval: typeof clearInterval,
* } } SchedulerAccess
* @typedef {import('./proposer').SchedulerAccess} SchedulerAccess
*/
export function shardIO(envText, http, sched, period = 2 * 1000) {
export function shardAccess(env, api, http, sched, period = 2 * 1000) {
const fetch = nodeFetch({ http });
const env = parseEnv(envText);
const api = {
admin: `http://${env.MY_NET_IP}:40405`,
boot: `http://${env.MY_NET_IP}:40403`,
read: `http://${env.MY_NET_IP}:40413`,
};
const rnode = RNode(fetch);

const proposer = rnode.admin(api.admin);
let proposing = false;
let waiters = 0;
let pid;
const rnode = RNode(fetch);
const blockMaker = proposer(api, rnode, sched, period);

return freeze({
env,
...api,
validator: rnode.validator(api.boot),
observer: rnode.observer(api.read),
startProposing() {
waiters += 1;
if (typeof pid !== 'undefined') {
return;
}
pid = sched.setInterval(() => {
if (!proposing) {
proposing = true;
proposer
.propose()
.then(() => {
console.log('proposed', { waiters });
proposing = false;
})
.catch((err) => {
console.log('propose failed', { waiters, err: err.message });
proposing = false;
});
}
}, period);
},
stopProposing() {
if (waiters <= 0) {
return;
}
waiters -= 1;
sched.clearInterval(pid);
pid = undefined;
},
...blockMaker,
});
}

/**
* Local shard I/O
*
* @param {string} envText
* @param {typeof import('http')} http
* @param {SchedulerAccess} sched
* @param {number=} period
*/
export function shardIO(envText, http, sched, period = 2 * 1000) {
const env = parseEnv(envText);
const api = {
admin: `http://${env.MY_NET_IP}:40405`,
boot: `http://${env.MY_NET_IP}:40403`,
read: `http://${env.MY_NET_IP}:40413`,
};
return shardAccess(env, api, http, sched, period);
}