Skip to content

Commit

Permalink
[v1.22.x] prov/efa: Handle recv cancel for zero copy recv
Browse files Browse the repository at this point in the history
Currently, posted receives are not tracked in zero-copy mode,
which breaks various ep operations, including fi_cancel. This patch fixes the
issue by introducing a user_recv_rxe_list that tracks the posted user receives,
and implements the fi_cancel operation for this list.

Signed-off-by: Shi Jin <sjina@amazon.com>
(cherry picked from commit 2cffc27)
  • Loading branch information
shijin-aws committed Jul 24, 2024
1 parent 12b8d30 commit 8258ec3
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 5 deletions.
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ struct efa_rdm_ep {
struct dlist_entry entry;
/* the count of opes queued before handshake is made with their peers */
size_t ope_queued_before_handshake_cnt;
/* user posted rx entry list (for zero copy recv) */
struct dlist_entry user_recv_rxe_list;
};

int efa_rdm_ep_flush_queued_blocking_copy_to_hmem(struct efa_rdm_ep *ep);
Expand Down
58 changes: 57 additions & 1 deletion prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ void efa_rdm_ep_init_linked_lists(struct efa_rdm_ep *ep)
#endif
dlist_init(&ep->rxe_list);
dlist_init(&ep->txe_list);
dlist_init(&ep->user_recv_rxe_list);
}

/**
Expand Down Expand Up @@ -1288,6 +1289,59 @@ static int efa_rdm_ep_ctrl(struct fid *fid, int command, void *arg)
return ret;
}

/**
 * @brief dlist match callback used when cancelling a posted user recv
 *
 * @param item list node embedded in a struct efa_rdm_ope (its "entry" member)
 * @param context operation context to compare against
 * @return int non-zero when the rxe's cq_entry.op_context equals context,
 * 0 otherwise.
 */
static int efa_rdm_ep_cancel_match_recv(struct dlist_entry *item,
					const void *context)
{
	struct efa_rdm_ope *candidate;

	/* recover the owning rxe from its embedded list node */
	candidate = container_of(item, struct efa_rdm_ope, entry);

	return candidate->cq_entry.op_context == context;
}

/**
 * @brief Cancel a recv in ep->user_recv_rxe_list
 *
 * Searches ep->user_recv_rxe_list for the first rxe whose
 * cq_entry.op_context equals context. When one is found it is unlinked
 * from the list, its user rx packet entry is flagged with
 * EFA_RDM_PKE_USER_RECV_CANCEL, the rxe is released, and an FI_ECANCELED
 * error entry is written to the endpoint's rx cq. The lookup and the
 * cancellation are done with the peer srx ctx lock held.
 *
 * @param ep efa_rdm_ep
 * @param context pointer to the context to be cancelled
 * @return ssize_t 0 on success (including the case where no posted recv
 * matches context), otherwise the value returned by ofi_cq_write_error().
 */
static
ssize_t efa_rdm_ep_cancel_zcpy_recv(struct efa_rdm_ep *ep,
				    void *context)
{
	struct dlist_entry *entry;
	struct efa_rdm_ope *rxe;
	struct fi_cq_err_entry err_entry = {0};
	struct util_srx_ctx *srx_ctx;
	int ret = 0;

	srx_ctx = efa_rdm_ep_get_peer_srx_ctx(ep);
	ofi_genlock_lock(srx_ctx->lock);

	/*
	 * dlist_remove_first_match() unlinks the matching entry from
	 * user_recv_rxe_list before returning it, so the entry must not be
	 * passed to dlist_remove() a second time below.
	 */
	entry = dlist_remove_first_match(&ep->user_recv_rxe_list,
					 &efa_rdm_ep_cancel_match_recv,
					 context);
	if (!entry)
		goto unlock; /* nothing to cancel: report success */

	rxe = container_of(entry, struct efa_rdm_ope, entry);
	assert(rxe->user_rx_pkt);

	/*
	 * The packet entry is not released here; it is only marked as
	 * cancelled so that whoever handles it later can tell that the rxe
	 * it points to is gone.
	 */
	rxe->user_rx_pkt->flags |= EFA_RDM_PKE_USER_RECV_CANCEL;

	/* capture what the error entry needs before the rxe is released */
	err_entry.op_context = rxe->cq_entry.op_context;
	err_entry.flags |= rxe->cq_entry.flags;
	err_entry.err = FI_ECANCELED;
	err_entry.prov_errno = -FI_ECANCELED;

	efa_rdm_rxe_release(rxe);
	ret = ofi_cq_write_error(ep->base_ep.util_ep.rx_cq, &err_entry);

unlock:
	ofi_genlock_unlock(srx_ctx->lock);
	return ret;
}

