Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Feature/mcfadden8/axl client server #145

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,11 @@ ENDIF(MPI AND MPI_FOUND)

INSTALL(FILES ${libaxl_install_headers} DESTINATION ${CMAKE_INSTALL_INCLUDEDIR})

LIST(APPEND libaxl_srcs
axl.c
axl_sync.c
axl_err.c
axl_io.c
axl_util.c
)
LIST(APPEND libaxl_srcs axl.c axl_sync.c axl_err.c axl_io.c axl_util.c)

# TODO: Separate PTHREADS from SOCKETS
IF(HAVE_PTHREADS)
LIST(APPEND libaxl_srcs axl_pthread.c)
LIST(APPEND libaxl_srcs axl_pthread.c axl_socket.c)
ENDIF(HAVE_PTHREADS)

IF(BBAPI_FOUND)
Expand Down Expand Up @@ -62,6 +57,20 @@ TARGET_INCLUDE_DIRECTORIES(axl-static PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_S
SET_TARGET_PROPERTIES(axl-static PROPERTIES OUTPUT_NAME axl CLEAN_DIRECT_OUTPUT 1)
INSTALL(TARGETS axl-static EXPORT axlTargets LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})

# TODO: Separate PTHREADS from SOCKETS

IF(AXL_LINK_STATIC)
SET(axl_lib axl::axl-static)
ELSE()
SET(axl_lib axl::axl)
ENDIF()

IF(HAVE_PTHREADS)
ADD_EXECUTABLE(axl_socket_daemon axl_socket_daemon.c)
TARGET_LINK_LIBRARIES(axl_socket_daemon ${axl_lib})
INSTALL(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/axl_socket_daemon DESTINATION ${CMAKE_INSTALL_LIBEXECDIR})
ENDIF(HAVE_PTHREADS)

# AXL library with MPI
LIST(APPEND libaxl_mpi_srcs ${libaxl_srcs})

Expand Down
103 changes: 71 additions & 32 deletions src/axl.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
/* xfer methods */
#include "axl_sync.h"

/* (Optional) service functions */
#include "axl_socket.h"

#ifdef HAVE_PTHREADS
#include "axl_pthread.h"
#endif /* HAVE_PTHREAD */
Expand Down Expand Up @@ -75,18 +78,19 @@ int axl_copy_metadata;
/* global rank of calling process, used for BBAPI */
int axl_rank = -1;

/*
* If we are NOT running as a server, use axl_client_xfer_list to contain
* our list of transfer items fro AXL Create. Otherwise, the server will
* change the axl_xfer_list pointer for each connection it is servicing.
*/
static struct axl_transfer_array axl_client_xfer_list = {
.axl_kvtrees = NULL, .axl_kvtrees_count = 0
};
struct axl_transfer_array* axl_xfer_list = &axl_client_xfer_list;

/* reference count for number of times AXL_Init has been called */
static unsigned int axl_init_count = 0;

/* Array for all the AXL_Create'd kvtree pointers. It's indexed by the AXL id.
*
* Note: We only expand this array, we never shrink it. This is fine since
* the user is only going to be calling AXL_Create() a handful of times. It
* also simplifies the code if we never shrink it, and the extra memory usage
* is negligible, if any at all. */
kvtree** axl_kvtrees;
static unsigned int axl_kvtrees_count = 0;

#ifdef HAVE_BBAPI
static int bbapi_is_loaded = 0;
#endif
Expand All @@ -110,19 +114,19 @@ static int axl_alloc_id(const char* state_file)
kvtree_util_set_str(new, AXL_KEY_STATE_FILE, state_file);
}

int id = axl_kvtrees_count;
axl_kvtrees_count++;
int id = axl_xfer_list->axl_kvtrees_count;
axl_xfer_list->axl_kvtrees_count++;

axl_kvtrees = realloc(axl_kvtrees, sizeof(struct kvtree*) * axl_kvtrees_count);
axl_kvtrees[id] = new;
axl_xfer_list->axl_kvtrees = realloc(axl_xfer_list->axl_kvtrees, sizeof(struct kvtree*) * axl_xfer_list->axl_kvtrees_count);
axl_xfer_list->axl_kvtrees[id] = new;

return id;
}

