From 0cbd46751ef1d05d6f1f170b256ffc3f67f34d87 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Tue, 25 Aug 2020 01:03:10 +0000 Subject: [PATCH 1/2] build: remove recursive Makefile.am in python bindings Combine the Makefile.am files in src/bindings/python/flux and src/bindings/python/flux/core into a single Makefile.am. Use nobase_ prefix so that all Python source files can be listed in the same rule. The special `fluxpycoredir` automake variable is no longer needed, so it can be dropped. This should make the bindings Makefiles a bit easier to maintain by reducing duplication and the number of files to edit when new Python source files are added. --- configure.ac | 1 - src/bindings/python/flux/Makefile.am | 37 ++++++++++++----------- src/bindings/python/flux/core/Makefile.am | 11 ------- 3 files changed, 20 insertions(+), 29 deletions(-) delete mode 100644 src/bindings/python/flux/core/Makefile.am diff --git a/configure.ac b/configure.ac index 992184690798..d067956f2cc2 100644 --- a/configure.ac +++ b/configure.ac @@ -487,7 +487,6 @@ AC_CONFIG_FILES( \ src/bindings/lua/Makefile \ src/bindings/python/Makefile \ src/bindings/python/flux/Makefile \ - src/bindings/python/flux/core/Makefile \ src/bindings/python/_flux/Makefile \ src/broker/Makefile \ src/cmd/Makefile \ diff --git a/src/bindings/python/flux/Makefile.am b/src/bindings/python/flux/Makefile.am index 14a4379f0e9f..f82b8a2e6be8 100644 --- a/src/bindings/python/flux/Makefile.am +++ b/src/bindings/python/flux/Makefile.am @@ -1,25 +1,28 @@ -fluxpy_PYTHON=\ - __init__.py\ - kvs.py\ - wrapper.py\ - rpc.py\ - message.py\ - constants.py\ - job.py \ - util.py \ - future.py \ - memoized_property.py \ - debugged.py +nobase_fluxpy_PYTHON = \ + __init__.py \ + kvs.py \ + wrapper.py \ + rpc.py \ + message.py \ + constants.py \ + job.py \ + util.py \ + future.py \ + memoized_property.py \ + debugged.py \ + core/__init__.py \ + core/watchers.py \ + core/inner.py \ + core/handle.py \ + core/trampoline.py if 
HAVE_FLUX_SECURITY -fluxpy_PYTHON += security.py +nobase_fluxpy_PYTHON += security.py endif clean-local: - -rm -f *.pyc *.pyo - -rm -rf __pycache__ - -SUBDIRS = core + -rm -f *.pyc */*.pyc *.pyo */*.pyo + -rm -rf __pycache__ */__pycache__ install-data-hook: $(AM_V_at)echo Linking python modules in non-standard location... && \ diff --git a/src/bindings/python/flux/core/Makefile.am b/src/bindings/python/flux/core/Makefile.am deleted file mode 100644 index d0c4c3a22072..000000000000 --- a/src/bindings/python/flux/core/Makefile.am +++ /dev/null @@ -1,11 +0,0 @@ -fluxpycoredir = $(fluxpydir)/core -fluxpycore_PYTHON=\ - __init__.py\ - watchers.py\ - inner.py\ - handle.py\ - trampoline.py - -clean-local: - -rm -f *.pyc *.pyo - -rm -rf __pycache__ From 6cc9dda1d8ced5deca7bfd12db3e2599b986f7e8 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Mon, 24 Aug 2020 22:13:40 +0000 Subject: [PATCH 2/2] python: split flux.job module into multiple files Problem: The Python 'flux.job' module is large (>1200 lines) and thus getting difficult to read and/or modify. This module will only get larger since there will certainly be more classes and methods related to Flux jobs added to the Python API. Split `src/bindings/python/flux/job.py` into multiple files, while attempting to keep related code together without making any one component too large. For backwards compatibility, recreate the previous `flux.job` namespace in the __init__.py in the new job directory. 
--- src/bindings/python/flux/Makefile.am | 13 +- src/bindings/python/flux/job/JobID.py | 99 +++ .../python/flux/{job.py => job/Jobspec.py} | 575 +----------------- src/bindings/python/flux/job/__init__.py | 26 + src/bindings/python/flux/job/_wrapper.py | 20 + src/bindings/python/flux/job/event.py | 180 ++++++ src/bindings/python/flux/job/kill.py | 66 ++ src/bindings/python/flux/job/kvs.py | 38 ++ src/bindings/python/flux/job/list.py | 57 ++ src/bindings/python/flux/job/submit.py | 126 ++++ src/bindings/python/flux/job/wait.py | 87 +++ 11 files changed, 713 insertions(+), 574 deletions(-) create mode 100644 src/bindings/python/flux/job/JobID.py rename src/bindings/python/flux/{job.py => job/Jobspec.py} (59%) create mode 100644 src/bindings/python/flux/job/__init__.py create mode 100644 src/bindings/python/flux/job/_wrapper.py create mode 100644 src/bindings/python/flux/job/event.py create mode 100644 src/bindings/python/flux/job/kill.py create mode 100644 src/bindings/python/flux/job/kvs.py create mode 100644 src/bindings/python/flux/job/list.py create mode 100644 src/bindings/python/flux/job/submit.py create mode 100644 src/bindings/python/flux/job/wait.py diff --git a/src/bindings/python/flux/Makefile.am b/src/bindings/python/flux/Makefile.am index f82b8a2e6be8..b31cd45e59d9 100644 --- a/src/bindings/python/flux/Makefile.am +++ b/src/bindings/python/flux/Makefile.am @@ -5,7 +5,6 @@ nobase_fluxpy_PYTHON = \ rpc.py \ message.py \ constants.py \ - job.py \ util.py \ future.py \ memoized_property.py \ @@ -14,7 +13,17 @@ nobase_fluxpy_PYTHON = \ core/watchers.py \ core/inner.py \ core/handle.py \ - core/trampoline.py + core/trampoline.py \ + job/__init__.py \ + job/JobID.py \ + job/Jobspec.py \ + job/event.py \ + job/kill.py \ + job/kvs.py \ + job/list.py \ + job/submit.py \ + job/wait.py \ + job/_wrapper.py if HAVE_FLUX_SECURITY nobase_fluxpy_PYTHON += security.py diff --git a/src/bindings/python/flux/job/JobID.py b/src/bindings/python/flux/job/JobID.py new file mode 
100644 index 000000000000..d7ba52344c7c --- /dev/null +++ b/src/bindings/python/flux/job/JobID.py @@ -0,0 +1,99 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### + +from flux.job._wrapper import _RAW as RAW +from _flux._core import ffi + + +def id_parse(jobid_str): + """ + returns: An integer jobid + :rtype int + """ + jobid = ffi.new("flux_jobid_t[1]") + RAW.id_parse(jobid_str, jobid) + return int(jobid[0]) + + +def id_encode(jobid, encoding="f58"): + """ + returns: Jobid encoded in encoding + :rtype str + """ + buflen = 128 + buf = ffi.new("char[]", buflen) + RAW.id_encode(int(jobid), encoding, buf, buflen) + return ffi.string(buf, buflen).decode("utf-8") + + +class JobID(int): + """Class used to represent a Flux JOBID + + JobID is a subclass of `int`, so may be used in place of integer. + However, a JobID may be created from any valid RFC 19 FLUID + encoding, including: + + - decimal integer (no prefix) + - hexadecimal integer (prefix 0x) + - dotted hex (dothex) (xxxx.xxxx.xxxx.xxxx) + - kvs dir (dotted hex with `job.` prefix) + - RFC19 F58: (Base58 encoding with prefix `ƒ` or `f`) + + A JobID object also has properties for encoding a JOBID into each + of the above representations, e.g. jobid.f58, jobid.words, jobid.dothex... 
+ + """ + + def __new__(cls, value): + if isinstance(value, int): + jobid = value + else: + jobid = id_parse(value) + return super(cls, cls).__new__(cls, jobid) + + def encode(self, encoding="dec"): + """Encode a JobID to alternate supported format""" + return id_encode(self, encoding) + + @property + def dec(self): + """Return decimal integer representation of a JobID""" + return self.encode() + + @property + def f58(self): + """Return RFC19 F58 representation of a JobID""" + return self.encode("f58") + + @property + def hex(self): + """Return 0x-prefixed hexadecimal representation of a JobID""" + return self.encode("hex") + + @property + def dothex(self): + """Return dotted hexadecimal representation of a JobID""" + return self.encode("dothex") + + @property + def words(self): + """Return words (mnemonic) representation of a JobID""" + return self.encode("words") + + @property + def kvs(self): + """Return KVS directory path of a JobID""" + return self.encode("kvs") + + def __str__(self): + return self.encode("f58") + + def __repr__(self): + return f"JobID({self.dec})" diff --git a/src/bindings/python/flux/job.py b/src/bindings/python/flux/job/Jobspec.py similarity index 59% rename from src/bindings/python/flux/job.py rename to src/bindings/python/flux/job/Jobspec.py index 26586f1118d7..e99ee2a6a7cc 100644 --- a/src/bindings/python/flux/job.py +++ b/src/bindings/python/flux/job/Jobspec.py @@ -1,5 +1,5 @@ ############################################################### -# Copyright 2014 Lawrence Livermore National Security, LLC +# Copyright 2020 Lawrence Livermore National Security, LLC # (c.f. AUTHORS, NOTICE.LLNS, COPYING) # # This file is part of the Flux resource manager framework. 
@@ -15,26 +15,12 @@ import collections import collections.abc as abc import numbers -import signal import six import yaml -import flux.kvs -from flux import constants -from flux.wrapper import Wrapper -from flux.util import check_future_error, parse_fsd -from flux.future import Future -from flux.rpc import RPC -from _flux._core import ffi, lib - - -class JobWrapper(Wrapper): - def __init__(self): - super(JobWrapper, self).__init__(ffi, lib, prefixes=["flux_job_"]) - - -RAW = JobWrapper() +from _flux._core import ffi +from flux.util import parse_fsd def _convert_jobspec_arg_to_string(jobspec): @@ -60,561 +46,6 @@ def _convert_jobspec_arg_to_string(jobspec): return jobspec -def job_kvs(flux_handle, jobid): - """ - :returns: The KVS directory of the given job - :rtype: KVSDir - """ - - path_len = 1024 - buf = ffi.new("char[]", path_len) - RAW.kvs_key(buf, path_len, jobid, "") - kvs_key = ffi.string(buf, path_len) - return flux.kvs.get_dir(flux_handle, kvs_key) - - -def job_kvs_guest(flux_handle, jobid): - """ - :returns: The KVS guest directory of the given job - :rtype: KVSDir - """ - - path_len = 1024 - buf = ffi.new("char[]", path_len) - RAW.kvs_guest_key(buf, path_len, jobid, "") - kvs_key = ffi.string(buf, path_len) - return flux.kvs.get_dir(flux_handle, kvs_key) - - -def id_parse(jobid_str): - """ - returns: An integer jobid - :rtype int - """ - jobid = ffi.new("flux_jobid_t[1]") - RAW.id_parse(jobid_str, jobid) - return int(jobid[0]) - - -def id_encode(jobid, encoding="f58"): - """ - returns: Jobid encoded in encoding - :rtype str - """ - buflen = 128 - buf = ffi.new("char[]", buflen) - RAW.id_encode(int(jobid), encoding, buf, buflen) - return ffi.string(buf, buflen).decode("utf-8") - - -class JobID(int): - """Class used to represent a Flux JOBID - - JobID is a subclass of `int`, so may be used in place of integer. 
- However, a JobID may be created from any valid RFC 19 FLUID - encoding, including: - - - decimal integer (no prefix) - - hexidecimal integer (prefix 0x) - - dotted hex (dothex) (xxxx.xxxx.xxxx.xxxx) - - kvs dir (dotted hex with `job.` prefix) - - RFC19 F58: (Base58 encoding with prefix `ƒ` or `f`) - - A JobID object also has properties for encoding a JOBID into each - of the above representations, e.g. jobid.f85, jobid.words, jobid.dothex... - - """ - - def __new__(cls, value, *args, **kwargs): - if isinstance(value, int): - jobid = value - else: - jobid = id_parse(value) - return super(cls, cls).__new__(cls, jobid) - - def encode(self, encoding="dec"): - """Encode a JobID to alternate supported format""" - return id_encode(self, encoding) - - @property - def dec(self): - """Return decimal integer representation of a JobID""" - return self.encode() - - @property - def f58(self): - """Return RFC19 F58 representation of a JobID""" - return self.encode("f58") - - @property - def hex(self): - """Return 0x-prefixed hexidecimal representation of a JobID""" - return self.encode("hex") - - @property - def dothex(self): - """Return dotted hexidecimal representation of a JobID""" - return self.encode("dothex") - - @property - def words(self): - """Return words (mnemonic) representation of a JobID""" - return self.encode("words") - - @property - def kvs(self): - """Return KVS directory path of a JobID""" - return self.encode("kvs") - - def __str__(self): - return self.encode("f58") - - def __repr__(self): - return f"JobID({self.dec})" - - -class SubmitFuture(Future): - def get_id(self): - return submit_get_id(self) - - -def submit_async( - flux_handle, - jobspec, - priority=lib.FLUX_JOB_PRIORITY_DEFAULT, - waitable=False, - debug=False, - pre_signed=False, -): - """Ask Flux to run a job, without waiting for a response - - Submit a job to Flux. This method returns immediately with a - Flux Future, which can be used obtain the job ID later. 
- - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobspec: jobspec defining the job request - :type jobspec: Jobspec or its string encoding - :param priority: job priority 0 (lowest) through 31 (highest) - (default is 16). Priorities 0 through 15 are restricted to - the instance owner. - :type priority: int - :param waitable: allow result to be fetched with job.wait() - (default is False). Waitable=True is restricted to the - instance owner. - :type waitable: bool - :param debug: enable job manager debugging events to job eventlog - (default is False) - :type debug: bool - :param pre_signed: jobspec argument is already signed - (default is False) - :type pre_signed: bool - :returns: a Flux Future object for obtaining the assigned jobid - :rtype: Future - """ - jobspec = _convert_jobspec_arg_to_string(jobspec) - flags = 0 - if waitable: - flags |= constants.FLUX_JOB_WAITABLE - if debug: - flags |= constants.FLUX_JOB_DEBUG - if pre_signed: - flags |= constants.FLUX_JOB_PRE_SIGNED - future_handle = RAW.submit(flux_handle, jobspec, priority, flags) - return SubmitFuture(future_handle) - - -@check_future_error -def submit_get_id(future): - """Get job ID from a Future returned by job.submit_async() - - Process a response to a Flux job submit request. This method blocks - until the response is received, then decodes the result to obtain - the assigned job ID. 
- - :param future: a Flux future object returned by job.submit_async() - :type future: Future - :returns: job ID - :rtype: int - """ - if future is None or future == ffi.NULL: - raise EnvironmentError(errno.EINVAL, "future must not be None/NULL") - future.wait_for() # ensure the future is fulfilled - jobid = ffi.new("flux_jobid_t[1]") - RAW.submit_get_id(future, jobid) - return int(jobid[0]) - - -def submit( - flux_handle, - jobspec, - priority=lib.FLUX_JOB_PRIORITY_DEFAULT, - waitable=False, - debug=False, - pre_signed=False, -): - """Submit a job to Flux - - Ask Flux to run a job, blocking until a job ID is assigned. - - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobspec: jobspec defining the job request - :type jobspec: Jobspec or its string encoding - :param priority: job priority 0 (lowest) through 31 (highest) - (default is 16). Priorities 0 through 15 are restricted to - the instance owner. - :type priority: int - :param waitable: allow result to be fetched with job.wait() - (default is False). Waitable=true is restricted to the - instance owner. - :type waitable: bool - :param debug: enable job manager debugging events to job eventlog - (default is False) - :type debug: bool - :param pre_signed: jobspec argument is already signed - (default is False) - :type pre_signed: bool - :returns: job ID - :rtype: int - """ - future = submit_async(flux_handle, jobspec, priority, waitable, debug, pre_signed) - return future.get_id() - - -class JobWaitFuture(Future): - def get_status(self): - return wait_get_status(self) - - -def wait_async(flux_handle, jobid=lib.FLUX_JOBID_ANY): - """Wait for a job to complete, asynchronously - - Submit a request to wait for job completion. This method returns - immediately with a Flux Future, which can be used to process - the result later. - - Only jobs submitted with waitable=True can be waited for. 
- - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID to wait for (default is any waitable job) - :returns: a Flux Future object for obtaining the job result - :rtype: Future - """ - future_handle = RAW.wait(flux_handle, jobid) - return JobWaitFuture(future_handle) - - -JobWaitResult = collections.namedtuple("JobWaitResult", "jobid, success, errstr") - - -@check_future_error -def wait_get_status(future): - """Get job status from a Future returned by job.wait_async() - - Process a response to a Flux job wait request. This method blocks - until the response is received, then decodes the result to obtain - the job status. - - :param future: a Flux future object returned by job.wait_async() - :type future: Future - :returns: job status, a tuple of: Job ID (int), success (bool), - and an error (string) if success=False - :rtype: tuple - """ - if future is None or future == ffi.NULL: - raise EnvironmentError(errno.EINVAL, "future must not be None/NULL") - future.wait_for() # ensure the future is fulfilled - success = ffi.new("bool[1]") - errstr = ffi.new("const char *[1]") - jobid = ffi.new("flux_jobid_t[1]") - RAW.wait_get_id(future, jobid) - RAW.wait_get_status(future, success, errstr) - return JobWaitResult(int(jobid[0]), bool(success[0]), ffi.string(errstr[0])) - - -def wait(flux_handle, jobid=lib.FLUX_JOBID_ANY): - """Wait for a job to complete - - Submit a request to wait for job completion, blocking until a - response is received, then return the job status. - - Only jobs submitted with waitable=True can be waited for. 
- - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID to wait for (default is any waitable job) - :returns: job status, a tuple of: Job ID (int), success (bool), - and an error (string) if success=False - :rtype: tuple - """ - future = wait_async(flux_handle, jobid) - return future.get_status() - - -def kill_async(flux_handle, jobid, signum=None): - """Send a signal to a running job asynchronously - - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID of the job to kill - :param signum: signal to send (default SIGTERM) - :returns: a Future - :rtype: Future - """ - if not signum: - signum = signal.SIGTERM - return Future(RAW.kill(flux_handle, int(jobid), signum)) - - -def kill(flux_handle, jobid, signum=None): - """Send a signal to a running job. - - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID of the job to kill - :param signum: signal to send (default SIGTERM) - """ - return kill_async(flux_handle, jobid, signum).get() - - -def cancel_async(flux_handle, jobid, reason=None): - """Cancel a pending or or running job asynchronously - - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID of the job to cancel - :returns: a Future - :rtype: Future - - """ - if not reason: - reason = ffi.NULL - return Future(RAW.cancel(flux_handle, int(jobid), reason)) - - -def cancel(flux_handle, jobid, signum=None): - """Cancel a pending or or running job - - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID of the job to cancel - - """ - return cancel_async(flux_handle, jobid, signum).get() - - -class EventLogEvent: - """ - wrapper class for a single KVS EventLog entry - """ - - def __init__(self, event): - """ - "Initialize from a string or dict eventlog event - """ - if 
isinstance(event, str): - event = json.loads(event) - self._name = event["name"] - self._timestamp = event["timestamp"] - self._context = {} - if "context" in event: - self._context = event["context"] - - def __str__(self): - return "{0.timestamp:<0.5f}: {0.name} {0.context}".format(self) - - @property - def name(self): - return self._name - - @property - def timestamp(self): - return self._timestamp - - @property - def context(self): - return self._context - - -class JobEventWatchFuture(Future): - """ - A future returned from job.event_watch_async(). - Adds get_event() method to return an EventLogEntry event - """ - - def __del__(self): - if self.needs_cancel is not False: - self.cancel() - try: - super().__del__() - except AttributeError: - pass - - def __init__(self, future_handle): - super().__init__(future_handle) - self.needs_cancel = True - - def get_event(self, autoreset=True): - """ - Return the next event from a JobEventWatchFuture, or None - if the event stream has terminated. - - The future is auto-reset unless autoreset=False, so a subsequent - call to get_event() will try to fetch the next event and thus - may block. - """ - result = ffi.new("char *[1]") - try: - RAW.event_watch_get(self.pimpl, result) - except OSError as exc: - if exc.errno == errno.ENODATA: - self.needs_cancel = False - return None - # re-raise all other exceptions - raise - event = EventLogEvent(ffi.string(result[0]).decode("utf-8")) - if autoreset is True: - self.reset() - return event - - def cancel(self): - """Cancel a streaming job.event_watch_async() future""" - RAW.event_watch_cancel(self.pimpl) - self.needs_cancel = False - - -def event_watch_async(flux_handle, jobid, eventlog="eventlog"): - """Asynchronously get eventlog updates for a job - - Asynchronously watch the events of a job eventlog, optionally only - returning events that match a glob pattern. - - Returns a JobEventWatchFuture. 
Call .get_event() from the then - callback to get the currently returned event from the Future object. - - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID on which to watch events - :param name: The event name or glob pattern for which to wait (default: *) - :param eventlog: eventlog path in job kvs directory (default: eventlog) - :returns: a JobEventWatchFuture object - :rtype: JobEventWatchFuture - """ - - future = RAW.event_watch(flux_handle, int(jobid), eventlog, 0) - return JobEventWatchFuture(future) - - -def event_watch(flux_handle, jobid, eventlog="eventlog"): - """Python generator to watch all events for a job - - Synchronously watch events a job eventlog via a simple generator. - Use as: - for event in job.event_watch(flux_handle, jobid): - # do something with event... - - :param flux_handle: handle for Flux broker from flux.Flux() - :type flux_handle: Flux - :param jobid: the job ID on which to watch events - :param name: The event name or glob pattern for which to wait (default: *) - :param eventlog: eventlog path in job kvs directory (default: eventlog) - """ - watcher = event_watch_async(flux_handle, jobid, eventlog) - event = watcher.get_event() - while event is not None: - yield event - event = watcher.get_event() - - -class JobException(Exception): - def __init__(self, event): - self.timestamp = event.timestamp - self.type = event.context["type"] - self.note = event.context["note"] - self.severity = event.context["severity"] - super().__init__(self) - - def __str__(self): - return f"job.exception: type={self.type}: {self.note}" - - -def event_wait(flux_handle, jobid, name, eventlog="eventlog", raiseJobException=True): - """Wait for a job eventlog entry 'name' - - Wait synchronously for an eventlog entry named "name" and - return the entry to caller, raises OSError with ENODATA if - event never occurred - - :param flux_handle: handle for Flux broker from flux.Flux() - :type 
flux_handle: Flux - :param jobid: the job ID on which to wait for eventlog events - :param name: The event name for which to wait - :param eventlog: eventlog path in job kvs directory (default: eventlog) - :param raiseJobException: if True, watch for job exception events and - raise a JobException if one is seen before event 'name' (default=True) - :returns: an EventLogEntry object, or raises OSError if eventlog - ended before matching event was found - :rtype: EventLogEntry - """ - for event in event_watch(flux_handle, jobid, eventlog): - if event.name == name: - return event - if ( - raiseJobException - and event.name == "exception" - and event.context["severity"] == 0 - ): - raise JobException(event) - raise OSError(errno.ENODATA, f"eventlog ended before event='{name}'") - - -class JobListRPC(RPC): - def get_jobs(self): - return self.get()["jobs"] - - -# Due to subtleties in the python bindings and this call, this binding -# is more of a reimplementation of flux_job_list() instead of calling -# the flux_job_list() C function directly. 
Some reasons: -# -# - Desire to return a Python RPC class and use its get() method -# - Desired return value is json array, not a single value -# -# pylint: disable=dangerous-default-value -def job_list( - flux_handle, max_entries=1000, attrs=[], userid=os.getuid(), states=0, results=0 -): - payload = { - "max_entries": int(max_entries), - "attrs": attrs, - "userid": int(userid), - "states": states, - "results": results, - } - return JobListRPC(flux_handle, "job-info.list", payload) - - -def job_list_inactive(flux_handle, since=0.0, max_entries=1000, attrs=[], name=None): - payload = {"since": float(since), "max_entries": int(max_entries), "attrs": attrs} - if name: - payload["name"] = name - return JobListRPC(flux_handle, "job-info.list-inactive", payload) - - -class JobListIdRPC(RPC): - def get_job(self): - return self.get()["job"] - - -# list-id is not like list or list-inactive, it doesn't return an -# array, so don't use JobListRPC -def job_list_id(flux_handle, jobid, attrs=[]): - payload = {"id": int(jobid), "attrs": attrs} - return JobListIdRPC(flux_handle, "job-info.list-id", payload) - - def _validate_keys(expected, given, keys_optional=False, allow_additional=False): if not isinstance(expected, set): expected = set(expected) diff --git a/src/bindings/python/flux/job/__init__.py b/src/bindings/python/flux/job/__init__.py new file mode 100644 index 000000000000..1428b2914160 --- /dev/null +++ b/src/bindings/python/flux/job/__init__.py @@ -0,0 +1,26 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. 
+# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### + +from flux.job.Jobspec import Jobspec, JobspecV1, validate_jobspec +from flux.job.JobID import id_parse, id_encode, JobID +from flux.job.kvs import job_kvs, job_kvs_guest +from flux.job.kill import kill_async, kill, cancel_async, cancel +from flux.job.submit import submit_async, submit, submit_get_id +from flux.job.list import job_list, job_list_inactive, job_list_id +from flux.job.wait import wait_async, wait, wait_get_status +from flux.job.event import ( + event_watch_async, + event_watch, + event_wait, + JobEventWatchFuture, + EventLogEvent, + JobException, +) +from flux.core.inner import ffi diff --git a/src/bindings/python/flux/job/_wrapper.py b/src/bindings/python/flux/job/_wrapper.py new file mode 100644 index 000000000000..4718d15c5563 --- /dev/null +++ b/src/bindings/python/flux/job/_wrapper.py @@ -0,0 +1,20 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### + +from flux.wrapper import Wrapper +from _flux._core import ffi, lib + + +class JobWrapper(Wrapper): + def __init__(self): + super(JobWrapper, self).__init__(ffi, lib, prefixes=["flux_job_"]) + + +_RAW = JobWrapper() diff --git a/src/bindings/python/flux/job/event.py b/src/bindings/python/flux/job/event.py new file mode 100644 index 000000000000..0fc0d2af3919 --- /dev/null +++ b/src/bindings/python/flux/job/event.py @@ -0,0 +1,180 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. 
+# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### +import json +import errno + +from flux.future import Future +from flux.job._wrapper import _RAW as RAW +from _flux._core import ffi + + +class EventLogEvent: + """ + wrapper class for a single KVS EventLog entry + """ + + def __init__(self, event): + """ + Initialize from a string or dict eventlog event + """ + if isinstance(event, str): + event = json.loads(event) + self._name = event["name"] + self._timestamp = event["timestamp"] + self._context = {} + if "context" in event: + self._context = event["context"] + + def __str__(self): + return "{0.timestamp:<0.5f}: {0.name} {0.context}".format(self) + + @property + def name(self): + return self._name + + @property + def timestamp(self): + return self._timestamp + + @property + def context(self): + return self._context + + +class JobEventWatchFuture(Future): + """ + A future returned from job.event_watch_async(). + Adds get_event() method to return an EventLogEvent + """ + + def __del__(self): + if self.needs_cancel is not False: + self.cancel() + try: + super().__del__() + except AttributeError: + pass + + def __init__(self, future_handle): + super().__init__(future_handle) + self.needs_cancel = True + + def get_event(self, autoreset=True): + """ + Return the next event from a JobEventWatchFuture, or None + if the event stream has terminated. + + The future is auto-reset unless autoreset=False, so a subsequent + call to get_event() will try to fetch the next event and thus + may block. 
+ """ + result = ffi.new("char *[1]") + try: + RAW.event_watch_get(self.pimpl, result) + except OSError as exc: + if exc.errno == errno.ENODATA: + self.needs_cancel = False + return None + # re-raise all other exceptions + raise + event = EventLogEvent(ffi.string(result[0]).decode("utf-8")) + if autoreset is True: + self.reset() + return event + + def cancel(self): + """Cancel a streaming job.event_watch_async() future""" + RAW.event_watch_cancel(self.pimpl) + self.needs_cancel = False + + +def event_watch_async(flux_handle, jobid, eventlog="eventlog"): + """Asynchronously get eventlog updates for a job + + Asynchronously watch the events of a job eventlog, optionally only + returning events that match a glob pattern. + + Returns a JobEventWatchFuture. Call .get_event() from the then + callback to get the currently returned event from the Future object. + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID on which to watch events + :param name: The event name or glob pattern for which to wait (default: *) + :param eventlog: eventlog path in job kvs directory (default: eventlog) + :returns: a JobEventWatchFuture object + :rtype: JobEventWatchFuture + """ + + future = RAW.event_watch(flux_handle, int(jobid), eventlog, 0) + return JobEventWatchFuture(future) + + +def event_watch(flux_handle, jobid, eventlog="eventlog"): + """Python generator to watch all events for a job + + Synchronously watch events a job eventlog via a simple generator. + Use as: + for event in job.event_watch(flux_handle, jobid): + # do something with event... 
+ + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID on which to watch events + :param name: The event name or glob pattern for which to wait (default: *) + :param eventlog: eventlog path in job kvs directory (default: eventlog) + """ + watcher = event_watch_async(flux_handle, jobid, eventlog) + event = watcher.get_event() + while event is not None: + yield event + event = watcher.get_event() + + +class JobException(Exception): + def __init__(self, event): + self.timestamp = event.timestamp + self.type = event.context["type"] + self.note = event.context["note"] + self.severity = event.context["severity"] + super().__init__(self) + + def __str__(self): + return f"job.exception: type={self.type}: {self.note}" + + +def event_wait(flux_handle, jobid, name, eventlog="eventlog", raiseJobException=True): + """Wait for a job eventlog entry 'name' + + Wait synchronously for an eventlog entry named "name" and + return the entry to caller, raises OSError with ENODATA if + event never occurred + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID on which to wait for eventlog events + :param name: The event name for which to wait + :param eventlog: eventlog path in job kvs directory (default: eventlog) + :param raiseJobException: if True, watch for job exception events and + raise a JobException if one is seen before event 'name' (default=True) + :returns: an EventLogEntry object, or raises OSError if eventlog + ended before matching event was found + :rtype: EventLogEntry + """ + for event in event_watch(flux_handle, jobid, eventlog): + if event.name == name: + return event + if ( + raiseJobException + and event.name == "exception" + and event.context["severity"] == 0 + ): + raise JobException(event) + raise OSError(errno.ENODATA, f"eventlog ended before event='{name}'") diff --git a/src/bindings/python/flux/job/kill.py 
b/src/bindings/python/flux/job/kill.py new file mode 100644 index 000000000000..d2002c0dd820 --- /dev/null +++ b/src/bindings/python/flux/job/kill.py @@ -0,0 +1,66 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### +import signal + +from flux.future import Future +from flux.job._wrapper import _RAW as RAW +from _flux._core import ffi + + +def kill_async(flux_handle, jobid, signum=None): + """Send a signal to a running job asynchronously + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID of the job to kill + :param signum: signal to send (default SIGTERM) + :returns: a Future + :rtype: Future + """ + if not signum: + signum = signal.SIGTERM + return Future(RAW.kill(flux_handle, int(jobid), signum)) + + +def kill(flux_handle, jobid, signum=None): + """Send a signal to a running job. 
+ + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID of the job to kill + :param signum: signal to send (default SIGTERM) + """ + return kill_async(flux_handle, jobid, signum).get() + + +def cancel_async(flux_handle, jobid, reason=None): + """Cancel a pending or running job asynchronously + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID of the job to cancel + :param reason: reason passed with the cancel request; a falsy value is sent as NULL + :returns: a Future + :rtype: Future + """ + if not reason: + reason = ffi.NULL + return Future(RAW.cancel(flux_handle, int(jobid), reason)) + + +def cancel(flux_handle, jobid, signum=None): + """Cancel a pending or running job + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID of the job to cancel + :param signum: forwarded to cancel_async() as its ``reason`` argument + """ + return cancel_async(flux_handle, jobid, signum).get() diff --git a/src/bindings/python/flux/job/kvs.py b/src/bindings/python/flux/job/kvs.py new file mode 100644 index 000000000000..60cda310ffcf --- /dev/null +++ b/src/bindings/python/flux/job/kvs.py @@ -0,0 +1,38 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework.
+# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### +import flux.kvs +from flux.job._wrapper import _RAW as RAW +from _flux._core import ffi + + +def job_kvs(flux_handle, jobid): + """ + :returns: The KVS directory of the given job + :rtype: KVSDir + """ + + path_len = 1024 + buf = ffi.new("char[]", path_len) + RAW.kvs_key(buf, path_len, jobid, "") + kvs_key = ffi.string(buf, path_len) + return flux.kvs.get_dir(flux_handle, kvs_key) + + +def job_kvs_guest(flux_handle, jobid): + """ + :returns: The KVS guest directory of the given job + :rtype: KVSDir + """ + + path_len = 1024 + buf = ffi.new("char[]", path_len) + RAW.kvs_guest_key(buf, path_len, jobid, "") + kvs_key = ffi.string(buf, path_len) + return flux.kvs.get_dir(flux_handle, kvs_key) diff --git a/src/bindings/python/flux/job/list.py b/src/bindings/python/flux/job/list.py new file mode 100644 index 000000000000..c667464d0c5b --- /dev/null +++ b/src/bindings/python/flux/job/list.py @@ -0,0 +1,57 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### +import os + +from flux.rpc import RPC + + +class JobListRPC(RPC): + def get_jobs(self): + return self.get()["jobs"] + + +# Due to subtleties in the python bindings and this call, this binding +# is more of a reimplementation of flux_job_list() instead of calling +# the flux_job_list() C function directly. 
Some reasons: +# +# - Desire to return a Python RPC class and use its get() method +# - Desired return value is json array, not a single value +# +# pylint: disable=dangerous-default-value +def job_list( + flux_handle, max_entries=1000, attrs=[], userid=os.getuid(), states=0, results=0 +): + payload = { + "max_entries": int(max_entries), + "attrs": attrs, + "userid": int(userid), + "states": states, + "results": results, + } + return JobListRPC(flux_handle, "job-info.list", payload) + + +def job_list_inactive(flux_handle, since=0.0, max_entries=1000, attrs=[], name=None): + payload = {"since": float(since), "max_entries": int(max_entries), "attrs": attrs} + if name: + payload["name"] = name + return JobListRPC(flux_handle, "job-info.list-inactive", payload) + + +class JobListIdRPC(RPC): + def get_job(self): + return self.get()["job"] + + +# list-id is not like list or list-inactive, it doesn't return an +# array, so don't use JobListRPC +def job_list_id(flux_handle, jobid, attrs=[]): + payload = {"id": int(jobid), "attrs": attrs} + return JobListIdRPC(flux_handle, "job-info.list-id", payload) diff --git a/src/bindings/python/flux/job/submit.py b/src/bindings/python/flux/job/submit.py new file mode 100644 index 000000000000..9467ae269d6f --- /dev/null +++ b/src/bindings/python/flux/job/submit.py @@ -0,0 +1,126 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. 
+# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### +import errno + +from flux import constants +from flux.util import check_future_error +from flux.future import Future +from flux.job.Jobspec import _convert_jobspec_arg_to_string +from flux.job._wrapper import _RAW as RAW +from _flux._core import ffi, lib + + +class SubmitFuture(Future): + def get_id(self): + return submit_get_id(self) + + +def submit_async( + flux_handle, + jobspec, + priority=lib.FLUX_JOB_PRIORITY_DEFAULT, + waitable=False, + debug=False, + pre_signed=False, +): + """Ask Flux to run a job, without waiting for a response + + Submit a job to Flux. This method returns immediately with a + Flux Future, which can be used to obtain the job ID later. + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobspec: jobspec defining the job request + :type jobspec: Jobspec or its string encoding + :param priority: job priority 0 (lowest) through 31 (highest) + (default is 16). Priorities 0 through 15 are restricted to + the instance owner. + :type priority: int + :param waitable: allow result to be fetched with job.wait() + (default is False). Waitable=True is restricted to the + instance owner.
+ :type waitable: bool + :param debug: enable job manager debugging events to job eventlog + (default is False) + :type debug: bool + :param pre_signed: jobspec argument is already signed + (default is False) + :type pre_signed: bool + :returns: a Flux Future object for obtaining the assigned jobid + :rtype: Future + """ + jobspec = _convert_jobspec_arg_to_string(jobspec) + flags = 0 + if waitable: + flags |= constants.FLUX_JOB_WAITABLE + if debug: + flags |= constants.FLUX_JOB_DEBUG + if pre_signed: + flags |= constants.FLUX_JOB_PRE_SIGNED + future_handle = RAW.submit(flux_handle, jobspec, priority, flags) + return SubmitFuture(future_handle) + + +@check_future_error +def submit_get_id(future): + """Get job ID from a Future returned by job.submit_async() + + Process a response to a Flux job submit request. This method blocks + until the response is received, then decodes the result to obtain + the assigned job ID. + + :param future: a Flux future object returned by job.submit_async() + :type future: Future + :returns: job ID + :rtype: int + """ + if future is None or future == ffi.NULL: + raise EnvironmentError(errno.EINVAL, "future must not be None/NULL") + future.wait_for() # ensure the future is fulfilled + jobid = ffi.new("flux_jobid_t[1]") + RAW.submit_get_id(future, jobid) + return int(jobid[0]) + + +def submit( + flux_handle, + jobspec, + priority=lib.FLUX_JOB_PRIORITY_DEFAULT, + waitable=False, + debug=False, + pre_signed=False, +): + """Submit a job to Flux + + Ask Flux to run a job, blocking until a job ID is assigned. + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobspec: jobspec defining the job request + :type jobspec: Jobspec or its string encoding + :param priority: job priority 0 (lowest) through 31 (highest) + (default is 16). Priorities 0 through 15 are restricted to + the instance owner. + :type priority: int + :param waitable: allow result to be fetched with job.wait() + (default is False). 
Waitable=True is restricted to the + instance owner. + :type waitable: bool + :param debug: enable job manager debugging events to job eventlog + (default is False) + :type debug: bool + :param pre_signed: jobspec argument is already signed + (default is False) + :type pre_signed: bool + :returns: job ID + :rtype: int + """ + future = submit_async(flux_handle, jobspec, priority, waitable, debug, pre_signed) + return future.get_id() diff --git a/src/bindings/python/flux/job/wait.py b/src/bindings/python/flux/job/wait.py new file mode 100644 index 000000000000..ef12032c1a2d --- /dev/null +++ b/src/bindings/python/flux/job/wait.py @@ -0,0 +1,87 @@ +############################################################### +# Copyright 2020 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### +import errno +import collections + +from flux.util import check_future_error +from flux.future import Future +from flux.job._wrapper import _RAW as RAW +from _flux._core import ffi, lib + + +class JobWaitFuture(Future): + def get_status(self): + return wait_get_status(self) + + +def wait_async(flux_handle, jobid=lib.FLUX_JOBID_ANY): + """Wait for a job to complete, asynchronously + + Submit a request to wait for job completion. This method returns + immediately with a Flux Future, which can be used to process + the result later. + + Only jobs submitted with waitable=True can be waited for.
+ + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID to wait for (default is any waitable job) + :returns: a Flux Future object for obtaining the job result + :rtype: Future + """ + future_handle = RAW.wait(flux_handle, jobid) + return JobWaitFuture(future_handle) + + +JobWaitResult = collections.namedtuple("JobWaitResult", "jobid, success, errstr") + + +@check_future_error +def wait_get_status(future): + """Get job status from a Future returned by job.wait_async() + + Process a response to a Flux job wait request. This method blocks + until the response is received, then decodes the result to obtain + the job status. + + :param future: a Flux future object returned by job.wait_async() + :type future: Future + :returns: job status, a tuple of: Job ID (int), success (bool), + and an error (string) if success=False + :rtype: tuple + """ + if future is None or future == ffi.NULL: + raise EnvironmentError(errno.EINVAL, "future must not be None/NULL") + future.wait_for() # ensure the future is fulfilled + success = ffi.new("bool[1]") + errstr = ffi.new("const char *[1]") + jobid = ffi.new("flux_jobid_t[1]") + RAW.wait_get_id(future, jobid) + RAW.wait_get_status(future, success, errstr) + return JobWaitResult(int(jobid[0]), bool(success[0]), ffi.string(errstr[0])) + + +def wait(flux_handle, jobid=lib.FLUX_JOBID_ANY): + """Wait for a job to complete + + Submit a request to wait for job completion, blocking until a + response is received, then return the job status. + + Only jobs submitted with waitable=True can be waited for. + + :param flux_handle: handle for Flux broker from flux.Flux() + :type flux_handle: Flux + :param jobid: the job ID to wait for (default is any waitable job) + :returns: job status, a tuple of: Job ID (int), success (bool), + and an error (string) if success=False + :rtype: tuple + """ + future = wait_async(flux_handle, jobid) + return future.get_status()