Skip to content

Commit

Permalink
feat(cosmic-swingset): process highPriorityQueue actions
Browse files Browse the repository at this point in the history
Drain the `highPriorityQueue` before the `actionQueue`
(#5966) and before polling
timers (#6964).
  • Loading branch information
mhofman committed Apr 27, 2023
1 parent 29df491 commit 182a96e
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 12 deletions.
27 changes: 20 additions & 7 deletions golang/cosmos/x/swingset/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (
// Top-level paths for chain storage should remain synchronized with
// packages/internal/src/chain-storage-paths.js
const (
StoragePathActionQueue = "actionQueue"
StoragePathBeansOwing = "beansOwing"
StoragePathEgress = "egress"
StoragePathMailbox = "mailbox"
StoragePathCustom = "published"
StoragePathBundles = "bundles"
StoragePathSwingStore = "swingStore"
StoragePathActionQueue = "actionQueue"
StoragePathHighPriorityQueue = "highPriorityQueue"
StoragePathBeansOwing = "beansOwing"
StoragePathEgress = "egress"
StoragePathMailbox = "mailbox"
StoragePathCustom = "published"
StoragePathBundles = "bundles"
StoragePathSwingStore = "swingStore"
)

// 2 ** 256 - 1
Expand Down Expand Up @@ -150,6 +151,11 @@ func (k Keeper) PushAction(ctx sdk.Context, action vm.Jsonable) error {
return k.pushAction(ctx, StoragePathActionQueue, action)
}

// PushAction appends an action to the controller's highPriorityQueue.
func (k Keeper) PushHighPriorityAction(ctx sdk.Context, action vm.Jsonable) error {
return k.pushAction(ctx, StoragePathHighPriorityQueue, action)
}

func (k Keeper) queueIndex(ctx sdk.Context, queuePath string, position string) (sdk.Int, error) {
// Position should be either "head" or "tail"
path := queuePath + "." + position
Expand Down Expand Up @@ -180,6 +186,13 @@ func (k Keeper) queueLength(ctx sdk.Context, queuePath string) (sdk.Int, error)

func (k Keeper) InboundQueueLength(ctx sdk.Context) (int32, error) {
size := sdk.NewInt(0)

highPriorityQueueLength, err := k.queueLength(ctx, StoragePathHighPriorityQueue)
if err != nil {
return 0, err
}
size = size.Add(highPriorityQueueLength)

actionQueueLength, err := k.queueLength(ctx, StoragePathActionQueue)
if err != nil {
return 0, err
Expand Down
4 changes: 4 additions & 0 deletions packages/cosmic-swingset/src/chain-main.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ export default async function main(progname, args, { env, homedir, agcc }) {
return harden({ ...kvStore, commit, abort });
};
const actionQueueStorage = makeQueueStorage(STORAGE_PATH.ACTION_QUEUE);
const highPriorityQueueStorage = makeQueueStorage(
STORAGE_PATH.HIGH_PRIORITY_QUEUE,
);
/**
* Callback invoked during SwingSet execution when new "export data" is
* generated by swingStore to be saved in the host's verified DB. In our
Expand Down Expand Up @@ -449,6 +452,7 @@ export default async function main(progname, args, { env, homedir, agcc }) {

const s = await launch({
actionQueueStorage,
highPriorityQueueStorage,
kernelStateDBDir: stateDBDir,
makeInstallationPublisher,
mailboxStorage,
Expand Down
6 changes: 3 additions & 3 deletions packages/cosmic-swingset/src/helpers/make-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ export const makeQueueStorageMock = init => {
* - `tail`: the index *past* the last entry in the queue.
* - `<index>`: the contents of the queue at the given index.
*
* For a cosmos inbound queue (e.g. `actionQueue`), the golang side will push
* into the queue, updating the index stored at key `${queuePath}.tail` and
* setting data for key `${queuePath}.${index}`.
* For the cosmos inbound queues (`actionQueue` or `highPriorityQueue`), the
* golang side will push into the queue, updating the index stored at key
* `${queuePath}.tail` and setting data for key `${queuePath}.${index}`.
* The JS side will shift the queue, updating the index at key
* `${queuePath}.head` and reading and deleting `${queuePath}.${index}`.
*
Expand Down
15 changes: 13 additions & 2 deletions packages/cosmic-swingset/src/launch-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ function neverStop() {

export async function launch({
actionQueueStorage,
highPriorityQueueStorage,
kernelStateDBDir,
mailboxStorage,
clearChainSends,
Expand Down Expand Up @@ -293,6 +294,8 @@ export async function launch({
/** @typedef {ReturnType<typeof makeQueue<{context: any, action: any}>>} InboundQueue */
/** @type {InboundQueue} */
const actionQueue = makeQueue(actionQueueStorage);
/** @type {InboundQueue} */
const highPriorityQueue = makeQueue(highPriorityQueueStorage);

// Not to be confused with the gas model, this meter is for OpenTelemetry.
const metricMeter = metricsProvider.getMeter('ag-chain-cosmos');
Expand Down Expand Up @@ -323,7 +326,9 @@ export async function launch({
? parseInt(env.END_BLOCK_SPIN_MS, 10)
: 0;

const inboundQueueMetrics = makeInboundQueueMetrics(actionQueue.size());
const inboundQueueMetrics = makeInboundQueueMetrics(
actionQueue.size() + highPriorityQueue.size(),
);
const { crankScheduler } = exportKernelStats({
controller,
metricMeter,
Expand Down Expand Up @@ -545,6 +550,10 @@ export async function launch({
let keepGoing = await runSwingset();
if (!keepGoing) return;

// Then, process as much as we can from the priorityQueue.
keepGoing = await processActions(highPriorityQueue);
if (!keepGoing) return;

// Then, update the timer device with the new external time, which might
// push work onto the kernel run-queue (if any timers were ready to wake).
const addedToQueue = timer.poll(blockTime);
Expand All @@ -569,7 +578,9 @@ export async function launch({

// First, record new actions (bridge/mailbox/etc events that cosmos
// added up for delivery to swingset) into our inboundQueue metrics
inboundQueueMetrics.updateLength(actionQueue.size());
inboundQueueMetrics.updateLength(
actionQueue.size() + highPriorityQueue.size(),
);

// make a runPolicy that will be shared across all cycles
const runPolicy = computronCounter(params.beansPerUnit);
Expand Down
2 changes: 2 additions & 0 deletions packages/cosmic-swingset/src/sim-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) {
});

const actionQueueStorage = makeQueueStorageMock().storage;
const highPriorityQueueStorage = makeQueueStorageMock().storage;
const actionQueue = makeQueue(actionQueueStorage);

const s = await launch({
actionQueueStorage,
highPriorityQueueStorage,
kernelStateDBDir: stateDBdir,
mailboxStorage,
clearChainSends,
Expand Down
1 change: 1 addition & 0 deletions packages/internal/src/chain-storage-paths.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* golang/cosmos/x/swingset/keeper/keeper.go
*/
export const ACTION_QUEUE = 'actionQueue';
export const HIGH_PRIORITY_QUEUE = 'highPriorityQueue';
export const HIGH_PRIORITY_SENDERS = 'highPrioritySenders';
export const BEANSOWING = 'beansOwing';
export const EGRESS = 'egress';
Expand Down

0 comments on commit 182a96e

Please sign in to comment.