Skip to content

Commit

Permalink
Merge pull request #38 from freepn/netstate-refactor
Browse files Browse the repository at this point in the history
Net state refactoring and network closure, some new helper funcs and unit-test updates, bump version for packaging.
  • Loading branch information
sarnold authored Jun 10, 2020
2 parents 5342ca2 + 71abd4f commit 7b1f473
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 29 deletions.
113 changes: 111 additions & 2 deletions node_tools/async_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,71 @@ async def bootstrap_mbr_node(client, ctlr_id, node_id, deque, ex=False):
# logger.debug('TRIE: id_trie has items: {}'.format(ct.id_trie.items()))


async def close_mbr_net(client, node_lst, boot_lst, min_nodes=5):
    """
    Wrapper for closing the bootstrap chain or adding it to an existing
    (closed) network. This should run in the ctlr state runner *after*
    the other handlers.

    :notes: Two cases are handled.  When ``node_lst`` holds exactly one
            more entry than ``boot_lst``, the chain is closed on itself
            (head re-attached to the tail's net), but only if it has at
            least ``min_nodes`` members; otherwise nothing is changed.
            In every other case a target node is chosen via
            ``get_target_node_id`` and both the target and the head are
            deauthorized from their current exit nets and reconnected
            with ``connect_mbr_node``.
    :param client: ztcli_api client object
    :param node_lst: list of all active nodes
    :param boot_lst: list of bootstrap nodes
    :param min_nodes: minimum number of nodes for a closed network
    :return: None (no explicit return value)
    """
    from node_tools import ctlr_data as ct
    # NOTE(review): ``st`` is not referenced anywhere in this function
    from node_tools import state_data as st

    from node_tools.ctlr_funcs import unset_network_cfg
    from node_tools.network_funcs import publish_cfg_msg
    from node_tools.trie_funcs import cleanup_state_tries
    from node_tools.trie_funcs import find_dangling_nets
    from node_tools.trie_funcs import find_exit_net
    from node_tools.trie_funcs import get_neighbor_ids
    from node_tools.trie_funcs import get_target_node_id

    # the last entry of boot_lst is treated as the chain head and the
    # first entry as the chain tail
    head_id = boot_lst[-1]
    tail_id = boot_lst[0]
    # NOTE(review): both ``[0]`` lookups below are unguarded and raise
    # IndexError if the helper hands back an empty list
    head_exit_net = find_exit_net(ct.id_trie)[0]
    head_src_net, _, _, _ = get_neighbor_ids(ct.net_trie, head_id)
    tail_exit_net = find_dangling_nets(ct.id_trie)[0]
    # cfg fragment used as the payload for every detach step below
    deauth = unset_network_cfg()

    # if true, we only have a boot list
    if len(node_lst) - len(boot_lst) == 1:
        # check if we have enough nodes for a network
        if len(boot_lst) >= min_nodes:
            logger.debug('CLOSURE: creating network from boot_list {}'.format(boot_lst))
            # detach and connect head to tail
            await config_network_object(client, deauth, head_exit_net, head_id)
            cleanup_state_tries(ct.net_trie, ct.id_trie, head_exit_net, head_id, mbr_only=True)
            logger.debug('CLOSURE: deauthed node id {} from exit net {}'.format(head_id, head_exit_net))

            await connect_mbr_node(client, head_id, head_src_net, tail_exit_net, tail_id)
            publish_cfg_msg(ct.id_trie, head_id, addr='127.0.0.1')
        else:
            # too short to wrap: leave the chain untouched
            logger.debug('CLOSURE: not enough bootstrap nodes to wrap')
    else:
        logger.debug('CLOSURE: adding bootstrap list {} to network'.format(boot_lst))
        tgt_id = get_target_node_id(node_lst, boot_lst)
        # NOTE(review): ``tgt_net`` is unpacked here but never used below
        tgt_net, tgt_exit_net, tgt_src_node, tgt_exit_node = get_neighbor_ids(ct.net_trie, tgt_id)
        tgt_src_net, _, _, _ = get_neighbor_ids(ct.net_trie, tgt_src_node)

        # detach and connect tgt to tail
        await config_network_object(client, deauth, tgt_exit_net, tgt_id)
        cleanup_state_tries(ct.net_trie, ct.id_trie, tgt_exit_net, tgt_id, mbr_only=True)
        logger.debug('CLOSURE: deauthed node id {} from tgt exit net {}'.format(tgt_id, tgt_exit_net))

        await connect_mbr_node(client, tgt_id, tgt_src_net, tail_exit_net, tail_id)
        publish_cfg_msg(ct.id_trie, tgt_id, addr='127.0.0.1')

        # detach and connect head to tgt exit net
        await config_network_object(client, deauth, head_exit_net, head_id)
        cleanup_state_tries(ct.net_trie, ct.id_trie, head_exit_net, head_id, mbr_only=True)
        logger.debug('CLOSURE: deauthed node id {} from head exit net {}'.format(head_id, head_exit_net))

        # the head takes over the exit net (and gateway node) that the
        # target was detached from above
        await connect_mbr_node(client, head_id, head_src_net, tgt_exit_net, tgt_exit_node)
        publish_cfg_msg(ct.id_trie, head_id, addr='127.0.0.1')


