Skip to content

Commit

Permalink
prov/efa: Do not write cq error for ope from internal operations
Browse files Browse the repository at this point in the history
Some OPEs are created for internal use only, such as the RTW pkt used to trigger a handshake and the handshake pkt itself.
When handling a completion error associated with such op entries, we shouldn't write a cq error; otherwise the application will see
garbage cq err entries that are not associated with any of its operations. We should write an eq error in this case instead.

Signed-off-by: Shi Jin <sjina@amazon.com>
  • Loading branch information
shijin-aws committed Feb 14, 2025
1 parent 66b1bbf commit 40d4532
Show file tree
Hide file tree
Showing 18 changed files with 466 additions and 13 deletions.
1 change: 1 addition & 0 deletions prov/efa/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ prov_efa_test_efa_unit_test_LDFLAGS = $(cmocka_rpath) $(efa_LDFLAGS) $(cmocka_LD
-Wl,--wrap=ofi_copy_from_hmem_iov \
-Wl,--wrap=efa_rdm_pke_read \
-Wl,--wrap=efa_rdm_pke_proc_matched_rtm \
-Wl,--wrap=efa_rdm_ope_post_send \
-Wl,--wrap=efa_device_support_unsolicited_write_recv

if HAVE_EFADV_CQ_EX
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ ssize_t efa_rdm_ep_trigger_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer
*/
txe->fi_flags = EFA_RDM_TXE_NO_COMPLETION | EFA_RDM_TXE_NO_COUNTER;
txe->msg_id = -1;
txe->internal_flags |= EFA_RDM_OPE_INTERNAL;

err = efa_rdm_ope_post_send(txe, EFA_RDM_EAGER_RTW_PKT);

Expand Down Expand Up @@ -559,6 +560,7 @@ ssize_t efa_rdm_ep_post_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer *pe
* reset to desired flags (remove things like FI_DELIVERY_COMPLETE, and FI_COMPLETION)
*/
txe->fi_flags = EFA_RDM_TXE_NO_COMPLETION | EFA_RDM_TXE_NO_COUNTER;
txe->internal_flags |= EFA_RDM_OPE_INTERNAL;

pkt_entry = efa_rdm_pke_alloc(ep, ep->efa_tx_pkt_pool, EFA_RDM_PKE_FROM_EFA_TX_POOL);
if (OFI_UNLIKELY(!pkt_entry)) {
Expand Down
15 changes: 15 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,13 @@ void efa_rdm_rxe_handle_error(struct efa_rdm_ope *rxe, int err, int prov_errno)
*/
//efa_rdm_rxe_release(rxe);

if (rxe->internal_flags & EFA_RDM_OPE_INTERNAL) {
EFA_WARN(FI_LOG_CQ,
"Writing eq error for rxe from internal operations\n");
efa_base_ep_write_eq_error(&ep->base_ep, err, prov_errno);
return;
}

efa_cntr_report_error(&ep->base_ep.util_ep, err_entry.flags);
write_cq_err = ofi_cq_write_error(util_cq, &err_entry);
if (write_cq_err) {
Expand Down Expand Up @@ -731,6 +738,13 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno)
*/
//efa_rdm_txe_release(txe);

if (txe->internal_flags & EFA_RDM_OPE_INTERNAL) {
EFA_WARN(FI_LOG_CQ,
"Writing eq error for txe from internal operations\n");
efa_base_ep_write_eq_error(&ep->base_ep, err, prov_errno);
return;
}

efa_cntr_report_error(&ep->base_ep.util_ep, txe->cq_entry.flags);
write_cq_err = ofi_cq_write_error(util_cq, &err_entry);
if (write_cq_err) {
Expand Down Expand Up @@ -1681,6 +1695,7 @@ int efa_rdm_rxe_post_local_read_or_queue(struct efa_rdm_ope *rxe,
}

txe->local_read_pkt_entry = pkt_entry;
txe->internal_flags |= EFA_RDM_OPE_INTERNAL;
err = efa_rdm_ope_post_remote_read_or_queue(txe);
/* The rx pkts are held until the local read completes */
if (err)
Expand Down
11 changes: 11 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,17 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
*/
#define EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE BIT_ULL(14)

/**
 * @brief flag to indicate that the ope was created
 * for internal operations, so it should not generate
 * any cq entry or err entry.
 *
 * The rxe/txe error handlers check this flag and write
 * an eq error instead of a cq error for such opes.
 *
 * NOTICE: the ope->internal_flags is uint16_t, and this
 * flag uses bit 15, so to introduce more bits for
 * internal flags, the internal_flags needs to be
 * changed to uint32_t or larger.
 */
#define EFA_RDM_OPE_INTERNAL BIT_ULL(15)

#define EFA_RDM_OPE_QUEUED_FLAGS (EFA_RDM_OPE_QUEUED_RNR | EFA_RDM_OPE_QUEUED_CTRL | EFA_RDM_OPE_QUEUED_READ | EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE)

void efa_rdm_ope_try_fill_desc(struct efa_rdm_ope *ope, int mr_iov_start, uint64_t access);
Expand Down
16 changes: 13 additions & 3 deletions prov/efa/src/rdm/efa_rdm_pke_rta.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ struct efa_rdm_ope *efa_rdm_pke_alloc_rta_rxe(struct efa_rdm_pke *pkt_entry, int
return NULL;
}

rxe->internal_flags |= EFA_RDM_OPE_INTERNAL;

if (op == ofi_op_atomic) {
rxe->addr = pkt_entry->addr;
return rxe;
Expand Down Expand Up @@ -301,7 +303,9 @@ int efa_rdm_pke_proc_dc_write_rta(struct efa_rdm_pke *pkt_entry)
EFA_WARN(FI_LOG_CQ,
"Posting of receipt packet failed! err=%s\n",
fi_strerror(err));
efa_rdm_rxe_handle_error(rxe, -err, FI_EFA_ERR_PKT_POST);
efa_base_ep_write_eq_error(&pkt_entry->ep->base_ep, err, FI_EFA_ERR_PKT_POST);
efa_rdm_rxe_release(rxe);
efa_rdm_pke_release_rx(pkt_entry);
return err;
}

Expand Down Expand Up @@ -415,10 +419,16 @@ int efa_rdm_pke_proc_fetch_rta(struct efa_rdm_pke *pkt_entry)
}

err = efa_rdm_ope_post_send_or_queue(rxe, EFA_RDM_ATOMRSP_PKT);
if (OFI_UNLIKELY(err))
efa_rdm_rxe_handle_error(rxe, -err, FI_EFA_ERR_PKT_POST);

efa_rdm_pke_release_rx(pkt_entry);

if (OFI_UNLIKELY(err)) {
EFA_WARN(FI_LOG_CQ, "Posting of atomrsp packet failed! err=%ld\n", err);
efa_base_ep_write_eq_error(&ep->base_ep, err, FI_EFA_ERR_PKT_POST);
efa_rdm_rxe_release(rxe);
return err;
}

return 0;
}

Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_pke_rta.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ ssize_t efa_rdm_pke_init_compare_rta(struct efa_rdm_pke *pkt_entry,

int efa_rdm_pke_proc_compare_rta(struct efa_rdm_pke *pkt_entry);

struct efa_rdm_ope *efa_rdm_pke_alloc_rta_rxe(struct efa_rdm_pke *pkt_entry, int op);

#endif
43 changes: 35 additions & 8 deletions prov/efa/src/rdm/efa_rdm_pke_rtr.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,40 @@ ssize_t efa_rdm_pke_init_longcts_rtr(struct efa_rdm_pke *pkt_entry,
return 0;
}

/**
 * @brief allocate an RX entry for an incoming RTR packet
 *
 * The RX entry is allocated from the endpoint's OP entry pool,
 * populated from the RTR header carried in the packet's wiredata,
 * and marked with EFA_RDM_OPE_INTERNAL.
 *
 * @param[in]	pkt_entry	received RTR packet
 *
 * @return
 * pointer to the newly allocated RX entry.
 * NULL when OP entry pool has been exhausted.
 */
struct efa_rdm_ope *efa_rdm_pke_alloc_rtr_rxe(struct efa_rdm_pke *pkt_entry)
{
	struct efa_rdm_rtr_hdr *hdr;
	struct efa_rdm_ope *ope;

	ope = efa_rdm_ep_alloc_rxe(pkt_entry->ep, pkt_entry->addr, ofi_op_read_rsp);
	if (OFI_UNLIKELY(ope == NULL))
		return NULL;

	/* The rxe is created on behalf of the remote reader, not by a local application call */
	ope->internal_flags |= EFA_RDM_OPE_INTERNAL;

	/* Fields taken from the RTR header */
	hdr = (struct efa_rdm_rtr_hdr *)pkt_entry->wiredata;
	ope->iov_count = hdr->rma_iov_count;
	ope->window = hdr->recv_length;
	ope->tx_id = hdr->recv_id;

	/* Nothing has been received or copied yet */
	ope->bytes_copied = 0;
	ope->bytes_received = 0;
	ope->addr = pkt_entry->addr;

	return ope;
}

/**
* @brief process an incoming RTR packet
*
Expand All @@ -81,7 +115,7 @@ void efa_rdm_pke_handle_rtr_recv(struct efa_rdm_pke *pkt_entry)

ep = pkt_entry->ep;

rxe = efa_rdm_ep_alloc_rxe(ep, pkt_entry->addr, ofi_op_read_rsp);
rxe = efa_rdm_pke_alloc_rtr_rxe(pkt_entry);
if (OFI_UNLIKELY(!rxe)) {
EFA_WARN(FI_LOG_CQ,
"RX entries exhausted.\n");
Expand All @@ -90,14 +124,7 @@ void efa_rdm_pke_handle_rtr_recv(struct efa_rdm_pke *pkt_entry)
return;
}

rxe->addr = pkt_entry->addr;
rxe->bytes_received = 0;
rxe->bytes_copied = 0;

rtr_hdr = (struct efa_rdm_rtr_hdr *)pkt_entry->wiredata;
rxe->tx_id = rtr_hdr->recv_id;
rxe->window = rtr_hdr->recv_length;
rxe->iov_count = rtr_hdr->rma_iov_count;
err = efa_rdm_rma_verified_copy_iov(ep, rtr_hdr->rma_iov, rtr_hdr->rma_iov_count,
FI_REMOTE_READ, rxe->iov, rxe->desc);
if (OFI_UNLIKELY(err)) {
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_pke_rtr.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ ssize_t efa_rdm_pke_init_longcts_rtr(struct efa_rdm_pke *pkt_entry,

void efa_rdm_pke_handle_rtr_recv(struct efa_rdm_pke *pkt_entry);

struct efa_rdm_ope *efa_rdm_pke_alloc_rtr_rxe(struct efa_rdm_pke *pkt_entry);
#endif
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_pke_rtw.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ ssize_t efa_rdm_pke_init_rtw_common(struct efa_rdm_pke *pkt_entry,
* pointer to the newly allocated RX entry.
* NULL when OP entry pool has been exhausted.
*/
static
struct efa_rdm_ope *efa_rdm_pke_alloc_rtw_rxe(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_ope *rxe;
Expand All @@ -79,6 +78,7 @@ struct efa_rdm_ope *efa_rdm_pke_alloc_rtw_rxe(struct efa_rdm_pke *pkt_entry)
rxe->addr = pkt_entry->addr;
rxe->bytes_received = 0;
rxe->bytes_copied = 0;
rxe->internal_flags |= EFA_RDM_OPE_INTERNAL;
return rxe;
}

Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_pke_rtw.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ ssize_t efa_rdm_pke_init_longread_rtw(struct efa_rdm_pke *pkt_entry,

void efa_rdm_pke_handle_longread_rtw_recv(struct efa_rdm_pke *pkt_entry);

struct efa_rdm_ope *efa_rdm_pke_alloc_rtw_rxe(struct efa_rdm_pke *pkt_entry);

#endif
43 changes: 43 additions & 0 deletions prov/efa/test/efa_unit_test_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,46 @@ void efa_unit_test_handshake_pkt_construct(struct efa_rdm_pke *pkt_entry, struct
APPEND_OPT_HANDSHAKE_FIELD(host_id, EFA_RDM_HANDSHAKE_HOST_ID_HDR);
APPEND_OPT_HANDSHAKE_FIELD(device_version, EFA_RDM_HANDSHAKE_DEVICE_VERSION_HDR);
}


/**
 * @brief Allocate a txe that targets a fake peer, for use in unit tests
 *
 * A fake peer address is derived from the endpoint's own address
 * (via fi_getname) with qpn set to 0 and qkey set to 0x1234, and is
 * inserted into the resource's AV. The txe is then allocated against
 * that peer with a zero-initialized fi_msg.
 *
 * Any failure while setting up the fake peer (including a NULL peer
 * lookup) fails the running test through a cmocka assertion.
 *
 * @param[in]	resource	constructed efa_resource (ep and av are used)
 * @param[in]	op		operation type passed through to efa_rdm_ep_alloc_txe
 *
 * @return the value returned by efa_rdm_ep_alloc_txe
 *         (NOTE(review): presumably NULL on pool exhaustion; callers should check)
 */
struct efa_rdm_ope *efa_unit_test_alloc_txe(struct efa_resource *resource, uint32_t op)
{
	fi_addr_t peer_addr = 0;
	struct efa_ep_addr raw_addr = {0};
	size_t raw_addr_len = sizeof(raw_addr);
	struct efa_rdm_peer *peer;
	struct fi_msg msg = {0};
	struct efa_rdm_ep *efa_rdm_ep;

	efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);

	/* Create and register a fake peer */
	assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
	raw_addr.qpn = 0;
	raw_addr.qkey = 0x1234;

	assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1);

	/* Fail the test here rather than handing a NULL peer to the txe allocator */
	peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr);
	assert_non_null(peer);

	return efa_rdm_ep_alloc_txe(efa_rdm_ep, peer, &msg, op, 0, 0);
}

