diff --git a/prov/rxm/src/rxm.h b/prov/rxm/src/rxm.h index b422abcbfd8..9f75b703e97 100644 --- a/prov/rxm/src/rxm.h +++ b/prov/rxm/src/rxm.h @@ -366,14 +366,14 @@ struct rxm_atomic_resp_hdr { FUNC(RXM_RNDV_WRITE_DATA_WAIT), \ FUNC(RXM_RNDV_WRITE_DONE_WAIT), \ FUNC(RXM_RNDV_READ), \ - FUNC(RXM_RNDV_WRITE), \ + FUNC(RXM_RNDV_WRITE), /* not used */ \ FUNC(RXM_RNDV_READ_DONE_SENT), \ FUNC(RXM_RNDV_READ_DONE_RECVD), \ FUNC(RXM_RNDV_WRITE_DATA_SENT), \ - FUNC(RXM_RNDV_WRITE_DATA_RECVD),\ + FUNC(RXM_RNDV_WRITE_DATA_RECVD), /* not used */ \ FUNC(RXM_RNDV_WRITE_DONE_SENT), \ FUNC(RXM_RNDV_WRITE_DONE_RECVD),\ - FUNC(RXM_RNDV_FINISH), \ + FUNC(RXM_RNDV_FINISH), /* not needed */ \ FUNC(RXM_ATOMIC_RESP_WAIT), \ FUNC(RXM_ATOMIC_RESP_SENT) diff --git a/prov/rxm/src/rxm_conn.c b/prov/rxm/src/rxm_conn.c index 30dd5c9d7bd..6ac12c3762c 100644 --- a/prov/rxm/src/rxm_conn.c +++ b/prov/rxm/src/rxm_conn.c @@ -1109,9 +1109,13 @@ static int rxm_conn_handle_notify(struct fi_eq_entry *eq_entry) dlist_remove(&handle->peer->entry); free(handle->peer); handle->peer = NULL; - } else { + } + + if (handle->fi_addr != FI_ADDR_NOTAVAIL) { cmap->handles_av[handle->fi_addr] = NULL; + handle->fi_addr = FI_ADDR_NOTAVAIL; } + rxm_conn_free(handle); return 0; } diff --git a/prov/rxm/src/rxm_cq.c b/prov/rxm/src/rxm_cq.c index 4aad276d4d1..d9bb64ae21b 100644 --- a/prov/rxm/src/rxm_cq.c +++ b/prov/rxm/src/rxm_cq.c @@ -506,21 +506,21 @@ static ssize_t rxm_rndv_xfer(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep, ssize_t rxm_rndv_read(struct rxm_rx_buf *rx_buf) { ssize_t ret; - size_t total_len = - MIN(rx_buf->recv_entry->total_len, rx_buf->pkt.hdr.size); + size_t total_len; + total_len = MIN(rx_buf->recv_entry->total_len, rx_buf->pkt.hdr.size); RXM_UPDATE_STATE(FI_LOG_CQ, rx_buf, RXM_RNDV_READ); - ret = rxm_rndv_xfer(rx_buf->ep, rx_buf->conn->msg_ep, rx_buf->remote_rndv_hdr, + ret = rxm_rndv_xfer(rx_buf->ep, rx_buf->conn->msg_ep, + rx_buf->remote_rndv_hdr, rx_buf->recv_entry->rxm_iov.iov, rx_buf->recv_entry->rxm_iov.desc, rx_buf->recv_entry->rxm_iov.count, total_len, rx_buf); - - if (ret) + if (ret) { rxm_cq_write_error(rx_buf->ep->util_ep.rx_cq, - rx_buf->ep->util_ep.rx_cntr, - rx_buf, ret); + rx_buf->ep->util_ep.rx_cntr, rx_buf, ret); + } return ret; } @@ -549,6 +549,11 @@ static ssize_t rxm_rndv_handle_wr_data(struct rxm_rx_buf *rx_buf) } } + /* BUG: This is forcing a state change without knowing what state + * we're currently in. This loses whether we processed the completion + * for the original send request. Valid states here are + * RXM_RNDV_TX or RXM_RNDV_WRITE_DATA_WAIT. + */ RXM_UPDATE_STATE(FI_LOG_CQ, tx_buf, RXM_RNDV_WRITE); ret = rxm_rndv_xfer(rx_buf->ep, tx_buf->write_rndv.conn->msg_ep, rx_hdr, @@ -1350,7 +1355,7 @@ void rxm_finish_coll_eager_send(struct rxm_ep *rxm_ep, } else { rxm_finish_eager_send(rxm_ep, tx_eager_buf); } -}; +} ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp) { @@ -1366,6 +1371,7 @@ ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp) switch (RXM_GET_PROTO_STATE(comp->op_context)) { case RXM_TX: + case RXM_INJECT_TX: tx_buf = comp->op_context; rxm_ep->eager_ops->comp_tx(rxm_ep, tx_buf); rxm_free_rx_buf(rxm_ep, tx_buf); @@ -1375,9 +1381,6 @@ ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp) assert(comp->flags & FI_SEND); ofi_buf_free(tx_buf); return 0; - case RXM_INJECT_TX: - assert(0); - return 0; case RXM_RMA: tx_buf = comp->op_context; assert((comp->flags & (FI_WRITE | FI_RMA)) || @@ -1473,14 +1476,15 @@ ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp) assert(0); return 0; case RXM_ATOMIC_RESP_WAIT: - /* Optional atomic request completion; TX completion - * processing is performed when atomic response is received */ + /* BUG: need to wait for completion, even if a response has + * been received. + */ assert(comp->flags & FI_SEND); return 0; case RXM_ATOMIC_RESP_SENT: tx_buf = comp->op_context; assert(comp->flags & FI_SEND); - ofi_buf_free(tx_buf); + ofi_buf_free(tx_buf); /* BUG: should have consumed tx credit */ return 0; default: assert(0); @@ -1714,13 +1718,31 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) switch (RXM_GET_PROTO_STATE(err_entry.op_context)) { case RXM_TX: + case RXM_RNDV_TX: + case RXM_RNDV_WRITE_DONE_SENT: + case RXM_ATOMIC_RESP_WAIT: tx_buf = err_entry.op_context; err_entry.op_context = tx_buf->app_context; err_entry.flags = ofi_tx_cq_flags(tx_buf->pkt.hdr.op); rxm_free_rx_buf(rxm_ep, tx_buf); break; + case RXM_RNDV_READ_DONE_RECVD: + /* We received the response, so ignore the send error */ + rxm_rndv_tx_finish(rxm_ep, err_entry.op_context); + return; + case RXM_RNDV_WRITE_DONE_RECVD: + /* We received the response, so ignore the send error */ + rxm_rndv_rx_finish(err_entry.op_context); + return; case RXM_INJECT_TX: - assert(0); + rxm_free_rx_buf(rxm_ep, err_entry.op_context); + if (cntr) + rxm_cntr_incerr(cntr); + return; + case RXM_CREDIT_TX: + case RXM_ATOMIC_RESP_SENT: /* BUG: should have consumed tx credit */ + tx_buf = err_entry.op_context; + ofi_buf_free(tx_buf); return; case RXM_RMA: tx_buf = err_entry.op_context; @@ -1739,14 +1761,7 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) if (!rxm_complete_sar(rxm_ep, tx_buf)) return; break; - case RXM_CREDIT_TX: - tx_buf = err_entry.op_context; - err_entry.op_context = 0; - err_entry.flags = ofi_tx_cq_flags(tx_buf->pkt.hdr.op); - break; case RXM_RNDV_WRITE: - /* fall through */ - case RXM_RNDV_TX: tx_buf = err_entry.op_context; err_entry.op_context = tx_buf->app_context; err_entry.flags = ofi_tx_cq_flags(tx_buf->pkt.hdr.op); @@ -1767,7 +1782,7 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) } /* fall through */ case RXM_RNDV_READ_DONE_SENT: - case RXM_RNDV_WRITE_DATA_SENT: + case RXM_RNDV_WRITE_DATA_SENT: /* BUG: should fail initial send */ case RXM_RNDV_READ: rx_buf = (struct rxm_rx_buf *) err_entry.op_context; assert(rx_buf->recv_entry); diff --git a/prov/rxm/src/rxm_ep.c b/prov/rxm/src/rxm_ep.c index 0d6486bdac6..25d5ac89040 100644 --- a/prov/rxm/src/rxm_ep.c +++ b/prov/rxm/src/rxm_ep.c @@ -1049,7 +1049,6 @@ rxm_ep_rndv_tx_send(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, { ssize_t ret; - RXM_UPDATE_STATE(FI_LOG_EP_DATA, tx_buf, RXM_RNDV_TX); if (pkt_size <= rxm_ep->inject_limit) { if (rxm_ep->rndv_ops == &rxm_rndv_ops_write) RXM_UPDATE_STATE(FI_LOG_EP_DATA, tx_buf, @@ -1060,17 +1059,14 @@ rxm_ep_rndv_tx_send(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, ret = fi_inject(rxm_conn->msg_ep, &tx_buf->pkt, pkt_size, 0); } else { - tx_buf->hdr.state = RXM_RNDV_TX; - + RXM_UPDATE_STATE(FI_LOG_EP_DATA, tx_buf, RXM_RNDV_TX); ret = rxm_ep_msg_normal_send(rxm_conn, &tx_buf->pkt, pkt_size, tx_buf->hdr.desc, tx_buf); } - if (ret) { - if (ret == -FI_EAGAIN) - rxm_ep_do_progress(&rxm_ep->util_ep); + if (ret) goto err; - } + return FI_SUCCESS; err: @@ -1275,10 +1271,8 @@ rxm_ep_emulate_inject(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, if (!tx_buf) return -FI_EAGAIN; - tx_buf->hdr.state = RXM_TX; + tx_buf->hdr.state = RXM_INJECT_TX; tx_buf->pkt.ctrl_hdr.type = rxm_ctrl_eager; - /* avoid reporting bogus context in fi_cq_err_entry */ - tx_buf->app_context = NULL; tx_buf->flags = flags; rxm_ep_format_tx_buf_pkt(rxm_conn, len, op, data, tag, flags,