Skip to content

Commit

Permalink
Introduce init and unload methods in ipv8_component
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Oct 1, 2021
1 parent 6c96306 commit 4f05665
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
from ipv8.peerdiscovery.discovery import RandomWalk

from ipv8_service import IPv8

from tribler_common.simpledefs import STATEDIR_DB_DIR

from tribler_core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
Expand All @@ -17,36 +13,34 @@
class BandwidthAccountingComponent(RestfulComponent):
community: BandwidthAccountingCommunity

_ipv8: IPv8
_ipv8_component: Ipv8Component

async def run(self):
await super().run()
await self.get_component(UpgradeComponent)
config = self.session.config

ipv8_component = await self.require_component(Ipv8Component)
self._ipv8 = ipv8_component.ipv8
peer = ipv8_component.peer
self._ipv8_component = await self.require_component(Ipv8Component)

config = self.session.config
if config.general.testnet or config.bandwidth_accounting.testnet:
bandwidth_cls = BandwidthAccountingTestnetCommunity
else:
bandwidth_cls = BandwidthAccountingCommunity

db_name = "bandwidth_gui_test.db" if config.gui_test_mode else f"{bandwidth_cls.DB_NAME}.db"
database_path = config.state_dir / STATEDIR_DB_DIR / db_name
database = BandwidthDatabase(database_path, peer.public_key.key_to_bin())
community = bandwidth_cls(peer, self._ipv8.endpoint, self._ipv8.network,
settings=config.bandwidth_accounting,
database=database)
self._ipv8.add_strategy(community, RandomWalk(community), 20)
database = BandwidthDatabase(database_path, self._ipv8_component.peer.public_key.key_to_bin())
self.community = bandwidth_cls(self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
self._ipv8_component.ipv8.network,
settings=config.bandwidth_accounting,
database=database)

community.bootstrappers.append(ipv8_component.make_bootstrapper())
self._ipv8_component.initialise_community_by_default(self.community)

self.community = community
await self.init_endpoints(endpoints=['trustview', 'bandwidth'],
values={'bandwidth_db': community.database, 'bandwidth_community': community})
values={'bandwidth_db': self.community.database,
'bandwidth_community': self.community})

async def shutdown(self):
await super().shutdown()
await self._ipv8.unload_overlay(self.community)
await self._ipv8_component.unload_community(self.community)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ async def test_bandwidth_accounting_component(tribler_config):
comp = BandwidthAccountingComponent.instance()
assert comp.started.is_set() and not comp.failed
assert comp.community
assert comp._ipv8
assert comp._ipv8_component

await session.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@
GigaChannelTestnetCommunity,
)
from tribler_core.components.gigachannel.community.sync_strategy import RemovePeers
from tribler_core.components.ipv8 import Ipv8Component
from tribler_core.components.ipv8 import INFINITE, Ipv8Component
from tribler_core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler_core.components.reporter import ReporterComponent
from tribler_core.components.restapi import RestfulComponent

INFINITE = -1


class GigaChannelComponent(RestfulComponent):
community: GigaChannelCommunity

_ipv8: IPv8
_ipv8_component: Ipv8Component

async def run(self):
await super().run()
Expand All @@ -27,17 +26,14 @@ async def run(self):
config = self.session.config
notifier = self.session.notifier

ipv8_component = await self.require_component(Ipv8Component)
self._ipv8 = ipv8_component.ipv8
peer = ipv8_component.peer

self._ipv8_component = await self.require_component(Ipv8Component)
metadata_store_component = await self.require_component(MetadataStoreComponent)

giga_channel_cls = GigaChannelTestnetCommunity if config.general.testnet else GigaChannelCommunity
community = giga_channel_cls(
peer,
self._ipv8.endpoint,
self._ipv8.network,
self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
self._ipv8_component.ipv8.network,
notifier=notifier,
settings=config.chant,
rqc_settings=config.remote_query_community,
Expand All @@ -46,15 +42,15 @@ async def run(self):
)
self.community = community

self._ipv8.add_strategy(community, RandomWalk(community), 30)
self._ipv8.add_strategy(community, RemovePeers(community), INFINITE)
self._ipv8_component.ipv8.add_strategy(community, RandomWalk(community), 30)
self._ipv8_component.ipv8.add_strategy(community, RemovePeers(community), INFINITE)

community.bootstrappers.append(ipv8_component.make_bootstrapper())
community.bootstrappers.append(self._ipv8_component.make_bootstrapper())

await self.init_endpoints(endpoints=['remote_query', 'channels', 'collections'],
values={'gigachannel_community': community})

