trinity: Proper cleanup in RopstenLightNode
RopstenLightNode wasn't passing its CancelToken down to its
sub-services, so its _cleanup() method could return while we still had
pending asyncio tasks.

Closes: #918
gsalgado committed Jun 22, 2018
1 parent 1c11875 commit 3f6d058
Showing 9 changed files with 48 additions and 64 deletions.
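In outline, the failure mode is the one the commit message describes: the node triggered its own `CancelToken`, but sub-services constructed with their own tokens never saw the cancellation, so the node's `_cleanup()` could return while their asyncio tasks were still pending. The sketch below illustrates the fixed pattern (share the token, then wait on the sub-service's `cleaned_up` event) using plain asyncio primitives. `ParentService`, `ChildService`, and the `asyncio.Event` standing in for a real `CancelToken` are illustrative stand-ins, not the actual trinity/p2p classes; the real pattern is the one the updated DEVELOPMENT.md below documents.

```python
import asyncio


class ChildService:
    """Stand-in for a sub-service such as DiscoveryService."""

    def __init__(self, token: asyncio.Event) -> None:
        self.token = token                  # shared with the parent (the fix)
        self.cleaned_up = asyncio.Event()

    async def run(self) -> None:
        try:
            await self.token.wait()         # "work" until the shared token fires
        finally:
            self.cleaned_up.set()           # our cleanup has finished


class ParentService:
    """Stand-in for a node such as RopstenLightNode."""

    def __init__(self) -> None:
        self.token = asyncio.Event()
        # Pass our token down so that triggering it stops the sub-service too.
        self.child = ChildService(token=self.token)

    async def run(self) -> None:
        asyncio.ensure_future(self.child.run())
        await self.token.wait()
        await self._cleanup()

    async def _cleanup(self) -> None:
        # Return only once the sub-service has finished its own cleanup,
        # so no pending asyncio tasks are left behind.
        await self.child.cleaned_up.wait()


async def main() -> None:
    parent = ParentService()
    task = asyncio.ensure_future(parent.run())
    await asyncio.sleep(0.1)
    parent.token.set()  # trigger cancellation
    await task          # completes only after the child has cleaned up


asyncio.get_event_loop().run_until_complete(main())
```

Had the child been given its own token, as RopstenLightNode's sub-services effectively were, setting the parent's token would leave the child's task running and untracked, which is exactly what this commit fixes.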
14 changes: 13 additions & 1 deletion p2p/DEVELOPMENT.md
@@ -17,4 +17,16 @@ we use `CancelToken`s, which are heavily inspired by https://vorpus.org/blog/tim

- If your service runs coroutines in the background (e.g. via `asyncio.ensure_future`), you must
ensure they exit when `is_running` is False or when the cancel token is triggered
- If your service runs other services in the background, you should ensure your `_cleanup()` method stops them.
- If your service runs other services in the background, you should pass your CancelToken down to
those services and ensure your `_cleanup()` waits for them to cleanup as well

```Python
class Node(BaseService):
async def _run(self):
self.discovery = DiscoveryService(token=self.cancel_token)
asyncio.ensure_future(self.discovery.run())
# Node's run logic goes here...

async def _cleanup(self):
await self.discovery.cleaned_up.wait()
```
5 changes: 3 additions & 2 deletions p2p/discovery.py
@@ -323,8 +323,9 @@ class DiscoveryService(BaseService):
_last_lookup: float = 0
_lookup_interval: int = 30

def __init__(self, proto: DiscoveryProtocol, peer_pool: PeerPool) -> None:
super().__init__()
def __init__(
self, proto: DiscoveryProtocol, peer_pool: PeerPool, token: CancelToken = None) -> None:
super().__init__(token)
self.proto = proto
self.peer_pool = peer_pool

9 changes: 6 additions & 3 deletions p2p/lightchain.py
@@ -46,6 +46,7 @@
OperationCancelled,
TooManyTimeouts,
)
from p2p.cancel_token import CancelToken
from p2p import les
from p2p import protocol
from p2p.constants import REPLY_TIMEOUT
@@ -78,8 +79,10 @@ def __init__(
self,
headerdb: 'BaseAsyncHeaderDB',
peer_pool: PeerPool,
chain_class: Type[BaseChain]) -> None:
super().__init__()
chain_class: Type[BaseChain],
token: CancelToken = None) -> None:
PeerPoolSubscriber.__init__(self)
BaseService.__init__(self, token)
self.headerdb = headerdb
self.peer_pool = peer_pool
self._announcement_queue: asyncio.Queue[Tuple[LESPeer, les.HeadInfo]] = asyncio.Queue()
@@ -258,7 +261,7 @@ async def _validate_header(self, header):
VM.validate_header(header, parent_header)

async def _cleanup(self):
self.logger.info("Stopping LightPeerChain...")
pass

async def _wait_for_reply(self, request_id: int) -> Dict[str, Any]:
reply = None
3 changes: 2 additions & 1 deletion p2p/peer.py
@@ -676,8 +676,9 @@ def __init__(self,
privkey: datatypes.PrivateKey,
vm_configuration: Tuple[Tuple[int, Type[BaseVM]], ...],
max_peers: int = DEFAULT_MAX_PEERS,
token: CancelToken = None,
) -> None:
super().__init__()
super().__init__(token)
self.peer_class = peer_class
self.headerdb = headerdb
self.network_id = network_id
8 changes: 3 additions & 5 deletions p2p/server.py
@@ -150,6 +150,7 @@ def _make_peer_pool(self) -> PeerPool:
self.privkey,
self.chain.vm_configuration,
max_peers=self.max_peers,
token=self.cancel_token,
)

async def _run(self) -> None:
@@ -172,7 +173,7 @@ async def _run(self) -> None:
discovery_proto = PreferredNodeDiscoveryProtocol(
self.privkey, addr, self.bootstrap_nodes, self.preferred_nodes)
await self._start_udp_listener(discovery_proto)
self.discovery = DiscoveryService(discovery_proto, self.peer_pool)
self.discovery = DiscoveryService(discovery_proto, self.peer_pool, self.cancel_token)
asyncio.ensure_future(self.peer_pool.run())
asyncio.ensure_future(self.discovery.run())
asyncio.ensure_future(self.upnp_service.run())
@@ -181,10 +182,7 @@

async def _cleanup(self) -> None:
self.logger.info("Closing server...")
await asyncio.gather(
self.peer_pool.cancel(),
self.discovery.cancel(),
)
await asyncio.gather(self.peer_pool.cleaned_up.wait(), self.discovery.cleaned_up.wait())
await self._close()

async def receive_handshake(
45 changes: 4 additions & 41 deletions p2p/service.py
@@ -1,8 +1,7 @@
from abc import ABC, abstractmethod
import asyncio
from collections import UserList
import logging
from typing import Any, Awaitable, Callable, List, Optional
from typing import Any, Awaitable, Callable, Optional

from p2p.cancel_token import CancelToken, wait_with_token
from p2p.exceptions import OperationCancelled
@@ -70,8 +69,6 @@ async def run(
self.logger.info("%s finished: %s", self, e)
except Exception:
self.logger.exception("Unexpected error in %r, exiting", self)
else:
self.logger.debug("%s finished cleanly", self)
finally:
# Trigger our cancel token to ensure all pending asyncio tasks and background
# coroutines started by this service exit cleanly.
@@ -91,6 +88,7 @@ async def cancel(self):
"""Trigger the CancelToken and wait for the cleaned_up event to be set."""
if self.cancel_token.triggered:
self.logger.warning("Tried to cancel %s, but it was already cancelled", self)
return
elif not self.is_running:
raise RuntimeError("Cannot cancel a service that has not been started")

@@ -101,6 +99,8 @@ async def cancel(self):
self.cleaned_up.wait(), timeout=self._wait_until_finished_timeout)
except asyncio.futures.TimeoutError:
self.logger.info("Timed out waiting for %s to finish its cleanup, exiting anyway", self)
else:
self.logger.debug("%s finished cleanly", self)

@property
def is_running(self) -> bool:
@@ -129,40 +129,3 @@ async def _run(self) -> None:

async def _cleanup(self) -> None:
pass


class ServiceContext(UserList):
"""
Run a sequence of services in a context manager, closing them all cleanly on exit.
"""
logger = logging.getLogger("p2p.service.ServiceContext")

def __init__(self, services: List[BaseService] = None) -> None:
if services is None:
super().__init__()
else:
super().__init__(services)
self.started_services: List[BaseService] = []
self._run_lock = asyncio.Lock()

async def __aenter__(self):
if self._run_lock.locked():
raise RuntimeError("Cannot enter ServiceContext while it is already running")
await self._run_lock.acquire()

self.started_services = list(self.data)
for service in self.started_services:
asyncio.ensure_future(service.run())

async def __aexit__(self, exc_type, exc, tb):
service_cancellations = [service.cancel() for service in self.started_services]
results = await asyncio.gather(*service_cancellations, return_exceptions=True)
for service, result in zip(self.started_services, results):
if isinstance(result, BaseException):
self.logger.warning(
"Exception while cancelling service %r: %r",
service,
result,
)
self.started_services = []
self._run_lock.release()
3 changes: 3 additions & 0 deletions trinity/main.py
@@ -183,6 +183,9 @@ def main() -> None:
logger.info('Keyboard Interrupt: Stopping')
kill_process_gracefully(database_server_process, logger)
logger.info('DB server process (pid=%d) terminated', database_server_process.pid)
# XXX: This short sleep here seems to avoid us hitting a deadlock when attempting to
# join() the networking subprocess: https://github.com/ethereum/py-evm/issues/940
import time; time.sleep(0.2) # noqa: E702
kill_process_gracefully(networking_process, logger)
logger.info('Networking process (pid=%d) terminated', networking_process.pid)

11 changes: 7 additions & 4 deletions trinity/nodes/base.py
@@ -6,6 +6,7 @@
)
from threading import Thread
from typing import (
List,
Type,
Union
)
@@ -18,7 +19,6 @@
from p2p.service import (
BaseService,
EmptyService,
ServiceContext,
)
from trinity.chains import (
ChainProxy,
@@ -61,7 +61,7 @@ def __init__(self, chain_config: ChainConfig) -> None:
self._headerdb = self._db_manager.get_headerdb() # type: ignore

self._jsonrpc_ipc_path: Path = chain_config.jsonrpc_ipc_path
self._auxiliary_services = ServiceContext()
self._auxiliary_services: List[BaseService] = []

@abstractmethod
def get_chain(self) -> BaseChain:
@@ -122,12 +122,15 @@ async def _run(self) -> None:
self._ipc_server.run(loop=ipc_loop), loop=ipc_loop # type: ignore
)

async with self._auxiliary_services:
await self.get_p2p_server().run()
for service in self._auxiliary_services:
asyncio.ensure_future(service.run())

await self.get_p2p_server().run()

async def _cleanup(self) -> None:
if isinstance(self._ipc_server, IPCServer):
await self._ipc_server.stop()
await asyncio.gather(*[service.cleaned_up.wait() for service in self._auxiliary_services])

def _make_new_loop_thread(self) -> asyncio.AbstractEventLoop:
new_loop = asyncio.new_event_loop()
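The light node below registers its sub-services with `add_service()`. That helper is not part of this diff; given the `_auxiliary_services` list introduced above, it presumably does little more than append to that list so that `_run()` and `_cleanup()` pick the service up. A sketch of that assumption (not code from this commit):

```python
from typing import List

from p2p.service import BaseService


class NodeSketch:
    """Illustrative only: the relevant slice of a Node-like class."""

    def __init__(self) -> None:
        self._auxiliary_services: List[BaseService] = []

    def add_service(self, service: BaseService) -> None:
        # Register an auxiliary service so _run() starts it in the background
        # and _cleanup() waits for its cleaned_up event.
        self._auxiliary_services.append(service)
```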
14 changes: 7 additions & 7 deletions trinity/nodes/light.py
@@ -43,7 +43,9 @@ def __init__(self, chain_config: ChainConfig) -> None:
preferred_nodes=chain_config.preferred_nodes,
)
self._peer_pool = self._create_peer_pool(chain_config)
self._discovery = DiscoveryService(self._discovery_proto, self._peer_pool)
self._discovery = DiscoveryService(
self._discovery_proto, self._peer_pool, self.cancel_token)
self.add_service(self._discovery)
self.add_service(self._peer_pool)
self.create_and_add_tx_pool()

@@ -61,11 +63,7 @@ async def _run(self) -> None:
lambda: self._discovery_proto,
local_addr=('0.0.0.0', self._port)
)
asyncio.ensure_future(self._discovery.run())
try:
await super()._run()
finally:
await self._discovery.cancel()
await super()._run()

def get_chain(self) -> LightDispatchChain:
if self._chain is None:
@@ -79,7 +77,8 @@ def get_p2p_server(self) -> LightPeerChain:
if self._p2p_server is None:
if self.chain_class is None:
raise AttributeError("LightNode subclass must set chain_class")
self._p2p_server = LightPeerChain(self.headerdb, self._peer_pool, self.chain_class)
self._p2p_server = LightPeerChain(
self.headerdb, self._peer_pool, self.chain_class, self.cancel_token)
return self._p2p_server

def get_peer_pool(self) -> PeerPool:
@@ -92,4 +91,5 @@ def _create_peer_pool(self, chain_config: ChainConfig) -> PeerPool:
chain_config.network_id,
chain_config.nodekey,
self.chain_class.vm_configuration,
token=self.cancel_token,
)
