Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Introduced deferred_from_future as workaround for bug in Twisted
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel committed Jul 25, 2019
1 parent 8b8b6d4 commit 28c07cf
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
31 changes: 31 additions & 0 deletions golem/core/deferred.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from queue import Queue, Empty
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -61,3 +62,33 @@ def sync_wait(deferred: defer.Deferred,
def call_later(delay: int, fn, *args, **kwargs) -> None:
from twisted.internet import reactor
deferLater(reactor, delay, fn, *args, **kwargs)


def deferred_from_future(future: asyncio.Future) -> defer.Deferred:
# FIXME: Remove when https://github.com/twisted/twisted/pull/1169 is merged
def adapt(result):
try:
try:
extracted = result.result()
except asyncio.CancelledError:
raise defer.CancelledError()
except: # pylint: disable=too-broad-except
extracted = Failure()
adapt.actual.callback(extracted)

future_cancel = object()

def cancel(reself):
future.cancel()
reself.callback(future_cancel)
deferred = defer.Deferred(cancel)
adapt.actual = deferred

def uncancel(result):
if result is future_cancel:
adapt.actual = defer.Deferred()
return adapt.actual
return result
deferred.addCallback(uncancel)
future.add_done_callback(adapt)
return deferred
84 changes: 83 additions & 1 deletion tests/golem/core/test_deferred.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
import asyncio
import unittest
from unittest import mock

from twisted.internet import defer
from twisted.internet.defer import Deferred, succeed, fail
from twisted.python.failure import Failure
from twisted.trial.unittest import TestCase as TwistedTestCase

from golem.core.deferred import chain_function, DeferredSeq
from golem.core.common import install_reactor
from golem.core.deferred import (
chain_function,
DeferredSeq,
deferred_from_future
)
from golem.tools.testwithreactor import uninstall_reactor


@mock.patch('golem.core.deferred.deferToThread', lambda x: succeed(x()))
Expand Down Expand Up @@ -86,3 +95,76 @@ def test_fn_errback(self):
assert result.called
assert result.result
assert isinstance(result.result, Failure)


class TestDeferredFromFuture(TwistedTestCase):

@classmethod
def setUpClass(cls) -> None:
install_reactor()

@classmethod
def tearDownClass(cls) -> None:
uninstall_reactor()

@defer.inlineCallbacks
def test_result(self):
future = asyncio.Future()
future.set_result(1)
deferred = deferred_from_future(future)
result = yield deferred
self.assertEqual(result, 1)

@defer.inlineCallbacks
def test_exception(self):
future = asyncio.Future()
future.set_exception(ValueError())
deferred = deferred_from_future(future)
with self.assertRaises(ValueError):
yield deferred

@defer.inlineCallbacks
def test_deferred_cancelled(self):
future = asyncio.Future()
deferred = deferred_from_future(future)
deferred.cancel()
with self.assertRaises(defer.CancelledError):
yield deferred

@defer.inlineCallbacks
def test_future_cancelled(self):
future = asyncio.Future()
deferred = deferred_from_future(future)
future.cancel()
with self.assertRaises(defer.CancelledError):
yield deferred

@defer.inlineCallbacks
def test_timed_out(self):
from twisted.internet import reactor
coroutine = asyncio.sleep(3)
future = asyncio.ensure_future(coroutine)
deferred = deferred_from_future(future)
deferred.addTimeout(1, reactor)
with self.assertRaises(defer.TimeoutError):
yield deferred

@defer.inlineCallbacks
def test_deferred_with_timeout_cancelled(self):
from twisted.internet import reactor
future = asyncio.Future()
deferred = deferred_from_future(future)
deferred.addTimeout(1, reactor)
deferred.cancel()
with self.assertRaises(defer.CancelledError):
yield deferred

@defer.inlineCallbacks
def test_future_with_timeout_cancelled(self):
from twisted.internet import reactor
future = asyncio.Future()
deferred = deferred_from_future(future)
deferred.addTimeout(1, reactor)
future.cancel()
with self.assertRaises(defer.CancelledError):
yield deferred

0 comments on commit 28c07cf

Please sign in to comment.