Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource manager support for Flux #798

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/rm_enumerator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
ENUMITEM(SLURM, "SchedMD SLURM") \
ENUMITEM(LSF, "IBM Spectrum LSF") \
ENUMITEM(LSF_CSM, "IBM Spectrum LSF with Cluster System Management") \
ENUMITEM(FLUX, "Flux") \

#ifdef __cplusplus
extern "C" {
Expand Down
209 changes: 209 additions & 0 deletions util/unifyfs/src/unifyfs-rm.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,102 @@ static int slurm_read_resource(unifyfs_resource_t* resource)
return ret;
}

/**
*
* @brief Get list of nodes using Flux resource
*
* @param resource The job resource record to be filled
*
* @return 0 on success, negative errno otherwise
*/
static int flux_read_resource(unifyfs_resource_t* resource)
{
size_t n_nodes = 0;
char num_nodes_str[10] = {0};
char nodelist_str[1024] = {0};
char* ret = NULL;
FILE* pipe_fp = NULL;
char** nodes = NULL;
int node_idx = 0;

// get num nodes using flux resource command
pipe_fp = popen("flux resource list --states=free -no {nnodes}", "r");
ret = fgets(num_nodes_str, sizeof(num_nodes_str), pipe_fp);
if (ret == NULL) {
pclose(pipe_fp);
return -EINVAL;
}
n_nodes = (size_t) strtoul(num_nodes_str, NULL, sizeof(num_nodes_str));
pclose(pipe_fp);

nodes = calloc(sizeof(char*), n_nodes);
if (nodes == NULL) {
return -ENOMEM;
}

// get node list using flux resource command
// the returned list is in a condensed format
// e.g., tioga[18-19,21,32]
// TODO: is it safe to assume flux resource only
// returns a single line?
pipe_fp = popen("flux resource list --states=free -no {nodelist}", "r");
ret = fgets(nodelist_str, sizeof(nodelist_str), pipe_fp);
if (ret == NULL) {
pclose(pipe_fp);
return -EINVAL;
}
pclose(pipe_fp);

// get the node ids, i.e., the list in []
char* node_ids = strstr(nodelist_str, "[");
if (node_ids) {
// remove the trailing "]\n"
nodelist_str[strlen(nodelist_str)-2] = 0;
char* host = calloc(1, (node_ids-nodelist_str)+2);
strncpy(host, nodelist_str, (node_ids-nodelist_str));

// separate by ","
char* end_str;
char* token = strtok_r(node_ids+1, ",", &end_str);
while (token) {
// case 1: contiguous node range
// e.g., 3-5, lo=3, hi=5
// case 2: a single node, then lo=hi
int lo, hi;
if (strstr(token, "-")) {
char* end_str2;
char* lo_str = strtok_r(token, "-", &end_str2);
char* hi_str = strtok_r(NULL, "-", &end_str2);
lo = atoi(lo_str);
hi = atoi(hi_str);
} else {
lo = atoi(token);
hi = lo;
}

for (int i = lo; i <= hi; i++) {
char nodename[30] = {0};
sprintf(nodename, "%s%d", host, i);
if (node_idx >= n_nodes) {
return -EINVAL;
}
nodes[node_idx++] = strdup(nodename);
}
token = strtok_r(NULL, ",", &end_str);
}
} else {
// no '[' in the string, meaning it has a single node
if (n_nodes != 1) {
return -EINVAL;
}
nodes[0] = strdup(nodelist_str);
wangvsa marked this conversation as resolved.
Show resolved Hide resolved
}

resource->n_nodes = n_nodes;
resource->nodes = nodes;
return 0;
}

// construct_server_argv():
// This function is called in two ways.
// Call it once with server_argv==NULL and it
Expand Down Expand Up @@ -1143,6 +1239,107 @@ static int srun_stage(unifyfs_resource_t* resource,
return -errno;
}

