Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3): clientDirectUpload option #638

Merged
merged 2 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"gcs:direct": "tsnd -r tsconfig-paths/register -r dotenv/config gcs-direct",
"gcs": "tsnd -r tsconfig-paths/register -r dotenv/config express-gcs",
"s3": "tsnd -r tsconfig-paths/register -r dotenv/config express-s3",
"s3:direct": "tsnd -r tsconfig-paths/register -r dotenv/config s3-direct",
"tus": "tsnd -r tsconfig-paths/register -r dotenv/config express-tus",
"validation": "tsnd -r tsconfig-paths/register -r dotenv/config validation",
"plain-nodejs": "tsnd node-http-server",
Expand Down
31 changes: 31 additions & 0 deletions examples/s3-direct.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import * as express from 'express';
import { LogLevel, uploadx } from '@uploadx/core';
import { S3Storage } from '@uploadx/s3';

const PORT = process.env.PORT || 3002;

// Example of configuring the storage with explicit credentials:
//
// const storage = new S3Storage({
//   bucket: <YOUR_BUCKET>,
//   endpoint: <YOUR_ENDPOINT>,
//   region: <YOUR_REGION>,
//   credentials: {
//     accessKeyId: <YOUR_ACCESS_KEY_ID>,
//     secretAccessKey: <YOUR_SECRET_ACCESS_KEY>
//   },
//   metaStorageConfig: { directory: 'upload' }
// });

// Here the credentials are resolved from a shared credentials file instead.
// `clientDirectUpload` makes the server hand out presigned part URLs so the
// client uploads parts straight to the (MinIO-style) S3 endpoint.
const storage = new S3Storage({
  bucket: 'node-uploadx',
  endpoint: 'http://127.0.0.1:9000',
  forcePathStyle: true,
  clientDirectUpload: true,
  expiration: { maxAge: '1h', purgeInterval: '15min' },
  logLevel: <LogLevel>process.env.LOG_LEVEL || 'info'
});

const app = express();
app.use('/files', uploadx({ storage }));

app.listen(PORT, () => console.log('Listening on port:', PORT));
1 change: 1 addition & 0 deletions packages/core/src/handlers/uploadx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export class Uploadx<TFile extends UploadxFile> extends BaseHandler<TFile> {
const file = await this.storage.update({ id }, metadata);
const headers = this.buildHeaders(file, { Location: this.buildFileUrl(req, file) });
setHeaders(res, headers);
if (file.status === 'completed') return file;
const response = await this.storage.onUpdate(file);
this.send(res, response);
return file;
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/utils/primitives.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ export function toMilliseconds(value: string | number | undefined): number | nul
if (!value) return null;
return duration(value);
}
/**
 * Convert a human-readable duration (e.g. `'1h'`, `'15min'`) to seconds.
 * A numeric value is returned unchanged.
 */
export function toSeconds(value: string | number): number {
  return isNumber(value) ? value : duration(value, 'sec');
}

/**
* Returns a first element of an array
Expand Down
3 changes: 2 additions & 1 deletion packages/s3/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"dependencies": {
"@aws-sdk/abort-controller": "^3.47.0",
"@aws-sdk/client-s3": "^3.47.0",
"@aws-sdk/credential-providers": "^3.47.0"
"@aws-sdk/credential-providers": "^3.47.0",
"@aws-sdk/s3-request-presigner": "^3.47.0"

},
"devDependencies": {
Expand Down
90 changes: 82 additions & 8 deletions packages/s3/src/s3-storage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as bytes from 'bytes';
import {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
Expand All @@ -12,6 +13,7 @@ import {
} from '@aws-sdk/client-s3';
import { AbortController } from '@aws-sdk/abort-controller';
import { fromIni } from '@aws-sdk/credential-providers';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import {
BaseStorage,
BaseStorageOptions,
Expand All @@ -29,18 +31,23 @@ import {
mapValues,
MetaStorage,
partMatch,
toSeconds,
updateSize
} from '@uploadx/core';
import * as http from 'http';
import { AWSError } from './aws-error';
import { S3MetaStorage, S3MetaStorageOptions } from './s3-meta-storage';

const BUCKET_NAME = 'node-uploadx';
const MIN_PART_SIZE = 5 * 1024 * 1024;
const PART_SIZE = 16 * 1024 * 1024;

export class S3File extends File {
Parts?: Part[];
UploadId = '';
uri?: string;
partsUrls?: string[];
partSize?: number;
kukhariev marked this conversation as resolved.
Show resolved Hide resolved
}

export type S3StorageOptions = BaseStorageOptions<S3File> &
Expand All @@ -50,7 +57,15 @@ export type S3StorageOptions = BaseStorageOptions<S3File> &
* @defaultValue 'node-uploadx'
*/
bucket?: string;
keyFile?: string;
/**
* Specifying access rules for uploaded files
*/
acl?: 'private' | 'public-read' | string;
/**
* Force compatible client upload directly to S3 storage
*/
clientDirectUpload?: boolean;
partSize?: number | string;
/**
* Configure metafiles storage
* @example
Expand All @@ -72,6 +87,10 @@ export type S3StorageOptions = BaseStorageOptions<S3File> &
* ```
*/
metaStorageConfig?: LocalMetaStorageOptions | S3MetaStorageOptions;
/**
* @deprecated Use standard auth providers
*/
keyFile?: string;
};

