Skip to content

Commit

Permalink
fix: improve request cancellation to keep the s3Client queue low
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Sep 17, 2024
1 parent 53cbaa7 commit 032f5c5
Show file tree
Hide file tree
Showing 21 changed files with 149 additions and 39 deletions.
1 change: 1 addition & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => {
app.addSchema(schemas.authSchema)
app.addSchema(schemas.errorSchema)

app.register(plugins.signals)
app.register(plugins.tenantId)
app.register(plugins.metrics({ enabledEndpoint: !isMultitenant }))
app.register(plugins.tracing)
Expand Down
4 changes: 4 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type StorageConfigType = {
storageS3Endpoint?: string
storageS3ForcePathStyle?: boolean
storageS3Region: string
storageS3ClientTimeout: number
isMultitenant: boolean
jwtSecret: string
jwtAlgorithm: string
Expand Down Expand Up @@ -274,6 +275,9 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
getOptionalConfigFromEnv('STORAGE_S3_FORCE_PATH_STYLE', 'GLOBAL_S3_FORCE_PATH_STYLE') ===
'true',
storageS3Region: getOptionalConfigFromEnv('STORAGE_S3_REGION', 'REGION') as string,
storageS3ClientTimeout: Number(
getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || '30000'
),

// DB - Migrations
dbAnonRole: getOptionalConfigFromEnv('DB_ANON_ROLE') || 'anon',
Expand Down
1 change: 1 addition & 0 deletions src/http/plugins/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from './metrics'
export * from './xml'
export * from './signature-v4'
export * from './tracing'
export * from './signals'
43 changes: 43 additions & 0 deletions src/http/plugins/signals.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import fastifyPlugin from 'fastify-plugin'
import { FastifyInstance } from 'fastify'

declare module 'fastify' {
interface FastifyRequest {
signals: {
body: AbortController
response: AbortController
disconnect: AbortController
}
}
}

export const signals = fastifyPlugin(
async function (fastify: FastifyInstance) {
fastify.addHook('onRequest', async (req, res) => {
req.signals = {
body: new AbortController(),
response: new AbortController(),
disconnect: new AbortController(),
}

// Client terminated the request before the body was fully sent
res.raw.once('close', () => {
req.signals.response.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
})

// Client terminated the request before the body was fully received
req.raw.once('close', () => {
req.signals.body.abort()

if (!req.signals.disconnect.signal.aborted) {
req.signals.disconnect.abort()
}
})
})
},
{ name: 'request-signals' }
)
1 change: 1 addition & 0 deletions src/http/routes/object/getObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ async function requestHandler(
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}

Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/getPublicObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export default async function routes(fastify: FastifyInstance) {
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/getSignedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export default async function routes(fastify: FastifyInstance) {
version: obj.version,
download,
expires: new Date(exp * 1000).toUTCString(),
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/render/renderAuthenticatedImage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export default async function routes(fastify: FastifyInstance) {
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/render/renderPublicImage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export default async function routes(fastify: FastifyInstance) {
key: s3Key,
version: obj.version,
download,
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/render/renderSignedImage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export default async function routes(fastify: FastifyInstance) {
version: obj.version,
download,
expires: new Date(exp * 1000).toUTCString(),
signal: request.signals.disconnect.signal,
})
}
)
Expand Down
19 changes: 12 additions & 7 deletions src/http/routes/s3/commands/get-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ export default function GetObject(s3Router: S3Router) {
const s3Protocol = new S3ProtocolHandler(ctx.storage, ctx.tenantId, ctx.owner)
const ifModifiedSince = req.Headers?.['if-modified-since']

return s3Protocol.getObject({
Bucket: req.Params.Bucket,
Key: req.Params['*'],
Range: req.Headers?.['range'],
IfNoneMatch: req.Headers?.['if-none-match'],
IfModifiedSince: ifModifiedSince ? new Date(ifModifiedSince) : undefined,
})
return s3Protocol.getObject(
{
Bucket: req.Params.Bucket,
Key: req.Params['*'],
Range: req.Headers?.['range'],
IfNoneMatch: req.Headers?.['if-none-match'],
IfModifiedSince: ifModifiedSince ? new Date(ifModifiedSince) : undefined,
},
{
signal: ctx.signals.response,
}
)
}
)
}
4 changes: 4 additions & 0 deletions src/http/routes/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ export default async function routes(fastify: FastifyInstance) {
storage: req.storage,
tenantId: req.tenantId,
owner: req.owner,
signals: {
body: req.signals.body.signal,
response: req.signals.response.signal,
},
})

