Skip to content

Commit

Permalink
feat(xhr-http-handler): add XMLHttpRequest http handler to use with l…
Browse files Browse the repository at this point in the history
…ib-storage Upload (#3798)

* feat(xhr-http-handler): add xhr handler

* feat(xhr-http-handler): remove debug output

* feat(xhr-http-handler): cleanup for PR

* feat(xhr-http-handler): add error handler

* accept suggestion in packages/xhr-http-handler/src/request-timeout.ts

Co-authored-by: AllanZhengYP <zheallan@amazon.com>

* feat(xhr-http-handler): add streaming text handling

* feat(xhr-http-handler): update unit tests

* fix(xhr-http-handler): readme update

* fix(xhr-http-handler): readme update

* fix(xhr-http-handler): readme update

* fix(fetch-http-handler): modify config awaiter to be consistent with other http handlers

* doc(xhr-http-handler): improve documentation for download progress events

Co-authored-by: AllanZhengYP <zheallan@amazon.com>
  • Loading branch information
kuhe and AllanZhengYP authored Jul 15, 2022
1 parent 1702c43 commit 7462b07
Show file tree
Hide file tree
Showing 18 changed files with 1,049 additions and 11 deletions.
49 changes: 43 additions & 6 deletions lib/lib-storage/src/Upload.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ const endpointMock = jest.fn().mockResolvedValue({
query: undefined,
});

import { EventEmitter, Readable } from "stream";

const mockAddListener = jest.fn();
const mockRemoveListener = jest.fn();
const requestHandlerMock = (() => {
const mock = {
on: mockAddListener,
off: mockRemoveListener,
};
Object.setPrototypeOf(mock, EventEmitter.prototype);
return mock;
})();

jest.mock("@aws-sdk/client-s3", () => ({
...(jest.requireActual("@aws-sdk/client-s3") as {}),
S3: jest.fn().mockReturnValue({
Expand All @@ -41,6 +54,7 @@ jest.mock("@aws-sdk/client-s3", () => ({
send: sendMock,
config: {
endpoint: endpointMock,
requestHandler: requestHandlerMock,
},
}),
CreateMultipartUploadCommand: createMultipartMock,
Expand All @@ -50,9 +64,8 @@ jest.mock("@aws-sdk/client-s3", () => ({
PutObjectCommand: putObjectMock,
}));

import { CompleteMultipartUploadCommandOutput, S3 } from "@aws-sdk/client-s3";
import { CompleteMultipartUploadCommandOutput, S3, S3Client } from "@aws-sdk/client-s3";
import { createHash } from "crypto";
import { Readable } from "stream";

import { Progress, Upload } from "./index";

Expand Down Expand Up @@ -498,7 +511,7 @@ describe(Upload.name, () => {
client: new S3({}),
});

const received = [];
const received: Progress[] = [];
upload.on("httpUploadProgress", (progress: Progress) => {
received.push(progress);
});
Expand Down Expand Up @@ -534,7 +547,7 @@ describe(Upload.name, () => {
client: new S3({}),
});

const received = [];
const received: Progress[] = [];
upload.on("httpUploadProgress", (progress: Progress) => {
received.push(progress);
});
Expand Down Expand Up @@ -564,7 +577,7 @@ describe(Upload.name, () => {
client: new S3({}),
});

const received = [];
const received: Progress[] = [];
upload.on("httpUploadProgress", (progress: Progress) => {
received.push(progress);
});
Expand All @@ -587,7 +600,7 @@ describe(Upload.name, () => {
client: new S3({}),
});

const received = [];
const received: Progress[] = [];
upload.on("httpUploadProgress", (progress: Progress) => {
received.push(progress);
});
Expand All @@ -601,4 +614,28 @@ describe(Upload.name, () => {
});
expect(received.length).toBe(1);
});

it("listens to the requestHandler for updates if it is an EventEmitter", async () => {
const partSize = 1024 * 1024 * 5;
const largeBuffer = Buffer.from("#".repeat(partSize + 10));
const streamBody = Readable.from(
(function* () {
yield largeBuffer;
})()
);
const actionParams = { ...params, Body: streamBody };
const upload = new Upload({
params: actionParams,
client: new S3Client({}),
});

const received: Progress[] = [];
upload.on("httpUploadProgress", (progress: Progress) => {
received.push(progress);
});
await upload.done();
expect(received.length).toBe(2);
expect(mockAddListener).toHaveBeenCalledWith("xhr.upload.progress", expect.any(Function));
expect(mockRemoveListener).toHaveBeenCalledWith("xhr.upload.progress", expect.any(Function));
});
});
74 changes: 73 additions & 1 deletion lib/lib-storage/src/Upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
Tag,
UploadPartCommand,
} from "@aws-sdk/client-s3";
import { HttpRequest } from "@aws-sdk/protocol-http";
import { extendedEncodeURIComponent } from "@aws-sdk/smithy-client";
import { EventEmitter } from "events";

Expand Down Expand Up @@ -99,11 +100,35 @@ export class Upload extends EventEmitter {
private async __uploadUsingPut(dataPart: RawDataPart): Promise<void> {
this.isMultiPart = false;
const params = { ...this.params, Body: dataPart.data };

const requestHandler = this.client.config.requestHandler;
const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;
const uploadEventListener = (event: ProgressEvent) => {
this.bytesUploadedSoFar = event.loaded;
this.totalBytes = event.total;
this.__notifyProgress({
loaded: this.bytesUploadedSoFar,
total: this.totalBytes,
part: dataPart.partNumber,
Key: this.params.Key,
Bucket: this.params.Bucket,
});
};

if (eventEmitter !== null) {
// The requestHandler is the xhr-http-handler.
eventEmitter.on("xhr.upload.progress", uploadEventListener);
}

const [putResult, endpoint] = await Promise.all([
this.client.send(new PutObjectCommand(params)),
this.client.config.endpoint(),
]);

if (eventEmitter !== null) {
eventEmitter.off("xhr.upload.progress", uploadEventListener);
}

const locationKey = this.params
.Key!.split("/")
.map((segment) => extendedEncodeURIComponent(segment))
Expand All @@ -121,6 +146,7 @@ export class Upload extends EventEmitter {
Location,
};
const totalSize = byteLength(dataPart.data);

this.__notifyProgress({
loaded: totalSize,
total: totalSize,
Expand Down Expand Up @@ -164,6 +190,39 @@ export class Upload extends EventEmitter {
}
}

const partSize: number = byteLength(dataPart.data) || 0;

const requestHandler = this.client.config.requestHandler;
const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;

let lastSeenBytes = 0;
const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => {
const requestPartSize = Number(request.query["partNumber"]) || -1;

if (requestPartSize !== dataPart.partNumber) {
// ignored, because the emitted event is not for this part.
return;
}

if (event.total && partSize) {
this.bytesUploadedSoFar += event.loaded - lastSeenBytes;
lastSeenBytes = event.loaded;
}

this.__notifyProgress({
loaded: this.bytesUploadedSoFar,
total: this.totalBytes,
part: dataPart.partNumber,
Key: this.params.Key,
Bucket: this.params.Bucket,
});
};

if (eventEmitter !== null) {
// The requestHandler is the xhr-http-handler.
eventEmitter.on("xhr.upload.progress", uploadEventListener);
}

const partResult = await this.client.send(
new UploadPartCommand({
...this.params,
Expand All @@ -173,10 +232,20 @@ export class Upload extends EventEmitter {
})
);

if (eventEmitter !== null) {
eventEmitter.off("xhr.upload.progress", uploadEventListener);
}

if (this.abortController.signal.aborted) {
return;
}

if (!partResult.ETag) {
throw new Error(
`Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`
);
}

this.uploadedParts.push({
PartNumber: dataPart.partNumber,
ETag: partResult.ETag,
Expand All @@ -186,7 +255,10 @@ export class Upload extends EventEmitter {
...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }),
});

this.bytesUploadedSoFar += byteLength(dataPart.data);
if (eventEmitter === null) {
this.bytesUploadedSoFar += partSize;
}

this.__notifyProgress({
loaded: this.bytesUploadedSoFar,
total: this.totalBytes,
Expand Down
9 changes: 5 additions & 4 deletions packages/fetch-http-handler/src/fetch-http-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ type FetchHttpHandlerConfig = FetchHttpHandlerOptions;

export class FetchHttpHandler implements HttpHandler {
private config?: FetchHttpHandlerConfig;
private readonly configProvider?: Provider<FetchHttpHandlerConfig>;
private readonly configProvider: Promise<FetchHttpHandlerConfig>;

constructor(options?: FetchHttpHandlerOptions | Provider<FetchHttpHandlerOptions | undefined>) {
if (typeof options === "function") {
this.configProvider = async () => (await options()) || {};
this.configProvider = options().then((opts) => opts || {});
} else {
this.config = options ?? {};
this.configProvider = Promise.resolve(this.config);
}
}

Expand All @@ -36,8 +37,8 @@ export class FetchHttpHandler implements HttpHandler {
}

async handle(request: HttpRequest, { abortSignal }: HttpHandlerOptions = {}): Promise<{ response: HttpResponse }> {
if (!this.config && this.configProvider) {
this.config = await this.configProvider();
if (!this.config) {
this.config = await this.configProvider;
}
const requestTimeoutInMs = this.config!.requestTimeout;

Expand Down
8 changes: 8 additions & 0 deletions packages/xhr-http-handler/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/node_modules/
/build/
/coverage/
/docs/
*.tsbuildinfo
*.tgz
*.log
package-lock.json
Empty file.
Loading

0 comments on commit 7462b07

Please sign in to comment.