Skip to content

Commit

Permalink
Merge pull request flux-framework#6445 from garlick/issue#6089
Browse files Browse the repository at this point in the history
support partially allocated jobs across scheduler reload
  • Loading branch information
mergify[bot] authored Dec 17, 2024
2 parents 280e509 + f580637 commit 09d70fa
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 42 deletions.
3 changes: 2 additions & 1 deletion src/common/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ libflux_idset_la_LDFLAGS = \
libflux_schedutil_la_SOURCES =
libflux_schedutil_la_LIBADD = \
$(builddir)/libschedutil/libschedutil.la \
$(builddir)/libczmqcontainers/libczmqcontainers.la \
$(builddir)/librlist/librlist.la \
libflux-internal.la \
libflux-core.la \
$(JANSSON_LIBS)
libflux_schedutil_la_LDFLAGS = \
Expand Down
48 changes: 42 additions & 6 deletions src/common/libschedutil/hello.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include <jansson.h>

#include "src/common/libjob/idf58.h"
#include "src/common/librlist/rlist.h"
#include "src/common/libutil/errno_safe.h"

#include "schedutil_private.h"
#include "init.h"
#include "hello.h"
Expand All @@ -39,15 +42,40 @@ static void raise_exception (flux_t *h, flux_jobid_t id, const char *note)
flux_future_destroy (f);
}

static const char *create_partial_R (const flux_msg_t *msg,
const char *R_orig,
const char *free_ranks)
{
struct idset *ids;
struct rlist *rl = NULL;
char *R_new = NULL;

if (!(ids = idset_decode (free_ranks))
|| !(rl = rlist_from_R (R_orig))
|| rlist_remove_ranks (rl, ids) < 0
|| !(R_new = rlist_encode (rl))
|| flux_msg_aux_set (msg, NULL, R_new, (flux_free_f)free) < 0) {
ERRNO_SAFE_WRAP (free, R_new);
R_new = NULL;
}
rlist_destroy (rl);
idset_destroy (ids);
return R_new;
}

