diff --git a/.vscode/settings.json b/.vscode/settings.json index 67b515c68994f..9f41be91d5d05 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,5 +11,6 @@ "source.fixAll.eslint": "explicit" }, "typescript.tsdk": "node_modules/typescript/lib", - "vitest.disableWorkspaceWarning": true + "vitest.disableWorkspaceWarning": true, + "java.configuration.updateBuildConfiguration": "interactive" } diff --git a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts index 3f5b6e83e794d..879fd2f6e9e06 100644 --- a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts +++ b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.e2e.spec.ts @@ -31,16 +31,13 @@ describe(S3TransferManager.name, () => { let region: string; beforeAll(async () => { - // const integTestResourcesEnv = await getIntegTestResources(); - // Object.assign(process.env, integTestResourcesEnv); + const integTestResourcesEnv = await getIntegTestResources(); + Object.assign(process.env, integTestResourcesEnv); - // region = process?.env?.AWS_SMOKE_TEST_REGION as string; - // Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string; + region = process?.env?.AWS_SMOKE_TEST_REGION as string; + Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string; void getIntegTestResources; - region = "us-west-2"; - Bucket = "lukachad-us-west-2"; - client = new S3({ region, }); @@ -54,7 +51,7 @@ describe(S3TransferManager.name, () => { }); }, 120_000); - describe.skip("multi part download", () => { + describe("multi part download", () => { const modes = ["PART", "RANGE"] as S3TransferManagerConfig["multipartDownloadType"][]; const sizes = [6, 11] as number[]; @@ -92,13 +89,11 @@ describe(S3TransferManager.name, () => { }, { eventListeners: { - transferInitiated: [({ request, snapshot }) => {}], bytesTransferred: [ ({ request, snapshot }) => { bytesTransferred = snapshot.transferredBytes; }, ], - 
transferComplete: [({ request, snapshot, response }) => {}], }, } ); @@ -151,7 +146,7 @@ describe(S3TransferManager.name, () => { const serialized = await download.Body?.transformToString(); check(serialized); if (partNumber) { - expect(serialized?.length).toEqual(DEFAULT_PART_SIZE); + expect(serialized?.length).toEqual(4 * 1024 * 1024); // Part 1 is 8MB Part 2 is 4MB } else { expect(serialized?.length).toEqual(Body.length); } @@ -163,10 +158,11 @@ describe(S3TransferManager.name, () => { it("multipart object: multipartDownloadType = RANGE, range = 0-12MB, partNumber = null", async () => { await sepTests("multipart", "RANGE", `bytes=0-${12 * 1024 * 1024}`, undefined); }, 60_000); - it("single object: multipartDownloadType = PART, range = null, partNumber = 2", async () => { + // skipped because TM no longer supports partNumber + it.skip("single object: multipartDownloadType = PART, range = null, partNumber = 2", async () => { await sepTests("single", "PART", undefined, 2); }, 60_000); - it("single object: multipartDownloadType = RANGE, range = null, partNumber = 2", async () => { + it.skip("single object: multipartDownloadType = RANGE, range = null, partNumber = 2", async () => { await sepTests("single", "RANGE", undefined, 2); }, 60_000); it("single object: multipartDownloadType = PART, range = null, partNumber = null", async () => { diff --git a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.spec.ts b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.spec.ts index b8d56721d17ad..0fa0eb9322c73 100644 --- a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.spec.ts +++ b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.spec.ts @@ -1,7 +1,11 @@ import { S3, S3Client } from "@aws-sdk/client-s3"; import { TransferCompleteEvent, TransferEvent } from "@aws-sdk/lib-storage/dist-types/s3-transfer-manager/types"; +import { StreamingBlobPayloadOutputTypes } from "@smithy/types"; +import { Readable } from "stream"; import { beforeAll, 
beforeEach, describe, expect, test as it, vi } from "vitest"; +import { getIntegTestResources } from "../../../../tests/e2e/get-integ-test-resources"; +import { iterateStreams, joinStreams } from "./join-streams"; import { S3TransferManager } from "./S3TransferManager"; /** @@ -22,8 +26,15 @@ describe("S3TransferManager Unit Tests", () => { let region: string; beforeAll(async () => { - region = "us-west-1"; - Bucket = "lukachad-us-west-2"; + const integTestResourcesEnv = await getIntegTestResources(); + Object.assign(process.env, integTestResourcesEnv); + + region = process?.env?.AWS_SMOKE_TEST_REGION as string; + Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string; + void getIntegTestResources; + + // region = "us-west-1"; + // Bucket = "lukachad-us-west-2"; client = new S3({ region, @@ -175,7 +186,7 @@ describe("S3TransferManager Unit Tests", () => { }).toThrow("Unknown event type: invalidEvent"); }); - it("Should handle options.once correctly", () => { + it("Should handle options.once correctly, running the listener at most once.", () => { const mockCallback = vi.fn(); tm.addEventListener("transferInitiated", mockCallback, { once: true }); @@ -190,6 +201,50 @@ describe("S3TransferManager Unit Tests", () => { expect(mockCallback).toHaveBeenCalledTimes(1); }); + it("Should not add listener if included AbortSignal is aborted", () => { + const controller = new AbortController(); + const callback = vi.fn(); + controller.abort(); + tm.addEventListener("transferInitiated", callback, { signal: controller.signal }); + expect((tm as any).eventListeners.transferInitiated).toEqual([]); + }); + + it("Should remove listener after included AbortSignal was aborted", () => { + const controller = new AbortController(); + const callback = vi.fn(); + tm.addEventListener("transferInitiated", callback, { signal: controller.signal }); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + tm.dispatchEvent(event); + + 
expect(callback).toHaveBeenCalledTimes(1); + expect((tm as any).eventListeners.transferInitiated).toEqual([callback]); + + controller.abort(); + expect((tm as any).eventListeners.transferInitiated).toEqual([]); + }); + + it("Should clean up abort listeners and store cleanup functions in WeakMap", () => { + const controller = new AbortController(); + const callback = vi.fn(); + + tm.addEventListener("transferInitiated", callback, { signal: controller.signal }); + + expect((tm as any).eventListeners.transferInitiated).toEqual([callback]); + expect((tm as any).abortCleanupFunctions.has(controller.signal)).toBe(true); + + const cleanupFn = (tm as any).abortCleanupFunctions.get(controller.signal); + cleanupFn(); + (tm as any).abortCleanupFunctions.delete(controller.signal); + + expect((tm as any).abortCleanupFunctions.has(controller.signal)).toBe(false); + controller.abort(); + expect((tm as any).eventListeners.transferInitiated).toEqual([callback]); + }); + it("Should handle boolean options parameter", () => { tm.addEventListener("transferInitiated", initiated, true); expect((tm as any).eventListeners.transferInitiated).toContain(initiated); @@ -310,68 +365,464 @@ describe("S3TransferManager Unit Tests", () => { expect(result).toBe(true); }); - it("Should handle unknown event types", () => {}); + it("Should handle unknown event types", () => { + const event = Object.assign(new Event("unknownEvent"), { + request: {}, + snapshot: {}, + }); + + const results = tm.dispatchEvent(event); + expect(results).toBe(true); + }); + + it("Should handle a mix of object-style callbacks and functions", () => { + const callback = vi.fn(); + const objectCallback = { + handleEvent: vi.fn(), + }; + tm.addEventListener("transferInitiated", objectCallback as any); + tm.addEventListener("transferInitiated", callback); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + 
+ expect(objectCallback.handleEvent).toHaveBeenCalledTimes(1); + expect(objectCallback.handleEvent).toHaveBeenCalledWith(event); + expect(callback).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledWith(event); + }); + }); + + describe("removeEventListener()", () => { + it("Should remove only the specified listener, leaving others intact", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + tm.addEventListener("transferInitiated", callback1); + tm.addEventListener("transferInitiated", callback2); + + tm.removeEventListener("transferInitiated", callback1); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(callback1).not.toHaveBeenCalled(); + expect(callback2).toHaveBeenCalledTimes(1); + }); + + it("Should remove object-style callback with handleEvent", () => { + const objectCallback = { handleEvent: vi.fn() }; + tm.addEventListener("transferInitiated", objectCallback as any); + tm.removeEventListener("transferInitiated", objectCallback as any); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + expect(objectCallback.handleEvent).not.toHaveBeenCalled(); + }); + + it("Should remove all instances of the same callback", () => { + const callback = vi.fn(); + tm.addEventListener("transferInitiated", callback); + tm.addEventListener("transferInitiated", callback); + + tm.removeEventListener("transferInitiated", callback); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(callback).not.toHaveBeenCalled(); + }); + + it("Should handle removing non-existing listener gracefully", () => { + const callback = vi.fn(); + expect(() => { + tm.removeEventListener("transferInitiated", callback); + }).not.toThrow(); + }); + + it("Should handle removing from an event type with no listeners 
gracefully", () => { + const callback = vi.fn(); + tm.removeEventListener("transferInitiated", callback); + + const event = Object.assign(new Event("transferInitiated"), { + request: {}, + snapshot: {}, + }); + + tm.dispatchEvent(event); + + expect(callback).not.toHaveBeenCalled(); + }); + + it("Should handle null callback parameter", () => { + expect(() => { + tm.removeEventListener("transferInitiated", null as any); + }).not.toThrow(); + }); + }); + }); + + describe("iterateListeners()", () => { + let tm: S3TransferManager; + + beforeEach(async () => { + tm = new S3TransferManager({ + s3ClientInstance: client, + }); + }); + + it("Should iterate over all listeners given a TransferManager's object of event listeners", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + const callback3 = vi.fn(); + + const eventListeners = { + transferInitiated: [callback1], + bytesTransferred: [callback2, callback3], + transferComplete: [], + transferFailed: [], + }; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; - it("Should handle a mix of object-style callbacks and functions", () => {}); + expect(results).toHaveLength(3); + expect(results[0][0]).toEqual({ eventType: "transferInitiated", callback: callback1 }); + expect(results[1][0]).toEqual({ eventType: "bytesTransferred", callback: callback2 }); + expect(results[2][0]).toEqual({ eventType: "bytesTransferred", callback: callback3 }); }); - describe.skip("removeEventListener()", () => { - it("Should remove a listener from an event", () => {}); + it("Should handle empty event listeners object", () => { + const eventListeners = { + transferInitiated: [], + bytesTransferred: [], + transferComplete: [], + transferFailed: [], + }; - it("Should remove a listener from an event", () => {}); + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; - it("Should remove a listener from an event", () => {}); + expect(results).toHaveLength(0); }); - 
describe.skip("iterateListeners()", () => {}); + it("Should iterate over a mix of functions and objects with handleEvent callback types.", () => { + const callback1 = vi.fn(); + const callback2 = vi.fn(); + const objectCallback = { + handleEvent: vi.fn(), + }; - describe.skip("joinStreams()", () => {}); + const eventListeners = { + transferInitiated: [callback1], + bytesTransferred: [], + transferComplete: [], + transferFailed: [callback2, objectCallback], + }; - describe.skip("iterateStreams()", () => {}); + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(3); + expect(results[0][0]).toEqual({ eventType: "transferInitiated", callback: callback1 }); + expect(results[1][0]).toEqual({ eventType: "transferFailed", callback: callback2 }); + expect(results[2][0]).toEqual({ eventType: "transferFailed", callback: objectCallback }); + }); + + it("Should handle event listeners with duplicate callbacks in the same event type", () => { + const callback = vi.fn(); + + const eventListeners = { + transferInitiated: [callback, callback], + bytesTransferred: [], + transferComplete: [callback, callback], + transferFailed: [], + }; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(4); + for (let i = 0; i < results.length; i++) { + expect(results[i][0]).toEqual({ eventType: results[i][0].eventType, callback }); + } + }); + + it("Should return empty iterator when no callbacks are present", () => { + const eventListeners = {}; + + const results = Array.from((tm as any).iterateListeners(eventListeners)) as any[]; + + expect(results).toHaveLength(0); + }); }); - describe("validateExpectedRanges()", () => { + describe("validatePartDownload()", () => { let tm: any; beforeAll(async () => { tm = new S3TransferManager() as any; }, 120_000); - it("Should pass correct sequential ranges without throwing an error", () => { + it("Should pass correct ranges based 
on part number without throwing an error", () => { + const partSize = 5242880; const ranges = [ - "bytes 0-5242879/13631488", - "bytes 5242880-10485759/13631488", - "bytes 10485760-13631487/13631488", + { partNumber: 1, range: "bytes 0-5242879/13631488" }, + { partNumber: 2, range: "bytes 5242880-10485759/13631488" }, + { partNumber: 3, range: "bytes 10485760-13631487/13631488" }, ]; - for (let i = 1; i < ranges.length; i++) { + for (const { partNumber, range } of ranges) { expect(() => { - tm.validateExpectedRanges(ranges[i - 1], ranges[i], i + 1); + tm.validatePartDownload(partNumber, range, partSize); }).not.toThrow(); } }); - it("Should throw error for incomplete download", () => { - const ranges = [ - "bytes 0-5242879/13631488", - "bytes 5242880-10485759/13631488", - "bytes 10485760-13631480/13631488", // 8 bytes short - ]; + it("Should throw error for incorrect start position", () => { + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload(2, "bytes 5242881-10485759/13631488", partSize); + }).toThrow("Expected part 2 to start at 5242880 but got 5242881"); expect(() => { - tm.validateExpectedRanges(ranges[1], ranges[2], 3); - }).toThrow( - "Range validation failed: Final part did not cover total range of 13631488. 
Expected range of bytes 10485760-314572" - ); + tm.validatePartDownload(2, "bytes 5242879-10485759/13631488", partSize); + }).toThrow("Expected part 2 to start at 5242880 but got 5242879"); + + expect(() => { + tm.validatePartDownload(2, "bytes 0-5242879/13631488", partSize); + }).toThrow("Expected part 2 to start at 5242880 but got 0"); }); - it.each([ - ["bytes 5242881-10485759/13631488", "Expected part 2 to start at 5242880 but got 5242881"], // 1 byte off - ["bytes 5242879-10485759/13631488", "Expected part 2 to start at 5242880 but got 5242879"], // overlap - ["bytes 0-5242879/13631488", "Expected part 2 to start at 5242880 but got 0"], // duplicate - ])("Should throw error for non-sequential range: %s", (invalidRange, expectedError) => { + it("Should throw error for incorrect end position", () => { + const partSize = 5242880; + expect(() => { - tm.validateExpectedRanges("bytes 0-5242879/13631488", invalidRange, 2); - }).toThrow(expectedError); + tm.validatePartDownload(2, "bytes 5242880-10485760/13631488", partSize); + }).toThrow("Expected part 2 to end at 10485759 but got 10485760"); + + expect(() => { + tm.validatePartDownload(3, "bytes 10485760-13631480/13631488", partSize); + }).toThrow("Expected part 3 to end at 13631487 but got 13631480"); + }); + + it("Should handle last part correctly when not a full part size", () => { + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload(3, "bytes 10485760-13631487/13631488", partSize); + }).not.toThrow(); + }); + + it("Should throw error for invalid ContentRange format", () => { + const partSize = 5242880; + + expect(() => { + tm.validatePartDownload(2, "invalid-format", partSize); + }).toThrow("Invalid ContentRange format: invalid-format"); + }); + }); + + // TODO: tests cases for validateRangeDownload() +}); + +describe("join-streams tests", () => { + const streamTypes = [ + { + name: "Readable", + createStream: () => new Readable({ read() {} }), + supported: true, + streamType: Readable, + }, 
+ { + name: "ReadableStream", + createStream: () => new ReadableStream(), + supported: false, + streamType: ReadableStream, + }, + ]; + + streamTypes.forEach(({ name, createStream, supported, streamType }) => { + describe.skipIf(!supported)(`${name} streams`, () => { + describe("joinStreams()", () => { + it(`Should return single ${name} when only one stream is provided`, async () => { + const stream = createStream(); + const result = await joinStreams([Promise.resolve(stream as unknown as StreamingBlobPayloadOutputTypes)]); + + expect(result).toBeDefined(); + expect(result).not.toBe(stream); + }); + + it(`Should handle empty ${name} streams array`, async () => { + const result = await joinStreams([] as unknown as Promise[]); + expect(result).toBeDefined(); + if (name === "Readable") { + expect(result).toBeInstanceOf(streamType); + } + }); + + it(`Should join multiple ${name} streams into a single stream`, async () => { + const content1 = Buffer.from("Chunk 1"); + const content2 = Buffer.from("Chunk 2"); + const content3 = Buffer.from("Chunk 3"); + + if (name === "Readable") { + const stream1 = new Readable({ + read() { + this.push(content1); + this.push(null); + }, + }); + const stream2 = new Readable({ + read() { + this.push(content2); + this.push(null); + }, + }); + const stream3 = new Readable({ + read() { + this.push(content3); + this.push(null); + }, + }); + + const joinedStream = await joinStreams([ + Promise.resolve(stream1), + Promise.resolve(stream2), + Promise.resolve(stream3), + ] as unknown as Promise[]); + + const chunks: Buffer[] = []; + for await (const chunk of joinedStream as any) { + chunks.push(Buffer.from(chunk)); + } + + const joinedContent = Buffer.concat(chunks).toString(); + expect(joinedContent).toContain(content1.toString()); + expect(joinedContent).toContain(content2.toString()); + expect(joinedContent).toContain(content3.toString()); + } + }); + + it(`Should handle ${name} streams with different chunk sizes`, async () => { + const 
content1 = Buffer.from("Chunk 1 Chunk 1 Chunk 1"); + const content2 = Buffer.from("Chunk 2"); + const content3 = Buffer.from("Chunk 3 Chunk 3"); + + if (name === "Readable") { + const stream1 = new Readable({ + read() { + this.push(content1); + this.push(null); + }, + }); + const stream2 = new Readable({ + read() { + this.push(content2); + this.push(null); + }, + }); + const stream3 = new Readable({ + read() { + this.push(content3); + this.push(null); + }, + }); + + const joinedStream = await joinStreams([ + Promise.resolve(stream1), + Promise.resolve(stream2), + Promise.resolve(stream3), + ] as unknown as Promise[]); + + const chunks: Buffer[] = []; + for await (const chunk of joinedStream as any) { + chunks.push(Buffer.from(chunk)); + } + + const joinedContent = Buffer.concat(chunks).toString(); + expect(joinedContent).toContain(content1.toString()); + expect(joinedContent).toContain(content2.toString()); + expect(joinedContent).toContain(content3.toString()); + } + }); + + it(`Should handle ${name} streams with no data`, async () => { + if (name === "Readable") { + const emptyStream1 = new Readable({ + read() { + this.push(null); + }, + }); + const emptyStream2 = new Readable({ + read() { + this.push(null); + }, + }); + + const joinedStream = await joinStreams([ + Promise.resolve(emptyStream1), + Promise.resolve(emptyStream2), + ] as unknown as Promise[]); + + const chunks: Buffer[] = []; + for await (const chunk of joinedStream as any) { + chunks.push(Buffer.from(chunk)); + } + expect(chunks.length).toBe(0); + expect(Buffer.concat(chunks).length).toBe(0); + } + }); + + it(`Should report progress via eventListeners`, async () => { + if (name === "Readable") { + const stream1 = new Readable({ + read() { + this.push(Buffer.from("data")); + this.push(null); + }, + }); + const stream2 = new Readable({ + read() { + this.push(Buffer.from("more")); + this.push(null); + }, + }); + + const onBytesSpy = vi.fn(); + const onCompletionSpy = vi.fn(); + + const joinedStream = 
await joinStreams( + [ + Promise.resolve(stream1), + Promise.resolve(stream2), + ] as unknown as Promise[], + { + onBytes: onBytesSpy, + onCompletion: onCompletionSpy, + } + ); + + for await (const _ of joinedStream as any) { + // consume the data + } + + expect(onBytesSpy).toHaveBeenCalled(); + expect(onCompletionSpy).toHaveBeenCalledWith(expect.any(Number), 1); + } + }); + }); }); }); }); diff --git a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts index f7f5282d41482..2b127938e48fd 100644 --- a/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts +++ b/lib/lib-storage/src/s3-transfer-manager/S3TransferManager.ts @@ -6,8 +6,7 @@ import type { } from "@aws-sdk/client-s3"; import { GetObjectCommand, HeadObjectCommand, S3Client } from "@aws-sdk/client-s3"; import { getChecksum } from "@aws-sdk/middleware-flexible-checksums/dist-types/getChecksum"; -import { ChecksumConstructor, StreamingBlobPayloadOutputTypes } from "@smithy/types"; -import { Checksum } from "@smithy/types"; +import { type StreamingBlobPayloadOutputTypes, Checksum, ChecksumConstructor } from "@smithy/types"; import type { AddEventListenerOptions, EventListener, RemoveEventListenerOptions } from "./event-listener-types"; import { joinStreams } from "./join-streams"; @@ -36,6 +35,7 @@ export class S3TransferManager implements IS3TransferManager { private readonly checksumAlgorithm: ChecksumAlgorithm; private readonly multipartDownloadType: "PART" | "RANGE"; private readonly eventListeners: TransferEventListeners; + private readonly abortCleanupFunctions = new WeakMap void>(); public constructor(config: S3TransferManagerConfig = {}) { this.checksumValidationEnabled = config.checksumValidationEnabled ?? 
true; @@ -84,16 +84,8 @@ export class S3TransferManager implements IS3TransferManager { callback: EventListener, options?: AddEventListenerOptions | boolean ): void; - public addEventListener( - type: string, - callback: EventListener | null, - options?: AddEventListenerOptions | boolean - ): void; - public addEventListener( - type: unknown, - callback: EventListener, - options?: AddEventListenerOptions | boolean - ): void { + public addEventListener(type: string, callback: EventListener, options?: AddEventListenerOptions | boolean): void; + public addEventListener(type: string, callback: EventListener, options?: AddEventListenerOptions | boolean): void { const eventType = type as keyof TransferEventListeners; const listeners = this.eventListeners[eventType]; @@ -101,12 +93,26 @@ export class S3TransferManager implements IS3TransferManager { throw new Error(`Unknown event type: ${eventType}`); } - // TODO: Add support for AbortSignal - const once = typeof options !== "boolean" && options?.once; + const signal = typeof options !== "boolean" ? 
options?.signal : undefined; let updatedCallback = callback; + + if (signal?.aborted) { + return; + } + + if (signal) { + const removeListenerAfterAbort = () => { + this.removeEventListener(eventType, updatedCallback); + this.abortCleanupFunctions.delete(signal); + }; + + this.abortCleanupFunctions.set(signal, () => signal.removeEventListener("abort", removeListenerAfterAbort)); + signal.addEventListener("abort", removeListenerAfterAbort, { once: true }); + } + if (once) { - updatedCallback = (event: any) => { + updatedCallback = (event: Event) => { if (typeof callback === "function") { callback(event); } else { @@ -115,29 +121,31 @@ export class S3TransferManager implements IS3TransferManager { this.removeEventListener(eventType, updatedCallback); }; } - - if (eventType === "transferInitiated" || eventType === "bytesTransferred" || eventType === "transferFailed") { - listeners.push(updatedCallback as EventListener); - } else if (eventType === "transferComplete") { - (listeners as EventListener[]).push( - updatedCallback as EventListener - ); - } + listeners.push(updatedCallback); } + /** + * todo: what does the return boolean mean? + * + * it returns false if the event is cancellable, and at least oneo the handlers which received event called + * Event.preventDefault(). Otherwise true. + * The use cases of preventDefault() does not apply to transfermanager but we should still keep the boolean + * and continue to return true to stay consistent with EventTarget. 
+ * + */ public dispatchEvent(event: Event & TransferEvent): boolean; public dispatchEvent(event: Event & TransferCompleteEvent): boolean; public dispatchEvent(event: Event): boolean; - public dispatchEvent(event: any): boolean { + public dispatchEvent(event: Event): boolean { const eventType = event.type; - const listeners = this.eventListeners[eventType as keyof TransferEventListeners]; + const listeners = this.eventListeners[eventType as keyof TransferEventListeners] as EventListener[]; if (listeners) { - for (const callback of listeners) { - if (typeof callback === "function") { - callback(event); + for (const listener of listeners) { + if (typeof listener === "function") { + listener(event); } else { - callback.handleEvent?.(event); + listener.handleEvent(event); } } } @@ -166,25 +174,29 @@ export class S3TransferManager implements IS3TransferManager { ): void; public removeEventListener( type: string, - callback: EventListener | null, + callback: EventListener, options?: RemoveEventListenerOptions | boolean ): void; - public removeEventListener(type: unknown, callback: unknown, options?: unknown): void { + public removeEventListener( + type: string, + callback: EventListener, + options?: RemoveEventListenerOptions | boolean + ): void { const eventType = type as keyof TransferEventListeners; const listeners = this.eventListeners[eventType]; if (listeners) { - if (eventType === "transferInitiated" || eventType === "bytesTransferred" || eventType === "transferFailed") { + if ( + eventType === "transferInitiated" || + eventType === "bytesTransferred" || + eventType === "transferFailed" || + eventType === "transferComplete" + ) { const eventListener = callback as EventListener; - const index = listeners.indexOf(eventListener); - if (index !== -1) { + let index = listeners.indexOf(eventListener); + while (index !== -1) { listeners.splice(index, 1); - } - } else if (eventType === "transferComplete") { - const eventListener = callback as EventListener; - const index 
= (listeners as EventListener[]).indexOf(eventListener); - if (index !== -1) { - (listeners as EventListener[]).splice(index, 1); + index = listeners.indexOf(eventListener); } } else { throw new Error(`Unknown event type: ${type}`); @@ -196,205 +208,117 @@ export class S3TransferManager implements IS3TransferManager { throw new Error("Method not implemented."); } + /** + * What is missing from the revised SEP and this implementation currently? + * PART mode: + * - (DONE) Step 5: validate GetObject response for each part + * - If validation fails at any point, cancel all ongoing requests and error out + * - Step 6: after all requests have been sent, validate that the total number of part GET requests sent matches with the + * expected `PartsCount` + * - Step 7: when creating DownloadResponse, set accordingly: + * - (DONE) `ContentLength` : total length of the object saved from Step 3 + * - (DONE) `ContentRange`: based on `bytes 0-(ContentLength -1)/ContentLength` + * - If ChecksumType is `COMPOSITE`, set all checksum value members to null as + * the checksum value returned from a part GET request is not the composite + * checksum for the entire object + * RANGE mode: + * - (DONE) Step 7: validate GetObject response for each part. If validation fails or a + * request fails at any point, cancel all ongoing requests and return an error to + * the user. + * - Step 8: after all requests have sent, validate that the total number of ranged + * GET requests sent matches with the expected number saved from Step 5. + * - Step 9: create DownloadResponse. 
Copy the fields in GetObject response from + * Step 3 and set the following fields accordingly: + * - (DONE) `ContentLength` : total length of the object saved from Step 3 + * - (DONE) `ContentRange`: based on `bytes 0-(ContentLength -1)/ContentLength` + * - If ChecksumType is `COMPOSITE`, set all checksum value members to null as + * the checksum value returned from a part GET request is not the composite + * checksum for the entire object + * Checksum validation notes: + * - + * + */ public async download(request: DownloadRequest, transferOptions?: TransferOptions): Promise { - const metadata = {} as Omit; - const streams = [] as StreamingBlobPayloadOutputTypes[]; - const requests = [] as GetObjectCommandInput[]; - const partNumber = request.PartNumber; - const range = request.Range; - let totalSize: number | undefined; - - if (transferOptions?.eventListeners) { - for await (const listeners of this.iterateListeners(transferOptions?.eventListeners)) { - for (const listener of listeners) { - this.addEventListener(listener.eventType, listener.callback as EventListener); - } - } - } - - // TODO: Ensure download operation is treated as single object download when partNumber is provided regardless of multipartDownloadType setting if (typeof partNumber === "number") { - const getObjectRequest = { - ...request, - PartNumber: partNumber, - }; - const getObject = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); - - this.dispatchEvent( - Object.assign(new Event("transferInitiated"), { - request, - snapshot: { - transferredBytes: 0, - totalBytes: getObject.ContentLength, - }, - }) + throw new Error( + "partNumber included: S3 Transfer Manager does not support downloads for specific parts. 
Use GetObjectCommand instead" ); + } - if (getObject.Body) { - streams.push(getObject.Body); - requests.push(getObjectRequest); - } - this.assignMetadata(metadata, getObject); - } else if (this.multipartDownloadType === "PART") { - if (range == null) { - const initialPartRequest = { - ...request, - PartNumber: 1, - }; - const initialPart = await this.s3ClientInstance.send(new GetObjectCommand(initialPartRequest), transferOptions); - const initialETag = initialPart.ETag ?? undefined; - totalSize = initialPart.ContentRange ? parseInt(initialPart.ContentRange.split("/")[1]) : undefined; - - this.dispatchTransferInitiatedEvent(request, totalSize); - if (initialPart.Body) { - streams.push(initialPart.Body); - requests.push(initialPartRequest); - } - this.assignMetadata(metadata, initialPart); - - if (initialPart.PartsCount! > 1) { - let previousPart = initialPart; - for (let part = 2; part <= initialPart.PartsCount!; part++) { - const getObjectRequest = { - ...request, - PartNumber: part, - IfMatch: !request.VersionId ? initialETag : undefined, - }; - const getObject = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); - - if (getObject.ContentRange && previousPart.ContentRange) { - this.validateExpectedRanges(previousPart.ContentRange, getObject.ContentRange, part); - } - - if (getObject.Body) { - streams.push(getObject.Body); - requests.push(getObjectRequest); - } - this.assignMetadata(metadata, getObject); - previousPart = getObject; - } - } - } else { - const getObjectRequest = { - ...request, - }; - const getObject = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); - totalSize = getObject.ContentRange ? 
parseInt(getObject.ContentRange.split("/")[1]) : undefined; - - this.dispatchTransferInitiatedEvent(request, totalSize); - if (getObject.Body) { - streams.push(getObject.Body); - requests.push(getObjectRequest); - } - this.assignMetadata(metadata, getObject); - } - } else if (this.multipartDownloadType === "RANGE") { - let initialETag = undefined; - let left = 0; - let right = S3TransferManager.MIN_PART_SIZE; - let maxRange = Infinity; - - if (range != null) { - const [userRangeLeft, userRangeRight] = range.replace("bytes=", "").split("-").map(Number); - - maxRange = userRangeRight; - left = userRangeLeft; - right = Math.min(userRangeRight, left + S3TransferManager.MIN_PART_SIZE); - } + const metadata = {} as Omit; + const streams = [] as Promise[]; + const requests = [] as GetObjectCommandInput[]; - let remainingLength = 1; - let transferInitiatedEventDispatched = false; - - // TODO: Validate ranges for if multipartDownloadType === "RANGE" - while (remainingLength > 0) { - const range = `bytes=${left}-${right}`; - const getObjectRequest: GetObjectCommandInput = { - ...request, - Range: range, - IfMatch: transferInitiatedEventDispatched && !request.VersionId ? initialETag : undefined, - }; - const getObject = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); - - if (!transferInitiatedEventDispatched) { - totalSize = getObject.ContentRange ? parseInt(getObject.ContentRange.split("/")[1]) : undefined; - - this.dispatchTransferInitiatedEvent(request, totalSize); - initialETag = getObject.ETag ?? 
undefined; - transferInitiatedEventDispatched = true; - } + let totalSize: number | undefined; - if (getObject.Body) { - streams.push(getObject.Body); - requests.push(getObjectRequest); + this.checkAborted(transferOptions); + this.addEventListeners(transferOptions?.eventListeners); - // TODO: - // after completing SEP requirements: - // - acquire lock on webstreams in the same - // - synchronous frame as they are opened or else - // - the connection might be closed too early. - if (typeof (getObject.Body as ReadableStream).getReader === "function") { - const reader = (getObject.Body as any).getReader(); - (getObject.Body as any).getReader = function () { - return reader; - }; - } - } - this.assignMetadata(metadata, getObject); + if (this.multipartDownloadType === "PART") { + const responseMetadata = await this.downloadByPart(request, transferOptions ?? {}, streams, requests, metadata); + totalSize = responseMetadata.totalSize; + } else if (this.multipartDownloadType === "RANGE") { + const responseMetadata = await this.downloadByRange(request, transferOptions ?? {}, streams, requests, metadata); + totalSize = responseMetadata.totalSize; + } - left = right + 1; - right = Math.min(left + S3TransferManager.MIN_PART_SIZE, maxRange); + const removeLocalEventListeners = () => { + this.removeEventListeners(transferOptions?.eventListeners); - remainingLength = Math.min( - right - left, - Math.max(0, (getObject.ContentLength ?? 0) - S3TransferManager.MIN_PART_SIZE) - ); + // remove any local abort() listeners after request completes. 
+ if (transferOptions?.abortSignal) { + this.abortCleanupFunctions.get(transferOptions.abortSignal as AbortSignal)?.(); + this.abortCleanupFunctions.delete(transferOptions.abortSignal as AbortSignal); } - } + }; - const responseBody = joinStreams(streams, { - onBytes: (byteLength: number, index) => { - this.dispatchEvent( - Object.assign(new Event("bytesTransferred"), { - request: requests[index], - snapshot: { - transferredBytes: byteLength, - totalBytes: totalSize, - }, - }) - ); - }, - onCompletion: (byteLength: number, index) => { - this.dispatchEvent( - Object.assign(new Event("transferComplete"), { - request: requests[index], - response: { - ...metadata, - Body: responseBody, - }, - snapshot: { - transferredBytes: byteLength, - totalBytes: totalSize, - }, - }) - ); - }, - onFailure: (error: unknown, index) => { - this.dispatchEvent( - Object.assign(new Event("transferFailed"), { - request: requests[index], - snapshot: { - transferredBytes: error, - totalBytes: totalSize, - }, - }) - ); - }, - }); + // TODO: + // after completing SEP requirements: + // - acquire lock on webstreams in the same + // - synchronous frame as they are opened or else + // - the connection might be closed too early. 
const response = { ...metadata, - Body: responseBody, + Body: await joinStreams(streams, { + onBytes: (byteLength: number, index) => { + this.dispatchEvent( + Object.assign(new Event("bytesTransferred"), { + request: requests[index], + snapshot: { + transferredBytes: byteLength, + totalBytes: totalSize, + }, + }) + ); + }, + onCompletion: (byteLength: number, index) => { + this.dispatchEvent( + Object.assign(new Event("transferComplete"), { + request: requests[index], + response, + snapshot: { + transferredBytes: byteLength, + totalBytes: totalSize, + }, + }) + ); + removeLocalEventListeners(); + }, + onFailure: (error: unknown, index) => { + this.dispatchEvent( + Object.assign(new Event("transferFailed"), { + request: requests[index], + snapshot: { + transferredBytes: error, + totalBytes: totalSize, + }, + }) + ); + removeLocalEventListeners(); + }, + }), }; return response; @@ -429,6 +353,236 @@ export class S3TransferManager implements IS3TransferManager { throw new Error("Method not implemented."); } + protected async downloadByPart( + request: DownloadRequest, + transferOptions: TransferOptions, + streams: Promise[], + requests: GetObjectCommandInput[], + metadata: Omit + ): Promise<{ totalSize: number | undefined }> { + let totalSize: number | undefined; + this.checkAborted(transferOptions); + + if (request.Range == null) { + const initialPartRequest = { + ...request, + PartNumber: 1, + }; + const initialPart = await this.s3ClientInstance.send(new GetObjectCommand(initialPartRequest), transferOptions); + const initialETag = initialPart.ETag ?? undefined; + const partSize = initialPart.ContentLength; + totalSize = initialPart.ContentRange ? 
Number.parseInt(initialPart.ContentRange.split("/")[1]) : undefined; + this.dispatchTransferInitiatedEvent(request, totalSize); + if (initialPart.Body) { + if (initialPart.Body && typeof (initialPart.Body as any).getReader === "function") { + const reader = (initialPart.Body as any).getReader(); + (initialPart.Body as any).getReader = function () { + return reader; + }; + } + streams.push(Promise.resolve(initialPart.Body)); + requests.push(initialPartRequest); + } + + this.updateResponseLengthAndRange(initialPart, totalSize); + this.assignMetadata(metadata, initialPart); + this.updateChecksumValues(initialPart, metadata); + + let partCount = 1; + if (initialPart.PartsCount! > 1) { + for (let part = 2; part <= initialPart.PartsCount!; part++) { + this.checkAborted(transferOptions); + const getObjectRequest = { + ...request, + PartNumber: part, + IfMatch: !request.VersionId ? initialETag : undefined, + }; + const getObject = this.s3ClientInstance + .send(new GetObjectCommand(getObjectRequest), transferOptions) + .then((response) => { + this.validatePartDownload(response.ContentRange, part, partSize ?? 0); + if (response.Body && typeof (response.Body as any).getReader === "function") { + const reader = (response.Body as any).getReader(); + (response.Body as any).getReader = function () { + return reader; + }; + } + return response.Body!; + }); + + streams.push(getObject); + requests.push(getObjectRequest); + partCount++; + } + if (partCount !== initialPart.PartsCount) { + throw new Error( + `The number of parts downloaded (${partCount}) does not match the expected number (${initialPart.PartsCount})` + ); + } + } + } else { + this.checkAborted(transferOptions); + + const getObjectRequest = { + ...request, + }; + const getObject = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); + totalSize = getObject.ContentRange ? 
Number.parseInt(getObject.ContentRange.split("/")[1]) : undefined; + + this.dispatchTransferInitiatedEvent(request, totalSize); + if (getObject.Body) { + streams.push(Promise.resolve(getObject.Body)); + requests.push(getObjectRequest); + } + this.updateResponseLengthAndRange(getObject, totalSize); + this.assignMetadata(metadata, getObject); + this.updateChecksumValues(getObject, metadata); + } + + return { + totalSize, + }; + } + + protected async downloadByRange( + request: DownloadRequest, + transferOptions: TransferOptions, + streams: Promise[], + requests: GetObjectCommandInput[], + metadata: Omit + ): Promise<{ totalSize: number | undefined }> { + this.checkAborted(transferOptions); + + let left = 0; + let right = this.targetPartSizeBytes - 1; + let maxRange = Number.POSITIVE_INFINITY; + let remainingLength = 1; + + if (request.Range != null) { + const [userRangeLeft, userRangeRight] = request.Range.replace("bytes=", "").split("-").map(Number); + + maxRange = userRangeRight; + left = userRangeLeft; + right = Math.min(userRangeRight, left + S3TransferManager.MIN_PART_SIZE - 1); + } + const getObjectRequest: GetObjectCommandInput = { + ...request, + Range: `bytes=${left}-${right}`, + }; + const initialRangeGet = await this.s3ClientInstance.send(new GetObjectCommand(getObjectRequest), transferOptions); + this.validateRangeDownload(`bytes=${left}-${right}`, initialRangeGet.ContentRange); + const initialETag = initialRangeGet.ETag ?? undefined; + const totalSize = initialRangeGet.ContentRange + ? 
Number.parseInt(initialRangeGet.ContentRange.split("/")[1]) + : undefined; + + let expectedRequestCount = 1; + if (totalSize) { + const contentLength = totalSize; + const remainingBytes = Math.max(0, contentLength - (right - left + 1)); + const additionalRequests = Math.ceil(remainingBytes / S3TransferManager.MIN_PART_SIZE); + expectedRequestCount += additionalRequests; + } + + if (initialRangeGet.Body && typeof (initialRangeGet.Body as any).getReader === "function") { + const reader = (initialRangeGet.Body as any).getReader(); + (initialRangeGet.Body as any).getReader = function () { + return reader; + }; + } + + this.dispatchTransferInitiatedEvent(request, totalSize); + streams.push(Promise.resolve(initialRangeGet.Body!)); + requests.push(getObjectRequest); + + this.updateResponseLengthAndRange(initialRangeGet, totalSize); + this.assignMetadata(metadata, initialRangeGet); + this.updateChecksumValues(initialRangeGet, metadata); + + left = right + 1; + right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange); + remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0; + let actualRequestCount = 1; + while (remainingLength > 0) { + this.checkAborted(transferOptions); + + const range = `bytes=${left}-${right}`; + const getObjectRequest: GetObjectCommandInput = { + ...request, + Range: range, + IfMatch: !request.VersionId ? 
initialETag : undefined, + }; + const getObject = this.s3ClientInstance + .send(new GetObjectCommand(getObjectRequest), transferOptions) + .then((response) => { + this.validateRangeDownload(range, response.ContentRange); + if (response.Body && typeof (response.Body as any).getReader === "function") { + const reader = (response.Body as any).getReader(); + (response.Body as any).getReader = function () { + return reader; + }; + } + return response.Body!; + }); + + streams.push(getObject); + requests.push(getObjectRequest); + actualRequestCount++; + + left = right + 1; + right = Math.min(left + S3TransferManager.MIN_PART_SIZE - 1, maxRange); + remainingLength = totalSize ? Math.min(right - left + 1, Math.max(0, totalSize - left)) : 0; + } + + if (expectedRequestCount !== actualRequestCount) { + throw new Error( + `The number of ranged GET requests sent (${actualRequestCount}) does not match the expected number (${expectedRequestCount})` + ); + } + + return { + totalSize, + }; + } + + private addEventListeners(eventListeners?: TransferEventListeners): void { + for (const listeners of this.iterateListeners(eventListeners)) { + for (const listener of listeners) { + this.addEventListener(listener.eventType, listener.callback as EventListener); + } + } + } + + private removeEventListeners(eventListeners?: TransferEventListeners): void { + for (const listeners of this.iterateListeners(eventListeners)) { + for (const listener of listeners) { + this.removeEventListener(listener.eventType, listener.callback as EventListener); + } + } + } + + private updateResponseLengthAndRange(response: DownloadResponse, totalSize: number | undefined): void { + if (totalSize !== undefined) { + response.ContentLength = totalSize; + response.ContentRange = `bytes 0-${totalSize - 1}/${totalSize}`; + } + } + + private updateChecksumValues(initialPart: DownloadResponse, metadata: Omit) { + if (initialPart.ChecksumType === "COMPOSITE") { + metadata.ChecksumCRC32 = undefined; + 
metadata.ChecksumCRC32C = undefined; + metadata.ChecksumSHA1 = undefined; + metadata.ChecksumSHA256 = undefined; + } + } + + private checkAborted(transferOptions?: TransferOptions): void { + if (transferOptions?.abortSignal?.aborted) { + throw Object.assign(new Error("Download aborted."), { name: "AbortError" }); + } + } + private assignMetadata(container: any, response: any) { for (const key in response) { if (key === "Body") { @@ -457,29 +611,11 @@ export class S3TransferManager implements IS3TransferManager { return true; } - /** - * For debugging purposes - * - * @internal - */ - private logCallbackCount(type: unknown): void { - const eventType = type as keyof TransferEventListeners; - const listeners = this.eventListeners[eventType]; - - console.log(`Callback count for ${eventType}: `); - let count = 0; - if (listeners) { - for (const callbacks of listeners) { - count++; - } - } - console.log(count); - } - - private async *iterateListeners(eventListeners: TransferEventListeners) { - for (const eventType in eventListeners) { + private *iterateListeners(eventListeners: TransferEventListeners = {}) { + for (const key in eventListeners) { + const eventType = key as keyof TransferEventListeners; const listeners = eventListeners[eventType as keyof TransferEventListeners]; - if (listeners) { + if (Array.isArray(listeners)) { for (const callback of listeners) { yield [ { @@ -492,43 +628,60 @@ export class S3TransferManager implements IS3TransferManager { } } - private validateExpectedRanges(previousPart: string, currentPart: string, partNum: number) { - const parseContentRange = (range: string) => { - const match = range.match(/bytes (\d+)-(\d+)\/(\d+)/); - if (!match) throw new Error(`Invalid ContentRange format: ${range}`); - return { - start: parseInt(match[1]), - end: parseInt(match[2]), - total: parseInt(match[3]), - }; - }; + private validatePartDownload(contentRange: string | undefined, partNumber: number, partSize: number) { + if (!contentRange) { + throw new 
Error(`Missing ContentRange for part ${partNumber}.`); + } - // TODO: throw error for incomplete download. - // Ex: final part and 8 bytes short should throw error -> "bytes 10485760-13631480/13631488" + const match = contentRange.match(/bytes (\d+)-(\d+)\/(\d+)/); + if (!match) throw new Error(`Invalid ContentRange format: ${contentRange}`); - try { - const previous = parseContentRange(previousPart); - const current = parseContentRange(currentPart); + const start = Number.parseInt(match[1]); + const end = Number.parseInt(match[2]); + const total = Number.parseInt(match[3]) - 1; - const expectedStart = previous.end + 1; - const prevPartSize = previous.end - previous.start + 1; - const currPartSize = current.end - current.start + 1; + const expectedStart = (partNumber - 1) * partSize; + const expectedEnd = Math.min(expectedStart + partSize - 1, total); - if (current.start !== expectedStart) { - throw new Error(`Expected part ${partNum} to start at ${expectedStart} but got ${current.start}`); - } + if (start !== expectedStart) { + throw new Error(`Expected part ${partNumber} to start at ${expectedStart} but got ${start}`); + } - // console.log(currPartSize < prevPartSize); - // console.log(current.end !== current.total - 1); - if (currPartSize < prevPartSize && current.end !== current.total - 1) { - throw new Error( - `Final part did not cover total range of ${current.total}. 
Expected range of bytes ${current.start}-${ - currPartSize - 1 - }` - ); - } - } catch (error) { - throw new Error(`Range validation failed: ${error.message}`); + if (end !== expectedEnd) { + throw new Error(`Expected part ${partNumber} to end at ${expectedEnd} but got ${end}`); + } + } + + private validateRangeDownload(requestRange: string, responseRange: string | undefined) { + if (!responseRange) { + throw new Error(`Missing ContentRange for range ${requestRange}.`); } + + const match = responseRange.match(/bytes (\d+)-(\d+)\/(\d+)/); + if (!match) throw new Error(`Invalid ContentRange format: ${responseRange}`); + + const start = Number.parseInt(match[1]); + const end = Number.parseInt(match[2]); + const total = Number.parseInt(match[3]) - 1; + + const rangeMatch = requestRange.match(/bytes=(\d+)-(\d+)/); + if (!rangeMatch) throw new Error(`Invalid Range format: ${requestRange}`); + + const expectedStart = Number.parseInt(rangeMatch[1]); + const expectedEnd = Number.parseInt(rangeMatch[2]); + + if (start !== expectedStart) { + throw new Error(`Expected range to start at ${expectedStart} but got ${start}`); + } + + if (end === expectedEnd) { + return; + } + + if (end === total) { + return; + } + + throw new Error(`Expected range to end at ${expectedEnd} but got ${end}`); } } diff --git a/lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts b/lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts index 5b2b13d3d8b45..23b81389d4df0 100644 --- a/lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts +++ b/lib/lib-storage/src/s3-transfer-manager/join-streams.browser.ts @@ -1,17 +1,17 @@ import { StreamingBlobPayloadOutputTypes } from "@smithy/types"; import { isBlob, isReadableStream, sdkStreamMixin } from "@smithy/util-stream"; -// check all types. 
needs to join nodejs and browser together -export function joinStreams(streams: StreamingBlobPayloadOutputTypes[]): StreamingBlobPayloadOutputTypes { - console.log("Is Readable Stream: "); - console.log(isReadableStream(streams[0])); +import { JoinStreamIterationEvents } from "./types"; - if (streams.length === 1) { - return streams[0]; - } else if (isReadableStream(streams[0]) || isBlob(streams[0])) { +export async function joinStreams( + streams: Promise[], + eventListeners?: JoinStreamIterationEvents +): Promise { + const firstStream = await streams[0]; + if (isReadableStream(firstStream)) { const newReadableStream = new ReadableStream({ async start(controller) { - for await (const chunk of iterateStreams(streams)) { + for await (const chunk of iterateStreams(streams, eventListeners)) { controller.enqueue(chunk); } controller.close(); @@ -19,25 +19,39 @@ export function joinStreams(streams: StreamingBlobPayloadOutputTypes[]): Streami }); return sdkStreamMixin(newReadableStream); } else { - throw new Error("Unknown stream type"); + throw new Error("Unsupported Stream Type"); } } export async function* iterateStreams( - streams: StreamingBlobPayloadOutputTypes[] + streams: Promise[], + eventListeners?: JoinStreamIterationEvents ): AsyncIterable { - for (const stream of streams) { + let bytesTransferred = 0; + let index = 0; + for (const streamPromise of streams) { + const stream = await streamPromise; if (isReadableStream(stream)) { - const reader = (stream as ReadableStream).getReader(); + const reader = stream.getReader(); try { while (true) { const { done, value } = await reader.read(); - if (done) break; + if (done) { + break; + } yield value; + bytesTransferred += value.byteLength; + eventListeners?.onBytes?.(bytesTransferred, index); } } finally { reader.releaseLock(); } + } else { + const failure = new Error(`unhandled stream type ${(stream as any)?.constructor?.name}`); + eventListeners?.onFailure?.(failure, index); + throw failure; } + index++; } + 
eventListeners?.onCompletion?.(bytesTransferred, index - 1); } diff --git a/lib/lib-storage/src/s3-transfer-manager/join-streams.ts b/lib/lib-storage/src/s3-transfer-manager/join-streams.ts index 574c99aba51f5..56c18e2929361 100644 --- a/lib/lib-storage/src/s3-transfer-manager/join-streams.ts +++ b/lib/lib-storage/src/s3-transfer-manager/join-streams.ts @@ -1,17 +1,16 @@ import { StreamingBlobPayloadOutputTypes } from "@smithy/types"; -import { isBlob, isReadableStream, sdkStreamMixin } from "@smithy/util-stream"; +import { isReadableStream, sdkStreamMixin } from "@smithy/util-stream"; import { Readable } from "stream"; import { JoinStreamIterationEvents } from "./types"; // TODO: check all types. needs to join nodejs and browser together -export function joinStreams( - streams: StreamingBlobPayloadOutputTypes[], +export async function joinStreams( + streams: Promise[], eventListeners?: JoinStreamIterationEvents -): StreamingBlobPayloadOutputTypes { - if (streams.length === 1) { - return streams[0]; - } else if (isReadableStream(streams[0])) { +): Promise { + const firstStream = await streams[0]; + if (isReadableStream(firstStream)) { const newReadableStream = new ReadableStream({ async start(controller) { for await (const chunk of iterateStreams(streams, eventListeners)) { @@ -21,43 +20,39 @@ export function joinStreams( }, }); return sdkStreamMixin(newReadableStream); - } else if (isBlob(streams[0])) { - throw new Error("Blob not supported yet"); } else { return sdkStreamMixin(Readable.from(iterateStreams(streams, eventListeners))); } } export async function* iterateStreams( - streams: StreamingBlobPayloadOutputTypes[], + streams: Promise[], eventListeners?: JoinStreamIterationEvents ): AsyncIterable { let bytesTransferred = 0; let index = 0; - for (const stream of streams) { + for (const streamPromise of streams) { + const stream = await streamPromise; if (isReadableStream(stream)) { - // const reader = stream.getReader(); - // while (true) { - // const { done, 
value } = await reader.read(); - // if (done) { - // break; - // } - // yield value; - // bytesTransferred += value.byteLength; - // } - // reader.releaseLock(); - - const failure = new Error(`ReadableStreams not supported yet ${(stream as any)?.constructor?.name}`); - eventListeners?.onFailure?.(failure, index); - throw failure; - } else if (isBlob(stream)) { - throw new Error("Blob not supported yet"); + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + yield value; + bytesTransferred += value.byteLength; + eventListeners?.onBytes?.(bytesTransferred, index); + } + } finally { + reader.releaseLock(); + } } else if (stream instanceof Readable) { for await (const chunk of stream) { yield chunk; const chunkSize = Buffer.isBuffer(chunk) ? chunk.length : Buffer.byteLength(chunk); bytesTransferred += chunkSize; - eventListeners?.onBytes?.(bytesTransferred, index); } } else { diff --git a/lib/lib-storage/src/s3-transfer-manager/types.ts b/lib/lib-storage/src/s3-transfer-manager/types.ts index 6b6e81f6d6b1d..69801f4759706 100644 --- a/lib/lib-storage/src/s3-transfer-manager/types.ts +++ b/lib/lib-storage/src/s3-transfer-manager/types.ts @@ -216,14 +216,14 @@ export interface IS3TransferManager { callback: EventListener, options?: AddEventListenerOptions | boolean ): void; - addEventListener(type: string, callback: EventListener | null, options?: AddEventListenerOptions | boolean): void; + addEventListener(type: string, callback: EventListener, options?: AddEventListenerOptions | boolean): void; /** * Dispatches an event to the registered event listeners. * Triggers callbacks registered via addEventListener with matching event types. * * @param event - The event object to dispatch. 
- * @returns whether the event dispatched successfully + * @returns whether the event ran to completion * * @public */ @@ -261,11 +261,7 @@ export interface IS3TransferManager { callback: EventListener, options?: RemoveEventListenerOptions | boolean ): void; - removeEventListener( - type: string, - callback: EventListener | null, - options?: RemoveEventListenerOptions | boolean - ): void; + removeEventListener(type: string, callback: EventListener, options?: RemoveEventListenerOptions | boolean): void; } /**