Skip to content

Commit

Permalink
Merge pull request #6419 from kozlovsky/components_shutdown
Browse files Browse the repository at this point in the history
Fixes #6358: correct component's shutdown
  • Loading branch information
kozlovsky authored Oct 6, 2021
2 parents 19dcfd8 + 9d8af0e commit 39f9841
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@


class BandwidthAccountingComponent(RestfulComponent):
community: BandwidthAccountingCommunity

_ipv8_component: Ipv8Component
community: BandwidthAccountingCommunity = None
_ipv8_component: Ipv8Component = None

async def run(self):
await super().run()
Expand Down Expand Up @@ -43,4 +42,5 @@ async def run(self):

async def shutdown(self):
await super().shutdown()
await self._ipv8_component.unload_community(self.community)
if self._ipv8_component and self.community:
await self._ipv8_component.unload_community(self.community)
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from ipv8.peerdiscovery.discovery import RandomWalk
from ipv8.peerdiscovery.network import Network

from tribler_core.components.gigachannel.community.gigachannel_community import (
Expand All @@ -13,9 +12,8 @@


class GigaChannelComponent(RestfulComponent):
community: GigaChannelCommunity

_ipv8_component: Ipv8Component
community: GigaChannelCommunity = None
_ipv8_component: Ipv8Component = None

async def run(self):
await super().run()
Expand Down Expand Up @@ -46,4 +44,5 @@ async def run(self):

async def shutdown(self):
await super().shutdown()
await self._ipv8_component.unload_community(self.community)
if self._ipv8_component and self.community:
await self._ipv8_component.unload_community(self.community)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@


class GigachannelManagerComponent(RestfulComponent):
gigachannel_manager: GigaChannelManager
gigachannel_manager: GigaChannelManager = None

async def run(self):
await super().run()
Expand All @@ -28,6 +28,6 @@ async def run(self):
values={'gigachannel_manager': self.gigachannel_manager})

async def shutdown(self):
self.session.notifier.notify_shutdown_state("Shutting down Gigachannel Manager...")
await super().shutdown()
await self.gigachannel_manager.shutdown()
if self.gigachannel_manager:
await self.gigachannel_manager.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from ipv8.peerdiscovery.community import DiscoveryCommunity, PeriodicSimilarity
from ipv8.peerdiscovery.discovery import RandomWalk
from ipv8.taskmanager import TaskManager

from ipv8_service import IPv8

from tribler_core.components.masterkey.masterkey_component import MasterKeyComponent
from tribler_core.components.restapi import RestfulComponent

Expand All @@ -22,7 +24,7 @@
# pylint: disable=import-outside-toplevel

class Ipv8Component(RestfulComponent):
ipv8: IPv8
ipv8: IPv8 = None
peer: Peer
dht_discovery_community: Optional[DHTDiscoveryCommunity] = None

Expand Down Expand Up @@ -134,10 +136,12 @@ def _init_dht_discovery_community(self):
async def shutdown(self):
await super().shutdown()

if not self.ipv8:
return

for overlay in (self.dht_discovery_community, self._peer_discovery_community):
if overlay:
await self.ipv8.unload_overlay(overlay)

self.session.notifier.notify_shutdown_state("Shutting down IPv8...")
await self._task_manager.shutdown_task_manager()
await self.ipv8.stop(stop_loop=False)
7 changes: 4 additions & 3 deletions src/tribler-core/tribler_core/components/libtorrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


class LibtorrentComponent(RestfulComponent):
download_manager: DownloadManager
download_manager: DownloadManager = None

async def run(self):
await super().run()
Expand Down Expand Up @@ -43,5 +43,6 @@ async def run(self):

async def shutdown(self):
await super().shutdown()
self.download_manager.stop_download_states_callback()
await self.download_manager.shutdown()
if self.download_manager:
self.download_manager.stop_download_states_callback()
await self.download_manager.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


class MetadataStoreComponent(RestfulComponent):
mds: MetadataStore
mds: MetadataStore = None

async def run(self):
await super().run()
Expand Down Expand Up @@ -59,5 +59,5 @@ async def run(self):

