Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

test twisted 21.7 #10402

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions changelog.d/10402.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test twisted 21.7.
1 change: 1 addition & 0 deletions changelog.d/10446.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update type annotations to work with forthcoming Twisted 21.7.0 release.
1 change: 1 addition & 0 deletions changelog.d/10450.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update type annotations to work with forthcoming Twisted 21.7.0 release.
4 changes: 2 additions & 2 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:

def read_body_with_max_size(
response: IResponse, stream: ByteWriteable, max_size: Optional[int]
) -> defer.Deferred:
) -> defer.Deferred[int]:
"""
Read a HTTP response body to a file-object. Optionally enforcing a maximum file size.

Expand All @@ -862,7 +862,7 @@ def read_body_with_max_size(
Returns:
A Deferred which resolves to the length of the read body.
"""
d = defer.Deferred()
d: defer.Deferred[int] = defer.Deferred()

# If the Content-Length header gives a size larger than the maximum allowed
# size, do not bother downloading the body.
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/federation/matrix_federation_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer
from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse

from synapse.crypto.context_factory import FederationPolicyForHTTPS
from synapse.http.client import BlacklistingAgentWrapper
Expand Down Expand Up @@ -116,7 +116,7 @@ def request(
uri: bytes,
headers: Optional[Headers] = None,
bodyProducer: Optional[IBodyProducer] = None,
) -> Generator[defer.Deferred, Any, defer.Deferred]:
) -> Generator[defer.Deferred, Any, IResponse]:
"""
Args:
method: HTTP method: GET/POST/etc
Expand Down
4 changes: 2 additions & 2 deletions synapse/logging/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import inspect
import logging
import threading
import types
import typing
import warnings
from typing import TYPE_CHECKING, Optional, Tuple, TypeVar, Union

Expand Down Expand Up @@ -745,7 +745,7 @@ def run_in_background(f, *args, **kwargs) -> defer.Deferred:
# by synchronous exceptions, so let's turn them into Failures.
return defer.fail()

if isinstance(res, types.CoroutineType):
if isinstance(res, typing.Coroutine):
res = defer.ensureDeferred(res)

# At this point we should have a Deferred, if not then f was a synchronous
Expand Down
2 changes: 1 addition & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ async def complete_sso_login_async(
@defer.inlineCallbacks
def get_state_events_in_room(
self, room_id: str, types: Iterable[Tuple[str, Optional[str]]]
) -> Generator[defer.Deferred, Any, defer.Deferred]:
) -> Generator[defer.Deferred, Any, Iterable[EventBase]]:
"""Gets current state events for the given room.

