diff --git a/include/rdma/providers/fi_peer.h b/include/rdma/providers/fi_peer.h index 67a8f58f7ad..093290d1b4c 100644 --- a/include/rdma/providers/fi_peer.h +++ b/include/rdma/providers/fi_peer.h @@ -167,6 +167,7 @@ struct fi_peer_rx_entry { fi_addr_t addr; size_t size; uint64_t tag; + uint64_t cq_data; uint64_t flags; void *context; size_t count; @@ -181,7 +182,7 @@ struct fi_ops_srx_owner { int (*get_msg)(struct fid_peer_srx *srx, fi_addr_t addr, size_t size, struct fi_peer_rx_entry **entry); int (*get_tag)(struct fid_peer_srx *srx, fi_addr_t addr, - size_t size, uint64_t tag, struct fi_peer_rx_entry **entry); + uint64_t tag, struct fi_peer_rx_entry **entry); int (*queue_msg)(struct fi_peer_rx_entry *entry); int (*queue_tag)(struct fi_peer_rx_entry *entry); void (*foreach_unspec_addr)(struct fid_peer_srx *srx, diff --git a/man/fi_peer.3.md b/man/fi_peer.3.md index 42bf16b3399..bf84ba28d56 100644 --- a/man/fi_peer.3.md +++ b/man/fi_peer.3.md @@ -418,6 +418,7 @@ struct fi_peer_rx_entry { fi_addr_t addr; size_t size; uint64_t tag; + uint64_t cq_data; uint64_t flags; void *context; size_t count; @@ -482,10 +483,10 @@ the same lock, if needed. fi_peer_rx_entry defines a common receive entry for use between the owner and peer. The entry is allocated and set by the owner and passed between owner and peer to communicate details of the application-posted receive entry. All fields -are only modifiable by the owner, except for the peer_context which is provided -for the peer to use to save peer-specific information for unexpected message -processing. Similarly, the owner_context can be used by the owner_context as -needed for storing extra owner-specific information. +are initialized by the owner, except in the unexpected message case where the +peer can initialize any extra available data before queuing the message with +the owner. 
The peer_context and owner_context fields are only modifiable by the +peer and owner, respectively, to store extra provider-specific information. ## fi_ops_srx_owner::get_msg_entry() / get_tag_entry() @@ -493,21 +494,21 @@ These calls are invoked by the peer provider to obtain the receive buffer(s) where an incoming message should be placed. The peer provider will pass in the relevant fields to request a matching rx_entry from the owner. If source addressing is required, the addr will be passed in; otherwise, the address will -be set to FI_ADDR_NOT_AVAIL. The size field indicates the received message size. -For non-tagged message, this field is used by the owner when handling multi-received -data buffers, but may be ignored otherwise. For tagged message, this field is used -by the owner when handling trecvmsg with FI_PEEK bit set in flags, -which requires the owner to write the size of the unexpected tagged message as part -of the CQ entry. The peer provider is responsible for checking that an incoming -message fits within the provided buffer space. The tag parameter is used for tagged -messages. An fi_peer_rx_entry is allocated by the owner, whether or not a match was +be set to FI_ADDR_NOT_AVAIL. The size parameter, taken only by get_msg(), is needed by the owner for +adjusting FI_MULTI_RECV entries. The peer provider is responsible for checking +that an incoming message fits within the provided buffer space. +An fi_peer_rx_entry is allocated by the owner, whether or not a match was found. If a match was found, the owner will return FI_SUCCESS and the rx_entry will -be filled in with the appropriate receive fields for the peer to process accordingly. +be filled in with the known receive fields for the peer to process accordingly. +This includes the information that was passed into the calls as well as the +rx_entry->flags with either FI_MSG | FI_RECV (for get_msg()) or FI_TAGGED | FI_RECV +(for get_tag()). The peer provider is responsible for completing with any other +flags, if needed. 
If no match was found, the owner will return -FI_ENOENT; the rx_entry will still be valid but will not match to an existing posted receive. When the peer gets FI_ENOENT, it should allocate whatever resources it needs to process the message later (on start_msg/tag) and set the rx_entry->peer_context appropriately, followed by a -call to the owner's queue_msg/tag. The get and queue messages should be serialized. +call to the owner's queue_msg/tag. The get and queue calls should be serialized. When the owner gets a matching receive for the queued unexpected message, it will call the peer's start function to notify the peer of the updated rx_entry (or the peer's discard function if the message is to be discarded) diff --git a/prov/efa/src/rdm/efa_rdm_msg.c b/prov/efa/src/rdm/efa_rdm_msg.c index 1558d59ccb7..413bbffd4a3 100644 --- a/prov/efa/src/rdm/efa_rdm_msg.c +++ b/prov/efa/src/rdm/efa_rdm_msg.c @@ -851,15 +851,12 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_tagrtm(struct efa_rdm_ep *ep, struct fid_peer_srx *peer_srx; struct fi_peer_rx_entry *peer_rxe; struct efa_rdm_ope *rxe; - size_t data_size; int ret; int pkt_type; peer_srx = util_get_peer_srx(ep->peer_srx_ep); - data_size = efa_rdm_pke_get_rtm_msg_length(*pkt_entry_ptr); ret = peer_srx->owner_ops->get_tag(peer_srx, (*pkt_entry_ptr)->addr, - data_size, efa_rdm_pke_get_rtm_tag(*pkt_entry_ptr), &peer_rxe); @@ -882,6 +879,14 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_tagrtm(struct efa_rdm_ep *ep, return NULL; } (*pkt_entry_ptr)->ope = rxe; + + peer_rxe->size = efa_rdm_pke_get_rtm_msg_length(*pkt_entry_ptr); + if (efa_rdm_pke_get_base_hdr(*pkt_entry_ptr)->flags & + EFA_RDM_REQ_OPT_CQ_DATA_HDR) { + peer_rxe->flags |= FI_REMOTE_CQ_DATA; + peer_rxe->cq_data = efa_rdm_pke_get_req_cq_data(*pkt_entry_ptr); + } + peer_rxe->peer_context = *pkt_entry_ptr; rxe->peer_rxe = peer_rxe; efa_rdm_tracepoint(msg_recv_unexpected_tagged, rxe->msg_id, diff --git a/prov/shm/src/smr_progress.c b/prov/shm/src/smr_progress.c index 
89fc7ce14f3..837cde2f53d 100644 --- a/prov/shm/src/smr_progress.c +++ b/prov/shm/src/smr_progress.c @@ -911,6 +911,12 @@ static int smr_alloc_cmd_ctx(struct smr_ep *ep, memcpy(&cmd_ctx->cmd, cmd, sizeof(*cmd)); cmd_ctx->ep = ep; + rx_entry->size = cmd->msg.hdr.size; + if (cmd->msg.hdr.op_flags & SMR_REMOTE_CQ_DATA) { + rx_entry->flags |= FI_REMOTE_CQ_DATA; + rx_entry->cq_data = cmd->msg.hdr.data; + } + if (cmd->msg.hdr.op_src == smr_src_inject) { buf = ofi_buf_alloc(ep->unexp_buf_pool); if (!buf) { @@ -955,7 +961,7 @@ static int smr_progress_cmd_msg(struct smr_ep *ep, struct smr_cmd *cmd) addr = ep->region->map->peers[cmd->msg.hdr.id].fiaddr; if (cmd->msg.hdr.op == ofi_op_tagged) { ret = peer_srx->owner_ops->get_tag(peer_srx, addr, - cmd->msg.hdr.size, cmd->msg.hdr.tag, &rx_entry); + cmd->msg.hdr.tag, &rx_entry); if (ret == -FI_ENOENT) { ret = smr_alloc_cmd_ctx(ep, rx_entry, cmd); if (ret) { diff --git a/prov/sm2/src/sm2_progress.c b/prov/sm2/src/sm2_progress.c index 0bc5e3ca183..329182fea47 100644 --- a/prov/sm2/src/sm2_progress.c +++ b/prov/sm2/src/sm2_progress.c @@ -147,6 +147,10 @@ static int sm2_alloc_xfer_entry_ctx(struct sm2_ep *ep, memcpy(&xfer_ctx->xfer_entry, xfer_entry, sizeof(*xfer_entry)); xfer_ctx->ep = ep; + rx_entry->size = xfer_entry->hdr.size; + rx_entry->flags |= xfer_entry->hdr.op_flags & FI_REMOTE_CQ_DATA; + rx_entry->cq_data = xfer_entry->hdr.cq_data; + rx_entry->peer_context = xfer_ctx; return FI_SUCCESS; @@ -166,8 +170,7 @@ static int sm2_progress_recv_msg(struct sm2_ep *ep, if (xfer_entry->hdr.op == ofi_op_tagged) { ret = peer_srx->owner_ops->get_tag( - peer_srx, addr, xfer_entry->hdr.size, - xfer_entry->hdr.tag, &rx_entry); + peer_srx, addr, xfer_entry->hdr.tag, &rx_entry); if (ret == -FI_ENOENT) { ret = sm2_alloc_xfer_entry_ctx(ep, rx_entry, xfer_entry); diff --git a/prov/util/src/util_srx.c b/prov/util/src/util_srx.c index 31317332773..4a9ab4057cb 100644 --- a/prov/util/src/util_srx.c +++ b/prov/util/src/util_srx.c @@ -94,7 +94,8 @@ 
static struct util_rx_entry *util_get_recv_entry(struct util_srx_ctx *srx, } static struct util_rx_entry *util_init_unexp(struct util_srx_ctx *srx, - fi_addr_t addr, uint64_t size, uint64_t tag) + fi_addr_t addr, uint64_t size, uint64_t tag, + uint64_t flags) { struct util_rx_entry *util_entry; @@ -106,6 +107,7 @@ static struct util_rx_entry *util_init_unexp(struct util_srx_ctx *srx, util_entry->peer_entry.size = size; util_entry->peer_entry.addr = addr; util_entry->peer_entry.tag = tag; + util_entry->peer_entry.flags = flags; return util_entry; } @@ -163,7 +165,8 @@ static int util_match_msg(struct fid_peer_srx *srx, fi_addr_t addr, size_t size, srx_ctx = srx->ep_fid.fid.context; if (slist_empty(&srx_ctx->msg_queue)) { - util_entry = util_init_unexp(srx_ctx, addr, size, 0); + util_entry = util_init_unexp(srx_ctx, addr, size, 0, + FI_MSG | FI_RECV); if (!util_entry) return -FI_ENOMEM; util_entry->peer_entry.srx = srx; @@ -236,8 +239,7 @@ static int util_get_msg(struct fid_peer_srx *srx, fi_addr_t addr, } static int util_match_tag(struct fid_peer_srx *srx, fi_addr_t addr, - size_t size, uint64_t tag, - struct fi_peer_rx_entry **rx_entry) + uint64_t tag, struct fi_peer_rx_entry **rx_entry) { struct util_srx_ctx *srx_ctx; struct util_rx_entry *util_entry; @@ -257,7 +259,7 @@ static int util_match_tag(struct fid_peer_srx *srx, fi_addr_t addr, } } - util_entry = util_init_unexp(srx_ctx, addr, size, tag); + util_entry = util_init_unexp(srx_ctx, addr, 0, tag, FI_TAGGED | FI_RECV); if (!util_entry) return -FI_ENOMEM; ret = -FI_ENOENT; @@ -268,8 +270,7 @@ static int util_match_tag(struct fid_peer_srx *srx, fi_addr_t addr, } static int util_get_tag(struct fid_peer_srx *srx, fi_addr_t addr, - size_t size, uint64_t tag, - struct fi_peer_rx_entry **rx_entry) + uint64_t tag, struct fi_peer_rx_entry **rx_entry) { struct util_srx_ctx *srx_ctx; struct slist *queue; @@ -285,7 +286,7 @@ static int util_get_tag(struct fid_peer_srx *srx, fi_addr_t addr, 
ofi_array_at(&srx_ctx->src_trecv_queues, addr); if (!queue || slist_empty(queue)) - return util_match_tag(srx, addr, size, tag, rx_entry); + return util_match_tag(srx, addr, tag, rx_entry); slist_foreach(queue, item, prev) { util_entry = container_of(item, struct util_rx_entry, @@ -294,7 +295,7 @@ static int util_get_tag(struct fid_peer_srx *srx, fi_addr_t addr, util_entry->ignore, tag)) goto check_any; } - return util_match_tag(srx, addr, size, tag, rx_entry); + return util_match_tag(srx, addr, tag, rx_entry); check_any: slist_foreach(&srx_ctx->tag_queue, any_item, any_prev) { @@ -635,8 +636,9 @@ static ssize_t util_srx_peek(struct util_srx_ctx *srx, const struct iovec *iov, ((struct fi_context *)context)->internal[0] = rx_entry; } - return ofi_cq_write(srx->cq, context, FI_TAGGED | FI_RECV, - rx_entry->peer_entry.size, NULL, 0, + return ofi_cq_write(srx->cq, context, rx_entry->peer_entry.flags, + rx_entry->peer_entry.size, NULL, + rx_entry->peer_entry.cq_data, rx_entry->peer_entry.tag); }