Skip to content

Commit

Permalink
Added signed values to DHTCommunity + added DHTDiscoveryCommunity
Browse files Browse the repository at this point in the history
  • Loading branch information
egbertbouman committed Jul 2, 2018
1 parent 6e5e53c commit fa593e5
Show file tree
Hide file tree
Showing 11 changed files with 780 additions and 193 deletions.
6 changes: 4 additions & 2 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,10 @@ def load_ipv8_overlays(self):

# DHT Community
if self.session.config.get_dht_community_enabled():
from Tribler.community.dht.community import DHTCommunity
self.dht_community = DHTCommunity(peer, self.ipv8.endpoint, self.ipv8.network)
from Tribler.community.dht.discovery import DHTDiscoveryCommunity

dht_peer = Peer(self.session.trustchain_keypair)
self.dht_community = DHTDiscoveryCommunity(dht_peer, self.ipv8.endpoint, self.ipv8.network)
self.ipv8.overlays.append(self.dht_community)
self.ipv8.strategies.append((RandomWalk(self.dht_community), 20))

Expand Down
156 changes: 107 additions & 49 deletions Tribler/Test/Community/DHT/test_community.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import sys
import time

from twisted.internet.defer import succeed
from twisted.internet.defer import succeed, Deferred

from Tribler.Test.Core.base_test import MockObject
from Tribler.Test.ipv8_base import TestBase
from Tribler.Test.mocking.ipv8 import MockIPv8
from Tribler.Test.util.ipv8_util import twisted_wrapper
from Tribler.Test.Core.base_test import MockObject
from Tribler.community.dht.community import DHTCommunity
from Tribler.community.dht.routing import distance, Node
from Tribler.community.dht.routing import Node, distance


class TestDHTCommunity(TestBase):
Expand All @@ -16,6 +17,21 @@ def setUp(self):
super(TestDHTCommunity, self).setUp()
self.initialize(DHTCommunity, 2)

self.key = '\x00' * 20
self.value = 'test'
self.value_in_store = self.nodes[0].overlay.serialize_value(self.value, sign=False)
self.signed_in_store = self.nodes[0].overlay.serialize_value(self.value, sign=True)

now = time.time()
for node1 in self.nodes:
node1.overlay.cancel_pending_task('store_my_peer')
for node2 in self.nodes:
if node1 == node2:
continue
dht_node1 = Node(node1.my_peer.key, node1.my_peer.address)
dht_node2 = Node(node2.my_peer.key, node2.my_peer.address)
node1.overlay.tokens[dht_node2] = (now, node2.overlay.generate_token(dht_node1))

def create_node(self):
return MockIPv8(u"curve25519", DHTCommunity)

