Skip to content

Commit

Permalink
Settle parallel branches before re-planning
Browse files Browse the repository at this point in the history
The agent runtime now runs parallel branches with `Promise.allSettled`
instead of `Promise.all`. Once every branch has settled, if any of
them failed, the plan execution is interrupted and re-planning
proceeds. This prevents the runtime from moving on as soon as the first
failure is encountered, which caused unnecessary re-plans. This also
moves logging earlier in the plan execution, to give feedback as soon
as an action either succeeds or fails.

Change-type: patch
  • Loading branch information
pipex committed Apr 30, 2024
1 parent 4bc3c49 commit 6f04dcf
Showing 1 changed file with 81 additions and 32 deletions.
113 changes: 81 additions & 32 deletions lib/agent/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,47 @@ import { Failure, NotStarted, Stopped, Timeout, UnknownError } from './types';

import * as DAG from '../dag';

/**
 * Base class for errors that are tied to a specific action.
 *
 * Keeps the identifier and the action the error refers to, plus an
 * optional underlying cause, so handlers can inspect them after
 * catching the error.
 */
class ActionError extends Error {
  /**
   * @param message - human readable description of the failure
   * @param id - identifier associated with the action
   * @param action - the action the error relates to
   * @param cause - underlying error or value, if any
   */
  constructor(
    message: string,
    readonly id: string,
    readonly action: Action,
    readonly cause?: any,
  ) {
    super(message);
    // Make `name` (and therefore `toString()`) identify the concrete
    // class, including subclasses, instead of the generic 'Error'
    this.name = new.target.name;
  }
}

