From 0f2d9b6c2968b560f4b7763a3ab74f0111117c9b Mon Sep 17 00:00:00 2001 From: Phan Anh Tuan Date: Fri, 13 Oct 2023 15:17:57 +0700 Subject: [PATCH 1/4] fix: update sender in transaction_message; create job to migrate old tx msgs --- ci/config.json.ci | 4 + config.json | 4 + src/common/constant.ts | 5 + src/services/crawl-tx/crawl_tx.service.ts | 4 +- .../job/update_sender_in_tx_message.ts | 140 ++++++++++++++++++ 5 files changed, 155 insertions(+), 2 deletions(-) create mode 100644 src/services/job/update_sender_in_tx_message.ts diff --git a/ci/config.json.ci b/ci/config.json.ci index ba8d743a0..48f14c28f 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -192,5 +192,9 @@ "key": "crawlIbcApp", "millisecondRepeatJob": 2000, "blocksPerCall": 100 + }, + "jobUpdateSenderInTxMessages": { + "millisecondCrawl": 1000, + "blocksPerCall": 1000 } } diff --git a/config.json b/config.json index 34d996a93..22ee115d6 100644 --- a/config.json +++ b/config.json @@ -192,5 +192,9 @@ "key": "crawlIbcApp", "millisecondRepeatJob": 2000, "blocksPerCall": 100 + }, + "jobUpdateSenderInTxMessages": { + "millisecondCrawl": 1000, + "blocksPerCall": 1000 } } diff --git a/src/common/constant.ts b/src/common/constant.ts index a37ae91fb..352d4888c 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -79,6 +79,7 @@ export const BULL_JOB_NAME = { REINDEX_CW20_HISTORY: 'reindex:cw20-history', JOB_CREATE_COMPOSITE_INDEX_ATTR_PARTITION: 'job:create-index-composite-attr-partition', + JOB_UPDATE_SENDER_IN_TX_MESSAGES: 'job:update-sender-in-tx-messages', }; export const SERVICE = { @@ -247,6 +248,10 @@ export const SERVICE = { key: 'ReAssignMsgIndexToEvent', path: 'v1.ReAssignMsgIndexToEvent', }, + UpdateSenderInTxMessages: { + key: 'UpdateSenderInTxMessages', + path: 'v1.UpdateSenderInTxMessages', + }, }, CrawlIBCTaoService: { key: 'CrawlIBCTaoService', diff --git a/src/services/crawl-tx/crawl_tx.service.ts b/src/services/crawl-tx/crawl_tx.service.ts index e90a6657d..0bbcc085f 100644 --- a/src/services/crawl-tx/crawl_tx.service.ts +++ b/src/services/crawl-tx/crawl_tx.service.ts @@ -587,9 +587,9 @@ export default class CrawlTxService extends BullableService { ): string { let result = ''; events.forEach((event: any) => { - if (event.type === eventType) { + if (result === '' && event.type === eventType) { event.attributes.forEach((attribute: any) => { - if (attribute.key === attributeKey) { + if (result === '' && attribute.key === attributeKey) { result = attribute.value; } }); diff --git a/src/services/job/update_sender_in_tx_message.ts b/src/services/job/update_sender_in_tx_message.ts new file mode 100644 index 000000000..5a1118587 --- /dev/null +++ b/src/services/job/update_sender_in_tx_message.ts @@ -0,0 +1,140 @@ +/* eslint-disable no-await-in-loop */ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { ServiceBroker } from 'moleculer'; +import { Transaction, BlockCheckpoint, TransactionMessage } from '../../models'; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { BULL_JOB_NAME, SERVICE } from '../../common'; +import config from '../../../config.json' assert { type: 'json' }; +import knex from '../../common/utils/db_connection'; + +@Service({ + name: SERVICE.V1.JobService.UpdateSenderInTxMessages.key, + version: 1, +}) +export default class UpdateSenderInTxMessages extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + jobName: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + }) + async updateSender(_payload: { lastBlockCrawled: number }) { + const blockCheckpoint = await BlockCheckpoint.query().findOne({ + job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + }); + this.logger.info( + `Update sender in transaction_message table start from block ${blockCheckpoint?.height}` + ); + if (blockCheckpoint?.height === _payload.lastBlockCrawled) { + return; + } + + let lastBlock = + (blockCheckpoint?.height ?? 0) + + config.jobUpdateSenderInTxMessages.blocksPerCall; + if (lastBlock > _payload.lastBlockCrawled) { + lastBlock = _payload.lastBlockCrawled; + } + await knex.transaction(async (trx) => { + const listTx = await Transaction.query() + .withGraphFetched('events.[attributes]') + .modifyGraph('events', (builder) => { + builder.orderBy('id', 'asc'); + }) + .modifyGraph('events.[attributes]', (builder) => { + builder.orderBy('index', 'asc'); + }) + .modifyGraph('messages', (builder) => { + builder.orderBy('id', 'asc'); + }) + .orderBy('id', 'asc') + .where('height', '>=', blockCheckpoint?.height ?? 0) + .andWhere('height', '<', lastBlock) + .transacting(trx); + await Promise.all( + listTx.map((tx) => { + const sender = this._findFirstAttribute( + tx.events, + 'message', + 'sender' + ); + return TransactionMessage.query() + .patch({ + sender, + }) + .where({ tx_id: tx.id }) + .transacting(trx); + }) + ); + + await BlockCheckpoint.query() + .update( + BlockCheckpoint.fromJson({ + job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + height: lastBlock, + }) + ) + .where({ + job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + }); + }); + } + + private _findFirstAttribute( + events: any, + eventType: string, + attributeKey: string + ): string { + let result = ''; + events.forEach((event: any) => { + if (result === '' && event.type === eventType) { + event.attributes.forEach((attribute: any) => { + if (result === '' && attribute.key === attributeKey) { + result = attribute.value; + } + }); + } + }); + if (!result.length) { + throw new Error( + `Could not find attribute ${attributeKey} in event type ${eventType}` + ); + } + return result; + } + + async _start(): Promise { + const blockCheckpoint = await BlockCheckpoint.query().findOne({ + job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + }); + if (!blockCheckpoint) { + await BlockCheckpoint.query().insert({ + job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + height: config.crawlBlock.startBlock, + }); + const crawlBlockCheckpoint = await BlockCheckpoint.query().findOne({ + job_name: BULL_JOB_NAME.CRAWL_BLOCK, + }); + + this.createJob( + BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, + { + lastBlockCrawled: crawlBlockCheckpoint?.height ?? 0, + }, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.jobUpdateSenderInTxMessages.millisecondCrawl, + }, + } + ); + } + return super._start(); + } +} From c777cda4148b0b53164ff1b2b82f62d57453f738 Mon Sep 17 00:00:00 2001 From: Phan Anh Tuan Date: Mon, 16 Oct 2023 13:51:20 +0700 Subject: [PATCH 2/4] fix: use 1 raw query update multiple row transaction_message --- .../job/update_sender_in_tx_message.ts | 67 ++++++++++--------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/src/services/job/update_sender_in_tx_message.ts b/src/services/job/update_sender_in_tx_message.ts index 5a1118587..b64878324 100644 --- a/src/services/job/update_sender_in_tx_message.ts +++ b/src/services/job/update_sender_in_tx_message.ts @@ -1,7 +1,7 @@ /* eslint-disable no-await-in-loop */ import { Service } from '@ourparentcenter/moleculer-decorators-extended'; import { ServiceBroker } from 'moleculer'; -import { Transaction, BlockCheckpoint, TransactionMessage } from '../../models'; +import { Transaction, BlockCheckpoint } from '../../models'; import BullableService, { QueueHandler } from '../../base/bullable.service'; import { BULL_JOB_NAME, SERVICE } from '../../common'; import config from '../../../config.json' assert { type: 'json' }; @@ -37,38 +37,40 @@ export default class UpdateSenderInTxMessages extends BullableService { if (lastBlock > _payload.lastBlockCrawled) { lastBlock = _payload.lastBlockCrawled; } + const listTx = await Transaction.query() + .withGraphFetched('events.[attributes]') + .modifyGraph('events', (builder) => { + builder.orderBy('id', 'asc'); + }) + .modifyGraph('events.[attributes]', (builder) => { + builder.orderBy('index', 'asc'); + }) + .modifyGraph('messages', (builder) => { + builder.orderBy('id', 'asc'); + }) + .orderBy('id', 'asc') + .where('height', '>=', blockCheckpoint?.height ?? 0) + .andWhere('height', '<', lastBlock); + const listUpdates = listTx.map((tx) => { + const sender = this._findFirstAttribute(tx.events, 'message', 'sender'); + return { + tx_id: tx.id, + sender, + }; + }); + await knex.transaction(async (trx) => { - const listTx = await Transaction.query() - .withGraphFetched('events.[attributes]') - .modifyGraph('events', (builder) => { - builder.orderBy('id', 'asc'); - }) - .modifyGraph('events.[attributes]', (builder) => { - builder.orderBy('index', 'asc'); - }) - .modifyGraph('messages', (builder) => { - builder.orderBy('id', 'asc'); - }) - .orderBy('id', 'asc') - .where('height', '>=', blockCheckpoint?.height ?? 0) - .andWhere('height', '<', lastBlock) - .transacting(trx); - await Promise.all( - listTx.map((tx) => { - const sender = this._findFirstAttribute( - tx.events, - 'message', - 'sender' - ); - return TransactionMessage.query() - .patch({ - sender, - }) - .where({ tx_id: tx.id }) - .transacting(trx); - }) - ); + if (listUpdates.length > 0) { + const stringListUpdates = listUpdates + .map((update) => `(${update.tx_id}, '${update.sender}')`) + .join(','); + await knex + .raw( + `UPDATE transaction_message SET sender = temp.sender from (VALUES ${stringListUpdates}) as temp(tx_id, sender) where temp.tx_id = transaction_message.tx_id` + ) + .transacting(trx); + } await BlockCheckpoint.query() .update( BlockCheckpoint.fromJson({ @@ -78,7 +80,8 @@ export default class UpdateSenderInTxMessages extends BullableService { ) .where({ job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES, - }); + }) + .transacting(trx); }); } From 5b048d0628875d117a4a4a0c60bff6762df393ed Mon Sep 17 00:00:00 2001 From: Phan Anh Tuan Date: Mon, 16 Oct 2023 15:45:11 +0700 Subject: [PATCH 3/4] fix: refactor 2 for-loop in findFirstAttribute function --- src/services/crawl-tx/crawl_tx.service.ts | 22 +++++++++++-------- .../job/update_sender_in_tx_message.ts | 22 +++++++++++-------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/services/crawl-tx/crawl_tx.service.ts b/src/services/crawl-tx/crawl_tx.service.ts index 0bbcc085f..2105c542e 100644 --- a/src/services/crawl-tx/crawl_tx.service.ts +++ b/src/services/crawl-tx/crawl_tx.service.ts @@ -586,15 +586,19 @@ export default class CrawlTxService extends BullableService { attributeKey: string ): string { let result = ''; - events.forEach((event: any) => { - if (result === '' && event.type === eventType) { - event.attributes.forEach((attribute: any) => { - if (result === '' && attribute.key === attributeKey) { - result = attribute.value; - } - }); - } - }); + const foundEvent = events.find( + (event: any) => + event.type === eventType && + event.attributes.some( + (attribute: any) => attribute.key === attributeKey + ) + ); + if (foundEvent) { + const foundAttribute = foundEvent.attributes.find( + (attribute: any) => attribute.key === attributeKey + ); + result = foundAttribute.value; + } if (!result.length) { throw new Error( `Could not find attribute ${attributeKey} in event type ${eventType}` diff --git a/src/services/job/update_sender_in_tx_message.ts b/src/services/job/update_sender_in_tx_message.ts index b64878324..ca26a6342 100644 --- a/src/services/job/update_sender_in_tx_message.ts +++ b/src/services/job/update_sender_in_tx_message.ts @@ -91,15 +91,19 @@ export default class UpdateSenderInTxMessages extends BullableService { attributeKey: string ): string { let result = ''; - events.forEach((event: any) => { - if (result === '' && event.type === eventType) { - event.attributes.forEach((attribute: any) => { - if (result === '' && attribute.key === attributeKey) { - result = attribute.value; - } - }); - } - }); + const foundEvent = events.find( + (event: any) => + event.type === eventType && + event.attributes.some( + (attribute: any) => attribute.key === attributeKey + ) + ); + if (foundEvent) { + const foundAttribute = foundEvent.attributes.find( + (attribute: any) => attribute.key === attributeKey + ); + result = foundAttribute.value; + } if (!result.length) { throw new Error( `Could not find attribute ${attributeKey} in event type ${eventType}` From c63d2995c2661be2a52bbdbc6f4927fd998d4ed1 Mon Sep 17 00:00:00 2001 From: Phan Anh Tuan Date: Tue, 17 Oct 2023 15:18:10 +0700 Subject: [PATCH 4/4] refactor: update name job (add .service) --- ... => update_sender_in_tx_message.service.ts} | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) rename src/services/job/{update_sender_in_tx_message.ts => update_sender_in_tx_message.service.ts} (93%) diff --git a/src/services/job/update_sender_in_tx_message.ts b/src/services/job/update_sender_in_tx_message.service.ts similarity index 93% rename from src/services/job/update_sender_in_tx_message.ts rename to src/services/job/update_sender_in_tx_message.service.ts index ca26a6342..c741de89e 100644 --- a/src/services/job/update_sender_in_tx_message.ts +++ b/src/services/job/update_sender_in_tx_message.service.ts @@ -52,11 +52,19 @@ export default class UpdateSenderInTxMessages extends BullableService { .where('height', '>=', blockCheckpoint?.height ?? 0) .andWhere('height', '<', lastBlock); const listUpdates = listTx.map((tx) => { - const sender = this._findFirstAttribute(tx.events, 'message', 'sender'); - return { - tx_id: tx.id, - sender, - }; + try { + const sender = this._findFirstAttribute(tx.events, 'message', 'sender'); + return { + tx_id: tx.id, + sender, + }; + } catch (error) { + this.logger.warn('Tx error not has message.sender: ', tx.hash); + return { + tx_id: tx.id, + sender: '', + }; + } }); await knex.transaction(async (trx) => {