Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,20 +625,26 @@ def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
# Reverse so that the next (FIFO) future is on the right
fs.reverse()
# Careful not to keep references to futures or results
while fs:
# Wait for the next result
if timeout is None:
_result_or_cancel(fs[-1])
else:
_result_or_cancel(fs[-1], end_time - time.monotonic())

# Buffer next task
if (
buffersize
and (executor := executor_weakref())
and (args := next(zipped_iterables, None))
):
fs.appendleft(executor.submit(fn, *args))
# Careful not to keep a reference to the popped future
if timeout is None:
yield _result_or_cancel(fs.pop())
else:
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

# Yield the awaited result
yield fs.pop().result()
Copy link
Contributor Author

@ebonnal ebonnal Mar 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be discussed: this could be replaced by a lighter yield fs.pop()._result because the prior call to _result_or_cancel guarantees that at this point the result is available.

finally:
for future in fs:
future.cancel()
Expand Down
26 changes: 26 additions & 0 deletions Lib/test/test_concurrent_futures/executor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import itertools
import operator
import threading
import time
import weakref
from concurrent import futures
from operator import add
from functools import partial
from contextlib import suppress
from test import support
from test.support import Py_GIL_DISABLED

Expand Down Expand Up @@ -143,6 +146,29 @@ def test_map_buffersize_when_buffer_is_full(self):
msg="should have fetched only `buffersize` elements from `ints`.",
)

def test_map_buffersize_when_error(self):
ints = [1, 2, 3, 0, 4, 5, 6]
index_of_zero = ints.index(0)
ints_iter = iter(ints)
buffersize = 2
reciprocal = partial(operator.truediv, 1)
results = []
with suppress(ZeroDivisionError):
for result in self.executor.map(
reciprocal, ints_iter, buffersize=buffersize
):
results.append(result)
self.assertEqual(
len(results),
index_of_zero,
msg="should have mapped until reaching the zero.",
)
self.assertEqual(
len(results) + buffersize + len(list(ints_iter)),
len(ints),
msg="ints should be either processed, or buffered, or not fetched.",
)

def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
Expand Down
Loading