Skip to content

Commit

Permalink
Merge pull request ofiwg#3099 from a-ilango/rxm
Browse files Browse the repository at this point in the history
prov/rxm: Add fi_cancel support
  • Loading branch information
arun ilango authored Jun 26, 2017
2 parents 6ffd1cc + 484ecfb commit ce4c116
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 35 deletions.
2 changes: 2 additions & 0 deletions include/fi_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ ssize_t ofi_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
ssize_t ofi_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count,
fi_addr_t *src_addr, const void *cond, int timeout);
int ofi_cq_signal(struct fid_cq *cq_fid);
/*
 * Queue an error completion on a utility CQ.
 *
 * The contents of *err_entry are copied, so the caller's structure does not
 * need to outlive the call.
 *
 * Returns 0 on success, or -FI_ENOMEM if the internal copy of the error
 * entry cannot be allocated.
 */
int ofi_cq_write_error(struct util_cq *cq,
const struct fi_cq_err_entry *err_entry);

/*
* Counter
Expand Down
6 changes: 6 additions & 0 deletions prov/rxm/src/rxm.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,13 @@ struct rxm_send_queue {
fastlock_t lock;
};

/*
 * Identifies which kind of receive traffic a struct rxm_recv_queue serves.
 * The value is stored in the queue so code handling a queue can tell
 * untagged (message) receives apart from tagged receives.
 */
enum rxm_recv_queue_type {
	RXM_RECV_QUEUE_MSG = 0,		/* untagged message receives */
	RXM_RECV_QUEUE_TAGGED = 1,	/* tagged receives */
};

struct rxm_recv_queue {
enum rxm_recv_queue_type type;
struct rxm_recv_fs *fs;
struct dlist_entry recv_list;
struct dlist_entry unexp_msg_list;
Expand Down
26 changes: 1 addition & 25 deletions prov/rxm/src/rxm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,6 @@ static const char *rxm_cq_strerror(struct fid_cq *cq_fid, int prov_errno,
return fi_cq_strerror(rxm_ep->msg_cq, prov_errno, err_data, buf, len);
}

/*
 * Record an error completion on util_cq.
 *
 * A heap copy of *err_entry is appended to util_cq->err_list and the tail
 * slot of util_cq->cirq is flagged UTIL_FLAG_ERROR and committed, both while
 * holding util_cq->cq_lock.
 *
 * Returns 0 on success, -FI_ENOMEM if the list node cannot be allocated.
 */
static int rxm_cq_report_error(struct util_cq *util_cq, struct fi_cq_err_entry *err_entry)
{
	struct util_cq_err_entry *err_node = calloc(1, sizeof(*err_node));
	struct fi_cq_tagged_entry *slot;

	if (err_node == NULL) {
		FI_WARN(&rxm_prov, FI_LOG_CQ,
			"Unable to allocate util_cq_err_entry\n");
		return -FI_ENOMEM;
	}

	/* Snapshot the caller's entry; the caller keeps ownership of its copy. */
	err_node->err_entry = *err_entry;

	fastlock_acquire(&util_cq->cq_lock);
	slist_insert_tail(&err_node->list_entry, &util_cq->err_list);

	/* Only the flags of the queue slot are written; it acts as a marker. */
	slot = ofi_cirque_tail(util_cq->cirq);
	slot->flags = UTIL_FLAG_ERROR;
	ofi_cirque_commit(util_cq->cirq);
	fastlock_release(&util_cq->cq_lock);

	return 0;
}

int rxm_cq_comp(struct util_cq *util_cq, void *context, uint64_t flags, size_t len,
void *buf, uint64_t data, uint64_t tag)
{
Expand Down Expand Up @@ -546,7 +522,7 @@ static ssize_t rxm_cq_read(struct fid_cq *msg_cq, struct fi_cq_tagged_entry *com
assert(0);
return err_entry.err;
}
return rxm_cq_report_error(util_cq, &err_entry);
return ofi_cq_write_error(util_cq, &err_entry);
}

void rxm_cq_progress(struct rxm_ep *rxm_ep)
Expand Down
80 changes: 70 additions & 10 deletions prov/rxm/src/rxm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ static int rxm_match_recv_entry_tagged(struct dlist_entry *item, const void *arg
rxm_match_tag(recv_entry->tag, recv_entry->ignore, attr->tag);
}

/*
 * dlist match callback: non-zero when the receive entry that embeds `item`
 * was posted with the given user context pointer.
 */
static int rxm_match_recv_entry_context(struct dlist_entry *item, const void *context)
{
	struct rxm_recv_entry *posted =
		container_of(item, struct rxm_recv_entry, entry);

	return posted->context == context;
}

static int rxm_match_unexp_msg(struct dlist_entry *item, const void *arg)
{
struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *)arg;
Expand Down Expand Up @@ -170,17 +178,22 @@ static int rxm_send_queue_init(struct rxm_send_queue *send_queue, size_t size)
}

