Skip to content

Commit

Permalink
Move scale_{in,out} from ParslExecutor to BlockProviderExecutor (#2885)
Browse files Browse the repository at this point in the history
Scale in/out behaviour is only for BlockProviderExecutors. Other fairly
recent PRs have been consolidating that behaviour, and this PR is another
one in that direction.

The non-BlockProviderExecutors (threads and flux) had meaningless stub
methods to make ABCMeta allow them to be instantiated. This PR removes
those stubs.
  • Loading branch information
benclifford authored Sep 18, 2023
1 parent d0002ae commit 4a684dd
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 50 deletions.
22 changes: 0 additions & 22 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,28 +73,6 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args:
"""
pass

@abstractmethod
def scale_out(self, blocks: int) -> List[str]:
"""Scale out method.
:return: A list of block ids corresponding to the blocks that were added.
"""
pass

@abstractmethod
def scale_in(self, blocks: int) -> List[str]:
"""Scale in method.
Cause the executor to reduce the number of blocks by count.
We should have the scale in method simply take resource object
which will have the scaling methods, scale_in itself should be a coroutine, since
scaling tasks can be slow.
:return: A list of block ids corresponding to the blocks that were removed.
"""
pass

@abstractmethod
def shutdown(self) -> bool:
"""Shutdown the executor.
Expand Down
6 changes: 0 additions & 6 deletions parsl/executors/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,6 @@ def submit(
)
return future

def scale_in(self, *args, **kwargs):
pass

def scale_out(self):
pass


def _submit_wrapper(
submission_queue: queue.Queue, stop_event: threading.Event, *args, **kwargs
Expand Down
14 changes: 14 additions & 0 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,20 @@ def scale_out(self, blocks: int = 1) -> List[str]:
"Failed to start block {}: {}".format(block_id, ex))
return block_ids

@abstractmethod
def scale_in(self, blocks: int) -> List[str]:
"""Scale in method.
Cause the executor to reduce the number of blocks by count.
We should have the scale in method simply take resource object
which will have the scaling methods, scale_in itself should be a coroutine, since
scaling tasks can be slow.
:return: A list of block ids corresponding to the blocks that were removed.
"""
pass

def _launch_block(self, block_id: str) -> Any:
launch_cmd = self._get_launch_command(block_id)
job_name = f"parsl.{self.label}.block-{block_id}"
Expand Down
22 changes: 0 additions & 22 deletions parsl/executors/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,6 @@ def submit(self, func, resource_specification, *args, **kwargs):

return self.executor.submit(func, *args, **kwargs)

def scale_out(self, workers=1):
"""Scales out the number of active workers by 1.
This method is notImplemented for threads and will raise the error if called.
Raises:
NotImplemented exception
"""

raise NotImplementedError

def scale_in(self, blocks):
"""Scale in the number of active blocks by specified amount.
This method is not implemented for threads and will raise the error if called.
Raises:
NotImplemented exception
"""

raise NotImplementedError

def shutdown(self, block=True):
"""Shutdown the ThreadPool. The underlying concurrent.futures thread pool
implementation will not terminate tasks that are being executed, because it
Expand Down

0 comments on commit 4a684dd

Please sign in to comment.