/**
* Internal error raised when running an action throws.
*
* Carries the id passed by the caller, the action that failed and the
* original error (`cause`). The message embeds the action description
* and the interpolated cause.
*/
class ActionRunFailed extends Error {
class ActionRunFailed extends ActionError {
constructor(
readonly id: string,
readonly action: Action,
readonly cause: any,
) {
super(`Action '${action.description}' failed with error: ${cause}`);
super(
`Action '${action.description}' failed with error: ${cause}`,
id,
action,
cause,
);
}
}

// Error signalling that the condition of the given action was not met
class ActionConditionFailed extends Error {
constructor(readonly action: Action) {
super(`Condition for action '${action.description}' not met`);
/**
* Error signalling that the execution of a plan failed.
*
* `errors` holds the individual action errors that were collected
* while the plan was running.
*/
class PlanRunFailed extends Error {
constructor(readonly errors: ActionError[]) {
super(`Plan execution failed`);
}
}

/**
 * Raised when the condition of an action is not met.
 *
 * The `id` and `action` are handed to the ActionError base class, which
 * exposes them as readonly properties.
 */
class ActionConditionFailed extends ActionError {
  constructor(id: string, action: Action) {
    const message = `Condition for action '${action.description}' not met`;
    super(message, id, action);
  }
}

Expand Down Expand Up @@ -125,6 +151,7 @@ export class Runtime<TState> {
changes.map(toLog).forEach((log) => logger.debug('-', ...log));

// Trigger a plan search
// console.log('FIND PLAN', { current: this.stateRef._, target: this.target });
const result = this.planner.findPlan(this.stateRef._, this.target);
logger.debug(
`search finished after ${
Expand All @@ -140,44 +167,68 @@ export class Runtime<TState> {
return result;
}

/**
 * Execute a single action against the runtime's state reference.
 *
 * The action applies its changes directly to the local state, so no
 * comparison is needed afterwards. It is wrapped with observe() so the
 * observer gets notified of every change to some part of the state.
 *
 * @throws ActionRunFailed wrapping whatever error the wrapped action raises
 */
private async runAction(action: Action): Promise<void> {
  try {
    const observed = observe(action, this.observer);
    await observed(this.stateRef);
  } catch (reason) {
    throw new ActionRunFailed(action, reason);
  }
}

/**
* Executes a plan by walking it with DAG.mapReduce, starting from an
* already resolved promise.
*
* For each action node, the first callback awaits the promise of the
* previous step, then checks the stop flag and the action condition
* before running the action against the state reference. The last
* callback waits for every promise it is given to settle and
* aggregates the failures.
*
* NOTE(review): presumably the last callback is what joins parallel
* branches of the plan — confirm against DAG.mapReduce.
*
* Errors raised by the callbacks (how they surface to the caller
* depends on DAG.mapReduce):
* - Cancelled if the runtime is stopped when an action is about to run
* - ActionConditionFailed if the condition of an action is not met
* - ActionRunFailed if running an action throws
* - PlanRunFailed with the deduplicated action errors of the settled
*   promises
*/
private async runPlan(node: PlanNode<TState> | null) {
private async runPlan(root: PlanNode<TState> | null) {
const { logger } = this.opts;

return await DAG.mapReduce(
node,
root,
Promise.resolve(),
async (v: PlanAction<TState>, prev) => {
// Wait for the previous action to complete
async (node: PlanAction<TState>, prev) => {
// Wait for the previous action to complete,
// this also propagates errors
await prev;

const { action } = v;
const { id, action } = node;

// Do not start any new action once the runtime has been stopped
if (this.stopped) {
throw new Cancelled();
}

// The condition is evaluated against the current value of the state
// right before running the action
if (!action.condition(this.stateRef._)) {
throw new ActionConditionFailed(action);
logger.warn(`${action.description}: condition failed`);
throw new ActionConditionFailed(id, action);
}

logger.info(`${action.description}: running ...`);
await this.runAction(action);
logger.info(`${action.description}: success`);
try {
// Running the action should perform the changes in the
// local state without the need of comparisons later.
// The observe() wrapper allows to notify the observer from every
// change to some part of the state, it also reverts any changes
// if an error occurs
logger.info(`${action.description}: running ...`);
await observe(action, this.observer)(this.stateRef);
logger.info(`${action.description}: success`);
} catch (e) {
// Log right away so feedback is given as soon as the action fails
logger.error(`${action.description}: failed`, e);
throw new ActionRunFailed(id, action, e);
}
},
async (actions) => {
await Promise.all(actions);
// Wait for all promises to be settled to prevent moving
// on with a new planning cycle before the state is settled
const results = await Promise.allSettled(actions);

// Aggregate the results from previous calls
// we use a map to deduplicate errors since map reduce
// will propagate the same errors on every branch
const actionErrorMap: Record<string, ActionError> = {};
for (const r of results) {
if (r.status === 'rejected') {
const { reason: err } = r;
// NOTE(review): this assignment runs before the instanceof check
// below, so a rejection that is not an ActionError is also stored
// (under whatever `err.id` evaluates to) right before being
// rethrown; it looks redundant with the assignment inside the
// check — confirm
actionErrorMap[err.id] = err;
if (err instanceof ActionError) {
actionErrorMap[err.id] = err;
} else {
// Propagate any other errors
throw err;
}
}
}

// Only fail when at least one action error was collected
const errors = Object.values(actionErrorMap);
if (errors.length > 0) {
throw new PlanRunFailed(errors);
}
},
);
}
Expand Down Expand Up @@ -246,10 +297,8 @@ export class Runtime<TState> {
} else {
logger.warn('no plan found');
}
} else if (e instanceof ActionConditionFailed) {
logger.warn(`${e.action.description}: condition failed`);
} else if (e instanceof ActionRunFailed) {
logger.error(`${e.action.description}: failed`, e.cause);
} else if (e instanceof ActionError || e instanceof PlanRunFailed) {
logger.warn('plan execution interrupted due to errors');
} else if (e instanceof Cancelled) {
logger.warn('plan execution cancelled');
// exit the loop
Expand All @@ -273,7 +322,7 @@ export class Runtime<TState> {
}
}
const wait = Math.min(this.opts.backoffMs(tries), this.opts.maxWaitMs);
logger.debug(`waiting ${wait / 1000}s before re-planning`);
logger.debug(`waiting ${wait / 1000}s before re - planning`);
await delay(wait);

// Only back off if we haven't been able to reach the target
Expand Down

0 comments on commit 6f04dcf

Please sign in to comment.