Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce block chain call on indexer batch #593

Merged
merged 2 commits into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion batch/indexer_issue_redeem.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
from app.model.db import (
Account,
IDXIssueRedeem,
IDXIssueRedeemBlockNumber,
IDXIssueRedeemEventType,
Expand Down Expand Up @@ -105,7 +106,15 @@ async def __get_token_list(self, db_session: AsyncSession):
record[0]
for record in (
await db_session.execute(
select(Token.token_address).where(Token.token_status == 1)
select(Token.token_address)
.join(
Account,
and_(
Account.issuer_address == Token.issuer_address,
Account.is_deleted == False,
),
)
.where(Token.token_status == 1)
)
)
.tuples()
Expand Down
45 changes: 33 additions & 12 deletions batch/indexer_personal_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@

from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
from app.model.blockchain import PersonalInfoContract
from app.model.db import Account, IDXPersonalInfo, IDXPersonalInfoBlockNumber, Token
from app.utils.contract_utils import AsyncContractUtils
from app.model.blockchain import (
IbetShareContract,
IbetStraightBondContract,
PersonalInfoContract,
)
from app.model.db import (
Account,
IDXPersonalInfo,
IDXPersonalInfoBlockNumber,
Token,
TokenType,
)
from app.utils.web3_utils import AsyncWeb3Wrapper
from batch import batch_log
from config import INDEXER_BLOCK_LOT_MAX_SIZE, INDEXER_SYNC_INTERVAL, ZERO_ADDRESS
Expand Down Expand Up @@ -95,18 +104,30 @@ async def process(self):
async def __refresh_personal_info_list(self, db_session: AsyncSession):
self.personal_info_contract_list.clear()
_tokens: Sequence[Token] = (
await db_session.scalars(select(Token).where(Token.token_status == 1))
await db_session.scalars(
select(Token)
.join(
Account,
and_(
Account.issuer_address == Token.issuer_address,
Account.is_deleted == False,
),
)
.where(Token.token_status == 1)
)
).all()
tmp_list = []
for _token in _tokens:
abi = _token.abi
token_contract = web3.eth.contract(address=_token.token_address, abi=abi)
personal_info_address = await AsyncContractUtils.call_function(
contract=token_contract,
function_name="personalInfoAddress",
args=(),
default_returns=ZERO_ADDRESS,
)
personal_info_address = ZERO_ADDRESS
if _token.type == TokenType.IBET_STRAIGHT_BOND.value:
bond_token = IbetStraightBondContract(_token.token_address)
await bond_token.get()
personal_info_address = bond_token.personal_info_contract_address
elif _token.type == TokenType.IBET_SHARE.value:
share_token = IbetShareContract(_token.token_address)
await share_token.get()
personal_info_address = share_token.personal_info_contract_address

if personal_info_address != ZERO_ADDRESS:
tmp_list.append(
{
Expand Down
65 changes: 29 additions & 36 deletions batch/indexer_position_bond.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@

from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
from app.model.blockchain import IbetExchangeInterface
from app.model.blockchain import IbetExchangeInterface, IbetStraightBondContract
from app.model.db import (
Account,
IDXLock,
IDXLockedPosition,
IDXPosition,
Expand Down Expand Up @@ -123,7 +124,15 @@ async def __get_contract_list(self, db_session: AsyncSession):
record[0]
for record in (
await db_session.execute(
select(Token.token_address).where(
select(Token.token_address)
.join(
Account,
and_(
Account.issuer_address == Token.issuer_address,
Account.is_deleted == False,
),
)
.where(
and_(
Token.type == TokenType.IBET_STRAIGHT_BOND,
Token.token_status == 1,
Expand Down Expand Up @@ -160,14 +169,10 @@ async def __get_contract_list(self, db_session: AsyncSession):

_exchange_list_tmp = []
for token_contract in self.token_list.values():
tradable_exchange_address = await AsyncContractUtils.call_function(
contract=token_contract,
function_name="tradableExchange",
args=(),
default_returns=ZERO_ADDRESS,
)
if tradable_exchange_address != ZERO_ADDRESS:
_exchange_list_tmp.append(tradable_exchange_address)
bond_token = IbetStraightBondContract(token_contract.address)
await bond_token.get()
if bond_token.tradable_exchange_contract_address != ZERO_ADDRESS:
_exchange_list_tmp.append(bond_token.tradable_exchange_contract_address)

# Remove duplicate exchanges from a list
self.exchange_address_list = list(set(_exchange_list_tmp))
Expand Down Expand Up @@ -215,12 +220,9 @@ async def __sync_issuer(self, db_session: AsyncSession):
"""Synchronize issuer position"""
for token in self.token_list.values():
try:
issuer_address = await AsyncContractUtils.call_function(
contract=token,
function_name="owner",
args=(),
default_returns=ZERO_ADDRESS,
)
bond_token = IbetStraightBondContract(token.address)
await bond_token.get()
issuer_address = bond_token.issuer_address
balance, pending_transfer = await self.__get_account_balance_token(
token, issuer_address
)
Expand Down Expand Up @@ -396,12 +398,9 @@ async def __sync_lock(

# Insert Notification
if len(events) > 0:
issuer_address = await AsyncContractUtils.call_function(
contract=token,
function_name="owner",
args=(),
default_returns=ZERO_ADDRESS,
)
bond_token = IbetStraightBondContract(token.address)
await bond_token.get()
issuer_address = bond_token.issuer_address
for event in events:
args = event["args"]
account_address = args.get("accountAddress", "")
Expand Down Expand Up @@ -508,12 +507,9 @@ async def __sync_unlock(

# Insert Notification
if len(events) > 0:
issuer_address = await AsyncContractUtils.call_function(
contract=token,
function_name="owner",
args=(),
default_returns=ZERO_ADDRESS,
)
bond_token = IbetStraightBondContract(token.address)
await bond_token.get()
issuer_address = bond_token.issuer_address
for event in events:
args = event["args"]
account_address = args.get("accountAddress", "")
Expand Down Expand Up @@ -1239,22 +1235,19 @@ async def __get_account_balance_all(token_contract, account_address: str):
args=(account_address,),
default_returns=0,
),
AsyncContractUtils.call_function(
contract=token_contract,
function_name="tradableExchange",
args=(),
default_returns=ZERO_ADDRESS,
),
max_concurrency=3,
)
balance, pending_transfer, tradable_exchange_address = (
balance, pending_transfer = (
tasks[0].result(),
tasks[1].result(),
tasks[2].result(),
)
except ExceptionGroup:
raise ServiceUnavailableError

bond_token = IbetStraightBondContract(token_contract.address)
await bond_token.get()
tradable_exchange_address = bond_token.tradable_exchange_contract_address

if tradable_exchange_address != ZERO_ADDRESS:
exchange_contract = IbetExchangeInterface(tradable_exchange_address)
exchange_contract_balance = await exchange_contract.get_account_balance(
Expand Down
67 changes: 31 additions & 36 deletions batch/indexer_position_share.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@

from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
from app.model.blockchain import IbetExchangeInterface
from app.model.blockchain import IbetExchangeInterface, IbetShareContract
from app.model.db import (
Account,
IDXLock,
IDXLockedPosition,
IDXPosition,
Expand Down Expand Up @@ -123,7 +124,15 @@ async def __get_contract_list(self, db_session: AsyncSession):
record[0]
for record in (
await db_session.execute(
select(Token.token_address).where(
select(Token.token_address)
.join(
Account,
and_(
Account.issuer_address == Token.issuer_address,
Account.is_deleted == False,
),
)
.where(
and_(
Token.type == TokenType.IBET_SHARE,
Token.token_status == 1,
Expand Down Expand Up @@ -160,14 +169,12 @@ async def __get_contract_list(self, db_session: AsyncSession):

_exchange_list_tmp = []
for token_contract in self.token_list.values():
tradable_exchange_address = await AsyncContractUtils.call_function(
contract=token_contract,
function_name="tradableExchange",
args=(),
default_returns=ZERO_ADDRESS,
)
if tradable_exchange_address != ZERO_ADDRESS:
_exchange_list_tmp.append(tradable_exchange_address)
share_token = IbetShareContract(token_contract.address)
await share_token.get()
if share_token.tradable_exchange_contract_address != ZERO_ADDRESS:
_exchange_list_tmp.append(
share_token.tradable_exchange_contract_address
)

# Remove duplicate exchanges from a list
self.exchange_address_list = list(set(_exchange_list_tmp))
Expand Down Expand Up @@ -215,12 +222,9 @@ async def __sync_issuer(self, db_session: AsyncSession):
"""Synchronize issuer position"""
for token in self.token_list.values():
try:
issuer_address = await AsyncContractUtils.call_function(
contract=token,
function_name="owner",
args=(),
default_returns=ZERO_ADDRESS,
)
share_token = IbetShareContract(token.address)
await share_token.get()
issuer_address = share_token.issuer_address
balance, pending_transfer = await self.__get_account_balance_token(
token, issuer_address
)
Expand Down Expand Up @@ -396,12 +400,9 @@ async def __sync_lock(

# Insert Notification
if len(events) > 0:
issuer_address = await AsyncContractUtils.call_function(
contract=token,
function_name="owner",
args=(),
default_returns=ZERO_ADDRESS,
)
share_token = IbetShareContract(token.address)
await share_token.get()
issuer_address = share_token.issuer_address
for event in events:
args = event["args"]
account_address = args.get("accountAddress", "")
Expand Down Expand Up @@ -508,12 +509,9 @@ async def __sync_unlock(

# Insert Notification
if len(events) > 0:
issuer_address = await AsyncContractUtils.call_function(
contract=token,
function_name="owner",
args=(),
default_returns=ZERO_ADDRESS,
)
share_token = IbetShareContract(token.address)
await share_token.get()
issuer_address = share_token.issuer_address
for event in events:
args = event["args"]
account_address = args.get("accountAddress", "")
Expand Down Expand Up @@ -1239,22 +1237,19 @@ async def __get_account_balance_all(token_contract, account_address: str):
args=(account_address,),
default_returns=0,
),
AsyncContractUtils.call_function(
contract=token_contract,
function_name="tradableExchange",
args=(),
default_returns=ZERO_ADDRESS,
),
max_concurrency=3,
)
balance, pending_transfer, tradable_exchange_address = (
balance, pending_transfer = (
tasks[0].result(),
tasks[1].result(),
tasks[2].result(),
)
except ExceptionGroup:
raise ServiceUnavailableError

share_token = IbetShareContract(token_contract.address)
await share_token.get()
tradable_exchange_address = share_token.tradable_exchange_contract_address

if tradable_exchange_address != ZERO_ADDRESS:
exchange_contract = IbetExchangeInterface(tradable_exchange_address)
exchange_contract_balance = await exchange_contract.get_account_balance(
Expand Down
13 changes: 10 additions & 3 deletions batch/indexer_token_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
from datetime import timedelta, timezone
from typing import Sequence

from sqlalchemy import select
from sqlalchemy import and_, select
from sqlalchemy.exc import SQLAlchemyError

from app.database import BatchAsyncSessionLocal
from app.exceptions import ServiceUnavailableError
from app.model.blockchain import IbetShareContract, IbetStraightBondContract
from app.model.db import Token, TokenType
from app.model.db import Account, Token, TokenType
from batch import batch_log
from config import INDEXER_SYNC_INTERVAL

Expand All @@ -47,7 +47,14 @@ async def process():
(
await db_session.execute(
select(Token.type, Token.token_address)
.filter(Token.token_status == 1)
.join(
Account,
and_(
Account.issuer_address == Token.issuer_address,
Account.is_deleted == False,
),
)
.where(Token.token_status == 1)
.order_by(Token.created)
)
)
Expand Down
Loading
Loading