diff --git a/src/uploader/uploader.ts b/src/uploader/uploader.ts index 742240f..696b3b1 100644 --- a/src/uploader/uploader.ts +++ b/src/uploader/uploader.ts @@ -167,6 +167,7 @@ export class Uploader { console.warn( `File size exceeds the maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes.` ); + this.destroyStream(fileStream); return; } @@ -186,6 +187,7 @@ export class Uploader { return response; } catch (error) { console.error('Error while streaming artifact.', serializeError(error)); + this.destroyStream(fileStream); return; } } @@ -216,6 +218,25 @@ export class Uploader { } } + /** + * Destroys a stream to prevent resource leaks. + * @param {any} fileStream - The axios response stream to destroy + */ + private destroyStream(fileStream: any): void { + try { + if (fileStream && fileStream.data) { + // For axios response streams, the data property contains the actual stream + if (typeof fileStream.data.destroy === 'function') { + fileStream.data.destroy(); + } else if (typeof fileStream.data.close === 'function') { + fileStream.data.close(); + } + } + } catch (error) { + console.warn('Error while destroying stream:', serializeError(error)); + } + } + async getAttachmentsFromArtifactId({ artifact, }: { diff --git a/src/workers/default-workers/attachments-extraction.ts b/src/workers/default-workers/attachments-extraction.ts index 96246d0..53cc89f 100644 --- a/src/workers/default-workers/attachments-extraction.ts +++ b/src/workers/default-workers/attachments-extraction.ts @@ -9,14 +9,36 @@ import { serializeAxiosError, } from '../../index'; import { axiosClient } from '../../http/axios-client-internal'; +import { MAX_DEVREV_ARTIFACT_SIZE } from '../../common/constants'; const getAttachmentStream = async ({ item, }: ExternalSystemAttachmentStreamingParams): Promise => { const { id, url } = item; + let fileStreamResponse: any = null; try { - const fileStreamResponse = await axiosClient.get(url, { + // First, check file size with HEAD request to avoid downloading large files + const headResponse = await axiosClient.head(url, { + headers: { + 'Accept-Encoding': 'identity', + }, + }); + + const contentLength = headResponse.headers['content-length']; + if (contentLength && parseInt(contentLength) > MAX_DEVREV_ARTIFACT_SIZE) { + console.warn( + `Attachment ${id} size (${contentLength} bytes) exceeds maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes. Skipping download.` + ); + return { + error: { + message: `File size exceeds maximum limit of ${MAX_DEVREV_ARTIFACT_SIZE} bytes.`, + }, + }; + } + + // If size is acceptable, proceed with streaming + fileStreamResponse = await axiosClient.get(url, { responseType: 'stream', headers: { 'Accept-Encoding': 'identity', @@ -25,6 +47,11 @@ const getAttachmentStream = async ({ return { httpStream: fileStreamResponse }; } catch (error) { + // If we created a stream but failed afterwards, destroy it + if (fileStreamResponse) { + destroyHttpStream(fileStreamResponse); + } + if (axios.isAxiosError(error)) { console.warn( `Error while fetching attachment ${id} from URL.`, @@ -44,6 +71,24 @@ const getAttachmentStream = async ({ } }; +/** + * Destroys a stream to prevent memory leaks. + * @param {any} httpStream - The axios response stream to destroy + */ +const destroyHttpStream = (httpStream: any): void => { + try { + if (httpStream && httpStream.data) { + if (typeof httpStream.data.destroy === 'function') { + httpStream.data.destroy(); + } else if (typeof httpStream.data.close === 'function') { + httpStream.data.close(); + } + } + } catch (error) { + console.warn('Error while destroying HTTP stream:', error); + } +}; + processTask({ task: async ({ adapter }) => { try { diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts index 95cd0e6..4f8eb05 100644 --- a/src/workers/worker-adapter.ts +++ b/src/workers/worker-adapter.ts @@ -709,10 +709,12 @@ export class WorkerAdapter { console.warn( `Error while preparing artifact for attachment ID ${attachment.id}. Skipping attachment.` ); + this.destroyHttpStream(httpStream); return; } if (this.isTimeout) { + this.destroyHttpStream(httpStream); return; } @@ -768,6 +770,24 @@ export class WorkerAdapter { return; } + /** + * Destroys a stream to prevent memory leaks. + * @param {any} httpStream - The axios response stream to destroy + */ + private destroyHttpStream(httpStream: any): void { + try { + if (httpStream && httpStream.data) { + if (typeof httpStream.data.destroy === 'function') { + httpStream.data.destroy(); + } else if (typeof httpStream.data.close === 'function') { + httpStream.data.close(); + } + } + } catch (error) { + console.warn('Error while destroying HTTP stream:', error); + } + } + async loadAttachment({ item, create,