Skip to content

Commit

Permalink
Enable Logging functionality to both new and old Log Tables (#657)
Browse files Browse the repository at this point in the history
Uncommented functionality so we actually start writing logs to new the
Tables that have been provisioned in #643.
Old logging implementation remains untouched as still functions
(although it has been renamed from writeLog -> writeLogOld). We are
writing to both log tables.

### 1. Provisioning and Logging (to both tables) for a new Indexer

https://www.loom.com/share/3ad6d6ea3368412e8896340a74759ffb?sid=4d5379e8-5401-41bf-9e38-d0f8e8c4eca5

### 2. Logging (to both tables) for a existing Indexer

https://www.loom.com/share/4ba411f2bcb740e1842650f695ffb347?sid=253ced68-9d4c-459f-871b-b0a3ee00cd91

### Provisioning and Logging new logs table for a existing Indexer (that
does not have logs table)

https://www.loom.com/share/2aa7c0cc882f4dbdb9e51fc2a9e9b7b9?sid=1aa511fe-3054-4d27-9996-2b9fddc44ed8
  • Loading branch information
Kevin101Zhang authored Apr 16, 2024
1 parent 1db0cdd commit e23d4c1
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 136 deletions.
24 changes: 13 additions & 11 deletions runner/src/indexer-meta/indexer-meta.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import IndexerMeta, { IndexerStatus } from './indexer-meta';
import type PgClient from '../pg-client';
import LogEntry, { LogLevel } from './log-entry';
import { type PostgresConnectionParams } from '../pg-client';
import IndexerConfig from '../indexer-config/indexer-config';

