From c69f55c1a08c2a6e80915e6de6c57bf9b9a256f0 Mon Sep 17 00:00:00 2001 From: drew2a Date: Wed, 7 Jul 2021 15:36:59 +0200 Subject: [PATCH] DI v1 (no tests) --- src/run_tribler.py | 34 ++++- .../modules/bandwidth_accounting/community.py | 21 ++- .../modules/community_di_mixin.py | 23 +++ .../tribler_core/modules/community_loader.py | 135 ------------------ .../tribler_core/modules/dht/community.py | 29 ++++ .../modules/discovery/community.py | 22 +++ .../community/gigachannel_community.py | 20 ++- .../modules/popularity/community.py | 17 ++- .../remote_query_community/community.py | 14 +- .../tribler_core/modules/tribler_community.py | 12 -- .../modules/tunnel/community/community.py | 37 +++-- src/tribler-core/tribler_core/session.py | 99 +++++++------ 12 files changed, 239 insertions(+), 224 deletions(-) create mode 100644 src/tribler-core/tribler_core/modules/community_di_mixin.py delete mode 100644 src/tribler-core/tribler_core/modules/community_loader.py create mode 100644 src/tribler-core/tribler_core/modules/dht/community.py create mode 100644 src/tribler-core/tribler_core/modules/discovery/community.py delete mode 100644 src/tribler-core/tribler_core/modules/tribler_community.py diff --git a/src/run_tribler.py b/src/run_tribler.py index 39948becdca..c39e2168b85 100644 --- a/src/run_tribler.py +++ b/src/run_tribler.py @@ -1,9 +1,7 @@ import asyncio import logging.config import os -import signal import sys -from asyncio import ensure_future, get_event_loop from PyQt5.QtCore import QSettings @@ -12,8 +10,17 @@ from tribler_common.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy from tribler_common.sentry_reporter.sentry_scrubber import SentryScrubber from tribler_common.version_manager import VersionHistory +from tribler_core.config.tribler_config import TriblerConfig from tribler_core.dependencies import check_for_missing_dependencies -from tribler_core.session import core_session +from tribler_core.modules.bandwidth_accounting.community import BandwidthAccountingCommunity, \ + BandwidthAccountingTestnetCommunity +from tribler_core.modules.dht.community import DHTDiscoveryStrategies +from tribler_core.modules.discovery.community import TriblerDiscoveryStrategies +from tribler_core.modules.metadata_store.community.gigachannel_community import GigaChannelCommunity, \ + GigaChannelTestnetCommunity +from tribler_core.modules.popularity.community import PopularityCommunity +from tribler_core.modules.tunnel.community.community import TriblerTunnelCommunity, TriblerTunnelTestnetCommunity +from tribler_core.session import CommunityFactory, core_session from tribler_core.utilities.osutils import get_root_state_directory from tribler_core.version import sentry_url, version_id from tribler_gui.utilities import get_translator @@ -22,6 +29,25 @@ CONFIG_FILE_NAME = 'triblerd.conf' +# pylint: disable=import-outside-toplevel + + +def communities_gen(config: TriblerConfig): + yield CommunityFactory(create_class=TriblerDiscoveryStrategies) if config.discovery_community.enabled else ... + yield CommunityFactory(create_class=DHTDiscoveryStrategies) if config.dht.enabled else ... + + bandwidth_accounting_kwargs = {'database': config.state_dir / "sqlite" / "bandwidth.db"} + bandwidth_accounting_cls = BandwidthAccountingTestnetCommunity if config.general.testnet or config.bandwidth_accounting.testnet else BandwidthAccountingCommunity + yield CommunityFactory(create_class=bandwidth_accounting_cls, kwargs=bandwidth_accounting_kwargs) + + tribler_tunnel_cls = TriblerTunnelTestnetCommunity if config.general.testnet or config.tunnel_community.testnet else TriblerTunnelCommunity + yield CommunityFactory(create_class=tribler_tunnel_cls) if config.tunnel_community.enabled else ... + yield CommunityFactory(create_class=PopularityCommunity) if config.popularity_community.enabled else ... + + giga_channel_cls = GigaChannelTestnetCommunity if config.general.testnet else GigaChannelCommunity + yield CommunityFactory(create_class=giga_channel_cls) if config.chant.enabled else ... + + def start_tribler_core(base_path, api_port, api_key, root_state_dir, core_test_mode=False): """ This method will start a new Tribler session. @@ -82,7 +108,7 @@ async def start_tribler(): trace_logger = check_and_enable_code_tracing('core', log_dir) # Run until core_session exits - await core_session(config) + await core_session(config, communities_cls=list(communities_gen(config))) if trace_logger: trace_logger.close() diff --git a/src/tribler-core/tribler_core/modules/bandwidth_accounting/community.py b/src/tribler-core/tribler_core/modules/bandwidth_accounting/community.py index 784fa4dc381..a19a265c141 100644 --- a/src/tribler-core/tribler_core/modules/bandwidth_accounting/community.py +++ b/src/tribler-core/tribler_core/modules/bandwidth_accounting/community.py @@ -6,7 +6,9 @@ from random import Random from typing import Dict, Union +from ipv8.community import Community from ipv8.peer import Peer +from ipv8.peerdiscovery.discovery import RandomWalk from ipv8.requestcache import RequestCache from ipv8.types import Address from tribler_core.modules.bandwidth_accounting import EMPTY_SIGNATURE @@ -15,11 +17,12 @@ from tribler_core.modules.bandwidth_accounting.payload import BandwidthTransactionPayload, \ BandwidthTransactionQueryPayload from tribler_core.modules.bandwidth_accounting.transaction import BandwidthTransactionData -from tribler_core.modules.tribler_community import TriblerCommunity +from tribler_core.modules.community_di_mixin import CommunityDIMixin, DEFAULT_TARGET_PEERS, StrategyFactory +from tribler_core.session import Mediator from tribler_core.utilities.unicode import hexlify -class BandwidthAccountingCommunity(TriblerCommunity): +class BandwidthAccountingCommunity(CommunityDIMixin, Community): """ Community around bandwidth accounting and payouts. """ @@ -27,7 +30,8 @@ class BandwidthAccountingCommunity(TriblerCommunity): DB_NAME = 'bandwidth' version = b'\x02' - def __init__(self, *args, database: Union[str, Path, BandwidthDatabase], **kwargs) -> None: + def __init__(self, *args, mediator: Mediator = None, + database: Union[str, Path, BandwidthDatabase], **kwargs) -> None: """ Initialize the community. :param persistence: The database that stores transactions, will be created if not provided. @@ -37,6 +41,8 @@ def __init__(self, *args, database: Union[str, Path, BandwidthDatabase], **kwarg super().__init__(*args, **kwargs) + self.settings = mediator.config.bandwidth_accounting + self.request_cache = RequestCache() self.my_pk = self.my_peer.public_key.key_to_bin() @@ -52,6 +58,15 @@ def __init__(self, *args, database: Union[str, Path, BandwidthDatabase], **kwarg self.logger.info("Started bandwidth accounting community with public key %s", hexlify(self.my_pk)) + self.init_community_di_mixin(strategies=[ + StrategyFactory(create_class=RandomWalk, target_peers=DEFAULT_TARGET_PEERS), + ]) + + def fill_mediator(self, mediator): + super().fill_mediator(mediator) + + mediator.dictionary['bandwidth_community'] = self + def construct_signed_transaction(self, peer: Peer, amount: int) -> BandwidthTransactionData: """ Construct a new signed bandwidth transaction. diff --git a/src/tribler-core/tribler_core/modules/community_di_mixin.py b/src/tribler-core/tribler_core/modules/community_di_mixin.py new file mode 100644 index 00000000000..feab6392491 --- /dev/null +++ b/src/tribler-core/tribler_core/modules/community_di_mixin.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass + +from ipv8.peerdiscovery.discovery import RandomWalk + +DEFAULT_TARGET_PEERS = 20 +INFINITE_TARGET_PEERS = -1 + + +@dataclass +class StrategyFactory: + create_class: type = RandomWalk + target_peers: int = 20 + + +class CommunityDIMixin: + """Mixin for Dependency Injection + """ + + def init_community_di_mixin(self, strategies=None): + self.strategies = strategies + + def fill_mediator(self, mediator): + ... diff --git a/src/tribler-core/tribler_core/modules/community_loader.py b/src/tribler-core/tribler_core/modules/community_loader.py deleted file mode 100644 index 66dcc77de78..00000000000 --- a/src/tribler-core/tribler_core/modules/community_loader.py +++ /dev/null @@ -1,135 +0,0 @@ -# pylint: disable=import-outside-toplevel -from ipv8.peer import Peer -from ipv8.peerdiscovery.churn import RandomChurn -from ipv8.peerdiscovery.discovery import RandomWalk - -from tribler_core.config.tribler_config import TriblerConfig -from tribler_core.modules.metadata_store.community.sync_strategy import RemovePeers - -INFINITE = -1 -""" -The amount of target_peers for a walk_strategy definition to never stop. -""" - - -def add_bootstrapper(community, bootstrapper): - if bootstrapper: - community.bootstrappers.append(bootstrapper) - - -def load_communities(config: TriblerConfig, trustchain_keypair, ipv8, dlmgr, metadata_store, - torrent_checker, notifier, bootstrapper): - """ This method will be splitted after grand Vadim's PR is merged - """ - peer = Peer(trustchain_keypair) - - if config.discovery_community.enabled: - from ipv8.peerdiscovery.community import DiscoveryCommunity - from ipv8.peerdiscovery.community import PeriodicSimilarity - - community = DiscoveryCommunity(peer, ipv8.endpoint, ipv8.network, max_peers=100) - add_bootstrapper(community, bootstrapper) - ipv8.overlays.append(community) - ipv8.strategies.append((RandomChurn(community), INFINITE)) - ipv8.strategies.append((PeriodicSimilarity(community), INFINITE)) - ipv8.strategies.append((RandomWalk(community), 20)) - - dht_community = None - if config.dht.enabled: - from ipv8.dht.discovery import DHTDiscoveryCommunity - from ipv8.dht.churn import PingChurn - - dht_community = DHTDiscoveryCommunity(peer, ipv8.endpoint, ipv8.network, max_peers=60) - add_bootstrapper(dht_community, bootstrapper) - - ipv8.overlays.append(dht_community) - ipv8.strategies.append((RandomWalk(dht_community), 20)) - ipv8.strategies.append((PingChurn(dht_community), INFINITE)) - - # - - if config.general.testnet or config.bandwidth_accounting.testnet: - from tribler_core.modules.bandwidth_accounting.community import BandwidthAccountingTestnetCommunity - bandwidth_community_cls = BandwidthAccountingTestnetCommunity - else: - from tribler_core.modules.bandwidth_accounting.community import BandwidthAccountingCommunity - bandwidth_community_cls = BandwidthAccountingCommunity - - bandwidth_community = bandwidth_community_cls(peer, ipv8.endpoint, ipv8.network, - settings=config.bandwidth_accounting, - database=config.state_dir / "sqlite" / "bandwidth.db") - add_bootstrapper(bandwidth_community, bootstrapper) - - ipv8.overlays.append(bandwidth_community) - ipv8.strategies.append((RandomWalk(bandwidth_community), 20)) - - # - if config.tunnel_community.enabled: - tunnel_community_cls = None - if config.general.testnet or config.tunnel_community.testnet: - from tribler_core.modules.tunnel.community.community import TriblerTunnelTestnetCommunity - tunnel_community_cls = TriblerTunnelTestnetCommunity - else: - from tribler_core.modules.tunnel.community.community import TriblerTunnelCommunity - tunnel_community_cls = TriblerTunnelCommunity - - from tribler_core.modules.tunnel.community.discovery import GoldenRatioStrategy - from ipv8.messaging.anonymization.community import TunnelSettings - from ipv8.dht.provider import DHTCommunityProvider - - settings = TunnelSettings() - settings.min_circuits = config.tunnel_community.min_circuits - settings.max_circuits = config.tunnel_community.max_circuits - - community = tunnel_community_cls(peer, ipv8.endpoint, ipv8.network, - bandwidth_community=bandwidth_community, - competing_slots=config.tunnel_community.competing_slots, - ipv8=ipv8, - random_slots=config.tunnel_community.random_slots, - config=config.tunnel_community, - notifier=notifier, - dlmgr=dlmgr, - dht_provider=DHTCommunityProvider(dht_community, config.ipv8.port), - settings=settings, - ) - add_bootstrapper(community, bootstrapper) - - ipv8.overlays.append(community) - ipv8.strategies.append((RandomWalk(community), 20)) - ipv8.strategies.append((GoldenRatioStrategy(community), INFINITE)) - - if config.popularity_community.enabled: - from tribler_core.modules.popularity.community import PopularityCommunity - - community = PopularityCommunity(peer, ipv8.endpoint, ipv8.network, - settings=config.popularity_community, - rqc_settings=config.remote_query_community, - metadata_store=metadata_store, - torrent_checker=torrent_checker - ) - add_bootstrapper(community, bootstrapper) - - ipv8.overlays.append(community) - ipv8.strategies.append((RandomWalk(community), 30)) - ipv8.strategies.append((RemovePeers(community), INFINITE)) - - if config.chant.enabled: - if config.general.testnet or config.chant.testnet: - from tribler_core.modules.metadata_store.community.gigachannel_community import GigaChannelTestnetCommunity - gigachannel_community_cls = GigaChannelTestnetCommunity - else: - from tribler_core.modules.metadata_store.community.gigachannel_community import GigaChannelCommunity - gigachannel_community_cls = GigaChannelCommunity - - community = gigachannel_community_cls(peer, ipv8.endpoint, ipv8.network, - settings=config.chant, - rqc_settings=config.remote_query_community, - metadata_store=metadata_store, - notifier=notifier, - max_peers=50 - ) - add_bootstrapper(community, bootstrapper) - - ipv8.overlays.append(community) - ipv8.strategies.append((RandomWalk(community), 30)) - ipv8.strategies.append((RemovePeers(community), INFINITE)) diff --git a/src/tribler-core/tribler_core/modules/dht/community.py b/src/tribler-core/tribler_core/modules/dht/community.py new file mode 100644 index 00000000000..2d4df417bff --- /dev/null +++ b/src/tribler-core/tribler_core/modules/dht/community.py @@ -0,0 +1,29 @@ +from ipv8.dht.discovery import DHTDiscoveryCommunity +from ipv8.peerdiscovery.churn import RandomChurn +from ipv8.peerdiscovery.community import PeriodicSimilarity +from ipv8.peerdiscovery.discovery import RandomWalk + +from tribler_core.modules.community_di_mixin import ( + CommunityDIMixin, + DEFAULT_TARGET_PEERS, + INFINITE_TARGET_PEERS, + StrategyFactory, +) +from tribler_core.session import Mediator + + +class DHTDiscoveryStrategies(CommunityDIMixin, DHTDiscoveryCommunity): + def __init__(self, *args, mediator=None, **kwargs): + kwargs['max_peers'] = 60 + super().__init__(*args, **kwargs) + + self.init_community_di_mixin(strategies=[ + StrategyFactory(create_class=RandomChurn, target_peers=INFINITE_TARGET_PEERS), + StrategyFactory(create_class=PeriodicSimilarity, target_peers=INFINITE_TARGET_PEERS), + StrategyFactory(create_class=RandomWalk, target_peers=DEFAULT_TARGET_PEERS), + ]) + + def fill_mediator(self, mediator: Mediator): + super().fill_mediator(mediator) + + mediator.dictionary['dht_community'] = self diff --git a/src/tribler-core/tribler_core/modules/discovery/community.py b/src/tribler-core/tribler_core/modules/discovery/community.py new file mode 100644 index 00000000000..be28444e30a --- /dev/null +++ b/src/tribler-core/tribler_core/modules/discovery/community.py @@ -0,0 +1,22 @@ +from ipv8.peerdiscovery.churn import RandomChurn +from ipv8.peerdiscovery.community import DiscoveryCommunity, PeriodicSimilarity +from ipv8.peerdiscovery.discovery import RandomWalk + +from tribler_core.modules.community_di_mixin import ( + CommunityDIMixin, + DEFAULT_TARGET_PEERS, + INFINITE_TARGET_PEERS, + StrategyFactory, +) + + +class TriblerDiscoveryStrategies(CommunityDIMixin, DiscoveryCommunity): + def __init__(self, *args, mediator=None, **kwargs): + kwargs['max_peers'] = 100 + super().__init__(*args, **kwargs) + + self.init_community_di_mixin(strategies=[ + StrategyFactory(create_class=RandomChurn, target_peers=INFINITE_TARGET_PEERS), + StrategyFactory(create_class=PeriodicSimilarity, target_peers=INFINITE_TARGET_PEERS), + StrategyFactory(create_class=RandomWalk, target_peers=DEFAULT_TARGET_PEERS), + ]) diff --git a/src/tribler-core/tribler_core/modules/metadata_store/community/gigachannel_community.py b/src/tribler-core/tribler_core/modules/metadata_store/community/gigachannel_community.py index c04607711c3..737494385fa 100644 --- a/src/tribler-core/tribler_core/modules/metadata_store/community/gigachannel_community.py +++ b/src/tribler-core/tribler_core/modules/metadata_store/community/gigachannel_community.py @@ -4,18 +4,22 @@ from dataclasses import dataclass from random import sample +from ipv8.peerdiscovery.discovery import RandomWalk from ipv8.peerdiscovery.network import Network from ipv8.types import Peer from pony.orm import db_session from tribler_common.simpledefs import CHANNELS_VIEW_UUID, NTFY +from tribler_core.modules.community_di_mixin import CommunityDIMixin, INFINITE_TARGET_PEERS, StrategyFactory from tribler_core.modules.metadata_store.community.discovery_booster import DiscoveryBooster +from tribler_core.modules.metadata_store.community.sync_strategy import RemovePeers from tribler_core.modules.metadata_store.payload_checker import ObjState from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT from tribler_core.modules.metadata_store.utils import NoChannelSourcesException from tribler_core.modules.remote_query_community.community import RemoteQueryCommunity +from tribler_core.session import Mediator from tribler_core.utilities.unicode import hexlify minimal_blob_size = 200 @@ -64,7 +68,7 @@ def get_last_seen_peers_for_channel(self, channel_pk: bytes, channel_id: int, li return sorted(channel_peers, key=lambda x: x.last_response, reverse=True)[0:limit] -class GigaChannelCommunity(RemoteQueryCommunity): +class GigaChannelCommunity(CommunityDIMixin, RemoteQueryCommunity): community_id = unhexlify('d3512d0ff816d8ac672eab29a9c1a3a32e17cb13') def create_introduction_response(self, *args, introduction=None, extra_bytes=b'', prefix=None, new_style=False): @@ -77,12 +81,13 @@ def create_introduction_response(self, *args, introduction=None, extra_bytes=b'' new_style=new_style ) - def __init__(self, my_peer, endpoint, network, metadata_store, **kwargs): - self.notifier = kwargs.pop("notifier", None) - + def __init__(self, my_peer, endpoint, network, mediator: Mediator = None, max_peers = None, **kwargs): # ACHTUNG! We create a separate instance of Network for this community because it # walks aggressively and wants lots of peers, which can interfere with other communities - super().__init__(my_peer, endpoint, Network(), metadata_store, **kwargs) + super().__init__(my_peer, endpoint, Network(), max_peers=50, mediator=mediator, **kwargs) + + self.settings = mediator.config.chant + self.mediator = mediator.notifier # This set contains all the peers that we queried for subscribed channels over time. # It is emptied regularly. The purpose of this set is to work as a filter so we never query the same @@ -94,6 +99,11 @@ def __init__(self, my_peer, endpoint, network, metadata_store, **kwargs): self.channels_peers = ChannelsPeersMapping() + self.init_community_di_mixin(strategies=[ + StrategyFactory(create_class=RandomWalk, target_peers=30), + StrategyFactory(create_class=RemovePeers, target_peers=INFINITE_TARGET_PEERS), + ]) + def get_random_peers(self, sample_size=None): # Randomly sample sample_size peers from the complete list of our peers all_peers = self.get_peers() diff --git a/src/tribler-core/tribler_core/modules/popularity/community.py b/src/tribler-core/tribler_core/modules/popularity/community.py index 010c0e84043..bbea216fdc5 100644 --- a/src/tribler-core/tribler_core/modules/popularity/community.py +++ b/src/tribler-core/tribler_core/modules/popularity/community.py @@ -3,17 +3,21 @@ from binascii import unhexlify from ipv8.lazy_community import lazy_wrapper +from ipv8.peerdiscovery.discovery import RandomWalk from ipv8.peerdiscovery.network import Network from pony.orm import db_session +from tribler_core.modules.community_di_mixin import CommunityDIMixin, INFINITE_TARGET_PEERS, StrategyFactory +from tribler_core.modules.metadata_store.community.sync_strategy import RemovePeers from tribler_core.modules.popularity.payload import TorrentsHealthPayload from tribler_core.modules.popularity.version_community_mixin import VersionCommunityMixin from tribler_core.modules.remote_query_community.community import RemoteQueryCommunity +from tribler_core.session import Mediator from tribler_core.utilities.unicode import hexlify -class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin): +class PopularityCommunity(CommunityDIMixin, RemoteQueryCommunity, VersionCommunityMixin): """ Community for disseminating the content across the network. @@ -30,11 +34,10 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin): community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648') - def __init__(self, my_peer, endpoint, network, **kwargs): - self.torrent_checker = kwargs.pop('torrent_checker', None) - + def __init__(self, my_peer, endpoint, network, mediator: Mediator = None, **kwargs): # Creating a separate instance of Network for this community to find more peers - super().__init__(my_peer, endpoint, Network(), **kwargs) + super().__init__(my_peer, endpoint, Network(), mediator=mediator, **kwargs) + self.torrent_checker = mediator.torrent_checker self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health) @@ -47,6 +50,10 @@ def __init__(self, my_peer, endpoint, network, **kwargs): # Init version community message handlers self.init_version_community() + self.init_community_di_mixin(strategies=[ + StrategyFactory(create_class=RandomWalk, target_peers=30), + StrategyFactory(create_class=RemovePeers, target_peers=INFINITE_TARGET_PEERS), + ]) @staticmethod def select_torrents_to_gossip(torrents, include_popular=True, include_random=True) -> (set, set): diff --git a/src/tribler-core/tribler_core/modules/remote_query_community/community.py b/src/tribler-core/tribler_core/modules/remote_query_community/community.py index e61eecda1d6..8421dd9e606 100644 --- a/src/tribler-core/tribler_core/modules/remote_query_community/community.py +++ b/src/tribler-core/tribler_core/modules/remote_query_community/community.py @@ -3,6 +3,7 @@ from asyncio import Future from binascii import unhexlify +from ipv8.community import Community from ipv8.lazy_community import lazy_wrapper from ipv8.messaging.lazy_payload import VariablePayload, vp_compile from ipv8.requestcache import NumberCache, RandomNumberCache, RequestCache @@ -15,8 +16,8 @@ from tribler_core.modules.metadata_store.store import MetadataStore from tribler_core.modules.metadata_store.utils import RequestTimeoutException from tribler_core.modules.remote_query_community.eva_protocol import EVAProtocolMixin -from tribler_core.modules.remote_query_community.settings import RemoteQueryCommunitySettings -from tribler_core.modules.tribler_community import TriblerCommunity + +from tribler_core.session import Mediator from tribler_core.utilities.unicode import hexlify BINARY_FIELDS = ("infohash", "channel_pk") @@ -122,17 +123,16 @@ def on_timeout(self): pass -class RemoteQueryCommunity(TriblerCommunity, EVAProtocolMixin): +class RemoteQueryCommunity(Community, EVAProtocolMixin): """ Community for general purpose SELECT-like queries into remote Channels database """ - def __init__(self, my_peer, endpoint, network, metadata_store, rqc_settings: RemoteQueryCommunitySettings = None, - **kwargs): + def __init__(self, my_peer, endpoint, network, mediator: Mediator = None, **kwargs): super().__init__(my_peer, endpoint, network=network, **kwargs) - self.rqc_settings = rqc_settings or RemoteQueryCommunitySettings() - self.mds: MetadataStore = metadata_store + self.rqc_settings = mediator.config.remote_query_community + self.mds: MetadataStore = mediator.metadata_store # This object stores requests for "select" queries that we sent to other hosts. # We keep track of peers we actually requested for data so people can't randomly push spam at us. diff --git a/src/tribler-core/tribler_core/modules/tribler_community.py b/src/tribler-core/tribler_core/modules/tribler_community.py deleted file mode 100644 index 1116c9efc60..00000000000 --- a/src/tribler-core/tribler_core/modules/tribler_community.py +++ /dev/null @@ -1,12 +0,0 @@ -from ipv8.community import Community - -from tribler_core.config.tribler_config_section import TriblerConfigSection - - -class TriblerCommunity(Community): - """Base class for Tribler communities. - """ - def __init__(self, *args, settings: TriblerConfigSection = None, **kwargs): - super().__init__(*args, **kwargs) - self.settings = settings - self.logger.info(f'Init. Settings: {settings}.') diff --git a/src/tribler-core/tribler_core/modules/tunnel/community/community.py b/src/tribler-core/tribler_core/modules/tunnel/community/community.py index d8289a26f53..ffc771f8533 100644 --- a/src/tribler-core/tribler_core/modules/tunnel/community/community.py +++ b/src/tribler-core/tribler_core/modules/tunnel/community/community.py @@ -10,8 +10,9 @@ import async_timeout +from ipv8.dht.provider import DHTCommunityProvider from ipv8.messaging.anonymization.caches import CreateRequestCache -from ipv8.messaging.anonymization.community import unpack_cell, TunnelCommunity +from ipv8.messaging.anonymization.community import TunnelSettings, unpack_cell, TunnelCommunity from ipv8.messaging.anonymization.hidden_services import HiddenTunnelCommunity from ipv8.messaging.anonymization.payload import EstablishIntroPayload, NO_CRYPTO_PACKETS from ipv8.messaging.anonymization.tunnel import ( @@ -27,6 +28,7 @@ RelayRoute, ) from ipv8.peer import Peer +from ipv8.peerdiscovery.discovery import RandomWalk from ipv8.peerdiscovery.network import Network from ipv8.taskmanager import task from ipv8.types import Address @@ -35,6 +37,8 @@ from tribler_common.simpledefs import DLSTATUS_DOWNLOADING, DLSTATUS_METADATA, DLSTATUS_SEEDING, DLSTATUS_STOPPED, NTFY from tribler_core.modules.bandwidth_accounting.transaction import BandwidthTransactionData +from tribler_core.modules.community_di_mixin import CommunityDIMixin, INFINITE_TARGET_PEERS, StrategyFactory +from tribler_core.modules.metadata_store.community.sync_strategy import RemovePeers from tribler_core.modules.tunnel.community.caches import BalanceRequestCache, HTTPRequestCache from tribler_core.modules.tunnel.community.discovery import GoldenRatioStrategy from tribler_core.modules.tunnel.community.dispatcher import TunnelDispatcher @@ -49,6 +53,7 @@ ) from tribler_core.modules.tunnel.community.settings import TunnelCommunitySettings from tribler_core.modules.tunnel.socks5.server import Socks5Server +from tribler_core.session import Mediator from tribler_core.utilities.bencodecheck import is_bencoded from tribler_core.utilities.unicode import hexlify @@ -59,23 +64,28 @@ MAX_HTTP_PACKET_SIZE = 1400 -class TriblerTunnelCommunity(HiddenTunnelCommunity): +class TriblerTunnelCommunity(CommunityDIMixin, HiddenTunnelCommunity): """ This community is built upon the anonymous messaging layer in IPv8. It adds support for libtorrent anonymous downloads and bandwidth token payout when closing circuits. """ community_id = unhexlify('a3591a6bd89bbaca0974062a1287afcfbc6fd6bb') - def __init__(self, *args, exitnode_cache:Path = None, **kwargs): - self.config = kwargs.pop('config', TunnelCommunitySettings()) - self.notifier = kwargs.pop('notifier', None) - self.dlmgr = kwargs.pop('dlmgr', None) - num_competing_slots = kwargs.pop('competing_slots', 15) - num_random_slots = kwargs.pop('random_slots', 5) - self.bandwidth_community = kwargs.pop('bandwidth_community', None) - socks_listen_ports = kwargs.pop('socks_listen_ports', None) + def __init__(self, *args, exitnode_cache: Path = None, mediator: Mediator = None, **kwargs): + self.config = mediator.config.tunnel_community + self.notifier = mediator.notifier + self.dlmgr = mediator.download_manager + num_competing_slots = self.config.competing_slots + num_random_slots = self.config.random_slots + self.bandwidth_community = mediator.dictionary.pop('bandwidth_community', None) + self.dht_community = mediator.dictionary.pop('dht_community', None) + socks_listen_ports = self.config.socks5_listen_ports + self.settings = TunnelSettings() + self.settings.min_circuits = self.config.min_circuits + self.settings.max_circuits = self.config.max_circuits self.exitnode_cache = exitnode_cache - super().__init__(*args, **kwargs) + dht_provider = DHTCommunityProvider(self.dht_community, mediator.config.ipv8.port) + super().__init__(*args, ipv8=mediator.ipv8, dht_provider=dht_provider, **kwargs) self._use_main_thread = True if self.config.exitnode_enabled: @@ -118,6 +128,11 @@ def __init__(self, *args, exitnode_cache:Path = None, **kwargs): if self.exitnode_cache is not None: self.restore_exitnodes_from_disk() + self.init_community_di_mixin(strategies=[ + StrategyFactory(create_class=RandomWalk, target_peers=30), + StrategyFactory(create_class=RemovePeers, target_peers=INFINITE_TARGET_PEERS), + ]) + async def wait_for_socks_servers(self): # Wait for the socks server to be ready. Otherwise, hidden services downloads may fail. while any([name.startswith('start_socks_') for name in self._pending_tasks.keys()]): diff --git a/src/tribler-core/tribler_core/session.py b/src/tribler-core/tribler_core/session.py index 47f4fcffbd1..17afe159741 100644 --- a/src/tribler-core/tribler_core/session.py +++ b/src/tribler-core/tribler_core/session.py @@ -6,13 +6,13 @@ import signal import sys from asyncio import Event, get_event_loop -from dataclasses import dataclass +from dataclasses import dataclass, field +from typing import Optional -from ipv8.keyvault.private.libnaclkey import LibNaCLSK +import tribler_core.utilities.permid as permid_module +from ipv8.peer import Peer from ipv8.taskmanager import TaskManager - from ipv8_service import IPv8 - from tribler_common.network_utils import NetworkUtils from tribler_common.sentry_reporter.sentry_reporter import SentryReporter from tribler_common.simpledefs import ( @@ -27,10 +27,7 @@ STATE_START_WATCH_FOLDER, STATE_UPGRADING_READABLE, ) - -import tribler_core.utilities.permid as permid_module from tribler_core.config.tribler_config import TriblerConfig -from tribler_core.modules.community_loader import load_communities from tribler_core.modules.metadata_store.utils import generate_test_channels from tribler_core.modules.settings import Ipv8Settings from tribler_core.notifier import Notifier @@ -39,11 +36,31 @@ from tribler_core.utilities.unicode import hexlify +@dataclass +class Mediator: + config: TriblerConfig = None + notifier: Optional[Notifier] = None + ipv8: Optional[IPv8] = None + metadata_store = None + download_manager = None + torrent_checker = None + + dictionary: dict = field(default_factory=dict) + + +@dataclass +class CommunityFactory: + create_class: Optional[type] = None + kwargs: dict = field(default_factory=dict) + + async def create_ipv8( config: Ipv8Settings, state_dir, - prosthetic_session, ipv8_tasks, + communities_cls: list[CommunityFactory], + mediator: Mediator, + trustchain_keypair, core_test_mode=False): from ipv8.configuration import ConfigBuilder from ipv8.messaging.interfaces.dispatcher.endpoint import DispatcherEndpoint @@ -79,9 +96,20 @@ async def create_ipv8( from ipv8.bootstrapping.dispersy.bootstrapper import DispersyBootstrapper bootstrapper = DispersyBootstrapper(ip_addresses=[(address, int(port))], dns_addresses=[]) - load_communities(prosthetic_session.config, prosthetic_session.trustchain_keypair, ipv8, prosthetic_session.dlmgr, - prosthetic_session.mds, prosthetic_session.torrent_checker, prosthetic_session.notifier, - bootstrapper) + peer = Peer(trustchain_keypair) + mediator.ipv8 = ipv8 + for community_cls in communities_cls: + community = community_cls.create_class(peer, ipv8.endpoint, ipv8.network, mediator=mediator, **community_cls.kwargs) + community.fill_mediator(mediator) + + for strategy in community.strategies: + ipv8.strategies.append((strategy.create_class(community), strategy.target_peers)) + + if bootstrapper: + community.bootstrappers.append(bootstrapper) + + ipv8.overlays.append(community) + if config.statistics and not core_test_mode: # Enable gathering IPv8 statistics for overlay in ipv8.overlays: @@ -110,23 +138,6 @@ def create_in_state_dir(path): create_in_state_dir(STATEDIR_CHANNELS_DIR) -@dataclass -class ProstheticSession: - config: TriblerConfig = None - trustchain_keypair: LibNaCLSK = None - discovery_community: None = None - ipv8: IPv8 = None - remote_query_community: None = None - bandwidth_community: None = None - tunnel_community: None = None - dht_community: None = None - mds: None = None - torrent_checker: None = None - notifier: None = None - overlays: None = None - dlmgr: None = None - - def init_keypair(state_dir, keypair_filename): """ Set parameters that depend on state_dir. @@ -144,7 +155,12 @@ def init_keypair(state_dir, keypair_filename): return trustchain_keypair -async def core_session(config: TriblerConfig): +async def core_session( + config: TriblerConfig, + communities_cls: list[CommunityFactory]): + + mediator = Mediator(config=config) + mediator.config = config # In test mode, the Core does not communicate with the external world and the state dir is read-only logger = logging.getLogger("Session") @@ -158,6 +174,7 @@ async def core_session(config: TriblerConfig): patch_crypto_be_discovery() notifier = Notifier() + mediator.notifier = notifier logger.info("Session is using state directory: %s", config.state_dir) create_state_directory_structure(config.state_dir) @@ -209,7 +226,7 @@ async def core_session(config: TriblerConfig): notifier=notifier) state_endpoint.readable_status = STATE_UPGRADING_READABLE - api_manager.get_endpoint('upgrader').upgrader=upgrader + api_manager.get_endpoint('upgrader').upgrader = upgrader await upgrader.run() # On Mac, we bundle the root certificate for the SSL validation since Twisted is not using the root @@ -233,13 +250,8 @@ async def core_session(config: TriblerConfig): if config.core_test_mode: generate_test_channels(metadata_store) - prosthetic_session = ProstheticSession( - config=config, - trustchain_keypair=trustchain_keypair, - mds=metadata_store, - notifier=notifier, - overlays=[] - ) + mediator.metadata_store = metadata_store + ipv8_tasks = TaskManager() if config.tunnel_community.enabled: @@ -259,14 +271,14 @@ async def core_session(config: TriblerConfig): state_dir=config.state_dir, config=config.ipv8, ipv8_tasks=ipv8_tasks, - prosthetic_session=prosthetic_session, + communities_cls=communities_cls, + mediator=mediator, + trustchain_keypair=trustchain_keypair, core_test_mode=config.core_test_mode) from ipv8.messaging.anonymization.community import TunnelCommunity api_manager.get_endpoint('ipv8').initialize(ipv8) await ipv8.get_overlay(TunnelCommunity).wait_for_socks_servers() - - # Note that currently we should only start libtorrent after the SOCKS5 servers have been started if config.libtorrent.enabled: state_endpoint.readable_status = STATE_START_LIBTORRENT from tribler_core.modules.libtorrent.download_manager import DownloadManager @@ -279,8 +291,8 @@ async def core_session(config: TriblerConfig): tunnel_community=ipv8.get_overlay(TunnelCommunity), bootstrap_infohash=config.bootstrap.infohash, dummy_mode=config.core_test_mode) - # FIXME! Required by TunnelCommunity - prosthetic_session.dlmgr = download_manager + + mediator.download_manager = download_manager download_manager.initialize() state_endpoint.readable_status = STATE_LOAD_CHECKPOINTS @@ -294,6 +306,8 @@ async def core_session(config: TriblerConfig): await download_manager.start_download_from_uri( "magnet:?xt=urn:btih:0000000000000000000000000000000000000000") + # Note that currently we should only start libtorrent after the SOCKS5 servers have been started + api_manager.get_endpoint('settings').download_manager = download_manager state_endpoint.readable_status = STATE_READABLE_STARTED @@ -310,6 +324,7 @@ async def core_session(config: TriblerConfig): notifier=notifier, tracker_manager=tracker_manager, metadata_store=metadata_store) + mediator.torrent_checker = torrent_checker await torrent_checker.initialize() from tribler_core.modules.bandwidth_accounting.community import BandwidthAccountingCommunity