Skip to content

Commit

Permalink
Merge pull request #34
Browse files Browse the repository at this point in the history
Various fixes to loadgen and runner

Fixes #28.
Handles rename of `Treasury` -> `VaultFactory`
Handle older SDK versions which don't have a fee purse implementation
  • Loading branch information
mhofman authored Jan 9, 2022
2 parents 5bf764f + 01bf114 commit 88137bb
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 36 deletions.
13 changes: 11 additions & 2 deletions loadgen/contract/agent-create-vault.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,24 @@ export default async function startAgent([key, home]) {
runPurse,
bldPurse,
treasuryInstance,
vaultFactoryInstance,
} = await allValues({
runBrand: E(agoricNames).lookup('brand', issuerPetnames.RUN),
// bldBrand: E(agoricNames).lookup('brand', issuerPetnames.BLD),
bldBrand: E(E(wallet).getIssuer(issuerPetnames.BLD)).getBrand(),
runPurse: E(wallet).getPurse(pursePetnames.RUN),
bldPurse: E(wallet).getPurse(pursePetnames.BLD),
treasuryInstance: E(home.agoricNames).lookup('instance', 'Treasury'),
treasuryInstance: E(agoricNames)
.lookup('instance', 'Treasury')
.catch(() => {}),
vaultFactoryInstance: E(agoricNames)
.lookup('instance', 'VaultFactory')
.catch(() => {}),
});

const treasuryPublicFacet = E(zoe).getPublicFacet(treasuryInstance);
const treasuryPublicFacet = E(zoe).getPublicFacet(
vaultFactoryInstance || treasuryInstance,
);

