From 6c7bb1a5c0c71542227bc2457fa65f34abe9d01d Mon Sep 17 00:00:00 2001 From: Samo Dekleva Date: Sat, 25 Jan 2025 14:05:53 +0100 Subject: [PATCH] Add exponential retry and handle rate-limiting towards DevRev. Gracefully handle failure to upload extracted attachments. --- README.md | 5 ++++ package.json | 2 +- src/common/constants.ts | 2 +- src/deprecated/uploader/index.ts | 16 +++++++----- src/http/axios-devrev-client.ts | 34 ++++++++++++++++++++++++ src/repo/repo.ts | 2 +- src/uploader/uploader.ts | 45 ++++++++++++++++++++------------ src/workers/worker-adapter.ts | 30 +++++++++++++-------- 8 files changed, 100 insertions(+), 36 deletions(-) create mode 100644 src/http/axios-devrev-client.ts diff --git a/README.md b/README.md index 1a52e01..7801488 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,11 @@ ## Release Notes +### v1.1.6 + +- Add exponential retry and handle rate-limiting towards DevRev. +- Gracefully handle failure to upload extracted attachments. + ### v1.1.5 - Increase `delayFactor` and number of retries for the exponential backoff retry mechanism for HTTP requests. diff --git a/package.json b/package.json index ea43573..8aa6201 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@devrev/ts-adaas", - "version": "1.1.5", + "version": "1.1.6", "description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/common/constants.ts b/src/common/constants.ts index 77bb723..da1a560 100644 --- a/src/common/constants.ts +++ b/src/common/constants.ts @@ -33,7 +33,7 @@ export const ALLOWED_EVENT_TYPES = [ ]; export const ARTIFACT_BATCH_SIZE = 2000; -export const MAX_DEVREV_ARTIFACT_SIZE = 536870912; // 512MB +export const MAX_DEVREV_ARTIFACT_SIZE = 262144000; // 250MB export const AIRDROP_DEFAULT_ITEM_TYPES = { EXTERNAL_DOMAIN_METADATA: 'external_domain_metadata', diff --git a/src/deprecated/uploader/index.ts b/src/deprecated/uploader/index.ts index 838f085..99178ed 100644 --- a/src/deprecated/uploader/index.ts +++ b/src/deprecated/uploader/index.ts @@ -1,4 +1,4 @@ -import axios from 'axios'; +import { axiosDevRevClient } from '../../http/axios-devrev-client'; import { betaSDK, client } from '@devrev/typescript-sdk'; import fs, { promises as fsPromises } from 'fs'; import { createFormData } from '../common/helpers'; @@ -108,11 +108,15 @@ export class Uploader { ): Promise { const formData = createFormData(preparedArtifact, fetchedObjects); try { - const response = await axios.post(preparedArtifact.url, formData, { - headers: { - 'Content-Type': 'multipart/form', - }, - }); + const response = await axiosDevRevClient.post( + preparedArtifact.url, + formData, + { + headers: { + 'Content-Type': 'multipart/form', + }, + } + ); return response; } catch (error) { diff --git a/src/http/axios-devrev-client.ts b/src/http/axios-devrev-client.ts new file mode 100644 index 0000000..79658ca --- /dev/null +++ b/src/http/axios-devrev-client.ts @@ -0,0 +1,34 @@ +import axios, { AxiosError } from 'axios'; +import axiosRetry from 'axios-retry'; + +const axiosDevRevClient = axios.create(); + +axiosRetry(axiosDevRevClient, { + retries: 5, + retryDelay: (retryCount, error) => { + console.warn( + 'Retry attempt: ' + retryCount + 'to url: ' + error.config?.url + '.' + ); + if (error.response) { + const retry_after = error.response?.headers['retry-after']; + if (retry_after) { + return retry_after; + } + } + // Exponential backoff algorithm: 1 * 2 ^ retryCount * 1000ms + return axiosRetry.exponentialDelay(retryCount, error, 1000); + }, + retryCondition: (error: AxiosError) => { + return ( + axiosRetry.isNetworkOrIdempotentRequestError(error) || + error.response?.status === 429 + ); + }, + onMaxRetryTimesExceeded(error: AxiosError, retryCount) { + console.log(`Max retries attempted: ${retryCount}`); + delete error.config?.headers.Authorization; + delete error.request._header; + }, +}); + +export { axios, axiosDevRevClient }; diff --git a/src/repo/repo.ts b/src/repo/repo.ts index bad3332..a689141 100644 --- a/src/repo/repo.ts +++ b/src/repo/repo.ts @@ -93,7 +93,7 @@ export class Repo { // Add the new records to the items array this.items.push(...recordsToPush); - console.log( + console.info( `Extracted ${recordsToPush.length} new items of type ${this.itemType}. Total number of items in repo: ${this.items.length}.` ); diff --git a/src/uploader/uploader.ts b/src/uploader/uploader.ts index e5148fe..f9a3cff 100644 --- a/src/uploader/uploader.ts +++ b/src/uploader/uploader.ts @@ -1,6 +1,5 @@ -import { AxiosResponse } from 'axios'; import fs, { promises as fsPromises } from 'fs'; -import { axios, axiosClient } from '../http/axios-client'; +import { axios, axiosDevRevClient } from '../http/axios-devrev-client'; import zlib from 'zlib'; import { jsonl } from 'js-jsonl'; import FormData from 'form-data'; @@ -16,6 +15,7 @@ import { UploaderFactoryInterface, } from './uploader.interfaces'; import { serializeAxiosError } from '../logger/logger'; +import { AxiosResponse } from 'axios'; export class Uploader { private event: AirdropEvent; @@ -123,11 +123,15 @@ export class Uploader { formData.append('file', file); try { - const response = await axiosClient.post(preparedArtifact.url, formData, { - headers: { - ...formData.getHeaders(), - }, - }); + const response = await axiosDevRevClient.post( + preparedArtifact.url, + formData, + { + headers: { + ...formData.getHeaders(), + }, + } + ); return response; } catch (error) { @@ -154,15 +158,24 @@ export class Uploader { formData.append('file', fileStreamResponse.data); + if ( + fileStreamResponse.headers['content-length'] > MAX_DEVREV_ARTIFACT_SIZE + ) { + return; + } try { - const response = await axiosClient.post(preparedArtifact.url, formData, { - headers: { - ...formData.getHeaders(), - ...(!fileStreamResponse.headers['content-length'] && { - 'Content-Length': MAX_DEVREV_ARTIFACT_SIZE, - }), - }, - }); + const response = await axiosDevRevClient.post( + preparedArtifact.url, + formData, + { + headers: { + ...formData.getHeaders(), + ...(!fileStreamResponse.headers['content-length'] && { + 'Content-Length': MAX_DEVREV_ARTIFACT_SIZE, + }), + }, + } + ); return response; } catch (error) { if (axios.isAxiosError(error)) { @@ -239,7 +252,7 @@ export class Uploader { private async downloadArtifact(artifactUrl: string): Promise { try { - const response = await axiosClient.get(artifactUrl, { + const response = await axiosDevRevClient.get(artifactUrl, { responseType: 'arraybuffer', }); diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index 07ef877..d78e262 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -1,4 +1,4 @@ -import { axios } from '../http/axios-client'; +import { axios } from '../http/axios-devrev-client'; import { AirdropEvent, ExtractorEventType, @@ -650,10 +650,7 @@ export class WorkerAdapter { if (error) { return { error }; } - console.log( - 'this.state.toDevRev?.attachmentsMetadata :>> ', - this.state.toDevRev?.attachmentsMetadata - ); + if (attachments) { const attachmentsToProcess = attachments.slice( this.state.toDevRev?.attachmentsMetadata?.lastProcessed, @@ -683,9 +680,15 @@ export class WorkerAdapter { fileType ); if (!preparedArtifact) { - return { - error: { message: 'Error while preparing artifact.' }, - }; + console.warn( + 'Error while preparing artifact for attachment ID ' + + attachment.id + + '. Skipping attachment' + ); + if (this.state.toDevRev) { + this.state.toDevRev.attachmentsMetadata.lastProcessed++; + } + continue; } const uploadedArtifact = await this.uploader.streamToArtifact( @@ -694,9 +697,14 @@ export class WorkerAdapter { ); if (!uploadedArtifact) { - return { - error: { message: 'Error while streaming artifact.' }, - }; + console.warn( + 'Error while preparing artifact for attachment ID ' + + attachment.id + ); + if (this.state.toDevRev) { + this.state.toDevRev.attachmentsMetadata.lastProcessed++; + } + continue; } const ssorAttachment: SsorAttachment = {