/**
* @brief implement the fi_cancel API
* @param[in] fid_ep EFA RDM endpoint to perform the cancel operation
Expand All @@ -1299,7 +1353,9 @@ ssize_t efa_rdm_ep_cancel(fid_t fid_ep, void *context)
struct efa_rdm_ep *ep;

ep = container_of(fid_ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid.fid);
return ep->peer_srx_ep->ops->cancel(&ep->peer_srx_ep->fid, context);

return (ep->use_zcpy_rx) ? efa_rdm_ep_cancel_zcpy_recv(ep, context) :
ep->peer_srx_ep->ops->cancel(&ep->peer_srx_ep->fid, context);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe

pkt_entry->ope = rxe;
rxe->state = EFA_RDM_RXE_MATCHED;
rxe->user_rx_pkt = pkt_entry;

err = ofi_iov_locate(rxe->iov, rxe->iov_count, ep->msg_prefix_size, &rx_iov_index, &rx_iov_offset);
if (OFI_UNLIKELY(err)) {
Expand All @@ -247,6 +248,7 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe
dlist_insert_tail(&pkt_entry->dbg_entry, &ep->rx_posted_buf_list);
#endif
ep->user_rx_pkts_posted++;
dlist_insert_tail(&rxe->entry, &ep->user_recv_rxe_list);
return 0;
}

Expand Down
6 changes: 4 additions & 2 deletions prov/efa/src/rdm/efa_rdm_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ struct efa_rdm_ope {

struct fi_cq_tagged_entry cq_entry;

/* For txe, entry is linked with tx_pending_list, ope_longcts_send_list in efa_rdm_ep.
* For rxe, entry is linked with ope_longcts_send_list.
/* For txe, entry is linked with ope_longcts_send_list in efa_rdm_ep.
* For rxe, entry is linked with ope_longcts_send_list, user_recv_rxe_list in efa_rdm_ep.
*/
struct dlist_entry entry;

