Skip to content

Commit

Permalink
Cleanup some more code.
Browse files Browse the repository at this point in the history
Signed-off-by: Samuel K. Gutierrez <samuel@lanl.gov>
  • Loading branch information
samuelkgutierrez committed Jul 24, 2024
1 parent 3f7ebb2 commit e160782
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 152 deletions.
105 changes: 30 additions & 75 deletions src/qvi-rmi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ typedef enum qvi_rpc_funid_e {
FID_TASK_SET_CPUBIND_FROM_CPUSET,
FID_OBJ_TYPE_DEPTH,
FID_GET_NOBJS_IN_CPUSET,
FID_GET_OBJ_TYPE_IN_CPUSET,
FID_GET_DEVICE_IN_CPUSET,
FID_SCOPE_GET_INTRINSIC_HWPOOL
} qvi_rpc_funid_t;
Expand Down Expand Up @@ -581,31 +580,6 @@ rpc_ssi_get_nobjs_in_cpuset(
return qvrc;
}

static int
rpc_ssi_get_obj_type_in_cpuset(
qvi_rmi_server_t *server,
qvi_msg_header_t *hdr,
void *input,
qvi_bbuff_t **output
) {
int npieces = 0;
hwloc_cpuset_t cpuset = nullptr;
int qvrc = qvi_bbuff_rmi_unpack(
input, &npieces, &cpuset
);
if (qvrc != QV_SUCCESS) return qvrc;

qv_hw_obj_type_t target_obj;
const int rpcrc = qvi_hwloc_get_obj_type_in_cpuset(
server->config.hwloc, npieces, cpuset, &target_obj
);

qvrc = rpc_pack(output, hdr->fid, rpcrc, target_obj);

hwloc_bitmap_free(cpuset);
return qvrc;
}

