Skip to content

Commit

Permalink
Implemented Kademlia-based DHT community
Browse files Browse the repository at this point in the history
  • Loading branch information
egbertbouman committed Jun 1, 2018
1 parent b096b9b commit ec8b94a
Show file tree
Hide file tree
Showing 16 changed files with 1,305 additions and 7 deletions.
17 changes: 14 additions & 3 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
STATE_START_API_ENDPOINTS, STATE_START_WATCH_FOLDER, STATE_START_CREDIT_MINING)
from Tribler.community.market.wallet.dummy_wallet import DummyWallet1, DummyWallet2
from Tribler.community.market.wallet.tc_wallet import TrustchainWallet
from Tribler.community.dht.provider import DHTCommunityProvider
from Tribler.dispersy.taskmanager import TaskManager
from Tribler.dispersy.util import blockingCallFromThread, blocking_call_on_reactor_thread

Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(self):

self.credit_mining_manager = None
self.market_community = None
self.dht_community = None

def register(self, session, session_lock):
assert isInIOThread()
Expand Down Expand Up @@ -254,16 +256,25 @@ def load_ipv8_overlays(self):
self.ipv8.overlays.append(self.triblerchain_community)
self.ipv8.strategies.append((EdgeWalk(self.triblerchain_community), 20))

# DHT Community
if self.session.config.get_dht_community_enabled():
from Tribler.community.dht.community import DHTCommunity
self.dht_community = DHTCommunity(peer, self.ipv8.endpoint, self.ipv8.network)
self.ipv8.overlays.append(self.dht_community)
self.ipv8.strategies.append((RandomWalk(self.dht_community), 20))

# Tunnel Community
if self.session.config.get_tunnel_community_enabled():
tunnel_peer = Peer(self.session.trustchain_keypair)
if self.dht_community:
dht_provider = DHTCommunityProvider(self.dht_community, self.session.config.get_dispersy_port())
else:
dht_provider = MainlineDHTProvider(self.mainline_dht, self.session.config.get_dispersy_port())

from Tribler.community.triblertunnel.community import TriblerTunnelCommunity
self.tunnel_community = TriblerTunnelCommunity(tunnel_peer, self.ipv8.endpoint, self.ipv8.network,
tribler_session=self.session,
dht_provider=MainlineDHTProvider(
self.mainline_dht,
self.session.config.get_dispersy_port()),
dht_provider=dht_provider,
triblerchain_community=self.triblerchain_community)
self.ipv8.overlays.append(self.tunnel_community)
self.ipv8.strategies.append((RandomWalk(self.tunnel_community), 20))
Expand Down
3 changes: 3 additions & 0 deletions Tribler/Core/Config/config.spec
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,6 @@ history_size = integer(min=1, default=20)
enabled = boolean(default=True)
sources = string_list(default=list())
max_disk_space = integer(min=0, default=53687091200)

[dht_community]
enabled = boolean(default=False)
8 changes: 8 additions & 0 deletions Tribler/Core/Config/tribler_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,11 @@ def set_credit_mining_disk_space(self, value):

def get_credit_mining_disk_space(self):
return self.config['credit_mining']['max_disk_space']

# DHT Community

def set_dht_community_enabled(self, value):
self.config['dht_community']['enabled'] = value

def get_dht_community_enabled(self):
return self.config['dht_community']['enabled']
198 changes: 198 additions & 0 deletions Tribler/Test/Community/DHT/test_community.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
import sys

from twisted.internet.defer import succeed

from Tribler.Test.ipv8_base import TestBase
from Tribler.Test.mocking.ipv8 import MockIPv8
from Tribler.Test.util.ipv8_util import twisted_wrapper
from Tribler.Test.Core.base_test import MockObject
from Tribler.community.dht.community import DHTCommunity
from Tribler.community.dht.routing import distance, Node


class TestDHTCommunity(TestBase):

def setUp(self):
super(TestDHTCommunity, self).setUp()
self.initialize(DHTCommunity, 2)

def create_node(self):
return MockIPv8(u"curve25519", DHTCommunity)

@twisted_wrapper
def test_routing_table(self):
yield self.introduce_nodes()
yield self.deliver_messages()

