Skip to content

Commit

Permalink
tipc: make media xmit call outside node spinlock context
Browse files Browse the repository at this point in the history
Currently, message sending is performed through a deep call chain,
where the node spinlock is grabbed and held during a significant
part of the transmission time. This is clearly detrimental to
overall throughput performance; it would be better if we could send
the message after the spinlock has been released.

In this commit, we do instead let the call revert on the stack after
the buffer chain has been added to the transmission queue, whereafter
clones of the buffers are transmitted to the device layer outside the
spinlock scope.

As a further step in our effort to separate the roles of the node
and link entities we also move the function tipc_link_xmit() to
node.c, and rename it to tipc_node_xmit().

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Jon Paul Maloy authored and davem330 committed Jul 21, 2015
1 parent 22d85c7 commit af9b028
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 77 deletions.
26 changes: 26 additions & 0 deletions net/tipc/bearer.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,32 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
rcu_read_unlock();
}

/* tipc_bearer_xmit() -send buffer to destination over bearer
*/
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq,
struct tipc_media_addr *dst)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_bearer *b;
struct sk_buff *skb, *tmp;

if (skb_queue_empty(xmitq))
return;

rcu_read_lock();
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b)) {
skb_queue_walk_safe(xmitq, skb, tmp) {
__skb_dequeue(xmitq);
b->media->send_msg(net, skb, b, dst);
/* Until we remove cloning in tipc_l2_send_msg(): */
kfree_skb(skb);
}
}
rcu_read_unlock();
}

/**
* tipc_l2_rcv_msg - handle incoming TIPC message from an interface
* @buf: the received packet
Expand Down
3 changes: 3 additions & 0 deletions net/tipc/bearer.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,5 +217,8 @@ void tipc_bearer_cleanup(void);
void tipc_bearer_stop(struct net *net);
void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
struct tipc_media_addr *dest);
void tipc_bearer_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq,
struct tipc_media_addr *dst);

#endif /* _TIPC_BEARER_H */
132 changes: 72 additions & 60 deletions net/tipc/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
/* This really cannot happen... */
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
tipc_link_reset(link);
return -ENOBUFS;
}
/* Non-blocking sender: */
Expand Down Expand Up @@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
return 0;
}

/**
* tipc_link_xmit(): enqueue buffer list according to queue situation
* @link: link to use
* @list: chain of buffers containing message
* @xmitq: returned list of packets to be sent by caller
*
* Consumes the buffer chain, except when returning -ELINKCONG,
* since the caller then may want to make more send attempts.
* Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
* Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
*/
int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
struct sk_buff_head *xmitq)
{
struct tipc_msg *hdr = buf_msg(skb_peek(list));
unsigned int maxwin = l->window;
unsigned int i, imp = msg_importance(hdr);
unsigned int mtu = l->mtu;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
u16 bc_last_in = l->owner->bclink.last_in;
struct sk_buff_head *transmq = &l->transmq;
struct sk_buff_head *backlogq = &l->backlogq;
struct sk_buff *skb, *_skb, *bskb;

/* Match msg importance against this and all higher backlog limits: */
for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
return link_schedule_user(l, list);
}
if (unlikely(msg_size(hdr) > mtu))
return -EMSGSIZE;

/* Prepare each packet for sending, and add to relevant queue: */
while (skb_queue_len(list)) {
skb = skb_peek(list);
hdr = buf_msg(skb);
msg_set_seqno(hdr, seqno);
msg_set_ack(hdr, ack);
msg_set_bcast_ack(hdr, bc_last_in);

if (likely(skb_queue_len(transmq) < maxwin)) {
_skb = skb_clone(skb, GFP_ATOMIC);
if (!_skb)
return -ENOBUFS;
__skb_dequeue(list);
__skb_queue_tail(transmq, skb);
__skb_queue_tail(xmitq, _skb);
l->rcv_unacked = 0;
seqno++;
continue;
}
if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
kfree_skb(__skb_dequeue(list));
l->stats.sent_bundled++;
continue;
}
if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
kfree_skb(__skb_dequeue(list));
__skb_queue_tail(backlogq, bskb);
l->backlog[msg_importance(buf_msg(bskb))].len++;
l->stats.sent_bundled++;
l->stats.sent_bundles++;
continue;
}
l->backlog[imp].len += skb_queue_len(list);
skb_queue_splice_tail_init(list, backlogq);
}
l->snd_nxt = seqno;
return 0;
}

static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
skb_queue_head_init(list);
Expand All @@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
return __tipc_link_xmit(link->owner->net, link, &head);
}

