Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-93357: Lay the foundation for further work in asyncio.test_streams: port server cases to IsolatedAsyncioTestCase #93369

Merged
merged 11 commits into from
Oct 4, 2022
292 changes: 119 additions & 173 deletions Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,46 +566,10 @@ def test_exception_cancel(self):
test_utils.run_briefly(self.loop)
self.assertIs(stream._waiter, None)

def test_start_server(self):

class MyServer:

def __init__(self, loop):
self.server = None
self.loop = loop

async def handle_client(self, client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

def start(self):
sock = socket.create_server(('127.0.0.1', 0))
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client,
sock=sock))
return sock.getsockname()

def handle_client_callback(self, client_reader, client_writer):
self.loop.create_task(self.handle_client(client_reader,
client_writer))

def start_callback(self):
sock = socket.create_server(('127.0.0.1', 0))
addr = sock.getsockname()
sock.close()
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client_callback,
host=addr[0], port=addr[1]))
return addr

def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None

class NewStreamTests(unittest.IsolatedAsyncioTestCase):

async def test_start_server(self):

async def client(addr):
reader, writer = await asyncio.open_connection(*addr)
Expand All @@ -617,61 +581,43 @@ async def client(addr):
await writer.wait_closed()
return msgback

messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

# test the server variant with a coroutine as client handler
server = MyServer(self.loop)
addr = server.start()
msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
server.stop()
self.assertEqual(msg, b"hello world!\n")
async def handle_client(client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

with self.subTest(msg="coroutine"):
server = await asyncio.start_server(
handle_client,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()
msg = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")

# test the server variant with a callback as client handler
server = MyServer(self.loop)
addr = server.start_callback()
msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
server.stop()
self.assertEqual(msg, b"hello world!\n")
with self.subTest(msg="callback"):
async def handle_client_callback(client_reader, client_writer):
asyncio.get_running_loop().create_task(
handle_client(client_reader, client_writer)
)

self.assertEqual(messages, [])
server = await asyncio.start_server(
handle_client_callback,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()
reader, writer = await asyncio.open_connection(*addr)
msg = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")

@socket_helper.skip_unless_bind_unix_socket
def test_start_unix_server(self):

class MyServer:

def __init__(self, loop, path):
self.server = None
self.loop = loop
self.path = path

async def handle_client(self, client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

def start(self):
self.server = self.loop.run_until_complete(
asyncio.start_unix_server(self.handle_client,
path=self.path))

def handle_client_callback(self, client_reader, client_writer):
self.loop.create_task(self.handle_client(client_reader,
client_writer))

def start_callback(self):
start = asyncio.start_unix_server(self.handle_client_callback,
path=self.path)
self.server = self.loop.run_until_complete(start)

def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None
async def test_start_unix_server(self):

async def client(path):
reader, writer = await asyncio.open_unix_connection(path)
Expand All @@ -683,64 +629,42 @@ async def client(path):
await writer.wait_closed()
return msgback

messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

# test the server variant with a coroutine as client handler
with test_utils.unix_socket_path() as path:
server = MyServer(self.loop, path)
server.start()
msg = self.loop.run_until_complete(
self.loop.create_task(client(path)))
server.stop()
self.assertEqual(msg, b"hello world!\n")

# test the server variant with a callback as client handler
with test_utils.unix_socket_path() as path:
server = MyServer(self.loop, path)
server.start_callback()
msg = self.loop.run_until_complete(
self.loop.create_task(client(path)))
server.stop()
self.assertEqual(msg, b"hello world!\n")

self.assertEqual(messages, [])
async def handle_client(client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

with self.subTest(msg="coroutine"):
with test_utils.unix_socket_path() as path:
server = await asyncio.start_unix_server(
handle_client,
path=path
)
msg = await client(path)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")

with self.subTest(msg="callback"):
async def handle_client_callback(client_reader, client_writer):
asyncio.get_running_loop().create_task(
handle_client(client_reader, client_writer)
)

with test_utils.unix_socket_path() as path:
server = await asyncio.start_unix_server(
handle_client_callback,
path=path
)
msg = await client(path)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")

@unittest.skipIf(ssl is None, 'No ssl module')
def test_start_tls(self):

class MyServer:

def __init__(self, loop):
self.server = None
self.loop = loop

async def handle_client(self, client_reader, client_writer):
data1 = await client_reader.readline()
client_writer.write(data1)
await client_writer.drain()
assert client_writer.get_extra_info('sslcontext') is None
await client_writer.start_tls(
test_utils.simple_server_sslcontext())
assert client_writer.get_extra_info('sslcontext') is not None
data2 = await client_reader.readline()
client_writer.write(data2)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

def start(self):
sock = socket.create_server(('127.0.0.1', 0))
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client,
sock=sock))
return sock.getsockname()

def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None
async def test_start_tls(self):

async def client(addr):
reader, writer = await asyncio.open_connection(*addr)
Expand All @@ -757,18 +681,49 @@ async def client(addr):
await writer.wait_closed()
return msgback1, msgback2

messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

server = MyServer(self.loop)
addr = server.start()
msg1, msg2 = self.loop.run_until_complete(client(addr))
server.stop()

self.assertEqual(messages, [])
async def handle_client(client_reader, client_writer):
data1 = await client_reader.readline()
client_writer.write(data1)
await client_writer.drain()
assert client_writer.get_extra_info('sslcontext') is None
await client_writer.start_tls(
test_utils.simple_server_sslcontext())
assert client_writer.get_extra_info('sslcontext') is not None

data2 = await client_reader.readline()
client_writer.write(data2)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

server = await asyncio.start_server(
handle_client,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()

msg1, msg2 = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n")


class StreamTests2(test_utils.TestCase):

def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)

def tearDown(self):
# just in case if we have transport close callbacks
test_utils.run_briefly(self.loop)

self.loop.close()
gc.collect()
super().tearDown()

@unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self):
# See asyncio issue 168. This test is derived from the example
Expand Down Expand Up @@ -967,22 +922,20 @@ def test_LimitOverrunError_pickleable(self):
self.assertEqual(str(e), str(e2))
self.assertEqual(e.consumed, e2.consumed)

def test_wait_closed_on_close(self):
with test_utils.run_test_server() as httpd:
async def test_wait_closed_on_close(self):
async with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address))

