Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

harvester: Fix deadlock on disconnect after a repeated plot sync start #11481

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion chia/harvester/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ def _plot_refresh_callback(self, event: PlotRefreshEvents, update_result: PlotRe
def on_disconnect(self, connection: ws.WSChiaConnection):
self.log.info(f"peer disconnected {connection.get_peer_logging()}")
self.state_changed("close_connection")
self.plot_manager.stop_refreshing()
self.plot_sync_sender.stop()
asyncio.run_coroutine_threadsafe(self.plot_sync_sender.await_closed(), asyncio.get_running_loop())
self.plot_manager.stop_refreshing()

def get_plots(self) -> Tuple[List[Dict], List[str], List[str]]:
self.log.debug(f"get_plots prover items: {self.plot_manager.plot_count()}")
Expand Down
15 changes: 7 additions & 8 deletions chia/plot_sync/sender.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import threading
import time
import traceback
from dataclasses import dataclass
Expand Down Expand Up @@ -91,7 +90,6 @@ class Sender:
_last_sync_id: uint64
_stop_requested = False
_task: Optional[asyncio.Task] # type: ignore[type-arg] # Asks for Task parameter which doesn't work
_lock: threading.Lock
_response: Optional[ExpectedResponse]

def __init__(self, plot_manager: PlotManager) -> None:
Expand All @@ -103,7 +101,6 @@ def __init__(self, plot_manager: PlotManager) -> None:
self._last_sync_id = uint64(0)
self._stop_requested = False
self._task = None
self._lock = threading.Lock()
self._response = None

def __str__(self) -> str:
Expand Down Expand Up @@ -145,8 +142,6 @@ def _reset(self) -> None:
self._sync_id = uint64(0)
self._next_message_id = uint64(0)
self._messages.clear()
if self._lock.locked():
self._lock.release()
if self._task is not None:
# TODO, Add typing in PlotManager
self.sync_start(self._plot_manager.plot_count(), True) # type:ignore[no-untyped-call]
Expand Down Expand Up @@ -256,7 +251,12 @@ def _add_list_batched(self, message_type: ProtocolMessageTypes, payload_type: An

def sync_start(self, count: float, initial: bool) -> None:
log.debug(f"sync_start {self}: count {count}, initial {initial}")
self._lock.acquire()
while self.sync_active():
if self._stop_requested:
log.debug("sync_start aborted")
return
log.debug(f"sync_start wait for sync {self._sync_id}")
time.sleep(1)
sync_id = int(time.time())
# Make sure we have unique sync-id's even if we restart refreshing within a second (i.e. in tests)
if sync_id == self._last_sync_id:
Expand Down Expand Up @@ -297,10 +297,9 @@ def _finalize_sync(self) -> None:
self._sync_id = uint64(0)
xdustinface marked this conversation as resolved.
Show resolved Hide resolved
self._next_message_id = uint64(0)
self._messages.clear()
self._lock.release()

def sync_active(self) -> bool:
return self._lock.locked() and self._sync_id != 0
return self._sync_id != 0

def connected(self) -> bool:
return self._connection is not None
Expand Down
66 changes: 63 additions & 3 deletions tests/plot_sync/test_plot_sync.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import asyncio
import functools
from dataclasses import dataclass, field
from pathlib import Path
from shutil import copy
from typing import List, Optional, Tuple
from typing import Any, AsyncIterator, Callable, List, Optional, Tuple

import pytest
import pytest_asyncio
Expand All @@ -12,27 +14,36 @@
from chia.plot_sync.delta import Delta, PathListDelta, PlotListDelta
from chia.plot_sync.receiver import Receiver
from chia.plot_sync.sender import Sender
from chia.plot_sync.util import State
from chia.plot_sync.util import Constants, State
from chia.plotting.manager import PlotManager
from chia.plotting.util import add_plot_directory, remove_plot_directory
from chia.protocols.harvester_protocol import Plot
from chia.server.start_service import Service
from chia.server.ws_connection import ProtocolMessageTypes
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.config import create_default_chia_config, lock_and_load_config, save_config
from chia.util.ints import uint8, uint64
from chia.util.streamable import _T_Streamable
from tests.block_tools import BlockTools
from tests.plot_sync.util import start_harvester_service
from tests.plotting.test_plot_manager import MockPlotInfo, TestDirectory
from tests.plotting.util import get_test_plots
from tests.setup_nodes import setup_harvester_farmer, test_constants
from tests.time_out_assert import time_out_assert


@pytest_asyncio.fixture(scope="function")
async def harvester_farmer_simulation(bt: BlockTools, tmp_path: Path) -> AsyncIterator[Service]:
async for _ in setup_harvester_farmer(bt, tmp_path, test_constants, start_services=True):
yield _


def synced(sender: Sender, receiver: Receiver, previous_last_sync_id: int) -> bool:
return (
sender._last_sync_id != previous_last_sync_id
and sender._last_sync_id == receiver._last_sync.sync_id != 0
and receiver.current_sync().state == State.idle
and not sender._lock.locked()
and not sender.sync_active()
)


Expand Down Expand Up @@ -539,3 +550,52 @@ async def test_farmer_restart(environment: Environment) -> None:
assert len(plot_manager.failed_to_open_filenames) == len(receiver.invalid()) == expected.invalid_count
assert len(plot_manager.no_key_filenames) == len(receiver.keys_missing()) == expected.keys_missing_count
assert len(plot_manager.get_duplicates()) == len(receiver.duplicates()) == expected.duplicates_count


@pytest.mark.asyncio
async def test_sync_start_and_disconnect_while_sync_is_active(
harvester_farmer_simulation: Tuple[Service, Service]
) -> None:
harvester_service, farmer_service = harvester_farmer_simulation
harvester = harvester_service._node
farmer: Farmer = farmer_service._node
Constants.message_timeout = 3

async def receiver_available() -> bool:
return harvester.server.node_id in farmer.plot_sync_receivers

async def disconnecting_process(
self: Receiver, method: Callable[[_T_Streamable], Any], message_type: ProtocolMessageTypes, message: Any
) -> None:
if self.current_sync().state == State.loaded:
harvester.plot_manager.trigger_refresh()
await asyncio.sleep(2)
await self.connection().close()
return
await original_process(method, message_type, message)

# Wait for the receiver to show up
await time_out_assert(10, receiver_available)
receiver = farmer.plot_sync_receivers[harvester.server.node_id]
# And wait until the first sync from the harvester to the farmer is done
await time_out_assert(10, receiver.initial_sync, False)
# Replace the `Receiver._process` with `disconnecting_process` which triggers a plot manager refresh and disconnects
# the farmer from the harvester during an active sync.
original_process = receiver._process
receiver._process = functools.partial(disconnecting_process, receiver) # type: ignore[assignment]
# Trigger the refresh which leads to a new sync_start being triggered during the active sync.
harvester.plot_manager.trigger_refresh()
await time_out_assert(10, harvester.plot_sync_sender.sync_active)
# Now wait until the receiver disappears from the farmer's plot_sync_receivers which means its disconnected.
await time_out_assert(10, receiver_available, False)
# Wait until the sync was aborted
await time_out_assert(10, harvester.plot_sync_sender.sync_active, False)
# And then wait for the harvester to reconnect and the receiver to re-appear.
await time_out_assert(10, receiver_available, True)
# Make sure the receiver object has been changed because of the disconnect
assert farmer.plot_sync_receivers[harvester.server.node_id] is not receiver
receiver = farmer.plot_sync_receivers[harvester.server.node_id]
current_last_sync_id = receiver.last_sync().sync_id
# Now start another sync and wait for it to be done to make sure everything still works fine
harvester.plot_manager.trigger_refresh()
await time_out_assert(10, synced, True, harvester.plot_sync_sender, receiver, current_last_sync_id)
1 change: 0 additions & 1 deletion tests/plot_sync/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def test_default_values(bt: BlockTools) -> None:
assert sender._last_sync_id == uint64(0)
assert not sender._stop_requested
assert sender._task is None
assert not sender._lock.locked()
assert sender._response is None


Expand Down