Skip to content

Commit

Permalink
fix: Unresolved comments in #608 (#640)
Browse files Browse the repository at this point in the history
Feat: created logEntry class and test cases
Chore:  relocated createLogs to abstracted func
Chore: renamed schema idx to prefix with '__'
  • Loading branch information
Kevin101Zhang authored Apr 9, 2024
1 parent bc93fcf commit 89dadf5
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 98 deletions.
95 changes: 36 additions & 59 deletions runner/src/indexer-meta/indexer-meta.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pgFormat from 'pg-format';
import IndexerMeta, { IndexerStatus } from './indexer-meta';
import type PgClient from '../pg-client';
import { LogType, LogLevel, type LogEntry } from './indexer-meta';
import LogEntry, { LogLevel } from './log-entry';

describe('IndexerMeta', () => {
let genericMockPgClient: PgClient;
Expand All @@ -27,53 +27,46 @@ describe('IndexerMeta', () => {

describe('writeLog', () => {
it('should insert a single log entry into the database', async () => {
const date = new Date();
jest.useFakeTimers({ now: date.getTime() });
const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00');

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const infoEntry = LogEntry.systemInfo('Info message');
await indexerMeta.writeLogs([infoEntry]);

const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES (NULL, '${formattedDate}', '${formattedDate}', 'system', 'INFO', 'Info message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

it('should insert a single log entry into the database when logEntry has a blockheight', async () => {
const date = new Date();
jest.useFakeTimers({ now: date.getTime() });
const formattedDate = date.toISOString().replace('T', ' ').replace('Z', '+00');

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const logEntry: LogEntry = {
blockHeight: 123,
logTimestamp: new Date(),
logType: LogType.SYSTEM,
logLevel: LogLevel.INFO,
message: 'Test log message'
};

await indexerMeta.writeLogs(logEntry);

const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES`;
expect(query.mock.calls[0][0]).toContain(expectedQueryStructure);
const errorEntry = LogEntry.systemError('Error message', 12345);
await indexerMeta.writeLogs([errorEntry]);

const expectedQueryStructure = `INSERT INTO ${schemaName}.__logs (block_height, date, timestamp, type, level, message) VALUES ('12345', '${formattedDate}', '${formattedDate}', 'system', 'ERROR', 'Error message')`;
expect(query.mock.calls[0][0]).toEqual(expectedQueryStructure);
});

it('should handle errors when inserting a single log entry', async () => {
query.mockRejectedValueOnce(new Error('Failed to insert log'));

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const logEntry: LogEntry = {
blockHeight: 123,
logTimestamp: new Date(),
logType: LogType.SYSTEM,
logLevel: LogLevel.INFO,
message: 'Test log message'
};

await expect(indexerMeta.writeLogs(logEntry)).rejects.toThrow('Failed to insert log');
const errorEntry = LogEntry.systemError('Error message', 12345);
await expect(indexerMeta.writeLogs([errorEntry])).rejects.toThrow('Failed to insert log');
});

it('should insert a batch of log entries into the database', async () => {
const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');
const infoEntry = LogEntry.systemInfo('Information message');
const logEntries: LogEntry[] = [
{
blockHeight: 123,
logTimestamp: new Date(),
logType: LogType.SYSTEM,
logLevel: LogLevel.INFO,
message: 'Test log message 1'
},
{
blockHeight: 124,
logTimestamp: new Date(),
logType: LogType.SYSTEM,
logLevel: LogLevel.INFO,
message: 'Test log message 2'
}
debugEntry,
infoEntry
];

await indexerMeta.writeLogs(logEntries);
Expand All @@ -86,21 +79,11 @@ describe('IndexerMeta', () => {
query.mockRejectedValueOnce(new Error('Failed to insert batch of logs'));

const indexerMeta = new IndexerMeta(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, genericMockPgClient);
const debugEntry = LogEntry.systemDebug('Debug message');
const infoEntry = LogEntry.systemInfo('Information message');
const logEntries: LogEntry[] = [
{
blockHeight: 123,
logTimestamp: new Date(),
logType: LogType.SYSTEM,
logLevel: LogLevel.INFO,
message: 'Test log message 1'
},
{
blockHeight: 124,
logTimestamp: new Date(),
logType: LogType.SYSTEM,
logLevel: LogLevel.INFO,
message: 'Test log message 2'
}
debugEntry,
infoEntry
];

await expect(indexerMeta.writeLogs(logEntries)).rejects.toThrow('Failed to insert batch of logs');
Expand All @@ -116,15 +99,9 @@ describe('IndexerMeta', () => {

it('should skip log entries with levels lower than the logging level specified in the constructor', async () => {
const indexerMeta = new IndexerMeta(functionName, LogLevel.ERROR, mockDatabaseConnectionParameters, genericMockPgClient);
const logEntry: LogEntry = {
blockHeight: 123,
logTimestamp: new Date(),
logType: LogType.SYSTEM,
logLevel: LogLevel.INFO,
message: 'Test log message'
};

await indexerMeta.writeLogs(logEntry);
const debugEntry = LogEntry.systemDebug('Debug message');

await indexerMeta.writeLogs([debugEntry]);

expect(query).not.toHaveBeenCalled();
});
Expand Down
34 changes: 8 additions & 26 deletions runner/src/indexer-meta/indexer-meta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { wrapError } from '../utility';
import PgClient from '../pg-client';
import { type DatabaseConnectionParameters } from '../provisioner/provisioner';
import { trace } from '@opentelemetry/api';
import type LogEntry from './log-entry';
import { LogLevel } from './log-entry';

export enum IndexerStatus {
PROVISIONING = 'PROVISIONING',
Expand All @@ -11,26 +13,6 @@ export enum IndexerStatus {
STOPPED = 'STOPPED',
}

export interface LogEntry {
blockHeight: number
logTimestamp: Date
logType: LogType
logLevel: LogLevel
message: string
}

export enum LogLevel {
DEBUG = 2,
INFO = 5,
WARN = 6,
ERROR = 8,
}

export enum LogType {
SYSTEM = 'system',
USER = 'user',
}

const METADATA_TABLE_UPSERT = 'INSERT INTO %I.__metadata (attribute, value) VALUES %L ON CONFLICT (attribute) DO UPDATE SET value = EXCLUDED.value RETURNING *';
const STATUS_ATTRIBUTE = 'STATUS';
const LAST_PROCESSED_BLOCK_HEIGHT_ATTRIBUTE = 'LAST_PROCESSED_BLOCK_HEIGHT';
Expand Down Expand Up @@ -67,9 +49,9 @@ export default class IndexerMeta {
}

async writeLogs (
logEntries: LogEntry | LogEntry[],
logEntries: LogEntry[],
): Promise<void> {
const entriesArray = (Array.isArray(logEntries) ? logEntries : [logEntries]).filter(entry => this.shouldLog(entry.logLevel)); ;
const entriesArray = logEntries.filter(entry => this.shouldLog(entry.level));
if (entriesArray.length === 0) return;

const spanMessage = `write log for ${entriesArray.length === 1 ? 'single entry' : `batch of ${entriesArray.length}`} through postgres `;
Expand All @@ -78,10 +60,10 @@ export default class IndexerMeta {
await wrapError(async () => {
const values = entriesArray.map(entry => [
entry.blockHeight,
entry.logTimestamp,
entry.logTimestamp,
entry.logType,
LogLevel[entry.logLevel],
entry.timestamp,
entry.timestamp,
entry.type,
LogLevel[entry.level],
entry.message
]);

Expand Down
92 changes: 92 additions & 0 deletions runner/src/indexer-meta/log-entry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import LogEntry, { LogType, LogLevel } from './log-entry';

describe('LogEntry', () => {
test('create a system debug log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.systemDebug('Debug message', blockHeight);
expect(logEntry.message).toBe('Debug message');
expect(logEntry.level).toBe(LogLevel.DEBUG);
expect(logEntry.type).toBe(LogType.SYSTEM);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a system info log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.systemInfo('Info message', blockHeight);
expect(logEntry.message).toBe('Info message');
expect(logEntry.level).toBe(LogLevel.INFO);
expect(logEntry.type).toBe(LogType.SYSTEM);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a system warn log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.systemWarn('Warn message', blockHeight);
expect(logEntry.message).toBe('Warn message');
expect(logEntry.level).toBe(LogLevel.WARN);
expect(logEntry.type).toBe(LogType.SYSTEM);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a system error log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.systemError('Error message', blockHeight);
expect(logEntry.message).toBe('Error message');
expect(logEntry.level).toBe(LogLevel.ERROR);
expect(logEntry.type).toBe(LogType.SYSTEM);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a user debug log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.userDebug('Debug message', blockHeight);
expect(logEntry.message).toBe('Debug message');
expect(logEntry.level).toBe(LogLevel.DEBUG);
expect(logEntry.type).toBe(LogType.USER);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a user info log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.userInfo('User info message', blockHeight);
expect(logEntry.message).toBe('User info message');
expect(logEntry.level).toBe(LogLevel.INFO);
expect(logEntry.type).toBe(LogType.USER);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a user warn log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.userWarn('User warn message', blockHeight);
expect(logEntry.message).toBe('User warn message');
expect(logEntry.level).toBe(LogLevel.WARN);
expect(logEntry.type).toBe(LogType.USER);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a user error log entry', () => {
const blockHeight = 100;
const logEntry = LogEntry.userError('User error message', blockHeight);
expect(logEntry.message).toBe('User error message');
expect(logEntry.level).toBe(LogLevel.ERROR);
expect(logEntry.type).toBe(LogType.USER);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBe(blockHeight);
});

test('create a system info log entry without blockheight', () => {
const logEntry = LogEntry.systemInfo('Info message');
expect(logEntry.message).toBe('Info message');
expect(logEntry.level).toBe(LogLevel.INFO);
expect(logEntry.type).toBe(LogType.SYSTEM);
expect(logEntry.timestamp).toBeInstanceOf(Date);
expect(logEntry.blockHeight).toBeUndefined();
});
});
60 changes: 60 additions & 0 deletions runner/src/indexer-meta/log-entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
export enum LogLevel {
DEBUG = 2,
INFO = 5,
WARN = 6,
ERROR = 8,
}

export enum LogType {
SYSTEM = 'system',
USER = 'user',
}

export default class LogEntry {
public readonly timestamp: Date;

constructor (
public readonly message: string,
public readonly level: LogLevel,
public readonly type: LogType,
public readonly blockHeight?: number
) {
this.timestamp = new Date();
}

static createLog (message: string, level: LogLevel, type: LogType, blockHeight?: number): LogEntry {
return new LogEntry(message, level, type, blockHeight);
}

static systemDebug (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.DEBUG, LogType.SYSTEM, blockHeight);
}

static systemInfo (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.INFO, LogType.SYSTEM, blockHeight);
}

static systemWarn (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.WARN, LogType.SYSTEM, blockHeight);
}

static systemError (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.ERROR, LogType.SYSTEM, blockHeight);
}

static userDebug (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.DEBUG, LogType.USER, blockHeight);
}

static userInfo (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.INFO, LogType.USER, blockHeight);
}

static userWarn (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.WARN, LogType.USER, blockHeight);
}

static userError (message: string, blockHeight?: number): LogEntry {
return LogEntry.createLog(message, LogLevel.ERROR, LogType.USER, blockHeight);
}
}
2 changes: 1 addition & 1 deletion runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { VM } from 'vm2';
import DmlHandler from '../dml-handler/dml-handler';
import type PgClient from '../pg-client';
import { type IndexerBehavior } from '../stream-handler/stream-handler';
import { LogLevel } from '../indexer-meta/indexer-meta';
import { LogLevel } from '../indexer-meta/log-entry';

describe('Indexer unit tests', () => {
const HASURA_ROLE = 'morgs_near';
Expand Down
3 changes: 2 additions & 1 deletion runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import DmlHandler from '../dml-handler/dml-handler';
// import IndexerMeta from '../indexer-meta/indexer-meta';

import { type IndexerBehavior } from '../stream-handler/stream-handler';
import { /* type LogEntry, LogType, */ IndexerStatus, LogLevel } from '../indexer-meta/indexer-meta';
import { IndexerStatus } from '../indexer-meta/indexer-meta';
import { LogLevel } from '../indexer-meta/log-entry';
import { type DatabaseConnectionParameters } from '../provisioner/provisioner';
import { trace, type Span } from '@opentelemetry/api';

Expand Down
3 changes: 1 addition & 2 deletions runner/src/provisioner/provisioner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ export default class Provisioner {
async setupPartitionedLogsTable (userName: string, databaseName: string, schemaName: string): Promise<void> {
await wrapError(
async () => {
// TODO: Create logs table
// await this.runLogsSql(databaseName, schemaName);
await this.grantCronAccess(userName);
await this.scheduleLogPartitionJobs(userName, databaseName, schemaName);
},
Expand Down Expand Up @@ -249,7 +249,6 @@ export default class Provisioner {

await this.createSchema(databaseName, schemaName);

// await this.runLogsSql(databaseName, schemaName);
// await this.createMetadataTable(databaseName, schemaName);
await this.runIndexerSql(databaseName, schemaName, databaseSchema);

Expand Down
Loading

0 comments on commit 89dadf5

Please sign in to comment.