From 070c8777fb4da826e85219395ad74b9f1753f614 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Wed, 15 Jan 2025 23:25:36 +0100 Subject: [PATCH 1/7] Make itertools.batched thread-safe --- Modules/itertoolsmodule.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 3f736f0cf19968..1cb70439911a59 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -186,12 +186,12 @@ static PyObject * batched_next(batchedobject *bo) { Py_ssize_t i; - Py_ssize_t n = bo->batch_size; + Py_ssize_t n = FT_ATOMIC_LOAD_SSIZE_RELAXED(bo->batch_size); PyObject *it = bo->it; PyObject *item; PyObject *result; - if (it == NULL) { + if (n < 0) { return NULL; } result = PyTuple_New(n); @@ -213,19 +213,19 @@ batched_next(batchedobject *bo) if (PyErr_Occurred()) { if (!PyErr_ExceptionMatches(PyExc_StopIteration)) { /* Input raised an exception other than StopIteration */ - Py_CLEAR(bo->it); + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); Py_DECREF(result); return NULL; } PyErr_Clear(); } if (i == 0) { - Py_CLEAR(bo->it); + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); Py_DECREF(result); return NULL; } if (bo->strict) { - Py_CLEAR(bo->it); + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); Py_DECREF(result); PyErr_SetString(PyExc_ValueError, "batched(): incomplete batch"); return NULL; From cc3c42b29664b4b588dabb6512991c6eda69e913 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Tue, 28 Jan 2025 20:11:14 +0100 Subject: [PATCH 2/7] enable tests --- .../test_itertools_batched.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 Lib/test/test_free_threading/test_itertools_batched.py diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools_batched.py new file mode 100644 index 00000000000000..ee316d7420e723 --- /dev/null +++ b/Lib/test/test_free_threading/test_itertools_batched.py @@ -0,0 +1,39 @@ +import unittest +import sys +from threading import Thread, Barrier +from itertools import batched +from test.support import threading_helper + + +class EnumerateThreading(unittest.TestCase): + + @threading_helper.reap_threads + @threading_helper.requires_working_threading() + def test_threading(self): + number_of_threads = 10 + number_of_iterations = 20 + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + while True: + try: + _ = next(it) + except StopIteration: + break + + data = tuple(range(1000)) + for it in range(number_of_iterations): + batch_iterator = batched(data, 2) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[batch_iterator])) + for t in worker_threads: + t.start() + for t in worker_threads: + t.join() + + barrier.reset() + +if __name__ == "__main__": + unittest.main() From 0193795bfaab9895b2ec6708c63fc51479975b4f Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Wed, 19 Feb 2025 08:06:44 +0000 Subject: [PATCH 3/7] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst diff --git a/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst new file mode 100644 index 00000000000000..f34d0bcd4c1e37 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst @@ -0,0 +1 @@ +Make concurrent iterations over :class:`itertools.batched` safe under free-threading. From 71fed22e6c5c88ce5d395ff700094a87b7d03b67 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Tue, 11 Mar 2025 20:45:54 +0100 Subject: [PATCH 4/7] review comments --- Modules/itertoolsmodule.c | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 1cb70439911a59..93b965773dffe7 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -191,9 +191,15 @@ batched_next(batchedobject *bo) PyObject *item; PyObject *result; +#ifdef Py_GIL_DISABLED + if (it == NULL) { + return NULL; + } +#else if (n < 0) { return NULL; } +#endif result = PyTuple_New(n); if (result == NULL) { return NULL; @@ -213,19 +219,31 @@ batched_next(batchedobject *bo) if (PyErr_Occurred()) { if (!PyErr_ExceptionMatches(PyExc_StopIteration)) { /* Input raised an exception other than StopIteration */ +#ifdef Py_GIL_DISABLED FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#else + Py_CLEAR(bo->it); +#endif Py_DECREF(result); return NULL; } PyErr_Clear(); } if (i == 0) { +#ifdef Py_GIL_DISABLED FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#else + Py_CLEAR(bo->it); +#endif Py_DECREF(result); return NULL; } if (bo->strict) { +#ifdef Py_GIL_DISABLED FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#else + Py_CLEAR(bo->it); +#endif Py_DECREF(result); PyErr_SetString(PyExc_ValueError, "batched(): incomplete batch"); return NULL; From 3ae3c481c81b7c2ada1743d410e26590e60c3e7f Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Tue, 11 Mar 2025 21:12:54 +0100 Subject: [PATCH 5/7] review comments --- Lib/test/test_free_threading/test_itertools_batched.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools_batched.py index ee316d7420e723..0907ac47af8cc6 100644 --- a/Lib/test/test_free_threading/test_itertools_batched.py +++ b/Lib/test/test_free_threading/test_itertools_batched.py @@ -5,10 +5,11 @@ from test.support import threading_helper +threading_helper.requires_working_threading(module=True) + class EnumerateThreading(unittest.TestCase): @threading_helper.reap_threads - @threading_helper.requires_working_threading() def test_threading(self): number_of_threads = 10 number_of_iterations = 20 @@ -28,10 +29,8 @@ def work(it): for ii in range(number_of_threads): worker_threads.append( Thread(target=work, args=[batch_iterator])) - for t in worker_threads: - t.start() - for t in worker_threads: - t.join() + + threading_helper.start_threads(worker_threads) barrier.reset() From 560bc59322fe92de2e9acf7cc123a78420ac626f Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Tue, 11 Mar 2025 21:52:13 +0100 Subject: [PATCH 6/7] review comments --- Modules/itertoolsmodule.c | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 93b965773dffe7..915eb679a65c15 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -191,15 +191,9 @@ batched_next(batchedobject *bo) PyObject *item; PyObject *result; -#ifdef Py_GIL_DISABLED - if (it == NULL) { - return NULL; - } -#else if (n < 0) { return NULL; } -#endif result = PyTuple_New(n); if (result == NULL) { return NULL; @@ -219,9 +213,8 @@ batched_next(batchedobject *bo) if (PyErr_Occurred()) { if (!PyErr_ExceptionMatches(PyExc_StopIteration)) { /* Input raised an exception other than StopIteration */ -#ifdef Py_GIL_DISABLED FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); -#else +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); #endif Py_DECREF(result); @@ -230,18 +223,16 @@ batched_next(batchedobject *bo) PyErr_Clear(); } if (i == 0) { -#ifdef Py_GIL_DISABLED FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); -#else +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); #endif Py_DECREF(result); return NULL; } if (bo->strict) { -#ifdef Py_GIL_DISABLED FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); -#else +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); #endif Py_DECREF(result); From 4df892d1ecdea9524ac2309f8cbf37f4a5c4fcf2 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Wed, 12 Mar 2025 08:56:04 +0100 Subject: [PATCH 7/7] Update Lib/test/test_free_threading/test_itertools_batched.py --- Lib/test/test_free_threading/test_itertools_batched.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools_batched.py index 0907ac47af8cc6..fa9e06bf07fa26 100644 --- a/Lib/test/test_free_threading/test_itertools_batched.py +++ b/Lib/test/test_free_threading/test_itertools_batched.py @@ -30,7 +30,8 @@ def work(it): worker_threads.append( Thread(target=work, args=[batch_iterator])) - threading_helper.start_threads(worker_threads) + with threading_helper.start_threads(worker_threads): + pass barrier.reset()