wr.write(b'GET / HTTP/1.0\r\n\r\n')
f = rd.readline()
data = self.loop.run_until_complete(f)
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
arhadthedev marked this conversation as resolved.
Show resolved Hide resolved
f = rd.read()
data = self.loop.run_until_complete(f)
await rd.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
self.assertFalse(wr.is_closing())
wr.close()
self.assertTrue(wr.is_closing())
self.loop.run_until_complete(wr.wait_closed())
await wr.wait_closed()

def test_wait_closed_on_close_with_unread_data(self):
with test_utils.run_test_server() as httpd:
Expand Down Expand Up @@ -1038,15 +991,10 @@ async def inner(httpd):

self.assertEqual(messages, [])

def test_eof_feed_when_closing_writer(self):
async def test_eof_feed_when_closing_writer(self):
# See http://bugs.python.org/issue35065
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like .set_exception_handler is now missing from this test. Is it fine?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've removed all usages of messages inter-thread communication list:

  1. -        messages = []
    -        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
  2. -        self.assertEqual(messages, [])

because the server is now async, lives in a main thread, and needs no intermediary to pass exceptions into a test harness.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, thank you for noticing, I've added the clarification as the second paragraph of the first PR comment.


with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address))

async with test_utils.run_test_server() as httpd:
rd, wr = await asyncio.open_connection(*httpd.address)
wr.close()
f = wr.wait_closed()
self.loop.run_until_complete(f)
Expand All @@ -1055,8 +1003,6 @@ def test_eof_feed_when_closing_writer(self):
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'')

self.assertEqual(messages, [])


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
:mod:`test.test_asyncio.test_streams` temporarily introduced two new tests,
arhadthedev marked this conversation as resolved.
Show resolved Hide resolved
``NewStreamTests`` and ``StreamTests2``.