Skip to content

Commit

Permalink
prov/efa: Queue txes when handshake is enforced but not made.
Browse files Browse the repository at this point in the history
Introduce efa_rdm_txe_enforce_handshake() function to handle
the handshake triggering and the txe queueing.

Signed-off-by: Shi Jin <sjina@amazon.com>
  • Loading branch information
shijin-aws committed Jun 25, 2024
1 parent 25201fd commit e413982
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 46 deletions.
9 changes: 3 additions & 6 deletions prov/efa/src/rdm/efa_rdm_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ efa_rdm_atomic_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *txe)
{
bool delivery_complete_requested;
int ret;
static int req_pkt_type_list[] = {
[ofi_op_atomic] = EFA_RDM_WRITE_RTA_PKT,
[ofi_op_atomic_fetch] = EFA_RDM_FETCH_RTA_PKT,
Expand All @@ -119,12 +118,10 @@ ssize_t efa_rdm_atomic_post_atomic(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm
* the information whether the peer
* support it or not.
*/
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
ret = efa_rdm_ep_trigger_handshake(efa_rdm_ep, txe->peer);
return ret ? ret : -FI_EAGAIN;
}
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(efa_rdm_ep, txe);

if (!efa_rdm_peer_support_delivery_complete(txe->peer))
if (!(txe->peer->is_self) && !efa_rdm_peer_support_delivery_complete(txe->peer))
return -FI_EOPNOTSUPP;
}

Expand Down
2 changes: 2 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ ssize_t efa_rdm_ep_trigger_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer

ssize_t efa_rdm_ep_post_handshake(struct efa_rdm_ep *ep, struct efa_rdm_peer *peer);

int efa_rdm_ep_enforce_handshake_for_txe(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);

void efa_rdm_ep_post_handshake_or_queue(struct efa_rdm_ep *ep,
struct efa_rdm_peer *peer);

Expand Down
32 changes: 32 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -967,3 +967,35 @@ size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface
return memory_alignment;
}

/**
* @brief Enforce a handshake to made for given txe.
* It will trigger a handshake with peer and choose to
* return EAGAIN or queue the txe.
* @param ep efa_rdm_ep
* @param txe tx entry
* @return int 0 on success, negative integer on failure.
*/
int efa_rdm_ep_enforce_handshake_for_txe(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
{
int ret;

assert(txe->type == EFA_RDM_TXE);
assert(!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED));

ret = efa_rdm_ep_trigger_handshake(ep, txe->peer);
if (ret)
return ret;
/**
* we cannot queue requests (and return 0) for inject,
* which expects the buffer can be reused when the call
* returns success.
*/
if (txe->fi_flags & FI_INJECT)
return -FI_EAGAIN;

if (!(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE)) {
txe->internal_flags |= EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE;
dlist_insert_tail(&txe->queued_entry, &efa_rdm_ep_domain(ep)->ope_queued_list);
}
return FI_SUCCESS;
}
6 changes: 2 additions & 4 deletions prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,8 @@ ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
*
* Check handshake packet from peer to verify support status.
*/
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
return err ? err : -FI_EAGAIN;
}
if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);

if (!efa_rdm_pkt_type_is_supported_by_peer(rtm_type, txe->peer))
return -FI_EOPNOTSUPP;
Expand Down
28 changes: 7 additions & 21 deletions prov/efa/src/rdm/efa_rdm_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,8 @@ ssize_t efa_rdm_rma_post_read(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
* For local read (read from self ep), such handshake is not needed because we only
* need to check the local ep's capabilities.
*/
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
ret = efa_rdm_ep_trigger_handshake(ep, txe->peer);
return ret ? ret : -FI_EAGAIN;
}
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);

if (efa_rdm_interop_rdma_read(ep, txe->peer)) {
/* RDMA read interoperability check also checks domain.use_device_rdma,
Expand Down Expand Up @@ -361,10 +359,8 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
* For local write (writing it self), this handshake is not required because we only need to
* check one-side capability
*/
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
return err ? err : -FI_EAGAIN;
}
if (!(txe->peer->is_self) && !(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return efa_rdm_ep_enforce_handshake_for_txe(ep, txe);

if (efa_rdm_rma_should_write_using_rdma(ep, txe, txe->peer)) {
efa_rdm_ope_prepare_to_post_write(txe);
Expand All @@ -380,21 +376,11 @@ ssize_t efa_rdm_rma_post_write(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
* The sender cannot send with FI_DELIVERY_COMPLETE
* if the peer is not able to handle it.
*
* If the sender does not know whether the peer
* can handle it, it needs to trigger
* a handshake packet from the peer.
*
* The handshake packet contains
* the information whether the peer
* support it or not.
* handshake is already made now since we enforce
* handshake for write earlier.
*/
err = efa_rdm_ep_trigger_handshake(ep, txe->peer);
if (OFI_UNLIKELY(err))
return err;

if (!(txe->peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED))
return -FI_EAGAIN;
else if (!efa_rdm_peer_support_delivery_complete(txe->peer))
if (!(txe->peer->is_self) && !efa_rdm_peer_support_delivery_complete(txe->peer))
return -FI_EOPNOTSUPP;

max_eager_rtw_data_size = efa_rdm_txe_max_req_data_capacity(ep, txe, EFA_RDM_DC_EAGER_RTW_PKT);
Expand Down
8 changes: 1 addition & 7 deletions prov/efa/test/efa_unit_test_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,16 +415,10 @@ void test_rdm_cq_create_error_handling(struct efa_resource **state)
static
int test_efa_rdm_cq_get_ibv_cq_poll_list_length(struct fid_cq *cq_fid)
{
int i = 0;
struct dlist_entry *item;
struct efa_rdm_cq *cq;

cq = container_of(cq_fid, struct efa_rdm_cq, util_cq.cq_fid.fid);
dlist_foreach(&cq->ibv_cq_poll_list, item) {
i++;
}

return i;
return efa_unit_test_get_dlist_length(&cq->ibv_cq_poll_list);
}

/**
Expand Down
168 changes: 162 additions & 6 deletions prov/efa/test/efa_unit_test_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,14 +303,13 @@ void test_efa_rdm_ep_pkt_pool_page_alignment(struct efa_resource **state)
}



/**
* @brief when delivery complete atomic was used and handshake packet has not been received
* verify there is no txe leak
* verify the txe is queued
*
* @param[in] state struct efa_resource that is managed by the framework
*/
void test_efa_rdm_ep_dc_atomic_error_handling(struct efa_resource **state)
void test_efa_rdm_ep_dc_atomic_queue_before_handshake(struct efa_resource **state)
{
struct efa_rdm_ep *efa_rdm_ep;
struct efa_rdm_peer *peer;
Expand All @@ -322,6 +321,7 @@ void test_efa_rdm_ep_dc_atomic_error_handling(struct efa_resource **state)
size_t raw_addr_len = sizeof(struct efa_ep_addr);
fi_addr_t peer_addr;
int buf[1] = {0}, err, numaddr;
struct efa_rdm_ope *txe;

efa_unit_test_resource_construct(resource, FI_EP_RDM);

Expand Down Expand Up @@ -363,11 +363,167 @@ void test_efa_rdm_ep_dc_atomic_error_handling(struct efa_resource **state)
assert_true(dlist_empty(&efa_rdm_ep->txe_list));
err = fi_atomicmsg(resource->ep, &msg, FI_DELIVERY_COMPLETE);
/* DC has been reuquested, but ep do not know whether peer supports it, therefore
* -FI_EAGAIN should be returned
* the ope has been queued to domain->ope_queued_list
*/
assert_int_equal(err, -FI_EAGAIN);
/* make sure there is no leaking of txe */
assert_int_equal(err, 0);
assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);
assert_int_equal(efa_unit_test_get_dlist_length(&(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list)), 1);
txe = container_of(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list.next, struct efa_rdm_ope, queued_entry);
assert_true((txe->op == ofi_op_atomic));
assert_true(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE);
}

/**
* @brief when delivery complete send was used and handshake packet has not been received
* verify the txe is queued
*
* @param[in] state struct efa_resource that is managed by the framework
*/
void test_efa_rdm_ep_dc_send_queue_before_handshake(struct efa_resource **state)
{
struct efa_rdm_ep *efa_rdm_ep;
struct efa_rdm_peer *peer;
struct fi_msg msg = {0};
struct iovec iov;
struct efa_resource *resource = *state;
struct efa_ep_addr raw_addr = {0};
size_t raw_addr_len = sizeof(struct efa_ep_addr);
fi_addr_t peer_addr;
int err, numaddr;
struct efa_rdm_ope *txe;

efa_unit_test_resource_construct(resource, FI_EP_RDM);

/* create a fake peer */
err = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len);
assert_int_equal(err, 0);
raw_addr.qpn = 1;
raw_addr.qkey = 0x1234;
numaddr = fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL);
assert_int_equal(numaddr, 1);

msg.addr = peer_addr;
msg.iov_count = 1;
iov.iov_base = NULL;
iov.iov_len = 0;
msg.msg_iov = &iov;
msg.desc = NULL;

efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);
/* close shm_ep to force efa_rdm_ep to use efa device to send */
if (efa_rdm_ep->shm_ep) {
err = fi_close(&efa_rdm_ep->shm_ep->fid);
assert_int_equal(err, 0);
efa_rdm_ep->shm_ep = NULL;
}
/* set peer->flag to EFA_RDM_PEER_REQ_SENT will make efa_rdm_atomic() think
* a REQ packet has been sent to the peer (so no need to send again)
* handshake has not been received, so we do not know whether the peer support DC
*/
peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr);
peer->flags = EFA_RDM_PEER_REQ_SENT;
peer->is_local = false;

