Skip to content

Commit

Permalink
Update to new loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Zander authored and vogel76 committed Aug 2, 2024
1 parent f0b721a commit ff4931b
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docker/docker_entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ setup() {

if [ "${SKIP_REPTRACKER}" -eq 0 ]; then
pushd "$reptracker_dir"
./scripts/install_app.sh --postgres-url="${POSTGRES_ADMIN_URL}" --schema="$REPTRACKER_SCHEMA" --is_forking="false" --is_attached="true"
./scripts/install_app.sh --postgres-url="${POSTGRES_ADMIN_URL}" --schema="$REPTRACKER_SCHEMA" --is_forking="false"
popd
fi

Expand Down
9 changes: 0 additions & 9 deletions hive/db/db_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class DbState:
_fk_were_disabled = False
_fk_were_enabled = False
_original_synchronous_commit_mode = None
_reputations_were_recalculated = False

@classmethod
def initialize(cls, enter_massive: bool, schema_upgrade: bool):
Expand Down Expand Up @@ -379,14 +378,6 @@ def ensure_fk_are_enabled(cls):
cls._fk_were_disabled = False
cls._fk_were_enabled = True

@classmethod
def ensure_reputations_recalculated(cls, last_imported, last_completed):
if cls._reputations_were_recalculated:
return

cls.finish_account_reputations(last_imported, last_completed)
cls._reputations_were_recalculated = True

@classmethod
def _finish_hive_posts(cls, db, massive_sync_preconditions, last_imported_block, current_imported_block):
with AutoDbDisposer(db, "finish_hive_posts") as db_mgr:
Expand Down
2 changes: 0 additions & 2 deletions hive/indexer/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ def process_blocks(cls, blocks) -> Tuple[int, int]:
@classmethod
def process_multi(cls, blocks, is_massive_sync: bool) -> None:
"""Batch-process blocks; wrapped in a transaction."""
DB.query_no_return(f"SET SEARCH_PATH TO {REPTRACKER_SCHEMA_NAME};")

time_start = OPSM.start()

if is_massive_sync:
Expand Down
9 changes: 8 additions & 1 deletion hive/indexer/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ def report_enter_to_stage(current_stage) -> bool:
DbState.ensure_indexes_are_disabled()

self._process_massive_blocks(self._lbound, self._ubound, active_connections_before)

sql_rep = f"SET SEARCH_PATH TO '{REPTRACKER_SCHEMA_NAME}'; SELECT reptracker_process_blocks('{REPTRACKER_SCHEMA_NAME}', (:to_block, :from_block));"
self._db.query_no_return(sql_rep, to_block=self._lbound, from_block=self._ubound)
elif application_stage == "MASSIVE_WITH_INDEXES":
DbState.set_massive_sync( True )
if report_enter_to_stage(application_stage):
Expand All @@ -165,6 +168,8 @@ def report_enter_to_stage(current_stage) -> bool:
DbState.ensure_indexes_are_enabled()

self._process_massive_blocks(self._lbound, self._ubound, active_connections_before)
sql_rep = f"SET SEARCH_PATH TO '{REPTRACKER_SCHEMA_NAME}'; SELECT reptracker_process_blocks('{REPTRACKER_SCHEMA_NAME}', (:to_block, :from_block));"
self._db.query_no_return(sql_rep, to_block=self._lbound, from_block=self._ubound)
elif application_stage == "live":
self._wait_for_massive_consume() # wait for flushing massive data in thread
DbState.set_massive_sync( False )
Expand All @@ -182,6 +187,8 @@ def report_enter_to_stage(current_stage) -> bool:
log.info(f"[SINGLE] Current system time: {datetime.now().isoformat(sep=' ', timespec='milliseconds')}")

self._process_live_blocks(self._lbound, self._ubound, active_connections_before)
sql_rep = f"SET SEARCH_PATH TO '{REPTRACKER_SCHEMA_NAME}'; SELECT reptracker_process_blocks('{REPTRACKER_SCHEMA_NAME}', (:to_block, :from_block));"
self._db.query_no_return(sql_rep, to_block=self._lbound, from_block=self._ubound)
else:
self._on_stop_synchronization(active_connections_before)
assert False, f"Unknown application stage {application_stage}"
Expand Down Expand Up @@ -221,7 +228,7 @@ def _query_for_app_next_block(self) -> Tuple[int, int]:
if self._max_batch:
batch = self._max_batch

result = self._db.query_one( "CALL hive.app_next_iteration( _context => ARRAY['{}', '{}'], _blocks_range => (0,0), _limit => {}, _override_max_batch => {} )"
result = self._db.query_one( "CALL hive.app_next_iteration( _contexts => ARRAY['{}', '{}']::hive.contexts_group, _blocks_range => (0,0), _limit => {}, _override_max_batch => {} )"
.format(SCHEMA_NAME, REPTRACKER_SCHEMA_NAME, limit, batch)
)

Expand Down
2 changes: 1 addition & 1 deletion reputation_tracker

0 comments on commit ff4931b

Please sign in to comment.