diff --git a/go.mod b/go.mod index 4a44373e0e9..fb583a93081 100644 --- a/go.mod +++ b/go.mod @@ -145,7 +145,7 @@ replace google.golang.org/grpc => google.golang.org/grpc v1.33.2 replace github.com/tendermint/tendermint => github.com/agoric-labs/tendermint v0.34.23-alpha.agoric.3 // We need a fork of cosmos-sdk until all of the differences are merged. -replace github.com/cosmos/cosmos-sdk => github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1 +replace github.com/cosmos/cosmos-sdk => github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1.0.20230320225042-2109765fd835 replace github.com/cosmos/gaia/v7 => github.com/Agoric/ag0/v7 v7.0.2-alpha.agoric.1 diff --git a/go.sum b/go.sum index ba52d612c0e..97620c6dc68 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,8 @@ github.com/Zilliqa/gozilliqa-sdk v1.2.1-0.20201201074141-dd0ecada1be6/go.mod h1: github.com/adlio/schema v1.3.3 h1:oBJn8I02PyTB466pZO1UZEn1TV5XLlifBSyMrmHl/1I= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= -github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1 h1:3GHpCatyGBaZDoSr9k6Rd5/5QnNghkzxgLfkDsPDBPk= -github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1/go.mod h1:fdXvzy+wmYB+W+N139yb0+szbT7zAGgUjmxm5DBrjto= +github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1.0.20230320225042-2109765fd835 h1:Mmw52cHAUNwtaNXpk7b3lTeoCRd5Vw9Fdrly5ABIxCA= +github.com/agoric-labs/cosmos-sdk v0.45.11-alpha.agoric.1.0.20230320225042-2109765fd835/go.mod h1:fdXvzy+wmYB+W+N139yb0+szbT7zAGgUjmxm5DBrjto= github.com/agoric-labs/cosmos-sdk/ics23/go v0.8.0-alpha.agoric.1 h1:2jvHI/2d+psWAZy6FQ0vXJCHUtfU3ZbbW+pQFL04arQ= github.com/agoric-labs/cosmos-sdk/ics23/go v0.8.0-alpha.agoric.1/go.mod h1:E45NqnlpxGnpfTWL/xauN7MRwEE28T4Dd4uraToOaKg= github.com/agoric-labs/tendermint v0.34.23-alpha.agoric.3 h1:aq6F1r3RQkKUYNeMNjRxgGn3dayvKnDK/R6gQF0WoFs= diff --git a/golang/cosmos/x/swingset/abci.go b/golang/cosmos/x/swingset/abci.go index 42ad6525f8e..c7359fcb04a 100644 --- a/golang/cosmos/x/swingset/abci.go +++ b/golang/cosmos/x/swingset/abci.go @@ -3,7 +3,6 @@ package swingset import ( // "fmt" // "os" - "encoding/json" "time" "github.com/cosmos/cosmos-sdk/telemetry" @@ -23,10 +22,6 @@ type beginBlockAction struct { Params types.Params `json:"params"` } -type beginBlockResult struct { - QueueAllowed []types.QueueSize `json:"queue_allowed"` -} - type endBlockAction struct { Type string `json:"type"` BlockHeight int64 `json:"blockHeight"` @@ -50,20 +45,14 @@ func BeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, keeper Keeper) erro ChainID: ctx.ChainID(), Params: keeper.GetParams(ctx), } - out, err := keeper.BlockingSend(ctx, action) + _, err := keeper.BlockingSend(ctx, action) // fmt.Fprintf(os.Stderr, "BEGIN_BLOCK Returned from SwingSet: %s, %v\n", out, err) - - if out != "" { - var result beginBlockResult - err := json.Unmarshal([]byte(out), &result) - if err != nil { - panic(err) - } - state := keeper.GetState(ctx) - state.QueueAllowed = result.QueueAllowed - keeper.SetState(ctx, state) + if err != nil { + panic(err) } + err = keeper.UpdateQueueAllowed(ctx) + return err } diff --git a/golang/cosmos/x/swingset/keeper/keeper.go b/golang/cosmos/x/swingset/keeper/keeper.go index 53470ecbed9..961f77cc549 100644 --- a/golang/cosmos/x/swingset/keeper/keeper.go +++ b/golang/cosmos/x/swingset/keeper/keeper.go @@ -4,11 +4,13 @@ import ( "encoding/json" "errors" "fmt" + stdlog "log" "math" - "strconv" + "math/big" "github.com/tendermint/tendermint/libs/log" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper" @@ -32,10 +34,30 @@ const ( StoragePathBundles = "bundles" ) -const MaxUint53 = 9007199254740991 // Number.MAX_SAFE_INTEGER = 2**53 - 1 +// 2 ** 256 - 1 +var MaxSDKInt = sdk.NewIntFromBigInt(new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil), big.NewInt(1))) const stateKey string = "state" +// Contextual information about the message source of an action on the queue. +// This context should be unique per actionQueueRecord. +type actionContext struct { + // The block height in which the corresponding action was enqueued + BlockHeight int64 `json:"blockHeight"` + // The hash of the cosmos transaction that included the message + // If the action didn't result from a transaction message, a substitute value + // may be used. For example the VBANK_BALANCE_UPDATE actions use `x/vbank`. + TxHash string `json:"txHash"` + // The index of the message within the transaction. If the action didn't + // result from a cosmos transaction, a number should be chosen to make the + // actionContext unique. (for example a counter per block and source module). + MsgIdx int `json:"msgIdx"` +} +type actionQueueRecord struct { + Action vm.Jsonable `json:"action"` + Context actionContext `json:"context"` +} + // Keeper maintains the link to data vstorage and exposes getter/setter methods for the various parts of the state machine type Keeper struct { storeKey sdk.StoreKey @@ -87,40 +109,55 @@ func NewKeeper( // The actionQueue's format is documented by `makeChainQueue` in // `packages/cosmic-swingset/src/make-queue.js`. func (k Keeper) PushAction(ctx sdk.Context, action vm.Jsonable) error { - bz, err := json.Marshal(action) + txHash, txHashOk := ctx.Context().Value(baseapp.TxHashContextKey).(string) + if !txHashOk { + txHash = "unknown" + } + msgIdx, msgIdxOk := ctx.Context().Value(baseapp.TxMsgIdxContextKey).(int) + if !txHashOk || !msgIdxOk { + stdlog.Printf("error while extracting context for action %q\n", action) + } + record := actionQueueRecord{Action: action, Context: actionContext{BlockHeight: ctx.BlockHeight(), TxHash: txHash, MsgIdx: msgIdx}} + bz, err := json.Marshal(record) if err != nil { return err } // Get the current queue tail, defaulting to zero if its vstorage doesn't exist. + // The `tail` is the value of the next index to be inserted tail, err := k.actionQueueIndex(ctx, "tail") if err != nil { return err } - // JS uses IEEE 754 floats so avoid overflowing integers - if tail == MaxUint53 { + if tail.Equal(MaxSDKInt) { return errors.New(StoragePathActionQueue + " overflow") } + nextTail := tail.Add(sdk.NewInt(1)) // Set the vstorage corresponding to the queue entry for the current tail. - path := StoragePathActionQueue + "." + strconv.FormatUint(tail, 10) + path := StoragePathActionQueue + "." + tail.String() k.vstorageKeeper.SetStorage(ctx, vstoragetypes.NewStorageEntry(path, string(bz))) // Update the tail to point to the next available entry. path = StoragePathActionQueue + ".tail" - k.vstorageKeeper.SetStorage(ctx, vstoragetypes.NewStorageEntry(path, strconv.FormatUint(tail+1, 10))) + k.vstorageKeeper.SetStorage(ctx, vstoragetypes.NewStorageEntry(path, nextTail.String())) return nil } -func (k Keeper) actionQueueIndex(ctx sdk.Context, name string) (uint64, error) { - index := uint64(0) - var err error - indexEntry := k.vstorageKeeper.GetEntry(ctx, StoragePathActionQueue+"."+name) - if indexEntry.HasData() { - index, err = strconv.ParseUint(indexEntry.StringValue(), 10, 64) +func (k Keeper) actionQueueIndex(ctx sdk.Context, position string) (sdk.Int, error) { + // Position should be either "head" or "tail" + path := StoragePathActionQueue + "." + position + indexEntry := k.vstorageKeeper.GetEntry(ctx, path) + if !indexEntry.HasData() { + return sdk.NewInt(0), nil } - return index, err + + index, ok := sdk.NewIntFromString(indexEntry.StringValue()) + if !ok { + return index, fmt.Errorf("couldn't parse %s as Int: %s", path, indexEntry.StringValue()) + } + return index, nil } func (k Keeper) ActionQueueLength(ctx sdk.Context) (int32, error) { @@ -132,11 +169,50 @@ func (k Keeper) ActionQueueLength(ctx sdk.Context) (int32, error) { if err != nil { return 0, err } - size := tail - head - if size > math.MaxInt32 { + // The tail index is exclusive + size := tail.Sub(head) + if !size.IsInt64() { + return 0, fmt.Errorf("%s size too big: %s", StoragePathActionQueue, size) + } + + int64Size := size.Int64() + if int64Size > math.MaxInt32 { return math.MaxInt32, nil } - return int32(size), nil + return int32(int64Size), nil +} + +func (k Keeper) UpdateQueueAllowed(ctx sdk.Context) error { + params := k.GetParams(ctx) + inboundQueueMax, found := types.QueueSizeEntry(params.QueueMax, types.QueueInbound) + if !found { + return errors.New("could not find max inboundQueue size in params") + } + inboundMempoolQueueMax := inboundQueueMax / 2 + + inboundQueueSize, err := k.ActionQueueLength(ctx) + if err != nil { + return err + } + + var inboundQueueAllowed int32 + if inboundQueueMax > inboundQueueSize { + inboundQueueAllowed = inboundQueueMax - inboundQueueSize + } + + var inboundMempoolQueueAllowed int32 + if inboundMempoolQueueMax > inboundQueueSize { + inboundMempoolQueueAllowed = inboundMempoolQueueMax - inboundQueueSize + } + + state := k.GetState(ctx) + state.QueueAllowed = []types.QueueSize{ + {Key: types.QueueInbound, Size_: inboundQueueAllowed}, + {Key: types.QueueInboundMempool, Size_: inboundMempoolQueueAllowed}, + } + k.SetState(ctx, state) + + return nil } // BlockingSend sends a message to the controller and blocks the Golang process diff --git a/golang/cosmos/x/vbank/vbank.go b/golang/cosmos/x/vbank/vbank.go index 08660902873..75ea273e11f 100644 --- a/golang/cosmos/x/vbank/vbank.go +++ b/golang/cosmos/x/vbank/vbank.go @@ -1,12 +1,14 @@ package vbank import ( + "context" "encoding/json" "fmt" stdlog "log" "sort" "github.com/Agoric/agoric-sdk/golang/cosmos/vm" + "github.com/cosmos/cosmos-sdk/baseapp" sdk "github.com/cosmos/cosmos-sdk/types" ) @@ -241,5 +243,11 @@ func (ch portHandler) Receive(ctx *vm.ControllerContext, str string) (ret string } func (am AppModule) PushAction(ctx sdk.Context, action vm.Jsonable) error { + // vbank actions are not triggered by a swingset message in a transaction, so we need to + // synthesize unique context information. + // We use a fixed placeholder value for the txHash context, and can simply use `0` for the + // message index as there is only one such action per block. + ctx = ctx.WithContext(context.WithValue(ctx.Context(), baseapp.TxHashContextKey, "x/vbank")) + ctx = ctx.WithContext(context.WithValue(ctx.Context(), baseapp.TxMsgIdxContextKey, 0)) return am.keeper.PushAction(ctx, action) } diff --git a/packages/cosmic-swingset/src/bufferedStorage.js b/packages/cosmic-swingset/src/bufferedStorage.js index e6371d3f643..0e85216c15e 100644 --- a/packages/cosmic-swingset/src/bufferedStorage.js +++ b/packages/cosmic-swingset/src/bufferedStorage.js @@ -1,10 +1,23 @@ -import { assert, details as X, Fail } from '@agoric/assert'; +// @ts-check + +import { assert, Fail } from '@agoric/assert'; // XXX Do these "StorageAPI" functions belong in their own package? +/** + * @template {unknown} [T=unknown] + * @typedef {{ + * has: (key: string) => boolean, + * get: (key: string) => T | undefined, + * getNextKey: (previousKey: string) => string | undefined, + * set: (key: string, value: T ) => void, + * delete: (key: string) => void, + * }} KVStore + */ + /** * Assert function to ensure that an object implements the StorageAPI - * interface: methods { has, getKeys, get, set, delete } + * interface: methods { has, getNextKey, get, set, delete } * (cf. packages/SwingSet/docs/state.md#transactions). * * @param {*} kvStore The object to be tested @@ -15,108 +28,26 @@ import { assert, details as X, Fail } from '@agoric/assert'; * @returns {void} */ export function insistStorageAPI(kvStore) { - for (const n of ['has', 'getKeys', 'get', 'set', 'delete']) { + for (const n of ['has', 'getNextKey', 'get', 'set', 'delete']) { n in kvStore || Fail`kvStore.${n} is missing, cannot use`; } } -/** - * Given two iterators over sequences of unique strings sorted in ascending - * order lexicographically by UTF-16 code unit, produce a new iterator that will - * output the ascending sequence of unique strings from their merged output. - * - * @param {Iterator} it1 - * @param {Iterator} it2 - * - * @yields any - */ -function* mergeUtf16SortedIterators(it1, it2) { - /** @type {IteratorResult | null} */ - let v1 = null; - /** @type {IteratorResult | null} */ - let v2 = null; - /** @type {IteratorResult | null} */ - let vrest = null; - /** @type {Iterator | null} */ - let itrest = null; - - try { - v1 = it1.next(); - v2 = it2.next(); - while (!v1.done && !v2.done) { - if (v1.value < v2.value) { - const result = v1.value; - v1 = it1.next(); - yield result; - } else if (v1.value > v2.value) { - const result = v2.value; - v2 = it2.next(); - yield result; - } else { - // Each iterator produced the same value; consume it from both. - const result = v1.value; - v1 = it1.next(); - v2 = it2.next(); - yield result; - } - } - - itrest = v1.done ? it2 : it1; - vrest = v1.done ? v2 : v1; - v1 = null; - v2 = null; - - while (!vrest.done) { - const result = vrest.value; - vrest = itrest.next(); - yield result; - } - } finally { - const errors = []; - try { - if (vrest && !vrest.done && itrest && itrest.return) { - itrest.return(); - } - } catch (e) { - errors.push(e); - } - try { - if (v1 && !v1.done && it1.return) { - it1.return(); - } - } catch (e) { - errors.push(e); - } - try { - if (v2 && !v2.done && it2.return) { - it2.return(); - } - } catch (e) { - errors.push(e); - } - if (errors.length) { - const err = assert.error(X`Merged iterator failed to close cleanly`); - for (const e of errors) { - assert.note(err, X`Caused by ${e}`); - } - } - } -} - /** * Create a StorageAPI object that buffers writes to a wrapped StorageAPI object * until told to commit (or abort) them. * - * @param {import('@agoric/swing-store').KVStore} kvStore The StorageAPI object to wrap + * @template {unknown} [T=unknown] + * @param {KVStore} kvStore The StorageAPI object to wrap * @param {{ - * onGet?: (key: string, value: string) => void, // a callback invoked after getting a value from kvStore - * onPendingSet?: (key: string, value: string) => void, // a callback invoked after a new uncommitted value is set + * onGet?: (key: string, value: T) => void, // a callback invoked after getting a value from kvStore + * onPendingSet?: (key: string, value: T) => void, // a callback invoked after a new uncommitted value is set * onPendingDelete?: (key: string) => void, // a callback invoked after a new uncommitted delete * onCommit?: () => void, // a callback invoked after pending operations have been committed * onAbort?: () => void, // a callback invoked after pending operations have been aborted * }} listeners Optional callbacks to be invoked when respective events occur * - * @returns {{kvStore: import('@agoric/swing-store').KVStore, commit: () => void, abort: () => void}} + * @returns {{kvStore: KVStore, commit: () => void, abort: () => void}} */ export function makeBufferedStorage(kvStore, listeners = {}) { insistStorageAPI(kvStore); @@ -140,8 +71,7 @@ export function makeBufferedStorage(kvStore, listeners = {}) { if (additions.has(key)) return additions.get(key); if (deletions.has(key)) return undefined; const value = kvStore.get(key); - // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error -- https://github.com/Agoric/agoric-sdk/issues/4620 - // @ts-ignore value may be undefined + // @ts-expect-error value may be undefined if (onGet !== undefined) onGet(key, value); return value; }, @@ -159,55 +89,11 @@ export function makeBufferedStorage(kvStore, listeners = {}) { }, /** - * Generator function that returns an iterator over all the keys within a - * given range, in lexicographical order by UTF-16 code unit. - * - * Warning: this function introduces consistency risks that callers must - * take into account. Results do not include include keys added during - * iteration. This layer of abstraction lacks the knowledge necessary to - * protect callers, as the nature of the risks varies depending on what a - * caller is trying to do. This API should not be made available to user - * vat code. Rather, it is intended as a low-level mechanism to use in - * implementating higher level storage abstractions that are expected to - * provide their own consistency protections as appropriate to their own - * circumstances. - * - * @param {string} start Start of the key range of interest (inclusive). An empty - * string indicates a range from the beginning of the key set. - * @param {string} end End of the key range of interest (exclusive). An empty string - * indicates a range through the end of the key set. - * - * @yields {string} an iterator for the keys from start <= key < end - * - * @throws if either parameter is not a string. + * @param {string} previousKey */ - *getKeys(start, end) { - assert.typeof(start, 'string'); - assert.typeof(end, 'string'); - - // Merge keys reported by the backing store and a snapshot of in-range - // additions into a single duplicate-free iterator. - const added = []; - for (const k of additions.keys()) { - if ((start === '' || start <= k) && (end === '' || k < end)) { - added.push(k); - } - } - // Note that this implicitly compares keys lexicographically by UTF-16 - // code unit (e.g., "\u{1D306}" _precedes_ "\u{FFFD}"). - // If kvStore.getKeys() results are not in ascending order subject to the - // same comparison, then output may include duplicates. - added.sort(); - const merged = mergeUtf16SortedIterators( - added.values(), - kvStore.getKeys(start, end), - ); - - for (const k of merged) { - if (!deletions.has(k)) { - yield k; - } - } + getNextKey(previousKey) { + assert.typeof(previousKey, 'string'); + throw new Error('not implemented'); }, }; function commit() { @@ -230,9 +116,10 @@ export function makeBufferedStorage(kvStore, listeners = {}) { } /** - * @param {{ get(key: string) => unknown, set(key: string, value: unknown): void }} getterSetter + * @template {unknown} [T=unknown] + * @param {KVStore} kvStore */ -export const makeReadCachingStorage = getterSetter => { +export const makeReadCachingStorage = kvStore => { // In addition to the wrapping write buffer, keep a simple cache of // read values for has and get. let cache; @@ -241,31 +128,50 @@ export const makeReadCachingStorage = getterSetter => { } resetCache(); + const deleted = Symbol('deleted'); + const undef = Symbol('undefined'); + + /** @type {KVStore} */ const storage = harden({ has(key) { - return storage.get(key) !== undefined; + const value = cache.get(key); + if (value !== undefined) { + return value !== deleted; + } else { + const ret = kvStore.has(key); + if (!ret) { + cache.set(key, deleted); + } + return ret; + } }, get(key) { - if (cache.has(key)) return cache.get(key); + let value = cache.get(key); + if (value !== undefined) { + return value === deleted || value === undef ? undefined : value; + } // Fetch the value and cache it until the next commit or abort. - const value = getterSetter.get(key); - cache.set(key, value); + value = kvStore.get(key); + cache.set(key, value === undefined ? undef : value); return value; }, set(key, value) { // Set the value and cache it until the next commit or abort (which is // expected immediately, since the buffered wrapper only calls set // *during* a commit). - cache.set(key, value); - getterSetter.set(key, value); + // `undefined` is a valid value technically different than deletion, + // depending on how the underlying store does its serialization + cache.set(key, value === undefined ? undef : value); + kvStore.set(key, value); }, delete(key) { - // Deletion in chain storage manifests as set-to-undefined. - storage.set(key, undefined); + // Delete the value and cache the deletion until next commit or abort. + // Deletion results in `undefined` on `get`, but `false` on `has` + cache.set(key, deleted); + kvStore.delete(key); }, - // eslint-disable-next-line require-yield - *getKeys(_start, _end) { + getNextKey(_previousKey) { throw new Error('not implemented'); }, }); diff --git a/packages/cosmic-swingset/src/chain-main.js b/packages/cosmic-swingset/src/chain-main.js index 200f2ac6bae..60388564431 100644 --- a/packages/cosmic-swingset/src/chain-main.js +++ b/packages/cosmic-swingset/src/chain-main.js @@ -1,3 +1,5 @@ +// @ts-check + import { resolve as pathResolve } from 'path'; import v8 from 'node:v8'; import process from 'node:process'; @@ -21,11 +23,13 @@ import { makePublishKit, pipeTopicToStorage } from '@agoric/notifier'; import * as STORAGE_PATH from '@agoric/internal/src/chain-storage-paths.js'; import { BridgeId as BRIDGE_ID } from '@agoric/internal'; -import { makeReadCachingStorage } from './bufferedStorage.js'; +import { + makeBufferedStorage, + makeReadCachingStorage, +} from './bufferedStorage.js'; import stringify from './json-stable-stringify.js'; import { launch } from './launch-chain.js'; import { getTelemetryProviders } from './kernel-stats.js'; -import { makeQueue } from './make-queue.js'; // eslint-disable-next-line no-unused-vars let whenHellFreezesOver = null; @@ -41,16 +45,32 @@ const toNumber = specimen => { return number; }; +/** + * @template {unknown} [T=unknown] + * @param {(req: string) => string} call + * @param {string} prefix + * @param {"set" | "legacySet" | "setWithoutNotify"} setterMethod + * @param {(value: string) => T} fromBridgeStringValue + * @param {(value: T) => string} toBridgeStringValue + * @returns {import("./bufferedStorage.js").KVStore} + */ const makePrefixedBridgeStorage = ( call, prefix, setterMethod, - fromBridgeValue = x => x, - toBridgeValue = x => x, + fromBridgeStringValue = x => JSON.parse(x), + toBridgeStringValue = x => stringify(x), ) => { prefix.endsWith('.') || Fail`prefix ${prefix} must end with a dot`; return harden({ + has: key => { + const retStr = call( + stringify({ method: 'has', args: [`${prefix}${key}`] }), + ); + const ret = JSON.parse(retStr); + return ret; + }, get: key => { const retStr = call( stringify({ method: 'get', args: [`${prefix}${key}`] }), @@ -59,13 +79,21 @@ const makePrefixedBridgeStorage = ( if (ret == null) { return undefined; } - const bridgeValue = JSON.parse(ret); - return fromBridgeValue(bridgeValue); + return fromBridgeStringValue(ret); }, set: (key, value) => { const path = `${prefix}${key}`; - const entry = - value == null ? [path] : [path, stringify(toBridgeValue(value))]; + const entry = [path, toBridgeStringValue(value)]; + call( + stringify({ + method: setterMethod, + args: [entry], + }), + ); + }, + delete: key => { + const path = `${prefix}${key}`; + const entry = [path]; call( stringify({ method: setterMethod, @@ -73,6 +101,9 @@ const makePrefixedBridgeStorage = ( }), ); }, + getNextKey(_previousKey) { + throw new Error('not implemented'); + }, }); }; @@ -245,19 +276,24 @@ export default async function main(progname, args, { env, homedir, agcc }) { sendToChain, `${STORAGE_PATH.MAILBOX}.`, 'legacySet', - fromBridgeMailbox, - exportMailbox, + val => fromBridgeMailbox(JSON.parse(val)), + val => stringify(exportMailbox(val)), ), ); - const actionQueue = makeQueue( - makeReadCachingStorage( - makePrefixedBridgeStorage( - sendToChain, - `${STORAGE_PATH.ACTION_QUEUE}.`, - 'setWithoutNotify', - ), + const actionQueueRawStorage = makeBufferedStorage( + makePrefixedBridgeStorage( + sendToChain, + `${STORAGE_PATH.ACTION_QUEUE}.`, + 'setWithoutNotify', + x => x, + x => x, ), ); + const actionQueueStorage = harden({ + ...actionQueueRawStorage.kvStore, + commit: actionQueueRawStorage.commit, + abort: actionQueueRawStorage.abort, + }); function setActivityhash(activityhash) { const entry = [STORAGE_PATH.ACTIVITYHASH, activityhash]; const msg = stringify({ @@ -397,7 +433,7 @@ export default async function main(progname, args, { env, homedir, agcc }) { }; const s = await launch({ - actionQueue, + actionQueueStorage, kernelStateDBDir: stateDBDir, makeInstallationPublisher, mailboxStorage, diff --git a/packages/cosmic-swingset/src/kernel-stats.js b/packages/cosmic-swingset/src/kernel-stats.js index c5ce317076e..9d77ab88d93 100644 --- a/packages/cosmic-swingset/src/kernel-stats.js +++ b/packages/cosmic-swingset/src/kernel-stats.js @@ -256,9 +256,14 @@ export function makeInboundQueueMetrics(initialLength) { let remove = 0; return harden({ - incStat: (delta = 1) => { - length += delta; - add += delta; + updateLength: newLength => { + const delta = newLength - length; + length = newLength; + if (delta > 0) { + add += delta; + } else { + remove -= delta; + } }, decStat: (delta = 1) => { diff --git a/packages/cosmic-swingset/src/launch-chain.js b/packages/cosmic-swingset/src/launch-chain.js index b7c7cc53eec..636e7d66d3e 100644 --- a/packages/cosmic-swingset/src/launch-chain.js +++ b/packages/cosmic-swingset/src/launch-chain.js @@ -33,9 +33,8 @@ import { BeansPerBlockComputeLimit, BeansPerVatCreation, BeansPerXsnapComputron, - QueueInbound, } from './sim-params.js'; -import { parseParams, encodeQueueSizes } from './params.js'; +import { parseParams } from './params.js'; import { makeQueue } from './make-queue.js'; const console = anylogger('launch-chain'); @@ -210,14 +209,14 @@ function neverStop() { } export async function launch({ - actionQueue, + actionQueueStorage, kernelStateDBDir, mailboxStorage, clearChainSends, replayChainSends, setActivityhash, bridgeOutbound, - makeInstallationPublisher = undefined, + makeInstallationPublisher, vatconfig, argv, env = process.env, @@ -237,28 +236,9 @@ export async function launch({ }); const { kvStore, commit } = hostStorage; - // makeQueue() thinks it should commit/abort, but the kvStore doesn't provide - // those ('commit' is reserved for flushing the block buffer). Furthermore - // the kvStore only deals with string values. - // We create a storage wrapper that adds a prefix to keys, serializes values, - // and disables commit/abort. - - const inboundQueuePrefix = getHostKey('inboundQueue.'); - const inboundQueueStorage = harden({ - get: key => { - const val = kvStore.get(inboundQueuePrefix + key); - return val ? JSON.parse(val) : undefined; - }, - set: (key, value) => { - value !== undefined || Fail`value in inboundQueue must be defined`; - kvStore.set(inboundQueuePrefix + key, JSON.stringify(value)); - }, - delete: key => kvStore.delete(inboundQueuePrefix + key), - commit: () => {}, // disable - abort: () => {}, // disable - }); - /** @type {ReturnType>} */ - const inboundQueue = makeQueue(inboundQueueStorage); + /** @typedef {ReturnType>} ActionQueue */ + /** @type {ActionQueue} */ + const actionQueue = makeQueue(actionQueueStorage); // Not to be confused with the gas model, this meter is for OpenTelemetry. const metricMeter = metricsProvider.getMeter('ag-chain-cosmos'); @@ -289,7 +269,7 @@ export async function launch({ ? parseInt(env.END_BLOCK_SPIN_MS, 10) : 0; - const inboundQueueMetrics = makeInboundQueueMetrics(inboundQueue.size()); + const inboundQueueMetrics = makeInboundQueueMetrics(actionQueue.size()); const { crankScheduler } = exportKernelStats({ controller, metricMeter, @@ -311,33 +291,6 @@ export async function launch({ } } - let savedQueueAllowed = JSON.parse( - kvStore.get(getHostKey('queueAllowed')) || '{}', - ); - - function updateQueueAllowed(_blockHeight, _blockTime, params) { - assert(params.queueMax); - assert(QueueInbound in params.queueMax); - - const inboundQueueMax = params.queueMax[QueueInbound]; - const inboundMempoolQueueMax = Math.floor(inboundQueueMax / 2); - - const inboundQueueSize = inboundQueue.size(); - - const inboundQueueAllowed = Math.max(0, inboundQueueMax - inboundQueueSize); - const inboundMempoolQueueAllowed = Math.max( - 0, - inboundMempoolQueueMax - inboundQueueSize, - ); - - savedQueueAllowed = { - // Keep up-to-date with queue size keys defined in - // golang/cosmos/x/swingset/types/default-params.go - inbound: inboundQueueAllowed, - inbound_mempool: inboundMempoolQueueAllowed, - }; - } - async function saveChainState() { // Save the mailbox state. await mailboxStorage.commit(); @@ -348,7 +301,6 @@ export async function launch({ kvStore.set(getHostKey('height'), `${blockHeight}`); kvStore.set(getHostKey('blockTime'), `${blockTime}`); kvStore.set(getHostKey('chainSends'), JSON.stringify(chainSends)); - kvStore.set(getHostKey('queueAllowed'), JSON.stringify(savedQueueAllowed)); await commit(); } @@ -413,7 +365,6 @@ export async function launch({ installationPublisher === undefined && makeInstallationPublisher !== undefined ) { - // @ts-expect-error not really undefined. TODO give provideInstallationPublisher a signature. installationPublisher = makeInstallationPublisher(); } } @@ -514,18 +465,19 @@ export async function launch({ let keepGoing = await runSwingset(); - // Then process as much as we can from the inboundQueue, which contains + // Then process as much as we can from the actionQueue, which contains // first the old actions followed by the newActions, running the // kernel to completion after each. if (keepGoing) { - for (const { action, inboundNum } of inboundQueue.consumeAll()) { + for (const { action, context } of actionQueue.consumeAll()) { + const inboundNum = `${context.blockHeight}-${context.txHash}-${context.msgIdx}`; inboundQueueMetrics.decStat(); // eslint-disable-next-line no-await-in-loop await performAction(action, inboundNum); // eslint-disable-next-line no-await-in-loop keepGoing = await runSwingset(); if (!keepGoing) { - // any leftover actions will remain on the inboundQueue for possible + // any leftover actions will remain on the actionQueue for possible // processing in the next block break; } @@ -537,22 +489,14 @@ export async function launch({ } } - async function endBlock(blockHeight, blockTime, params, newActions) { + async function endBlock(blockHeight, blockTime, params) { // This is called once per block, during the END_BLOCK event, and // only when we know that cosmos is in sync (else we'd skip kernel - // execution). 'newActions' are the bridge/mailbox/etc events that - // cosmos stored up for delivery to swingset in this block. - - // First, push all newActions onto the end of the inboundQueue, - // remembering that inboundQueue might still have work from the - // previous block - let actionNum = 0; - for (const action of newActions) { - const inboundNum = `${blockHeight}-${actionNum}`; - inboundQueue.push({ action, inboundNum }); - actionNum += 1; - inboundQueueMetrics.incStat(); - } + // execution). + + // First, record new actions (bridge/mailbox/etc events that cosmos + // added up for delivery to swingset) into our inboundQueue metrics + inboundQueueMetrics.updateLength(actionQueue.size()); // We update the timer device at the start of each block, which might push // work onto the end of the kernel run-queue (if any timers were ready to @@ -746,19 +690,14 @@ export async function launch({ blockManagerConsole.info('block', blockHeight, 'begin'); runTime = 0; - if (blockNeedsExecution(blockHeight)) { - // We are not reevaluating, so compute a new queueAllowed - updateQueueAllowed(blockHeight, blockTime, blockParams); - } - controller.writeSlogObject({ type: 'cosmic-swingset-begin-block', blockHeight, blockTime, - queueAllowed: savedQueueAllowed, + actionQueueStats: inboundQueueMetrics.getStats(), }); - return { queue_allowed: encodeQueueSizes(savedQueueAllowed) }; + return undefined; } case ActionType.END_BLOCK: { @@ -794,12 +733,7 @@ export async function launch({ provideInstallationPublisher(); await processAction(action.type, async () => - endBlock( - blockHeight, - blockTime, - blockParams, - actionQueue.consumeAll(), - ), + endBlock(blockHeight, blockTime, blockParams), ); // We write out our on-chain state as a number of chainSends. @@ -815,6 +749,7 @@ export async function launch({ type: 'cosmic-swingset-end-block-finish', blockHeight, blockTime, + actionQueueStats: inboundQueueMetrics.getStats(), }); return undefined; diff --git a/packages/cosmic-swingset/src/make-queue.js b/packages/cosmic-swingset/src/make-queue.js index 2a1a15a6d43..b4a1ce129e2 100644 --- a/packages/cosmic-swingset/src/make-queue.js +++ b/packages/cosmic-swingset/src/make-queue.js @@ -1,14 +1,42 @@ // @ts-check +import { Fail } from '@agoric/assert'; + /** * @typedef {object} QueueStorage * @property {() => void} commit * @property {() => void} abort - * @property {(key: string) => unknown} get - * @property {(key: string, value: unknown) => void} set + * @property {(key: string) => string | undefined} get + * @property {(key: string, value: string) => void} set * @property {(key: string) => void} delete */ +/** + * @typedef {{[idx: number]: string | undefined, head?: string, tail?: string}} QueueStorageDump + * @param {QueueStorageDump} [init] + * @returns {{storage: QueueStorage; dump: () => QueueStorageDump}} + */ +export const makeQueueStorageMock = init => { + const storage = new Map(init && Object.entries(init)); + return harden({ + storage: { + get(key) { + return storage.get(key); + }, + set(key, value) { + typeof value === 'string' || Fail`invalid value type ${value}`; + storage.set(key, value); + }, + delete(key) { + storage.delete(key); + }, + commit() {}, + abort() {}, + }, + dump: () => harden(Object.fromEntries(storage.entries())), + }); +}; + /** * Create a queue backed by some sort of scoped storage. * @@ -29,68 +57,75 @@ * @param {QueueStorage} storage a scoped queue storage */ export const makeQueue = storage => { - const getHead = () => - /** @type {number | undefined} */ (storage.get('head')) || 0; - const getTail = () => - /** @type {number | undefined} */ (storage.get('tail')) || 0; + const getHead = () => BigInt(storage.get('head') || 0); + const getTail = () => BigInt(storage.get('tail') || 0); + /** @type {IterableIterator | null} */ + let currentIterator = null; const queue = { size: () => { - return getTail() - getHead(); + return Number(getTail() - getHead()); }, /** @param {T} obj */ push: obj => { const tail = getTail(); - storage.set('tail', tail + 1); - storage.set(`${tail}`, obj); + storage.set('tail', String(tail + 1n)); + storage.set(`${tail}`, JSON.stringify(obj)); storage.commit(); + currentIterator = null; }, - /** @returns {Iterable} */ - consumeAll: () => ({ - [Symbol.iterator]: () => { - let done = false; - let head = getHead(); - const tail = getTail(); - return { - next: () => { - if (!done) { - if (head < tail) { - // Still within the queue. - const headKey = `${head}`; - const value = /** @type {T} */ (storage.get(headKey)); - storage.delete(headKey); - head += 1; - return { value, done }; - } - // Reached the end, so clean up our indices. - storage.delete('head'); - storage.delete('tail'); - storage.commit(); - done = true; - } - return { value: undefined, done }; - }, - return: () => { - if (!done) { - // We're done consuming, so save our state. - storage.set('head', head); - storage.commit(); - done = true; - } - return { value: undefined, done }; - }, - throw: err => { - if (!done) { - // Don't change our state. - storage.abort(); - done = true; - throw err; + /** @returns {IterableIterator} */ + consumeAll: () => { + let head = getHead(); + const tail = getTail(); + let done = !(head < tail); + const iterator = { + [Symbol.iterator]: () => iterator, + next: () => { + currentIterator === iterator || Fail`invalid iterator`; + if (!done) { + if (head < tail) { + // Still within the queue. + const headKey = `${head}`; + const value = JSON.parse( + /** @type {string} */ (storage.get(headKey)), + ); + storage.delete(headKey); + head += 1n; + return { value, done }; } - return { value: undefined, done }; - }, - }; - }, - }), + // Reached the end, so clean up our indices. + storage.delete('head'); + storage.delete('tail'); + storage.commit(); + done = true; + } + return { value: undefined, done }; + }, + return: () => { + currentIterator === iterator || Fail`invalid iterator`; + if (!done) { + // We're done consuming, so save our state. + storage.set('head', String(head)); + storage.commit(); + done = true; + } + return { value: undefined, done }; + }, + throw: err => { + currentIterator === iterator || Fail`invalid iterator`; + if (!done) { + // Don't change our state. + storage.abort(); + done = true; + throw err; + } + return { value: undefined, done }; + }, + }; + currentIterator = iterator; + return iterator; + }, }; return queue; }; diff --git a/packages/cosmic-swingset/src/sim-chain.js b/packages/cosmic-swingset/src/sim-chain.js index b817feb5c9a..52e2f877df1 100644 --- a/packages/cosmic-swingset/src/sim-chain.js +++ b/packages/cosmic-swingset/src/sim-chain.js @@ -12,7 +12,7 @@ import anylogger from 'anylogger'; import { makeSlogSender } from '@agoric/telemetry'; import { resolve as importMetaResolve } from 'import-meta-resolve'; -import { assert, Fail } from '@agoric/assert'; +import { Fail } from '@agoric/assert'; import { makeWithQueue } from '@agoric/internal/src/queue.js'; import { makeBatchedDeliver } from '@agoric/internal/src/batched-deliver.js'; import stringify from './json-stable-stringify.js'; @@ -20,6 +20,7 @@ import { launch } from './launch-chain.js'; import { getTelemetryProviders } from './kernel-stats.js'; import { DEFAULT_SIM_SWINGSET_PARAMS, QueueInbound } from './sim-params.js'; import { parseQueueSizes } from './params.js'; +import { makeQueue, makeQueueStorageMock } from './make-queue.js'; const console = anylogger('fake-chain'); @@ -96,18 +97,11 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) { env, }); - let aqContents = []; - const actionQueue = { - /** @returns {Iterable} */ - consumeAll: () => { - const iterable = aqContents; - aqContents = []; - return iterable; - }, - }; + const actionQueueStorage = makeQueueStorageMock().storage; + const actionQueue = makeQueue(actionQueueStorage); const s = await launch({ - actionQueue, + actionQueueStorage, kernelStateDBDir: stateDBdir, mailboxStorage, clearChainSends, @@ -141,22 +135,27 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) { blockTime, params, }; - const beginBlockResult = await blockingSend(beginAction); - assert(beginBlockResult); - const queueAllowed = parseQueueSizes(beginBlockResult.queue_allowed); - assert(QueueInbound in queueAllowed); + await blockingSend(beginAction); + const inboundQueueMax = parseQueueSizes(params.queue_max)[QueueInbound]; + const inboundQueueAllowed = Math.max( + 0, + inboundQueueMax - actionQueue.size(), + ); // Gather up the new messages into the latest block. - const thisBlock = intoChain.splice(0, queueAllowed[QueueInbound]); - - for (const [newMessages, acknum] of thisBlock) { - aqContents.push({ - type: 'DELIVER_INBOUND', - peer: bootAddress, - messages: newMessages, - ack: acknum, - blockHeight, - blockTime, + const thisBlock = intoChain.splice(0, inboundQueueAllowed); + + for (const [i, [newMessages, acknum]] of thisBlock.entries()) { + actionQueue.push({ + action: { + type: 'DELIVER_INBOUND', + peer: bootAddress, + messages: newMessages, + ack: acknum, + blockHeight, + blockTime, + }, + context: { blockHeight, txHash: 'simTx', msgIdx: i }, }); } const endAction = { type: 'END_BLOCK', blockHeight, blockTime };