Expand Down Expand Up @@ -154,6 +154,8 @@ struct efa_rdm_ope {
size_t efa_outstanding_tx_ops;

struct efa_rdm_pke *unexp_pkt;
/* the pkt entry posted from user's rx buffer */
struct efa_rdm_pke *user_rx_pkt;
char *atomrsp_data;
enum efa_rdm_cuda_copy_method cuda_copy_method;
/* end of RX related variables */
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_pke.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#define EFA_RDM_PKE_DC_LONGCTS_DATA BIT_ULL(3) /**< this DATA packet entry is used by a delivery complete LONGCTS send/write protocol*/
#define EFA_RDM_PKE_LOCAL_WRITE BIT_ULL(4) /**< this packet entry is used as context of an RDMA Write to self */
#define EFA_RDM_PKE_SEND_TO_USER_RECV_QP BIT_ULL(5) /**< this packet entry is used for posting send to a dedicated QP that doesn't expect any pkt hdrs */
#define EFA_RDM_PKE_USER_RECV_CANCEL BIT_ULL(6) /**< this packet entry refers to a rx buffer that has been canceled by the user */

#define EFA_RDM_PKE_ALIGNMENT 128

Expand Down
11 changes: 10 additions & 1 deletion prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -770,9 +770,17 @@ fi_addr_t efa_rdm_pke_insert_addr(struct efa_rdm_pke *pkt_entry, void *raw_addr)

void efa_rdm_pke_proc_received_no_hdr(struct efa_rdm_pke *pkt_entry, bool has_imm_data, uint32_t imm_data)
{
struct efa_rdm_ope *rxe = pkt_entry->ope;
struct efa_rdm_ope *rxe;

assert(pkt_entry->alloc_type == EFA_RDM_PKE_FROM_USER_RX_POOL);

/* In this case, the rxe has been released and we need to release pkt entry only */
if (pkt_entry->flags & EFA_RDM_PKE_USER_RECV_CANCEL) {
efa_rdm_pke_release_rx(pkt_entry);
return;
}

rxe = pkt_entry->ope;
assert(rxe);

if (has_imm_data) {
Expand All @@ -784,6 +792,7 @@ void efa_rdm_pke_proc_received_no_hdr(struct efa_rdm_pke *pkt_entry, bool has_im
rxe->total_len = pkt_entry->pkt_size;
rxe->cq_entry.len = pkt_entry->pkt_size;

dlist_remove(&rxe->entry);
efa_rdm_rxe_report_completion(rxe);
efa_rdm_rxe_release(rxe);
efa_rdm_pke_release_rx(pkt_entry);
Expand Down
56 changes: 55 additions & 1 deletion prov/efa/test/efa_unit_test_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1017,4 +1017,58 @@ void test_efa_rdm_ep_close_discard_posted_recv(struct efa_resource **state)

/* Reset to NULL to avoid test reaper closing again */
resource->ep = NULL;
}
}

/**
 * @brief Verify fi_cancel on a recv posted while the ep is in zero-copy recv mode
 *
 * Posts one recv, checks it is tracked in user_recv_rxe_list, cancels it
 * through fi_cancel, and then checks that the posted packet entry carries
 * EFA_RDM_PKE_USER_RECV_CANCEL and that the cq reports an FI_ECANCELED error
 * entry for the cancelled context.
 *
 * @param[in] state struct efa_resource that is managed by the framework
 */
void test_efa_rdm_ep_zcpy_recv_cancel(struct efa_resource **state)
{
	struct efa_resource *resource = *state;
	struct efa_rdm_ep *rdm_ep;
	struct efa_rdm_ope *posted_rxe;
	struct efa_rdm_pke *posted_pkt;
	struct efa_unit_test_buff recv_buff;
	struct fi_context cancel_context = {0};
	struct fi_cq_err_entry cq_err_entry = {0};

	resource->hints = efa_unit_test_alloc_hints(FI_EP_RDM);
	assert_non_null(resource->hints);

	resource->hints->caps = FI_MSG;
	resource->hints->tx_attr->msg_order = FI_ORDER_NONE;
	resource->hints->rx_attr->msg_order = FI_ORDER_NONE;

	/* bring the ep up with zero-copy recv enabled */
	test_efa_rdm_ep_use_zcpy_rx_impl(resource, true);

	/* a registered 16 byte buffer to receive into */
	efa_unit_test_buff_construct(&recv_buff, resource, 16);

	assert_int_equal(fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
				 fi_mr_desc(recv_buff.mr), FI_ADDR_UNSPEC,
				 &cancel_context),
			 0);

	rdm_ep = container_of(resource->ep, struct efa_rdm_ep,
			      base_ep.util_ep.ep_fid);

	/* the posted recv must be the only tracked entry */
	assert_int_equal(
		efa_unit_test_get_dlist_length(&rdm_ep->user_recv_rxe_list), 1);

	/* remember the pkt entry now: it is inspected after the cancel */
	posted_rxe = container_of(rdm_ep->user_recv_rxe_list.next,
				  struct efa_rdm_ope, entry);
	posted_pkt = posted_rxe->user_rx_pkt;

	assert_int_equal(fi_cancel(&resource->ep->fid, &cancel_context), 0);

	assert_true(posted_pkt->flags & EFA_RDM_PKE_USER_RECV_CANCEL);

	/* the cancellation is reported through the cq error queue */
	assert_int_equal(fi_cq_read(resource->cq, NULL, 1), -FI_EAVAIL);
	assert_int_equal(fi_cq_readerr(resource->cq, &cq_err_entry, 0), 1);

	assert_int_equal(cq_err_entry.err, FI_ECANCELED);
	assert_int_equal(cq_err_entry.prov_errno, -FI_ECANCELED);
	assert_true(cq_err_entry.op_context == (void *) &cancel_context);

	/**
	 * the buf is still posted to rdma-core, so unregistering mr can
	 * return non-zero. Currently ignore this failure.
	 */
	(void) fi_close(&recv_buff.mr->fid);
	free(recv_buff.buff);
}
1 change: 1 addition & 0 deletions prov/efa/test/efa_unit_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ int main(void)
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_user_zcpy_rx_happy, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_user_zcpy_rx_unhappy_due_to_sas, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_close_discard_posted_recv, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_zcpy_recv_cancel, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_dgram_cq_read_empty_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_ibv_cq_ex_read_empty_cq, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_ibv_cq_ex_read_failed_poll, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
Expand Down
1 change: 1 addition & 0 deletions prov/efa/test/efa_unit_tests.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ void test_efa_rdm_ep_enable_qp_in_order_aligned_128_bytes_bad();
void test_efa_rdm_ep_user_zcpy_rx_happy();
void test_efa_rdm_ep_user_zcpy_rx_unhappy_due_to_sas();
void test_efa_rdm_ep_close_discard_posted_recv();
void test_efa_rdm_ep_zcpy_recv_cancel();
void test_dgram_cq_read_empty_cq();
void test_ibv_cq_ex_read_empty_cq();
void test_ibv_cq_ex_read_failed_poll();
Expand Down

0 comments on commit 8258ec3

Please sign in to comment.