diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 21cf54a..3731173 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,10 +1,10 @@ -FROM fluxrm/flux-sched:focal +FROM ghcr.io/rse-ops/accounting:app-latest LABEL maintainer="Vanessasaurus <@vsoch>" # Pip not provided in this version USER root -RUN apt-get update && apt-get install -y python3-venv +RUN apt-get update && apt-get install -y python3-venv systemctl COPY ./requirements.txt /requirements.txt COPY ./.github/dev-requirements.txt /dev-requirements.txt COPY ./docs/requirements.txt /docs-requirements.txt @@ -18,11 +18,32 @@ RUN python3 -m pip install IPython && \ python3 -m pip install -r /dev-requirements.txt && \ python3 -m pip install -r /docs-requirements.txt - # Install isort and ensure on path RUN python3 -m venv /env && \ . /env/bin/activate && \ pip install -r /requirements.txt && \ pip install -r /dev-requirements.txt && \ - pip install -r /docs-requirements.txt + pip install -r /docs-requirements.txt && \ + # Only for development - don't add this to a production container + sudo useradd -m -p $(openssl passwd '12345') "flux" + +RUN mkdir -p /run/flux /var/lib/flux mkdir /etc/flux/system/cron.d /mnt/curve && \ + flux keygen /mnt/curve/curve.cert && \ + # This probably needs to be done as flux user? + flux account create-db && \ + flux account add-bank root 1 && \ + flux account add-bank --parent-bank=root user_bank 1 && \ + # These need to be owned by flux + chown -R flux /run/flux /var/lib/flux /mnt/curve && \ + # flux-imp needs setuid permission + chmod u+s /usr/libexec/flux/flux-imp + # flux account add-user --username=fluxuser --bank=user_bank + +COPY ./example/multi-user/flux.service /etc/systemd/system/flux.service +COPY ./example/multi-user/broker.toml /etc/flux/system/conf.d/broker.toml +COPY ./example/multi-user/imp.toml /etc/flux/imp/conf.d/imp.toml + +RUN chmod 4755 /usr/libexec/flux/flux-imp \ + && chmod 0644 /etc/flux/imp/conf.d/imp.toml \ + && chmod 0644 /etc/flux/system/conf.d/broker.toml ENV PATH=/env/bin:${PATH} diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 6cb9eb3..6fbf436 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -24,6 +24,6 @@ ], }, }, - // Needed for git security feature - "postStartCommand": "git config --global --add safe.directory /workspaces/flux-python-api" + // Needed for git security feature, and flux config + "postStartCommand": "git config --global --add safe.directory /workspaces/flux-python-api && flux R encode --hosts=$(hostname) > /etc/flux/system/R && sed -i 's@HOSTNAME@'$(hostname)'@' /etc/flux/system/conf.d/broker.toml && sudo service munge start" } diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 359c409..fae5301 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -29,6 +29,23 @@ jobs: run: pip install -r requirements.txt - name: Run tests run: | + # Tests for the API with auth disabled flux start pytest -xs tests/test_api.py + + # Tests for the API with single user auth + export FLUX_REQUIRE_AUTH=true export TEST_AUTH=true + export FLUX_USER=fluxuser + export FLUX_TOKEN=12345 + flux start pytest -xs tests/test_api.py + + # Tests for the API with multi-user auth, but fail because user not created + unset FLUX_USER + unset FLUX_TOKEN + export TEST_PAM_AUTH=true + export TEST_PAM_AUTH_FAIL=true + export FLUX_ENABLE_PAM=true flux start pytest -xs tests/test_api.py + + # TODO how to test pam in this mode? + # We would need to start flux as flux and run tests as a user diff --git a/CHANGELOG.md b/CHANGELOG.md index 31062d3..ab6d012 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and **Merged pull requests**. Critical items to know are: The versions coincide with releases on pip. Only major versions will be released as tags on Github. ## [0.0.x](https://github.com/flux-framework/flux-restful-api/tree/main) (0.0.x) + - Support for basic PAM authentication (0.0.11) - Fixing bug with launcher always being specified (0.0.1) - catching any errors on creation of fluxjob - Add support uvicorn workers (>1 needed to run >1 process with Flux) diff --git a/Dockerfile b/Dockerfile index 367a0d9..3da702e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,6 +11,13 @@ ARG host="0.0.0.0" ARG workers="1" LABEL maintainer="Vanessasaurus <@vsoch>" +ENV FLUX_USER=${user} +ENV FLUX_TOKEN=${token} +ENV FLUX_REQUIRE_AUTH=${use_auth} +ENV PORT=${port} +ENV HOST=${host} +ENV WORKERS=${workers} + USER root RUN apt-get update COPY ./requirements.txt /requirements.txt @@ -27,10 +34,4 @@ RUN python3 -m pip install -r /requirements.txt && \ WORKDIR /code COPY . /code -ENV FLUX_USER=${user} -ENV FLUX_TOKEN=${token} -ENV FLUX_REQUIRE_AUTH=${use_auth} -ENV PORT=${port} -ENV HOST=${host} -ENV WORKERS=${workers} ENTRYPOINT ["/code/entrypoint.sh"] diff --git a/VERSION b/VERSION index 8acdd82..2cfabea 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.0.1 +0.0.11 diff --git a/app/core/config.py b/app/core/config.py index 708af8d..39ec567 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -67,6 +67,7 @@ class Settings(BaseSettings): # These map to envars, e.g., FLUX_USER has_gpus: bool = get_bool_envar("FLUX_HAS_GPUS") + enable_pam: bool = get_bool_envar("FLUX_ENABLE_PAM") # Assume there is at least one node! flux_nodes: int = get_int_envar("FLUX_NUMBER_NODES", 1) diff --git a/app/library/auth.py b/app/library/auth.py index 78fa050..2c74539 100644 --- a/app/library/auth.py +++ b/app/library/auth.py @@ -21,6 +21,7 @@ def not_authenticated(detail="Incorrect user or token."): def alert_auth(): print("πŸ“ Require auth: %s" % settings.require_auth) + print("πŸ“ PAM auth: %s" % settings.enable_pam) print( "πŸ“ Flux user: %s" % ("*" * len(settings.flux_user)) if settings.flux_user @@ -33,25 +34,53 @@ def alert_auth(): ) +def check_pam_auth(credentials: HTTPBasicCredentials = Depends(security)): + """ + Check base64 encoded auth (this is HTTP Basic auth.) + """ + # Ensure we have pam installed + try: + import pam + except ImportError: + print("python-pam is required for PAM.") + return + + username = credentials.username.encode("utf8") + password = credentials.password.encode("utf8") + if pam.authenticate(username, password) is True: + return credentials.username + + def check_auth(credentials: HTTPBasicCredentials = Depends(security)): """ Check base64 encoded auth (this is HTTP Basic auth.) """ + # First try to authenticate with PAM, if allowed. + if settings.enable_pam: + print("🧾️ Checking PAM auth...") + # Return the username if PAM authentication is successful + username = check_pam_auth(credentials) + if username: + print("🧾️ Success!") + return username + + # If we get here, we require the flux user and token if not settings.flux_user or not settings.flux_token: - return not_authenticated("Missing FLUX_USER and/or FLUX_TOKEN") + return not_authenticated("Missing FLUX_USER and/or FLUX_TOKEN or pam headers") + current_username_bytes = credentials.username.encode("utf8") correct_username_bytes = bytes(settings.flux_user.encode("utf8")) is_correct_username = secrets.compare_digest( current_username_bytes, correct_username_bytes ) - current_password_bytes = credentials.password.encode("utf8") + current_password_bytes = credentials.password.encode("utf8") correct_password_bytes = bytes(settings.flux_token.encode("utf8")) is_correct_password = secrets.compare_digest( current_password_bytes, correct_password_bytes ) if not (is_correct_username and is_correct_password): - return not_authenticated() + return not_authenticated("heree") return credentials.username diff --git a/app/library/flux.py b/app/library/flux.py index 4708844..e80125a 100644 --- a/app/library/flux.py +++ b/app/library/flux.py @@ -1,5 +1,6 @@ import json import os +import pwd import re import shlex import time @@ -7,8 +8,34 @@ import flux import flux.job +import app.library.terminal as terminal from app.core.config import settings +# Faux user environment (filtered set of application environment) +# We could likely find a way to better do this, but likely the users won't have customized environments +user_env = { + "SHELL": "/bin/bash", + "PATH": "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin", + "XDG_RUNTIME_DIR": "/tmp/user/0", + "DISPLAY": ":0", + "COLORTERM": "truecolor", + "SHLVL": "2", + "DEBIAN_FRONTEND": "noninteractive", + "MAKE_TERMERR": "/dev/pts/1", + "LANG": "C.UTF-8", + "TERM": "xterm-256color", +} + + +def submit_job(handle, jobspec, user): + """ + Handle to submit a job, either with flux job submit or on behalf of user. + """ + # We've enabled PAM auth + if settings.enable_pam: + return terminal.submit_job(jobspec, user) + return flux.job.submit_async(handle, jobspec) + def validate_submit_kwargs(kwargs, envars=None, runtime=None): """ @@ -68,6 +95,7 @@ def prepare_job(kwargs, runtime=0, workdir=None, envars=None): command = kwargs["command"] if isinstance(command, str): command = shlex.split(command) + print(f"⭐️ Command being submit: {command}") # Delete command from the kwargs (we added because is required and validated that way) @@ -90,8 +118,14 @@ def prepare_job(kwargs, runtime=0, workdir=None, envars=None): # A duration of zero (the default) means unlimited fluxjob.duration = runtime + # If we are running as the user, we don't want the current (root) environment + # This isn't perfect because it's artifically created, but it ensures we have paths + if settings.enable_pam: + environment = user_env + else: + environment = dict(os.environ) + # Additional envars in the payload? - environment = dict(os.environ) environment.update(envars) fluxjob.environment = environment return fluxjob @@ -131,12 +165,15 @@ def stream_job_output(jobid): pass -def cancel_job(jobid): +def cancel_job(jobid, user): """ Request a job to be cancelled by id. Returns a message to the user and a return code. """ + if settings.enable_pam: + return terminal.cancel_job(jobid, user) + from app.main import app try: @@ -147,12 +184,16 @@ def cancel_job(jobid): return "Job is requested to cancel.", 200 -def get_job_output(jobid, delay=None): +def get_job_output(jobid, user=None, delay=None): """ Given a jobid, get the output. If there is a delay, we are requesting on demand, so we want to return early. """ + # We've enabled PAM auth + if settings.enable_pam: + return terminal.get_job_output(jobid, user, delay=delay) + lines = [] start = time.time() from app.main import app @@ -171,38 +212,48 @@ def get_job_output(jobid, delay=None): return lines -def list_jobs_detailed(limit=None, query=None): +def list_jobs_detailed(user=None, limit=None, query=None): """ Get a detailed listing of jobs. """ - listing = list_jobs() + listing = list_jobs(user=user) ids = listing.get()["jobs"] jobs = {} for job in ids: - # Stop if a limit is defined and we have hit it! if limit is not None and len(jobs) >= limit: break try: - jobinfo = get_job(job["id"]) + jobinfo = get_job(job["id"], user=user) # Best effort hack to do a query if query and not query_job(jobinfo, query): continue + + # This will trigger a data table warning + for needed in ["ranks", "expiration"]: + if needed not in jobinfo: + jobinfo[needed] = "" + jobs[job["id"]] = jobinfo + except Exception: pass return jobs -def list_jobs(): +def list_jobs(user=None): """ Get a simple listing of jobs (just the ids) """ from app.main import app - return flux.job.job_list(app.handle) + if user is None or not settings.enable_pam: + return flux.job.job_list(app.handle) + pw_record = pwd.getpwnam(user) + user_uid = pw_record.pw_uid + return flux.job.job_list(app.handle, userid=user_uid) def get_simple_job(jobid): @@ -215,13 +266,17 @@ def get_simple_job(jobid): return json.loads(info.get_str())["job"] -def get_job(jobid): +def get_job(jobid, user): """ Get details for a job """ from app.main import app - payload = {"id": int(jobid), "attrs": ["all"]} + jobid = flux.job.JobID(jobid) + + payload = {"id": jobid, "attrs": ["all"]} + if settings.enable_pam: + payload["user"] = user rpc = flux.job.list.JobListIdRPC(app.handle, "job-list.list-id", payload) try: jobinfo = rpc.get() diff --git a/app/library/terminal.py b/app/library/terminal.py new file mode 100644 index 0000000..5f5f103 --- /dev/null +++ b/app/library/terminal.py @@ -0,0 +1,163 @@ +import json +import os +import pwd +import subprocess + +# Terminal functions to handle submitting on behalf of a user + +job_format = "{id.f58:>12} {username:<8.8} {name:<10.10+} {status:>9.9} {ntasks:>6} {nnodes:>6h} {t_submit!d:%b%d %R::>12} {t_remaining!F:>12h} {contextual_time!F:>8h}" +fields = [ + "id", + "user", + "name", + "status", + "ntasks", + "nnodes", + "time_submit", + "time_remaining", + "time_contextual", +] + + +class JobId: + """ + A fake Flux Future that can return a job_id + """ + + def __init__(self, jobid): + self.job_id = jobid + + def get_id(self): + return self.job_id + + +def run_as_user(command, user, cwd=None, request_env=None): + """ + Run a command as a user + """ + pw_record = pwd.getpwnam(user) + user_name = pw_record.pw_name + user_uid = pw_record.pw_uid + user_gid = pw_record.pw_gid + + # Even for a user this should be populated with dummy paths, etc. + env = {} + + # cwd will bork on an empty string + cwd = cwd or None + + print(f"🧾️ Running command as {user_name}") + env["HOME"] = pw_record.pw_dir + env["LOGNAME"] = user_name + env["USER"] = pw_record.pw_name + + # Update the environment, if provided + if request_env is not None: + env.update(request_env) + + # Run command as the user + print("⭐️ " + " ".join(command)) + print(cwd) + print(env) + process = subprocess.Popen( + command, + preexec_fn=demote(user_uid, user_gid), + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + ) + + # Let the calling function handle the return value parsing + return process.communicate() + + +def job_list(user): + """ + List jobs for a user + """ + + +def submit_job(jobspec, user): + """ + Submit a job on behalf of a user. + """ + # Prepare the command + command = ["flux", "mini", "submit"] + for resource in jobspec.resources: + if resource["with"][0]["type"] == "core": + command += ["--cores", str(resource["count"])] + + for cmd in jobspec.tasks: + if "command" in cmd: + command += cmd["command"] + break + + # Flux submit as the user + result = run_as_user( + command, request_env=jobspec.environment, user=user, cwd=jobspec.cwd + ) + jobid = (result[0].decode("utf-8")).strip() + return JobId(jobid) + + +def cancel_job(jobid, user): + """ + Cancel a job for a user + """ + command = ["flux", "job", "cancel", jobid] + result = run_as_user(command, user=user) + jobid = (result[0].decode("utf-8")).strip() + if "inactive" in jobid: + return "Job cannot be cancelled: %s." % jobid, 400 + return "Job is requested to cancel.", 200 + + +def get_job_output(jobid, user, delay=None): + """ + Given a jobid, get the output. + + If there is a delay, we are requesting on demand, so we want to return early. + """ + lines = [] + command = ["flux", "job", "info", jobid, "guest.output"] + result = run_as_user(command, user=user) + lines = (result[0].decode("utf-8")).strip() + + output = "" + for line in lines.split("\n"): + try: + content = json.loads(line) + if "context" in content and "data" in content["context"]: + output += content["context"]["data"] + except Exception: + print(line) + pass + return output + + +def demote(user_uid, user_gid): + """ + Demote the user to a specific gid/gid + """ + + def result(): + os.setgid(user_gid) + os.setuid(user_uid) + + return result + + +def get_job(jobid, user): + """ + Get details for a job + + This is not currently used, instead we pass a user to job list. + """ + command = ["flux", "jobs", jobid, "-o", job_format, "--suppress-header"] + result = run_as_user(command, user=user) + jobid = (result[0].decode("utf-8")).strip() + jobid = [x for x in jobid.split(" ") if x] + jobinfo = {} + for field in fields: + jobinfo[field] = jobid.pop(0) + return jobinfo diff --git a/app/routers/api.py b/app/routers/api.py index ff2ddcf..791b2eb 100644 --- a/app/routers/api.py +++ b/app/routers/api.py @@ -24,6 +24,9 @@ ) no_auth_router = APIRouter(prefix="/v1", tags=["jobs"]) +# Require auth (and the user in the view) +user_auth = Depends(check_auth) if config.settings.require_auth else None + templates = Jinja2Templates(directory="templates/") @@ -39,7 +42,7 @@ async def service_stop(): @router.get("/jobs/search") -async def jobs_listing(request: Request): +async def jobs_listing(request: Request, user=user_auth): """ Jobslist is intended to be used by the server to render data tables @@ -53,7 +56,7 @@ async def jobs_listing(request: Request): ) # If we have a query, filter to those that have in the name - jobs = list(flux_cli.list_jobs_detailed().values()) + jobs = list(flux_cli.list_jobs_detailed(user=user).values()) total = len(jobs) # Now filter @@ -79,7 +82,7 @@ async def jobs_listing(request: Request): @router.get("/jobs") -async def list_jobs(request: Request): +async def list_jobs(request: Request, user=user_auth): """ List flux jobs associated with the handle. """ @@ -90,15 +93,14 @@ async def list_jobs(request: Request): # Does the requester want details - in dict or listing form? if helpers.has_boolean_arg(payload, "details"): - # Job limit (only relevant for details) limit = helpers.get_int_arg(payload, "limit") - jobs = flux_cli.list_jobs_detailed(limit) + jobs = flux_cli.list_jobs_detailed(limit=limit, user=user) if helpers.has_boolean_arg(payload, "listing"): jobs = list(jobs.values()) else: - listing = flux_cli.list_jobs() + listing = flux_cli.list_jobs(user=user) jobs = jsonable_encoder({"jobs": listing.get_jobs()}) return JSONResponse(content=jobs, status_code=200) @@ -119,16 +121,18 @@ async def list_nodes(): @router.post("/jobs/{jobid}/cancel") -async def cancel_job(jobid): +async def cancel_job(jobid, user=user_auth): """ Cancel a running flux job """ - message, return_code = flux_cli.cancel_job(jobid) - return JSONResponse(content={"Message": message}, status_code=return_code) + message, return_code = flux_cli.cancel_job(jobid, user) + return JSONResponse( + content={"Message": message, "id": jobid}, status_code=return_code + ) @router.post("/jobs/submit") -async def submit_job(request: Request): +async def submit_job(request: Request, user=user_auth): """ Submit a job to our running cluster. @@ -184,6 +188,7 @@ async def submit_job(request: Request): # Are we using a launcher instead? is_launcher = payload.get("is_launcher", False) if is_launcher: + print("TODO need to test multi-user") message = launcher.launch(kwargs, workdir=workdir, envars=envars) result = jsonable_encoder({"Message": message, "id": "MANY"}) else: @@ -192,7 +197,8 @@ async def submit_job(request: Request): fluxjob = flux_cli.prepare_job( kwargs, runtime=runtime, workdir=workdir, envars=envars ) - flux_future = flux.job.submit_async(app.handle, fluxjob) + # This handles either a single/multi user case + flux_future = flux_cli.submit_job(app.handle, fluxjob, user=user) except Exception as e: result = jsonable_encoder( {"Message": "There was an issue submitting that job.", "Error": str(e)} @@ -206,21 +212,21 @@ async def submit_job(request: Request): @router.get("/jobs/{jobid}") -async def get_job(jobid): +async def get_job(jobid, user=user_auth): """ Get job info based on id. """ - info = flux_cli.get_job(jobid) + info = flux_cli.get_job(jobid, user=user) info = jsonable_encoder(info) return JSONResponse(content=info, status_code=200) @router.get("/jobs/{jobid}/output") -async def get_job_output(jobid): +async def get_job_output(jobid, user=user_auth): """ Get job output based on id. """ - lines = flux_cli.get_job_output(jobid) + lines = flux_cli.get_job_output(jobid, user=user) # We have output if lines: diff --git a/app/routers/views.py b/app/routers/views.py index 5b85660..6f629fd 100644 --- a/app/routers/views.py +++ b/app/routers/views.py @@ -1,4 +1,3 @@ -import flux import flux.job from fastapi import APIRouter, Depends, Request from fastapi.responses import HTMLResponse, RedirectResponse @@ -22,6 +21,9 @@ ) templates = Jinja2Templates(directory="templates/") +# Require auth (and the user in the view) +user_auth = Depends(check_auth) if settings.require_auth else None + @router.get("/", response_class=HTMLResponse) async def home(request: Request): @@ -40,8 +42,8 @@ async def home(request: Request): # List jobs @auth_views_router.get("/jobs", response_class=HTMLResponse) -async def jobs_table(request: Request): - jobs = list(flux_cli.list_jobs_detailed().values()) +async def jobs_table(request: Request, user=user_auth): + jobs = list(flux_cli.list_jobs_detailed(user=user).values()) return templates.TemplateResponse( "jobs/jobs.html", { @@ -58,19 +60,19 @@ async def jobs_table(request: Request): name="job_info", operation_id="job_info", ) -async def job_info(request: Request, jobid, msg=None): - job = flux_cli.get_job(jobid) +async def job_info(request: Request, jobid, msg=None, user=user_auth): + job = flux_cli.get_job(jobid, user=user) # If we have a message, add to messages messages = [msg] if msg else [] # If not completed, ask info to return after a second of waiting if job["state"] == "INACTIVE": - info = flux_cli.get_job_output(jobid) + info = flux_cli.get_job_output(jobid, user=user) # Otherwise ensure we get all the logs! else: - info = flux_cli.get_job_output(jobid, delay=1) + info = flux_cli.get_job_output(jobid, user=user, delay=1) return templates.TemplateResponse( "jobs/job.html", { @@ -85,7 +87,7 @@ async def job_info(request: Request, jobid, msg=None): # Submit a job via a form @auth_views_router.get("/jobs/submit", response_class=HTMLResponse) -async def submit_job(request: Request): +async def submit_job(request: Request, _=user_auth): form = SubmitForm(request) return templates.TemplateResponse( "jobs/submit.html", @@ -95,19 +97,20 @@ async def submit_job(request: Request): # Button to cancel a job @auth_views_router.get("/job/{jobid}/cancel", response_class=HTMLResponse) -async def cancel_job(request: Request, jobid): +async def cancel_job(request: Request, jobid, user=user_auth): from app.main import app - message, _ = flux_cli.cancel_job(jobid) + message, _ = flux_cli.cancel_job(jobid, user=user) url = app.url_path_for(name="job_info", jobid=jobid) + "?msg=" + message return RedirectResponse(url=url) @auth_views_router.post("/jobs/submit") -async def submit_job_post(request: Request): +async def submit_job_post(request: Request, user=user_auth): """ Receive data posted (submit) to the form. """ + print(user) from app.main import app messages = [] @@ -120,7 +123,7 @@ async def submit_job_post(request: Request): if form.kwargs.get("is_launcher") is True: messages.append(launcher.launch(form.kwargs, workdir=form.workdir)) else: - return submit_job_helper(request, app, form) + return submit_job_helper(request, app, form, user=user) else: print("πŸ’ Submit form is NOT valid!") return templates.TemplateResponse( @@ -135,19 +138,19 @@ async def submit_job_post(request: Request): ) -def submit_job_helper(request, app, form): +def submit_job_helper(request, app, form, user): """ A helper to submit a flux job (not a launcher) """ # Submit the job and return the ID, but allow for error - try: + if 1 == 1: # Prepare the flux job! We don't support envars here yet fluxjob = flux_cli.prepare_job( form.kwargs, runtime=form.runtime, workdir=form.workdir ) - flux_future = flux.job.submit_async(app.handle, fluxjob) + flux_future = flux_cli.submit_job(app.handle, fluxjob, user=user) jobid = flux_future.get_id() - intid = int(jobid) + intid = flux.job.JobID(jobid) message = f"Your job was successfully submit! 🦊 {jobid}" return templates.TemplateResponse( "jobs/submit.html", @@ -157,8 +160,8 @@ def submit_job_helper(request, app, form): "messages": [message], }, ) - except Exception as e: - form.errors.append("There was an issue submitting that job: %s" % str(e)) + # except Exception as e: + # form.errors.append("There was an issue submitting that job: %s" % str(e)) return templates.TemplateResponse( "jobs/submit.html", diff --git a/clients/python/examples/submit_job.py b/clients/python/examples/submit_job.py index 88775c8..2a3a4c7 100644 --- a/clients/python/examples/submit_job.py +++ b/clients/python/examples/submit_job.py @@ -8,6 +8,8 @@ def main(): cli = get_client() + # To set basic auth for token/user + cli.set_basic_auth("flux", "12345") # Submit the job to flux print("😴 Submitting job sleep 60") res = cli.submit(command=["sleep", 60]) diff --git a/clients/python/flux_restful_client/client/__init__.py b/clients/python/flux_restful_client/client/__init__.py index 4087507..aaf3452 100644 --- a/clients/python/flux_restful_client/client/__init__.py +++ b/clients/python/flux_restful_client/client/__init__.py @@ -208,7 +208,6 @@ def get_parser(): def run_flux_restful_client(): - parser = get_parser() def help(return_code=0): diff --git a/clients/python/flux_restful_client/client/config.py b/clients/python/flux_restful_client/client/config.py index f06c90a..aa8e54d 100644 --- a/clients/python/flux_restful_client/client/config.py +++ b/clients/python/flux_restful_client/client/config.py @@ -6,7 +6,6 @@ def main(args, parser, extra, subparser): - # If nothing provided, show help if not args.params: print(subparser.format_help()) diff --git a/clients/python/flux_restful_client/client/shell.py b/clients/python/flux_restful_client/client/shell.py index d3a426d..d64f353 100644 --- a/clients/python/flux_restful_client/client/shell.py +++ b/clients/python/flux_restful_client/client/shell.py @@ -1,5 +1,4 @@ def main(args, parser, extra, subparser): - lookup = {"ipython": ipython, "python": python, "bpython": bpython} shells = ["ipython", "python", "bpython"] diff --git a/clients/python/flux_restful_client/logger.py b/clients/python/flux_restful_client/logger.py index b861c9e..7ea4dab 100644 --- a/clients/python/flux_restful_client/logger.py +++ b/clients/python/flux_restful_client/logger.py @@ -33,7 +33,6 @@ def add_prefix(msg, char=">>"): class ColorizingStreamHandler(_logging.StreamHandler): - BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) RESET_SEQ = LogColors.ENDC COLOR_SEQ = "\033[%dm" diff --git a/clients/python/flux_restful_client/main/client.py b/clients/python/flux_restful_client/main/client.py index 336f492..6c55ddd 100644 --- a/clients/python/flux_restful_client/main/client.py +++ b/clients/python/flux_restful_client/main/client.py @@ -29,7 +29,6 @@ def __init__( timeout=2, **kwargs, ): - # If we don't have default settings, load if not hasattr(self, "settings"): self.settings = Settings(settings_file) @@ -141,7 +140,6 @@ def authenticate_request(self, originalResponse): """ authHeaderRaw = originalResponse.headers.get("Www-Authenticate") if not authHeaderRaw: - return False # If we have a username and password, set basic auth automatically @@ -162,7 +160,6 @@ def authenticate_request(self, originalResponse): h = utils.parse_auth_header(authHeaderRaw) headers.update( { - "service": h.Service, "Accept": "application/json", "User-Agent": "flux-restful-client", } @@ -280,7 +277,6 @@ def submit(self, command, **kwargs): "workdir", "envars", ]: - # Assume if it's provided, period, the user wants to set it! if optional in kwargs: data[optional] = kwargs[optional] diff --git a/clients/python/setup.py b/clients/python/setup.py index 75eb6c2..effef9e 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -55,7 +55,6 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"): if __name__ == "__main__": - INSTALL_REQUIRES = get_reqs(lookup) TESTS_REQUIRES = get_reqs(lookup, "TESTS_REQUIRES") INSTALL_REQUIRES_ALL = get_reqs(lookup, "INSTALL_REQUIRES_ALL") diff --git a/docs/auto_examples/api_tutorial.py b/docs/auto_examples/api_tutorial.py index 99c0675..2b7088b 100644 --- a/docs/auto_examples/api_tutorial.py +++ b/docs/auto_examples/api_tutorial.py @@ -25,14 +25,14 @@ plt.axis("off") plt.show() -#%% +# %% # Here we instantiate a client. If you need authentication, this can optionally take # a user and token, or also derive from the FLUX_USER and FLUX_TOKEN in the # environment. cli = get_client() -#%% +# %% # Let's list the nodes in our cluster! print("Listing nodes") res = cli.list_nodes() @@ -40,7 +40,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # Now let's submit a job to Flux. print("😴 Submitting job sleep 60") @@ -51,7 +51,7 @@ print(res["detail"]) sys.exit() -#%% +# %% # To require auth, the server should be startup with these variables # in the environment (and the first two found by the client here) # variables exported: @@ -59,7 +59,7 @@ # FLUX_TOKEN=12345 # FLUX_REQUIRE_AUTH=true -#%% +# %% # And finally, let's get job info. print("πŸ“ Getting job info...") res = cli.jobs(res["id"]) @@ -67,7 +67,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # And job logs # This will be added to the client print("😴 Submitting job to echo pancakes πŸ₯žπŸ₯žπŸ₯ž") @@ -78,7 +78,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # Now let's submit three jobs in unison so we can list them back! # Submit the job to flux print("Submitting 3 jobs to sleep!") @@ -88,7 +88,7 @@ if res: print(json.dumps(res, indent=4)) -#%% +# %% # And this is how to search (with a start, length, or query) print("πŸŒ“ Querying jobs!") res = cli.search("sleep", start=1, length=2) @@ -96,7 +96,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # Finally, let's submit and cancel a job print("Submitting job sleep 60 intending to cancel..") res = cli.submit(command=["sleep", 60]) @@ -106,7 +106,7 @@ res = cli.cancel(res["id"]) print(json.dumps(res, indent=4)) -#%% +# %% # And this would be how you stop your cluster service print("Stopping the service...") # res = cli.stop_service() diff --git a/docs/conf.py b/docs/conf.py index 6804431..13634f0 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -54,8 +54,6 @@ "sphinx_immaterial.graphviz", "nbsphinx", "sphinx_markdown_tables", - "sphinx_copybutton", - "sphinx_search.extension", ] autosummary_generate = True @@ -111,7 +109,6 @@ # Custom sphinx material variables theme_logo_icon = "images/oras.png" - # material theme options (see theme.conf for more information) html_theme_options = { "icon": { diff --git a/docs/examples/README.txt b/docs/examples/README.txt index 77447bf..c113eb3 100644 --- a/docs/examples/README.txt +++ b/docs/examples/README.txt @@ -1,9 +1,9 @@ .. _examples-index: -Tutorials -========= +Examples +======== -This is the Flux RESTFul API tutorials gallery that renders from our Python +This is the Flux RESTFul API examples gallery that renders from our Python script examples! .. _api_examples: diff --git a/docs/examples/api_tutorial.py b/docs/examples/api_tutorial.py index 99c0675..2b7088b 100644 --- a/docs/examples/api_tutorial.py +++ b/docs/examples/api_tutorial.py @@ -25,14 +25,14 @@ plt.axis("off") plt.show() -#%% +# %% # Here we instantiate a client. If you need authentication, this can optionally take # a user and token, or also derive from the FLUX_USER and FLUX_TOKEN in the # environment. cli = get_client() -#%% +# %% # Let's list the nodes in our cluster! print("Listing nodes") res = cli.list_nodes() @@ -40,7 +40,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # Now let's submit a job to Flux. print("😴 Submitting job sleep 60") @@ -51,7 +51,7 @@ print(res["detail"]) sys.exit() -#%% +# %% # To require auth, the server should be startup with these variables # in the environment (and the first two found by the client here) # variables exported: @@ -59,7 +59,7 @@ # FLUX_TOKEN=12345 # FLUX_REQUIRE_AUTH=true -#%% +# %% # And finally, let's get job info. print("πŸ“ Getting job info...") res = cli.jobs(res["id"]) @@ -67,7 +67,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # And job logs # This will be added to the client print("😴 Submitting job to echo pancakes πŸ₯žπŸ₯žπŸ₯ž") @@ -78,7 +78,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # Now let's submit three jobs in unison so we can list them back! # Submit the job to flux print("Submitting 3 jobs to sleep!") @@ -88,7 +88,7 @@ if res: print(json.dumps(res, indent=4)) -#%% +# %% # And this is how to search (with a start, length, or query) print("πŸŒ“ Querying jobs!") res = cli.search("sleep", start=1, length=2) @@ -96,7 +96,7 @@ print(json.dumps(res, indent=4)) -#%% +# %% # Finally, let's submit and cancel a job print("Submitting job sleep 60 intending to cancel..") res = cli.submit(command=["sleep", 60]) @@ -106,7 +106,7 @@ res = cli.cancel(res["id"]) print(json.dumps(res, indent=4)) -#%% +# %% # And this would be how you stop your cluster service print("Stopping the service...") # res = cli.stop_service() diff --git a/docs/getting_started/developer-guide.md b/docs/getting_started/developer-guide.md index 756d8b9..43f5c51 100644 --- a/docs/getting_started/developer-guide.md +++ b/docs/getting_started/developer-guide.md @@ -45,6 +45,7 @@ $ docker run --rm -it -p 5000:5000 ghcr.io/flux-framework/flux-restful-api ``` ```console πŸ“ Require auth: True +πŸ“ PAM auth: False πŸ“ Flux user: ******** πŸ“ Flux token: ***** INFO: Started server process [72] @@ -88,7 +89,7 @@ $ pip install -r requirements.txt Install requirements (note that you also need Flux Python available, which isn't in these requirements as you cannot install from pip). ```bash -$ pip install -r app-requirements.txt +$ pip install -r requirements.txt ``` #### 2. Start Service @@ -111,8 +112,38 @@ For the latter, you can also use the Makefile: ```bash $ make ``` + If you are developing, you must do the second approach as the server won't live-update -with the first. +with the first. If you want to start flux running as a separate process: + +```bash +sudo -u flux /usr/bin/flux broker \ + --config-path=/etc/flux/system/conf.d \ + -Scron.directory=/etc/flux/system/cron.d \ + -Srundir=/run/flux \ + -Sstatedir=${STATE_DIRECTORY:-/var/lib/flux} \ + -Slocal-uri=local:///run/flux/local \ + -Slog-stderr-level=6 \ + -Slog-stderr-mode=local \ + -Sbroker.rc2_none \ + -Sbroker.quorum=0 \ + -Sbroker.quorum-timeout=none \ + -Sbroker.exit-norestart=42 \ + -Scontent.restore=auto & +``` + +And then we need munge to be started (this should be done by the devcontainer): + +```bash +$ sudo service munge start +``` + +And export any authentication envars you need before running make. + +```bash +export FLUX_URI=local:///run/flux/local +$ sudo -E make +``` #### 3. Authentication @@ -122,9 +153,19 @@ a variable that tells the server to use auth: ```bash export FLUX_USER=$USER export FLUX_TOKEN=123456 -export FLUX_USER_AUTH=true +export FLUX_REQUIRE_AUTH=true ``` +As an alternative, you can enable PAM authentication to use user accounts on the running server: + +```bash +export FLUX_ENABLE_PAM=true +export FLUX_REQUIRE_AUTH=true +``` + +Authentication must be enabled for PAM to work too - you can't just enable the first. For the latter (multi-user) +flux needs to be started first (e.g., the instance or broker) and then the actual server needs to be started by root. + ### Interactions Regardless of how you install, you can open your host to [http://127.0.0.1:5000](http://127.0.0.1:5000) @@ -149,6 +190,7 @@ The following variables are available (with their defaults): |FLUX_HAS_GPU | GPUs are available for the user to request | unset | |FLUX_NUMBER_NODES| The number of nodes available in the cluster | 1 | |FLUX_OPTION_FLAGS | Option flags to give to flux, in the same format you'd give on the command line | unset | +|FLUX_ENABLE_PAM | Enable PAM authentication (e.g., username and password must be users on the running server) | unset | ### Flux Option Flags diff --git a/docs/getting_started/index.md b/docs/getting_started/index.md index 4f0eb20..d8d7bc2 100644 --- a/docs/getting_started/index.md +++ b/docs/getting_started/index.md @@ -12,6 +12,7 @@ any questions or issues, please [let us know](https://github.com/flux-framework/ :maxdepth: 3 user-guide developer-guide +tutorials interface api ``` diff --git a/docs/getting_started/user-guide.md b/docs/getting_started/user-guide.md index 2ab941b..b8808d0 100644 --- a/docs/getting_started/user-guide.md +++ b/docs/getting_started/user-guide.md @@ -23,47 +23,6 @@ You really should only be interacting with a server that doesn't require authent From here, continue reading the user guide for different language clients, or see our Python [examples](https://github.com/flux-framework/flux-restful-api/tree/main/clients/python/examples) folder. -## Container Tutorial - -If you don't want to use a command line client (or you want to do a quick tutorial just using an interface) -this one is for you! You can run our demo container as follows: - -```bash -$ docker run -it -p 5000:5000 ghcr.io/flux-framework/flux-restful-api:latest -``` - -To start the image. Then open your browser to [http://0.0.0.0:5000](http://0.0.0.0:5000). -Note for those adventurous - you *can* in fact use this server to test the clients below! But -this is not required. - -1. Go to the "Submit" interface in the navigation to ask to submit a job. -2. We recommend something with output like `echo pancakes πŸ₯žπŸ₯žπŸ₯žπŸ₯žπŸ₯ž` -3. Then click on "View Jobs" to see your job. - -![img/pancakes-job.png](img/pancakes-job.png) - -And then explore other API endpoints by clicking "Docs." And that's it! -If you want to build a similar container with your own software, we recommend you -use this as a base container, ensure your needed executables are on the path, -and then you can submit jobs to your heart's content. Here is an example: - -```dockerfile -FROM ghcr.io/flux-framework/flux-restful-api:latest -RUN pip install pokemon -``` - -To build locally: - -```bash -$ docker build -t flux-pokemon . -``` - -And run the same as before to submit a job. But this time... we have access to generate ascii-art pokemon! - -![img/pokemon-output.png](img/pokemon-output.png) - -Yay! But seriously, you should like, do real science work. Only dinosaurs generate pokemon! - ## Python ### Installation @@ -354,4 +313,4 @@ See the [examples](https://github.com/flux-framework/flux-restful-api/tree/main/ ## Go -Coming soon! +Coming soon (or sooner upon request)! diff --git a/docs/index.md b/docs/index.md index 74db8a4..a11475c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -27,7 +27,7 @@ Would you like to request a feature or contribute? ```{toctree} :maxdepth: 1 getting_started/index.md -auto_examples/index +tutorials/index contributing.md about/license ``` diff --git a/docs/tutorials/container.md b/docs/tutorials/container.md new file mode 100644 index 0000000..d93774c --- /dev/null +++ b/docs/tutorials/container.md @@ -0,0 +1,40 @@ +# Container Tutorial + +If you don't want to use a command line client (or you want to do a quick tutorial just using an interface) +this one is for you! You can run our demo container as follows: + +```bash +$ docker run -it -p 5000:5000 ghcr.io/flux-framework/flux-restful-api:latest +``` + +To start the image. Then open your browser to [http://0.0.0.0:5000](http://0.0.0.0:5000). +Note for those adventurous - you *can* in fact use this server to test the clients below! But +this is not required. + +1. Go to the "Submit" interface in the navigation to ask to submit a job. +2. We recommend something with output like `echo pancakes πŸ₯žπŸ₯žπŸ₯žπŸ₯žπŸ₯ž` +3. Then click on "View Jobs" to see your job. + +![img/pancakes-job.png](img/pancakes-job.png) + +And then explore other API endpoints by clicking "Docs." And that's it! +If you want to build a similar container with your own software, we recommend you +use this as a base container, ensure your needed executables are on the path, +and then you can submit jobs to your heart's content. Here is an example: + +```dockerfile +FROM ghcr.io/flux-framework/flux-restful-api:latest +RUN pip install pokemon +``` + +To build locally: + +```bash +$ docker build -t flux-pokemon . +``` + +And run the same as before to submit a job. But this time... we have access to generate ascii-art pokemon! + +![img/pokemon-output.png](img/pokemon-output.png) + +Yay! But seriously, you should like, do real science work. Only dinosaurs generate pokemon! diff --git a/docs/getting_started/img/pancakes-job.png b/docs/tutorials/img/pancakes-job.png similarity index 100% rename from docs/getting_started/img/pancakes-job.png rename to docs/tutorials/img/pancakes-job.png diff --git a/docs/getting_started/img/pokemon-output.png b/docs/tutorials/img/pokemon-output.png similarity index 100% rename from docs/getting_started/img/pokemon-output.png rename to docs/tutorials/img/pokemon-output.png diff --git a/docs/tutorials/index.md b/docs/tutorials/index.md new file mode 100644 index 0000000..d19f0ce --- /dev/null +++ b/docs/tutorials/index.md @@ -0,0 +1,19 @@ +# Tutorials + +The Flux RESTFUl API is a FastAPI application that makes it easy to +interact with a single user Flux cluster via a RESTFul endpoint. +This set of tutorials help to walk through several use cases, +including using the Python API, or deploying on your own cluster +FLux instance. We also currently provide an example client in the [examples](https://github.com/flux-framework/flux-restful-api/tree/main/examples) +folder with additional examples. If you have +any questions or issues, please [let us know](https://github.com/flux-framework/flux-restful-api/issues) + +## External Tutorials + + - [Multi-user mode](https://flux-framework.org/flux-operator/getting_started/tutorials/multi-tenancy.html) deployed via the Flux Operator. + +```{toctree} +:maxdepth: 3 +../auto_examples/index +container +``` diff --git a/example/multi-user/broker.toml b/example/multi-user/broker.toml new file mode 100644 index 0000000..eef2eda --- /dev/null +++ b/example/multi-user/broker.toml @@ -0,0 +1,20 @@ +# Flux needs to know the path to the IMP executable +[exec] +imp = "/usr/libexec/flux/flux-imp" + +[access] +allow-guest-user = true +allow-root-owner = true + +# Point to resource definition generated with flux-R(1). +[resource] +path = "/etc/flux/system/R" + +[bootstrap] +curve_cert = "/mnt/curve/curve.cert" +default_port = 8050 +default_bind = "tcp://eth0:%%p" +default_connect = "tcp://%%h:%%p" +hosts = [ + { host="HOSTNAME"}, +] diff --git a/example/multi-user/flux.service b/example/multi-user/flux.service new file mode 100644 index 0000000..5f4cfbe --- /dev/null +++ b/example/multi-user/flux.service @@ -0,0 +1,48 @@ +[Unit] +Description=Flux message broker +Wants=munge.service + +[Service] +TimeoutStopSec=90 +KillMode=mixed +ExecStart=/usr/bin/bash -c '\ + XDG_RUNTIME_DIR=/run/user/$UID \ + DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/$UID/bus \ + /usr/bin/flux broker \ + --config-path=/etc/flux/system/conf.d \ + -Scron.directory=/etc/flux/system/cron.d \ + -Srundir=/run/flux \ + -Sstatedir=${STATE_DIRECTORY:-/var/lib/flux} \ + -Slocal-uri=local:///run/flux/local \ + -Slog-stderr-level=6 \ + -Slog-stderr-mode=local \ + -Sbroker.rc2_none \ + -Sbroker.quorum=0 \ + -Sbroker.quorum-timeout=none \ + -Sbroker.exit-norestart=42 \ + -Scontent.restore=auto \ +' +SyslogIdentifier=flux +ExecReload=/usr/bin/flux config reload +Restart=always +RestartSec=5s +RestartPreventExitStatus=42 +User=flux +Group=flux +RuntimeDirectory=flux +RuntimeDirectoryMode=0755 +StateDirectory=flux +StateDirectoryMode=0700 +PermissionsStartOnly=true +# ExecStartPre=/usr/bin/loginctl enable-linger flux +# ExecStartPre=/usr/bin/bash -c 'systemctl start user@$(id -u flux).service' + +# +# Delegate cgroup control to user flux, so that systemd doesn't reset +# cgroups for flux initiated processes, and to allow (some) cgroup +# manipulation as user flux. +# +Delegate=yes + +[Install] +WantedBy=multi-user.target diff --git a/requirements.txt b/requirements.txt index 3773e0b..1e7eee9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ jinja2==3.0.3 Markdown==3.3.6 pytest==6.2.5 pyaml +python-pam diff --git a/templates/jobs/submit.html b/templates/jobs/submit.html index 736de38..f840e2a 100644 --- a/templates/jobs/submit.html +++ b/templates/jobs/submit.html @@ -12,12 +12,12 @@