node0_id = self.nodes[0].overlay.my_node_id
node1_id = self.nodes[1].overlay.my_node_id

node0_bucket = self.nodes[0].overlay.routing_table.get_bucket(node1_id)
node1_bucket = self.nodes[1].overlay.routing_table.get_bucket(node0_id)

self.assertTrue(node0_bucket and node0_bucket.prefix_id == u'')
self.assertTrue(node1_bucket and node1_bucket.prefix_id == u'')

self.assertTrue(node1_bucket.get(node0_id))
self.assertTrue(node0_bucket.get(node1_id))

@twisted_wrapper
def test_ping_pong(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.ping(self.nodes[1].my_peer)
yield self.deliver_messages()
self.assertEqual((yield d), self.nodes[1].my_peer)

yield self.nodes[1].unload()
d = self.nodes[0].overlay.ping(self.nodes[1].my_peer)
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)

@twisted_wrapper
def test_ping_all(self):
yield self.introduce_nodes()
bucket = self.nodes[0].overlay.routing_table.trie[u'']
node1 = bucket.get(self.nodes[1].overlay.my_node_id)
node1.failed = 1
node1.last_response = 0

yield self.nodes[0].overlay.ping_all()
self.assertTrue(node1.failed == 0)
self.assertNotEqual(node1.last_response, 0)

node1.failed = 1
yield self.nodes[0].overlay.ping_all()
self.assertTrue(node1.failed == 1)

