Skip to content

Commit

Permalink
work on component restarting and module reloading
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Oberstein committed May 21, 2014
1 parent 79ab070 commit 21910b0
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 35 deletions.
127 changes: 96 additions & 31 deletions crossbar/crossbar/worker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,26 @@


import os
import sys
import importlib
import pkg_resources
import traceback
import StringIO
from datetime import datetime

try:
reload
except NameError:
# Python 3
from imp import reload

from twisted.internet import reactor
from twisted import internet
from twisted.python import log
from twisted.internet.defer import Deferred, \
DeferredList, \
inlineCallbacks
inlineCallbacks, \
returnValue

from autobahn.util import utcnow, utcstr
from autobahn.wamp.exception import ApplicationError
Expand Down Expand Up @@ -112,7 +123,9 @@ def onJoin(self, details):
procs = [
'start_component',
'stop_component',
'get_components'
'get_components',
#'reload_component',
'restart_component'
]

for proc in procs:
Expand All @@ -125,7 +138,7 @@ def onJoin(self, details):



def start_component(self, config, details = None):
def start_component(self, config, reload_module = False, details = None):
"""
Starts a Class or WAMPlet in this component container.
Expand All @@ -142,43 +155,58 @@ def start_component(self, config, details = None):
raise ApplicationError('crossbar.error.invalid_configuration', emsg)


module = None

## 1) create WAMP application component factory
##
if config['type'] == 'wamplet':

package = config['package']
entrypoint = config['entrypoint']

try:
dist = config['dist']
name = config['entry']
## create_component() is supposed to make instances of ApplicationSession later
##
create_component = pkg_resources.load_entry_point(package, 'autobahn.twisted.wamplet', entrypoint)

except Exception as e:
tb = traceback.format_exc()
emsg = 'ERROR: failed to import WAMPlet {}.{} ("{}")'.format(package, entrypoint, e)
log.msg(emsg)
raise ApplicationError("crossbar.error.cannot_import", emsg, tb)

else:
if self.debug:
log.msg("Starting WAMPlet '{}/{}' in realm '{}' ..".format(dist, name, config['router']['realm']))
log.msg("Creating component from WAMPlet {}.{}".format(package, entrypoint))

## make is supposed to make instances of ApplicationSession
make = pkg_resources.load_entry_point(dist, 'autobahn.twisted.wamplet', name)

except Exception as e:
log.msg("Failed to import class - {}".format(e))
raise ApplicationError("crossbar.error.class_import_failed", str(e))

elif config['type'] == 'class':

try:
klassname = config['name']
qualified_classname = config['classname']

if self.debug:
log.msg("Worker {}: starting class '{}' in realm '{}' ..".format(self.config.extra.id, klassname, config['router']['realm']))
try:
c = qualified_classname.split('.')
module_name, class_name = '.'.join(c[:-1]), c[-1]
module = importlib.import_module(module_name)

import importlib
c = klassname.split('.')
mod, kls = '.'.join(c[:-1]), c[-1]
app = importlib.import_module(mod)
## http://stackoverflow.com/questions/437589/how-do-i-unload-reload-a-python-module
##
if reload_module:
reload(module)

## make is supposed to be of class ApplicationSession
make = getattr(app, kls)
## create_component() is supposed to make instances of ApplicationSession later
##
create_component = getattr(module, class_name)

except Exception as e:
log.msg("Worker {}: failed to import class - {}".format(e))
raise ApplicationError("crossbar.error.class_import_failed", str(e))
tb = traceback.format_exc()
emsg = 'ERROR: failed to import class {} ("{}")'.format(klassname, e)
log.msg(emsg)
raise ApplicationError("crossbar.error.cannot_import", emsg, tb)

else:
if self.debug:
log.msg("Creating component from class {}".format(klassname))

else:
## should not arrive here, since we did `check_container_component()`
Expand All @@ -188,15 +216,17 @@ def start_component(self, config, details = None):
## WAMP application session factory
##
def create_session():
cfg = ComponentConfig(realm = config['router']['realm'], extra = config.get('extra', None))
c = make(cfg)
cfg = ComponentConfig(realm = config['router']['realm'],
extra = config.get('extra', None))
c = create_component(cfg)
return c


## 2) create WAMP transport factory
##
transport_config = config['router']['transport']
transport_debug = transport_config.get('debug', False)
transport_debug_wamp = transport_config.get('debug_wamp', False)


## WAMP-over-WebSocket transport
Expand All @@ -205,13 +235,17 @@ def create_session():

## create a WAMP-over-WebSocket transport client factory
##
transport_factory = CrossbarWampWebSocketClientFactory(create_session, transport_config['url'], debug = transport_debug, debug_wamp = transport_debug)
transport_factory = CrossbarWampWebSocketClientFactory(create_session,
transport_config['url'],
debug = transport_debug,
debug_wamp = transport_debug_wamp)

## WAMP-over-RawSocket transport
##
elif transport_config['type'] == 'rawsocket':

transport_factory = CrossbarWampRawSocketClientFactory(create_session, transport_config)
transport_factory = CrossbarWampRawSocketClientFactory(create_session,
transport_config)

else:
## should not arrive here, since we did `check_container_component()`
Expand All @@ -220,7 +254,9 @@ def create_session():

## 3) create and connect client endpoint
##
endpoint = create_connecting_endpoint_from_config(transport_config['endpoint'], self.config.extra.cbdir, reactor)
endpoint = create_connecting_endpoint_from_config(transport_config['endpoint'],
self.config.extra.cbdir,
reactor)


## now connect the client
Expand All @@ -229,7 +265,9 @@ def create_session():

def success(proto):
self.component_id += 1
self.components[self.component_id] = ContainerComponent(self.component_id, config, proto, None)
self.components[self.component_id] = ContainerComponent(self.component_id,
config, proto, None)
self.components[self.component_id].module = module

## publish event "on_component_start" to all but the caller
##
Expand All @@ -255,6 +293,20 @@ def error(err):



@inlineCallbacks
def restart_component(self, id, reload_module = False, details = None):
"""
"""
if id not in self.components:
raise ApplicationError('crossbar.error.no_such_object', 'no component with ID {} running in this container'.format(id))

config = self.components[id].config
stopped = yield self.stop_component(id, details = details)
started = yield self.start_component(config, reload_module = reload_module, details = details)
returnValue({'stopped': stopped, 'started': started})



def stop_component(self, id, details = None):
"""
Stop a component currently running within this container.
Expand All @@ -265,16 +317,29 @@ def stop_component(self, id, details = None):
if id not in self.components:
raise ApplicationError('crossbar.error.no_such_object', 'no component with ID {} running in this container'.format(id))

