prov/efa: Queue txes when handshake is enforced but not made #10115

Merged · 6 commits · Jun 27, 2024
20 changes: 17 additions & 3 deletions prov/efa/src/efa_domain.c
@@ -436,7 +436,6 @@ efa_domain_ops_open(struct fid *fid, const char *ops_name, uint64_t flags,
return ret;
}


void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
{
struct efa_rdm_peer *peer;
@@ -494,6 +493,21 @@ void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
if (peer && (peer->flags & EFA_RDM_PEER_IN_BACKOFF))
continue;

if (ope->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE) {
ret = efa_rdm_ope_repost_ope_queued_before_handshake(ope);
if (ret == -FI_EAGAIN)
continue;

if (OFI_UNLIKELY(ret)) {
assert(ope->type == EFA_RDM_TXE);
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
continue;
}

dlist_remove(&ope->queued_entry);
ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE;
}

if (ope->internal_flags & EFA_RDM_OPE_QUEUED_RNR) {
assert(!dlist_empty(&ope->queued_pkts));
ret = efa_rdm_ep_post_queued_pkts(ope->ep, &ope->queued_pkts);
@@ -552,7 +566,7 @@ void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
else
efa_rdm_rxe_handle_error(ope, -ret, FI_EFA_ERR_READ_POST);
return;
continue;
}

ope->internal_flags &= ~EFA_RDM_OPE_QUEUED_READ;
@@ -597,7 +611,7 @@ void efa_domain_progress_rdm_peers_and_queues(struct efa_domain *domain)
ret = efa_rdm_ope_post_send(ope, EFA_RDM_CTSDATA_PKT);
if (OFI_UNLIKELY(ret)) {
if (ret == -FI_EAGAIN)
break;
continue;

efa_rdm_txe_handle_error(ope, -ret, FI_EFA_ERR_PKT_POST);
continue;
9 changes: 3 additions & 6 deletions prov/efa/src/rdm/efa_rdm_atomic.c
@@ -98,7 +98,6 @@ efa_rdm_atomic_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *txe)
{
bool delivery_complete_requested;
int ret;
static int req_pkt_type_list[] = {
[ofi_op_atomic] = EFA_RDM_WRITE_RTA_PKT,
[ofi_op_atomic_fetch] = EFA_RDM_FETCH_RTA_PKT,
@@ -119,12 +118,10 @@ ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm
* the information whether the peer
* supports it or not.
*/
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
ret = efa_rdm_ep_trigger_handshake(efa_rdm_ep, txe->peer);
return ret ? ret : -FI_EAGAIN;
}
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(efa_rdm_ep, txe);

if (!efa_rdm_peer_support_delivery_complete(txe->peer))
if (!(txe->peer->is_self) && !efa_rdm_peer_support_delivery_complete(txe->peer))
return -FI_EOPNOTSUPP;
}

2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_atomic.h
@@ -12,6 +12,8 @@ int efa_rdm_atomic_query(struct fid_domain *domain,
enum fi_datatype datatype, enum fi_op op,
struct fi_atomic_attr *attr, uint64_t flags);

ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *txe);

extern struct fi_ops_atomic efa_rdm_atomic_ops;

#endif
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
@@ -397,6 +397,8 @@ ssize_t efa_rdm_ep_trigger_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer

ssize_t efa_rdm_ep_post_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer *peer);

int efa_rdm_ep_enforce_handshake_for_txe(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);

void efa_rdm_ep_post_handshake_or_queue(struct efa_rdm_ep *ep,
struct efa_rdm_peer *peer);

12 changes: 3 additions & 9 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
@@ -804,22 +804,16 @@ bool efa_rdm_ep_has_unfinished_send(struct efa_rdm_ep *efa_rdm_ep)
{
struct dlist_entry *entry, *tmp;
struct efa_rdm_ope *ope;
/* Only flush the opes queued due to rnr and ctrl */
uint64_t queued_ope_flags = EFA_RDM_OPE_QUEUED_CTRL | EFA_RDM_OPE_QUEUED_RNR;

if (efa_rdm_ep->efa_outstanding_tx_ops > 0)
return true;

dlist_foreach_safe(&efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list, entry, tmp) {
ope = container_of(entry, struct efa_rdm_ope,
queued_entry);
if (ope->ep == efa_rdm_ep) {
return true;
}
}

dlist_foreach_safe(&efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list, entry, tmp) {
ope = container_of(entry, struct efa_rdm_ope,
queued_entry);
if (ope->ep == efa_rdm_ep) {
if (ope->ep == efa_rdm_ep && (ope->internal_flags & queued_ope_flags)) {
return true;
}
}
32 changes: 32 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
@@ -934,3 +934,35 @@ size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface
return memory_alignment;
}

/**
* @brief Enforce that a handshake is made for the given txe.
* It triggers a handshake with the peer and then either
* returns -FI_EAGAIN or queues the txe.
* @param ep efa_rdm_ep
* @param txe tx entry
* @return int 0 on success, negative integer on failure.
*/
int efa_rdm_ep_enforce_handshake_for_txe(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
{
int ret;

assert(txe->type == EFA_RDM_TXE);
assert(!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED));

ret = efa_rdm_ep_trigger_handshake(ep, txe->peer);
if (ret)
return ret;
/**
* We cannot queue requests (and return 0) for inject,
* which expects that the buffer can be reused as soon as
* the call returns successfully.
*/
if (txe->fi_flags & FI_INJECT)
return -FI_EAGAIN;

if (!(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE)) {
txe->internal_flags |= EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE;
dlist_insert_tail(&txe->queued_entry, &efa_rdm_ep_domain(ep)->ope_queued_list);
}
return FI_SUCCESS;
}
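To make the new helper's contract easier to see in isolation, here is a minimal, self-contained sketch. The types, flag values, and error code below are illustrative stand-ins, not the provider's definitions, and the sketch omits the handshake-trigger step that the real function performs first: inject sends cannot be parked because the caller may reuse the buffer immediately, so they get EAGAIN, while other sends are queued once and reported as success.

```c
#include <stdio.h>
#include <stdbool.h>

/* Illustrative stand-ins only; not the provider's real types or values. */
#define F_INJECT           (1u << 0)   /* models FI_INJECT                           */
#define Q_BEFORE_HANDSHAKE (1u << 1)   /* models EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE */
#define ERR_AGAIN          (-11)       /* models -FI_EAGAIN                          */

struct model_txe {
	unsigned fi_flags;
	unsigned internal_flags;
	bool     queued;               /* models membership in ope_queued_list */
};

/*
 * Models the decision in efa_rdm_ep_enforce_handshake_for_txe() after the
 * handshake has been triggered: inject sends cannot be parked (the caller
 * may reuse the buffer as soon as the call returns), so they get EAGAIN;
 * other sends are queued once and the call reports success.
 */
static int enforce_handshake(struct model_txe *txe)
{
	if (txe->fi_flags & F_INJECT)
		return ERR_AGAIN;

	if (!(txe->internal_flags & Q_BEFORE_HANDSHAKE)) {
		txe->internal_flags |= Q_BEFORE_HANDSHAKE;
		txe->queued = true;    /* dlist_insert_tail() in the real code */
	}
	return 0;
}

int main(void)
{
	struct model_txe inject_txe = { F_INJECT, 0, false };
	struct model_txe normal_txe = { 0, 0, false };

	printf("inject: %d\n", enforce_handshake(&inject_txe)); /* -11: retry later   */
	printf("normal: %d\n", enforce_handshake(&normal_txe)); /* 0: parked on queue */
	printf("queued: %d\n", normal_txe.queued);              /* 1                  */
	return 0;
}
```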
9 changes: 3 additions & 6 deletions prov/efa/src/rdm/efa_rdm_msg.c
@@ -125,8 +125,7 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
*/
if ((ep->extra_info[0] & EFA_RDM_EXTRA_FEATURE_REQUEST_USER_RECV_QP) &&
!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
return err ? err : -FI_EAGAIN;
return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);
}

err = efa_rdm_ep_use_p2p(ep, txe->desc[0]);
@@ -148,10 +147,8 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
*
* Check handshake packet from peer to verify support status.
*/
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
return err ? err : -FI_EAGAIN;
}
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);

if (!efa_rdm_pkt_type_is_supported_by_peer(rtm_type, txe->peer))
return -FI_EOPNOTSUPP;
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_msg.h
@@ -31,6 +31,7 @@ struct efa_rdm_ope *efa_rdm_msg_split_rxe(struct efa_rdm_ep *ep,
struct efa_rdm_ope *posted_entry,
struct efa_rdm_ope *consumer_entry,
struct efa_rdm_pke *pkt_entry);
ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);
/*
* The following 2 OP structures are defined in efa_rdm_msg.c and is
* used by #efa_rdm_ep_open()
69 changes: 44 additions & 25 deletions prov/efa/src/rdm/efa_rdm_ope.c
@@ -6,6 +6,7 @@
#include "efa_cntr.h"
#include "efa_rdm_msg.h"
#include "efa_rdm_rma.h"
#include "efa_rdm_atomic.h"
#include "efa_rdm_pke_cmd.h"
#include "efa_rdm_pke_nonreq.h"
#include "efa_rdm_tracepoint.h"
@@ -129,10 +130,7 @@ void efa_rdm_txe_release(struct efa_rdm_ope *txe)
efa_rdm_pke_release_tx(pkt_entry);
}

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)
dlist_remove(&txe->queued_entry);

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
if (txe->internal_flags & EFA_RDM_OPE_QUEUED_FLAGS)
dlist_remove(&txe->queued_entry);

#ifdef ENABLE_EFA_POISONING
@@ -171,16 +169,12 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe)
}
}

if (!dlist_empty(&rxe->queued_pkts)) {
dlist_foreach_container_safe(&rxe->queued_pkts,
struct efa_rdm_pke,
pkt_entry, entry, tmp) {
efa_rdm_pke_release_tx(pkt_entry);
}
dlist_remove(&rxe->queued_entry);
}
dlist_foreach_container_safe(&rxe->queued_pkts,
struct efa_rdm_pke,
pkt_entry, entry, tmp)
efa_rdm_pke_release_tx(pkt_entry);

if (rxe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
if (rxe->internal_flags & EFA_RDM_OPE_QUEUED_FLAGS)
dlist_remove(&rxe->queued_entry);

#ifdef ENABLE_EFA_POISONING
@@ -583,15 +577,12 @@ void efa_rdm_rxe_handle_error(struct efa_rdm_ope *rxe, int err, int prov_errno)
assert(0 && "rxe unknown state");
}

if (rxe->internal_flags & EFA_RDM_OPE_QUEUED_RNR) {
dlist_foreach_container_safe(&rxe->queued_pkts,
struct efa_rdm_pke,
pkt_entry, entry, tmp)
efa_rdm_pke_release_tx(pkt_entry);
dlist_remove(&rxe->queued_entry);
}
dlist_foreach_container_safe(&rxe->queued_pkts,
struct efa_rdm_pke,
pkt_entry, entry, tmp)
efa_rdm_pke_release_tx(pkt_entry);

if (rxe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
if (rxe->internal_flags & EFA_RDM_OPE_QUEUED_FLAGS)
dlist_remove(&rxe->queued_entry);

if (rxe->unexp_pkt) {
@@ -684,10 +675,7 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno)
assert(0 && "txe unknown state");
}

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)
dlist_remove(&txe->queued_entry);

if (txe->internal_flags & EFA_RDM_OPE_QUEUED_CTRL)
if (txe->internal_flags & EFA_RDM_OPE_QUEUED_FLAGS)
dlist_remove(&txe->queued_entry);

dlist_foreach_container_safe(&txe->queued_pkts,
@@ -1832,3 +1820,34 @@ ssize_t efa_rdm_ope_post_send_or_queue(struct efa_rdm_ope *ope, int pkt_type)

return err;
}

/**
* @brief Repost an ope that was queued before the handshake with its peer was made
*
* @param ope efa rdm ope
* @return ssize_t 0 on success, negative integer on failure.
*/
ssize_t efa_rdm_ope_repost_ope_queued_before_handshake(struct efa_rdm_ope *ope)
{
assert(ope->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE);

if (!(ope->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return -FI_EAGAIN;

switch (ope->op) {
case ofi_op_msg: /* fall through */
case ofi_op_tagged:
return efa_rdm_msg_post_rtm(ope->ep, ope);
case ofi_op_write:
return efa_rdm_rma_post_write(ope->ep, ope);
case ofi_op_read_req:
return efa_rdm_rma_post_read(ope->ep, ope);
case ofi_op_atomic: /* fall through */
case ofi_op_atomic_fetch: /* fall through */
case ofi_op_atomic_compare:
return efa_rdm_atomic_post_atomic(ope->ep, ope);
default:
EFA_WARN(FI_LOG_EP_DATA, "Unknown operation type: %d\n", ope->op);
return -FI_EINVAL;
}
}
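Below is a minimal, self-contained model of how the progress engine consumes entries carrying EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE. The names, flag value, and error code are hypothetical simplifications, not the provider's real types: before the handshake arrives, the repost returns EAGAIN and the entry stays queued; once it arrives, the entry is re-driven through the normal post path and, on success, dequeued with its flag cleared.

```c
#include <stdio.h>

/* Illustrative values only; not the provider's real flags or errno codes. */
#define Q_BEFORE_HANDSHAKE (1u << 1)
#define ERR_AGAIN          (-11)

struct model_ope {
	unsigned internal_flags;
	int      handshake_received;   /* models EFA_RDM_PEER_HANDSHAKE_RECEIVED */
	int      queued;               /* models membership in ope_queued_list   */
};

/*
 * Models efa_rdm_ope_repost_ope_queued_before_handshake(): nothing can be
 * reposted until the handshake arrives; afterwards the entry is re-driven
 * through the normal post path (msg/rma/atomic, selected by ope->op).
 */
static int repost(struct model_ope *ope)
{
	return ope->handshake_received ? 0 : ERR_AGAIN;
}

/* Models the new branch in efa_domain_progress_rdm_peers_and_queues(). */
static void progress(struct model_ope *ope)
{
	int ret;

	if (!(ope->internal_flags & Q_BEFORE_HANDSHAKE))
		return;

	ret = repost(ope);
	if (ret == ERR_AGAIN)
		return;                /* stays queued for the next progress round */
	if (ret) {
		ope->queued = 0;       /* the real code reports a tx error here    */
		return;
	}
	ope->queued = 0;               /* dlist_remove() in the real code          */
	ope->internal_flags &= ~Q_BEFORE_HANDSHAKE;
}

int main(void)
{
	struct model_ope ope = { Q_BEFORE_HANDSHAKE, 0, 1 };

	progress(&ope);                /* no handshake yet: entry stays queued     */
	ope.handshake_received = 1;
	progress(&ope);                /* handshake arrived: reposted and dequeued */
	printf("queued=%d flags=0x%x\n", ope.queued, ope.internal_flags);
	return 0;
}
```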
13 changes: 13 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ope.h
@@ -270,6 +270,17 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
*/
#define EFA_RDM_OPE_READ_NACK BIT_ULL(13)

/**
* @brief flag to indicate that the ope was queued because a handshake
* has not yet been made with the peer. Because this happens before
* the EFA provider makes any protocol selection, the progress engine
* needs to determine the protocol from the peer status when
* progressing the queued opes.
*/
#define EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE BIT_ULL(14)

#define EFA_RDM_OPE_QUEUED_FLAGS (EFA_RDM_OPE_QUEUED_RNR | EFA_RDM_OPE_QUEUED_CTRL | EFA_RDM_OPE_QUEUED_READ | EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE)

void efa_rdm_ope_try_fill_desc(struct efa_rdm_ope *ope, int mr_iov_start, uint64_t access);

int efa_rdm_txe_prepare_to_be_read(struct efa_rdm_ope *txe,
@@ -318,4 +329,6 @@ ssize_t efa_rdm_ope_post_send_fallback(struct efa_rdm_ope *ope,

ssize_t efa_rdm_ope_post_send_or_queue(struct efa_rdm_ope *ope, int pkt_type);

ssize_t efa_rdm_ope_repost_ope_queued_before_handshake(struct efa_rdm_ope *ope);

#endif
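The combined EFA_RDM_OPE_QUEUED_FLAGS mask is what lets the release and error paths earlier in this PR collapse several per-flag checks into a single test before dlist_remove(). A tiny illustrative check with made-up bit values (for illustration only, not the header's actual values):

```c
#include <assert.h>

/* Made-up bit values for illustration; the real ones live in efa_rdm_ope.h. */
#define Q_RNR              (1u << 0)
#define Q_CTRL             (1u << 1)
#define Q_READ             (1u << 2)
#define Q_BEFORE_HANDSHAKE (1u << 3)
#define Q_ANY (Q_RNR | Q_CTRL | Q_READ | Q_BEFORE_HANDSHAKE)

int main(void)
{
	unsigned internal_flags = Q_BEFORE_HANDSHAKE;

	/* One mask test covers every reason an ope can sit on the queued list,
	 * so release/error paths need a single check before dlist_remove(). */
	assert(internal_flags & Q_ANY);
	return 0;
}
```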
28 changes: 7 additions & 21 deletions prov/efa/src/rdm/efa_rdm_rma.c
@@ -124,10 +124,8 @@ ssize_t efa_rdm_rma_post_read(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
* For local read (read from self ep), such handshake is not needed because we only
* need to check the local ep's capabilities.
*/
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
ret = efa_rdm_ep_trigger_handshake(ep, txe->peer);
return ret ? ret : -FI_EAGAIN;
}
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);

if (efa_rdm_interop_rdma_read(ep, txe->peer)) {
/* RDMA read interoperability check also checks domain.use_device_rdma,
@@ -361,10 +359,8 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
* For local write (writing to self), this handshake is not required because we only need to
* check the one-sided capability
*/
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
return err ? err : -FI_EAGAIN;
}
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);

if (efa_rdm_rma_should_write_using_rdma(ep, txe, txe->peer)) {
efa_rdm_ope_prepare_to_post_write(txe);
@@ -380,21 +376,11 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
* The sender cannot send with FI_DELIVERY_COMPLETE
* if the peer is not able to handle it.
*
* If the sender does not know whether the peer
* can handle it, it needs to trigger
* a handshake packet from the peer.
*
* The handshake packet contains
* the information whether the peer
* support it or not.
* The handshake has already been made at this point because we
* enforce it for write earlier.
*/
err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
if (OFI_UNLIKELY(err))
return err;

if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return -FI_EAGAIN;
else if (!efa_rdm_peer_support_delivery_complete(txe->peer))
if (!(txe->peer->is_self) && !efa_rdm_peer_support_delivery_complete(txe->peer))
return -FI_EOPNOTSUPP;

max_eager_rtw_data_size = efa_rdm_txe_max_req_data_capacity(ep, txe, EFA_RDM_DC_EAGER_RTW_PKT);
4 changes: 4 additions & 0 deletions prov/efa/src/rdm/efa_rdm_rma.h
@@ -23,4 +23,8 @@ efa_rdm_rma_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
uint32_t op,
uint64_t flags);

ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);

ssize_t efa_rdm_rma_post_read(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);

#endif
8 changes: 1 addition & 7 deletions prov/efa/test/efa_unit_test_cq.c
@@ -415,16 +415,10 @@ void test_rdm_cq_create_error_handling(struct efa_resource **state)
static
int test_efa_rdm_cq_get_ibv_cq_poll_list_length(struct fid_cq *cq_fid)
{
int i = 0;
struct dlist_entry *item;
struct efa_rdm_cq *cq;

cq = container_of(cq_fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
dlist_foreach(&cq->ibv_cq_poll_list, item) {
i++;
}

return i;
return efa_unit_test_get_dlist_length(&cq->ibv_cq_poll_list);
}

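The unit-test change above swaps a hand-rolled counting loop for a shared helper. For readers outside the test suite, this is a sketch of what a dlist-length helper like efa_unit_test_get_dlist_length presumably does; the list type and helpers here are simplified stand-ins, not ofi's actual dlist implementation.

```c
#include <stdio.h>

/* Minimal stand-in for ofi's intrusive, circular struct dlist_entry. */
struct dlist_entry {
	struct dlist_entry *next;
	struct dlist_entry *prev;
};

static void dlist_init(struct dlist_entry *head)
{
	head->next = head->prev = head;
}

static void dlist_insert_tail(struct dlist_entry *entry, struct dlist_entry *head)
{
	entry->prev = head->prev;
	entry->next = head;
	head->prev->next = entry;
	head->prev = entry;
}

/* Presumed behavior of a helper like efa_unit_test_get_dlist_length():
 * walk the circular list from head back to head, counting entries. */
static int dlist_length(struct dlist_entry *head)
{
	int n = 0;
	struct dlist_entry *it;

	for (it = head->next; it != head; it = it->next)
		n++;
	return n;
}

int main(void)
{
	struct dlist_entry head, a, b;

	dlist_init(&head);
	dlist_insert_tail(&a, &head);
	dlist_insert_tail(&b, &head);
	printf("%d\n", dlist_length(&head)); /* prints 2 */
	return 0;
}
```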