const bldBalance = await E(bldPurse).getCurrentAmount();
if (AmountMath.isEmpty(bldBalance)) {
Expand Down Expand Up @@ -106,6 +114,7 @@ export default async function startAgent([key, home]) {
await Promise.all([
E(runPurse).deposit(runPayout),
E(bldPurse).deposit(bldPayout),
E(seatP).getOfferResult(),
]);
console.error(`create-vault: vault closed`);
}
Expand Down
5 changes: 4 additions & 1 deletion loadgen/contract/agent-tap-faucet.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export default async function startAgent([key, home, faucetBundle]) {
const seatP = E(zoe).offer(invitationP); // pipeline stall: bug #2846
const paymentP = E(seatP).getPayout('Token');
const payment = await paymentP;
await E(bobPurse).deposit(payment);
await Promise.all([
E(bobPurse).deposit(payment),
E(seatP).getOfferResult(),
]);
return E(bobPurse).getCurrentAmount();
},
});
Expand Down
41 changes: 27 additions & 14 deletions loadgen/contract/agent-trade-amm.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,32 @@ export default async function startAgent([key, home]) {
// const bldBrand = await E(bldPurse).getAllegedBrand();

{
const run = await E(runPurse).getCurrentAmount();
const thirdRunAmount = AmountMath.make(runBrand, run.value / 3n);

if (AmountMath.isEmpty(run)) {
throw Error(`no RUN, trade-amm cannot proceed`);
const feePurse = await E(faucet)
.getFeePurse()
.catch((err) => {
if (err.name !== 'TypeError') {
throw err;
} else {
return null;
}
});

if (feePurse) {
const run = await E(runPurse).getCurrentAmount();
const thirdRunAmount = AmountMath.make(runBrand, run.value / 3n);

if (AmountMath.isEmpty(run)) {
throw Error(`no RUN, trade-amm cannot proceed`);
}

// TODO: change to the appropriate amounts
// setup: transfer 33% of our initial RUN to the feePurse
console.error(
`trade-amm: depositing ${disp(thirdRunAmount)} into the fee purse`,
);
const feePayment = await E(runPurse).withdraw(thirdRunAmount);
await E(feePurse).deposit(feePayment);
}

// TODO: change to the appropriate amounts
// setup: transfer 33% of our initial RUN to the feePurse
console.error(
`trade-amm: depositing ${disp(thirdRunAmount)} into the fee purse`,
);
const feePurse = E(faucet).getFeePurse();
const feePayment = await E(runPurse).withdraw(thirdRunAmount);
await E(feePurse).deposit(feePayment);
}

const publicFacet = await E(zoe).getPublicFacet(amm || autoswap);
Expand Down Expand Up @@ -92,6 +103,7 @@ export default async function startAgent([key, home]) {
await Promise.all([
E(bldPurse).deposit(refundPayout),
E(runPurse).deposit(payout),
E(seatP).getOfferResult(),
]);
}

Expand All @@ -113,6 +125,7 @@ export default async function startAgent([key, home]) {
await Promise.all([
E(runPurse).deposit(refundPayout),
E(bldPurse).deposit(payout),
E(seatP).getOfferResult(),
]);
}

Expand Down
21 changes: 13 additions & 8 deletions loadgen/loop.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const tasks = {
faucet: [prepareFaucet],
};

const runners = {}; // name -> { cycle, stop?, limit }
const runners = {}; // name -> { cycle, stop?, limit, pending }
const status = {}; // name -> { active, succeeded, failed, next } // JSON-serializable

function logdata(data) {
Expand All @@ -63,11 +63,15 @@ function maybeStartOneCycle(name) {
const r = runners[name];
if (s.active >= r.limit) {
console.log(
`not starting ${name}, ${s.active} active reached dynamic limit ${r.limit}`,
`not starting ${name}, ${s.active} active reached limit ${r.limit}`,
);
return;
}
r.limit = Math.max(0, r.limit - 1);
if (!r.pending) {
console.log(`not starting ${name}, no pending (${s.active} active)`);
return;
}
r.pending -= 1;
const seq = s.next;
s.next += 1;
s.active += 1;
Expand Down Expand Up @@ -134,19 +138,20 @@ function updateConfig(config) {
if (r.stop) {
r.stop();
r.stop = undefined;
const { limit = 1 } = config[name] || { limit: 0 };
r.limit = Math.min(r.limit, limit);
}
const { limit = 1 } = config[name] || { limit: 0 };
r.limit = Math.max(0, Math.round(limit));
r.pending = Math.min(r.pending, r.limit);
}
for (const [name, data] of Object.entries(config)) {
if (!data) {
// eslint-disable-next-line no-continue
continue;
}
const { interval = 60, limit = 1, wait = 0 } = data;
const { interval = 60, wait = 0 } = data;
function bump() {
const r = runners[name];
r.limit = Math.min(r.limit + 1, limit);
r.pending = Math.min(r.pending + 1, r.limit);
maybeStartOneCycle(name);
}
function start() {
Expand Down Expand Up @@ -266,7 +271,7 @@ export default async function runCycles(homePromise, deployPowers) {
for (const [name, [prepare]] of Object.entries(tasks)) {
// eslint-disable-next-line no-await-in-loop
const cycle = await prepare(homePromise, deployPowers);
runners[name] = { cycle, limit: 0, stop: undefined };
runners[name] = { cycle, limit: 0, pending: 0, stop: undefined };
status[name] = { active: 0, succeeded: 0, failed: 0, next: 0 };
}
const { myAddressNameAdmin } = await homePromise;
Expand Down
39 changes: 39 additions & 0 deletions runner/lib/helpers/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,42 @@ export const makeFsHelper = ({ fs, fsStream, spawn, tmpDir }) => {

return harden({ dirDiskUsage, findByPrefix, makeFIFO });
};

/**
*
* @param {import("fs").ReadStream | import("fs").WriteStream} stream
* @returns {Promise<void>}
*/
export const fsStreamReady = (stream) =>
new Promise((resolve, reject) => {
if (stream.destroyed) {
reject(new Error('Stream already destroyed'));
return;
}

if (!stream.pending) {
resolve();
return;
}

stream.destroyed;

const onReady = () => {
cleanup(); // eslint-disable-line no-use-before-define
resolve();
};

/** @param {Error} err */
const onError = (err) => {
cleanup(); // eslint-disable-line no-use-before-define
reject(err);
};

const cleanup = () => {
stream.off('ready', onReady);
stream.off('error', onError);
};

stream.on('ready', onReady);
stream.on('error', onError);
});
13 changes: 8 additions & 5 deletions runner/lib/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
sequential,
} from './helpers/async.js';
import { childProcessDone } from './helpers/child-process.js';
import { makeFsHelper } from './helpers/fs.js';
import { fsStreamReady, makeFsHelper } from './helpers/fs.js';
import { makeProcfsHelper } from './helpers/procsfs.js';
import { makeOutputter } from './helpers/outputter.js';

Expand Down Expand Up @@ -286,17 +286,19 @@ const main = async (progName, rawArgs, powers) => {
getProcessInfo,
});

const outputStream = fsStream.createWriteStream(
joinPath(outputDir, 'perf.jsonl'),
);

const monitorInterval =
Number(argv.monitorInterval || defaultMonitorIntervalMinutes) * 60 * 1000;

let currentStage = -1;
let currentStageElapsedOffsetNs = 0;
const cpuTimeOffset = await getCPUTimeOffset();

const outputStream = fsStream.createWriteStream(
joinPath(outputDir, 'perf.jsonl'),
{ flags: 'wx' },
);
await fsStreamReady(outputStream);

/**
*
* @param {string} eventType
Expand Down Expand Up @@ -489,6 +491,7 @@ const main = async (progName, rawArgs, powers) => {
const slogOutputWriteStream = fsStream.createWriteStream(
joinPath(outputDir, `chain-stage-${currentStage}.slog.gz`),
);
await fsStreamReady(slogOutputWriteStream);
// const slogOutput = slogOutputWriteStream;
// const slogOutputPipeResult = finished(slogOutput);
const slogOutputPipeResult = pipeline(slogOutput, slogOutputWriteStream);
Expand Down
3 changes: 2 additions & 1 deletion runner/lib/tasks/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import fs from 'fs';

import { sleep } from '../helpers/async.js';
import { makeOutputter } from '../helpers/outputter.js';
import { fsStreamReady } from '../helpers/fs.js';

const protocolModules = {
'http:': http,
Expand Down Expand Up @@ -36,7 +37,7 @@ export const httpRequest = (urlOrString, options = {}) => {
statusCode: 200,
}),
));
resolve(res);
resolve(fsStreamReady(stream).then(() => res));
return;
}

Expand Down
9 changes: 7 additions & 2 deletions runner/lib/tasks/local-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
} from '../helpers/child-process.js';
import BufferLineTransform from '../helpers/buffer-line-transform.js';
import { PromiseAllOrErrors, tryTimeout } from '../helpers/async.js';
import { fsStreamReady } from '../helpers/fs.js';
import { whenStreamSteps } from '../helpers/stream-steps.js';
import {
getArgvMatcher,
Expand Down Expand Up @@ -92,6 +93,7 @@ export const makeTasks = ({
console.log('Starting chain');

const slogFifo = await makeFIFO('chain.slog');
const slogReady = fsStreamReady(slogFifo);
const slogLines = new BufferLineTransform();
const slogPipeResult = pipeline(slogFifo, slogLines);

Expand Down Expand Up @@ -140,6 +142,8 @@ export const makeTasks = ({
chainDone,
]).then(() => {});

const ready = PromiseAllOrErrors([firstBlock, slogReady]).then(() => {});

return tryTimeout(
timeout * 1000,
async () => {
Expand All @@ -160,14 +164,15 @@ export const makeTasks = ({
stopped = true;
process.kill(processInfo.pid);
if (slogFifo.pending) {
slogLines.end();
slogFifo.close();
}
};

return harden({
stop,
done,
ready: firstBlock,
ready,
slogLines: {
[Symbol.asyncIterator]: () => slogLines[Symbol.asyncIterator](),
},
Expand All @@ -177,7 +182,7 @@ export const makeTasks = ({
},
async () => {
// Avoid unhandled rejections for promises that can no longer be handled
Promise.allSettled([done, firstBlock]);
Promise.allSettled([done, ready]);
launcherCp.kill();
slogFifo.close();
},
Expand Down
10 changes: 8 additions & 2 deletions runner/lib/tasks/testnet.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
sleep,
aggregateTryFinally,
} from '../helpers/async.js';
import { fsStreamReady } from '../helpers/fs.js';
import { whenStreamSteps } from '../helpers/stream-steps.js';
import {
getArgvMatcher,
Expand Down Expand Up @@ -222,6 +223,7 @@ export const makeTasks = ({ spawn, fs, makeFIFO, getProcessInfo }) => {
console.log('Starting chain monitor');

const slogFifo = await makeFIFO('chain.slog');
const slogReady = fsStreamReady(slogFifo);
const slogLines = new BufferLineTransform();
const slogPipeResult = pipeline(slogFifo, slogLines);

Expand Down Expand Up @@ -271,7 +273,7 @@ export const makeTasks = ({ spawn, fs, makeFIFO, getProcessInfo }) => {
chainDone,
]).then(() => {});

const ready = firstBlock.then(async () => {
const ready = PromiseAllOrErrors([firstBlock, slogReady]).then(async () => {
let retries = 0;
while (!stopped) {
// Don't pipe output to console, it's too noisy
Expand Down Expand Up @@ -322,6 +324,10 @@ export const makeTasks = ({ spawn, fs, makeFIFO, getProcessInfo }) => {
const stop = () => {
stopped = true;
launcherCp.kill();
if (slogFifo.pending) {
slogLines.end();
slogFifo.close();
}
};

const processInfo = await getProcessInfo(
Expand All @@ -341,7 +347,7 @@ export const makeTasks = ({ spawn, fs, makeFIFO, getProcessInfo }) => {
},
async () => {
// Avoid unhandled rejections for promises that can no longer be handled
Promise.allSettled([done, firstBlock]);
Promise.allSettled([done, ready]);
launcherCp.kill();
slogFifo.close();
},
Expand Down
2 changes: 1 addition & 1 deletion start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mkdir -p ${AGORIC_BIN_DIR}
OUTPUT_DIR="${OUTPUT_DIR:-/tmp/agoric-sdk-out-${SDK_REVISION}}"
mkdir -p "${OUTPUT_DIR}"

export PATH=$AGORIC_BIN_DIR:$PATH
export PATH="$AGORIC_BIN_DIR:$PATH"

cd "$SDK_SRC"
if [ "x$SDK_BUILD" != "x0" ]; then
Expand Down

0 comments on commit 88137bb

Please sign in to comment.