diff --git a/packages/vow/src/tools.js b/packages/vow/src/tools.js index ff35539726c..af497d6f9f5 100644 --- a/packages/vow/src/tools.js +++ b/packages/vow/src/tools.js @@ -58,6 +58,13 @@ export const prepareBasicVowTools = (zone, powers = {}) => { */ const allVows = maybeVows => watchUtils.all(maybeVows); + /** + * Vow-tolerant implementation of Promise.allSettled. + * + * @param {EVow[]} maybeVows + */ + const allVowsSettled = maybeVows => watchUtils.allSettled(maybeVows); + /** @type {AsPromiseFunction} */ const asPromise = (specimenP, ...watcherArgs) => watchUtils.asPromise(specimenP, ...watcherArgs); @@ -67,6 +74,7 @@ export const prepareBasicVowTools = (zone, powers = {}) => { watch, makeVowKit, allVows, + allVowsSettled, asVow, asPromise, retriable, diff --git a/packages/vow/src/watch-utils.js b/packages/vow/src/watch-utils.js index 0bb740530bc..eb9f28f0852 100644 --- a/packages/vow/src/watch-utils.js +++ b/packages/vow/src/watch-utils.js @@ -54,12 +54,17 @@ export const prepareWatchUtils = ( { utils: M.interface('Utils', { all: M.call(M.arrayOf(M.any())).returns(VowShape), + allSettled: M.call(M.arrayOf(M.any())).returns(VowShape), asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()), }), watcher: M.interface('Watcher', { onFulfilled: M.call(M.any()).rest(M.any()).returns(M.any()), onRejected: M.call(M.any()).rest(M.any()).returns(M.any()), }), + helper: M.interface('Helper', { + createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape), + processResult: M.call(M.any()).rest(M.any()).returns(M.undefined()), + }), retryRejectionPromiseWatcher: PromiseWatcherI, }, () => { @@ -68,6 +73,7 @@ export const prepareWatchUtils = ( * @property {number} remaining * @property {MapStore} resultsMap * @property {VowKit['resolver']} resolver + * @property {boolean} [isAllSettled] */ /** @type {MapStore} */ const idToVowState = detached.mapStore('idToVowState'); @@ -79,20 +85,67 @@ export const prepareWatchUtils = ( }, { utils: { + /** @param {EVow[]} vows */ + all(vows) { + return this.facets.helper.createVow(vows, false); + }, + /** @param {EVow[]} vows */ + allSettled(vows) { + return this.facets.helper.createVow(vows, true); + }, + /** @type {AsPromiseFunction} */ + asPromise(specimenP, ...watcherArgs) { + // Watch the specimen in case it is an ephemeral promise. + const vow = watch(specimenP, ...watcherArgs); + const promise = when(vow); + // Watch the ephemeral result promise to ensure that if its settlement is + // lost due to upgrade of this incarnation, we will at least cause an + // unhandled rejection in the new incarnation. + zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); + + return promise; + }, + }, + watcher: { + /** + * @param {unknown} value + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled + */ + onFulfilled(value, ctx) { + this.facets.helper.processResult(value, ctx, 'fulfilled'); + }, + /** + * @param {unknown} reason + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled + */ + onRejected(reason, ctx) { + this.facets.helper.processResult(reason, ctx, 'rejected'); + }, + }, + helper: { /** * @param {EVow[]} vows + * @param {boolean} isAllSettled */ - all(vows) { + createVow(vows, isAllSettled) { const { nextId: id, idToVowState } = this.state; /** @type {VowKit} */ const kit = makeVowKit(); - // Preserve the order of the vow results. for (let index = 0; index < vows.length; index += 1) { watch(vows[index], this.facets.watcher, { id, index, numResults: vows.length, + isAllSettled, }); } @@ -105,6 +158,7 @@ export const prepareWatchUtils = ( resolver: kit.resolver, remaining: vows.length, resultsMap: detached.mapStore('resultsMap'), + isAllSettled, }), ); const idToNonStorableResults = provideLazyMap( @@ -119,27 +173,36 @@ export const prepareWatchUtils = ( } return kit.vow; }, - /** @type {AsPromiseFunction} */ - asPromise(specimenP, ...watcherArgs) { - // Watch the specimen in case it is an ephemeral promise. - const vow = watch(specimenP, ...watcherArgs); - const promise = when(vow); - // Watch the ephemeral result promise to ensure that if its settlement is - // lost due to upgrade of this incarnation, we will at least cause an - // unhandled rejection in the new incarnation. - zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); - - return promise; - }, - }, - watcher: { - onFulfilled(value, { id, index, numResults }) { + /** + * @param {unknown} result + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled + * @param {'fulfilled' | 'rejected'} status + */ + processResult(result, { id, index, numResults, isAllSettled }, status) { const { idToVowState } = this.state; if (!idToVowState.has(id)) { // Resolution of the returned vow happened already. return; } const { remaining, resultsMap, resolver } = idToVowState.get(id); + if (!isAllSettled && status === 'rejected') { + // For 'all', we reject immediately on the first rejection + idToVowState.delete(id); + resolver.reject(result); + return; + } + + const possiblyWrappedResult = isAllSettled + ? harden({ + status, + [status === 'fulfilled' ? 'value' : 'reason']: result, + }) + : result; + const idToNonStorableResults = provideLazyMap( utilsToNonStorableResults, this.facets.utils, @@ -152,15 +215,16 @@ export const prepareWatchUtils = ( ); // Capture the fulfilled value. - if (zone.isStorable(value)) { - resultsMap.init(index, value); + if (zone.isStorable(possiblyWrappedResult)) { + resultsMap.init(index, possiblyWrappedResult); } else { - nonStorableResults.set(index, value); + nonStorableResults.set(index, possiblyWrappedResult); } const vowState = harden({ remaining: remaining - 1, resultsMap, resolver, + isAllSettled, }); if (vowState.remaining > 0) { idToVowState.set(id, vowState); @@ -177,9 +241,12 @@ export const prepareWatchUtils = ( results[i] = resultsMap.get(i); } else { numLost += 1; + results[i] = isAllSettled + ? { status: 'rejected', reason: 'Unstorable result was lost' } + : undefined; } } - if (numLost > 0) { + if (numLost > 0 && !isAllSettled) { resolver.reject( assert.error(X`${numLost} unstorable results were lost`), ); @@ -187,16 +254,6 @@ export const prepareWatchUtils = ( resolver.resolve(harden(results)); } }, - onRejected(value, { id, index: _index, numResults: _numResults }) { - const { idToVowState } = this.state; - if (!idToVowState.has(id)) { - // First rejection wins. - return; - } - const { resolver } = idToVowState.get(id); - idToVowState.delete(id); - resolver.reject(value); - }, }, retryRejectionPromiseWatcher: { onFulfilled(_result) {}, diff --git a/packages/vow/test/watch-utils.test.js b/packages/vow/test/watch-utils.test.js index 13e5be07d5d..52d073b1443 100644 --- a/packages/vow/test/watch-utils.test.js +++ b/packages/vow/test/watch-utils.test.js @@ -1,4 +1,5 @@ // @ts-check +/* global setTimeout */ import test from 'ava'; import { makeHeapZone } from '@agoric/base-zone/heap.js'; @@ -252,3 +253,42 @@ test('asPromise handles watcher arguments', async t => { t.is(result, 'watcher test'); t.true(watcherCalled); }); + +test('allVowsSettled handles mixed fulfilled and rejected vows', async t => { + const zone = makeHeapZone(); + const { watch, when, allVowsSettled } = prepareVowTools(zone); + + const vowA = watch(Promise.resolve('a')); + const vowB = watch(Promise.reject(new Error('b'))); + const vowC = watch(Promise.resolve('c')); + + const result = await when(allVowsSettled([vowA, vowB, vowC])); + t.is(result.length, 3); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { + status: 'rejected', + reason: new Error('b'), + }); + t.deepEqual(result[2], { status: 'fulfilled', value: 'c' }); +}); + +test('allVowsSettled returns vows in order', async t => { + const zone = makeHeapZone(); + const { watch, when, allVowsSettled, makeVowKit } = prepareVowTools(zone); + const kit = makeVowKit(); + + const vowA = watch(kit.vow); + const vowB = watch(Promise.resolve('b')); + const vowC = watch(Promise.reject(new Error('c'))); + const allSettledV = allVowsSettled([vowA, vowB, vowC]); + setTimeout(() => kit.resolver.resolve('a'), 250); + + const result = await when(allSettledV); + t.is(result.length, 3); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { status: 'fulfilled', value: 'b' }); + t.deepEqual(result[2], { + status: 'rejected', + reason: new Error('c'), + }); +});