Skip to content

Commit

Permalink
fix: improve request cancellation to keep the s3Client queue low (#548)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Sep 18, 2024
1 parent 53cbaa7 commit e47e612
Show file tree
Hide file tree
Showing 22 changed files with 187 additions and 43 deletions.
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => {
app.addSchema(schemas.authSchema)
app.addSchema(schemas.errorSchema)

app.register(plugins.signals)
app.register(plugins.tenantId)
app.register(plugins.metrics({ enabledEndpoint: !isMultitenant }))
app.register(plugins.tracing)
Expand Down
12 changes: 12 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type StorageConfigType = {
storageS3Endpoint?: string
storageS3ForcePathStyle?: boolean
storageS3Region: string
storageS3ClientTimeout: number
storageS3UploadTimeout: number
storageS3DownloadTimeout: number
isMultitenant: boolean
jwtSecret: string
jwtAlgorithm: string
Expand Down Expand Up @@ -274,6 +277,15 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
getOptionalConfigFromEnv('STORAGE_S3_FORCE_PATH_STYLE', 'GLOBAL_S3_FORCE_PATH_STYLE') ===
'true',
storageS3Region: getOptionalConfigFromEnv('STORAGE_S3_REGION', 'REGION') as string,
storageS3ClientTimeout: Number(
getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || `${1000 * 600}` // 10m
),
storageS3DownloadTimeout: Number(
getOptionalConfigFromEnv('STORAGE_S3_DOWNLOAD_TIMEOUT') || `${1000 * 43200}` //12h
),
storageS3UploadTimeout: Number(
getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_TIMEOUT') || `${1000 * 1200}` // 20m
),

// DB - Migrations
dbAnonRole: getOptionalConfigFromEnv('DB_ANON_ROLE') || 'anon',
Expand Down
1 change: 1 addition & 0 deletions src/http/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from './metrics'
export * from './xml'
export * from './signature-v4'
export * from './tracing'
export * from './signals'
43 changes: 43 additions & 0 deletions src/http/plugins/signals.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import fastifyPlugin from 'fastify-plugin'
import { FastifyInstance } from 'fastify'

declare module 'fastify' {
interface FastifyRequest {
signals: {
body: AbortController
response: AbortController
disconnect: AbortController
}
}
}

export const signals = fastifyPlugin(
async function (fastify: FastifyInstance) {
fastify.addHook('onRequest', async (req, res) => {
req.signals = {
body: new AbortController(),
response: new AbortController(),
disconnect: new AbortController(),
}

// Client terminated the request before the body was fully received
res.raw.once('close', () => {
req.signals.response.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
})
})

// Client terminated the request before the body was fully sent
fastify.addHook('onRequestAbort', async (req) => {
req.signals.body.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
})
},
{ name: 'request-signals' }
)
1 change: 1 addition & 0 deletions src/http/routes/object/getObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ async function requestHandler(
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/getPublicObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export default async function routes(fastify: FastifyInstance) {
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/getSignedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export default async function routes(fastify: FastifyInstance) {
version: obj.version,
download,
expires: new Date(exp * 1000).toUTCString(),
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/render/renderAuthenticatedImage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export default async function routes(fastify: FastifyInstance) {
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/render/renderPublicImage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export default async function routes(fastify: FastifyInstance) {
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/render/renderSignedImage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export default async function routes(fastify: FastifyInstance) {
version: obj.version,
download,
expires: new Date(exp * 1000).toUTCString(),
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
19 changes: 12 additions & 7 deletions src/http/routes/s3/commands/get-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ export default function GetObject(s3Router: S3Router) {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)
const ifModifiedSince = req.Headers?.['if-modified-since']

return s3Protocol.getObject({
Bucket: req.Params.Bucket,
Key: req.Params['*'],
Range: req.Headers?.['range'],
IfNoneMatch: req.Headers?.['if-none-match'],
IfModifiedSince: ifModifiedSince ? new Date(ifModifiedSince) : undefined,
})
return s3Protocol.getObject(
{
Bucket: req.Params.Bucket,
Key: req.Params['*'],
Range: req.Headers?.['range'],
IfNoneMatch: req.Headers?.['if-none-match'],
IfModifiedSince: ifModifiedSince ? new Date(ifModifiedSince) : undefined,
},
{
signal: ctx.signals.response,
}
)
}
)
}
4 changes: 4 additions & 0 deletions src/http/routes/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ export default async function routes(fastify: FastifyInstance) {
storage: req.storage,
tenantId: req.tenantId,
owner: req.owner,
signals: {
body: req.signals.body.signal,
response: req.signals.response.signal,
},
})

const headers = output.headers
Expand Down
8 changes: 7 additions & 1 deletion src/http/routes/s3/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ import { default as ListParts } from './commands/list-parts'
import { default as UploadPartCopy } from './commands/upload-part-copy'
import { JTDDataType } from 'ajv/dist/jtd'

export type Context = { storage: Storage; tenantId: string; owner?: string; req: FastifyRequest }
export type Context = {
storage: Storage
tenantId: string
owner?: string
req: FastifyRequest
signals: { body: AbortSignal; response: AbortSignal }
}
export type S3Router = Router<Context>

const s3Commands = [
Expand Down
3 changes: 3 additions & 0 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const {
storageS3Endpoint,
storageS3ForcePathStyle,
storageS3Region,
storageS3UploadTimeout,
tusUrlExpiryMs,
tusPath,
tusPartSize,
Expand Down Expand Up @@ -67,6 +68,8 @@ function createTusStore() {
s3ClientConfig: {
requestHandler: new NodeHttpHandler({
...agent,
connectionTimeout: 5000,
requestTimeout: storageS3UploadTimeout,
}),
bucket: storageS3Bucket,
region: storageS3Region,
Expand Down
3 changes: 2 additions & 1 deletion src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ export abstract class StorageBackendAdapter {
bucketName: string,
key: string,
version: string | undefined,
headers?: BrowserCacheHeaders
headers?: BrowserCacheHeaders,
signal?: AbortSignal
): Promise<ObjectResponse> {
throw new Error('getObject not implemented')
}
Expand Down
12 changes: 11 additions & 1 deletion src/storage/backend/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ export * from './s3'
export * from './file'
export * from './adapter'

const { storageS3Region, storageS3Endpoint, storageS3ForcePathStyle } = getConfig()
const {
storageS3Region,
storageS3Endpoint,
storageS3ForcePathStyle,
storageS3ClientTimeout,
storageS3UploadTimeout,
storageS3DownloadTimeout,
} = getConfig()

type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
? S3ClientOptions
Expand All @@ -26,6 +33,9 @@ export function createStorageBackend<Type extends StorageBackendType>(
region: storageS3Region,
endpoint: storageS3Endpoint,
forcePathStyle: storageS3ForcePathStyle,
requestTimeout: storageS3ClientTimeout,
uploadTimeout: storageS3UploadTimeout,
downloadTimeout: storageS3DownloadTimeout,
...(config ? config : {}),
}
storageBackend = new S3Backend(defaultOptions)
Expand Down
Loading

0 comments on commit e47e612

Please sign in to comment.