Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READY: DHTCommunity #3642

Merged
merged 1 commit into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
STATE_START_API_ENDPOINTS, STATE_START_WATCH_FOLDER, STATE_START_CREDIT_MINING)
from Tribler.community.market.wallet.dummy_wallet import DummyWallet1, DummyWallet2
from Tribler.community.market.wallet.tc_wallet import TrustchainWallet
from Tribler.community.dht.provider import DHTCommunityProvider
from Tribler.dispersy.taskmanager import TaskManager
from Tribler.dispersy.util import blockingCallFromThread, blocking_call_on_reactor_thread

Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(self):

self.credit_mining_manager = None
self.market_community = None
self.dht_community = None

def register(self, session, session_lock):
assert isInIOThread()
Expand Down Expand Up @@ -254,16 +256,25 @@ def load_ipv8_overlays(self):
self.ipv8.overlays.append(self.triblerchain_community)
self.ipv8.strategies.append((EdgeWalk(self.triblerchain_community), 20))

# DHT Community
if self.session.config.get_dht_community_enabled():
from Tribler.community.dht.community import DHTCommunity
self.dht_community = DHTCommunity(peer, self.ipv8.endpoint, self.ipv8.network)
self.ipv8.overlays.append(self.dht_community)
self.ipv8.strategies.append((RandomWalk(self.dht_community), 20))

# Tunnel Community
if self.session.config.get_tunnel_community_enabled():
tunnel_peer = Peer(self.session.trustchain_keypair)
if self.dht_community:
dht_provider = DHTCommunityProvider(self.dht_community, self.session.config.get_dispersy_port())
else:
dht_provider = MainlineDHTProvider(self.mainline_dht, self.session.config.get_dispersy_port())

from Tribler.community.triblertunnel.community import TriblerTunnelCommunity
self.tunnel_community = TriblerTunnelCommunity(tunnel_peer, self.ipv8.endpoint, self.ipv8.network,
tribler_session=self.session,
dht_provider=MainlineDHTProvider(
self.mainline_dht,
self.session.config.get_dispersy_port()),
dht_provider=dht_provider,
triblerchain_community=self.triblerchain_community)
self.ipv8.overlays.append(self.tunnel_community)
self.ipv8.strategies.append((RandomWalk(self.tunnel_community), 20))
Expand Down
3 changes: 3 additions & 0 deletions Tribler/Core/Config/config.spec
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,6 @@ history_size = integer(min=1, default=20)
enabled = boolean(default=True)
sources = string_list(default=list())
max_disk_space = integer(min=0, default=53687091200)

[dht_community]
enabled = boolean(default=False)
8 changes: 8 additions & 0 deletions Tribler/Core/Config/tribler_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,11 @@ def set_credit_mining_disk_space(self, value):

def get_credit_mining_disk_space(self):
return self.config['credit_mining']['max_disk_space']

# DHT Community

def set_dht_community_enabled(self, value):
self.config['dht_community']['enabled'] = value

def get_dht_community_enabled(self):
return self.config['dht_community']['enabled']
198 changes: 198 additions & 0 deletions Tribler/Test/Community/DHT/test_community.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
import sys

from twisted.internet.defer import succeed

from Tribler.Test.ipv8_base import TestBase
from Tribler.Test.mocking.ipv8 import MockIPv8
from Tribler.Test.util.ipv8_util import twisted_wrapper
from Tribler.Test.Core.base_test import MockObject
from Tribler.community.dht.community import DHTCommunity
from Tribler.community.dht.routing import distance, Node


class TestDHTCommunity(TestBase):

def setUp(self):
super(TestDHTCommunity, self).setUp()
self.initialize(DHTCommunity, 2)

def create_node(self):
return MockIPv8(u"curve25519", DHTCommunity)

@twisted_wrapper
def test_routing_table(self):
yield self.introduce_nodes()
yield self.deliver_messages()

node0_id = self.nodes[0].overlay.my_node_id
node1_id = self.nodes[1].overlay.my_node_id

