Skip to content

Commit

Permalink
preserve async_hooks context in client-side middlewares
Browse files Browse the repository at this point in the history
  • Loading branch information
koresar committed May 21, 2024
1 parent 1cc3351 commit 31c0109
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 95 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -653,21 +653,19 @@ import { Lambda } from "@aws-sdk/client-lambda";
const invocationResponse = await new Lambda().invoke({
FunctionName: "my-lambda-name",
Payload: JSON.stringify({
_: { procedureName: "updateUser" },
id: "123412341234123412341234",
firstName: "Fred",
lastName: "Flinstone",
}),
ClientContext: JSON.stringify({
procedureName: "updateUser",
}),
});
const { success, code, message, user } = JSON.parse(invocationResponse.Payload);
```

Alternatively, you can call the same procedure using the `aws` CLI:

```shell
aws lambda invoke --function-name my-lambda-name --client-context '{"procedureName":"updateUser"}' --payload '{"id":"123412341234123412341234","firstName":"Fred","lastName":"Flinstone"}'
aws lambda invoke --function-name my-lambda-name --payload '{"_":{"procedureName":"updateUser"},"id":"123412341234123412341234","firstName":"Fred","lastName":"Flinstone"}}'
```

## `AllserverClient` options
Expand Down
105 changes: 62 additions & 43 deletions src/client/AllserverClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,40 +213,55 @@ module.exports = require("stampit")({
const defaultCtx = { client: this, isIntrospection: true };
const ctx = transport.createCallContext(defaultCtx);

await this._callMiddlewares(ctx, "before");

if (!ctx.result) {
try {
// This is supposed to be executed only once (per uri) unless it throws.
// There are only 3 situations when this throws:
// * the "introspect" method not found on server,
// * the network request is malformed,
// * couldn't connect to the remote host.
ctx.result = await transport.introspect(ctx);
} catch (err) {
ctx.result = {
success: false,
code: "ALLSERVER_CLIENT_INTROSPECTION_FAILED",
message: `Couldn't introspect ${transport.uri} due to: ${err.message}`,
noNetToServer: Boolean(err.noNetToServer),
error: err,
};
await this._callMiddlewares(ctx, "before", async () => {
if (!ctx.result) {
try {
// This is supposed to be executed only once (per uri) unless it throws.
// There are only 3 situations when this throws:
// * the "introspect" method not found on server,
// * the network request is malformed,
// * couldn't connect to the remote host.
ctx.result = await transport.introspect(ctx);
} catch (err) {
ctx.result = {
success: false,
code: "ALLSERVER_CLIENT_INTROSPECTION_FAILED",
message: `Couldn't introspect ${transport.uri} due to: ${err.message}`,
noNetToServer: Boolean(err.noNetToServer),
error: err,
};
}
}
}

await this._callMiddlewares(ctx, "after");
await this._callMiddlewares(ctx, "after");
});

return ctx.result;
},