(This is exposed for compatibility with the old SpamCheckerApi. We should
Expand Down
5 changes: 3 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ def __init__(
self.last_notified_token = current_token
self.last_notified_ms = time_now_ms

with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
self.notify_deferred: ObservableDeferred[StreamToken] = ObservableDeferred(
defer.Deferred()
)

def notify(
self,
Expand Down
6 changes: 4 additions & 2 deletions synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@
"service_identity>=18.1.0",
# Twisted 18.9 introduces some logger improvements that the structured
# logger utilises
"Twisted>=18.9.0",
"treq>=15.1",
# testing twisted 21.7
"Twisted>=21.7.0rc2",
"Twisted[tls]>=21.7.0rc2",
"treq>=17.8",
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0",
"pyyaml>=3.11",
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ async def wait_for_stream_position(

# Create a new deferred that times out after N seconds, as we don't want
# to wedge here forever.
deferred = Deferred()
deferred: Deferred[None] = Deferred()
deferred = timeout_deferred(
deferred, _WAIT_FOR_REPLICATION_TIMEOUT_SECONDS, self._reactor
)
Expand Down
4 changes: 3 additions & 1 deletion synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ async def add_to_queue(
end_item = queue[-1]
else:
# need to make a new queue item
deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
deferred: ObservableDeferred[_PersistResult] = ObservableDeferred(
defer.Deferred(), consumeErrors=True
)

end_item = _EventPersistQueueItem(
events_and_contexts=[],
Expand Down
27 changes: 14 additions & 13 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Awaitable,
Callable,
Dict,
Generic,
Hashable,
Iterable,
List,
Expand All @@ -49,8 +50,10 @@

logger = logging.getLogger(__name__)

_T = TypeVar("_T")

class ObservableDeferred:

class ObservableDeferred(Generic[_T]):
"""Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original
deferred.
Expand All @@ -68,7 +71,7 @@ class ObservableDeferred:

__slots__ = ["_deferred", "_observers", "_result"]

def __init__(self, deferred: defer.Deferred, consumeErrors: bool = False):
def __init__(self, deferred: defer.Deferred[_T], consumeErrors: bool = False):
object.__setattr__(self, "_deferred", deferred)
object.__setattr__(self, "_result", None)
object.__setattr__(self, "_observers", set())
Expand Down Expand Up @@ -113,15 +116,15 @@ def errback(f):

deferred.addCallbacks(callback, errback)

def observe(self) -> defer.Deferred:
def observe(self) -> defer.Deferred[_T]:
"""Observe the underlying deferred.

This returns a brand new deferred that is resolved when the underlying
deferred is resolved. Interacting with the returned deferred does not
effect the underlying deferred.
"""
if not self._result:
d = defer.Deferred()
d: defer.Deferred[_T] = defer.Deferred()

def remove(r):
self._observers.discard(d)
Expand All @@ -135,7 +138,7 @@ def remove(r):
success, res = self._result
return defer.succeed(res) if success else defer.fail(res)

def observers(self) -> List[defer.Deferred]:
def observers(self) -> List[defer.Deferred[_T]]:
return self._observers

def has_called(self) -> bool:
Expand All @@ -144,7 +147,7 @@ def has_called(self) -> bool:
def has_succeeded(self) -> bool:
return self._result is not None and self._result[0] is True

def get_result(self) -> Any:
def get_result(self) -> _T:
return self._result[1]

def __getattr__(self, name: str) -> Any:
Expand Down Expand Up @@ -415,7 +418,7 @@ def __init__(self):
self.key_to_current_writer: Dict[str, defer.Deferred] = {}

async def read(self, key: str) -> ContextManager:
new_defer = defer.Deferred()
new_defer: defer.Deferred[None] = defer.Deferred()

curr_readers = self.key_to_current_readers.setdefault(key, set())
curr_writer = self.key_to_current_writer.get(key, None)
Expand All @@ -438,7 +441,7 @@ def _ctx_manager():
return _ctx_manager()

async def write(self, key: str) -> ContextManager:
new_defer = defer.Deferred()
new_defer: defer.Deferred[None] = defer.Deferred()

curr_readers = self.key_to_current_readers.get(key, set())
curr_writer = self.key_to_current_writer.get(key, None)
Expand Down Expand Up @@ -471,10 +474,8 @@ def _ctx_manager():


def timeout_deferred(
deferred: defer.Deferred,
timeout: float,
reactor: IReactorTime,
) -> defer.Deferred:
deferred: defer.Deferred[_T], timeout: float, reactor: IReactorTime
) -> defer.Deferred[_T]:
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
deferred that wraps and times out the given deferred, correctly handling
Expand All @@ -497,7 +498,7 @@ def timeout_deferred(
Returns:
A new Deferred, which will errback with defer.TimeoutError on timeout.
"""
new_d = defer.Deferred()
new_d: defer.Deferred[_T] = defer.Deferred()

timed_out = [False]

Expand Down
34 changes: 23 additions & 11 deletions synapse/util/caches/cached_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import enum
from typing import Awaitable, Callable, Generic, Optional, TypeVar, Union

from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure

Expand All @@ -22,6 +23,10 @@
TV = TypeVar("TV")


class _Sentinel(enum.Enum):
sentinel = object()


class CachedCall(Generic[TV]):
"""A wrapper for asynchronous calls whose results should be shared

Expand Down Expand Up @@ -65,7 +70,7 @@ def __init__(self, f: Callable[[], Awaitable[TV]]):
"""
self._callable: Optional[Callable[[], Awaitable[TV]]] = f
self._deferred: Optional[Deferred] = None
self._result: Union[None, Failure, TV] = None
self._result: Union[_Sentinel, TV, Failure] = _Sentinel.sentinel

async def get(self) -> TV:
"""Kick off the call if necessary, and return the result"""
Expand All @@ -78,8 +83,9 @@ async def get(self) -> TV:
self._callable = None

# once the deferred completes, store the result. We cannot simply leave the
# result in the deferred, since if it's a Failure, GCing the deferred
# would then log a critical error about unhandled Failures.
# result in the deferred, since `awaiting` a deferred destroys its result.
# (Also, if it's a Failure, GCing the deferred would log a critical error
# about unhandled Failures)
def got_result(r):
self._result = r

Expand All @@ -92,13 +98,19 @@ def got_result(r):
# and any eventual exception may not be reported.

# we can now await the deferred, and once it completes, return the result.
await make_deferred_yieldable(self._deferred)

# I *think* this is the easiest way to correctly raise a Failure without having
# to gut-wrench into the implementation of Deferred.
d = Deferred()
d.callback(self._result)
return await d
if isinstance(self._result, _Sentinel):
await make_deferred_yieldable(self._deferred)
assert not isinstance(self._result, _Sentinel)

if isinstance(self._result, Failure):
# I *think* awaiting a failed Deferred is the easiest way to correctly raise
# the right exception.
d = defer.fail(self._result)
await d
# the `await` should always raise, so this should be unreachable.
raise AssertionError("unexpected return from await on failure")

return self._result


class RetryOnExceptionCachedCall(Generic[TV]):
Expand Down
15 changes: 12 additions & 3 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@

import enum
import threading
from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union
from typing import (
Callable,
Generic,
Iterable,
MutableMapping,
Optional,
TypeVar,
Union,
cast,
)

from prometheus_client import Gauge

Expand Down Expand Up @@ -166,7 +175,7 @@ def get_immediate(
def set(
self,
key: KT,
value: defer.Deferred,
value: defer.Deferred[VT],
callback: Optional[Callable[[], None]] = None,
) -> defer.Deferred:
"""Adds a new entry to the cache (or updates an existing one).
Expand Down Expand Up @@ -214,7 +223,7 @@ def set(
if value.called:
result = value.result
if not isinstance(result, failure.Failure):
self.cache.set(key, result, callbacks)
self.cache.set(key, cast(VT, result), callbacks)
return value

# otherwise, we'll add an entry to the _pending_deferred_cache for now,
Expand Down
2 changes: 1 addition & 1 deletion synapse/util/caches/descriptors.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def arg_to_cache_key(arg):
# relevant result for that key.
deferreds_map = {}
for arg in missing:
deferred = defer.Deferred()
deferred: defer.Deferred[Any] = defer.Deferred()
deferreds_map[arg] = deferred
key = arg_to_cache_key(arg)
cache.set(key, deferred, callback=invalidate_callback)
Expand Down