Skip to content

Commit

Permalink
Merge pull request #6953 from ooststep/cm_fixes
Browse files Browse the repository at this point in the history
prov/rxm: minor fixes to connection management
  • Loading branch information
j-xiong authored Aug 3, 2021
2 parents 17a382d + 5426311 commit 7d6d2a1
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 6 deletions.
2 changes: 2 additions & 0 deletions prov/rxm/src/rxm.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ struct rxm_conn {
struct dlist_entry deferred_tx_queue;
struct dlist_entry deferred_sar_msgs;
struct dlist_entry deferred_sar_segments;
struct dlist_entry loopback_entry;
};

void rxm_freeall_conns(struct rxm_ep *ep);
Expand Down Expand Up @@ -623,6 +624,7 @@ struct rxm_ep {
struct fi_info *msg_info;

struct index_map conn_idx_map;
struct dlist_entry loopback_list;
union ofi_sock_ip addr;

pthread_t cm_thread;
Expand Down
2 changes: 1 addition & 1 deletion prov/rxm/src/rxm_av.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

static int rxm_addr_compare(struct ofi_rbmap *map, void *key, void *data)
{
return memcmp(data, key,
return memcmp(&((struct rxm_peer_addr *) data)->addr, key,
container_of(map, struct rxm_av, addr_map)->util_av.addrlen);
}

Expand Down
21 changes: 16 additions & 5 deletions prov/rxm/src/rxm_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ static void rxm_close_conn(struct rxm_conn *conn)
}
fi_close(&conn->msg_ep->fid);
rxm_flush_msg_cq(conn->ep);
dlist_remove_init(&conn->loopback_entry);
conn->msg_ep = NULL;
conn->state = RXM_CM_IDLE;
}
Expand Down Expand Up @@ -276,6 +277,7 @@ static void rxm_free_conn(struct rxm_conn *conn)
void rxm_freeall_conns(struct rxm_ep *ep)
{
struct rxm_conn *conn;
struct dlist_entry *tmp;
struct rxm_av *av;
int i;

Expand All @@ -293,6 +295,12 @@ void rxm_freeall_conns(struct rxm_ep *ep)
rxm_free_conn(conn);
}

dlist_foreach_container_safe(&ep->loopback_list, struct rxm_conn,
conn, loopback_entry, tmp) {
rxm_close_conn(conn);
rxm_free_conn(conn);
}

ofi_ep_lock_release(&ep->util_ep);
}

Expand All @@ -316,6 +324,7 @@ rxm_alloc_conn(struct rxm_ep *ep, struct rxm_peer_addr *peer)
dlist_init(&conn->deferred_tx_queue);
dlist_init(&conn->deferred_sar_msgs);
dlist_init(&conn->deferred_sar_segments);
dlist_init(&conn->loopback_entry);

conn->peer = peer;
peer->refcnt++;
Expand Down Expand Up @@ -528,7 +537,7 @@ rxm_process_connreq(struct rxm_ep *ep, struct rxm_eq_cm_entry *cm_entry)
break;
case RXM_CM_CONNECTING:
/* simultaneous connections */
cmp = ofi_addr_cmp(&rxm_prov, &peer_addr.sa, &peer->addr.sa);
cmp = ofi_addr_cmp(&rxm_prov, &peer_addr.sa, &ep->addr.sa);
if (cmp < 0) {
/* let our request finish */
rxm_reject_connreq(ep, cm_entry,
Expand All @@ -540,14 +549,16 @@ rxm_process_connreq(struct rxm_ep *ep, struct rxm_eq_cm_entry *cm_entry)
} else {
/* connecting to ourself, create loopback conn */
conn = rxm_alloc_conn(ep, peer);
if (!conn)
goto remove;

dlist_insert_tail(&conn->loopback_entry, &ep->loopback_list);
break;
}
break;
case RXM_CM_ACCEPTING:
case RXM_CM_CONNECTED:
/* peer reset and lost previous connection state */
rxm_close_conn(conn);
break;
goto put;
default:
assert(0);
break;
Expand Down Expand Up @@ -600,7 +611,7 @@ void rxm_process_shutdown(struct rxm_conn *conn)

static void rxm_handle_error(struct rxm_ep *ep)
{
struct fi_eq_err_entry entry;
struct fi_eq_err_entry entry = {0};
ssize_t ret;

assert(ofi_ep_lock_held(&ep->util_ep));
Expand Down
2 changes: 2 additions & 0 deletions prov/rxm/src/rxm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2914,6 +2914,8 @@ int rxm_endpoint(struct fid_domain *domain, struct fi_info *info,
if (rxm_ep->rxm_info->caps & FI_ATOMIC)
(*ep_fid)->atomic = &rxm_ops_atomic;

dlist_init(&rxm_ep->loopback_list);

return 0;
err2:
ofi_endpoint_close(&rxm_ep->util_ep);
Expand Down

0 comments on commit 7d6d2a1

Please sign in to comment.