Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions da/freeze.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import types
import weakref
from copyreg import dispatch_table
import traceback
import logging

__all__ = ['frozendict', 'frozenlist', 'deepfreeze']

Expand All @@ -25,6 +27,8 @@ class frozendict(dict):
"""

def _blocked_attribute(obj):
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

raise AttributeError("A frozendict cannot be modified.")
_blocked_attribute = property(_blocked_attribute)

Expand All @@ -37,12 +41,14 @@ def __new__(cls, *args, **kws):
return new

def __init__(self, *args, **kws):
pass
self.log = logging.getLogger(__name__) \
.getChild(self.__class__.__name__)

def __hash__(self):
try:
return self._cached_hash
except AttributeError:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
h = self._cached_hash = hash(tuple(sorted(self.items())))
return h

Expand All @@ -54,6 +60,7 @@ def _build_set_keyvalue_(self, key, val):
if not hasattr(self, '_cached_hash'):
return super().__setitem__(key, val)
else:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
raise AttributeError("Attempting to update frozendict after "
"hash value has been read.")

Expand All @@ -66,6 +73,7 @@ class frozenlist(list):
"""

def _blocked_attribute(obj):
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
raise AttributeError("A frozenlist cannot be modified.")
_blocked_attribute = property(_blocked_attribute)

Expand All @@ -78,13 +86,15 @@ def __new__(cls, *args, **kws):
return new

def __init__(self, *args, **kws):
pass
self.log = logging.getLogger(__name__) \
.getChild(self.__class__.__name__)

def __hash__(self):
try:
return self._cached_hash
except AttributeError:
h = self._cached_hash = hash(tuple(sorted(self)))
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
return h

def __repr__(self):
Expand All @@ -95,10 +105,13 @@ def _build_add_elem_(self, elem):
if not hasattr(self, '_cached_hash'):
return super().append(elem)
else:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
raise AttributeError("Attempting to modify frozenlist after "
"hash value has been read.")


log = logging.getLogger(__name__)

def deepfreeze(x, memo=None, _nil=[]):
"""Deep freeze operation on arbitrary Python objects.

Expand All @@ -122,7 +135,8 @@ def deepfreeze(x, memo=None, _nil=[]):
else:
try:
issc = issubclass(cls, type)
except TypeError: # cls is not a class (old Boost; see SF #502085)
except TypeError as e: # cls is not a class (old Boost; see SF #502085)
log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
issc = 0
if issc:
y = _deepfreeze_atomic(x, memo)
Expand All @@ -143,6 +157,7 @@ def deepfreeze(x, memo=None, _nil=[]):
if reductor:
rv = reductor()
else:

raise Error(
"un(deep)copyable object of type %s" % cls)
if isinstance(rv, str):
Expand Down Expand Up @@ -209,7 +224,8 @@ def _deepfreeze_tuple(x, memo, deepfreeze=deepfreeze):
# check for it, in case the tuple contains recursive mutable structures.
try:
return memo[id(x)]
except KeyError:
except KeyError as e:
log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
pass
for k, j in zip(x, y):
if k is not j:
Expand Down Expand Up @@ -247,8 +263,10 @@ def _keep_alive(x, memo):
"""
try:
memo[id(memo)].append(x)
except KeyError:
except KeyError as e:
# aha, this is the first one :-)
log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

memo[id(memo)]=[x]

