Skip to content

Commit

Permalink
include/fi_peer: add cq_data to rx_entry, allow peer to modify on unexp
Browse files Browse the repository at this point in the history
Switch the peer srx API to allow modification of the rx_entry on the
unexpected path before queuing it with the owner. The peer is expected
to fill in any available fields that may be returned with a subsequent
FI_PEEK operation which reports back the size, CQ data, and flags of
the operation.

This also updates the efa, shm, and sm2 providers to match the new definition.

Signed-off-by: Alexia Ingerson <alexia.ingerson@intel.com>
  • Loading branch information
aingerson committed Aug 22, 2023
1 parent f4d7415 commit 8de4259
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 32 deletions.
3 changes: 2 additions & 1 deletion include/rdma/providers/fi_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ struct fi_peer_rx_entry {
fi_addr_t addr;
size_t size;
uint64_t tag;
uint64_t cq_data;
uint64_t flags;
void *context;
size_t count;
Expand All @@ -181,7 +182,7 @@ struct fi_ops_srx_owner {
int (*get_msg)(struct fid_peer_srx *srx, fi_addr_t addr,
size_t size, struct fi_peer_rx_entry **entry);
int (*get_tag)(struct fid_peer_srx *srx, fi_addr_t addr,
size_t size, uint64_t tag, struct fi_peer_rx_entry **entry);
uint64_t tag, struct fi_peer_rx_entry **entry);
int (*queue_msg)(struct fi_peer_rx_entry *entry);
int (*queue_tag)(struct fi_peer_rx_entry *entry);
void (*foreach_unspec_addr)(struct fid_peer_srx *srx,
Expand Down
29 changes: 15 additions & 14 deletions man/fi_peer.3.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ struct fi_peer_rx_entry {
fi_addr_t addr;
size_t size;
uint64_t tag;
uint64_t cq_data;
uint64_t flags;
void *context;
size_t count;
Expand Down Expand Up @@ -482,32 +483,32 @@ the same lock, if needed.
fi_peer_rx_entry defines a common receive entry for use between the owner and
peer. The entry is allocated and set by the owner and passed between owner and
peer to communicate details of the application-posted receive entry. All fields
are only modifiable by the owner, except for the peer_context which is provided
for the peer to use to save peer-specific information for unexpected message
processing. Similarly, the owner_context can be used by the owner_context as
needed for storing extra owner-specific information.
are initialized by the owner, except in the unexpected message case where the
peer can initialize any extra available data before queuing the message with
the owner. The peer_context and owner_context fields are only modifiable by the
peer and owner, respectively, to store extra provider-specific information.

## fi_ops_srx_owner::get_msg_entry() / get_tag_entry()

These calls are invoked by the peer provider to obtain the receive buffer(s)
where an incoming message should be placed. The peer provider will pass in
the relevant fields to request a matching rx_entry from the owner. If source
addressing is required, the addr will be passed in; otherwise, the address will
be set to FI_ADDR_NOT_AVAIL. The size field indicates the received message size.
For non-tagged message, this field is used by the owner when handling multi-received
data buffers, but may be ignored otherwise. For tagged message, this field is used
by the owner when handling trecvmsg with FI_PEEK bit set in flags,
which requires the owner to write the size of the unexpected tagged message as part
of the CQ entry. The peer provider is responsible for checking that an incoming
message fits within the provided buffer space. The tag parameter is used for tagged
messages. An fi_peer_rx_entry is allocated by the owner, whether or not a match was
be set to FI_ADDR_NOT_AVAIL. The size parameter is needed by the owner for
adjusting FI_MULTI_RECV entries. The peer provider is responsible for checking
that an incoming message fits within the provided buffer space.
An fi_peer_rx_entry is allocated by the owner, whether or not a match was
found. If a match was found, the owner will return FI_SUCCESS and the rx_entry will
be filled in with the appropriate receive fields for the peer to process accordingly.
be filled in with the known receive fields for the peer to process accordingly.
This includes the information that was passed into the calls as well as the
rx_entry->flags with either FI_MSG | FI_RECV (for get_msg()) or FI_TAGGED | FI_RECV
(for get_tag()). The peer provider is responsible for completing with any other
flags, if needed.
If no match was found, the owner will return -FI_ENOENT; the rx_entry will still be
valid but will not match to an existing posted receive. When the peer gets FI_ENOENT,
it should allocate whatever resources it needs to process the message later
(on start_msg/tag) and set the rx_entry->peer_context appropriately, followed by a
call to the owner's queue_msg/tag. The get and queue messages should be serialized.
call to the owner's queue_msg/tag. The get and queue calls should be serialized.
When the owner gets a matching receive for the queued unexpected message, it will
call the peer's start function to notify the peer of the updated rx_entry (or the
peer's discard function if the message is to be discarded)
Expand Down
11 changes: 8 additions & 3 deletions prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -851,15 +851,12 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_tagrtm(struct efa_rdm_ep *ep,
struct fid_peer_srx *peer_srx;
struct fi_peer_rx_entry *peer_rxe;
struct efa_rdm_ope *rxe;
size_t data_size;
int ret;
int pkt_type;

peer_srx = util_get_peer_srx(ep->peer_srx_ep);
data_size = efa_rdm_pke_get_rtm_msg_length(*pkt_entry_ptr);

ret = peer_srx->owner_ops->get_tag(peer_srx, (*pkt_entry_ptr)->addr,
data_size,
efa_rdm_pke_get_rtm_tag(*pkt_entry_ptr),
&peer_rxe);

Expand All @@ -882,6 +879,14 @@ struct efa_rdm_ope *efa_rdm_msg_alloc_rxe_for_tagrtm(struct efa_rdm_ep *ep,
return NULL;
}
(*pkt_entry_ptr)->ope = rxe;

peer_rxe->size = efa_rdm_pke_get_rtm_msg_length(*pkt_entry_ptr);
if (efa_rdm_pke_get_base_hdr(*pkt_entry_ptr)->flags &
EFA_RDM_REQ_OPT_CQ_DATA_HDR) {
peer_rxe->flags |= FI_REMOTE_CQ_DATA;
peer_rxe->cq_data = efa_rdm_pke_get_req_cq_data(*pkt_entry_ptr);
}

peer_rxe->peer_context = *pkt_entry_ptr;
rxe->peer_rxe = peer_rxe;
efa_rdm_tracepoint(msg_recv_unexpected_tagged, rxe->msg_id,
Expand Down
8 changes: 7 additions & 1 deletion prov/shm/src/smr_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,12 @@ static int smr_alloc_cmd_ctx(struct smr_ep *ep,
memcpy(&cmd_ctx->cmd, cmd, sizeof(*cmd));
cmd_ctx->ep = ep;

rx_entry->size = cmd->msg.hdr.size;
if (cmd->msg.hdr.op_flags & SMR_REMOTE_CQ_DATA) {
rx_entry->flags |= FI_REMOTE_CQ_DATA;
rx_entry->cq_data = cmd->msg.hdr.data;
}

if (cmd->msg.hdr.op_src == smr_src_inject) {
buf = ofi_buf_alloc(ep->unexp_buf_pool);
if (!buf) {
Expand Down Expand Up @@ -955,7 +961,7 @@ static int smr_progress_cmd_msg(struct smr_ep *ep, struct smr_cmd *cmd)
addr = ep->region->map->peers[cmd->msg.hdr.id].fiaddr;
if (cmd->msg.hdr.op == ofi_op_tagged) {
ret = peer_srx->owner_ops->get_tag(peer_srx, addr,
cmd->msg.hdr.size, cmd->msg.hdr.tag, &rx_entry);
cmd->msg.hdr.tag, &rx_entry);
if (ret == -FI_ENOENT) {
ret = smr_alloc_cmd_ctx(ep, rx_entry, cmd);
if (ret) {
Expand Down
7 changes: 5 additions & 2 deletions prov/sm2/src/sm2_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ static int sm2_alloc_xfer_entry_ctx(struct sm2_ep *ep,
memcpy(&xfer_ctx->xfer_entry, xfer_entry, sizeof(*xfer_entry));
xfer_ctx->ep = ep;

rx_entry->size = xfer_entry->hdr.size;
rx_entry->flags |= xfer_entry->hdr.op_flags & FI_REMOTE_CQ_DATA;
rx_entry->cq_data = xfer_entry->hdr.cq_data;

rx_entry->peer_context = xfer_ctx;

return FI_SUCCESS;
Expand All @@ -166,8 +170,7 @@ static int sm2_progress_recv_msg(struct sm2_ep *ep,

if (xfer_entry->hdr.op == ofi_op_tagged) {
ret = peer_srx->owner_ops->get_tag(
peer_srx, addr, xfer_entry->hdr.size,
xfer_entry->hdr.tag, &rx_entry);
peer_srx, addr, xfer_entry->hdr.tag, &rx_entry);
if (ret == -FI_ENOENT) {
ret = sm2_alloc_xfer_entry_ctx(ep, rx_entry,
xfer_entry);
Expand Down
24 changes: 13 additions & 11 deletions prov/util/src/util_srx.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ static struct util_rx_entry *util_get_recv_entry(struct util_srx_ctx *srx,
}

static struct util_rx_entry *util_init_unexp(struct util_srx_ctx *srx,
fi_addr_t addr, uint64_t size, uint64_t tag)
fi_addr_t addr, uint64_t size, uint64_t tag,
uint64_t flags)
{
struct util_rx_entry *util_entry;

Expand All @@ -106,6 +107,7 @@ static struct util_rx_entry *util_init_unexp(struct util_srx_ctx *srx,
util_entry->peer_entry.size = size;
util_entry->peer_entry.addr = addr;
util_entry->peer_entry.tag = tag;
util_entry->peer_entry.flags = flags;

return util_entry;
}
Expand Down Expand Up @@ -163,7 +165,8 @@ static int util_match_msg(struct fid_peer_srx *srx, fi_addr_t addr, size_t size,

srx_ctx = srx->ep_fid.fid.context;
if (slist_empty(&srx_ctx->msg_queue)) {
util_entry = util_init_unexp(srx_ctx, addr, size, 0);
util_entry = util_init_unexp(srx_ctx, addr, size, 0,
FI_MSG | FI_RECV);
if (!util_entry)
return -FI_ENOMEM;
util_entry->peer_entry.srx = srx;
Expand Down Expand Up @@ -236,8 +239,7 @@ static int util_get_msg(struct fid_peer_srx *srx, fi_addr_t addr,
}

static int util_match_tag(struct fid_peer_srx *srx, fi_addr_t addr,
size_t size, uint64_t tag,
struct fi_peer_rx_entry **rx_entry)
uint64_t tag, struct fi_peer_rx_entry **rx_entry)
{
struct util_srx_ctx *srx_ctx;
struct util_rx_entry *util_entry;
Expand All @@ -257,7 +259,7 @@ static int util_match_tag(struct fid_peer_srx *srx, fi_addr_t addr,
}
}

util_entry = util_init_unexp(srx_ctx, addr, size, tag);
util_entry = util_init_unexp(srx_ctx, addr, 0, tag, FI_TAGGED | FI_RECV);
if (!util_entry)
return -FI_ENOMEM;
ret = -FI_ENOENT;
Expand All @@ -268,8 +270,7 @@ static int util_match_tag(struct fid_peer_srx *srx, fi_addr_t addr,
}

static int util_get_tag(struct fid_peer_srx *srx, fi_addr_t addr,
size_t size, uint64_t tag,
struct fi_peer_rx_entry **rx_entry)
uint64_t tag, struct fi_peer_rx_entry **rx_entry)
{
struct util_srx_ctx *srx_ctx;
struct slist *queue;
Expand All @@ -285,7 +286,7 @@ static int util_get_tag(struct fid_peer_srx *srx, fi_addr_t addr,
ofi_array_at(&srx_ctx->src_trecv_queues, addr);

if (!queue || slist_empty(queue))
return util_match_tag(srx, addr, size, tag, rx_entry);
return util_match_tag(srx, addr, tag, rx_entry);

slist_foreach(queue, item, prev) {
util_entry = container_of(item, struct util_rx_entry,
Expand All @@ -294,7 +295,7 @@ static int util_get_tag(struct fid_peer_srx *srx, fi_addr_t addr,
util_entry->ignore, tag))
goto check_any;
}
return util_match_tag(srx, addr, size, tag, rx_entry);
return util_match_tag(srx, addr, tag, rx_entry);

check_any:
slist_foreach(&srx_ctx->tag_queue, any_item, any_prev) {
Expand Down Expand Up @@ -635,8 +636,9 @@ static ssize_t util_srx_peek(struct util_srx_ctx *srx, const struct iovec *iov,
((struct fi_context *)context)->internal[0] = rx_entry;
}

return ofi_cq_write(srx->cq, context, FI_TAGGED | FI_RECV,
rx_entry->peer_entry.size, NULL, 0,
return ofi_cq_write(srx->cq, context, rx_entry->peer_entry.flags,
rx_entry->peer_entry.size, NULL,
rx_entry->peer_entry.cq_data,
rx_entry->peer_entry.tag);
}

Expand Down

0 comments on commit 8de4259

Please sign in to comment.