Skip to content

Commit

Permalink
fix: ensure file systems with matching roleArns are registered correc…
Browse files Browse the repository at this point in the history
…tly TDE-1268 (#1092)

#### Motivation

file systems are only created for unique roleArns so FileSystemCreated
event is not fired twice if two configuration objects have the same
roleArn,

give a confiugration with two buckets both using `roleA` on a service
that has a default role `roleDefault`
```
s3://foo/ - roleA
s3://bar/ - roleA
```

requests to 

```typescript
fs.read("s3://foo"); // tries roleDefault then uses roleA
fs.read("s3://foo"); // uses cached roleA
```

depending on the order of reads the default role may be used far too
often

```typescript
fs.read("s3://foo") // tries roleDefault then uses roleA
fs.read("s3://bar") // tries roleDefault then uses roleA
fs.read("s3://bar") // tries roleDefault then uses roleA
```

after this change

```typescript
fs.read("s3://foo") // tries roleDefault then uses roleA
fs.read("s3://bar") // tries roleDefault then uses roleA
fs.read("s3://bar") // uses cached roleA
```


#### Modification

hook the file system finder when it needs to find a new file system use
that file system to register onto `fsa`

#### Checklist

_If not applicable, provide explanation of why._

- [ ] Tests updated
- [ ] Docs updated
- [ ] Issue linked in Title
  • Loading branch information
blacha authored Oct 16, 2024
1 parent bc09b74 commit e004506
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 22 deletions.
146 changes: 146 additions & 0 deletions src/__test__/fs.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { after, before, beforeEach, describe, it } from 'node:test';

import { CompositeError } from '@chunkd/core';
import { FileSystemAbstraction, fsa } from '@chunkd/fs';
import { FsAwsS3 } from '@chunkd/source-aws';
import { S3LikeV3 } from '@chunkd/source-aws-v3';
import { FsMemory } from '@chunkd/source-memory';
import { InitializeMiddleware, MetadataBearer } from '@smithy/types';
import assert from 'assert';

import { registerFileSystem } from '../fs.register.js';

export class HttpError extends Error {
statusCode: number;
constructor(code: number, msg: string) {
super(msg);
this.statusCode = code;
}
}

describe('Register', () => {
const seenBuckets = new Set();
const throw403: InitializeMiddleware<object, MetadataBearer> = () => {
return async (args: unknown) => {
const inp = args as { input: { Bucket?: string } };
const bucket = inp.input.Bucket;
if (seenBuckets.has(bucket)) throw new HttpError(418, `Bucket: ${bucket} read multiple`);
seenBuckets.add(bucket);
throw new HttpError(403, 'Something');
};
};
const throw403Init = {
name: 'throw403',
step: 'initialize',
priority: 'high',
} as const;

/** find all the file systems related to s3:// */
const fsSystemsPath = (): string[] => {
fsa.get('s3://', 'r'); // ensure systems' array is sorted
return fsa.systems.filter((f) => f.path.startsWith('s3://')).map((f) => f.path);
};

// Because these tests modify the singleton "fsa" backup the starting systems then restore them
// after all the tests are finished
const oldSystems: FileSystemAbstraction['systems'] = [];
before(() => oldSystems.push(...fsa.systems));
after(() => (fsa.systems = oldSystems));

beforeEach(async () => {
fsa.systems.length = 0;

seenBuckets.clear();

const fsMem = new FsMemory();

fsa.register('memory://', fsMem);
const config = {
prefixes: [
// `_` is not a valid bucket name
{ type: 's3', prefix: 's3://_linz-topographic/', roleArn: 'a' },
{ type: 's3', prefix: 's3://_linz-topographic-upload/', roleArn: 'a' },
],
v: 2,
};
await fsa.write('memory://config.json', JSON.stringify(config));
});

it('should add both middleware', async () => {
const s3Fs = registerFileSystem({ config: 'memory://config.json' });
await s3Fs.credentials.find('s3://_linz-topographic/foo.json');

const fileSystems = [...s3Fs.credentials.fileSystems.values()];
assert.equal(fileSystems.length, 1);
const newFs = fileSystems[0]!.s3 as S3LikeV3;
assert.equal(
newFs.client.middlewareStack.identify().find((f) => f.startsWith('FQDN -')),
'FQDN - finalizeRequest',
);
assert.equal(
newFs.client.middlewareStack.identify().find((f) => f.startsWith('EAI_AGAIN -')),
'EAI_AGAIN - build',
);
});

it('should not duplicate middleware', async () => {
const s3Fs = registerFileSystem({ config: 'memory://config.json' });
assert.equal(
s3Fs.client.middlewareStack.identify().find((f) => f.startsWith('FQDN -')),
'FQDN - finalizeRequest',
);

await s3Fs.credentials.find('s3://_linz-topographic/foo.json');
await s3Fs.credentials.find('s3://_linz-topographic-upload/foo.json');

const fileSystems = [...s3Fs.credentials.fileSystems.values()];
assert.equal(fileSystems.length, 1);
const newFs = fileSystems[0]!.s3 as S3LikeV3;

assert.deepEqual(
newFs.client.middlewareStack.identify().filter((f) => f.startsWith('FQDN -')),
['FQDN - finalizeRequest'],
);
});

it('should register on 403', async () => {
assert.equal(fsa.systems.length, 1);
const s3Fs = registerFileSystem({ config: 'memory://config.json' });
s3Fs.client.middlewareStack.add(throw403, throw403Init);
assert.deepEqual(fsSystemsPath(), ['s3://']);

s3Fs.credentials.onFileSystemCreated = (_ac, fs): void => {
const fsS3 = fs as FsAwsS3;
const s3 = fsS3.s3 as S3LikeV3;
s3.client.middlewareStack.add(throw403);
};

const ret = await fsa.read('s3://_linz-topographic/foo.json').catch((e: Error) => e);
assert.equal(String(ret), 'CompositeError: Failed to read: "s3://_linz-topographic/foo.json"');
assert.equal(CompositeError.isCompositeError(ret), true);
const ce = ret as CompositeError;
assert.equal(ce.code, 418);
});

it('should register all buckets', async (t) => {
assert.equal(fsa.systems.length, 1);
const s3Fs = registerFileSystem({ config: 'memory://config.json' });

// All requests to s3 will error with http 403
s3Fs.client.middlewareStack.add(throw403, throw403Init);

const fakeTopo = new FsMemory();
await fakeTopo.write('s3://_linz-topographic/foo.json', 's3://_linz-topographic/foo.json');
t.mock.method(s3Fs.credentials, 'createFileSystem', () => fakeTopo);

assert.deepEqual(fsSystemsPath(), ['s3://']);

const ret = await fsa.read('s3://_linz-topographic/foo.json');

assert.equal(String(ret), 's3://_linz-topographic/foo.json');
assert.deepEqual(fsSystemsPath(), ['s3://_linz-topographic/', 's3://']);

await fsa.exists('s3://_linz-topographic-upload/foo.json');
assert.deepEqual(fsSystemsPath(), ['s3://_linz-topographic-upload/', 's3://_linz-topographic/', 's3://']);
});
});
77 changes: 55 additions & 22 deletions src/fs.register.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { setTimeout } from 'node:timers/promises';

import { S3Client } from '@aws-sdk/client-s3';
import { FileSystem } from '@chunkd/core';
import { fsa } from '@chunkd/fs';
import { AwsCredentialConfig, FsAwsS3 } from '@chunkd/source-aws';
import { FsAwsS3 } from '@chunkd/source-aws';
import { FsAwsS3V3, S3LikeV3 } from '@chunkd/source-aws-v3';
import { BuildMiddleware, FinalizeRequestMiddleware, MetadataBearer } from '@smithy/types';

Expand Down Expand Up @@ -63,40 +62,74 @@ export function eaiAgainBuilder(timeout: (attempt: number) => number): BuildMidd
return eaiAgain;
}

