Skip to content

Commit

Permalink
Add comments for ams flux
Browse files Browse the repository at this point in the history
  • Loading branch information
koparasy committed Aug 22, 2024
1 parent 262be05 commit ab060a0
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions src/AMSWorkflow/ams/ams_flux.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# Copyright 2021-2023 Lawrence Livermore National Security, LLC and other
# AMSLib Project Developers
#
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

import threading
import logging
from concurrent.futures import Future
Expand All @@ -6,16 +11,25 @@
import collections
import subprocess
import types
from typing import Union


from flux.job.event import MAIN_EVENTS
from flux.job import FluxExecutor
import flux
from queue import Queue

from ams.orchestrator import AvailableDomains, DomainSpec


# pylint: disable=too-many-instance-attributes
class AMSFluxExecutorFuture(Future):
"""A ``concurrent.futures.Future`` subclass that represents a single Flux job.
"""A ``concurrent.futures.Future`` subclass that represents a AMSTrain job.
The class provides a future abstraction for training jobs. In 'AMS', a training job
is actually a sequence of two jobs: first we do the sub-selection, and right after
we schedule an ML model training job. The future provides accessors to the description
of the current AMS training job and the phase the job is currently in.
In addition to all of the ``flux.job.FluxExecutorFuture`` functionality,
``AMSFluxExecutorFuture`` instances offer:
Expand All @@ -38,7 +52,16 @@ def __get_uri_cb(fut, eventlog):
fut._set_uri(None, KeyError(f"uri does not exist in memo callback {eventlog}"))
fut._set_uri(eventlog.context["uri"])

def __init__(self, owning_thread_id, flux_executor, track_uri, domain_descr, *args, **kwargs):
def __init__(
self,
owning_thread_id: int,
flux_executor: Union[AMSFluxOrchestratorExecutor, AMSFluxExecutor],
track_uri: bool,
domain_descr: DomainSpec,
*args,
**kwargs,
):

super().__init__(*args, **kwargs)
# Thread.ident of thread tasked with completing this future
self.__owning_thread_id = owning_thread_id
Expand Down Expand Up @@ -349,6 +372,12 @@ def shutdown_event(self):


class _WorkItem:
"""
A _WorkItem represents a scheduled job that will now be executed. The purpose of the class
is to provide our own "execution" vehicle when flux is not present. This is to be used only for
debugging purposes in composition with the AMSFakeFluxOrchestatorExecutor.
"""

def __init__(self, future, spec):
self.future = future
self.spec = spec
Expand Down Expand Up @@ -395,14 +424,29 @@ def run(self):


class AMSFakeFluxOrchestatorExecutor(ThreadPoolExecutor):
def __init__(self, o_queue, domains, *args, **kwargs):
"""
A class to emulate a FluxExecutor with subprocesses. For every submitted job, instead
of running it through flux job submission, we use subprocess to run a single instantiation
of the job.
"""

def __init__(self, o_queue: Queue, domains: AvailableDomains, *args, **kwargs):
    """Store the output queue and domains, then initialise the thread pool.

    :param o_queue: queue object kept on the instance as ``_o_queue``
    :param domains: available domains kept on the instance as ``_domains``
    :param args: positional arguments forwarded to ``ThreadPoolExecutor``
    :param kwargs: keyword arguments forwarded to ``ThreadPoolExecutor``
    """
    self._o_queue = o_queue
    self._domains = domains
    # Report the forwarded arguments through logging (debug level) rather
    # than printing unconditionally to stdout on every construction.
    logging.debug("Args %s", args)
    logging.debug("kwargs %s", kwargs)
    super().__init__(*args, **kwargs)

def submit(self, domain, job_spec):
def submit(self, domain: DomainSpec, job_spec):
"""Submit a jobspec to Flux and return a ``FluxExecutorFuture``.
Accepts the same positional and keyword arguments as ``flux.job.submit``,
except for the ``flux.job.submit`` function's first argument, ``flux_handle``.
:param domain: AMSDomain training job description
:param job_spec: jobspec defining the job request
:type job_spec: Jobspec or its string encoding
:raises RuntimeError: if ``shutdown`` has been called or if an error has
occurred and new jobs cannot be submitted (e.g. a remote Flux instance
can no longer be communicated with).
"""
with self._shutdown_lock:
if self._broken:
raise RuntimeError(f"Thread is broken {self._broken}")
Expand Down Expand Up @@ -474,5 +518,6 @@ def _create_future(self, domain, factory, *factory_args):
def get_o_queue(self):
    """Return the object stored in ``self._o_queue``."""
    return self._o_queue

return self._domains
@property
def domains(self):
    """Return the object stored in ``self._domains`` (read-only property)."""
    return self._domains

0 comments on commit ab060a0

Please sign in to comment.