Skip to content

bpo-47015: Update test_os from asyncore to asyncio #31876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 79 additions & 157 deletions Lib/test/test_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# does add tests for a few functions which have been determined to be more
# portable than they had been thought to be.

import asyncio
import codecs
import contextlib
import decimal
Expand All @@ -23,7 +24,6 @@
import sys
import sysconfig
import tempfile
import threading
import time
import types
import unittest
Expand All @@ -33,15 +33,9 @@
from test.support import import_helper
from test.support import os_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
from platform import win32_is_iot

with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
import asynchat
import asyncore

try:
import resource
except ImportError:
Expand Down Expand Up @@ -101,6 +95,10 @@ def create_file(filename, content=b'content'):
'on AIX, splice() only accepts sockets')


def tearDownModule():
asyncio.set_event_loop_policy(None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line that set a loop policy is not present any more, so this cleanup step doesn’t seem to apply anymore.

Copy link
Member Author

@arhadthedev arhadthedev Mar 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand a failed pre-policy run correctly, the policy must either be manually set to something before any case is runned or be returned back to None before the suit exits. I've chosen the first variant, Andrew proposed to switch to the second.

0:04:29 load avg: 3.56 [300/433/1] test_os failed (env changed) -- running: test_asyncio (1 min 10 sec), test_concurrent_futures (1 min 57 sec)

[a bunch of oks and skips follow]

[...]

423 tests OK.

10 slowest tests:
- test_tools: 4 min 44 sec
- test_concurrent_futures: 3 min 30 sec
- test_peg_generator: 2 min 57 sec
- test_multiprocessing_spawn: 2 min 27 sec
- test_gdb: 1 min 51 sec
- test_asyncio: 1 min 50 sec
- test_multiprocessing_forkserver: 1 min 38 sec
- test_multiprocessing_fork: 1 min 22 sec
- test_regrtest: 1 min 12 sec
- test_statistics: 49.1 sec

1 test altered the execution environment:
    test_os

9 tests skipped:
    test_devpoll test_ioctl test_kqueue test_msilib test_startfile
    test_winconsoleio test_winreg test_winsound test_zipfile64

Total duration: 11 min 50 sec
Tests result: ENV CHANGED
make: *** [Makefile:1795: buildbottest] Error 3
Error: Process completed with exit code 2.



class MiscTests(unittest.TestCase):
def test_getcwd(self):
cwd = os.getcwd()
Expand Down Expand Up @@ -3228,94 +3226,8 @@ def test_set_get_priority(self):
raise


class SendfileTestServer(asyncore.dispatcher, threading.Thread):

class Handler(asynchat.async_chat):

def __init__(self, conn):
asynchat.async_chat.__init__(self, conn)
self.in_buffer = []
self.accumulate = True
self.closed = False
self.push(b"220 ready\r\n")

def handle_read(self):
data = self.recv(4096)
if self.accumulate:
self.in_buffer.append(data)

def get_data(self):
return b''.join(self.in_buffer)

def handle_close(self):
self.close()
self.closed = True

def handle_error(self):
raise

def __init__(self, address):
threading.Thread.__init__(self)
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind(address)
self.listen(5)
self.host, self.port = self.socket.getsockname()[:2]
self.handler_instance = None
self._active = False
self._active_lock = threading.Lock()

# --- public API

@property
def running(self):
return self._active

def start(self):
assert not self.running
self.__flag = threading.Event()
threading.Thread.start(self)
self.__flag.wait()

def stop(self):
assert self.running
self._active = False
self.join()

def wait(self):
# wait for handler connection to be closed, then stop the server
while not getattr(self.handler_instance, "closed", False):
time.sleep(0.001)
self.stop()

# --- internals

def run(self):
self._active = True
self.__flag.set()
while self._active and asyncore.socket_map:
self._active_lock.acquire()
asyncore.loop(timeout=0.001, count=1)
self._active_lock.release()
asyncore.close_all()

def handle_accept(self):
conn, addr = self.accept()
self.handler_instance = self.Handler(conn)

def handle_connect(self):
self.close()
handle_read = handle_connect

def writable(self):
return 0

def handle_error(self):
raise


@unittest.skipUnless(hasattr(os, 'sendfile'), "test needs os.sendfile()")
class TestSendfile(unittest.TestCase):
class TestSendfile(unittest.IsolatedAsyncioTestCase):

DATA = b"12345abcde" * 16 * 1024 # 160 KiB
SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \
Expand All @@ -3328,40 +3240,52 @@ class TestSendfile(unittest.TestCase):

@classmethod
def setUpClass(cls):
cls.key = threading_helper.threading_setup()
create_file(os_helper.TESTFN, cls.DATA)

@classmethod
def tearDownClass(cls):
threading_helper.threading_cleanup(*cls.key)
os_helper.unlink(os_helper.TESTFN)

def setUp(self):
self.server = SendfileTestServer((socket_helper.HOST, 0))
self.server.start()
@staticmethod
async def chunks(reader):
while not reader.at_eof():
yield await reader.read()

async def handle_new_client(self, reader, writer):
self.server_buffer = b''.join([x async for x in self.chunks(reader)])
writer.close()
self.server.close() # The test server processes a single client only

async def asyncSetUp(self):
self.server_buffer = b''
self.server = await asyncio.start_server(self.handle_new_client,
socket_helper.HOSTv4)
server_name = self.server.sockets[0].getsockname()
self.client = socket.socket()
self.client.connect((self.server.host, self.server.port))
self.client.settimeout(1)
# synchronize by waiting for "220 ready" response
self.client.recv(1024)
self.client.setblocking(False)
await asyncio.get_running_loop().sock_connect(self.client, server_name)
self.sockno = self.client.fileno()
self.file = open(os_helper.TESTFN, 'rb')
self.fileno = self.file.fileno()

def tearDown(self):
async def asyncTearDown(self):
self.file.close()
self.client.close()
if self.server.running:
self.server.stop()
self.server = None
await self.server.wait_closed()

# Use the test subject instead of asyncio.loop.sendfile
@staticmethod
async def async_sendfile(*args, **kwargs):
return await asyncio.to_thread(os.sendfile, *args, **kwargs)

def sendfile_wrapper(self, *args, **kwargs):
@staticmethod
async def sendfile_wrapper(*args, **kwargs):
"""A higher level wrapper representing how an application is
supposed to use sendfile().
"""
while True:
try:
return os.sendfile(*args, **kwargs)
return await TestSendfile.async_sendfile(*args, **kwargs)
except OSError as err:
if err.errno == errno.ECONNRESET:
# disconnected
Expand All @@ -3372,13 +3296,14 @@ def sendfile_wrapper(self, *args, **kwargs):
else:
raise

def test_send_whole_file(self):
async def test_send_whole_file(self):
# normal send
total_sent = 0
offset = 0
nbytes = 4096
while total_sent < len(self.DATA):
sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
sent = await self.sendfile_wrapper(self.sockno, self.fileno,
offset, nbytes)
if sent == 0:
break
offset += sent
Expand All @@ -3389,19 +3314,19 @@ def test_send_whole_file(self):
self.assertEqual(total_sent, len(self.DATA))
self.client.shutdown(socket.SHUT_RDWR)
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(len(data), len(self.DATA))
self.assertEqual(data, self.DATA)
await self.server.wait_closed()
self.assertEqual(len(self.server_buffer), len(self.DATA))
self.assertEqual(self.server_buffer, self.DATA)

def test_send_at_certain_offset(self):
async def test_send_at_certain_offset(self):
# start sending a file at a certain offset
total_sent = 0
offset = len(self.DATA) // 2
must_send = len(self.DATA) - offset
nbytes = 4096
while total_sent < must_send:
sent = self.sendfile_wrapper(self.sockno, self.fileno, offset, nbytes)
sent = await self.sendfile_wrapper(self.sockno, self.fileno,
offset, nbytes)
if sent == 0:
break
offset += sent
Expand All @@ -3410,18 +3335,18 @@ def test_send_at_certain_offset(self):

self.client.shutdown(socket.SHUT_RDWR)
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
await self.server.wait_closed()
expected = self.DATA[len(self.DATA) // 2:]
self.assertEqual(total_sent, len(expected))
self.assertEqual(len(data), len(expected))
self.assertEqual(data, expected)
self.assertEqual(len(self.server_buffer), len(expected))
self.assertEqual(self.server_buffer, expected)

def test_offset_overflow(self):
async def test_offset_overflow(self):
# specify an offset > file size
offset = len(self.DATA) + 4096
try:
sent = os.sendfile(self.sockno, self.fileno, offset, 4096)
sent = await self.async_sendfile(self.sockno, self.fileno,
offset, 4096)
except OSError as e:
# Solaris can raise EINVAL if offset >= file length, ignore.
if e.errno != errno.EINVAL:
Expand All @@ -3430,39 +3355,38 @@ def test_offset_overflow(self):
self.assertEqual(sent, 0)
self.client.shutdown(socket.SHUT_RDWR)
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(data, b'')
await self.server.wait_closed()
self.assertEqual(self.server_buffer, b'')

def test_invalid_offset(self):
async def test_invalid_offset(self):
with self.assertRaises(OSError) as cm:
os.sendfile(self.sockno, self.fileno, -1, 4096)
await self.async_sendfile(self.sockno, self.fileno, -1, 4096)
self.assertEqual(cm.exception.errno, errno.EINVAL)

def test_keywords(self):
async def test_keywords(self):
# Keyword arguments should be supported
os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
offset=0, count=4096)
await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno,
offset=0, count=4096)
if self.SUPPORT_HEADERS_TRAILERS:
os.sendfile(out_fd=self.sockno, in_fd=self.fileno,
offset=0, count=4096,
headers=(), trailers=(), flags=0)
await self.async_sendfile(out_fd=self.sockno, in_fd=self.fileno,
offset=0, count=4096,
headers=(), trailers=(), flags=0)

