From 12829fef3b34765d05c6f0b0c4a8a59bdc64e7e8 Mon Sep 17 00:00:00 2001 From: cortze Date: Fri, 11 Aug 2023 15:05:41 +0200 Subject: [PATCH 1/3] upgrade lookup to mimic a concurrency scheduler + differentiate slow errors from fast ones --- dht/dht.py | 114 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 38 deletions(-) diff --git a/dht/dht.py b/dht/dht.py index c454235..bb2ba8b 100644 --- a/dht/dht.py +++ b/dht/dht.py @@ -46,9 +46,9 @@ def lookup_for_hash(self, key: Hash): 'targetKey': key, 'startTime': time.time(), 'connectionAttempts': 0, + 'connectionFinished': 0, 'successfulCons': 0, 'failedCons': 0, - 'aggrDelay': 0, } def has_closer_nodes(prev, new): @@ -65,54 +65,77 @@ def has_closer_nodes(prev, new): closestnodes = self.rt.get_closest_nodes_to(key) nodestotry = closestnodes.copy() triednodes = deque() + alpha_results = deque() + alpha_delays = deque() + for _ in range(self.alpha): + alpha_delays.append(0) lookupvalue = "" # TODO: hardcoded to string stepscnt = 0 while (stepscnt < self.lookupsteptostop) and (len(nodestotry) > 0): - # ask queued nodes to try nodes = nodestotry.copy() - aggregateddelay = 0 - concurrency = 0 + for node in nodes: nodestotry.pop(node) # remove item from peers to attempt if node in triednodes: # make sure we don't contact the same node twice continue triednodes.append(node) lookupsummary['connectionAttempts'] += 1 + try: connection, conndelay = self.network.connect_to_node(self.ID, node) - newnodes, val, ok, closestdelay = connection.get_closest_nodes_to(key) - if ok: - lookupvalue = val - operationdelay = conndelay + closestdelay - if operationdelay > aggregateddelay: - aggregateddelay = operationdelay - lookupsummary['successfulCons'] += 1 - if has_closer_nodes(closestnodes, newnodes): + newnodes, val, _, closestdelay = connection.get_closest_nodes_to(key) + # we only want to aggregate the difference between the base + conn delay - the already aggregated one + # this allows to simulate de delay of a proper scheduler + operationdelay = (conndelay + closestdelay) + if len(alpha_results) < self.alpha: + alpha_results.append((operationdelay, newnodes, val)) + alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0])) + else: + print("huge error here") + + except ConnectionError as e: + errortype, errordelay = e.get_delay() + alpha_results.append((errordelay, (), "")) + alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0])) + + # check if the concurrency array is full + # if so aggregate the delay and the nodes to the total and empty the slot in the deque + if len(alpha_results) >= self.alpha: + # 1. Append the aggragated delay of the last node connection (suc or failed) to the smaller aggregated + # alpha delay (mimicking a scheduler) + # 2. The max value on the alpha delays will determine the aggrDelay of the lookup + mindelayedlookup = alpha_results.popleft() + minaggrdelayidx = alpha_delays.index(min(alpha_delays)) + alpha_delays[minaggrdelayidx] += mindelayedlookup[0] + + if mindelayedlookup[2] != "": + lookupvalue = mindelayedlookup[2] + + lookupsummary['connectionFinished'] += 1 + if len(mindelayedlookup[1]) > 0: + lookupsummary['successfulCons'] += 1 + else: + lookupsummary['failedCons'] += 1 + + if has_closer_nodes(closestnodes, mindelayedlookup[1]): stepscnt = 0 - closestnodes.update(newnodes) - nodestotry.update(newnodes) - nodestotry = OrderedDict(sorted(nodestotry.items(), key= lambda item: item[1])) - else: + else: stepscnt += 1 - except ConnectionError: - # count the delay of the connection attempt - # TODO: take into account fast vs slow errors (https://github.com/cortze/py-dht/issues/14) - if self.network.delayrange is not None: - connErrorDelay = random.sample(self.network.delayrange, 1)[0] - if connErrorDelay > aggregateddelay: - aggregateddelay = connErrorDelay - lookupsummary['failedCons'] += 1 - stepscnt += 1 - concurrency += 1 - if concurrency >= self.alpha or stepscnt >= self.lookupsteptostop: - lookupsummary['aggrDelay'] += aggregateddelay + + # even if there is any closest one, update the list as more in between might have come + closestnodes.update(mindelayedlookup[1]) + nodestotry.update(mindelayedlookup[1]) + nodestotry = OrderedDict(sorted(nodestotry.items(), key=lambda item: item[1])) + break + + if stepscnt >= self.lookupsteptostop: break - # finish with the summary lookupsummary.update({ 'finishTime': time.time(), 'totalNodes': len(closestnodes), + 'aggrDelay': max(alpha_delays), 'value': lookupvalue, }) # limit the output to beta number of nodes @@ -204,14 +227,21 @@ def len(self): class ConnectionError(Exception): """ custom connection error exection to notify an errored connection """ - def __init__(self, nodeid: int, error, time): + def __init__(self, nodeid: int, error, time, d): self.erroredNode = nodeid self.error = error self.errorTime = time + self.delay = 0 # ms + if (error == "fast error") and (d is not None): + self.delay = random.sample(d, 1)[0] + elif (error == "slow error") and (d is not None): + self.delay = d def description(self) -> str: return f"unable to connect node {self.erroredNode}. {self.error}" + def get_delay(self): + return self.error, self.delay class Connection(): """ connection simbolizes the interaction that 2 DHTClients could have with eachother """ @@ -241,11 +271,13 @@ class DHTNetwork: """ serves a the shared point between all the nodes participating in the simulation, allows node to communicat with eachother without needing to implement an API or similar""" - def __init__(self, networkid: int, errorrate: int, delayrange): + def __init__(self, networkid: int, fasterrorrate: int, slowerrorrate: int, fastdelayrange, slowdelay): """ class initializer, it allows to define the networkID and the delays between nodes """ self.networkid = networkid - self.errorrate = errorrate # % - self.delayrange = delayrange # list() in ms -> i.e., (5, 100) ms | None + self.fasterrorrate = fasterrorrate # % + self.slowerrorrate = slowerrorrate # % + self.fastdelayrange = fastdelayrange # list() in ms -> i.e., (5, 100) ms | None + self.slowdelay = slowdelay # timeot delay self.nodestore = NodeStore() self.errortracker = deque() # every time that an error is tracked, add it to the queue self.connectiontracker = deque() # every time that a connection was stablished @@ -308,11 +340,15 @@ def connect_to_node(self, ognode: int, targetnode: int): self.connectioncnt += 1 try: # check the error rate (avoid stablishing the connection if there is an error) - if random.randint(0, 99) < self.errorrate: - connerror = ConnectionError(targetnode, "simulated error", time.time()) + if random.randint(0, 99) < self.fasterrorrate: + connerror = ConnectionError(targetnode, "fast error", time.time(), self.fastdelayrange) self.errortracker.append(connerror) raise connerror - connection = Connection(self.connectioncnt, ognode, self.nodestore.get_node(targetnode), self.delayrange) + if random.randint(0, 99) < self.slowerrorrate: + connerror = ConnectionError(targetnode, "slow error", time.time(), self.slowdelay) + self.errortracker.append(connerror) + raise connerror + connection = Connection(self.connectioncnt, ognode, self.nodestore.get_node(targetnode), self.fastdelayrange) self.connectiontracker.append({ 'time': time.time(), 'from': ognode, @@ -321,10 +357,12 @@ def connect_to_node(self, ognode: int, targetnode: int): return connection, connection.delay except NodeNotInStoreError as e: - connerror = ConnectionError(e.missingNode, e.description, e.time) + connerror = ConnectionError(e.missingNode, e.description, e.time, 0) self.errortracker.append({ 'time': connerror.errorTime, - 'error': connerror.description}) + 'from': ognode, + 'error': connerror.description, + 'delay': 0}) raise connerror def bootstrap_node(self, nodeid: int, bucketsize: int): # ( accuracy: int = 100 ) From 27c7f3dc1c3c3cb5aa2abe8ca5775cfe245f52d9 Mon Sep 17 00:00:00 2001 From: cortze Date: Fri, 11 Aug 2023 15:06:04 +0200 Subject: [PATCH 2/3] update test to fit new delays + new lookup measurements --- tests/test_network.py | 230 +++++++++++++++++++++++++++++++++--------- 1 file changed, 184 insertions(+), 46 deletions(-) diff --git a/tests/test_network.py b/tests/test_network.py index c6566f9..eb3b9ae 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -14,9 +14,18 @@ def test_network(self): k = 20 size = 200 id = 0 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms - network, _ = generate_network(k, size, id, errorrate, delayrange) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + network, _ = generate_network( + k, + size, + id, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) # check total size of the network totalnodes = network.nodestore.len() @@ -47,8 +56,16 @@ def test_optimal_rt_for_dhtcli(self): nodeid = 1 steps4stop = 3 size = 100 - - network = DHTNetwork(0, 0, None) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + network = DHTNetwork( + nodeid, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) classicnode = DHTClient(nodeid, network, k, a, b, steps4stop) fastnode = DHTClient(nodeid, network, k, a, b, steps4stop) @@ -68,10 +85,16 @@ def test_fast_network_initialization(self): b = k step4stop = 3 size = 1000 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms - - network = DHTNetwork(0, errorrate, delayrange) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + network = DHTNetwork( + 0, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) network.init_with_random_peers(1, size, k, a, b, step4stop) for nodeid in range(size): @@ -92,11 +115,17 @@ def test_threaded_fast_network_initialization(self): b = k step4stop = 3 size = 1000 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms threads = 2 - - network = DHTNetwork(0, errorrate, delayrange) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + network = DHTNetwork( + 0, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) network.init_with_random_peers(threads, size, k, a, b, step4stop) for nodeid in range(size): @@ -117,11 +146,17 @@ def test_threading(self): b = k step4stop = 3 size = 1000 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms threads = 4 - - network = DHTNetwork(0, errorrate, delayrange) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + network = DHTNetwork( + 0, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) start = time.time() _ = network.init_with_random_peers(threads, size, k, a, b, step4stop) print(f'{size} nodes in {time.time() - start} - {threads} cores') @@ -130,9 +165,18 @@ def test_network_initialization(self): """ test that the routing tables for each nodeID are correctly initialized """ k = 2 size = 200 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms - network, nodes = generate_network(k, size, 0, errorrate, delayrange) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + network, nodes = generate_network( + k, + size, + id, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) for node in nodes: summary = node.bootstrap() @@ -147,9 +191,19 @@ def test_dht_interop(self): k = 10 size = 500 id = 0 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms - _, nodes = generate_network(k, size, id, errorrate, delayrange) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + _, nodes = generate_network( + k, + size, + id, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) + for node in nodes: node.bootstrap() @@ -176,15 +230,64 @@ def test_dht_interop(self): for i, node in enumerate(closestnodes): self.assertEqual((node in validationclosestnodes), True) + def test_dht_interop_with_alpha(self): + """ test if the nodes in the network actually route to the closest peer, and implicidly, if the DHTclient interface works """ + k = 10 + size = 500 + netid = 0 + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + n = DHTNetwork( + netid, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) + nodes = n.init_with_random_peers(1, size, k, 3, k, 3) + + randomsegment = "this is a simple segment of code" + segH = Hash(randomsegment) + # use random node as lookup point + randomid = random.sample(range(1, size), 1)[0] + rnode = n.nodestore.get_node(randomid) + self.assertNotEqual(rnode.network.len(), 0) + + closestnodes, val, summary, _ = rnode.lookup_for_hash(key=segH) + self.assertEqual(val, "") # empty val, nothing stored yet + self.assertEqual(len(closestnodes), k) + # print(f"lookup operation with {size} nodes done in {summary['finishTime'] - summary['startTime']}") + + # validation of the lookup closestnodes vs the actual closestnodes in the network + validationclosestnodes = {} + for nodeid in nodes: + node = n.nodestore.get_node(nodeid) + nodeH = Hash(node.ID) + dist = nodeH.xor_to_hash(segH) + validationclosestnodes[node.ID] = dist + + validationclosestnodes = dict(sorted(validationclosestnodes.items(), key=lambda item: item[1])[:k]) + for i, node in enumerate(closestnodes): + self.assertEqual((node in validationclosestnodes), True) + def test_dht_interop_with_fast_init(self): """ test if the nodes in the network actually route to the closest peer, and implicidly, if the DHTclient interface works """ k = 10 size = 500 i = 0 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms - n = DHTNetwork(i, errorrate, delayrange) - _ = n.init_with_random_peers(4, size, k, 1, k, 3) + jobs = 4 + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + n = DHTNetwork( + i, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) + _ = n.init_with_random_peers(jobs, size, k, 1, k, 3) randomsegment = "this is a simple segment of code" segH = Hash(randomsegment) @@ -214,9 +317,18 @@ def test_dht_error_rate_on_connection(self): k = 1 size = 2 id = 0 - errorrate = 50 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms - network, nodes = generate_network(k, size, id, errorrate, delayrange) + fasterrorrate = 20 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + network, nodes = generate_network( + k, + size, + id, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) for node in nodes: node.bootstrap() @@ -231,7 +343,7 @@ def test_dht_error_rate_on_connection(self): except ConnectionError as e: failedcnt += 1 - expected = iterations / (100/errorrate) + expected = iterations / (100/fasterrorrate) allowedvar = iterations / (100/variance) self.assertGreater(failedcnt, expected - allowedvar) self.assertLess(failedcnt, expected + allowedvar) @@ -241,9 +353,18 @@ def test_dht_provide_and_lookup(self): k = 10 size = 500 id = 0 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) - delayrange = None # ms - _, nodes = generate_network(k, size, id, errorrate, delayrange) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 + fastdelayrange = None # ms + slowdelayrange = None + _, nodes = generate_network( + k, + size, + id, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) for node in nodes: node.bootstrap() @@ -268,11 +389,21 @@ def test_aggregated_delays(self): k = 10 size = 500 id = 0 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 maxDelay = 101 minDelay = 10 + delayrange = range(minDelay, maxDelay, 10) # ms - _, nodes = generate_network(k, size, id, errorrate, delayrange) + slowdelay = None + _, nodes = generate_network( + k, + size, + id, + fasterrorrate, + slowerrorrate, + delayrange, + slowdelay) for node in nodes: node.bootstrap() @@ -308,20 +439,22 @@ def test_aggregated_delays(self): def test_aggregated_delays_and_alpha(self): """ test if the interaction between the nodes in the network actually generate a compounded delay """ - size = 500 + size = 1000 i = 0 - k = 10 + k = 5 jobs = 2 alpha = 3 beta = k stepstostop = 3 - errorrate = 0 # apply an error rate of 0 (to check if the logic pases) + fasterrorrate = 0 # apply an error rate of 0 (to check if the logic pases) + slowerrorrate = 0 delay = 50 # ms - delayrange = [delay, delay] # ms + fastdelayrange = [delay, delay] # ms + slowdelayrate = None # init the network - n = DHTNetwork(i, errorrate, delayrange) + n = DHTNetwork(i, fasterrorrate, slowerrorrate, fastdelayrange, slowdelayrate) _ = n.init_with_random_peers(jobs, size, k, alpha, beta, stepstostop) # use random node as lookup point @@ -336,14 +469,19 @@ def test_aggregated_delays_and_alpha(self): inode = n.nodestore.get_node(interestednodeid) closestnodes, _, summary, aggrdelay = inode.lookup_for_hash(segH) self.assertEqual(len(closestnodes), k) - lookuppeers = summary['connectionAttempts'] - rounds = int(lookuppeers / alpha) - if (lookuppeers % alpha) > 0: + rounds = summary['connectionFinished'] / alpha + if (summary['connectionFinished'] % alpha) > 0: rounds += 1 self.assertEqual(aggrdelay, rounds * (delay*2)) -def generate_network(k, size, id, errorrate, delayrate): - network = DHTNetwork(id, errorrate, delayrate) + +def generate_network(k, size, netid, fasterrorrate, slowerrorrate, fastdelayrange, slowdelayrange): + network = DHTNetwork( + netid, + fasterrorrate, + slowerrorrate, + fastdelayrange, + slowdelayrange) nodeids = range(0, size, 1) nodes = [] for i in nodeids: From 5328cd39a74612554b66530e03b90d15b2689887 Mon Sep 17 00:00:00 2001 From: cortze Date: Fri, 11 Aug 2023 15:17:32 +0200 Subject: [PATCH 3/3] fix flaky tests that was missing for not casting to Int results --- tests/test_network.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_network.py b/tests/test_network.py index eb3b9ae..d9d0d35 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -469,7 +469,7 @@ def test_aggregated_delays_and_alpha(self): inode = n.nodestore.get_node(interestednodeid) closestnodes, _, summary, aggrdelay = inode.lookup_for_hash(segH) self.assertEqual(len(closestnodes), k) - rounds = summary['connectionFinished'] / alpha + rounds = int(summary['connectionFinished'] / alpha) if (summary['connectionFinished'] % alpha) > 0: rounds += 1 self.assertEqual(aggrdelay, rounds * (delay*2))