Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/table asset main #716

Merged
merged 13 commits into from
Mar 25, 2024
4 changes: 4 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,9 @@
"jobUpdateTxCountInBlock": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
},
"jobUpdateAssets": {
"millisecondRepeatJob": 10000,
"lcdRecordGet": 5
}
}
4 changes: 4 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,9 @@
"key": "jobUpdateTxCountInBlock",
"millisecondCrawl": 1000,
"blocksPerCall": 100
},
"jobUpdateAssets": {
"millisecondRepeatJob": 10000,
"lcdRecordGet": 5
}
}
19 changes: 19 additions & 0 deletions migrations/20240110065150_create_asset_model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
await knex.schema.createTable('asset', (table) => {
table.increments('id').primary();
table.string('denom').unique().notNullable();
table.string('decimal');
table.string('name').index();
table.string('type').index().notNullable();
table.float('price');
table.decimal('total_supply', 80, 0);
table.string('origin_id').index();
table.timestamp('updated_at').notNullable().defaultTo(knex.raw('now()'));
});
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.dropTable('asset');
}
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export const BULL_JOB_NAME = {
JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT:
'job:create-transaction-message-constraint',
JOB_UPDATE_TX_COUNT_IN_BLOCK: 'job:update-tx-count-in-block',
JOB_UPDATE_ASSETS: 'job:update-assets',
};