/*
 * Initialize a receive queue.
 *
 * Stores the queue type, allocates recv_queue->fs through
 * rxm_recv_fs_create(size), initializes the recv_list and unexp_msg_list
 * heads, installs the match_recv/match_unexp callbacks, and initializes the
 * queue lock.
 *
 * Returns 0 on success, or -FI_ENOMEM when rxm_recv_fs_create() fails.
 *
 * NOTE(review): this hunk appears to show removed and added diff lines side
 * by side (two parameter-list tails, and both direct and type-based callback
 * assignments) -- confirm against the actual file before relying on it.
 */
static int rxm_recv_queue_init(struct rxm_recv_queue *recv_queue, size_t size,
dlist_func_t match_recv,
dlist_func_t match_unexp)
enum rxm_recv_queue_type type)
{
recv_queue->type = type;
recv_queue->fs = rxm_recv_fs_create(size);
if (!recv_queue->fs)
return -FI_ENOMEM;

dlist_init(&recv_queue->recv_list);
dlist_init(&recv_queue->unexp_msg_list);
recv_queue->match_recv = match_recv;
recv_queue->match_unexp = match_unexp;
/* Choose the untagged or tagged matching callbacks from the queue type. */
if (type == RXM_RECV_QUEUE_MSG) {
recv_queue->match_recv = rxm_match_recv_entry;
recv_queue->match_unexp = rxm_match_unexp_msg;
} else {
recv_queue->match_recv = rxm_match_recv_entry_tagged;
recv_queue->match_unexp = rxm_match_unexp_msg_tagged;
}
fastlock_init(&recv_queue->lock);
return 0;
}
Expand Down Expand Up @@ -229,13 +242,12 @@ static int rxm_ep_txrx_res_open(struct rxm_ep *rxm_ep)
goto err2;

ret = rxm_recv_queue_init(&rxm_ep->recv_queue, rxm_ep->rxm_info->rx_attr->size,
rxm_match_recv_entry, rxm_match_unexp_msg);
RXM_RECV_QUEUE_MSG);
if (ret)
goto err3;

ret = rxm_recv_queue_init(&rxm_ep->trecv_queue, rxm_ep->rxm_info->rx_attr->size,
rxm_match_recv_entry_tagged,
rxm_match_unexp_msg_tagged);
RXM_RECV_QUEUE_TAGGED);
if (ret)
goto err4;

Expand Down Expand Up @@ -341,9 +353,55 @@ int rxm_setopt(fid_t fid, int level, int optname,
return -FI_ENOPROTOOPT;
}

static int rxm_ep_cancel_recv(struct rxm_ep *rxm_ep,
struct rxm_recv_queue *recv_queue, void *context)
{
struct fi_cq_err_entry err_entry;
struct rxm_recv_entry *recv_entry;
struct dlist_entry *entry;

fastlock_acquire(&recv_queue->lock);
entry = dlist_remove_first_match(&recv_queue->recv_list,
rxm_match_recv_entry_context,
context);
fastlock_release(&recv_queue->lock);
if (entry) {
recv_entry = container_of(entry, struct rxm_recv_entry, entry);
memset(&err_entry, 0, sizeof(err_entry));
err_entry.op_context = recv_entry->context;
if (recv_queue->type == RXM_RECV_QUEUE_TAGGED) {
err_entry.flags |= FI_TAGGED | FI_RECV;
err_entry.tag = recv_entry->tag;
} else {
err_entry.flags = FI_MSG | FI_RECV;
}
err_entry.err = FI_ECANCELED;
err_entry.prov_errno = -FI_ECANCELED;
rxm_recv_entry_release(recv_queue, recv_entry);
return ofi_cq_write_error(rxm_ep->util_ep.rx_cq, &err_entry);
}
return 0;
}