async def connect_mbr_node(client, node_id, src_net, exit_net, gw_node):
"""
Wrapper to reconnect an existing member node; needs the (upstream)
Expand Down Expand Up @@ -164,7 +229,7 @@ async def offline_mbr_node(client, node_id):
else:
await config_network_object(client, deauth, exit_net, node_id)
cleanup_state_tries(ct.net_trie, ct.id_trie, exit_net, node_id, mbr_only=True)
logger.debug('OFFLINE: deauthed node id {} from exit net'.format(node_id))
logger.debug('OFFLINE: deauthed node id {} from exit net {}'.format(node_id, exit_net))
await delete_network_object(client, node_net)
cleanup_state_tries(ct.net_trie, ct.id_trie, node_net, node_id)
logger.debug('OFFLINE: removed network id {} and node {}'.format(node_net, node_id))
Expand All @@ -189,10 +254,11 @@ async def update_state_tries(client, net_trie, id_trie):
logger.debug('{} networks found'.format(len(client.data)))
net_list = client.data
for net_id in net_list:
mbr_list = []
# get details about each network and update trie data
await get_network_object_data(client, net_id)
net_trie[net_id] = client.data
load_id_trie(net_trie, id_trie, [net_id], [], nw=True)
# load_id_trie(net_trie, id_trie, [net_id], [], nw=True)
await get_network_object_ids(client, net_id)
logger.debug('network {} has {} possible member(s)'.format(net_id, len(client.data)))
member_dict = client.data
Expand All @@ -203,9 +269,52 @@ async def update_state_tries(client, net_trie, id_trie):
logger.debug('adding member: {}'.format(mbr_id))
net_trie[net_id + mbr_id] = client.data
load_id_trie(net_trie, id_trie, [], [mbr_id])
mbr_list.append(mbr_id)
load_id_trie(net_trie, id_trie, [net_id], mbr_list, nw=True)
logger.debug('member key suffixes: {}'.format(net_trie.suffixes(net_id)))


async def unwrap_mbr_net(client, node_lst, boot_lst, min_nodes=5):
    """
    Wrapper for unwrapping the (closed) network when it gets too small.
    This should run in the ctlr state runner *after* the other handlers
    (and when there are not enough nodes to keep a closed network).

    :notes: Acts only when ``1 < len(node_lst) <= min_nodes``: one
            target node is chosen via ``get_target_node_id``,
            deauthorized from its exit net, and reconnected to the
            net/node pair reported by ``find_dangling_nets``.  For any
            other list size only a debug message is logged.
    :param client: ztcli_api client object
    :param node_lst: list of all active nodes
    :param boot_lst: list of bootstrap nodes
    :param min_nodes: minimum number of nodes for a closed network
    :return: None (no explicit return value)
    """
    from node_tools import ctlr_data as ct
    # NOTE(review): ``st`` is not referenced anywhere in this function
    from node_tools import state_data as st

    from node_tools.ctlr_funcs import unset_network_cfg
    from node_tools.network_funcs import publish_cfg_msg
    from node_tools.trie_funcs import cleanup_state_tries
    from node_tools.trie_funcs import find_dangling_nets
    from node_tools.trie_funcs import get_neighbor_ids
    from node_tools.trie_funcs import get_target_node_id

    if len(node_lst) <= min_nodes and len(node_lst) > 1:
        logger.debug('UNWRAP: creating bootstrap list from network {}'.format(node_lst))
        tgt_id = get_target_node_id(node_lst, boot_lst)
        tgt_net, tgt_exit_net, _, _ = get_neighbor_ids(ct.net_trie, tgt_id)
        # tgt_src_net, _, _, _ = get_neighbor_ids(ct.net_trie, tgt_src_node)
        # first two items of the helper result are used as (net, node);
        # NOTE(review): indexing is unguarded, a shorter list raises IndexError
        data_list = find_dangling_nets(ct.id_trie)
        exit_net = data_list[0]
        exit_node = data_list[1]
        # cfg fragment used as the payload for the detach step below
        deauth = unset_network_cfg()

        # detach and connect tgt node back to exit node
        await config_network_object(client, deauth, tgt_exit_net, tgt_id)
        cleanup_state_tries(ct.net_trie, ct.id_trie, tgt_exit_net, tgt_id, mbr_only=True)
        logger.debug('UNWRAP: deauthed node id {} from tgt exit net {}'.format(tgt_id, tgt_exit_net))

        await connect_mbr_node(client, tgt_id, tgt_net, exit_net, exit_node)
        publish_cfg_msg(ct.id_trie, tgt_id, addr='127.0.0.1')
    else:
        # NOTE(review): this branch is also taken when len(node_lst) <= 1,
        # in which case the message text below does not describe the cause
        logger.debug('UNWRAP: num nodes greater than {} so not unwrapping'.format(min_nodes))


async def add_network_object(client, net_id=None, mbr_id=None, ctlr_id=None):
"""
Command wrapper for creating ZT objects under the `controller` endpoint.
Expand Down
4 changes: 2 additions & 2 deletions node_tools/ctlr_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def handle_net_cfg(deque):
derived from async wrapper funcs. Context is netstate runner and
bootstrap_mbr_node.
:param deque: netobj queue
:return tuple: formatted cfg fragments
:return: tuple of formatted cfg fragments
"""

ipnet = deque.popleft()
Expand Down Expand Up @@ -194,7 +194,7 @@ def unset_network_cfg():
"""
Create a config fragment to unset (remove) the IP address and
deauthorize the node.
:return dict: formatted cfg fragment for async payload
:return: <dict> formatted cfg fragment for async payload
"""

src_addr = {
Expand Down
26 changes: 21 additions & 5 deletions node_tools/netstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@

from node_tools import ctlr_data as ct

from node_tools.async_funcs import add_network_object
from node_tools.async_funcs import bootstrap_mbr_node
from node_tools.async_funcs import get_network_object_data
from node_tools.async_funcs import get_network_object_ids
from node_tools.async_funcs import close_mbr_net
from node_tools.async_funcs import offline_mbr_node
from node_tools.async_funcs import unwrap_mbr_net
from node_tools.async_funcs import update_state_tries
from node_tools.cache_funcs import find_keys
from node_tools.cache_funcs import handle_node_status
from node_tools.ctlr_funcs import is_exit_node
from node_tools.helper_funcs import AttrDict
Expand All @@ -28,6 +26,9 @@
from node_tools.msg_queues import handle_node_queues
from node_tools.msg_queues import handle_wedged_nodes
from node_tools.network_funcs import publish_cfg_msg
from node_tools.trie_funcs import get_active_nodes
from node_tools.trie_funcs import get_bootstrap_list


logger = logging.getLogger('netstate')

Expand All @@ -39,7 +40,7 @@ async def main():
client = ZeroTier(ZT_API, loop, session)

try:
# start with handling offline nodes
# handle offline/wedged nodes
handle_wedged_nodes(ct.net_trie, wdg_q, off_q)
pre_off = list(off_q)
logger.debug('{} nodes in offline queue: {}'.format(len(pre_off), pre_off))
Expand Down Expand Up @@ -85,6 +86,20 @@ async def main():
logger.debug('{} nodes in staging queue: {}'.format(len(staging_q),
list(staging_q)))

# refresh ctlr state tries again
await update_state_tries(client, ct.net_trie, ct.id_trie)

node_list = get_active_nodes(ct.id_trie)
logger.debug('{} nodes in node_list: {}'.format(len(node_list), node_list))
if len(node_list) > 0:
boot_list = get_bootstrap_list(ct.net_trie, ct.id_trie)
logger.debug('{} nodes in boot_list: {}'.format(len(boot_list), boot_list))

if len(boot_list) != 0:
await close_mbr_net(client, node_list, boot_list, min_nodes=3)
elif len(boot_list) == 0 and len(node_list) > 1:
await unwrap_mbr_net(client, node_list, boot_list, min_nodes=3)

except Exception as exc:
logger.error('netstate exception was: {}'.format(exc))
raise exc
Expand All @@ -96,5 +111,6 @@ async def main():
netobj_q = dc.Deque(directory=get_cachedir('netobj_queue'))
staging_q = dc.Deque(directory=get_cachedir('staging_queue'))
wdg_q = dc.Deque(directory=get_cachedir('wedge_queue'))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
2 changes: 1 addition & 1 deletion node_tools/nodestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def main():

# check for reconfiguration events
for net in netStatus:
if net['status'] == 'NOT_FOUND':
if net['status'] == 'NOT_FOUND' or net['status'] == 'ACCESS_DENIED':
run_ztcli_cmd(action='leave', extra=net['identity'])
net_id_handler(None, net['identity'], old=True)
nsState.cfg_ref = None
Expand Down
66 changes: 50 additions & 16 deletions node_tools/trie_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def check_trie_params(nw_id, node_id, needs):
"""Check load/update trie params for correctness"""

for param in [nw_id, node_id, needs]:
if not type(param) is list:
if not isinstance(param, list):
raise AssertionError('Invalid trie parameter: {}'.format(param))
for param in [nw_id, node_id]:
if not len(param) < 3:
Expand All @@ -57,8 +57,9 @@ def check_trie_params(nw_id, node_id, needs):

def cleanup_state_tries(net_trie, id_trie, nw_id, node_id, mbr_only=False):
"""
Cleanup offlined nets and mbr nodes from state data tries. This needs
to run from offline_mbr_node to cleanup stale trie data.
Delete offlined mbr nodes/nets from state data tries. This needs to
run whenever nodes are disconnected or removed, in order to cleanup
stale trie data.
:param net_trie: net state trie object
:param id_trie: ID state trie object
:param nw_id: network ID str
Expand All @@ -74,6 +75,8 @@ def cleanup_state_tries(net_trie, id_trie, nw_id, node_id, mbr_only=False):
for key in net_trie.keys(nw_id):
del net_trie[key]
del id_trie[nw_id]
if node_id in id_trie:
del id_trie[node_id]