Submit a Job

+ {% if form.errors %}
{% for error in form.errors %} +

{{ error }}

{% endfor %} +
{% endif %}
View Jobs - {% if form.errors %}
{% for error in form.errors %} -

{{ error }}

{% endfor %} -
{% endif %}
diff --git a/tests/test_api.py b/tests/test_api.py index 86106cd..aa5f908 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -3,6 +3,7 @@ import sys import time +import pytest from fastapi.testclient import TestClient here = os.path.abspath(os.path.dirname(__file__)) @@ -26,6 +27,8 @@ def get_basic_auth(username, password): def get_headers(): + flux_user = os.environ.get("FLUX_USER") + flux_token = os.environ.get("FLUX_TOKEN") auth_header = get_basic_auth(flux_user, flux_token) if isinstance(auth_header, bytes): auth_header = auth_header.decode("utf-8") @@ -34,19 +37,23 @@ def get_headers(): # Do we want auth? test_auth = False +test_pam_auth = False +test_pam_auth_fail = False headers = {} -if os.environ.get("TEST_AUTH"): - # Define authentication in environment for server - flux_user = "fluxuser" - flux_token = "12345" - os.environ["FLUX_USER"] = flux_user - os.environ["FLUX_TOKEN"] = flux_token - os.environ["FLUX_REQUIRE_AUTH"] = "true" +if os.environ.get("TEST_AUTH"): test_auth = True headers = get_headers() +if os.environ.get("TEST_PAM_AUTH"): + test_pam_auth = True + test_pam_auth_fail = os.environ["TEST_PAM_AUTH_FAIL"] == "true" + headers = get_headers() + +skip_if_expected_fail = pytest.mark.skipif(test_pam_auth_fail, reason="Unix stuff") + +@skip_if_expected_fail def test_submit_list_job(): """ Test a manual submission @@ -102,8 +109,8 @@ def test_submit_list_job(): assert not result +@skip_if_expected_fail def test_cancel_job(): - # Now submit a job, ensure we get one job back response = client.post( "/v1/jobs/submit", json={"command": "sleep 10"}, headers=headers @@ -116,6 +123,7 @@ def test_cancel_job(): # TODO we don't have way to actually verify that cancel happened +@skip_if_expected_fail def test_submit_option_flags(): """ Test that option flags are parsed. @@ -155,6 +163,7 @@ def test_submit_option_flags(): assert "id" in result +@skip_if_expected_fail def test_job_output(): """ Test endpoint to retrieve list of job output @@ -184,6 +193,7 @@ def test_job_output(): assert "pancakes πŸ₯žοΈπŸ₯žοΈπŸ₯žοΈ\n" in lines["Output"] +@skip_if_expected_fail def test_job_query(): """ Test endpoint to query jobs