describe('IndexerMeta', () => {
let genericMockPgClient: PgClient;
Expand All @@ -23,16 +24,17 @@ describe('IndexerMeta', () => {
port: 5432,
database: 'test_database'
};
const functionName = 'some_account/some_indexer';
const schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_');

const indexerConfig = new IndexerConfig('', 'some-account', 'some-indexer', 0, '', '', LogLevel.INFO);
const schemaName = indexerConfig.schemaName();

describe('writeLog', () => {
it('should insert a single log entry into the database', async () => {
const date = new Date();
jest.useFakeTimers({ now: date.getTime() });
const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00');

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const infoEntry = LogEntry.systemInfo('Info message');
await indexerMeta.writeLogs([infoEntry]);

Expand All @@ -45,7 +47,7 @@ describe('IndexerMeta', () => {
jest.useFakeTimers({ now: date.getTime() });
const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00');

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const errorEntry = LogEntry.systemError('Error message', 12345);
await indexerMeta.writeLogs([errorEntry]);

Expand All @@ -56,13 +58,13 @@ describe('IndexerMeta', () => {
it('should handle errors when inserting a single log entry', async () => {
query.mockRejectedValueOnce(new Error('Failed to insert log'));

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const errorEntry = LogEntry.systemError('Error message', 12345);
await expect(indexerMeta.writeLogs([errorEntry])).rejects.toThrow('Failed to insert log');
});

it('should insert a batch of log entries into the database', async () => {
const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');
const infoEntry = LogEntry.systemInfo('Information message');
const logEntries: LogEntry[] = [
Expand All @@ -79,7 +81,7 @@ describe('IndexerMeta', () => {
it('should handle errors when inserting a batch of log entries', async () => {
query.mockRejectedValueOnce(new Error('Failed to insert batch of logs'));

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');
const infoEntry = LogEntry.systemInfo('Information message');
const logEntries: LogEntry[] = [
Expand All @@ -91,15 +93,15 @@ describe('IndexerMeta', () => {
});

it('should handle empty log entry', async () => {
const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const logEntries: LogEntry[] = [];
await indexerMeta.writeLogs(logEntries);

expect(query).not.toHaveBeenCalled();
});

it('should skip log entries with levels lower than the logging level specified in the constructor', async () => {
const indexerMeta = new IndexerMeta(functionName, LogLevel.ERROR, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');

await indexerMeta.writeLogs([debugEntry]);
Expand All @@ -108,15 +110,15 @@ describe('IndexerMeta', () => {
});

it('writes status for indexer', async () => {
const indexerMeta = new IndexerMeta(functionName, 5, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.setStatus(IndexerStatus.RUNNING);
expect(query).toBeCalledWith(
`INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
);
});

it('writes last processed block height for indexer', async () => {
const indexerMeta = new IndexerMeta(functionName, 5, mockDatabaseConnectionParameters, genericMockPgClient);
const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.updateBlockheight(123);
expect(query).toBeCalledWith(
`INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
Expand Down
25 changes: 12 additions & 13 deletions runner/src/indexer-meta/indexer-meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import PgClient, { type PostgresConnectionParams } from '../pg-client';
import { trace } from '@opentelemetry/api';
import type LogEntry from './log-entry';
import { LogLevel } from './log-entry';
import type IndexerConfig from '../indexer-config/indexer-config';

export enum IndexerStatus {
PROVISIONING = 'PROVISIONING',
Expand All @@ -20,25 +21,22 @@ export default class IndexerMeta {
tracer = trace.getTracer('queryapi-runner-indexer-logger');

private readonly pgClient: PgClient;
private readonly schemaName: string;
private readonly indexerConfig: IndexerConfig;
private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.__logs (block_height, date, timestamp, type, level, message) VALUES %L';
private readonly loggingLevel: number;

constructor (
functionName: string,
loggingLevel: number,
indexerConfig: IndexerConfig,
databaseConnectionParameters: PostgresConnectionParams,
pgClientInstance: PgClient | undefined = undefined
) {
const pgClient = pgClientInstance ?? new PgClient(databaseConnectionParameters);

this.pgClient = pgClient;
this.schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_');
this.loggingLevel = loggingLevel;
this.indexerConfig = indexerConfig;
}

private shouldLog (logLevel: LogLevel): boolean {
return logLevel >= this.loggingLevel;
return logLevel >= this.indexerConfig.logLevel;
}

async writeLogs (
Expand All @@ -49,6 +47,7 @@ export default class IndexerMeta {

const spanMessage = `write log for ${entriesArray.length === 1 ? 'single entry' : `batch of ${entriesArray.length}`} through postgres `;
const writeLogSpan = this.tracer.startSpan(spanMessage);

await wrapError(async () => {
const values = entriesArray.map(entry => [
entry.blockHeight,
Expand All @@ -59,9 +58,9 @@ export default class IndexerMeta {
entry.message
]);

const query = format(this.logInsertQueryTemplate, this.schemaName, values);
const query = format(this.logInsertQueryTemplate, this.indexerConfig.schemaName(), values);
await this.pgClient.query(query);
}, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.schemaName}.__logs table`)
}, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.indexerConfig.schemaName()}.__logs table`)
.finally(() => {
writeLogSpan.end();
});
Expand All @@ -70,10 +69,10 @@ export default class IndexerMeta {
async setStatus (status: IndexerStatus): Promise<void> {
const setStatusSpan = this.tracer.startSpan(`set status of indexer to ${status} through postgres`);
const values = [[STATUS_ATTRIBUTE, status]];
const query = format(METADATA_TABLE_UPSERT, this.schemaName, values);
const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);

try {
await wrapError(async () => await this.pgClient.query(query), `Failed to update status for ${this.schemaName}`);
await wrapError(async () => await this.pgClient.query(query), `Failed to update status for ${this.indexerConfig.schemaName()}`);
} finally {
setStatusSpan.end();
}
Expand All @@ -82,10 +81,10 @@ export default class IndexerMeta {
async updateBlockheight (blockHeight: number): Promise<void> {
const setLastProcessedBlockSpan = this.tracer.startSpan(`set last processed block to ${blockHeight} through postgres`);
const values = [[LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE, blockHeight.toString()]];
const query = format(METADATA_TABLE_UPSERT, this.schemaName, values);
const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);

try {
await wrapError(async () => await this.pgClient.query(query), `Failed to update last processed block height for ${this.schemaName}`);
await wrapError(async () => await this.pgClient.query(query), `Failed to update last processed block height for ${this.indexerConfig.schemaName()}`);
} finally {
setLastProcessedBlockSpan.end();
}
Expand Down
Loading

0 comments on commit e23d4c1

Please sign in to comment.