diff --git a/da/freeze.py b/da/freeze.py index 66abf55..46f5c5f 100644 --- a/da/freeze.py +++ b/da/freeze.py @@ -13,6 +13,8 @@ import types import weakref from copyreg import dispatch_table +import traceback +import logging __all__ = ['frozendict', 'frozenlist', 'deepfreeze'] @@ -25,6 +27,8 @@ class frozendict(dict): """ def _blocked_attribute(obj): + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + raise AttributeError("A frozendict cannot be modified.") _blocked_attribute = property(_blocked_attribute) @@ -37,12 +41,14 @@ def __new__(cls, *args, **kws): return new def __init__(self, *args, **kws): - pass + self.log = logging.getLogger(__name__) \ + .getChild(self.__class__.__name__) def __hash__(self): try: return self._cached_hash except AttributeError: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) h = self._cached_hash = hash(tuple(sorted(self.items()))) return h @@ -54,6 +60,7 @@ def _build_set_keyvalue_(self, key, val): if not hasattr(self, '_cached_hash'): return super().__setitem__(key, val) else: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise AttributeError("Attempting to update frozendict after " "hash value has been read.") @@ -66,6 +73,7 @@ class frozenlist(list): """ def _blocked_attribute(obj): + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise AttributeError("A frozenlist cannot be modified.") _blocked_attribute = property(_blocked_attribute) @@ -78,13 +86,15 @@ def __new__(cls, *args, **kws): return new def __init__(self, *args, **kws): - pass + self.log = logging.getLogger(__name__) \ + .getChild(self.__class__.__name__) def __hash__(self): try: return self._cached_hash except AttributeError: h = self._cached_hash = hash(tuple(sorted(self))) + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) return h def __repr__(self): @@ -95,10 +105,13 @@ def _build_add_elem_(self, elem): if not hasattr(self, '_cached_hash'): return super().append(elem) else: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise AttributeError("Attempting to modify frozenlist after " "hash value has been read.") +log = logging.getLogger(__name__) + def deepfreeze(x, memo=None, _nil=[]): """Deep freeze operation on arbitrary Python objects. @@ -122,7 +135,8 @@ def deepfreeze(x, memo=None, _nil=[]): else: try: issc = issubclass(cls, type) - except TypeError: # cls is not a class (old Boost; see SF #502085) + except TypeError as e: # cls is not a class (old Boost; see SF #502085) + log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) issc = 0 if issc: y = _deepfreeze_atomic(x, memo) @@ -143,6 +157,7 @@ def deepfreeze(x, memo=None, _nil=[]): if reductor: rv = reductor() else: + raise Error( "un(deep)copyable object of type %s" % cls) if isinstance(rv, str): @@ -209,7 +224,8 @@ def _deepfreeze_tuple(x, memo, deepfreeze=deepfreeze): # check for it, in case the tuple contains recursive mutable structures. try: return memo[id(x)] - except KeyError: + except KeyError as e: + log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) pass for k, j in zip(x, y): if k is not j: @@ -247,8 +263,10 @@ def _keep_alive(x, memo): """ try: memo[id(memo)].append(x) - except KeyError: + except KeyError as e: # aha, this is the first one :-) + log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + memo[id(memo)]=[x] def _reconstruct(x, memo, func, args, diff --git a/da/sim.py b/da/sim.py index 8c93f1d..e7b6c49 100644 --- a/da/sim.py +++ b/da/sim.py @@ -36,7 +36,7 @@ import collections import multiprocessing import os.path - +import traceback from . import common, pattern from .common import (builtin, internal, name_split_host, name_split_node, ProcessId, get_runtime_option, @@ -686,9 +686,13 @@ def __process_jobqueue(self, label=None): handler, args = self.__jobq.popleft() except IndexError: self._log.debug("Job item stolen by another thread.") + print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + break except ValueError: self._log.error("Corrupted job item!") + print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + continue if ((handler._labels is None or label in handler._labels) and @@ -701,6 +705,9 @@ def __process_jobqueue(self, label=None): self._log.error( "%r when calling handler '%s' with '%s': %s", e, handler.__name__, args, e) + print("exception in handler {} of replicaID {} ".format(handler,args['source'])) + print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + else: if self._keep_unmatched: dbgmsg = "Skipping (%s, %r) due to label constraint." @@ -867,8 +874,12 @@ def _cmd_Setup(self, src, args): self.__setup_called = True self._log.debug("`setup` complete.") except Exception as e: + print(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self._log.error("Exception during setup(%r): %r", args, e) + self._log.debug("%r", e, exc_info=1) + res = False if hasattr(sys.stdout, 'flush'): sys.stdout.flush() @@ -1206,11 +1217,15 @@ def bootstrap_node(self, hostname, port, timeout=None): hostname, port, transport) self.bootstrap_peer = None except AuthenticationException as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + # Abort immediately: raise e except (CircularRoutingException, TransportException) as e: self.log.debug("Bootstrap attempt to %s:%d with %s failed " ": %r", hostname, port, transport, e) + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + if self.bootstrap_peer is None: raise BootstrapException("Unable to contact a peer node.") @@ -1250,6 +1265,8 @@ def run(self): self.mesgloop(until=(lambda: not self.running)) except Exception as e: self.log.debug("Unhandled exception: %r.", e, exc_info=1) + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self.terminate_local_processes() def stop(self): @@ -1296,6 +1313,7 @@ def _replay(self, targetpid): try: return queue._out_loader.load() except EOFError as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise TraceEndedException("No more items in send trace.") from e def _send_remote(self, src, dest, mesg, flags=0, transport=None, **params): @@ -1326,18 +1344,24 @@ def _send_remote(self, src, dest, mesg, flags=0, transport=None, **params): payload = (src, dest, mesg) wrapper = common.BufferIOWrapper(self.local.buf) try: - pickle.dump(payload, wrapper) + tmp = pickle.dumps(mesg) + pickle.dump(payload, wrapper,2) except TypeError as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise InvalidMessageException("Error pickling {}.".format(payload)) \ from e except OSError as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise MessageTooBigException( "** Outgoing message object too big to fit in buffer, dropped.") self.log.debug("** Forwarding %r(%d bytes) to %s with flags=%d using %s.", mesg, wrapper.fptr, dest, flags, transport) + + with memoryview(self.local.buf)[0:wrapper.fptr] as chunk: transport.send(chunk, dest.address_for_transport(transport), **params) + self.local.buf=None def _dispatch(self, src, dest, payload, params=dict(), flags=0): if dest in self.local_procs: @@ -1355,6 +1379,8 @@ def _dispatch(self, src, dest, payload, params=dict(), flags=0): queue.append((src, payload)) return True except Exception as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self.log.warning("Failed to deliver to local process %s: %r", dest, e) return False @@ -1371,9 +1397,12 @@ def _dispatch(self, src, dest, payload, params=dict(), flags=0): except CircularRoutingException as e: # This is most likely due to stale process ids, so don't log # error, just debug: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self.log.debug("Caught %r.", e) return False except Exception as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) self.log.error("Could not send message due to: %r", e) self.log.debug("Send failed: ", exc_info=1) return False @@ -1384,6 +1413,8 @@ def _dispatch(self, src, dest, payload, params=dict(), flags=0): self._dispatch_table[cmd.value](src, args) return True except Exception as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self.log.warning( "Caught exception while processing router message from " "%s(%r): %r", src, payload, e) @@ -1410,6 +1441,8 @@ def mesgloop(self, until, timeout=None): except common.QueueEmpty: pass except (ImportError, ValueError, pickle.UnpicklingError) as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self.log.warning( "Dropped invalid message from %s through %s: %r", remote, transport, e) @@ -1451,6 +1484,8 @@ def __init__(self, process_class, transport_manager, self.seqno = cmd_seqno self._trace_in_fd = None self._trace_out_fd = None + self.log = logging.getLogger(__name__) \ + .getChild(self.__class__.__name__) if len(process_name) > 0: self.name = process_name self.transport_manager = transport_manager @@ -1465,6 +1500,8 @@ def __init__(self, process_class, transport_manager, try: self._init_replay(replay_file) except (Exception, TraceException) as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self.cleanup() raise e else: @@ -1483,6 +1520,8 @@ def _init_replay(self, filename): try: os.stat(sndname) except OSError as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + raise TraceMismatchException( 'Missing corresponding send trace file {!r} for {!r}!' .format(sndname, tracename) @@ -1549,6 +1588,8 @@ def _spawn_process_spawn(self, pcls, name, parent, props, seqno=None, cid = None except Exception as e: cid = None + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self._log.error("Failed to create instance (%s) of %s: %r", name, pcls, e) if p is not None and p.is_alive(): @@ -1584,6 +1625,7 @@ def _spawn_process_fork(self, pcls, name, parent, props, seqno=None, cid = None except Exception as e: cid = None + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) self._log.error("Failed to create instance (%s) of %s: %r", name, pcls, e) if p is not None and p.is_alive(): @@ -1616,6 +1658,8 @@ def _spawn_thread(self, pcls, name, parent, props, seqno=None, daemon=False): cid = None except Exception as e: cid = None + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self._log.error("Failed to create instance (%s) of %s: %r", name, pcls, e) return cid @@ -1693,9 +1737,12 @@ def run(self): self._log.debug("Caught %r, exiting gracefully.", e) return e.exit_code except RoutingException as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self._log.debug("Caught %r.", e) return 2 except TraceException as e: + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) self._log.error("%r occurred.", e) self._log.debug(e, exc_info=1) return 3 diff --git a/da/transport/mesgloop.py b/da/transport/mesgloop.py index 1b58c63..b33ff31 100644 --- a/da/transport/mesgloop.py +++ b/da/transport/mesgloop.py @@ -26,6 +26,7 @@ import socket import selectors import threading +import traceback __all__ = ["SelectorLoop"] @@ -48,6 +49,8 @@ def __init__(self, selectorcls=selectors.DefaultSelector): self.notifier, self.event = None, None # Background thread: self.worker = None + self.log = logging.getLogger(__name__) \ + .getChild(self.__class__.__name__) def _handle_event(self, sock, _): # Just drain the event socket buffer: @@ -75,6 +78,7 @@ def register(self, conn, callback, data=None): # trigger any cleanup routines from the caller self._log.debug("Registering invalid connection %s: %r", conn, e, exc_info=1) + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) callback(conn, data) def deregister(self, conn): @@ -89,7 +93,7 @@ def notify(self): if self.notifier: try: self.notifier.send(b'x') - except (AttributeError, OSError): + except (AttributeError, OSError) as e: # socket already closed, just ignore pass @@ -135,6 +139,8 @@ def run(self): except Exception as e: self._log.error("Message loop terminated abnormally: %r", e) self._log.debug("Uncaught exception %r", e, exc_info=1) + self.log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + finally: if self.notifier: try: diff --git a/da/transport/sock.py b/da/transport/sock.py index 77fcca7..8551f32 100755 --- a/da/transport/sock.py +++ b/da/transport/sock.py @@ -28,7 +28,7 @@ import socket import logging import threading - +import traceback from .base import * from .manager import transport from .mesgloop import SelectorLoop @@ -107,6 +107,7 @@ def initialize(self, port=None, strict=False, linear=False, self.port = random.randint(min_port, max_port) retry += 1 else: + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise BindingException( "Failed to bind to an available port.") from e self._log.debug("Transport initialized at address %s", address) @@ -196,6 +197,8 @@ def initialize(self, strict=False, pipe=None, **params): if self.conn is not None: self.conn.close() self.conn = None + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + raise e def start(self, queue, mesgloop=None): @@ -268,6 +271,8 @@ def send(self, chunk, target, wait=0.01, retries=MAX_RETRIES, **_): # to return `EPERM` if it's sending too fast: self._log.debug("Packet to %s dropped by kernel, " "reduce send rate.", target) + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + cnt += 1 if cnt >= retries: raise TransportException("Packet blocked by OS.") from e @@ -305,10 +310,14 @@ def _recvmesg1(self, _conn, _data): self._verify_packet(chunk, remote) self.queue.append((self, chunk, remote)) except TransportException as e: + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + self._log.warning("Packet from %s dropped due to: %r", remote, e) except (socket.error, AttributeError) as e: self._log.debug("Terminating receive loop due to %r", e) + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + # TCP Implementation: @@ -376,6 +385,7 @@ def initialize(self, strict=False, pipe=None, **params): if self.conn is not None: self.conn.close() self.conn = None + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise e def start(self, queue, mesgloop=None): @@ -509,6 +519,8 @@ def _connect(self, target): return conn except TransportException as e: conn.close() + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + raise e def _cleanup(self, conn, remote): @@ -550,15 +562,19 @@ def send(self, chunk, target, retries=MAX_RETRIES, wait=0.05, return except ConnectionRefusedError as e: if (not retry_refused_connections) or retry > retries: + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) raise TransportException( 'connection refused by {}'.format(target)) from e except (socket.error, socket.timeout) as e: self._log.debug("Sending to %s failed on %dth try: %r", target, retry, e) + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) + if conn is not None: conn.close() conn = None - except: + except Exception as e: + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) self._log.debug("Sending to %s failed on %dth try: %r", target, retry, sys.exc_info()[0]) @@ -598,10 +614,12 @@ def _recvmesg_wrapper(self, conn, job): try: callback(conn, aux) except TransportException as e: + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) self._log.warning("Exception when handling %s: %r", aux.peername, e) self._cleanup(conn, aux.peername) except socket.error as e: + self._log.error(''.join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__))) self._log.debug( "socket.error when receiving from %s: %r", aux.peername, e)