Skip to content

Commit

Permalink
feat: Enable Metadata Table Writes (#659)
Browse files Browse the repository at this point in the history
Enable writes of Status and Last Processed Block Height to Metadata
table. Reorganizes provisioning to ensure writing of PROVISIONING
status. Ensures IndexerMeta is available for writing error logs.
  • Loading branch information
darunrs authored Apr 17, 2024
1 parent 09cdcaa commit 492d95c
Show file tree
Hide file tree
Showing 9 changed files with 480 additions and 352 deletions.
2 changes: 2 additions & 0 deletions runner/src/indexer-meta/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { default } from './indexer-meta';
export { IndexerStatus, METADATA_TABLE_UPSERT, MetadataFields } from './indexer-meta';
41 changes: 22 additions & 19 deletions runner/src/indexer-meta/indexer-meta.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import IndexerMeta, { IndexerStatus } from './indexer-meta';
import type PgClient from '../pg-client';
import LogEntry, { LogLevel } from './log-entry';
import { type PostgresConnectionParams } from '../pg-client';
import IndexerConfig from '../indexer-config/indexer-config';
import IndexerConfig from '../indexer-config';

describe('IndexerMeta', () => {
let genericMockPgClient: PgClient;
Expand All @@ -17,6 +17,12 @@ describe('IndexerMeta', () => {
} as unknown as PgClient;
});

const accountId = 'some-account';
const functionName = 'some-indexer';

const infoIndexerConfig: IndexerConfig = new IndexerConfig('stream', accountId, functionName, 0, '', '', LogLevel.INFO);
const errorIndexerConfig: IndexerConfig = new IndexerConfig('stream', accountId, functionName, 0, '', '', LogLevel.ERROR);

const mockDatabaseConnectionParameters: PostgresConnectionParams = {
user: 'test_user',
password: 'test_password',
Expand All @@ -25,20 +31,17 @@ describe('IndexerMeta', () => {
database: 'test_database'
};

const indexerConfig = new IndexerConfig('', 'some-account', 'some-indexer', 0, '', '', LogLevel.INFO);
const schemaName = indexerConfig.schemaName();

describe('writeLog', () => {
it('should insert a single log entry into the database', async () => {
const date = new Date();
jest.useFakeTimers({ now: date.getTime() });
const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00');

const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const infoEntry = LogEntry.systemInfo('Info message');
await indexerMeta.writeLogs([infoEntry]);

const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

Expand All @@ -47,24 +50,24 @@ describe('IndexerMeta', () => {
jest.useFakeTimers({ now: date.getTime() });
const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00');

const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const errorEntry = LogEntry.systemError('Error message', 12345);
await indexerMeta.writeLogs([errorEntry]);

const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`;
const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

it('should handle errors when inserting a single log entry', async () => {
query.mockRejectedValueOnce(new Error('Failed to insert log'));

const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const errorEntry = LogEntry.systemError('Error message', 12345);
await expect(indexerMeta.writeLogs([errorEntry])).rejects.toThrow('Failed to insert log');
});

it('should insert a batch of log entries into the database', async () => {
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');
const infoEntry = LogEntry.systemInfo('Information message');
const logEntries: LogEntry[] = [
Expand All @@ -74,14 +77,14 @@ describe('IndexerMeta', () => {

await indexerMeta.writeLogs(logEntries);

const expectedQuery = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES`;
const expectedQuery = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES`;
expect(query.mock.calls[0][0]).toContain(expectedQuery);
});

it('should handle errors when inserting a batch of log entries', async () => {
query.mockRejectedValueOnce(new Error('Failed to insert batch of logs'));

const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');
const infoEntry = LogEntry.systemInfo('Information message');
const logEntries: LogEntry[] = [
Expand All @@ -93,15 +96,15 @@ describe('IndexerMeta', () => {
});

it('should handle empty log entry', async () => {
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const logEntries: LogEntry[] = [];
await indexerMeta.writeLogs(logEntries);

expect(query).not.toHaveBeenCalled();
});

it('should skip log entries with levels lower than the logging level specified in the constructor', async () => {
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(errorIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');

await indexerMeta.writeLogs([debugEntry]);
Expand All @@ -110,18 +113,18 @@ describe('IndexerMeta', () => {
});

it('writes status for indexer', async () => {
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.setStatus(IndexerStatus.RUNNING);
expect(query).toBeCalledWith(
`INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
`INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
);
});

it('writes last processed block height for indexer', async () => {
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.updateBlockheight(123);
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.updateBlockHeight(123);
expect(query).toBeCalledWith(
`INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
`INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
);
});
});
Expand Down
24 changes: 13 additions & 11 deletions runner/src/indexer-meta/indexer-meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import PgClient, { type PostgresConnectionParams } from '../pg-client';
import { trace } from '@opentelemetry/api';
import type LogEntry from './log-entry';
import { LogLevel } from './log-entry';
import type IndexerConfig from '../indexer-config/indexer-config';
import type IndexerConfig from '../indexer-config';

export enum IndexerStatus {
PROVISIONING = 'PROVISIONING',
Expand All @@ -13,9 +13,11 @@ export enum IndexerStatus {
STOPPED = 'STOPPED',
}

const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
const STATUS_ATTRIBUTE = 'STATUS';
const LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE = 'LAST_PROCESSED_BLOCK_HEIGHT';
export const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
export enum MetadataFields {
STATUS = 'STATUS',
LAST_PROCESSED_BLOCK_HEIGHT = 'LAST_PROCESSED_BLOCK_HEIGHT'
}

export default class IndexerMeta {
tracer = trace.getTracer('queryapi-runner-indexer-logger');
Expand Down Expand Up @@ -68,23 +70,23 @@ export default class IndexerMeta {

async setStatus (status: IndexerStatus): Promise<void> {
const setStatusSpan = this.tracer.startSpan(`set status of indexer to ${status} through postgres`);
const values = [[STATUS_ATTRIBUTE, status]];
const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);
const values = [[MetadataFields.STATUS, status]];
const setStatusQuery = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);

try {
await wrapError(async () => await this.pgClient.query(query), `Failed to update status for ${this.indexerConfig.schemaName()}`);
await wrapError(async () => await this.pgClient.query(setStatusQuery), `Failed to update status for ${this.indexerConfig.schemaName()}`);
} finally {
setStatusSpan.end();
}
}

async updateBlockheight (blockHeight: number): Promise<void> {
async updateBlockHeight (blockHeight: number): Promise<void> {
const setLastProcessedBlockSpan = this.tracer.startSpan(`set last processed block to ${blockHeight} through postgres`);
const values = [[LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE, blockHeight.toString()]];
const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);
const values = [[MetadataFields.LAST_PROCESSED_BLOCK_HEIGHT, blockHeight.toString()]];
const updateBlockHeightQuery = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);

try {
await wrapError(async () => await this.pgClient.query(query), `Failed to update last processed block height for ${this.indexerConfig.schemaName()}`);
await wrapError(async () => await this.pgClient.query(updateBlockHeightQuery), `Failed to update last processed block height for ${this.indexerConfig.schemaName()}`);
} finally {
setLastProcessedBlockSpan.end();
}
Expand Down
Loading

0 comments on commit 492d95c

Please sign in to comment.