Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for direct AWS Lambda invocation #5

Merged
merged 5 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 69 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ npm i allserver

#### Client

Same as the HTTP protocol client above.
Two ways:

- Same as the HTTP protocol client above.
- Direct invocation via AWS SDK or AWS CLI.

### gRPC protocol

Expand Down Expand Up @@ -369,15 +372,16 @@ exports.handler = Allserver({
}).start();
```

Or, if you want each individual procedure to be the Lambda handler, pass `mapProceduresToExports: true`.
### Server in Bare AWS Lambda

Invoke directly via AWS SDK or AWS CLI. But better use the LambdaClientTransport (aka `"lambda://"` scheme) below.

```js
const { Allserver, LambdaTransport } = require("allserver");

// Note! No `handler` here.
exports = Allserver({
exports.handler = Allserver({
procedures,
transport: LambdaTransport({ mapProceduresToExports: true }),
transport: LambdaTransport(),
}).start();
```

Expand Down Expand Up @@ -596,19 +600,76 @@ Sometimes you need to unit test your procedures via the `AllserverClient`. For t
const { Allserver, MemoryTransport } = require("allserver");

const memoryServer = Allserver({
procedures,
transport: MemoryTransport(),
procedures,
transport: MemoryTransport(),
});

const client = memoryServer.start();

const { success, code, message, user } = await client.updateUser({
id: "123412341234123412341234",
firstName: "Fred",
lastName: "Flinstone",
});

assert(success === true);
```

### Bare AWS Lambda invocation

First you need to install any of the AWS SDK versions.

```shell
npm i allserver aws-sdk
```

or

```shell
npm i allserver @aws-sdk/client-lambda
```

The invoke the lambda this way:

```js
const { AllserverClient } = require("allserver");
// or
const AllserverClient = require("allserver/Client");

const client = AllserverClient({ uri: "lambda://my-lambda-name" });

const { success, code, message, user } = await client.updateUser({
id: "123412341234123412341234",
firstName: "Fred",
lastName: "Flinstone",
});
```

#### Using AWS SDK

As usual, the client side does not require the Allserver packages at all.

```js
import { Lambda } from "@aws-sdk/client-lambda";

const invocationResponse = await new Lambda().invoke({
FunctionName: "my-lambda-name",
Payload: JSON.stringify({
id: "123412341234123412341234",
firstName: "Fred",
lastName: "Flinstone",
}),
ClientContext: JSON.stringify({
procedureName: "updateUser",
}),
});
const { success, code, message, user } = JSON.parse(invocationResponse.Payload);
```

assert(success === true);
Alternatively, you can call the same procedure using the `aws` CLI:

```shell
aws lambda invoke --function-name my-lambda-name --client-context '{"procedureName":"updateUser"}' --payload '{"id":"123412341234123412341234","firstName":"Fred","lastName":"Flinstone"}'
```

## `AllserverClient` options
Expand Down
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "allserver",
"version": "2.1.0",
"version": "2.2.0-1",
"description": "Multi-protocol simple RPC server and [optional] client. Boilerplate-less. Opinionated. Minimalistic. DX-first.",
"main": "src/index.js",
"scripts": {
Expand All @@ -9,6 +9,8 @@
"cov": "nyc --reporter=html npm run test"
},
"keywords": [
"simple",
"rpc",
"http",
"grpc",
"websocket",
Expand Down Expand Up @@ -63,7 +65,7 @@
"@grpc/grpc-js": "^1.1.7",
"@grpc/proto-loader": "^0.7.5",
"bullmq": "^3.10.1",
"eslint": "^7.9.0",
"eslint": "^8.55.0",
"express": "^4.18.2",
"lambda-local": "^1.7.3",
"micro": "^10.0.1",
Expand All @@ -77,7 +79,7 @@
},
"eslintConfig": {
"parserOptions": {
"ecmaVersion": 2019
"ecmaVersion": 2022
},
"env": {
"es6": true,
Expand Down
1 change: 1 addition & 0 deletions src/client/AllserverClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ module.exports = require("stampit")({
https() { return require("./HttpClientTransport"); },
grpc() { return require("./GrpcClientTransport"); },
bullmq() { return require("./BullmqClientTransport"); },
lambda() { return require("./LambdaClientTransport"); },
},
},

Expand Down
54 changes: 54 additions & 0 deletions src/client/LambdaClientTransport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
module.exports = require("./ClientTransport").compose({
name: "LambdaClientTransport",

props: {
awsSdkLambdaClient: null,
},

init() {
if (!this.awsSdkLambdaClient) {
let Lambda;
try {
Lambda = require("aws-sdk").Lambda; // AWS SDK v2 adoption
} catch {
Lambda = require("@aws-sdk/client-lambda").Lambda;
}

this.awsSdkLambdaClient = new Lambda();
}
},

methods: {
async introspect(ctx) {
ctx.procedureName = "";
const result = await this.call(ctx);
// The server-side Transport will not have the call result if introspection is not available on the server side,
// but the server itself is up and running processing calls.
if (!result.procedures) throw Error("The lambda introspection call returned nothing"); // The ClientTransport expects us to throw if call fails
return result;
},

async call({ procedureName, lambda }) {
let promise = this.awsSdkLambdaClient.invoke({
FunctionName: this.uri.substring("lambda://".length),
Payload: JSON.stringify({
callContext: { ...lambda.callContext, procedureName },
callArg: lambda.callArg,
}),
});
if (typeof promise.promise === "function") promise = promise.promise(); // AWS SDK v2 adoption
const invocationResponse = await promise;
return JSON.parse(invocationResponse.Payload);
},

createCallContext(defaultCtx) {
return {
...defaultCtx,
lambda: {
callContext: {},
callArg: defaultCtx.arg,
},
};
},
},
});
3 changes: 3 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,7 @@ module.exports = {
get MemoryClientTransport() {
return require("./client/MemoryClientTransport");
},
get LambdaClientTransport() {
return require("./client/LambdaClientTransport");
},
};
96 changes: 49 additions & 47 deletions src/server/LambdaTransport.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
module.exports = require("./Transport").compose({
name: "LambdaTransport",

props: {
_mapProceduresToExports: false,
},

init({ mapProceduresToExports }) {
this._mapProceduresToExports = mapProceduresToExports || this._mapProceduresToExports;
},

methods: {
async deserializeRequest(ctx) {
const body = ctx.lambda.event.body;
let arg = ctx.lambda.query;
try {
// If there is no body we will use request query (aka search params)
if (body) arg = JSON.parse(body);
ctx.arg = arg;
async deserializeEvent(ctx) {
if (ctx.lambda.isHttp) {
const { body, query } = ctx.lambda.http;
try {
// If there is no body we will use request query (aka search params)
if (!body) {
ctx.arg = query || {};
} else {
if ((typeof body === "string" && body[0] === "{") || Buffer.isBuffer(body)) {
ctx.arg = JSON.parse(body);
} else {
return false;
}
}
return true;
} catch (err) {
return false;
}
} else {
ctx.arg = ctx.lambda.invoke.callArg || {};
return true;
} catch (err) {
return false;
}
},

async _handleRequest(ctx) {
if (await this.deserializeRequest(ctx)) {
if (await this.deserializeEvent(ctx)) {
await ctx.allserver.handleCall(ctx);
} else {
// HTTP protocol request was malformed (not expected structure).
Expand All @@ -36,37 +39,32 @@ module.exports = require("./Transport").compose({
},

startServer(defaultCtx) {
if (this._mapProceduresToExports) {
const exports = {};
for (const procedureName of Object.keys(defaultCtx.allserver.procedures)) {
exports[procedureName] = async (event) =>
new Promise((resolve) => {
const path = "/" + procedureName;
const ctx = {
...defaultCtx,
lambda: { event, resolve, path, query: { ...(event.queryStringParameters || {}) } },
};
return async (event, context) => {
return new Promise((resolve) => {
const lambda = { event, context, resolve };

this._handleRequest(ctx);
});
}
return exports;
}
const path = (event && (event.path || event.requestContext?.http?.path)) || undefined;
lambda.isHttp = Boolean(path);

return async (event) => {
return new Promise((resolve) => {
const ctx = {
...defaultCtx,
lambda: { event, resolve, path: event.path, query: { ...(event.queryStringParameters || {}) } },
};
if (lambda.isHttp) {
lambda.http = {
path,
query: event?.queryStringParameters,
body: event?.body,
headers: event?.headers,
};
} else {
lambda.invoke = { callContext: event?.callContext, callArg: event?.callArg };
}

this._handleRequest(ctx);
this._handleRequest({ ...defaultCtx, lambda });
});
};
},

getProcedureName(ctx) {
return ctx.lambda.path.substr(1);
const { isHttp, http, invoke } = ctx.lambda;
return isHttp ? http.path.substr(1) : invoke.callContext?.procedureName;
},

isIntrospection(ctx) {
Expand All @@ -85,12 +83,16 @@ module.exports = require("./Transport").compose({
},

reply(ctx) {
if (!ctx.lambda.statusCode) ctx.lambda.statusCode = 200;
ctx.lambda.resolve({
statusCode: ctx.lambda.statusCode,
headers: { "content-type": "application/json" },
body: JSON.stringify(ctx.result),
});
if (ctx.lambda.isHttp) {
if (!ctx.lambda.statusCode) ctx.lambda.statusCode = 200;
ctx.lambda.resolve({
statusCode: ctx.lambda.statusCode,
headers: { "content-type": "application/json" },
body: JSON.stringify(ctx.result),
});
} else {
ctx.lambda.resolve(ctx.result);
}
},
},
});
6 changes: 5 additions & 1 deletion test/integration/LocalLambdaClientTransport.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ module.exports = require("../../ClientTransport").compose({

methods: {
async introspect() {
return this.call(this.createCallContext({ procedureName: "" }));
const result = await this.call(this.createCallContext({ procedureName: "" }));
// The server-side Transport will not have the call result if introspection is not available on the server side,
// but the server itself is up and running processing calls.
if (!result.procedures) throw Error("The lambda introspection call returned nothing"); // The ClientTransport expects us to throw if call fails
return result;
},

async call({ procedureName = "", arg }) {
Expand Down
Loading