async _callMiddlewares(ctx, middlewareType) {
const middlewares = [].concat(this[p][middlewareType]).filter(isFunction);
for (const middleware of middlewares) {
try {
const result = await middleware.call(this, ctx);
async _callMiddlewares(ctx, middlewareType, next) {
const runMiddlewares = async (middlewares) => {
if (!middlewares?.length) {
// no middlewares to run
if (next) return await next();
return;
}
const middleware = middlewares[0];
async function handleMiddlewareResult(result) {
if (result !== undefined) {
ctx.result = result;
break;
// Do not call any more middlewares
} else {
await runMiddlewares(middlewares.slice(1));
}
}
try {
if (middleware.length > 1) {
// This middleware accepts more than one argument
await middleware.call(this, ctx, handleMiddlewareResult);
} else {
const result = await middleware.call(this, ctx);
await handleMiddlewareResult(result);
}
} catch (err) {
if (!this[p].neverThrow) throw err;
Expand All @@ -257,9 +272,13 @@ module.exports = require("stampit")({
message = `The '${middlewareType}' middleware error while calling '${ctx.procedureName}' procedure: ${err.message}`;
}
ctx.result = { success: false, code, message, error: err };
return;
// Do not call any more middlewares
if (next) return await next();
}
}
};

const middlewares = [].concat(this[p][middlewareType]).filter(isFunction);
return await runMiddlewares(middlewares);
},

async call(procedureName, arg) {
Expand All @@ -271,24 +290,24 @@ module.exports = require("stampit")({
const defaultCtx = { procedureName, arg, client: this };
const ctx = transport.createCallContext(defaultCtx);

await this._callMiddlewares(ctx, "before");

if (!ctx.result) {
try {
ctx.result = await transport.call(ctx);
} catch (err) {
if (!this[p].neverThrow) throw err;

let { code, message } = err;
if (!err.code || err.noNetToServer) {
code = "ALLSERVER_CLIENT_PROCEDURE_UNREACHABLE";
message = `Couldn't reach remote procedure ${ctx.procedureName} due to: ${err.message}`;
await this._callMiddlewares(ctx, "before", async () => {
if (!ctx.result) {
try {
ctx.result = await transport.call(ctx);
} catch (err) {
if (!this[p].neverThrow) throw err;

let { code, message } = err;
if (!err.code || err.noNetToServer) {
code = "ALLSERVER_CLIENT_PROCEDURE_UNREACHABLE";
message = `Couldn't reach remote procedure ${ctx.procedureName} due to: ${err.message}`;
}
ctx.result = { success: false, code, message, error: err };
}
ctx.result = { success: false, code, message, error: err };
}
}

await this._callMiddlewares(ctx, "after");
await this._callMiddlewares(ctx, "after");
});

return ctx.result;
},
Expand Down
88 changes: 88 additions & 0 deletions test/client/AllserverClient.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
const assert = require("assert");

const cls = require("cls-hooked");
const spaceName = "allserver";
const session = cls.getNamespace(spaceName) || cls.createNamespace(spaceName);
function getTraceId() {
if (session?.active) {
return session.get("traceId") || "";
}

return "";
}
function setTraceIdAndRunFunction(traceId, func, ...args) {
return new Promise((resolve, reject) => {
session.run(async () => {
session.set("traceId", traceId);

try {
const result = await func(...args);
resolve(result);
} catch (err) {
reject(err);
}
});
});
}

const VoidClientTransport = require("../..").ClientTransport.compose({
props: {
uri: "void://localhost",
Expand Down Expand Up @@ -322,6 +347,42 @@ describe("AllserverClient", () => {
error: err,
});
});

it("should preserve async_hooks context in 'before'", async () => {
let called = [];
const client = AllserverClient({
transport: VoidClientTransport(),
autoIntrospect: false,
before: [
(ctx, next) => {
setTraceIdAndRunFunction("my-random-trace-id", next);
},
() => {
assert.strictEqual(getTraceId(), "my-random-trace-id");
called.push(1);
return undefined;
},
() => {
assert.strictEqual(getTraceId(), "my-random-trace-id");
called.push(2);
return { success: false, code: "BAD_AUTH_OR_SOMETHING", message: "Bad auth or something" };
},
() => {
called.push(3);
assert.fail("should not be called");
},
],
});

const result = await client.foo();

assert.deepStrictEqual(result, {
success: false,
code: "BAD_AUTH_OR_SOMETHING",
message: "Bad auth or something",
});
assert.deepStrictEqual(called, [1, 2]);
});
});

describe("'after'", () => {
Expand Down Expand Up @@ -423,6 +484,33 @@ describe("AllserverClient", () => {
error: err,
});
});

it("should preserve async_hooks context in 'after'", async () => {
let called = [];
const client = AllserverClient({
transport: VoidClientTransport(),
autoIntrospect: false,
before: [
(ctx, next) => {
setTraceIdAndRunFunction("my-random-trace-id", next);
},
],
after: [
() => {
assert.strictEqual(getTraceId(), "my-random-trace-id");
called.push(1);
},
() => {
assert.strictEqual(getTraceId(), "my-random-trace-id");
called.push(2);
},
],
});

await client.foo();

assert.deepStrictEqual(called, [1, 2]);
});
});

describe("'before'+'after'", () => {
Expand Down
72 changes: 24 additions & 48 deletions test/server/Allserver.test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
const assert = require("assert");

const cls = require("cls-hooked");
const spaceName = "allserver";
const session = cls.getNamespace(spaceName) || cls.createNamespace(spaceName);
function getTraceId() {
if (session?.active) {
return session.get("traceId") || "";
}

return "";
}
function setTraceIdAndRunFunction(traceId, func, ...args) {
return new Promise((resolve, reject) => {
session.run(async () => {
session.set("traceId", traceId);

try {
const result = await func(...args);
resolve(result);
} catch (err) {
reject(err);
}
});
});
}

const VoidTransport = require("stampit")({
methods: {
Expand Down Expand Up @@ -408,30 +432,6 @@ describe("Allserver", () => {
});

it("should preserve async_hooks context in 'before'", async () => {
const cls = require("cls-hooked");
const spaceName = "allserver";
const session = cls.getNamespace(spaceName) || cls.createNamespace(spaceName);
function getTraceId() {
if (session?.active) {
return session.get("traceId") || "";
}

return "";
}
function setTraceIdAndRunFunction(traceId, func, ...args) {
return new Promise((resolve, reject) => {
session.run(async () => {
session.set("traceId", traceId);

try {
const result = await func(...args);
resolve(result);
} catch (err) {
reject(err);
}
});
});
}
let called = [];
const server = Allserver({
before: [
Expand Down Expand Up @@ -583,30 +583,6 @@ describe("Allserver", () => {
});

it("should preserve async_hooks context in 'after'", async () => {
const cls = require("cls-hooked");
const spaceName = "allserver";
const session = cls.getNamespace(spaceName) || cls.createNamespace(spaceName);
function getTraceId() {
if (session?.active) {
return session.get("traceId") || "";
}

return "";
}
function setTraceIdAndRunFunction(traceId, func, ...args) {
return new Promise((resolve, reject) => {
session.run(async () => {
session.set("traceId", traceId);

try {
const result = await func(...args);
resolve(result);
} catch (err) {
reject(err);
}
});
});
}
let called = [];
const server = Allserver({
before: [
Expand Down

0 comments on commit 31c0109

Please sign in to comment.