Expand All @@ -39,10 +55,12 @@ def test_routing_table(self):
@twisted_wrapper
def test_ping_pong(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.ping(self.nodes[1].my_peer)
yield self.deliver_messages()
self.assertEqual((yield d), self.nodes[1].my_peer)
node = yield self.nodes[0].overlay.ping(self.nodes[1].my_peer)
self.assertEqual(node, self.nodes[1].my_peer)

@twisted_wrapper
def test_ping_pong_fail(self):
yield self.introduce_nodes()
yield self.nodes[1].unload()
d = self.nodes[0].overlay.ping(self.nodes[1].my_peer)
yield self.deliver_messages()
Expand All @@ -56,53 +74,57 @@ def test_ping_all(self):
node1.failed = 1
node1.last_response = 0

yield self.nodes[0].overlay.ping_all()
self.nodes[0].overlay.ping_all()
yield self.deliver_messages()
self.assertTrue(node1.failed == 0)
self.assertNotEqual(node1.last_response, 0)

@twisted_wrapper
def test_ping_all_skip(self):
yield self.introduce_nodes()
bucket = self.nodes[0].overlay.routing_table.trie[u'']
node1 = bucket.get(self.nodes[1].overlay.my_node_id)
node1.failed = 1
node1.last_response = time.time()

yield self.nodes[0].overlay.ping_all()
self.assertTrue(node1.failed == 1)

@twisted_wrapper
def test_store(self):
def test_store_value(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.store('\00' * 20, 'test1')
yield self.deliver_messages()
self.assertIn(self.nodes[1].my_peer, (yield d))
self.assertEqual(self.nodes[1].overlay.storage.get('\00' * 20)[0], 'test1')
node = yield self.nodes[0].overlay.store_value(self.key, self.value)
self.assertIn(self.nodes[1].my_peer, node)
self.assertEqual(self.nodes[1].overlay.storage.get(self.key), [self.value_in_store])

@twisted_wrapper
def test_store_value_fail(self):
yield self.introduce_nodes()
self.nodes[1].unload()
d = self.nodes[0].overlay.store('\00' * 20, 'test2')
d = self.nodes[0].overlay.store_value(self.key, self.value)
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)
self.assertEqual(self.nodes[1].overlay.storage.get('\00' * 20)[0], 'test1')

@twisted_wrapper
def test_find_nodes(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.find_nodes('\00' * 20)
yield self.deliver_messages()
nodes = (yield d)
self.assertItemsEqual(nodes, [Node(n.my_peer.key.pub().key_to_bin(), n.my_peer.address) for n in self.nodes[1:]])
nodes = yield self.nodes[0].overlay.find_nodes(self.key)
self.assertItemsEqual(nodes, [Node(n.my_peer.key.pub().key_to_bin(), n.my_peer.address)
for n in self.nodes[1:]])

@twisted_wrapper
def test_find_values(self):
yield self.introduce_nodes()
self.nodes[1].overlay.storage.put('\00' * 20, 'test', 60)
d = self.nodes[0].overlay.find_values('\00' * 20)
yield self.deliver_messages()
values = (yield d)
self.assertEqual(values[0], 'test')
self.nodes[1].overlay.storage.put(self.key, self.value_in_store)
values = yield self.nodes[0].overlay.find_values(self.key)
self.assertIn((self.value, None), values)

@twisted_wrapper
def test_move_data(self):
self.nodes[0].overlay.storage.put(self.nodes[1].overlay.my_node_id, 'test', 60)
self.nodes[0].overlay.on_node_discovered(Node(self.nodes[1].overlay.my_peer.key,
self.nodes[1].overlay.my_peer.address))
yield self.deliver_messages()
self.assertIn('test', self.nodes[1].overlay.storage.get(self.nodes[1].overlay.my_node_id))
def test_find_values_signed(self):
yield self.introduce_nodes()
self.nodes[1].overlay.storage.put(self.key, self.signed_in_store)
values = yield self.nodes[0].overlay.find_values(self.key)
self.assertIn((self.value, self.nodes[0].my_peer.public_key.key_to_bin()), values)

@twisted_wrapper
def test_caching(self):
Expand All @@ -111,21 +133,20 @@ def test_caching(self):
self.add_node_to_experiment(node)

# Sort nodes based on distance to target
self.nodes.sort(key=lambda n: distance(n.overlay.my_node_id, '\x00' * 20), reverse=True)
self.nodes.sort(key=lambda n: distance(n.overlay.my_node_id, self.key), reverse=True)

self.nodes[0].overlay.on_node_discovered(Node(self.nodes[1].my_peer.key,
self.nodes[1].my_peer.address))
self.nodes[1].overlay.on_node_discovered(Node(self.nodes[2].my_peer.key,
self.nodes[2].my_peer.address))

self.nodes[2].overlay.storage.put('\x00' * 20, 'test1', 60)
yield self.nodes[0].overlay.find_values('\x00' * 20)
self.nodes[2].overlay.storage.put(self.key, self.value_in_store)
yield self.nodes[0].overlay.find_values(self.key)
yield self.deliver_messages()

self.assertEqual(self.nodes[1].overlay.storage.get('\x00' * 20), ['test1'])
self.assertEqual(self.nodes[1].overlay.storage.get(self.key), [self.value_in_store])

@twisted_wrapper
def test_maintenance(self):
def test_refresh(self):
yield self.introduce_nodes()
yield self.deliver_messages()

Expand All @@ -135,38 +156,75 @@ def test_maintenance(self):
mock = MockObject()
mock.is_called = False

# Refresh
self.nodes[0].overlay.find_values = lambda *args: setattr(mock, 'is_called', True)
self.nodes[0].overlay.maintenance()
self.nodes[0].overlay.find_values = lambda *args: setattr(mock, 'is_called', True) or succeed([])
self.nodes[0].overlay.value_maintenance()
self.assertNotEqual(bucket.last_changed, 0)
self.assertTrue(mock.is_called)

mock.is_called = False
prev_ts = bucket.last_changed
self.nodes[0].overlay.maintenance()
self.nodes[0].overlay.value_maintenance()
self.assertEqual(bucket.last_changed, prev_ts)
self.assertFalse(mock.is_called)

@twisted_wrapper
def test_republish(self):
yield self.introduce_nodes()
yield self.deliver_messages()

bucket = self.nodes[0].overlay.routing_table.get_bucket(self.nodes[1].overlay.my_node_id)
bucket.last_changed = 0

# Republish
mock = MockObject()
mock.is_called = False
self.nodes[0].overlay.storage.data['\x00' * 20] = [(0, 60, 'test')]
self.nodes[0].overlay.store = lambda *args: setattr(mock, 'is_called', True)
self.nodes[0].overlay.maintenance()

self.nodes[0].overlay.storage.put(self.key, self.value_in_store)
self.nodes[0].overlay.storage.items[self.key][0].last_update = 0
self.nodes[0].overlay._store = lambda *args: setattr(mock, 'is_called', True) or Deferred()
self.nodes[0].overlay.value_maintenance()
self.assertTrue(mock.is_called)

mock.is_called = False
self.nodes[0].overlay.storage.data['\x00' * 20] = [(sys.maxint, 60, 'test')]
self.nodes[0].overlay.maintenance()
self.nodes[0].overlay.storage.put(self.key, self.value_in_store)
self.nodes[0].overlay.storage.items[self.key][0].last_update = sys.maxint
self.nodes[0].overlay.value_maintenance()
self.assertFalse(mock.is_called)

@twisted_wrapper
def test_token(self):
dht_node = Node(self.nodes[1].my_peer.key, self.nodes[1].my_peer.address)

# Since the setup should have already have generated tokens, a direct store should work.
yield self.introduce_nodes()
yield self.nodes[0].overlay.store_on_nodes(self.key, [self.value_in_store], [dht_node])
yield self.deliver_messages()
self.assertEqual(self.nodes[1].overlay.storage.get(self.key), [self.value_in_store])

# Without tokens..
for node in self.nodes:
node.overlay.tokens.clear()
self.nodes[1].overlay.storage.items.clear()
yield self.introduce_nodes()
d = self.nodes[0].overlay.store_on_nodes(self.key, [self.value_in_store], [dht_node])
self.assertFailure(d, RuntimeError)
self.assertEqual(self.nodes[1].overlay.storage.get(self.key), [])

# With a bad token..
self.nodes[0].overlay.tokens[dht_node] = 'faketoken'
yield self.introduce_nodes()
d = self.nodes[0].overlay.store_on_nodes(self.key, [self.value_in_store], [dht_node])
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)
self.assertEqual(self.nodes[1].overlay.storage.get(self.key), [])


class TestDHTCommunityXL(TestBase):

def setUp(self):
super(TestDHTCommunityXL, self).setUp()
self.initialize(DHTCommunity, 25)
for node in self.nodes:
node.overlay.cancel_pending_task('store_peer')
node.overlay.ping = lambda _:succeed(None)

def create_node(self):
Expand All @@ -183,16 +241,16 @@ def test_full_protocol(self):

# Store key value pair
kv_pair = ('\x00' * 20, 'test1')
yield self.nodes[0].overlay.store(*kv_pair)
yield self.nodes[0].overlay.store_value(*kv_pair)

# Check if the closest nodes have now stored the key
for node in self.get_closest_nodes(kv_pair[0]):
self.assertTrue(node.overlay.storage.get(kv_pair[0]), kv_pair[1])

# Store another value under the same key
yield self.nodes[1].overlay.store('\x00' * 20, 'test2')
yield self.nodes[1].overlay.store_value('\x00' * 20, 'test2', sign=True)

# Check if we get both values
values = yield self.nodes[-1].overlay.find_values('\x00' * 20)
self.assertIn('test1', values)
self.assertIn('test2', values)
self.assertIn(('test1', None), values)
self.assertIn(('test2', self.nodes[1].my_peer.public_key.key_to_bin()), values)
106 changes: 106 additions & 0 deletions Tribler/Test/Community/DHT/test_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import time

from Tribler.Test.ipv8_base import TestBase
from Tribler.Test.mocking.ipv8 import MockIPv8
from Tribler.Test.util.ipv8_util import twisted_wrapper
from Tribler.community.dht.discovery import DHTDiscoveryCommunity
from Tribler.community.dht.routing import Node


class TestDHTDiscoveryCommunity(TestBase):

def setUp(self):
super(TestDHTDiscoveryCommunity, self).setUp()
self.initialize(DHTDiscoveryCommunity, 2)

now = time.time()
for node1 in self.nodes:
node1.overlay.cancel_pending_task('store_peer')
for node2 in self.nodes:
if node1 == node2:
continue
dht_node1 = Node(node1.my_peer.key, node1.my_peer.address)
dht_node2 = Node(node2.my_peer.key, node2.my_peer.address)
node1.overlay.tokens[dht_node2] = (now, node2.overlay.generate_token(dht_node1))

def create_node(self):
return MockIPv8(u"curve25519", DHTDiscoveryCommunity)

@twisted_wrapper
def test_store_peer(self):
yield self.introduce_nodes()
yield self.nodes[0].overlay.store_peer()
self.assertIn(self.nodes[0].my_peer.mid, self.nodes[1].overlay.store)
self.assertIn(self.nodes[0].my_peer.mid, self.nodes[0].overlay.store_for_me)

@twisted_wrapper
def test_store_peer_fail(self):
yield self.introduce_nodes()
self.nodes[1].unload()
d = self.nodes[0].overlay.store_peer()
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)

