From ed4743519e81dbb05b6136de1f94bae0ae0f87c8 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Thu, 2 Mar 2023 22:11:58 +0000 Subject: [PATCH 01/12] feat(cosmic-swingset): Add context info to actionQueue items Use context to generate inboundNum --- go.mod | 2 +- go.sum | 4 +-- golang/cosmos/x/swingset/keeper/keeper.go | 32 +++++++++++++++++++- golang/cosmos/x/vbank/vbank.go | 8 +++++ packages/cosmic-swingset/src/launch-chain.js | 12 +++----- packages/cosmic-swingset/src/sim-chain.js | 17 ++++++----- 6 files changed, 57 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 4a44373e0e9..fb583a93081 100644 --- a/go.mod +++ b/go.mod @@ -145,7 +145,7 @@ replace google.golang.org/grpc => google.golang.org/grpc v1.33.2 replace github.com/tendermint/tendermint => github.com/agoric-labs/tendermint v0.34.23-alpha.agoric.3 // We need a fork of cosmos-sdk until all of the differences are merged. -replace github.com/cosmos/cosmos-sdk => github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1 +replace github.com/cosmos/cosmos-sdk => github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1.0.20230320225042-2109765fd835 replace github.com/cosmos/gaia/v7 => github.com/Agoric/ag0/v7 v7.0.2-alpha.agoric.1 diff --git a/go.sum b/go.sum index ba52d612c0e..97620c6dc68 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,8 @@ github.com/Zilliqa/gozilliqa-sdk v1.2.1-0.20201201074141-dd0ecada1be6/go.mod h1: github.com/adlio/schema v1.3.3 h1:oBJn8I02PyTB466pZO1UZEn1TV5XLlifBSyMrmHl/1I= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= -github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1 h1:3GHpCatyGBaZDoSr9k6Rd5/5QnNghkzxgLfkDsPDBPk= -github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1/go.mod h1:fdXvzy+wmYB+W+N139yb0+szbT7zAGgUjmxm5DBrjto= +github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1.0.20230320225042-2109765fd835 h1:Mmw52cHAUNwtaNXpk7b3lTeoCRd5Vw9Fdrly5ABIxCA= +github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1.0.20230320225042-2109765fd835/go.mod h1:fdXvzy+wmYB+W+N139yb0+szbT7zAGgUjmxm5DBrjto= github.com/agoric-labs/cosmos-sdk/ics23/go v0.8.0-alpha.agoric.1 h1:2jvHI/2d+psWAZy6FQ0vXJCHUtfU3ZbbW+pQFL04arQ= github.com/agoric-labs/cosmos-sdk/ics23/go v0.8.0-alpha.agoric.1/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg= github.com/agoric-labs/tendermint v0.34.23-alpha.agoric.3 h1:aq6F1r3RQkKUYNeMNjRxgGn3dayvKnDK/R6gQF0WoFs= diff --git a/golang/cosmos/x/swingset/keeper/keeper.go b/golang/cosmos/x/swingset/keeper/keeper.go index 53470ecbed9..7ddd93fc0ad 100644 --- a/golang/cosmos/x/swingset/keeper/keeper.go +++ b/golang/cosmos/x/swingset/keeper/keeper.go @@ -4,11 +4,13 @@ import ( "encoding/json" "errors" "fmt" + stdlog "log" "math" "strconv" "github.com/tendermint/tendermint/libs/log" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper" @@ -36,6 +38,25 @@ const MaxUint53 = 9007199254740991 // Number.MAX_SAFE_INTEGER = 2**53 - 1 const stateKey string = "state" +// Contextual information about the message source of an action on the queue. +// This context should be unique per actionQueueRecord. +type actionContext struct { + // The block height in which the corresponding action was enqueued + BlockHeight int64 `json:"blockHeight"` + // The hash of the cosmos transaction that included the message + // If the action didn't result from a transaction message, a substitute value + // may be used. For example the VBANK_BALANCE_UPDATE actions use `x/vbank`. + TxHash string `json:"txHash"` + // The index of the message within the transaction. If the action didn't + // result from a cosmos transaction, a number should be chosen to make the + // actionContext unique. (for example a counter per block and source module). + MsgIdx int `json:"msgIdx"` +} +type actionQueueRecord struct { + Action vm.Jsonable `json:"action"` + Context actionContext `json:"context"` +} + // Keeper maintains the link to data vstorage and exposes getter/setter methods for the various parts of the state machine type Keeper struct { storeKey sdk.StoreKey @@ -87,7 +108,16 @@ func NewKeeper( // The actionQueue's format is documented by `makeChainQueue` in // `packages/cosmic-swingset/src/make-queue.js`. func (k Keeper) PushAction(ctx sdk.Context, action vm.Jsonable) error { - bz, err := json.Marshal(action) + txHash, txHashOk := ctx.Context().Value(baseapp.TxHashContextKey).(string) + if !txHashOk { + txHash = "unknown" + } + msgIdx, msgIdxOk := ctx.Context().Value(baseapp.TxMsgIdxContextKey).(int) + if !txHashOk || !msgIdxOk { + stdlog.Printf("error while extracting context for action %q\n", action) + } + record := actionQueueRecord{Action: action, Context: actionContext{BlockHeight: ctx.BlockHeight(), TxHash: txHash, MsgIdx: msgIdx}} + bz, err := json.Marshal(record) if err != nil { return err } diff --git a/golang/cosmos/x/vbank/vbank.go b/golang/cosmos/x/vbank/vbank.go index 08660902873..75ea273e11f 100644 --- a/golang/cosmos/x/vbank/vbank.go +++ b/golang/cosmos/x/vbank/vbank.go @@ -1,12 +1,14 @@ package vbank import ( + "context" "encoding/json" "fmt" stdlog "log" "sort" "github.com/Agoric/agoric-sdk/golang/cosmos/vm" + "github.com/cosmos/cosmos-sdk/baseapp" sdk "github.com/cosmos/cosmos-sdk/types" ) @@ -241,5 +243,11 @@ func (ch portHandler) Receive(ctx *vm.ControllerContext, str string) (ret string } func (am AppModule) PushAction(ctx sdk.Context, action vm.Jsonable) error { + // vbank actions are not triggered by a swingset message in a transaction, so we need to + // synthesize unique context information. + // We use a fixed placeholder value for the txHash context, and can simply use `0` for the + // message index as there is only one such action per block. + ctx = ctx.WithContext(context.WithValue(ctx.Context(), baseapp.TxHashContextKey, "x/vbank")) + ctx = ctx.WithContext(context.WithValue(ctx.Context(), baseapp.TxMsgIdxContextKey, 0)) return am.keeper.PushAction(ctx, action) } diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index b7c7cc53eec..ee0ca486949 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -257,7 +257,7 @@ export async function launch({ commit: () => {}, // disable abort: () => {}, // disable }); - /** @type {ReturnType>} */ + /** @type {ReturnType>} */ const inboundQueue = makeQueue(inboundQueueStorage); // Not to be confused with the gas model, this meter is for OpenTelemetry. @@ -518,7 +518,8 @@ export async function launch({ // first the old actions followed by the newActions, running the // kernel to completion after each. if (keepGoing) { - for (const { action, inboundNum } of inboundQueue.consumeAll()) { + for (const { action, context } of inboundQueue.consumeAll()) { + const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`; inboundQueueMetrics.decStat(); // eslint-disable-next-line no-await-in-loop await performAction(action, inboundNum); @@ -546,11 +547,8 @@ export async function launch({ // First, push all newActions onto the end of the inboundQueue, // remembering that inboundQueue might still have work from the // previous block - let actionNum = 0; - for (const action of newActions) { - const inboundNum = `${blockHeight}-${actionNum}`; - inboundQueue.push({ action, inboundNum }); - actionNum += 1; + for (const actionRecord of newActions) { + inboundQueue.push(actionRecord); inboundQueueMetrics.incStat(); } diff --git a/packages/cosmic-swingset/src/sim-chain.js b/packages/cosmic-swingset/src/sim-chain.js index b817feb5c9a..f71af53ff04 100644 --- a/packages/cosmic-swingset/src/sim-chain.js +++ b/packages/cosmic-swingset/src/sim-chain.js @@ -149,14 +149,17 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) { // Gather up the new messages into the latest block. const thisBlock = intoChain.splice(0, queueAllowed[QueueInbound]); - for (const [newMessages, acknum] of thisBlock) { + for (const [i, [newMessages, acknum]] of thisBlock.entries()) { aqContents.push({ - type: 'DELIVER_INBOUND', - peer: bootAddress, - messages: newMessages, - ack: acknum, - blockHeight, - blockTime, + action: { + type: 'DELIVER_INBOUND', + peer: bootAddress, + messages: newMessages, + ack: acknum, + blockHeight, + blockTime, + }, + context: { blockHeight, txHash: 'simTx', msgIdx: i }, }); } const endAction = { type: 'END_BLOCK', blockHeight, blockTime }; From 804502e6049168fc237e5661d9e834710b158cb4 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Fri, 3 Mar 2023 03:21:25 +0000 Subject: [PATCH 02/12] fix(cosmic-swingset): type of makeInstallationPublisher --- packages/cosmic-swingset/src/launch-chain.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index ee0ca486949..0aa5263f772 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -217,7 +217,7 @@ export async function launch({ replayChainSends, setActivityhash, bridgeOutbound, - makeInstallationPublisher = undefined, + makeInstallationPublisher, vatconfig, argv, env = process.env, @@ -413,7 +413,6 @@ export async function launch({ installationPublisher === undefined && makeInstallationPublisher !== undefined ) { - // @ts-expect-error not really undefined. TODO give provideInstallationPublisher a signature. installationPublisher = makeInstallationPublisher(); } } From 0d5aea77e9ceb8649da298237035701d18f35bd0 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Wed, 17 Aug 2022 14:51:37 +0000 Subject: [PATCH 03/12] fix(cosmic-swingset): Remove unused getKeys in bufferedStorage getKeys -> getNextKey throwing stub --- .../cosmic-swingset/src/bufferedStorage.js | 144 +----------------- 1 file changed, 8 insertions(+), 136 deletions(-) diff --git a/packages/cosmic-swingset/src/bufferedStorage.js b/packages/cosmic-swingset/src/bufferedStorage.js index e6371d3f643..3945af26f81 100644 --- a/packages/cosmic-swingset/src/bufferedStorage.js +++ b/packages/cosmic-swingset/src/bufferedStorage.js @@ -1,10 +1,10 @@ -import { assert, details as X, Fail } from '@agoric/assert'; +import { assert, Fail } from '@agoric/assert'; // XXX Do these "StorageAPI" functions belong in their own package? /** * Assert function to ensure that an object implements the StorageAPI - * interface: methods { has, getKeys, get, set, delete } + * interface: methods { has, getNextKey, get, set, delete } * (cf. packages/SwingSet/docs/state.md#transactions). * * @param {*} kvStore The object to be tested @@ -15,94 +15,11 @@ import { assert, details as X, Fail } from '@agoric/assert'; * @returns {void} */ export function insistStorageAPI(kvStore) { - for (const n of ['has', 'getKeys', 'get', 'set', 'delete']) { + for (const n of ['has', 'getNextKey', 'get', 'set', 'delete']) { n in kvStore || Fail`kvStore.${n} is missing, cannot use`; } } -/** - * Given two iterators over sequences of unique strings sorted in ascending - * order lexicographically by UTF-16 code unit, produce a new iterator that will - * output the ascending sequence of unique strings from their merged output. - * - * @param {Iterator} it1 - * @param {Iterator} it2 - * - * @yields any - */ -function* mergeUtf16SortedIterators(it1, it2) { - /** @type {IteratorResult | null} */ - let v1 = null; - /** @type {IteratorResult | null} */ - let v2 = null; - /** @type {IteratorResult | null} */ - let vrest = null; - /** @type {Iterator | null} */ - let itrest = null; - - try { - v1 = it1.next(); - v2 = it2.next(); - while (!v1.done && !v2.done) { - if (v1.value < v2.value) { - const result = v1.value; - v1 = it1.next(); - yield result; - } else if (v1.value > v2.value) { - const result = v2.value; - v2 = it2.next(); - yield result; - } else { - // Each iterator produced the same value; consume it from both. - const result = v1.value; - v1 = it1.next(); - v2 = it2.next(); - yield result; - } - } - - itrest = v1.done ? it2 : it1; - vrest = v1.done ? v2 : v1; - v1 = null; - v2 = null; - - while (!vrest.done) { - const result = vrest.value; - vrest = itrest.next(); - yield result; - } - } finally { - const errors = []; - try { - if (vrest && !vrest.done && itrest && itrest.return) { - itrest.return(); - } - } catch (e) { - errors.push(e); - } - try { - if (v1 && !v1.done && it1.return) { - it1.return(); - } - } catch (e) { - errors.push(e); - } - try { - if (v2 && !v2.done && it2.return) { - it2.return(); - } - } catch (e) { - errors.push(e); - } - if (errors.length) { - const err = assert.error(X`Merged iterator failed to close cleanly`); - for (const e of errors) { - assert.note(err, X`Caused by ${e}`); - } - } - } -} - /** * Create a StorageAPI object that buffers writes to a wrapped StorageAPI object * until told to commit (or abort) them. @@ -159,55 +76,11 @@ export function makeBufferedStorage(kvStore, listeners = {}) { }, /** - * Generator function that returns an iterator over all the keys within a - * given range, in lexicographical order by UTF-16 code unit. - * - * Warning: this function introduces consistency risks that callers must - * take into account. Results do not include include keys added during - * iteration. This layer of abstraction lacks the knowledge necessary to - * protect callers, as the nature of the risks varies depending on what a - * caller is trying to do. This API should not be made available to user - * vat code. Rather, it is intended as a low-level mechanism to use in - * implementating higher level storage abstractions that are expected to - * provide their own consistency protections as appropriate to their own - * circumstances. - * - * @param {string} start Start of the key range of interest (inclusive). An empty - * string indicates a range from the beginning of the key set. - * @param {string} end End of the key range of interest (exclusive). An empty string - * indicates a range through the end of the key set. - * - * @yields {string} an iterator for the keys from start <= key < end - * - * @throws if either parameter is not a string. + * @param {string} previousKey */ - *getKeys(start, end) { - assert.typeof(start, 'string'); - assert.typeof(end, 'string'); - - // Merge keys reported by the backing store and a snapshot of in-range - // additions into a single duplicate-free iterator. - const added = []; - for (const k of additions.keys()) { - if ((start === '' || start <= k) && (end === '' || k < end)) { - added.push(k); - } - } - // Note that this implicitly compares keys lexicographically by UTF-16 - // code unit (e.g., "\u{1D306}" _precedes_ "\u{FFFD}"). - // If kvStore.getKeys() results are not in ascending order subject to the - // same comparison, then output may include duplicates. - added.sort(); - const merged = mergeUtf16SortedIterators( - added.values(), - kvStore.getKeys(start, end), - ); - - for (const k of merged) { - if (!deletions.has(k)) { - yield k; - } - } + getNextKey(previousKey) { + assert.typeof(previousKey, 'string'); + throw new Error('not implemented'); }, }; function commit() { @@ -264,8 +137,7 @@ export const makeReadCachingStorage = getterSetter => { // Deletion in chain storage manifests as set-to-undefined. storage.set(key, undefined); }, - // eslint-disable-next-line require-yield - *getKeys(_start, _end) { + getNextKey(_previousKey) { throw new Error('not implemented'); }, }); From c8f362326053fa9e3f38d300c336918f101a4db0 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Fri, 3 Mar 2023 03:16:46 +0000 Subject: [PATCH 04/12] fix(cosmic-swingset): correct typings for bufferedStorage --- .../cosmic-swingset/src/bufferedStorage.js | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/packages/cosmic-swingset/src/bufferedStorage.js b/packages/cosmic-swingset/src/bufferedStorage.js index 3945af26f81..d25f74e2b2e 100644 --- a/packages/cosmic-swingset/src/bufferedStorage.js +++ b/packages/cosmic-swingset/src/bufferedStorage.js @@ -1,7 +1,20 @@ +// @ts-check + import { assert, Fail } from '@agoric/assert'; // XXX Do these "StorageAPI" functions belong in their own package? +/** + * @template {unknown} [T=unknown] + * @typedef {{ + * has: (key: string) => boolean, + * get: (key: string) => T | undefined, + * getNextKey: (previousKey: string) => string | undefined, + * set: (key: string, value: T ) => void, + * delete: (key: string) => void, + * }} KVStore + */ + /** * Assert function to ensure that an object implements the StorageAPI * interface: methods { has, getNextKey, get, set, delete } @@ -24,16 +37,17 @@ export function insistStorageAPI(kvStore) { * Create a StorageAPI object that buffers writes to a wrapped StorageAPI object * until told to commit (or abort) them. * - * @param {import('@agoric/swing-store').KVStore} kvStore The StorageAPI object to wrap + * @template {unknown} [T=unknown] + * @param {KVStore} kvStore The StorageAPI object to wrap * @param {{ - * onGet?: (key: string, value: string) => void, // a callback invoked after getting a value from kvStore - * onPendingSet?: (key: string, value: string) => void, // a callback invoked after a new uncommitted value is set + * onGet?: (key: string, value: T) => void, // a callback invoked after getting a value from kvStore + * onPendingSet?: (key: string, value: T) => void, // a callback invoked after a new uncommitted value is set * onPendingDelete?: (key: string) => void, // a callback invoked after a new uncommitted delete * onCommit?: () => void, // a callback invoked after pending operations have been committed * onAbort?: () => void, // a callback invoked after pending operations have been aborted * }} listeners Optional callbacks to be invoked when respective events occur * - * @returns {{kvStore: import('@agoric/swing-store').KVStore, commit: () => void, abort: () => void}} + * @returns {{kvStore: KVStore, commit: () => void, abort: () => void}} */ export function makeBufferedStorage(kvStore, listeners = {}) { insistStorageAPI(kvStore); @@ -57,8 +71,7 @@ export function makeBufferedStorage(kvStore, listeners = {}) { if (additions.has(key)) return additions.get(key); if (deletions.has(key)) return undefined; const value = kvStore.get(key); - // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error -- https://github.com/Agoric/agoric-sdk/issues/4620 - // @ts-ignore value may be undefined + // @ts-expect-error value may be undefined if (onGet !== undefined) onGet(key, value); return value; }, @@ -103,7 +116,7 @@ export function makeBufferedStorage(kvStore, listeners = {}) { } /** - * @param {{ get(key: string) => unknown, set(key: string, value: unknown): void }} getterSetter + * @param {{ get(key: string): unknown, set(key: string, value: unknown): void }} getterSetter */ export const makeReadCachingStorage = getterSetter => { // In addition to the wrapping write buffer, keep a simple cache of From 2d886dee4b6518f176fc844d560422fe0fb53ffd Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Fri, 3 Mar 2023 03:15:26 +0000 Subject: [PATCH 05/12] feat(cosmic-swingset): More complete vstorage access Actual has & delete --- .../cosmic-swingset/src/bufferedStorage.js | 41 ++++++++++++++----- packages/cosmic-swingset/src/chain-main.js | 34 ++++++++++++++- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/packages/cosmic-swingset/src/bufferedStorage.js b/packages/cosmic-swingset/src/bufferedStorage.js index d25f74e2b2e..0e85216c15e 100644 --- a/packages/cosmic-swingset/src/bufferedStorage.js +++ b/packages/cosmic-swingset/src/bufferedStorage.js @@ -116,9 +116,10 @@ export function makeBufferedStorage(kvStore, listeners = {}) { } /** - * @param {{ get(key: string): unknown, set(key: string, value: unknown): void }} getterSetter + * @template {unknown} [T=unknown] + * @param {KVStore} kvStore */ -export const makeReadCachingStorage = getterSetter => { +export const makeReadCachingStorage = kvStore => { // In addition to the wrapping write buffer, keep a simple cache of // read values for has and get. let cache; @@ -127,28 +128,48 @@ export const makeReadCachingStorage = getterSetter => { } resetCache(); + const deleted = Symbol('deleted'); + const undef = Symbol('undefined'); + + /** @type {KVStore} */ const storage = harden({ has(key) { - return storage.get(key) !== undefined; + const value = cache.get(key); + if (value !== undefined) { + return value !== deleted; + } else { + const ret = kvStore.has(key); + if (!ret) { + cache.set(key, deleted); + } + return ret; + } }, get(key) { - if (cache.has(key)) return cache.get(key); + let value = cache.get(key); + if (value !== undefined) { + return value === deleted || value === undef ? undefined : value; + } // Fetch the value and cache it until the next commit or abort. - const value = getterSetter.get(key); - cache.set(key, value); + value = kvStore.get(key); + cache.set(key, value === undefined ? undef : value); return value; }, set(key, value) { // Set the value and cache it until the next commit or abort (which is // expected immediately, since the buffered wrapper only calls set // *during* a commit). - cache.set(key, value); - getterSetter.set(key, value); + // `undefined` is a valid value technically different than deletion, + // depending on how the underlying store does its serialization + cache.set(key, value === undefined ? undef : value); + kvStore.set(key, value); }, delete(key) { - // Deletion in chain storage manifests as set-to-undefined. - storage.set(key, undefined); + // Delete the value and cache the deletion until next commit or abort. + // Deletion results in `undefined` on `get`, but `false` on `has` + cache.set(key, deleted); + kvStore.delete(key); }, getNextKey(_previousKey) { throw new Error('not implemented'); diff --git a/packages/cosmic-swingset/src/chain-main.js b/packages/cosmic-swingset/src/chain-main.js index 200f2ac6bae..8d2ed5bde5d 100644 --- a/packages/cosmic-swingset/src/chain-main.js +++ b/packages/cosmic-swingset/src/chain-main.js @@ -1,3 +1,5 @@ +// @ts-check + import { resolve as pathResolve } from 'path'; import v8 from 'node:v8'; import process from 'node:process'; @@ -41,6 +43,15 @@ const toNumber = specimen => { return number; }; +/** + * @template {unknown} [T=unknown] + * @param {(req: string) => string} call + * @param {string} prefix + * @param {"set" | "legacySet" | "setWithoutNotify"} setterMethod + * @param {(value: unknown) => T} fromBridgeValue + * @param {(value: T) => unknown} toBridgeValue + * @returns {import("./bufferedStorage.js").KVStore} + */ const makePrefixedBridgeStorage = ( call, prefix, @@ -51,6 +62,13 @@ const makePrefixedBridgeStorage = ( prefix.endsWith('.') || Fail`prefix ${prefix} must end with a dot`; return harden({ + has: key => { + const retStr = call( + stringify({ method: 'has', args: [`${prefix}${key}`] }), + ); + const ret = JSON.parse(retStr); + return ret; + }, get: key => { const retStr = call( stringify({ method: 'get', args: [`${prefix}${key}`] }), @@ -64,8 +82,17 @@ const makePrefixedBridgeStorage = ( }, set: (key, value) => { const path = `${prefix}${key}`; - const entry = - value == null ? [path] : [path, stringify(toBridgeValue(value))]; + const entry = [path, stringify(toBridgeValue(value))]; + call( + stringify({ + method: setterMethod, + args: [entry], + }), + ); + }, + delete: key => { + const path = `${prefix}${key}`; + const entry = [path]; call( stringify({ method: setterMethod, @@ -73,6 +100,9 @@ const makePrefixedBridgeStorage = ( }), ); }, + getNextKey(_previousKey) { + throw new Error('not implemented'); + }, }); }; From 2175fb43ca978788a06aa11eebfec3178f0ecbcd Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Fri, 3 Mar 2023 03:23:25 +0000 Subject: [PATCH 06/12] refactor(cosmic-swingset): makeQueue does own serialization --- packages/cosmic-swingset/src/chain-main.js | 43 ++++++++++++-------- packages/cosmic-swingset/src/launch-chain.js | 8 ++-- packages/cosmic-swingset/src/make-queue.js | 20 ++++----- 3 files changed, 40 insertions(+), 31 deletions(-) diff --git a/packages/cosmic-swingset/src/chain-main.js b/packages/cosmic-swingset/src/chain-main.js index 8d2ed5bde5d..6133f814061 100644 --- a/packages/cosmic-swingset/src/chain-main.js +++ b/packages/cosmic-swingset/src/chain-main.js @@ -23,7 +23,10 @@ import { makePublishKit, pipeTopicToStorage } from '@agoric/notifier'; import * as STORAGE_PATH from '@agoric/internal/src/chain-storage-paths.js'; import { BridgeId as BRIDGE_ID } from '@agoric/internal'; -import { makeReadCachingStorage } from './bufferedStorage.js'; +import { + makeBufferedStorage, + makeReadCachingStorage, +} from './bufferedStorage.js'; import stringify from './json-stable-stringify.js'; import { launch } from './launch-chain.js'; import { getTelemetryProviders } from './kernel-stats.js'; @@ -48,16 +51,16 @@ const toNumber = specimen => { * @param {(req: string) => string} call * @param {string} prefix * @param {"set" | "legacySet" | "setWithoutNotify"} setterMethod - * @param {(value: unknown) => T} fromBridgeValue - * @param {(value: T) => unknown} toBridgeValue + * @param {(value: string) => T} fromBridgeStringValue + * @param {(value: T) => string} toBridgeStringValue * @returns {import("./bufferedStorage.js").KVStore} */ const makePrefixedBridgeStorage = ( call, prefix, setterMethod, - fromBridgeValue = x => x, - toBridgeValue = x => x, + fromBridgeStringValue = x => JSON.parse(x), + toBridgeStringValue = x => stringify(x), ) => { prefix.endsWith('.') || Fail`prefix ${prefix} must end with a dot`; @@ -77,12 +80,11 @@ const makePrefixedBridgeStorage = ( if (ret == null) { return undefined; } - const bridgeValue = JSON.parse(ret); - return fromBridgeValue(bridgeValue); + return fromBridgeStringValue(ret); }, set: (key, value) => { const path = `${prefix}${key}`; - const entry = [path, stringify(toBridgeValue(value))]; + const entry = [path, toBridgeStringValue(value)]; call( stringify({ method: setterMethod, @@ -275,19 +277,26 @@ export default async function main(progname, args, { env, homedir, agcc }) { sendToChain, `${STORAGE_PATH.MAILBOX}.`, 'legacySet', - fromBridgeMailbox, - exportMailbox, + val => fromBridgeMailbox(JSON.parse(val)), + val => stringify(exportMailbox(val)), ), ); - const actionQueue = makeQueue( - makeReadCachingStorage( - makePrefixedBridgeStorage( - sendToChain, - `${STORAGE_PATH.ACTION_QUEUE}.`, - 'setWithoutNotify', - ), + const actionQueueStorage = makeBufferedStorage( + makePrefixedBridgeStorage( + sendToChain, + `${STORAGE_PATH.ACTION_QUEUE}.`, + 'setWithoutNotify', + x => x, + x => x, ), ); + const actionQueue = makeQueue( + harden({ + ...actionQueueStorage.kvStore, + commit: actionQueueStorage.commit, + abort: actionQueueStorage.abort, + }), + ); function setActivityhash(activityhash) { const entry = [STORAGE_PATH.ACTIVITYHASH, activityhash]; const msg = stringify({ diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index 0aa5263f772..c6da2214887 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -244,14 +244,14 @@ export async function launch({ // and disables commit/abort. const inboundQueuePrefix = getHostKey('inboundQueue.'); + /** @type {import("./make-queue.js").QueueStorage} */ const inboundQueueStorage = harden({ get: key => { - const val = kvStore.get(inboundQueuePrefix + key); - return val ? JSON.parse(val) : undefined; + return kvStore.get(inboundQueuePrefix + key); }, set: (key, value) => { - value !== undefined || Fail`value in inboundQueue must be defined`; - kvStore.set(inboundQueuePrefix + key, JSON.stringify(value)); + typeof value === 'string' || Fail`value in inboundQueue must be a string`; + kvStore.set(inboundQueuePrefix + key, value); }, delete: key => kvStore.delete(inboundQueuePrefix + key), commit: () => {}, // disable diff --git a/packages/cosmic-swingset/src/make-queue.js b/packages/cosmic-swingset/src/make-queue.js index 2a1a15a6d43..a67b919c391 100644 --- a/packages/cosmic-swingset/src/make-queue.js +++ b/packages/cosmic-swingset/src/make-queue.js @@ -4,8 +4,8 @@ * @typedef {object} QueueStorage * @property {() => void} commit * @property {() => void} abort - * @property {(key: string) => unknown} get - * @property {(key: string, value: unknown) => void} set + * @property {(key: string) => string | undefined} get + * @property {(key: string, value: string) => void} set * @property {(key: string) => void} delete */ @@ -29,10 +29,8 @@ * @param {QueueStorage} storage a scoped queue storage */ export const makeQueue = storage => { - const getHead = () => - /** @type {number | undefined} */ (storage.get('head')) || 0; - const getTail = () => - /** @type {number | undefined} */ (storage.get('tail')) || 0; + const getHead = () => Number.parseInt(storage.get('head') || '0', 10); + const getTail = () => Number.parseInt(storage.get('tail') || '0', 10); const queue = { size: () => { @@ -41,8 +39,8 @@ export const makeQueue = storage => { /** @param {T} obj */ push: obj => { const tail = getTail(); - storage.set('tail', tail + 1); - storage.set(`${tail}`, obj); + storage.set('tail', String(tail + 1)); + storage.set(`${tail}`, JSON.stringify(obj)); storage.commit(); }, /** @returns {Iterable} */ @@ -57,7 +55,9 @@ export const makeQueue = storage => { if (head < tail) { // Still within the queue. const headKey = `${head}`; - const value = /** @type {T} */ (storage.get(headKey)); + const value = JSON.parse( + /** @type {string} */ (storage.get(headKey)), + ); storage.delete(headKey); head += 1; return { value, done }; @@ -73,7 +73,7 @@ export const makeQueue = storage => { return: () => { if (!done) { // We're done consuming, so save our state. - storage.set('head', head); + storage.set('head', String(head)); storage.commit(); done = true; } From 40a8abb5aeac9237613fd50e4bbacb33f8057ed9 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Wed, 17 Aug 2022 14:51:37 +0000 Subject: [PATCH 07/12] fix(cosmic-swingset): Use BigInt for chainQueue bounds --- golang/cosmos/x/swingset/keeper/keeper.go | 45 ++++++++++++++-------- packages/cosmic-swingset/src/make-queue.js | 10 ++--- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/golang/cosmos/x/swingset/keeper/keeper.go b/golang/cosmos/x/swingset/keeper/keeper.go index 7ddd93fc0ad..71a144514a8 100644 --- a/golang/cosmos/x/swingset/keeper/keeper.go +++ b/golang/cosmos/x/swingset/keeper/keeper.go @@ -6,7 +6,7 @@ import ( "fmt" stdlog "log" "math" - "strconv" + "math/big" "github.com/tendermint/tendermint/libs/log" @@ -34,7 +34,8 @@ const ( StoragePathBundles = "bundles" ) -const MaxUint53 = 9007199254740991 // Number.MAX_SAFE_INTEGER = 2**53 - 1 +// 2 ** 256 - 1 +var MaxSDKInt = sdk.NewIntFromBigInt(new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil), big.NewInt(1))) const stateKey string = "state" @@ -123,34 +124,40 @@ func (k Keeper) PushAction(ctx sdk.Context, action vm.Jsonable) error { } // Get the current queue tail, defaulting to zero if its vstorage doesn't exist. + // The `tail` is the value of the next index to be inserted tail, err := k.actionQueueIndex(ctx, "tail") if err != nil { return err } - // JS uses IEEE 754 floats so avoid overflowing integers - if tail == MaxUint53 { + if tail.Equal(MaxSDKInt) { return errors.New(StoragePathActionQueue + " overflow") } + nextTail := tail.Add(sdk.NewInt(1)) // Set the vstorage corresponding to the queue entry for the current tail. - path := StoragePathActionQueue + "." + strconv.FormatUint(tail, 10) + path := StoragePathActionQueue + "." + tail.String() k.vstorageKeeper.SetStorage(ctx, vstoragetypes.NewStorageEntry(path, string(bz))) // Update the tail to point to the next available entry. path = StoragePathActionQueue + ".tail" - k.vstorageKeeper.SetStorage(ctx, vstoragetypes.NewStorageEntry(path, strconv.FormatUint(tail+1, 10))) + k.vstorageKeeper.SetStorage(ctx, vstoragetypes.NewStorageEntry(path, nextTail.String())) return nil } -func (k Keeper) actionQueueIndex(ctx sdk.Context, name string) (uint64, error) { - index := uint64(0) - var err error - indexEntry := k.vstorageKeeper.GetEntry(ctx, StoragePathActionQueue+"."+name) - if indexEntry.HasData() { - index, err = strconv.ParseUint(indexEntry.StringValue(), 10, 64) +func (k Keeper) actionQueueIndex(ctx sdk.Context, position string) (sdk.Int, error) { + // Position should be either "head" or "tail" + path := StoragePathActionQueue + "." + position + indexEntry := k.vstorageKeeper.GetEntry(ctx, path) + if !indexEntry.HasData() { + return sdk.NewInt(0), nil } - return index, err + + index, ok := sdk.NewIntFromString(indexEntry.StringValue()) + if !ok { + return index, fmt.Errorf("couldn't parse %s as Int: %s", path, indexEntry.StringValue()) + } + return index, nil } func (k Keeper) ActionQueueLength(ctx sdk.Context) (int32, error) { @@ -162,11 +169,17 @@ func (k Keeper) ActionQueueLength(ctx sdk.Context) (int32, error) { if err != nil { return 0, err } - size := tail - head - if size > math.MaxInt32 { + // The tail index is exclusive + size := tail.Sub(head) + if !size.IsInt64() { + return 0, fmt.Errorf("%s size too big: %s", StoragePathActionQueue, size) + } + + int64Size := size.Int64() + if int64Size > math.MaxInt32 { return math.MaxInt32, nil } - return int32(size), nil + return int32(int64Size), nil } // BlockingSend sends a message to the controller and blocks the Golang process diff --git a/packages/cosmic-swingset/src/make-queue.js b/packages/cosmic-swingset/src/make-queue.js index a67b919c391..c7ef5c826f0 100644 --- a/packages/cosmic-swingset/src/make-queue.js +++ b/packages/cosmic-swingset/src/make-queue.js @@ -29,17 +29,17 @@ * @param {QueueStorage} storage a scoped queue storage */ export const makeQueue = storage => { - const getHead = () => Number.parseInt(storage.get('head') || '0', 10); - const getTail = () => Number.parseInt(storage.get('tail') || '0', 10); + const getHead = () => BigInt(storage.get('head') || 0); + const getTail = () => BigInt(storage.get('tail') || 0); const queue = { size: () => { - return getTail() - getHead(); + return Number(getTail() - getHead()); }, /** @param {T} obj */ push: obj => { const tail = getTail(); - storage.set('tail', String(tail + 1)); + storage.set('tail', String(tail + 1n)); storage.set(`${tail}`, JSON.stringify(obj)); storage.commit(); }, @@ -59,7 +59,7 @@ export const makeQueue = storage => { /** @type {string} */ (storage.get(headKey)), ); storage.delete(headKey); - head += 1; + head += 1n; return { value, done }; } // Reached the end, so clean up our indices. From c7b7ca6585b7284f12e9f27fadb73cacf348167b Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Wed, 17 Aug 2022 15:02:56 +0000 Subject: [PATCH 08/12] refactor(cosmic-swingset): Use IterableIterator for chainQueue consume --- packages/cosmic-swingset/src/make-queue.js | 94 +++++++++++----------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/packages/cosmic-swingset/src/make-queue.js b/packages/cosmic-swingset/src/make-queue.js index c7ef5c826f0..1ecaca4b843 100644 --- a/packages/cosmic-swingset/src/make-queue.js +++ b/packages/cosmic-swingset/src/make-queue.js @@ -43,54 +43,54 @@ export const makeQueue = storage => { storage.set(`${tail}`, JSON.stringify(obj)); storage.commit(); }, - /** @returns {Iterable} */ - consumeAll: () => ({ - [Symbol.iterator]: () => { - let done = false; - let head = getHead(); - const tail = getTail(); - return { - next: () => { - if (!done) { - if (head < tail) { - // Still within the queue. - const headKey = `${head}`; - const value = JSON.parse( - /** @type {string} */ (storage.get(headKey)), - ); - storage.delete(headKey); - head += 1n; - return { value, done }; - } - // Reached the end, so clean up our indices. - storage.delete('head'); - storage.delete('tail'); - storage.commit(); - done = true; - } - return { value: undefined, done }; - }, - return: () => { - if (!done) { - // We're done consuming, so save our state. - storage.set('head', String(head)); - storage.commit(); - done = true; - } - return { value: undefined, done }; - }, - throw: err => { - if (!done) { - // Don't change our state. - storage.abort(); - done = true; - throw err; + /** @returns {IterableIterator} */ + consumeAll: () => { + let done = false; + let head = getHead(); + const tail = getTail(); + const iterator = { + [Symbol.iterator]: () => iterator, + next: () => { + if (!done) { + if (head < tail) { + // Still within the queue. + const headKey = `${head}`; + const value = JSON.parse( + /** @type {string} */ (storage.get(headKey)), + ); + storage.delete(headKey); + head += 1n; + return { value, done }; } - return { value: undefined, done }; - }, - }; - }, - }), + // Reached the end, so clean up our indices. + storage.delete('head'); + storage.delete('tail'); + storage.commit(); + done = true; + } + return { value: undefined, done }; + }, + return: () => { + if (!done) { + // We're done consuming, so save our state. + storage.set('head', String(head)); + storage.commit(); + done = true; + } + return { value: undefined, done }; + }, + throw: err => { + if (!done) { + // Don't change our state. + storage.abort(); + done = true; + throw err; + } + return { value: undefined, done }; + }, + }; + return iterator; + }, }; return queue; }; From 0d9943860668744548de1218d070093e436bed8a Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Wed, 17 Aug 2022 15:05:30 +0000 Subject: [PATCH 09/12] fix(cosmic-swingset): enforce no parallel consume for actionQueue --- packages/cosmic-swingset/src/make-queue.js | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/packages/cosmic-swingset/src/make-queue.js b/packages/cosmic-swingset/src/make-queue.js index 1ecaca4b843..ea82d7ab052 100644 --- a/packages/cosmic-swingset/src/make-queue.js +++ b/packages/cosmic-swingset/src/make-queue.js @@ -1,5 +1,7 @@ // @ts-check +import { Fail } from '@agoric/assert'; + /** * @typedef {object} QueueStorage * @property {() => void} commit @@ -32,6 +34,8 @@ export const makeQueue = storage => { const getHead = () => BigInt(storage.get('head') || 0); const getTail = () => BigInt(storage.get('tail') || 0); + /** @type {IterableIterator | null} */ + let currentIterator = null; const queue = { size: () => { return Number(getTail() - getHead()); @@ -42,6 +46,7 @@ export const makeQueue = storage => { storage.set('tail', String(tail + 1n)); storage.set(`${tail}`, JSON.stringify(obj)); storage.commit(); + currentIterator = null; }, /** @returns {IterableIterator} */ consumeAll: () => { @@ -51,6 +56,7 @@ export const makeQueue = storage => { const iterator = { [Symbol.iterator]: () => iterator, next: () => { + currentIterator === iterator || Fail`invalid iterator`; if (!done) { if (head < tail) { // Still within the queue. @@ -71,6 +77,7 @@ export const makeQueue = storage => { return { value: undefined, done }; }, return: () => { + currentIterator === iterator || Fail`invalid iterator`; if (!done) { // We're done consuming, so save our state. storage.set('head', String(head)); @@ -80,6 +87,7 @@ export const makeQueue = storage => { return { value: undefined, done }; }, throw: err => { + currentIterator === iterator || Fail`invalid iterator`; if (!done) { // Don't change our state. storage.abort(); @@ -89,6 +97,7 @@ export const makeQueue = storage => { return { value: undefined, done }; }, }; + currentIterator = iterator; return iterator; }, }; From de1a35310f0af4829f01356c5c0933fcc45baf78 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Sat, 4 Mar 2023 23:49:54 +0000 Subject: [PATCH 10/12] fix(cosmic-swingset): early done check in makeQueue Prevents updating the queue head if queue started empty --- packages/cosmic-swingset/src/make-queue.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cosmic-swingset/src/make-queue.js b/packages/cosmic-swingset/src/make-queue.js index ea82d7ab052..c8d0b323eee 100644 --- a/packages/cosmic-swingset/src/make-queue.js +++ b/packages/cosmic-swingset/src/make-queue.js @@ -50,9 +50,9 @@ export const makeQueue = storage => { }, /** @returns {IterableIterator} */ consumeAll: () => { - let done = false; let head = getHead(); const tail = getTail(); + let done = !(head < tail); const iterator = { [Symbol.iterator]: () => iterator, next: () => { From e795236c6b2172bbe189417e57f03d087a53adcb Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Sat, 4 Mar 2023 00:05:16 +0000 Subject: [PATCH 11/12] refactor(cosmic-swingset): launch-chain uses actionQueueStorage --- packages/cosmic-swingset/src/chain-main.js | 17 ++++++------- packages/cosmic-swingset/src/launch-chain.js | 7 ++++-- packages/cosmic-swingset/src/make-queue.js | 26 ++++++++++++++++++++ packages/cosmic-swingset/src/sim-chain.js | 16 ++++-------- 4 files changed, 43 insertions(+), 23 deletions(-) diff --git a/packages/cosmic-swingset/src/chain-main.js b/packages/cosmic-swingset/src/chain-main.js index 6133f814061..60388564431 100644 --- a/packages/cosmic-swingset/src/chain-main.js +++ b/packages/cosmic-swingset/src/chain-main.js @@ -30,7 +30,6 @@ import { import stringify from './json-stable-stringify.js'; import { launch } from './launch-chain.js'; import { getTelemetryProviders } from './kernel-stats.js'; -import { makeQueue } from './make-queue.js'; // eslint-disable-next-line no-unused-vars let whenHellFreezesOver = null; @@ -281,7 +280,7 @@ export default async function main(progname, args, { env, homedir, agcc }) { val => stringify(exportMailbox(val)), ), ); - const actionQueueStorage = makeBufferedStorage( + const actionQueueRawStorage = makeBufferedStorage( makePrefixedBridgeStorage( sendToChain, `${STORAGE_PATH.ACTION_QUEUE}.`, @@ -290,13 +289,11 @@ export default async function main(progname, args, { env, homedir, agcc }) { x => x, ), ); - const actionQueue = makeQueue( - harden({ - ...actionQueueStorage.kvStore, - commit: actionQueueStorage.commit, - abort: actionQueueStorage.abort, - }), - ); + const actionQueueStorage = harden({ + ...actionQueueRawStorage.kvStore, + commit: actionQueueRawStorage.commit, + abort: actionQueueRawStorage.abort, + }); function setActivityhash(activityhash) { const entry = [STORAGE_PATH.ACTIVITYHASH, activityhash]; const msg = stringify({ @@ -436,7 +433,7 @@ export default async function main(progname, args, { env, homedir, agcc }) { }; const s = await launch({ - actionQueue, + actionQueueStorage, kernelStateDBDir: stateDBDir, makeInstallationPublisher, mailboxStorage, diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index c6da2214887..e4c698adf6b 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -210,7 +210,7 @@ function neverStop() { } export async function launch({ - actionQueue, + actionQueueStorage, kernelStateDBDir, mailboxStorage, clearChainSends, @@ -257,7 +257,10 @@ export async function launch({ commit: () => {}, // disable abort: () => {}, // disable }); - /** @type {ReturnType>} */ + /** @typedef {ReturnType>} ActionQueue */ + /** @type {ActionQueue} */ + const actionQueue = makeQueue(actionQueueStorage); + /** @type {ActionQueue} */ const inboundQueue = makeQueue(inboundQueueStorage); // Not to be confused with the gas model, this meter is for OpenTelemetry. diff --git a/packages/cosmic-swingset/src/make-queue.js b/packages/cosmic-swingset/src/make-queue.js index c8d0b323eee..b4a1ce129e2 100644 --- a/packages/cosmic-swingset/src/make-queue.js +++ b/packages/cosmic-swingset/src/make-queue.js @@ -11,6 +11,32 @@ import { Fail } from '@agoric/assert'; * @property {(key: string) => void} delete */ +/** + * @typedef {{[idx: number]: string | undefined, head?: string, tail?: string}} QueueStorageDump + * @param {QueueStorageDump} [init] + * @returns {{storage: QueueStorage; dump: () => QueueStorageDump}} + */ +export const makeQueueStorageMock = init => { + const storage = new Map(init && Object.entries(init)); + return harden({ + storage: { + get(key) { + return storage.get(key); + }, + set(key, value) { + typeof value === 'string' || Fail`invalid value type ${value}`; + storage.set(key, value); + }, + delete(key) { + storage.delete(key); + }, + commit() {}, + abort() {}, + }, + dump: () => harden(Object.fromEntries(storage.entries())), + }); +}; + /** * Create a queue backed by some sort of scoped storage. * diff --git a/packages/cosmic-swingset/src/sim-chain.js b/packages/cosmic-swingset/src/sim-chain.js index f71af53ff04..f6e2edea0f0 100644 --- a/packages/cosmic-swingset/src/sim-chain.js +++ b/packages/cosmic-swingset/src/sim-chain.js @@ -20,6 +20,7 @@ import { launch } from './launch-chain.js'; import { getTelemetryProviders } from './kernel-stats.js'; import { DEFAULT_SIM_SWINGSET_PARAMS, QueueInbound } from './sim-params.js'; import { parseQueueSizes } from './params.js'; +import { makeQueue, makeQueueStorageMock } from './make-queue.js'; const console = anylogger('fake-chain'); @@ -96,18 +97,11 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) { env, }); - let aqContents = []; - const actionQueue = { - /** @returns {Iterable} */ - consumeAll: () => { - const iterable = aqContents; - aqContents = []; - return iterable; - }, - }; + const actionQueueStorage = makeQueueStorageMock().storage; + const actionQueue = makeQueue(actionQueueStorage); const s = await launch({ - actionQueue, + actionQueueStorage, kernelStateDBDir: stateDBdir, mailboxStorage, clearChainSends, @@ -150,7 +144,7 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) { const thisBlock = intoChain.splice(0, queueAllowed[QueueInbound]); for (const [i, [newMessages, acknum]] of thisBlock.entries()) { - aqContents.push({ + actionQueue.push({ action: { type: 'DELIVER_INBOUND', peer: bootAddress, From a32299df308eb869def870cca93f0b89e37e9110 Mon Sep 17 00:00:00 2001 From: Mathieu Hofman Date: Sat, 4 Mar 2023 20:50:46 +0000 Subject: [PATCH 12/12] feat(cosmic-swingset): leave inbound in actionQueue --- golang/cosmos/x/swingset/abci.go | 21 ++--- golang/cosmos/x/swingset/keeper/keeper.go | 33 +++++++ packages/cosmic-swingset/src/kernel-stats.js | 11 ++- packages/cosmic-swingset/src/launch-chain.js | 95 ++++---------------- packages/cosmic-swingset/src/sim-chain.js | 14 +-- 5 files changed, 69 insertions(+), 105 deletions(-) diff --git a/golang/cosmos/x/swingset/abci.go b/golang/cosmos/x/swingset/abci.go index 42ad6525f8e..c7359fcb04a 100644 --- a/golang/cosmos/x/swingset/abci.go +++ b/golang/cosmos/x/swingset/abci.go @@ -3,7 +3,6 @@ package swingset import ( // "fmt" // "os" - "encoding/json" "time" "github.com/cosmos/cosmos-sdk/telemetry" @@ -23,10 +22,6 @@ type beginBlockAction struct { Params types.Params `json:"params"` } -type beginBlockResult struct { - QueueAllowed []types.QueueSize `json:"queue_allowed"` -} - type endBlockAction struct { Type string `json:"type"` BlockHeight int64 `json:"blockHeight"` @@ -50,20 +45,14 @@ func BeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, keeper Keeper) erro ChainID: ctx.ChainID(), Params: keeper.GetParams(ctx), } - out, err := keeper.BlockingSend(ctx, action) + _, err := keeper.BlockingSend(ctx, action) // fmt.Fprintf(os.Stderr, "BEGIN_BLOCK Returned from SwingSet: %s, %v\n", out, err) - - if out != "" { - var result beginBlockResult - err := json.Unmarshal([]byte(out), &result) - if err != nil { - panic(err) - } - state := keeper.GetState(ctx) - state.QueueAllowed = result.QueueAllowed - keeper.SetState(ctx, state) + if err != nil { + panic(err) } + err = keeper.UpdateQueueAllowed(ctx) + return err } diff --git a/golang/cosmos/x/swingset/keeper/keeper.go b/golang/cosmos/x/swingset/keeper/keeper.go index 71a144514a8..961f77cc549 100644 --- a/golang/cosmos/x/swingset/keeper/keeper.go +++ b/golang/cosmos/x/swingset/keeper/keeper.go @@ -182,6 +182,39 @@ func (k Keeper) ActionQueueLength(ctx sdk.Context) (int32, error) { return int32(int64Size), nil } +func (k Keeper) UpdateQueueAllowed(ctx sdk.Context) error { + params := k.GetParams(ctx) + inboundQueueMax, found := types.QueueSizeEntry(params.QueueMax, types.QueueInbound) + if !found { + return errors.New("could not find max inboundQueue size in params") + } + inboundMempoolQueueMax := inboundQueueMax / 2 + + inboundQueueSize, err := k.ActionQueueLength(ctx) + if err != nil { + return err + } + + var inboundQueueAllowed int32 + if inboundQueueMax > inboundQueueSize { + inboundQueueAllowed = inboundQueueMax - inboundQueueSize + } + + var inboundMempoolQueueAllowed int32 + if inboundMempoolQueueMax > inboundQueueSize { + inboundMempoolQueueAllowed = inboundMempoolQueueMax - inboundQueueSize + } + + state := k.GetState(ctx) + state.QueueAllowed = []types.QueueSize{ + {Key: types.QueueInbound, Size_: inboundQueueAllowed}, + {Key: types.QueueInboundMempool, Size_: inboundMempoolQueueAllowed}, + } + k.SetState(ctx, state) + + return nil +} + // BlockingSend sends a message to the controller and blocks the Golang process // until the response. It is orthogonal to PushAction, and should only be used // by SwingSet to perform block lifecycle events (BEGIN_BLOCK, END_BLOCK, diff --git a/packages/cosmic-swingset/src/kernel-stats.js b/packages/cosmic-swingset/src/kernel-stats.js index c5ce317076e..9d77ab88d93 100644 --- a/packages/cosmic-swingset/src/kernel-stats.js +++ b/packages/cosmic-swingset/src/kernel-stats.js @@ -256,9 +256,14 @@ export function makeInboundQueueMetrics(initialLength) { let remove = 0; return harden({ - incStat: (delta = 1) => { - length += delta; - add += delta; + updateLength: newLength => { + const delta = newLength - length; + length = newLength; + if (delta > 0) { + add += delta; + } else { + remove -= delta; + } }, decStat: (delta = 1) => { diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index e4c698adf6b..636e7d66d3e 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -33,9 +33,8 @@ import { BeansPerBlockComputeLimit, BeansPerVatCreation, BeansPerXsnapComputron, - QueueInbound, } from './sim-params.js'; -import { parseParams, encodeQueueSizes } from './params.js'; +import { parseParams } from './params.js'; import { makeQueue } from './make-queue.js'; const console = anylogger('launch-chain'); @@ -237,31 +236,9 @@ export async function launch({ }); const { kvStore, commit } = hostStorage; - // makeQueue() thinks it should commit/abort, but the kvStore doesn't provide - // those ('commit' is reserved for flushing the block buffer). Furthermore - // the kvStore only deals with string values. - // We create a storage wrapper that adds a prefix to keys, serializes values, - // and disables commit/abort. - - const inboundQueuePrefix = getHostKey('inboundQueue.'); - /** @type {import("./make-queue.js").QueueStorage} */ - const inboundQueueStorage = harden({ - get: key => { - return kvStore.get(inboundQueuePrefix + key); - }, - set: (key, value) => { - typeof value === 'string' || Fail`value in inboundQueue must be a string`; - kvStore.set(inboundQueuePrefix + key, value); - }, - delete: key => kvStore.delete(inboundQueuePrefix + key), - commit: () => {}, // disable - abort: () => {}, // disable - }); /** @typedef {ReturnType>} ActionQueue */ /** @type {ActionQueue} */ const actionQueue = makeQueue(actionQueueStorage); - /** @type {ActionQueue} */ - const inboundQueue = makeQueue(inboundQueueStorage); // Not to be confused with the gas model, this meter is for OpenTelemetry. const metricMeter = metricsProvider.getMeter('ag-chain-cosmos'); @@ -292,7 +269,7 @@ export async function launch({ ? parseInt(env.END_BLOCK_SPIN_MS, 10) : 0; - const inboundQueueMetrics = makeInboundQueueMetrics(inboundQueue.size()); + const inboundQueueMetrics = makeInboundQueueMetrics(actionQueue.size()); const { crankScheduler } = exportKernelStats({ controller, metricMeter, @@ -314,33 +291,6 @@ export async function launch({ } } - let savedQueueAllowed = JSON.parse( - kvStore.get(getHostKey('queueAllowed')) || '{}', - ); - - function updateQueueAllowed(_blockHeight, _blockTime, params) { - assert(params.queueMax); - assert(QueueInbound in params.queueMax); - - const inboundQueueMax = params.queueMax[QueueInbound]; - const inboundMempoolQueueMax = Math.floor(inboundQueueMax / 2); - - const inboundQueueSize = inboundQueue.size(); - - const inboundQueueAllowed = Math.max(0, inboundQueueMax - inboundQueueSize); - const inboundMempoolQueueAllowed = Math.max( - 0, - inboundMempoolQueueMax - inboundQueueSize, - ); - - savedQueueAllowed = { - // Keep up-to-date with queue size keys defined in - // golang/cosmos/x/swingset/types/default-params.go - inbound: inboundQueueAllowed, - inbound_mempool: inboundMempoolQueueAllowed, - }; - } - async function saveChainState() { // Save the mailbox state. await mailboxStorage.commit(); @@ -351,7 +301,6 @@ export async function launch({ kvStore.set(getHostKey('height'), `${blockHeight}`); kvStore.set(getHostKey('blockTime'), `${blockTime}`); kvStore.set(getHostKey('chainSends'), JSON.stringify(chainSends)); - kvStore.set(getHostKey('queueAllowed'), JSON.stringify(savedQueueAllowed)); await commit(); } @@ -516,11 +465,11 @@ export async function launch({ let keepGoing = await runSwingset(); - // Then process as much as we can from the inboundQueue, which contains + // Then process as much as we can from the actionQueue, which contains // first the old actions followed by the newActions, running the // kernel to completion after each. if (keepGoing) { - for (const { action, context } of inboundQueue.consumeAll()) { + for (const { action, context } of actionQueue.consumeAll()) { const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`; inboundQueueMetrics.decStat(); // eslint-disable-next-line no-await-in-loop @@ -528,7 +477,7 @@ export async function launch({ // eslint-disable-next-line no-await-in-loop keepGoing = await runSwingset(); if (!keepGoing) { - // any leftover actions will remain on the inboundQueue for possible + // any leftover actions will remain on the actionQueue for possible // processing in the next block break; } @@ -540,19 +489,14 @@ export async function launch({ } } - async function endBlock(blockHeight, blockTime, params, newActions) { + async function endBlock(blockHeight, blockTime, params) { // This is called once per block, during the END_BLOCK event, and // only when we know that cosmos is in sync (else we'd skip kernel - // execution). 'newActions' are the bridge/mailbox/etc events that - // cosmos stored up for delivery to swingset in this block. - - // First, push all newActions onto the end of the inboundQueue, - // remembering that inboundQueue might still have work from the - // previous block - for (const actionRecord of newActions) { - inboundQueue.push(actionRecord); - inboundQueueMetrics.incStat(); - } + // execution). + + // First, record new actions (bridge/mailbox/etc events that cosmos + // added up for delivery to swingset) into our inboundQueue metrics + inboundQueueMetrics.updateLength(actionQueue.size()); // We update the timer device at the start of each block, which might push // work onto the end of the kernel run-queue (if any timers were ready to @@ -746,19 +690,14 @@ export async function launch({ blockManagerConsole.info('block', blockHeight, 'begin'); runTime = 0; - if (blockNeedsExecution(blockHeight)) { - // We are not reevaluating, so compute a new queueAllowed - updateQueueAllowed(blockHeight, blockTime, blockParams); - } - controller.writeSlogObject({ type: 'cosmic-swingset-begin-block', blockHeight, blockTime, - queueAllowed: savedQueueAllowed, + actionQueueStats: inboundQueueMetrics.getStats(), }); - return { queue_allowed: encodeQueueSizes(savedQueueAllowed) }; + return undefined; } case ActionType.END_BLOCK: { @@ -794,12 +733,7 @@ export async function launch({ provideInstallationPublisher(); await processAction(action.type, async () => - endBlock( - blockHeight, - blockTime, - blockParams, - actionQueue.consumeAll(), - ), + endBlock(blockHeight, blockTime, blockParams), ); // We write out our on-chain state as a number of chainSends. @@ -815,6 +749,7 @@ export async function launch({ type: 'cosmic-swingset-end-block-finish', blockHeight, blockTime, + actionQueueStats: inboundQueueMetrics.getStats(), }); return undefined; diff --git a/packages/cosmic-swingset/src/sim-chain.js b/packages/cosmic-swingset/src/sim-chain.js index f6e2edea0f0..52e2f877df1 100644 --- a/packages/cosmic-swingset/src/sim-chain.js +++ b/packages/cosmic-swingset/src/sim-chain.js @@ -12,7 +12,7 @@ import anylogger from 'anylogger'; import { makeSlogSender } from '@agoric/telemetry'; import { resolve as importMetaResolve } from 'import-meta-resolve'; -import { assert, Fail } from '@agoric/assert'; +import { Fail } from '@agoric/assert'; import { makeWithQueue } from '@agoric/internal/src/queue.js'; import { makeBatchedDeliver } from '@agoric/internal/src/batched-deliver.js'; import stringify from './json-stable-stringify.js'; @@ -135,13 +135,15 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) { blockTime, params, }; - const beginBlockResult = await blockingSend(beginAction); - assert(beginBlockResult); - const queueAllowed = parseQueueSizes(beginBlockResult.queue_allowed); - assert(QueueInbound in queueAllowed); + await blockingSend(beginAction); + const inboundQueueMax = parseQueueSizes(params.queue_max)[QueueInbound]; + const inboundQueueAllowed = Math.max( + 0, + inboundQueueMax - actionQueue.size(), + ); // Gather up the new messages into the latest block. - const thisBlock = intoChain.splice(0, queueAllowed[QueueInbound]); + const thisBlock = intoChain.splice(0, inboundQueueAllowed); for (const [i, [newMessages, acknum]] of thisBlock.entries()) { actionQueue.push({