struct efa_rdm_ope *efa_unit_test_alloc_rxe(struct efa_resource *resource, uint32_t op)
{
fi_addr_t peer_addr = 0;
struct efa_ep_addr raw_addr = {0};
size_t raw_addr_len = sizeof(raw_addr);
struct efa_rdm_ep *efa_rdm_ep;

efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);

/* Create and register a fake peer */
assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
raw_addr.qpn = 0;
raw_addr.qkey = 0x1234;

assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1);

return efa_rdm_ep_alloc_rxe(efa_rdm_ep, peer_addr, op);
}
61 changes: 61 additions & 0 deletions prov/efa/test/efa_unit_test_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,67 @@ void test_efa_rdm_ep_read_queue_before_handshake(struct efa_resource **state)
test_efa_rdm_ep_rma_queue_before_handshake(state, ofi_op_read_req);
}

/**
 * @brief Test the efa_rdm_ep_trigger_handshake function
 * with different peer setup and check the txe flags
 *
 * Two cases are covered:
 * - peer flagged HANDSHAKE_RECEIVED | REQ_SENT: expect FI_SUCCESS and no txe allocated
 * - peer flags cleared: expect FI_SUCCESS and exactly one txe carrying
 *   EFA_RDM_TXE_NO_COMPLETION, EFA_RDM_TXE_NO_COUNTER and EFA_RDM_OPE_INTERNAL
 *
 * @param state efa_resource
 */