async def shutdown(self):
await super().shutdown()
self.session.notifier.notify_shutdown_state("Shutting down Metadata Store...")
self.mds.shutdown()
if self.mds:
self.mds.shutdown()
25 changes: 15 additions & 10 deletions src/tribler-core/tribler_core/components/payout.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from tribler_common.simpledefs import NTFY

from tribler_core.components.bandwidth_accounting.bandwidth_accounting_component import BandwidthAccountingComponent
from tribler_core.components.base import Component
from tribler_core.components.ipv8.ipv8_component import Ipv8Component
Expand All @@ -9,26 +10,30 @@


class PayoutComponent(Component):
payout_manager: PayoutManager
payout_manager: PayoutManager = None

async def run(self):
await self.get_component(ReporterComponent)
await super().run()

config = self.session.config
assert not config.gui_test_mode

await self.get_component(ReporterComponent)

ipv8_component = await self.require_component(Ipv8Component)
bandwidth_accounting_component = await self.require_component(BandwidthAccountingComponent)

payout_manager = PayoutManager(bandwidth_accounting_component.community, ipv8_component.dht_discovery_community)
self.session.notifier.add_observer(NTFY.PEER_DISCONNECTED_EVENT, payout_manager.do_payout)
self.session.notifier.add_observer(NTFY.TRIBLER_TORRENT_PEER_UPDATE, payout_manager.update_peer)
self.payout_manager = PayoutManager(bandwidth_accounting_component.community,
ipv8_component.dht_discovery_community)

assert not config.gui_test_mode
self.session.notifier.add_observer(NTFY.PEER_DISCONNECTED_EVENT, self.payout_manager.do_payout)
self.session.notifier.add_observer(NTFY.TRIBLER_TORRENT_PEER_UPDATE, self.payout_manager.update_peer)

self.payout_manager = payout_manager

async def shutdown(self):
self.session.notifier.remove_observer(NTFY.PEER_DISCONNECTED_EVENT, self.payout_manager.do_payout)
self.session.notifier.remove_observer(NTFY.TRIBLER_TORRENT_PEER_UPDATE, self.payout_manager.update_peer)
await super().shutdown()
if self.payout_manager:
self.session.notifier.remove_observer(NTFY.PEER_DISCONNECTED_EVENT, self.payout_manager.do_payout)
self.session.notifier.remove_observer(NTFY.TRIBLER_TORRENT_PEER_UPDATE, self.payout_manager.update_peer)

await self.payout_manager.shutdown()
await self.payout_manager.shutdown()
9 changes: 5 additions & 4 deletions src/tribler-core/tribler_core/components/popularity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ipv8.peerdiscovery.network import Network

from tribler_core.components.base import Component
from tribler_core.components.gigachannel.community.sync_strategy import RemovePeers
from tribler_core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
Expand All @@ -8,11 +9,10 @@
from tribler_core.modules.popularity.community import PopularityCommunity



class PopularityComponent(Component):
community: PopularityCommunity
community: PopularityCommunity = None

_ipv8_component: Ipv8Component
_ipv8_component: Ipv8Component = None

async def run(self):
await super().run()
Expand All @@ -37,4 +37,5 @@ async def run(self):

async def shutdown(self):
await super().shutdown()
await self._ipv8_component.unload_community(self.community)
if self._ipv8_component and self.community:
await self._ipv8_component.unload_community(self.community)
6 changes: 3 additions & 3 deletions src/tribler-core/tribler_core/components/resource_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


class ResourceMonitorComponent(RestfulComponent):
resource_monitor: CoreResourceMonitor
resource_monitor: CoreResourceMonitor = None

async def run(self):
await super().run()
Expand All @@ -24,6 +24,6 @@ async def run(self):
await self.init_endpoints(endpoints=['debug'], values={'resource_monitor': resource_monitor})

async def shutdown(self):
self.session.notifier.notify_shutdown_state("Shutting down Resource Monitor...")
await super().shutdown()
await self.resource_monitor.stop()
if self.resource_monitor:
await self.resource_monitor.stop()
17 changes: 9 additions & 8 deletions src/tribler-core/tribler_core/components/restapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ipv8_service import IPv8

from tribler_common.simpledefs import STATE_START_API

