From 492d95ca25ea07fe8a301b38994de258fb21c897 Mon Sep 17 00:00:00 2001
From: Darun Seethammagari
Date: Wed, 17 Apr 2024 16:52:10 -0700
Subject: [PATCH] feat: Enable Metadata Table Writes (#659)

Enable writes of Status and Last Processed Block Height to the Metadata
table. Reorganize provisioning to ensure the PROVISIONING status is
written. Ensure IndexerMeta is available for writing error logs.
---
 runner/src/indexer-meta/index.ts | 2 +
 runner/src/indexer-meta/indexer-meta.test.ts | 41 +-
 runner/src/indexer-meta/indexer-meta.ts | 24 +-
 runner/src/indexer/indexer.test.ts | 384 ++++++++++++-------
 runner/src/indexer/indexer.ts | 41 +-
 runner/src/provisioner/provisioner.test.ts | 42 +-
 runner/src/provisioner/provisioner.ts | 18 +-
 runner/src/stream-handler/stream-handler.ts | 7 +-
 runner/tests/integration.test.ts | 273 +++++++------
 9 files changed, 480 insertions(+), 352 deletions(-)
 create mode 100644 runner/src/indexer-meta/index.ts

diff --git a/runner/src/indexer-meta/index.ts b/runner/src/indexer-meta/index.ts
new file mode 100644
index 000000000..a6bf324cc
--- /dev/null
+++ b/runner/src/indexer-meta/index.ts
@@ -0,0 +1,2 @@
+export { default } from './indexer-meta';
+export { IndexerStatus, METADATA_TABLE_UPSERT, MetadataFields } from './indexer-meta';
diff --git a/runner/src/indexer-meta/indexer-meta.test.ts b/runner/src/indexer-meta/indexer-meta.test.ts
index ee8da06b4..085d4cce5 100644
--- a/runner/src/indexer-meta/indexer-meta.test.ts
+++ b/runner/src/indexer-meta/indexer-meta.test.ts
@@ -3,7 +3,7 @@ import IndexerMeta, { IndexerStatus } from './indexer-meta';
 import type PgClient from '../pg-client';
 import LogEntry, { LogLevel } from './log-entry';
 import { type PostgresConnectionParams } from '../pg-client';
-import IndexerConfig from '../indexer-config/indexer-config';
+import IndexerConfig from '../indexer-config';
 
 describe('IndexerMeta', () => {
   let genericMockPgClient: PgClient;
@@ -17,6 +17,12 @@ describe('IndexerMeta', () => {
     } as unknown as PgClient;
   });
 
+  const accountId = 'some-account';
+  const functionName = 'some-indexer';
+
+  const infoIndexerConfig: IndexerConfig = new IndexerConfig('stream', accountId, functionName, 0, '', '', LogLevel.INFO);
+  const errorIndexerConfig: IndexerConfig = new IndexerConfig('stream', accountId, functionName, 0, '', '', LogLevel.ERROR);
+
   const mockDatabaseConnectionParameters: PostgresConnectionParams = {
     user: 'test_user',
     password: 'test_password',
@@ -25,20 +31,17 @@ describe('IndexerMeta', () => {
     database: 'test_database'
   };
 
-  const indexerConfig = new IndexerConfig('', 'some-account', 'some-indexer', 0, '', '', LogLevel.INFO);
-  const schemaName = indexerConfig.schemaName();
-
   describe('writeLog', () => {
     it('should insert a single log entry into the database', async () => {
       const date = new Date();
       jest.useFakeTimers({ now: date.getTime() });
       const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00');
 
-      const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
+      const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
       const infoEntry = LogEntry.systemInfo('Info message');
       await indexerMeta.writeLogs([infoEntry]);
 
-      const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
+      const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs 
(block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`; expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure); }); @@ -47,24 +50,24 @@ describe('IndexerMeta', () => { jest.useFakeTimers({ now: date.getTime() }); const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00'); - const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const errorEntry = LogEntry.systemError('Error message', 12345); await indexerMeta.writeLogs([errorEntry]); - const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`; + const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`; expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure); }); it('should handle errors when inserting a single log entry', async () => { query.mockRejectedValueOnce(new Error('Failed to insert log')); - const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const errorEntry = LogEntry.systemError('Error message', 12345); await expect(indexerMeta.writeLogs([errorEntry])).rejects.toThrow('Failed to insert log'); }); it('should insert a batch of log entries into the database', async () => { - const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const debugEntry = LogEntry.systemDebug('Debug message'); const infoEntry = LogEntry.systemInfo('Information message'); const logEntries: LogEntry[] = [ @@ -74,14 +77,14 @@ describe('IndexerMeta', () => { await indexerMeta.writeLogs(logEntries); - const expectedQuery = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES`; + const expectedQuery = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES`; expect(query.mock.calls[0][0]).toContain(expectedQuery); }); it('should handle errors when inserting a batch of log entries', async () => { query.mockRejectedValueOnce(new Error('Failed to insert batch of logs')); - const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const debugEntry = LogEntry.systemDebug('Debug message'); const infoEntry = LogEntry.systemInfo('Information message'); const logEntries: LogEntry[] = [ @@ -93,7 +96,7 @@ describe('IndexerMeta', () => { }); it('should handle empty log entry', async () => { - const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); + const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient); const logEntries: LogEntry[] = []; await 
indexerMeta.writeLogs(logEntries);
@@ -101,7 +104,7 @@ describe('IndexerMeta', () => {
     });
 
     it('should skip log entries with levels lower than the logging level specified in the constructor', async () => {
-      const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
+      const indexerMeta = new IndexerMeta(errorIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
       const debugEntry = LogEntry.systemDebug('Debug message');
 
       await indexerMeta.writeLogs([debugEntry]);
@@ -110,18 +113,18 @@
     });
 
     it('writes status for indexer', async () => {
-      const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
+      const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
       await indexerMeta.setStatus(IndexerStatus.RUNNING);
       expect(query).toBeCalledWith(
-        `INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
+        `INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
       );
     });
 
     it('writes last processed block height for indexer', async () => {
-      const indexerMeta = new IndexerMeta(indexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
-      await indexerMeta.updateBlockheight(123);
+      const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
+      await indexerMeta.updateBlockHeight(123);
       expect(query).toBeCalledWith(
-        `INSERT INTO ${schemaName}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
+        `INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
       );
     });
   });
diff --git a/runner/src/indexer-meta/indexer-meta.ts b/runner/src/indexer-meta/indexer-meta.ts
index 3ce3f17e1..dac39e4a1 100644
--- a/runner/src/indexer-meta/indexer-meta.ts
+++ b/runner/src/indexer-meta/indexer-meta.ts
@@ -4,7 +4,7 @@ import PgClient, { type PostgresConnectionParams } from '../pg-client';
 import { trace } from '@opentelemetry/api';
 import type LogEntry from './log-entry';
 import { LogLevel } from './log-entry';
-import type IndexerConfig from '../indexer-config/indexer-config';
+import type IndexerConfig from '../indexer-config';
 
 export enum IndexerStatus {
   PROVISIONING = 'PROVISIONING',
@@ -13,9 +13,11 @@
   STOPPED = 'STOPPED',
 }
 
-const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
-const STATUS_ATTRIBUTE = 'STATUS';
-const LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE = 'LAST_PROCESSED_BLOCK_HEIGHT';
+export const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
+export enum MetadataFields {
+  STATUS = 'STATUS',
+  LAST_PROCESSED_BLOCK_HEIGHT = 'LAST_PROCESSED_BLOCK_HEIGHT'
+}
 
 export default class IndexerMeta {
   tracer = trace.getTracer('queryapi-runner-indexer-logger');
@@ -68,23 +70,23 @@
   async setStatus (status: IndexerStatus): Promise<void> {
     const setStatusSpan = this.tracer.startSpan(`set status of indexer to ${status} through postgres`);
-    const values = [[STATUS_ATTRIBUTE, status]];
-    const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);
+    const values = [[MetadataFields.STATUS, status]];
+    const setStatusQuery = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);
 
     try {
-      await wrapError(async () => await this.pgClient.query(query), `Failed to update status for ${this.indexerConfig.schemaName()}`);
+      await wrapError(async () => await this.pgClient.query(setStatusQuery), `Failed to update status for ${this.indexerConfig.schemaName()}`);
     } finally {
       setStatusSpan.end();
     }
   }
 
-  async updateBlockheight (blockHeight: number): Promise<void> {
+  async updateBlockHeight (blockHeight: number): Promise<void> {
     const setLastProcessedBlockSpan = this.tracer.startSpan(`set last processed block to ${blockHeight} through postgres`);
-    const values = [[LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE, blockHeight.toString()]];
-    const query = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);
+    const values = [[MetadataFields.LAST_PROCESSED_BLOCK_HEIGHT, blockHeight.toString()]];
+    const updateBlockHeightQuery = format(METADATA_TABLE_UPSERT, this.indexerConfig.schemaName(), values);
 
     try {
-      await wrapError(async () => await this.pgClient.query(query), `Failed to update last processed block height for ${this.indexerConfig.schemaName()}`);
+      await wrapError(async () => await this.pgClient.query(updateBlockHeightQuery), `Failed to update last processed block height for ${this.indexerConfig.schemaName()}`);
     } finally {
       setLastProcessedBlockSpan.end();
     }
diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts
index e9fddd846..7f533ba29 100644
--- a/runner/src/indexer/indexer.test.ts
+++ b/runner/src/indexer/indexer.test.ts
@@ -4,10 +4,13 @@ import type fetch from 'node-fetch';
 import Indexer from './indexer';
 import { VM } from 'vm2';
 import DmlHandler from '../dml-handler/dml-handler';
-import type IndexerMeta from '../indexer-meta/indexer-meta';
 import type PgClient from '../pg-client';
 import { LogLevel } from '../indexer-meta/log-entry';
 import IndexerConfig from '../indexer-config/indexer-config';
+import type IndexerMeta from '../indexer-meta';
+import { IndexerStatus } from '../indexer-meta';
+import type Provisioner from '../provisioner';
+import { type PostgresConnectionParams } from '../pg-client';
 
 describe('Indexer unit tests', () => {
   const SIMPLE_SCHEMA = `CREATE TABLE
@@ -60,137 +63,146 @@ describe('Indexer unit tests', () => {
     );`;
 
   const CASE_SENSITIVE_SCHEMA = `
-  CREATE TABLE
-    Posts (
-      "id" SERIAL NOT NULL,
-      "AccountId" VARCHAR NOT NULL,
-      BlockHeight DECIMAL(58, 0) NOT NULL,
-      "receiptId" VARCHAR NOT NULL,
-      content TEXT NOT NULL,
-      block_Timestamp DECIMAL(20, 0) NOT NULL,
-      "Accounts_Liked" JSONB NOT NULL DEFAULT '[]',
-      "LastCommentTimestamp" DECIMAL(20, 0),
-      CONSTRAINT "posts_pkey" PRIMARY KEY ("id")
-    );
-
-  CREATE TABLE
-    "CommentsTable" (
-      "id" SERIAL NOT NULL,
-      PostId SERIAL NOT NULL,
-      "accountId" VARCHAR NOT NULL,
-      blockHeight DECIMAL(58, 0) NOT NULL,
-      CONSTRAINT "comments_pkey" PRIMARY KEY ("id")
-    );`;
+    CREATE TABLE
+      Posts (
+        "id" SERIAL NOT NULL,
+        "AccountId" VARCHAR NOT NULL,
+        BlockHeight DECIMAL(58, 0) NOT NULL,
+        "receiptId" VARCHAR NOT NULL,
+        content TEXT NOT NULL,
+        block_Timestamp DECIMAL(20, 0) NOT NULL,
+        "Accounts_Liked" JSONB NOT NULL DEFAULT '[]',
+        "LastCommentTimestamp" DECIMAL(20, 0),
+        CONSTRAINT "posts_pkey" PRIMARY KEY ("id")
+
); + + CREATE TABLE + "CommentsTable" ( + "id" SERIAL NOT NULL, + PostId SERIAL NOT NULL, + "accountId" VARCHAR NOT NULL, + blockHeight DECIMAL(58, 0) NOT NULL, + CONSTRAINT "comments_pkey" PRIMARY KEY ("id") + );`; const STRESS_TEST_SCHEMA = ` -CREATE TABLE creator_quest ( - account_id VARCHAR PRIMARY KEY, - num_components_created INTEGER NOT NULL DEFAULT 0, - completed BOOLEAN NOT NULL DEFAULT FALSE - ); - -CREATE TABLE - composer_quest ( - account_id VARCHAR PRIMARY KEY, - num_widgets_composed INTEGER NOT NULL DEFAULT 0, - completed BOOLEAN NOT NULL DEFAULT FALSE - ); - -CREATE TABLE - "contractor - quest" ( - account_id VARCHAR PRIMARY KEY, - num_contracts_deployed INTEGER NOT NULL DEFAULT 0, - completed BOOLEAN NOT NULL DEFAULT FALSE - ); - -CREATE TABLE - "posts" ( - "id" SERIAL NOT NULL, - "account_id" VARCHAR NOT NULL, - "block_height" DECIMAL(58, 0) NOT NULL, - "receipt_id" VARCHAR NOT NULL, - "content" TEXT NOT NULL, - "block_timestamp" DECIMAL(20, 0) NOT NULL, - "accounts_liked" JSONB NOT NULL DEFAULT '[]', - "last_comment_timestamp" DECIMAL(20, 0), - CONSTRAINT "posts_pkey" PRIMARY KEY ("id") - ); - -CREATE TABLE - "comments" ( - "id" SERIAL NOT NULL, - "post_id" SERIAL NOT NULL, - "account_id" VARCHAR NOT NULL, - "block_height" DECIMAL(58, 0) NOT NULL, - "content" TEXT NOT NULL, - "block_timestamp" DECIMAL(20, 0) NOT NULL, - "receipt_id" VARCHAR NOT NULL, - CONSTRAINT "comments_pkey" PRIMARY KEY ("id") - ); - -CREATE TABLE - "post_likes" ( - "post_id" SERIAL NOT NULL, - "account_id" VARCHAR NOT NULL, - "block_height" DECIMAL(58, 0), - "block_timestamp" DECIMAL(20, 0) NOT NULL, - "receipt_id" VARCHAR NOT NULL, - CONSTRAINT "post_likes_pkey" PRIMARY KEY ("post_id", "account_id") - ); - -CREATE UNIQUE INDEX "posts_account_id_block_height_key" ON "posts" ("account_id" ASC, "block_height" ASC); - -CREATE UNIQUE INDEX "comments_post_id_account_id_block_height_key" ON "comments" ( - "post_id" ASC, - "account_id" ASC, - "block_height" ASC -); - -CREATE INDEX - "posts_last_comment_timestamp_idx" ON "posts" ("last_comment_timestamp" DESC); - -ALTER TABLE - "comments" -ADD - CONSTRAINT "comments_post_id_fkey" FOREIGN KEY ("post_id") REFERENCES "posts" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION; - -ALTER TABLE - "post_likes" -ADD - CONSTRAINT "post_likes_post_id_fkey" FOREIGN KEY ("post_id") REFERENCES "posts" ("id") ON DELETE CASCADE ON UPDATE NO ACTION; - -CREATE TABLE IF NOT EXISTS - "My Table1" (id serial PRIMARY KEY); - -CREATE TABLE - "Another-Table" (id serial PRIMARY KEY); - -CREATE TABLE -IF NOT EXISTS - "Third-Table" (id serial PRIMARY KEY); - -CREATE TABLE - yet_another_table (id serial PRIMARY KEY); -`; + CREATE TABLE creator_quest ( + account_id VARCHAR PRIMARY KEY, + num_components_created INTEGER NOT NULL DEFAULT 0, + completed BOOLEAN NOT NULL DEFAULT FALSE + ); + + CREATE TABLE + composer_quest ( + account_id VARCHAR PRIMARY KEY, + num_widgets_composed INTEGER NOT NULL DEFAULT 0, + completed BOOLEAN NOT NULL DEFAULT FALSE + ); + + CREATE TABLE + "contractor - quest" ( + account_id VARCHAR PRIMARY KEY, + num_contracts_deployed INTEGER NOT NULL DEFAULT 0, + completed BOOLEAN NOT NULL DEFAULT FALSE + ); + + CREATE TABLE + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + "last_comment_timestamp" DECIMAL(20, 0), + CONSTRAINT "posts_pkey" PRIMARY KEY ("id") + ); 
+ + CREATE TABLE + "comments" ( + "id" SERIAL NOT NULL, + "post_id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + CONSTRAINT "comments_pkey" PRIMARY KEY ("id") + ); + + CREATE TABLE + "post_likes" ( + "post_id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0), + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + CONSTRAINT "post_likes_pkey" PRIMARY KEY ("post_id", "account_id") + ); + + CREATE UNIQUE INDEX "posts_account_id_block_height_key" ON "posts" ("account_id" ASC, "block_height" ASC); + + CREATE UNIQUE INDEX "comments_post_id_account_id_block_height_key" ON "comments" ( + "post_id" ASC, + "account_id" ASC, + "block_height" ASC + ); + + CREATE INDEX + "posts_last_comment_timestamp_idx" ON "posts" ("last_comment_timestamp" DESC); + + ALTER TABLE + "comments" + ADD + CONSTRAINT "comments_post_id_fkey" FOREIGN KEY ("post_id") REFERENCES "posts" ("id") ON DELETE NO ACTION ON UPDATE NO ACTION; + + ALTER TABLE + "post_likes" + ADD + CONSTRAINT "post_likes_post_id_fkey" FOREIGN KEY ("post_id") REFERENCES "posts" ("id") ON DELETE CASCADE ON UPDATE NO ACTION; + + CREATE TABLE IF NOT EXISTS + "My Table1" (id serial PRIMARY KEY); + + CREATE TABLE + "Another-Table" (id serial PRIMARY KEY); + + CREATE TABLE + IF NOT EXISTS + "Third-Table" (id serial PRIMARY KEY); + + CREATE TABLE + yet_another_table (id serial PRIMARY KEY); + `; + const SIMPLE_REDIS_STREAM = 'test:stream'; const SIMPLE_ACCOUNT_ID = 'morgs.near'; const SIMPLE_FUNCTION_NAME = 'test_indexer'; - const SIMPLE_CODE = ''; + const SIMPLE_CODE = 'const a = 1;'; const simpleSchemaConfig: IndexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, SIMPLE_ACCOUNT_ID, SIMPLE_FUNCTION_NAME, 0, SIMPLE_CODE, SIMPLE_SCHEMA, LogLevel.INFO); const socialSchemaConfig: IndexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, SIMPLE_ACCOUNT_ID, SIMPLE_FUNCTION_NAME, 0, SIMPLE_CODE, SOCIAL_SCHEMA, LogLevel.INFO); const caseSensitiveConfig: IndexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, SIMPLE_ACCOUNT_ID, SIMPLE_FUNCTION_NAME, 0, SIMPLE_CODE, CASE_SENSITIVE_SCHEMA, LogLevel.INFO); const stressTestConfig: IndexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, SIMPLE_ACCOUNT_ID, SIMPLE_FUNCTION_NAME, 0, SIMPLE_CODE, STRESS_TEST_SCHEMA, LogLevel.INFO); + const genericDbCredentials: PostgresConnectionParams = { + database: 'test_near', + host: 'postgres', + password: 'test_pass', + port: 5432, + user: 'test_near' + }; + const genericMockFetch = jest.fn() .mockResolvedValue({ status: 200, json: async () => ({ data: 'mock', }), - }); + }) as unknown as typeof fetch; - const genericMockDmlHandler: any = { + const genericMockDmlHandler = { insert: jest.fn().mockReturnValue([]), select: jest.fn().mockReturnValue([]), update: jest.fn().mockReturnValue([]), @@ -201,23 +213,15 @@ CREATE TABLE const genericMockIndexerMeta: any = { writeLogs: jest.fn(), setStatus: jest.fn(), - updateBlockheight: jest.fn() + updateBlockHeight: jest.fn() } as unknown as IndexerMeta; - const genericDbCredentials: any = { - database: 'test_near', - host: 'postgres', - password: 'test_pass', - port: 5432, - username: 'test_near' - }; - - const genericProvisioner: any = { + const genericProvisioner = { getPgBouncerConnectionParameters: jest.fn().mockReturnValue(genericDbCredentials), fetchUserApiProvisioningStatus: jest.fn().mockResolvedValue(true), provisionLogsIfNeeded: 
jest.fn(),
     provisionMetadataIfNeeded: jest.fn(),
-  };
+  } as unknown as Provisioner;
 
   const config = {
     hasuraEndpoint: 'mock-hasura-endpoint',
@@ -246,17 +250,24 @@
       const foo = 3;
       block.result = context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`);
       `;
+    const indexerMeta = {
+      writeLogs: jest.fn(),
+      setStatus: jest.fn(),
+      updateBlockHeight: jest.fn()
+    } as unknown as IndexerMeta;
     const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'buildnear.testnet', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.INFO);
     const indexer = new Indexer(indexerConfig, {
       fetch: mockFetch as unknown as typeof fetch,
       provisioner: genericProvisioner,
       dmlHandler: genericMockDmlHandler,
-      indexerMeta: genericMockIndexerMeta,
+      indexerMeta,
     }, undefined, config);
 
     await indexer.execute(mockBlock);
 
     expect(mockFetch.mock.calls).toMatchSnapshot();
+    expect(indexerMeta.setStatus).toHaveBeenCalledWith(IndexerStatus.RUNNING);
+    expect(indexerMeta.updateBlockHeight).toHaveBeenCalledWith(blockHeight);
   });
 
   test('Indexer.buildContext() allows execution of arbitrary GraphQL operations', async () => {
@@ -825,7 +836,12 @@
       return (\`Created comment \${id} on post \${post.id}\`)
     `;
     const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'buildnear.testnet', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.INFO);
-    const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config);
+    const indexer = new Indexer(indexerConfig, {
+      fetch: mockFetch as unknown as typeof fetch,
+      provisioner: genericProvisioner,
+      dmlHandler: genericMockDmlHandler,
+      indexerMeta: genericMockIndexerMeta
+    }, undefined, config);
 
     await indexer.execute(mockBlock);
 
@@ -873,19 +889,24 @@
     const code = `
       throw new Error('boom');
     `;
+    const indexerMeta = {
+      writeLogs: jest.fn(),
+      setStatus: jest.fn(),
+      updateBlockHeight: jest.fn()
+    } as unknown as IndexerMeta;
     const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'buildnear.testnet', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.INFO);
-    const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config);
-
-    const functions: Record<string, any> = {};
-    functions['buildnear.testnet/test'] = {
-      code: `
-        throw new Error('boom');
-      `,
-      schema: SIMPLE_SCHEMA
-    };
+    const indexer = new Indexer(indexerConfig, {
+      fetch: mockFetch as unknown as typeof fetch,
+      provisioner: genericProvisioner,
+      dmlHandler: genericMockDmlHandler,
+      indexerMeta,
+    }, undefined, config);
 
     await expect(indexer.execute(mockBlock)).rejects.toThrow(new Error('boom'));
     expect(mockFetch.mock.calls).toMatchSnapshot();
+    expect(indexerMeta.setStatus).toHaveBeenNthCalledWith(1, IndexerStatus.RUNNING);
+    expect(indexerMeta.setStatus).toHaveBeenNthCalledWith(2, IndexerStatus.FAILING);
+    expect(indexerMeta.updateBlockHeight).not.toHaveBeenCalled();
   });
 
   test('Indexer.execute() provisions a GraphQL endpoint with the specified schema', async () => {
@@ -912,11 +933,22 @@
       provisionLogsIfNeeded: jest.fn(),
       provisionMetadataIfNeeded: jest.fn(),
     };
-    const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config);
+
const indexerMeta = { + writeLogs: jest.fn(), + setStatus: jest.fn(), + updateBlockHeight: jest.fn() + } as unknown as IndexerMeta; + const indexer = new Indexer(simpleSchemaConfig, { + fetch: mockFetch as unknown as typeof fetch, + provisioner, + dmlHandler: genericMockDmlHandler, + indexerMeta, + }, undefined, config); await indexer.execute(mockBlock); expect(provisioner.fetchUserApiProvisioningStatus).toHaveBeenCalledWith(simpleSchemaConfig); + expect(indexerMeta.setStatus).toHaveBeenNthCalledWith(2, IndexerStatus.RUNNING); expect(provisioner.provisionUserApi).toHaveBeenCalledTimes(1); expect(provisioner.provisionUserApi).toHaveBeenCalledWith(simpleSchemaConfig); expect(provisioner.provisionLogsIfNeeded).toHaveBeenCalled(); @@ -948,7 +980,12 @@ CREATE TABLE provisionLogsIfNeeded: jest.fn(), provisionMetadataIfNeeded: jest.fn(), }; - const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); + const indexer = new Indexer(simpleSchemaConfig, { + fetch: mockFetch as unknown as typeof fetch, + provisioner, + dmlHandler: genericMockDmlHandler, + indexerMeta: genericMockIndexerMeta, + }, undefined, config); await indexer.execute(mockBlock); @@ -982,7 +1019,17 @@ CREATE TABLE provisionLogsIfNeeded: jest.fn(), provisionMetadataIfNeeded: jest.fn(), }; - const indexer = new Indexer(simpleSchemaConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); + const indexerMeta = { + writeLogs: jest.fn(), + setStatus: jest.fn(), + updateBlockHeight: jest.fn() + } as unknown as IndexerMeta; + const indexer = new Indexer(simpleSchemaConfig, { + fetch: mockFetch as unknown as typeof fetch, + provisioner, + dmlHandler: genericMockDmlHandler, + indexerMeta, + }, undefined, config); await indexer.execute(mockBlock); await indexer.execute(mockBlock); @@ -992,6 +1039,10 @@ CREATE TABLE expect(provisioner.getPgBouncerConnectionParameters).toHaveBeenCalledTimes(1); expect(provisioner.provisionLogsIfNeeded).toHaveBeenCalled(); expect(provisioner.provisionMetadataIfNeeded).toHaveBeenCalled(); + expect(indexerMeta.setStatus).toHaveBeenCalledTimes(1); // Status is cached, so only called once + expect(indexerMeta.setStatus).toHaveBeenCalledWith(IndexerStatus.RUNNING); + expect(indexerMeta.updateBlockHeight).toHaveBeenCalledTimes(3); + expect(indexerMeta.updateBlockHeight).toHaveBeenCalledWith(blockHeight); }); test('Indexer.execute() supplies the required role to the GraphQL endpoint', async () => { @@ -1018,19 +1069,31 @@ CREATE TABLE provisionLogsIfNeeded: jest.fn(), provisionMetadataIfNeeded: jest.fn(), }; + const indexerMeta = { + writeLogs: jest.fn(), + setStatus: jest.fn(), + updateBlockHeight: jest.fn() + } as unknown as IndexerMeta; const code = ` context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); `; const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'morgs.near', 'test', 0, code, SIMPLE_SCHEMA, LogLevel.INFO); - const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); + const indexer = new Indexer(indexerConfig, { + fetch: mockFetch as unknown as typeof fetch, + provisioner, + dmlHandler: genericMockDmlHandler, + indexerMeta, + }, undefined, 
config); await indexer.execute(mockBlock); expect(provisioner.provisionUserApi).not.toHaveBeenCalled(); + expect(indexerMeta.setStatus).toHaveBeenNthCalledWith(1, IndexerStatus.RUNNING); expect(mockFetch.mock.calls).toMatchSnapshot(); expect(provisioner.getPgBouncerConnectionParameters).toHaveBeenCalledTimes(1); expect(provisioner.provisionLogsIfNeeded).toHaveBeenCalled(); expect(provisioner.provisionMetadataIfNeeded).toHaveBeenCalled(); + expect(indexerMeta.updateBlockHeight).toHaveBeenCalledWith(blockHeight); }); test('Indexer.execute() logs provisioning failures', async () => { @@ -1055,15 +1118,31 @@ CREATE TABLE getPgBouncerConnectionParameters: jest.fn().mockReturnValue(genericDbCredentials), fetchUserApiProvisioningStatus: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn().mockRejectedValue(error), + provisionLogsIfNeeded: jest.fn(), + provisionMetadataIfNeeded: jest.fn(), }; + const indexerMeta = { + writeLogs: jest.fn(), + setStatus: jest.fn(), + updateBlockHeight: jest.fn() + } as unknown as IndexerMeta; const code = ` context.graphql(\`mutation { set(functionName: "buildnear.testnet/test", key: "height", data: "\${block.blockHeight}")}\`); `; const indexerConfig = new IndexerConfig(SIMPLE_REDIS_STREAM, 'morgs.near', 'test', 0, code, 'schema', LogLevel.INFO); - const indexer = new Indexer(indexerConfig, { fetch: mockFetch as unknown as typeof fetch, provisioner, dmlHandler: genericMockDmlHandler, indexerMeta: genericMockIndexerMeta }, undefined, config); + const indexer = new Indexer(indexerConfig, { + fetch: mockFetch as unknown as typeof fetch, + provisioner, + dmlHandler: genericMockDmlHandler, + indexerMeta, + }, undefined, config); await expect(indexer.execute(mockBlock)).rejects.toThrow(error); + expect(mockFetch.mock.calls).toMatchSnapshot(); + expect(indexerMeta.updateBlockHeight).not.toHaveBeenCalled(); + expect(provisioner.provisionLogsIfNeeded).not.toHaveBeenCalled(); + expect(provisioner.provisionMetadataIfNeeded).not.toHaveBeenCalled(); expect(provisioner.getPgBouncerConnectionParameters).not.toHaveBeenCalled(); }); @@ -1113,19 +1192,34 @@ CREATE TABLE const indexerDebug = new Indexer( debugIndexerConfig, - { fetch: mockFetchDebug as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler, indexerMeta: genericMockIndexerMeta }, + { + fetch: mockFetchDebug as unknown as typeof fetch, + provisioner: genericProvisioner, + dmlHandler: mockDmlHandler, + indexerMeta: genericMockIndexerMeta + }, undefined, config ); const indexerInfo = new Indexer( infoIndexerConfig, - { fetch: mockFetchInfo as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler, indexerMeta: genericMockIndexerMeta }, + { + fetch: mockFetchInfo as unknown as typeof fetch, + provisioner: genericProvisioner, + dmlHandler: mockDmlHandler, + indexerMeta: genericMockIndexerMeta + }, undefined, config ); const indexerError = new Indexer( errorIndexerConfig, - { fetch: mockFetchError as unknown as typeof fetch, provisioner: genericProvisioner, dmlHandler: mockDmlHandler, indexerMeta: genericMockIndexerMeta }, + { + fetch: mockFetchError as unknown as typeof fetch, + provisioner: genericProvisioner, + dmlHandler: mockDmlHandler, + indexerMeta: genericMockIndexerMeta + }, undefined, config ); @@ -1201,6 +1295,8 @@ CREATE TABLE const indexerMeta: any = { writeLogs: jest.fn(), + setStatus: jest.fn(), + updateBlockHeight: jest.fn(), }; const code = ` diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 
d3a554e43..1a2e0de18 100644
--- a/runner/src/indexer/indexer.ts
+++ b/runner/src/indexer/indexer.ts
@@ -7,10 +7,10 @@
 import Provisioner from '../provisioner';
 import DmlHandler from '../dml-handler/dml-handler';
 import LogEntry, { LogLevel } from '../indexer-meta/log-entry';
-import IndexerMeta, { IndexerStatus } from '../indexer-meta/indexer-meta';
 import { trace, type Span } from '@opentelemetry/api';
 import type IndexerConfig from '../indexer-config';
 import { type PostgresConnectionParams } from '../pg-client';
+import IndexerMeta, { IndexerStatus } from '../indexer-meta';
 
 interface Dependencies {
   fetch: typeof fetch
@@ -48,7 +48,7 @@ const defaultConfig: Config = {
 
 export default class Indexer {
   DEFAULT_HASURA_ROLE: string;
-  LOGGGED_CONTEXT_DB_WARNING: boolean = false;
+  IS_FIRST_EXECUTION: boolean = false;
   tracer = trace.getTracer('queryapi-runner-indexer');
 
   private readonly deps: Dependencies;
@@ -60,7 +60,7 @@ export default class Indexer {
   constructor (
     private readonly indexerConfig: IndexerConfig,
     deps?: Partial<Dependencies>,
-    databaseConnectionParameters = undefined,
+    databaseConnectionParameters: PostgresConnectionParams | undefined = undefined,
     private readonly config: Config = defaultConfig,
   ) {
     this.DEFAULT_HASURA_ROLE = 'append';
@@ -127,7 +127,6 @@ export default class Indexer {
       credentialsFetchSpan.end();
     }
 
-    // TODO: Prevent unnecesary reruns of set status
     const resourceCreationSpan = this.tracer.startSpan('prepare vm and context to run indexer code');
     simultaneousPromises.push(this.setStatus(blockHeight, IndexerStatus.RUNNING));
     const vm = new VM({ allowAsync: true });
@@ -386,9 +385,9 @@ export default class Indexer {
       return result;
     } catch (error) {
       const errorContent = error as { message: string, location: Record<string, any> };
-      if (!this.LOGGGED_CONTEXT_DB_WARNING) {
+      if (!this.IS_FIRST_EXECUTION) {
         console.warn(`${this.indexerConfig.fullName()}: Caught error when generating context.db methods. Building no functions. You can still use other context object methods.\nError: ${errorContent.message}\nLocation: `, errorContent.location);
-        this.LOGGGED_CONTEXT_DB_WARNING = true;
+        this.IS_FIRST_EXECUTION = true;
       }
     }
     return {}; // Default to empty object if error
@@ -422,6 +421,9 @@
     } finally {
       setStatusSpan.end();
     }
+
+    // The metadata table may not be provisioned yet when this is called, so indexerMeta is not validated here
+    await this.deps.indexerMeta?.setStatus(status);
   }
 
   async writeLog (logEntry: LogEntry, logEntries: LogEntry[]): Promise<void> {
@@ -432,9 +434,30 @@
     }
   }
 
+  private async createIndexerMetaIfNotExists (failureMessage: string): Promise<void> {
+    if (!this.deps.indexerMeta) {
+      try {
+        this.database_connection_parameters ??= await this.deps.provisioner.getPgBouncerConnectionParameters(this.indexerConfig.hasuraRoleName());
+        this.deps.indexerMeta = new IndexerMeta(this.indexerConfig, this.database_connection_parameters);
+      } catch (e) {
+        const error = e as Error;
+        console.error(failureMessage, e);
+        throw error;
+      }
+    }
+  }
+
+  async setStoppedStatus (): Promise<void> {
+    await this.createIndexerMetaIfNotExists(`${this.indexerConfig.fullName()}: Failed to get DB params to set status STOPPED for stream`);
+    const indexerMeta: IndexerMeta = this.deps.indexerMeta as IndexerMeta;
+    await indexerMeta.setStatus(IndexerStatus.STOPPED);
+  }
+
   // one-time use method to allow stream-handler to writeLog into new log table in case of failure
-  async callWriteLog (logEntry: LogEntry): Promise<void> {
-    await (this.deps.indexerMeta as IndexerMeta).writeLogs([logEntry]);
+  async callWriteLog (logEntry: LogEntry): Promise<void> {
+    await this.createIndexerMetaIfNotExists(`${this.indexerConfig.fullName()}: Failed to get DB params to write crashed worker error log for stream`);
+    const indexerMeta: IndexerMeta = this.deps.indexerMeta as IndexerMeta;
+    await indexerMeta.writeLogs([logEntry]);
   }
 
   async updateIndexerBlockHeight (blockHeight: number): Promise<void> {
@@ -463,6 +486,8 @@
     } finally {
       setBlockHeightSpan.end();
     }
+
+    await (this.deps.indexerMeta as IndexerMeta).updateBlockHeight(blockHeight);
   }
 
   // todo rename to writeLogOld
diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts
index 7254dc989..a2385c1a0 100644
--- a/runner/src/provisioner/provisioner.test.ts
+++ b/runner/src/provisioner/provisioner.test.ts
@@ -3,8 +3,6 @@ import pgFormat from 'pg-format';
 import Provisioner from './provisioner';
 import IndexerConfig from '../indexer-config/indexer-config';
 import { LogLevel } from '../indexer-meta/log-entry';
-// import { logsTableDDL } from './schemas/logs-table';
-// import { metadataTableDDL } from './schemas/metadata-table';
 
 describe('Provisioner', () => {
   let adminPgClient: any;
@@ -12,12 +10,14 @@ describe('Provisioner', () => {
   let hasuraClient: any;
   let provisioner: Provisioner;
   let userPgClientQuery: any;
-  let indexerConfig: any;
+  let indexerConfig: IndexerConfig;
 
   const tableNames = ['blocks'];
   const accountId = 'morgs.near';
   const functionName = 'test-function';
   const databaseSchema = 'CREATE TABLE blocks (height numeric)';
+  indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO);
+  const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`;
   const logsDDL = expect.any(String);
   const metadataDDL = 
expect.any(String); const error = new Error('some error'); @@ -42,13 +42,10 @@ describe('Provisioner', () => { addDatasource: jest.fn().mockReturnValueOnce(null), executeSqlOnSchema: jest.fn().mockReturnValueOnce(null), createSchema: jest.fn().mockReturnValueOnce(null), - setupPartitionedLogsTable: jest.fn().mockReturnValueOnce(null), doesSourceExist: jest.fn().mockReturnValueOnce(false), doesSchemaExist: jest.fn().mockReturnValueOnce(false), untrackTables: jest.fn().mockReturnValueOnce(null), - grantCronAccess: jest.fn().mockResolvedValueOnce(null), - scheduleLogPartitionJobs: jest.fn().mockResolvedValueOnce(null), - getDbConnectionParameters: jest.fn().mockReturnValueOnce({}), + getDbConnectionParameters: jest.fn().mockReturnValue({}), }; adminPgClient = { @@ -112,15 +109,15 @@ describe('Provisioner', () => { ['GRANT EXECUTE ON FUNCTION cron.schedule_in_database TO morgs_near;'], ]); expect(userPgClientQuery.mock.calls).toEqual([ + [setProvisioningStatusQuery], ["SELECT cron.schedule_in_database('morgs_near_test_function_logs_create_partition', '0 1 * * *', $$SELECT morgs_near_test_function.fn_create_partition('morgs_near_test_function.__logs', CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');"], ["SELECT cron.schedule_in_database('morgs_near_test_function_logs_delete_partition', '0 2 * * *', $$SELECT morgs_near_test_function.fn_delete_partition('morgs_near_test_function.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, 'morgs_near');"] ]); expect(hasuraClient.addDatasource).toBeCalledWith(indexerConfig.userName(), password, indexerConfig.databaseName()); expect(hasuraClient.createSchema).toBeCalledWith(indexerConfig.userName(), indexerConfig.schemaName()); - // expect(hasuraClient.executeSqlOnSchema).toBeCalledWith(sanitizedAccountId, schemaName, metadataTableDDL()); - expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(1, indexerConfig.userName(), indexerConfig.schemaName(), databaseSchema); + expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(1, indexerConfig.userName(), indexerConfig.schemaName(), metadataDDL); expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(2, indexerConfig.userName(), indexerConfig.schemaName(), logsDDL); - expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.userName(), indexerConfig.schemaName(), metadataDDL); + expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.userName(), indexerConfig.schemaName(), databaseSchema); expect(hasuraClient.getTableNames).toBeCalledWith(indexerConfig.schemaName(), indexerConfig.databaseName()); expect(hasuraClient.trackTables).toBeCalledWith(indexerConfig.schemaName(), tableNames, indexerConfig.databaseName()); expect(hasuraClient.addPermissionsToTables).toBeCalledWith( @@ -147,9 +144,9 @@ describe('Provisioner', () => { expect(hasuraClient.addDatasource).not.toBeCalled(); expect(hasuraClient.createSchema).toBeCalledWith(indexerConfig.userName(), indexerConfig.schemaName()); - expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(1, indexerConfig.databaseName(), indexerConfig.schemaName(), databaseSchema); + expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(1, indexerConfig.userName(), indexerConfig.schemaName(), metadataDDL); expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(2, indexerConfig.userName(), indexerConfig.schemaName(), logsDDL); - expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.userName(), indexerConfig.schemaName(), metadataDDL); + 
expect(hasuraClient.executeSqlOnSchema).toHaveBeenNthCalledWith(3, indexerConfig.databaseName(), indexerConfig.schemaName(), databaseSchema);
     expect(hasuraClient.getTableNames).toBeCalledWith(indexerConfig.schemaName(), indexerConfig.databaseName());
     expect(hasuraClient.trackTables).toBeCalledWith(indexerConfig.schemaName(), tableNames, indexerConfig.databaseName());
     expect(hasuraClient.addPermissionsToTables).toBeCalledWith(
@@ -239,19 +236,7 @@
   });
 
   it('throws when scheduling cron jobs fails', async () => {
-    userPgClientQuery = jest.fn().mockRejectedValueOnce(error);
-
-    await expect(provisioner.provisionUserApi(indexerConfig)).rejects.toThrow('Failed to provision endpoint: Failed to setup partitioned logs table: Failed to schedule log partition jobs: some error');
-  });
-
-  it('throws when scheduling cron jobs fails', async () => {
-    userPgClientQuery = jest.fn().mockRejectedValueOnce(error);
-
-    await expect(provisioner.provisionUserApi(indexerConfig)).rejects.toThrow('Failed to provision endpoint: Failed to setup partitioned logs table: Failed to schedule log partition jobs: some error');
-  });
-
-  it('throws when scheduling cron jobs fails', async () => {
-    userPgClientQuery = jest.fn().mockRejectedValueOnce(error);
+    userPgClientQuery = jest.fn().mockReturnValueOnce({}).mockRejectedValueOnce(error); // Succeed setting provisioning status first
 
     await expect(provisioner.provisionUserApi(indexerConfig)).rejects.toThrow('Failed to provision endpoint: Failed to setup partitioned logs table: Failed to schedule log partition jobs: some error');
   });
@@ -269,6 +254,9 @@
     await provisioner.provisionMetadataIfNeeded(indexerConfig);
 
     expect(hasuraClient.executeSqlOnSchema).toBeCalledTimes(1);
+    expect(userPgClientQuery.mock.calls).toEqual([
+      [setProvisioningStatusQuery],
+    ]);
   });
 
   it('get credentials for postgres', async () => {
@@ -289,7 +277,7 @@
       pgBouncerPort: 2,
     });
 
-    const params = await mockProvisioner.getPostgresConnectionParameters(indexerConfig);
+    const params = await mockProvisioner.getPostgresConnectionParameters(indexerConfig.userName());
     expect(params).toEqual({
       user: 'username',
       password: 'password',
@@ -317,7 +305,7 @@
       pgBouncerPort: 2,
     });
 
-    const params = await mockProvisioner.getPgBouncerConnectionParameters(indexerConfig);
+    const params = await mockProvisioner.getPgBouncerConnectionParameters(indexerConfig.userName());
     expect(params).toEqual({
       user: 'username',
       password: 'password',
diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts
index 6aaf5079b..2e60f42ae 100644
--- a/runner/src/provisioner/provisioner.ts
+++ b/runner/src/provisioner/provisioner.ts
@@ -8,6 +8,7 @@ import { logsTableDDL } from './schemas/logs-table';
 import { metadataTableDDL } from './schemas/metadata-table';
 import PgClientClass, { type PostgresConnectionParams } from '../pg-client';
 import type IndexerConfig from '../indexer-config/indexer-config';
+import { IndexerStatus, METADATA_TABLE_UPSERT, MetadataFields } from '../indexer-meta/indexer-meta';
 
 const DEFAULT_PASSWORD_LENGTH = 16;
 
@@ -186,7 +187,16 @@
   async createMetadataTable (databaseName: string, schemaName: string): Promise<void> {
-    return await wrapError(async () => await this.hasuraClient.executeSqlOnSchema(databaseName, schemaName, metadataTableDDL()), `Failed to create metadata table in ${databaseName}.${schemaName}`);
+    await wrapError(async () => await this.hasuraClient.executeSqlOnSchema(databaseName, schemaName, metadataTableDDL()),
+      `Failed to create metadata table in ${databaseName}.${schemaName}`);
+  }
+
+  async setProvisioningStatus (userName: string, schemaName: string): Promise<void> {
+    await wrapError(async () => {
+      const userDbConnectionParameters = await this.getPostgresConnectionParameters(userName);
+      const userPgClient = new this.PgClient(userDbConnectionParameters);
+      await userPgClient.query(pgFormatLib(METADATA_TABLE_UPSERT, schemaName, [[MetadataFields.STATUS, IndexerStatus.PROVISIONING]]));
+    }, 'Failed to set provisioning status on metadata table');
   }
 
   async runIndexerSql (databaseName: string, schemaName: string, sqlScript: any): Promise<void> {
@@ -268,6 +278,7 @@
 
     if (!tableNames.includes(metadataTable)) {
       await this.createMetadataTable(indexerConfig.databaseName(), indexerConfig.schemaName());
+      await this.setProvisioningStatus(indexerConfig.userName(), indexerConfig.schemaName());
       await this.trackTables(indexerConfig.schemaName(), [metadataTable], indexerConfig.databaseName());
       await this.addPermissionsToTables(indexerConfig.schemaName(), indexerConfig.databaseName(), [metadataTable], indexerConfig.userName(), ['select', 'insert', 'update', 'delete']);
     }
@@ -296,9 +307,10 @@
 
       await this.createSchema(databaseName, schemaName);
 
-      await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema);
-      await this.setupPartitionedLogsTable(userName, databaseName, schemaName);
       await this.createMetadataTable(databaseName, schemaName);
+      await this.setProvisioningStatus(userName, schemaName);
+      await this.setupPartitionedLogsTable(userName, databaseName, schemaName);
+      await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema);
 
       const updatedTableNames = await this.getTableNames(schemaName, databaseName);
 
diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts
index 6e9196089..441cd2c73 100644
--- a/runner/src/stream-handler/stream-handler.ts
+++ b/runner/src/stream-handler/stream-handler.ts
@@ -58,11 +58,14 @@ export default class StreamHandler {
   private handleError (error: Error): void {
     console.error(`Encountered error processing stream: ${this.indexerConfig.fullName()}, terminating thread`, error);
     this.executorContext.status = IndexerStatus.STOPPED;
-    const indexer = new Indexer(this.indexerConfig);
+    const indexer = new Indexer(this.indexerConfig);
     indexer.setStatus(0, IndexerStatus.STOPPED).catch((e) => {
       console.error(`Failed to set status STOPPED for stream: ${this.indexerConfig.redisStreamKey}`, e);
     });
+    indexer.setStoppedStatus().catch((e) => {
+      console.error(`Failed to set stopped status for stream in Metadata table: ${this.indexerConfig.redisStreamKey}`, e);
+    });
 
     const streamErrorLogEntry = LogEntry.systemError(`Encountered error processing stream: ${this.indexerConfig.redisStreamKey}, terminating thread\n${error.toString()}`, this.executorContext.block_height);
 
@@ -70,7 +73,7 @@
       indexer.writeLogOld(LogLevel.ERROR, this.executorContext.block_height, `Encountered error processing stream: ${this.indexerConfig.fullName()}, terminating thread\n${error.toString()}`),
       indexer.callWriteLog(streamErrorLogEntry),
     ]).catch((e) => {
-      console.error(`Failed to write log for stream: ${this.indexerConfig.redisStreamKey}`, e);
+      console.error(`Failed to write failure log for stream: ${this.indexerConfig.redisStreamKey}`, e);
     });
this.worker.terminate().catch(() => { diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index df9c6944c..1de2adcd2 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -9,43 +9,25 @@ import PgClient from '../src/pg-client'; import { HasuraGraphQLContainer, type StartedHasuraGraphQLContainer } from './testcontainers/hasura'; import { PostgreSqlContainer, type StartedPostgreSqlContainer } from './testcontainers/postgres'; -import block115185108 from './blocks/00115185108/streamer_message.json'; -import block115185109 from './blocks/00115185109/streamer_message.json'; +import block_115185108 from './blocks/00115185108/streamer_message.json'; +import block_115185109 from './blocks/00115185109/streamer_message.json'; import { LogLevel } from '../src/indexer-meta/log-entry'; import IndexerConfig from '../src/indexer-config'; describe('Indexer integration', () => { jest.setTimeout(300_000); + let hasuraClient: HasuraClient; + let pgClient: PgClient; + let provisioner: Provisioner; + let network: StartedNetwork; let postgresContainer: StartedPostgreSqlContainer; let hasuraContainer: StartedHasuraGraphQLContainer; let graphqlClient: GraphQLClient; - beforeAll(async () => { - network = await new Network().start(); - postgresContainer = await (await PostgreSqlContainer.build()) - .withNetwork(network) - .start(); - hasuraContainer = await (await HasuraGraphQLContainer.build()) - .withNetwork(network) - .withDatabaseUrl(postgresContainer.getConnectionUri(network.getName())) - .start(); - graphqlClient = new GraphQLClient(`${hasuraContainer.getEndpoint()}/v1/graphql`, { - headers: { - 'X-Hasura-Admin-Secret': hasuraContainer.getAdminSecret(), - } - }); - }); - - afterAll(async () => { - await postgresContainer.stop(); - await hasuraContainer.stop(); - await network.stop(); - }); - - it('works', async () => { - const hasuraClient = new HasuraClient({}, { + beforeEach(async () => { + hasuraClient = new HasuraClient({}, { adminSecret: hasuraContainer.getAdminSecret(), endpoint: hasuraContainer.getEndpoint(), pgHostHasura: postgresContainer.getIpAddress(network.getName()), @@ -54,7 +36,7 @@ describe('Indexer integration', () => { pgPort: postgresContainer.getPort() }); - const pgClient = new PgClient({ + pgClient = new PgClient({ user: postgresContainer.getUsername(), password: postgresContainer.getPassword(), host: postgresContainer.getIpAddress(), @@ -62,7 +44,7 @@ describe('Indexer integration', () => { database: postgresContainer.getDatabase(), }); - const provisioner = new Provisioner( + provisioner = new Provisioner( hasuraClient, pgClient, pgClient, @@ -74,8 +56,32 @@ describe('Indexer integration', () => { pgBouncerPort: Number(postgresContainer.getPort()), } ); + }); - const code = ` + beforeAll(async () => { + network = await new Network().start(); + postgresContainer = await (await PostgreSqlContainer.build()) + .withNetwork(network) + .start(); + hasuraContainer = await (await HasuraGraphQLContainer.build()) + .withNetwork(network) + .withDatabaseUrl(postgresContainer.getConnectionUri(network.getName())) + .start(); + graphqlClient = new GraphQLClient(`${hasuraContainer.getEndpoint()}/v1/graphql`, { + headers: { + 'X-Hasura-Admin-Secret': hasuraContainer.getAdminSecret(), + } + }); + }); + + afterAll(async () => { + await postgresContainer.stop(); + await hasuraContainer.stop(); + await network.stop(); + }); + + it('works', async () => { + const indexerCode = ` await context.graphql( \` mutation ($height:numeric){ @@ -89,6 +95,13 
@@ describe('Indexer integration', () => { } ); `; + const blocksIndexerQuery = gql` + query { + morgs_near_test_blocks { + height + } + } + `; const schema = 'CREATE TABLE blocks (height numeric)'; const indexerConfig = new IndexerConfig( @@ -96,7 +109,7 @@ describe('Indexer integration', () => { 'morgs.near', 'test', 0, - code, + indexerCode, schema, LogLevel.INFO ); @@ -113,105 +126,33 @@ describe('Indexer integration', () => { } ); - await indexer.execute(Block.fromStreamerMessage(block115185108 as any as StreamerMessage)); - - await indexer.execute(Block.fromStreamerMessage(block115185109 as any as StreamerMessage)); - - const { morgs_near_test_blocks: blocks }: any = await graphqlClient.request(gql` - query { - morgs_near_test_blocks { - height - } - } - `); - - expect(blocks.map(({ height }: any) => height)).toEqual([115185108, 115185109]); + await indexer.execute(Block.fromStreamerMessage(block_115185108 as any as StreamerMessage)); - const { indexer_state: [state] }: any = await graphqlClient.request(gql` - query { - indexer_state(where: { function_name: { _eq: "morgs.near/test" } }) { - current_block_height - status - } - } - `); + const firstHeight = await indexerBlockHeightQuery('morgs_near_test', graphqlClient); + expect(firstHeight.value).toEqual('115185108'); - expect(state.current_block_height).toEqual(115185109); - expect(state.status).toEqual('RUNNING'); + await indexer.execute(Block.fromStreamerMessage(block_115185109 as any as StreamerMessage)); - const { indexer_log_entries: old_logs }: any = await graphqlClient.request(gql` - query { - indexer_log_entries(where: { function_name: { _eq:"morgs.near/test" } }) { - message - } - } - `); + const secondStatus = await indexerStatusQuery('morgs_near_test', graphqlClient); + expect(secondStatus.value).toEqual('RUNNING'); + const secondHeight: any = await indexerBlockHeightQuery('morgs_near_test', graphqlClient); + expect(secondHeight.value).toEqual('115185109'); - expect(old_logs.length).toEqual(4); + const indexerState: any = await indexerOldStateQuery('morgs.near/test', graphqlClient); + expect(indexerState.current_block_height).toEqual(115185109); + expect(indexerState.status).toEqual('RUNNING'); - const { morgs_near_test___logs: logs }: any = await graphqlClient.request(gql` - query { - morgs_near_test___logs { - message - } - } - `); + const oldLogs: any = await indexerOldLogsQuery('morgs.near/test', graphqlClient); + expect(oldLogs.length).toEqual(4); + const logs: any = await indexerLogsQuery('morgs_near_test', graphqlClient); expect(logs.length).toEqual(4); - - const { morgs_near_test___logs: provisioning_endpoints }: any = await graphqlClient.request(gql` - query { - morgs_near_test___logs(where: {message: {_ilike: "%Provisioning endpoint%"}}) { - message - } - } - `); - - expect(provisioning_endpoints.length).toEqual(2); - - const { morgs_near_test___logs: running_function_enpoint }: any = await graphqlClient.request(gql` - query { - morgs_near_test___logs(where: {message: {_ilike: "%Running function%"}}) { - message - } - } - `); - - expect(running_function_enpoint.length).toEqual(2); + const { morgs_near_test_blocks: blocks }: any = await graphqlClient.request(blocksIndexerQuery); + expect(blocks.map(({ height }: any) => height)).toEqual([115185108, 115185109]); }); it('test context db', async () => { - const hasuraClient = new HasuraClient({}, { - adminSecret: hasuraContainer.getAdminSecret(), - endpoint: hasuraContainer.getEndpoint(), - pgHostHasura: postgresContainer.getIpAddress(network.getName()), - 
pgPortHasura: postgresContainer.getPort(network.getName()),
-      pgHost: postgresContainer.getIpAddress(),
-      pgPort: postgresContainer.getPort()
-    });
-
-    const pgClient = new PgClient({
-      user: postgresContainer.getUsername(),
-      password: postgresContainer.getPassword(),
-      host: postgresContainer.getIpAddress(),
-      port: postgresContainer.getPort(),
-      database: postgresContainer.getDatabase(),
-    });
-
-    const provisioner = new Provisioner(
-      hasuraClient,
-      pgClient,
-      pgClient,
-      {
-        cronDatabase: postgresContainer.getDatabase(),
-        postgresHost: postgresContainer.getIpAddress(),
-        postgresPort: Number(postgresContainer.getPort()),
-        pgBouncerHost: postgresContainer.getIpAddress(), // TODO: Enable pgBouncer in Integ Tests
-        pgBouncerPort: Number(postgresContainer.getPort()),
-      }
-    );
-
     const schema = `
       CREATE TABLE
         "indexer_storage" (
@@ -257,6 +198,24 @@ describe('Indexer integration', () => {
         value: "updated_value"
       });
     `;
+    const queryAllRows = gql`
+      query MyQuery {
+        morgs_near_test_context_db_indexer_storage {
+          function_name
+          key_name
+          value
+        }
+      }
+    `;
+    const queryTestKeyRows = gql`
+      query MyQuery {
+        morgs_near_test_context_db_indexer_storage(where: {key_name: {_eq: "test_key"}, function_name: {_eq: "sample_indexer"}}) {
+          function_name
+          key_name
+          value
+        }
+      }
+    `;
 
     const indexerConfig = new IndexerConfig(
       'test:stream',
@@ -280,29 +239,67 @@
       }
     );
 
-    await indexer.execute(Block.fromStreamerMessage(block115185108 as any as StreamerMessage));
-    await indexer.execute(Block.fromStreamerMessage(block115185109 as any as StreamerMessage));
+    await indexer.execute(Block.fromStreamerMessage(block_115185108 as any as StreamerMessage));
+    await indexer.execute(Block.fromStreamerMessage(block_115185109 as any as StreamerMessage));
 
-    const { morgs_near_test_context_db_indexer_storage: sampleRows }: any = await graphqlClient.request(gql`
-      query MyQuery {
-        morgs_near_test_context_db_indexer_storage(where: {key_name: {_eq: "test_key"}, function_name: {_eq: "sample_indexer"}}) {
-          function_name
-          key_name
-          value
-        }
-      }
-    `);
+    const { morgs_near_test_context_db_indexer_storage: sampleRows }: any = await graphqlClient.request(queryTestKeyRows);
 
     expect(sampleRows[0].value).toEqual('testing_value');
 
-    const { morgs_near_test_context_db_indexer_storage: totalRows }: any = await graphqlClient.request(gql`
-      query MyQuery {
-        morgs_near_test_context_db_indexer_storage {
-          function_name
-          key_name
-          value
-        }
-      }
-    `);
+    const { morgs_near_test_context_db_indexer_storage: totalRows }: any = await graphqlClient.request(queryAllRows);
 
     expect(totalRows.length).toEqual(3); // Two inserts, and the overwritten upsert
   });
 });
+
+async function indexerOldStateQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
+  const { indexer_state: result }: any = await graphqlClient.request(gql`
+    query {
+      indexer_state(where: { function_name: { _eq: "${indexerSchemaName}" } }) {
+        current_block_height
+        status
+      }
+    }
+  `);
+  return result[0];
+}
+
+async function indexerOldLogsQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
+  const { indexer_log_entries: result }: any = await graphqlClient.request(gql`
+    query {
+      indexer_log_entries(where: { function_name: { _eq:"${indexerSchemaName}" } }) {
+        message
+      }
+    }
+  `);
+  return result;
+}
+
+async function indexerLogsQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
+  const graphqlResult: any = await graphqlClient.request(gql`
+    query {
+      ${indexerSchemaName}___logs {
+        message
+      }
+    }
+  `);
+  return graphqlResult[`${indexerSchemaName}___logs`];
+}
+
+async function indexerStatusQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
+  return await indexerMetadataQuery(indexerSchemaName, 'STATUS', graphqlClient);
+}
+
+async function indexerBlockHeightQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
+  return await indexerMetadataQuery(indexerSchemaName, 'LAST_PROCESSED_BLOCK_HEIGHT', graphqlClient);
+}
+
+async function indexerMetadataQuery (indexerSchemaName: string, attribute: string, graphqlClient: GraphQLClient): Promise<any> {
+  const graphqlResult: any = await graphqlClient.request(gql`
+    query {
+      ${indexerSchemaName}___metadata(where: {attribute: {_eq: "${attribute}"}}) {
+        attribute
+        value
+      }
+    }
+  `);
+  return graphqlResult[`${indexerSchemaName}___metadata`][0];
+}
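
Reviewer note (not part of the patch): the whole metadata write path added here reduces to one parameterized upsert against the per-indexer `__metadata` table. The sketch below is distilled from the `indexer-meta.ts` changes above — `METADATA_TABLE_UPSERT` and `MetadataFields` are verbatim from the diff, while the two standalone builder functions are illustrative helpers that do not exist in this patch.

// Minimal sketch of the metadata write path (TypeScript). pg-format's %I
// escapes the schema identifier and %L escapes the nested
// [[attribute, value]] list into a quoted VALUES tuple.
import format from 'pg-format';

const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';

enum MetadataFields {
  STATUS = 'STATUS',
  LAST_PROCESSED_BLOCK_HEIGHT = 'LAST_PROCESSED_BLOCK_HEIGHT'
}

// Mirrors IndexerMeta.setStatus(): upserts ('STATUS', <status>) for one indexer schema.
function buildSetStatusQuery (schemaName: string, status: string): string {
  return format(METADATA_TABLE_UPSERT, schemaName, [[MetadataFields.STATUS, status]]);
}

// Mirrors IndexerMeta.updateBlockHeight(): the height is stored as text.
function buildUpdateBlockHeightQuery (schemaName: string, blockHeight: number): string {
  return format(METADATA_TABLE_UPSERT, schemaName, [[MetadataFields.LAST_PROCESSED_BLOCK_HEIGHT, blockHeight.toString()]]);
}

// buildSetStatusQuery('morgs_near_test', 'RUNNING') produces:
//   INSERT INTO morgs_near_test.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING')
//   ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *

Because the attribute column is the conflict target, repeated writes stay idempotent: one row per attribute, always holding the latest value, which is exactly what the integration-test helpers above read back.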
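Reviewer note (not part of the patch): the stream-handler failure path above now guarantees a terminal status write even when the worker dies before an IndexerMeta was ever injected. A condensed sketch of that flow, assuming the runner/src module layout shown in the diff (the `recordFailure` wrapper itself is hypothetical):

import Indexer from '../src/indexer';
import LogEntry from '../src/indexer-meta/log-entry';

// Condensation of StreamHandler.handleError() above. Both calls go through
// Indexer.createIndexerMetaIfNotExists(), which lazily fetches PgBouncer
// connection parameters and constructs an IndexerMeta if the crash happened
// before provisioning supplied one.
async function recordFailure (indexer: Indexer, error: Error, blockHeight: number): Promise<void> {
  // Upserts ('STATUS', 'STOPPED') into <schema>.__metadata
  await indexer.setStoppedStatus();
  // Writes the error into the partitioned <schema>.__logs table
  const entry = LogEntry.systemError(`Encountered error processing stream, terminating thread\n${error.toString()}`, blockHeight);
  await indexer.callWriteLog(entry);
}

This is also why provisionUserApi() now writes the PROVISIONING status immediately after creating the metadata table: any later failure still leaves a readable status row behind.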