diff --git a/src/main.py b/src/main.py index 00660b5..934ae06 100644 --- a/src/main.py +++ b/src/main.py @@ -405,6 +405,9 @@ def primary_iter(self, db_state, zk_state): self.checks['primary_switch'] = 0 + # release replication source locks + self._acquire_replication_source_slot_lock(None) + self._handle_slots() self._store_replics_info(db_state, zk_state) @@ -603,7 +606,7 @@ def replica_return(self, db_state, zk_state): # Try to resume WAL replaying, it can be paused earlier self.db.pg_wal_replay_resume() - if not self._check_archive_recovery(limit) and not self._wait_for_streaming(limit): + if not self._check_archive_recovery(limit) and not self._wait_for_streaming(holder, limit): # Wal receiver is not running and # postgresql isn't in archive recovery # We should try to restart @@ -1035,7 +1038,7 @@ def _simple_primary_switch(self, limit, new_primary, is_dead): # timeline N-1 before current recovery point M". # Checking it with the info from ZK. # - if self._wait_for_streaming(limit, new_primary): + if self._wait_for_streaming(new_primary, limit): # # The easy way succeeded. # @@ -1091,7 +1094,7 @@ def _attach_to_primary(self, new_primary, limit): self.checks['primary_switch'] = 0 return None - if not self._wait_for_streaming(limit): + if not self._wait_for_streaming(new_primary, limit): self.checks['primary_switch'] = 0 return None @@ -1168,8 +1171,9 @@ def _acquire_replication_source_slot_lock(self, source): 'Could not get all hosts list from ZK.' 'Can not release old replication slot locks. We will skip it this time' ) - # And acquire lock (then new_primary will create replication slot) - self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True) + if source: + # And acquire lock (then new_primary will create replication slot) + self.zk.acquire_lock(os.path.join(self.zk.HOST_REPLICATION_SOURCES, source), read_lock=True, release_on_fail=True) def _return_to_cluster(self, new_primary, role, is_dead=False): """ @@ -1617,11 +1621,12 @@ def _check_postgresql_streaming(self, primary=None): return None - def _wait_for_streaming(self, limit=-1, primary=None): + def _wait_for_streaming(self, primary, limit=-1): """ Stop until postgresql start streaming from primary. With limit=-1 the loop here can be infinite. """ + self._acquire_replication_source_slot_lock(primary) check_streaming = functools.partial(self._check_postgresql_streaming, primary) return helpers.await_for_value(check_streaming, limit, 'PostgreSQL started streaming from primary')