From 1c1d22a3a3df52877a67ae966869506deaacf9a0 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 13 Apr 2021 08:45:13 -0700 Subject: [PATCH] HACK: solo delivery: update tempfile for each helper invocation This is a horrible hack that should not land on trunk. The solo-to-chain delivery pathway has a bug in which heavy-ish traffic (a load generator which emits messages faster than once per block or two) causes a queue to build up without bound, causing message delivery to fall further and further behind. Each helper invocation sends a few more messages, but does not send *all* the remaining messages. Given enough traffic, this can lead to a queue that takes days to drain, when it could really be processed in under a minute. We need to overhaul this delivery pathway (#2855). This patch is a quick and dirty hack to get a load-generator running faster than one cycle per 20s. It doesn't drain the queue any faster, but each time it invokes the helper to send a few messages, it calls back into the mailbox to copy *all* the current messages out. The helper arguments (including the tempfile name) are already fixed and recorded in the queue, but this hack just replaces the *contents* of the tempfile just before invoking the helper. We still have a queue that takes days to drain, but once the load generation frontend is turned off, the actual new messages are delivered within a block or two, and the remaining day of activity consists entirely of empty or duplicate messages, which the chain then correctly ignores. --- .../cosmic-swingset/lib/ag-solo/batched-deliver.js | 4 ++-- .../lib/ag-solo/chain-cosmos-sdk.js | 14 +++++++++++++- packages/cosmic-swingset/lib/ag-solo/outbound.js | 9 ++++++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/packages/cosmic-swingset/lib/ag-solo/batched-deliver.js b/packages/cosmic-swingset/lib/ag-solo/batched-deliver.js index c18dff3d25c..f7fe162ab33 100644 --- a/packages/cosmic-swingset/lib/ag-solo/batched-deliver.js +++ b/packages/cosmic-swingset/lib/ag-solo/batched-deliver.js @@ -9,7 +9,7 @@ export function makeBatchedDeliver( let latestAckNum = 0; let deliverTimeout; - async function batchedDeliver(newMessages, ackNum) { + async function batchedDeliver(newMessages, ackNum, getAllMessages) { // If we have no existing messages, reset the deliver timeout. // // This defers sending an ack until the timeout expires or we have new @@ -20,7 +20,7 @@ export function makeBatchedDeliver( // Transfer the batched messages to the deliver function. const msgs = batchedMessages; batchedMessages = []; - deliver(msgs, latestAckNum); + deliver(msgs, latestAckNum, getAllMessages); }, batchTimeoutMs); } diff --git a/packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js b/packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js index b01ecb1620a..ad292e0d251 100644 --- a/packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js +++ b/packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js @@ -112,6 +112,7 @@ export async function connectToChain( stdin, throwIfCancelled = () => undefined, defaultIfCancelled = WAS_CANCELLED_EXCEPTION, + updateTempfile, ) { return retryRpcAddr(async rpcAddr => { await throwIfCancelled(); @@ -125,6 +126,9 @@ export async function connectToChain( log.debug(HELPER, ...fullArgs); let ret; try { + if (updateTempfile) { + updateTempfile(); + } ret = await new Promise((resolve, reject) => { const proc = execFile( HELPER, @@ -176,6 +180,7 @@ export async function connectToChain( parseReturn, stdin, defaultIfCancelled, + updateTempfile, ) { const queue = queued[name] || []; queued[name] = queue; @@ -226,6 +231,7 @@ export async function connectToChain( } }, defaultIfCancelled, + updateTempfile, ); } finally { // Remove us from the queue. @@ -415,7 +421,7 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)} recurseEachNewBlock(); let totalDeliveries = 0; - async function deliver(newMessages, acknum) { + async function deliver(newMessages, acknum, getAllMessages) { let tmpInfo; try { totalDeliveries += 1; @@ -464,6 +470,11 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)} }); } + function updateTempfile() { + const data = JSON.stringify(getAllMessages()); + fs.writeFileSync(tmpInfo.path, data); + } + const args = [ 'tx', 'swingset', @@ -502,6 +513,7 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseEgress(myAddr)} }, undefined, {}, // defaultIfCancelled + updateTempfile, ); return qret; } finally { diff --git a/packages/cosmic-swingset/lib/ag-solo/outbound.js b/packages/cosmic-swingset/lib/ag-solo/outbound.js index fe6f1ecbc09..c485ff398b1 100644 --- a/packages/cosmic-swingset/lib/ag-solo/outbound.js +++ b/packages/cosmic-swingset/lib/ag-solo/outbound.js @@ -17,6 +17,13 @@ const log = anylogger('outbound'); */ const knownTargets = new Map(); +function getAllMessages(mbs, target) { + const { outbox, inboundAck } = mbs.exportToData()[target]; + const messages = Array.from(outbox); + messages.sort((a, b) => a[0] - b[0]); + return [messages, inboundAck]; +} + export function deliver(mbs) { const data = mbs.exportToData(); log.debug(`deliver`, data); @@ -42,7 +49,7 @@ export function deliver(mbs) { log( `invoking deliverator; ${newMessages.length} new messages for ${target}`, ); - t.deliverator(newMessages, acknum); + t.deliverator(newMessages, acknum, () => getAllMessages(mbs, target)); if (newMessages.length) { [t.highestSent] = newMessages[newMessages.length - 1]; }