node0_bucket = self.nodes[0].overlay.routing_table.get_bucket(node1_id)
node1_bucket = self.nodes[1].overlay.routing_table.get_bucket(node0_id)

self.assertTrue(node0_bucket and node0_bucket.prefix_id == u'')
self.assertTrue(node1_bucket and node1_bucket.prefix_id == u'')

self.assertTrue(node1_bucket.get(node0_id))
self.assertTrue(node0_bucket.get(node1_id))

@twisted_wrapper
def test_ping_pong(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.ping(self.nodes[1].my_peer)
yield self.deliver_messages()
self.assertEqual((yield d), self.nodes[1].my_peer)

yield self.nodes[1].unload()
d = self.nodes[0].overlay.ping(self.nodes[1].my_peer)
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)

@twisted_wrapper
def test_ping_all(self):
yield self.introduce_nodes()
bucket = self.nodes[0].overlay.routing_table.trie[u'']
node1 = bucket.get(self.nodes[1].overlay.my_node_id)
node1.failed = 1
node1.last_response = 0

yield self.nodes[0].overlay.ping_all()
self.assertTrue(node1.failed == 0)
self.assertNotEqual(node1.last_response, 0)

node1.failed = 1
yield self.nodes[0].overlay.ping_all()
self.assertTrue(node1.failed == 1)

@twisted_wrapper
def test_store(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.store('\00' * 20, 'test1')
yield self.deliver_messages()
self.assertIn(self.nodes[1].my_peer, (yield d))
self.assertEqual(self.nodes[1].overlay.storage.get('\00' * 20)[0], 'test1')

yield self.introduce_nodes()
self.nodes[1].unload()
d = self.nodes[0].overlay.store('\00' * 20, 'test2')
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)
self.assertEqual(self.nodes[1].overlay.storage.get('\00' * 20)[0], 'test1')

