Skip to content

Commit

Permalink
Move reptracker to flush
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Zander authored and vogel76 committed Aug 2, 2024
1 parent ff4931b commit 3fd075c
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 63 deletions.
2 changes: 0 additions & 2 deletions hive/db/db_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,6 @@ def ensure_finalize_massive_sync(cls, last_imported_blocks, last_completed_block
if last_imported_blocks > last_completed_blocks:
if cls.db().is_trx_active():
cls.db().query_no_return("COMMIT")
cls.ensure_reputations_recalculated(last_completed_blocks, last_imported_blocks)

is_initial_massive = (last_imported_blocks - last_completed_blocks) > ONE_WEEK_IN_BLOCKS

if is_initial_massive:
Expand Down
8 changes: 8 additions & 0 deletions hive/indexer/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from hive.indexer.block import Block, Operation, OperationType, Transaction, VirtualOperationType
from hive.indexer.custom_op import CustomOp
from hive.indexer.follow import Follow
from hive.indexer.reputations import Reputations
from hive.indexer.hive_db.block import BlockHiveDb
from hive.indexer.notify import Notify
from hive.indexer.payments import Payments
Expand Down Expand Up @@ -53,6 +54,7 @@ class Blocks:
_concurrent_flush = [
('Posts', Posts.flush, Posts),
('PostDataCache', PostDataCache.flush, PostDataCache),
('Reputations', Reputations.flush, Reputations),
('Votes', Votes.flush, Votes),
('Follow', Follow.flush, Follow),
('Reblog', Reblog.flush, Reblog),
Expand All @@ -79,6 +81,7 @@ def setup_own_db_access(shared_db_adapter: Db) -> None:
DbAdapterHolder.open_common_blocks_in_background_processing_db()

PostDataCache.setup_own_db_access(shared_db_adapter, "PostDataCache")
Reputations.setup_own_db_access(shared_db_adapter, "Reputations")
Votes.setup_own_db_access(shared_db_adapter, "Votes")
Follow.setup_own_db_access(shared_db_adapter, "Follow")
Posts.setup_own_db_access(shared_db_adapter, "Posts")
Expand All @@ -93,6 +96,7 @@ def close_own_db_access() -> None:
DbAdapterHolder.close_common_blocks_in_background_processing_db()

PostDataCache.close_own_db_access()
Reputations.close_own_db_access()
Votes.close_own_db_access()
Follow.close_own_db_access()
Posts.close_own_db_access()
Expand Down Expand Up @@ -220,6 +224,10 @@ def process_multi(cls, blocks, is_massive_sync: bool) -> None:
"""Batch-process blocks; wrapped in a transaction."""
time_start = OPSM.start()

if blocks:
Reputations._from_block = blocks[0][0]
Reputations._to_block = blocks[-1][0]

if is_massive_sync:
DbAdapterHolder.common_block_processing_db().query_no_return("START TRANSACTION")

Expand Down
59 changes: 6 additions & 53 deletions hive/indexer/reputations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,21 @@

import logging

from hive.conf import SCHEMA_NAME
from hive.conf import SCHEMA_NAME, REPTRACKER_SCHEMA_NAME
from hive.indexer.db_adapter_holder import DbAdapterHolder
from hive.utils.normalize import escape_characters

log = logging.getLogger(__name__)

CACHED_ITEMS_LIMIT = 200


class Reputations(DbAdapterHolder):
_values = []
_total_values = 0

@classmethod
def process_vote(self, block_num, effective_vote_op):
tuple = f"('{effective_vote_op['author']}', '{effective_vote_op['voter']}', {escape_characters(effective_vote_op['permlink'])}, {effective_vote_op['rshares']}, {block_num})"
self._values.append(tuple)
_from_block= 0
_to_block= 0

@classmethod
def flush(self):
if not self._values:
log.info(f"Written total reputation data records: {self._total_values}")
return 0

sql = f"""
INSERT INTO {SCHEMA_NAME}.hive_reputation_data
(voter_id, author_id, permlink, rshares, block_num)
SELECT (SELECT ha_v.id FROM {SCHEMA_NAME}.hive_accounts ha_v WHERE ha_v.name = t.voter) as voter_id,
(SELECT ha.id FROM {SCHEMA_NAME}.hive_accounts ha WHERE ha.name = t.author) as author_id,
t.permlink as permlink, t.rshares, t.block_num
FROM
(
VALUES
-- author, voter, permlink, rshares, block_num
{{}}
) AS T(author, voter, permlink, rshares, block_num)
"""

self.beginTx()

begin = 0
end = 0
value_limit = 1000
size = len(self._values)
while begin < size:
end = begin + value_limit
if end > size:
end = size

param = ",".join(self._values[begin:end])
query = sql.format(param)
self.db.query_no_return(query)
begin = end

self.commitTx()

n = len(self._values)
self._values.clear()

self._total_values = self._total_values + n

log.info(f"Written total reputation data records: {self._total_values}")
sql_rep = f"SET SEARCH_PATH TO '{REPTRACKER_SCHEMA_NAME}'; SELECT reptracker_process_blocks('{REPTRACKER_SCHEMA_NAME}', (:from_block, :to_block));"
self.db.query_no_return(sql_rep, from_block=self._from_block, to_block=self._to_block)

return n
return self._to_block - self._from_block + 1
7 changes: 0 additions & 7 deletions hive/indexer/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ def report_enter_to_stage(current_stage) -> bool:
DbState.ensure_indexes_are_disabled()

self._process_massive_blocks(self._lbound, self._ubound, active_connections_before)

sql_rep = f"SET SEARCH_PATH TO '{REPTRACKER_SCHEMA_NAME}'; SELECT reptracker_process_blocks('{REPTRACKER_SCHEMA_NAME}', (:to_block, :from_block));"
self._db.query_no_return(sql_rep, to_block=self._lbound, from_block=self._ubound)
elif application_stage == "MASSIVE_WITH_INDEXES":
DbState.set_massive_sync( True )
if report_enter_to_stage(application_stage):
Expand All @@ -168,8 +165,6 @@ def report_enter_to_stage(current_stage) -> bool:
DbState.ensure_indexes_are_enabled()

self._process_massive_blocks(self._lbound, self._ubound, active_connections_before)
sql_rep = f"SET SEARCH_PATH TO '{REPTRACKER_SCHEMA_NAME}'; SELECT reptracker_process_blocks('{REPTRACKER_SCHEMA_NAME}', (:to_block, :from_block));"
self._db.query_no_return(sql_rep, to_block=self._lbound, from_block=self._ubound)
elif application_stage == "live":
self._wait_for_massive_consume() # wait for flushing massive data in thread
DbState.set_massive_sync( False )
Expand All @@ -187,8 +182,6 @@ def report_enter_to_stage(current_stage) -> bool:
log.info(f"[SINGLE] Current system time: {datetime.now().isoformat(sep=' ', timespec='milliseconds')}")

self._process_live_blocks(self._lbound, self._ubound, active_connections_before)
sql_rep = f"SET SEARCH_PATH TO '{REPTRACKER_SCHEMA_NAME}'; SELECT reptracker_process_blocks('{REPTRACKER_SCHEMA_NAME}', (:to_block, :from_block));"
self._db.query_no_return(sql_rep, to_block=self._lbound, from_block=self._ubound)
else:
self._on_stop_synchronization(active_connections_before)
assert False, f"Unknown application stage {application_stage}"
Expand Down
2 changes: 1 addition & 1 deletion reputation_tracker

0 comments on commit 3fd075c

Please sign in to comment.