Skip to content

Commit

Permalink
feat(watchUtils): handle non-storables
Browse files Browse the repository at this point in the history
Co-authored-by: Michael FIG <mfig@agoric.com>
  • Loading branch information
0xpatrickdev and michaelfig committed Jul 1, 2024
1 parent c940d5c commit 8c27c67
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 24 deletions.
2 changes: 1 addition & 1 deletion packages/base-zone/src/watch-promise.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const { apply } = Reflect;
/**
* A PromiseWatcher method guard callable with or more arguments, returning void.
*/
export const PromiseWatcherHandler = M.call(M.any()).rest(M.any()).returns();
export const PromiseWatcherHandler = M.call(M.raw()).rest(M.raw()).returns();

/**
* A PromiseWatcher interface that has both onFulfilled and onRejected handlers.
Expand Down
6 changes: 4 additions & 2 deletions packages/vow/src/tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import { prepareWatch } from './watch.js';
import { prepareWatchUtils } from './watch-utils.js';
import { makeAsVow } from './vow-utils.js';

/** @import {Zone} from '@agoric/base-zone' */
/** @import {IsRetryableReason, AsPromiseFunction} from './types.js' */
/**
* @import {Zone} from '@agoric/base-zone';
* @import {IsRetryableReason, AsPromiseFunction} from './types.js';
*/

/**
* @param {Zone} zone
Expand Down
50 changes: 39 additions & 11 deletions packages/vow/src/vow.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import { M } from '@endo/patterns';
import { makeTagged } from '@endo/pass-style';
import { PromiseWatcherI } from '@agoric/base-zone';

const { details: X } = assert;

/**
* @import {PromiseKit} from '@endo/promise-kit'
* @import {Zone} from '@agoric/base-zone'
* @import {VowResolver, VowKit} from './types.js'
* @import {PromiseKit} from '@endo/promise-kit';
* @import {Zone} from '@agoric/base-zone';
* @import {VowResolver, VowKit} from './types.js';
*/

