diff --git a/runner/src/indexer-meta/indexer-meta.test.ts b/runner/src/indexer-meta/indexer-meta.test.ts index 950cb03c1..99de0350b 100644 --- a/runner/src/indexer-meta/indexer-meta.test.ts +++ b/runner/src/indexer-meta/indexer-meta.test.ts @@ -1,7 +1,7 @@ import pgFormat from 'pg-format'; import IndexerMeta, { IndexerStatus } from './indexer-meta'; import type PgClient from '../pg-client'; -import { LogType, LogLevel, type LogEntry } from './indexer-meta'; +import LogEntry, { LogLevel } from './log-entry'; describe('IndexerMeta', () => { let genericMockPgClient: PgClient; @@ -27,53 +27,46 @@ describe('IndexerMeta', () => { describe('writeLog', () => { it('should insert a single log entry into the database', async () => { + const date = new Date(); + jest.useFakeTimers({ now: date.getTime() }); + const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00'); + + const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const infoEntry = LogEntry.systemInfo('Info message'); + await indexerMeta.writeLogs([infoEntry]); + + const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`; + expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure); + }); + + it('should insert a single log entry into the database when logEntry has a blockheight', async () => { + const date = new Date(); + jest.useFakeTimers({ now: date.getTime() }); + const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00'); + const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); - const logEntry: LogEntry = { - blockHeight: 123, - logTimestamp: new Date(), - logType: LogType.SYSTEM, - logLevel: LogLevel.INFO, - message: 'Test log message' - }; - - await indexerMeta.writeLogs(logEntry); - - const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES`; - expect(query.mock.calls[0][0]).toContain(expectedQueryStructure); + const errorEntry = LogEntry.systemError('Error message', 12345); + await indexerMeta.writeLogs([errorEntry]); + + const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`; + expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure); }); it('should handle errors when inserting a single log entry', async () => { query.mockRejectedValueOnce(new Error('Failed to insert log')); const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); - const logEntry: LogEntry = { - blockHeight: 123, - logTimestamp: new Date(), - logType: LogType.SYSTEM, - logLevel: LogLevel.INFO, - message: 'Test log message' - }; - - await expect(indexerMeta.writeLogs(logEntry)).rejects.toThrow('Failed to insert log'); + const errorEntry = LogEntry.systemError('Error message', 12345); + await expect(indexerMeta.writeLogs([errorEntry])).rejects.toThrow('Failed to insert log'); }); it('should insert a batch of log entries into the database', async () => { const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const debugEntry = LogEntry.systemDebug('Debug message'); + const infoEntry = LogEntry.systemInfo('Information message'); const logEntries: LogEntry[] = [ - { - blockHeight: 123, - logTimestamp: new Date(), - logType: LogType.SYSTEM, - logLevel: LogLevel.INFO, - message: 'Test log message 1' - }, - { - blockHeight: 124, - logTimestamp: new Date(), - logType: LogType.SYSTEM, - logLevel: LogLevel.INFO, - message: 'Test log message 2' - } + debugEntry, + infoEntry ]; await indexerMeta.writeLogs(logEntries); @@ -86,21 +79,11 @@ describe('IndexerMeta', () => { query.mockRejectedValueOnce(new Error('Failed to insert batch of logs')); const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const debugEntry = LogEntry.systemDebug('Debug message'); + const infoEntry = LogEntry.systemInfo('Information message'); const logEntries: LogEntry[] = [ - { - blockHeight: 123, - logTimestamp: new Date(), - logType: LogType.SYSTEM, - logLevel: LogLevel.INFO, - message: 'Test log message 1' - }, - { - blockHeight: 124, - logTimestamp: new Date(), - logType: LogType.SYSTEM, - logLevel: LogLevel.INFO, - message: 'Test log message 2' - } + debugEntry, + infoEntry ]; await expect(indexerMeta.writeLogs(logEntries)).rejects.toThrow('Failed to insert batch of logs'); @@ -116,15 +99,9 @@ describe('IndexerMeta', () => { it('should skip log entries with levels lower than the logging level specified in the constructor', async () => { const indexerMeta = new IndexerMeta(functionName, LogLevel.ERROR, mockDatabaseConnectionParameters, genericMockPgClient); - const logEntry: LogEntry = { - blockHeight: 123, - logTimestamp: new Date(), - logType: LogType.SYSTEM, - logLevel: LogLevel.INFO, - message: 'Test log message' - }; - - await indexerMeta.writeLogs(logEntry); + const debugEntry = LogEntry.systemDebug('Debug message'); + + await indexerMeta.writeLogs([debugEntry]); expect(query).not.toHaveBeenCalled(); }); diff --git a/runner/src/indexer-meta/indexer-meta.ts b/runner/src/indexer-meta/indexer-meta.ts index 2d158b649..b2d3e2652 100644 --- a/runner/src/indexer-meta/indexer-meta.ts +++ b/runner/src/indexer-meta/indexer-meta.ts @@ -3,6 +3,8 @@ import { wrapError } from '../utility'; import PgClient from '../pg-client'; import { type DatabaseConnectionParameters } from '../provisioner/provisioner'; import { trace } from '@opentelemetry/api'; +import type LogEntry from './log-entry'; +import { LogLevel } from './log-entry'; export enum IndexerStatus { PROVISIONING = 'PROVISIONING', @@ -11,26 +13,6 @@ export enum IndexerStatus { STOPPED = 'STOPPED', } -export interface LogEntry { - blockHeight: number - logTimestamp: Date - logType: LogType - logLevel: LogLevel - message: string -} - -export enum LogLevel { - DEBUG = 2, - INFO = 5, - WARN = 6, - ERROR = 8, -} - -export enum LogType { - SYSTEM = 'system', - USER = 'user', -} - const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *'; const STATUS_ATTRIBUTE = 'STATUS'; const LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE = 'LAST_PROCESSED_BLOCK_HEIGHT'; @@ -67,9 +49,9 @@ export default class IndexerMeta { } async writeLogs ( - logEntries: LogEntry | LogEntry[], + logEntries: LogEntry[], ): Promise { - const entriesArray = (Array.isArray(logEntries) ? logEntries : [logEntries]).filter(entry => this.shouldLog(entry.logLevel)); ; + const entriesArray = logEntries.filter(entry => this.shouldLog(entry.level)); if (entriesArray.length === 0) return; const spanMessage = `write log for ${entriesArray.length === 1 ? 'single entry' : `batch of ${entriesArray.length}`} through postgres `; @@ -78,10 +60,10 @@ export default class IndexerMeta { await wrapError(async () => { const values = entriesArray.map(entry => [ entry.blockHeight, - entry.logTimestamp, - entry.logTimestamp, - entry.logType, - LogLevel[entry.logLevel], + entry.timestamp, + entry.timestamp, + entry.type, + LogLevel[entry.level], entry.message ]); diff --git a/runner/src/indexer-meta/log-entry.test.ts b/runner/src/indexer-meta/log-entry.test.ts new file mode 100644 index 000000000..8f02a1f6d --- /dev/null +++ b/runner/src/indexer-meta/log-entry.test.ts @@ -0,0 +1,92 @@ +import LogEntry, { LogType, LogLevel } from './log-entry'; + +describe('LogEntry', () => { + test('create a system debug log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.systemDebug('Debug message', blockHeight); + expect(logEntry.message).toBe('Debug message'); + expect(logEntry.level).toBe(LogLevel.DEBUG); + expect(logEntry.type).toBe(LogType.SYSTEM); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a system info log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.systemInfo('Info message', blockHeight); + expect(logEntry.message).toBe('Info message'); + expect(logEntry.level).toBe(LogLevel.INFO); + expect(logEntry.type).toBe(LogType.SYSTEM); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a system warn log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.systemWarn('Warn message', blockHeight); + expect(logEntry.message).toBe('Warn message'); + expect(logEntry.level).toBe(LogLevel.WARN); + expect(logEntry.type).toBe(LogType.SYSTEM); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a system error log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.systemError('Error message', blockHeight); + expect(logEntry.message).toBe('Error message'); + expect(logEntry.level).toBe(LogLevel.ERROR); + expect(logEntry.type).toBe(LogType.SYSTEM); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a user debug log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.userDebug('Debug message', blockHeight); + expect(logEntry.message).toBe('Debug message'); + expect(logEntry.level).toBe(LogLevel.DEBUG); + expect(logEntry.type).toBe(LogType.USER); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a user info log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.userInfo('User info message', blockHeight); + expect(logEntry.message).toBe('User info message'); + expect(logEntry.level).toBe(LogLevel.INFO); + expect(logEntry.type).toBe(LogType.USER); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a user warn log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.userWarn('User warn message', blockHeight); + expect(logEntry.message).toBe('User warn message'); + expect(logEntry.level).toBe(LogLevel.WARN); + expect(logEntry.type).toBe(LogType.USER); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a user error log entry', () => { + const blockHeight = 100; + const logEntry = LogEntry.userError('User error message', blockHeight); + expect(logEntry.message).toBe('User error message'); + expect(logEntry.level).toBe(LogLevel.ERROR); + expect(logEntry.type).toBe(LogType.USER); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBe(blockHeight); + }); + + test('create a system info log entry without blockheight', () => { + const logEntry = LogEntry.systemInfo('Info message'); + expect(logEntry.message).toBe('Info message'); + expect(logEntry.level).toBe(LogLevel.INFO); + expect(logEntry.type).toBe(LogType.SYSTEM); + expect(logEntry.timestamp).toBeInstanceOf(Date); + expect(logEntry.blockHeight).toBeUndefined(); + }); +}); diff --git a/runner/src/indexer-meta/log-entry.ts b/runner/src/indexer-meta/log-entry.ts new file mode 100644 index 000000000..4cb7dba21 --- /dev/null +++ b/runner/src/indexer-meta/log-entry.ts @@ -0,0 +1,60 @@ +export enum LogLevel { + DEBUG = 2, + INFO = 5, + WARN = 6, + ERROR = 8, +} + +export enum LogType { + SYSTEM = 'system', + USER = 'user', +} + +export default class LogEntry { + public readonly timestamp: Date; + + constructor ( + public readonly message: string, + public readonly level: LogLevel, + public readonly type: LogType, + public readonly blockHeight?: number + ) { + this.timestamp = new Date(); + } + + static createLog (message: string, level: LogLevel, type: LogType, blockHeight?: number): LogEntry { + return new LogEntry(message, level, type, blockHeight); + } + + static systemDebug (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.DEBUG, LogType.SYSTEM, blockHeight); + } + + static systemInfo (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.INFO, LogType.SYSTEM, blockHeight); + } + + static systemWarn (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.WARN, LogType.SYSTEM, blockHeight); + } + + static systemError (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.ERROR, LogType.SYSTEM, blockHeight); + } + + static userDebug (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.DEBUG, LogType.USER, blockHeight); + } + + static userInfo (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.INFO, LogType.USER, blockHeight); + } + + static userWarn (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.WARN, LogType.USER, blockHeight); + } + + static userError (message: string, blockHeight?: number): LogEntry { + return LogEntry.createLog(message, LogLevel.ERROR, LogType.USER, blockHeight); + } +} diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index c1324038e..b3d4f2f4b 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -6,7 +6,7 @@ import { VM } from 'vm2'; import DmlHandler from '../dml-handler/dml-handler'; import type PgClient from '../pg-client'; import { type IndexerBehavior } from '../stream-handler/stream-handler'; -import { LogLevel } from '../indexer-meta/indexer-meta'; +import { LogLevel } from '../indexer-meta/log-entry'; describe('Indexer unit tests', () => { const HASURA_ROLE = 'morgs_near'; diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index b7c600836..f17644346 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -8,7 +8,8 @@ import DmlHandler from '../dml-handler/dml-handler'; // import IndexerMeta from '../indexer-meta/indexer-meta'; import { type IndexerBehavior } from '../stream-handler/stream-handler'; -import { /* type LogEntry, LogType, */ IndexerStatus, LogLevel } from '../indexer-meta/indexer-meta'; +import { IndexerStatus } from '../indexer-meta/indexer-meta'; +import { LogLevel } from '../indexer-meta/log-entry'; import { type DatabaseConnectionParameters } from '../provisioner/provisioner'; import { trace, type Span } from '@opentelemetry/api'; diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index a96e3f653..cde61d624 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -138,7 +138,7 @@ export default class Provisioner { async setupPartitionedLogsTable (userName: string, databaseName: string, schemaName: string): Promise { await wrapError( async () => { - // TODO: Create logs table + // await this.runLogsSql(databaseName, schemaName); await this.grantCronAccess(userName); await this.scheduleLogPartitionJobs(userName, databaseName, schemaName); }, @@ -249,7 +249,6 @@ export default class Provisioner { await this.createSchema(databaseName, schemaName); - // await this.runLogsSql(databaseName, schemaName); // await this.createMetadataTable(databaseName, schemaName); await this.runIndexerSql(databaseName, schemaName, databaseSchema); diff --git a/runner/src/provisioner/schemas/logs-table.ts b/runner/src/provisioner/schemas/logs-table.ts index 1d63d781e..d045f327a 100644 --- a/runner/src/provisioner/schemas/logs-table.ts +++ b/runner/src/provisioner/schemas/logs-table.ts @@ -10,11 +10,11 @@ CREATE TABLE __logs ( PRIMARY KEY (date, id) ) PARTITION BY RANGE (date); -CREATE INDEX logs_timestamp_idx ON __logs USING btree (timestamp); -CREATE INDEX logs_type_idx ON __logs USING btree (type); -CREATE INDEX logs_level_idx ON __logs USING btree (level); -CREATE INDEX logs_block_height_idx ON __logs USING btree (block_height); -CREATE INDEX logs_search_vector_idx ON __logs USING GIN (to_tsvector('english', message)); +CREATE INDEX __logs_timestamp_idx ON __logs USING btree (timestamp); +CREATE INDEX __logs_type_idx ON __logs USING btree (type); +CREATE INDEX __logs_level_idx ON __logs USING btree (level); +CREATE INDEX __logs_block_height_idx ON __logs USING btree (block_height); +CREATE INDEX __logs_search_vector_idx ON __logs USING GIN (to_tsvector('english', message)); CREATE OR REPLACE FUNCTION fn_create_partition(_tbl text, _date date, _interval_start text, _interval_end text) diff --git a/runner/src/server/runner-service.test.ts b/runner/src/server/runner-service.test.ts index f1ef1b45d..b67fc321a 100644 --- a/runner/src/server/runner-service.test.ts +++ b/runner/src/server/runner-service.test.ts @@ -1,5 +1,6 @@ import type StreamHandler from '../stream-handler/stream-handler'; -import { IndexerStatus, LogLevel } from '../indexer-meta/indexer-meta'; +import { IndexerStatus } from '../indexer-meta/indexer-meta'; +import { LogLevel } from '../indexer-meta/log-entry'; import getRunnerService from './runner-service'; import * as grpc from '@grpc/grpc-js'; diff --git a/runner/src/server/runner-service.ts b/runner/src/server/runner-service.ts index b21c9d25e..2fbe8a90c 100644 --- a/runner/src/server/runner-service.ts +++ b/runner/src/server/runner-service.ts @@ -1,6 +1,7 @@ import { type ServerUnaryCall, type sendUnaryData } from '@grpc/grpc-js'; import * as grpc from '@grpc/grpc-js'; -import { IndexerStatus, LogLevel } from '../indexer-meta/indexer-meta'; +import { IndexerStatus } from '../indexer-meta/indexer-meta'; +import { LogLevel } from '../indexer-meta/log-entry'; import crypto from 'crypto'; import { type RunnerHandlers } from '../generated/runner/Runner'; diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 9e6fbd576..9d68cc146 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -3,7 +3,8 @@ import { Worker, isMainThread } from 'worker_threads'; import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; -import { /* LogType, */ IndexerStatus, LogLevel } from '../indexer-meta/indexer-meta'; +import { IndexerStatus } from '../indexer-meta/indexer-meta'; +import { /* LogType, */ LogLevel } from '../indexer-meta/log-entry'; export interface IndexerConfig { account_id: string diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index bfe5ae441..f4f34168b 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -10,7 +10,7 @@ import PgClient from '../src/pg-client'; import { HasuraGraphQLContainer, type StartedHasuraGraphQLContainer } from './testcontainers/hasura'; import { PostgreSqlContainer, type StartedPostgreSqlContainer } from './testcontainers/postgres'; import block1 from './blocks/00115185108/streamer_message.json'; -import { LogLevel } from '../src/indexer-meta/indexer-meta'; +import { LogLevel } from '../src/indexer-meta/log-entry'; describe('Indexer integration', () => { jest.setTimeout(300_000);