Skip to content

Commit

Permalink
split into callLambdaSync(), callLambdaAsync(), callLambdaStreaming()
Browse files Browse the repository at this point in the history
  • Loading branch information
JonnyBurger committed Jan 5, 2025
1 parent 69daa68 commit 2469b75
Show file tree
Hide file tree
Showing 17 changed files with 346 additions and 315 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"ci": "turbo run make test --concurrency=1 --no-update-notifier",
"watch": "turbo watch make --concurrency=2",
"watchwebcodecs": "turbo watch make --filter='@remotion/media-parser' --filter='@remotion/webcodecs'",
"watchserverless": "turbo watch make --filter='@remotion/lambda' --filter='@remotion/serverless' --filter='@remotion/streaming'",
"release-alpha": "pnpm recursive publish --tag=alpha --no-git-checks && pnpm recursive run --sequential publishprivate",
"release": "pnpm recursive publish && pnpm recursive run --sequential publishprivate && git push --tags && git push",
"clean": "turbo run clean && rm -rf packages/**/dist && rm -rf packages/**/node_modules && rm -rf node_modules && rm -rf .cache && rm -rf packages/**/tsconfig.tsbuildinfo && rm -f packages/tsconfig.tsbuildinfo && rm -rf packages/**/.turbo && rm -rf .turbo"
Expand Down
4 changes: 2 additions & 2 deletions packages/lambda/src/api/get-compositions-on-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {VideoConfig} from 'remotion/no-react';
import {VERSION} from 'remotion/version';
import type {AwsRegion} from '../client';
import {awsImplementation} from '../functions/aws-implementation';
import {callLambda} from '../shared/call-lambda';
import {callLambdaSync} from '../shared/call-lambda-sync';

