Skip to content

Commit 6c7d0fc

Browse files
committed
Implemented a crude SQS size overflow workaround.
1 parent 96de479 commit 6c7d0fc

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

src/workers/worker-adapter.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ import { serializeError } from '../logger/logger';
4646
import { SyncMapperRecordStatus } from '../mappers/mappers.interface';
4747
import { AttachmentsStreamingPool } from '../attachments-streaming/attachments-streaming-pool';
4848

49+
const MAX_MESSAGE_LENGTH: number = 200_000;
50+
4951
export function createWorkerAdapter<ConnectorState>({
5052
event,
5153
adapterState,
@@ -89,6 +91,9 @@ export class WorkerAdapter<ConnectorState> {
8991
private _mappers: Mappers;
9092
private uploader: Uploader;
9193

94+
// Length of the resulting artifact JSON object string.
95+
private currentLength: number = 0;
96+
9297
constructor({
9398
event,
9499
adapterState,
@@ -147,12 +152,20 @@ export class WorkerAdapter<ConnectorState> {
147152
itemType: repo.itemType,
148153
...(shouldNormalize && { normalize: repo.normalize }),
149154
onUpload: (artifact: Artifact) => {
155+
let newLength = JSON.stringify(artifact).length;
156+
150157
// We need to store artifacts ids in state for later use when streaming attachments
151158
if (repo.itemType === AIRDROP_DEFAULT_ITEM_TYPES.ATTACHMENTS) {
152159
this.state.toDevRev?.attachmentsMetadata.artifactIds.push(
153160
artifact.id
154161
);
155162
}
163+
164+
if(this.currentLength + newLength > MAX_MESSAGE_LENGTH) {
165+
// TODO: We need to call the adapter's `onTimeout` here and `emit` the Progress event.
166+
// We might have to run the `uploadAllRepos` as well.
167+
this.handleTimeout();
168+
}
156169
},
157170
options: this.options,
158171
});

0 commit comments

Comments
 (0)