/* Remove the state file for an id, if one exists */
static void axl_remove_state_file(int id)
{
kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
char* state_file = NULL;
if (kvtree_util_get_str(file_list, AXL_KEY_STATE_FILE,
&state_file) == KVTREE_SUCCESS)
Expand All @@ -136,15 +140,15 @@ static void axl_free_id(int id)
{
axl_remove_state_file(id);

/* kvtree_delete() will set axl_kvtrees[id] = NULL */
kvtree_delete(&axl_kvtrees[id]);
/* kvtree_delete() will set axl_xfer_list->axl_kvtrees[id] = NULL */
kvtree_delete(&axl_xfer_list->axl_kvtrees[id]);
}

/* If the user specified a state_file then write our kvtree to it. If not, then
* do nothing. */
void axl_write_state_file(int id)
{
kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
char* state_file = NULL;
if (kvtree_util_get_str(file_list, AXL_KEY_STATE_FILE,
&state_file) == KVTREE_SUCCESS)
Expand All @@ -163,7 +167,7 @@ static int axl_get_info(int id, kvtree** list, axl_xfer_t* type, axl_xfer_state_
*state = AXL_XFER_STATE_NULL;

/* lookup transfer info for the given id */
kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
if (file_list == NULL) {
AXL_ERR("Could not find fileset for UID %d", id);
return AXL_FAILURE;
Expand Down Expand Up @@ -260,6 +264,13 @@ int AXL_Init (void)
axl_make_directories = atoi(val);
}

/* If the user has set both the AXL_SERVICE_HOST and AXL_SERVICE_PORT
* environment variables, then they are expecting to use the AXL Service
* rather than the library included with the SCR library.
*/
char* axl_service_host = NULL;
int axl_service_port = -1;

/* initialize our flag on whether to first copy files to temporary names with extension */
axl_use_extension = 0;
val = getenv("AXL_USE_EXTENSION");
Expand All @@ -275,7 +286,26 @@ int AXL_Init (void)
}

/* keep a reference count to free memory on last AXL_Finalize */
axl_init_count++;
if (axl_init_count++ == 0) {
/*
* If we are not running as the AXL Server, check to see if we are
* expected to run as a client and then connect if so.
*/
if (axl_service_mode != AXL_SOCKET_SERVER) {
if ( (val = getenv("AXL_SERVICE_HOST")) != NULL) {
axl_service_host = strdup(val);

if ( (val = getenv("AXL_SERVICE_PORT")) != NULL) {
axl_service_port = atoi(val);

if (axl_socket_client_init(axl_service_host, (unsigned short)axl_service_port)) {
axl_service_mode = AXL_SOCKET_CLIENT;
}
}
free(axl_service_host);
}
}
}

return rc;
}
Expand All @@ -297,8 +327,13 @@ int AXL_Finalize (void)
axl_init_count--;
if (axl_init_count == 0) {
/* TODO: are there cases where we also need to delete trees? */
axl_free(&axl_kvtrees);
axl_kvtrees_count = 0;
axl_free(&axl_xfer_list->axl_kvtrees);
axl_xfer_list->axl_kvtrees_count = 0;

if (axl_service_mode == AXL_SOCKET_CLIENT) {
axl_socket_client_AXL_Finalize();
}

}

return rc;
Expand Down Expand Up @@ -360,7 +395,7 @@ static kvtree* AXL_Config_Set(const kvtree* config)
char* endptr;
long id = strtol(key, &endptr, 10);
if ((*key == '\0' || *endptr != '\0') ||
(id < 0 || id >= axl_kvtrees_count))
(id < 0 || id >= axl_xfer_list->axl_kvtrees_count))
{
retval = NULL;
break;
Expand All @@ -372,7 +407,7 @@ static kvtree* AXL_Config_Set(const kvtree* config)
break;
}

kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];

const char** opt;
for (opt = known_transfer_options; *opt != NULL; opt++) {
Expand Down Expand Up @@ -462,6 +497,10 @@ static kvtree* AXL_Config_Set(const kvtree* config)
}
}

if (axl_service_mode == AXL_SOCKET_CLIENT) {
axl_socket_client_AXL_Config_Set(config);
}

return retval;
}

Expand Down Expand Up @@ -502,8 +541,8 @@ static kvtree* AXL_Config_Get()

