Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/common/event-size-monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { ErrorRecord } from '../types/common';
import { EventData } from '../types/extraction';

// Hard cap on serialized event payload size; sizes are compared against
// the threshold below rather than this cap directly.
const MAX_EVENT_SIZE_BYTES = 200_000;
// Mitigation kicks in at 80% of the cap, leaving headroom for payload
// growth between the check and the actual emit.
const EVENT_SIZE_THRESHOLD_BYTES = Math.floor(MAX_EVENT_SIZE_BYTES * 0.8); // 160_000 bytes

/**
 * Get the JSON-serialized size of event data in bytes.
 *
 * Uses `Buffer.byteLength` rather than `String.prototype.length` so that
 * multi-byte UTF-8 characters are counted correctly — `.length` counts
 * UTF-16 code units, which undercounts non-ASCII payloads and would let
 * oversized events slip past the byte threshold.
 *
 * @param data - Event data to measure; `undefined` counts as 0 bytes.
 * @returns Size of `JSON.stringify(data)` in UTF-8 bytes.
 */
export function getEventDataSize(data: EventData | undefined): number {
  if (!data) return 0;
  return Buffer.byteLength(JSON.stringify(data), 'utf8');
}

/**
 * Determine whether event data has crossed the size threshold
 * (80% of the maximum event size, i.e. 160KB).
 *
 * @param data - Event data to check; `undefined` never triggers the limit.
 * @returns true when the serialized size is strictly above the threshold.
 */
export function shouldTriggerSizeLimit(data: EventData | undefined): boolean {
  const sizeInBytes = getEventDataSize(data);
  return sizeInBytes > EVENT_SIZE_THRESHOLD_BYTES;
}

/**
 * Truncate an error record's message to at most `maxLength` characters.
 *
 * The original record is spread into the result so any additional fields
 * on `ErrorRecord` are preserved — only `message` is modified. (The
 * previous implementation rebuilt the record with `message` alone,
 * silently dropping every other property.)
 *
 * @param error - Error record to truncate; `undefined` passes through.
 * @param maxLength - Maximum message length in characters (default 1000).
 * @returns A new record with the truncated message, or `undefined`.
 */
export function truncateErrorMessage(
  error: ErrorRecord | undefined,
  maxLength: number = 1000
): ErrorRecord | undefined {
  if (!error) return undefined;

  return {
    ...error,
    message: error.message.substring(0, maxLength),
  };
}

/**
 * Prune event data before serialization by truncating error messages.
 * Always applied before emit.
 *
 * When there is no error to prune, the input is returned unchanged —
 * this avoids allocating a copy and avoids introducing an explicit
 * `error: undefined` key on payloads that never had one (which would
 * make `'error' in data` true for consumers inspecting the object).
 *
 * @param data - Event data to prune; `undefined`/missing passes through.
 * @returns The same data with `error.message` capped at 1000 characters.
 */
export function pruneEventData(
  data: EventData | undefined
): EventData | undefined {
  if (!data?.error) return data;

  return {
    ...data,
    error: truncateErrorMessage(data.error),
  };
}

/**
 * Log a detailed console warning when the size limit is detected.
 *
 * @param size - Serialized event size in bytes.
 * @param triggerType - Where the limit was detected: `'onUpload'` while
 *   collecting data, `'onEmit'` while emitting the event.
 */
export function logSizeLimitWarning(
  size: number,
  triggerType: 'onUpload' | 'onEmit'
): void {
  const percentage = ((size / MAX_EVENT_SIZE_BYTES) * 100).toFixed(1);

  let detailsString: string;
  if (triggerType === 'onUpload') {
    detailsString =
      'during data collection. Emitting progress event and stopping further processing.';
  } else {
    detailsString = 'during emit. Error messages truncated.';
  }

  console.warn(
    `[SIZE_LIMIT] Event data size ${size} bytes (${percentage}% of ${MAX_EVENT_SIZE_BYTES} limit) detected ${detailsString}`
  );
}

export { MAX_EVENT_SIZE_BYTES as MAX_EVENT_SIZE, EVENT_SIZE_THRESHOLD_BYTES as SIZE_LIMIT_THRESHOLD };
9 changes: 9 additions & 0 deletions src/workers/process-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ export function processTask<ConnectorState>({
})()
);
await task({ adapter });

// If size limit was triggered during task, call onTimeout for cleanup
if (adapter.isTimeout) {
console.log(
'[SIZE_LIMIT] Size limit detected during data collection. Executing onTimeout function for cleanup.'
);
await onTimeout({ adapter });
}

process.exit(0);
}
} catch (error) {
Expand Down
31 changes: 30 additions & 1 deletion src/workers/worker-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import {
STATELESS_EVENT_TYPES,
} from '../common/constants';
import { emit } from '../common/control-protocol';
import {
logSizeLimitWarning,
pruneEventData,
SIZE_LIMIT_THRESHOLD,
} from '../common/event-size-monitor';
import { addReportToLoaderReport, getFilesToLoad } from '../common/helpers';
import { serializeError } from '../logger/logger';
import { Mappers } from '../mappers/mappers';
Expand Down Expand Up @@ -91,6 +96,9 @@ export class WorkerAdapter<ConnectorState> {
private _mappers: Mappers;
private uploader: Uploader;

// Length of the resulting artifact JSON object string.
private currentLength: number = 0;

constructor({
event,
adapterState,
Expand Down Expand Up @@ -149,12 +157,30 @@ export class WorkerAdapter<ConnectorState> {
itemType: repo.itemType,
...(shouldNormalize && { normalize: repo.normalize }),
onUpload: (artifact: Artifact) => {
const newLength = JSON.stringify(artifact).length;

// We need to store artifacts ids in state for later use when streaming attachments
if (repo.itemType === AIRDROP_DEFAULT_ITEM_TYPES.ATTACHMENTS) {
this.state.toDevRev?.attachmentsMetadata.artifactIds.push(
artifact.id
);
}

this.currentLength += newLength;

// Check for size limit (80% of 200KB = 160KB threshold)
if (
this.currentLength > SIZE_LIMIT_THRESHOLD &&
!this.hasWorkerEmitted
) {
logSizeLimitWarning(this.currentLength, 'onUpload');

// Set timeout flag to trigger onTimeout cleanup after task completes
this.handleTimeout();

// Emit progress event to save state and continue on next iteration
void this.emit(ExtractorEventType.ExtractionDataProgress);
}
},
options: this.options,
});
Expand Down Expand Up @@ -246,11 +272,14 @@ export class WorkerAdapter<ConnectorState> {
}

try {
// Always prune error messages to 1000 chars before emit
const prunedData = pruneEventData(data);

await emit({
eventType: newEventType,
event: this.event,
data: {
...data,
...prunedData,
...(ALLOWED_EXTRACTION_EVENT_TYPES.includes(
this.event.payload.event_type
)
Expand Down