@twisted_wrapper
def test_store(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.store('\00' * 20, 'test1')
yield self.deliver_messages()
self.assertIn(self.nodes[1].my_peer, (yield d))
self.assertEqual(self.nodes[1].overlay.storage.get('\00' * 20)[0], 'test1')

yield self.introduce_nodes()
self.nodes[1].unload()
d = self.nodes[0].overlay.store('\00' * 20, 'test2')
yield self.deliver_messages()
self.assertFailure(d, RuntimeError)
self.assertEqual(self.nodes[1].overlay.storage.get('\00' * 20)[0], 'test1')

@twisted_wrapper
def test_find_nodes(self):
yield self.introduce_nodes()
d = self.nodes[0].overlay.find_nodes('\00' * 20)
yield self.deliver_messages()
nodes = (yield d)
self.assertItemsEqual(nodes, [Node(n.my_peer.key.pub().key_to_bin(), n.my_peer.address) for n in self.nodes[1:]])

@twisted_wrapper
def test_find_values(self):
yield self.introduce_nodes()
self.nodes[1].overlay.storage.put('\00' * 20, 'test', 60)
d = self.nodes[0].overlay.find_values('\00' * 20)
yield self.deliver_messages()
values = (yield d)
self.assertEqual(values[0], 'test')

@twisted_wrapper
def test_move_data(self):
self.nodes[0].overlay.storage.put(self.nodes[1].overlay.my_node_id, 'test', 60)
self.nodes[0].overlay.on_node_discovered(Node(self.nodes[1].overlay.my_peer.key,
self.nodes[1].overlay.my_peer.address))
yield self.deliver_messages()
self.assertIn('test', self.nodes[1].overlay.storage.get(self.nodes[1].overlay.my_node_id))

@twisted_wrapper
def test_caching(self):
# Add a third node
node = MockIPv8(u"curve25519", DHTCommunity)
self.add_node_to_experiment(node)

# Sort nodes based on distance to target
self.nodes.sort(key=lambda n: distance(n.overlay.my_node_id, '\x00' * 20), reverse=True)

self.nodes[0].overlay.on_node_discovered(Node(self.nodes[1].my_peer.key,
self.nodes[1].my_peer.address))
self.nodes[1].overlay.on_node_discovered(Node(self.nodes[2].my_peer.key,
self.nodes[2].my_peer.address))

self.nodes[2].overlay.storage.put('\x00' * 20, 'test1', 60)
yield self.nodes[0].overlay.find_values('\x00' * 20)
yield self.deliver_messages()

self.assertEqual(self.nodes[1].overlay.storage.get('\x00' * 20), ['test1'])

@twisted_wrapper
def test_maintenance(self):
yield self.introduce_nodes()
yield self.deliver_messages()

bucket = self.nodes[0].overlay.routing_table.get_bucket(self.nodes[1].overlay.my_node_id)
bucket.last_changed = 0

mock = MockObject()
mock.is_called = False

# Refresh
self.nodes[0].overlay.find_values = lambda *args: setattr(mock, 'is_called', True)
self.nodes[0].overlay.maintenance()
self.assertNotEqual(bucket.last_changed, 0)
self.assertTrue(mock.is_called)

mock.is_called = False
prev_ts = bucket.last_changed
self.nodes[0].overlay.maintenance()
self.assertEqual(bucket.last_changed, prev_ts)
self.assertFalse(mock.is_called)


# Republish
mock.is_called = False
self.nodes[0].overlay.storage.data['\x00' * 20] = [(0, 60, 'test')]
self.nodes[0].overlay.store = lambda *args: setattr(mock, 'is_called', True)
self.nodes[0].overlay.maintenance()
self.assertTrue(mock.is_called)

mock.is_called = False
self.nodes[0].overlay.storage.data['\x00' * 20] = [(sys.maxint, 60, 'test')]
self.nodes[0].overlay.maintenance()
self.assertFalse(mock.is_called)


class TestDHTCommunityXL(TestBase):

def setUp(self):
super(TestDHTCommunityXL, self).setUp()
self.initialize(DHTCommunity, 25)
for node in self.nodes:
node.overlay.ping = lambda _:succeed(None)

def create_node(self):
return MockIPv8(u"curve25519", DHTCommunity)

def get_closest_nodes(self, node_id, max_nodes=8):
return sorted(self.nodes, key=lambda n: distance(n.overlay.my_node_id, node_id))[:max_nodes]

@twisted_wrapper
def test_full_protocol(self):
# Fill routing tables
yield self.introduce_nodes()
yield self.deliver_messages()

# Store key value pair
kv_pair = ('\x00' * 20, 'test1')
yield self.nodes[0].overlay.store(*kv_pair)

# Check if the closest nodes have now stored the key
for node in self.get_closest_nodes(kv_pair[0]):
self.assertTrue(node.overlay.storage.get(kv_pair[0]), kv_pair[1])

# Store another value under the same key
yield self.nodes[1].overlay.store('\x00' * 20, 'test2')

# Check if we get both values
values = yield self.nodes[-1].overlay.find_values('\x00' * 20)
self.assertIn('test1', values)
self.assertIn('test2', values)
42 changes: 42 additions & 0 deletions Tribler/Test/Community/DHT/test_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from twisted.internet.defer import inlineCallbacks, succeed

from Tribler.community.dht.provider import DHTCommunityProvider
from Tribler.pyipv8.ipv8.util import blocking_call_on_reactor_thread
from Tribler.Test.Core.base_test import TriblerCoreTest, MockObject


class TestDHTProvider(TriblerCoreTest):

@blocking_call_on_reactor_thread
@inlineCallbacks
def setUp(self, annotate=True):
yield super(TestDHTProvider, self).setUp(annotate=annotate)

def mocked_find_values(key):
return succeed(['\x01\x01\x01\x01\x04\xd2'])

def mocked_store(key, value):
self.stored_value = value
return succeed([])

self.cb_invoked = False
self.stored_value = None
self.dhtcommunity = MockObject()
self.dhtcommunity.find_values = mocked_find_values
self.dhtcommunity.store = mocked_store
self.dhtcommunity.my_estimated_lan = '1.1.1.1'
self.dht_provider = DHTCommunityProvider(self.dhtcommunity, 1234)

def test_lookup(self):
def check_result(result):
self.cb_invoked = True
self.assertEqual(result, [('1.1.1.1', 1234)])
self.dht_provider.lookup('a' * 20, check_result)
self.assertTrue(self.cb_invoked)

def test_announce(self):
def check_result(result):
self.cb_invoked = True
self.dht_provider.announce('a' * 20, check_result)
self.assertTrue(self.cb_invoked)
self.assertEqual(self.stored_value, '\x01\x01\x01\x01\x04\xd2')
Loading

0 comments on commit ec8b94a

Please sign in to comment.