Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ refactor indexer service so state actually works #120

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 57 additions & 17 deletions src/modules/indexer/indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import { OnEvent } from '@nestjs/event-emitter';
@Injectable()
export class IndexerService {
private readonly logger = new LokiLogger(IndexerService.name);
private processingBlocks: boolean = false;

public nextBlockToSync: number;
private processingBlocks = false;

constructor(
private readonly prisma: PrismaService,
Expand All @@ -52,13 +53,18 @@ export class IndexerService {
async onApplicationBootstrap() {
await this.nodeApi.getAndSetNodeInfo();
await this.setNextBlockandState();
await this.executeStartUpCommands();
setImmediate(() => {
this.executeStartUpCommands().catch((error) => {
this.logger.error('Error executing startup commands', error);
});
});
}

private async executeStartUpCommands() {
if (this.state.get(Modules.INDEXER) !== IndexerState.START_UP) return;
await this.commandBus.execute(new IndexGenesisBlockCommand());
await this.commandBus.execute(new IndexKnownAccountsCommand());
this.state.set(Modules.INDEXER, IndexerState.STARTED);
}

private async executeBlockEventCommands(
Expand Down Expand Up @@ -188,26 +194,32 @@ export class IndexerService {
const newBlockHeight = newBlockData.blockHeader.height;
const state = this.state.get(Modules.INDEXER);

if (state === IndexerState.SYNCING)
return this.logger.log(`Syncing: Current height ${this.nextBlockToSync}`);
if (this.isStartup(state)) return;
if (this.isSyncing(state)) return;
if (this.isProcessingBlocks()) return;

if (this.processingBlocks)
return this.logger.log(`Already processing blocks: Current height ${this.nextBlockToSync}`);
if (this.shouldSync(newBlockHeight, state)) {
this.startSyncing();
return;
}

// will go back to syncing state if received block is greather then `nextBlockToSync`
if (newBlockHeight > this.nextBlockToSync || state === IndexerState.RESTART) {
this.state.set(Modules.INDEXER, IndexerState.SYNCING);
await this.processNewBlock(newBlockData, newBlockHeight);
}

setImmediate(() => {
this.syncWithNode().catch((error) => {
this.state.set(Modules.INDEXER, IndexerState.RESTART);
this.logger.error('Error syncing with node, will retry', error);
});
private startSyncing(): void {
this.state.set(Modules.INDEXER, IndexerState.SYNCING);
setImmediate(() => {
this.syncWithNode().catch((error) => {
this.state.set(Modules.INDEXER, IndexerState.RESTART);
this.logger.error('Error syncing with node, will retry', error);
});
});
}

return;
}

private async processNewBlock(
newBlockData: NewBlockEvent,
newBlockHeight: number,
): Promise<void> {
const block = await this.nodeApi.invokeApi<Block>(NodeApi.CHAIN_GET_BLOCK_BY_ID, {
id: newBlockData.blockHeader.id,
});
Expand All @@ -219,6 +231,34 @@ export class IndexerService {
]);
}

private isStartup(state: IndexerState): boolean {
if (state === IndexerState.START_UP) {
this.logger.log('Startup commands running');
return true;
}
return false;
}

private isSyncing(state: IndexerState): boolean {
if (state === IndexerState.SYNCING) {
this.logger.log(`Syncing: Current height ${this.nextBlockToSync}`);
return true;
}
return false;
}

private isProcessingBlocks(): boolean {
if (this.processingBlocks) {
this.logger.log(`Already processing blocks: Current height ${this.nextBlockToSync}`);
return true;
}
return false;
}

private shouldSync(newBlockHeight: number, state: IndexerState): boolean {
return newBlockHeight > this.nextBlockToSync || state === IndexerState.RESTART;
}

private async updateNextBlockToSync(height: number): Promise<void> {
const nextBlockToSync = await this.prisma.nextBlockToSync.update({
where: { id: KEY_NEXT_BLOCK_TO_SYNC },
Expand Down
1 change: 1 addition & 0 deletions src/modules/state/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum Modules {

export enum IndexerState {
START_UP,
STARTED,
RESTART,
SYNCING,
INDEXING,
Expand Down
Loading