# --- headers / trailers tests

@requires_headers_trailers
def test_headers(self):
async def test_headers(self):
total_sent = 0
expected_data = b"x" * 512 + b"y" * 256 + self.DATA[:-1]
sent = os.sendfile(self.sockno, self.fileno, 0, 4096,
headers=[b"x" * 512, b"y" * 256])
sent = await self.async_sendfile(self.sockno, self.fileno, 0, 4096,
headers=[b"x" * 512, b"y" * 256])
self.assertLessEqual(sent, 512 + 256 + 4096)
total_sent += sent
offset = 4096
while total_sent < len(expected_data):
nbytes = min(len(expected_data) - total_sent, 4096)
sent = self.sendfile_wrapper(self.sockno, self.fileno,
offset, nbytes)
sent = await self.sendfile_wrapper(self.sockno, self.fileno,
offset, nbytes)
if sent == 0:
break
self.assertLessEqual(sent, nbytes)
Expand All @@ -3471,51 +3395,49 @@ def test_headers(self):

self.assertEqual(total_sent, len(expected_data))
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(hash(data), hash(expected_data))
await self.server.wait_closed()
self.assertEqual(hash(self.server_buffer), hash(expected_data))

@requires_headers_trailers
def test_trailers(self):
async def test_trailers(self):
TESTFN2 = os_helper.TESTFN + "2"
file_data = b"abcdef"

