This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

Run less code outside works (#31)
* make args pickable

* update to callback

* remove return value

* cleanup option

* prepare release

* lint

* extract function

* remove defaults

* remove DVC todo

* poetry update
PythonFZ authored Apr 28, 2023
1 parent 0aeca90 commit c44a98c
Showing 6 changed files with 277 additions and 286 deletions.
11 changes: 5 additions & 6 deletions README.md
@@ -7,21 +7,19 @@
[DVC](dvc.org) provides tools for building and executing the computational graph locally through various methods.
The `dask4dvc` package combines [Dask Distributed](https://distributed.dask.org/) with DVC to make it easier to use with HPC managers like [Slurm](https://github.com/SchedMD/slurm).

The `dask4dvc` package will try to run the DVC graph in parallel.
The `dask4dvc repro` command will run the DVC graph in parallel where possible.
Currently, `dask4dvc run` will not run the stages of each experiment sequentially.

> :warning: This is an experimental package **not** affiliated in any way with iterative or DVC. ``dask4dvc`` will disable a few of the checks that DVC implements. Do not make changes to your workspace during the runtime of `dask4dvc`.
> :warning: This is an experimental package **not** affiliated in any way with iterative or DVC.
## Usage
Dask4DVC provides a CLI similar to DVC.

- `dvc repro` becomes `dask4dvc repro`.
- `dvc queue start --jobs 1` becomes `dask4dvc run`
- `dvc queue start` becomes `dask4dvc run`

You can follow the progress using `dask4dvc <cmd> --dashboard`.

> `dask4dvc run --parallel` is available for `dvc queue start --jobs <max-workers>` but it currently leads to the failure of some experiments.
> The `dask4dvc` error messages are currently very sparse. For more detailed error messages, please use the corresponding DVC commands.

### SLURM Cluster

@@ -45,6 +43,7 @@ cluster.adapt()
With this setup you can then run `dask4dvc repro --address 127.0.0.1:31415` on the example port `31415`.
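
For reference, a minimal sketch of such a cluster script (assuming `dask-jobqueue` is installed; the partition name, resources, and port below are placeholders, not recommended defaults):

```python
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    queue="regular",        # SLURM partition (placeholder)
    cores=4,                # cores per SLURM job
    memory="8GB",           # memory per SLURM job
    walltime="00:30:00",
    scheduler_options={"port": 31415},  # fixed port so dask4dvc can connect
)
cluster.adapt(minimum=1, maximum=4)  # scale the number of workers with the load

# Keep the scheduler alive while `dask4dvc repro --address 127.0.0.1:31415` runs.
input("Press Enter to shut down the cluster")
```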

You can also use config files with `dask4dvc repro --config myconfig.yaml`.
All `dask.distributed` Clusters should be supported.

```yaml
default:
26 changes: 12 additions & 14 deletions dask4dvc/cli/main.py
@@ -43,7 +43,9 @@ def clean() -> None:

@app.command()
def repro(
targets: typing.List[str] = typer.Argument(None),
targets: typing.List[str] = typer.Argument(
None, help="Name of stages to reproduce. Leave empty to run the full graph."
),
address: str = typer.Option(None, help=Help.address),
leave: bool = typer.Option(True, help=Help.leave),
config: str = typer.Option(None, help=Help.config),
@@ -52,6 +54,7 @@ def repro(
option: typing.List[str] = typer.Option(
None, "-o", "--option", help="Additional dvc repro options"
),
cleanup: bool = typer.Option(True, help="Remove temporary experiments when done"),
) -> None:
"""Replicate 'dvc repro' command using dask."""
if len(option) != 0:
@@ -72,37 +75,34 @@ def repro(
client.cluster.adapt(minimum=1, maximum=max_workers)
log.info(client)

mapping, experiments, cleanup_data = dvc_repro.parallel_submit(
client, repo, stages
)
# check if futures are successful
dask.distributed.Future
mapping, experiments = dvc_repro.parallel_submit(client, repo, stages)

wait_for_futures(client, mapping)
for cleanup in cleanup_data:
dvc_repro.collect_and_cleanup(**cleanup)
if all(x.status == "finished" for x in mapping.values()):
log.info("All stages finished successfully")
# dvc.cli.main(["exp", "apply", experiments[-1]])
dask.distributed.wait(
client.submit(subprocess.check_call, ["dvc", "repro", *targets])
)
dvc_repro.remove_experiments(experiments)
if cleanup:
dvc_repro.remove_experiments(experiments)

if not leave:
_ = input("Press Enter to close the client")
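
As an aside, the `client.submit(subprocess.check_call, ["dvc", "repro", *targets])` call above runs the final `dvc repro` as a plain shell command on a Dask worker and blocks until it exits. A stripped-down sketch of that pattern, outside the dask4dvc context and with a placeholder command, could look like:

```python
import subprocess

import dask.distributed

if __name__ == "__main__":
    client = dask.distributed.Client()  # local cluster, for illustration only
    # check_call raises CalledProcessError on the worker if the command fails;
    # the future's status then becomes "error" instead of "finished".
    future = client.submit(subprocess.check_call, ["python", "--version"], pure=False)
    dask.distributed.wait(future)  # block until the command has exited
    client.close()
```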


@app.command()
def run(
targets: typing.List[str] = typer.Argument(None),
targets: typing.List[str] = typer.Argument(
None, help="Name of the DVC experiments to reproduce. Leave empty to run all."
),
address: str = typer.Option(None, help=Help.address),
leave: bool = typer.Option(True, help=Help.leave),
config: str = typer.Option(None, help=Help.config),
max_workers: int = typer.Option(None, help=Help.max_workers),
dashboard: bool = typer.Option(False, help=Help.dashboard),
) -> None:
"""Run a DVC experiment."""
"""Replicate 'dvc queue start' using dask."""
if len(targets) == 0:
targets = None

@@ -119,12 +119,10 @@ def run(
client.cluster.adapt(minimum=1, maximum=max_workers)
log.info(client)

mapping, _, cleanup_data = dvc_repro.experiment_submit(client, repo, targets)
mapping, _ = dvc_repro.experiment_submit(client, repo, targets)

wait_for_futures(client, mapping)
# dvc_repro.remove_experiments(experiments)
for cleanup in cleanup_data:
dvc_repro.collect_and_cleanup(**cleanup)

if not leave:
_ = input("Press Enter to close the client")
90 changes: 44 additions & 46 deletions dask4dvc/dvc_repro.py
@@ -1,5 +1,6 @@
"""Dask4DVC to DVC repo interface."""
import dataclasses
import functools
import logging
import subprocess
import typing
@@ -10,7 +11,7 @@
import dvc.repo
from dvc.repo.experiments.executor.local import TempDirExecutor
from dvc.repo.experiments.queue import tasks
from dvc.repo.experiments.queue.base import QueueEntry
from dvc.repo.experiments.queue.base import BaseStashQueue, QueueEntry
from dvc.repo.reproduce import _get_steps
from dvc.stage import PipelineStage

@@ -105,89 +106,86 @@ def remove_experiments(experiments: typing.List[str] = None) -> None:


def collect_and_cleanup(
entry_dict: dict, executor: TempDirExecutor, infofile: str
future: dask.distributed.Future, entry_dict: dict, infofile: str
) -> None:
"""Collect the results of a finished experiment and clean up."""
try:
tasks.collect_exp(proc_dict=None, entry_dict=entry_dict)
finally:
executor.cleanup(infofile)
entry = QueueEntry.from_dict(entry_dict)
with dvc.repo.Repo(entry.dvc_root) as repo:
executor = BaseStashQueue.init_executor(
repo.experiments,
entry,
TempDirExecutor,
location="dvc-task",
)
executor.cleanup(infofile)


def submit_to_dask(
client: dask.distributed.Client, infofile: str, entry: QueueEntry, successors: list
) -> dask.distributed.Future:
"""Submit a queued experiment to run with Dask."""
future = client.submit(
exec_run,
infofile=infofile,
successors=successors,
pure=False,
key=entry.name,
)

future.add_done_callback(
functools.partial(
collect_and_cleanup,
entry_dict=dataclasses.asdict(entry),
infofile=infofile,
)
)
return future
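
The helper above follows the usual `dask.distributed` callback idiom: the callback receives the finished `Future` as its only positional argument, and any extra state is bound with `functools.partial` and therefore has to be picklable (hence the plain `entry_dict` instead of live objects). A self-contained sketch of that idiom, with purely illustrative names, might look like:

```python
import functools

import dask.distributed


def run_task(value: int) -> int:
    """Stand-in for a unit of work submitted to the cluster."""
    return value * 2


def on_done(future: dask.distributed.Future, label: str) -> None:
    """Client-side callback fired once the future finishes.

    Extra arguments such as `label` are bound via functools.partial and should
    be simple, picklable objects.
    """
    print(f"{label}: {future.status}")


if __name__ == "__main__":
    client = dask.distributed.Client()
    future = client.submit(run_task, 21, pure=False, key="example-task")
    future.add_done_callback(functools.partial(on_done, label="example-task"))
    print(future.result())  # -> 42
    client.close()
```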


def parallel_submit(
client: dask.distributed.Client,
repo: dvc.repo.Repo,
stages: typing.Dict[PipelineStage, str],
) -> typing.Tuple[
typing.Dict[PipelineStage, dask.distributed.Future],
typing.List[str],
typing.List[dict],
]:
) -> typing.Tuple[typing.Dict[PipelineStage, dask.distributed.Future], typing.List[str],]:
"""Submit experiments in parallel."""
mapping = {}
queue_entries = get_all_queue_entries(repo)
experiments = []

cleanup_data = []

for stage in stages:
log.debug(f"Preparing experiment '{stages[stage]}'")
entry, infofile = queue_entries[stages[stage]]
executor = tasks.setup_exp(dataclasses.asdict(entry))
tasks.setup_exp(dataclasses.asdict(entry))

# we use get here, because some stages won't be queued, such as dependency files
successors = [
mapping.get(successor) for successor in repo.index.graph.successors(stage)
]
mapping[stage] = client.submit(
exec_run,
infofile=infofile,
successors=successors,
pure=False,
key=entry.name,
)
mapping[stage] = submit_to_dask(client, infofile, entry, successors)

cleanup_data.append(
{
"executor": executor,
"infofile": infofile,
"entry_dict": dataclasses.asdict(entry),
}
)
experiments.append(entry.name)

return mapping, experiments, cleanup_data
return mapping, experiments
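
In `parallel_submit`, the futures of a stage's graph successors are passed along as the `successors` argument, so Dask will only start a stage once the stages it depends on have finished. A stripped-down sketch of that ordering mechanism, with made-up stage names and work function:

```python
import dask.distributed


def run_stage(name: str, successors: list) -> str:
    # By the time this runs, every future passed in `successors` has completed:
    # Dask resolves futures nested in arguments before scheduling the task.
    return name


if __name__ == "__main__":
    client = dask.distributed.Client()
    prepare = client.submit(run_stage, "prepare", successors=[], pure=False)
    train = client.submit(run_stage, "train", successors=[prepare], pure=False)
    evaluate = client.submit(run_stage, "evaluate", successors=[train], pure=False)
    print(evaluate.result())  # prints "evaluate", after "prepare" and "train" ran
    client.close()
```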


def experiment_submit(
client: dask.distributed.Client, repo: dvc.repo.Repo, experiments: typing.List[str]
) -> typing.Tuple[
typing.Dict[str, dask.distributed.Future], typing.List[str], typing.List[dict]
]:
) -> typing.Tuple[typing.Dict[str, dask.distributed.Future], typing.List[str]]:
"""Submit experiments in parallel."""
queue_entries = get_all_queue_entries(repo)
if experiments is None:
experiments = list(queue_entries.keys())
mapping = {}
print(f"Submitting experiments: {experiments}")
cleanup_data = []

for experiment in experiments:
log.critical(f"Preparing experiment '{experiment}'")
entry, infofile = queue_entries[experiment]
executor = tasks.setup_exp(dataclasses.asdict(entry))
tasks.setup_exp(dataclasses.asdict(entry))

mapping[experiment] = client.submit(
exec_run,
infofile=infofile,
successors=[],
pure=False,
key=entry.name,
)
cleanup_data.append(
{
"executor": executor,
"infofile": infofile,
"entry_dict": dataclasses.asdict(entry),
}
)
return mapping, list(mapping.keys()), cleanup_data
mapping[experiment] = submit_to_dask(client, infofile, entry, None)

return mapping, list(mapping.keys())