Skip to content

Commit

Permalink
prov/efa: close shm resource when it is disabled in ep
Browse files Browse the repository at this point in the history
When shm is disabled during ep enable, we should shut
down shm resources as they will never be used. This
will reduce unnecessary cost when poking shm ep/cq/av
in the later operations.

Signed-off-by: Shi Jin <sjina@amazon.com>
  • Loading branch information
shijin-aws committed Oct 22, 2023
1 parent 29a73ec commit 5a4357f
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 68 deletions.
8 changes: 4 additions & 4 deletions prov/efa/src/rdm/efa_rdm_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ efa_rdm_atomic_inject(struct fid_ep *ep,
return err;
peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (!(efa_rdm_ep_domain(efa_rdm_ep)->shm_info->domain_attr->mr_mode & FI_MR_VIRT_ADDR))
remote_addr = 0;

Expand Down Expand Up @@ -293,7 +293,7 @@ efa_rdm_atomic_writemsg(struct fid_ep *ep,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
efa_rdm_atomic_init_shm_msg(efa_rdm_ep, &shm_msg, msg, rma_iov, shm_desc);
shm_msg.addr = peer->shm_fiaddr;
return fi_atomicmsg(efa_rdm_ep->shm_ep, &shm_msg, flags);
Expand Down Expand Up @@ -386,7 +386,7 @@ efa_rdm_atomic_readwritemsg(struct fid_ep *ep,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local & efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
efa_rdm_atomic_init_shm_msg(efa_rdm_ep, &shm_msg, msg, shm_rma_iov, shm_desc);
shm_msg.addr = peer->shm_fiaddr;
efa_rdm_get_desc_for_shm(result_count, result_desc, shm_res_desc);
Expand Down Expand Up @@ -498,7 +498,7 @@ efa_rdm_atomic_compwritemsg(struct fid_ep *ep,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
efa_rdm_atomic_init_shm_msg(efa_rdm_ep, &shm_msg, msg, shm_rma_iov, shm_desc);
shm_msg.addr = peer->shm_fiaddr;
efa_rdm_get_desc_for_shm(result_count, result_desc, shm_res_desc);
Expand Down
14 changes: 10 additions & 4 deletions prov/efa/src/rdm/efa_rdm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,19 @@ static ssize_t efa_rdm_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t coun

ofi_genlock_lock(srx_ctx->lock);

if (cq->shm_cq)
if (cq->shm_cq) {
fi_cq_read(cq->shm_cq, NULL, 0);

ret = ofi_cq_read_entries(&cq->util_cq, buf, count, src_addr);
/*
* fi_cq_read(cq->shm_cq, NULL, 0) will progress shm ep and write
* completion to efa. Use ofi_cq_read_entries to get the number of
* shm completions without progressing efa ep again.
*/
ret = ofi_cq_read_entries(&cq->util_cq, buf, count, src_addr);

if (ret > 0)
goto out;
if (ret > 0)
goto out;
}

ret = ofi_cq_readfrom(&cq->util_cq.cq_fid, buf, count, src_addr);

Expand Down
1 change: 0 additions & 1 deletion prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ struct efa_rdm_ep {
enum ibv_cq_ex_type ibv_cq_ex_type;

/* shm provider fid */
bool use_shm_for_tx;
struct fid_ep *shm_ep;

/*
Expand Down
121 changes: 88 additions & 33 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -888,41 +888,97 @@ void efa_rdm_ep_set_extra_info(struct efa_rdm_ep *ep)
}

/**
* @brief set the "use_shm_for_tx" field of efa_rdm_ep
* The field is set based on various factors, including
* environment variables, user hints, user's fi_setopt()
* calls.
* This function should be called during call to fi_enable(),
* after user called fi_setopt().
*
* @param[in,out] ep endpoint to set the field
* @brief Close all shm resources bound to the efa ep and domain.
* This function will do this cleanup as best effort. When there is failure
* to clean up shm resource, it will still move forward by setting the resource
* pointer to NULL so it won't be used later.
*
* @param efa_rdm_ep pointer to efa_rdm_ep.
*/
static
void efa_rdm_ep_set_use_shm_for_tx(struct efa_rdm_ep *ep)
static void efa_rdm_ep_close_shm_resources(struct efa_rdm_ep *efa_rdm_ep)
{
if (!efa_rdm_ep_domain(ep)->shm_domain) {
ep->use_shm_for_tx = false;
return;
int ret;
struct efa_domain *efa_domain;
struct efa_av *efa_av;
struct efa_rdm_cq *efa_rdm_cq;

if (efa_rdm_ep->shm_ep) {
ret = fi_close(&efa_rdm_ep->shm_ep->fid);
if (ret)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close shm ep\n");
efa_rdm_ep->shm_ep = NULL;
}

assert(ep->user_info);
efa_av = efa_rdm_ep->base_ep.av;
if (efa_av->shm_rdm_av) {
ret = fi_close(&efa_av->shm_rdm_av->fid);
if (ret)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close shm av\n");
efa_av->shm_rdm_av = NULL;
}

/* App provided hints supercede environmental variables.
*
* Using the shm provider comes with some overheads, so avoid
* initializing the provider if the app provides a hint that it does not
* require node-local communication. We can still loopback over the EFA
* device in cases where the app violates the hint and continues
* communicating with node-local peers.
*
* aws-ofi-nccl relies on this feature.
efa_rdm_cq = container_of(efa_rdm_ep->base_ep.util_ep.tx_cq, struct efa_rdm_cq, util_cq);
if (efa_rdm_cq->shm_cq) {
ret = fi_close(&efa_rdm_cq->shm_cq->fid);
if (ret)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close shm cq\n");
efa_rdm_cq->shm_cq = NULL;
}

efa_rdm_cq = container_of(efa_rdm_ep->base_ep.util_ep.rx_cq, struct efa_rdm_cq, util_cq);
if (efa_rdm_cq->shm_cq) {
ret = fi_close(&efa_rdm_cq->shm_cq->fid);
if (ret)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close shm cq\n");
efa_rdm_cq->shm_cq = NULL;
}

efa_domain = efa_rdm_ep_domain(efa_rdm_ep);

if (efa_domain->shm_domain) {
ret = fi_close(&efa_domain->shm_domain->fid);
if (ret)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close shm domain\n");
efa_domain->shm_domain = NULL;
}

if (efa_domain->fabric->shm_fabric) {
ret = fi_close(&efa_domain->fabric->shm_fabric->fid);
if (ret)
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close shm fabric\n");
efa_domain->fabric->shm_fabric = NULL;
}

if (efa_domain->shm_info) {
fi_freeinfo(efa_domain->shm_info);
efa_domain->shm_info = NULL;
}
}

/**
* @brief update the shm resources based on the
* the current ep status. When cuda_api_permitted
* is set as false via fi_setopt, shm should be
* shut down. This function must be called inside
* fi_enable which is called after fi_setopt.
*
* @param[in,out] ep efa_rdm_ep
*/
static
void efa_rdm_ep_update_shm(struct efa_rdm_ep *ep)
{
bool use_shm;

/*
* when efa_env.enable_shm_transfer is false
* , shm resources won't be created.
*/
if ((ep->user_info->caps & FI_REMOTE_COMM)
/* but not local communication */
&& !(ep->user_info->caps & FI_LOCAL_COMM)) {
ep->use_shm_for_tx = false;
if (!efa_rdm_ep_domain(ep)->shm_domain)
return;
}

use_shm = true;

assert(ep->user_info);

/*
* shm provider must make cuda calls to transfer cuda memory.
Expand All @@ -935,12 +991,11 @@ void efa_rdm_ep_set_use_shm_for_tx(struct efa_rdm_ep *ep)
if ((ep->user_info->caps & FI_HMEM)
&& hmem_ops[FI_HMEM_CUDA].initialized
&& !ep->cuda_api_permitted) {
ep->use_shm_for_tx = false;
return;
use_shm = false;
}

ep->use_shm_for_tx = efa_env.enable_shm_transfer;
return;
if (!use_shm)
efa_rdm_ep_close_shm_resources(ep);
}


Expand Down Expand Up @@ -987,7 +1042,7 @@ static int efa_rdm_ep_ctrl(struct fid *fid, int command, void *arg)
EFA_WARN(FI_LOG_EP_CTRL, "libfabric %s efa endpoint created! address: %s\n",
fi_tostr("1", FI_TYPE_VERSION), ep_addr_str);

efa_rdm_ep_set_use_shm_for_tx(ep);
efa_rdm_ep_update_shm(ep);

/* Enable shm provider endpoint & post recv buff.
* Once core ep enabled, 18 bytes efa_addr (16 bytes raw + 2 bytes qpn) is set.
Expand Down
24 changes: 12 additions & 12 deletions prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ ssize_t efa_rdm_msg_sendmsg(struct fid_ep *ep, const struct fi_msg *msg,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, msg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
shm_msg = (struct fi_msg *)msg;
if (msg->desc) {
efa_desc = msg->desc;
Expand Down Expand Up @@ -292,7 +292,7 @@ ssize_t efa_rdm_msg_sendv(struct fid_ep *ep, const struct iovec *iov,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(count, desc, shm_desc);
return fi_sendv(efa_rdm_ep->shm_ep, iov, shm_desc, count, peer->shm_fiaddr, context);
Expand Down Expand Up @@ -320,7 +320,7 @@ ssize_t efa_rdm_msg_send(struct fid_ep *ep, const void *buf, size_t len,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(1, &desc, shm_desc);
return fi_send(efa_rdm_ep->shm_ep, buf, len, desc? shm_desc[0] : NULL, peer->shm_fiaddr, context);
Expand Down Expand Up @@ -351,7 +351,7 @@ ssize_t efa_rdm_msg_senddata(struct fid_ep *ep, const void *buf, size_t len,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(1, &desc, shm_desc);
return fi_senddata(efa_rdm_ep->shm_ep, buf, len, desc? shm_desc[0] : NULL, data, peer->shm_fiaddr, context);
Expand Down Expand Up @@ -382,7 +382,7 @@ ssize_t efa_rdm_msg_inject(struct fid_ep *ep, const void *buf, size_t len,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
return fi_inject(efa_rdm_ep->shm_ep, buf, len, peer->shm_fiaddr);
}

Expand Down Expand Up @@ -413,7 +413,7 @@ ssize_t efa_rdm_msg_injectdata(struct fid_ep *ep, const void *buf,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
return fi_injectdata(efa_rdm_ep->shm_ep, buf, len, data, peer->shm_fiaddr);
}

Expand Down Expand Up @@ -451,7 +451,7 @@ ssize_t efa_rdm_msg_tsendmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *

peer = efa_rdm_ep_get_peer(efa_rdm_ep, tmsg->addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
shm_tmsg = (struct fi_msg_tagged *)tmsg;
if (tmsg->desc) {
efa_desc = tmsg->desc;
Expand Down Expand Up @@ -491,7 +491,7 @@ ssize_t efa_rdm_msg_tsendv(struct fid_ep *ep_fid, const struct iovec *iov,
return ret;

assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(count, desc, shm_desc);
return fi_tsendv(efa_rdm_ep->shm_ep, iov, desc? shm_desc : NULL, count, peer->shm_fiaddr, tag, context);
Expand Down Expand Up @@ -526,7 +526,7 @@ ssize_t efa_rdm_msg_tsend(struct fid_ep *ep_fid, const void *buf, size_t len,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(1, &desc, shm_desc);
return fi_tsend(efa_rdm_ep->shm_ep, buf, len, desc? shm_desc[0] : NULL, peer->shm_fiaddr, tag, context);
Expand Down Expand Up @@ -558,7 +558,7 @@ ssize_t efa_rdm_msg_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(1, &desc, shm_desc);
return fi_tsenddata(efa_rdm_ep->shm_ep, buf, len, desc? shm_desc[0] : NULL, data, peer->shm_fiaddr, tag, context);
Expand Down Expand Up @@ -589,7 +589,7 @@ ssize_t efa_rdm_msg_tinject(struct fid_ep *ep_fid, const void *buf, size_t len,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
return fi_tinject(efa_rdm_ep->shm_ep, buf, len, peer->shm_fiaddr, tag);
}

Expand Down Expand Up @@ -619,7 +619,7 @@ ssize_t efa_rdm_msg_tinjectdata(struct fid_ep *ep_fid, const void *buf, size_t l

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
return fi_tinjectdata(efa_rdm_ep->shm_ep, buf, len, data, peer->shm_fiaddr, tag);
}

Expand Down
14 changes: 7 additions & 7 deletions prov/efa/src/rdm/efa_rdm_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ ssize_t efa_rdm_rma_readmsg(struct fid_ep *ep, const struct fi_msg_rma *msg, uin
goto out;
}

if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
tmp_addr = msg->addr;
tmp_desc = msg->desc;
msg_clone = (struct fi_msg_rma *)msg;
Expand Down Expand Up @@ -246,7 +246,7 @@ ssize_t efa_rdm_rma_readv(struct fid_ep *ep, const struct iovec *iov, void **des

peer = efa_rdm_ep_get_peer(efa_rdm_ep, src_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(iov_count, desc, shm_desc);

Expand Down Expand Up @@ -290,7 +290,7 @@ ssize_t efa_rdm_rma_read(struct fid_ep *ep, void *buf, size_t len, void *desc,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, src_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(1, &desc, shm_desc);

Expand Down Expand Up @@ -468,7 +468,7 @@ ssize_t efa_rdm_rma_writemsg(struct fid_ep *ep,
goto out;
}

if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
tmp_addr = msg->addr;
tmp_desc = msg->desc;
msg_clone = (struct fi_msg_rma *)msg;
Expand Down Expand Up @@ -526,7 +526,7 @@ ssize_t efa_rdm_rma_writev(struct fid_ep *ep, const struct iovec *iov, void **de

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(iov_count, desc, shm_desc);
return fi_writev(efa_rdm_ep->shm_ep, iov, desc? shm_desc : NULL, iov_count, peer->shm_fiaddr, addr, key, context);
Expand Down Expand Up @@ -569,7 +569,7 @@ ssize_t efa_rdm_rma_write(struct fid_ep *ep, const void *buf, size_t len, void *

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(1, &desc, shm_desc);
return fi_write(efa_rdm_ep->shm_ep, buf, len, desc? shm_desc[0] : NULL, peer->shm_fiaddr, addr, key, context);
Expand Down Expand Up @@ -599,7 +599,7 @@ ssize_t efa_rdm_rma_writedata(struct fid_ep *ep, const void *buf, size_t len,

peer = efa_rdm_ep_get_peer(efa_rdm_ep, dest_addr);
assert(peer);
if (peer->is_local && efa_rdm_ep->use_shm_for_tx) {
if (peer->is_local && efa_rdm_ep->shm_ep) {
if (desc)
efa_rdm_get_desc_for_shm(1, &desc, shm_desc);
return fi_writedata(efa_rdm_ep->shm_ep, buf, len, desc? shm_desc[0] : NULL, data, peer->shm_fiaddr, addr, key, context);
Expand Down
Loading

0 comments on commit 5a4357f

Please sign in to comment.