from tribler_core.components.base import Component
from tribler_core.components.reporter import ReporterComponent
from tribler_core.exception_handler import CoreExceptionHandler
Expand Down Expand Up @@ -53,9 +54,11 @@ async def init_ipv8_endpoints(self, ipv8: IPv8, endpoints: List[str]):
endpoint.initialize(ipv8)

async def run(self):
await super().run()
await self.get_component(ReporterComponent)

async def shutdown(self):
await super().shutdown()
rest_component = await self.get_component(RESTComponent)
if not rest_component:
return
Expand All @@ -67,9 +70,10 @@ async def shutdown(self):


class RESTComponent(Component):
rest_manager: RESTManager
rest_manager: RESTManager = None

async def run(self):
await super().run()
await self.get_component(ReporterComponent)
session = self.session
config = session.config
Expand All @@ -83,6 +87,7 @@ async def run(self):
# communicate with the upgrader. Thus, we start the endpoints immediately and
# then gradually connect them to their respective backends during the core start process.
await rest_manager.start()
self.rest_manager = rest_manager

rest_manager.get_endpoint('shutdown').connect_shutdown_callback(shutdown_event.set)
rest_manager.get_endpoint('settings').tribler_config = config
Expand All @@ -109,12 +114,8 @@ def report_callback(text_long, sentry_event):

CoreExceptionHandler.report_callback = report_callback

# We provide the REST API only after the essential endpoints (events, state and shutdown) and
# the exception handler were initialized
self.rest_manager = rest_manager

async def shutdown(self):
# TODO: disconnect notifier from endpoints
await super().shutdown()
CoreExceptionHandler.report_callback = None
self.session.notifier.notify_shutdown_state("Shutting down API Manager...")
await self.rest_manager.stop()
if self.rest_manager:
await self.rest_manager.stop()
6 changes: 3 additions & 3 deletions src/tribler-core/tribler_core/components/torrent_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


class TorrentCheckerComponent(RestfulComponent):
torrent_checker: TorrentChecker
torrent_checker: TorrentChecker = None

async def run(self):
await super().run()
Expand All @@ -33,6 +33,6 @@ async def run(self):
await self.init_endpoints(endpoints=['metadata'], values={'torrent_checker': torrent_checker})

async def shutdown(self):
self.session.notifier.notify_shutdown_state("Shutting down Torrent Checker...")
await super().shutdown()
await self.torrent_checker.shutdown()
if self.torrent_checker:
await self.torrent_checker.shutdown()
7 changes: 4 additions & 3 deletions src/tribler-core/tribler_core/components/tunnels.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@


class TunnelsComponent(RestfulComponent):
community: TriblerTunnelCommunity
community: TriblerTunnelCommunity = None

_ipv8_component: Ipv8Component
_ipv8_component: Ipv8Component = None

async def run(self):
await super().run()
Expand Down Expand Up @@ -64,4 +64,5 @@ async def run(self):

async def shutdown(self):
await super().shutdown()
await self._ipv8_component.unload_community(self.community)
if self._ipv8_component and self.community:
await self._ipv8_component.unload_community(self.community)
8 changes: 5 additions & 3 deletions src/tribler-core/tribler_core/components/version_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@


class VersionCheckComponent(Component):
version_check_manager: VersionCheckManager
version_check_manager: VersionCheckManager = None

async def run(self):
await super().run()
await self.get_component(ReporterComponent)
await self.get_component(UpgradeComponent)

Expand All @@ -17,5 +18,6 @@ async def run(self):
self.version_check_manager.start()

async def shutdown(self):
self.session.notifier.notify_shutdown_state("Shutting down Version Checker...")
await self.version_check_manager.stop()
await super().shutdown()
if self.version_check_manager:
await self.version_check_manager.stop()
6 changes: 3 additions & 3 deletions src/tribler-core/tribler_core/components/watch_folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


class WatchFolderComponent(RestfulComponent):
watch_folder: WatchFolder
watch_folder: WatchFolder = None

async def run(self):
await super().run()
Expand All @@ -23,6 +23,6 @@ async def run(self):
self.watch_folder = watch_folder

async def shutdown(self):
self.session.notifier.notify_shutdown_state("Shutting down Watch Folder...")
await super().shutdown()
await self.watch_folder.stop()
if self.watch_folder:
await self.watch_folder.stop()

0 comments on commit 39f9841

Please sign in to comment.