/* per transfer options */
int id;
for (id = 0; id < axl_kvtrees_count; id++) {
kvtree* file_list = axl_kvtrees[id];
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
if (file_list == NULL) {
/* TODO: check if it would be better to return an empty hash instead */
continue;
Expand Down Expand Up @@ -613,7 +652,7 @@ int AXL_Create(axl_xfer_t xtype, const char* name, const char* state_file)
return -1;
}

kvtree* file_list = axl_kvtrees[id];
kvtree* file_list = axl_xfer_list->axl_kvtrees[id];
kvtree_util_set_int(file_list, AXL_KEY_XFER_TYPE, xtype);
kvtree_util_set_str(file_list, AXL_KEY_UNAME, name);
if (!reload_from_state_file) {
Expand Down Expand Up @@ -1521,8 +1560,8 @@ int AXL_Stop ()

/* cancel each active id */
int id;
for (id = 0; id < axl_kvtrees_count; id++) {
if (!axl_kvtrees[id]) {
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
if (!axl_xfer_list->axl_kvtrees[id]) {
continue;
}

Expand All @@ -1532,8 +1571,8 @@ int AXL_Stop ()
}

/* wait */
for (id = 0; id < axl_kvtrees_count; id++) {
if (!axl_kvtrees[id]) {
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
if (!axl_xfer_list->axl_kvtrees[id]) {
continue;
}

Expand All @@ -1543,8 +1582,8 @@ int AXL_Stop ()
}

/* and free it */
for (id = 0; id < axl_kvtrees_count; id++) {
if (!axl_kvtrees[id]) {
for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) {
if (!axl_xfer_list->axl_kvtrees[id]) {
continue;
}

Expand Down
17 changes: 13 additions & 4 deletions src/axl_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
/* unless otherwise indicated all global variables defined in this file must
* only be accessed by the main thread */

/*
* A list of pointers to kvtrees, indexed by AXL ID.
*/
extern kvtree** axl_kvtrees;
struct axl_transfer_array {
/* Array for all the AXL_Create'd kvtree pointers. It's indexed by the AXL id.
*
* Note: We only expand this array, we never shrink it. This is fine since
* the user is only going to be calling AXL_Create() a handful of times. It
* also simplifies the code if we never shrink it, and the extra memory usage
* is negligible, if any at all. */
kvtree** axl_kvtrees;
unsigned int axl_kvtrees_count;
};

extern struct axl_transfer_array* axl_xfer_list;

/* current debug level for AXL library,
* set in AXL_Init and AXL_Config used in axl_dbg.
Expand Down Expand Up @@ -63,6 +71,7 @@ extern int axl_rank;

/* attaches function name, file name, and line number to error messages
* https://gcc.gnu.org/onlinedocs/cpp/Variadic-Macros.html */
#define AXL_ABORT(exitcode, format, ...) axl_abort(exitcode, format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__)
#define AXL_ERR(format, ...) axl_err(format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__)
#define AXL_DBG(level, format, ...) axl_dbg(level, format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__)

Expand Down
57 changes: 3 additions & 54 deletions src/axl_mpi.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ int AXL_Finalize_comm (
return rc;
}

#include <stdio.h>

int AXL_Create_comm (
axl_xfer_t type, /**< [IN] - AXL transfer type (AXL_XFER_SYNC, AXL_XFER_PTHREAD, etc) */
const char* name,
const char* name,
const char* file,
MPI_Comm comm) /**< [IN] - communicator used for coordination and flow control */
{
Expand Down Expand Up @@ -144,59 +146,6 @@ int AXL_Dispatch_comm (
int id, /**< [IN] - transfer hander ID returned from AXL_Create */
MPI_Comm comm) /**< [IN] - communicator used for coordination and flow control */
{
#if 0
/* lookup transfer info for the given id */
kvtree* file_list = NULL;
axl_xfer_t xtype = AXL_XFER_NULL;
axl_xfer_state_t xstate = AXL_XFER_STATE_NULL;
if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) {
AXL_ERR("Could not find transfer info for UID %d", id);
return AXL_FAILURE;
}

/* check that handle is in correct state to dispatch */
if (xstate != AXL_XFER_STATE_CREATED) {
AXL_ERR("Invalid state to dispatch UID %d", id);
return AXL_FAILURE;
}
kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_DISPATCHED);
#endif

#if 0
/* create destination directories for each file */
if (axl_make_directories) {
/* count number of files we have */
kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id);
kvtree* files_hash = kvtree_get(file_list, AXL_KEY_FILES);
int num_files = kvtree_size(files_hash);

/* allocate pointer for each one */
const char** files = (const char**) AXL_MALLOC(num_files * sizeof(char*));

/* set pointer to each file */
int i;
char* dest;
kvtree_elem* elem;
while ((elem = axl_get_next_path(id, elem, NULL, &dest))) {
files[i] = dest;
i++;
}

/* create directories */
axl_create_dirs(num_files, files, comm);

/* free list of files */
axl_free2(&files);
}

/* TODO: this is hacky */
/* delegate remaining work to regular dispatch,
* but disable mkdir since we already did that */
int make_dir = axl_make_directories;
axl_make_directories = 0;
int rc = AXL_Dispatch(id);
axl_make_directories = make_dir;
#endif
/* delegate remaining work to regular dispatch */
int rc = AXL_Dispatch(id);

Expand Down
Loading
Loading