const headers = output.headers
Expand Down
8 changes: 7 additions & 1 deletion src/http/routes/s3/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ import { default as ListParts } from './commands/list-parts'
import { default as UploadPartCopy } from './commands/upload-part-copy'
import { JTDDataType } from 'ajv/dist/jtd'

export type Context = { storage: Storage; tenantId: string; owner?: string; req: FastifyRequest }
export type Context = {
storage: Storage
tenantId: string
owner?: string
req: FastifyRequest
signals: { body: AbortSignal; response: AbortSignal }
}
export type S3Router = Router<Context>

const s3Commands = [
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/tus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ function createTusStore() {
s3ClientConfig: {
requestHandler: new NodeHttpHandler({
...agent,
connectionTimeout: 5000,
}),
bucket: storageS3Bucket,
region: storageS3Region,
Expand Down
3 changes: 2 additions & 1 deletion src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ export abstract class StorageBackendAdapter {
bucketName: string,
key: string,
version: string | undefined,
headers?: BrowserCacheHeaders
headers?: BrowserCacheHeaders,
signal?: AbortSignal
): Promise<ObjectResponse> {
throw new Error('getObject not implemented')
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/backend/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ export * from './s3'
export * from './file'
export * from './adapter'

const { storageS3Region, storageS3Endpoint, storageS3ForcePathStyle } = getConfig()
const { storageS3Region, storageS3Endpoint, storageS3ForcePathStyle, storageS3ClientTimeout } =
getConfig()

type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
? S3ClientOptions
Expand All @@ -26,6 +27,7 @@ export function createStorageBackend<Type extends StorageBackendType>(
region: storageS3Region,
endpoint: storageS3Endpoint,
forcePathStyle: storageS3ForcePathStyle,
requestTimeout: storageS3ClientTimeout,
...(config ? config : {}),
}
storageBackend = new S3Backend(defaultOptions)
Expand Down
66 changes: 44 additions & 22 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export function createAgent(protocol: 'http' | 'https') {
const agentOptions = {
maxSockets: storageS3MaxSockets,
keepAlive: true,
keepAliveMsecs: 1000,
}

return protocol === 'http'
Expand All @@ -56,6 +57,7 @@ export interface S3ClientOptions {
secretKey?: string
role?: string
httpAgent?: { httpAgent: Agent } | { httpsAgent: HttpsAgent }
requestTimeout?: number
}

/**
Expand All @@ -64,25 +66,17 @@ export interface S3ClientOptions {
*/
export class S3Backend implements StorageBackendAdapter {
client: S3Client
uploadClient: S3Client

constructor(options: S3ClientOptions) {
const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https'
const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol)
this.client = this.createS3Client({
...options,
requestTimeout: options.requestTimeout,
})

const params: S3ClientConfig = {
region: options.region,
runtime: 'node',
requestHandler: new NodeHttpHandler({
...agent,
}),
}
if (options.endpoint) {
params.endpoint = options.endpoint
}
if (options.forcePathStyle) {
params.forcePathStyle = true
}
this.client = new S3Client(params)
this.uploadClient = this.createS3Client({
...options,
})
}

/**
Expand All @@ -91,12 +85,14 @@ export class S3Backend implements StorageBackendAdapter {
* @param key
* @param version
* @param headers
* @param signal
*/
async getObject(
bucketName: string,
key: string,
version: string | undefined,
headers?: BrowserCacheHeaders
headers?: BrowserCacheHeaders,
signal?: AbortSignal
): Promise<ObjectResponse> {
const input: GetObjectCommandInput = {
Bucket: bucketName,
Expand All @@ -108,7 +104,10 @@ export class S3Backend implements StorageBackendAdapter {
input.IfModifiedSince = new Date(headers.ifModifiedSince)
}
const command = new GetObjectCommand(input)
const data = await this.client.send(command)
const data = await this.client.send(command, {
abortSignal: signal,
requestTimeout: 0,
})

return {
metadata: {
Expand Down Expand Up @@ -145,7 +144,7 @@ export class S3Backend implements StorageBackendAdapter {
): Promise<ObjectMetadata> {
try {
const paralellUploadS3 = new Upload({
client: this.client,
client: this.uploadClient,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Expand Down Expand Up @@ -221,7 +220,7 @@ export class S3Backend implements StorageBackendAdapter {
CopySourceIfModifiedSince: conditions?.ifModifiedSince,
CopySourceIfUnmodifiedSince: conditions?.ifUnmodifiedSince,
})
const data = await this.client.send(command)
const data = await this.uploadClient.send(command)
return {
httpStatusCode: data.$metadata.httpStatusCode || 200,
eTag: data.CopyObjectResult?.ETag || '',
Expand Down Expand Up @@ -346,7 +345,7 @@ export class S3Backend implements StorageBackendAdapter {
ContentLength: length,
})

const resp = await this.client.send(paralellUploadS3)
const resp = await this.uploadClient.send(paralellUploadS3)

return {
version,
Expand Down Expand Up @@ -428,11 +427,34 @@ export class S3Backend implements StorageBackendAdapter {
CopySourceRange: bytesRange ? `bytes=${bytesRange.fromByte}-${bytesRange.toByte}` : undefined,
})

const part = await this.client.send(uploadPartCopy)
const part = await this.uploadClient.send(uploadPartCopy)

return {
eTag: part.CopyPartResult?.ETag,
lastModified: part.CopyPartResult?.LastModified,
}
}

protected createS3Client(options: S3ClientOptions) {
const storageS3Protocol = options.endpoint?.includes('http://') ? 'http' : 'https'

const agent = options.httpAgent ? options.httpAgent : createAgent(storageS3Protocol)

const params: S3ClientConfig = {
region: options.region,
runtime: 'node',
requestHandler: new NodeHttpHandler({
...agent,
connectionTimeout: 5000,
requestTimeout: options.requestTimeout,
}),
}
if (options.endpoint) {
params.endpoint = options.endpoint
}
if (options.forcePathStyle) {
params.forcePathStyle = true
}
return new S3Client(params)
}
}
6 changes: 4 additions & 2 deletions src/storage/protocols/s3/s3-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -839,8 +839,9 @@ export class S3ProtocolHandler {
* Reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
*
* @param command
* @param options
*/
async getObject(command: GetObjectCommandInput) {
async getObject(command: GetObjectCommandInput, options?: { signal?: AbortSignal }) {
const bucket = command.Bucket as string
const key = command.Key as string

Expand All @@ -853,7 +854,8 @@ export class S3ProtocolHandler {
ifModifiedSince: command.IfModifiedSince?.toISOString(),
ifNoneMatch: command.IfNoneMatch,
range: command.Range,
}
},
options?.signal
)

let metadataHeaders: Record<string, any> = {}
Expand Down
16 changes: 11 additions & 5 deletions src/storage/renderer/asset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ export class AssetRenderer extends Renderer {
}

getAsset(request: FastifyRequest, options: RenderOptions) {
return this.backend.getObject(options.bucket, options.key, options.version, {
ifModifiedSince: request.headers['if-modified-since'],
ifNoneMatch: request.headers['if-none-match'],
range: request.headers.range,
})
return this.backend.getObject(
options.bucket,
options.key,
options.version,
{
ifModifiedSince: request.headers['if-modified-since'],
ifNoneMatch: request.headers['if-none-match'],
range: request.headers.range,
},
options.signal
)
}
}
Loading

0 comments on commit 032f5c5

Please sign in to comment.