def find_dangling_nets(trie):
Expand All @@ -93,26 +96,40 @@ def find_dangling_nets(trie):
return net_list


def get_active_nodes(trie):
def find_exit_net(trie):
    """
    Locate the network attached to the exit node by scanning the ID trie.
    :notes: Only keys of length 10 are examined.  A key matches when the
            second element of its value equals ``[False, False]`` and
            the first element holds exactly one item; if several keys
            match, the last one visited wins.
    :param trie: ID state trie
    :return: <list> first element of the matching value, i.e. a list
             with the single network ID (empty list if nothing matches)
    """
    exit_nets = []

    # iterate over a snapshot of the keys, same as the list() copy
    for key in list(trie):
        if len(key) != 10:
            continue
        entry = trie[key]
        if entry[1] == [False, False] and len(entry[0]) == 1:
            exit_nets = entry[0]

    return exit_nets


def get_active_nodes(id_trie):
    """
    Find all the currently active nodes (search the ID trie).
    :notes: In this case the answer depends on when this runs (relative
            to the cmds in `netstate` runner).
    :param id_trie: ID state trie
    :return: list of node IDs (empty list if there are none)
    """
    # keys of length 10 are treated as node IDs; every other key is
    # skipped.  The keys are snapshotted with list() before filtering.
    return [key for key in list(id_trie) if len(key) == 10]


