Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update sender in transaction_message (main) #419

Merged
merged 5 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@
"millisecondRepeatJob": 2000,
"blocksPerCall": 100
},
"jobUpdateSenderInTxMessages": {
"millisecondCrawl": 1000,
"blocksPerCall": 1000
},
"crawlIbcIcs20": {
"key": "crawlIbcIcs20",
"millisecondRepeatJob": 2000,
Expand Down
4 changes: 4 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@
"millisecondRepeatJob": 2000,
"blocksPerCall": 100
},
"jobUpdateSenderInTxMessages": {
"millisecondCrawl": 1000,
"blocksPerCall": 1000
},
"crawlIbcIcs20": {
"key": "crawlIbcIcs20",
"millisecondRepeatJob": 2000,
Expand Down
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export const BULL_JOB_NAME = {
CRAWL_IBC_ICS20: 'crawl:ibc-ics20',
JOB_CREATE_COMPOSITE_INDEX_ATTR_PARTITION:
'job:create-index-composite-attr-partition',
JOB_UPDATE_SENDER_IN_TX_MESSAGES: 'job:update-sender-in-tx-messages',
};

export const SERVICE = {
Expand Down Expand Up @@ -248,6 +249,10 @@ export const SERVICE = {
key: 'ReAssignMsgIndexToEvent',
path: 'v1.ReAssignMsgIndexToEvent',
},
UpdateSenderInTxMessages: {
key: 'UpdateSenderInTxMessages',
path: 'v1.UpdateSenderInTxMessages',
},
},
CrawlIBCTaoService: {
key: 'CrawlIBCTaoService',
Expand Down
22 changes: 13 additions & 9 deletions src/services/crawl-tx/crawl_tx.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,15 +586,19 @@ export default class CrawlTxService extends BullableService {
attributeKey: string
): string {
let result = '';
events.forEach((event: any) => {
if (event.type === eventType) {
event.attributes.forEach((attribute: any) => {
if (attribute.key === attributeKey) {
result = attribute.value;
}
});
}
});
const foundEvent = events.find(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, nhẽ ra thì for thường xong rồi return luôn cũng được đúng ko? cơ mà để sau cũng được, tách hẳn cái hàm này cho vào ultility hay helper gì đó

(event: any) =>
event.type === eventType &&
event.attributes.some(
(attribute: any) => attribute.key === attributeKey
)
);
if (foundEvent) {
const foundAttribute = foundEvent.attributes.find(
(attribute: any) => attribute.key === attributeKey
);
result = foundAttribute.value;
}
if (!result.length) {
throw new Error(
`Could not find attribute ${attributeKey} in event type ${eventType}`
Expand Down
155 changes: 155 additions & 0 deletions src/services/job/update_sender_in_tx_message.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/* eslint-disable no-await-in-loop */
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { ServiceBroker } from 'moleculer';
import { Transaction, BlockCheckpoint } from '../../models';
import BullableService, { QueueHandler } from '../../base/bullable.service';
import { BULL_JOB_NAME, SERVICE } from '../../common';
import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';

@Service({
name: SERVICE.V1.JobService.UpdateSenderInTxMessages.key,
version: 1,
})
export default class UpdateSenderInTxMessages extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

@QueueHandler({
queueName: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
jobName: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
})
async updateSender(_payload: { lastBlockCrawled: number }) {
const blockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
});
this.logger.info(
`Update sender in transaction_message table start from block ${blockCheckpoint?.height}`
);
if (blockCheckpoint?.height === _payload.lastBlockCrawled) {
return;
}

let lastBlock =
(blockCheckpoint?.height ?? 0) +
config.jobUpdateSenderInTxMessages.blocksPerCall;
if (lastBlock > _payload.lastBlockCrawled) {
lastBlock = _payload.lastBlockCrawled;
}
const listTx = await Transaction.query()
.withGraphFetched('events.[attributes]')
.modifyGraph('events', (builder) => {
builder.orderBy('id', 'asc');
})
.modifyGraph('events.[attributes]', (builder) => {
builder.orderBy('index', 'asc');
})
.modifyGraph('messages', (builder) => {
builder.orderBy('id', 'asc');
})
.orderBy('id', 'asc')
.where('height', '>=', blockCheckpoint?.height ?? 0)
.andWhere('height', '<', lastBlock);
const listUpdates = listTx.map((tx) => {
try {
const sender = this._findFirstAttribute(tx.events, 'message', 'sender');
return {
tx_id: tx.id,
sender,
};
} catch (error) {
this.logger.warn('Tx error not has message.sender: ', tx.hash);
return {
tx_id: tx.id,
sender: '',
};
}
});

await knex.transaction(async (trx) => {
if (listUpdates.length > 0) {
const stringListUpdates = listUpdates
.map((update) => `(${update.tx_id}, '${update.sender}')`)
.join(',');

await knex
.raw(
`UPDATE transaction_message SET sender = temp.sender from (VALUES ${stringListUpdates}) as temp(tx_id, sender) where temp.tx_id = transaction_message.tx_id`
)
.transacting(trx);
}
await BlockCheckpoint.query()
.update(
BlockCheckpoint.fromJson({
job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
height: lastBlock,
})
)
.where({
job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
})
.transacting(trx);
});
}

private _findFirstAttribute(
events: any,
eventType: string,
attributeKey: string
): string {
let result = '';
const foundEvent = events.find(
(event: any) =>
event.type === eventType &&
event.attributes.some(
(attribute: any) => attribute.key === attributeKey
)
);
if (foundEvent) {
const foundAttribute = foundEvent.attributes.find(
(attribute: any) => attribute.key === attributeKey
);
result = foundAttribute.value;
}
if (!result.length) {
throw new Error(
`Could not find attribute ${attributeKey} in event type ${eventType}`
);
}
return result;
}

async _start(): Promise<void> {
const blockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
});
if (!blockCheckpoint) {
await BlockCheckpoint.query().insert({
job_name: BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
height: config.crawlBlock.startBlock,
});
const crawlBlockCheckpoint = await BlockCheckpoint.query().findOne({
job_name: BULL_JOB_NAME.CRAWL_BLOCK,
});

this.createJob(
BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
BULL_JOB_NAME.JOB_UPDATE_SENDER_IN_TX_MESSAGES,
{
lastBlockCrawled: crawlBlockCheckpoint?.height ?? 0,
},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.jobUpdateSenderInTxMessages.millisecondCrawl,
},
}
);
}
return super._start();
}
}