/**
Expand All @@ -95,12 +114,20 @@ export class S3Storage extends BaseStorage<S3File> {
client: S3Client;
meta: MetaStorage<S3File>;
checksumTypes = ['md5'];
private readonly _partSize = PART_SIZE;

constructor(public config: S3StorageOptions) {
super(config);
this.bucket = config.bucket || process.env.S3_BUCKET || BUCKET_NAME;
const keyFile = config.keyFile || process.env.S3_KEYFILE;
keyFile && (config.credentials = fromIni({ configFilepath: keyFile }));
this._partSize = bytes.parse(this.config.partSize || PART_SIZE);
if (this._partSize < MIN_PART_SIZE) {
throw new Error('Minimum allowed partSize value is 5MB');
}
if (this.config.clientDirectUpload) {
this.onCreate = async file => ({ body: file }); // TODO: remove hook
}
this.client = new S3Client(config);
if (config.metaStorage) {
this.meta = config.metaStorage;
Expand Down Expand Up @@ -144,16 +171,21 @@ export class S3Storage extends BaseStorage<S3File> {
Bucket: this.bucket,
Key: file.name,
ContentType: file.contentType,
Metadata: mapValues(file.metadata, encodeURI)
Metadata: mapValues(file.metadata, encodeURI),
ACL: this.config.acl
};
const { UploadId } = await this.client.send(new CreateMultipartUploadCommand(params));
if (!UploadId) {
return fail(ERRORS.FILE_ERROR, 's3 create multipart upload error');
}
file.UploadId = UploadId;
file.bytesWritten = 0;
if (this.config.clientDirectUpload) {
file.partSize ??= this._partSize;
}
await this.saveMeta(file);
file.status = 'created';
if (this.config.clientDirectUpload) return this.buildPresigned(file);
return file;
}

Expand All @@ -163,11 +195,9 @@ export class S3Storage extends BaseStorage<S3File> {
if (file.status === 'completed') return file;
if (part.size) updateSize(file, part.size);
if (!partMatch(part, file)) return fail(ERRORS.FILE_CONFLICT);
if (this.config.clientDirectUpload) return this.buildPresigned(file);
file.Parts ??= await this._getParts(file);
file.bytesWritten = file.Parts.map(item => item.Size || 0).reduce(
(prev, next) => prev + next,
0
);
file.bytesWritten = file.Parts.map(item => item.Size || 0).reduce((p, c) => p + c, 0);
await this.lock(part.id);
try {
if (hasContent(part)) {
Expand All @@ -186,7 +216,7 @@ export class S3Storage extends BaseStorage<S3File> {
ContentMD5: checksumMD5
};
const abortSignal = new AbortController().signal;
part.body.on('error', err => abortSignal.abort());
part.body.on('error', _err => abortSignal.abort());
const { ETag } = await this.client.send(new UploadPartCommand(params), { abortSignal });
const uploadPart: Part = { PartNumber: partNumber, Size: part.contentLength, ETag };
file.Parts = [...file.Parts, uploadPart];
Expand Down Expand Up @@ -215,11 +245,55 @@ export class S3Storage extends BaseStorage<S3File> {
return [{ id } as S3File];
}

/**
 * Updates the stored metadata for an upload. In `clientDirectUpload` mode
 * the presigned response is (re)built for the client instead of delegating
 * to the base implementation.
 */
async update({ id }: FileQuery, file: Partial<S3File>): Promise<S3File> {
  return this.config.clientDirectUpload
    ? this.buildPresigned(file)
    : super.update({ id }, file);
}

/**
 * Checks that the configured bucket is reachable, waiting up to
 * `maxWaitTime` for it to exist (delegates to the SDK bucket waiter).
 */
accessCheck(maxWaitTime = 30): Promise<any> {
  const waiterConfig = { client: this.client, maxWaitTime };
  return waitUntilBucketExists(waiterConfig, { Bucket: this.bucket });
}

