Skip to content

Commit

Permalink
feat(s3): clientDirectUpload option (#638)
Browse files Browse the repository at this point in the history
* feat(s3):  `clientDirectUpload` option

* feat(s3): `acl` option
  • Loading branch information
kukhariev authored Oct 18, 2022
1 parent 849cf4c commit 093484c
Show file tree
Hide file tree
Showing 8 changed files with 955 additions and 695 deletions.
1 change: 1 addition & 0 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"gcs:direct": "tsnd -r tsconfig-paths/register -r dotenv/config gcs-direct",
"gcs": "tsnd -r tsconfig-paths/register -r dotenv/config express-gcs",
"s3": "tsnd -r tsconfig-paths/register -r dotenv/config express-s3",
"s3:direct": "tsnd -r tsconfig-paths/register -r dotenv/config s3-direct",
"tus": "tsnd -r tsconfig-paths/register -r dotenv/config express-tus",
"validation": "tsnd -r tsconfig-paths/register -r dotenv/config validation",
"plain-nodejs": "tsnd node-http-server",
Expand Down
31 changes: 31 additions & 0 deletions examples/s3-direct.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import * as express from 'express';
import { LogLevel, uploadx } from '@uploadx/core';
import { S3Storage } from '@uploadx/s3';

const PORT = process.env.PORT || 3002;
const app = express();

// const storage = new S3Storage({
// bucket: <YOUR_BUCKET>,
// endpoint: <YOUR_ENDPOINT>,
// region: <YOUR_REGION>,
// credentials: {
// accessKeyId: <YOUR_ACCESS_KEY_ID>,
// secretAccessKey: <YOUR_SECRET_ACCESS_KEY>
// },
// metaStorageConfig: { directory: 'upload' }
// });

// The credentials are loaded from a shared credentials file
const storage = new S3Storage({
bucket: 'node-uploadx',
endpoint: 'http://127.0.0.1:9000',
forcePathStyle: true,
clientDirectUpload: true,
expiration: { maxAge: '1h', purgeInterval: '15min' },
logLevel: <LogLevel>process.env.LOG_LEVEL || 'info'
});

app.use('/files', uploadx({ storage }));

app.listen(PORT, () => console.log('Listening on port:', PORT));
1 change: 1 addition & 0 deletions packages/core/src/handlers/uploadx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export class Uploadx<TFile extends UploadxFile> extends BaseHandler<TFile> {
const file = await this.storage.update({ id }, metadata);
const headers = this.buildHeaders(file, { Location: this.buildFileUrl(req, file) });
setHeaders(res, headers);
if (file.status === 'completed') return file;
const response = await this.storage.onUpdate(file);
this.send(res, response);
return file;
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/utils/primitives.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ export function toMilliseconds(value: string | number | undefined): number | nul
if (!value) return null;
return duration(value);
}
/**
* Convert a human-readable duration to seconds
*/
export function toSeconds(value: string | number): number {
if (isNumber(value)) return value;
return duration(value, 'sec');
}

/**
* Returns a first element of an array
Expand Down
3 changes: 2 additions & 1 deletion packages/s3/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"dependencies": {
"@aws-sdk/abort-controller": "^3.47.0",
"@aws-sdk/client-s3": "^3.47.0",
"@aws-sdk/credential-providers": "^3.47.0"
"@aws-sdk/credential-providers": "^3.47.0",
"@aws-sdk/s3-request-presigner": "^3.47.0"

},
"devDependencies": {
Expand Down
90 changes: 82 additions & 8 deletions packages/s3/src/s3-storage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as bytes from 'bytes';
import {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
Expand All @@ -12,6 +13,7 @@ import {
} from '@aws-sdk/client-s3';
import { AbortController } from '@aws-sdk/abort-controller';
import { fromIni } from '@aws-sdk/credential-providers';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import {
BaseStorage,
BaseStorageOptions,
Expand All @@ -29,18 +31,23 @@ import {
mapValues,
MetaStorage,
partMatch,
toSeconds,
updateSize
} from '@uploadx/core';
import * as http from 'http';
import { AWSError } from './aws-error';
import { S3MetaStorage, S3MetaStorageOptions } from './s3-meta-storage';

const BUCKET_NAME = 'node-uploadx';
const MIN_PART_SIZE = 5 * 1024 * 1024;
const PART_SIZE = 16 * 1024 * 1024;

export class S3File extends File {
Parts?: Part[];
UploadId = '';
uri?: string;
partsUrls?: string[];
partSize?: number;
}

export type S3StorageOptions = BaseStorageOptions<S3File> &
Expand All @@ -50,7 +57,15 @@ export type S3StorageOptions = BaseStorageOptions<S3File> &
* @defaultValue 'node-uploadx'
*/
bucket?: string;
keyFile?: string;
/**
* Specifying access rules for uploaded files
*/
acl?: 'private' | 'public-read' | string;
/**
* Force compatible client upload directly to S3 storage
*/
clientDirectUpload?: boolean;
partSize?: number | string;
/**
* Configure metafiles storage
* @example
Expand All @@ -72,6 +87,10 @@ export type S3StorageOptions = BaseStorageOptions<S3File> &
* ```
*/
metaStorageConfig?: LocalMetaStorageOptions | S3MetaStorageOptions;
/**
* @deprecated Use standard auth providers
*/
keyFile?: string;
};

