Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop ignoring tests/server.py #15084

Merged
merged 21 commits into from
Feb 17, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from collections import deque
from io import SEEK_END, BytesIO
from typing import (
Any,
Callable,
Dict,
Iterable,
Expand All @@ -31,6 +32,7 @@
Sequence,
Tuple,
Type,
TypeVar,
Union,
cast,
)
Expand All @@ -46,6 +48,7 @@
from twisted.internet.error import DNSLookupError
from twisted.internet.interfaces import (
IAddress,
IConnector,
IConsumer,
IHostnameResolver,
IProducer,
Expand All @@ -57,6 +60,8 @@
IResolverSimple,
ITransport,
)
from twisted.internet.protocol import ClientFactory, DatagramProtocol
from twisted.python import threadpool
from twisted.python.failure import Failure
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
from twisted.web.http_headers import Headers
Expand Down Expand Up @@ -91,6 +96,8 @@

logger = logging.getLogger(__name__)

R = TypeVar("R")

# the type of thing that can be passed into `make_request` in the headers list
CustomHeaderType = Tuple[Union[str, bytes], Union[str, bytes]]

Expand Down Expand Up @@ -432,19 +439,21 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
A MemoryReactorClock that supports callFromThread.
"""

def __init__(self):
def __init__(self) -> None:
self.threadpool = ThreadPool(self)

self._tcp_callbacks: Dict[Tuple[str, int], Callable] = {}
self._udp = []
self._udp: List[udp.Port] = []
self.lookups: Dict[str, str] = {}
self._thread_callbacks: Deque[Callable[[], None]] = deque()
self._thread_callbacks: Deque[Callable[..., R]] = deque()

lookups = self.lookups

@implementer(IResolverSimple)
class FakeResolver:
def getHostByName(self, name, timeout=None):
def getHostByName(
self, name: str, timeout: Optional[Sequence[int]] = None
) -> "Deferred[str]":
if name not in lookups:
return fail(DNSLookupError("OH NO: unknown %s" % (name,)))
return succeed(lookups[name])
Expand All @@ -455,25 +464,44 @@ def getHostByName(self, name, timeout=None):
def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
raise NotImplementedError()

def listenUDP(self, port, protocol, interface="", maxPacketSize=8196):
def listenUDP(
self,
port: int,
protocol: DatagramProtocol,
interface: str = "",
maxPacketSize: int = 8196,
) -> udp.Port:
p = udp.Port(port, protocol, interface, maxPacketSize, self)
p.startListening()
self._udp.append(p)
return p

def callFromThread(self, callback, *args, **kwargs):
def callFromThread(
self, callable: Callable[..., Any], *args: object, **kwargs: object
) -> None:
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
"""
Make the callback fire in the next reactor iteration.
"""
cb = lambda: callback(*args, **kwargs)
cb = lambda: callable(*args, **kwargs)
# it's not safe to call callLater() here, so we append the callback to a
# separate queue.
self._thread_callbacks.append(cb)

def getThreadPool(self):
return self.threadpool
def callInThread(
self, callable: Callable[..., Any], *args: object, **kwargs: object
) -> None:
raise NotImplementedError()

def suggestThreadPoolSize(self, size: int) -> None:
raise NotImplementedError()

def getThreadPool(self) -> "threadpool.ThreadPool":
# Cast to match super-class.
return cast(threadpool.ThreadPool, self.threadpool)

def add_tcp_client_callback(self, host: str, port: int, callback: Callable):
def add_tcp_client_callback(
self, host: str, port: int, callback: Callable[[], None]
) -> None:
"""Add a callback that will be invoked when we receive a connection
attempt to the given IP/port using `connectTCP`.

Expand All @@ -482,7 +510,14 @@ def add_tcp_client_callback(self, host: str, port: int, callback: Callable):
"""
self._tcp_callbacks[(host, port)] = callback

def connectTCP(self, host: str, port: int, factory, timeout=30, bindAddress=None):
def connectTCP(
self,
host: str,
port: int,
factory: ClientFactory,
timeout: float = 30,
bindAddress: Optional[Tuple[str, int]] = None,
) -> IConnector:
"""Fake L{IReactorTCP.connectTCP}."""

conn = super().connectTCP(
Expand All @@ -495,7 +530,7 @@ def connectTCP(self, host: str, port: int, factory, timeout=30, bindAddress=None

return conn

def advance(self, amount):
def advance(self, amount: float) -> None:
# first advance our reactor's time, and run any "callLater" callbacks that
# makes ready
super().advance(amount)
Expand Down