protected _onComplete = (file: S3File): Promise<[CompleteMultipartUploadOutput, any]> => {
/**
 * Builds the client-direct-upload response for a file: ensures the part
 * size, the list of already-uploaded parts and the presigned part URLs are
 * populated, and finalizes the multipart upload once every presigned part
 * has been uploaded.
 */
private async buildPresigned(file: Partial<S3File>): Promise<S3File> {
file.partSize ??= this._partSize;
// fetch the parts already uploaded to the bucket if not cached on the file
if (!file.Parts?.length) {
file.Parts = await this._getParts(file as S3File);
}
// generate presigned UploadPart URLs for the client on first call
if (!file.partsUrls?.length) {
file.partsUrls = await this.getPartsPresignedUrls(file as S3File);
}

// completed when the client has uploaded one part per presigned URL
// NOTE(review): this assumes both counts derive from the same part size —
// a surplus presigned URL would block completion; confirm against
// getPartsPresignedUrls
if (file.Parts.length === file.partsUrls?.length) {
file.bytesWritten = file.size;
const [completed] = await this._onComplete(file as S3File);
// drop the transient multipart bookkeeping from the response
delete file.Parts;
delete file.partsUrls;
file.uri = completed.Location;
}
file.status = getFileStatus(file as File);
return file as S3File;
}

private async getPartsPresignedUrls(file: S3File): Promise<string[]> {
file.partSize ??= this._partSize;
const partsNum = ~~(file.size / this._partSize) + 1;
const promises = [];
const expiresIn = ~~toSeconds(this.config.expiration?.maxAge || '6hrs');
for (let i = 0; i < partsNum; i++) {
const partCommandInput = {
Bucket: this.bucket,
Key: file.name,
UploadId: file.UploadId,
PartNumber: i + 1
};
promises.push(
getSignedUrl(this.client, new UploadPartCommand(partCommandInput), { expiresIn })
);
}
return Promise.all(promises);
}

/** Finalizes the multipart upload and deletes the metafile concurrently. */
private _onComplete = (file: S3File): Promise<[CompleteMultipartUploadOutput, any]> => {
  const completing = this._completeMultipartUpload(file);
  const cleanup = this.deleteMeta(file.id);
  return Promise.all([completing, cleanup]);
};

Expand Down
51 changes: 51 additions & 0 deletions test/s3-storage.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
jest.mock('@aws-sdk/s3-request-presigner');

import {
CompleteMultipartUploadCommand,
CreateMultipartUploadCommand,
Expand All @@ -9,6 +11,7 @@ import {
S3Client,
UploadPartCommand
} from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { mockClient } from 'aws-sdk-client-mock';
import { AWSError, S3Storage, S3StorageOptions } from '../packages/s3/src';
import { authRequest, metafile, storageOptions, testfile } from './shared';
Expand Down Expand Up @@ -151,3 +154,51 @@ describe('S3Storage', () => {
});
});
});

// Tests for S3Storage in `clientDirectUpload` (presigned URLs) mode.
describe('S3PresignedStorage', () => {
// getSignedUrl is jest.mock'ed at the top of the file; cast to access mock API
const getSignedUrlMock: jest.Mock = getSignedUrl as any;
// freeze time so presigned-URL expiration math is deterministic
jest.useFakeTimers().setSystemTime(new Date('2022-02-02'));
const options = { ...(storageOptions as S3StorageOptions), clientDirectUpload: true };

let storage: S3Storage;
const req = authRequest();

beforeEach(async () => {
s3Mock.reset();
storage = new S3Storage(options);
});

describe('.create()', () => {
it('should request api and set status and UploadId', async () => {
// HeadObject rejects -> file does not exist yet, so a new upload is created
s3Mock.on(HeadObjectCommand).rejects();
s3Mock.on(CreateMultipartUploadCommand).resolves({ UploadId: '123456789' });
s3Mock.on(ListPartsCommand).resolves({ Parts: [] });
getSignedUrlMock.mockResolvedValue('https://api.s3.example.com?signed');
const s3file = await storage.create(req, metafile);
// metafile is smaller than one part, so exactly one presigned URL is issued
expect(s3file.partsUrls?.length).toBe(1);
expect(s3file.partSize).toBeGreaterThan(0);
});
});

describe('update', () => {
it('should add partsUrls', async () => {
// no parts uploaded yet -> update should (re)issue the presigned URLs
s3Mock.on(ListPartsCommand).resolves({ Parts: [] });
getSignedUrlMock.mockResolvedValue('https://api.s3.example.com?signed');
const s3file = await storage.update(metafile, metafile);
expect(s3file.partsUrls?.length).toBe(1);
});

it('should complete', async () => {
// one uploaded part per presigned URL -> the upload should be finalized
const preCompleted = {
...metafile,
Parts: [{ PartNumber: 1, Size: 64, ETag: '123456789' }],
UploadId: '123456789',
partSize: 16777216,
partsUrls: ['https://api.s3.example.com?signed']
};
s3Mock.on(CompleteMultipartUploadCommand).resolves({ Location: '/1234' });
const s3file = await storage.update({ id: metafile.id }, preCompleted);
expect(s3file.status).toBe('completed');
});
});
});
Loading