export type GetCompositionsOnLambdaInput = {
chromiumOptions?: ChromiumOptions;
Expand Down Expand Up @@ -66,7 +66,7 @@ export const getCompositionsOnLambda = async ({
});

try {
const res = await callLambda({
const res = await callLambdaSync({
functionName,
type: ServerlessRoutines.compositions,
payload: {
Expand Down
4 changes: 2 additions & 2 deletions packages/lambda/src/api/get-render-progress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import {getProgress} from '../functions/helpers/get-progress';
import {parseFunctionName} from '../functions/helpers/parse-function-name';
import type {AwsRegion} from '../regions';
import {callLambda} from '../shared/call-lambda';
import {callLambdaSync} from '../shared/call-lambda-sync';
import type {RenderProgress} from '../shared/constants';
import {getRenderProgressPayload} from './make-lambda-payload';

Expand Down Expand Up @@ -56,7 +56,7 @@ export const getRenderProgress = async (
});
}

const result = await callLambda<AwsProvider, ServerlessRoutines.status>({
const result = await callLambdaSync<AwsProvider, ServerlessRoutines.status>({
functionName: input.functionName,
type: ServerlessRoutines.status,
payload: getRenderProgressPayload(input),
Expand Down
4 changes: 2 additions & 2 deletions packages/lambda/src/api/render-media-on-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import type {
import {ServerlessRoutines} from '@remotion/serverless/client';
import type {AwsProvider} from '../functions/aws-implementation';
import type {AwsRegion} from '../regions';
import {callLambda} from '../shared/call-lambda';
import {callLambdaSync} from '../shared/call-lambda-sync';
import {
getCloudwatchMethodUrl,
getCloudwatchRendererUrl,
Expand Down Expand Up @@ -88,7 +88,7 @@ export const internalRenderMediaOnLambdaRaw = async (
const {functionName, region, rendererFunctionName} = input;

try {
const res = await callLambda({
const res = await callLambdaSync({
functionName,
type: ServerlessRoutines.start,
payload: await makeLambdaRenderMediaPayload(input),
Expand Down
2 changes: 1 addition & 1 deletion packages/lambda/src/api/render-still-on-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type {
import type {BrowserSafeApis} from '@remotion/renderer/client';
import type {DownloadBehavior} from '@remotion/serverless/client';
import {ServerlessRoutines} from '@remotion/serverless/client';
import {callLambdaWithStreaming} from '../shared/call-lambda';
import {callLambdaWithStreaming} from '../shared/call-lambda-streaming';

import {wrapWithErrorHandling} from '@remotion/renderer/error-handling';
import type {
Expand Down
2 changes: 1 addition & 1 deletion packages/lambda/src/functions/helpers/stream-renderer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {ServerlessPayload} from '@remotion/serverless/client';
import {ServerlessRoutines} from '@remotion/serverless/client';
import {writeFileSync} from 'fs';
import {join} from 'path';
import {callLambdaWithStreaming} from '../../shared/call-lambda';
import {callLambdaWithStreaming} from '../../shared/call-lambda-streaming';
import type {OverallProgressHelper} from './overall-render-progress';

type StreamRendererResponse =
Expand Down
33 changes: 8 additions & 25 deletions packages/lambda/src/functions/start.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {InvokeCommand} from '@aws-sdk/client-lambda';
import type {CloudProvider, ProviderSpecifics} from '@remotion/serverless';
import type {ServerlessPayload} from '@remotion/serverless/client';
import {
Expand All @@ -8,7 +7,7 @@ import {
} from '@remotion/serverless/client';
import {VERSION} from 'remotion/version';
import type {AwsRegion} from '../regions';
import {getLambdaClient} from '../shared/aws-clients';
import {callLambdaAsync} from '../shared/call-lambda-async';
import {validateDeleteAfter} from './helpers/lifecycle';
import {makeInitialOverallRenderProgress} from './helpers/overall-render-progress';

Expand Down Expand Up @@ -123,29 +122,13 @@ export const startHandler = async <Provider extends CloudProvider>(
metadata: params.metadata,
};

const stringifiedPayload = JSON.stringify(payload);

if (stringifiedPayload.length > 256 * 1024) {
throw new Error(
`Payload is too big: ${stringifiedPayload.length} bytes. Maximum size is 256 KB. This should not happen, please report this to the Remotion team. Payload: ${stringifiedPayload}`,
);
}

// Don't replace with callLambda(), we want to return before the render is snone
const result = await getLambdaClient(
providerSpecifics.getCurrentRegionInFunction() as AwsRegion,
).send(
new InvokeCommand({
FunctionName: process.env.AWS_LAMBDA_FUNCTION_NAME,
Payload: stringifiedPayload,
InvocationType: 'Event',
}),
);
if (result.FunctionError) {
throw new Error(
`Lambda function returned error: ${result.FunctionError} ${result.LogResult}`,
);
}
await callLambdaAsync({
functionName: process.env.AWS_LAMBDA_FUNCTION_NAME as string,
type: ServerlessRoutines.launch,
payload,
region: region as AwsRegion,
timeoutInTest: options.timeoutInMilliseconds,
});

await initialFile;

Expand Down
37 changes: 37 additions & 0 deletions packages/lambda/src/shared/call-lambda-async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {InvokeCommand} from '@aws-sdk/client-lambda';
import type {CloudProvider} from '@remotion/serverless';
import type {ServerlessRoutines} from '@remotion/serverless/client';
import type {AwsRegion} from '../regions';
import {getLambdaClient} from './aws-clients';
import type {CallLambdaOptions} from './call-lambda';

export const callLambdaAsync = async <
T extends ServerlessRoutines,
Provider extends CloudProvider,
>({
functionName,
payload,
region,
timeoutInTest,
}: CallLambdaOptions<T, Provider>): Promise<void> => {
const stringifiedPayload = JSON.stringify(payload);
if (stringifiedPayload.length > 256 * 1024) {
throw new Error(
`Payload is too big: ${stringifiedPayload.length} bytes. Maximum size is 256 KB. This should not happen, please report this to the Remotion team. Payload: ${stringifiedPayload}`,
);
}

const result = await getLambdaClient(region as AwsRegion, timeoutInTest).send(
new InvokeCommand({
FunctionName: functionName,
Payload: stringifiedPayload,
InvocationType: 'Event',
}),
);

if (result.FunctionError) {
throw new Error(
`Lambda function returned error: ${result.FunctionError} ${result.LogResult}`,
);
}
};
218 changes: 218 additions & 0 deletions packages/lambda/src/shared/call-lambda-streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import type {InvokeWithResponseStreamCommandOutput} from '@aws-sdk/client-lambda';
import {
InvokeWithResponseStreamCommand,
type InvokeWithResponseStreamResponseEvent,
} from '@aws-sdk/client-lambda';
import type {
CloudProvider,
OnMessage,
StreamingMessage,
} from '@remotion/serverless';
import type {
MessageTypeId,
ServerlessRoutines,
} from '@remotion/serverless/client';
import {
formatMap,
messageTypeIdToMessageType,
} from '@remotion/serverless/client';
import {makeStreamer} from '@remotion/streaming';
import type {AwsRegion} from '../regions';
import {getLambdaClient} from './aws-clients';
import type {CallLambdaOptions} from './call-lambda';

const STREAM_STALL_TIMEOUT = 30000;
const LAMBDA_STREAM_STALL = `AWS did not invoke Lambda in ${STREAM_STALL_TIMEOUT}ms`;

const parseJsonOrThrowSource = (data: Uint8Array, type: string) => {
const asString = new TextDecoder('utf-8').decode(data);
try {
return JSON.parse(asString);
} catch {
throw new Error(`Invalid JSON (${type}): ${asString}`);
}
};

const invokeStreamOrTimeout = async <Provider extends CloudProvider>({
region,
timeoutInTest,
functionName,
type,
payload,
}: {
region: Provider['region'];
timeoutInTest: number;
functionName: string;
type: string;
payload: Record<string, unknown>;
}) => {
const resProm = getLambdaClient(region as AwsRegion, timeoutInTest).send(
new InvokeWithResponseStreamCommand({
FunctionName: functionName,
Payload: JSON.stringify({type, ...payload}),
}),
);

let cleanup = () => undefined;

const timeout = new Promise<InvokeWithResponseStreamCommandOutput>(
(_resolve, reject) => {
const int = setTimeout(() => {
reject(new Error(LAMBDA_STREAM_STALL));
}, STREAM_STALL_TIMEOUT);
cleanup = () => {
clearTimeout(int);
};
},
);

const res = await Promise.race([resProm, timeout]);

cleanup();

return res;
};

const INVALID_JSON_MESSAGE = 'Cannot parse Lambda response as JSON';

const callLambdaWithStreamingWithoutRetry = async <
T extends ServerlessRoutines,
Provider extends CloudProvider,
>({
functionName,
type,
payload,
region,
timeoutInTest,
receivedStreamingPayload,
}: CallLambdaOptions<T, Provider> & {
receivedStreamingPayload: OnMessage<Provider>;
}): Promise<void> => {
const res = await invokeStreamOrTimeout({
functionName,
payload,
region,
timeoutInTest,
type,
});

const {onData, clear} = makeStreamer((status, messageTypeId, data) => {
const messageType = messageTypeIdToMessageType(
messageTypeId as MessageTypeId,
);
const innerPayload =
formatMap[messageType] === 'json'
? parseJsonOrThrowSource(data, messageType)
: data;

const message: StreamingMessage<Provider> = {
successType: status,
message: {
type: messageType,
payload: innerPayload,
},
};

receivedStreamingPayload(message);
});

const dumpBuffers = () => {
clear();
};

// @ts-expect-error - We are adding a listener to a global variable
if (globalThis._dumpUnreleasedBuffers) {
// @ts-expect-error - We are adding a listener to a global variable
(globalThis._dumpUnreleasedBuffers as EventEmitter).addListener(
'dump-unreleased-buffers',
dumpBuffers,
);
}

const events =
res.EventStream as AsyncIterable<InvokeWithResponseStreamResponseEvent>;

for await (const event of events) {
// There are two types of events you can get on a stream.

// `PayloadChunk`: These contain the actual raw bytes of the chunk
// It has a single property: `Payload`
if (event.PayloadChunk && event.PayloadChunk.Payload) {
onData(event.PayloadChunk.Payload);
}

if (event.InvokeComplete) {
if (event.InvokeComplete.ErrorCode) {
const logs = `https://${region}.console.aws.amazon.com/cloudwatch/home?region=${region}#logsV2:logs-insights$3FqueryDetail$3D~(end~0~start~-3600~timeType~'RELATIVE~unit~'seconds~editorString~'fields*20*40timestamp*2c*20*40requestId*2c*20*40message*0a*7c*20filter*20*40requestId*20like*20*${res.$metadata.requestId}*22*0a*7c*20sort*20*40timestamp*20asc~source~(~'*2faws*2flambda*2f${functionName}))`;
if (event.InvokeComplete.ErrorCode === 'Unhandled') {
throw new Error(
`Lambda function ${functionName} failed with an unhandled error: ${
event.InvokeComplete.ErrorDetails as string
} See ${logs} to see the logs of this invocation.`,
);
}

throw new Error(
`Lambda function ${functionName} failed with error code ${event.InvokeComplete.ErrorCode}: ${event.InvokeComplete.ErrorDetails}. See ${logs} to see the logs of this invocation.`,
);
}
}

// Don't put a `break` statement here, as it will cause the socket to not properly exit.
}

// @ts-expect-error - We are adding a listener to a global variable
if (globalThis._dumpUnreleasedBuffers) {
// @ts-expect-error - We are adding a listener to a global variable
(globalThis._dumpUnreleasedBuffers as EventEmitter).removeListener(
'dump-unreleased-buffers',
dumpBuffers,
);
}

clear();
};

export const callLambdaWithStreaming = async <
Provider extends CloudProvider,
T extends ServerlessRoutines,
>(
options: CallLambdaOptions<T, Provider> & {
receivedStreamingPayload: OnMessage<Provider>;
retriesRemaining: number;
},
): Promise<void> => {
// As of August 2023, Lambda streaming sometimes misses parts of the JSON response.
// Handling this for now by applying a retry mechanism.

try {
// Do not remove this await
await callLambdaWithStreamingWithoutRetry<T, Provider>(options);
} catch (err) {
if ((err as Error).stack?.includes('TooManyRequestsException')) {
throw new Error(
`AWS Concurrency limit reached (Original Error: ${(err as Error).message}). See https://www.remotion.dev/docs/lambda/troubleshooting/rate-limit for tips to fix this.`,
);
}

if (
!(err as Error).message.includes(INVALID_JSON_MESSAGE) &&
!(err as Error).message.includes(LAMBDA_STREAM_STALL) &&
!(err as Error).message.includes('aborted')
) {
throw err;
}

console.error('Retries remaining:', options.retriesRemaining);
if (options.retriesRemaining === 0) {
console.error('Throwing error:');
throw err;
}

console.error(err);
return callLambdaWithStreaming({
...options,
retriesRemaining: options.retriesRemaining - 1,
});
}
};
Loading

0 comments on commit 2469b75

Please sign in to comment.