From fc2d6d3db038bb57176da668e0b0f3c60c32bd71 Mon Sep 17 00:00:00 2001
From: dustinface <35775977+xdustinface@users.noreply.github.com>
Date: Fri, 13 May 2022 00:16:22 +0200
Subject: [PATCH] harvester: Fix deadlock on disconnect after a repeated plot
 sync start (#11481)

* Test plot sync start and disconnect while a sync is in progress

* Drop `Receiver._lock`, instead wait for `Receiver.sync_active` to clear

* Properly stop the sender and plot manager refreshing on disconnect

* Poll faster, drop the log

* Reset `_sync_id` at the very end of `_finalize_sync`
---
 chia/harvester/harvester.py       |  3 +-
 chia/plot_sync/sender.py          | 17 ++++----
 tests/plot_sync/test_plot_sync.py | 66 +++++++++++++++++++++++++++++--
 tests/plot_sync/test_sender.py    |  1 -
 4 files changed, 73 insertions(+), 14 deletions(-)

diff --git a/chia/harvester/harvester.py b/chia/harvester/harvester.py
index 9dc2b885579a..84d0cb2f2773 100644
--- a/chia/harvester/harvester.py
+++ b/chia/harvester/harvester.py
@@ -104,8 +104,9 @@ def _plot_refresh_callback(self, event: PlotRefreshEvents, update_result: PlotRe
     def on_disconnect(self, connection: ws.WSChiaConnection):
         self.log.info(f"peer disconnected {connection.get_peer_logging()}")
         self.state_changed("close_connection")
-        self.plot_manager.stop_refreshing()
         self.plot_sync_sender.stop()
+        asyncio.run_coroutine_threadsafe(self.plot_sync_sender.await_closed(), asyncio.get_running_loop())
+        self.plot_manager.stop_refreshing()

     def get_plots(self) -> Tuple[List[Dict], List[str], List[str]]:
         self.log.debug(f"get_plots prover items: {self.plot_manager.plot_count()}")
diff --git a/chia/plot_sync/sender.py b/chia/plot_sync/sender.py
index 257051a4c2ed..a9ea4f5f1497 100644
--- a/chia/plot_sync/sender.py
+++ b/chia/plot_sync/sender.py
@@ -1,6 +1,5 @@
 import asyncio
 import logging
-import threading
 import time
 import traceback
 from dataclasses import dataclass
@@ -91,7 +90,6 @@ class Sender:
     _last_sync_id: uint64
     _stop_requested = False
     _task: Optional[asyncio.Task]  # type: ignore[type-arg] # Asks for Task parameter which doesn't work
-    _lock: threading.Lock
     _response: Optional[ExpectedResponse]

     def __init__(self, plot_manager: PlotManager) -> None:
@@ -103,7 +101,6 @@ def __init__(self, plot_manager: PlotManager) -> None:
         self._last_sync_id = uint64(0)
         self._stop_requested = False
         self._task = None
-        self._lock = threading.Lock()
         self._response = None

     def __str__(self) -> str:
@@ -145,8 +142,6 @@ def _reset(self) -> None:
         self._sync_id = uint64(0)
         self._next_message_id = uint64(0)
         self._messages.clear()
-        if self._lock.locked():
-            self._lock.release()
         if self._task is not None:
             # TODO, Add typing in PlotManager
             self.sync_start(self._plot_manager.plot_count(), True)  # type:ignore[no-untyped-call]
@@ -256,7 +251,11 @@ def _add_list_batched(self, message_type: ProtocolMessageTypes, payload_type: An

     def sync_start(self, count: float, initial: bool) -> None:
         log.debug(f"sync_start {self}: count {count}, initial {initial}")
-        self._lock.acquire()
+        while self.sync_active():
+            if self._stop_requested:
+                log.debug("sync_start aborted")
+                return
+            time.sleep(0.1)
         sync_id = int(time.time())
         # Make sure we have unique sync-id's even if we restart refreshing within a second (i.e. in tests)
         if sync_id == self._last_sync_id:
@@ -294,13 +293,13 @@ def _finalize_sync(self) -> None:
         log.debug(f"_finalize_sync {self}")
         assert self._sync_id != 0
         self._last_sync_id = self._sync_id
-        self._sync_id = uint64(0)
         self._next_message_id = uint64(0)
         self._messages.clear()
-        self._lock.release()
+        # Do this at the end since `_sync_id` is used as sync active indicator.
+        self._sync_id = uint64(0)

     def sync_active(self) -> bool:
-        return self._lock.locked() and self._sync_id != 0
+        return self._sync_id != 0

     def connected(self) -> bool:
         return self._connection is not None
diff --git a/tests/plot_sync/test_plot_sync.py b/tests/plot_sync/test_plot_sync.py
index 3c7942e6dd6b..2bcaf985ff3a 100644
--- a/tests/plot_sync/test_plot_sync.py
+++ b/tests/plot_sync/test_plot_sync.py
@@ -1,7 +1,9 @@
+import asyncio
+import functools
 from dataclasses import dataclass, field
 from pathlib import Path
 from shutil import copy
-from typing import List, Optional, Tuple
+from typing import Any, AsyncIterator, Callable, List, Optional, Tuple

 import pytest
 import pytest_asyncio
@@ -12,27 +14,36 @@
 from chia.plot_sync.delta import Delta, PathListDelta, PlotListDelta
 from chia.plot_sync.receiver import Receiver
 from chia.plot_sync.sender import Sender
-from chia.plot_sync.util import State
+from chia.plot_sync.util import Constants, State
 from chia.plotting.manager import PlotManager
 from chia.plotting.util import add_plot_directory, remove_plot_directory
 from chia.protocols.harvester_protocol import Plot
 from chia.server.start_service import Service
+from chia.server.ws_connection import ProtocolMessageTypes
 from chia.types.blockchain_format.sized_bytes import bytes32
 from chia.util.config import create_default_chia_config, lock_and_load_config, save_config
 from chia.util.ints import uint8, uint64
+from chia.util.streamable import _T_Streamable
 from tests.block_tools import BlockTools
 from tests.plot_sync.util import start_harvester_service
 from tests.plotting.test_plot_manager import MockPlotInfo, TestDirectory
 from tests.plotting.util import get_test_plots
+from tests.setup_nodes import setup_harvester_farmer, test_constants
 from tests.time_out_assert import time_out_assert


+@pytest_asyncio.fixture(scope="function")
+async def harvester_farmer_simulation(bt: BlockTools, tmp_path: Path) -> AsyncIterator[Service]:
+    async for _ in setup_harvester_farmer(bt, tmp_path, test_constants, start_services=True):
+        yield _
+
+
 def synced(sender: Sender, receiver: Receiver, previous_last_sync_id: int) -> bool:
     return (
         sender._last_sync_id != previous_last_sync_id
         and sender._last_sync_id == receiver._last_sync.sync_id != 0
         and receiver.current_sync().state == State.idle
-        and not sender._lock.locked()
+        and not sender.sync_active()
     )
@@ -539,3 +550,52 @@ async def test_farmer_restart(environment: Environment) -> None:
     assert len(plot_manager.failed_to_open_filenames) == len(receiver.invalid()) == expected.invalid_count
     assert len(plot_manager.no_key_filenames) == len(receiver.keys_missing()) == expected.keys_missing_count
     assert len(plot_manager.get_duplicates()) == len(receiver.duplicates()) == expected.duplicates_count
+
+
+@pytest.mark.asyncio
+async def test_sync_start_and_disconnect_while_sync_is_active(
+    harvester_farmer_simulation: Tuple[Service, Service]
+) -> None:
+    harvester_service, farmer_service = harvester_farmer_simulation
+    harvester = harvester_service._node
+    farmer: Farmer = farmer_service._node
+    Constants.message_timeout = 3
+
+    async def receiver_available() -> bool:
+        return harvester.server.node_id in farmer.plot_sync_receivers
+
+    async def disconnecting_process(
+        self: Receiver, method: Callable[[_T_Streamable], Any], message_type: ProtocolMessageTypes, message: Any
+    ) -> None:
+        if self.current_sync().state == State.loaded:
+            harvester.plot_manager.trigger_refresh()
+            await asyncio.sleep(2)
+            await self.connection().close()
+            return
+        await original_process(method, message_type, message)
+
+    # Wait for the receiver to show up
+    await time_out_assert(10, receiver_available)
+    receiver = farmer.plot_sync_receivers[harvester.server.node_id]
+    # And wait until the first sync from the harvester to the farmer is done
+    await time_out_assert(10, receiver.initial_sync, False)
+    # Replace the `Receiver._process` with `disconnecting_process` which triggers a plot manager refresh and disconnects
+    # the farmer from the harvester during an active sync.
+    original_process = receiver._process
+    receiver._process = functools.partial(disconnecting_process, receiver)  # type: ignore[assignment]
+    # Trigger the refresh which leads to a new sync_start being triggered during the active sync.
+    harvester.plot_manager.trigger_refresh()
+    await time_out_assert(10, harvester.plot_sync_sender.sync_active)
+    # Now wait until the receiver disappears from the farmer's plot_sync_receivers which means it's disconnected.
+    await time_out_assert(10, receiver_available, False)
+    # Wait until the sync was aborted
+    await time_out_assert(10, harvester.plot_sync_sender.sync_active, False)
+    # And then wait for the harvester to reconnect and the receiver to re-appear.
+    await time_out_assert(10, receiver_available, True)
+    # Make sure the receiver object has been changed because of the disconnect
+    assert farmer.plot_sync_receivers[harvester.server.node_id] is not receiver
+    receiver = farmer.plot_sync_receivers[harvester.server.node_id]
+    current_last_sync_id = receiver.last_sync().sync_id
+    # Now start another sync and wait for it to be done to make sure everything still works fine
+    harvester.plot_manager.trigger_refresh()
+    await time_out_assert(10, synced, True, harvester.plot_sync_sender, receiver, current_last_sync_id)
diff --git a/tests/plot_sync/test_sender.py b/tests/plot_sync/test_sender.py
index 09747ec8bbaf..db15ae7abeca 100644
--- a/tests/plot_sync/test_sender.py
+++ b/tests/plot_sync/test_sender.py
@@ -20,7 +20,6 @@ def test_default_values(bt: BlockTools) -> None:
     assert sender._last_sync_id == uint64(0)
     assert not sender._stop_requested
     assert sender._task is None
-    assert not sender._lock.locked()
     assert sender._response is None