/**
Expand All @@ -95,12 +114,20 @@ export class S3Storage extends BaseStorage<S3File> {
client: S3Client;
meta: MetaStorage<S3File>;
checksumTypes = ['md5'];
private readonly _partSize = PART_SIZE;

constructor(public config: S3StorageOptions) {
super(config);
this.bucket = config.bucket || process.env.S3_BUCKET || BUCKET_NAME;
const keyFile = config.keyFile || process.env.S3_KEYFILE;
keyFile && (config.credentials = fromIni({ configFilepath: keyFile }));
this._partSize = bytes.parse(this.config.partSize || PART_SIZE);
if (this._partSize < MIN_PART_SIZE) {
throw new Error('Minimum allowed partSize value is 5MB');
}
if (this.config.clientDirectUpload) {
this.onCreate = async file => ({ body: file }); // TODO: remove hook
}
this.client = new S3Client(config);
if (config.metaStorage) {
this.meta = config.metaStorage;
Expand Down Expand Up @@ -144,16 +171,21 @@ export class S3Storage extends BaseStorage<S3File> {
Bucket: this.bucket,
Key: file.name,
ContentType: file.contentType,
Metadata: mapValues(file.metadata, encodeURI)
Metadata: mapValues(file.metadata, encodeURI),
ACL: this.config.acl
};
const { UploadId } = await this.client.send(new CreateMultipartUploadCommand(params));
if (!UploadId) {
return fail(ERRORS.FILE_ERROR, 's3 create multipart upload error');
}
file.UploadId = UploadId;
file.bytesWritten = 0;
if (this.config.clientDirectUpload) {
file.partSize ??= this._partSize;
}
await this.saveMeta(file);
file.status = 'created';
if (this.config.clientDirectUpload) return this.buildPresigned(file);
return file;
}

Expand All @@ -163,11 +195,9 @@ export class S3Storage extends BaseStorage<S3File> {
if (file.status === 'completed') return file;
if (part.size) updateSize(file, part.size);
if (!partMatch(part, file)) return fail(ERRORS.FILE_CONFLICT);
if (this.config.clientDirectUpload) return this.buildPresigned(file);
file.Parts ??= await this._getParts(file);
file.bytesWritten = file.Parts.map(item => item.Size || 0).reduce(
(prev, next) => prev + next,
0
);
file.bytesWritten = file.Parts.map(item => item.Size || 0).reduce((p, c) => p + c, 0);
await this.lock(part.id);
try {
if (hasContent(part)) {
Expand All @@ -186,7 +216,7 @@ export class S3Storage extends BaseStorage<S3File> {
ContentMD5: checksumMD5
};
const abortSignal = new AbortController().signal;
part.body.on('error', err => abortSignal.abort());
part.body.on('error', _err => abortSignal.abort());
const { ETag } = await this.client.send(new UploadPartCommand(params), { abortSignal });
const uploadPart: Part = { PartNumber: partNumber, Size: part.contentLength, ETag };
file.Parts = [...file.Parts, uploadPart];
Expand Down Expand Up @@ -215,11 +245,55 @@ export class S3Storage extends BaseStorage<S3File> {
return [{ id } as S3File];
}

async update({ id }: FileQuery, file: Partial<S3File>): Promise<S3File> {
if (this.config.clientDirectUpload) return this.buildPresigned(file);
return super.update({ id }, file);
}

accessCheck(maxWaitTime = 30): Promise<any> {
return waitUntilBucketExists({ client: this.client, maxWaitTime }, { Bucket: this.bucket });
}

protected _onComplete = (file: S3File): Promise<[CompleteMultipartUploadOutput, any]> => {
private async buildPresigned(file: Partial<S3File>): Promise<S3File> {
file.partSize ??= this._partSize;
if (!file.Parts?.length) {
file.Parts = await this._getParts(file as S3File);
}
if (!file.partsUrls?.length) {
file.partsUrls = await this.getPartsPresignedUrls(file as S3File);
}

if (file.Parts.length === file.partsUrls?.length) {
file.bytesWritten = file.size;
const [completed] = await this._onComplete(file as S3File);
delete file.Parts;
delete file.partsUrls;
file.uri = completed.Location;
}
file.status = getFileStatus(file as File);
return file as S3File;
}

private async getPartsPresignedUrls(file: S3File): Promise<string[]> {
file.partSize ??= this._partSize;
const partsNum = ~~(file.size / this._partSize) + 1;
const promises = [];
const expiresIn = ~~toSeconds(this.config.expiration?.maxAge || '6hrs');
for (let i = 0; i < partsNum; i++) {
const partCommandInput = {
Bucket: this.bucket,
Key: file.name,
UploadId: file.UploadId,
PartNumber: i + 1
};
promises.push(
getSignedUrl(this.client, new UploadPartCommand(partCommandInput), { expiresIn })
);
}
return Promise.all(promises);
}

private _onComplete = (file: S3File): Promise<[CompleteMultipartUploadOutput, any]> => {
return Promise.all([this._completeMultipartUpload(file), this.deleteMeta(file.id)]);
};

Expand Down
51 changes: 51 additions & 0 deletions test/s3-storage.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
jest.mock('@aws-sdk/s3-request-presigner');

import {
CompleteMultipartUploadCommand,
CreateMultipartUploadCommand,
Expand All @@ -9,6 +11,7 @@ import {
S3Client,
UploadPartCommand
} from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { mockClient } from 'aws-sdk-client-mock';
import { AWSError, S3Storage, S3StorageOptions } from '../packages/s3/src';
import { authRequest, metafile, storageOptions, testfile } from './shared';
Expand Down Expand Up @@ -151,3 +154,51 @@ describe('S3Storage', () => {
});
});
});

describe('S3PresignedStorage', () => {
const getSignedUrlMock: jest.Mock = getSignedUrl as any;
jest.useFakeTimers().setSystemTime(new Date('2022-02-02'));
const options = { ...(storageOptions as S3StorageOptions), clientDirectUpload: true };

let storage: S3Storage;
const req = authRequest();

beforeEach(async () => {
s3Mock.reset();
storage = new S3Storage(options);
});

describe('.create()', () => {
it('should request api and set status and UploadId', async () => {
s3Mock.on(HeadObjectCommand).rejects();
s3Mock.on(CreateMultipartUploadCommand).resolves({ UploadId: '123456789' });
s3Mock.on(ListPartsCommand).resolves({ Parts: [] });
getSignedUrlMock.mockResolvedValue('https://api.s3.example.com?signed');
const s3file = await storage.create(req, metafile);
expect(s3file.partsUrls?.length).toBe(1);
expect(s3file.partSize).toBeGreaterThan(0);
});
});

describe('update', () => {
it('should add partsUrls', async () => {
s3Mock.on(ListPartsCommand).resolves({ Parts: [] });
getSignedUrlMock.mockResolvedValue('https://api.s3.example.com?signed');
const s3file = await storage.update(metafile, metafile);
expect(s3file.partsUrls?.length).toBe(1);
});

it('should complete', async () => {
const preCompleted = {
...metafile,
Parts: [{ PartNumber: 1, Size: 64, ETag: '123456789' }],
UploadId: '123456789',
partSize: 16777216,
partsUrls: ['https://api.s3.example.com?signed']
};
s3Mock.on(CompleteMultipartUploadCommand).resolves({ Location: '/1234' });
const s3file = await storage.update({ id: metafile.id }, preCompleted);
expect(s3file.status).toBe('completed');
});
});
});
Loading

0 comments on commit 093484c

Please sign in to comment.