From 5f9e0f86a7c30d2469453fe889bb402c93da2926 Mon Sep 17 00:00:00 2001 From: fenos Date: Tue, 17 Sep 2024 11:49:07 +0200 Subject: [PATCH] fix: improve request cancellation to keep the s3Client queue low --- src/app.ts | 1 + src/config.ts | 12 +++ src/http/plugins/index.ts | 1 + src/http/plugins/signals.ts | 43 ++++++++++ src/http/routes/object/getObject.ts | 1 + src/http/routes/object/getPublicObject.ts | 1 + src/http/routes/object/getSignedObject.ts | 1 + .../routes/render/renderAuthenticatedImage.ts | 1 + src/http/routes/render/renderPublicImage.ts | 1 + src/http/routes/render/renderSignedImage.ts | 1 + src/http/routes/s3/commands/get-object.ts | 19 +++-- src/http/routes/s3/index.ts | 4 + src/http/routes/s3/router.ts | 8 +- src/http/routes/tus/index.ts | 3 + src/storage/backend/adapter.ts | 3 +- src/storage/backend/index.ts | 12 ++- src/storage/backend/s3.ts | 84 ++++++++++++++----- src/storage/protocols/s3/s3-handler.ts | 6 +- src/storage/renderer/asset.ts | 16 ++-- src/storage/renderer/image.ts | 1 + src/storage/renderer/renderer.ts | 5 ++ src/test/render-routes.test.ts | 6 +- 22 files changed, 187 insertions(+), 43 deletions(-) create mode 100644 src/http/plugins/signals.ts diff --git a/src/app.ts b/src/app.ts index 4e97b103..a61bff9b 100644 --- a/src/app.ts +++ b/src/app.ts @@ -50,6 +50,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => { app.addSchema(schemas.authSchema) app.addSchema(schemas.errorSchema) + app.register(plugins.signals) app.register(plugins.tenantId) app.register(plugins.metrics({ enabledEndpoint: !isMultitenant })) app.register(plugins.tracing) diff --git a/src/config.ts b/src/config.ts index cd414e38..90a8e03c 100644 --- a/src/config.ts +++ b/src/config.ts @@ -25,6 +25,9 @@ type StorageConfigType = { storageS3Endpoint?: string storageS3ForcePathStyle?: boolean storageS3Region: string + storageS3ClientTimeout: number + storageS3UploadTimeout: number + storageS3DownloadTimeout: number isMultitenant: boolean jwtSecret: string jwtAlgorithm: string @@ -274,6 +277,15 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { getOptionalConfigFromEnv('STORAGE_S3_FORCE_PATH_STYLE', 'GLOBAL_S3_FORCE_PATH_STYLE') === 'true', storageS3Region: getOptionalConfigFromEnv('STORAGE_S3_REGION', 'REGION') as string, + storageS3ClientTimeout: Number( + getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || `${1000 * 600}` // 10m + ), + storageS3DownloadTimeout: Number( + getOptionalConfigFromEnv('STORAGE_S3_DOWNLOAD_TIMEOUT') || `${1000 * 43200}` //12h + ), + storageS3UploadTimeout: Number( + getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_TIMEOUT') || `${1000 * 1200}` // 20m + ), // DB - Migrations dbAnonRole: getOptionalConfigFromEnv('DB_ANON_ROLE') || 'anon', diff --git a/src/http/plugins/index.ts b/src/http/plugins/index.ts index cd6be474..bdca1c09 100644 --- a/src/http/plugins/index.ts +++ b/src/http/plugins/index.ts @@ -9,3 +9,4 @@ export * from './metrics' export * from './xml' export * from './signature-v4' export * from './tracing' +export * from './signals' diff --git a/src/http/plugins/signals.ts b/src/http/plugins/signals.ts new file mode 100644 index 00000000..77af1264 --- /dev/null +++ b/src/http/plugins/signals.ts @@ -0,0 +1,43 @@ +import fastifyPlugin from 'fastify-plugin' +import { FastifyInstance } from 'fastify' + +declare module 'fastify' { + interface FastifyRequest { + signals: { + body: AbortController + response: AbortController + disconnect: AbortController + } + } +} + +export const signals = fastifyPlugin( + async function (fastify: FastifyInstance) { + fastify.addHook('onRequest', async (req, res) => { + req.signals = { + body: new AbortController(), + response: new AbortController(), + disconnect: new AbortController(), + } + + // Client terminated the request before the body was fully received + res.raw.once('close', () => { + req.signals.response.abort() + + if (!req.signals.disconnect.signal.aborted) { + req.signals.disconnect.abort() + } + }) + }) + + // Client terminated the request before the body was fully sent + fastify.addHook('onRequestAbort', async (req) => { + req.signals.body.abort() + + if (!req.signals.disconnect.signal.aborted) { + req.signals.disconnect.abort() + } + }) + }, + { name: 'request-signals' } +) diff --git a/src/http/routes/object/getObject.ts b/src/http/routes/object/getObject.ts index c91b1fd1..74fc8ba2 100644 --- a/src/http/routes/object/getObject.ts +++ b/src/http/routes/object/getObject.ts @@ -78,6 +78,7 @@ async function requestHandler( key: s3Key, version: obj.version, download, + signal: request.signals.disconnect.signal, }) } diff --git a/src/http/routes/object/getPublicObject.ts b/src/http/routes/object/getPublicObject.ts index e5611111..74095e12 100644 --- a/src/http/routes/object/getPublicObject.ts +++ b/src/http/routes/object/getPublicObject.ts @@ -66,6 +66,7 @@ export default async function routes(fastify: FastifyInstance) { key: s3Key, version: obj.version, download, + signal: request.signals.disconnect.signal, }) } ) diff --git a/src/http/routes/object/getSignedObject.ts b/src/http/routes/object/getSignedObject.ts index 07929663..dd46eb60 100644 --- a/src/http/routes/object/getSignedObject.ts +++ b/src/http/routes/object/getSignedObject.ts @@ -92,6 +92,7 @@ export default async function routes(fastify: FastifyInstance) { version: obj.version, download, expires: new Date(exp * 1000).toUTCString(), + signal: request.signals.disconnect.signal, }) } ) diff --git a/src/http/routes/render/renderAuthenticatedImage.ts b/src/http/routes/render/renderAuthenticatedImage.ts index b86d0717..51d0ae3a 100644 --- a/src/http/routes/render/renderAuthenticatedImage.ts +++ b/src/http/routes/render/renderAuthenticatedImage.ts @@ -61,6 +61,7 @@ export default async function routes(fastify: FastifyInstance) { key: s3Key, version: obj.version, download, + signal: request.signals.disconnect.signal, }) } ) diff --git a/src/http/routes/render/renderPublicImage.ts b/src/http/routes/render/renderPublicImage.ts index 092564b4..09ae3ed8 100644 --- a/src/http/routes/render/renderPublicImage.ts +++ b/src/http/routes/render/renderPublicImage.ts @@ -66,6 +66,7 @@ export default async function routes(fastify: FastifyInstance) { key: s3Key, version: obj.version, download, + signal: request.signals.disconnect.signal, }) } ) diff --git a/src/http/routes/render/renderSignedImage.ts b/src/http/routes/render/renderSignedImage.ts index e9625ed5..79e92441 100644 --- a/src/http/routes/render/renderSignedImage.ts +++ b/src/http/routes/render/renderSignedImage.ts @@ -94,6 +94,7 @@ export default async function routes(fastify: FastifyInstance) { version: obj.version, download, expires: new Date(exp * 1000).toUTCString(), + signal: request.signals.disconnect.signal, }) } ) diff --git a/src/http/routes/s3/commands/get-object.ts b/src/http/routes/s3/commands/get-object.ts index 180bcf9f..34016bdf 100644 --- a/src/http/routes/s3/commands/get-object.ts +++ b/src/http/routes/s3/commands/get-object.ts @@ -63,13 +63,18 @@ export default function GetObject(s3Router: S3Router) { const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner) const ifModifiedSince = req.Headers?.['if-modified-since'] - return s3Protocol.getObject({ - Bucket: req.Params.Bucket, - Key: req.Params['*'], - Range: req.Headers?.['range'], - IfNoneMatch: req.Headers?.['if-none-match'], - IfModifiedSince: ifModifiedSince ? new Date(ifModifiedSince) : undefined, - }) + return s3Protocol.getObject( + { + Bucket: req.Params.Bucket, + Key: req.Params['*'], + Range: req.Headers?.['range'], + IfNoneMatch: req.Headers?.['if-none-match'], + IfModifiedSince: ifModifiedSince ? new Date(ifModifiedSince) : undefined, + }, + { + signal: ctx.signals.response, + } + ) } ) } diff --git a/src/http/routes/s3/index.ts b/src/http/routes/s3/index.ts index 67cf6216..2eba32c2 100644 --- a/src/http/routes/s3/index.ts +++ b/src/http/routes/s3/index.ts @@ -60,6 +60,10 @@ export default async function routes(fastify: FastifyInstance) { storage: req.storage, tenantId: req.tenantId, owner: req.owner, + signals: { + body: req.signals.body.signal, + response: req.signals.response.signal, + }, }) const headers = output.headers diff --git a/src/http/routes/s3/router.ts b/src/http/routes/s3/router.ts index e3c0b8fa..dbc1361a 100644 --- a/src/http/routes/s3/router.ts +++ b/src/http/routes/s3/router.ts @@ -22,7 +22,13 @@ import { default as ListParts } from './commands/list-parts' import { default as UploadPartCopy } from './commands/upload-part-copy' import { JTDDataType } from 'ajv/dist/jtd' -export type Context = { storage: Storage; tenantId: string; owner?: string; req: FastifyRequest } +export type Context = { + storage: Storage + tenantId: string + owner?: string + req: FastifyRequest + signals: { body: AbortSignal; response: AbortSignal } +} export type S3Router = Router const s3Commands = [ diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index 9e9a30e9..6347ff6d 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -35,6 +35,7 @@ const { storageS3Endpoint, storageS3ForcePathStyle, storageS3Region, + storageS3UploadTimeout, tusUrlExpiryMs, tusPath, tusPartSize, @@ -67,6 +68,8 @@ function createTusStore() { s3ClientConfig: { requestHandler: new NodeHttpHandler({ ...agent, + connectionTimeout: 5000, + requestTimeout: storageS3UploadTimeout, }), bucket: storageS3Bucket, region: storageS3Region, diff --git a/src/storage/backend/adapter.ts b/src/storage/backend/adapter.ts index 9981277b..94fbfc5e 100644 --- a/src/storage/backend/adapter.ts +++ b/src/storage/backend/adapter.ts @@ -62,7 +62,8 @@ export abstract class StorageBackendAdapter { bucketName: string, key: string, version: string | undefined, - headers?: BrowserCacheHeaders + headers?: BrowserCacheHeaders, + signal?: AbortSignal ): Promise { throw new Error('getObject not implemented') } diff --git a/src/storage/backend/index.ts b/src/storage/backend/index.ts index 903a5cc1..608a2bcf 100644 --- a/src/storage/backend/index.ts +++ b/src/storage/backend/index.ts @@ -7,7 +7,14 @@ export * from './s3' export * from './file' export * from './adapter' -const { storageS3Region, storageS3Endpoint, storageS3ForcePathStyle } = getConfig() +const { + storageS3Region, + storageS3Endpoint, + storageS3ForcePathStyle, + storageS3ClientTimeout, + storageS3UploadTimeout, + storageS3DownloadTimeout, +} = getConfig() type ConfigForStorage = Type extends 's3' ? S3ClientOptions @@ -26,6 +33,9 @@ export function createStorageBackend( region: storageS3Region, endpoint: storageS3Endpoint, forcePathStyle: storageS3ForcePathStyle, + requestTimeout: storageS3ClientTimeout, + uploadTimeout: storageS3UploadTimeout, + downloadTimeout: storageS3DownloadTimeout, ...(config ? config : {}), } storageBackend = new S3Backend(defaultOptions) diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3.ts index 3588dad5..53f8bbba 100644 --- a/src/storage/backend/s3.ts +++ b/src/storage/backend/s3.ts @@ -41,6 +41,7 @@ export function createAgent(protocol: 'http' | 'https') { const agentOptions = { maxSockets: storageS3MaxSockets, keepAlive: true, + keepAliveMsecs: 1000, } return protocol === 'http' @@ -56,6 +57,9 @@ export interface S3ClientOptions { secretKey?: string role?: string httpAgent?: { httpAgent: Agent } | { httpsAgent: HttpsAgent } + requestTimeout?: number + downloadTimeout?: number + uploadTimeout?: number } /** @@ -64,25 +68,27 @@ export interface S3ClientOptions { */ export class S3Backend implements StorageBackendAdapter { client: S3Client + uploadClient: S3Client + downloadClient: S3Client constructor(options: S3ClientOptions) { - const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https' - const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol) + // Default client for API operations + this.client = this.createS3Client({ + ...options, + requestTimeout: options.requestTimeout, + }) - const params: S3ClientConfig = { - region: options.region, - runtime: 'node', - requestHandler: new NodeHttpHandler({ - ...agent, - }), - } - if (options.endpoint) { - params.endpoint = options.endpoint - } - if (options.forcePathStyle) { - params.forcePathStyle = true - } - this.client = new S3Client(params) + // Upload client exclusively for upload operations + this.uploadClient = this.createS3Client({ + ...options, + requestTimeout: options.uploadTimeout, + }) + + // Download client exclusively for download operations + this.downloadClient = this.createS3Client({ + ...options, + requestTimeout: options.downloadTimeout, + }) } /** @@ -91,12 +97,14 @@ export class S3Backend implements StorageBackendAdapter { * @param key * @param version * @param headers + * @param signal */ async getObject( bucketName: string, key: string, version: string | undefined, - headers?: BrowserCacheHeaders + headers?: BrowserCacheHeaders, + signal?: AbortSignal ): Promise { const input: GetObjectCommandInput = { Bucket: bucketName, @@ -108,7 +116,9 @@ export class S3Backend implements StorageBackendAdapter { input.IfModifiedSince = new Date(headers.ifModifiedSince) } const command = new GetObjectCommand(input) - const data = await this.client.send(command) + const data = await this.downloadClient.send(command, { + abortSignal: signal, + }) return { metadata: { @@ -145,7 +155,7 @@ export class S3Backend implements StorageBackendAdapter { ): Promise { try { const paralellUploadS3 = new Upload({ - client: this.client, + client: this.uploadClient, params: { Bucket: bucketName, Key: withOptionalVersion(key, version), @@ -221,7 +231,7 @@ export class S3Backend implements StorageBackendAdapter { CopySourceIfModifiedSince: conditions?.ifModifiedSince, CopySourceIfUnmodifiedSince: conditions?.ifUnmodifiedSince, }) - const data = await this.client.send(command) + const data = await this.uploadClient.send(command) return { httpStatusCode: data.$metadata.httpStatusCode || 200, eTag: data.CopyObjectResult?.ETag || '', @@ -335,7 +345,8 @@ export class S3Backend implements StorageBackendAdapter { uploadId: string, partNumber: number, body?: string | Uint8Array | Buffer | Readable, - length?: number + length?: number, + signal?: AbortSignal ) { const paralellUploadS3 = new UploadPartCommand({ Bucket: bucketName, @@ -346,7 +357,11 @@ export class S3Backend implements StorageBackendAdapter { ContentLength: length, }) - const resp = await this.client.send(paralellUploadS3) + const resp = await this.uploadClient.send(paralellUploadS3, { + // overwriting the requestTimeout here to avoid the request being cancelled, as the upload can take a long time for a max 5GB upload + requestTimeout: 0, + abortSignal: signal, + }) return { version, @@ -428,11 +443,34 @@ export class S3Backend implements StorageBackendAdapter { CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined, }) - const part = await this.client.send(uploadPartCopy) + const part = await this.uploadClient.send(uploadPartCopy) return { eTag: part.CopyPartResult?.ETag, lastModified: part.CopyPartResult?.LastModified, } } + + protected createS3Client(options: S3ClientOptions) { + const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https' + + const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol) + + const params: S3ClientConfig = { + region: options.region, + runtime: 'node', + requestHandler: new NodeHttpHandler({ + ...agent, + connectionTimeout: 5000, + requestTimeout: options.requestTimeout, + }), + } + if (options.endpoint) { + params.endpoint = options.endpoint + } + if (options.forcePathStyle) { + params.forcePathStyle = true + } + return new S3Client(params) + } } diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index c6340529..f164b32d 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -839,8 +839,9 @@ export class S3ProtocolHandler { * Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html * * @param command + * @param options */ - async getObject(command: GetObjectCommandInput) { + async getObject(command: GetObjectCommandInput, options?: { signal?: AbortSignal }) { const bucket = command.Bucket as string const key = command.Key as string @@ -853,7 +854,8 @@ export class S3ProtocolHandler { ifModifiedSince: command.IfModifiedSince?.toISOString(), ifNoneMatch: command.IfNoneMatch, range: command.Range, - } + }, + options?.signal ) let metadataHeaders: Record = {} diff --git a/src/storage/renderer/asset.ts b/src/storage/renderer/asset.ts index 92ada546..0b3c8351 100644 --- a/src/storage/renderer/asset.ts +++ b/src/storage/renderer/asset.ts @@ -12,10 +12,16 @@ export class AssetRenderer extends Renderer { } getAsset(request: FastifyRequest, options: RenderOptions) { - return this.backend.getObject(options.bucket, options.key, options.version, { - ifModifiedSince: request.headers['if-modified-since'], - ifNoneMatch: request.headers['if-none-match'], - range: request.headers.range, - }) + return this.backend.getObject( + options.bucket, + options.key, + options.version, + { + ifModifiedSince: request.headers['if-modified-since'], + ifNoneMatch: request.headers['if-none-match'], + range: request.headers.range, + }, + options.signal + ) } } diff --git a/src/storage/renderer/image.ts b/src/storage/renderer/image.ts index 5b60229f..40de8f28 100644 --- a/src/storage/renderer/image.ts +++ b/src/storage/renderer/image.ts @@ -204,6 +204,7 @@ export class ImageRenderer extends Renderer { const response = await this.getClient().get(url.join('/'), { responseType: 'stream', + signal: options.signal, headers: acceptHeader ? { accept: acceptHeader, diff --git a/src/storage/renderer/renderer.ts b/src/storage/renderer/renderer.ts index 529f497e..2e43d44f 100644 --- a/src/storage/renderer/renderer.ts +++ b/src/storage/renderer/renderer.ts @@ -11,6 +11,7 @@ export interface RenderOptions { download?: string expires?: string object?: Obj + signal?: AbortSignal } export interface AssetResponse { @@ -39,6 +40,10 @@ export abstract class Renderer { */ async render(request: FastifyRequest, response: FastifyReply, options: RenderOptions) { try { + if (options.signal?.aborted) { + return response.send({ error: 'Request aborted', statusCode: '499' }) + } + const data = await this.getAsset(request, options) this.setHeaders(request, response, data, options) diff --git a/src/test/render-routes.test.ts b/src/test/render-routes.test.ts index c1e90698..f7c6c60b 100644 --- a/src/test/render-routes.test.ts +++ b/src/test/render-routes.test.ts @@ -43,7 +43,7 @@ describe('image rendering routes', () => { expect(S3Backend.prototype.privateAssetUrl).toBeCalledTimes(1) expect(axiosSpy).toBeCalledWith( '/public/height:100/width:100/resizing_type:fill/plain/local:///data/sadcat.jpg', - { responseType: 'stream' } + { responseType: 'stream', signal: expect.any(AbortSignal) } ) }) @@ -61,7 +61,7 @@ describe('image rendering routes', () => { expect(S3Backend.prototype.privateAssetUrl).toBeCalledTimes(1) expect(axiosSpy).toBeCalledWith( '/public/height:100/width:100/resizing_type:fill/plain/local:///data/sadcat.jpg', - { responseType: 'stream' } + { responseType: 'stream', signal: expect.any(AbortSignal) } ) }) @@ -97,7 +97,7 @@ describe('image rendering routes', () => { expect(S3Backend.prototype.privateAssetUrl).toBeCalledTimes(1) expect(axiosSpy).toBeCalledWith( '/public/height:100/width:100/resizing_type:fit/plain/local:///data/sadcat.jpg', - { responseType: 'stream' } + { responseType: 'stream', signal: expect.any(AbortSignal) } ) }) })