diff --git a/src/common/event-size-monitor.ts b/src/common/event-size-monitor.ts
new file mode 100644
index 0000000..a60a301
--- /dev/null
+++ b/src/common/event-size-monitor.ts
@@ -0,0 +1,77 @@
+import { ErrorRecord } from '../types/common';
+import { EventData } from '../types/extraction';
+
+const MAX_EVENT_SIZE_BYTES = 200_000;
+const EVENT_SIZE_THRESHOLD_BYTES = Math.floor(MAX_EVENT_SIZE_BYTES * 0.8); // 160_000 bytes
+
+/**
+ * Get the JSON serialized size of event data in bytes.
+ *
+ * The size is measured as the UTF-8 byte length of the serialized payload,
+ * not the string length, so multi-byte characters are counted correctly.
+ */
+export function getEventDataSize(data: EventData | undefined): number {
+  if (!data) return 0;
+  return Buffer.byteLength(JSON.stringify(data), 'utf8');
+}
+
+/**
+ * Check if event data exceeds the 80% threshold (160KB)
+ */
+export function shouldTriggerSizeLimit(data: EventData | undefined): boolean {
+  return getEventDataSize(data) > EVENT_SIZE_THRESHOLD_BYTES;
+}
+
+/**
+ * Truncate error message to max length (default 1000 chars).
+ * Any other fields on the error record are preserved.
+ */
+export function truncateErrorMessage(
+  error: ErrorRecord | undefined,
+  maxLength: number = 1000
+): ErrorRecord | undefined {
+  if (!error) return undefined;
+
+  return {
+    ...error,
+    message: error.message.substring(0, maxLength),
+  };
+}
+
+/**
+ * Prune event data by truncating error messages
+ * Always applied before serialization
+ */
+export function pruneEventData(
+  data: EventData | undefined
+): EventData | undefined {
+  // Nothing to prune; also avoids adding an explicit `error: undefined` key.
+  if (!data || !data.error) return data;
+
+  return {
+    ...data,
+    error: truncateErrorMessage(data.error),
+  };
+}
+
+/**
+ * Log detailed warning when size limit is detected
+ */
+export function logSizeLimitWarning(
+  size: number,
+  triggerType: 'onUpload' | 'onEmit'
+): void {
+  const percentage = (size / MAX_EVENT_SIZE_BYTES) * 100;
+  const detailsString =
+    triggerType === 'onUpload'
+      ? 'during data collection. Emitting progress event and stopping further processing.'
+      : 'during emit. Error messages truncated.';
+
+  console.warn(
+    `[SIZE_LIMIT] Event data size ${size} bytes (${percentage.toFixed(
+      1
+    )}% of ${MAX_EVENT_SIZE_BYTES} limit) detected ${detailsString}`
+  );
+}
+
+export { MAX_EVENT_SIZE_BYTES as MAX_EVENT_SIZE, EVENT_SIZE_THRESHOLD_BYTES as SIZE_LIMIT_THRESHOLD };
diff --git a/src/workers/process-task.ts b/src/workers/process-task.ts
index b7300b0..745d86e 100644
--- a/src/workers/process-task.ts
+++ b/src/workers/process-task.ts
@@ -58,6 +58,15 @@ export function processTask({
             })()
           );
           await task({ adapter });
+
+          // If size limit was triggered during task, call onTimeout for cleanup
+          if (adapter.isTimeout) {
+            console.log(
+              '[SIZE_LIMIT] Size limit detected during data collection. Executing onTimeout function for cleanup.'
+            );
+            await onTimeout({ adapter });
+          }
+
           process.exit(0);
         }
       } catch (error) {
diff --git a/src/workers/worker-adapter.ts b/src/workers/worker-adapter.ts
index 028eb91..a64706a 100644
--- a/src/workers/worker-adapter.ts
+++ b/src/workers/worker-adapter.ts
@@ -7,6 +7,11 @@ import {
   STATELESS_EVENT_TYPES,
 } from '../common/constants';
 import { emit } from '../common/control-protocol';
+import {
+  logSizeLimitWarning,
+  pruneEventData,
+  SIZE_LIMIT_THRESHOLD,
+} from '../common/event-size-monitor';
 import { addReportToLoaderReport, getFilesToLoad } from '../common/helpers';
 import { serializeError } from '../logger/logger';
 import { Mappers } from '../mappers/mappers';
@@ -91,6 +96,9 @@ export class WorkerAdapter {
   private _mappers: Mappers;
   private uploader: Uploader;
 
+  // Cumulative UTF-8 byte size of the serialized artifacts uploaded so far.
+  private currentLength: number = 0;
+
   constructor({
     event,
     adapterState,
@@ -149,12 +157,30 @@ export class WorkerAdapter {
         itemType: repo.itemType,
         ...(shouldNormalize && { normalize: repo.normalize }),
         onUpload: (artifact: Artifact) => {
+          const newLength = Buffer.byteLength(JSON.stringify(artifact), 'utf8');
+
           // We need to store artifacts ids in state for later use when streaming attachments
           if (repo.itemType === AIRDROP_DEFAULT_ITEM_TYPES.ATTACHMENTS) {
             this.state.toDevRev?.attachmentsMetadata.artifactIds.push(
               artifact.id
             );
           }
+
+          this.currentLength += newLength;
+
+          // Check for size limit (80% of 200KB = 160KB threshold)
+          if (
+            this.currentLength > SIZE_LIMIT_THRESHOLD &&
+            !this.hasWorkerEmitted
+          ) {
+            logSizeLimitWarning(this.currentLength, 'onUpload');
+
+            // Set timeout flag to trigger onTimeout cleanup after task completes
+            this.handleTimeout();
+
+            // Emit progress event to save state and continue on next iteration
+            void this.emit(ExtractorEventType.ExtractionDataProgress);
+          }
         },
         options: this.options,
       });
@@ -246,11 +272,14 @@ export class WorkerAdapter {
     }
 
     try {
+      // Always prune error messages to 1000 chars before emit
+      const prunedData = pruneEventData(data);
+
       await emit({
         eventType: newEventType,
         event: this.event,
         data: {
-          ...data,
+          ...prunedData,
           ...(ALLOWED_EXTRACTION_EVENT_TYPES.includes(
             this.event.payload.event_type
           )