Skip to content

Commit

Permalink
[43991] add rollback to pipelines (#39)
Browse files Browse the repository at this point in the history
Co-authored-by: Nate Rutman <nrutman@users.noreply.github.com>
  • Loading branch information
jdoughty-fg and nrutman authored Oct 4, 2024
1 parent fcab1cb commit d7dd60c
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 27 deletions.
36 changes: 23 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ A type-safe toolkit to easily compose synchronous process chains in TypeScript/J

## Table of Contents

- [Table of Contents](#table-of-contents)
- [Overview](#overview)
- [Installation](#installation)
- [Types](#types)
- [Builder](#builder)
- [Initializer](#initializer)
- [Stages](#stages)
- [Stage Arguments](#stage-arguments)
- [Stage Results](#stage-results)
- [Results Validator](#results-validator)
- [Middleware](#middleware)
- [Error Handling](#error-handling)
- [Example Use Cases](#example-use-cases)
- [Pipeline](#pipeline)
- [Table of Contents](#table-of-contents)
- [Overview](#overview)
- [Installation](#installation)
- [Types](#types)
- [Builder](#builder)
- [Initializer](#initializer)
- [Stages](#stages)
- [Stage Arguments](#stage-arguments)
- [Stage Results](#stage-results)
- [Results Validator](#results-validator)
- [Middleware](#middleware)
- [Error Handling](#error-handling)
- [Example Use Cases](#example-use-cases)

## Overview

Expand Down Expand Up @@ -67,6 +68,15 @@ The **Initializer** is a method that takes in the pipeline's arguments and produ

**Stages** are the independent steps in the process chain. They are processed synchronously (one at a time, in order) until the end of the chain is reached.

As of version `0.1.0` stages can be one of two types

- PipelineStage
- PipelineStageConfiguration

`PipelineStageConfiguration` adds the ability for the user to define a `rollback` function, which should undo changes made by the `execute` function.

The pipeline can support processing a collection of stages of either type.

### Stage Arguments

The following arguments are provided to a stage when it is executed:
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@fieldguide/pipeline",
"description": "A toolkit to easily build synchronous process pipelines in TypeScript/JavaScript",
"version": "0.0.1",
"version": "0.1.0",
"main": "build/index.js",
"types": "build/index.d.ts",
"scripts": {
Expand Down
21 changes: 20 additions & 1 deletion src/__mocks__/TestPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { last } from "lodash";
import { last, noop } from "lodash";
import type {
PipelineInitializer,
PipelineMiddleware,
PipelineResultValidator,
PipelineStage,
PipelineStageConfiguration,
} from "../types";

export interface TestPipelineArguments {
Expand All @@ -25,6 +26,12 @@ export type TestStage = PipelineStage<
TestPipelineResults
>;

export type TestStageWithRollback = PipelineStageConfiguration<
TestPipelineArguments,
TestPipelineContext,
TestPipelineResults
>;

export type TestMiddleware = PipelineMiddleware<
TestPipelineArguments,
TestPipelineContext,
Expand Down Expand Up @@ -84,6 +91,18 @@ export const errorStage: TestStage = () => {
throw Error("This stage throws an error!");
};

/**
* A stage that specifies a rollback function to undo changes
*/
export function generateStageWithRollback(
rollbackFunction: () => Promise<void> | void,
): TestStageWithRollback {
return {
execute: noop,
rollback: rollbackFunction,
};
}

/**
* A results validator for the test pipeline
*/
Expand Down
91 changes: 89 additions & 2 deletions src/__tests__/buildPipeline.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { PipelineRollbackError } from "error/PipelineRollbackError";
import { logStageMiddlewareFactory } from "middleware/logStageMiddlewareFactory";
import {
TestMiddleware,
TestPipelineArguments,
TestPipelineContext,
TestPipelineResults,
TestStage,
TestStageWithRollback,
additionStage,
errorStage,
generateStageWithRollback,
initializer,
returnHistoryResult,
testPipelineResultValidator,
Expand All @@ -28,7 +31,21 @@ const partialResultsStages: TestStage[] = [additionStage, returnSumResult];

const errorStages: TestStage[] = [errorStage, returnHistoryResult];

const rollback1 = jest.fn();
const rollback2 = jest.fn();

const stagesWithRollback: (TestStage | TestStageWithRollback)[] = [
additionStage,
generateStageWithRollback(rollback1),
generateStageWithRollback(rollback2),
errorStage,
];

describe("buildPipeline", () => {
beforeEach(() => {
jest.clearAllMocks();
});

describe("when running a simple pipeline", () => {
it("should produce a result when successful", async () => {
const results = await runPipelineForStages(successfulStages);
Expand Down Expand Up @@ -59,7 +76,7 @@ describe("buildPipeline", () => {
let testMiddleware1: TestMiddlewareMock;
let testMiddleware2: TestMiddlewareMock;

beforeAll(async () => {
beforeEach(async () => {
middlewareCalls = [];

const createMiddlewareMock = (name: string): TestMiddlewareMock => {
Expand Down Expand Up @@ -98,10 +115,80 @@ describe("buildPipeline", () => {
]);
});
});

describe("when using a pipeline stage that can rollback", () => {
let error: unknown;

describe("and the rollback is successful", () => {
beforeEach(async () => {
error = undefined;

try {
await runPipelineForStages(stagesWithRollback);
} catch (e) {
error = e;
}
});

it("should call configured rollback functions", () => {
expect(rollback1).toHaveBeenCalledTimes(1);
expect(rollback2).toHaveBeenCalledTimes(1);
});

it("should call the rollbacks in the proper order", () => {
expect(rollback2.mock.invocationCallOrder[0]).toBeLessThan(
rollback1.mock.invocationCallOrder[0] ?? 0,
);
});

it("should still throw the error", () => {
expect(error).toBeInstanceOf(PipelineError);
});
});

describe("and the rollback fails", () => {
const errorThrownInRollback = new Error("This is a rollback error");

beforeEach(async () => {
rollback1.mockImplementation(() => {
throw errorThrownInRollback;
});

error = undefined;

try {
await runPipelineForStages(stagesWithRollback);
} catch (e) {
error = e;
}
});

it("should run the rollbacks from subsequent stages", () => {
expect(rollback2).toHaveBeenCalledTimes(1);
});

it("should throw a PipelineRollbackError", () => {
expect(error).toBeInstanceOf(PipelineRollbackError);
});

it("should capture the original pipeline error", () => {
expect(
(error as PipelineRollbackError<object, object, object>)
.originalPipelineError.message,
).toBe("[TestPipeline] Error: This stage throws an error!");
});

it("should capture the original cause that was thrown in the rollback", () => {
expect(
(error as PipelineRollbackError<object, object, object>).cause,
).toBe(errorThrownInRollback);
});
});
});
});

function runPipelineForStages(
stages: TestStage[],
stages: (TestStage | TestStageWithRollback)[],
middleware: TestMiddleware[] = [],
) {
const pipeline = buildPipeline<
Expand Down
79 changes: 71 additions & 8 deletions src/buildPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { PipelineRollbackError } from "error/PipelineRollbackError";
import { merge } from "lodash";
import { isPipelineStageConfiguration } from "utils";
import { PipelineError } from "./error/PipelineError";
import type {
Pipeline,
Expand All @@ -7,6 +9,7 @@ import type {
PipelineMiddleware,
PipelineResultValidator,
PipelineStage,
PipelineStageConfiguration,
} from "./types";

interface BuildPipelineInput<
Expand All @@ -16,7 +19,7 @@ interface BuildPipelineInput<
> {
name: string;
initializer: PipelineInitializer<C, A>;
stages: PipelineStage<A, C, R>[];
stages: (PipelineStage<A, C, R> | PipelineStageConfiguration<A, C, R>)[];
resultsValidator: PipelineResultValidator<R>;
middleware?: PipelineMiddleware<A, C, R>[];
}
Expand Down Expand Up @@ -46,10 +49,26 @@ export function buildPipeline<
arguments: args,
};

try {
const stageNames = stages.map((s) => s.name);
const context = await initializer(args);

/** All stages converted to configurations */
const stageConfigurations: PipelineStageConfiguration<A, C, R>[] =
stages.map((stage) => {
if (isPipelineStageConfiguration(stage)) {
return stage;
}

return {
execute: stage,
};
});

const potentiallyProcessedStages = [];

const context = await initializer(args);
try {
const stageNames: string[] = stageConfigurations.map(
(s) => s.execute.name,
);
maybeContext = context;

const reversedMiddleware = [...middlewares].reverse();
Expand All @@ -70,15 +89,19 @@ export function buildPipeline<
};
};

for (const stage of stages) {
for (const stage of stageConfigurations) {
// initialize next() with the stage itself
let next = () => stage(context, metadata) as Promise<Partial<R>>;
let next = () =>
stage.execute(context, metadata) as Promise<Partial<R>>;

// wrap stage with middleware such that the first middleware is the outermost function
for (const middleware of reversedMiddleware) {
next = wrapMiddleware(middleware, stage.name, next);
next = wrapMiddleware(middleware, stage.execute.name, next);
}

// Add stage to a stack that can be rolled back if necessary
potentiallyProcessedStages.push(stage);

// invoke middleware-wrapped stage
const stageResults = await next();

Expand All @@ -94,13 +117,53 @@ export function buildPipeline<

return results;
} catch (cause) {
throw new PipelineError(
const pipelineError = new PipelineError(
String(cause),
maybeContext,
results,
metadata,
cause,
);

await rollback(
potentiallyProcessedStages,
context,
metadata,
results,
pipelineError,
);

// Throw error after rolling back all stages
throw pipelineError;
}
};
}

/**
* Rollback changes made by stages in reverse order
*/
async function rollback<A extends object, C extends object, R extends object>(
stages: PipelineStageConfiguration<A, C, R>[],
context: C,
metadata: PipelineMetadata<A>,
results: R,
originalPipelineError: PipelineError<A, C, R>,
) {
let stage;
while ((stage = stages.pop()) !== undefined) {
try {
if (stage.rollback) {
await stage.rollback(context, metadata);
}
} catch (rollbackCause) {
throw new PipelineRollbackError(
String(`Rollback failed for stage: ${stage.execute.name}`),
context,
results,
metadata,
originalPipelineError,
rollbackCause,
);
}
}
}
Loading

0 comments on commit d7dd60c

Please sign in to comment.