void test_efa_rdm_ep_trigger_handshake(struct efa_resource **state)
{
struct efa_rdm_ope *txe;
struct efa_rdm_ep *efa_rdm_ep;
struct efa_resource *resource = *state;
struct efa_rdm_peer *peer;
struct efa_ep_addr raw_addr = {0};
size_t raw_addr_len = sizeof(raw_addr);
fi_addr_t peer_addr = 0;

/* Route efa_rdm_ope_post_send to the cmocka-backed mock */
g_efa_unit_test_mocks.efa_rdm_ope_post_send = &efa_mock_efa_rdm_ope_post_send_return_mock;

efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_FABRIC_NAME);

efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);

/* Every call to the mocked post_send returns FI_SUCCESS */
will_return_always(efa_mock_efa_rdm_ope_post_send_return_mock, FI_SUCCESS);

/* Create and register a fake peer */
assert_int_equal(fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
raw_addr.qpn = 0;
raw_addr.qkey = 0x1234;

assert_int_equal(fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL), 1);

peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr);
assert_non_null(peer);

/* No txe should have been allocated yet */
assert_true(dlist_empty(&efa_rdm_ep->txe_list));

/*
 * When the peer is already flagged as HANDSHAKE_RECEIVED and REQ_SENT,
 * the function should be a no-op and no txe is allocated
 */
peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED | EFA_RDM_PEER_REQ_SENT;
assert_int_equal(efa_rdm_ep_trigger_handshake(efa_rdm_ep, peer), FI_SUCCESS);
assert_true(dlist_empty(&efa_rdm_ep->txe_list));

/*
 * Reset the peer flags to 0, now we should expect a txe allocated
 */
peer->flags = 0;
assert_int_equal(efa_rdm_ep_trigger_handshake(efa_rdm_ep, peer), FI_SUCCESS);
assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);

/* The only entry in txe_list is the txe created by the call above */
txe = container_of(efa_rdm_ep->txe_list.next, struct efa_rdm_ope, ep_entry);

/* The internal txe must suppress completions/counters and be marked internal */
assert_true(txe->fi_flags & EFA_RDM_TXE_NO_COMPLETION);
assert_true(txe->fi_flags & EFA_RDM_TXE_NO_COUNTER);
assert_true(txe->internal_flags & EFA_RDM_OPE_INTERNAL);

/* The test releases the txe itself since the mocked send never completes it */
efa_rdm_txe_release(txe);
}

/**
* @brief When local support unsolicited write, but the peer doesn't, fi_writedata
* (use rdma-write with imm) should fail as FI_EINVAL
Expand Down
11 changes: 11 additions & 0 deletions prov/efa/test/efa_unit_test_mocks.c
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ int efa_mock_efa_rdm_pke_read_return_mock(struct efa_rdm_ope *ope)
return mock();
}

/**
 * @brief cmocka-backed stand-in for efa_rdm_ope_post_send
 *
 * Both arguments are ignored. The return value is whatever the test
 * queued for this function with cmocka's will_return*() helpers.
 *
 * @param[in]	ope		unused
 * @param[in]	pkt_type	unused
 *
 * @return the next value queued for this mock
 */