def _reconstruct(x, memo, func, args,
Expand Down
51 changes: 49 additions & 2 deletions da/sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import collections
import multiprocessing
import os.path

import traceback
from . import common, pattern
from .common import (builtin, internal, name_split_host, name_split_node,
ProcessId, get_runtime_option,
Expand Down Expand Up @@ -686,9 +686,13 @@ def __process_jobqueue(self, label=None):
handler, args = self.__jobq.popleft()
except IndexError:
self._log.debug("Job item stolen by another thread.")
print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

break
except ValueError:
self._log.error("Corrupted job item!")
print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

continue

if ((handler._labels is None or label in handler._labels) and
Expand All @@ -701,6 +705,9 @@ def __process_jobqueue(self, label=None):
self._log.error(
"%r when calling handler '%s' with '%s': %s",
e, handler.__name__, args, e)
print("exception in handler {} of replicaID {} ".format(handler,args['source']))
print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

else:
if self._keep_unmatched:
dbgmsg = "Skipping (%s, %r) due to label constraint."
Expand Down Expand Up @@ -867,8 +874,12 @@ def _cmd_Setup(self, src, args):
self.__setup_called = True
self._log.debug("`setup` complete.")
except Exception as e:
print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self._log.error("Exception during setup(%r): %r", args, e)

self._log.debug("%r", e, exc_info=1)

res = False
if hasattr(sys.stdout, 'flush'):
sys.stdout.flush()
Expand Down Expand Up @@ -1206,11 +1217,15 @@ def bootstrap_node(self, hostname, port, timeout=None):
hostname, port, transport)
self.bootstrap_peer = None
except AuthenticationException as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

# Abort immediately:
raise e
except (CircularRoutingException, TransportException) as e:
self.log.debug("Bootstrap attempt to %s:%d with %s failed "
": %r", hostname, port, transport, e)
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

if self.bootstrap_peer is None:
raise BootstrapException("Unable to contact a peer node.")

Expand Down Expand Up @@ -1250,6 +1265,8 @@ def run(self):
self.mesgloop(until=(lambda: not self.running))
except Exception as e:
self.log.debug("Unhandled exception: %r.", e, exc_info=1)
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self.terminate_local_processes()

def stop(self):
Expand Down Expand Up @@ -1296,6 +1313,7 @@ def _replay(self, targetpid):
try:
return queue._out_loader.load()
except EOFError as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
raise TraceEndedException("No more items in send trace.") from e

def _send_remote(self, src, dest, mesg, flags=0, transport=None, **params):
Expand Down Expand Up @@ -1326,18 +1344,24 @@ def _send_remote(self, src, dest, mesg, flags=0, transport=None, **params):
payload = (src, dest, mesg)
wrapper = common.BufferIOWrapper(self.local.buf)
try:
pickle.dump(payload, wrapper)
tmp = pickle.dumps(mesg)
pickle.dump(payload, wrapper,2)
except TypeError as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
raise InvalidMessageException("Error pickling {}.".format(payload)) \
from e
except OSError as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
raise MessageTooBigException(
"** Outgoing message object too big to fit in buffer, dropped.")
self.log.debug("** Forwarding %r(%d bytes) to %s with flags=%d using %s.",
mesg, wrapper.fptr, dest, flags, transport)


with memoryview(self.local.buf)[0:wrapper.fptr] as chunk:
transport.send(chunk, dest.address_for_transport(transport),
**params)
self.local.buf=None

def _dispatch(self, src, dest, payload, params=dict(), flags=0):
if dest in self.local_procs:
Expand All @@ -1355,6 +1379,8 @@ def _dispatch(self, src, dest, payload, params=dict(), flags=0):
queue.append((src, payload))
return True
except Exception as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self.log.warning("Failed to deliver to local process %s: %r",
dest, e)
return False
Expand All @@ -1371,9 +1397,12 @@ def _dispatch(self, src, dest, payload, params=dict(), flags=0):
except CircularRoutingException as e:
# This is most likely due to stale process ids, so don't log
# error, just debug:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self.log.debug("Caught %r.", e)
return False
except Exception as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
self.log.error("Could not send message due to: %r", e)
self.log.debug("Send failed: ", exc_info=1)
return False
Expand All @@ -1384,6 +1413,8 @@ def _dispatch(self, src, dest, payload, params=dict(), flags=0):
self._dispatch_table[cmd.value](src, args)
return True
except Exception as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self.log.warning(
"Caught exception while processing router message from "
"%s(%r): %r", src, payload, e)
Expand All @@ -1410,6 +1441,8 @@ def mesgloop(self, until, timeout=None):
except common.QueueEmpty:
pass
except (ImportError, ValueError, pickle.UnpicklingError) as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self.log.warning(
"Dropped invalid message from %s through %s: %r",
remote, transport, e)
Expand Down Expand Up @@ -1451,6 +1484,8 @@ def __init__(self, process_class, transport_manager,
self.seqno = cmd_seqno
self._trace_in_fd = None
self._trace_out_fd = None
self.log = logging.getLogger(__name__) \
.getChild(self.__class__.__name__)
if len(process_name) > 0:
self.name = process_name
self.transport_manager = transport_manager
Expand All @@ -1465,6 +1500,8 @@ def __init__(self, process_class, transport_manager,
try:
self._init_replay(replay_file)
except (Exception, TraceException) as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self.cleanup()
raise e
else:
Expand All @@ -1483,6 +1520,8 @@ def _init_replay(self, filename):
try:
os.stat(sndname)
except OSError as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

raise TraceMismatchException(
'Missing corresponding send trace file {!r} for {!r}!'
.format(sndname, tracename)
Expand Down Expand Up @@ -1549,6 +1588,8 @@ def _spawn_process_spawn(self, pcls, name, parent, props, seqno=None,
cid = None
except Exception as e:
cid = None
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self._log.error("Failed to create instance (%s) of %s: %r",
name, pcls, e)
if p is not None and p.is_alive():
Expand Down Expand Up @@ -1584,6 +1625,7 @@ def _spawn_process_fork(self, pcls, name, parent, props, seqno=None,
cid = None
except Exception as e:
cid = None
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
self._log.error("Failed to create instance (%s) of %s: %r",
name, pcls, e)
if p is not None and p.is_alive():
Expand Down Expand Up @@ -1616,6 +1658,8 @@ def _spawn_thread(self, pcls, name, parent, props, seqno=None, daemon=False):
cid = None
except Exception as e:
cid = None
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self._log.error("Failed to create instance (%s) of %s: %r",
name, pcls, e)
return cid
Expand Down Expand Up @@ -1693,9 +1737,12 @@ def run(self):
self._log.debug("Caught %r, exiting gracefully.", e)
return e.exit_code
except RoutingException as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

self._log.debug("Caught %r.", e)
return 2
except TraceException as e:
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
self._log.error("%r occurred.", e)
self._log.debug(e, exc_info=1)
return 3
Expand Down
8 changes: 7 additions & 1 deletion da/transport/mesgloop.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import socket
import selectors
import threading
import traceback

__all__ = ["SelectorLoop"]

Expand All @@ -48,6 +49,8 @@ def __init__(self, selectorcls=selectors.DefaultSelector):
self.notifier, self.event = None, None
# Background thread:
self.worker = None
self.log = logging.getLogger(__name__) \
.getChild(self.__class__.__name__)

def _handle_event(self, sock, _):
# Just drain the event socket buffer:
Expand Down Expand Up @@ -75,6 +78,7 @@ def register(self, conn, callback, data=None):
# trigger any cleanup routines from the caller
self._log.debug("Registering invalid connection %s: %r",
conn, e, exc_info=1)
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
callback(conn, data)

def deregister(self, conn):
Expand All @@ -89,7 +93,7 @@ def notify(self):
if self.notifier:
try:
self.notifier.send(b'x')
except (AttributeError, OSError):
except (AttributeError, OSError) as e:
# socket already closed, just ignore
pass

Expand Down Expand Up @@ -135,6 +139,8 @@ def run(self):
except Exception as e:
self._log.error("Message loop terminated abnormally: %r", e)
self._log.debug("Uncaught exception %r", e, exc_info=1)
self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))

finally:
if self.notifier:
try:
Expand Down
Loading