async def shutdown(self):
await super().shutdown()
if self._ipv8:
await self._ipv8.unload_overlay(self.community)
await self._ipv8_component.unload_community(self.community)

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ async def test_giga_channel_component(tribler_config):
comp = GigaChannelComponent.instance()
assert comp.started.is_set() and not comp.failed
assert comp.community
assert comp._ipv8
assert comp._ipv8_component

await session.shutdown()
20 changes: 16 additions & 4 deletions src/tribler-core/tribler_core/components/ipv8.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ async def run(self):
config.ipv8.walk_scaling_upper_limit).start(self._task_manager)

if config.dht.enabled:
self.init_dht_discovery_community()
self._init_dht_discovery_community()

if not self.session.config.gui_test_mode:
if config.discovery_community.enabled:
self.init_peer_discovery_community()
self._init_peer_discovery_community()
else:
if config.dht.enabled:
self.dht_discovery_community.routing_tables[UDPv4Address] = RoutingTable('\x00' * 20)
Expand All @@ -89,14 +89,26 @@ async def run(self):
'asyncio', 'attestation', 'dht', 'identity', 'isolation', 'network', 'noblockdht', 'overlays'
])

def initialise_community_by_default(self, community):
community.bootstrappers.append(self.make_bootstrapper())

# Value of `target_peers` must not be equal to the value of `max_peers` for this community.
# This causes a deformed network topology and makes it harder for peers to connect to others.
# More information: https://github.com/Tribler/py-ipv8/issues/979#issuecomment-896643760
random_walk_max_peers = max(10, community.max_peers - 10)
self.ipv8.add_strategy(community, RandomWalk(community), random_walk_max_peers)

async def unload_community(self, community):
await self.ipv8.unload_overlay(community)

def make_bootstrapper(self) -> DispersyBootstrapper:
args = DISPERSY_BOOTSTRAPPER['init']
if bootstrap_override := self.session.config.ipv8.bootstrap_override:
address, port = bootstrap_override.split(':')
args = {'ip_addresses': [(address, int(port))], 'dns_addresses': []}
return DispersyBootstrapper(**args)

def init_peer_discovery_community(self):
def _init_peer_discovery_community(self):
ipv8 = self.ipv8
community = DiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=100)
ipv8.add_strategy(community, RandomChurn(community), INFINITE)
Expand All @@ -105,7 +117,7 @@ def init_peer_discovery_community(self):
community.bootstrappers.append(self.make_bootstrapper())
self._peer_discovery_community = community

def init_dht_discovery_community(self):
def _init_dht_discovery_community(self):
ipv8 = self.ipv8
community = DHTDiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=60)
ipv8.add_strategy(community, PingChurn(community), INFINITE)
Expand Down
30 changes: 14 additions & 16 deletions src/tribler-core/tribler_core/components/popularity.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,39 @@
from ipv8.peerdiscovery.discovery import RandomWalk
from ipv8_service import IPv8
from tribler_core.components.base import Component
from tribler_core.components.ipv8 import Ipv8Component
from tribler_core.components.gigachannel.community.sync_strategy import RemovePeers
from tribler_core.components.ipv8 import INFINITE, Ipv8Component
from tribler_core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler_core.components.reporter import ReporterComponent
from tribler_core.components.torrent_checker import TorrentCheckerComponent
from tribler_core.components.gigachannel.community.sync_strategy import RemovePeers
from tribler_core.modules.popularity.community import PopularityCommunity

INFINITE = -1


class PopularityComponent(Component):
community: PopularityCommunity
_ipv8: IPv8

_ipv8_component: Ipv8Component

async def run(self):
await super().run()
await self.get_component(ReporterComponent)

config = self.session.config
ipv8_component = await self.require_component(Ipv8Component)
self._ipv8 = ipv8_component.ipv8
peer = ipv8_component.peer
self._ipv8_component = await self.require_component(Ipv8Component)
metadata_store_component = await self.require_component(MetadataStoreComponent)
torrent_checker_component = await self.require_component(TorrentCheckerComponent)

