diff --git a/runner/src/indexer-meta/indexer-meta.test.ts b/runner/src/indexer-meta/indexer-meta.test.ts index 673cdab0b..ee8da06b4 100644 --- a/runner/src/indexer-meta/indexer-meta.test.ts +++ b/runner/src/indexer-meta/indexer-meta.test.ts @@ -3,6 +3,7 @@ import IndexerMeta, { IndexerStatus } from './indexer-meta'; import type PgClient from '../pg-client'; import LogEntry, { LogLevel } from './log-entry'; import { type PostgresConnectionParams } from '../pg-client'; +import IndexerConfig from '../indexer-config/indexer-config'; describe('IndexerMeta', () => { let genericMockPgClient: PgClient; @@ -23,8 +24,9 @@ describe('IndexerMeta', () => { port: 5432, database: 'test_database' }; - const functionName = 'some_account/some_indexer'; - const schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_'); + + const indexerConfig = new IndexerConfig('', 'some-account', 'some-indexer', 0, '', '', LogLevel.INFO); + const schemaName = indexerConfig.schemaName(); describe('writeLog', () => { it('should insert a single log entry into the database', async () => { @@ -32,7 +34,7 @@ describe('IndexerMeta', () => { jest.useFakeTimers({ now: date.getTime() }); const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00'); - const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const infoEntry = LogEntry.systemInfo('Info message'); await indexerMeta.writeLogs([infoEntry]); @@ -45,7 +47,7 @@ describe('IndexerMeta', () => { jest.useFakeTimers({ now: date.getTime() }); const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00'); - const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const errorEntry = LogEntry.systemError('Error message', 12345); await indexerMeta.writeLogs([errorEntry]); @@ -56,13 +58,13 @@ describe('IndexerMeta', () => { it('should handle errors when inserting a single log entry', async () => { query.mockRejectedValueOnce(new Error('Failed to insert log')); - const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const errorEntry = LogEntry.systemError('Error message', 12345); await expect(indexerMeta.writeLogs([errorEntry])).rejects.toThrow('Failed to insert log'); }); it('should insert a batch of log entries into the database', async () => { - const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const debugEntry = LogEntry.systemDebug('Debug message'); const infoEntry = LogEntry.systemInfo('Information message'); const logEntries: LogEntry[] = [ @@ -79,7 +81,7 @@ describe('IndexerMeta', () => { it('should handle errors when inserting a batch of log entries', async () => { query.mockRejectedValueOnce(new Error('Failed to insert batch of logs')); - const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const debugEntry = LogEntry.systemDebug('Debug message'); const infoEntry = LogEntry.systemInfo('Information message'); const logEntries: LogEntry[] = [ @@ -91,7 +93,7 @@ describe('IndexerMeta', () => { }); it('should handle empty log entry', async () => { - const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const logEntries: LogEntry[] = []; await indexerMeta.writeLogs(logEntries); @@ -99,7 +101,7 @@ describe('IndexerMeta', () => { }); it('should skip log entries with levels lower than the logging level specified in the constructor', async () => { - const indexerMeta = new IndexerMeta(functionName, LogLevel.ERROR, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const debugEntry = LogEntry.systemDebug('Debug message'); await indexerMeta.writeLogs([debugEntry]); @@ -108,7 +110,7 @@ describe('IndexerMeta', () => { }); it('writes status for indexer', async () => { - const indexerMeta = new IndexerMeta(functionName, 5, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); await indexerMeta.setStatus(IndexerStatus.RUNNING); expect(query).toBeCalledWith( `INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *` @@ -116,7 +118,7 @@ describe('IndexerMeta', () => { }); it('writes last processed block height for indexer', async () => { - const indexerMeta = new IndexerMeta(functionName, 5, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); await indexerMeta.updateBlockheight(123); expect(query).toBeCalledWith( `INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *` diff --git a/runner/src/indexer-meta/indexer-meta.ts b/runner/src/indexer-meta/indexer-meta.ts index 9c8edc485..3ce3f17e1 100644 --- a/runner/src/indexer-meta/indexer-meta.ts +++ b/runner/src/indexer-meta/indexer-meta.ts @@ -4,6 +4,7 @@ import PgClient, { type PostgresConnectionParams } from '../pg-client'; import { trace } from '@opentelemetry/api'; import type LogEntry from './log-entry'; import { LogLevel } from './log-entry'; +import type IndexerConfig from '../indexer-config/indexer-config'; export enum IndexerStatus { PROVISIONING = 'PROVISIONING', @@ -20,25 +21,22 @@ export default class IndexerMeta { tracer = trace.getTracer('queryapi-runner-indexer-logger'); private readonly pgClient: PgClient; - private readonly schemaName: string; + private readonly indexerConfig: IndexerConfig; private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.__logs (block_height, date, timestamp, type, level, message) VALUES %L'; - private readonly loggingLevel: number; constructor ( - functionName: string, - loggingLevel: number, + indexerConfig: IndexerConfig, databaseConnectionParameters: PostgresConnectionParams, pgClientInstance: PgClient | undefined = undefined ) { const pgClient = pgClientInstance ?? new PgClient(databaseConnectionParameters); this.pgClient = pgClient; - this.schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_'); - this.loggingLevel = loggingLevel; + this.indexerConfig = indexerConfig; } private shouldLog (logLevel: LogLevel): boolean { - return logLevel >= this.loggingLevel; + return logLevel >= this.indexerConfig.logLevel; } async writeLogs ( @@ -49,6 +47,7 @@ export default class IndexerMeta { const spanMessage = `write log for ${entriesArray.length === 1 ? 'single entry' : `batch of ${entriesArray.length}`} through postgres `; const writeLogSpan = this.tracer.startSpan(spanMessage); + await wrapError(async () => { const values = entriesArray.map(entry => [ entry.blockHeight, @@ -59,9 +58,9 @@ export default class IndexerMeta { entry.message ]); - const query = format(this.logInsertQueryTemplate, this.schemaName, values); + const query = format(this.logInsertQueryTemplate, this.indexerConfig.schemaName(), values); await this.pgClient.query(query); - }, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.schemaName}.__logs table`) + }, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.indexerConfig.schemaName()}.__logs table`) .finally(() => { writeLogSpan.end(); }); @@ -70,10 +69,10 @@ export default class IndexerMeta { async setStatus (status: IndexerStatus): Promise { const setStatusSpan = this.tracer.startSpan(`set status of indexer to ${status} through postgres`); const values = [[STATUS_ATTRIBUTE, status]]; - const query = format(METADATA_TABLE_UPSERT, this.schemaName, values); + const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values); try { - await wrapError(async () => await this.pgClient.query(query), `Failed to update status for ${this.schemaName}`); + await wrapError(async () => await this.pgClient.query(query), `Failed to update status for ${this.indexerConfig.schemaName()}`); } finally { setStatusSpan.end(); } @@ -82,10 +81,10 @@ export default class IndexerMeta { async updateBlockheight (blockHeight: number): Promise { const setLastProcessedBlockSpan = this.tracer.startSpan(`set last processed block to ${blockHeight} through postgres`); const values = [[LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE, blockHeight.toString()]]; - const query = format(METADATA_TABLE_UPSERT, this.schemaName, values); + const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values); try { - await wrapError(async () => await this.pgClient.query(query), `Failed to update last processed block height for ${this.schemaName}`); + await wrapError(async () => await this.pgClient.query(query), `Failed to update last processed block height for ${this.indexerConfig.schemaName()}`); } finally { setLastProcessedBlockSpan.end(); } diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 55aebdb4b..e9fddd846 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -4,7 +4,7 @@ import type fetch from 'node-fetch'; import Indexer from './indexer'; import { VM } from 'vm2'; import DmlHandler from '../dml-handler/dml-handler'; -// import IndexerMeta from '../indexer-meta/indexer-meta'; +import type IndexerMeta from '../indexer-meta/indexer-meta'; import type PgClient from '../pg-client'; import { LogLevel } from '../indexer-meta/log-entry'; import IndexerConfig from '../indexer-config/indexer-config'; @@ -198,11 +198,11 @@ CREATE TABLE delete: jest.fn().mockReturnValue([]), } as unknown as DmlHandler; - // const genericMockIndexerMeta : any = { - // writeLogs: jest.fn(), - // setStatus: jest.fn(), - // updateBlockheight: jest.fn() - // } as unknown as IndexerMeta; + const genericMockIndexerMeta: any = { + writeLogs: jest.fn(), + setStatus: jest.fn(), + updateBlockheight: jest.fn() + } as unknown as IndexerMeta; const genericDbCredentials: any = { database: 'test_near', @@ -251,7 +251,7 @@ CREATE TABLE fetch: mockFetch as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: genericMockDmlHandler, - // indexerMeta: genericMockIndexerMeta , + indexerMeta: genericMockIndexerMeta, }, undefined, config); await indexer.execute(mockBlock); @@ -284,7 +284,7 @@ CREATE TABLE dmlHandler: genericMockDmlHandler }, undefined, config); - const context = indexer.buildContext(1 /** [] */); + const context = indexer.buildContext(1, []); const query = ` query { @@ -339,7 +339,7 @@ CREATE TABLE dmlHandler: genericMockDmlHandler }, undefined, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); await context.fetchFromSocialApi('/index', { method: 'POST', @@ -368,7 +368,7 @@ CREATE TABLE }); const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, dmlHandler: genericMockDmlHandler }, undefined, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); await expect(async () => await context.graphql('query { hello }')).rejects.toThrow('boom'); }); @@ -383,7 +383,7 @@ CREATE TABLE }); const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, dmlHandler: genericMockDmlHandler }, undefined, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const query = 'query($name: String) { hello(name: $name) }'; const variables = { name: 'morgan' }; @@ -504,7 +504,7 @@ CREATE TABLE const indexer = new Indexer(indexerConfig, { dmlHandler: genericMockDmlHandler }, undefined, config); // Does not outright throw an error but instead returns an empty object - expect(indexer.buildDatabaseContext(1)) + expect(indexer.buildDatabaseContext(1, [])) .toStrictEqual({}); }); @@ -515,7 +515,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: mockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const objToInsert = [{ account_id: 'morgs_near', @@ -549,7 +549,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: mockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const promises = []; for (let i = 1; i <= 100; i++) { @@ -584,7 +584,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: mockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const objToSelect = { account_id: 'morgs_near', @@ -610,7 +610,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: mockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const whereObj = { account_id: 'morgs_near', @@ -640,7 +640,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: mockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const objToInsert = [{ account_id: 'morgs_near', @@ -672,7 +672,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: mockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const deleteFilter = { account_id: 'morgs_near', @@ -687,7 +687,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: genericMockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); expect(Object.keys(context.db)).toStrictEqual([ 'CreatorQuest', @@ -726,7 +726,7 @@ CREATE TABLE fetch: genericMockFetch as unknown as typeof fetch, dmlHandler: genericMockDmlHandler }, genericDbCredentials, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); expect(Object.keys(context.db)).toStrictEqual([]); }); @@ -825,7 +825,7 @@ CREATE TABLE return (\`Created comment \${id} on post \${post.id}\`) `; const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'buildnear.testnet', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.INFO); - const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: genericMockDmlHandler }, undefined, config); + const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); await indexer.execute(mockBlock); @@ -874,7 +874,7 @@ CREATE TABLE throw new Error('boom'); `; const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'buildnear.testnet', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.INFO); - const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: genericMockDmlHandler }, undefined, config); + const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -912,7 +912,7 @@ CREATE TABLE provisionLogsIfNeeded: jest.fn(), provisionMetadataIfNeeded: jest.fn(), }; - const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler }, undefined, config); + const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); await indexer.execute(mockBlock); @@ -948,7 +948,7 @@ CREATE TABLE provisionLogsIfNeeded: jest.fn(), provisionMetadataIfNeeded: jest.fn(), }; - const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler }, undefined, config); + const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); await indexer.execute(mockBlock); @@ -982,7 +982,7 @@ CREATE TABLE provisionLogsIfNeeded: jest.fn(), provisionMetadataIfNeeded: jest.fn(), }; - const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler }, undefined, config); + const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); await indexer.execute(mockBlock); await indexer.execute(mockBlock); @@ -1022,7 +1022,7 @@ CREATE TABLE context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); `; const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'morgs.near', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.INFO); - const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler }, undefined, config); + const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); await indexer.execute(mockBlock); @@ -1060,7 +1060,7 @@ CREATE TABLE context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); `; const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'morgs.near', 'test', 0, code, 'schema', LogLevel.INFO); - const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler }, undefined, config); + const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); await expect(indexer.execute(mockBlock)).rejects.toThrow(error); expect(mockFetch.mock.calls).toMatchSnapshot(); @@ -1113,19 +1113,19 @@ CREATE TABLE const indexerDebug = new Indexer( debugIndexerConfig, - { fetch: mockFetchDebug as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler }, + { fetch: mockFetchDebug as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config ); const indexerInfo = new Indexer( infoIndexerConfig, - { fetch: mockFetchInfo as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler }, + { fetch: mockFetchInfo as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config ); const indexerError = new Indexer( errorIndexerConfig, - { fetch: mockFetchError as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler }, + { fetch: mockFetchError as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config ); @@ -1154,7 +1154,7 @@ CREATE TABLE }) }); const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch }, undefined, config); - const context = indexer.buildContext(1); + const context = indexer.buildContext(1, []); const mutation = ` mutation { @@ -1180,6 +1180,52 @@ CREATE TABLE } ]); }); + + it('call writeLogs method at the end of execution with correct and all logs are present', async () => { + const mockFetchDebug = jest.fn(() => ({ + status: 200, + json: async () => ({ + errors: null, + }), + })); + const blockHeight = 456; + const mockBlock = Block.fromStreamerMessage({ + block: { + chunks: [], + header: { + height: blockHeight + } + }, + shards: {} + } as unknown as StreamerMessage) as unknown as Block; + + const indexerMeta: any = { + writeLogs: jest.fn(), + }; + + const code = ` + console.debug('debug log'); + console.log('info log'); + console.error('error log'); + await context.db.Posts.select({ + account_id: 'morgs_near', + receipt_id: 'abc', + }); + `; + + const debugIndexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'buildnear.testnet', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.DEBUG); + const mockDmlHandler: DmlHandler = { select: jest.fn() } as unknown as DmlHandler; + const indexerDebug = new Indexer( + debugIndexerConfig, + { fetch: mockFetchDebug as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler, indexerMeta }, + undefined, + config + ); + + await indexerDebug.execute(mockBlock); + expect(indexerMeta.writeLogs).toHaveBeenCalledTimes(1); + expect(indexerMeta.writeLogs.mock.calls[0][0]).toHaveLength(5); + }); test('does not attach the hasura admin secret header when no role specified', async () => { const mockFetch = jest.fn() .mockResolvedValueOnce({ diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 18d5a131f..d3a554e43 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -5,9 +5,9 @@ import { Parser } from 'node-sql-parser'; import Provisioner from '../provisioner'; import DmlHandler from '../dml-handler/dml-handler'; -import /** LogEntry, */ { LogLevel } from '../indexer-meta/log-entry'; +import LogEntry, { LogLevel } from '../indexer-meta/log-entry'; -import /** IndexerMeta, */ { IndexerStatus } from '../indexer-meta/indexer-meta'; +import IndexerMeta, { IndexerStatus } from '../indexer-meta/indexer-meta'; import { trace, type Span } from '@opentelemetry/api'; import type IndexerConfig from '../indexer-config'; import { type PostgresConnectionParams } from '../pg-client'; @@ -16,7 +16,7 @@ interface Dependencies { fetch: typeof fetch provisioner: Provisioner dmlHandler?: DmlHandler - // indexerMeta?: IndexerMeta + indexerMeta?: IndexerMeta parser: Parser }; @@ -82,44 +82,46 @@ export default class Indexer { const simultaneousPromises: Array> = []; const allMutations: string[] = []; - // const logEntries: LogEntry[] = []; + const logEntries: LogEntry[] = []; try { const runningMessage = `Running function ${this.indexerConfig.fullName()} on block ${blockHeight}, lag is: ${lag?.toString()}ms from block timestamp`; - simultaneousPromises.push(this.writeLog(LogLevel.INFO, blockHeight, runningMessage)); + simultaneousPromises.push(this.writeLogOld(LogLevel.INFO, blockHeight, runningMessage)); try { if (!await this.deps.provisioner.fetchUserApiProvisioningStatus(this.indexerConfig)) { await this.setStatus(blockHeight, IndexerStatus.PROVISIONING); - simultaneousPromises.push(this.writeLog(LogLevel.INFO, blockHeight, 'Provisioning endpoint: starting')); - // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: 'Provisioning endpoint: starting' }); + simultaneousPromises.push(this.writeLogOld(LogLevel.INFO, blockHeight, 'Provisioning endpoint: starting')); + const provisionStartLogEntry = LogEntry.systemInfo('Provisioning endpoint: starting', blockHeight); + logEntries.push(provisionStartLogEntry); await this.deps.provisioner.provisionUserApi(this.indexerConfig); - simultaneousPromises.push(this.writeLog(LogLevel.INFO, blockHeight, 'Provisioning endpoint: successful')); - // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: 'Provisioning endpoint: successful' }); + simultaneousPromises.push(this.writeLogOld(LogLevel.INFO, blockHeight, 'Provisioning endpoint: successful')); + const provisionSuccessLogEntry = LogEntry.systemInfo('Provisioning endpoint: successful', blockHeight); + logEntries.push(provisionSuccessLogEntry); } await this.deps.provisioner.provisionLogsIfNeeded(this.indexerConfig); await this.deps.provisioner.provisionMetadataIfNeeded(this.indexerConfig); } catch (e) { const error = e as Error; - simultaneousPromises.push(this.writeLog(LogLevel.ERROR, blockHeight, 'Provisioning endpoint: failure', error.message)); - // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: `Provisioning endpoint: failure ${error.message}` }); + simultaneousPromises.push(this.writeLogOld(LogLevel.ERROR, blockHeight, `Provisioning endpoint: failure:${error.message}`)); + const provisionFailureLogEntry = LogEntry.systemError('Provisioning endpoint: failure', blockHeight); + logEntries.push(provisionFailureLogEntry); throw error; } - // const runningLogEntry = LogEntry.systemInfo(runningMessage, blockHeight); - // logEntries.push(runningLogEntry); + const runningLogEntry = LogEntry.systemInfo(runningMessage, blockHeight); + logEntries.push(runningLogEntry); // Cache database credentials after provisioning const credentialsFetchSpan = this.tracer.startSpan('fetch database connection parameters'); try { this.database_connection_parameters ??= await this.deps.provisioner.getPgBouncerConnectionParameters(this.indexerConfig.hasuraRoleName()); - // this.database_connection_parameters = await this.getDatabaseConnectionParams(hasuraRoleName); - // this.deps.indexerMeta ??= new IndexerMeta(functionName, this.indexer_behavior.log_level, this.database_connection_parameters); + this.deps.indexerMeta ??= new IndexerMeta(this.indexerConfig, this.database_connection_parameters); this.deps.dmlHandler ??= new DmlHandler(this.database_connection_parameters); } catch (e) { const error = e as Error; - await this.writeLog(LogLevel.ERROR, blockHeight, 'Failed to get database connection parameters', error.message); - // const databaseErrorLogEntry = LogEntry.systemError('Failed to get database connection parameters', blockHeight); - // logEntries.push(databaseErrorLogEntry); + await this.writeLogOld(LogLevel.ERROR, blockHeight, 'Failed to get database connection parameters', error.message); + const databaseErrorLogEntry = LogEntry.systemError('Failed to get database connection parameters', blockHeight); + logEntries.push(databaseErrorLogEntry); throw error; } finally { credentialsFetchSpan.end(); @@ -129,7 +131,7 @@ export default class Indexer { const resourceCreationSpan = this.tracer.startSpan('prepare vm and context to run indexer code'); simultaneousPromises.push(this.setStatus(blockHeight, IndexerStatus.RUNNING)); const vm = new VM({ allowAsync: true }); - const context = this.buildContext(blockHeight /* ,logEntries */); + const context = this.buildContext(blockHeight, logEntries); vm.freeze(block, 'block'); vm.freeze(lakePrimitives, 'primitives'); @@ -143,8 +145,9 @@ export default class Indexer { await vm.run(transformedCode); } catch (e) { const error = e as Error; - simultaneousPromises.push(this.writeLog(LogLevel.ERROR, blockHeight, 'Error running IndexerFunction', error.message)); - // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.ERROR, message: `Error running IndexerFunction ${error.message}` }); + simultaneousPromises.push(this.writeLogOld(LogLevel.ERROR, blockHeight, 'Error running IndexerFunction', error.message)); + const indexerErrorLogEntry = LogEntry.systemError('Error running IndexerFunction', blockHeight); + logEntries.push(indexerErrorLogEntry); throw e; } finally { runIndexerCodeSpan.end(); @@ -156,12 +159,12 @@ export default class Indexer { await this.setStatus(blockHeight, IndexerStatus.FAILING); throw e; } finally { - await Promise.all([...simultaneousPromises]); + await Promise.all([...simultaneousPromises, (this.deps.indexerMeta as IndexerMeta).writeLogs(logEntries)]); } return allMutations; } - buildContext (blockHeight: number /*, logEntries: LogEntry[] */): Context { + buildContext (blockHeight: number, logEntries: LogEntry[]): Context { return { graphql: async (operation, variables) => { const graphqlSpan = this.tracer.startSpan(`Call graphql ${operation.includes('mutation') ? 'mutation' : 'query'} through Hasura`); @@ -189,29 +192,25 @@ export default class Indexer { } }, debug: async (...log) => { - return await this.writeLog(LogLevel.DEBUG, blockHeight, ...log); - // const debugLogEntry = LogEntry.systemDebug(log.join(' '), blockHeight); - // return await this.writeLog(debugLogEntry, logEntries as LogEntry[], functionName); + const debugLogEntry = LogEntry.systemDebug(log.join(' '), blockHeight); + return await this.writeLog(debugLogEntry, logEntries); }, log: async (...log) => { - return await this.writeLog(LogLevel.INFO, blockHeight, ...log); - // const infoLogEntry = LogEntry.systemInfo(log.join(' '), blockHeight); - // return await this.writeLog(infoLogEntry, logEntries as LogEntry[], functionName); + const infoLogEntry = LogEntry.systemInfo(log.join(' '), blockHeight); + return await this.writeLog(infoLogEntry, logEntries); }, warn: async (...log) => { - return await this.writeLog(LogLevel.WARN, blockHeight, ...log); - // const warnLogEntry = LogEntry.systemWarn(log.join(' '), blockHeight); - // return await this.writeLog(warnLogEntry, logEntries as LogEntry[], functionName); + const warnLogEntry = LogEntry.systemWarn(log.join(' '), blockHeight); + return await this.writeLog(warnLogEntry, logEntries); }, error: async (...log) => { - return await this.writeLog(LogLevel.ERROR, blockHeight, ...log); - // const errorLogEntry = LogEntry.systemError(log.join(' '), blockHeight); - // return await this.writeLog(errorLogEntry, logEntries as LogEntry[], functionName); + const errorLogEntry = LogEntry.systemError(log.join(' '), blockHeight); + return await this.writeLog(errorLogEntry, logEntries); }, fetchFromSocialApi: async (path, options) => { return await this.deps.fetch(`https://api.near.social${path}`, options); }, - db: this.buildDatabaseContext(blockHeight /** , logEntries as LogEntry[] */) + db: this.buildDatabaseContext(blockHeight, logEntries) }; } @@ -290,7 +289,7 @@ export default class Indexer { buildDatabaseContext ( blockHeight: number, - // logEntries: LogEntry[], + logEntries: LogEntry[], ): Record any>> { try { const tableNameToDefinitionNamesMapping = this.getTableNameToDefinitionNamesMapping(this.indexerConfig.schema); @@ -316,10 +315,8 @@ export default class Indexer { return await this.tracer.startActiveSpan('Call context db insert', async (insertSpan: Span) => { try { // Write log before calling insert - await this.writeLog(LogLevel.DEBUG, blockHeight, - `Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName}`); - // const insertLogEntry = LogEntry.systemDebug(`Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName}`, blockHeight); - // await this.writeLog(insertLogEntry, logEntries, functionName); + const insertLogEntry = LogEntry.userDebug(`Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName}`, blockHeight); + await this.writeLog(insertLogEntry, logEntries); // Call insert with parameters return await dmlHandler.insert(this.indexerConfig.schemaName(), tableDefinitionNames, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert]); } finally { @@ -331,10 +328,8 @@ export default class Indexer { return await this.tracer.startActiveSpan('Call context db select', async (selectSpan: Span) => { try { // Write log before calling select - await this.writeLog(LogLevel.DEBUG, blockHeight, - `Selecting objects in table ${tableName} with values ${JSON.stringify(filterObj)} with ${limit === null ? 'no' : limit} limit`); - // const selectLogEntry = LogEntry.systemDebug(`Selecting objects in table ${tableName} with values ${JSON.stringify(filterObj)} with ${limit === null ? 'no' : limit} limit`, blockHeight); - // await this.writeLog(selectLogEntry, logEntries, functionName); + const selectLogEntry = LogEntry.userDebug(`Selecting objects in table ${tableName} with values ${JSON.stringify(filterObj)} with ${limit === null ? 'no' : limit} limit`, blockHeight); + await this.writeLog(selectLogEntry, logEntries); // Call select with parameters return await dmlHandler.select(this.indexerConfig.schemaName(), tableDefinitionNames, filterObj, limit); } finally { @@ -346,10 +341,8 @@ export default class Indexer { return await this.tracer.startActiveSpan('Call context db update', async (updateSpan: Span) => { try { // Write log before calling update - await this.writeLog(LogLevel.DEBUG, blockHeight, - `Updating objects in table ${tableName} that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)}`); - // const updateLogEntry = LogEntry.systemDebug(`Updating objects in table ${tableName} that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)}`, blockHeight); - // await this.writeLog(updateLogEntry, logEntries, functionName); + const updateLogEntry = LogEntry.userDebug(`Updating objects in table ${tableName} that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)}`, blockHeight); + await this.writeLog(updateLogEntry, logEntries); // Call update with parameters return await dmlHandler.update(this.indexerConfig.schemaName(), tableDefinitionNames, filterObj, updateObj); } finally { @@ -361,10 +354,8 @@ export default class Indexer { return await this.tracer.startActiveSpan('Call context db upsert', async (upsertSpan: Span) => { try { // Write log before calling upsert - await this.writeLog(LogLevel.DEBUG, blockHeight, - `Inserting objects into table ${tableName} with values ${JSON.stringify(objectsToInsert)}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); - // const upsertLogEntry = LogEntry.systemDebug(`Inserting objects into table ${tableName} with values ${JSON.stringify(objectsToInsert)}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`, blockHeight); - // await this.writeLog(upsertLogEntry, logEntries, functionName); + const upsertLogEntry = LogEntry.userDebug(`Inserting objects into table ${tableName} with values ${JSON.stringify(objectsToInsert)}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`, blockHeight); + await this.writeLog(upsertLogEntry, logEntries); // Call upsert with parameters return await dmlHandler.upsert(this.indexerConfig.schemaName(), tableDefinitionNames, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert], conflictColumns, updateColumns); } finally { @@ -376,10 +367,8 @@ export default class Indexer { return await this.tracer.startActiveSpan('Call context db delete', async (deleteSpan: Span) => { try { // Write log before calling delete - await this.writeLog(LogLevel.DEBUG, blockHeight, - `Deleting objects from table ${tableName} with values ${JSON.stringify(filterObj)}`); - // const deleteLogEntry = LogEntry.systemDebug(`Deleting objects from table ${tableName} with values ${JSON.stringify(filterObj)}`, blockHeight); - // await this.writeLog(deleteLogEntry, logEntries, functionName); + const deleteLogEntry = LogEntry.userDebug(`Deleting objects from table ${tableName} with values ${JSON.stringify(filterObj)}`, blockHeight); + await this.writeLog(deleteLogEntry, logEntries); // Call delete with parameters return await dmlHandler.delete(this.indexerConfig.schemaName(), tableDefinitionNames, filterObj); } finally { @@ -435,15 +424,18 @@ export default class Indexer { } } - // async writeLog (logEntry: LogEntry, logEntries: LogEntry[], functionName: string): Promise { - // logEntries.push(logEntry); - // const { logLevel, blockHeight, message } = logEntry; - // return await this.writeLogOld(logLevel, functionName, blockHeight, message); - // } + async writeLog (logEntry: LogEntry, logEntries: LogEntry[]): Promise { + logEntries.push(logEntry); + const { level, blockHeight, message } = logEntry; + if (blockHeight) { + return await this.writeLogOld(level, blockHeight, message); + } + } - // async callWriteLog (logEntry: LogEntry): Promise { - // await (this.deps.indexerMeta as IndexerMeta).writeLogs([logEntry]); - // } + // onetime use method to allow stream-handler to writeLog into new log table in case of failure + async callWriteLog (logEntry: LogEntry): Promise { + await (this.deps.indexerMeta as IndexerMeta).writeLogs([logEntry]); + } async updateIndexerBlockHeight (blockHeight: number): Promise { const realTimeMutation: string = ` @@ -474,7 +466,7 @@ export default class Indexer { } // todo rename to writeLogOld - async writeLog (logLevel: LogLevel, blockHeight: number, ...message: any[]): Promise { + async writeLogOld (logLevel: LogLevel, blockHeight: number, ...message: any[]): Promise { if (logLevel < this.indexerConfig.logLevel) { return; } diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 3033855ba..6e9196089 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -4,7 +4,7 @@ import { Worker, isMainThread } from 'worker_threads'; import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics'; import Indexer from '../indexer'; import { IndexerStatus } from '../indexer-meta/indexer-meta'; -import { /* LogType, */ LogLevel } from '../indexer-meta/log-entry'; +import LogEntry, { LogLevel } from '../indexer-meta/log-entry'; import type IndexerConfig from '../indexer-config'; @@ -64,19 +64,11 @@ export default class StreamHandler { console.error(`Failed to set status STOPPED for stream: ${this.indexerConfig.redisStreamKey}`, e); }); + const streamErrorLogEntry = LogEntry.systemError(`Encountered error processing stream: ${this.indexerConfig.redisStreamKey}, terminating thread\n${error.toString()}`, this.executorContext.block_height); + Promise.all([ - indexer.writeLog( - LogLevel.ERROR, - this.executorContext.block_height, - `Encountered error processing stream: ${this.indexerConfig.fullName()}, terminating thread\n${error.toString()}` - ), - // indexer.callWriteLog({ - // blockHeight: this.executorContext.block_height, - // logTimestamp: new Date(), - // logType: LogType.SYSTEM, - // logLevel: LogLevel.ERROR, - // message: `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}` - // }) + indexer.writeLogOld(LogLevel.ERROR, this.executorContext.block_height, `Encountered error processing stream: ${this.indexerConfig.fullName()}, terminating thread\n${error.toString()}`), + indexer.callWriteLog(streamErrorLogEntry), ]).catch((e) => { console.error(`Failed to write log for stream: ${this.indexerConfig.redisStreamKey}`, e); }); diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index d990d0c6a..df9c6944c 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -6,7 +6,6 @@ import Indexer from '../src/indexer'; import HasuraClient from '../src/hasura-client'; import Provisioner from '../src/provisioner'; import PgClient from '../src/pg-client'; -// import IndexerMeta from '../src/indexer-meta/indexer-meta'; import { HasuraGraphQLContainer, type StartedHasuraGraphQLContainer } from './testcontainers/hasura'; import { PostgreSqlContainer, type StartedPostgreSqlContainer } from './testcontainers/postgres'; @@ -140,7 +139,7 @@ describe('Indexer integration', () => { expect(state.current_block_height).toEqual(115185109); expect(state.status).toEqual('RUNNING'); - const { indexer_log_entries: logs }: any = await graphqlClient.request(gql` + const { indexer_log_entries: old_logs }: any = await graphqlClient.request(gql` query { indexer_log_entries(where: { function_name: { _eq:"morgs.near/test" } }) { message @@ -148,7 +147,38 @@ describe('Indexer integration', () => { } `); + expect(old_logs.length).toEqual(4); + + const { morgs_near_test___logs: logs }: any = await graphqlClient.request(gql` + query { + morgs_near_test___logs { + message + } + } + `); + expect(logs.length).toEqual(4); + + const { morgs_near_test___logs: provisioning_endpoints }: any = await graphqlClient.request(gql` + query { + morgs_near_test___logs(where: {message: {_ilike: "%Provisioning endpoint%"}}) { + message + } + } + `); + + expect(provisioning_endpoints.length).toEqual(2); + + const { morgs_near_test___logs: running_function_enpoint }: any = await graphqlClient.request(gql` + query { + morgs_near_test___logs(where: {message: {_ilike: "%Running function%"}}) { + message + } + } + `); + + expect(running_function_enpoint.length).toEqual(2); + }); it('test context db', async () => {