Skip to content

Commit

Permalink
prov/efa: Move cuda-sync-memops from MR to EP
Browse files Browse the repository at this point in the history
Remove CUDA sync memops from memory registration path, and make it the
user's responsiblity to set the flag if required. Also remove env
variable FI_EFA_SET_CUDA_SYNC_MEMOPS from EFA. Adds
efa_rdm_attempt_to_sync_memops() EFA RDM endpoint for msg, rma, and
atomic protocols.

Signed-off-by: Seth Zegelstein <szegel@amazon.com>
  • Loading branch information
a-szegel committed Oct 20, 2023
1 parent c68c5b3 commit bc6c52c
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 17 deletions.
3 changes: 0 additions & 3 deletions man/fi_efa.7.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,6 @@ These OFI runtime parameters apply only to the RDM endpoint.
*FI_EFA_RUNT_SIZE*
: The maximum number of bytes that will be eagerly sent by inflight messages uses runting read message protocol (Default 307200).

*FI_EFA_SET_CUDA_SYNC_MEMOPS*
: Set CU_POINTER_ATTRIBUTE_SYNC_MEMOPS for cuda ptr. (Default: 1)

*FI_EFA_INTER_MIN_READ_MESSAGE_SIZE*
: The minimum message size in bytes for inter EFA read message protocol. If instance support RDMA read, messages whose size is larger than this value will be sent by read message protocol. (Default 1048576).

