-
-
Notifications
You must be signed in to change notification settings - Fork 343
notes to self
jakkdl edited this page Nov 25, 2024
·
1 revision
The main trio repository used to have these files in a directory called notes-to-self. They are various small test scripts and examples written by njsmith during early development of Trio. They are referred to in the documentation in a few places, but have otherwise not seen much attention for a long time and are now archived in this wiki.
If something in here is of value, it should perhaps be pulled out and put directly into the official documentation.
# A little script to experiment with AFD polling.
#
# This cheats and uses a bunch of internal APIs. Don't follow its example. The
# point is just to experiment with random junk that probably won't work, so we
# can figure out what we actually do want to do internally.
# Currently this demonstrates what seems to be a weird bug in the Windows
# kernel. If you:
#
# 0. Set up a socket so that it's not writable.
# 1. Submit a SEND poll operation.
# 2. Submit a RECEIVE poll operation.
# 3. Send some data through the socket, to trigger the RECEIVE.
#
# ...then the SEND poll operation completes with the RECEIVE flag set.
#
# (This bug is why our Windows backend jumps through hoops to avoid ever
# issuing multiple polls simultaneously for the same socket.)
#
# This script's output on my machine:
#
# -- Iteration start --
# Starting a poll for <AFDPollFlags.AFD_POLL_SEND: 4>
# Starting a poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>
# Sending another byte
# Poll for <AFDPollFlags.AFD_POLL_SEND: 4>: got <AFDPollFlags.AFD_POLL_RECEIVE: 1>
# Poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>: Cancelled()
# -- Iteration start --
# Starting a poll for <AFDPollFlags.AFD_POLL_SEND: 4>
# Starting a poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>
# Poll for <AFDPollFlags.AFD_POLL_RECEIVE: 1>: got <AFDPollFlags.AFD_POLL_RECEIVE: 1> Sending another byte
# Poll for <AFDPollFlags.AFD_POLL_SEND: 4>: got <AFDPollFlags.AFD_POLL_RECEIVE: 1>
#
# So what we're seeing is:
#
# On the first iteration, where there's initially no data in the socket, the
# SEND completes with the RECEIVE flag set, and the RECEIVE operation doesn't
# return at all, until we cancel it.
#
# On the second iteration, there's already data sitting in the socket from the
# last loop. This time, the RECEIVE returns immediately with the RECEIVE flag
# set, which makes sense -- when starting a RECEIVE poll, it does an immediate
# check to see if there's data already, and if so it does an early exit. But
# the bizarre thing is, when we then send *another* byte of data, the SEND
# operation wakes up with the RECEIVE flag set.
#
# Why is this bizarre? Let me count the ways:
#
# - The SEND operation should never return RECEIVE.
#
# - If it does insist on returning RECEIVE, it should do it immediately, since
# there is already data to receive. But it doesn't.
#
# - And then when we send data into a socket that already has data in it, that
# shouldn't have any effect at all! But instead it wakes up the SEND.
#
# - Also, the RECEIVE call did an early check for data and exited out
# immediately, without going through the whole "register a callback to
# be notified when data arrives" dance. So even if you do have some bug
# in tracking which operations should be woken on which state transitions,
# there's no reason this operation would even touch that tracking data. Yet,
# if we take out the brief RECEIVE, then the SEND *doesn't* wake up.
#
# - Also, if I move the send() call up above the loop, so that there's already
# data in the socket when we start our first iteration, then you would think
# that would just make the first iteration act like it was the second
# iteration. But it doesn't. Instead it makes all the weird behavior
# disappear entirely.
#
# "What do we know … of the world and the universe about us? Our means of
# receiving impressions are absurdly few, and our notions of surrounding
# objects infinitely narrow. We see things only as we are constructed to see
# them, and can gain no idea of their absolute nature. With five feeble senses
# we pretend to comprehend the boundlessly complex cosmos, yet other beings
# with wider, stronger, or different range of senses might not only see very
# differently the things we see, but might see and study whole worlds of
# matter, energy, and life which lie close at hand yet can never be detected
# with the senses we have."
import os.path
import sys
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__) + r"\.."))
import trio
print(trio.__file__)
import socket
import trio.testing
from trio._core._io_windows import _afd_helper_handle, _check, _get_base_socket
from trio._core._windows_cffi import (
AFDPollFlags,
ErrorCodes,
IoControlCodes,
ffi,
kernel32,
)
class AFDLab:
def __init__(self):
self._afd = _afd_helper_handle()
trio.lowlevel.register_with_iocp(self._afd)
async def afd_poll(self, sock, flags, *, exclusive=0):
print(f"Starting a poll for {flags!r}")
lpOverlapped = ffi.new("LPOVERLAPPED")
poll_info = ffi.new("AFD_POLL_INFO *")
poll_info.Timeout = 2**63 - 1 # INT64_MAX
poll_info.NumberOfHandles = 1
poll_info.Exclusive = exclusive
poll_info.Handles[0].Handle = _get_base_socket(sock)
poll_info.Handles[0].Status = 0
poll_info.Handles[0].Events = flags
try:
_check(
kernel32.DeviceIoControl(
self._afd,
IoControlCodes.IOCTL_AFD_POLL,
poll_info,
ffi.sizeof("AFD_POLL_INFO"),
poll_info,
ffi.sizeof("AFD_POLL_INFO"),
ffi.NULL,
lpOverlapped,
),
)
except OSError as exc:
if exc.winerror != ErrorCodes.ERROR_IO_PENDING: # pragma: no cover
raise
try:
await trio.lowlevel.wait_overlapped(self._afd, lpOverlapped)
except:
print(f"Poll for {flags!r}: {sys.exc_info()[1]!r}")
raise
out_flags = AFDPollFlags(poll_info.Handles[0].Events)
print(f"Poll for {flags!r}: got {out_flags!r}")
return out_flags
def fill_socket(sock):
try:
while True:
sock.send(b"x" * 65536)
except BlockingIOError:
pass
async def main():
afdlab = AFDLab()
a, b = socket.socketpair()
a.setblocking(False)
b.setblocking(False)
fill_socket(a)
while True:
print("-- Iteration start --")
async with trio.open_nursery() as nursery:
nursery.start_soon(
afdlab.afd_poll,
a,
AFDPollFlags.AFD_POLL_SEND,
)
await trio.sleep(2)
nursery.start_soon(
afdlab.afd_poll,
a,
AFDPollFlags.AFD_POLL_RECEIVE,
)
await trio.sleep(2)
print("Sending another byte")
b.send(b"x")
await trio.sleep(2)
nursery.cancel_scope.cancel()
trio.run(main)
import asyncio
import trio
async def aio_main():
loop = asyncio.get_running_loop()
trio_done_fut = loop.create_future()
def trio_done_callback(main_outcome):
print(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)
trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)
(await trio_done_fut).unwrap()
async def trio_main():
print("trio_main!")
to_trio, from_aio = trio.open_memory_channel(float("inf"))
from_trio = asyncio.Queue()
_task_ref = asyncio.create_task(aio_pingpong(from_trio, to_trio))
from_trio.put_nowait(0)
async for n in from_aio:
print(f"trio got: {n}")
await trio.sleep(1)
from_trio.put_nowait(n + 1)
if n >= 10:
return
del _task_ref
async def aio_pingpong(from_trio, to_trio):
print("aio_pingpong!")
while True:
n = await from_trio.get()
print(f"aio got: {n}")
await asyncio.sleep(1)
to_trio.send_nowait(n + 1)
asyncio.run(aio_main())
from types import CodeType
# Has to be a string :-(
sentinel = "_unique_name"
def f():
print(locals())
# code(argcount, kwonlyargcount, nlocals, stacksize, flags, codestring,
# constants, names, varnames, filename, name, firstlineno,
# lnotab[, freevars[, cellvars]])
new_code = CodeType(
f.__code__.co_argcount,
f.__code__.co_kwonlyargcount + 1,
f.__code__.co_nlocals + 1,
f.__code__.co_stacksize,
f.__code__.co_flags,
f.__code__.co_code,
f.__code__.co_consts,
f.__code__.co_names,
(*f.__code__.co_varnames, sentinel),
f.__code__.co_filename,
f.__code__.co_name,
f.__code__.co_firstlineno,
f.__code__.co_lnotab,
f.__code__.co_freevars,
f.__code__.co_cellvars,
)
f.__code__ = new_code
f.__kwdefaults__ = {sentinel: "fdsa"}
f()
import errno
import os
import socket
import trio
bad_socket = socket.socket()
class BlockingReadTimeoutError(Exception):
pass
async def blocking_read_with_timeout(
fd,
count,
timeout, # noqa: ASYNC109 # manual timeout
):
print("reading from fd", fd)
cancel_requested = False
async def kill_it_after_timeout(new_fd):
print("sleeping")
await trio.sleep(timeout)
print("breaking the fd")
os.dup2(bad_socket.fileno(), new_fd, inheritable=False)
# MAGIC
print("setuid(getuid())")
os.setuid(os.getuid())
nonlocal cancel_requested
cancel_requested = True
new_fd = os.dup(fd)
print("working fd is", new_fd)
try:
async with trio.open_nursery() as nursery:
nursery.start_soon(kill_it_after_timeout, new_fd)
try:
data = await trio.to_thread.run_sync(os.read, new_fd, count)
except OSError as exc:
if cancel_requested and exc.errno == errno.ENOTCONN:
# Call was successfully cancelled. In a real version we'd
# integrate properly with Trio's cancellation tools; here
# we'll just raise an arbitrary error.
raise BlockingReadTimeoutError from None
print("got", data)
nursery.cancel_scope.cancel()
return data
finally:
os.close(new_fd)
trio.run(blocking_read_with_timeout, 0, 10, 2)
# Little script to get a rough estimate of how much memory each task takes
import resource
import trio
import trio.testing
LOW = 1000
HIGH = 10000
async def tinytask():
await trio.sleep_forever()
async def measure(count):
async with trio.open_nursery() as nursery:
for _ in range(count):
nursery.start_soon(tinytask)
await trio.testing.wait_all_tasks_blocked()
nursery.cancel_scope.cancel()
return resource.getrusage(resource.RUSAGE_SELF)
async def main():
low_usage = await measure(LOW)
high_usage = await measure(HIGH + LOW)
print("Memory usage per task:", (high_usage.ru_maxrss - low_usage.ru_maxrss) / HIGH)
print("(kilobytes on Linux, bytes on macOS)")
trio.run(main)
# This script completes correctly on macOS and FreeBSD 13.0-CURRENT, but hangs
# on FreeBSD 12.1. I'm told the fix will be backported to 12.2 (which is due
# out in October 2020).
#
# Upstream bug: https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=246350
import os
import select
r, w = os.pipe()
os.set_blocking(w, False)
print("filling pipe buffer")
try:
while True:
os.write(w, b"x")
except BlockingIOError:
pass
_, wfds, _ = select.select([], [w], [], 0)
print("select() says the write pipe is", "writable" if w in wfds else "NOT writable")
kq = select.kqueue()
event = select.kevent(w, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
kq.control([event], 0)
print("closing read end of pipe")
os.close(r)
_, wfds, _ = select.select([], [w], [], 0)
print("select() says the write pipe is", "writable" if w in wfds else "NOT writable")
print("waiting for kqueue to report the write end is writable")
got = kq.control([], 1)
print("done!")
print(got)
import time
# https://bitbucket.org/pypy/pypy/issues/2624/weird-performance-on-pypy3-when-reading
# COUNT = 100000
# f = open("/etc/passwd", "rt")
COUNT = 1000000
# With default buffering this test never even syscalls, and goes at about ~140
# ns per call, instead of ~500 ns/call for the syscall and related overhead.
# That's probably more fair -- the BufferedIOBase code can't service random
# accesses, even if your working set fits entirely in RAM.
with open("/etc/passwd", "rb") as f: # , buffering=0)
while True:
start = time.perf_counter()
for _ in range(COUNT):
f.seek(0)
f.read(1)
between = time.perf_counter()
for _ in range(COUNT):
f.seek(0)
end = time.perf_counter()
both = (between - start) / COUNT * 1e9
seek = (end - between) / COUNT * 1e9
read = both - seek
print(
f"{both:.2f} ns/(seek+read), {seek:.2f} ns/seek, estimate ~{read:.2f} ns/read",
)
import signal
import gsm
import trio
class GracefulShutdownManager:
def __init__(self):
self._shutting_down = False
self._cancel_scopes = set()
def start_shutdown(self):
self._shutting_down = True
for cancel_scope in self._cancel_scopes:
cancel_scope.cancel()
def cancel_on_graceful_shutdown(self):
cancel_scope = trio.CancelScope()
self._cancel_scopes.add(cancel_scope)
if self._shutting_down:
cancel_scope.cancel()
return cancel_scope
@property
def shutting_down(self):
return self._shutting_down
# Code can check gsm.shutting_down occasionally at appropriate points to see
# if it should exit.
#
# When doing operations that might block for an indefinite about of time and
# that should be aborted when a graceful shutdown starts, wrap them in 'with
# gsm.cancel_on_graceful_shutdown()'.
async def stream_handler(stream):
while True:
with gsm.cancel_on_graceful_shutdown():
data = await stream.receive_some()
print(f"{data = }")
if gsm.shutting_down:
break
# To trigger the shutdown:
async def listen_for_shutdown_signals():
with trio.open_signal_receiver(signal.SIGINT, signal.SIGTERM) as signal_aiter:
async for _sig in signal_aiter:
gsm.start_shutdown()
break
# TODO: it'd be nice to have some logic like "if we get another
# signal, or if 30 seconds pass, then do a hard shutdown".
# That's easy enough:
#
# with trio.move_on_after(30):
# async for sig in signal_aiter:
# break
# sys.exit()
#
# The trick is, if we do finish shutting down in (say) 10 seconds,
# then we want to exit immediately. So I guess you'd need the main
# part of the program to detect when it's finished shutting down, and
# then cancel listen_for_shutdown_signals?
#
# I guess this would be a good place to use @smurfix's daemon task
# construct:
# https://github.com/python-trio/trio/issues/569#issuecomment-408419260
# There are some tables here:
# https://web.archive.org/web/20120206195747/https://msdn.microsoft.com/en-us/library/windows/desktop/ms740621(v=vs.85).aspx
# They appear to be wrong.
#
# See https://github.com/python-trio/trio/issues/928 for details and context
import errno
import socket
modes = ["default", "SO_REUSEADDR", "SO_EXCLUSIVEADDRUSE"]
bind_types = ["wildcard", "specific"]
def sock(mode):
s = socket.socket(family=socket.AF_INET)
if mode == "SO_REUSEADDR":
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
elif mode == "SO_EXCLUSIVEADDRUSE":
s.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
return s
def bind(sock, bind_type):
if bind_type == "wildcard":
sock.bind(("0.0.0.0", 12345))
elif bind_type == "specific":
sock.bind(("127.0.0.1", 12345))
else:
raise AssertionError()
def table_entry(mode1, bind_type1, mode2, bind_type2):
with sock(mode1) as sock1:
bind(sock1, bind_type1)
try:
with sock(mode2) as sock2:
bind(sock2, bind_type2)
except OSError as exc:
if exc.winerror == errno.WSAEADDRINUSE:
return "INUSE"
elif exc.winerror == errno.WSAEACCES:
return "ACCESS"
raise
else:
return "Success"
print(
"""
second bind
| """
+ " | ".join(["%-19s" % mode for mode in modes]),
)
print(""" """, end="")
for _ in modes:
print(" | " + " | ".join(["%8s" % bind_type for bind_type in bind_types]), end="")
print(
"""
first bind -----------------------------------------------------------------""",
# default | wildcard | INUSE | Success | ACCESS | Success | INUSE | Success
)
for mode1 in modes:
for bind_type1 in bind_types:
row = []
for mode2 in modes:
for bind_type2 in bind_types:
entry = table_entry(mode1, bind_type1, mode2, bind_type2)
row.append(entry)
# print(mode1, bind_type1, mode2, bind_type2, entry)
print(
f"{mode1:>19} | {bind_type1:>8} | "
+ " | ".join(["%8s" % entry for entry in row]),
)
import time
import trio
async def loopy():
try:
while True:
# synchronous sleep to avoid maxing out CPU
time.sleep(0.01) # noqa: ASYNC251
await trio.lowlevel.checkpoint()
except KeyboardInterrupt:
print("KI!")
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(loopy)
nursery.start_soon(loopy)
nursery.start_soon(loopy)
trio.run(main)
import sys
import trio
(COUNT_STR,) = sys.argv[1:]
COUNT = int(COUNT_STR)
async def main():
async with trio.open_nursery() as nursery:
for _ in range(COUNT):
nursery.start_soon(trio.sleep, 1)
trio.run(main)
# How to manually call the SIGINT handler on Windows without using raise() or
# similar.
import os
import sys
if os.name == "nt":
import cffi
ffi = cffi.FFI()
ffi.cdef(
"""
void* WINAPI GetProcAddress(void* hModule, char* lpProcName);
typedef void (*PyOS_sighandler_t)(int);
""",
)
kernel32 = ffi.dlopen("kernel32.dll")
PyOS_getsig_ptr = kernel32.GetProcAddress(
ffi.cast("void*", sys.dllhandle),
b"PyOS_getsig",
)
PyOS_getsig = ffi.cast("PyOS_sighandler_t (*)(int)", PyOS_getsig_ptr)
import signal
PyOS_getsig(signal.SIGINT)(signal.SIGINT)
import trio
async def run_test(nominal_backlog):
print("--\nnominal:", nominal_backlog)
listen_sock = trio.socket.socket()
await listen_sock.bind(("127.0.0.1", 0))
listen_sock.listen(nominal_backlog)
client_socks = []
while True:
client_sock = trio.socket.socket()
# Generally the response to the listen buffer being full is that the
# SYN gets dropped, and the client retries after 1 second. So we
# assume that any connect() call to localhost that takes >0.5 seconds
# indicates a dropped SYN.
with trio.move_on_after(0.5) as cancel_scope:
await client_sock.connect(listen_sock.getsockname())
if cancel_scope.cancelled_caught:
break
client_socks.append(client_sock)
print("actual:", len(client_socks))
for client_sock in client_socks:
client_sock.close()
for nominal_backlog in [10, trio.socket.SOMAXCONN, 65535]:
trio.run(run_test, nominal_backlog)
# If you want to use IPv6, then:
# - replace AF_INET with AF_INET6 everywhere
# - use the hostname "2.pool.ntp.org"
# (see: https://news.ntppool.org/2011/06/continuing-ipv6-deployment/)
import datetime
import struct
import trio
def make_query_packet():
"""Construct a UDP packet suitable for querying an NTP server to ask for
the current time."""
# The structure of an NTP packet is described here:
# https://tools.ietf.org/html/rfc5905#page-19
# They're always 48 bytes long, unless you're using extensions, which we
# aren't.
packet = bytearray(48)
# The first byte contains 3 subfields:
# first 2 bits: 11, leap second status unknown
# next 3 bits: 100, NTP version indicator, 0b100 == 4 = version 4
# last 3 bits: 011, NTP mode indicator, 0b011 == 3 == "client"
packet[0] = 0b11100011
# For an outgoing request, all other fields can be left as zeros.
return packet
def extract_transmit_timestamp(ntp_packet):
"""Given an NTP packet, extract the "transmit timestamp" field, as a
Python datetime."""
# The transmit timestamp is the time that the server sent its response.
# It's stored in bytes 40-47 of the NTP packet. See:
# https://tools.ietf.org/html/rfc5905#page-19
encoded_transmit_timestamp = ntp_packet[40:48]
# The timestamp is stored in the "NTP timestamp format", which is a 32
# byte count of whole seconds, followed by a 32 byte count of fractions of
# a second. See:
# https://tools.ietf.org/html/rfc5905#page-13
seconds, fraction = struct.unpack("!II", encoded_transmit_timestamp)
# The timestamp is the number of seconds since January 1, 1900 (ignoring
# leap seconds). To convert it to a datetime object, we do some simple
# datetime arithmetic:
base_time = datetime.datetime(1900, 1, 1)
offset = datetime.timedelta(seconds=seconds + fraction / 2**32)
return base_time + offset
async def main():
print("Our clock currently reads (in UTC):", datetime.datetime.utcnow())
# Look up some random NTP servers.
# (See www.pool.ntp.org for information about the NTP pool.)
servers = await trio.socket.getaddrinfo(
"pool.ntp.org", # host
"ntp", # port
family=trio.socket.AF_INET, # IPv4
type=trio.socket.SOCK_DGRAM, # UDP
)
# Construct an NTP query packet.
query_packet = make_query_packet()
# Create a UDP socket
udp_sock = trio.socket.socket(
family=trio.socket.AF_INET, # IPv4
type=trio.socket.SOCK_DGRAM, # UDP
)
# Use the socket to send the query packet to each of the servers.
print("-- Sending queries --")
for server in servers:
address = server[-1]
print("Sending to:", address)
await udp_sock.sendto(query_packet, address)
# Read responses from the socket.
print("-- Reading responses (for 10 seconds) --")
with trio.move_on_after(10):
while True:
# We accept packets up to 1024 bytes long (though in practice NTP
# packets will be much shorter).
data, address = await udp_sock.recvfrom(1024)
print("Got response from:", address)
transmit_timestamp = extract_transmit_timestamp(data)
print("Their clock read (in UTC):", transmit_timestamp)
trio.run(main)
# NOTE:
# possibly it would be easier to use https://pypi.org/project/tree-format/
# instead of formatting by hand like this code does...
"""
Demo/exploration of how to print a task tree. Outputs:
<init>
├─ __main__.main
│ ├─ __main__.child1
│ │ ├─ trio.sleep_forever
│ │ ├─ __main__.child2
│ │ │ ├─ trio.sleep_forever
│ │ │ └─ trio.sleep_forever
│ │ └─ __main__.child2
│ │ ├─ trio.sleep_forever
│ │ └─ trio.sleep_forever
│ └─ (nested nursery)
│ └─ __main__.child1
│ ├─ trio.sleep_forever
│ ├─ __main__.child2
│ │ ├─ trio.sleep_forever
│ │ └─ trio.sleep_forever
│ └─ __main__.child2
│ ├─ trio.sleep_forever
│ └─ trio.sleep_forever
└─ <call soon task>
"""
import trio
import trio.testing
MID_PREFIX = "├─ "
MID_CONTINUE = "│ "
END_PREFIX = "└─ "
END_CONTINUE = " " * len(END_PREFIX)
def current_root_task():
task = trio.lowlevel.current_task()
while task.parent_nursery is not None:
task = task.parent_nursery.parent_task
return task
def _render_subtree(name, rendered_children):
lines = []
lines.append(name)
for child_lines in rendered_children:
if child_lines is rendered_children[-1]:
first_prefix = END_PREFIX
rest_prefix = END_CONTINUE
else:
first_prefix = MID_PREFIX
rest_prefix = MID_CONTINUE
lines.append(first_prefix + child_lines[0])
lines.extend(rest_prefix + child_line for child_line in child_lines[1:])
return lines
def _rendered_nursery_children(nursery):
return [task_tree_lines(t) for t in nursery.child_tasks]
def task_tree_lines(task=None):
if task is None:
task = current_root_task()
rendered_children = []
nurseries = list(task.child_nurseries)
while nurseries:
nursery = nurseries.pop()
nursery_children = _rendered_nursery_children(nursery)
if rendered_children:
nested = _render_subtree("(nested nursery)", rendered_children)
nursery_children.append(nested)
rendered_children = nursery_children
return _render_subtree(task.name, rendered_children)
def print_task_tree(task=None):
for line in task_tree_lines(task):
print(line)
################################################################
async def child2():
async with trio.open_nursery() as nursery:
nursery.start_soon(trio.sleep_forever)
nursery.start_soon(trio.sleep_forever)
async def child1():
async with trio.open_nursery() as nursery:
nursery.start_soon(child2)
nursery.start_soon(child2)
nursery.start_soon(trio.sleep_forever)
async def main():
async with trio.open_nursery() as nursery0:
nursery0.start_soon(child1)
async with trio.open_nursery() as nursery1:
nursery1.start_soon(child1)
await trio.testing.wait_all_tasks_blocked()
print_task_tree()
nursery0.cancel_scope.cancel()
trio.run(main)
import textwrap
import time
methods = {"fileno"}
class Proxy1:
strategy = "__getattr__"
works_for = "any attr"
def __init__(self, wrapped):
self._wrapped = wrapped
def __getattr__(self, name):
if name in methods:
return getattr(self._wrapped, name)
raise AttributeError(name)
################################################################
class Proxy2:
strategy = "generated methods (getattr + closure)"
works_for = "methods"
def __init__(self, wrapped):
self._wrapped = wrapped
def add_wrapper(cls, method):
def wrapper(self, *args, **kwargs):
return getattr(self._wrapped, method)(*args, **kwargs)
setattr(cls, method, wrapper)
for method in methods:
add_wrapper(Proxy2, method)
################################################################
class Proxy3:
strategy = "generated methods (exec)"
works_for = "methods"
def __init__(self, wrapped):
self._wrapped = wrapped
def add_wrapper(cls, method):
code = textwrap.dedent(
f"""
def wrapper(self, *args, **kwargs):
return self._wrapped.{method}(*args, **kwargs)
""",
)
ns = {}
exec(code, ns)
setattr(cls, method, ns["wrapper"])
for method in methods:
add_wrapper(Proxy3, method)
################################################################
class Proxy4:
strategy = "generated properties (getattr + closure)"
works_for = "any attr"
def __init__(self, wrapped):
self._wrapped = wrapped
def add_wrapper(cls, attr):
def getter(self):
return getattr(self._wrapped, attr)
def setter(self, newval):
setattr(self._wrapped, attr, newval)
def deleter(self):
delattr(self._wrapped, attr)
setattr(cls, attr, property(getter, setter, deleter))
for method in methods:
add_wrapper(Proxy4, method)
################################################################
class Proxy5:
strategy = "generated properties (exec)"
works_for = "any attr"
def __init__(self, wrapped):
self._wrapped = wrapped
def add_wrapper(cls, attr):
code = textwrap.dedent(
f"""
def getter(self):
return self._wrapped.{attr}
def setter(self, newval):
self._wrapped.{attr} = newval
def deleter(self):
del self._wrapped.{attr}
""",
)
ns = {}
exec(code, ns)
setattr(cls, attr, property(ns["getter"], ns["setter"], ns["deleter"]))
for method in methods:
add_wrapper(Proxy5, method)
################################################################
# methods only
class Proxy6:
strategy = "copy attrs from wrappee to wrapper"
works_for = "methods + constant attrs"
def __init__(self, wrapper):
self._wrapper = wrapper
for method in methods:
setattr(self, method, getattr(self._wrapper, method))
################################################################
classes = [Proxy1, Proxy2, Proxy3, Proxy4, Proxy5, Proxy6]
def check(cls):
with open("/etc/passwd") as f:
p = cls(f)
assert p.fileno() == f.fileno()
for cls in classes:
check(cls)
with open("/etc/passwd") as f:
objs = [c(f) for c in classes]
COUNT = 1000000
try:
import __pypy__ # noqa: F401 # __pypy__ imported but unused
except ImportError:
pass
else:
COUNT *= 10
while True:
print("-------")
for obj in objs:
start = time.perf_counter()
for _ in range(COUNT):
obj.fileno()
# obj.fileno
end = time.perf_counter()
per_usec = COUNT / (end - start) / 1e6
print(f"{per_usec:7.2f} / us: {obj.strategy} ({obj.works_for})")
import os
import tempfile
import threading
import time
def check_reopen(r1, w):
try:
print("Reopening read end")
r2 = os.open(f"/proc/self/fd/{r1}", os.O_RDONLY)
print(f"r1 is {r1}, r2 is {r2}")
print("checking they both can receive from w...")
os.write(w, b"a")
assert os.read(r1, 1) == b"a"
os.write(w, b"b")
assert os.read(r2, 1) == b"b"
print("...ok")
print("setting r2 to non-blocking")
os.set_blocking(r2, False)
print("os.get_blocking(r1) ==", os.get_blocking(r1))
print("os.get_blocking(r2) ==", os.get_blocking(r2))
# Check r2 is really truly non-blocking
try:
os.read(r2, 1)
except BlockingIOError:
print("r2 definitely seems to be in non-blocking mode")
# Check that r1 is really truly still in blocking mode
def sleep_then_write():
time.sleep(1)
os.write(w, b"c")
threading.Thread(target=sleep_then_write, daemon=True).start()
assert os.read(r1, 1) == b"c"
print("r1 definitely seems to be in blocking mode")
except Exception as exc:
print(f"ERROR: {exc!r}")
print("-- testing anonymous pipe --")
check_reopen(*os.pipe())
print("-- testing FIFO --")
with tempfile.TemporaryDirectory() as tmpdir:
fifo = tmpdir + "/" + "myfifo"
os.mkfifo(fifo)
# "A process can open a FIFO in nonblocking mode. In this case, opening
# for read-only will succeed even if no-one has opened on the write side
# yet and opening for write-only will fail with ENXIO (no such device or
# address) unless the other end has already been opened." -- Linux fifo(7)
r = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
assert not os.get_blocking(r)
os.set_blocking(r, True)
assert os.get_blocking(r)
w = os.open(fifo, os.O_WRONLY)
check_reopen(r, w)
print("-- testing socketpair --")
import socket
rs, ws = socket.socketpair()
check_reopen(rs.fileno(), ws.fileno())
import time
import trio
LOOPS = 0
RUNNING = True
async def reschedule_loop(depth):
if depth == 0:
global LOOPS
while RUNNING:
LOOPS += 1
await trio.lowlevel.checkpoint()
# await trio.lowlevel.cancel_shielded_checkpoint()
else:
await reschedule_loop(depth - 1)
async def report_loop():
global RUNNING
try:
while True:
start_count = LOOPS
start_time = time.perf_counter()
await trio.sleep(1)
end_time = time.perf_counter()
end_count = LOOPS
loops = end_count - start_count
duration = end_time - start_time
print(f"{loops / duration} loops/sec")
finally:
RUNNING = False
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(reschedule_loop, 10)
nursery.start_soon(report_loop)
trio.run(main)
# Suppose:
# - we're blocked until a timeout occurs
# - our process gets put to sleep for a while (SIGSTOP or whatever)
# - then it gets woken up again
# what happens to our timeout?
#
# Here we do things that sleep for 6 seconds, and we put the process to sleep
# for 2 seconds in the middle of that.
#
# Results on Linux: everything takes 6 seconds, except for select.select(),
# and also time.sleep() (which on CPython uses the select() call internally)
#
# Results on macOS: everything takes 6 seconds.
#
# Why do we care:
# https://github.com/python-trio/trio/issues/591#issuecomment-498020805
import os
import select
import signal
import subprocess
import sys
import time
DUR = 6
# Can also try SIGTSTP
STOP_SIGNAL = signal.SIGSTOP
test_progs = [
f"import threading; ev = threading.Event(); ev.wait({DUR})",
# Python's time.sleep() calls select() internally
f"import time; time.sleep({DUR})",
# This is the real sleep() function
f"import ctypes; ctypes.CDLL(None).sleep({DUR})",
f"import select; select.select([], [], [], {DUR})",
f"import select; p = select.poll(); p.poll({DUR} * 1000)",
]
if hasattr(select, "epoll"):
test_progs += [
f"import select; ep = select.epoll(); ep.poll({DUR})",
]
if hasattr(select, "kqueue"):
test_progs += [
f"import select; kq = select.kqueue(); kq.control([], 1, {DUR})",
]
for test_prog in test_progs:
print("----------------------------------------------------------------")
start = time.monotonic()
print(f"Running: {test_prog}")
print(f"Expected duration: {DUR} seconds")
p = subprocess.Popen([sys.executable, "-c", test_prog])
time.sleep(DUR / 3)
print(f"Putting it to sleep for {DUR / 3} seconds")
os.kill(p.pid, STOP_SIGNAL)
time.sleep(DUR / 3)
print("Waking it up again")
os.kill(p.pid, signal.SIGCONT)
p.wait()
end = time.monotonic()
print(f"Actual duration: {end - start:.2f}")
import socket
# Linux:
# low values get rounded up to ~2-4 KB, so that's predictable
# with low values, can queue up 6 one-byte sends (!)
# with default values, can queue up 278 one-byte sends
#
# Windows:
# if SNDBUF = 0 freezes, so that's useless
# by default, buffers 655121
# with both set to 1, buffers 525347
# except sometimes it's less intermittently (?!?)
#
# macOS:
# if bufsize = 1, can queue up 1 one-byte send
# with default bufsize, can queue up 8192 one-byte sends
# and bufsize = 0 is invalid (setsockopt errors out)
for bufsize in [1, None, 0]:
a, b = socket.socketpair()
a.setblocking(False)
b.setblocking(False)
a.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
if bufsize is not None:
a.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
b.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
try:
for _count in range(10000000):
a.send(b"\x00")
except BlockingIOError:
break
print(f"setsockopt bufsize {bufsize}: {_count}")
a.close()
b.close()
# Little script to measure how wait_readable scales with the number of
# sockets. We look at three key measurements:
#
# - cost of issuing wait_readable
# - cost of running the scheduler, while wait_readables are blocked in the
# background
# - cost of cancelling wait_readable
#
# On Linux and macOS, these all appear to be ~O(1), as we'd expect.
#
# On Windows: with the old 'select'-based loop, the cost of scheduling grew
# with the number of outstanding sockets, which was bad.
#
# To run this on Unix systems, you'll probably first have to run:
#
# ulimit -n 31000
#
# or similar.
import socket
import time
import trio
import trio.testing
async def main():
for total in [10, 100, 500, 1_000, 10_000, 20_000, 30_000]:
def pt(desc, *, count=total, item="socket"):
nonlocal last_time
now = time.perf_counter()
total_ms = (now - last_time) * 1000
per_us = total_ms * 1000 / count
print(f"{desc}: {total_ms:.2f} ms total, {per_us:.2f} µs/{item}")
last_time = now
print(f"\n-- {total} sockets --")
last_time = time.perf_counter()
sockets = []
for _ in range(total // 2):
a, b = socket.socketpair()
sockets += [a, b]
pt("socket creation")
async with trio.open_nursery() as nursery:
for s in sockets:
nursery.start_soon(trio.lowlevel.wait_readable, s)
await trio.testing.wait_all_tasks_blocked()
pt("spawning wait tasks")
for _ in range(1000):
await trio.lowlevel.cancel_shielded_checkpoint()
pt("scheduling 1000 times", count=1000, item="schedule")
nursery.cancel_scope.cancel()
pt("cancelling wait tasks")
for sock in sockets:
sock.close()
pt("closing sockets")
trio.run(main)
# This demonstrates a PyPy bug:
# https://bitbucket.org/pypy/pypy/issues/2578/
import socket
import ssl
import threading
# client_sock, server_sock = socket.socketpair()
listen_sock = socket.socket()
listen_sock.bind(("127.0.0.1", 0))
listen_sock.listen(1)
client_sock = socket.socket()
client_sock.connect(listen_sock.getsockname())
server_sock, _ = listen_sock.accept()
server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server_ctx.load_cert_chain("trio-test-1.pem")
server = server_ctx.wrap_socket(
server_sock,
server_side=True,
suppress_ragged_eofs=False,
do_handshake_on_connect=False,
)
client_ctx = ssl.create_default_context(cafile="trio-test-CA.pem")
client = client_ctx.wrap_socket(
client_sock,
server_hostname="trio-test-1.example.org",
suppress_ragged_eofs=False,
do_handshake_on_connect=False,
)
server_handshake_thread = threading.Thread(target=server.do_handshake)
server_handshake_thread.start()
client_handshake_thread = threading.Thread(target=client.do_handshake)
client_handshake_thread.start()
server_handshake_thread.join()
client_handshake_thread.join()
# Now we have two SSLSockets that have established an encrypted connection
# with each other
assert client.getpeercert() is not None
client.sendall(b"x")
assert server.recv(10) == b"x"
# A few different ways to make attempts to read/write the socket's fd return
# weird failures at the operating system level
# Attempting to send on a socket after shutdown should raise EPIPE or similar
server.shutdown(socket.SHUT_WR)
# Attempting to read/write to the fd after it's closed should raise EBADF
# os.close(server.fileno())
# Attempting to read/write to an fd opened with O_DIRECT raises EINVAL in most
# cases (unless you're very careful with alignment etc. which openssl isn't)
# os.dup2(os.open("/tmp/blah-example-file", os.O_RDWR | os.O_CREAT | os.O_DIRECT), server.fileno())
# Sending or receiving
server.sendall(b"hello")
# server.recv(10)
# Scenario:
# - TLS connection is set up successfully
# - client sends close_notify then closes socket
# - server receives the close_notify then attempts to send close_notify back
#
# On CPython, the last step raises BrokenPipeError. On PyPy, it raises
# SSLEOFError.
#
# SSLEOFError seems a bit perverse given that it's supposed to mean "EOF
# occurred in violation of protocol", and the client's behavior here is
# explicitly allowed by the RFCs. But maybe openssl is just perverse like
# that, and it's a coincidence that CPython and PyPy act differently here? I
# don't know if this is a bug or not.
#
# (Using: debian's CPython 3.5 or 3.6, and pypy3 5.8.0-beta)
import socket
import ssl
import threading
client_sock, server_sock = socket.socketpair()
client_done = threading.Event()
def server_thread_fn():
server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server_ctx.load_cert_chain("trio-test-1.pem")
server = server_ctx.wrap_socket(
server_sock,
server_side=True,
suppress_ragged_eofs=False,
)
while True:
data = server.recv(4096)
print("server got:", data)
if not data:
print("server waiting for client to finish everything")
client_done.wait()
print("server attempting to send back close-notify")
server.unwrap()
print("server ok")
break
server.sendall(data)
server_thread = threading.Thread(target=server_thread_fn)
server_thread.start()
client_ctx = ssl.create_default_context(cafile="trio-test-CA.pem")
client = client_ctx.wrap_socket(client_sock, server_hostname="trio-test-1.example.org")
# Now we have two SSLSockets that have established an encrypted connection
# with each other
assert client.getpeercert() is not None
client.sendall(b"x")
assert client.recv(10) == b"x"
# The client sends a close-notify, and then immediately closes the connection
# (as explicitly permitted by the TLS RFCs).
# This is a slightly odd construction, but if you trace through the ssl module
# far enough you'll see that it's equivalent to calling SSL_shutdown() once,
# which generates the close_notify, and then immediately calling it again,
# which checks for the close_notify and then immediately raises
# SSLWantReadError because of course it hasn't arrived yet:
print("client sending close_notify")
client.setblocking(False)
try:
client.unwrap()
except ssl.SSLWantReadError:
print("client got SSLWantReadError as expected")
else:
raise AssertionError()
client.close()
client_done.set()
import socket
import ssl
import threading
from contextlib import contextmanager
BUFSIZE = 4096
server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server_ctx.load_cert_chain("trio-test-1.pem")
def _ssl_echo_serve_sync(sock):
try:
wrapped = server_ctx.wrap_socket(sock, server_side=True)
while True:
data = wrapped.recv(BUFSIZE)
if not data:
wrapped.unwrap()
return
wrapped.sendall(data)
except BrokenPipeError:
pass
@contextmanager
def echo_server_connection():
client_sock, server_sock = socket.socketpair()
with client_sock, server_sock:
t = threading.Thread(
target=_ssl_echo_serve_sync,
args=(server_sock,),
daemon=True,
)
t.start()
yield client_sock
class ManuallyWrappedSocket:
def __init__(self, ctx, sock, **kwargs):
self.incoming = ssl.MemoryBIO()
self.outgoing = ssl.MemoryBIO()
self.obj = ctx.wrap_bio(self.incoming, self.outgoing, **kwargs)
self.sock = sock
def _retry(self, fn, *args):
finished = False
while not finished:
want_read = False
try:
ret = fn(*args)
except ssl.SSLWantReadError:
want_read = True
except ssl.SSLWantWriteError:
# can't happen, but if it did this would be the right way to
# handle it anyway
pass
else:
finished = True
# do any sending
data = self.outgoing.read()
if data:
self.sock.sendall(data)
# do any receiving
if want_read:
data = self.sock.recv(BUFSIZE)
if not data:
self.incoming.write_eof()
else:
self.incoming.write(data)
# then retry if necessary
return ret
def do_handshake(self):
self._retry(self.obj.do_handshake)
def recv(self, bufsize):
return self._retry(self.obj.read, bufsize)
def sendall(self, data):
self._retry(self.obj.write, data)
def unwrap(self):
self._retry(self.obj.unwrap)
return self.sock
def wrap_socket_via_wrap_socket(ctx, sock, **kwargs):
return ctx.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)
def wrap_socket_via_wrap_bio(ctx, sock, **kwargs):
return ManuallyWrappedSocket(ctx, sock, **kwargs)
for wrap_socket in [
wrap_socket_via_wrap_socket,
wrap_socket_via_wrap_bio,
]:
print(f"\n--- checking {wrap_socket.__name__} ---\n")
print("checking with do_handshake + correct hostname...")
with echo_server_connection() as client_sock:
client_ctx = ssl.create_default_context(cafile="trio-test-CA.pem")
wrapped = wrap_socket(
client_ctx,
client_sock,
server_hostname="trio-test-1.example.org",
)
wrapped.do_handshake()
wrapped.sendall(b"x")
assert wrapped.recv(1) == b"x"
wrapped.unwrap()
print("...success")
print("checking with do_handshake + wrong hostname...")
with echo_server_connection() as client_sock:
client_ctx = ssl.create_default_context(cafile="trio-test-CA.pem")
wrapped = wrap_socket(
client_ctx,
client_sock,
server_hostname="trio-test-2.example.org",
)
try:
wrapped.do_handshake()
except Exception:
print("...got error as expected")
else:
print("??? no error ???")
print("checking withOUT do_handshake + wrong hostname...")
with echo_server_connection() as client_sock:
client_ctx = ssl.create_default_context(cafile="trio-test-CA.pem")
wrapped = wrap_socket(
client_ctx,
client_sock,
server_hostname="trio-test-2.example.org",
)
# We forgot to call do_handshake
# But the hostname is wrong so something had better error out...
sent = b"x"
print("sending", sent)
wrapped.sendall(sent)
got = wrapped.recv(1)
print("got:", got)
assert got == sent
print("!!!! successful chat with invalid host! we have been haxored!")
import ssl
from contextlib import contextmanager
client_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
client_ctx.check_hostname = False
client_ctx.verify_mode = ssl.CERT_NONE
cinb = ssl.MemoryBIO()
coutb = ssl.MemoryBIO()
cso = client_ctx.wrap_bio(cinb, coutb)
server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
server_ctx.load_cert_chain("server.crt", "server.key", "xxxx")
sinb = ssl.MemoryBIO()
soutb = ssl.MemoryBIO()
sso = server_ctx.wrap_bio(sinb, soutb, server_side=True)
@contextmanager
def expect(etype):
try:
yield
except etype:
pass
else:
raise AssertionError(f"expected {etype}")
with expect(ssl.SSLWantReadError):
cso.do_handshake()
assert not cinb.pending
assert coutb.pending
with expect(ssl.SSLWantReadError):
sso.do_handshake()
assert not sinb.pending
assert not soutb.pending
# A trickle is not enough
# sinb.write(coutb.read(1))
# with expect(ssl.SSLWantReadError):
# cso.do_handshake()
# with expect(ssl.SSLWantReadError):
# sso.do_handshake()
sinb.write(coutb.read())
# Now it should be able to respond
with expect(ssl.SSLWantReadError):
sso.do_handshake()
assert soutb.pending
cinb.write(soutb.read())
with expect(ssl.SSLWantReadError):
cso.do_handshake()
sinb.write(coutb.read())
# server done!
sso.do_handshake()
assert soutb.pending
# client done!
cinb.write(soutb.read())
cso.do_handshake()
cso.write(b"hello")
sinb.write(coutb.read())
assert sso.read(10) == b"hello"
with expect(ssl.SSLWantReadError):
sso.read(10)
# cso.write(b"x" * 2 ** 30)
# print(coutb.pending)
assert not coutb.pending
assert not cinb.pending
sso.do_handshake()
assert not coutb.pending
assert not cinb.pending
# This is a reproducer for:
# https://bugs.python.org/issue30744
# https://bitbucket.org/pypy/pypy/issues/2591/
import sys
import threading
import time
COUNT = 100
def slow_tracefunc(frame, event, arg):
# A no-op trace function that sleeps briefly to make us more likely to hit
# the race condition.
time.sleep(0.01)
return slow_tracefunc
def run_with_slow_tracefunc(fn):
# settrace() only takes effect when you enter a new frame, so we need this
# little dance:
sys.settrace(slow_tracefunc)
return fn()
def outer():
x = 0
# We hide the done variable inside a list, because we want to use it to
# communicate between the main thread and the looper thread, and the bug
# here is that variable assignments made in the main thread disappear
# before the child thread can see them...
done = [False]
def traced_looper():
# Force w_locals to be instantiated (only matters on PyPy; on CPython
# you can comment this line out and everything stays the same)
print(locals())
nonlocal x # Force x to be closed over
# Random nonsense whose only purpose is to trigger lots of calls to
# the trace func
count = 0
while not done[0]:
count += 1
return count
t = threading.Thread(target=run_with_slow_tracefunc, args=(traced_looper,))
t.start()
for i in range(COUNT):
print(f"after {i} increments, x is {x}")
x += 1
time.sleep(0.01)
done[0] = True
t.join()
print(f"Final discrepancy: {COUNT - x} (should be 0)")
outer()
# Estimate the cost of simply passing some data into a thread and back, in as
# minimal a fashion as possible.
#
# This is useful to get a sense of the *lower-bound* cost of
# trio.to_thread.run_sync
import threading
import time
from queue import Queue
COUNT = 10000
def worker(in_q, out_q):
while True:
job = in_q.get()
out_q.put(job())
def main():
in_q = Queue()
out_q = Queue()
t = threading.Thread(target=worker, args=(in_q, out_q))
t.start()
while True:
start = time.monotonic()
for _ in range(COUNT):
in_q.put(lambda: None)
out_q.get()
end = time.monotonic()
print(f"{(end - start) / COUNT * 1e6:.2f} µs/job")
main()
# what does SO_REUSEADDR do, exactly?
# Theory:
#
# - listen1 is bound to port P
# - listen1.accept() returns a connected socket server1, which is also bound
# to port P
# - listen1 is closed
# - we attempt to bind listen2 to port P
# - this fails because server1 is still open, or still in TIME_WAIT, and you
# can't use bind() to bind to a port that still has sockets on it, unless
# both those sockets and the socket being bound have SO_REUSEADDR
#
# The standard way to avoid this is to set SO_REUSEADDR on all listening
# sockets before binding them. And this works, but for somewhat more
# complicated reasons than are often appreciated.
#
# In our scenario above it doesn't really matter for listen1 (assuming the
# port is initially unused).
#
# What is important is that it's set on *server1*. Setting it on listen1
# before calling bind() automatically accomplishes this, because SO_REUSEADDR
# is inherited by accept()ed sockets. But it also works to set it on listen1
# any time before calling accept(), or to set it on server1 directly.
#
# Also, it must be set on listen2 before calling bind(), or it will conflict
# with the lingering server1 socket.
import errno
import socket
import attrs
@attrs.define(repr=False, slots=False)
class Options:
listen1_early = None
listen1_middle = None
listen1_late = None
server = None
listen2 = None
def set(self, which, sock):
value = getattr(self, which)
if value is not None:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, value)
def describe(self):
info = []
for f in attrs.fields(self.__class__):
value = getattr(self, f.name)
if value is not None:
info.append(f"{f.name}={value}")
return "Set/unset: {}".format(", ".join(info))
def time_wait(options):
print(options.describe())
# Find a pristine port (one we can definitely bind to without
# SO_REUSEADDR)
listen0 = socket.socket()
listen0.bind(("127.0.0.1", 0))
sockaddr = listen0.getsockname()
# print(" ", sockaddr)
listen0.close()
listen1 = socket.socket()
options.set("listen1_early", listen1)
listen1.bind(sockaddr)
listen1.listen(1)
options.set("listen1_middle", listen1)
client = socket.socket()
client.connect(sockaddr)
options.set("listen1_late", listen1)
server, _ = listen1.accept()
options.set("server", server)
# Server initiated close to trigger TIME_WAIT status
server.close()
assert client.recv(10) == b""
client.close()
listen1.close()
listen2 = socket.socket()
options.set("listen2", listen2)
try:
listen2.bind(sockaddr)
except OSError as exc:
if exc.errno == errno.EADDRINUSE:
print(" -> EADDRINUSE")
else:
raise
else:
print(" -> ok")
time_wait(Options())
time_wait(Options(listen1_early=True, server=True, listen2=True))
time_wait(Options(listen1_early=True))
time_wait(Options(server=True))
time_wait(Options(listen2=True))
time_wait(Options(listen1_early=True, listen2=True))
time_wait(Options(server=True, listen2=True))
time_wait(Options(listen1_middle=True, listen2=True))
time_wait(Options(listen1_late=True, listen2=True))
time_wait(Options(listen1_middle=True, server=False, listen2=True))
# On windows, what does SO_EXCLUSIVEADDRUSE actually do? Apparently not what
# the documentation says!
# See: https://stackoverflow.com/questions/45624916/
#
# Specifically, this script seems to demonstrate that it only creates
# conflicts between listening sockets, *not* lingering connected sockets.
import socket
from contextlib import contextmanager
@contextmanager
def report_outcome(tagline):
try:
yield
except OSError as exc:
print(f"{tagline}: failed")
print(f" details: {exc!r}")
else:
print(f"{tagline}: succeeded")
# Set up initial listening socket
lsock = socket.socket()
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
lsock.bind(("127.0.0.1", 0))
sockaddr = lsock.getsockname()
lsock.listen(10)
# Make connected client and server sockets
csock = socket.socket()
csock.connect(sockaddr)
ssock, _ = lsock.accept()
print("lsock", lsock.getsockname())
print("ssock", ssock.getsockname())
# Can't make a second listener while the first exists
probe = socket.socket()
probe.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
with report_outcome("rebind with existing listening socket"):
probe.bind(sockaddr)
# Now we close the first listen socket, while leaving the connected sockets
# open:
lsock.close()
# This time binding succeeds (contra MSDN!)
probe = socket.socket()
probe.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
with report_outcome("rebind with live connected sockets"):
probe.bind(sockaddr)
probe.listen(10)
print("probe", probe.getsockname())
print("ssock", ssock.getsockname())
probe.close()
# Server-initiated close to trigger TIME_WAIT status
ssock.send(b"x")
assert csock.recv(1) == b"x"
ssock.close()
assert csock.recv(1) == b""
# And does the TIME_WAIT sock prevent binding?
probe = socket.socket()
probe.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
with report_outcome("rebind with TIME_WAIT socket"):
probe.bind(sockaddr)
probe.listen(10)
probe.close()
import json
import os
from itertools import count
import trio
# Experiment with generating Chrome Event Trace format, which can be browsed
# through chrome://tracing or other mechanisms.
#
# Screenshot: https://files.gitter.im/python-trio/general/fp6w/image.png
#
# Trace format docs: https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/preview#
#
# Things learned so far:
# - I don't understand how the ph="s"/ph="f" flow events work – I think
# they're supposed to show up as arrows, and I'm emitting them between tasks
# that wake each other up, but they're not showing up.
# - I think writing out json synchronously from each event is creating gaps in
# the trace; maybe better to batch them up to write up all at once at the
# end
# - including tracebacks would be cool
# - there doesn't seem to be any good way to group together tasks based on
# nurseries. this really limits the value of this particular trace
# format+viewer for us. (also maybe we should have an instrumentation event
# when a nursery is opened/closed?)
# - task._counter should maybe be public
# - I don't know how to best show task lifetime, scheduling times, and what
# the task is actually doing on the same plot. if we want to show particular
# events like "called stream.send_all", then the chrome trace format won't
# let us also show "task is running", because neither kind of event is
# strictly nested inside the other
class Trace(trio.abc.Instrument):
def __init__(self, out):
self.out = out
self.out.write("[\n")
self.ids = count()
self._task_metadata(-1, "I/O manager")
def _write(self, **ev):
ev.setdefault("pid", os.getpid())
if ev["ph"] != "M":
ev.setdefault("ts", trio.current_time() * 1e6)
self.out.write(json.dumps(ev))
self.out.write(",\n")
def _task_metadata(self, tid, name):
self._write(
name="thread_name",
ph="M",
tid=tid,
args={"name": name},
)
self._write(
name="thread_sort_index",
ph="M",
tid=tid,
args={"sort_index": tid},
)
def task_spawned(self, task):
self._task_metadata(task._counter, task.name)
self._write(
name="task lifetime",
ph="B",
tid=task._counter,
)
def task_exited(self, task):
self._write(
name="task lifetime",
ph="E",
tid=task._counter,
)
def before_task_step(self, task):
self._write(
name="running",
ph="B",
tid=task._counter,
)
def after_task_step(self, task):
self._write(
name="running",
ph="E",
tid=task._counter,
)
def task_scheduled(self, task):
try:
waker = trio.lowlevel.current_task()
except RuntimeError:
pass
else:
id_ = next(self.ids)
self._write(
ph="s",
cat="wakeup",
id=id_,
tid=waker._counter,
)
self._write(
cat="wakeup",
ph="f",
id=id_,
tid=task._counter,
)
def before_io_wait(self, timeout):
self._write(
name="I/O wait",
ph="B",
tid=-1,
)
def after_io_wait(self, timeout):
self._write(
name="I/O wait",
ph="E",
tid=-1,
)
async def child1():
print(" child1: started! sleeping now...")
await trio.sleep(1)
print(" child1: exiting!")
async def child2():
print(" child2: started! sleeping now...")
await trio.sleep(1)
print(" child2: exiting!")
async def parent():
print("parent: started!")
async with trio.open_nursery() as nursery:
print("parent: spawning child1...")
nursery.start_soon(child1)
print("parent: spawning child2...")
nursery.start_soon(child2)
print("parent: waiting for children to finish...")
# -- we exit the nursery block here --
print("parent: all done!")
with open("/tmp/t.json", "w") as t_json:
t = Trace(t_json)
trio.run(parent, instruments=[t])
import sys
import trio
sys.stderr = sys.stdout
async def child1():
raise ValueError
async def child2():
async with trio.open_nursery() as nursery:
nursery.start_soon(grandchild1)
nursery.start_soon(grandchild2)
async def grandchild1():
raise KeyError
async def grandchild2():
raise NameError("Bob")
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(child1)
nursery.start_soon(child2)
# nursery.start_soon(grandchild1)
trio.run(main)
import trio
async def foo():
print("in foo!")
return 3
print("running!")
print(trio.run(foo))
import itertools
import os
import select
import signal
import socket
import threading
import time
# Equivalent to the C function raise(), which Python doesn't wrap
if os.name == "nt":
import cffi
_ffi = cffi.FFI()
_ffi.cdef("int raise(int);")
_lib = _ffi.dlopen("api-ms-win-crt-runtime-l1-1-0.dll")
signal_raise = getattr(_lib, "raise")
else:
def signal_raise(signum):
# Use pthread_kill to make sure we're actually using the wakeup fd on
# Unix
signal.pthread_kill(threading.get_ident(), signum)
def raise_SIGINT_soon():
time.sleep(1)
signal_raise(signal.SIGINT)
# Sending 2 signals becomes reliable, as we'd expect (because we need
# set-flags -> write-to-fd, and doing it twice does
# write-to-fd -> set-flags -> write-to-fd -> set-flags)
# signal_raise(signal.SIGINT)
def drain(sock):
total = 0
try:
while True:
total += len(sock.recv(1024))
except BlockingIOError:
pass
return total
def main():
writer, reader = socket.socketpair()
writer.setblocking(False)
reader.setblocking(False)
signal.set_wakeup_fd(writer.fileno())
# Keep trying until we lose the race...
for attempt in itertools.count():
print(f"Attempt {attempt}: start")
# Make sure the socket is empty
drained = drain(reader)
if drained:
print(f"Attempt {attempt}: ({drained} residual bytes discarded)")
# Arrange for SIGINT to be delivered 1 second from now
thread = threading.Thread(target=raise_SIGINT_soon)
thread.start()
# Fake an IO loop that's trying to sleep for 10 seconds (but will
# hopefully get interrupted after just 1 second)
start = time.perf_counter()
target = start + 10
try:
select_calls = 0
drained = 0
while True:
now = time.perf_counter()
if now > target:
break
select_calls += 1
r, _, _ = select.select([reader], [], [], target - now)
if r:
# In theory we should loop to fully drain the socket but
# honestly there's 1 byte in there at most and it'll be
# fine.
drained += drain(reader)
except KeyboardInterrupt:
pass
else:
print(f"Attempt {attempt}: no KeyboardInterrupt?!")
# We expect a successful run to take 1 second, and a failed run to
# take 10 seconds, so 2 seconds is a reasonable cutoff to distinguish
# them.
duration = time.perf_counter() - start
if duration < 2:
print(
f"Attempt {attempt}: OK, trying again "
f"(select_calls = {select_calls}, drained = {drained})",
)
else:
print(f"Attempt {attempt}: FAILED, took {duration} seconds")
print(f"select_calls = {select_calls}, drained = {drained}")
break
thread.join()
if __name__ == "__main__":
main()
# Sandbox for exploring the Windows "waitable timer" API.
# Cf https://github.com/python-trio/trio/issues/173
#
# Observations:
# - if you set a timer in the far future, then block in
# WaitForMultipleObjects, then set the computer's clock forward by a few
# years (past the target sleep time), then the timer immediately wakes up
# (which is good!)
# - if you set a timer in the past, then it wakes up immediately
# Random thoughts:
# - top-level API sleep_until_datetime
# - portable manages the heap of outstanding sleeps, runs a system task to
# wait for the next one, wakes up tasks when their deadline arrives, etc.
# - non-portable code: async def sleep_until_datetime_raw, which simply blocks
# until the given time using system-specific methods. Can assume that there
# is only one call to this method at a time.
# Actually, this should be a method, so it can hold persistent state (e.g.
# timerfd).
# Can assume that the datetime passed in has tzinfo=timezone.utc
# Need a way to override this object for testing.
#
# should we expose wake-system-on-alarm functionality? windows and linux both
# make this fairly straightforward, but you obviously need to use a separate
# time source
import contextlib
from datetime import datetime, timedelta, timezone
import cffi
import trio
from trio._core._windows_cffi import ffi, kernel32, raise_winerror
with contextlib.suppress(cffi.CDefError):
ffi.cdef(
"""
typedef struct _PROCESS_LEAP_SECOND_INFO {
ULONG Flags;
ULONG Reserved;
} PROCESS_LEAP_SECOND_INFO, *PPROCESS_LEAP_SECOND_INFO;
typedef struct _SYSTEMTIME {
WORD wYear;
WORD wMonth;
WORD wDayOfWeek;
WORD wDay;
WORD wHour;
WORD wMinute;
WORD wSecond;
WORD wMilliseconds;
} SYSTEMTIME, *PSYSTEMTIME, *LPSYSTEMTIME;
""",
)
ffi.cdef(
"""
typedef LARGE_INTEGER FILETIME;
typedef FILETIME* LPFILETIME;
HANDLE CreateWaitableTimerW(
LPSECURITY_ATTRIBUTES lpTimerAttributes,
BOOL bManualReset,
LPCWSTR lpTimerName
);
BOOL SetWaitableTimer(
HANDLE hTimer,
const LPFILETIME lpDueTime,
LONG lPeriod,
void* pfnCompletionRoutine,
LPVOID lpArgToCompletionRoutine,
BOOL fResume
);
BOOL SetProcessInformation(
HANDLE hProcess,
/* Really an enum, PROCESS_INFORMATION_CLASS */
int32_t ProcessInformationClass,
LPVOID ProcessInformation,
DWORD ProcessInformationSize
);
void GetSystemTimeAsFileTime(
LPFILETIME lpSystemTimeAsFileTime
);
BOOL SystemTimeToFileTime(
const SYSTEMTIME *lpSystemTime,
LPFILETIME lpFileTime
);
""",
override=True,
)
ProcessLeapSecondInfo = 8
PROCESS_LEAP_SECOND_INFO_FLAG_ENABLE_SIXTY_SECOND = 1
def set_leap_seconds_enabled(enabled):
plsi = ffi.new("PROCESS_LEAP_SECOND_INFO*")
if enabled:
plsi.Flags = PROCESS_LEAP_SECOND_INFO_FLAG_ENABLE_SIXTY_SECOND
else:
plsi.Flags = 0
plsi.Reserved = 0
if not kernel32.SetProcessInformation(
ffi.cast("HANDLE", -1), # current process
ProcessLeapSecondInfo,
plsi,
ffi.sizeof("PROCESS_LEAP_SECOND_INFO"),
):
raise_winerror()
def now_as_filetime():
ft = ffi.new("LARGE_INTEGER*")
kernel32.GetSystemTimeAsFileTime(ft)
return ft[0]
# "FILETIME" is a specific Windows time representation, that I guess was used
# for files originally but now gets used in all kinds of non-file-related
# places. Essentially: integer count of "ticks" since an epoch in 1601, where
# each tick is 100 nanoseconds, in UTC but pretending that leap seconds don't
# exist. (Fortunately, the Python datetime module also pretends that
# leapseconds don't exist, so we can use datetime arithmetic to compute
# FILETIME values.)
#
# https://docs.microsoft.com/en-us/windows/win32/sysinfo/file-times
#
# This page has FILETIME converters and can be useful for debugging:
#
# https://www.epochconverter.com/ldap
#
FILETIME_TICKS_PER_SECOND = 10**7
FILETIME_EPOCH = datetime.strptime("1601-01-01 00:00:00 Z", "%Y-%m-%d %H:%M:%S %z")
# XXX THE ABOVE IS WRONG:
#
# https://techcommunity.microsoft.com/t5/networking-blog/leap-seconds-for-the-appdev-what-you-should-know/ba-p/339813#
#
# Sometimes Windows FILETIME does include leap seconds! It depends on Windows
# version, process-global state, environment state, registry settings, and who
# knows what else!
#
# So actually the only correct way to convert a YMDhms-style representation of
# a time into a FILETIME is to use SystemTimeToFileTime
#
# ...also I can't even run this test on my VM, because it's running an ancient
# version of Win10 that doesn't have leap second support. Also also, Windows
# only tracks leap seconds since they added leap second support, and there
# haven't been any, so right now things work correctly either way.
#
# It is possible to insert some fake leap seconds for testing, if you want.
def py_datetime_to_win_filetime(dt):
# We'll want to call this on every datetime as it comes in
# dt = dt.astimezone(timezone.utc)
assert dt.tzinfo is timezone.utc
return round((dt - FILETIME_EPOCH).total_seconds() * FILETIME_TICKS_PER_SECOND)
async def main():
h = kernel32.CreateWaitableTimerW(ffi.NULL, True, ffi.NULL)
if not h:
raise_winerror()
print(h)
SECONDS = 2
wakeup = datetime.now(timezone.utc) + timedelta(seconds=SECONDS)
wakeup_filetime = py_datetime_to_win_filetime(wakeup)
wakeup_cffi = ffi.new("LARGE_INTEGER *")
wakeup_cffi[0] = wakeup_filetime
print(wakeup_filetime, wakeup_cffi)
print(f"Sleeping for {SECONDS} seconds (until {wakeup})")
if not kernel32.SetWaitableTimer(
h,
wakeup_cffi,
0,
ffi.NULL,
ffi.NULL,
False,
):
raise_winerror()
await trio.hazmat.WaitForSingleObject(h)
print(f"Current FILETIME: {now_as_filetime()}")
set_leap_seconds_enabled(False)
print(f"Current FILETIME: {now_as_filetime()}")
set_leap_seconds_enabled(True)
print(f"Current FILETIME: {now_as_filetime()}")
set_leap_seconds_enabled(False)
print(f"Current FILETIME: {now_as_filetime()}")
trio.run(main)