@twisted_wrapper
def test_connect_peer(self):
# Add a third node
node = MockIPv8(u"curve25519", DHTDiscoveryCommunity)
self.add_node_to_experiment(node)
yield self.introduce_nodes()

# Node1 is storing the peer of node0
self.nodes[1].overlay.store[self.nodes[0].my_peer.mid].append(self.nodes[0].my_peer)
self.nodes[0].overlay.store_for_me[self.nodes[0].my_peer.mid].append(self.nodes[1].my_peer)

org_func = self.nodes[1].overlay.create_puncture_request
def create_puncture_request(*args):
self.puncture_to = args[1]
return org_func(*args)
self.nodes[1].overlay.create_puncture_request = create_puncture_request

yield self.deliver_messages()
nodes = yield self.nodes[2].overlay.connect_peer(self.nodes[0].my_peer.mid)
self.assertEqual(self.puncture_to, self.nodes[2].my_peer.address)
self.assertIn(self.nodes[0].overlay.my_peer.public_key.key_to_bin(),
[n.public_key.key_to_bin() for n in nodes])

@twisted_wrapper
def test_connect_peer_fail(self):
yield self.introduce_nodes()
self.nodes[1].unload()
d = self.nodes[0].overlay.connect_peer(self.nodes[1].my_peer.mid)
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)