static int
rpc_ssi_get_device_in_cpuset(
qvi_rmi_server_t *server,
Expand Down Expand Up @@ -713,21 +687,19 @@ rpc_ssi_scope_get_intrinsic_hwpool(
}

/**
* Maps a given qvi_rpc_funid_t to a given function pointer. Must be kept in
* sync with qvi_rpc_funid_t.
* Maps a given qvi_rpc_funid_t to a given function pointer.
*/
static const qvi_rpc_fun_ptr_t rpc_dispatch_table[] = {
rpc_ssi_invalid,
rpc_ssi_shutdown,
rpc_ssi_hello,
rpc_ssi_gbye,
rpc_ssi_task_get_cpubind,
rpc_ssi_task_set_cpubind_from_cpuset,
rpc_ssi_obj_type_depth,
rpc_ssi_get_nobjs_in_cpuset,
rpc_ssi_get_obj_type_in_cpuset,
rpc_ssi_get_device_in_cpuset,
rpc_ssi_scope_get_intrinsic_hwpool
static const std::map<qvi_rpc_funid_t, qvi_rpc_fun_ptr_t> rpc_dispatch_table = {
{FID_INVALID, rpc_ssi_invalid},
{FID_SERVER_SHUTDOWN, rpc_ssi_shutdown},
{FID_HELLO, rpc_ssi_hello},
{FID_GBYE, rpc_ssi_gbye},
{FID_TASK_GET_CPUBIND, rpc_ssi_task_get_cpubind},
{FID_TASK_SET_CPUBIND_FROM_CPUSET, rpc_ssi_task_set_cpubind_from_cpuset},
{FID_OBJ_TYPE_DEPTH, rpc_ssi_obj_type_depth},
{FID_GET_NOBJS_IN_CPUSET, rpc_ssi_get_nobjs_in_cpuset},
{FID_GET_DEVICE_IN_CPUSET, rpc_ssi_get_device_in_cpuset},
{FID_SCOPE_GET_INTRINSIC_HWPOOL, rpc_ssi_scope_get_intrinsic_hwpool}
};

static int
Expand All @@ -744,8 +716,15 @@ server_rpc_dispatch(
const size_t trim = unpack_msg_header(data, &hdr);
void *body = data_trim(data, trim);

auto fidfunp = rpc_dispatch_table.find(hdr.fid);
if (qvi_unlikely(fidfunp == rpc_dispatch_table.end())) {
qvi_log_error("Unknown function ID ({}) in RPC. Aborting.", hdr.fid);
rc = QV_ERR_RPC;
goto out;
}

qvi_bbuff_t *res;
rc = rpc_dispatch_table[hdr.fid](server, &hdr, body, &res);
rc = fidfunp->second(server, &hdr, body, &res);
if (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN) {
cstr_t ers = "RPC dispatch failed";
qvi_log_error("{} with rc={} ({})", ers, rc, qv_strerr(rc));
Expand All @@ -767,24 +746,24 @@ static void *
server_go(
void *data
) {
qvi_rmi_server_t *server = (qvi_rmi_server_t *)data;
qvi_rmi_server_t *const server = (qvi_rmi_server_t *)data;

void *zworksock = zsocket_create_and_connect(
server->zctx, ZMQ_REP, ZINPROC_ADDR
);
if (!zworksock) return nullptr;
if (qvi_unlikely(!zworksock)) return nullptr;

int rc, bsent, bsentt = 0;
volatile bool active = true;
volatile std::atomic<bool> active{true};
do {
zmq_msg_t mrx, mtx;
rc = zmsg_recv(zworksock, &mrx);
if (rc != QV_SUCCESS) break;
if (qvi_unlikely(rc != QV_SUCCESS)) break;
rc = server_rpc_dispatch(server, &mrx, &mtx);
if (rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN) break;
if (rc == QV_SUCCESS_SHUTDOWN) active = false;
if (qvi_unlikely(rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) break;
if (qvi_unlikely(rc == QV_SUCCESS_SHUTDOWN)) active = false;
rc = zmsg_send(zworksock, &mtx, &bsent);
if (rc != QV_SUCCESS) break;
if (qvi_unlikely(rc != QV_SUCCESS)) break;
bsentt += bsent;
} while(active);
#if QVI_DEBUG_MODE == 1
Expand Down Expand Up @@ -956,16 +935,16 @@ qvi_rmi_client_connect(
client->zsock = zsocket_create_and_connect(
client->zctx, ZMQ_REQ, url.c_str()
);
if (!client->zsock) return QV_ERR_MSG;
if (qvi_unlikely(!client->zsock)) return QV_ERR_MSG;

int rc = hello_handshake(client);
if (rc != QV_SUCCESS) return rc;
if (qvi_unlikely(rc != QV_SUCCESS)) return rc;

rc = qvi_hwloc_topology_init(
client->config.hwloc,
client->config.hwtopo_path.c_str()
);
if (rc != QV_SUCCESS) return rc;
if (qvi_unlikely(rc != QV_SUCCESS)) return rc;

return qvi_hwloc_topology_load(client->config.hwloc);
}
Expand Down Expand Up @@ -1097,30 +1076,6 @@ qvi_rmi_get_nobjs_in_cpuset(
return rpcrc;
}

int
qvi_rmi_get_obj_type_in_cpuset(
qvi_rmi_client_t *client,
int npieces,
hwloc_const_cpuset_t cpuset,
qv_hw_obj_type_t *target_obj
) {
int qvrc = rpc_req(
client->zsock,
FID_GET_OBJ_TYPE_IN_CPUSET,
npieces,
cpuset
);
if (qvrc != QV_SUCCESS) return qvrc;

// Should be set by rpc_rep, so assume an error.
int rpcrc = QV_ERR_MSG;
qvrc = rpc_rep(client->zsock, &rpcrc, target_obj);
if (qvrc != QV_SUCCESS) return qvrc;

return rpcrc;
}


int
qvi_rmi_get_device_in_cpuset(
qvi_rmi_client_t *client,
Expand Down
11 changes: 0 additions & 11 deletions src/qvi-rmi.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,6 @@ qvi_rmi_get_nobjs_in_cpuset(
int *nobjs
);

/**
*
*/
int
qvi_rmi_get_obj_type_in_cpuset(
qvi_rmi_client_t *client,
int npieces,
hwloc_const_cpuset_t cpuset,
qv_hw_obj_type_t *target_obj
);

/**
*
*/
Expand Down
48 changes: 13 additions & 35 deletions src/qvi-scope.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ struct qvi_scope_split_agg_s {
for (auto &hwpool : hwpools) {
qvi_delete(&hwpool);
}
hwpools.clear();
}
/**
* Minimally initializes instance.
Expand Down Expand Up @@ -205,18 +204,6 @@ get_nobjs_in_hwpool(
return QV_SUCCESS;
}

static int
get_obj_type_in_hwpool(
qvi_rmi_client_t *rmi,
qvi_hwpool_s *hwpool,
int npieces,
qv_hw_obj_type_t *obj
) {
return qvi_rmi_get_obj_type_in_cpuset(
rmi, npieces, hwpool->get_cpuset().cdata(), obj
);
}

template <typename TYPE>
static int
gather_values(
Expand Down Expand Up @@ -1127,10 +1114,10 @@ qvi_scope_thsplit(
int *kcolors,
uint_t k,
qv_hw_obj_type_t maybe_obj_type,
qv_scope_t ***kchildren
qv_scope_t ***thchildren
) {
if (k == 0 || !kchildren) return QV_ERR_INVLD_ARG;
*kchildren = nullptr;
if (k == 0 || !thchildren) return QV_ERR_INVLD_ARG;
*thchildren = nullptr;

const uint_t group_size = k;
qvi_scope_split_agg_s splitagg{};
Expand Down Expand Up @@ -1174,15 +1161,15 @@ qvi_scope_thsplit(
if (rc != QV_SUCCESS) return rc;

// Now populate the children.
qv_scope_t **ikchildren = new qv_scope_t*[group_size];
qv_scope_t **ithchildren = new qv_scope_t *[group_size];

qvi_group_t *thgroup = nullptr;
// Split off from our parent group. This call is called from a context in
// which a process is splitting its resources across threads, so create a
// new thread group for each child.
rc = parent->group->thsplit(group_size, &thgroup);
if (rc != QV_SUCCESS) {
qvi_scope_thfree(&ikchildren, group_size);
qvi_scope_thfree(&ithchildren, group_size);
return rc;
}

Expand Down Expand Up @@ -1210,15 +1197,17 @@ qvi_scope_thsplit(
break;
}
thgroup->retain();
ikchildren[i] = child;
ithchildren[i] = child;
}
if (rc != QV_SUCCESS) {
qvi_scope_thfree(&ikchildren, k);
qvi_scope_thfree(&ithchildren, k);
}
// Subtract one to account for the parent's implicit retain during
// construct.
thgroup->release();
*kchildren = ikchildren;
else {
// Subtract one to account for the parent's
// implicit retain during construct.
thgroup->release();
}
*thchildren = ithchildren;
return rc;
}

Expand Down Expand Up @@ -1309,17 +1298,6 @@ qvi_scope_nobjs(
);
}

int
qvi_scope_obj_type(
qv_scope_t *scope,
int npieces,
qv_hw_obj_type_t *obj
) {
return get_obj_type_in_hwpool(
scope->group->task()->rmi(), scope->hwpool, npieces, obj
);
}

int
qvi_scope_get_device_id(
qv_scope_t *scope,
Expand Down
7 changes: 0 additions & 7 deletions src/qvi-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,6 @@ qvi_scope_nobjs(
int *n
);

int
qvi_scope_obj_type(
qv_scope_t *scope,
int npieces,
qv_hw_obj_type_t *obj
);

int
qvi_scope_get_device_id(
qv_scope_t *scope,
Expand Down
Loading

0 comments on commit e160782

Please sign in to comment.