-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
new reactor for each test, like pytest-asyncio #71
Comments
To allow the reactor to re-run, you'd need something like this:
from zope.interface.interface import Method, Attribute
from twisted.internet import interfaces
from twisted.python.runtime import platform
def _default_reactor_factory(platform):
try:
if platform.isLinux():
try:
from twisted.internet.epollreactor import EPollReactor as Reactor
except ImportError:
from twisted.internet.pollreactor import PollReactor as Reactor
elif platform.getType() == "posix" and not platform.isMacOSX():
from twisted.internet.pollreactor import PollReactor as Reactor
else:
from twisted.internet.selectreactor import SelectReactor as Reactor
except ImportError:
from twisted.internet.selectreactor import SelectReactor as Reactor
return Reactor
class ReactorProxy(object):
    """Stand-in reactor that forwards to a lazily created real reactor.

    The real reactor is built on first use by calling the factory and can
    be discarded with ``reset_reactor`` so that the next access builds a
    new one. ``setup`` populates the class with forwarding methods and
    properties for every member declared on the ``IReactor*`` interfaces
    in ``twisted.internet.interfaces``.
    """

    def __init__(self, reactor_factory=_default_reactor_factory(platform)):
        """Remember the factory; no reactor is created yet.

        :param reactor_factory: zero-argument callable returning a new
            reactor. Defaults to the class chosen by
            ``_default_reactor_factory`` for the current platform.
        """
        self.__reactor = None
        self.__reactor_factory = reactor_factory

    def __get_reactor(self):
        """Return the underlying reactor, creating it on first access."""
        if self.__reactor is None:
            self.__reactor = self.__reactor_factory()
        return self.__reactor

    def reset_reactor(self):
        """Drop the underlying reactor so the next access creates a new one.

        Does nothing if no reactor has been created yet.

        :raises Exception: if the underlying reactor is currently running.
        """
        if self.__reactor is None:
            return
        if self.__reactor.running:
            raise Exception("cannot reset running reactor")
        self.__reactor = None

    @classmethod
    def setup(cls):
        """Install forwarding members for every ``IReactor*`` interface.

        Interface methods become methods that call through to the
        underlying reactor; interface attributes become read/write
        properties on it.
        """

        def instrument(name):
            # Forward a method call to the current underlying reactor.
            def do(self, *args, **kwargs):
                return getattr(self.__get_reactor(), name)(*args, **kwargs)

            return do

        def makeprop(name):
            # Forward attribute reads and writes to the underlying reactor.
            def set_(self, attr):
                setattr(self.__get_reactor(), name, attr)

            def get(self):
                return getattr(self.__get_reactor(), name)

            return property(get, set_)

        for iname, interface in vars(interfaces).items():
            if not iname.startswith("IReactor"):
                continue
            for attr_name, attr_type in interface.namesAndDescriptions(True):
                # Dispatch on the description object: ``attr_name`` is a
                # plain string and would never match either type.
                if isinstance(attr_type, Method):
                    setattr(cls, attr_name, instrument(attr_name))
                elif isinstance(attr_type, Attribute):  # Method inherits Attribute
                    setattr(cls, attr_name, makeprop(attr_name))
# Detach the helper callables from the class before ``setup`` installs the
# forwarding members, leaving them reachable only through these
# module-level names rather than as proxy attributes.
_setup_reactor_proxy = ReactorProxy.setup
reset_reactor = ReactorProxy.reset_reactor
del ReactorProxy.reset_reactor, ReactorProxy.setup

# Populate ReactorProxy with the forwarding methods and properties.
_setup_reactor_proxy()
def install(**kwargs):
    """Create a ``ReactorProxy`` and install it as the global reactor.

    :param kwargs: passed through unchanged to ``ReactorProxy``.
    """
    proxy = ReactorProxy(**kwargs)
    # Imported only once the proxy exists, matching the original ordering.
    from twisted.internet.main import installReactor

    installReactor(proxy)
def run_until_complete(fn, *args, **kwargs):
    """Run ``fn(*args, **kwargs)`` on the reactor until it finishes.

    The call is scheduled for when the reactor starts running, the reactor
    is run until the call's result (or its Deferred's result) is
    available, and the proxy's underlying reactor is then reset.

    :param fn: callable to invoke; may return a plain value or a Deferred.
    :returns: the value produced by ``fn``.
    :raises Exception: if the installed reactor is not a ``ReactorProxy``,
        if the reactor is already running, or if the reactor stopped
        before ``fn`` produced a result. An exception raised by ``fn`` is
        re-raised to the caller.
    """
    from twisted.internet import reactor, defer

    if not isinstance(reactor, ReactorProxy):
        raise Exception("Wrong reactor installed")
    if reactor.running:
        raise Exception("run_until_complete is not reentrant")

    class ExtractResult(object):
        """Holds the outcome as ``(exception, value)``; None until done."""

        def __init__(self):
            self.result = None

        @defer.inlineCallbacks
        def go(self):
            try:
                self.result = (None, (yield fn(*args, **kwargs)))
            except Exception as e:
                self.result = (e, None)
            finally:
                # Stop the reactor on both success and failure.
                reactor.stop()

    er = ExtractResult()
    reactor.callWhenRunning(er.go)
    try:
        reactor.run()
    finally:
        # Discard the used reactor even if run() itself raised.
        reset_reactor(reactor)

    if er.result is None:
        # The reactor stopped before ``go`` recorded an outcome; report
        # that explicitly rather than failing to unpack None below.
        raise Exception("reactor stopped before the call completed")
    e, r = er.result
    if e is not None:
        raise e
    return r
I've got a new version of this util now: https://gist.github.com/graingert/6dbde2a302c669e75c6fd57c1bc35944#file-twisted-py
I use it for my test suite, and it took a 22-minute build down to ~15 minutes, because we were filling our one global event loop with all sorts of readers/writers/LoopingCalls/threads. Also, when I hit Ctrl+C it stops the test suite, because the reactor isn't configured to swallow all the signals.
No description provided.
The text was updated successfully, but these errors were encountered: