Skip to content

Commit

Permalink
prov/efa: Make the inflight read msg per domain
Browse files Browse the repository at this point in the history
Make the inflight read msg counter per domain rather than per peer.

This counter is used to prevent using runting read when EFA is busy
with a read, since in that case runting read would be less performant
than a read.

Since any ongoing read, regardless of peer, makes EFA busy the counter
should be domain scoped and not peer scoped.

Signed-off-by: Yonatan Goldhirsh <ygold@amazon.com>
Signed-off-by: Shi Jin <sjina@amazon.com>w
  • Loading branch information
ygoldamzn authored and shijin-aws committed Mar 20, 2024
1 parent 17e1fd0 commit ba53be8
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 21 deletions.
1 change: 1 addition & 0 deletions prov/efa/src/efa_domain.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ static int efa_domain_init_rdm(struct efa_domain *efa_domain, struct fi_info *in
efa_domain->addrlen = (info->src_addr) ? info->src_addrlen : info->dest_addrlen;
efa_domain->rdm_cq_size = MAX(info->rx_attr->size + info->tx_attr->size,
efa_env.cq_size);
efa_domain->num_read_msg_in_flight = 0;
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/efa_domain.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct efa_domain {
size_t rdm_cq_size;
struct dlist_entry list_entry; /* linked to g_efa_domain_list */
struct ofi_genlock srx_lock; /* shared among peer providers */
uint64_t num_read_msg_in_flight;
};

extern struct dlist_entry g_efa_domain_list;
Expand Down
8 changes: 6 additions & 2 deletions prov/efa/src/rdm/efa_rdm_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ void efa_rdm_peer_construct(struct efa_rdm_peer *peer, struct efa_rdm_ep *ep, st
peer->efa_fiaddr = conn->fi_addr;
peer->is_self = efa_is_same_addr(&ep->base_ep.src_addr, conn->ep_addr);
peer->host_id = peer->is_self ? ep->host_id : 0; /* Peer host id is exchanged via handshake */
peer->num_read_msg_in_flight = 0;
peer->num_runt_bytes_in_flight = 0;
ofi_recvwin_buf_alloc(&peer->robuf, efa_env.recvwin_size);
dlist_init(&peer->outstanding_tx_pkts);
Expand Down Expand Up @@ -270,9 +269,14 @@ int efa_rdm_peer_select_readbase_rtm(struct efa_rdm_peer *peer,
struct efa_rdm_ep *ep, struct efa_rdm_ope *ope)
{
int op = ope->op;
struct efa_domain *domain;

assert(op == ofi_op_tagged || op == ofi_op_msg);
if (peer->num_read_msg_in_flight == 0 &&

domain = efa_rdm_ep_domain(ep);
assert(domain);

if (domain->num_read_msg_in_flight == 0 &&
efa_rdm_peer_get_runt_size(peer, ep, ope) > 0 &&
!(ope->fi_flags & FI_DELIVERY_COMPLETE)) {
return (op == ofi_op_tagged) ? EFA_RDM_RUNTREAD_TAGRTM_PKT
Expand Down
5 changes: 0 additions & 5 deletions prov/efa/src/rdm/efa_rdm_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ struct efa_rdm_peer {
* @details this value is capped by efa_env.efa_runt_size
*/
int64_t num_runt_bytes_in_flight;

/**
* @brief number of messages that are using read based protocol
*/
int64_t num_read_msg_in_flight;
};

/**
Expand Down
16 changes: 8 additions & 8 deletions prov/efa/src/rdm/efa_rdm_pke_nonreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -646,11 +646,11 @@ void efa_rdm_pke_handle_eor_recv(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_eor_hdr *eor_hdr;
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
struct efa_domain *domain;

peer = efa_rdm_ep_get_peer(pkt_entry->ep, pkt_entry->addr);
assert(peer);
peer->num_read_msg_in_flight -= 1;
domain = efa_rdm_ep_domain(pkt_entry->ep);
assert(domain);
domain->num_read_msg_in_flight -= 1;

eor_hdr = (struct efa_rdm_eor_hdr *)pkt_entry->wiredata;

Expand All @@ -674,11 +674,11 @@ void efa_rdm_pke_handle_read_nack_recv(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_read_nack_hdr *nack_hdr;
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
struct efa_domain *domain;

peer = efa_rdm_ep_get_peer(pkt_entry->ep, pkt_entry->addr);
assert(peer);
peer->num_read_msg_in_flight -= 1;
domain = efa_rdm_ep_domain(pkt_entry->ep);
assert(domain);
domain->num_read_msg_in_flight -= 1;

nack_hdr = (struct efa_rdm_read_nack_hdr *) pkt_entry->wiredata;

Expand Down
16 changes: 10 additions & 6 deletions prov/efa/src/rdm/efa_rdm_pke_rtm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1172,11 +1172,11 @@ ssize_t efa_rdm_pke_init_longread_tagrtm(struct efa_rdm_pke *pkt_entry,
*/
void efa_rdm_pke_handle_longread_rtm_sent(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_peer *peer;
struct efa_domain *domain;

peer = efa_rdm_ep_get_peer(pkt_entry->ep, pkt_entry->addr);
assert(peer);
peer->num_read_msg_in_flight += 1;
domain = efa_rdm_ep_domain(pkt_entry->ep);
assert(domain);
domain->num_read_msg_in_flight += 1;
}

/**
Expand Down Expand Up @@ -1344,20 +1344,24 @@ void efa_rdm_pke_handle_runtread_rtm_sent(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_ep *ep;
struct efa_rdm_ope *txe;
struct efa_domain *domain;
struct efa_rdm_peer *peer;
size_t pkt_data_size = pkt_entry->payload_size;

ep = pkt_entry->ep;
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
assert(peer);
domain = efa_rdm_ep_domain(pkt_entry->ep);
assert(domain);

txe = pkt_entry->ope;
txe->bytes_sent += pkt_data_size;
peer->num_runt_bytes_in_flight += pkt_data_size;

if (efa_rdm_pke_get_runtread_rtm_base_hdr(pkt_entry)->seg_offset == 0 &&
txe->total_len > txe->bytes_runt)
peer->num_read_msg_in_flight += 1;
txe->total_len > txe->bytes_runt) {
domain->num_read_msg_in_flight += 1;
}
}

/**
Expand Down

0 comments on commit ba53be8

Please sign in to comment.