Expand Down
6 changes: 1 addition & 5 deletions prov/efa/src/efa_env.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ struct efa_env efa_env = {
.tx_queue_size = 0,
.enable_shm_transfer = 1,
.use_zcpy_rx = 1,
.set_cuda_sync_memops = 1,
.zcpy_rx_seed = 0,
.shm_av_size = 256,
.shm_max_medium_size = 4096,
Expand Down Expand Up @@ -106,7 +105,7 @@ void efa_env_param_get(void)
*/
size_t max_rnr_backoff_wait_time_cap = INT_MAX/2 - 1;

char *deprecated_env_vars[] = {"FI_EFA_SHM_MAX_MEDIUM_SIZE", "FI_EFA_MTU_SIZE", "FI_EFA_TX_IOV_LIMIT", "FI_EFA_RX_IOV_LIMIT"};
char *deprecated_env_vars[] = {"FI_EFA_SHM_MAX_MEDIUM_SIZE", "FI_EFA_MTU_SIZE", "FI_EFA_TX_IOV_LIMIT", "FI_EFA_RX_IOV_LIMIT", "FI_EFA_SET_CUDA_SYNC_MEMOPS"};
for (int i = 0; i < sizeof(deprecated_env_vars) / sizeof(deprecated_env_vars[0]); i++) {
if (getenv(deprecated_env_vars[i])) {
fprintf(stderr,
Expand All @@ -129,7 +128,6 @@ void efa_env_param_get(void)
fi_param_get_int(&efa_prov, "tx_queue_size", &efa_env.tx_queue_size);
fi_param_get_int(&efa_prov, "enable_shm_transfer", &efa_env.enable_shm_transfer);
fi_param_get_int(&efa_prov, "use_zcpy_rx", &efa_env.use_zcpy_rx);
fi_param_get_int(&efa_prov, "set_cuda_sync_memops", &efa_env.set_cuda_sync_memops);
fi_param_get_int(&efa_prov, "zcpy_rx_seed", &efa_env.zcpy_rx_seed);
fi_param_get_int(&efa_prov, "shm_av_size", &efa_env.shm_av_size);
fi_param_get_int(&efa_prov, "recvwin_size", &efa_env.recvwin_size);
Expand Down Expand Up @@ -185,8 +183,6 @@ void efa_env_define()
"Enable using SHM provider to perform TX/RX operations between processes on the same system. (Default: 1)");
fi_param_define(&efa_prov, "use_zcpy_rx", FI_PARAM_INT,
"Enables the use of application's receive buffers in place of bounce-buffers when feasible. (Default: 1)");
fi_param_define(&efa_prov, "set_cuda_sync_memops", FI_PARAM_INT,
"Set CU_POINTER_ATTRIBUTE_SYNC_MEMOPS for cuda ptr. (Default: 1)");
fi_param_define(&efa_prov, "zcpy_rx_seed", FI_PARAM_INT,
"Defines the number of bounce-buffers the provider will prepost during EP initialization. (Default: 0)");
fi_param_define(&efa_prov, "shm_av_size", FI_PARAM_INT,
Expand Down
1 change: 0 additions & 1 deletion prov/efa/src/efa_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ struct efa_env {
int tx_min_credits;
int tx_queue_size;
int use_zcpy_rx;
int set_cuda_sync_memops;
int zcpy_rx_seed;
int enable_shm_transfer;
int shm_av_size;
Expand Down
10 changes: 2 additions & 8 deletions prov/efa/src/efa_mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,9 @@ static int efa_mr_hmem_setup(struct efa_mr *efa_mr,
efa_mr->peer.flags &= ~OFI_HMEM_DATA_GDRCOPY_HANDLE;
efa_mr->peer.hmem_data = NULL;
if (efa_mr->peer.iface == FI_HMEM_CUDA) {
efa_mr->needs_sync = true;
efa_mr->peer.device.cuda = attr->device.cuda;

if (efa_env.set_cuda_sync_memops) {
err = cuda_set_sync_memops(attr->mr_iov->iov_base);
if (err) {
EFA_WARN(FI_LOG_MR, "unable to set memops for cuda ptr\n");
return err;
}
}

if (cuda_is_gdrcopy_enabled()) {
err = cuda_gdrcopy_dev_register((struct fi_mr_attr *)attr, (uint64_t *)&efa_mr->peer.hmem_data);
efa_mr->peer.flags |= OFI_HMEM_DATA_GDRCOPY_HANDLE;
Expand Down Expand Up @@ -808,6 +801,7 @@ static int efa_mr_reg_impl(struct efa_mr *efa_mr, uint64_t flags, const void *at
efa_mr->inserted_to_mr_map = false;
efa_mr->mr_fid.mem_desc = NULL;
efa_mr->mr_fid.key = FI_KEY_NOTAVAIL;
efa_mr->needs_sync = false;

ofi_mr_update_attr(
efa_mr->domain->util_domain.fabric->fabric_fid.api_version,
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/efa_mr.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct efa_mr {
struct fid_mr *shm_mr;
struct efa_mr_peer peer;
bool inserted_to_mr_map;
bool needs_sync;
};

extern int efa_mr_cache_enable;
Expand Down
27 changes: 27 additions & 0 deletions prov/efa/src/rdm/efa_rdm_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,11 @@ efa_rdm_atomic_writemsg(struct fid_ep *ep,
err = efa_rdm_ep_cap_check_atomic(efa_rdm_ep);
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count);
if (err)
return err;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -370,6 +375,15 @@ efa_rdm_atomic_readwritemsg(struct fid_ep *ep,
err = efa_rdm_ep_cap_check_atomic(efa_rdm_ep);
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count);
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_ioc(efa_rdm_ep, (struct fi_ioc *)resultv, result_desc, result_count);
if (err)
return err;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local & efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -469,6 +483,19 @@ efa_rdm_atomic_compwritemsg(struct fid_ep *ep,
err = efa_rdm_ep_cap_check_atomic(efa_rdm_ep);
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count);
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_ioc(efa_rdm_ep, (struct fi_ioc *)comparev, compare_desc, compare_count);
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_ioc(efa_rdm_ep, (struct fi_ioc *)resultv, result_desc, result_count);
if (err)
return err;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down
49 changes: 49 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,53 @@ ssize_t efa_rdm_ep_post_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer *pe
void efa_rdm_ep_post_handshake_or_queue(struct efa_rdm_ep *ep,
struct efa_rdm_peer *peer);

static inline int efa_rdm_attempt_to_sync_memops(struct efa_rdm_ep *ep, void *buf, void *desc)
{
int err = 0;
struct efa_mr *efa_mr = (struct efa_mr *) desc;

if (OFI_UNLIKELY(ep->cuda_api_permitted && efa_mr && efa_mr->needs_sync)) {
err = cuda_set_sync_memops(buf);
if (err) {
EFA_WARN(FI_LOG_MR, "Unable to set memops for cuda ptr %p\n", buf);
return err;
}
efa_mr->needs_sync = false;
}

return err;
}

static inline int efa_rdm_attempt_to_sync_memops_iov(struct efa_rdm_ep *ep, struct iovec *iov, void **desc, int num_desc)
{
int err = 0, i;

if (!desc)
return err;

for (i = 0; i < num_desc; i++) {
err = efa_rdm_attempt_to_sync_memops(ep, iov[i].iov_base, (struct efa_mr *) desc[i]);
if (err)
return err;
}

return err;
}

static inline int efa_rdm_attempt_to_sync_memops_ioc(struct efa_rdm_ep *ep, struct fi_ioc *ioc, void **desc, int num_desc)
{
int err = 0, i;

if (!desc)
return err;

for (i = 0; i < num_desc; i++) {
err = efa_rdm_attempt_to_sync_memops(ep, ioc[i].addr, (struct efa_mr *) desc[i]);
if (err)
return err;
}

return err;
}

#endif
50 changes: 50 additions & 0 deletions prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ ssize_t efa_rdm_msg_sendmsg(struct fid_ep *ep, const struct fi_msg *msg,
int ret;

efa_rdm_ep = container_of(ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);

ret = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count);
if (ret)
return ret;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -277,8 +282,14 @@ ssize_t efa_rdm_msg_sendv(struct fid_ep *ep, const struct iovec *iov,
struct fi_msg msg = {0};
struct efa_rdm_peer *peer;
void *shm_desc[EFA_RDM_IOV_LIMIT] = {NULL};
int ret;

efa_rdm_ep = container_of(ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);

ret = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)iov, desc, count);
if (ret)
return ret;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand All @@ -299,8 +310,14 @@ ssize_t efa_rdm_msg_send(struct fid_ep *ep, const void *buf, size_t len,
struct efa_rdm_peer *peer;
struct efa_rdm_ep *efa_rdm_ep;
void *shm_desc[EFA_RDM_IOV_LIMIT] = {NULL};
int ret;

efa_rdm_ep = container_of(ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);

ret = efa_rdm_attempt_to_sync_memops(efa_rdm_ep, (void *)buf, desc);
if (ret)
return ret;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand All @@ -324,8 +341,14 @@ ssize_t efa_rdm_msg_senddata(struct fid_ep *ep, const void *buf, size_t len,
struct efa_rdm_ep *efa_rdm_ep;
struct efa_rdm_peer *peer;
void *shm_desc[EFA_RDM_IOV_LIMIT] = {NULL};
int ret;

efa_rdm_ep = container_of(ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);

ret = efa_rdm_attempt_to_sync_memops(efa_rdm_ep, (void *)buf, desc);
if (ret)
return ret;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -421,6 +444,11 @@ ssize_t efa_rdm_msg_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *
int ret;

efa_rdm_ep = container_of(ep_fid, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);

ret = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)tmsg->msg_iov, tmsg->desc, tmsg->iov_count);
if (ret)
return ret;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, tmsg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -453,9 +481,15 @@ ssize_t efa_rdm_msg_tsendv(struct fid_ep *ep_fid, const struct iovec *iov,
struct fi_msg_tagged msg = {0};
struct efa_rdm_peer *peer;
void *shm_desc[EFA_RDM_IOV_LIMIT] = {NULL};
int ret;

efa_rdm_ep = container_of(ep_fid, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);
peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);

ret = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)iov, desc, count);
if (ret)
return ret;

assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (desc)
Expand All @@ -482,8 +516,14 @@ ssize_t efa_rdm_msg_tsend(struct fid_ep *ep_fid, const void *buf, size_t len,
struct efa_rdm_peer *peer;
struct efa_rdm_ep *efa_rdm_ep;
void *shm_desc[EFA_RDM_IOV_LIMIT] = {NULL};
int ret;

efa_rdm_ep = container_of(ep_fid, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);

ret = efa_rdm_attempt_to_sync_memops(efa_rdm_ep, (void *)buf, desc);
if (ret)
return ret;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand All @@ -508,8 +548,14 @@ ssize_t efa_rdm_msg_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len
struct efa_rdm_ep *efa_rdm_ep;
struct efa_rdm_peer *peer;
void *shm_desc[EFA_RDM_IOV_LIMIT] = {NULL};
int ret;

efa_rdm_ep = container_of(ep_fid, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);

ret = efa_rdm_attempt_to_sync_memops(efa_rdm_ep, (void *)buf, desc);
if (ret)
return ret;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -905,6 +951,10 @@ ssize_t efa_rdm_msg_generic_recv(struct fid_ep *ep, const struct fi_msg *msg,

efa_rdm_tracepoint(recv_begin_msg_context, (size_t) msg->context, (size_t) msg->addr);

ret = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count);
if (ret)
return ret;

if (efa_rdm_ep->use_zcpy_rx) {
ofi_genlock_lock(srx_ctx->lock);
rxe = efa_rdm_msg_alloc_rxe(efa_rdm_ep, msg, op, flags, tag, ignore);
Expand Down
24 changes: 24 additions & 0 deletions prov/efa/src/rdm/efa_rdm_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ ssize_t efa_rdm_rma_readmsg(struct fid_ep *ep, const struct fi_msg_rma *msg, uin
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count);
if (err)
return err;

assert(msg->iov_count <= efa_rdm_ep->tx_iov_limit);

efa_perfset_start(efa_rdm_ep, perf_efa_tx);
Expand Down Expand Up @@ -236,6 +240,10 @@ ssize_t efa_rdm_rma_readv(struct fid_ep *ep, const struct iovec *iov, void **des
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)iov, desc, iov_count);
if (err)
return err;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, src_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -276,6 +284,10 @@ ssize_t efa_rdm_rma_read(struct fid_ep *ep, void *buf, size_t len, void *desc,
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops(efa_rdm_ep, (void *)buf, desc);
if (err)
return err;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, src_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -439,6 +451,10 @@ ssize_t efa_rdm_rma_writemsg(struct fid_ep *ep,
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)msg->msg_iov, msg->desc, msg->iov_count);
if (err)
return err;

assert(msg->iov_count <= efa_rdm_ep->tx_iov_limit);

efa_perfset_start(efa_rdm_ep, perf_efa_tx);
Expand Down Expand Up @@ -504,6 +520,10 @@ ssize_t efa_rdm_rma_writev(struct fid_ep *ep, const struct iovec *iov, void **de
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops_iov(efa_rdm_ep, (struct iovec *)iov, desc, iov_count);
if (err)
return err;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down Expand Up @@ -543,6 +563,10 @@ ssize_t efa_rdm_rma_write(struct fid_ep *ep, const void *buf, size_t len, void *
if (err)
return err;

err = efa_rdm_attempt_to_sync_memops(efa_rdm_ep, (void *)buf, desc);
if (err)
return err;

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
Expand Down

0 comments on commit bc6c52c

Please sign in to comment.