Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/make alpha more realistic #18

Merged
merged 3 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 76 additions & 38 deletions dht/dht.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def lookup_for_hash(self, key: Hash):
'targetKey': key,
'startTime': time.time(),
'connectionAttempts': 0,
'connectionFinished': 0,
'successfulCons': 0,
'failedCons': 0,
'aggrDelay': 0,
}

def has_closer_nodes(prev, new):
Expand All @@ -65,54 +65,77 @@ def has_closer_nodes(prev, new):
closestnodes = self.rt.get_closest_nodes_to(key)
nodestotry = closestnodes.copy()
triednodes = deque()
alpha_results = deque()
alpha_delays = deque()
for _ in range(self.alpha):
alpha_delays.append(0)
lookupvalue = "" # TODO: hardcoded to string
stepscnt = 0

while (stepscnt < self.lookupsteptostop) and (len(nodestotry) > 0):
# ask queued nodes to try
nodes = nodestotry.copy()
aggregateddelay = 0
concurrency = 0

for node in nodes:
nodestotry.pop(node) # remove item from peers to attempt
if node in triednodes: # make sure we don't contact the same node twice
continue
triednodes.append(node)
lookupsummary['connectionAttempts'] += 1

try:
connection, conndelay = self.network.connect_to_node(self.ID, node)
newnodes, val, ok, closestdelay = connection.get_closest_nodes_to(key)
if ok:
lookupvalue = val
operationdelay = conndelay + closestdelay
if operationdelay > aggregateddelay:
aggregateddelay = operationdelay
lookupsummary['successfulCons'] += 1
if has_closer_nodes(closestnodes, newnodes):
newnodes, val, _, closestdelay = connection.get_closest_nodes_to(key)
# we only want to aggregate the difference between the base + conn delay - the already aggregated one
# this allows to simulate de delay of a proper scheduler
operationdelay = (conndelay + closestdelay)
if len(alpha_results) < self.alpha:
alpha_results.append((operationdelay, newnodes, val))
alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0]))
else:
print("huge error here")

except ConnectionError as e:
errortype, errordelay = e.get_delay()
alpha_results.append((errordelay, (), ""))
alpha_results = deque(sorted(alpha_results, key=lambda pair: pair[0]))

# check if the concurrency array is full
# if so aggregate the delay and the nodes to the total and empty the slot in the deque
if len(alpha_results) >= self.alpha:
# 1. Append the aggragated delay of the last node connection (suc or failed) to the smaller aggregated
# alpha delay (mimicking a scheduler)
# 2. The max value on the alpha delays will determine the aggrDelay of the lookup
mindelayedlookup = alpha_results.popleft()
minaggrdelayidx = alpha_delays.index(min(alpha_delays))
alpha_delays[minaggrdelayidx] += mindelayedlookup[0]

if mindelayedlookup[2] != "":
lookupvalue = mindelayedlookup[2]

lookupsummary['connectionFinished'] += 1
if len(mindelayedlookup[1]) > 0:
lookupsummary['successfulCons'] += 1
else:
lookupsummary['failedCons'] += 1

if has_closer_nodes(closestnodes, mindelayedlookup[1]):
stepscnt = 0
closestnodes.update(newnodes)
nodestotry.update(newnodes)
nodestotry = OrderedDict(sorted(nodestotry.items(), key= lambda item: item[1]))
else:
else:
stepscnt += 1
except ConnectionError:
# count the delay of the connection attempt
# TODO: take into account fast vs slow errors (https://github.com/cortze/py-dht/issues/14)
if self.network.delayrange is not None:
connErrorDelay = random.sample(self.network.delayrange, 1)[0]
if connErrorDelay > aggregateddelay:
aggregateddelay = connErrorDelay
lookupsummary['failedCons'] += 1
stepscnt += 1
concurrency += 1
if concurrency >= self.alpha or stepscnt >= self.lookupsteptostop:
lookupsummary['aggrDelay'] += aggregateddelay

