Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/uploader/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export class Uploader {
console.warn(
`File size exceeds the maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes.`
);
this.destroyStream(fileStream);
return;
}

Expand All @@ -186,6 +187,7 @@ export class Uploader {
return response;
} catch (error) {
console.error('Error while streaming artifact.', serializeError(error));
this.destroyStream(fileStream);
return;
}
}
Expand Down Expand Up @@ -216,6 +218,25 @@ export class Uploader {
}
}

/**
* Destroys a stream to prevent resource leaks.
* @param {any} fileStream - The axios response stream to destroy
*/
private destroyStream(fileStream: any): void {
try {
if (fileStream && fileStream.data) {
// For axios response streams, the data property contains the actual stream
if (typeof fileStream.data.destroy === 'function') {
fileStream.data.destroy();
} else if (typeof fileStream.data.close === 'function') {
fileStream.data.close();
}
}
} catch (error) {
console.warn('Error while destroying stream:', serializeError(error));
}
}

async getAttachmentsFromArtifactId({
artifact,
}: {
Expand Down
47 changes: 46 additions & 1 deletion src/workers/default-workers/attachments-extraction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,36 @@ import {
serializeAxiosError,
} from '../../index';
import { axiosClient } from '../../http/axios-client-internal';
import { MAX_DEVREV_ARTIFACT_SIZE } from '../../common/constants';

const getAttachmentStream = async ({
item,
}: ExternalSystemAttachmentStreamingParams): Promise<ExternalSystemAttachmentStreamingResponse> => {
const { id, url } = item;
let fileStreamResponse: any = null;

try {
const fileStreamResponse = await axiosClient.get(url, {
// First, check file size with HEAD request to avoid downloading large files
const headResponse = await axiosClient.head(url, {
headers: {
'Accept-Encoding': 'identity',
},
});

const contentLength = headResponse.headers['content-length'];
if (contentLength && parseInt(contentLength) > MAX_DEVREV_ARTIFACT_SIZE) {
console.warn(
`Attachment ${id} size (${contentLength} bytes) exceeds maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes. Skipping download.`
);
return {
error: {
message: `File size exceeds maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes.`,
},
};
}

// If size is acceptable, proceed with streaming
fileStreamResponse = await axiosClient.get(url, {
responseType: 'stream',
headers: {
'Accept-Encoding': 'identity',
Expand All @@ -25,6 +47,11 @@ const getAttachmentStream = async ({

return { httpStream: fileStreamResponse };
} catch (error) {
// If we created a stream but failed afterwards, destroy it
if (fileStreamResponse) {
destroyHttpStream(fileStreamResponse);
}

if (axios.isAxiosError(error)) {
console.warn(
`Error while fetching attachment ${id} from URL.`,
Expand All @@ -44,6 +71,24 @@ const getAttachmentStream = async ({
}
};

/**
* Destroys a stream to prevent memory leaks.
* @param {any} httpStream - The axios response stream to destroy
*/
const destroyHttpStream = (httpStream: any): void => {
try {
if (httpStream && httpStream.data) {
if (typeof httpStream.data.destroy === 'function') {
httpStream.data.destroy();
} else if (typeof httpStream.data.close === 'function') {
httpStream.data.close();
}
}
} catch (error) {
console.warn('Error while destroying HTTP stream:', error);
}
};

processTask({
task: async ({ adapter }) => {
try {
Expand Down
20 changes: 20 additions & 0 deletions src/workers/worker-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,12 @@ export class WorkerAdapter<ConnectorState> {
console.warn(
`Error while preparing artifact for attachment ID ${attachment.id}. Skipping attachment.`
);
this.destroyHttpStream(httpStream);
return;
}

if (this.isTimeout) {
this.destroyHttpStream(httpStream);
return;
}

Expand Down Expand Up @@ -768,6 +770,24 @@ export class WorkerAdapter<ConnectorState> {
return;
}

/**
* Destroys a stream to prevent memory leaks.
* @param {any} httpStream - The axios response stream to destroy
*/
private destroyHttpStream(httpStream: any): void {
try {
if (httpStream && httpStream.data) {
if (typeof httpStream.data.destroy === 'function') {
httpStream.data.destroy();
} else if (typeof httpStream.data.close === 'function') {
httpStream.data.close();
}
}
} catch (error) {
console.warn('Error while destroying HTTP stream:', error);
}
}

async loadAttachment({
item,
create,
Expand Down