Skip to content

Commit

Permalink
feat: Resume Resumable Uploads (#2333)
Browse files Browse the repository at this point in the history
* chore: WIP

* chore: typo

* feat: WIP

* docs: `returns` clean up

* fix: missing `gzip`

* test: validate `createURI`

* feat: check for invalid offset when `resumeCRC32C` has not been provided

* fix: type

* chore: lint

* test: lint + cleanup

* chore: lint

* feat: emit `uri` when created

* docs: clarify

* test: Make the interrupt test actually test something

* feat: expose the `crc32c` if available

* refactor: clean-up

* test: Add _resumable_ resumable upload tests

* docs: Clean up and clarify

* docs: docs

* chore: clean-up

* fix: merge conflict fixes
  • Loading branch information
danielbankhead authored Nov 1, 2023
1 parent 8cff0a1 commit 2ba4009
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 100 deletions.
113 changes: 92 additions & 21 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@ import * as crypto from 'crypto';
import * as fs from 'fs';
import mime from 'mime';
import * as resumableUpload from './resumable-upload.js';
import {
Writable,
Readable,
pipeline,
Transform,
PassThrough,
PipelineSource,
} from 'stream';
import {Writable, Readable, pipeline, Transform, PipelineSource} from 'stream';
import * as zlib from 'zlib';
import * as http from 'http';

Expand Down Expand Up @@ -70,7 +63,7 @@ import {
formatAsUTCISO,
PassThroughShim,
} from './util.js';
import {CRC32CValidatorGenerator} from './crc32c.js';
import {CRC32C, CRC32CValidatorGenerator} from './crc32c.js';
import {HashStreamValidator} from './hash-stream-validator.js';
import {URL} from 'url';

Expand Down Expand Up @@ -208,6 +201,7 @@ export type PredefinedAcl =
type PublicResumableUploadOptions =
| 'chunkSize'
| 'highWaterMark'
| 'isPartialUpload'
| 'metadata'
| 'origin'
| 'offset'
Expand All @@ -219,6 +213,15 @@ type PublicResumableUploadOptions =

/**
 * Options for creating a resumable upload: the `PublicResumableUploadOptions`
 * subset of `resumableUpload.UploadConfig`, plus the fields declared below.
 */
export interface CreateResumableUploadOptions
extends Pick<resumableUpload.UploadConfig, PublicResumableUploadOptions> {
/**
* A CRC32C to resume from when continuing a previous upload. It is recommended
* to capture the `crc32c` event from previous upload sessions and provide its
* value in subsequent requests in order to accurately track the upload.
*
* This is **required** when validating the final portion of a resumed upload:
* if `offset` is set, CRC32C validation is enabled, and `isPartialUpload` is
* not set, creating the write stream throws a `RangeError` when this value is
* missing.
*
* @see {@link CRC32C.from} for possible values.
*/
resumeCRC32C?: Parameters<(typeof CRC32C)['from']>[0];
preconditionOpts?: PreconditionOptions;
[GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY];
}
Expand Down Expand Up @@ -467,6 +470,8 @@ export enum FileExceptionMessages {
UPLOAD_MISMATCH = `The uploaded data did not match the data from the server.
As a precaution, the file has been deleted.
To be sure the content is the same, you should try uploading the file again.`,
MD5_RESUMED_UPLOAD = 'MD5 cannot be used with a continued resumable upload as MD5 cannot be extended from an existing value',
MISSING_RESUME_CRC32C_FINAL_UPLOAD = 'The CRC32C is missing for the final portion of a resumed upload, which is required for validation. Please provide `resumeCRC32C` if validation is required, or disable `validation`.',
}

/**
Expand Down Expand Up @@ -1837,8 +1842,8 @@ class File extends ServiceObject<File, FileMetadata> {
* NOTE: Writable streams will emit the `finish` event when the file is fully
* uploaded.
*
* See {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload| Upload Options (Simple or Resumable)}
* See {@link https://cloud.google.com/storage/docs/json_api/v1/objects/insert| Objects: insert API Documentation}
* See {@link https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload Upload Options (Simple or Resumable)}
* See {@link https://cloud.google.com/storage/docs/json_api/v1/objects/insert Objects: insert API Documentation}
*
* @param {CreateWriteStreamOptions} [options] Configuration options.
* @returns {WritableStream}
Expand Down Expand Up @@ -1903,6 +1908,22 @@ class File extends ServiceObject<File, FileMetadata> {
* // The file upload is complete.
* });
* ```
*
* //-
* // <h4>Continuing a Resumable Upload</h4>
* //
* // One can capture a `uri` from a resumable upload to reuse later.
* // Additionally, for validation, one can also capture and pass `crc32c`.
* //-
* let uri: string | undefined = undefined;
* let resumeCRC32C: string | undefined = undefined;
*
* file.createWriteStream()
* .on('uri', link => {uri = link})
* .on('crc32c', crc32c => {resumeCRC32C = crc32c});
*
* // later...
* file.createWriteStream({uri, resumeCRC32C});
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
createWriteStream(options: CreateWriteStreamOptions = {}): Writable {
Expand Down Expand Up @@ -1941,6 +1962,19 @@ class File extends ServiceObject<File, FileMetadata> {
md5 = options.validation === 'md5';
} else if (options.validation === false) {
crc32c = false;
md5 = false;
}

if (options.offset) {
if (md5) {
throw new RangeError(FileExceptionMessages.MD5_RESUMED_UPLOAD);
}

if (crc32c && !options.isPartialUpload && !options.resumeCRC32C) {
throw new RangeError(
FileExceptionMessages.MISSING_RESUME_CRC32C_FINAL_UPLOAD
);
}
}

/**
Expand Down Expand Up @@ -1968,27 +2002,46 @@ class File extends ServiceObject<File, FileMetadata> {
},
});

const transformStreams: Transform[] = [];

if (gzip) {
transformStreams.push(zlib.createGzip());
}

const emitStream = new PassThroughShim();
const hashCalculatingStream = new HashStreamValidator({
crc32c,
md5,
crc32cGenerator: this.crc32cGenerator,
updateHashesOnly: true,
});

let hashCalculatingStream: HashStreamValidator | null = null;

if (crc32c || md5) {
const crc32cInstance = options.resumeCRC32C
? CRC32C.from(options.resumeCRC32C)
: undefined;

hashCalculatingStream = new HashStreamValidator({
crc32c,
crc32cInstance,
md5,
crc32cGenerator: this.crc32cGenerator,
updateHashesOnly: true,
});

transformStreams.push(hashCalculatingStream);
}

const fileWriteStream = duplexify();
let fileWriteStreamMetadataReceived = false;

// Handing off emitted events to users
emitStream.on('reading', () => writeStream.emit('reading'));
emitStream.on('writing', () => writeStream.emit('writing'));
fileWriteStream.on('uri', evt => writeStream.emit('uri', evt));
fileWriteStream.on('progress', evt => writeStream.emit('progress', evt));
fileWriteStream.on('response', resp => writeStream.emit('response', resp));
fileWriteStream.once('metadata', () => {
fileWriteStreamMetadataReceived = true;
});

writeStream.on('writing', () => {
writeStream.once('writing', () => {
if (options.resumable === false) {
this.startSimpleUpload_(fileWriteStream, options);
} else {
Expand All @@ -1997,8 +2050,7 @@ class File extends ServiceObject<File, FileMetadata> {

pipeline(
emitStream,
gzip ? zlib.createGzip() : new PassThrough(),
hashCalculatingStream,
...(transformStreams as [Transform]),
fileWriteStream,
async e => {
if (e) {
Expand All @@ -2019,8 +2071,23 @@ class File extends ServiceObject<File, FileMetadata> {
}
}

// Emit the local CRC32C value for future validation, if validation is enabled.
if (hashCalculatingStream?.crc32c) {
writeStream.emit('crc32c', hashCalculatingStream.crc32c);
}

try {
await this.#validateIntegrity(hashCalculatingStream, {crc32c, md5});
// Metadata may not be ready if the upload is a partial upload,
// nothing to validate yet.
const metadataNotReady = options.isPartialUpload && !this.metadata;

if (hashCalculatingStream && !metadataNotReady) {
await this.#validateIntegrity(hashCalculatingStream, {
crc32c,
md5,
});
}

pipelineCallback();
} catch (e) {
pipelineCallback(e as Error);
Expand Down Expand Up @@ -3913,6 +3980,7 @@ class File extends ServiceObject<File, FileMetadata> {
),
file: this.name,
generation: this.generation,
isPartialUpload: options.isPartialUpload,
key: this.encryptionKey,
kmsKeyName: this.kmsKeyName,
metadata: options.metadata,
Expand All @@ -3933,6 +4001,9 @@ class File extends ServiceObject<File, FileMetadata> {
.on('response', resp => {
dup.emit('response', resp);
})
.on('uri', uri => {
dup.emit('uri', uri);
})
.on('metadata', metadata => {
this.metadata = metadata;
dup.emit('metadata');
Expand Down
23 changes: 18 additions & 5 deletions src/hash-stream-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ interface HashStreamValidatorOptions {
crc32c: boolean;
/** Enables MD5 calculation. To validate a provided value use `md5Expected`. */
md5: boolean;
/** Set a custom CRC32C generator */
/** A CRC32C instance for validation. To validate a provided value use `crc32cExpected`. */
crc32cInstance: CRC32CValidator;
/** Set a custom CRC32C generator. Used if `crc32cInstance` has not been provided. */
crc32cGenerator: CRC32CValidatorGenerator;
/** Sets the expected CRC32C value to verify once all data has been consumed. Also sets the `crc32c` option to `true` */
crc32cExpected?: string;
Expand Down Expand Up @@ -57,17 +59,28 @@ class HashStreamValidator extends Transform {
this.md5Expected = options.md5Expected;

if (this.crc32cEnabled) {
const crc32cGenerator =
options.crc32cGenerator || CRC32C_DEFAULT_VALIDATOR_GENERATOR;

this.#crc32cHash = crc32cGenerator();
if (options.crc32cInstance) {
this.#crc32cHash = options.crc32cInstance;
} else {
const crc32cGenerator =
options.crc32cGenerator || CRC32C_DEFAULT_VALIDATOR_GENERATOR;

this.#crc32cHash = crc32cGenerator();
}
}

if (this.md5Enabled) {
this.#md5Hash = createHash('md5');
}
}

/**
 * The CRC32C accumulated so far, rendered via the underlying hash's
 * `toString()`. Yields `undefined` when no CRC32C hash has been set up.
 */
get crc32c() {
  const currentHash = this.#crc32cHash;

  return currentHash == null ? undefined : currentHash.toString();
}

_flush(callback: (error?: Error | null | undefined) => void) {
if (this.#md5Hash) {
this.#md5Digest = this.#md5Hash.digest('base64');
Expand Down
Loading

0 comments on commit 2ba4009

Please sign in to comment.