From 3949b107de79ccb2e46e14b2ab761f4ada742d25 Mon Sep 17 00:00:00 2001 From: 0xPatrick Date: Thu, 15 Aug 2024 22:45:39 -0400 Subject: [PATCH] feat: vowTools.allSettled - adds vowTools helper that mimics the behavior of Promise.allSettled --- packages/vow/src/tools.js | 12 +++ packages/vow/src/types.js | 1 + packages/vow/src/watch-utils.js | 136 +++++++++++++++++++------- packages/vow/test/types.test-d.ts | 15 +++ packages/vow/test/watch-utils.test.js | 120 +++++++++++++++++++++-- 5 files changed, 240 insertions(+), 44 deletions(-) diff --git a/packages/vow/src/tools.js b/packages/vow/src/tools.js index 8a2968b1fb0..275d274c615 100644 --- a/packages/vow/src/tools.js +++ b/packages/vow/src/tools.js @@ -7,6 +7,7 @@ import { makeWhen } from './when.js'; /** * @import {Zone} from '@agoric/base-zone'; + * @import {Passable} from '@endo/pass-style'; * @import {IsRetryableReason, AsPromiseFunction, EVow, Vow, ERef} from './types.js'; */ @@ -68,6 +69,16 @@ export const prepareBasicVowTools = (zone, powers = {}) => { */ const allVows = all; + /** + * Vow-tolerant implementation of Promise.allSettled that takes an iterable + * of vows and other {@link Passable}s and returns a single {@link Vow}. It + * resolves when all of the input's promises or vows are settled with an + * array of settled outcome objects. + * + * @param {unknown[]} maybeVows + */ + const allSettled = maybeVows => watchUtils.allSettled(maybeVows); + /** @type {AsPromiseFunction} */ const asPromise = (specimenP, ...watcherArgs) => watchUtils.asPromise(specimenP, ...watcherArgs); @@ -78,6 +89,7 @@ export const prepareBasicVowTools = (zone, powers = {}) => { makeVowKit, all, allVows, + allSettled, asVow, asPromise, retriable, diff --git a/packages/vow/src/types.js b/packages/vow/src/types.js index 15b894b244e..1370be6bd1c 100644 --- a/packages/vow/src/types.js +++ b/packages/vow/src/types.js @@ -66,6 +66,7 @@ export {}; */ /** + * Vows are objects that represent promises that can be stored durably. * @template [T=any] * @typedef {CopyTagged<'Vow', VowPayload>} Vow */ diff --git a/packages/vow/src/watch-utils.js b/packages/vow/src/watch-utils.js index 1039b950005..24b809f5a67 100644 --- a/packages/vow/src/watch-utils.js +++ b/packages/vow/src/watch-utils.js @@ -10,7 +10,7 @@ const { Fail, bare, details: X } = assert; * @import {Zone} from '@agoric/base-zone'; * @import {Watch} from './watch.js'; * @import {When} from './when.js'; - * @import {VowKit, AsPromiseFunction, IsRetryableReason, EVow} from './types.js'; + * @import {VowKit, AsPromiseFunction, IsRetryableReason, Vow} from './types.js'; */ const VowShape = M.tagged( @@ -54,12 +54,17 @@ export const prepareWatchUtils = ( { utils: M.interface('Utils', { all: M.call(M.arrayOf(M.any())).returns(VowShape), + allSettled: M.call(M.arrayOf(M.any())).returns(VowShape), asPromise: M.call(M.raw()).rest(M.raw()).returns(M.promise()), }), watcher: M.interface('Watcher', { onFulfilled: M.call(M.raw()).rest(M.raw()).returns(M.raw()), onRejected: M.call(M.raw()).rest(M.raw()).returns(M.raw()), }), + helper: M.interface('Helper', { + createVow: M.call(M.arrayOf(M.any()), M.boolean()).returns(VowShape), + processResult: M.call(M.raw()).rest(M.raw()).returns(M.undefined()), + }), retryRejectionPromiseWatcher: PromiseWatcherI, }, () => { @@ -68,6 +73,7 @@ export const prepareWatchUtils = ( * @property {number} remaining * @property {MapStore} resultsMap * @property {VowKit['resolver']} resolver + * @property {boolean} [isAllSettled] */ /** @type {MapStore} */ const idToVowState = detached.mapStore('idToVowState'); @@ -79,32 +85,83 @@ export const prepareWatchUtils = ( }, { utils: { + /** @param {unknown[]} specimens */ + all(specimens) { + return this.facets.helper.createVow(specimens, false); + }, + /** @param {unknown[]} specimens */ + allSettled(specimens) { + return /** @type {Vow<({status: 'fulfilled', value: any} | {status: 'rejected', reason: any})[]>} */ ( + this.facets.helper.createVow(specimens, true) + ); + }, + /** @type {AsPromiseFunction} */ + asPromise(specimenP, ...watcherArgs) { + // Watch the specimen in case it is an ephemeral promise. + const vow = watch(specimenP, ...watcherArgs); + const promise = when(vow); + // Watch the ephemeral result promise to ensure that if its settlement is + // lost due to upgrade of this incarnation, we will at least cause an + // unhandled rejection in the new incarnation. + zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); + + return promise; + }, + }, + watcher: { /** - * @param {EVow[]} vows + * @param {unknown} value + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled */ - all(vows) { + onFulfilled(value, ctx) { + this.facets.helper.processResult(value, ctx, 'fulfilled'); + }, + /** + * @param {unknown} reason + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled + */ + onRejected(reason, ctx) { + this.facets.helper.processResult(reason, ctx, 'rejected'); + }, + }, + helper: { + /** + * @param {unknown[]} specimens + * @param {boolean} isAllSettled + */ + createVow(specimens, isAllSettled) { const { nextId: id, idToVowState } = this.state; /** @type {VowKit} */ const kit = makeVowKit(); - // Preserve the order of the vow results. - for (let index = 0; index < vows.length; index += 1) { - watch(vows[index], this.facets.watcher, { + // Preserve the order of the results. + for (let index = 0; index < specimens.length; index += 1) { + watch(specimens[index], this.facets.watcher, { id, index, - numResults: vows.length, + numResults: specimens.length, + isAllSettled, }); } - if (vows.length > 0) { + if (specimens.length > 0) { // Save the state until rejection or all fulfilled. this.state.nextId += 1n; idToVowState.init( id, harden({ resolver: kit.resolver, - remaining: vows.length, + remaining: specimens.length, resultsMap: detached.mapStore('resultsMap'), + isAllSettled, }), ); const idToNonStorableResults = provideLazyMap( @@ -119,27 +176,36 @@ export const prepareWatchUtils = ( } return kit.vow; }, - /** @type {AsPromiseFunction} */ - asPromise(specimenP, ...watcherArgs) { - // Watch the specimen in case it is an ephemeral promise. - const vow = watch(specimenP, ...watcherArgs); - const promise = when(vow); - // Watch the ephemeral result promise to ensure that if its settlement is - // lost due to upgrade of this incarnation, we will at least cause an - // unhandled rejection in the new incarnation. - zone.watchPromise(promise, this.facets.retryRejectionPromiseWatcher); - - return promise; - }, - }, - watcher: { - onFulfilled(value, { id, index, numResults }) { + /** + * @param {unknown} result + * @param {object} ctx + * @param {bigint} ctx.id + * @param {number} ctx.index + * @param {number} ctx.numResults + * @param {boolean} ctx.isAllSettled + * @param {'fulfilled' | 'rejected'} status + */ + processResult(result, { id, index, numResults, isAllSettled }, status) { const { idToVowState } = this.state; if (!idToVowState.has(id)) { // Resolution of the returned vow happened already. return; } const { remaining, resultsMap, resolver } = idToVowState.get(id); + if (!isAllSettled && status === 'rejected') { + // For 'all', we reject immediately on the first rejection + idToVowState.delete(id); + resolver.reject(result); + return; + } + + const possiblyWrappedResult = isAllSettled + ? harden({ + status, + [status === 'fulfilled' ? 'value' : 'reason']: result, + }) + : result; + const idToNonStorableResults = provideLazyMap( utilsToNonStorableResults, this.facets.utils, @@ -152,15 +218,16 @@ export const prepareWatchUtils = ( ); // Capture the fulfilled value. - if (zone.isStorable(value)) { - resultsMap.init(index, value); + if (zone.isStorable(possiblyWrappedResult)) { + resultsMap.init(index, possiblyWrappedResult); } else { - nonStorableResults.set(index, value); + nonStorableResults.set(index, possiblyWrappedResult); } const vowState = harden({ remaining: remaining - 1, resultsMap, resolver, + isAllSettled, }); if (vowState.remaining > 0) { idToVowState.set(id, vowState); @@ -177,9 +244,12 @@ export const prepareWatchUtils = ( results[i] = resultsMap.get(i); } else { numLost += 1; + results[i] = isAllSettled + ? { status: 'rejected', reason: 'Unstorable result was lost' } + : undefined; } } - if (numLost > 0) { + if (numLost > 0 && !isAllSettled) { resolver.reject( assert.error(X`${numLost} unstorable results were lost`), ); @@ -187,16 +257,6 @@ export const prepareWatchUtils = ( resolver.resolve(harden(results)); } }, - onRejected(value, { id, index: _index, numResults: _numResults }) { - const { idToVowState } = this.state; - if (!idToVowState.has(id)) { - // First rejection wins. - return; - } - const { resolver } = idToVowState.get(id); - idToVowState.delete(id); - resolver.reject(value); - }, }, retryRejectionPromiseWatcher: { onFulfilled(_result) {}, diff --git a/packages/vow/test/types.test-d.ts b/packages/vow/test/types.test-d.ts index 8a859348caf..6f2968a7631 100644 --- a/packages/vow/test/types.test-d.ts +++ b/packages/vow/test/types.test-d.ts @@ -16,3 +16,18 @@ expectType<(p1: number, p2: string) => Vow<{ someValue: 'bar' }>>( Promise.resolve({ someValue: 'bar' } as const), ), ); + +expectType< + Vow< + ( + | { status: 'fulfilled'; value: any } + | { status: 'rejected'; reason: any } + )[] + > +>( + vt.allSettled([ + Promise.resolve(1), + Promise.reject(new Error('test')), + Promise.resolve('hello'), + ]), +); diff --git a/packages/vow/test/watch-utils.test.js b/packages/vow/test/watch-utils.test.js index c769fe6120f..4a622f37195 100644 --- a/packages/vow/test/watch-utils.test.js +++ b/packages/vow/test/watch-utils.test.js @@ -1,4 +1,5 @@ // @ts-check +/* global setTimeout */ import test from 'ava'; import { makeHeapZone } from '@agoric/base-zone/heap.js'; @@ -6,6 +7,8 @@ import { E, getInterfaceOf } from '@endo/far'; import { prepareBasicVowTools } from '../src/tools.js'; +const setTimeoutAmbient = setTimeout; + test('vowTools.all waits for a single vow to complete', async t => { const zone = makeHeapZone(); const { watch, when, all } = prepareBasicVowTools(zone); @@ -116,14 +119,29 @@ test('vowTools.all - watch promises mixed with vows', async t => { test('vowTools.all can accept passable data (PureData)', async t => { const zone = makeHeapZone(); - const { watch, when, all } = prepareBasicVowTools(zone); - - const testPromiseP = Promise.resolve('vow'); - const vowA = watch(testPromiseP); + const { when, all } = prepareBasicVowTools(zone); - const result = await when(all([vowA, 'string', 1n, { obj: true }])); + const result = await when( + all([Promise.resolve('promise'), 'string', 1n, { obj: true }]), + ); t.is(result.length, 4); - t.deepEqual(result, ['vow', 'string', 1n, { obj: true }]); + t.deepEqual(result, ['promise', 'string', 1n, { obj: true }]); +}); + +test('vowTools.all rejects on the first settled rejection', async t => { + const zone = makeHeapZone(); + const { when, all } = prepareBasicVowTools(zone); + + await t.throwsAsync( + when( + all([ + Promise.resolve('yes'), + Promise.reject(new Error('no')), + Promise.reject(new Error('no again')), + ]), + ), + { message: 'no' }, + ); }); const prepareAccount = zone => @@ -268,3 +286,93 @@ test('vowTools.all handles unstorable results', async t => { t.is(result[1], nonPassable); t.is(result[1](), 'i am a function'); }); + +test('vowTools.allSettled handles mixed fulfilled and rejected vows', async t => { + const zone = makeHeapZone(); + const { watch, when, allSettled } = prepareBasicVowTools(zone); + + const vowA = watch(Promise.resolve('a')); + const vowB = watch(Promise.reject(new Error('b'))); + const vowC = watch(Promise.resolve('c')); + + const result = await when(allSettled([vowA, vowB, vowC])); + t.is(result.length, 3); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { + status: 'rejected', + reason: new Error('b'), + }); + t.deepEqual(result[2], { status: 'fulfilled', value: 'c' }); +}); + +test('vowTools.allSettled accepts any passables', async t => { + const zone = makeHeapZone(); + const { watch, when, allSettled } = prepareBasicVowTools(zone); + + const result = await when( + allSettled([ + watch(Promise.resolve('a')), + watch(Promise.reject(new Error('b'))), + Promise.resolve('c'), + 1n, + { foo: 'e' }, + new Error('f'), + 'g', + undefined, + ]), + ); + t.is(result.length, 8); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { + status: 'rejected', + reason: Error('b'), + }); + t.deepEqual(result[2], { status: 'fulfilled', value: 'c' }); + t.deepEqual(result[3], { status: 'fulfilled', value: 1n }); + t.deepEqual(result[4], { status: 'fulfilled', value: { foo: 'e' } }); + t.deepEqual(result[5], { status: 'fulfilled', value: Error('f') }); + t.deepEqual(result[6], { status: 'fulfilled', value: 'g' }); + t.deepEqual(result[7], { status: 'fulfilled', value: undefined }); +}); + +test('vowTools.allSettled returns vows in order', async t => { + const zone = makeHeapZone(); + const { watch, when, allSettled, makeVowKit } = prepareBasicVowTools(zone); + const kit = makeVowKit(); + + const vowA = watch(kit.vow); + const vowB = watch(Promise.resolve('b')); + const vowC = watch(Promise.reject(new Error('c'))); + const allSettledV = allSettled([vowA, vowB, vowC]); + setTimeoutAmbient(() => kit.resolver.resolve('a'), 250); + + const result = await when(allSettledV); + t.is(result.length, 3); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { status: 'fulfilled', value: 'b' }); + t.deepEqual(result[2], { + status: 'rejected', + reason: new Error('c'), + }); +}); + +test('vowTools.allSettled handles unstorable results', async t => { + const zone = makeHeapZone(); + const { watch, when, allSettled } = prepareBasicVowTools(zone); + + // it's not recommended to use non-passables with allVows or allSettled, + // but an attempt will be made to store the value + const nonPassable = () => 'im a function'; + t.is(zone.isStorable(nonPassable), false); + + const vowA = watch(Promise.resolve('a')); + const vowB = watch(nonPassable); + + const result = await when(allSettled([vowA, vowB])); + + t.is(result.length, 2); + t.deepEqual(result[0], { status: 'fulfilled', value: 'a' }); + t.deepEqual(result[1], { status: 'fulfilled', value: nonPassable }); + // @ts-expect-error narrowed in line above + t.is(result[1].value(), 'im a function'); +});