diff --git a/src/qvi-rmi.cc b/src/qvi-rmi.cc index f30634d..eb11f7c 100644 --- a/src/qvi-rmi.cc +++ b/src/qvi-rmi.cc @@ -34,6 +34,42 @@ static const cstr_t ZINPROC_ADDR = "inproc://qvi-rmi-workers"; +static void +send_server_shutdown_msg( + qvi_rmi_server_t *server +); + +static void +qvi_zwrn_msg( + cstr_t ers, + int err_no +) { + const int erno = err_no; + qvi_log_warn("{} with errno={} ({})", (ers), erno, qvi_strerr(erno)); +} + +static void +zsocket_close( + void **sock +) { + void *isock = *sock; + if (!isock) return; + const int rc = zmq_close(isock); + if (rc != 0) qvi_zwrn_msg("zmq_close() failed", errno); + *sock = nullptr; +} + +static void +zctx_destroy( + void **ctx +) { + void *ictx = *ctx; + if (!ictx) return; + const int rc = zmq_ctx_destroy(ictx); + if (rc != 0) qvi_zwrn_msg("zmq_ctx_destroy() failed", errno); + *ctx = nullptr; +} + struct qvi_rmi_server_s { /** Server configuration */ qvi_rmi_config_s config; @@ -47,6 +83,27 @@ struct qvi_rmi_server_s { pthread_t worker_thread; /** Flag indicating if main thread blocks for workers to complete. */ bool blocks = false; + /** Constructor. */ + qvi_rmi_server_s(void) + { + zctx = zmq_ctx_new(); + if (!zctx) throw qvi_runtime_error(); + + const int rc = qvi_new(&hwpool); + if (rc != QV_SUCCESS) throw qvi_runtime_error(); + } + /** Destructor. */ + ~qvi_rmi_server_s(void) + { + send_server_shutdown_msg(this); + zsocket_close(&zlo); + zctx_destroy(&zctx); + unlink(config.hwtopo_path.c_str()); + qvi_delete(&hwpool); + if (!blocks) { + pthread_join(worker_thread, nullptr); + } + } }; struct qvi_rmi_client_s { @@ -56,6 +113,23 @@ struct qvi_rmi_client_s { void *zctx = nullptr; /** Communication socket */ void *zsock = nullptr; + /** Constructor. */ + qvi_rmi_client_s(void) + { + // Remember clients own the hwloc data, unlike the server. + int rc = qvi_hwloc_new(&config.hwloc); + if (rc != QV_SUCCESS) throw qvi_runtime_error(); + // Create a new ZMQ context. + zctx = zmq_ctx_new(); + if (!zctx) throw qvi_runtime_error(); + } + /** Destructor. */ + ~qvi_rmi_client_s(void) + { + zsocket_close(&zsock); + zctx_destroy(&zctx); + qvi_hwloc_free(&config.hwloc); + } }; typedef enum qvi_rpc_funid_e { @@ -102,37 +176,6 @@ qvi_zerr_msg( qvi_log_error("{} with errno={} ({})", ers, erno, qvi_strerr(erno)); } -static void -qvi_zwrn_msg( - cstr_t ers, - int err_no -) { - const int erno = err_no; - qvi_log_warn("{} with errno={} ({})", (ers), erno, qvi_strerr(erno)); -} - -static void -zctx_destroy( - void **ctx -) { - void *ictx = *ctx; - if (!ictx) return; - const int rc = zmq_ctx_destroy(ictx); - if (rc != 0) qvi_zwrn_msg("zmq_ctx_destroy() failed", errno); - *ctx = nullptr; -} - -static void -zsocket_close( - void **sock -) { - void *isock = *sock; - if (!isock) return; - const int rc = zmq_close(isock); - if (rc != 0) qvi_zwrn_msg("zmq_close() failed", errno); - *sock = nullptr; -} - static void * zsocket_create_and_connect( void *zctx, @@ -761,31 +804,7 @@ int qvi_rmi_server_new( qvi_rmi_server_t **server ) { - cstr_t ers = nullptr; - - qvi_rmi_server_t *iserver = nullptr; - int rc = qvi_new(&iserver); - if (rc != QV_SUCCESS) goto out; - - iserver->zctx = zmq_ctx_new(); - if (!iserver->zctx) { - ers = "zmq_ctx_new() failed"; - rc = QV_ERR_MSG; - goto out; - } - - rc = qvi_new(&iserver->hwpool); - if (rc != QV_SUCCESS) { - ers = "qvi_hwpool_new() failed"; - goto out; - } -out: - if (ers) { - qvi_log_error("{} with rc={} ({})", ers, rc, qv_strerr(rc)); - qvi_rmi_server_free(&iserver); - } - *server = iserver; - return rc; + return qvi_new(server); } static void @@ -800,25 +819,7 @@ void qvi_rmi_server_free( qvi_rmi_server_t **server ) { - if (!server) return; - qvi_rmi_server_t *iserver = *server; - if (!iserver) goto out; - - send_server_shutdown_msg(iserver); - - zsocket_close(&iserver->zlo); - zctx_destroy(&iserver->zctx); - - unlink(iserver->config.hwtopo_path.c_str()); - qvi_delete(&iserver->hwpool); - - if (!iserver->blocks) { - pthread_join(iserver->worker_thread, nullptr); - } - - delete iserver; -out: - *server = nullptr; + qvi_delete(server); } int @@ -917,51 +918,18 @@ qvi_rmi_server_start( return qvrc; } -// TODO(skg) Use new method. int qvi_rmi_client_new( qvi_rmi_client_t **client ) { - cstr_t ers = nullptr; - - qvi_rmi_client_t *icli = nullptr; - int rc = qvi_new(&icli); - if (rc != QV_SUCCESS) goto out; - // Remember clients own the hwloc data, unlike the server. - rc = qvi_hwloc_new(&icli->config.hwloc); - if (rc != QV_SUCCESS) { - ers = "qvi_hwloc_new() failed"; - goto out; - } - - icli->zctx = zmq_ctx_new(); - if (!icli->zctx) { - ers = "zmq_ctx_new() failed"; - rc = QV_ERR_MSG; - goto out; - } -out: - if (ers) { - qvi_log_error("{} with rc={} ({})", ers, rc, qv_strerr(rc)); - qvi_rmi_client_free(&icli); - } - *client = icli; - return rc; + return qvi_new(client); } void qvi_rmi_client_free( qvi_rmi_client_t **client ) { - if (!client) return; - qvi_rmi_client_t *iclient = *client; - if (!iclient) goto out; - zsocket_close(&iclient->zsock); - zctx_destroy(&iclient->zctx); - qvi_hwloc_free(&iclient->config.hwloc); - delete iclient; -out: - *client = nullptr; + qvi_delete(client); } static int