assert_true(dlist_empty(&efa_rdm_ep->txe_list));
err = fi_sendmsg(resource->ep, &msg, FI_DELIVERY_COMPLETE);
/* DC has been reuquested, but ep do not know whether peer supports it, therefore
* the ope has been queued to domain->ope_queued_list
*/
assert_int_equal(err, 0);
assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);
assert_int_equal(efa_unit_test_get_dlist_length(&(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list)), 1);
txe = container_of(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list.next, struct efa_rdm_ope, queued_entry);
assert_true((txe->op == ofi_op_msg));
assert_true(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE);
}


/**
* @brief verify tx entry is queued for rma (read or write) request before handshake is made.
*
* @param[in] state struct efa_resource that is managed by the framework
* @param[in] op op code
*/
void test_efa_rdm_ep_rma_queue_before_handshake(struct efa_resource **state, int op)
{
struct efa_resource *resource = *state;
struct efa_rdm_ep *efa_rdm_ep;
struct efa_ep_addr raw_addr = {0};
size_t raw_addr_len = sizeof(struct efa_ep_addr);
fi_addr_t peer_addr;
int num_addr;
const int buf_len = 8;
char buf[8] = {0};
int err;
uint64_t rma_addr, rma_key;
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;

resource->hints = efa_unit_test_alloc_hints(FI_EP_RDM);
resource->hints->caps |= FI_MSG | FI_TAGGED | FI_RMA;
resource->hints->domain_attr->mr_mode = FI_MR_BASIC;
efa_unit_test_resource_construct_with_hints(resource, FI_EP_RDM, resource->hints, true, true);

/* ensure we don't have RMA capability. */
efa_rdm_ep = container_of(resource->ep, struct efa_rdm_ep, base_ep.util_ep.ep_fid);

/* create a fake peer */
err = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len);
assert_int_equal(err, 0);
raw_addr.qpn = 1;
raw_addr.qkey = 0x1234;
num_addr = fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL);
assert_int_equal(num_addr, 1);