static int schedutil_hello_job (schedutil_t *util,
const flux_msg_t *msg)
{
char key[64];
flux_future_t *f = NULL;
const char *R;
flux_jobid_t id;
const char *free_ranks = NULL;

if (flux_msg_unpack (msg, "{s:I}", "id", &id) < 0)
if (flux_msg_unpack (msg,
"{s:I s?s}",
"id", &id,
"free", &free_ranks) < 0)
goto error;
if (flux_job_kvs_key (key, sizeof (key), id, "R") < 0) {
errno = EPROTO;
Expand All @@ -57,6 +85,10 @@ static int schedutil_hello_job (schedutil_t *util,
goto error;
if (flux_kvs_lookup_get (f, &R) < 0)
goto error;
if (free_ranks) {
if (!(R = create_partial_R (msg, R, free_ranks)))
goto error;
}
if (util->ops->hello (util->h,
msg,
R,
Expand All @@ -78,16 +110,20 @@ int schedutil_hello (schedutil_t *util)
{
flux_future_t *f;
int rc = -1;
int partial_ok = 0;

if (!util || !util->ops->hello) {
errno = EINVAL;
return -1;
}
if (!(f = flux_rpc (util->h,
"job-manager.sched-hello",
NULL,
FLUX_NODEID_ANY,
FLUX_RPC_STREAMING)))
if ((util->flags & SCHEDUTIL_HELLO_PARTIAL_OK))
partial_ok = 1;
if (!(f = flux_rpc_pack (util->h,
"job-manager.sched-hello",
FLUX_NODEID_ANY,
FLUX_RPC_STREAMING,
"{s:b}",
"partial-ok", partial_ok)))
return -1;
while (1) {
const flux_msg_t *msg;
Expand Down
1 change: 1 addition & 0 deletions src/common/libschedutil/init.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ typedef struct schedutil_ctx schedutil_t;

enum schedutil_flags {
SCHEDUTIL_FREE_NOLOOKUP = 1, // now the default so this flag is ignored
SCHEDUTIL_HELLO_PARTIAL_OK = 2,
};

/* Create a handle for the schedutil convenience library.
Expand Down
11 changes: 8 additions & 3 deletions src/modules/job-manager/alloc.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,17 +309,22 @@ static void hello_cb (flux_t *h,
{
struct job_manager *ctx = arg;
struct job *job;
int partial_ok = 0;

/* N.B. no "state" is set in struct alloc after a hello msg, so do
* not set ctx->alloc->sched_sender in here. Do so only in the
* ready callback */
if (flux_request_decode (msg, NULL, NULL) < 0)
if (flux_request_unpack (msg, NULL, "{s?b}", "partial-ok", &partial_ok) < 0
&& flux_request_decode (msg, NULL, NULL) < 0)
goto error;
if (!flux_msg_is_streaming (msg)) {
errno = EPROTO;
goto error;
}
flux_log (h, LOG_DEBUG, "scheduler: hello");
flux_log (h,
LOG_DEBUG,
"scheduler: hello%s",
partial_ok ? " +partial-ok" : "");
job = zhashx_first (ctx->active_jobs);
while (job) {
if (job->has_resources && !job->alloc_bypass) {
Expand All @@ -334,7 +339,7 @@ static void hello_cb (flux_t *h,
}
job = zhashx_next (ctx->active_jobs);
}
if (housekeeping_hello_respond (ctx->housekeeping, msg) < 0)
if (housekeeping_hello_respond (ctx->housekeeping, msg, partial_ok) < 0)
goto error;
if (flux_respond_error (h, msg, ENODATA, NULL) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
Expand Down
73 changes: 53 additions & 20 deletions src/modules/job-manager/housekeeping.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
#include <flux/idset.h>
#include <unistd.h>
#include <signal.h>
#include <jansson.h>
#ifdef HAVE_ARGZ_ADD
#include <argz.h>
#else
Expand Down Expand Up @@ -110,11 +111,11 @@ struct allocation {
flux_jobid_t id;
struct rlist *rl; // R, diminished each time a subset is released
struct idset *pending; // ranks in need of housekeeping
struct idset *free; // ranks that have been released to the scheduler
struct housekeeping *hk;
flux_watcher_t *timer;
bool timer_armed;
bool timer_expired;
int free_count; // number of releases
double t_start;
struct bulk_exec *bulk_exec;
void *list_handle;
Expand Down Expand Up @@ -142,6 +143,7 @@ static void allocation_destroy (struct allocation *a)
int saved_errno = errno;
rlist_destroy (a->rl);
idset_destroy (a->pending);
idset_destroy (a->free);
flux_watcher_destroy (a->timer);
bulk_exec_destroy (a->bulk_exec);
free (a);
Expand Down Expand Up @@ -182,6 +184,7 @@ static struct allocation *allocation_create (struct housekeeping *hk,
a->t_start = flux_reactor_now (flux_get_reactor (hk->ctx->h));
if (!(a->rl = rlist_from_json (R, NULL))
|| !(a->pending = rlist_ranks (a->rl))
|| !(a->free = idset_create (idset_universe_size (a->pending), 0))
|| !(a->timer = flux_timer_watcher_create (r,
0,
0.,
Expand Down Expand Up @@ -242,7 +245,8 @@ static void allocation_release (struct allocation *a)
|| !(rl = rlist_copy_ranks (a->rl, ranks))
|| !(R = rlist_to_R (rl))
|| alloc_send_free_request (ctx->alloc, R, a->id, final) < 0
|| rlist_remove_ranks (a->rl, ranks) < 0) {
|| rlist_remove_ranks (a->rl, ranks) < 0
|| idset_add (a->free, ranks) < 0) {
char *s = idset_encode (ranks, IDSET_FLAG_RANGE);
flux_log (ctx->h,
LOG_ERR,
Expand All @@ -251,8 +255,6 @@ static void allocation_release (struct allocation *a)
s ? s : "NULL");
free (s);
}
else
a->free_count++;
json_decref (R);
rlist_destroy (rl);
idset_destroy (ranks);
Expand Down Expand Up @@ -465,36 +467,61 @@ int housekeeping_start (struct housekeeping *hk,
return alloc_send_free_request (hk->ctx->alloc, R, id, true);
}

static int set_idset_string (json_t *obj, const char *key, struct idset *ids)
{
char *s;
json_t *o = NULL;

if (!(s = idset_encode (ids, IDSET_FLAG_RANGE))
|| !(o = json_string (s))
|| json_object_set_new (obj, key, o) < 0) {
json_decref (o);
free (s);
return -1;
}
free (s);
return 0;
}

static int housekeeping_hello_respond_one (struct housekeeping *hk,
const flux_msg_t *msg,
struct allocation *a,
bool partial_ok,
flux_error_t *error)
{
struct job *job;
json_t *payload = NULL;

if (a->free_count > 0) {
errprintf (error, "partial release is not supported by RFC 27 hello");
goto error;
if (!idset_empty (a->free) && !partial_ok) {
errprintf (error,
"scheduler does not support restart with partially"
" released resources");
return -1;
}
if (!(job = zhashx_lookup (hk->ctx->inactive_jobs, &a->id))
&& !(job = zhashx_lookup (hk->ctx->active_jobs, &a->id))) {
errprintf (error, "the job could not be looked up during RFC 27 hello");
goto error;
return -1;
}
if (flux_respond_pack (hk->ctx->h,
msg,
"{s:I s:I s:I s:f}",
"id", job->id,
"priority", job->priority,
"userid", (json_int_t)job->userid,
"t_submit", job->t_submit) < 0) {
errprintf (error,
"the RFC 27 hello response could not be sent: %s",
strerror (errno));
if (!(payload = json_pack ("{s:I s:I s:I s:f}",
"id", job->id,
"priority", job->priority,
"userid", (json_int_t)job->userid,
"t_submit", job->t_submit)))
goto error;
if (!idset_empty (a->free)) {
if (set_idset_string (payload, "free", a->free) < 0)
goto error;
}
if (flux_respond_pack (hk->ctx->h, msg, "O", payload) < 0)
goto error;
json_decref (payload);
return 0;
error:
errprintf (error,
"failed to send scheduler HELLO handshake: %s",
strerror (errno));
json_decref (payload);
return -1;
}

Expand All @@ -513,14 +540,20 @@ static void kill_continuation (flux_future_t *f, void *arg)
* allocations. Send remaining housekeeping tasks a SIGTERM, log an error,
* and delete the allocation.
*/
int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg)
int housekeeping_hello_respond (struct housekeeping *hk,
const flux_msg_t *msg,
bool partial_ok)
{
struct allocation *a;
flux_error_t error;

a = zlistx_first (hk->allocations);
while (a) {
if (housekeeping_hello_respond_one (hk, msg, a, &error) < 0) {
if (housekeeping_hello_respond_one (hk,
msg,
a,
partial_ok,
&error) < 0) {
char *ranks;
char *hosts = NULL;
flux_future_t *f;
Expand Down
4 changes: 3 additions & 1 deletion src/modules/job-manager/housekeeping.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ int housekeeping_start (struct housekeeping *hk,
* It should inform the scheduler about resources that are still allocated,
* but no longer directly held by jobs.
*/
int housekeeping_hello_respond (struct housekeeping *hk, const flux_msg_t *msg);
int housekeeping_hello_respond (struct housekeeping *hk,
const flux_msg_t *msg,
bool partial_ok);

json_t *housekeeping_get_stats (struct housekeeping *hk);

Expand Down
19 changes: 14 additions & 5 deletions src/modules/sched-simple/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ static struct simple_sched * simple_sched_create (void)
* concurrency being excessively large.
*/
ss->alloc_limit = 8;

ss->schedutil_flags = SCHEDUTIL_HELLO_PARTIAL_OK;
return ss;
}

Expand Down Expand Up @@ -544,24 +546,28 @@ static int hello_cb (flux_t *h,
unsigned int priority;
uint32_t userid;
double t_submit;
const char *free_ranks = NULL;

if (flux_msg_unpack (msg,
"{s:I s:i s:i s:f}",
"{s:I s:i s:i s:f s?s}",
"id", &id,
"priority", &priority,
"userid", &userid,
"t_submit", &t_submit) < 0) {
"t_submit", &t_submit,
"free", &free_ranks) < 0) {
flux_log_error (h, "hello: invalid hello payload");
return -1;
}

flux_log (h,
LOG_DEBUG,
"hello: id=%s priority=%u userid=%u t_submit=%0.1f",
"hello: id=%s priority=%u userid=%u t_submit=%0.1f %s%s",
idf58 (id),
priority,
(unsigned int)userid,
t_submit);
t_submit,
free_ranks ? "free=" : "",
free_ranks ? free_ranks : "");

alloc = rlist_from_R (R);
if (!alloc) {
Expand All @@ -570,7 +576,7 @@ static int hello_cb (flux_t *h,
}
s = rlist_dumps (alloc);
if ((rc = rlist_set_allocated (ss->rlist, alloc)) < 0)
flux_log_error (h, "hello: rlist_remove (%s)", s);
flux_log_error (h, "hello: alloc %s", s);
else
flux_log (h, LOG_DEBUG, "hello: alloc %s", s);
free (s);
Expand Down Expand Up @@ -958,6 +964,9 @@ static int process_args (flux_t *h, struct simple_sched *ss,
else if (streq (argv[i], "test-free-nolookup")) {
ss->schedutil_flags |= SCHEDUTIL_FREE_NOLOOKUP;
}
else if (streq (argv[i], "test-hello-nopartial")) {
ss->schedutil_flags &= ~SCHEDUTIL_HELLO_PARTIAL_OK;
}
else {
flux_log_error (h, "Unknown module option: '%s'", argv[i]);
errno = EINVAL;
Expand Down
Loading

0 comments on commit 09d70fa

Please sign in to comment.