From 4f5452658e7c287fbcb4f744dacb98c92be42303 Mon Sep 17 00:00:00 2001 From: Yadu Babuji Date: Wed, 30 Oct 2024 13:29:08 -0500 Subject: [PATCH] Fixes to avoid res_spec modification * GCE now makes a deepcopy of the resource_specification to avoid modifying the user supplied object. * stack of docstring changes --- docs/userguide/execution.rst | 2 +- parsl/executors/globus_compute.py | 35 +++++++++++++------------------ 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 8b190fa7b2..848a3114e4 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -88,7 +88,7 @@ Parsl currently supports the following executors: These executors cover a broad range of execution requirements. As with other Parsl components, there is a standard interface (ParslExecutor) that can be implemented to add support for other executors. 5. `parsl.executors.globus_compute.GlobusComputeExecutor`: This executor uses `Globus Compute `_ -as the execution backend to run functions on remote systems. +as the execution backend to run tasks on remote systems. .. note:: Refer to :ref:`configuration-section` for information on how to configure these executors. diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index ac484beee4..f32256a4a8 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -1,5 +1,6 @@ from __future__ import annotations +import copy import uuid from concurrent.futures import Future from typing import Any, Callable, Dict, Optional, Union @@ -60,8 +61,7 @@ def __init__( for more info. label: - a label to name the executor; mainly utilized for - logging and advanced needs with multiple executors. + a label to name the executor batch_size: the maximum number of tasks to coalesce before @@ -108,12 +108,10 @@ def __init__( ) def start(self) -> None: - """Empty function - """ pass def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: - """ Submit fn to globus-compute + """ Submit func to globus-compute Parameters @@ -139,24 +137,21 @@ def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Future """ - self._executor.resource_specification = resource_specification or self.resource_specification + res_spec = copy.deepcopy(resource_specification or self.resource_specification) # Pop user_endpoint_config since it is illegal in resource_spec for globus_compute - self._executor.user_endpoint_config = resource_specification.pop('user_endpoint_config', self.user_endpoint_config) + if res_spec: + user_endpoint_config = res_spec.pop('user_endpoint_config', self.user_endpoint_config) + else: + user_endpoint_config = self.user_endpoint_config + + self._executor.resource_specification = res_spec + self._executor.user_endpoint_config = user_endpoint_config return self._executor.submit(func, *args, **kwargs) - def shutdown(self, wait=True, *, cancel_futures=False): + def shutdown(self): """Clean-up the resources associated with the Executor. - It is safe to call this method several times. Otherwise, no other methods - can be called after this one. - - Parameters - ---------- - - wait: If True, then this method will not return until all pending - futures have received results. - cancel_futures: If True, then this method will cancel all futures - that have not yet registered their tasks with the Compute web services. - Tasks cannot be cancelled once they are registered. + GCE.shutdown will cancel all futures that have not yet registered with + Globus Compute and will not wait for the launched futures to complete. """ - return self._executor.shutdown() + return self._executor.shutdown(wait=False, cancel_futures=True)