/* create a fake rma_key and address. fi_read should return before
* they are needed. */
rma_key = 0x1234;
rma_addr = (uint64_t) &buf;

/* set peer->flag to EFA_RDM_PEER_REQ_SENT will make efa_rdm_atomic() think
* a REQ packet has been sent to the peer (so no need to send again)
* handshake has not been received, so we do not know whether the peer support DC
*/
peer = efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr);
peer->flags = EFA_RDM_PEER_REQ_SENT;
peer->is_local = false;

assert_true(dlist_empty(&efa_rdm_ep->txe_list));

if (op == ofi_op_read_req) {
err = fi_read(resource->ep, buf, buf_len,
NULL, /* desc, not required */
peer_addr,
rma_addr,
rma_key,
NULL); /* context */
} else if (op == ofi_op_write) {
err = fi_write(resource->ep, buf, buf_len,
NULL, /* desc, not required */
peer_addr,
rma_addr,
rma_key,
NULL); /* context */
} else {
fprintf(stderr, "Unknown op code %d\n", op);
fail();
}
assert_int_equal(err, 0);
assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);
assert_int_equal(efa_unit_test_get_dlist_length(&(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list)), 1);
txe = container_of(efa_rdm_ep_domain(efa_rdm_ep)->ope_queued_list.next, struct efa_rdm_ope, queued_entry);
assert_true((txe->op == op));
assert_true(txe->internal_flags & EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE);
}

void test_efa_rdm_ep_write_queue_before_handshake(struct efa_resource **state)
{
test_efa_rdm_ep_rma_queue_before_handshake(state, ofi_op_write);
}

void test_efa_rdm_ep_read_queue_before_handshake(struct efa_resource **state)
{
test_efa_rdm_ep_rma_queue_before_handshake(state, ofi_op_read_req);
}

/**
Expand Down
5 changes: 4 additions & 1 deletion prov/efa/test/efa_unit_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ int main(void)
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_enable_qp_in_order_aligned_128_bytes_bad, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_pkt_pool_flags, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_pkt_pool_page_alignment, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_dc_atomic_error_handling, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_dc_atomic_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_dc_send_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_read_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_write_queue_before_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_send_with_shm_no_copy, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_rma_without_caps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_ep_atomic_without_caps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
Expand Down
18 changes: 17 additions & 1 deletion prov/efa/test/efa_unit_tests.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ void test_efa_rdm_ep_getopt_undersized_optlen();
void test_efa_rdm_ep_getopt_oversized_optlen();
void test_efa_rdm_ep_pkt_pool_flags();
void test_efa_rdm_ep_pkt_pool_page_alignment();
void test_efa_rdm_ep_dc_atomic_error_handling();
void test_efa_rdm_ep_dc_atomic_queue_before_handshake();
void test_efa_rdm_ep_dc_send_queue_before_handshake();
void test_efa_rdm_ep_write_queue_before_handshake();
void test_efa_rdm_ep_read_queue_before_handshake();
void test_efa_rdm_ep_send_with_shm_no_copy();
void test_efa_rdm_ep_rma_without_caps();
void test_efa_rdm_ep_atomic_without_caps();
Expand Down Expand Up @@ -175,4 +178,17 @@ void test_efa_rdm_cq_ibv_cq_poll_list_same_tx_rx_cq_single_ep();
void test_efa_rdm_cq_ibv_cq_poll_list_separate_tx_rx_cq_single_ep();
void test_efa_rdm_cntr_ibv_cq_poll_list_same_tx_rx_cq_single_ep();
void test_efa_rdm_cntr_ibv_cq_poll_list_separate_tx_rx_cq_single_ep();

static inline
int efa_unit_test_get_dlist_length(struct dlist_entry *head)
{
int i = 0;
struct dlist_entry *item;

dlist_foreach(head, item) {
i++;
}

return i;
}
#endif

0 comments on commit e413982

Please sign in to comment.