From 7d972f9440cb7594f1252e85c0c05d8b6968350c Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Thu, 16 Apr 2020 18:38:03 +0200 Subject: [PATCH 01/15] make the Runner work with unhashable points --- adaptive/runner.py | 48 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 9a8a9b1a5..2a47c7b75 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -54,6 +54,12 @@ ) +def _key_by_value(dct, value): + for k, v in dct.items(): + if v == value: + return k + + class BaseRunner(metaclass=abc.ABCMeta): r"""Base class for runners that use `concurrent.futures.Executors`. @@ -146,11 +152,16 @@ def __init__( self.to_retry = {} self.tracebacks = {} + # Keeping track of index -> point + self.index_to_point = {} + self._i = 0 # some unique index to be associated with each point + def _get_max_tasks(self): return self._max_tasks or _get_ncores(self.executor) - def _do_raise(self, e, x): - tb = self.tracebacks[x] + def _do_raise(self, e, i): + tb = self.tracebacks[i] + x = self.index_to_point[i] raise RuntimeError( "An error occured while evaluating " f'"learner.function({x})". ' @@ -162,9 +173,14 @@ def do_log(self): return self.log is not None def _ask(self, n): - points = [ - p for p in self.to_retry.keys() if p not in self.pending_points.values() - ][:n] + points = [] + for i, index in enumerate(self.to_retry.keys()): + if i == n: + break + point = self.index_to_point[index] + if point not in self.pending_points.values(): + points.append(point) + loss_improvements = len(points) * [float("inf")] if len(points) < n: new_points, new_losses = self.learner.ask(n - len(points)) @@ -198,20 +214,22 @@ def overhead(self): def _process_futures(self, done_futs): for fut in done_futs: x = self.pending_points.pop(fut) + i = _key_by_value(self.index_to_point, x) # O(N) try: y = fut.result() t = time.time() - fut.start_time # total execution time except Exception as e: - self.tracebacks[x] = traceback.format_exc() - self.to_retry[x] = self.to_retry.get(x, 0) + 1 - if self.to_retry[x] > self.retries: - self.to_retry.pop(x) + self.tracebacks[i] = traceback.format_exc() + self.to_retry[i] = self.to_retry.get(i, 0) + 1 + if self.to_retry[i] > self.retries: + self.to_retry.pop(i) if self.raise_if_retries_exceeded: - self._do_raise(e, x) + self._do_raise(e, i) else: self._elapsed_function_time += t / self._get_max_tasks() - self.to_retry.pop(x, None) - self.tracebacks.pop(x, None) + self.to_retry.pop(i, None) + self.tracebacks.pop(i, None) + self.index_to_point.pop(i) if self.do_log: self.log.append(("tell", x, y)) self.learner.tell(x, y) @@ -232,6 +250,12 @@ def _get_futures(self): fut = self._submit(x) fut.start_time = start_time self.pending_points[fut] = x + i = _key_by_value(self.index_to_point, x) # O(N) + if i is None: + # `x` is not a value in `self.index_to_point` + self._i += 1 + i = self._i + self.index_to_point[i] = x # Collect and results and add them to the learner futures = list(self.pending_points.keys()) From 878ae79125b49afa37ba6b9bca845441b2199b6d Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Thu, 16 Apr 2020 20:08:40 +0200 Subject: [PATCH 02/15] make tracebacks and to_retry properties and lists of tuples --- adaptive/runner.py | 54 ++++++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 2a47c7b75..02807aa7c 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -97,12 +97,12 @@ class 
BaseRunner(metaclass=abc.ABCMeta): log : list or None Record of the method calls made to the learner, in the format ``(method_name, *args)``. - to_retry : dict - Mapping of ``{point: n_fails, ...}``. When a point has failed + to_retry : list of tuples + List of ``(point, n_fails)``. When a point has failed ``runner.retries`` times it is removed but will be present in ``runner.tracebacks``. - tracebacks : dict - A mapping of point to the traceback if that point failed. + tracebacks : list of tuples + List of of ``(point, tb)`` for points that failed. pending_points : dict A mapping of `~concurrent.futures.Future`\s to points. @@ -149,19 +149,19 @@ def __init__( # Error handling attributes self.retries = retries self.raise_if_retries_exceeded = raise_if_retries_exceeded - self.to_retry = {} - self.tracebacks = {} + self._to_retry = {} + self._tracebacks = {} # Keeping track of index -> point - self.index_to_point = {} + self._index_to_point = {} self._i = 0 # some unique index to be associated with each point def _get_max_tasks(self): return self._max_tasks or _get_ncores(self.executor) def _do_raise(self, e, i): - tb = self.tracebacks[i] - x = self.index_to_point[i] + tb = self._tracebacks[i] + x = self._index_to_point[i] raise RuntimeError( "An error occured while evaluating " f'"learner.function({x})". ' @@ -174,10 +174,10 @@ def do_log(self): def _ask(self, n): points = [] - for i, index in enumerate(self.to_retry.keys()): + for i, index in enumerate(self._to_retry.keys()): if i == n: break - point = self.index_to_point[index] + point = self._index_to_point[index] if point not in self.pending_points.values(): points.append(point) @@ -214,22 +214,22 @@ def overhead(self): def _process_futures(self, done_futs): for fut in done_futs: x = self.pending_points.pop(fut) - i = _key_by_value(self.index_to_point, x) # O(N) + i = _key_by_value(self._index_to_point, x) # O(N) try: y = fut.result() t = time.time() - fut.start_time # total execution time except Exception as e: - self.tracebacks[i] = traceback.format_exc() - self.to_retry[i] = self.to_retry.get(i, 0) + 1 - if self.to_retry[i] > self.retries: - self.to_retry.pop(i) + self._tracebacks[i] = traceback.format_exc() + self._to_retry[i] = self._to_retry.get(i, 0) + 1 + if self._to_retry[i] > self.retries: + self._to_retry.pop(i) if self.raise_if_retries_exceeded: self._do_raise(e, i) else: self._elapsed_function_time += t / self._get_max_tasks() - self.to_retry.pop(i, None) - self.tracebacks.pop(i, None) - self.index_to_point.pop(i) + self._to_retry.pop(i, None) + self._tracebacks.pop(i, None) + self._index_to_point.pop(i) if self.do_log: self.log.append(("tell", x, y)) self.learner.tell(x, y) @@ -250,12 +250,12 @@ def _get_futures(self): fut = self._submit(x) fut.start_time = start_time self.pending_points[fut] = x - i = _key_by_value(self.index_to_point, x) # O(N) + i = _key_by_value(self._index_to_point, x) # O(N) if i is None: - # `x` is not a value in `self.index_to_point` + # `x` is not a value in `self._index_to_point` self._i += 1 i = self._i - self.index_to_point[i] = x + self._index_to_point[i] = x # Collect and results and add them to the learner futures = list(self.pending_points.keys()) @@ -284,7 +284,7 @@ def _cleanup(self): @property def failed(self): """Set of points that failed ``runner.retries`` times.""" - return set(self.tracebacks) - set(self.to_retry) + return set(self._tracebacks) - set(self._to_retry) @abc.abstractmethod def elapsed_time(self): @@ -300,6 +300,14 @@ def _submit(self, x): """Is called in 
`_get_futures`.""" pass + @property + def tracebacks(self): + return [(self._index_to_point[i], tb) for i, tb in self._tracebacks.items()] + + @property + def to_retry(self): + return [(self._index_to_point[i], n) for i, n in self._to_retry.items()] + class BlockingRunner(BaseRunner): """Run a learner synchronously in an executor. From 4b990d03b89922985d902d87d4158c7b99efeb0a Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Fri, 17 Apr 2020 10:27:06 +0200 Subject: [PATCH 03/15] use next in _key_by_value --- adaptive/runner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 02807aa7c..2d5ead982 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -55,9 +55,7 @@ def _key_by_value(dct, value): - for k, v in dct.items(): - if v == value: - return k + return next((k for k, v in dct.items() if v == value), None) class BaseRunner(metaclass=abc.ABCMeta): From 2f1f2bc7d4d947a921add4587b89d9ba7fbe32e9 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Fri, 17 Apr 2020 10:30:24 +0200 Subject: [PATCH 04/15] raise StopIteration because it is clearer than checking for None --- adaptive/runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 2d5ead982..ab0488042 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -55,7 +55,7 @@ def _key_by_value(dct, value): - return next((k for k, v in dct.items() if v == value), None) + return next(k for k, v in dct.items() if v == value) class BaseRunner(metaclass=abc.ABCMeta): @@ -248,9 +248,9 @@ def _get_futures(self): fut = self._submit(x) fut.start_time = start_time self.pending_points[fut] = x - i = _key_by_value(self._index_to_point, x) # O(N) - if i is None: - # `x` is not a value in `self._index_to_point` + try: + i = _key_by_value(self._index_to_point, x) # O(N) + except StopIteration: # `x` is not a value in `self._index_to_point` self._i += 1 i = self._i self._index_to_point[i] = x From c50e52fb7e9ec561b0247ecb1f97a402c48b1e72 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Fri, 17 Apr 2020 10:48:41 +0200 Subject: [PATCH 05/15] use that runner.tracebacks is a list of tuples --- docs/source/tutorial/tutorial.advanced-topics.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/tutorial/tutorial.advanced-topics.rst b/docs/source/tutorial/tutorial.advanced-topics.rst index 84487a9cb..1156979c8 100644 --- a/docs/source/tutorial/tutorial.advanced-topics.rst +++ b/docs/source/tutorial/tutorial.advanced-topics.rst @@ -297,12 +297,12 @@ raise the exception with the stack trace: runner.task.result() -You can also check ``runner.tracebacks`` which is a mapping from -point → traceback. +You can also check ``runner.tracebacks`` which is a list of tuples with +(point, traceback). .. 
jupyter-execute:: - for point, tb in runner.tracebacks.items(): + for point, tb in runner.tracebacks: print(f'point: {point}:\n {tb}') Logging runners From d1fd55a7d5bc596e804a205b5eb43dc5ca76e752 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 22 Apr 2020 20:09:03 +0200 Subject: [PATCH 06/15] rename _index_to_point -> _id_to_point --- adaptive/runner.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index ab0488042..b06f3db15 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -151,7 +151,7 @@ def __init__( self._tracebacks = {} # Keeping track of index -> point - self._index_to_point = {} + self._id_to_point = {} self._i = 0 # some unique index to be associated with each point def _get_max_tasks(self): @@ -159,7 +159,7 @@ def _get_max_tasks(self): def _do_raise(self, e, i): tb = self._tracebacks[i] - x = self._index_to_point[i] + x = self._id_to_point[i] raise RuntimeError( "An error occured while evaluating " f'"learner.function({x})". ' @@ -175,7 +175,7 @@ def _ask(self, n): for i, index in enumerate(self._to_retry.keys()): if i == n: break - point = self._index_to_point[index] + point = self._id_to_point[index] if point not in self.pending_points.values(): points.append(point) @@ -212,7 +212,7 @@ def overhead(self): def _process_futures(self, done_futs): for fut in done_futs: x = self.pending_points.pop(fut) - i = _key_by_value(self._index_to_point, x) # O(N) + i = _key_by_value(self._id_to_point, x) # O(N) try: y = fut.result() t = time.time() - fut.start_time # total execution time @@ -227,7 +227,7 @@ def _process_futures(self, done_futs): self._elapsed_function_time += t / self._get_max_tasks() self._to_retry.pop(i, None) self._tracebacks.pop(i, None) - self._index_to_point.pop(i) + self._id_to_point.pop(i) if self.do_log: self.log.append(("tell", x, y)) self.learner.tell(x, y) @@ -249,11 +249,11 @@ def _get_futures(self): fut.start_time = start_time self.pending_points[fut] = x try: - i = _key_by_value(self._index_to_point, x) # O(N) - except StopIteration: # `x` is not a value in `self._index_to_point` + i = _key_by_value(self._id_to_point, x) # O(N) + except StopIteration: # `x` is not a value in `self._id_to_point` self._i += 1 i = self._i - self._index_to_point[i] = x + self._id_to_point[i] = x # Collect and results and add them to the learner futures = list(self.pending_points.keys()) @@ -300,11 +300,11 @@ def _submit(self, x): @property def tracebacks(self): - return [(self._index_to_point[i], tb) for i, tb in self._tracebacks.items()] + return [(self._id_to_point[i], tb) for i, tb in self._tracebacks.items()] @property def to_retry(self): - return [(self._index_to_point[i], n) for i, n in self._to_retry.items()] + return [(self._id_to_point[i], n) for i, n in self._to_retry.items()] class BlockingRunner(BaseRunner): From 58f7b39733c79e4f2d3512b28fcef318f4dee468 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 22 Apr 2020 20:14:51 +0200 Subject: [PATCH 07/15] use partial(next, itertools.count()) to generate unique ids --- adaptive/runner.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index b06f3db15..ccf5ac4cd 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -1,7 +1,9 @@ import abc import asyncio import concurrent.futures as concurrent +import functools import inspect +import itertools import pickle import sys import time @@ -150,9 +152,10 @@ def __init__( self._to_retry = {} 
self._tracebacks = {} - # Keeping track of index -> point self._id_to_point = {} - self._i = 0 # some unique index to be associated with each point + self._next_id = functools.partial( + next, itertools.count() + ) # some unique id to be associated with each point def _get_max_tasks(self): return self._max_tasks or _get_ncores(self.executor) @@ -172,10 +175,10 @@ def do_log(self): def _ask(self, n): points = [] - for i, index in enumerate(self._to_retry.keys()): + for i, _id in enumerate(self._to_retry.keys()): if i == n: break - point = self._id_to_point[index] + point = self._id_to_point[_id] if point not in self.pending_points.values(): points.append(point) @@ -249,11 +252,10 @@ def _get_futures(self): fut.start_time = start_time self.pending_points[fut] = x try: - i = _key_by_value(self._id_to_point, x) # O(N) + _id = _key_by_value(self._id_to_point, x) # O(N) except StopIteration: # `x` is not a value in `self._id_to_point` - self._i += 1 - i = self._i - self._id_to_point[i] = x + _id = self._next_id() + self._id_to_point[_id] = x # Collect and results and add them to the learner futures = list(self.pending_points.keys()) From c9c223d485b65f858dc295210dce755684cfc16d Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 22 Apr 2020 20:36:56 +0200 Subject: [PATCH 08/15] move pid logic to _ask --- adaptive/runner.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index ccf5ac4cd..fa96096ca 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -175,10 +175,10 @@ def do_log(self): def _ask(self, n): points = [] - for i, _id in enumerate(self._to_retry.keys()): + for i, pid in enumerate(self._to_retry.keys()): if i == n: break - point = self._id_to_point[_id] + point = self._id_to_point[pid] if point not in self.pending_points.values(): points.append(point) @@ -187,6 +187,8 @@ def _ask(self, n): new_points, new_losses = self.learner.ask(n - len(points)) points += new_points loss_improvements += new_losses + for p in new_points: + self._id_to_point[self._next_id()] = p return points, loss_improvements def overhead(self): @@ -251,11 +253,6 @@ def _get_futures(self): fut = self._submit(x) fut.start_time = start_time self.pending_points[fut] = x - try: - _id = _key_by_value(self._id_to_point, x) # O(N) - except StopIteration: # `x` is not a value in `self._id_to_point` - _id = self._next_id() - self._id_to_point[_id] = x # Collect and results and add them to the learner futures = list(self.pending_points.keys()) From 540b13f2a68467d74e9dbcc264569727e8f8e991 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 22 Apr 2020 22:52:17 +0200 Subject: [PATCH 09/15] make pending_points a mapping of future -> pid --- adaptive/runner.py | 57 +++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index fa96096ca..c48307fcf 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -174,22 +174,21 @@ def do_log(self): return self.log is not None def _ask(self, n): - points = [] - for i, pid in enumerate(self._to_retry.keys()): - if i == n: - break - point = self._id_to_point[pid] - if point not in self.pending_points.values(): - points.append(point) - - loss_improvements = len(points) * [float("inf")] - if len(points) < n: - new_points, new_losses = self.learner.ask(n - len(points)) - points += new_points + pids = [ + pid + for pid in self._to_retry.keys() + if pid not in self.pending_points.values() + ][:n] + loss_improvements = 
len(pids) * [float("inf")] + + if len(pids) < n: + new_points, new_losses = self.learner.ask(n - len(pids)) loss_improvements += new_losses - for p in new_points: - self._id_to_point[self._next_id()] = p - return points, loss_improvements + for point in new_points: + pid = self._next_id() + self._id_to_point[pid] = point + pids.append(pid) + return pids, loss_improvements def overhead(self): """Overhead of using Adaptive and the executor in percent. @@ -216,23 +215,22 @@ def overhead(self): def _process_futures(self, done_futs): for fut in done_futs: - x = self.pending_points.pop(fut) - i = _key_by_value(self._id_to_point, x) # O(N) + pid = self.pending_points.pop(fut) try: y = fut.result() t = time.time() - fut.start_time # total execution time except Exception as e: - self._tracebacks[i] = traceback.format_exc() - self._to_retry[i] = self._to_retry.get(i, 0) + 1 - if self._to_retry[i] > self.retries: - self._to_retry.pop(i) + self._tracebacks[pid] = traceback.format_exc() + self._to_retry[pid] = self._to_retry.get(pid, 0) + 1 + if self._to_retry[pid] > self.retries: + self._to_retry.pop(pid) if self.raise_if_retries_exceeded: - self._do_raise(e, i) + self._do_raise(e, pid) else: self._elapsed_function_time += t / self._get_max_tasks() - self._to_retry.pop(i, None) - self._tracebacks.pop(i, None) - self._id_to_point.pop(i) + self._to_retry.pop(pid, None) + self._tracebacks.pop(pid, None) + x = self._id_to_point.pop(pid) if self.do_log: self.log.append(("tell", x, y)) self.learner.tell(x, y) @@ -246,13 +244,14 @@ def _get_futures(self): if self.do_log: self.log.append(("ask", n_new_tasks)) - points, _ = self._ask(n_new_tasks) + pids, _ = self._ask(n_new_tasks) - for x in points: + for pid in pids: start_time = time.time() # so we can measure execution time - fut = self._submit(x) + point = self._id_to_point[pid] + fut = self._submit(point) fut.start_time = start_time - self.pending_points[fut] = x + self.pending_points[fut] = pid # Collect and results and add them to the learner futures = list(self.pending_points.keys()) From a2ac197e711d447a3ea25d80a69ced1c24a6db67 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 22 Apr 2020 22:57:22 +0200 Subject: [PATCH 10/15] introduce a property pending_points which is a list of tuples and fix docs --- adaptive/runner.py | 52 ++++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index c48307fcf..63cbf3d17 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -103,8 +103,8 @@ class BaseRunner(metaclass=abc.ABCMeta): in ``runner.tracebacks``. tracebacks : list of tuples List of of ``(point, tb)`` for points that failed. - pending_points : dict - A mapping of `~concurrent.futures.Future`\s to points. + pending_points : list of tuples + A list of tuples with ``(concurrent.futures.Future, point)``. 
Methods ------- @@ -132,7 +132,7 @@ def __init__( self._max_tasks = ntasks - self.pending_points = {} + self._pending_points = {} # if we instantiate our own executor, then we are also responsible # for calling 'shutdown' @@ -177,7 +177,7 @@ def _ask(self, n): pids = [ pid for pid in self._to_retry.keys() - if pid not in self.pending_points.values() + if pid not in self._pending_points.values() ][:n] loss_improvements = len(pids) * [float("inf")] @@ -215,7 +215,7 @@ def overhead(self): def _process_futures(self, done_futs): for fut in done_futs: - pid = self.pending_points.pop(fut) + pid = self._pending_points.pop(fut) try: y = fut.result() t = time.time() - fut.start_time # total execution time @@ -239,7 +239,7 @@ def _get_futures(self): # Launch tasks to replace the ones that completed # on the last iteration, making sure to fill workers # that have started since the last iteration. - n_new_tasks = max(0, self._get_max_tasks() - len(self.pending_points)) + n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_points)) if self.do_log: self.log.append(("ask", n_new_tasks)) @@ -251,17 +251,17 @@ def _get_futures(self): point = self._id_to_point[pid] fut = self._submit(point) fut.start_time = start_time - self.pending_points[fut] = pid + self._pending_points[fut] = pid # Collect and results and add them to the learner - futures = list(self.pending_points.keys()) + futures = list(self._pending_points.keys()) return futures def _remove_unfinished(self): # remove points with 'None' values from the learner self.learner.remove_unfinished() # cancel any outstanding tasks - remaining = list(self.pending_points.keys()) + remaining = list(self._pending_points.keys()) for fut in remaining: fut.cancel() return remaining @@ -298,11 +298,17 @@ def _submit(self, x): @property def tracebacks(self): - return [(self._id_to_point[i], tb) for i, tb in self._tracebacks.items()] + return [(self._id_to_point[pid], tb) for pid, tb in self._tracebacks.items()] @property def to_retry(self): - return [(self._id_to_point[i], n) for i, n in self._to_retry.items()] + return [(self._id_to_point[pid], n) for pid, n in self._to_retry.items()] + + @property + def pending_points(self): + return [ + (fut, self._id_to_point[pid]) for fut, pid in self._pending_points.items() + ] class BlockingRunner(BaseRunner): @@ -343,14 +349,14 @@ class BlockingRunner(BaseRunner): log : list or None Record of the method calls made to the learner, in the format ``(method_name, *args)``. - to_retry : dict - Mapping of ``{point: n_fails, ...}``. When a point has failed + to_retry : list of tuples + List of ``(point, n_fails)``. When a point has failed ``runner.retries`` times it is removed but will be present in ``runner.tracebacks``. - tracebacks : dict - A mapping of point to the traceback if that point failed. - pending_points : dict - A mapping of `~concurrent.futures.Future`\to points. + tracebacks : list of tuples + List of of ``(point, tb)`` for points that failed. + pending_points : list of tuples + A list of tuples with ``(concurrent.futures.Future, point)``. Methods ------- @@ -466,14 +472,14 @@ class AsyncRunner(BaseRunner): log : list or None Record of the method calls made to the learner, in the format ``(method_name, *args)``. - to_retry : dict - Mapping of ``{point: n_fails, ...}``. When a point has failed + to_retry : list of tuples + List of ``(point, n_fails)``. When a point has failed ``runner.retries`` times it is removed but will be present in ``runner.tracebacks``. 
- tracebacks : dict - A mapping of point to the traceback if that point failed. - pending_points : dict - A mapping of `~concurrent.futures.Future`\s to points. + tracebacks : list of tuples + List of of ``(point, tb)`` for points that failed. + pending_points : list of tuples + A list of tuples with ``(concurrent.futures.Future, point)``. Methods ------- From 8a6249ff230af9202325fb9e09f8043b9e45576b Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 22 Apr 2020 23:00:23 +0200 Subject: [PATCH 11/15] remove _key_by_value --- adaptive/runner.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 63cbf3d17..495bca113 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -56,10 +56,6 @@ ) -def _key_by_value(dct, value): - return next(k for k, v in dct.items() if v == value) - - class BaseRunner(metaclass=abc.ABCMeta): r"""Base class for runners that use `concurrent.futures.Executors`. From 8eb178a35b184f1564d2a0fbe4423831593e66a4 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 22 Apr 2020 23:17:45 +0200 Subject: [PATCH 12/15] do not do the entire loop but only until "n" --- adaptive/runner.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 495bca113..27b7148b7 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -170,11 +170,10 @@ def do_log(self): return self.log is not None def _ask(self, n): - pids = [ - pid - for pid in self._to_retry.keys() - if pid not in self._pending_points.values() - ][:n] + pending_ids = self._pending_points.values() + pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) + pids = list(itertools.islice(pids_gen, n)) + loss_improvements = len(pids) * [float("inf")] if len(pids) < n: From 2c9caa30c08fde6be282f0433cfcc9f503ae3d6a Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Thu, 23 Apr 2020 12:07:39 +0200 Subject: [PATCH 13/15] rename _pending_points -> _pending_pids --- adaptive/runner.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 27b7148b7..c93b44b75 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -128,7 +128,7 @@ def __init__( self._max_tasks = ntasks - self._pending_points = {} + self._pending_pids = {} # if we instantiate our own executor, then we are also responsible # for calling 'shutdown' @@ -170,7 +170,7 @@ def do_log(self): return self.log is not None def _ask(self, n): - pending_ids = self._pending_points.values() + pending_ids = self._pending_pids.values() pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) pids = list(itertools.islice(pids_gen, n)) @@ -210,7 +210,7 @@ def overhead(self): def _process_futures(self, done_futs): for fut in done_futs: - pid = self._pending_points.pop(fut) + pid = self._pending_pids.pop(fut) try: y = fut.result() t = time.time() - fut.start_time # total execution time @@ -234,7 +234,7 @@ def _get_futures(self): # Launch tasks to replace the ones that completed # on the last iteration, making sure to fill workers # that have started since the last iteration. 
- n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_points)) + n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_pids)) if self.do_log: self.log.append(("ask", n_new_tasks)) @@ -246,17 +246,17 @@ def _get_futures(self): point = self._id_to_point[pid] fut = self._submit(point) fut.start_time = start_time - self._pending_points[fut] = pid + self._pending_pids[fut] = pid # Collect and results and add them to the learner - futures = list(self._pending_points.keys()) + futures = list(self._pending_pids.keys()) return futures def _remove_unfinished(self): # remove points with 'None' values from the learner self.learner.remove_unfinished() # cancel any outstanding tasks - remaining = list(self._pending_points.keys()) + remaining = list(self._pending_pids.keys()) for fut in remaining: fut.cancel() return remaining @@ -302,7 +302,7 @@ def to_retry(self): @property def pending_points(self): return [ - (fut, self._id_to_point[pid]) for fut, pid in self._pending_points.items() + (fut, self._id_to_point[pid]) for fut, pid in self._pending_pids.items() ] From 5ba052cc5ee557be6255a18fc91fce710ebc921d Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Thu, 23 Apr 2020 12:42:12 +0200 Subject: [PATCH 14/15] =?UTF-8?q?rename=20=5Fpending=5Fpids=20=E2=86=92=20?= =?UTF-8?q?=5Fpending=5Ftasks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adaptive/runner.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index c93b44b75..3e0a5d8d3 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -128,7 +128,7 @@ def __init__( self._max_tasks = ntasks - self._pending_pids = {} + self._pending_tasks = {} # mapping from concurrent.futures.Future → point id # if we instantiate our own executor, then we are also responsible # for calling 'shutdown' @@ -170,7 +170,7 @@ def do_log(self): return self.log is not None def _ask(self, n): - pending_ids = self._pending_pids.values() + pending_ids = self._pending_tasks.values() pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) pids = list(itertools.islice(pids_gen, n)) @@ -210,7 +210,7 @@ def overhead(self): def _process_futures(self, done_futs): for fut in done_futs: - pid = self._pending_pids.pop(fut) + pid = self._pending_tasks.pop(fut) try: y = fut.result() t = time.time() - fut.start_time # total execution time @@ -234,7 +234,7 @@ def _get_futures(self): # Launch tasks to replace the ones that completed # on the last iteration, making sure to fill workers # that have started since the last iteration. 
- n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_pids)) + n_new_tasks = max(0, self._get_max_tasks() - len(self._pending_tasks)) if self.do_log: self.log.append(("ask", n_new_tasks)) @@ -246,17 +246,17 @@ def _get_futures(self): point = self._id_to_point[pid] fut = self._submit(point) fut.start_time = start_time - self._pending_pids[fut] = pid + self._pending_tasks[fut] = pid # Collect and results and add them to the learner - futures = list(self._pending_pids.keys()) + futures = list(self._pending_tasks.keys()) return futures def _remove_unfinished(self): # remove points with 'None' values from the learner self.learner.remove_unfinished() # cancel any outstanding tasks - remaining = list(self._pending_pids.keys()) + remaining = list(self._pending_tasks.keys()) for fut in remaining: fut.cancel() return remaining @@ -302,7 +302,7 @@ def to_retry(self): @property def pending_points(self): return [ - (fut, self._id_to_point[pid]) for fut, pid in self._pending_pids.items() + (fut, self._id_to_point[pid]) for fut, pid in self._pending_tasks.items() ] From c7a12a4f67a493a758ae09ca28c6f04e930c342a Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Fri, 24 Apr 2020 18:05:45 +0200 Subject: [PATCH 15/15] add a comment about using a generator expression --- adaptive/runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/adaptive/runner.py b/adaptive/runner.py index 3e0a5d8d3..73fd9ed94 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -171,6 +171,7 @@ def do_log(self): def _ask(self, n): pending_ids = self._pending_tasks.values() + # using generator here because we only need until `n` pids_gen = (pid for pid in self._to_retry.keys() if pid not in pending_ids) pids = list(itertools.islice(pids_gen, n))
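
Taken together, the series replaces every dict that was keyed by a point with a dict keyed by a small integer point id, so the runner no longer requires points to be hashable. The sketch below is an editorial illustration of that bookkeeping pattern and is not code from adaptive itself: the class name PointBook and its method names are invented, and the ask step receives its new points as an argument instead of querying a learner.

import itertools
from functools import partial
from typing import Any, Dict, List, Tuple


class PointBook:
    def __init__(self, retries: int = 2) -> None:
        self.retries = retries
        self._id_to_point: Dict[int, Any] = {}  # pid -> point (the point may be unhashable)
        self._pending: Dict[Any, int] = {}      # future -> pid
        self._to_retry: Dict[int, int] = {}     # pid -> number of failures so far
        self._tracebacks: Dict[int, str] = {}   # pid -> formatted traceback
        self._next_id = partial(next, itertools.count())  # fresh, ever-increasing pids

    def ask(self, n: int, new_points: List[Any]) -> List[int]:
        # First re-offer points that failed before and are not currently running;
        # the generator plus islice stops as soon as `n` pids have been found.
        pending = set(self._pending.values())
        pids_gen = (pid for pid in self._to_retry if pid not in pending)
        pids = list(itertools.islice(pids_gen, n))
        # Then register brand-new points under fresh ids.
        for point in new_points[: n - len(pids)]:
            pid = self._next_id()
            self._id_to_point[pid] = point
            pids.append(pid)
        return pids

    def record_failure(self, pid: int, tb: str) -> bool:
        """Return True if the point may still be retried."""
        self._tracebacks[pid] = tb
        self._to_retry[pid] = self._to_retry.get(pid, 0) + 1
        if self._to_retry[pid] > self.retries:
            self._to_retry.pop(pid)
            return False
        return True

    def record_success(self, pid: int) -> Any:
        self._to_retry.pop(pid, None)
        self._tracebacks.pop(pid, None)
        return self._id_to_point.pop(pid)

    @property
    def to_retry(self) -> List[Tuple[Any, int]]:
        return [(self._id_to_point[pid], n) for pid, n in self._to_retry.items()]

    @property
    def tracebacks(self) -> List[Tuple[Any, str]]:
        return [(self._id_to_point[pid], tb) for pid, tb in self._tracebacks.items()]


# Unhashable points (plain lists) are fine, because only integer pids are used as dict keys.
book = PointBook()
pids = book.ask(2, new_points=[[0.0, 1.0], [0.5, 0.25]])
book.record_failure(pids[0], tb="Traceback (most recent call last): ...")
print(book.to_retry)                 # [([0.0, 1.0], 1)]
print(book.record_success(pids[1]))  # [0.5, 0.25]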
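
On the user-facing side, patches 02, 05, and 10 turn ``runner.to_retry``, ``runner.tracebacks``, and ``runner.pending_points`` into lists of tuples. The snippet below is a hedged usage sketch that assumes a version of adaptive containing this series; the failing function ``f``, the goal, and the choice of ``SequentialExecutor`` are illustrative, not prescribed by the patches.

import adaptive
from adaptive.runner import SequentialExecutor


def f(x):
    # Deliberately fail near zero so that some points need retries.
    if abs(x) < 0.1:
        raise RuntimeError("singularity")
    return 1 / x


learner = adaptive.Learner1D(f, bounds=(-1, 1))
runner = adaptive.BlockingRunner(
    learner,
    goal=lambda l: l.npoints >= 100,
    executor=SequentialExecutor(),  # run in-process; any executor works here
    retries=2,
    raise_if_retries_exceeded=False,
)

# After the series these are lists of (point, ...) tuples instead of dicts
# keyed by the (possibly unhashable) point itself.
for point, n_fails in runner.to_retry:
    print(f"{point} failed {n_fails} time(s) and will be retried")
for point, tb in runner.tracebacks:
    print(f"point: {point}:\n{tb}")
for future, point in runner.pending_points:
    print(f"{point} was still running when the goal was reached")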