feat: Rename logs and metadata tables (#677)
The logs and metadata tables were created with a `__` prefix. Unfortunately,
that prefix turns out to be reserved by Hasura. So, we are renaming the
tables to use a `sys_` prefix, which, to the best of my knowledge, is not
reserved.

The specific migration process is:
1. Delete and recreate the cron DB in dev. This deletes the DB and any
scheduled jobs.
2. Delete the old logs/metadata tables and any created partitions (a sketch of the equivalent cleanup is below).
3. Create the new tables.
4. Use the new tables successfully.
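
For reference, the table cleanup in step 2 amounts to untracking the old tables in Hasura and then dropping them. A minimal sketch of that cleanup, assuming client shapes like those in `provisioner.ts` below; the database and schema names are illustrative:

```typescript
// Hypothetical one-off cleanup for a single indexer schema. `hasuraClient`
// and `pgClient` stand in for the clients wired up in provisioner.ts.
async function dropOldTables (hasuraClient: any, pgClient: any): Promise<void> {
  const databaseName = 'morgs_near';              // illustrative
  const schemaName = 'morgs_near_test_function';  // illustrative

  // Untrack first so Hasura metadata no longer references the tables.
  await hasuraClient.untrackTables(databaseName, schemaName, ['__logs', '__metadata'], true);

  // CASCADE also drops the per-day partitions attached to the partitioned __logs table.
  await pgClient.query(`DROP TABLE IF EXISTS ${schemaName}.__logs CASCADE;`);
  await pgClient.query(`DROP TABLE IF EXISTS ${schemaName}.__metadata;`);
}
```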
darunrs authored Apr 19, 2024
1 parent bccb8b9 commit 69332be
Showing 7 changed files with 47 additions and 35 deletions.
10 changes: 5 additions & 5 deletions runner/src/indexer-meta/indexer-meta.test.ts
@@ -41,7 +41,7 @@ describe('IndexerMeta', () => {
const infoEntry = LogEntry.systemInfo('Info message');
await indexerMeta.writeLogs([infoEntry]);

- const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
+ const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

@@ -54,7 +54,7 @@ describe('IndexerMeta', () => {
const errorEntry = LogEntry.systemError('Error message', 12345);
await indexerMeta.writeLogs([errorEntry]);

- const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`;
+ const expectedQueryStructure = `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

@@ -77,7 +77,7 @@ describe('IndexerMeta', () => {

await indexerMeta.writeLogs(logEntries);

- const expectedQuery = `INSERT INTO ${infoIndexerConfig.schemaName()}.__logs (block_height, date, timestamp, type, level, message) VALUES`;
+ const expectedQuery = `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_logs (block_height, date, timestamp, type, level, message) VALUES`;
expect(query.mock.calls[0][0]).toContain(expectedQuery);
});

@@ -116,15 +116,15 @@ describe('IndexerMeta', () => {
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.setStatus(IndexerStatus.RUNNING);
expect(query).toBeCalledWith(
- `INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
+ `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('STATUS', 'RUNNING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
);
});

it('writes last processed block height for indexer', async () => {
const indexerMeta = new IndexerMeta(infoIndexerConfig, mockDatabaseConnectionParameters, genericMockPgClient);
await indexerMeta.updateBlockHeight(123);
expect(query).toBeCalledWith(
- `INSERT INTO ${infoIndexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
+ `INSERT INTO ${infoIndexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('LAST_PROCESSED_BLOCK_HEIGHT', '123') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`
);
});
});
6 changes: 3 additions & 3 deletions runner/src/indexer-meta/indexer-meta.ts
@@ -13,7 +13,7 @@ export enum IndexerStatus {
STOPPED = 'STOPPED',
}

- export const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
+ export const METADATA_TABLE_UPSERT = 'INSERT INTO %I.sys_metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
export enum MetadataFields {
STATUS = 'STATUS',
LAST_PROCESSED_BLOCK_HEIGHT = 'LAST_PROCESSED_BLOCK_HEIGHT'
@@ -24,7 +24,7 @@ export default class IndexerMeta {

private readonly pgClient: PgClient;
private readonly indexerConfig: IndexerConfig;
- private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.__logs (block_height, date, timestamp, type, level, message) VALUES %L';
+ private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.sys_logs (block_height, date, timestamp, type, level, message) VALUES %L';

constructor (
indexerConfig: IndexerConfig,
@@ -62,7 +62,7 @@

const query = format(this.logInsertQueryTemplate, this.indexerConfig.schemaName(), values);
await this.pgClient.query(query);
- }, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.indexerConfig.schemaName()}.__logs table`)
+ }, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.indexerConfig.schemaName()}.sys_logs table`)
.finally(() => {
writeLogSpan.end();
});
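
For context on the two templates above: `%I` is pg-format's identifier placeholder (here, the schema name) and `%L` its literal placeholder (here, the value tuples), so the rename only touches the hard-coded table name inside each template. A sketch of how the log-insert template expands, with illustrative values:

```typescript
import format from 'pg-format';

const logInsertQueryTemplate = 'INSERT INTO %I.sys_logs (block_height, date, timestamp, type, level, message) VALUES %L';

// Under %L, a nested array renders as one parenthesized tuple per row.
const rows = [['12345', '2024-04-19', '2024-04-19 10:00:00', 'system', 'INFO', 'Info message']];

console.log(format(logInsertQueryTemplate, 'morgs_near_test_function', rows));
// -> INSERT INTO morgs_near_test_function.sys_logs (block_height, date, timestamp, type, level, message)
//    VALUES ('12345', '2024-04-19', '2024-04-19 10:00:00', 'system', 'INFO', 'Info message')
```

METADATA_TABLE_UPSERT follows the same pattern, with ON CONFLICT (attribute) DO UPDATE turning the insert into an upsert keyed on the attribute column.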
20 changes: 10 additions & 10 deletions runner/src/provisioner/provisioner.test.ts
@@ -18,7 +18,7 @@ describe('Provisioner', () => {
const functionName = 'test-function';
const databaseSchema = 'CREATE TABLE blocks (height numeric)';
indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO);
- const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.__metadata (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`;
+ const setProvisioningStatusQuery = `INSERT INTO ${indexerConfig.schemaName()}.sys_metadata (attribute, value) VALUES ('STATUS', 'PROVISIONING') ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *`;
const logsDDL = expect.any(String);
const metadataDDL = expect.any(String);
const error = new Error('some error');
@@ -111,8 +111,8 @@
]);
expect(userPgClientQuery.mock.calls).toEqual([
[setProvisioningStatusQuery],
["SELECT cron.schedule_in_database('morgs_near_test_function_logs_create_partition', '0 1 * * *', $$SELECT morgs_near_test_function.fn_create_partition('morgs_near_test_function.__logs', CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');"],
["SELECT cron.schedule_in_database('morgs_near_test_function_logs_delete_partition', '0 2 * * *', $$SELECT morgs_near_test_function.fn_delete_partition('morgs_near_test_function.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, 'morgs_near');"]
["SELECT cron.schedule_in_database('morgs_near_test_function_sys_logs_create_partition', '0 1 * * *', $$SELECT morgs_near_test_function.fn_create_partition('morgs_near_test_function.sys_logs', CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');"],
["SELECT cron.schedule_in_database('morgs_near_test_function_sys_logs_delete_partition', '0 2 * * *', $$SELECT morgs_near_test_function.fn_delete_partition('morgs_near_test_function.sys_logs', CURRENT_DATE, '-15 day', '-14 day')$$, 'morgs_near');"]
]);
expect(hasuraClient.addDatasource).toBeCalledWith(indexerConfig.userName(), password, indexerConfig.databaseName());
expect(hasuraClient.createSchema).toBeCalledWith(indexerConfig.userName(), indexerConfig.schemaName());
@@ -243,23 +243,23 @@
});

it('provisions logs and metadata tables once', async () => {
- hasuraClient.getTableNames = jest.fn().mockReturnValueOnce(['blocks']).mockReturnValue(['blocks', '__logs', '__metadata']);
+ hasuraClient.getTableNames = jest.fn().mockReturnValueOnce(['blocks']).mockReturnValue(['blocks', 'sys_logs', 'sys_metadata']);
await provisioner.provisionLogsAndMetadataIfNeeded(indexerConfig);
expect(hasuraClient.executeSqlOnSchema).toBeCalledTimes(2);
expect(cronPgClient.query).toBeCalledTimes(2);
expect(userPgClientQuery).toBeCalledTimes(3); // Set provisioning status, schedule today and tomorrow partitions
});

it('ensuring consistent state tracks logs and metadata table once, adds permissions twice', async () => {
- hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', '__logs', '__metadata']);
+ hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', 'sys_logs', 'sys_metadata']);
hasuraClient.getTrackedTablePermissions = jest.fn()
.mockReturnValueOnce([
generateTableConfig('morgs_near_test_function', 'blocks', 'morgs_near', ['select', 'insert', 'update', 'delete']),
])
.mockReturnValueOnce([
generateTableConfig('morgs_near_test_function', 'blocks', 'morgs_near', ['select', 'insert', 'update', 'delete']),
- generateTableConfig('morgs_near_test_function', '__logs', 'morgs_near', []),
- generateTableConfig('morgs_near_test_function', '__metadata', 'morgs_near', []),
+ generateTableConfig('morgs_near_test_function', 'sys_logs', 'morgs_near', []),
+ generateTableConfig('morgs_near_test_function', 'sys_metadata', 'morgs_near', []),
]);
await provisioner.ensureConsistentHasuraState(indexerConfig);
await provisioner.ensureConsistentHasuraState(indexerConfig);
@@ -270,11 +270,11 @@
});

it('ensuring consistent state caches result', async () => {
- hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', '__logs', '__metadata']);
+ hasuraClient.getTableNames = jest.fn().mockReturnValue(['blocks', 'sys_logs', 'sys_metadata']);
hasuraClient.getTrackedTablePermissions = jest.fn().mockReturnValue([
generateTableConfig('morgs_near_test_function', 'blocks', 'morgs_near', ['select', 'insert', 'update', 'delete']),
- generateTableConfig('morgs_near_test_function', '__logs', 'morgs_near', ['select', 'insert', 'update', 'delete']),
- generateTableConfig('morgs_near_test_function', '__metadata', 'morgs_near', ['select', 'insert', 'update', 'delete']),
+ generateTableConfig('morgs_near_test_function', 'sys_logs', 'morgs_near', ['select', 'insert', 'update', 'delete']),
+ generateTableConfig('morgs_near_test_function', 'sys_metadata', 'morgs_near', ['select', 'insert', 'update', 'delete']),
]);
await provisioner.ensureConsistentHasuraState(indexerConfig);
await provisioner.ensureConsistentHasuraState(indexerConfig);
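
The caching asserted in the last test above is the same pattern visible in provisioner.ts below: a nested per-account, per-function flag that short-circuits repeated provisioning checks. A minimal sketch of that guard, assuming the field shape shown in the next hunk:

```typescript
// Hypothetical memoization guard mirroring #hasLogsMetadataBeenProvisioned
// in provisioner.ts: keyed by account ID, then by function name.
class ProvisioningCache {
  private readonly provisioned: Record<string, Record<string, boolean>> = {};

  has (accountId: string, functionName: string): boolean {
    return this.provisioned[accountId]?.[functionName] ?? false;
  }

  mark (accountId: string, functionName: string): void {
    this.provisioned[accountId] ??= {};
    this.provisioned[accountId][functionName] = true;
  }
}
```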
20 changes: 16 additions & 4 deletions runner/src/provisioner/provisioner.ts
@@ -128,14 +128,14 @@ export default class Provisioner {
const userCronPgClient = new this.PgClient(userDbConnectionParameters);
await userCronPgClient.query(
this.pgFormat(
"SELECT cron.schedule_in_database('%1$I_logs_create_partition', '0 1 * * *', $$SELECT %1$I.fn_create_partition('%1$I.__logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);",
"SELECT cron.schedule_in_database('%1$I_sys_logs_create_partition', '0 1 * * *', $$SELECT %1$I.fn_create_partition('%1$I.sys_logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);",
schemaName,
databaseName
)
);
await userCronPgClient.query(
this.pgFormat(
"SELECT cron.schedule_in_database('%1$I_logs_delete_partition', '0 2 * * *', $$SELECT %1$I.fn_delete_partition('%1$I.__logs', CURRENT_DATE, '-15 day', '-14 day')$$, %2$L);",
"SELECT cron.schedule_in_database('%1$I_sys_logs_delete_partition', '0 2 * * *', $$SELECT %1$I.fn_delete_partition('%1$I.sys_logs', CURRENT_DATE, '-15 day', '-14 day')$$, %2$L);",
schemaName,
databaseName
)
@@ -255,12 +255,24 @@
if (this.#hasLogsMetadataBeenProvisioned[indexerConfig.accountId]?.[indexerConfig.functionName]) {
return;
}
- const logsTable = '__logs';
- const metadataTable = '__metadata';
+ const oldLogsTable = '__logs';
+ const oldMetadataTable = '__metadata';
+ const logsTable = 'sys_logs';
+ const metadataTable = 'sys_metadata';

await wrapError(
async () => {
const tableNames = await this.getTableNames(indexerConfig.schemaName(), indexerConfig.databaseName());
+ const tablesToDelete: string[] = tableNames.filter((tableName: string) => tableName === oldLogsTable || tableName === oldMetadataTable);
+ if (tablesToDelete.length > 0) {
+ await this.hasuraClient.untrackTables(indexerConfig.databaseName(), indexerConfig.schemaName(), tablesToDelete, true);
+ }
+ if (tableNames.includes(oldLogsTable)) {
+ await this.hasuraClient.executeSqlOnSchema(indexerConfig.databaseName(), indexerConfig.schemaName(), `DROP TABLE IF EXISTS ${oldLogsTable} CASCADE;`);
+ }
+ if (tableNames.includes(oldMetadataTable)) {
+ await this.hasuraClient.executeSqlOnSchema(indexerConfig.databaseName(), indexerConfig.schemaName(), `DROP TABLE IF EXISTS ${oldMetadataTable};`);
+ }

if (!tableNames.includes(logsTable)) {
await this.setupPartitionedLogsTable(indexerConfig.userName(), indexerConfig.databaseName(), indexerConfig.schemaName());
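
The renamed cron jobs above rely on pg-format's positional placeholders: `%1$I` inserts the first argument as an escaped identifier everywhere it appears, and `%2$L` inserts the second as an escaped literal. A sketch of the expansion, using the same names as the provisioner test:

```typescript
import format from 'pg-format';

const template = "SELECT cron.schedule_in_database('%1$I_sys_logs_create_partition', '0 1 * * *', $$SELECT %1$I.fn_create_partition('%1$I.sys_logs', CURRENT_DATE, '1 day', '2 day')$$, %2$L);";

// Expands to the string asserted in provisioner.test.ts (wrapped here for readability):
// SELECT cron.schedule_in_database('morgs_near_test_function_sys_logs_create_partition', '0 1 * * *',
//   $$SELECT morgs_near_test_function.fn_create_partition('morgs_near_test_function.sys_logs',
//   CURRENT_DATE, '1 day', '2 day')$$, 'morgs_near');
console.log(format(template, 'morgs_near_test_function', 'morgs_near'));
```

Renaming the job (`_logs_` to `_sys_logs_`) alongside the table keeps the pg_cron job names aligned with the tables they manage; the old jobs themselves are removed wholesale by recreating the cron DB in step 1 rather than unscheduled individually.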
16 changes: 8 additions & 8 deletions runner/src/provisioner/schemas/logs-table.ts
@@ -1,5 +1,5 @@
export const logsTableDDL = (schemaName: string): string => `
- CREATE TABLE __logs (
+ CREATE TABLE sys_logs (
id BIGSERIAL NOT NULL,
block_height NUMERIC(20),
date DATE NOT NULL,
@@ -10,11 +10,11 @@ CREATE TABLE __logs (
PRIMARY KEY (date, id)
) PARTITION BY RANGE (date);
- CREATE INDEX __logs_timestamp_idx ON __logs USING btree (timestamp);
- CREATE INDEX __logs_type_idx ON __logs USING btree (type);
- CREATE INDEX __logs_level_idx ON __logs USING btree (level);
- CREATE INDEX __logs_block_height_idx ON __logs USING btree (block_height);
- CREATE INDEX __logs_search_vector_idx ON __logs USING GIN (to_tsvector('english', message));
+ CREATE INDEX sys_logs_timestamp_idx ON sys_logs USING btree (timestamp);
+ CREATE INDEX sys_logs_type_idx ON sys_logs USING btree (type);
+ CREATE INDEX sys_logs_level_idx ON sys_logs USING btree (level);
+ CREATE INDEX sys_logs_block_height_idx ON sys_logs USING btree (block_height);
+ CREATE INDEX sys_logs_search_vector_idx ON sys_logs USING GIN (to_tsvector('english', message));
CREATE OR REPLACE FUNCTION fn_create_partition(_tbl text, _date date, _interval_start text, _interval_end text)
@@ -34,8 +34,8 @@ EXECUTE 'CREATE TABLE IF NOT EXISTS ' || _tbl || '_p' || _partition_name || ' PA
END
$func$;
- SELECT fn_create_partition('${schemaName}.__logs', CURRENT_DATE, '0 day', '1 day');
- SELECT fn_create_partition('${schemaName}.__logs', CURRENT_DATE, '1 day', '2 day');
+ SELECT fn_create_partition('${schemaName}.sys_logs', CURRENT_DATE, '0 day', '1 day');
+ SELECT fn_create_partition('${schemaName}.sys_logs', CURRENT_DATE, '1 day', '2 day');
CREATE OR REPLACE FUNCTION fn_delete_partition(_tbl text, _date date, _interval_start text, _interval_end text)
RETURNS void
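
For a sense of what the renamed partition helpers do day to day: `fn_create_partition` derives a per-day partition name from the target date and attaches it to `sys_logs` as a range partition. The `EXECUTE` string is partially collapsed in this hunk, so the generated DDL below is an inference, and the exact partition-name suffix is an assumption:

```typescript
// Inferred shape of the DDL built by
// fn_create_partition('morgs_near_test_function.sys_logs', CURRENT_DATE, '0 day', '1 day')
// when CURRENT_DATE is 2024-04-19; the '_pYYYYMMDD' suffix format is an assumption.
const inferredPartitionDDL = `
  CREATE TABLE IF NOT EXISTS morgs_near_test_function.sys_logs_p20240419
  PARTITION OF morgs_near_test_function.sys_logs
  FOR VALUES FROM ('2024-04-19') TO ('2024-04-20');
`;
```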
2 changes: 1 addition & 1 deletion runner/src/provisioner/schemas/metadata-table.ts
@@ -1,5 +1,5 @@
export const metadataTableDDL = (): string => `
- CREATE TABLE IF NOT EXISTS __metadata (
+ CREATE TABLE IF NOT EXISTS sys_metadata (
attribute TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (attribute)
8 changes: 4 additions & 4 deletions runner/tests/integration.test.ts
@@ -276,12 +276,12 @@ async function indexerOldLogsQuery (indexerSchemaName: string, graphqlClient: Gr
async function indexerLogsQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
const graphqlResult: any = await graphqlClient.request(gql`
query {
- ${indexerSchemaName}___logs {
+ ${indexerSchemaName}_sys_logs {
message
}
}
`);
- return graphqlResult[`${indexerSchemaName}___logs`];
+ return graphqlResult[`${indexerSchemaName}_sys_logs`];
}

async function indexerStatusQuery (indexerSchemaName: string, graphqlClient: GraphQLClient): Promise<any> {
@@ -295,11 +295,11 @@ async function indexerBlockHeightQuery (indexerSchemaName: string, graphqlClient
async function indexerMetadataQuery (indexerSchemaName: string, attribute: string, graphqlClient: GraphQLClient): Promise<any> {
const graphqlResult: any = await graphqlClient.request(gql`
query {
- ${indexerSchemaName}___metadata(where: {attribute: {_eq: "${attribute}"}}) {
+ ${indexerSchemaName}_sys_metadata(where: {attribute: {_eq: "${attribute}"}}) {
attribute
value
}
}
`);
- return graphqlResult[`${indexerSchemaName}___metadata`][0];
+ return graphqlResult[`${indexerSchemaName}_sys_metadata`][0];
}
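
Hasura exposes each tracked table as a GraphQL root field named `<schema>_<table>` by default, which is why the renamed tables surface as `${indexerSchemaName}_sys_logs` and `${indexerSchemaName}_sys_metadata` in these queries. The bodies of `indexerStatusQuery` and `indexerBlockHeightQuery` are collapsed in this diff; a hedged sketch of how such a helper could read the renamed metadata table (assuming the same graphql-request client, not the committed code):

```typescript
import { gql, GraphQLClient } from 'graphql-request';

// Hypothetical helper mirroring indexerMetadataQuery above: reads a single
// attribute (e.g. 'STATUS') from the renamed sys_metadata table.
async function readMetadataAttribute (indexerSchemaName: string, attribute: string, graphqlClient: GraphQLClient): Promise<string> {
  const result: any = await graphqlClient.request(gql`
    query {
      ${indexerSchemaName}_sys_metadata(where: {attribute: {_eq: "${attribute}"}}) {
        value
      }
    }
  `);
  return result[`${indexerSchemaName}_sys_metadata`][0].value;
}
```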