const sink = () => {};
Expand Down Expand Up @@ -61,13 +63,13 @@ export const prepareVowKit = zone => {
shorten: M.call().returns(M.promise()),
}),
resolver: M.interface('VowResolver', {
resolve: M.call().optional(M.any()).returns(),
reject: M.call().optional(M.any()).returns(),
resolve: M.call().optional(M.raw()).returns(),
reject: M.call().optional(M.raw()).returns(),
}),
watchNextStep: PromiseWatcherI,
},
() => ({
value: undefined,
value: /** @type {any} */ (undefined),
// The stepStatus is null if the promise step hasn't settled yet.
stepStatus: /** @type {null | 'pending' | 'fulfilled' | 'rejected'} */ (
null
Expand All @@ -80,11 +82,18 @@ export const prepareVowKit = zone => {
*/
async shorten() {
const { stepStatus, value } = this.state;
const { resolver } = this.facets;
const ephemera = resolverToEphemera.get(resolver);

switch (stepStatus) {
case 'fulfilled':
case 'fulfilled': {
if (ephemera) return ephemera.promise;
return value;
case 'rejected':
}
case 'rejected': {
if (ephemera) return ephemera.promise;
throw value;
}
case null:
case 'pending':
return provideCurrentKit(this.facets.resolver).promise;
Expand Down Expand Up @@ -129,17 +138,36 @@ export const prepareVowKit = zone => {
},
watchNextStep: {
onFulfilled(value) {
const { resolver } = this.facets;
const { resolver, watchNextStep } = this.facets;
const { resolve } = getPromiseKitForResolution(resolver);
harden(value);
if (resolve) {
resolve(value);
}
this.state.stepStatus = 'fulfilled';
this.state.value = value;
if (zone.isStorable(value)) {
this.state.value = value;
} else {
watchNextStep.onRejected(
assert.error(X`Vow fulfillment value is not storable: ${value}`),
);
}
},
onRejected(reason) {
const { resolver } = this.facets;
const { reject } = getPromiseKitForResolution(resolver);
harden(reason);
if (reject) {
reject(reason);
}
this.state.stepStatus = 'rejected';
this.state.value = reason;
if (zone.isStorable(reason)) {
this.state.value = reason;
} else {
this.state.value = assert.error(
X`Vow rejection reason is not storable: ${reason}`,
);
}
},
},
},
Expand Down
74 changes: 64 additions & 10 deletions packages/vow/src/watch-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { M } from '@endo/patterns';
import { PromiseWatcherI } from '@agoric/base-zone';

const { Fail, bare } = assert;
const { Fail, bare, details: X } = assert;

/**
* @import {MapStore} from '@agoric/store/src/types.js'
Expand All @@ -21,6 +21,20 @@ const VowShape = M.tagged(
}),
);

/**
* Like `provideLazy`, but accepts non-Passable values.
*
* @param {WeakMap} map
* @param {any} key
* @param {(key: any) => any} makeValue
*/
const provideLazyMap = (map, key, makeValue) => {
if (!map.has(key)) {
map.set(key, makeValue(key));
}
return map.get(key);
};

/**
* @param {Zone} zone
* @param {object} powers
Expand All @@ -34,6 +48,8 @@ export const prepareWatchUtils = (
{ watch, when, makeVowKit, isRetryableReason },
) => {
const detached = zone.detached();
const utilsToNonStorableResults = new WeakMap();

const makeWatchUtilsKit = zone.exoClassKit(
'WatchUtils',
{
Expand Down Expand Up @@ -75,7 +91,11 @@ export const prepareWatchUtils = (
// Preserve the order of the vow results.
let index = 0;
for (const vow of vows) {
watch(vow, this.facets.watcher, { id, index });
watch(vow, this.facets.watcher, {
id,
index,
numResults: vows.length,
});
index += 1;
}

Expand All @@ -90,6 +110,12 @@ export const prepareWatchUtils = (
resultsMap: detached.mapStore('resultsMap'),
}),
);
const idToNonStorableResults = provideLazyMap(
utilsToNonStorableResults,
this.facets.utils,
() => new Map(),
);
idToNonStorableResults.set(id, new Map());
} else {
// Base case: nothing to wait for.
kit.resolver.resolve(harden([]));
Expand All @@ -110,15 +136,30 @@ export const prepareWatchUtils = (
},
},
watcher: {
onFulfilled(value, { id, index }) {
onFulfilled(value, { id, index, numResults }) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// Resolution of the returned vow happened already.
return;
}
const { remaining, resultsMap, resolver } = idToVowState.get(id);
const idToNonStorableResults = provideLazyMap(
utilsToNonStorableResults,
this.facets.utils,
() => new Map(),
);
const nonStorableResults = provideLazyMap(
idToNonStorableResults,
id,
() => new Map(),
);

// Capture the fulfilled value.
resultsMap.init(index, value);
if (zone.isStorable(value)) {
resultsMap.init(index, value);
} else {
nonStorableResults.set(index, value);
}
const vowState = harden({
remaining: remaining - 1,
resultsMap,
Expand All @@ -130,13 +171,26 @@ export const prepareWatchUtils = (
}
// We're done! Extract the array.
idToVowState.delete(id);
const results = new Array(resultsMap.getSize());
for (const [i, val] of resultsMap.entries()) {
results[i] = val;
const results = new Array(numResults);
let numLost = 0;
for (let i = 0; i < numResults; i += 1) {
if (nonStorableResults.has(i)) {
results[i] = nonStorableResults.get(i);
} else if (resultsMap.has(i)) {
results[i] = resultsMap.get(i);
} else {
numLost += 1;
}
}
if (numLost > 0) {
resolver.reject(
assert.error(X`${numLost} unstorable results were lost`),
);
} else {
resolver.resolve(harden(results));
}
resolver.resolve(harden(results));
},
onRejected(value, { id, index: _index }) {
onRejected(value, { id, index: _index, numResults: _numResults }) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// First rejection wins.
Expand All @@ -151,7 +205,7 @@ export const prepareWatchUtils = (
onFulfilled(_result) {},
onRejected(reason, failedOp) {
if (isRetryableReason(reason, undefined)) {
Fail`Pending ${bare(failedOp)} could not retry; {reason}`;
Fail`Pending ${bare(failedOp)} could not retry; ${reason}`;
}
},
},
Expand Down

0 comments on commit 8c27c67

Please sign in to comment.