community = PopularityCommunity(peer, self._ipv8.endpoint, self._ipv8.network,
config = self.session.config
community = PopularityCommunity(self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
self._ipv8_component.ipv8.network,
settings=config.popularity_community,
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
torrent_checker=torrent_checker_component.torrent_checker)
self.community = community

self._ipv8.add_strategy(community, RandomWalk(community), 30)
self._ipv8.add_strategy(community, RemovePeers(community), INFINITE)

community.bootstrappers.append(ipv8_component.make_bootstrapper())
self._ipv8_component.initialise_community_by_default(community)
self._ipv8_component.ipv8.add_strategy(community, RemovePeers(community), INFINITE)

async def shutdown(self):
await self._ipv8.unload_overlay(self.community)
await super().shutdown()
await self._ipv8_component.unload_community(self.community)
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def test_popularity_component(tribler_config):

comp = PopularityComponent.instance()
assert comp.community
assert comp._ipv8
assert comp._ipv8_component

await session.shutdown()

Expand Down Expand Up @@ -195,7 +195,7 @@ async def test_tunnels_component(tribler_config):
comp = TunnelsComponent.instance()
assert comp.started.is_set() and not comp.failed
assert comp.community
assert comp._ipv8
assert comp._ipv8_component

await session.shutdown()

Expand Down
59 changes: 24 additions & 35 deletions src/tribler-core/tribler_core/components/tunnels.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
from ipv8.dht.provider import DHTCommunityProvider
from ipv8.messaging.anonymization.community import TunnelSettings
from ipv8.peerdiscovery.discovery import RandomWalk

from ipv8_service import IPv8

from tribler_core.components.bandwidth_accounting.bandwidth_accounting_component import BandwidthAccountingComponent
from tribler_core.components.ipv8 import Ipv8Component
from tribler_core.components.ipv8 import INFINITE, Ipv8Component
from tribler_core.components.libtorrent import LibtorrentComponent
from tribler_core.components.restapi import RestfulComponent
from tribler_core.components.socks_configurator import SocksServersComponent
from tribler_core.modules.tunnel.community.community import TriblerTunnelCommunity, TriblerTunnelTestnetCommunity
from tribler_core.modules.tunnel.community.discovery import GoldenRatioStrategy

INFINITE = -1


class TunnelsComponent(RestfulComponent):
community: TriblerTunnelCommunity
_ipv8: IPv8

_ipv8_component: Ipv8Component

async def run(self):
await super().run()

config = self.session.config
ipv8_component = await self.require_component(Ipv8Component)
self._ipv8 = ipv8_component.ipv8
peer = ipv8_component.peer
dht_discovery_community = ipv8_component.dht_discovery_community
self._ipv8_component = await self.require_component(Ipv8Component)
dht_discovery_community = self._ipv8_component.dht_discovery_community

bandwidth_component = await self.get_component(BandwidthAccountingComponent)
bandwidth_community = bandwidth_component.community if bandwidth_component else None
Expand All @@ -38,6 +31,7 @@ async def run(self):
socks_servers = socks_servers_component.socks_servers if socks_servers_component else None

settings = TunnelSettings()
config = self.session.config
settings.min_circuits = config.tunnel_community.min_circuits
settings.max_circuits = config.tunnel_community.max_circuits

Expand All @@ -50,29 +44,24 @@ async def run(self):
exitnode_cache = config.state_dir / "exitnode_cache.dat"

# TODO: decouple bandwidth community and dlmgr to initiate later
community = tunnel_cls(peer, self._ipv8.endpoint, self._ipv8.network,
socks_servers=socks_servers,
config=config.tunnel_community,
notifier=self.session.notifier,
dlmgr=download_manager,
bandwidth_community=bandwidth_community,
dht_provider=provider,
exitnode_cache=exitnode_cache,
settings=settings)

# Value of `target_peers` must not be equal to the value of `max_peers` for this community.
# This causes a deformed network topology and makes it harder for peers to connect to others.
# More information: https://github.com/Tribler/py-ipv8/issues/979#issuecomment-896643760
self._ipv8.add_strategy(community, RandomWalk(community), 20)
self._ipv8.add_strategy(community, GoldenRatioStrategy(community), INFINITE)

community.bootstrappers.append(ipv8_component.make_bootstrapper())

self.community = community

await self.init_endpoints(endpoints=['downloads', 'debug'], values={'tunnel_community': community})
await self.init_ipv8_endpoints(self._ipv8, endpoints=['tunnel'])
self.community = tunnel_cls(self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
self._ipv8_component.ipv8.network,
socks_servers=socks_servers,
config=config.tunnel_community,
notifier=self.session.notifier,
dlmgr=download_manager,
bandwidth_community=bandwidth_community,
dht_provider=provider,
exitnode_cache=exitnode_cache,
settings=settings)

self._ipv8_component.initialise_community_by_default(self.community)
self._ipv8_component.ipv8.add_strategy(self.community, GoldenRatioStrategy(self.community), INFINITE)

await self.init_endpoints(endpoints=['downloads', 'debug'], values={'tunnel_community': self.community})
await self.init_ipv8_endpoints(self._ipv8_component.ipv8, endpoints=['tunnel'])

async def shutdown(self):
await super().shutdown()
await self._ipv8.unload_overlay(self.community)
await self._ipv8_component.unload_community(self.community)

0 comments on commit 4f05665

Please sign in to comment.