/*
 * Cancel handler for the endpoint: try to cancel a posted receive matching
 * `context`, first in the untagged receive queue and then in the tagged one.
 *
 * The tagged queue is only tried when the untagged attempt returned 0
 * (which includes "nothing matched"). Returns the first non-zero result,
 * or 0.
 */
static ssize_t rxm_ep_cancel(fid_t fid_ep, void *context)
{
	struct rxm_ep *ep = container_of(fid_ep, struct rxm_ep, util_ep.ep_fid);
	int status;

	status = rxm_ep_cancel_recv(ep, &ep->recv_queue, context);
	if (!status)
		status = rxm_ep_cancel_recv(ep, &ep->trecv_queue, context);

	return status;
}

static struct fi_ops_ep rxm_ops_ep = {
.size = sizeof(struct fi_ops_ep),
.cancel = fi_no_cancel,
.cancel = rxm_ep_cancel,
.getopt = rxm_getopt,
.setopt = rxm_setopt,
.tx_ctx = fi_no_tx_ctx,
Expand All @@ -367,7 +425,8 @@ static int rxm_check_unexp_msg_list(struct rxm_ep *rxm_ep,
return -FI_EAGAIN;

match_attr.addr = recv_entry->addr;
match_attr.tag = recv_entry->tag;
if (recv_queue->type == RXM_RECV_QUEUE_TAGGED)
match_attr.tag = recv_entry->tag;
match_attr.ignore = recv_entry->ignore;

entry = dlist_remove_first_match(&recv_queue->unexp_msg_list,
Expand Down Expand Up @@ -410,8 +469,9 @@ static int rxm_ep_recv_common(struct rxm_ep *rxm_ep, const struct iovec *iov,
src_addr : FI_ADDR_UNSPEC;
recv_entry->context = context;
recv_entry->flags = flags;
recv_entry->tag = tag;
recv_entry->ignore = ignore;
if (recv_queue->type == RXM_RECV_QUEUE_TAGGED)
recv_entry->tag = tag;

fastlock_acquire(&recv_queue->lock);
/* rxm_check_unexp_msg_list() would release the lock if successful */
Expand Down
21 changes: 21 additions & 0 deletions prov/util/src/util_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,27 @@

#define UTIL_DEF_CQ_SIZE (1024)

/*
 * Queue an error completion on a utility CQ.
 *
 * A heap-allocated copy of *err_entry is appended to cq->err_list and the
 * tail slot of cq->cirq is marked UTIL_FLAG_ERROR and committed, both under
 * cq->cq_lock. After the lock is dropped, the CQ's wait object (if any) is
 * signaled.
 *
 * Returns 0 on success, -FI_ENOMEM if the copy cannot be allocated.
 *
 * NOTE(review): nothing here checks that cq->cirq has a free slot before
 * the tail is taken -- confirm callers guarantee space.
 */
int ofi_cq_write_error(struct util_cq *cq,
		       const struct fi_cq_err_entry *err_entry)
{
	struct util_cq_err_entry *node;
	struct fi_cq_tagged_entry *slot;

	node = calloc(1, sizeof(*node));
	if (node == NULL)
		return -FI_ENOMEM;

	node->err_entry = *err_entry;

	fastlock_acquire(&cq->cq_lock);
	slist_insert_tail(&node->list_entry, &cq->err_list);

	/* Only the flags of the queue slot are written; it acts as a marker. */
	slot = ofi_cirque_tail(cq->cirq);
	slot->flags = UTIL_FLAG_ERROR;
	ofi_cirque_commit(cq->cirq);
	fastlock_release(&cq->cq_lock);

	if (cq->wait != NULL)
		cq->wait->signal(cq->wait);

	return 0;
}

int ofi_check_cq_attr(const struct fi_provider *prov,
const struct fi_cq_attr *attr)
{
Expand Down

0 comments on commit ce4c116

Please sign in to comment.