def test_ping_all(self):
self.nodes[0].overlay.ping = lambda n: setattr(self, 'pinged', n)
self.pinged = None

node1 = Node(self.nodes[1].my_peer.key, self.nodes[1].my_peer.address)
node1.last_response = time.time()
node1.last_query = time.time()

self.nodes[0].overlay.store[node1.mid].append(node1)
self.nodes[0].overlay.ping_all()
self.assertIn(node1, self.nodes[0].overlay.store[node1.mid])

node1.last_query -= 100
self.nodes[0].overlay.ping_all()
self.assertNotIn(node1, self.nodes[0].overlay.store[node1.mid])


self.nodes[0].overlay.store_for_me[node1.mid].append(node1)
self.nodes[0].overlay.ping_all()
self.assertEqual(self.pinged, None)
self.assertIn(node1.mid, self.nodes[0].overlay.store_for_me)

node1.last_response -= 30
self.nodes[0].overlay.ping_all()
self.assertEqual(self.pinged, node1)
self.assertIn(node1, self.nodes[0].overlay.store_for_me[node1.mid])

self.pinged = None
node1.failed = 3
self.nodes[0].overlay.ping_all()
self.assertEqual(self.pinged, None)
self.assertNotIn(node1, self.nodes[0].overlay.store_for_me[node1.mid])
Loading

0 comments on commit fa593e5

Please sign in to comment.