From f8ff4421e4a267aa87a7103f8740a84c66a0e3c6 Mon Sep 17 00:00:00 2001 From: Malcolm Smith Date: Fri, 2 Aug 2024 00:03:42 +0100 Subject: [PATCH 1/6] Make TextLogStream handle its own buffering rather than relying on TextIOWrapper --- Lib/_android_support.py | 51 ++++++++++++++++++++++++++++++++-------- Lib/test/test_android.py | 27 +++++++++++++-------- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/Lib/_android_support.py b/Lib/_android_support.py index 590e85ea8c2db1..a04c2624c9816a 100644 --- a/Lib/_android_support.py +++ b/Lib/_android_support.py @@ -1,5 +1,6 @@ import io import sys +from threading import RLock # The maximum length of a log message in bytes, including the level marker and @@ -11,9 +12,9 @@ MAX_BYTES_PER_WRITE = 4000 # UTF-8 uses a maximum of 4 bytes per character, so limiting text writes to this -# size ensures that TextIOWrapper can always avoid exceeding MAX_BYTES_PER_WRITE. +# size ensures that we can always avoid exceeding MAX_BYTES_PER_WRITE. # However, if the actual number of bytes per character is smaller than that, -# then TextIOWrapper may still join multiple consecutive text writes into binary +# then we may still join multiple consecutive text writes into binary # writes containing a larger number of characters. MAX_CHARS_PER_WRITE = MAX_BYTES_PER_WRITE // 4 @@ -35,9 +36,10 @@ def init_streams(android_log_write, stdout_prio, stderr_prio): class TextLogStream(io.TextIOWrapper): def __init__(self, android_log_write, prio, tag, **kwargs): kwargs.setdefault("encoding", "UTF-8") - kwargs.setdefault("line_buffering", True) super().__init__(BinaryLogStream(android_log_write, prio, tag), **kwargs) - self._CHUNK_SIZE = MAX_BYTES_PER_WRITE + self._lock = RLock() + self._pending_bytes = [] + self._pending_bytes_count = 0 def __repr__(self): return f"" @@ -52,15 +54,44 @@ def write(self, s): s = str.__str__(s) # We want to emit one log message per line wherever possible, so split - # the string before sending it to the superclass. Note that - # "".splitlines() == [], so nothing will be logged for an empty string. - for line in s.splitlines(keepends=True): - while line: - super().write(line[:MAX_CHARS_PER_WRITE]) - line = line[MAX_CHARS_PER_WRITE:] + # the string into lines first. Note that "".splitlines() == [], so + # nothing will be logged for an empty string. + with self._lock: + for line in s.splitlines(keepends=True): + while line: + chunk = line[:MAX_CHARS_PER_WRITE] + line = line[MAX_CHARS_PER_WRITE:] + self._write_chunk(chunk) return len(s) + # The size and behavior of TextIOWrapper's buffer is not part of its public + # API, so we handle buffering ourselves to avoid truncation. + def _write_chunk(self, s): + b = s.encode(self.encoding, self.errors) + if self._pending_bytes_count + len(b) > MAX_BYTES_PER_WRITE: + self.flush() + + self._pending_bytes.append(b) + self._pending_bytes_count += len(b) + if ( + self.write_through + or b.endswith(b"\n") + or self._pending_bytes_count > MAX_BYTES_PER_WRITE + ): + self.flush() + + def flush(self): + with self._lock: + self.buffer.write(b"".join(self._pending_bytes)) + self._pending_bytes.clear() + self._pending_bytes_count = 0 + + # Since this is a line-based logging system, line buffering cannot be turned + # off, i.e. a newline always causes a flush. + def line_buffering(self): + return True + class BinaryLogStream(io.RawIOBase): def __init__(self, android_log_write, prio, tag): diff --git a/Lib/test/test_android.py b/Lib/test/test_android.py index 115882a4c281f6..3c825140b8addd 100644 --- a/Lib/test/test_android.py +++ b/Lib/test/test_android.py @@ -147,6 +147,13 @@ def write(s, lines=None, *, write_len=None): write("f\n\ng", ["exxf", ""]) write("\n", ["g"]) + # Since this is a line-based logging system, line buffering + # cannot be turned off, i.e. a newline always causes a flush. + stream.reconfigure(line_buffering=False) + self.assertTrue(stream.line_buffering) + + # However, buffering can be turned off completely if you want a + # flush after every write. with self.unbuffered(stream): write("\nx", ["", "x"]) write("\na\n", ["", "a"]) @@ -209,21 +216,21 @@ def __str__(self): # (MAX_BYTES_PER_WRITE). # # ASCII (1 byte per character) - write(("foobar" * 700) + "\n", - [("foobar" * 666) + "foob", # 4000 bytes - "ar" + ("foobar" * 33)]) # 200 bytes + write(("foobar" * 700) + "\n", # 4200 bytes in + [("foobar" * 666) + "foob", # 4000 bytes out + "ar" + ("foobar" * 33)]) # 200 bytes out # "Full-width" digits 0-9 (3 bytes per character) s = "\uff10\uff11\uff12\uff13\uff14\uff15\uff16\uff17\uff18\uff19" - write((s * 150) + "\n", - [s * 100, # 3000 bytes - s * 50]) # 1500 bytes + write((s * 150) + "\n", # 4500 bytes in + [s * 100, # 3000 bytes out + s * 50]) # 1500 bytes out s = "0123456789" - write(s * 200, []) - write(s * 150, []) - write(s * 51, [s * 350]) # 3500 bytes - write("\n", [s * 51]) # 510 bytes + write(s * 200, []) # 2000 bytes in + write(s * 150, []) # 1500 bytes in + write(s * 51, [s * 350]) # 510 bytes in, 3500 bytes out + write("\n", [s * 51]) # 0 bytes in, 510 bytes out def test_bytes(self): for stream_name, level in [("stdout", "I"), ("stderr", "W")]: From 801658ab00b9ef44fc95839bb74015c516b53765 Mon Sep 17 00:00:00 2001 From: Malcolm Smith Date: Fri, 2 Aug 2024 01:52:22 +0100 Subject: [PATCH 2/6] Assert that boolean values are the correct type, not just the correct truthiness --- Lib/_android_support.py | 1 + Lib/test/test_android.py | 14 +++++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/Lib/_android_support.py b/Lib/_android_support.py index a04c2624c9816a..0d55bff5ea9e88 100644 --- a/Lib/_android_support.py +++ b/Lib/_android_support.py @@ -89,6 +89,7 @@ def flush(self): # Since this is a line-based logging system, line buffering cannot be turned # off, i.e. a newline always causes a flush. + @property def line_buffering(self): return True diff --git a/Lib/test/test_android.py b/Lib/test/test_android.py index 3c825140b8addd..85cf2a9c670b9c 100644 --- a/Lib/test/test_android.py +++ b/Lib/test/test_android.py @@ -88,11 +88,11 @@ def test_str(self): tag = f"python.{stream_name}" self.assertEqual(f"", repr(stream)) - self.assertTrue(stream.writable()) - self.assertFalse(stream.readable()) + self.assertIs(stream.writable(), True) + self.assertIs(stream.readable(), False) self.assertEqual("UTF-8", stream.encoding) - self.assertTrue(stream.line_buffering) - self.assertFalse(stream.write_through) + self.assertIs(stream.line_buffering, True) + self.assertIs(stream.write_through, False) # stderr is backslashreplace by default; stdout is configured # that way by libregrtest.main. @@ -150,7 +150,7 @@ def write(s, lines=None, *, write_len=None): # Since this is a line-based logging system, line buffering # cannot be turned off, i.e. a newline always causes a flush. stream.reconfigure(line_buffering=False) - self.assertTrue(stream.line_buffering) + self.assertIs(stream.line_buffering, True) # However, buffering can be turned off completely if you want a # flush after every write. @@ -238,8 +238,8 @@ def test_bytes(self): stream = getattr(sys, stream_name).buffer tag = f"python.{stream_name}" self.assertEqual(f"", repr(stream)) - self.assertTrue(stream.writable()) - self.assertFalse(stream.readable()) + self.assertIs(stream.writable(), True) + self.assertIs(stream.readable(), False) def write(b, lines=None, *, write_len=None): if write_len is None: From 3da2d05c04cd6d51d69b782d91322e2089c28fde Mon Sep 17 00:00:00 2001 From: Malcolm Smith Date: Mon, 5 Aug 2024 18:17:08 +0100 Subject: [PATCH 3/6] Add logcat rate limit of 1 MB per second --- Lib/_android_support.py | 76 ++++++++++++++++++++++++++++++---------- Lib/test/test_android.py | 49 +++++++++++++++++++++++++- 2 files changed, 106 insertions(+), 19 deletions(-) diff --git a/Lib/_android_support.py b/Lib/_android_support.py index 0d55bff5ea9e88..d5d13ec6a48e14 100644 --- a/Lib/_android_support.py +++ b/Lib/_android_support.py @@ -1,14 +1,14 @@ import io import sys from threading import RLock - +from time import sleep, time # The maximum length of a log message in bytes, including the level marker and -# tag, is defined as LOGGER_ENTRY_MAX_PAYLOAD in -# platform/system/logging/liblog/include/log/log.h. As of API level 30, messages -# longer than this will be be truncated by logcat. This limit has already been -# reduced at least once in the history of Android (from 4076 to 4068 between API -# level 23 and 26), so leave some headroom. +# tag, is defined as LOGGER_ENTRY_MAX_PAYLOAD at +# https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log.h;l=71. +# Messages longer than this will be be truncated by logcat. This limit has already +# been reduced at least once in the history of Android (from 4076 to 4068 between +# API level 23 and 26), so leave some headroom. MAX_BYTES_PER_WRITE = 4000 # UTF-8 uses a maximum of 4 bytes per character, so limiting text writes to this @@ -27,16 +27,19 @@ def init_streams(android_log_write, stdout_prio, stderr_prio): if sys.executable: return # Not embedded in an app. + global logcat + logcat = Logcat(android_log_write) + sys.stdout = TextLogStream( - android_log_write, stdout_prio, "python.stdout", errors=sys.stdout.errors) + stdout_prio, "python.stdout", errors=sys.stdout.errors) sys.stderr = TextLogStream( - android_log_write, stderr_prio, "python.stderr", errors=sys.stderr.errors) + stderr_prio, "python.stderr", errors=sys.stderr.errors) class TextLogStream(io.TextIOWrapper): - def __init__(self, android_log_write, prio, tag, **kwargs): + def __init__(self, prio, tag, **kwargs): kwargs.setdefault("encoding", "UTF-8") - super().__init__(BinaryLogStream(android_log_write, prio, tag), **kwargs) + super().__init__(BinaryLogStream(prio, tag), **kwargs) self._lock = RLock() self._pending_bytes = [] self._pending_bytes_count = 0 @@ -95,8 +98,7 @@ def line_buffering(self): class BinaryLogStream(io.RawIOBase): - def __init__(self, android_log_write, prio, tag): - self.android_log_write = android_log_write + def __init__(self, prio, tag): self.prio = prio self.tag = tag @@ -117,10 +119,48 @@ def write(self, b): # Writing an empty string to the stream should have no effect. if b: - # Encode null bytes using "modified UTF-8" to avoid truncating the - # message. This should not affect the return value, as the caller - # may be expecting it to match the length of the input. - self.android_log_write(self.prio, self.tag, - b.replace(b"\x00", b"\xc0\x80")) - + logcat.write(self.prio, self.tag, b) return len(b) + + +# When a large volume of data is written to logcat at once, e.g. when a test +# module fails in --verbose3 mode, there's a risk of overflowing logcat's own +# buffer and losing messages. We avoid this by imposing a rate limit using the +# token bucket algorithm, based on a conservative estimate of how fast `adb +# logcat` can consume data. +MAX_BYTES_PER_SECOND = 1024 * 1024 + +# The logcat buffer size of a device can be determined by running `logcat -g`. +# We set the token bucket size to half of the buffer size of our current minimum +# API level, because other things on the system will be producing messages as +# well. +BUCKET_SIZE = 128 * 1024 + +# https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log_read.h;l=39 +PER_MESSAGE_OVERHEAD = 28 + + +class Logcat: + def __init__(self, android_log_write): + self.android_log_write = android_log_write + self._lock = RLock() + self._bucket_level = 0 + self._prev_write_time = time() + + def write(self, prio, tag, message): + # Encode null bytes using "modified UTF-8" to avoid them truncating the + # message. + message = message.replace(b"\x00", b"\xc0\x80") + + with self._lock: + now = time() + self._bucket_level += ( + (now - self._prev_write_time) * MAX_BYTES_PER_SECOND) + self._bucket_level = min(self._bucket_level, BUCKET_SIZE) + self._prev_write_time = now + + self._bucket_level -= PER_MESSAGE_OVERHEAD + len(tag) + len(message) + if self._bucket_level < 0: + sleep(-self._bucket_level / MAX_BYTES_PER_SECOND) + + self.android_log_write(prio, tag, message) diff --git a/Lib/test/test_android.py b/Lib/test/test_android.py index 85cf2a9c670b9c..46e451395a9283 100644 --- a/Lib/test/test_android.py +++ b/Lib/test/test_android.py @@ -4,11 +4,12 @@ import subprocess import sys import unittest +from _android_support import TextLogStream from array import array from contextlib import contextmanager from threading import Thread from test.support import LOOPBACK_TIMEOUT -from time import time +from time import sleep, time if sys.platform != "android": @@ -337,3 +338,49 @@ def write(b, lines=None, *, write_len=None): fr"{type(obj).__name__}" ): stream.write(obj) + + def test_rate_limit(self): + # See _android_support.py. + MAX_KB_PER_SECOND = 1024 + BUCKET_KB = 128 + PER_MESSAGE_OVERHEAD = 28 + + # https://developer.android.com/ndk/reference/group/logging + ANDROID_LOG_DEBUG = 3 + + # To avoid flooding the test script output, use a different tag rather + # than stdout or stderr. + tag = "python.rate_limit" + stream = TextLogStream(ANDROID_LOG_DEBUG, tag) + + # Make a test message which consumes 1 KB of the logcat buffer. + message = "Line {:03d} " + message += "." * ( + 1024 - PER_MESSAGE_OVERHEAD - len(tag) - len(message.format(0)) + ) + "\n" + + # Make sure the token bucket is full. + sleep(BUCKET_KB / MAX_KB_PER_SECOND) + line_num = 0 + + # Send BUCKET_KB messages and return the rate at which they were consumed. + def fill_bucket(): + nonlocal line_num + start = time() + max_line_num = line_num + BUCKET_KB + while line_num < max_line_num: + stream.write(message.format(line_num)) + line_num += 1 + return BUCKET_KB / (time() - start) + + # The first BUCKET_KB should be written with minimal delay. + self.assertGreater(fill_bucket(), MAX_KB_PER_SECOND * 2) + + # The next BUCKET_KB should be written at the rate limit. + rate = fill_bucket() + self.assertGreater(rate, MAX_KB_PER_SECOND * 0.75) + self.assertLess(rate, MAX_KB_PER_SECOND * 1.25) + + # Once the token bucket is full again, we should go back to full speed. + sleep(BUCKET_KB / MAX_KB_PER_SECOND) + self.assertGreater(fill_bucket(), MAX_KB_PER_SECOND * 2) From 4872a0284e62bb3fc14040008cdd12be4b3ae0fd Mon Sep 17 00:00:00 2001 From: Malcolm Smith Date: Mon, 5 Aug 2024 18:39:50 +0100 Subject: [PATCH 4/6] Handle sys.stdout and sys.stderr redirection in --verbose3 mode --- Lib/test/test_android.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_android.py b/Lib/test/test_android.py index 46e451395a9283..73211eb4009804 100644 --- a/Lib/test/test_android.py +++ b/Lib/test/test_android.py @@ -1,3 +1,4 @@ +import io import platform import queue import re @@ -6,10 +7,11 @@ import unittest from _android_support import TextLogStream from array import array -from contextlib import contextmanager +from contextlib import ExitStack, contextmanager from threading import Thread from test.support import LOOPBACK_TIMEOUT from time import sleep, time +from unittest.mock import patch if sys.platform != "android": @@ -82,9 +84,30 @@ def unbuffered(self, stream): finally: stream.reconfigure(write_through=False) + # In --verbose3 mode, sys.stdout and sys.stderr are captured, so we can't + # test them directly. Detect this mode and use some temporary streams with + # the same properties. + def stream_context(self, stream_name, level): + # https://developer.android.com/ndk/reference/group/logging + prio = {"I": 4, "W": 5}[level] + + stack = ExitStack() + stack.enter_context(self.subTest(stream_name)) + stream = getattr(sys, stream_name) + if isinstance(stream, io.StringIO): + stack.enter_context( + patch( + f"sys.{stream_name}", + TextLogStream( + prio, f"python.{stream_name}", errors="backslashreplace" + ), + ) + ) + return stack + def test_str(self): for stream_name, level in [("stdout", "I"), ("stderr", "W")]: - with self.subTest(stream=stream_name): + with self.stream_context(stream_name, level): stream = getattr(sys, stream_name) tag = f"python.{stream_name}" self.assertEqual(f"", repr(stream)) @@ -235,7 +258,7 @@ def __str__(self): def test_bytes(self): for stream_name, level in [("stdout", "I"), ("stderr", "W")]: - with self.subTest(stream=stream_name): + with self.stream_context(stream_name, level): stream = getattr(sys, stream_name).buffer tag = f"python.{stream_name}" self.assertEqual(f"", repr(stream)) From e178b9e9c9e82985a15f0e78e4c942494f6262b6 Mon Sep 17 00:00:00 2001 From: Malcolm Smith Date: Mon, 5 Aug 2024 19:06:48 +0100 Subject: [PATCH 5/6] Clarify function name, add news --- Lib/test/test_android.py | 8 ++++---- .../2024-08-05-19-04-06.gh-issue-116622.3LWUzE.rst | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2024-08-05-19-04-06.gh-issue-116622.3LWUzE.rst diff --git a/Lib/test/test_android.py b/Lib/test/test_android.py index 73211eb4009804..a3a3fbd1297ba8 100644 --- a/Lib/test/test_android.py +++ b/Lib/test/test_android.py @@ -387,7 +387,7 @@ def test_rate_limit(self): line_num = 0 # Send BUCKET_KB messages and return the rate at which they were consumed. - def fill_bucket(): + def write_bucketful(): nonlocal line_num start = time() max_line_num = line_num + BUCKET_KB @@ -397,13 +397,13 @@ def fill_bucket(): return BUCKET_KB / (time() - start) # The first BUCKET_KB should be written with minimal delay. - self.assertGreater(fill_bucket(), MAX_KB_PER_SECOND * 2) + self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) # The next BUCKET_KB should be written at the rate limit. - rate = fill_bucket() + rate = write_bucketful() self.assertGreater(rate, MAX_KB_PER_SECOND * 0.75) self.assertLess(rate, MAX_KB_PER_SECOND * 1.25) # Once the token bucket is full again, we should go back to full speed. sleep(BUCKET_KB / MAX_KB_PER_SECOND) - self.assertGreater(fill_bucket(), MAX_KB_PER_SECOND * 2) + self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-08-05-19-04-06.gh-issue-116622.3LWUzE.rst b/Misc/NEWS.d/next/Core and Builtins/2024-08-05-19-04-06.gh-issue-116622.3LWUzE.rst new file mode 100644 index 00000000000000..9320928477af2c --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2024-08-05-19-04-06.gh-issue-116622.3LWUzE.rst @@ -0,0 +1 @@ +Fix Android stdout and stderr messages being truncated or lost. From 73b2772f8ba5bc31331c91aa5fb0233df55f6158 Mon Sep 17 00:00:00 2001 From: Malcolm Smith Date: Mon, 5 Aug 2024 23:05:19 +0100 Subject: [PATCH 6/6] Make rate limit test more reliable on slower devices --- Lib/test/test_android.py | 71 ++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/Lib/test/test_android.py b/Lib/test/test_android.py index a3a3fbd1297ba8..82035061bb6fdd 100644 --- a/Lib/test/test_android.py +++ b/Lib/test/test_android.py @@ -363,9 +363,7 @@ def write(b, lines=None, *, write_len=None): stream.write(obj) def test_rate_limit(self): - # See _android_support.py. - MAX_KB_PER_SECOND = 1024 - BUCKET_KB = 128 + # https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log_read.h;l=39 PER_MESSAGE_OVERHEAD = 28 # https://developer.android.com/ndk/reference/group/logging @@ -382,28 +380,45 @@ def test_rate_limit(self): 1024 - PER_MESSAGE_OVERHEAD - len(tag) - len(message.format(0)) ) + "\n" - # Make sure the token bucket is full. - sleep(BUCKET_KB / MAX_KB_PER_SECOND) - line_num = 0 - - # Send BUCKET_KB messages and return the rate at which they were consumed. - def write_bucketful(): - nonlocal line_num - start = time() - max_line_num = line_num + BUCKET_KB - while line_num < max_line_num: - stream.write(message.format(line_num)) - line_num += 1 - return BUCKET_KB / (time() - start) - - # The first BUCKET_KB should be written with minimal delay. - self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) - - # The next BUCKET_KB should be written at the rate limit. - rate = write_bucketful() - self.assertGreater(rate, MAX_KB_PER_SECOND * 0.75) - self.assertLess(rate, MAX_KB_PER_SECOND * 1.25) - - # Once the token bucket is full again, we should go back to full speed. - sleep(BUCKET_KB / MAX_KB_PER_SECOND) - self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) + # See _android_support.py. The default values of these parameters work + # well across a wide range of devices, but we'll use smaller values to + # ensure a quick and reliable test that doesn't flood the log too much. + MAX_KB_PER_SECOND = 100 + BUCKET_KB = 10 + with ( + patch("_android_support.MAX_BYTES_PER_SECOND", MAX_KB_PER_SECOND * 1024), + patch("_android_support.BUCKET_SIZE", BUCKET_KB * 1024), + ): + # Make sure the token bucket is full. + sleep(BUCKET_KB / MAX_KB_PER_SECOND) + line_num = 0 + + # Write BUCKET_KB messages, and return the rate at which they were + # accepted in KB per second. + def write_bucketful(): + nonlocal line_num + start = time() + max_line_num = line_num + BUCKET_KB + while line_num < max_line_num: + stream.write(message.format(line_num)) + line_num += 1 + return BUCKET_KB / (time() - start) + + # The first bucketful should be written with minimal delay. The + # factor of 2 here is not arbitrary: it verifies that the system can + # write fast enough to empty the bucket within two bucketfuls, which + # the next part of the test depends on. + self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2) + + # Write another bucketful to empty the token bucket completely. + write_bucketful() + + # The next bucketful should be written at the rate limit. + self.assertAlmostEqual( + write_bucketful(), MAX_KB_PER_SECOND, + delta=MAX_KB_PER_SECOND * 0.1 + ) + + # Once the token bucket refills, we should go back to full speed. + sleep(BUCKET_KB / MAX_KB_PER_SECOND) + self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2)