From 4e50d5db8f05baa9e3972984ce01176f149c67fe Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 9 Aug 2023 17:22:34 +0200 Subject: [PATCH] fix broken delay aggregation for alpha > 1 --- dht/dht.py | 28 +++++++++++++++++++--------- tests/test_network.py | 38 +++++++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/dht/dht.py b/dht/dht.py index 6dc7d40..c454235 100644 --- a/dht/dht.py +++ b/dht/dht.py @@ -51,12 +51,6 @@ def lookup_for_hash(self, key: Hash): 'aggrDelay': 0, } - closestnodes = self.rt.get_closest_nodes_to(key) - nodestotry = closestnodes.copy() - triednodes = deque() - lookupvalue = "" # TODO: hardcoded to string - stepscnt = 0 - concurrency = 0 def has_closer_nodes(prev, new): for n, dist in new.items(): if n in prev: @@ -68,9 +62,17 @@ def has_closer_nodes(prev, new): continue return False + closestnodes = self.rt.get_closest_nodes_to(key) + nodestotry = closestnodes.copy() + triednodes = deque() + lookupvalue = "" # TODO: hardcoded to string + stepscnt = 0 + while (stepscnt < self.lookupsteptostop) and (len(nodestotry) > 0): # ask queued nodes to try nodes = nodestotry.copy() + aggregateddelay = 0 + concurrency = 0 for node in nodes: nodestotry.pop(node) # remove item from peers to attempt if node in triednodes: # make sure we don't contact the same node twice @@ -79,11 +81,12 @@ def has_closer_nodes(prev, new): lookupsummary['connectionAttempts'] += 1 try: connection, conndelay = self.network.connect_to_node(self.ID, node) - lookupsummary['aggrDelay'] += conndelay newnodes, val, ok, closestdelay = connection.get_closest_nodes_to(key) if ok: lookupvalue = val - lookupsummary['aggrDelay'] += closestdelay + operationdelay = conndelay + closestdelay + if operationdelay > aggregateddelay: + aggregateddelay = operationdelay lookupsummary['successfulCons'] += 1 if has_closer_nodes(closestnodes, newnodes): stepscnt = 0 @@ -93,10 +96,17 @@ def has_closer_nodes(prev, new): else: stepscnt += 1 except ConnectionError: + # count the delay of the connection attempt + # TODO: take into account fast vs slow errors (https://github.com/cortze/py-dht/issues/14) + if self.network.delayrange is not None: + connErrorDelay = random.sample(self.network.delayrange, 1)[0] + if connErrorDelay > aggregateddelay: + aggregateddelay = connErrorDelay lookupsummary['failedCons'] += 1 stepscnt += 1 concurrency += 1 - if concurrency >= self.alpha or stepscnt: + if concurrency >= self.alpha or stepscnt >= self.lookupsteptostop: + lookupsummary['aggrDelay'] += aggregateddelay break # finish with the summary diff --git a/tests/test_network.py b/tests/test_network.py index 13df9ee..c6566f9 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -145,7 +145,7 @@ def test_network_initialization(self): def test_dht_interop(self): """ test if the nodes in the network actually route to the closest peer, and implicidly, if the DHTclient interface works """ k = 10 - size = 500 + size = 500 id = 0 errorrate = 0 # apply an error rate of 0 (to check if the logic pases) delayrange = None # ms @@ -306,6 +306,42 @@ def test_aggregated_delays(self): self.assertGreater(aggrdelay, bestdelay) self.assertLess(aggrdelay, worstdelay) + def test_aggregated_delays_and_alpha(self): + """ test if the interaction between the nodes in the network actually generate a compounded delay """ + size = 500 + i = 0 + k = 10 + jobs = 2 + alpha = 3 + beta = k + stepstostop = 3 + errorrate = 0 # apply an error rate of 0 (to check if the logic pases) + + delay = 50 # ms + delayrange = [delay, delay] # ms + + # init the network + n = DHTNetwork(i, errorrate, delayrange) + _ = n.init_with_random_peers(jobs, size, k, alpha, beta, stepstostop) + + # use random node as lookup point + publishernodeid = random.sample(range(1, size), 1)[0] + pnode = n.nodestore.get_node(publishernodeid) + self.assertNotEqual(pnode.network.len(), 0) + + # lookup + randomSegment = "my rollup sample" + segH = Hash(randomSegment) + interestednodeid = random.sample(range(1, size), 1)[0] + inode = n.nodestore.get_node(interestednodeid) + closestnodes, _, summary, aggrdelay = inode.lookup_for_hash(segH) + self.assertEqual(len(closestnodes), k) + lookuppeers = summary['connectionAttempts'] + rounds = int(lookuppeers / alpha) + if (lookuppeers % alpha) > 0: + rounds += 1 + self.assertEqual(aggrdelay, rounds * (delay*2)) + def generate_network(k, size, id, errorrate, delayrate): network = DHTNetwork(id, errorrate, delayrate) nodeids = range(0, size, 1)