ssize_t efa_mock_efa_rdm_ope_post_send_return_mock(struct efa_rdm_ope *ope, int pkt_type)
{
	ssize_t queued_ret = mock();

	return queued_ret;
}

ssize_t efa_mock_efa_rdm_pke_proc_matched_rtm_no_op(struct efa_rdm_pke *pkt_entry)
{
return FI_SUCCESS;
Expand Down Expand Up @@ -261,6 +266,7 @@ struct efa_unit_test_mocks g_efa_unit_test_mocks = {
.ofi_copy_from_hmem_iov = __real_ofi_copy_from_hmem_iov,
.efa_rdm_pke_read = __real_efa_rdm_pke_read,
.efa_rdm_pke_proc_matched_rtm = __real_efa_rdm_pke_proc_matched_rtm,
.efa_rdm_ope_post_send = __real_efa_rdm_ope_post_send,
.efa_device_support_unsolicited_write_recv = __real_efa_device_support_unsolicited_write_recv,
.ibv_is_fork_initialized = __real_ibv_is_fork_initialized,
#if HAVE_EFADV_QUERY_MR
Expand Down Expand Up @@ -401,6 +407,11 @@ int __wrap_efa_rdm_pke_proc_matched_rtm(struct efa_rdm_pke *pkt_entry)
return g_efa_unit_test_mocks.efa_rdm_pke_proc_matched_rtm(pkt_entry);
}

/**
 * @brief Link-time wrapper (-Wl,--wrap=efa_rdm_ope_post_send) for efa_rdm_ope_post_send
 *
 * Forwards the call to whichever implementation is currently installed in
 * g_efa_unit_test_mocks.efa_rdm_ope_post_send (the real function by default,
 * or a mock selected by a test).
 *
 * The return type is ssize_t to match __real_efa_rdm_ope_post_send and the
 * mock function pointer, so the result is not narrowed to int on the way
 * back to callers that expect ssize_t.
 *
 * @param[in]	ope		operation entry to post
 * @param[in]	pkt_type	packet type to send
 *
 * @return the value returned by the installed implementation
 */
ssize_t __wrap_efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type)
{
	return g_efa_unit_test_mocks.efa_rdm_ope_post_send(ope, pkt_type);
}

bool __wrap_efa_device_support_unsolicited_write_recv(void)
{
return g_efa_unit_test_mocks.efa_device_support_unsolicited_write_recv();
Expand Down
6 changes: 6 additions & 0 deletions prov/efa/test/efa_unit_test_mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ ssize_t __real_efa_rdm_pke_proc_matched_rtm(struct efa_rdm_pke *pkt_entry);

ssize_t efa_mock_efa_rdm_pke_proc_matched_rtm_no_op(struct efa_rdm_pke *pkt_entry);

ssize_t __real_efa_rdm_ope_post_send(struct efa_rdm_ope *ope, int pkt_type);

ssize_t efa_mock_efa_rdm_ope_post_send_return_mock(struct efa_rdm_ope *ope, int pkt_type);

bool efa_mock_efa_device_support_unsolicited_write_recv(void);

int efa_mock_ibv_post_recv(struct ibv_qp *qp, struct ibv_recv_wr *wr,
Expand Down Expand Up @@ -144,6 +148,8 @@ struct efa_unit_test_mocks

ssize_t (*efa_rdm_pke_proc_matched_rtm)(struct efa_rdm_pke *pkt_entry);

ssize_t (*efa_rdm_ope_post_send)(struct efa_rdm_ope *ope, int pkt_type);

bool (*efa_device_support_unsolicited_write_recv)(void);

enum ibv_fork_status (*ibv_is_fork_initialized)(void);
Expand Down
Loading

0 comments on commit 40d4532

Please sign in to comment.