Skip to content

Commit

Permalink
feat: vowTools.allSettled
Browse files Browse the repository at this point in the history
- adds vowTools helper that mimics the behavior of Promise.allSettled
  • Loading branch information
0xpatrickdev committed Sep 18, 2024
1 parent ac7fc7b commit 3949b10
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 44 deletions.
12 changes: 12 additions & 0 deletions packages/vow/src/tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { makeWhen } from './when.js';

/**
* @import {Zone} from '@agoric/base-zone';
* @import {Passable} from '@endo/pass-style';
* @import {IsRetryableReason, AsPromiseFunction, EVow, Vow, ERef} from './types.js';
*/

Expand Down Expand Up @@ -68,6 +69,16 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
*/
const allVows = all;

/**
* Vow-tolerant implementation of Promise.allSettled that takes an iterable
* of vows and other {@link Passable}s and returns a single {@link Vow}. It
* resolves when all of the input's promises or vows are settled with an
* array of settled outcome objects.
*
* @param {unknown[]} maybeVows
*/
const allSettled = maybeVows => watchUtils.allSettled(maybeVows);

/** @type {AsPromiseFunction} */
const asPromise = (specimenP, ...watcherArgs) =>
watchUtils.asPromise(specimenP, ...watcherArgs);
Expand All @@ -78,6 +89,7 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
makeVowKit,
all,
allVows,
allSettled,
asVow,
asPromise,
retriable,
Expand Down
1 change: 1 addition & 0 deletions packages/vow/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export {};
*/

/**
* Vows are objects that represent promises that can be stored durably.
* @template [T=any]
* @typedef {CopyTagged<'Vow', VowPayload<T>>} Vow
*/
Expand Down
136 changes: 98 additions & 38 deletions packages/vow/src/watch-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const { Fail, bare, details: X } = assert;
* @import {Zone} from '@agoric/base-zone';
* @import {Watch} from './watch.js';
* @import {When} from './when.js';
* @import {VowKit, AsPromiseFunction, IsRetryableReason, EVow} from './types.js';
* @import {VowKit, AsPromiseFunction, IsRetryableReason, Vow} from './types.js';
*/

