-
Notifications
You must be signed in to change notification settings - Fork 3
/
concurrently.py
39 lines (29 loc) · 1.3 KB
/
concurrently.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import concurrent.futures
import itertools
def concurrently(handler, inputs, *, max_concurrency=5):
    """
    Call ``handler`` on each value in ``inputs`` using a pool of threads.

    ``handler`` should be a function that takes a single argument, which is
    one of the individual values from the iterable ``inputs``.

    Generates ``(input, output)`` tuples as the calls to ``handler``
    complete, so results arrive in completion order, not input order.

    At most ``max_concurrency`` calls are in flight at any time, and
    ``inputs`` is consumed lazily: a new value is only pulled from it when
    an earlier call has finished and its result has been yielded.

    If a call to ``handler`` raises, the exception is re-raised from this
    generator when that call's result would have been yielded.

    Raises ``ValueError`` (on first iteration, since this is a generator)
    if ``max_concurrency`` is less than 1.

    See https://alexwlchan.net/2019/10/adventures-with-concurrent-futures/
    for an explanation of how this function works.
    """
    if max_concurrency < 1:
        # Zero would otherwise schedule nothing and silently drop every input.
        raise ValueError(
            f"max_concurrency must be at least 1, got {max_concurrency!r}"
        )

    # Make sure we get a consistent iterator throughout, rather than
    # getting the first element repeatedly.
    handler_inputs = iter(inputs)

    # Size the pool to match max_concurrency.  The executor's default worker
    # count is derived from the CPU count, which would silently cap the
    # number of calls that actually run in parallel.
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max_concurrency
    ) as executor:
        # Maps each pending future back to the input it was started with.
        futures = {
            executor.submit(handler, item): item
            for item in itertools.islice(handler_inputs, max_concurrency)
        }

        while futures:
            done, _ = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED
            )

            for fut in done:
                original_input = futures.pop(fut)
                yield original_input, fut.result()

            # Top the pool back up: schedule one new call for every call
            # that just finished (fewer if the inputs are exhausted).
            for item in itertools.islice(handler_inputs, len(done)):
                futures[executor.submit(handler, item)] = item