def get_bootstrap_list(net_trie, id_trie):
"""
Find all the nodes in the bootstrap chain (search the net trie).
:notes: We start counting from the first node attached to the exit
node.
:notes: We start counting from the last node in the bootstrap
chain.
:param trie: net data trie
:param trie: ID state trie
:return: list of node IDs (empty list if None)
Expand All @@ -121,14 +138,16 @@ def get_bootstrap_list(net_trie, id_trie):

node_list = []
next_node = None
prev_node = get_exit_node_id()
exit_node = get_exit_node_id()
dangle_list = find_dangling_nets(id_trie)
last_node = dangle_list[1]
prev_node = last_node

while next_node != last_node:
_, _, next_node, _ = get_neighbor_ids(net_trie, prev_node)
node_list.append(next_node)
prev_node = next_node
if last_node != exit_node:
while next_node != exit_node:
node_list.append(prev_node)
_, _, _, next_node = get_neighbor_ids(net_trie, prev_node)
prev_node = next_node

return node_list

Expand Down Expand Up @@ -178,7 +197,7 @@ def get_neighbor_ids(trie, node_id):
src_node = None
exit_node = None

for key in trie.keys():
for key in trie:
if node_id in key:
key_list.append(key[0:16])
node_list.append(trie[key])
Expand All @@ -201,10 +220,25 @@ def get_neighbor_ids(trie, node_id):
for node in trie.suffixes(exit_net)[1:]:
if node_id != node:
exit_node = node

return src_net, exit_net, src_node, exit_node


def get_target_node_id(node_lst, boot_lst):
    """
    Pick a node ID from the active network to serve as the insertion
    point for the nodes in the bootstrap list.
    :notes: the choice of tgt node is random (this may change); nodes
            found in ``boot_lst`` and nodes for which ``is_exit_node``
            is true are never candidates
    :param node_lst: list of all active nodes
    :param boot_lst: list of bootstrap nodes
    :return: <str> node ID
    """
    import random
    from node_tools.ctlr_funcs import is_exit_node

    candidates = []
    for node_id in node_lst:
        # bootstrap members are rejected before the exit-node check runs
        if node_id in boot_lst:
            continue
        if is_exit_node(node_id):
            continue
        candidates.append(node_id)

    return random.choice(candidates)


def get_wedged_node_id(trie, node_id):
"""
Get the node ID of a wedged node, where wedged is defined by the
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from distutils.core import setup


__version__ = '0.8.6'
__version__ = '0.8.7'

# make setuptools happy with PEP 440-compliant post version
# (enable this for patch releases)
Expand Down
Loading

0 comments on commit 7b1f473

Please sign in to comment.