Skip to content

Commit

Permalink
feat: vowTools.allVowsSettled
Browse files Browse the repository at this point in the history
- adds vowTools helper that mimics the behavior of Promise.allSettled
  • Loading branch information
0xpatrickdev committed Sep 12, 2024
1 parent 3656582 commit 891b473
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 31 deletions.
8 changes: 8 additions & 0 deletions packages/vow/src/tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
*/
const allVows = maybeVows => watchUtils.all(maybeVows);

/**
* Vow-tolerant implementation of Promise.allSettled.
*
* @param {EVow<unknown>[]} maybeVows
*/
const allVowsSettled = maybeVows => watchUtils.allSettled(maybeVows);

/** @type {AsPromiseFunction} */
const asPromise = (specimenP, ...watcherArgs) =>
watchUtils.asPromise(specimenP, ...watcherArgs);
Expand All @@ -67,6 +74,7 @@ export const prepareBasicVowTools = (zone, powers = {}) => {
watch,
makeVowKit,
allVows,
allVowsSettled,
asVow,
asPromise,
retriable,
Expand Down
119 changes: 88 additions & 31 deletions packages/vow/src/watch-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,17 @@ export const prepareWatchUtils = (
{
utils: M.interface('Utils', {
all: M.call(M.arrayOf(M.any())).returns(VowShape),
allSettled: M.call(M.arrayOf(M.any())).returns(VowShape),
asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()),
}),
watcher: M.interface('Watcher', {
onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()),
onRejected: M.call(M.any()).rest(M.any()).returns(M.any()),
}),
helper: M.interface('Helper', {
createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape),
processResult: M.call(M.any()).rest(M.any()).returns(M.undefined()),
}),
retryRejectionPromiseWatcher: PromiseWatcherI,
},
() => {
Expand All @@ -68,6 +73,7 @@ export const prepareWatchUtils = (
* @property {number} remaining
* @property {MapStore<number, any>} resultsMap
* @property {VowKit['resolver']} resolver
* @property {boolean} [isAllSettled]
*/
/** @type {MapStore<bigint, VowState>} */
const idToVowState = detached.mapStore('idToVowState');
Expand All @@ -79,20 +85,67 @@ export const prepareWatchUtils = (
},
{
utils: {
/** @param {EVow<unknown>[]} vows */
all(vows) {
return this.facets.helper.createVow(vows, false);
},
/** @param {EVow<unknown>[]} vows */
allSettled(vows) {
return this.facets.helper.createVow(vows, true);
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
/**
* @param {unknown} value
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
onFulfilled(value, ctx) {
this.facets.helper.processResult(value, ctx, 'fulfilled');
},
/**
* @param {unknown} reason
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
*/
onRejected(reason, ctx) {
this.facets.helper.processResult(reason, ctx, 'rejected');
},
},
helper: {
/**
* @param {EVow<unknown>[]} vows
* @param {boolean} isAllSettled
*/
all(vows) {
createVow(vows, isAllSettled) {
const { nextId: id, idToVowState } = this.state;
/** @type {VowKit<any[]>} */
const kit = makeVowKit();

// Preserve the order of the vow results.
for (let index = 0; index < vows.length; index += 1) {
watch(vows[index], this.facets.watcher, {
id,
index,
numResults: vows.length,
isAllSettled,
});
}

Expand All @@ -105,6 +158,7 @@ export const prepareWatchUtils = (
resolver: kit.resolver,
remaining: vows.length,
resultsMap: detached.mapStore('resultsMap'),
isAllSettled,
}),
);
const idToNonStorableResults = provideLazyMap(
Expand All @@ -119,27 +173,36 @@ export const prepareWatchUtils = (
}
return kit.vow;
},
/** @type {AsPromiseFunction} */
asPromise(specimenP, ...watcherArgs) {
// Watch the specimen in case it is an ephemeral promise.
const vow = watch(specimenP, ...watcherArgs);
const promise = when(vow);
// Watch the ephemeral result promise to ensure that if its settlement is
// lost due to upgrade of this incarnation, we will at least cause an
// unhandled rejection in the new incarnation.
zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher);

return promise;
},
},
watcher: {
onFulfilled(value, { id, index, numResults }) {
/**
* @param {unknown} result
* @param {object} ctx
* @param {bigint} ctx.id
* @param {number} ctx.index
* @param {number} ctx.numResults
* @param {boolean} ctx.isAllSettled
* @param {'fulfilled' | 'rejected'} status
*/
processResult(result, { id, index, numResults, isAllSettled }, status) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// Resolution of the returned vow happened already.
return;
}
const { remaining, resultsMap, resolver } = idToVowState.get(id);
if (!isAllSettled && status === 'rejected') {
// For 'all', we reject immediately on the first rejection
idToVowState.delete(id);
resolver.reject(result);
return;
}

const possiblyWrappedResult = isAllSettled
? harden({
status,
[status === 'fulfilled' ? 'value' : 'reason']: result,
})
: result;

const idToNonStorableResults = provideLazyMap(
utilsToNonStorableResults,
this.facets.utils,
Expand All @@ -152,15 +215,16 @@ export const prepareWatchUtils = (
);

// Capture the fulfilled value.
if (zone.isStorable(value)) {
resultsMap.init(index, value);
if (zone.isStorable(possiblyWrappedResult)) {
resultsMap.init(index, possiblyWrappedResult);
} else {
nonStorableResults.set(index, value);
nonStorableResults.set(index, possiblyWrappedResult);
}
const vowState = harden({
remaining: remaining - 1,
resultsMap,
resolver,
isAllSettled,
});
if (vowState.remaining > 0) {
idToVowState.set(id, vowState);
Expand All @@ -177,26 +241,19 @@ export const prepareWatchUtils = (
results[i] = resultsMap.get(i);
} else {
numLost += 1;
results[i] = isAllSettled
? { status: 'rejected', reason: 'Unstorable result was lost' }
: undefined;
}
}
if (numLost > 0) {
if (numLost > 0 && !isAllSettled) {
resolver.reject(
assert.error(X`${numLost} unstorable results were lost`),
);
} else {
resolver.resolve(harden(results));
}
},
onRejected(value, { id, index: _index, numResults: _numResults }) {
const { idToVowState } = this.state;
if (!idToVowState.has(id)) {
// First rejection wins.
return;
}
const { resolver } = idToVowState.get(id);
idToVowState.delete(id);
resolver.reject(value);
},
},
retryRejectionPromiseWatcher: {
onFulfilled(_result) {},
Expand Down
41 changes: 41 additions & 0 deletions packages/vow/test/watch-utils.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @ts-check
/* global setTimeout */
import test from 'ava';

import { makeHeapZone } from '@agoric/base-zone/heap.js';
Expand Down Expand Up @@ -252,3 +253,43 @@ test('asPromise handles watcher arguments', async t => {
t.is(result, 'watcher test');
t.true(watcherCalled);
});

test('allVowsSettled handles mixed fulfilled and rejected vows', async t => {
const zone = makeHeapZone();
const { watch, when, allVowsSettled } = prepareBasicVowTools(zone);

const vowA = watch(Promise.resolve('a'));
const vowB = watch(Promise.reject(new Error('b')));
const vowC = watch(Promise.resolve('c'));

const result = await when(allVowsSettled([vowA, vowB, vowC]));
t.is(result.length, 3);
t.deepEqual(result[0], { status: 'fulfilled', value: 'a' });
t.deepEqual(result[1], {
status: 'rejected',
reason: new Error('b'),
});
t.deepEqual(result[2], { status: 'fulfilled', value: 'c' });
});

test('allVowsSettled returns vows in order', async t => {
const zone = makeHeapZone();
const { watch, when, allVowsSettled, makeVowKit } =
prepareBasicVowTools(zone);
const kit = makeVowKit();

const vowA = watch(kit.vow);
const vowB = watch(Promise.resolve('b'));
const vowC = watch(Promise.reject(new Error('c')));
const allSettledV = allVowsSettled([vowA, vowB, vowC]);
setTimeout(() => kit.resolver.resolve('a'), 250);

const result = await when(allSettledV);
t.is(result.length, 3);
t.deepEqual(result[0], { status: 'fulfilled', value: 'a' });
t.deepEqual(result[1], { status: 'fulfilled', value: 'b' });
t.deepEqual(result[2], {
status: 'rejected',
reason: new Error('c'),
});
});

0 comments on commit 891b473

Please sign in to comment.