Skip to content

Commit

Permalink
HACK: solo delivery: update tempfile for each helper invocation
Browse files Browse the repository at this point in the history
This is a horrible hack that should not land on trunk.

The solo-to-chain delivery pathway has a bug in which heavy-ish traffic (a
load generator which emits messages faster than once per block or two) causes
a queue to build up without bound, causing message delivery to fall further
and further behind. Each helper invocation sends a few more messages, but
does not send *all* the remaining messages. Given enough traffic, this can
lead to a queue that takes days to drain, when it could really be processed
in under a minute.

We need to overhaul this delivery pathway (#2855). This patch is a quick and
dirty hack to get a load-generator running faster than one cycle per 20s. It
doesn't drain the queue any faster, but each time it invokes the helper to
send a few messages, it calls back into the mailbox to copy *all* the current
messages out. The helper arguments (including the tempfile name) are already
fixed and recorded in the queue, but this hack just replaces the *contents*
of the tempfile just before invoking the helper.

We still have a queue that takes days to drain, but once the load generation
frontend is turned off, the actual new messages are delivered within a block
or two, and the remaining day of activity consists entirely of empty or
duplicate messages, which the chain then correctly ignores.
  • Loading branch information
warner committed Apr 16, 2021
1 parent fbcb273 commit 1c1d22a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
4 changes: 2 additions & 2 deletions packages/cosmic-swingset/lib/ag-solo/batched-deliver.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export function makeBatchedDeliver(
let latestAckNum = 0;
let deliverTimeout;

async function batchedDeliver(newMessages, ackNum) {
async function batchedDeliver(newMessages, ackNum, getAllMessages) {
// If we have no existing messages, reset the deliver timeout.
//
// This defers sending an ack until the timeout expires or we have new
Expand All @@ -20,7 +20,7 @@ export function makeBatchedDeliver(
// Transfer the batched messages to the deliver function.
const msgs = batchedMessages;
batchedMessages = [];
deliver(msgs, latestAckNum);
deliver(msgs, latestAckNum, getAllMessages);
}, batchTimeoutMs);
}

Expand Down
14 changes: 13 additions & 1 deletion packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export async function connectToChain(
stdin,
throwIfCancelled = () => undefined,
defaultIfCancelled = WAS_CANCELLED_EXCEPTION,
updateTempfile,
) {
return retryRpcAddr(async rpcAddr => {
await throwIfCancelled();
Expand All @@ -125,6 +126,9 @@ export async function connectToChain(
log.debug(HELPER, ...fullArgs);
let ret;
try {
if (updateTempfile) {
updateTempfile();
}
ret = await new Promise((resolve, reject) => {
const proc = execFile(
HELPER,
Expand Down Expand Up @@ -176,6 +180,7 @@ export async function connectToChain(
parseReturn,
stdin,
defaultIfCancelled,
updateTempfile,
) {
const queue = queued[name] || [];
queued[name] = queue;
Expand Down Expand Up @@ -226,6 +231,7 @@ export async function connectToChain(
}
},
defaultIfCancelled,
updateTempfile,
);
} finally {
// Remove us from the queue.
Expand Down Expand Up @@ -415,7 +421,7 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)}
recurseEachNewBlock();

let totalDeliveries = 0;
async function deliver(newMessages, acknum) {
async function deliver(newMessages, acknum, getAllMessages) {
let tmpInfo;
try {
totalDeliveries += 1;
Expand Down Expand Up @@ -464,6 +470,11 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)}
});
}

function updateTempfile() {
const data = JSON.stringify(getAllMessages());
fs.writeFileSync(tmpInfo.path, data);
}

const args = [
'tx',
'swingset',
Expand Down Expand Up @@ -502,6 +513,7 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)}
},
undefined,
{}, // defaultIfCancelled
updateTempfile,
);
return qret;
} finally {
Expand Down
9 changes: 8 additions & 1 deletion packages/cosmic-swingset/lib/ag-solo/outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ const log = anylogger('outbound');
*/
const knownTargets = new Map();

function getAllMessages(mbs, target) {
const { outbox, inboundAck } = mbs.exportToData()[target];
const messages = Array.from(outbox);
messages.sort((a, b) => a[0] - b[0]);
return [messages, inboundAck];
}

export function deliver(mbs) {
const data = mbs.exportToData();
log.debug(`deliver`, data);
Expand All @@ -42,7 +49,7 @@ export function deliver(mbs) {
log(
`invoking deliverator; ${newMessages.length} new messages for ${target}`,
);
t.deliverator(newMessages, acknum);
t.deliverator(newMessages, acknum, () => getAllMessages(mbs, target));
if (newMessages.length) {
[t.highestSent] = newMessages[newMessages.length - 1];
}
Expand Down

0 comments on commit 1c1d22a

Please sign in to comment.