Skip to content

Commit

Permalink
Add DevRev httpClient. Gracefully handle failed attachments. (#19)
Browse files Browse the repository at this point in the history
- Add exponential retry and handle rate-limiting towards DevRev. 
- Gracefully handle failure to upload extracted attachments.

https://app.devrev.ai/devrev/works/ISS-143265
  • Loading branch information
samod authored Jan 25, 2025
1 parent b254267 commit 2f5bc7f
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 39 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## Release Notes

### v1.1.6

- Add exponential retry and handle rate-limiting towards DevRev.
- Gracefully handle failure to upload extracted attachments.

### v1.1.5

- Increase `delayFactor` and number of retries for the exponential backoff retry mechanism for HTTP requests.
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@devrev/ts-adaas",
"version": "1.1.5",
"description": "Typescript library containing the ADaaS(AirDrop as a Service) control protocol.",
"version": "1.1.6",
"description": "DevRev ADaaS (AirDrop-as-a-Service) Typescript SDK.",
"type": "commonjs",
"main": "./dist/index.js",
"typings": "./dist/index.d.ts",
Expand Down
2 changes: 1 addition & 1 deletion src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const ALLOWED_EVENT_TYPES = [
];

export const ARTIFACT_BATCH_SIZE = 2000;
export const MAX_DEVREV_ARTIFACT_SIZE = 536870912; // 512MB
export const MAX_DEVREV_ARTIFACT_SIZE = 262144000; // 250MB

export const AIRDROP_DEFAULT_ITEM_TYPES = {
EXTERNAL_DOMAIN_METADATA: 'external_domain_metadata',
Expand Down
16 changes: 10 additions & 6 deletions src/deprecated/uploader/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import axios from 'axios';
import { axiosDevRevClient } from '../../http/axios-devrev-client';
import { betaSDK, client } from '@devrev/typescript-sdk';
import fs, { promises as fsPromises } from 'fs';
import { createFormData } from '../common/helpers';
Expand Down Expand Up @@ -108,11 +108,15 @@ export class Uploader {
): Promise<any | null> {
const formData = createFormData(preparedArtifact, fetchedObjects);
try {
const response = await axios.post(preparedArtifact.url, formData, {
headers: {
'Content-Type': 'multipart/form',
},
});
const response = await axiosDevRevClient.post(
preparedArtifact.url,
formData,
{
headers: {
'Content-Type': 'multipart/form',
},
}
);

return response;
} catch (error) {
Expand Down
34 changes: 34 additions & 0 deletions src/http/axios-devrev-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import axios, { AxiosError } from 'axios';
import axiosRetry from 'axios-retry';

const axiosDevRevClient = axios.create();

axiosRetry(axiosDevRevClient, {
retries: 5,
retryDelay: (retryCount, error) => {
console.warn(
'Retry attempt: ' + retryCount + 'to url: ' + error.config?.url + '.'
);
if (error.response) {
const retry_after = error.response?.headers['retry-after'];
if (retry_after) {
return retry_after;
}
}
// Exponential backoff algorithm: 1 * 2 ^ retryCount * 1000ms
return axiosRetry.exponentialDelay(retryCount, error, 1000);
},
retryCondition: (error: AxiosError) => {
return (
axiosRetry.isNetworkOrIdempotentRequestError(error) ||
error.response?.status === 429
);
},
onMaxRetryTimesExceeded(error: AxiosError, retryCount) {
console.log(`Max retries attempted: ${retryCount}`);
delete error.config?.headers.Authorization;
delete error.request._header;
},
});

export { axios, axiosDevRevClient };
2 changes: 1 addition & 1 deletion src/repo/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export class Repo {
// Add the new records to the items array
this.items.push(...recordsToPush);

console.log(
console.info(
`Extracted ${recordsToPush.length} new items of type ${this.itemType}. Total number of items in repo: ${this.items.length}.`
);

Expand Down
45 changes: 29 additions & 16 deletions src/uploader/uploader.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { AxiosResponse } from 'axios';
import fs, { promises as fsPromises } from 'fs';
import { axios, axiosClient } from '../http/axios-client';
import { axios, axiosDevRevClient } from '../http/axios-devrev-client';
import zlib from 'zlib';
import { jsonl } from 'js-jsonl';
import FormData from 'form-data';
Expand All @@ -16,6 +15,7 @@ import {
UploaderFactoryInterface,
} from './uploader.interfaces';
import { serializeAxiosError } from '../logger/logger';
import { AxiosResponse } from 'axios';

export class Uploader {
private event: AirdropEvent;
Expand Down Expand Up @@ -123,11 +123,15 @@ export class Uploader {
formData.append('file', file);

try {
const response = await axiosClient.post(preparedArtifact.url, formData, {
headers: {
...formData.getHeaders(),
},
});
const response = await axiosDevRevClient.post(
preparedArtifact.url,
formData,
{
headers: {
...formData.getHeaders(),
},
}
);

return response;
} catch (error) {
Expand All @@ -154,15 +158,24 @@ export class Uploader {

formData.append('file', fileStreamResponse.data);

if (
fileStreamResponse.headers['content-length'] > MAX_DEVREV_ARTIFACT_SIZE
) {
return;
}
try {
const response = await axiosClient.post(preparedArtifact.url, formData, {
headers: {
...formData.getHeaders(),
...(!fileStreamResponse.headers['content-length'] && {
'Content-Length': MAX_DEVREV_ARTIFACT_SIZE,
}),
},
});
const response = await axiosDevRevClient.post(
preparedArtifact.url,
formData,
{
headers: {
...formData.getHeaders(),
...(!fileStreamResponse.headers['content-length'] && {
'Content-Length': MAX_DEVREV_ARTIFACT_SIZE,
}),
},
}
);
return response;
} catch (error) {
if (axios.isAxiosError(error)) {
Expand Down Expand Up @@ -239,7 +252,7 @@ export class Uploader {

private async downloadArtifact(artifactUrl: string): Promise<Buffer | void> {
try {
const response = await axiosClient.get(artifactUrl, {
const response = await axiosDevRevClient.get(artifactUrl, {
responseType: 'arraybuffer',
});

Expand Down
30 changes: 19 additions & 11 deletions src/workers/worker-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { axios } from '../http/axios-client';
import { axios } from '../http/axios-devrev-client';
import {
AirdropEvent,
ExtractorEventType,
Expand Down Expand Up @@ -650,10 +650,7 @@ export class WorkerAdapter<ConnectorState> {
if (error) {
return { error };
}
console.log(
'this.state.toDevRev?.attachmentsMetadata :>> ',
this.state.toDevRev?.attachmentsMetadata
);

if (attachments) {
const attachmentsToProcess = attachments.slice(
this.state.toDevRev?.attachmentsMetadata?.lastProcessed,
Expand Down Expand Up @@ -683,9 +680,15 @@ export class WorkerAdapter<ConnectorState> {
fileType
);
if (!preparedArtifact) {
return {
error: { message: 'Error while preparing artifact.' },
};
console.warn(
'Error while preparing artifact for attachment ID ' +
attachment.id +
'. Skipping attachment'
);
if (this.state.toDevRev) {
this.state.toDevRev.attachmentsMetadata.lastProcessed++;
}
continue;
}

const uploadedArtifact = await this.uploader.streamToArtifact(
Expand All @@ -694,9 +697,14 @@ export class WorkerAdapter<ConnectorState> {
);

if (!uploadedArtifact) {
return {
error: { message: 'Error while streaming artifact.' },
};
console.warn(
'Error while preparing artifact for attachment ID ' +
attachment.id
);
if (this.state.toDevRev) {
this.state.toDevRev.attachmentsMetadata.lastProcessed++;
}
continue;
}

const ssorAttachment: SsorAttachment = {
Expand Down

0 comments on commit 2f5bc7f

Please sign in to comment.