draft of direct messaging between net_nodes
sneakers-the-rat committed May 25, 2021
1 parent d5e7fd4 commit b000504
Showing 4 changed files with 130 additions and 115 deletions.
15 changes: 15 additions & 0 deletions autopilot/networking/__init__.py
Expand Up @@ -9,6 +9,21 @@
the :class:`~.pilot.Pilot` or :class:`~.terminal.Terminal`.
* :class:`.Net_Node` is a pop-in networking class that can be given to any other object that
wants to send or receive messages.
The :class:`~autopilot.networking.Message` object is used to serialize and pass
messages. When sent, messages are ``JSON`` serialized (with some special magic
to compress/encode numpy arrays) and sent as ``zmq`` multipart messages.
Each serialized message, when sent, can have ``n`` frames of the format::
[hop_0, hop_1, ... hop_n, final_recipient, serialized_message]
That is, messages can have multiple "hops" (a typical message has one 'hop', specified
by the ``to`` field), the second-to-last frame is always the final intended recipient,
and the final frame is the serialized message. Note that the ``to`` field of a
:class:`~autopilot.networking.Message` object will always be the final recipient,
even if a list is passed for ``to`` when sending. This lets :class:`~.networking.Station`
objects efficiently forward messages without deserializing them at every hop.
"""
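The frame layout described in the docstring above can be sketched in plain Python. This is only an illustration, not the code from this commit: `build_frames`, `split_frames`, and `forward` are hypothetical helper names, and the "drop the first hop when forwarding" behaviour is an assumption about how a forwarding node might consume the hop frames.

```python
from typing import List, Sequence, Tuple


def build_frames(hops: Sequence[str], recipient: str, serialized: bytes) -> List[bytes]:
    """Lay out frames as [hop_0, ..., hop_n, final_recipient, serialized_message]."""
    frames = [hop.encode("utf-8") for hop in hops]
    frames.append(recipient.encode("utf-8"))
    frames.append(serialized)
    return frames


def split_frames(frames: Sequence[bytes]) -> Tuple[List[bytes], bytes, bytes]:
    """Recover (hops, final_recipient, payload) without deserializing the payload."""
    if len(frames) < 2:
        raise ValueError("need at least a recipient frame and a message frame")
    return list(frames[:-2]), frames[-2], frames[-1]


def forward(frames: Sequence[bytes]) -> List[bytes]:
    """Assumed forwarding step: drop the first hop, leave everything else untouched."""
    hops, recipient, payload = split_frames(frames)
    return hops[1:] + [recipient, payload]


# hypothetical node ids and payload, for illustration only
frames = build_frames(["terminal", "pilot_1"], "pilot_1", b'{"key": "PING"}')
```

Because the recipient and payload always sit in the last two frames, a forwarding node only needs to inspect the leading frames and never has to parse the JSON payload.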


1 change: 0 additions & 1 deletion autopilot/networking/message.py
Expand Up @@ -42,7 +42,6 @@ class Message(object):
serialized = None

def __init__(self, msg=None, expand_arrays = False, **kwargs):
# type: (object, object) -> None
# Messages don't need to have all attributes on creation,
# but do need them to serialize
"""
124 changes: 74 additions & 50 deletions autopilot/networking/node.py
Expand Up @@ -23,12 +23,26 @@ class Net_Node(object):
These objects are intended to communicate locally, within a piece of hardware,
though not necessarily within the same process.
To minimize the complexity of the network topology, Net_Nodes
must communicate through a :class:`.Station` ROUTER, rather than
address each other directly. -- This proved to be horribly misguided and
To minimize the complexity of the network topology, the typical way to use
``Net_Node``s is through a :class:`.Station` ROUTER, rather than
addressing each other directly. Practically, this means that
all messages are sent first to the parent :class:`.networking.Station` object,
which then handles them, forwards them, etc.
This proved to be horribly misguided and
will be changed in v0.5.0 to support simplified
messaging to an ``agent_id.netnode_id`` address.
messaging to an ``agent_id.netnode_id`` address. Until then the networking modules
will be in a bit of flux.
To receive messages directly at this Net_Node, pass a ``router_port``,
which binds a ``zmq.ROUTER`` socket; incoming messages are then handled as regular 'listens'.
Note that Net_Nodes assume that they are the final recipients of messages,
and so don't handle forwarding messages (unless a ``listen`` method explicitly
does so), and will automatically deserialize them on receipt.
.. note::
Listen methods currently receive only the ``value`` of a message. This will change in v0.5.0,
where they will receive the full message like :class:`.networking.Station` objects.
Args:
id (str): What are we known as? What do we set our :attr:`~zmq.Socket.identity` as?
Expand All @@ -38,7 +52,7 @@ class Net_Node(object):
keys match the :attr:`.Message.key`.
instance (bool): Should the node try and use the existing zmq context and tornado loop?
upstream_ip (str): If this Net_Node is being used on its own (ie. not behind a :class:`.Station`), it can directly connect to another node at this IP. Otherwise use 'localhost' to connect to a station.
route_port (int): Typically, Net_Nodes only have a single Dealer socket and receive messages from their encapsulating :class:`.Station`, but
router_port (int): Typically, Net_Nodes only have a single Dealer socket and receive messages from their encapsulating :class:`.Station`, but
if you want to take this node offroad and use it independently, an int here binds a Router to the port.
daemon (bool): Run the IOLoop thread as a ``daemon`` (default: ``True``)
Expand All @@ -61,10 +75,9 @@ class Net_Node(object):
def __init__(self, id: str, upstream: str, port: int,
listens: typing.Dict[str, typing.Callable],
instance:bool=True, upstream_ip:str='localhost',
router_port:Optional[int] = None,
daemon:bool=True, expand_on_receive:bool=True):
"""

"""
if instance:
self.context = zmq.Context.instance() # type: zmq.Context
self.loop = IOLoop.current() # type: IOLoop
Expand All @@ -84,8 +97,12 @@ def __init__(self, id: str, upstream: str, port: int,
self.listens.update(listens)

self.id = id # type: str
self.upstream = upstream # type: str
self.port = int(port) # type: int
self.upstream = upstream # type: str
self.port = int(port) # type: int
self.router_port = router_port
self.router = None # type: Optional[zmq.Socket]
self.loop_thread = None # type: Optional[threading.Thread]
self.senders = {} # type: typing.Dict[bytes, str]

# self.connected = False
self.msg_counter = count()
Expand Down Expand Up @@ -119,13 +136,20 @@ def init_networking(self):
self.sock.setsockopt_string(zmq.IDENTITY, self.id)
#self.sock.probe_router = 1

# if used locally (typical case), connect to localhost
# connect our dealer socket to "push" messages upstream
self.sock.connect('tcp://{}:{}'.format(self.upstream_ip, self.port))

# wrap in zmqstreams and start loop thread
self.sock = ZMQStream(self.sock, self.loop)
self.sock.on_recv(self.handle_listen)

# if want to directly receive messages, bind a router port
if self.router_port is not None:
self.router = self.context.socket(zmq.ROUTER)
self.router.setsockopt_string(zmq.IDENTITY, self.id)
self.router.bind('tcp://*:{}'.format(self.router_port))
self.router = ZMQStream(self.router, self.loop)
self.router.on_recv(self.handle_listen)

# the loop thread keeps the ioloop alive until the program exits
self.loop_thread = threading.Thread(target=self.threaded_loop)
if self.daemon:
self.loop_thread.daemon = True
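As a standalone illustration of the ROUTER/DEALER pairing used above (not code from this commit), the sketch below binds a ROUTER and connects a DEALER with a fixed identity over pyzmq's `inproc` transport. The endpoint name `inproc://net_node_demo`, the identity `node_a`, and the two-second timeouts are assumptions made for the example. It shows that the ROUTER sees the sender's identity as the first frame and must use that identity to address a reply.

```python
import zmq

ctx = zmq.Context()

# ROUTER side: plays the role of a node that was given a router_port
router = ctx.socket(zmq.ROUTER)
router.setsockopt(zmq.RCVTIMEO, 2000)  # fail after 2 s rather than hang
router.bind("inproc://net_node_demo")

# DEALER side: identity must be set before connecting
dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.IDENTITY, b"node_a")
dealer.setsockopt(zmq.RCVTIMEO, 2000)
dealer.connect("inproc://net_node_demo")

# the dealer sends one frame; the router receives it prefixed with the sender identity
dealer.send_multipart([b"hello"])
request = router.recv_multipart()

# a router can only address peers it knows by identity, so reply via the first frame
sender_id = request[0]
router.send_multipart([sender_id, b"reply"])
reply = dealer.recv_multipart()

dealer.close(linger=0)
router.close(linger=0)
ctx.term()
```

This is why the node keeps a record of sender identities: without having seen a peer's identity frame, the ROUTER socket has no address to send to.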
Expand All @@ -145,7 +169,7 @@ def threaded_loop(self):
# loop already started
break

def handle_listen(self, msg):
def handle_listen(self, msg: typing.List[bytes]):
"""
Upon receiving a message, call the appropriate listen method
in a new thread and send confirmation it was received.
Expand All @@ -156,14 +180,15 @@ def handle_listen(self, msg):
methods, but this might change in the future to unify the messaging system.
Args:
msg (str): JSON :meth:`.Message.serialize` d message.
msg (list): JSON :meth:`.Message.serialize` d message.
"""
# messages from dealers are single frames because we only have one connected partner
# and that's the dealer spec lol

#msg = json.loads(msg[0])
# if we have a router, check if this is a router msg and store
# the sender if so
if self.router is not None and len(msg)>=2:
if msg[0] not in self.senders.keys():
self.senders[msg[0]] = ''

#msg = Message(**msg)
# Nodes expand arrays by default as they're expected to
msg = Message(msg[-1], expand_arrays=self.expand)

Expand All @@ -173,28 +198,9 @@ def handle_listen(self, msg):
self.logger.error('Message failed to validate:\n{}'.format(str(msg)))
return


# if msg.key == 'CONFIRM':
# if msg.value in self.outbox.keys():
# del self.outbox[msg.value]
#
# # stop a timer thread if we have it
# if msg.value in self.timers.keys():
# self.timers[msg.value].cancel()
# del self.timers[msg.value]
#
# self.logger.info('CONFIRMED MESSAGE {}'.format(msg.value))
# else:
# Log and spawn thread to respond to listen

if isinstance(msg.to, list):
if len(msg.to) == 1:
msg.to = msg.to[0]

if isinstance(msg.to, list):
# not to us, just keep it going
_ = msg.to.pop(0)
self.send(msg=msg, repeat=False)
# unnest any list if it was a multihop message
if isinstance(msg.to, list) and len(msg.to) == 1:
msg.to = msg.to[0]

try:
listen_funk = self.listens[msg.key]
Expand All @@ -208,7 +214,7 @@ def handle_listen(self, msg):
except Exception as e:
self.logger.exception(e)

self.logger.error('MSG ID {} - No listen function found for key: {}'.format(msg.id, msg.key))
self.logger.exception('MSG ID {} - No listen function found for key: {}'.format(msg.id, msg.key))

if (msg.key != "CONFIRM") and ('NOREPEAT' not in msg.flags.keys()) :
# send confirmation
Expand All @@ -222,18 +228,27 @@ def handle_listen(self, msg):
self.logger.debug('RECEIVED: {}'.format(str(msg)))


def send(self, to:Union[str, list]=None,
def send(self, to: Optional[Union[str, list]] = None,
key:str=None,
value:typing.Any=None,
msg:Optional['Message']=None,
repeat:bool=True, flags = None, force_to:bool = False):
"""
Send a message via our :attr:`~.Net_Node.sock` , DEALER socket.
`to` is not required. Every message
is always sent to :attr:`~.Net_Node.upstream` . `to` can be included
to send a message further up the network tree to a networking object
we're not directly connected to.
`to` is not required.
* If the node doesn't have a router (or the recipient is not
in the :attr:`~.Net_Node.senders` dict), the message
is sent to :attr:`~.Net_Node.upstream` . `to` can be included
to send a message further up the network tree to a networking object
we're not directly connected to.
* If the node has a router, since messages can only be sent on router
sockets after the recipient has first sent us a message, if the
``to`` is in the :attr:`~.Net_Node.senders` dict, it will be
directly sent via :attr:`.Net_Node.router`
* If the ``force_to`` arg is ``True``, send to the ``to`` recipient directly
via the dealer :attr:`.Net_Node.sock`
Either an already created :class:`.Message` should be passed as `msg`,
or at least `key` must be provided for a new message created
Expand Down Expand Up @@ -269,7 +284,7 @@ def send(self, to:Union[str, list]=None,

# differentiate between a single 'to' and a list (ie. a multihop message)
# in this case, 'recipient' is encoded in the message as the final node to
# send to, and the rest of 'to' is encoded as parts in a multipart message
# send to, and the rest of 'to' is encoded as parts in a multipart message.
if isinstance(to, list):
recipient = to[-1]
else:
Expand All @@ -292,17 +307,26 @@ def send(self, to:Union[str, list]=None,
multipart = [bytes(hop, encoding='utf-8') for hop in to]
multipart.append(recipient.encode('utf-8'))
multipart.append(msg_enc)

else:
# the first frame will be added below if needed...
multipart = [recipient.encode('utf-8'), msg_enc]

if self.router is not None and multipart[0] in self.senders.keys():
self.router.send_multipart(multipart)
elif force_to:
multipart.insert(0, to.encode('utf-8'))
self.sock.send_multipart(multipart)
else:
self.sock.send_multipart([to.encode('utf-8'), recipient.encode('utf-8'), msg_enc])
multipart.insert(0, self.upstream.encode('utf-8'))
self.sock.send_multipart(multipart)

if self.logger and log_this:
self.logger.debug("MESSAGE SENT - {}".format(str(msg)))

if repeat and not msg.key == "CONFIRM":
# add to outbox and spawn timer to resend
self.outbox[msg.id] = (time.time(), msg)
# self.timers[msg.id] = threading.Timer(5.0, self.repeat, args=(msg.id,))
# self.timers[msg.id].start()
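The three-way choice at the end of `send` can be modelled without any sockets. The sketch below is a simplified stand-in, not the method itself: `choose_route` is a hypothetical function that returns the name of the socket that would be used together with the frames that would be sent, mirroring the `router` / `force_to` / `upstream` branches above for the single-recipient case.

```python
from typing import Dict, List, Optional, Tuple


def choose_route(multipart: List[bytes],
                 senders: Dict[bytes, str],
                 upstream: str,
                 has_router: bool,
                 force_to: bool = False,
                 to: Optional[str] = None) -> Tuple[str, List[bytes]]:
    """Return (socket_name, frames) for a message whose frames start with the recipient."""
    frames = list(multipart)  # copy so the caller's list is not mutated
    if has_router and frames[0] in senders:
        # the recipient has messaged us before, so the router can address it directly
        return "router", frames
    if force_to and to is not None:
        # bypass upstream addressing: prepend the recipient itself as the first frame
        return "dealer", [to.encode("utf-8")] + frames
    # default: prepend the upstream id and let the upstream object route the message
    return "dealer", [upstream.encode("utf-8")] + frames


# hypothetical ids, for illustration only
frames = [b"node_b", b"{}"]
known = choose_route(frames, {b"node_b": ""}, "station", has_router=True)
unknown = choose_route(frames, {}, "station", has_router=True)
forced = choose_route(frames, {}, "station", has_router=False, force_to=True, to="node_b")
```

Keeping the decision in a pure function like this makes the precedence explicit: a known sender on the router wins, then `force_to`, then the upstream default.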

def repeat(self):
"""