Skip to content

Commit

Permalink
Spectrum adapter remove/lingering in-class import. (#352)
Browse files Browse the repository at this point in the history
* Commit 38567e7: Spectrum adapter remove/lingering in-class import.
Commit 51fb0fd: Remove Spectrum adapter tests

Short term: This fixes a bug where a stray import prevented the
FluxScriptAdapter from being pickled for interprocess communication.

Long term: Cleans up the code from the stale Spectrum adapter so that
less dead code remains, preventing wasted time looking through code
that is no longer used.

Remove Spectrum adapter tests

* Tweaks to fix imports.

* Update docker instructions.

* Module nicknaming to prevent aliasing.

* Logging bugfix to include record name.
  • Loading branch information
Francesco Di Natale authored Mar 9, 2021
1 parent 9dd0774 commit 877aefd
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 497 deletions.
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,45 @@ Once set up, test the environment. The paths should point to a virtual environme
which python
which pip

----------------

## Using Maestro Dockerfiles


Maestro comes packaged with a set of Dockerfiles for trying things out. The two primary files
are:

- A standard `Dockerfile` in the root of the Maestro repository. This file provides a standard
  installation of Maestro, meant for trying out the demo samples provided with this repository.
  To try Maestro locally, with [Docker](https://www.docker.com/) installed, run:

```
docker build -t maestrowf .
docker run -ti maestrowf
```

From within the container run the following:

```
maestro run ./maestrowf/samples/lulesh/lulesh_sample1_unix.yaml
```

- To try out the Flux 0.19.0 integration, run the following from the root of the Maestro
  repository:

```
docker build -t flux_0190 -f ./docker/flux/0.19.0/Dockerfile .
docker run -ti flux_0190
```

From within the container run the following:

```
maestro run ./maestrowf/samples/lulesh/lulesh_sample1_unix_flux.yaml
```

----------------

## Contributors

Many thanks go to MaestroWF's [contributors](https://github.com/LLNL/maestrowf/graphs/contributors).
Expand Down
2 changes: 1 addition & 1 deletion maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ def execute_ready_steps(self):

elif status == State.RUNNING:
# When detect that a step is running, mark it.
LOGGER.info("Step '%s' found to be running.")
LOGGER.info("Step '%s' found to be running.", record.name)
record.mark_running()

elif status == State.TIMEDOUT:
Expand Down
50 changes: 25 additions & 25 deletions maestrowf/interfaces/script/_flux/flux0_18_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
LOGGER = logging.getLogger(__name__)

try:
import flux
from flux import constants as flux_constants
from flux import job as flux_job
from flux import Flux
except ImportError:
LOGGER.info("Failed to import Flux. Continuing.")

Expand Down Expand Up @@ -63,7 +65,7 @@ def submit(
ngpus=0, job_name=None, force_broker=False
):
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")

# NOTE: This previously placed everything under a broker. However,
Expand All @@ -79,15 +81,15 @@ def submit(
force_broker, nodes
)
ngpus_per_slot = int(ceil(ngpus / nodes))
jobspec = flux.job.JobspecV1.from_nest_command(
jobspec = flux_job.JobspecV1.from_nest_command(
[path], num_nodes=nodes, cores_per_slot=cores_per_task,
num_slots=nodes, gpus_per_slot=ngpus_per_slot)
else:
LOGGER.debug(
"Launch under root Flux broker. [force_broker=%s, nodes=%d]",
force_broker, nodes
)
jobspec = flux.job.JobspecV1.from_command(
jobspec = flux_job.JobspecV1.from_command(
[path], num_tasks=procs, num_nodes=nodes,
cores_per_task=cores_per_task, gpus_per_task=ngpus)

Expand All @@ -108,7 +110,7 @@ def submit(
try:
# Submit our job spec.
jobid = \
flux.job.submit(cls.flux_handle, jobspec, waitable=True)
flux_job.submit(cls.flux_handle, jobspec, waitable=True)
submit_status = SubmissionCode.OK
retcode = 0

Expand Down Expand Up @@ -173,7 +175,7 @@ def get_statuses(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")

LOGGER.debug(
Expand All @@ -187,9 +189,8 @@ def get_statuses(cls, joblist):
}

for jobid in joblist:
rpc_handle = \
flux.job.job_list_id(
cls.flux_handle, int(jobid), list(cls.attrs))
rpc_handle = flux_job.job_list_id(
cls.flux_handle, int(jobid), list(cls.attrs))
rpc_handle.then(cls.status_callback, arg=(int(jobid), cb_args))
ret = cls.flux_handle.reactor_run(rpc_handle.get_reactor(), 0)

Expand All @@ -200,16 +201,17 @@ def get_statuses(cls, joblist):
chk_status = JobStatusCode.ERROR

statuses = {}
for job in cb_args["jobs"]:
if job[0] == "NF":
statuses[job[1]["id"]] = State.NOTFOUND
elif job[0] == "UNK":
statuses[job[1]["id"]] = State.UNKNOWN
for job_entry in cb_args["jobs"]:
if job_entry[0] == "NF":
statuses[job_entry[1]["id"]] = State.NOTFOUND
elif job_entry[0] == "UNK":
statuses[job_entry[1]["id"]] = State.UNKNOWN
else:
LOGGER.debug(
"Job checked with status '%s'\nEntry: %s", job[0], job[1])
statuses[job[1]["id"]] = \
cls.statustostr(job[1], True)
"Job checked with status '%s'\nEntry: %s",
job_entry[0], job_entry[1])
statuses[job_entry[1]["id"]] = \
cls.statustostr(job_entry[1], True)
return chk_status, statuses

@classmethod
Expand All @@ -227,16 +229,14 @@ def resulttostr(cls, resultid, singlechar=False):

@classmethod
def statustostr(cls, job_entry, abbrev=True):
flux = __import__("flux", fromlist=["constants"])

stateid = job_entry["state"]
LOGGER.debug(
"JOBID [%d] -- Encountered (%s)", job_entry["id"], stateid)

if stateid & flux.constants.FLUX_JOB_PENDING:
if stateid & flux_constants.FLUX_JOB_PENDING:
LOGGER.debug("Marking as PENDING.")
statusstr = "PD" if abbrev else "PENDING"
elif stateid & flux.constants.FLUX_JOB_RUNNING:
elif stateid & flux_constants.FLUX_JOB_RUNNING:
LOGGER.debug("Marking as RUNNING.")
statusstr = "R" if abbrev else "RUNNING"
else:
Expand All @@ -258,7 +258,7 @@ def cancel(cls, joblist):
# We need to import flux here, as it may not be installed on
# all systems.
if not cls.flux_handle:
cls.flux_handle = flux.Flux()
cls.flux_handle = Flux()
LOGGER.debug("New Flux instance created.")

LOGGER.debug(
Expand All @@ -270,10 +270,10 @@ def cancel(cls, joblist):

cancel_code = CancelCode.OK
cancel_rcode = 0
for job in joblist:
for entry in joblist:
try:
LOGGER.debug("Cancelling Job %s...", job)
flux.job.cancel(cls.flux_handle, int(job))
LOGGER.debug("Cancelling Job %s...", entry)
entry.cancel(cls.flux_handle, int(entry))
except Exception as exception:
LOGGER.error(str(exception))
cancel_code = CancelCode.ERROR
Expand Down
Loading

0 comments on commit 877aefd

Please sign in to comment.