Skip to content

Commit

Permalink
don't progress the same progress multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
ooststep committed Mar 18, 2024
1 parent 6253bf3 commit 3a4a016
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 21 deletions.
7 changes: 7 additions & 0 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ struct xnet_progress {
struct dlist_entry unexp_msg_list;
struct dlist_entry unexp_tag_list;
struct dlist_entry saved_tag_list;

struct util_cq *tx_cq;
struct util_cq *rx_cq;

struct fd_signal signal;

struct slist event_list;
Expand Down Expand Up @@ -383,6 +387,7 @@ void xnet_close_progress(struct xnet_progress *progress);
int xnet_start_progress(struct xnet_progress *progress);
void xnet_stop_progress(struct xnet_progress *progress);
int xnet_start_recv(struct xnet_ep *ep, struct xnet_xfer_entry *rx_entry);
int xnet_progress_add_cq_fd(struct xnet_progress *progress, struct util_cq *cq);

void xnet_progress(struct xnet_progress *progress, bool clear_signal);
void xnet_run_progress(struct xnet_progress *progress, bool clear_signal);
Expand Down Expand Up @@ -484,6 +489,8 @@ struct xnet_domain {

/*
 * TCP provider completion queue: the generic util_cq plus the set of
 * progress engines that are driven when this CQ is progressed.
 */
struct xnet_cq {
/* Embedded generic CQ; container_of() on this member recovers the xnet_cq. */
struct util_cq util_cq;
/* List of fid_list_entry items, one per xnet_progress attached to this CQ. */
struct dlist_entry progress_list;
/* Held while progress_list is walked or has entries inserted/removed. */
struct ofi_genlock prog_list_lock;
};

int xnet_cq_wait_try_func(void *arg);
Expand Down
27 changes: 25 additions & 2 deletions prov/tcp/src/xnet_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,23 @@ int xnet_cq_wait_try_func(void *arg)
return FI_SUCCESS;
}

/*
 * Progress callback for an xnet CQ: run every progress engine that has
 * been attached to the CQ's progress list.
 *
 * The list lock is held for the whole walk, so entries cannot be added or
 * removed while the engines are being driven.
 */
void xnet_cq_progress(struct util_cq *util_cq)
{
	struct xnet_cq *xcq = container_of(util_cq, struct xnet_cq, util_cq);
	struct dlist_entry *cur, *next;

	ofi_genlock_lock(&xcq->prog_list_lock);
	dlist_foreach_safe(&xcq->progress_list, cur, next) {
		struct fid_list_entry *node;
		struct xnet_progress *engine;

		/* Each list node records the fid embedded in an xnet_progress. */
		node = container_of(cur, struct fid_list_entry, entry);
		engine = container_of(node->fid, struct xnet_progress, fid);

		/* Second argument is clear_signal: leave the signal untouched. */
		xnet_progress(engine, false);
	}
	ofi_genlock_unlock(&xcq->prog_list_lock);
}

int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq_fid, void *context)
{
Expand All @@ -254,15 +271,21 @@ int xnet_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
}

ret = ofi_cq_init(&xnet_prov, domain, attr, &cq->util_cq,
&ofi_cq_progress, context);
&xnet_cq_progress, context);
if (ret)
goto free_cq;

dlist_init(&cq->progress_list);
ret = ofi_genlock_init(&cq->prog_list_lock, OFI_LOCK_MUTEX);
if (ret)
goto cleanup_cq;

*cq_fid = &cq->util_cq.cq_fid;
(*cq_fid)->fid.ops = &xnet_cq_fi_ops;
(*cq_fid)->ops = &xnet_cq_ops;
return 0;

cleanup_cq:
ofi_cq_cleanup(&cq->util_cq);
free_cq:
free(cq);
return ret;
Expand Down
32 changes: 14 additions & 18 deletions prov/tcp/src/xnet_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -621,17 +621,6 @@ static int xnet_ep_close(struct fid *fid)
return 0;
}