self.addCleanup(os_helper.unlink, TESTFN2)
create_file(TESTFN2, file_data)

with open(TESTFN2, 'rb') as f:
os.sendfile(self.sockno, f.fileno(), 0, 5,
trailers=[b"123456", b"789"])
await self.async_sendfile(self.sockno, f.fileno(), 0, 5,
trailers=[b"123456", b"789"])
self.client.close()
self.server.wait()
data = self.server.handler_instance.get_data()
self.assertEqual(data, b"abcde123456789")
await self.server.wait_closed()
self.assertEqual(self.server_buffer, b"abcde123456789")

@requires_headers_trailers
@requires_32b
def test_headers_overflow_32bits(self):
async def test_headers_overflow_32bits(self):
self.server.handler_instance.accumulate = False
with self.assertRaises(OSError) as cm:
os.sendfile(self.sockno, self.fileno, 0, 0,
headers=[b"x" * 2**16] * 2**15)
await self.async_sendfile(self.sockno, self.fileno, 0, 0,
headers=[b"x" * 2**16] * 2**15)
self.assertEqual(cm.exception.errno, errno.EINVAL)

@requires_headers_trailers
@requires_32b
def test_trailers_overflow_32bits(self):
async def test_trailers_overflow_32bits(self):
self.server.handler_instance.accumulate = False
with self.assertRaises(OSError) as cm:
os.sendfile(self.sockno, self.fileno, 0, 0,
trailers=[b"x" * 2**16] * 2**15)
await self.async_sendfile(self.sockno, self.fileno, 0, 0,
trailers=[b"x" * 2**16] * 2**15)
self.assertEqual(cm.exception.errno, errno.EINVAL)

@requires_headers_trailers
@unittest.skipUnless(hasattr(os, 'SF_NODISKIO'),
'test needs os.SF_NODISKIO')
def test_flags(self):
async def test_flags(self):
try:
os.sendfile(self.sockno, self.fileno, 0, 4096,
flags=os.SF_NODISKIO)
await self.async_sendfile(self.sockno, self.fileno, 0, 4096,
flags=os.SF_NODISKIO)
except OSError as err:
if err.errno not in (errno.EBUSY, errno.EAGAIN):
raise
Expand Down
Loading