Skip to content

Commit

Permalink
feat(indexing): contentType check instead of GLOB match
Browse files Browse the repository at this point in the history
  • Loading branch information
sperka authored and JeremyJonas committed Nov 2, 2023
1 parent e6262a1 commit b3032ab
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 17 deletions.
7 changes: 6 additions & 1 deletion demo/corpus/logic/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ export interface IProcessEnv {
PROCESSING_INPUT_LOCAL_PATH?: string;
/** Glob pattern(s) to index in source bucket (CSV) */
INDEXING_GLOB?: string;
/**
* Comma-separated list of supported 'Content-Type' values to index from source bucket.
*/
INDEXING_SUPPORTED_CONTENT_TYPES?: string;
/** Indicates if delta check is skipped, which is the per file last indexed checking */
INDEXING_SKIP_DELTA_CHECK?: string;

Expand Down Expand Up @@ -76,7 +80,8 @@ export namespace ENV {
export const CHUNK_OVERLAP = parseInt(process.env.CHUNK_OVERLAP || '200');
export const VECTOR_INDEX_LISTS = parseInt(process.env.VECTOR_INDEX_LISTS || '1000');
export const PROCESSING_INPUT_LOCAL_PATH = process.env.PROCESSING_INPUT_LOCAL_PATH || '/opt/ml/processing/input_data';
export const INDEXING_GLOB = process.env.INDEXING_GLOB || '**/*.txt';
export const INDEXING_GLOB = process.env.INDEXING_GLOB || '**/*.*';
export const INDEXING_SUPPORTED_CONTENT_TYPES = process.env.INDEXING_SUPPORTED_CONTENT_TYPES || 'text/plain';

export const EMBEDDING_TABLENAME = normalizePostgresTableName(
`${EMBEDDING_SENTENCE_TRANSFORMER_MODEL}_${VECTOR_SIZE}`,
Expand Down
33 changes: 22 additions & 11 deletions demo/corpus/logic/src/indexing/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from '@aws-sdk/lib-dynamodb';
import * as async from 'async';
import { chunkArray } from './utils';
import { ENV } from '../env';
import { measurable } from '../metrics';

const logger = getLogger('indexing/datastore');
Expand Down Expand Up @@ -59,6 +60,8 @@ export class IndexingCache {
protected lastIndexedMap: Map<string, Date> = new Map();
protected modelLastExecuted?: Date;

private supportedContentTypes: string[];

get entityCount(): number {
return this.entities.size;
}
Expand All @@ -72,6 +75,9 @@ export class IndexingCache {
this.ddbClient = new DynamoDBClient(ddbClientConfig ?? {});
this.ddbDocClient = DynamoDBDocumentClient.from(this.ddbClient);
this.s3Client = new S3Client(s3ClientConfig ?? {});

// remove spaces and split by comma
this.supportedContentTypes = ENV.INDEXING_SUPPORTED_CONTENT_TYPES.replace(' ', '').split(',');
}

formatSourceLocation(key: string): string {
Expand Down Expand Up @@ -218,17 +224,22 @@ export class IndexingCache {
}),
);

const entity: IndexEntity = {
objectKey: _objectKey,
localPath: path.join(this.baseLocalPath, _objectKey),
sourceLocation,
metadata: normalizeMetadata(response.Metadata),
lastModified: response.LastModified ?? new Date(),
};

this.entities.set(sourceLocation, entity);

return entity;
if (response.ContentType && this.supportedContentTypes.includes(response.ContentType)) {
const entity: IndexEntity = {
objectKey: _objectKey,
localPath: path.join(this.baseLocalPath, _objectKey),
sourceLocation,
metadata: normalizeMetadata(response.Metadata),
lastModified: response.LastModified ?? new Date(),
};

this.entities.set(sourceLocation, entity);

return;
} else {
logger.warn(`${sourceLocation}'s Content-Type (${response.ContentType} not supported. Skipping...)`);
return;
}
} catch (error) {
logger.warn(`Failed to resolve S3 object key: "${sourceLocation}"`, error as Error);
throw error;
Expand Down
9 changes: 4 additions & 5 deletions demo/corpus/logic/test/datastore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ describe('metadata', () => {
// console.log(base64EncodedValue);

const metadata = {
'key': 'value',
'json-base64':
'eyJjaMOsYSBraMOzYSI6Imdpw6EgdHLhu4siLCJnacOhIHRy4buLIjoiY2jDrGEga2jDs2EifQ==', // base64EncodedValue,
key: 'value',
'json-base64': 'eyJjaMOsYSBraMOzYSI6Imdpw6EgdHLhu4siLCJnacOhIHRy4buLIjoiY2jDrGEga2jDs2EifQ==', // base64EncodedValue,
};

const expectedResult = {
'key': 'value',
key: 'value',
'chìa khóa': 'giá trị',
'giá trị': 'chìa khóa',
};
Expand All @@ -45,7 +44,7 @@ describe('metadata', () => {

test('normalizeMetadata - with wrong base64 field', () => {
const metadata = {
'key': 'value',
key: 'value',
'json-base64': 'wrongly-encoded-value',
};

Expand Down
33 changes: 33 additions & 0 deletions demo/corpus/logic/test/indexing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ describe('indexing', () => {
a: 'a',
b: 'b',
},
ContentType: 'text/plain',
} as HeadObjectCommandOutput);

dynamoDBDocumentMock.on(GetCommand).resolves({});
Expand Down Expand Up @@ -102,6 +103,7 @@ describe('indexing', () => {
a: 'a',
b: 'b',
},
ContentType: 'text/plain',
} as HeadObjectCommandOutput);

// model last indexed
Expand Down Expand Up @@ -148,6 +150,7 @@ describe('indexing', () => {
a: 'a',
b: 'b',
},
ContentType: 'text/plain',
} as HeadObjectCommandOutput);
});

Expand Down Expand Up @@ -178,4 +181,34 @@ describe('indexing', () => {
const count = await require('../src/indexing').main();
expect(count).toBe(Object.keys(inputFiles).length / 2);
});

test('content-type filtered', async () => {
const _s3Head = s3Mock.on(HeadObjectCommand);
Object.keys(inputFiles).forEach((_, i) => {
_s3Head.resolvesOnce({
$metadata: {},
LastModified: new Date(),
Metadata: {
a: 'a',
b: 'b',
},
ContentType: i % 2 == 0 ? 'text/plain' : 'application/octet-stream',
} as HeadObjectCommandOutput);
});

dynamoDBDocumentMock.on(GetCommand).resolves({});
dynamoDBDocumentMock.on(PutCommand).resolves({});

dynamoDBDocumentMock.on(BatchGetCommand).resolves({
$metadata: {} as any,
Responses: {
[INDEXING_CACHE_TABLE]: [],
},
} as BatchGetCommandOutput);

dynamoDBDocumentMock.on(BatchWriteCommand).resolves({});

const count = await require('../src/indexing').main();
expect(count).toBe(Object.keys(inputFiles).length / 2);
});
});

0 comments on commit b3032ab

Please sign in to comment.