export const SERVICE = {
Expand Down Expand Up @@ -345,6 +346,10 @@ export const SERVICE = {
key: 'UpdateTxCountInBlock',
path: 'v1.UpdateTxCountInBlock',
},
UpdateAssets: {
key: 'UpdateAssets',
path: 'v1.UpdateAssets',
},
},
CrawlIBCTaoService: {
key: 'CrawlIBCTaoService',
Expand Down
41 changes: 41 additions & 0 deletions src/models/asset.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* eslint-disable import/no-cycle */
import BaseModel from './base';
import { IbcChannel } from './ibc_channel';

export class Asset extends BaseModel {
id!: number;

denom!: string;

decimal!: string;

name!: string;

type!: string;

price!: string;

total_supply!: string;

origin_id!: string;

updated_at!: Date;

ibc_channel!: IbcChannel;

static get tableName() {
return 'asset';
}

static TYPE = {
CW20_TOKEN: 'CW20_TOKEN',
NATIVE: 'NATIVE',
IBC_TOKEN: 'IBC_TOKEN',
FACTORY_TOKEN: 'FACTORY_TOKEN',
};

static PREFIX = {
IBC: 'ibc/',
FACTORY: 'factory/',
};
}
238 changes: 238 additions & 0 deletions src/services/job/update_assets.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import { fromBase64, toHex } from '@cosmjs/encoding';
import { JsonRpcSuccessResponse } from '@cosmjs/json-rpc';
import { createJsonRpcRequest } from '@cosmjs/tendermint-rpc/build/jsonrpc';
import { cosmos, ibc } from '@horoscope-v2/sei-js-proto';
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import Long from 'long';
import { ServiceBroker } from 'moleculer';
import _ from 'lodash';
import { HttpBatchClient } from '@cosmjs/tendermint-rpc';
import config from '../../../config.json' assert { type: 'json' };
import BullableService, { QueueHandler } from '../../base/bullable.service';
import {
ABCI_QUERY_PATH,
BULL_JOB_NAME,
SERVICE,
getHttpBatchClient,
getLcdClient,
} from '../../common';
import { Cw20Contract } from '../../models';
import { Asset } from '../../models/asset';

@Service({
name: SERVICE.V1.JobService.UpdateAssets.key,
version: 1,
})
export default class UpdateAssetsJob extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

@QueueHandler({
queueName: BULL_JOB_NAME.JOB_UPDATE_ASSETS,
jobName: BULL_JOB_NAME.JOB_UPDATE_ASSETS,
})
async jobUpdateAssets() {
const coinAssets = await this.queryRpcAssets();
const originAssets = await this.queryRpcOriginAssets(coinAssets);
const cw20Assets = await Cw20Contract.query()
.joinRelated('smart_contract')
.select(
'smart_contract.address',
'cw20_contract.total_supply',
'cw20_contract.id as cw20_contract_id',
'cw20_contract.decimal',
'cw20_contract.name'
);
const assets: Asset[] = [];
assets.push(
...originAssets.map((originAsset) => {
let type = null;
if (originAsset.denom.startsWith(Asset.PREFIX.IBC)) {
type = Asset.TYPE.IBC_TOKEN;
} else if (originAsset.denom.startsWith(Asset.PREFIX.FACTORY)) {
type = Asset.TYPE.FACTORY_TOKEN;
} else {
type = Asset.TYPE.NATIVE;
}
return Asset.fromJson({
denom: originAsset.denom,
type,
total_supply: originAsset.amount,
updated_at: new Date().toISOString(),
origin_id: originAsset.origin,
});
}),
...cw20Assets.map((cw20Asset) =>
Asset.fromJson({
denom: cw20Asset.address,
type: Asset.TYPE.CW20_TOKEN,
decimal: cw20Asset.decimal,
name: cw20Asset.name,
total_supply: cw20Asset.total_supply,
origin_id: cw20Asset.cw20_contract_id,
updated_at: new Date().toISOString(),
})
)
);
if (assets.length > 0) {
await Asset.query().insert(assets).onConflict('denom').merge();
}
}

async queryRpcAssets() {
const lcdClient = await getLcdClient();
const assets: any[] = [];
const countTotal = parseInt(
(
await lcdClient.cosmos.cosmos.bank.v1beta1.totalSupply({
pagination: {
count_total: true,
},
})
).pagination.total,
10
);
const httpBatchClient = getHttpBatchClient();
const batchQueries: any = [];
for (
let index = 0;
index < Math.ceil(countTotal / config.jobUpdateAssets.lcdRecordGet);
index += 1
) {
batchQueries.push(
httpBatchClient.execute(
createJsonRpcRequest('abci_query', {
path: '/cosmos.bank.v1beta1.Query/TotalSupply',
data: toHex(
cosmos.bank.v1beta1.QueryTotalSupplyRequest.encode({
pagination: {
key: new Uint8Array(),
limit: Long.fromInt(config.jobUpdateAssets.lcdRecordGet),
offset: Long.fromInt(
index * config.jobUpdateAssets.lcdRecordGet
),
countTotal: false,
reverse: false,
},
}).finish()
),
})
)
);
}
const resultTotalSupply: JsonRpcSuccessResponse[] = await Promise.all(
batchQueries
);
resultTotalSupply.forEach((res) => {
assets.push(
...cosmos.bank.v1beta1.QueryTotalSupplyResponse.decode(
fromBase64(res.result.response.value)
).supply
);
});
const coinAssetsKeyByDenom = _.keyBy(assets, 'denom');
const missingDenomAssets = (
await Asset.query()
.whereIn('type', [
Asset.TYPE.IBC_TOKEN,
Asset.TYPE.FACTORY_TOKEN,
Asset.TYPE.NATIVE,
])
.andWhereNot('total_supply', '0')
).filter((asset) => !coinAssetsKeyByDenom[asset.denom]);
// query missing assets by denom
const resultSupplyOf: JsonRpcSuccessResponse[] =
await this.queryRpcMissingAssetByDenom(
missingDenomAssets.map((e) => e.denom),
httpBatchClient
);
resultSupplyOf.forEach((res) => {
assets.push(
cosmos.bank.v1beta1.QuerySupplyOfResponse.decode(
fromBase64(res.result.response.value)
).amount
);
});
return assets;
}

// Query missing assets by denom
async queryRpcMissingAssetByDenom(
denoms: string[],
httpBatchClient: HttpBatchClient
) {
const batchQueries: any = denoms.map((denom) =>
httpBatchClient.execute(
createJsonRpcRequest('abci_query', {
path: '/cosmos.bank.v1beta1.Query/SupplyOf',
data: toHex(
cosmos.bank.v1beta1.QuerySupplyOfRequest.encode({
denom,
}).finish()
),
})
)
);
return Promise.all(batchQueries);
}

async queryRpcOriginAssets(
coinAssets: {
denom: string;
amount: string;
origin: string | undefined;
}[]
) {
const httpBatchClient = getHttpBatchClient();
const batchQueries: any = [];
coinAssets.forEach((coinAsset) => {
batchQueries.push(
httpBatchClient.execute(
createJsonRpcRequest('abci_query', {
path: ABCI_QUERY_PATH.DENOM_TRACE,
data: toHex(
ibc.applications.transfer.v1.QueryDenomTraceRequest.encode({
hash: coinAsset.denom.substring(4),
}).finish()
),
})
)
);
});
const resultIbcDenom: JsonRpcSuccessResponse[] = await Promise.all(
batchQueries
);
resultIbcDenom.forEach((res, index) => {
let origin;
if (res.result.response.value) {
const path =
ibc.applications.transfer.v1.QueryDenomTraceResponse.decode(
fromBase64(res.result.response.value)
).denomTrace?.path;
origin = path?.split('/')[1];
}
// eslint-disable-next-line no-param-reassign
coinAssets[index].origin = origin;
});
return coinAssets;
}

public async _start(): Promise<void> {
await this.createJob(
BULL_JOB_NAME.JOB_UPDATE_ASSETS,
BULL_JOB_NAME.JOB_UPDATE_ASSETS,
{},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.jobUpdateAssets.millisecondRepeatJob,
},
}
);
return super._start();
}
}
Loading
Loading