Skip to content

Commit

Permalink
feat(asyncFlow): Stopgap E support (#9519)
Browse files Browse the repository at this point in the history
closes: #XXXX
refs: #9322, #9299 #9443

## Description

PR #9322 is supposed to provide production quality support for asyncFlow guest functions to use `E`. It is being reviewed for that goal, and will not be merged until we think it meets that bar. However, we need to start integration testing of asyncFlow with orchestration, to spot mismatched assumptions we may have missed. For this purpose, we do not immediately need production quality `E` support. That is the purpose of this PR. It starts as a copy of the code from #9322 but need only be evaluated as adequate for these stopgap purposes before being merged. This PR does *NOT* claim to f-i-x #9299 , leaving that job to remain with #9322 

Even though the requirements on this PR are so much lighter, reviewers should still look at the unresolved conversations on #9322 and determine if any of those need to first be solved even in this PR.

### Security Considerations

When merging stopgap code to master, there is always the danger that it might be used as if it production code. We need to remember not to do so, instead waiting for #9322 to do the job for real.

### Scaling Considerations
none
### Documentation Considerations
just as this stopgap unblocks integration testing, it also likely unblocks documenting how to use asyncFlow, both in general and for orchestration.
### Testing Considerations
As a stopgap, this PR does not need the rigorous testing that #9322 should have.
### Upgrade Considerations
We need to not use this stopgap for production purposes.
  • Loading branch information
erights committed Jun 28, 2024
1 parent f0f1a3e commit 4adf64f
Show file tree
Hide file tree
Showing 7 changed files with 432 additions and 46 deletions.
281 changes: 257 additions & 24 deletions packages/async-flow/src/replay-membrane.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
/* eslint-disable no-use-before-define */
import { Fail, X, b, makeError, q } from '@endo/errors';
import { isPromise } from '@endo/promise-kit';
import { Far, Remotable, getInterfaceOf } from '@endo/pass-style';
import {
Far,
Remotable,
getInterfaceOf,
getTag,
makeTagged,
passStyleOf,
} from '@endo/pass-style';
import { E } from '@endo/eventual-send';
import { throwLabeled } from '@endo/common/throw-labeled.js';
import { heapVowE } from '@agoric/vow/vat.js';
import { getMethodNames } from '@endo/eventual-send/utils.js';
import { objectMap } from '@endo/common/object-map.js';
import { isVow } from '@agoric/vow/src/vow-utils.js';
import { makeEquate } from './equate.js';
import { makeConvertKit } from './convert.js';

/**
* @import {PromiseKit} from '@endo/promise-kit'
* @import {Vow, VowTools} from '@agoric/vow'
* @import {Passable, PassableCap, CopyTagged} from '@endo/pass-style'
* @import {Vow, VowTools, VowKit} from '@agoric/vow'
* @import {LogStore} from '../src/log-store.js';
* @import {Bijection} from '../src/bijection.js';
* @import {Host, HostVow, LogEntry, Outcome} from '../src/types.js';
Expand All @@ -33,7 +43,7 @@ export const makeReplayMembrane = ({
watchWake,
panic,
}) => {
const { when, watch } = vowTools;
const { when, watch, makeVowKit } = vowTools;

const equate = makeEquate(bijection);

Expand Down Expand Up @@ -127,18 +137,50 @@ export const makeReplayMembrane = ({

// ///////////// Guest to Host or consume log ////////////////////////////////

/**
* The host is not supposed to expose host-side promises to the membrane,
* since they cannot be stored durably or survive upgrade. We cannot just
* automatically wrap any such host promises with host vows, because that
* would mask upgrade hazards if an upgrade happens before the vow settles.
* However, during the transition, the current host APIs called by
* orchestration still return many promises. We want to generate diagnostics
* when we encounter them, but for now, automatically convert them to
* host vow anyway, just so integration testing can proceed to reveal
* additional problems beyond these.
*
* @param {Passable} h
*/
const tolerateHostPromiseToVow = h => {
if (isPromise(h)) {
const e = Error('where warning happened');
console.log('Warning for now: vow expected, not promise', h, e);
// TODO remove this stopgap. Here for now because host-side
// promises are everywhere!
// Note: A good place to set a breakpoint, or to uncomment the
// `debugger;` line, to work around bundling.
// debugger;
return watch(h);
} else {
return h;
const passStyle = passStyleOf(h);
switch (passStyle) {
case 'promise': {
const e = Error('where warning happened');
console.log('Warning for now: vow expected, not promise', h, e);
// TODO remove this stopgap. Here for now because host-side
// promises are everywhere!
// Note: A good place to set a breakpoint, or to uncomment the
// `debugger;` line, to work around bundling.
// debugger;
return watch(h);
}
case 'copyRecord': {
const o = /** @type {object} */ (h);
return objectMap(o, tolerateHostPromiseToVow);
}
case 'copyArray': {
const a = /** @type {Array} */ (h);
return harden(a.map(tolerateHostPromiseToVow));
}
case 'tagged': {
const t = /** @type {CopyTagged} */ (h);
if (isVow(t)) {
return h;
}
return makeTagged(getTag(t), tolerateHostPromiseToVow(t.payload));
}
default: {
return h;
}
}
};

Expand All @@ -150,6 +192,7 @@ export const makeReplayMembrane = ({
: hostTarget(...hostArgs);
// This is a temporary kludge anyway. But note that it only
// catches the case where the promise is at the top of hostResult.
harden(hostResult);
hostResult = tolerateHostPromiseToVow(hostResult);
// Try converting here just to route the error correctly
hostToGuest(hostResult, `converting ${optVerb || 'host'} result`);
Expand Down Expand Up @@ -233,14 +276,193 @@ export const makeReplayMembrane = ({

// //////////////// Eventual Send ////////////////////////////////////////////

/**
* @param {PassableCap} hostTarget
* @param {string | undefined} optVerb
* @param {Passable[]} hostArgs
*/
const performSendOnly = (hostTarget, optVerb, hostArgs) => {
try {
optVerb
? heapVowE.sendOnly(hostTarget)[optVerb](...hostArgs)
: // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error
// @ts-ignore once we changed this from E to heapVowE,
// typescript started complaining that heapVowE(hostTarget)
// is not callable. I'm not sure if this is a just a typing bug
// in heapVowE or also reflects a runtime deficiency. But this
// case it not used yet anyway. We disable it
// with at-ts-ignore rather than at-ts-expect-error because
// the dependency-graph tests complains that the latter is unused.
heapVowE.sendOnly(hostTarget)(...hostArgs);
} catch (hostProblem) {
throw Panic`internal: eventual sendOnly synchrously failed ${hostProblem}`;
}
};

/**
* @param {PassableCap} hostTarget
* @param {string | undefined} optVerb
* @param {Passable[]} hostArgs
* @param {number} callIndex
* @param {VowKit} hostResultKit
* @param {Promise} guestReturnedP
* @returns {Outcome}
*/
const performSend = (
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
) => {
const { vow, resolver } = hostResultKit;
try {
const hostPromise = optVerb
? heapVowE(hostTarget)[optVerb](...hostArgs)
: // eslint-disable-next-line @typescript-eslint/prefer-ts-expect-error
// @ts-ignore once we changed this from E to heapVowE,
// typescript started complaining that heapVowE(hostTarget)
// is not callable. I'm not sure if this is a just a typing bug
// in heapVowE or also reflects a runtime deficiency. But this
// case it not used yet anyway. We disable it
// with at-ts-ignore rather than at-ts-expect-error because
// the dependency-graph tests complains that the latter is unused.
heapVowE(hostTarget)(...hostArgs);
resolver.resolve(hostPromise); // TODO does this always work?
} catch (hostProblem) {
throw Panic`internal: eventual send synchrously failed ${hostProblem}`;
}
try {
const entry = harden(['doReturn', callIndex, vow]);
log.pushEntry(entry);
const guestPromise = makeGuestForHostVow(vow, guestReturnedP);
// Note that `guestPromise` is not registered in the bijection since
// guestReturnedP is already the guest for vow. Rather, the handler
// returns guestPromise to resolve guestReturnedP to guestPromise.
doReturn(callIndex, vow);
return harden({
kind: 'return',
result: guestPromise,
});
} catch (problem) {
throw panic(problem);
}
};

const guestHandler = harden({
applyMethodSendOnly(guestTarget, optVerb, guestArgs) {
const callIndex = log.getIndex();
if (stopped || !bijection.hasGuest(guestTarget)) {
Fail`Sent from a previous run: ${guestTarget}`;
}
try {
const guestEntry = harden([
'checkSendOnly',
guestTarget,
optVerb,
guestArgs,
callIndex,
]);
if (log.isReplaying()) {
const entry = log.nextEntry();
try {
equate(guestEntry, entry);
} catch (equateErr) {
// TODO consider Richard Gibson's suggestion for a better way
// to keep track of the error labeling.
throwLabeled(
equateErr,
`replay ${callIndex}:
${q(guestEntry)}
vs ${q(entry)}
`,
);
}
} else {
const entry = guestToHost(guestEntry);
log.pushEntry(entry);
const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry;
performSendOnly(hostTarget, optVerb, hostArgs);
}
} catch (fatalError) {
throw panic(fatalError);
}
},
applyMethod(guestTarget, optVerb, guestArgs, guestReturnedP) {
if (optVerb === undefined) {
throw Panic`guest eventual call not yet supported: ${guestTarget}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
} else {
throw Panic`guest eventual send not yet supported: ${guestTarget}.${b(optVerb)}(${b(guestArgs)}) -> ${b(guestReturnedP)}`;
const callIndex = log.getIndex();
if (stopped || !bijection.hasGuest(guestTarget)) {
Fail`Sent from a previous run: ${guestTarget}`;
}
const hostResultKit = makeVowKit();
const g = bijection.unwrapInit(guestReturnedP, hostResultKit.vow);
g === guestReturnedP ||
Fail`internal: guestReturnedP should not unwrap: ${g} vs ${guestReturnedP}`;
/** @type {Outcome} */
let outcome;
try {
const guestEntry = harden([
'checkSend',
guestTarget,
optVerb,
guestArgs,
callIndex,
]);
if (log.isReplaying()) {
const entry = log.nextEntry();
try {
equate(guestEntry, entry);
} catch (equateErr) {
// TODO consider Richard Gibson's suggestion for a better way
// to keep track of the error labeling.
throwLabeled(
equateErr,
`replay ${callIndex}:
${q(guestEntry)}
vs ${q(entry)}
`,
);
}
outcome = /** @type {Outcome} */ (nestInterpreter(callIndex));
} else {
const entry = guestToHost(guestEntry);
log.pushEntry(entry);
const [_op, hostTarget, _optVerb, hostArgs, _callIndex] = entry;
nestInterpreter(callIndex);
outcome = performSend(
hostTarget,
optVerb,
hostArgs,
callIndex,
hostResultKit,
guestReturnedP,
);
}
} catch (fatalError) {
throw panic(fatalError);
}

switch (outcome.kind) {
case 'return': {
return outcome.result;
}
case 'throw': {
throw outcome.problem;
}
default: {
// @ts-expect-error TS correctly knows this case would be outside
// the type. But that's what we want to check.
throw Panic`unexpected outcome kind ${q(outcome.kind)}`;
}
}
},
applyFunctionSendOnly(guestTarget, guestArgs) {
return guestHandler.applyMethodSendOnly(
guestTarget,
undefined,
guestArgs,
);
},
applyFunction(guestTarget, guestArgs, guestReturnedP) {
return guestHandler.applyMethod(
guestTarget,
Expand All @@ -249,6 +471,9 @@ export const makeReplayMembrane = ({
guestReturnedP,
);
},
getSendOnly(guestTarget, prop) {
throw Panic`guest eventual getSendOnly not yet supported: ${guestTarget}.${b(prop)}`;
},
get(guestTarget, prop, guestReturnedP) {
throw Panic`guest eventual get not yet supported: ${guestTarget}.${b(prop)} -> ${b(guestReturnedP)}`;
},
Expand Down Expand Up @@ -340,13 +565,21 @@ export const makeReplayMembrane = ({

/**
* @param {Vow} hVow
* @returns {unknown}
* @param {Promise} [promiseKey]
* If provided, use this promise as the key in the guestPromiseMap
* rather than the returned promise. This only happens when the
* promiseKey ends up forwarded to the returned promise anyway, so
* associating it with this resolve/reject pair is not incorrect.
* It is needed when `promiseKey` is also entered into the bijection
* paired with hVow.
* @returns {Promise}
*/
const makeGuestForHostVow = hVow => {
const makeGuestForHostVow = (hVow, promiseKey = undefined) => {
hVow = tolerateHostPromiseToVow(hVow);
isVow(hVow) || Fail`vow expected ${hVow}`;
const { promise, resolve, reject } = makeGuestPromiseKit();
guestPromiseMap.set(promise, harden({ resolve, reject }));
promiseKey ??= promise;
guestPromiseMap.set(promiseKey, harden({ resolve, reject }));

watchWake(hVow);

Expand All @@ -370,7 +603,7 @@ export const makeReplayMembrane = ({
hVow,
async hostFulfillment => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doFulfill', hVow, hostFulfillment]);
log.pushEntry(entry);
Expand All @@ -385,7 +618,7 @@ export const makeReplayMembrane = ({
},
async hostReason => {
await log.promiseReplayDone(); // should never reject
if (!stopped && guestPromiseMap.get(promise) !== 'settled') {
if (!stopped && guestPromiseMap.get(promiseKey) !== 'settled') {
/** @type {LogEntry} */
const entry = harden(['doReject', hVow, hostReason]);
log.pushEntry(entry);
Expand Down
Loading

0 comments on commit 4adf64f

Please sign in to comment.