Skip to content

Commit

Permalink
Address TODO item in RMI.
Browse files Browse the repository at this point in the history
Use newer style object creation and destruction forms.

Signed-off-by: Samuel K. Gutierrez <samuel@lanl.gov>
  • Loading branch information
samuelkgutierrez committed Jul 12, 2024
1 parent 8827aed commit d600df0
Showing 1 changed file with 78 additions and 110 deletions.
188 changes: 78 additions & 110 deletions src/qvi-rmi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,42 @@

static const cstr_t ZINPROC_ADDR = "inproc://qvi-rmi-workers";

static void
send_server_shutdown_msg(
qvi_rmi_server_t *server
);

static void
qvi_zwrn_msg(
cstr_t ers,
int err_no
) {
const int erno = err_no;
qvi_log_warn("{} with errno={} ({})", (ers), erno, qvi_strerr(erno));
}

static void
zsocket_close(
void **sock
) {
void *isock = *sock;
if (!isock) return;
const int rc = zmq_close(isock);
if (rc != 0) qvi_zwrn_msg("zmq_close() failed", errno);
*sock = nullptr;
}

static void
zctx_destroy(
void **ctx
) {
void *ictx = *ctx;
if (!ictx) return;
const int rc = zmq_ctx_destroy(ictx);
if (rc != 0) qvi_zwrn_msg("zmq_ctx_destroy() failed", errno);
*ctx = nullptr;
}

struct qvi_rmi_server_s {
/** Server configuration */
qvi_rmi_config_s config;
Expand All @@ -47,6 +83,27 @@ struct qvi_rmi_server_s {
pthread_t worker_thread;
/** Flag indicating if main thread blocks for workers to complete. */
bool blocks = false;
/** Constructor. */
qvi_rmi_server_s(void)
{
zctx = zmq_ctx_new();
if (!zctx) throw qvi_runtime_error();

const int rc = qvi_new(&hwpool);
if (rc != QV_SUCCESS) throw qvi_runtime_error();
}
/** Destructor. */
~qvi_rmi_server_s(void)
{
send_server_shutdown_msg(this);
zsocket_close(&zlo);
zctx_destroy(&zctx);
unlink(config.hwtopo_path.c_str());
qvi_delete(&hwpool);
if (!blocks) {
pthread_join(worker_thread, nullptr);
}
}
};

struct qvi_rmi_client_s {
Expand All @@ -56,6 +113,23 @@ struct qvi_rmi_client_s {
void *zctx = nullptr;
/** Communication socket */
void *zsock = nullptr;
/** Constructor. */
qvi_rmi_client_s(void)
{
// Remember clients own the hwloc data, unlike the server.
int rc = qvi_hwloc_new(&config.hwloc);
if (rc != QV_SUCCESS) throw qvi_runtime_error();
// Create a new ZMQ context.
zctx = zmq_ctx_new();
if (!zctx) throw qvi_runtime_error();
}
/** Destructor. */
~qvi_rmi_client_s(void)
{
zsocket_close(&zsock);
zctx_destroy(&zctx);
qvi_hwloc_free(&config.hwloc);
}
};

typedef enum qvi_rpc_funid_e {
Expand Down Expand Up @@ -102,37 +176,6 @@ qvi_zerr_msg(
qvi_log_error("{} with errno={} ({})", ers, erno, qvi_strerr(erno));
}

static void
qvi_zwrn_msg(
cstr_t ers,
int err_no
) {
const int erno = err_no;
qvi_log_warn("{} with errno={} ({})", (ers), erno, qvi_strerr(erno));
}

static void
zctx_destroy(
void **ctx
) {
void *ictx = *ctx;
if (!ictx) return;
const int rc = zmq_ctx_destroy(ictx);
if (rc != 0) qvi_zwrn_msg("zmq_ctx_destroy() failed", errno);
*ctx = nullptr;
}

static void
zsocket_close(
void **sock
) {
void *isock = *sock;
if (!isock) return;
const int rc = zmq_close(isock);
if (rc != 0) qvi_zwrn_msg("zmq_close() failed", errno);
*sock = nullptr;
}

static void *
zsocket_create_and_connect(
void *zctx,
Expand Down Expand Up @@ -761,31 +804,7 @@ int
qvi_rmi_server_new(
qvi_rmi_server_t **server
) {
cstr_t ers = nullptr;

qvi_rmi_server_t *iserver = nullptr;
int rc = qvi_new(&iserver);
if (rc != QV_SUCCESS) goto out;

iserver->zctx = zmq_ctx_new();
if (!iserver->zctx) {
ers = "zmq_ctx_new() failed";
rc = QV_ERR_MSG;
goto out;
}

rc = qvi_new(&iserver->hwpool);
if (rc != QV_SUCCESS) {
ers = "qvi_hwpool_new() failed";
goto out;
}
out:
if (ers) {
qvi_log_error("{} with rc={} ({})", ers, rc, qv_strerr(rc));
qvi_rmi_server_free(&iserver);
}
*server = iserver;
return rc;
return qvi_new(server);
}

static void
Expand All @@ -800,25 +819,7 @@ void
qvi_rmi_server_free(
qvi_rmi_server_t **server
) {
if (!server) return;
qvi_rmi_server_t *iserver = *server;
if (!iserver) goto out;

send_server_shutdown_msg(iserver);

zsocket_close(&iserver->zlo);
zctx_destroy(&iserver->zctx);

unlink(iserver->config.hwtopo_path.c_str());
qvi_delete(&iserver->hwpool);

if (!iserver->blocks) {
pthread_join(iserver->worker_thread, nullptr);
}

delete iserver;
out:
*server = nullptr;
qvi_delete(server);
}

int
Expand Down Expand Up @@ -917,51 +918,18 @@ qvi_rmi_server_start(
return qvrc;
}

// TODO(skg) Use new method.
int
qvi_rmi_client_new(
qvi_rmi_client_t **client
) {
cstr_t ers = nullptr;

qvi_rmi_client_t *icli = nullptr;
int rc = qvi_new(&icli);
if (rc != QV_SUCCESS) goto out;
// Remember clients own the hwloc data, unlike the server.
rc = qvi_hwloc_new(&icli->config.hwloc);
if (rc != QV_SUCCESS) {
ers = "qvi_hwloc_new() failed";
goto out;
}

icli->zctx = zmq_ctx_new();
if (!icli->zctx) {
ers = "zmq_ctx_new() failed";
rc = QV_ERR_MSG;
goto out;
}
out:
if (ers) {
qvi_log_error("{} with rc={} ({})", ers, rc, qv_strerr(rc));
qvi_rmi_client_free(&icli);
}
*client = icli;
return rc;
return qvi_new(client);
}

void
qvi_rmi_client_free(
qvi_rmi_client_t **client
) {
if (!client) return;
qvi_rmi_client_t *iclient = *client;
if (!iclient) goto out;
zsocket_close(&iclient->zsock);
zctx_destroy(&iclient->zctx);
qvi_hwloc_free(&iclient->config.hwloc);
delete iclient;
out:
*client = nullptr;
qvi_delete(client);
}

static int
Expand Down

0 comments on commit d600df0

Please sign in to comment.