/* tipc_link_xmit_skb(): send single buffer to destination
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
* messages, which will not cause link congestion
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
* tipc_wait_for_sendpkt() where applicable
*/
int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
u32 selector)
{
struct sk_buff_head head;
int rc;

skb2list(skb, &head);
rc = tipc_link_xmit(net, &head, dnode, selector);
if (rc)
kfree_skb(skb);
return 0;
}

/**
* tipc_link_xmit() is the general link level function for message sending
* @net: the applicable net namespace
* @list: chain of buffers containing message
* @dsz: amount of user data to be sent
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
* Consumes the buffer chain, except when returning error
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
u32 selector)
{
struct tipc_link *link = NULL;
struct tipc_node *node;
int rc = -EHOSTUNREACH;

node = tipc_node_find(net, dnode);
if (node) {
tipc_node_lock(node);
link = node_active_link(node, selector & 1);
if (link)
rc = __tipc_link_xmit(net, link, list);
tipc_node_unlock(node);
tipc_node_put(node);
}
if (link)
return rc;

if (likely(in_own_node(net, dnode))) {
tipc_sk_rcv(net, list);
return 0;
}

__skb_queue_purge(list);
return rc;
}

/*
* tipc_link_sync_xmit - synchronize broadcast link endpoints.
*
Expand Down
6 changes: 2 additions & 4 deletions net/tipc/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,10 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
void tipc_link_purge_backlog(struct tipc_link *l);
void tipc_link_reset_all(struct tipc_node *node);
void tipc_link_reset(struct tipc_link *l_ptr);
int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
u32 selector);
int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
u32 selector);
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *list);
int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
struct sk_buff_head *xmitq);
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority);
void tipc_link_push_packets(struct tipc_link *l_ptr);
Expand Down
4 changes: 2 additions & 2 deletions net/tipc/name_distr.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
if (!oskb)
break;
msg_set_destnode(buf_msg(oskb), dnode);
tipc_link_xmit_skb(net, oskb, dnode, dnode);
tipc_node_xmit_skb(net, oskb, dnode, dnode);
}
rcu_read_unlock();

Expand Down Expand Up @@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
&tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
rcu_read_unlock();

tipc_link_xmit(net, &head, dnode, dnode);
tipc_node_xmit(net, &head, dnode, dnode);
}

static void tipc_publ_subscribe(struct net *net, struct publication *publ,
Expand Down
78 changes: 78 additions & 0 deletions net/tipc/node.c
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,84 @@ static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
return -EMSGSIZE;
}

static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
int *bearer_id,
struct tipc_media_addr **maddr)
{
int id = n->active_links[sel & 1];

if (unlikely(id < 0))
return NULL;

*bearer_id = id;
*maddr = &n->links[id].maddr;
return n->links[id].link;
}

/**
* tipc_node_xmit() is the general link level function for message sending
* @net: the applicable net namespace
* @list: chain of buffers containing message
* @dnode: address of destination node
* @selector: a number used for deterministic link selection
* Consumes the buffer chain, except when returning -ELINKCONG
* Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
*/
int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
u32 dnode, int selector)
{
struct tipc_link *l = NULL;
struct tipc_node *n;
struct sk_buff_head xmitq;
struct tipc_media_addr *maddr;
int bearer_id;
int rc = -EHOSTUNREACH;

__skb_queue_head_init(&xmitq);
n = tipc_node_find(net, dnode);
if (likely(n)) {
tipc_node_lock(n);
l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
if (likely(l))
rc = tipc_link_xmit(l, list, &xmitq);
if (unlikely(rc == -ENOBUFS))
tipc_link_reset(l);
tipc_node_unlock(n);
tipc_node_put(n);
}
if (likely(!rc)) {
tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
return 0;
}
if (likely(in_own_node(net, dnode))) {
tipc_sk_rcv(net, list);
return 0;
}
return rc;
}

/* tipc_node_xmit_skb(): send single buffer to destination
* Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
* messages, which will not be rejected
* The only exception is datagram messages rerouted after secondary
* lookup, which are rare and safe to dispose of anyway.
* TODO: Return real return value, and let callers use
* tipc_wait_for_sendpkt() where applicable
*/
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
u32 selector)
{
struct sk_buff_head head;
int rc;

skb_queue_head_init(&head);
__skb_queue_tail(&head, skb);
rc = tipc_node_xmit(net, &head, dnode, selector);
if (rc == -ELINKCONG)
kfree_skb(skb);
return 0;
}

int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
int err;
Expand Down
4 changes: 4 additions & 0 deletions net/tipc/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ bool tipc_node_is_up(struct tipc_node *n);
int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
char *linkname, size_t len);
void tipc_node_unlock(struct tipc_node *node);
int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
int selector);
int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
u32 selector);
int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);

Expand Down
Loading

0 comments on commit af9b028

Please sign in to comment.