# even if there is any closest one, update the list as more in between might have come
closestnodes.update(mindelayedlookup[1])
nodestotry.update(mindelayedlookup[1])
nodestotry = OrderedDict(sorted(nodestotry.items(), key=lambda item: item[1]))
break

if stepscnt >= self.lookupsteptostop:
break

# finish with the summary
lookupsummary.update({
'finishTime': time.time(),
'totalNodes': len(closestnodes),
'aggrDelay': max(alpha_delays),
'value': lookupvalue,
})
# limit the output to beta number of nodes
Expand Down Expand Up @@ -204,14 +227,21 @@ def len(self):

class ConnectionError(Exception):
""" custom connection error exection to notify an errored connection """
def __init__(self, nodeid: int, error, time):
def __init__(self, nodeid: int, error, time, d):
self.erroredNode = nodeid
self.error = error
self.errorTime = time
self.delay = 0 # ms
if (error == "fast error") and (d is not None):
self.delay = random.sample(d, 1)[0]
elif (error == "slow error") and (d is not None):
self.delay = d

def description(self) -> str:
return f"unable to connect node {self.erroredNode}. {self.error}"

def get_delay(self):
return self.error, self.delay

class Connection():
""" connection simbolizes the interaction that 2 DHTClients could have with eachother """
Expand Down Expand Up @@ -241,11 +271,13 @@ class DHTNetwork:
""" serves a the shared point between all the nodes participating in the simulation,
allows node to communicat with eachother without needing to implement an API or similar"""

def __init__(self, networkid: int, errorrate: int, delayrange):
def __init__(self, networkid: int, fasterrorrate: int, slowerrorrate: int, fastdelayrange, slowdelay):
""" class initializer, it allows to define the networkID and the delays between nodes """
self.networkid = networkid
self.errorrate = errorrate # %
self.delayrange = delayrange # list() in ms -> i.e., (5, 100) ms | None
self.fasterrorrate = fasterrorrate # %
self.slowerrorrate = slowerrorrate # %
self.fastdelayrange = fastdelayrange # list() in ms -> i.e., (5, 100) ms | None
self.slowdelay = slowdelay # timeot delay
self.nodestore = NodeStore()
self.errortracker = deque() # every time that an error is tracked, add it to the queue
self.connectiontracker = deque() # every time that a connection was stablished
Expand Down Expand Up @@ -308,11 +340,15 @@ def connect_to_node(self, ognode: int, targetnode: int):
self.connectioncnt += 1
try:
# check the error rate (avoid stablishing the connection if there is an error)
if random.randint(0, 99) < self.errorrate:
connerror = ConnectionError(targetnode, "simulated error", time.time())
if random.randint(0, 99) < self.fasterrorrate:
connerror = ConnectionError(targetnode, "fast error", time.time(), self.fastdelayrange)
self.errortracker.append(connerror)
raise connerror
connection = Connection(self.connectioncnt, ognode, self.nodestore.get_node(targetnode), self.delayrange)
if random.randint(0, 99) < self.slowerrorrate:
connerror = ConnectionError(targetnode, "slow error", time.time(), self.slowdelay)
self.errortracker.append(connerror)
raise connerror
connection = Connection(self.connectioncnt, ognode, self.nodestore.get_node(targetnode), self.fastdelayrange)
self.connectiontracker.append({
'time': time.time(),
'from': ognode,
Expand All @@ -321,10 +357,12 @@ def connect_to_node(self, ognode: int, targetnode: int):
return connection, connection.delay

except NodeNotInStoreError as e:
connerror = ConnectionError(e.missingNode, e.description, e.time)
connerror = ConnectionError(e.missingNode, e.description, e.time, 0)
self.errortracker.append({
'time': connerror.errorTime,
'error': connerror.description})
'from': ognode,
'error': connerror.description,
'delay': 0})
raise connerror

def bootstrap_node(self, nodeid: int, bucketsize: int): # ( accuracy: int = 100 )
Expand Down
Loading