const VowShape = M.tagged(
Expand Down Expand Up @@ -54,12 +54,17 @@ export const prepareWatchUtils = (
{
utils: M.interface('Utils', {
all: M.call(M.arrayOf(M.any())).returns(VowShape),
allSettled: M.call(M.arrayOf(M.any())).returns(VowShape),
asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()),
}),
watcher: M.interface('Watcher', {
onFulfilled: M.call(M.raw()).rest(M.raw()).returns(M.raw()),
onRejected: M.call(M.raw()).rest(M.raw()).returns(M.raw()),
}),
helper: M.interface('Helper', {
createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape),
processResult: M.call(M.raw()).rest(M.raw()).returns(M.undefined()),
}),
retryRejectionPromiseWatcher: PromiseWatcherI,
},
() => {
Expand All @@ -68,6 +73,7 @@ export const prepareWatchUtils = (
* @property {number} remaining
* @property {MapStore<number, any>} resultsMap
* @property {VowKit['resolver']} resolver
* @property {boolean} [isAllSettled]
*/
/** @type {MapStore<bigint, VowState>} */
const idToVowState = detached.mapStore('idToVowState');
Expand All @@ -79,32 +85,83 @@ export const prepareWatchUtils = (
},
{
utils: {
/** @param {unknown[]} specimens */
all(specimens) {
return this.facets.helper.createVow(specimens, false);
},
/** @param {unknown[]} specimens */
allSettled(specimens) {
return /** @type {Vow<({status: 'fulfilled', value: any} | {status: 'rejected', reason: any})[]>} */ (
this.facets.helper.createVow(specimens, true)
);
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
/**
* @param {EVow<unknown>[]} vows
* @param {unknown} value
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
all(vows) {
onFulfilled(value, ctx) {
this.facets.helper.processResult(value, ctx, 'fulfilled');
},
/**
* @param {unknown} reason
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
onRejected(reason, ctx) {
this.facets.helper.processResult(reason, ctx, 'rejected');
},
},
helper: {
/**
* @param {unknown[]} specimens
* @param {boolean} isAllSettled
*/
createVow(specimens, isAllSettled) {
const { nextId: id, idToVowState } = this.state;
/** @type {VowKit<any[]>} */
const kit = makeVowKit();

// Preserve the order of the vow results.
for (let index = 0; index < vows.length; index += 1) {
watch(vows[index], this.facets.watcher, {
// Preserve the order of the results.
for (let index = 0; index < specimens.length; index += 1) {
watch(specimens[index], this.facets.watcher, {
id,
index,
numResults: vows.length,
numResults: specimens.length,
isAllSettled,
});
}

if (vows.length > 0) {
if (specimens.length > 0) {
// Save the state until rejection or all fulfilled.
this.state.nextId += 1n;
idToVowState.init(
id,
harden({
resolver: kit.resolver,
remaining: vows.length,
remaining: specimens.length,
resultsMap: detached.mapStore('resultsMap'),
isAllSettled,
}),
);
const idToNonStorableResults = provideLazyMap(
Expand All @@ -119,27 +176,36 @@ export const prepareWatchUtils = (
}
return kit.vow;
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
onFulfilled(value, { id, index, numResults }) {
/**
* @param {unknown} result
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
* @param {'fulfilled' | 'rejected'} status
*/
processResult(result, { id, index, numResults, isAllSettled }, status) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// Resolution of the returned vow happened already.
return;
}
const { remaining, resultsMap, resolver } = idToVowState.get(id);
if (!isAllSettled && status === 'rejected') {
// For 'all', we reject immediately on the first rejection
idToVowState.delete(id);
resolver.reject(result);
return;
}

const possiblyWrappedResult = isAllSettled
? harden({
status,
[status === 'fulfilled' ? 'value' : 'reason']: result,
})
: result;

const idToNonStorableResults = provideLazyMap(
utilsToNonStorableResults,
this.facets.utils,
Expand All @@ -152,15 +218,16 @@ export const prepareWatchUtils = (
);

// Capture the fulfilled value.
if (zone.isStorable(value)) {
resultsMap.init(index, value);
if (zone.isStorable(possiblyWrappedResult)) {
resultsMap.init(index, possiblyWrappedResult);
} else {
nonStorableResults.set(index, value);
nonStorableResults.set(index, possiblyWrappedResult);
}
const vowState = harden({
remaining: remaining - 1,
resultsMap,
resolver,
isAllSettled,
});
if (vowState.remaining > 0) {
idToVowState.set(id, vowState);
Expand All @@ -177,26 +244,19 @@ export const prepareWatchUtils = (
results[i] = resultsMap.get(i);
} else {
numLost += 1;
results[i] = isAllSettled
? { status: 'rejected', reason: 'Unstorable result was lost' }
: undefined;
}
}
if (numLost > 0) {
if (numLost > 0 && !isAllSettled) {
resolver.reject(
assert.error(X`${numLost} unstorable results were lost`),
);
} else {
resolver.resolve(harden(results));
}
},
onRejected(value, { id, index: _index, numResults: _numResults }) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// First rejection wins.
return;
}
const { resolver } = idToVowState.get(id);
idToVowState.delete(id);
resolver.reject(value);
},
},
retryRejectionPromiseWatcher: {
onFulfilled(_result) {},
Expand Down
15 changes: 15 additions & 0 deletions packages/vow/test/types.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,18 @@ expectType<(p1: number, p2: string) => Vow<{ someValue: 'bar' }>>(
Promise.resolve({ someValue: 'bar' } as const),
),
);

expectType<
Vow<
(
| { status: 'fulfilled'; value: any }
| { status: 'rejected'; reason: any }
)[]
>
>(
vt.allSettled([
Promise.resolve(1),
Promise.reject(new Error('test')),
Promise.resolve('hello'),
]),
);
Loading

0 comments on commit 3949b10

Please sign in to comment.