diff --git a/package-lock.json b/package-lock.json index 43ad664d..30badda8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,6 +22,7 @@ "@tus/server": "https://gitpkg.now.sh/supabase/tus-node-server/packages/server/dist?build", "agentkeepalive": "^4.2.1", "axios": "^0.27.2", + "axios-rate-limit": "^1.3.0", "axios-retry": "^3.3.1", "connection-string": "^4.3.6", "conventional-changelog-conventionalcommits": "^5.0.0", @@ -3654,6 +3655,14 @@ "form-data": "^4.0.0" } }, + "node_modules/axios-rate-limit": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/axios-rate-limit/-/axios-rate-limit-1.3.0.tgz", + "integrity": "sha512-cKR5wTbU/CeeyF1xVl5hl6FlYsmzDVqxlN4rGtfO5x7J83UxKDckudsW0yW21/ZJRcO0Qrfm3fUFbhEbWTLayw==", + "peerDependencies": { + "axios": "*" + } + }, "node_modules/axios-retry": { "version": "3.3.1", "resolved": "https://registry.npmjs.org/axios-retry/-/axios-retry-3.3.1.tgz", @@ -13237,6 +13246,12 @@ "form-data": "^4.0.0" } }, + "axios-rate-limit": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/axios-rate-limit/-/axios-rate-limit-1.3.0.tgz", + "integrity": "sha512-cKR5wTbU/CeeyF1xVl5hl6FlYsmzDVqxlN4rGtfO5x7J83UxKDckudsW0yW21/ZJRcO0Qrfm3fUFbhEbWTLayw==", + "requires": {} + }, "axios-retry": { "version": "3.3.1", "resolved": "https://registry.npmjs.org/axios-retry/-/axios-retry-3.3.1.tgz", diff --git a/package.json b/package.json index 78bf7dac..922f0990 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "@tus/server": "https://gitpkg.now.sh/supabase/tus-node-server/packages/server/dist?build", "agentkeepalive": "^4.2.1", "axios": "^0.27.2", + "axios-rate-limit": "^1.3.0", "axios-retry": "^3.3.1", "connection-string": "^4.3.6", "conventional-changelog-conventionalcommits": "^5.0.0", diff --git a/src/config.ts b/src/config.ts index 34b5e37d..e9be2308 100644 --- a/src/config.ts +++ b/src/config.ts @@ -40,6 +40,7 @@ type StorageConfigType = { webhookURL?: string webhookApiKey?: string webhookQueuePullInterval?: number + webhookQueueBatchSize: number enableImageTransformation: boolean imgProxyURL?: string imgProxyRequestTimeout: number @@ -143,6 +144,7 @@ export function getConfig(): StorageConfigType { webhookQueuePullInterval: parseInt( getOptionalConfigFromEnv('WEBHOOK_QUEUE_PULL_INTERVAL') || '700' ), + webhookQueueBatchSize: parseInt(getOptionalConfigFromEnv('WEBHOOK_QUEUE_BATCH_SIZE') || '100'), enableImageTransformation: getOptionalConfigFromEnv('ENABLE_IMAGE_TRANSFORMATION') === 'true', imgProxyRequestTimeout: parseInt( getOptionalConfigFromEnv('IMGPROXY_REQUEST_TIMEOUT') || '15', diff --git a/src/queue/events/base-event.ts b/src/queue/events/base-event.ts index e5186399..0243ecd1 100644 --- a/src/queue/events/base-event.ts +++ b/src/queue/events/base-event.ts @@ -46,6 +46,20 @@ export abstract class BaseEvent> { return {} } + static ack(job: Job['payload']> | Job['payload']>[]) { + if (enableQueueEvents) { + const jobs = Array.isArray(job) ? job : [job] + return Promise.all(jobs.map((job) => Queue.getInstance().complete(job.id))) + } + } + + static fail(job: Job['payload']> | Job['payload']>[]) { + if (enableQueueEvents) { + const jobs = Array.isArray(job) ? job : [job] + return Promise.all(jobs.map((job) => Queue.getInstance().fail(job.id))) + } + } + static send>( this: StaticThis, payload: Omit @@ -75,7 +89,7 @@ export abstract class BaseEvent> { }) } - static handle(job: Job['payload']>) { + static handle(job: Job['payload']> | Job['payload']>[]) { throw new Error('not implemented') } @@ -101,6 +115,21 @@ export abstract class BaseEvent> { const constructor = this.constructor as typeof BaseEvent if (!enableQueueEvents) { + const options = constructor.getWorkerOptions() + + if (options.batchSize) { + return constructor.handle([ + { + id: '', + name: constructor.getQueueName(), + data: { + ...this.payload, + $version: constructor.version, + }, + }, + ]) + } + return constructor.handle({ id: '', name: constructor.getQueueName(), diff --git a/src/queue/events/webhook.ts b/src/queue/events/webhook.ts index 7ddd8e6c..490b7dd6 100644 --- a/src/queue/events/webhook.ts +++ b/src/queue/events/webhook.ts @@ -3,8 +3,11 @@ import { Job, WorkOptions } from 'pg-boss' import axios from 'axios' import { getConfig } from '../../config' import { logger } from '../../monitoring' +import rateLimit from 'axios-rate-limit' -const { webhookURL, webhookApiKey, webhookQueuePullInterval } = getConfig() +const { webhookURL, webhookApiKey, webhookQueuePullInterval, webhookQueueBatchSize } = getConfig() + +const httpClient = rateLimit(axios.create(), { maxRPS: 100 }) interface WebhookEvent { event: { @@ -26,6 +29,7 @@ export class Webhook extends BaseEvent { static getWorkerOptions(): WorkOptions { return { newJobCheckInterval: webhookQueuePullInterval, + batchSize: webhookQueueBatchSize, } } @@ -38,7 +42,7 @@ export class Webhook extends BaseEvent { logger.info({ job }, 'handling webhook') try { - await axios.post( + await httpClient.post( webhookURL, { type: 'Webhook', diff --git a/src/queue/queue.ts b/src/queue/queue.ts index 8f5e57ec..c98480dc 100644 --- a/src/queue/queue.ts +++ b/src/queue/queue.ts @@ -71,50 +71,68 @@ export abstract class Queue { const workers: Promise[] = [] Queue.events.forEach((event) => { + const options = event.getWorkerOptions() + workers.push( Queue.getInstance().work( event.getQueueName(), - event.getWorkerOptions(), - async (job: Job) => { - try { - const res = await event.handle(job) - - QueueJobCompleted.inc({ - tenant_id: job.data.tenant.ref, - name: event.getQueueName(), - }) + options, + async (job: Job | Job[]) => { + if (!Array.isArray(job)) { + job = [job] + } - return res - } catch (e) { - QueueJobRetryFailed.inc({ - tenant_id: job.data.tenant.ref, - name: event.getQueueName(), - }) + await Promise.all( + job.map(async (j) => { + try { + const res = await event.handle(j) - Queue.getInstance() - .getJobById(job.id) - .then((dbJob) => { - if (!dbJob) { - return + if (options.batchSize) { + await event.ack(j) } - if (dbJob.retrycount === dbJob.retrylimit) { - QueueJobError.inc({ - tenant_id: job.data.tenant.ref, - name: event.getQueueName(), + + QueueJobCompleted.inc({ + tenant_id: j.data.tenant.ref, + name: event.getQueueName(), + }) + + return res + } catch (e) { + QueueJobRetryFailed.inc({ + tenant_id: j.data.tenant.ref, + name: event.getQueueName(), + }) + + Queue.getInstance() + .getJobById(j.id) + .then((dbJob) => { + if (!dbJob) { + return + } + if (dbJob.retrycount === dbJob.retrylimit) { + QueueJobError.inc({ + tenant_id: j.data.tenant.ref, + name: event.getQueueName(), + }) + } }) - } - }) - logger.error( - { - job: JSON.stringify(job), - rawError: normalizeRawError(e), - }, - 'Error while processing job' - ) + logger.error( + { + job: JSON.stringify(j), + rawError: normalizeRawError(e), + }, + 'Error while processing job' + ) - throw e - } + if (!options.batchSize) { + throw e + } + + await event.fail(j) + } + }) + ) } ) ) diff --git a/src/storage/object.ts b/src/storage/object.ts index ffd6548c..021eda2f 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -387,12 +387,6 @@ export class ObjectStorage { await db.deleteObject(this.bucketId, sourceObjectName, sourceObj.version) await Promise.all([ - ObjectAdminDelete.send({ - name: sourceObjectName, - bucketId: this.bucketId, - tenant: this.db.tenant(), - version: sourceObj.version, - }), ObjectRemovedMove.sendWebhook({ tenant: this.db.tenant(), name: sourceObjectName, @@ -408,6 +402,12 @@ export class ObjectStorage { bucketId: this.bucketId, }, }), + ObjectAdminDelete.send({ + name: sourceObjectName, + bucketId: this.bucketId, + tenant: this.db.tenant(), + version: sourceObj.version, + }), ]) }) } catch (e) {