/*
 * Register the endpoint's progress epoll fd with the CQ's wait object so
 * that a waiter on the CQ is woken when the progress engine has activity.
 *
 * Nothing is registered (and FI_SUCCESS is returned) when the CQ has no
 * wait object or epoll is unavailable; otherwise the result of
 * ofi_wait_add_fd() is returned.
 */
static int xnet_ep_add_cq_fd(struct xnet_ep *ep, struct util_cq *cq)
{
	if (!cq->wait || !ofi_have_epoll)
		return FI_SUCCESS;

	return ofi_wait_add_fd(cq->wait,
			       ofi_dynpoll_get_fd(&ep->progress->epoll_fd),
			       POLLIN, xnet_cq_wait_try_func, cq,
			       &cq->cq_fid);
}

static int xnet_ep_add_cntr_fd(struct xnet_ep *ep, struct util_cntr *cntr)
{
if (cntr->wait && cntr->wait->wait_obj == FI_WAIT_FD && ofi_have_epoll)
Expand Down Expand Up @@ -659,13 +648,13 @@ static int xnet_ep_ctrl(struct fid *fid, int command, void *arg)
return -FI_ENOCQ;
}

domain = container_of(ep->util_ep.domain, struct xnet_domain,
util_domain);
if (!ep->srx) {
ep->progress = calloc(sizeof(*ep->progress), 1);
if (!ep->progress)
return -FI_ENOMEM;

domain = container_of(ep->util_ep.domain, struct xnet_domain,
util_domain);
ret = xnet_init_progress(ep->progress, domain);
if (ret) {
free(ep->progress);
Expand All @@ -674,14 +663,21 @@ static int xnet_ep_ctrl(struct fid *fid, int command, void *arg)
} else {
ep->progress = &ep->srx->progress;
}
ret = xnet_ep_add_cq_fd(ep, ep->util_ep.tx_cq);
if (ret)
goto free_progress;

if (ep->util_ep.tx_cq != ep->util_ep.rx_cq) {
ret = xnet_ep_add_cq_fd(ep, ep->util_ep.rx_cq);
if (domain->ep_type == FI_EP_MSG) {
ret = xnet_progress_add_cq_fd(ep->progress,
ep->util_ep.tx_cq);
if (ret)
goto free_progress;
ep->progress->tx_cq = ep->util_ep.tx_cq;

if (ep->util_ep.tx_cq != ep->util_ep.rx_cq) {
ret = xnet_progress_add_cq_fd(ep->progress,
ep->util_ep.rx_cq);
if (ret)
goto free_progress;
ep->progress->rx_cq = ep->util_ep.rx_cq;
}
}

for (i = 0; i < CNTR_CNT; i++) {
Expand Down
37 changes: 37 additions & 0 deletions prov/tcp/src/xnet_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -1824,6 +1824,35 @@ static void xnet_destroy_uring(struct xnet_uring *uring,
}
}

/*
 * Detach a progress engine from a CQ.
 *
 * Removes the progress fid from the CQ's progress list and, when the CQ
 * has a wait object and epoll is available, removes the progress epoll fd
 * from that wait object.
 *
 * Returns FI_SUCCESS, or the result of ofi_wait_del_fd().
 */
int xnet_progress_del_cq_fd(struct xnet_progress *progress, struct util_cq *cq)
{
	/*
	 * cq is the util_cq embedded in struct xnet_cq, so that member (not
	 * util_cq.cq_fid) is the correct container_of() field.
	 */
	struct xnet_cq *xnet_cq = container_of(cq, struct xnet_cq, util_cq);

	fid_list_remove2(&xnet_cq->progress_list, &xnet_cq->prog_list_lock,
			 &progress->fid);

	if (cq->wait && ofi_have_epoll)
		return ofi_wait_del_fd(cq->wait,
				       ofi_dynpoll_get_fd(&progress->epoll_fd));

	return FI_SUCCESS;
}

/*
 * Attach a progress engine to a CQ.
 *
 * Adds the progress fid to the CQ's progress list and, when the CQ has a
 * wait object and epoll is available, registers the progress epoll fd
 * with that wait object (POLLIN, using xnet_cq_wait_try_func).
 *
 * Returns FI_SUCCESS, the error from fid_list_insert2() if the list
 * insertion fails, or the result of ofi_wait_add_fd().
 */
int xnet_progress_add_cq_fd(struct xnet_progress *progress, struct util_cq *cq)
{
	/*
	 * cq is the util_cq embedded in struct xnet_cq, so that member (not
	 * util_cq.cq_fid) is the correct container_of() field.
	 */
	struct xnet_cq *xnet_cq = container_of(cq, struct xnet_cq, util_cq);
	int ret;

	/*
	 * Do not report success if the engine could not be put on the list:
	 * the CQ would then never drive it.
	 */
	ret = fid_list_insert2(&xnet_cq->progress_list,
			       &xnet_cq->prog_list_lock, &progress->fid);
	if (ret)
		return ret;

	if (cq->wait && ofi_have_epoll)
		return ofi_wait_add_fd(cq->wait,
				       ofi_dynpoll_get_fd(&progress->epoll_fd),
				       POLLIN, xnet_cq_wait_try_func, cq,
				       &cq->cq_fid);

	return FI_SUCCESS;
}

int xnet_init_progress(struct xnet_progress *progress, struct xnet_domain *domain)
{
int ret;
Expand Down Expand Up @@ -1907,12 +1936,20 @@ void xnet_close_progress(struct xnet_progress *progress)
assert(dlist_empty(&progress->unexp_tag_list));
assert(dlist_empty(&progress->saved_tag_list));
assert(slist_empty(&progress->event_list));

xnet_stop_progress(progress);
if (xnet_io_uring) {
free(progress->cqes);
xnet_destroy_uring(&progress->rx_uring, &progress->epoll_fd);
xnet_destroy_uring(&progress->tx_uring, &progress->epoll_fd);
}

if (progress->tx_cq)
xnet_progress_del_cq_fd(progress, progress->tx_cq);

if (progress->rx_cq)
xnet_progress_del_cq_fd(progress, progress->rx_cq);

ofi_dynpoll_close(&progress->epoll_fd);
ofi_bufpool_destroy(progress->xfer_pool);
ofi_genlock_destroy(&progress->ep_lock);
Expand Down
14 changes: 14 additions & 0 deletions prov/tcp/src/xnet_rdm.c
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,20 @@ static int xnet_enable_rdm(struct xnet_rdm *rdm)
size_t len;
int ret;

// insert rdm progress to the cq progress list
ret = xnet_progress_add_cq_fd(&rdm->srx->progress, rdm->util_ep.tx_cq);
if (ret)
return ret;
rdm->srx->progress.tx_cq = rdm->util_ep.tx_cq;

if (rdm->util_ep.tx_cq != rdm->util_ep.rx_cq) {
ret = xnet_progress_add_cq_fd(&rdm->srx->progress,
rdm->util_ep.rx_cq);
if (ret)
return ret;
rdm->srx->progress.rx_cq = rdm->util_ep.rx_cq;
}

(void) fi_ep_bind(&rdm->srx->rx_fid, &rdm->util_ep.rx_cq->cq_fid.fid,
FI_RECV);
if (rdm->util_ep.cntrs[CNTR_RX]) {
Expand Down
1 change: 0 additions & 1 deletion prov/util/src/util_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,6 @@ void ofi_cq_progress(struct util_cq *cq)
fid_entry = container_of(item, struct fid_list_entry, entry);
ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
ep->progress(ep);

}
ofi_genlock_unlock(&cq->ep_list_lock);
}
Expand Down

0 comments on commit 3a4a016

Please sign in to comment.