diff --git a/packages/core/src/composite.ts b/packages/core/src/composite.ts index 9cb659f0..f1ae11dc 100644 --- a/packages/core/src/composite.ts +++ b/packages/core/src/composite.ts @@ -1,6 +1,8 @@ export const enum ErrorCodes { PermissionDenied = 403, NotFound = 404, + /** A file conflict was found, generally the source file was modified while reading a chunk of it */ + Conflict = 409, InternalError = 500, } /** diff --git a/packages/source-aws/src/s3.source.ts b/packages/source-aws/src/s3.source.ts index 27a6d249..bd9f86de 100644 --- a/packages/source-aws/src/s3.source.ts +++ b/packages/source-aws/src/s3.source.ts @@ -1,5 +1,5 @@ -import { ChunkSource, ChunkSourceBase, CompositeError, isRecord, parseUri } from '@chunkd/core'; -import { S3Like, toPromise } from './type.js'; +import { ChunkSource, ChunkSourceBase, CompositeError, ErrorCodes, isRecord, parseUri } from '@chunkd/core'; +import { HeadRes, S3Like, toPromise } from './type.js'; export function getCompositeError(e: unknown, msg: string): CompositeError { if (!isRecord(e)) return new CompositeError(msg, 500, e); @@ -45,21 +45,32 @@ export class SourceAwsS3 extends ChunkSourceBase { return source.type === SourceAwsS3.type; } - _size: Promise | undefined; + /** Either use the last request or a dedicated head request */ + private _headRequestSync: HeadRes | undefined; + private _headRequest: Promise | undefined; + get head(): Promise { + if (this._headRequest == null) { + this._headRequest = toPromise(this.remote.headObject({ Bucket: this.bucket, Key: this.key })); + this._headRequest.then((hr) => (this._headRequestSync = hr)); + } + return this._headRequest; + } + + /** Read the content length from the last request */ get size(): Promise { - if (this._size) return this._size; - this._size = Promise.resolve().then(async () => { - const res = await toPromise(this.remote.headObject({ Bucket: this.bucket, Key: this.key })); - return res.ContentLength || -1; - }); - return this._size; + return this.head.then((f) => f.ContentLength ?? -1); + } + + /** Read the last response ETag if it exists */ + get etag(): Promise { + return this.head.then((f) => f.ETag ?? ''); } /** * Parse a URI and create a source * * @example - * ``` + * ```typescript * fromUri('s3://foo/bar/baz.tiff') * ``` * @@ -78,9 +89,27 @@ export class SourceAwsS3 extends ChunkSourceBase { const resp = await this.remote.getObject({ Bucket: this.bucket, Key: this.key, Range: fetchRange }).promise(); if (!Buffer.isBuffer(resp.Body)) throw new Error('Failed to fetch object, Body is not a buffer'); - // Set the size of this object now that we know how big it is - if (resp.ContentRange != null && this._size == null) { - this._size = Promise.resolve(this.parseContentRange(resp.ContentRange)); + if (this._headRequest == null) { + const headReq: HeadRes = {}; + // Set the size of this object now that we know how big it is + if (resp.ContentRange != null) headReq.ContentLength = this.parseContentRange(resp.ContentRange); + if (resp.ETag) headReq.ETag = resp.ETag; + if (resp.LastModified) headReq.LastModified = resp.LastModified; + + this._headRequest = Promise.resolve(headReq); + this._headRequestSync = headReq; + } + console.log(resp, this._headRequestSync); + + const lastEtag = this._headRequestSync?.ETag; + + // If the file has been modified since the last time we requested data this can cause conflicts so error out + if (lastEtag && lastEtag !== resp.ETag) { + throw new CompositeError( + `ETag conflict ${this.name} ${fetchRange} expected: ${lastEtag} got: ${resp.ETag}`, + ErrorCodes.Conflict, + undefined, + ); } const buffer = resp.Body; diff --git a/packages/source-aws/src/type.ts b/packages/source-aws/src/type.ts index cf7107a5..8f26f802 100644 --- a/packages/source-aws/src/type.ts +++ b/packages/source-aws/src/type.ts @@ -15,7 +15,7 @@ export type DeleteObjectReq = Location; export type DeleteObjectRes = { DeleteMarker?: boolean }; export type GetObjectReq = Location & { Range?: string }; -export type GetObjectRes = { Body?: Buffer | unknown; ContentRange?: string }; +export type GetObjectRes = { Body?: Buffer | unknown; ContentRange?: string; ETag?: string; LastModified?: Date }; export type UploadReq = Location & { Body?: Buffer | string | Readable; @@ -34,7 +34,7 @@ export type ListRes = { }; export type HeadReq = Location; -export type HeadRes = { ContentLength?: number }; +export type HeadRes = { ContentLength?: number; ETag?: string; LastModified?: Date }; /** Minimal typing for a s3 like interface to make it easier to work across aws-sdk versions */ export interface S3Like {