const client = new S3Client();
export const s3Fs = new FsAwsS3V3(client);
client.middlewareStack.add(
eaiAgainBuilder((attempt: number) => 100 + attempt * 1000),
{ name: 'EAI_AGAIN', step: 'build' },
);
client.middlewareStack.add(fqdn, { name: 'FQDN', step: 'finalizeRequest' });
/**
* When a new AWS file system is created copy across the middleware, but only if the middleware does not exist
*
* @param fsClient Filesystem to setup
*/
export function setupS3FileSystem(fsClient: FsAwsS3V3): void {
addMiddlewareToS3Client(fsClient.client);

FsAwsS3.MaxListCount = 1000;
s3Fs.credentials.onFileSystemCreated = (acc: AwsCredentialConfig, fs: FileSystem): void => {
logger.debug({ prefix: acc.prefix, roleArn: acc.roleArn }, 'FileSystem:Register');

if (fs.protocol === 's3') {
// TODO this cast can be removed once chunkd is upgraded
const fsS3 = fs as FsAwsS3V3;
const fsClient = fsS3.s3 as S3LikeV3;
fsClient.client.middlewareStack.add(fqdn, { name: 'FQDN', step: 'finalizeRequest' });
if (fsClient.credentials == null) return;
const oldFind = fsClient.credentials.find.bind(fsClient.credentials);
// When file systems are looked up ensure they are registered into `fsa`
fsClient.credentials.find = async (path: string): Promise<FsAwsS3 | null> => {
const accountConfig = await fsClient.credentials.findCredentials(path);
if (accountConfig == null) return null;

const fileSystem = await oldFind(path);
if (fileSystem == null) return null;

logger.debug({ prefix: path, roleArn: accountConfig.roleArn }, 'FileSystem:Register');
fsa.register(accountConfig.prefix, fileSystem);
if (fileSystem.s3 != null && 'client' in fileSystem.s3) {
addMiddlewareToS3Client((fileSystem.s3 as S3LikeV3).client);
}
return fileSystem;
};
}

/**
* ensure the FQDN and EAI_AGAIN Middleware exist on a s3 client
*
* @param client
*/
export function addMiddlewareToS3Client(client: S3Client): void {
// There doesnt appear to be a has or find, so the only option is to list all middleware
// which returns a list in a format: "FQDN - finalizeRequest"
const middleware = client.middlewareStack.identify();

if (middleware.find((f) => f.startsWith('FQDN ')) == null) {
client.middlewareStack.add(fqdn, { name: 'FQDN', step: 'finalizeRequest' });
}

fsa.register(acc.prefix, fs);
};
if (middleware.find((f) => f.startsWith('EAI_AGAIN ')) == null) {
client.middlewareStack.add(
eaiAgainBuilder((attempt: number) => 100 + attempt * 1000),
{ name: 'EAI_AGAIN', step: 'build' },
);
}
}

FsAwsS3.MaxListCount = 1000;

function splitConfig(x: string): string[] {
if (x.startsWith('[')) return JSON.parse(x) as string[];
return x.split(',');
}

export function registerFileSystem(opts: { config?: string }): void {
export function registerFileSystem(opts: { config?: string }): FsAwsS3V3 {
const s3Fs = new FsAwsS3V3(new S3Client());
setupS3FileSystem(s3Fs);

fsa.register('s3://', s3Fs);

const configPath = opts.config ?? process.env['AWS_ROLE_CONFIG_PATH'];
if (configPath == null || configPath === '') return;
if (configPath == null || configPath === '') return s3Fs;

const paths = splitConfig(configPath);

for (const path of paths) s3Fs.credentials.registerConfig(path, fsa);

return s3Fs;
}

0 comments on commit e004506

Please sign in to comment.