/**
* @brief Launch servers using Flux
*
* @param resource The job resource record
* @param args The command-line options
*
* @return
*/
static int flux_launch(unifyfs_resource_t* resource,
unifyfs_args_t* args)
{
size_t argc, flux_argc, server_argc;
char** argv = NULL;
char n_nodes[16];
char n_tasks[16];
char n_cores[8];

snprintf(n_nodes, sizeof(n_nodes), "-N%zu", resource->n_nodes);
// without -n, --ntasks, Flux will schedule the server job
// to use all nodes exclusively
snprintf(n_tasks, sizeof(n_tasks), "-n%zu", resource->n_nodes);
snprintf(n_cores, sizeof(n_cores), "-c%d", resource->n_cores_per_server);

// full command: srun <srun args> <server args>
flux_argc = 5;
server_argc = construct_server_argv(args, NULL);

// setup full command argv
argc = 1 + flux_argc + server_argc;
argv = calloc(argc, sizeof(char*));
argv[0] = strdup("flux");
argv[1] = strdup("run");
argv[2] = strdup(n_nodes);
argv[3] = strdup(n_tasks);
argv[4] = strdup(n_cores);
construct_server_argv(args, argv + flux_argc);

if (args->debug) {
for (int i = 0; i < (argc - 1); i++) {
fprintf(stdout, "UNIFYFS LAUNCH DEBUG: flux argv[%d] = %s\n",
i, argv[i]);
fflush(stdout);
}
}

execvp(argv[0], argv);
perror("failed to execvp() flux to launch unifyfsd");
return -errno;
}

/**
* @brief Terminate servers using Flux
*
* @param resource The job resource record
* @param args The command-line options
*
* @return
*/
static int flux_terminate(unifyfs_resource_t* resource,
unifyfs_args_t* args)
{
size_t argc, flux_argc;
char** argv = NULL;

// full command: flux <flux args> pkill name:unifyfsd
flux_argc = 3;
argc = 1 + flux_argc;
argv = calloc(argc, sizeof(char*));
argv[0] = strdup("flux");
argv[1] = strdup("pkill");
argv[2] = strdup("name:unifyfsd");

execvp(argv[0], argv);
perror("failed to execvp() flux to pkill unifyfsd");
return -errno;
}

/**
* @brief Launch unifyfs-stage using flux run
*
* @param resource The job resource record
* @param args The command-line options
*
* @return
*/
static int flux_stage(unifyfs_resource_t* resource,
unifyfs_args_t* args)
{
size_t flux_argc = 5;
char cmd[200];

// full command: flux run <flux args> <server args>
snprintf(cmd, sizeof(cmd), "flux run -N%zu -n%zu -c1",
resource->n_nodes, resource->n_nodes);

generic_stage(cmd, flux_argc, args);

perror("failed to execvp() flux to launch unifyfsd");
return -errno;
}

/**
* @brief Launch servers using custom script
*
Expand Down Expand Up @@ -1249,6 +1446,13 @@ static _ucr_resource_manager_t resource_managers[] = {
.terminate = &jsrun_terminate,
.stage = &jsrun_stage,
},
{
.type = "flux",
.read_resource = &flux_read_resource,
.launch = &flux_launch,
.terminate = &flux_terminate,
.stage = &flux_stage,
},
};

int unifyfs_detect_resources(unifyfs_resource_t* resource)
Expand All @@ -1257,6 +1461,11 @@ int unifyfs_detect_resources(unifyfs_resource_t* resource)
resource->rm = UNIFYFS_RM_PBS;
} else if (getenv("SLURM_JOBID") != NULL) {
resource->rm = UNIFYFS_RM_SLURM;
} else if (getenv("FLUX_EXEC_PATH") != NULL) {
// TODO: need to use a better environment
// variable or maybe a better way to decide
// whether to use flux scheduler.
resource->rm = UNIFYFS_RM_FLUX;
} else if (getenv("LSB_JOBID") != NULL) {
if (getenv("CSM_ALLOCATION_ID") != NULL) {
resource->rm = UNIFYFS_RM_LSF_CSM;
Expand Down
Loading