now = datetime.utcnow()

## FIXME: should we session.leave() first and only close the transport then?
## This gives the app component a better hook to do any cleanup.
self.components[id].proto.close()

c = self.components[id]
event = {
'id': id,
'started': utcstr(c.started),
'stopped': utcstr(now),
'uptime': (now - c.started).total_seconds()
}

## publish event "on_component_stop" to all but the caller
##
topic = 'crossbar.node.{}.process.{}.container.on_component_stop'.format(self.config.extra.node, self.config.extra.id)
event = {'id': id}
self.publish(topic, event, options = PublishOptions(exclude = [details.caller]))

del self.components[id]

return event



def get_components(self, details = None):
Expand Down
79 changes: 79 additions & 0 deletions crossbar/crossbar/worker/reloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
###############################################################################
##
## Copyright (C) 2014 Tavendo GmbH
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################

from __future__ import absolute_import

__all__ = ['TrackingModuleReloader']

import sys

try:
reload
except NameError:
# Python 3
from imp import reload


class TrackingModuleReloader:
"""
A tracking module reloader.
This will track modules loaded _after_ a snapshot (point in time), and
later allow to force reload of exactly those modules that have been (first)
loaded after that point in time.
"""

def __init__(self, silence = False):
"""
Ctor.
:param silence: Disable any log messages.
:type silence: bool
"""
self._silence = silence
self.snapshot()


def snapshot(self):
"""
Establish a snapshot - that is, remember which modules are currently
loaded. Later, when reload() is called, only modules imported later
will be (forcefully) reloaded.
"""
self._old_modules = sys.modules.copy()


def reload(self):
"""
Trigger reloading of all modules imported later than the last snapshot
established.
:returns int -- Number of modules reloaded.
"""
current_modules = sys.modules
maybe_dirty_modules = set(current_modules.keys()) - set(self._old_modules.keys())
if len(maybe_dirty_modules):
if not self._silence:
print("Reloading {} possibly changed modules".format(len(maybe_dirty_modules)))
for module in maybe_dirty_modules:
print("Reloading module {}".format(module))
reload(current_modules[module])
else:
if not self._silence:
print("No modules to reload")
return len(maybe_dirty_modules)
4 changes: 4 additions & 0 deletions crossbar/foobar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from moo import mega

def getfoo():
return 234, mega.test()
19 changes: 17 additions & 2 deletions crossbar/timeservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@

from autobahn.twisted.wamp import ApplicationSession

FOO = 25
#dfdfg

def foo():
return FOO

from foobar import getfoo


## WAMP application component with our app code.
Expand All @@ -35,13 +42,21 @@ def onJoin(self, details):
##
def utcnow():
now = datetime.datetime.utcnow()
print FOO, foo()
#return "foo"
return getfoo()
return now.strftime("%Y-%m-%dT%H:%M:%SZ")

reg = yield self.register(utcnow, 'com.timeservice.now')
print("Procedure registered with ID {}".format(reg.id))
try:
reg = yield self.register(utcnow, 'com.timeservice.now')
except Exception as e:
self.leave()
else:
print("xx Procedure registered with ID {}".format(reg.id))


def onDisconnect(self):
print("onDisconnect")
reactor.stop()


Expand Down
Loading

0 comments on commit 21910b0

Please sign in to comment.