Skip to content

Commit

Permalink
feat(source-aws): detect if the file ETag has changed between request…
Browse files Browse the repository at this point in the history
…s and error with 409 conflict.
  • Loading branch information
blacha committed Mar 16, 2023
1 parent 35d76ba commit b6be429
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
2 changes: 2 additions & 0 deletions packages/core/src/composite.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export const enum ErrorCodes {
PermissionDenied = 403,
NotFound = 404,
/** A file conflict was found, generally the source file was modified while reading a chunk of it */
Conflict = 409,
InternalError = 500,
}
/**
Expand Down
55 changes: 42 additions & 13 deletions packages/source-aws/src/s3.source.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ChunkSource, ChunkSourceBase, CompositeError, isRecord, parseUri } from '@chunkd/core';
import { S3Like, toPromise } from './type.js';
import { ChunkSource, ChunkSourceBase, CompositeError, ErrorCodes, isRecord, parseUri } from '@chunkd/core';
import { HeadRes, S3Like, toPromise } from './type.js';

export function getCompositeError(e: unknown, msg: string): CompositeError {
if (!isRecord(e)) return new CompositeError(msg, 500, e);
Expand Down Expand Up @@ -45,21 +45,32 @@ export class SourceAwsS3 extends ChunkSourceBase {
return source.type === SourceAwsS3.type;
}

_size: Promise<number> | undefined;
/** Either use the last request or a dedicated head request */
private _headRequestSync: HeadRes | undefined;
private _headRequest: Promise<HeadRes> | undefined;
get head(): Promise<HeadRes> {
if (this._headRequest == null) {
this._headRequest = toPromise(this.remote.headObject({ Bucket: this.bucket, Key: this.key }));
this._headRequest.then((hr) => (this._headRequestSync = hr));
}
return this._headRequest;
}

/** Read the content length from the last request */
get size(): Promise<number> {
if (this._size) return this._size;
this._size = Promise.resolve().then(async () => {
const res = await toPromise(this.remote.headObject({ Bucket: this.bucket, Key: this.key }));
return res.ContentLength || -1;
});
return this._size;
return this.head.then((f) => f.ContentLength ?? -1);
}

/** Read the last response ETag if it exists */
get etag(): Promise<string | null> {
return this.head.then((f) => f.ETag ?? '');
}

/**
* Parse a URI and create a source
*
* @example
* ```
* ```typescript
* fromUri('s3://foo/bar/baz.tiff')
* ```
*
Expand All @@ -78,9 +89,27 @@ export class SourceAwsS3 extends ChunkSourceBase {
const resp = await this.remote.getObject({ Bucket: this.bucket, Key: this.key, Range: fetchRange }).promise();
if (!Buffer.isBuffer(resp.Body)) throw new Error('Failed to fetch object, Body is not a buffer');

// Set the size of this object now that we know how big it is
if (resp.ContentRange != null && this._size == null) {
this._size = Promise.resolve(this.parseContentRange(resp.ContentRange));
if (this._headRequest == null) {
const headReq: HeadRes = {};
// Set the size of this object now that we know how big it is
if (resp.ContentRange != null) headReq.ContentLength = this.parseContentRange(resp.ContentRange);
if (resp.ETag) headReq.ETag = resp.ETag;
if (resp.LastModified) headReq.LastModified = resp.LastModified;

this._headRequest = Promise.resolve(headReq);
this._headRequestSync = headReq;
}
console.log(resp, this._headRequestSync);

const lastEtag = this._headRequestSync?.ETag;

// If the file has been modified since the last time we requested data this can cause conflicts so error out
if (lastEtag && lastEtag !== resp.ETag) {
throw new CompositeError(
`ETag conflict ${this.name} ${fetchRange} expected: ${lastEtag} got: ${resp.ETag}`,
ErrorCodes.Conflict,
undefined,
);
}

const buffer = resp.Body;
Expand Down
4 changes: 2 additions & 2 deletions packages/source-aws/src/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export type DeleteObjectReq = Location;
export type DeleteObjectRes = { DeleteMarker?: boolean };

export type GetObjectReq = Location & { Range?: string };
export type GetObjectRes = { Body?: Buffer | unknown; ContentRange?: string };
export type GetObjectRes = { Body?: Buffer | unknown; ContentRange?: string; ETag?: string; LastModified?: Date };

export type UploadReq = Location & {
Body?: Buffer | string | Readable;
Expand All @@ -34,7 +34,7 @@ export type ListRes = {
};

export type HeadReq = Location;
export type HeadRes = { ContentLength?: number };
export type HeadRes = { ContentLength?: number; ETag?: string; LastModified?: Date };

/** Minimal typing for a s3 like interface to make it easier to work across aws-sdk versions */
export interface S3Like {
Expand Down

0 comments on commit b6be429

Please sign in to comment.