
Commit ceda4c0

Use resource dict for internal communication (#455)
* Use resource dict for internal communication
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)
* fix bug
* more fixes
* Update user interface to use resource_dict
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 09c15f1 · commit ceda4c0

8 files changed (+215 / -213 lines)

README.md

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ def calc(i):
     return i, size, rank

 with flux.job.FluxExecutor() as flux_exe:
-    with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe:
+    with Executor(max_cores=2, executor=flux_exe, resource_dict={"cores": 2}) as exe:
         fs = exe.submit(calc, 3)
         print(fs.result())
 ```
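For readers tracking the interface change, here is a minimal sketch of the new calling convention using the default "local" backend instead of Flux, so it runs without a scheduler. The calc body and the chosen values are illustrative and not part of this commit:

```python
from executorlib import Executor


def calc(i):
    # placeholder task; the README version queries MPI size and rank instead
    return i * 2


# before this commit: Executor(max_cores=2, cores_per_worker=1, ...)
# after: per-task resources are grouped into a single resource_dict
with Executor(max_cores=2, backend="local", resource_dict={"cores": 1}) as exe:
    fs = exe.submit(calc, 3)
    print(fs.result())  # 6
```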

executorlib/__init__.py

Lines changed: 34 additions & 36 deletions
@@ -36,12 +36,14 @@ class Executor:
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number cores which can be used in parallel
-        cores_per_worker (int): number of MPI cores to be used for each function call
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_worker (int): number of GPUs per worker - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
-        slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                              - cores_per_worker (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_worker (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
         flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
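As a quick reference, a fully populated resource_dict with the documented defaults would look roughly as follows. Note one inconsistency visible in this commit: the docstring names the keys cores_per_worker and gpus_per_worker, while the README example and the internal default dictionary added further down in __new__ use cores and gpus_per_core; the sketch follows the latter naming:

```python
# Illustrative only; values are the documented defaults.
resource_dict = {
    "cores": 1,                      # MPI cores per function call
    "threads_per_core": 1,           # OpenMP threads per core
    "gpus_per_core": 0,              # GPUs, defaults to 0
    "cwd": None,                     # working directory of the parallel Python task
    "openmpi_oversubscribe": False,  # add --oversubscribe (OpenMPI and SLURM only)
    "slurm_cmd_args": [],            # extra command line arguments for srun (SLURM only)
}
```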
@@ -89,12 +91,7 @@ def __init__(
         backend: str = "local",
         cache_directory: Optional[str] = None,
         max_cores: int = 1,
-        cores_per_worker: int = 1,
-        threads_per_core: int = 1,
-        gpus_per_worker: int = 0,
-        cwd: Optional[str] = None,
-        openmpi_oversubscribe: bool = False,
-        slurm_cmd_args: list[str] = [],
+        resource_dict: Optional[dict] = None,
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
@@ -114,12 +111,7 @@ def __new__(
         backend: str = "local",
         cache_directory: Optional[str] = None,
         max_cores: int = 1,
-        cores_per_worker: int = 1,
-        threads_per_core: int = 1,
-        gpus_per_worker: int = 0,
-        cwd: Optional[str] = None,
-        openmpi_oversubscribe: bool = False,
-        slurm_cmd_args: list[str] = [],
+        resource_dict: Optional[dict] = None,
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
@@ -145,12 +137,15 @@ def __new__(
             backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
             cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
             max_cores (int): defines the number cores which can be used in parallel
-            cores_per_worker (int): number of MPI cores to be used for each function call
-            threads_per_core (int): number of OpenMP threads to be used for each function call
-            gpus_per_worker (int): number of GPUs per worker - defaults to 0
-            openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
-            slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
-            cwd (str/None): current working directory where the parallel python task is executed
+            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                                  - cores_per_worker (int): number of MPI cores to be used for each function call
+                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
+                                  - gpus_per_worker (int): number of GPUs per worker - defaults to 0
+                                  - cwd (str/None): current working directory where the parallel python task is executed
+                                  - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
+                                                                  and SLURM only) - default False
+                                  - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
+                                                           only)
             flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
             flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
             flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -172,18 +167,26 @@ def __new__(
             debugging purposes and to get an overview of the specified dependencies.

         """
+        default_resource_dict = {
+            "cores": 1,
+            "threads_per_core": 1,
+            "gpus_per_core": 0,
+            "cwd": None,
+            "openmpi_oversubscribe": False,
+            "slurm_cmd_args": [],
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
         if not disable_dependencies:
             return ExecutorWithDependencies(
                 max_workers=max_workers,
                 backend=backend,
                 cache_directory=cache_directory,
                 max_cores=max_cores,
-                cores_per_worker=cores_per_worker,
-                threads_per_core=threads_per_core,
-                gpus_per_worker=gpus_per_worker,
-                cwd=cwd,
-                openmpi_oversubscribe=openmpi_oversubscribe,
-                slurm_cmd_args=slurm_cmd_args,
+                resource_dict=resource_dict,
                 flux_executor=flux_executor,
                 flux_executor_pmi_mode=flux_executor_pmi_mode,
                 flux_executor_nesting=flux_executor_nesting,
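The update call in the hunk above fills in defaults without overriding anything the caller supplied; in isolation the behaviour is roughly:

```python
default_resource_dict = {"cores": 1, "threads_per_core": 1, "gpus_per_core": 0}
resource_dict = {"cores": 2}  # user-supplied keys win

# same pattern as in __new__: copy only the default keys that are missing
resource_dict.update(
    {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
print(resource_dict)  # {'cores': 2, 'threads_per_core': 1, 'gpus_per_core': 0}
```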
@@ -201,12 +204,7 @@ def __new__(
                 backend=backend,
                 cache_directory=cache_directory,
                 max_cores=max_cores,
-                cores_per_worker=cores_per_worker,
-                threads_per_core=threads_per_core,
-                gpus_per_worker=gpus_per_worker,
-                cwd=cwd,
-                openmpi_oversubscribe=openmpi_oversubscribe,
-                slurm_cmd_args=slurm_cmd_args,
+                resource_dict=resource_dict,
                 flux_executor=flux_executor,
                 flux_executor_pmi_mode=flux_executor_pmi_mode,
                 flux_executor_nesting=flux_executor_nesting,

executorlib/interactive/executor.py

Lines changed: 38 additions & 42 deletions
@@ -151,12 +151,7 @@ def create_executor(
     backend: str = "local",
     max_cores: int = 1,
     cache_directory: Optional[str] = None,
-    cores_per_worker: int = 1,
-    threads_per_core: int = 1,
-    gpus_per_worker: int = 0,
-    cwd: Optional[str] = None,
-    openmpi_oversubscribe: bool = False,
-    slurm_cmd_args: list[str] = [],
+    resource_dict: Optional[dict] = None,
     flux_executor=None,
     flux_executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
@@ -179,12 +174,14 @@ def create_executor(
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
         max_cores (int): defines the number cores which can be used in parallel
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
-        cores_per_worker (int): number of MPI cores to be used for each function call
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_worker (int): number of GPUs per worker - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
-        slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                              - cores_per_worker (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_worker (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+                              - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+                                                              SLURM only) - default False
+                              - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
         flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -205,70 +202,69 @@ def create_executor(
     if flux_executor is not None and backend != "flux":
         backend = "flux"
     check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
-    executor_kwargs = {
-        "cores": cores_per_worker,
-        "hostname_localhost": hostname_localhost,
-        "cwd": cwd,
-        "cache_directory": cache_directory,
-    }
+    cores_per_worker = resource_dict["cores"]
+    resource_dict["cache_directory"] = cache_directory
+    resource_dict["hostname_localhost"] = hostname_localhost
     if backend == "flux":
-        check_oversubscribe(oversubscribe=openmpi_oversubscribe)
-        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
-        executor_kwargs["threads_per_core"] = threads_per_core
-        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
-        executor_kwargs["flux_executor"] = flux_executor
-        executor_kwargs["flux_executor_pmi_mode"] = flux_executor_pmi_mode
-        executor_kwargs["flux_executor_nesting"] = flux_executor_nesting
+        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
+        check_command_line_argument_lst(
+            command_line_argument_lst=resource_dict["slurm_cmd_args"]
+        )
+        del resource_dict["openmpi_oversubscribe"]
+        del resource_dict["slurm_cmd_args"]
+        resource_dict["flux_executor"] = flux_executor
+        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
+        resource_dict["flux_executor_nesting"] = flux_executor_nesting
         if block_allocation:
-            executor_kwargs["init_function"] = init_function
+            resource_dict["init_function"] = init_function
             return InteractiveExecutor(
                 max_workers=int(max_cores / cores_per_worker),
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
     elif backend == "slurm":
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
-        executor_kwargs["threads_per_core"] = threads_per_core
-        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
-        executor_kwargs["slurm_cmd_args"] = slurm_cmd_args
-        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
         if block_allocation:
-            executor_kwargs["init_function"] = init_function
+            resource_dict["init_function"] = init_function
             return InteractiveExecutor(
                 max_workers=int(max_cores / cores_per_worker),
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
     else:  # backend="local"
-        check_threads_per_core(threads_per_core=threads_per_core)
-        check_gpus_per_worker(gpus_per_worker=gpus_per_worker)
-        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
-        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
+        check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
+        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
+        check_command_line_argument_lst(
+            command_line_argument_lst=resource_dict["slurm_cmd_args"]
+        )
+        del resource_dict["threads_per_core"]
+        del resource_dict["gpus_per_core"]
+        del resource_dict["slurm_cmd_args"]
        if block_allocation:
-            executor_kwargs["init_function"] = init_function
+            resource_dict["init_function"] = init_function
             return InteractiveExecutor(
                 max_workers=int(max_cores / cores_per_worker),
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=MpiExecSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=MpiExecSpawner,
             )
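With this change create_executor mutates the resource dictionary in place and forwards it as executor_kwargs; per backend, the keys that are only needed for validation are removed first. Below is a simplified, hypothetical reproduction of the "local" branch; the helper name and signature are illustrative and not part of the codebase:

```python
def _local_executor_kwargs(resource_dict, cache_directory=None, hostname_localhost=False):
    # mirror of the local-backend branch above: attach the shared settings ...
    resource_dict["cache_directory"] = cache_directory
    resource_dict["hostname_localhost"] = hostname_localhost
    # ... then drop the keys that are only checked, not forwarded to the spawner
    for key in ("threads_per_core", "gpus_per_core", "slurm_cmd_args"):
        resource_dict.pop(key, None)
    return resource_dict


print(_local_executor_kwargs({
    "cores": 1,
    "threads_per_core": 1,
    "gpus_per_core": 0,
    "cwd": None,
    "openmpi_oversubscribe": False,
    "slurm_cmd_args": [],
}))
# {'cores': 1, 'cwd': None, 'openmpi_oversubscribe': False,
#  'cache_directory': None, 'hostname_localhost': False}
```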