@twisted_wrapper
def test_find_nodes(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.find_nodes('\00' * 20)
yield self.deliver_messages()
nodes = (yield d)
self.assertItemsEqual(nodes, [Node(n.my_peer.key.pub().key_to_bin(), n.my_peer.address) for n in self.nodes[1:]])

@twisted_wrapper
def test_find_values(self):
yield self.introduce_nodes()
self.nodes[1].overlay.storage.put('\00' * 20, 'test', 60)
d = self.nodes[0].overlay.find_values('\00' * 20)
yield self.deliver_messages()
values = (yield d)
self.assertEqual(values[0], 'test')

@twisted_wrapper
def test_move_data(self):
self.nodes[0].overlay.storage.put(self.nodes[1].overlay.my_node_id, 'test', 60)
self.nodes[0].overlay.on_node_discovered(Node(self.nodes[1].overlay.my_peer.key,
self.nodes[1].overlay.my_peer.address))
yield self.deliver_messages()
self.assertIn('test', self.nodes[1].overlay.storage.get(self.nodes[1].overlay.my_node_id))

@twisted_wrapper
def test_caching(self):
# Add a third node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's easier to use https://github.com/Tribler/py-ipv8/blob/master/ipv8/test/base.py#L93, instead of creating a new MockIPv8 instance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I missed that function. Thanks!

node = MockIPv8(u"curve25519", DHTCommunity)
self.add_node_to_experiment(node)

# Sort nodes based on distance to target
self.nodes.sort(key=lambda n: distance(n.overlay.my_node_id, '\x00' * 20), reverse=True)

self.nodes[0].overlay.on_node_discovered(Node(self.nodes[1].my_peer.key,
self.nodes[1].my_peer.address))
self.nodes[1].overlay.on_node_discovered(Node(self.nodes[2].my_peer.key,
self.nodes[2].my_peer.address))

self.nodes[2].overlay.storage.put('\x00' * 20, 'test1', 60)
yield self.nodes[0].overlay.find_values('\x00' * 20)
yield self.deliver_messages()

self.assertEqual(self.nodes[1].overlay.storage.get('\x00' * 20), ['test1'])

@twisted_wrapper
def test_maintenance(self):
yield self.introduce_nodes()
yield self.deliver_messages()

bucket = self.nodes[0].overlay.routing_table.get_bucket(self.nodes[1].overlay.my_node_id)
bucket.last_changed = 0

mock = MockObject()
mock.is_called = False

# Refresh
self.nodes[0].overlay.find_values = lambda *args: setattr(mock, 'is_called', True)
self.nodes[0].overlay.maintenance()
self.assertNotEqual(bucket.last_changed, 0)
self.assertTrue(mock.is_called)

mock.is_called = False
prev_ts = bucket.last_changed
self.nodes[0].overlay.maintenance()
self.assertEqual(bucket.last_changed, prev_ts)
self.assertFalse(mock.is_called)


# Republish
mock.is_called = False
self.nodes[0].overlay.storage.data['\x00' * 20] = [(0, 60, 'test')]
self.nodes[0].overlay.store = lambda *args: setattr(mock, 'is_called', True)
self.nodes[0].overlay.maintenance()
self.assertTrue(mock.is_called)

mock.is_called = False
self.nodes[0].overlay.storage.data['\x00' * 20] = [(sys.maxint, 60, 'test')]
self.nodes[0].overlay.maintenance()
self.assertFalse(mock.is_called)


class TestDHTCommunityXL(TestBase):

def setUp(self):
super(TestDHTCommunityXL, self).setUp()
self.initialize(DHTCommunity, 25)
for node in self.nodes:
node.overlay.ping = lambda _:succeed(None)

def create_node(self):
return MockIPv8(u"curve25519", DHTCommunity)

def get_closest_nodes(self, node_id, max_nodes=8):
return sorted(self.nodes, key=lambda n: distance(n.overlay.my_node_id, node_id))[:max_nodes]

@twisted_wrapper
def test_full_protocol(self):
# Fill routing tables
yield self.introduce_nodes()
yield self.deliver_messages()

# Store key value pair
kv_pair = ('\x00' * 20, 'test1')
yield self.nodes[0].overlay.store(*kv_pair)

# Check if the closest nodes have now stored the key
for node in self.get_closest_nodes(kv_pair[0]):
self.assertTrue(node.overlay.storage.get(kv_pair[0]), kv_pair[1])

# Store another value under the same key
yield self.nodes[1].overlay.store('\x00' * 20, 'test2')

# Check if we get both values
values = yield self.nodes[-1].overlay.find_values('\x00' * 20)
self.assertIn('test1', values)
self.assertIn('test2', values)
42 changes: 42 additions & 0 deletions Tribler/Test/Community/DHT/test_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from twisted.internet.defer import inlineCallbacks, succeed

from Tribler.community.dht.provider import DHTCommunityProvider
from Tribler.pyipv8.ipv8.util import blocking_call_on_reactor_thread
from Tribler.Test.Core.base_test import TriblerCoreTest, MockObject


class TestDHTProvider(TriblerCoreTest):

@blocking_call_on_reactor_thread
@inlineCallbacks
def setUp(self, annotate=True):
yield super(TestDHTProvider, self).setUp(annotate=annotate)

def mocked_find_values(key):
return succeed(['\x01\x01\x01\x01\x04\xd2'])

def mocked_store(key, value):
self.stored_value = value
return succeed([])

self.cb_invoked = False
self.stored_value = None
self.dhtcommunity = MockObject()
self.dhtcommunity.find_values = mocked_find_values
self.dhtcommunity.store = mocked_store
self.dhtcommunity.my_estimated_lan = '1.1.1.1'
self.dht_provider = DHTCommunityProvider(self.dhtcommunity, 1234)

def test_lookup(self):
def check_result(result):
self.cb_invoked = True
self.assertEqual(result, [('1.1.1.1', 1234)])
self.dht_provider.lookup('a' * 20, check_result)
self.assertTrue(self.cb_invoked)

def test_announce(self):
def check_result(result):
self.cb_invoked = True
self.dht_provider.announce('a' * 20, check_result)
self.assertTrue(self.cb_invoked)
self.assertEqual(self.stored_value, '\x01\x01\x01\x01\x04\xd2')
Loading