From 170b3927b4c4f6e105964f81ae985fc9772b1f9b Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Tue, 7 Jan 2014 17:02:41 -0500 Subject: [PATCH 1/4] tipc: rename functions related to link failover and improve comments The functionality related to link addition and failover is unnecessarily hard to understand and maintain. We try to improve this by renaming some of the functions, at the same time adding or improving the explanatory comments around them. Names such as "tipc_rcv()" etc. also align better with what is used in other networking components. The changes in this commit are purely cosmetic, no functional changes are made. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Reviewed-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/bearer.c | 2 +- net/tipc/bearer.h | 3 +- net/tipc/link.c | 70 ++++++++++++++++++++++++++++------------------- net/tipc/link.h | 15 ++++++---- net/tipc/node.c | 4 +-- 5 files changed, 56 insertions(+), 38 deletions(-) diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 07ed5cc8235dc1..2d456ab0e843af 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -541,7 +541,7 @@ static int tipc_l2_rcv_msg(struct sk_buff *buf, struct net_device *dev, if (likely(b_ptr)) { if (likely(buf->pkt_type <= PACKET_BROADCAST)) { buf->next = NULL; - tipc_recv_msg(buf, b_ptr); + tipc_rcv(buf, b_ptr); rcu_read_unlock(); return NET_RX_SUCCESS; } diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 410efb1e930bc7..4f5db9ad5bf639 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -161,8 +161,7 @@ extern struct tipc_bearer tipc_bearers[]; * TIPC routines available to supported media types */ -void tipc_recv_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr); - +void tipc_rcv(struct sk_buff *buf, struct tipc_bearer *tb_ptr); int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority); int tipc_disable_bearer(const char *name); diff --git a/net/tipc/link.c b/net/tipc/link.c index 131a32a7b17417..bb48b96850201e 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1,7 +1,7 @@ /* * net/tipc/link.c: TIPC link code * - * Copyright (c) 1996-2007, 2012, Ericsson AB + * Copyright (c) 1996-2007, 2012-2014, Ericsson AB * Copyright (c) 2004-2007, 2010-2013, Wind River Systems * All rights reserved. * @@ -78,8 +78,8 @@ static const char *link_unk_evt = "Unknown link event "; static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, struct sk_buff *buf); static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf); -static int link_recv_changeover_msg(struct tipc_link **l_ptr, - struct sk_buff **buf); +static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, + struct sk_buff **buf); static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); static int link_send_sections_long(struct tipc_port *sender, struct iovec const *msg_sect, @@ -278,7 +278,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, tipc_node_attach_link(n_ptr, l_ptr); - k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); + k_init_timer(&l_ptr->timer, (Handler)link_timeout, + (unsigned long)l_ptr); list_add_tail(&l_ptr->link_list, &b_ptr->links); tipc_k_signal((Handler)link_start, (unsigned long)l_ptr); @@ -1422,14 +1423,14 @@ static int link_recv_buf_validate(struct sk_buff *buf) } /** - * tipc_recv_msg - process TIPC messages arriving from off-node + * tipc_rcv - process TIPC packets/messages arriving from off-node * @head: pointer to message buffer chain * @tb_ptr: pointer to bearer message arrived on * * Invoked with no locks held. Bearer pointer must point to a valid bearer * structure (i.e. cannot be NULL), but bearer can be inactive. */ -void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) +void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr) { read_lock_bh(&tipc_net_lock); while (head) { @@ -1603,7 +1604,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr) continue; case CHANGEOVER_PROTOCOL: type = msg_type(msg); - if (link_recv_changeover_msg(&l_ptr, &buf)) { + if (tipc_link_tunnel_rcv(&l_ptr, &buf)) { msg = buf_msg(buf); seq_no = msg_seqno(msg); if (type == ORIGINAL_MSG) @@ -1954,13 +1955,13 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) } -/* - * tipc_link_tunnel(): Send one message via a link belonging to - * another bearer. Owner node is locked. +/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to + * a different bearer. Owner node is locked. */ -static void tipc_link_tunnel(struct tipc_link *l_ptr, - struct tipc_msg *tunnel_hdr, struct tipc_msg *msg, - u32 selector) +static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr, + struct tipc_msg *tunnel_hdr, + struct tipc_msg *msg, + u32 selector) { struct tipc_link *tunnel; struct sk_buff *buf; @@ -1983,12 +1984,13 @@ static void tipc_link_tunnel(struct tipc_link *l_ptr, } - -/* - * changeover(): Send whole message queue via the remaining link - * Owner node is locked. +/* tipc_link_failover_send_queue(): A link has gone down, but a second + * link is still active. We can do failover. Tunnel the failing link's + * whole send queue via the remaining link. This way, we don't lose + * any packets, and sequence order is preserved for subsequent traffic + * sent over the remaining link. Owner node is locked. */ -void tipc_link_changeover(struct tipc_link *l_ptr) +void tipc_link_failover_send_queue(struct tipc_link *l_ptr) { u32 msgcount = l_ptr->out_queue_size; struct sk_buff *crs = l_ptr->first_out; @@ -2037,20 +2039,30 @@ void tipc_link_changeover(struct tipc_link *l_ptr) msgcount = msg_msgcnt(msg); while (msgcount--) { msg_set_seqno(m, msg_seqno(msg)); - tipc_link_tunnel(l_ptr, &tunnel_hdr, m, - msg_link_selector(m)); + tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m, + msg_link_selector(m)); pos += align(msg_size(m)); m = (struct tipc_msg *)pos; } } else { - tipc_link_tunnel(l_ptr, &tunnel_hdr, msg, - msg_link_selector(msg)); + tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg, + msg_link_selector(msg)); } crs = crs->next; } } -void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *tunnel) +/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a + * duplicate of the first link's send queue via the new link. This way, we + * are guaranteed that currently queued packets from a socket are delivered + * before future traffic from the same socket, even if this is using the + * new link. The last arriving copy of each duplicate packet is dropped at + * the receiving end by the regular protocol check, so packet cardinality + * and sequence order is preserved per sender/receiver socket pair. + * Owner node is locked. + */ +void tipc_link_dup_send_queue(struct tipc_link *l_ptr, + struct tipc_link *tunnel) { struct sk_buff *iter; struct tipc_msg tunnel_hdr; @@ -2106,12 +2118,14 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) return eb; } -/* - * link_recv_changeover_msg(): Receive tunneled packet sent - * via other link. Node is locked. Return extracted buffer. +/* tipc_link_tunnel_rcv(): Receive a tunneled packet, sent + * via other link as result of a failover (ORIGINAL_MSG) or + * a new active link (DUPLICATE_MSG). Failover packets are + * returned to the active link for delivery upwards. + * Owner node is locked. */ -static int link_recv_changeover_msg(struct tipc_link **l_ptr, - struct sk_buff **buf) +static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, + struct sk_buff **buf) { struct sk_buff *tunnel_buf = *buf; struct tipc_link *dest_link; diff --git a/net/tipc/link.h b/net/tipc/link.h index 0636ca95be367e..89ab89be848da3 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -216,15 +216,20 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, struct tipc_bearer *b_ptr, const struct tipc_media_addr *media_addr); void tipc_link_delete(struct tipc_link *l_ptr); -void tipc_link_changeover(struct tipc_link *l_ptr); -void tipc_link_send_duplicate(struct tipc_link *l_ptr, struct tipc_link *dest); +void tipc_link_failover_send_queue(struct tipc_link *l_ptr); +void tipc_link_dup_send_queue(struct tipc_link *l_ptr, + struct tipc_link *dest); void tipc_link_reset_fragments(struct tipc_link *l_ptr); int tipc_link_is_up(struct tipc_link *l_ptr); int tipc_link_is_active(struct tipc_link *l_ptr); void tipc_link_stop(struct tipc_link *l_ptr); -struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, u16 cmd); -struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space); -struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space); +struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, + int req_tlv_space, + u16 cmd); +struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, + int req_tlv_space); +struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, + int req_tlv_space); void tipc_link_reset(struct tipc_link *l_ptr); int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector); void tipc_link_send_names(struct list_head *message_list, u32 dest); diff --git a/net/tipc/node.c b/net/tipc/node.c index e167d2615569c3..efe4d41bf11bac 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -162,7 +162,7 @@ void tipc_node_link_up(struct tipc_node *n_ptr, struct tipc_link *l_ptr) pr_info("New link <%s> becomes standby\n", l_ptr->name); return; } - tipc_link_send_duplicate(active[0], l_ptr); + tipc_link_dup_send_queue(active[0], l_ptr); if (l_ptr->priority == active[0]->priority) { active[0] = l_ptr; return; @@ -225,7 +225,7 @@ void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr) if (active[0] == l_ptr) node_select_active_links(n_ptr); if (tipc_node_is_up(n_ptr)) - tipc_link_changeover(l_ptr); + tipc_link_failover_send_queue(l_ptr); else node_lost_contact(n_ptr); } From b9d4c33935bb5673fa9f721ecf85e5029c847f08 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Tue, 7 Jan 2014 17:02:42 -0500 Subject: [PATCH 2/4] tipc: remove 'has_redundant_link' flag from STATE link protocol messages The flag 'has_redundant_link' is defined only in RESET and ACTIVATE protocol messages. Due to an ambiguity in the protocol specification it is currently also transferred in STATE messages. Its value is used to initialize a link state variable, 'permit_changeover', which is used to inhibit futile link failover attempts when it is known that the peer node has no working links at the moment, although the local node may still think it has one. The fact that 'has_redundant_link' incorrectly is read from STATE messages has the effect that 'permit_changeover' sometimes gets a wrong value, and permanently blocks any links from being re-established. Such failures can only occur in in dual-link systems, and are extremely rare. This bug seems to have always been present in the code. Furthermore, since commit b4b5610223f17790419b03eaa962b0e3ecf930d7 ("tipc: Ensure both nodes recognize loss of contact between them"), the 'permit_changeover' field serves no purpose any more. The task of enforcing 'lost contact' cycles at both peer endpoints is now taken by a new mechanism, using the flags WAIT_NODE_DOWN and WAIT_PEER_DOWN in struct tipc_node to abort unnecessary failover attempts. We therefore remove the 'has_redundant_link' flag from STATE messages, as well as the now redundant 'permit_changeover' variable. Signed-off-by: Jon Maloy Reviewed-by: Ying Xue Reviewed-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/link.c | 10 +--------- net/tipc/node.h | 2 -- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index bb48b96850201e..9fb0f6b96b4574 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -438,8 +438,7 @@ void tipc_link_reset(struct tipc_link *l_ptr) tipc_node_link_down(l_ptr->owner, l_ptr); tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); - if (was_active_link && tipc_node_active_links(l_ptr->owner) && - l_ptr->owner->permit_changeover) { + if (was_active_link && tipc_node_active_links(l_ptr->owner)) { l_ptr->reset_checkpoint = checkpoint; l_ptr->exp_msg_count = START_CHANGEOVER; } @@ -1838,8 +1837,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf) if (tipc_own_addr > msg_prevnode(msg)) l_ptr->b_ptr->net_plane = msg_net_plane(msg); - l_ptr->owner->permit_changeover = msg_redundant_link(msg); - switch (msg_type(msg)) { case RESET_MSG: @@ -2001,11 +1998,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr) if (!tunnel) return; - if (!l_ptr->owner->permit_changeover) { - pr_warn("%speer did not permit changeover\n", link_co_err); - return; - } - tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr); msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); diff --git a/net/tipc/node.h b/net/tipc/node.h index d4bb654c858df7..63e2e8ead2fe5d 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -64,7 +64,6 @@ * @working_links: number of working links to node (both active and standby) * @block_setup: bit mask of conditions preventing link establishment to node * @link_cnt: number of links to node - * @permit_changeover: non-zero if node has redundant links to this system * @signature: node instance identifier * @bclink: broadcast-related info * @acked: sequence # of last outbound b'cast message acknowledged by node @@ -89,7 +88,6 @@ struct tipc_node { int link_cnt; int working_links; int block_setup; - int permit_changeover; u32 signature; struct { u32 acked; From f9a2c80b8b7366748a1c3975df07f4a34aa80538 Mon Sep 17 00:00:00 2001 From: Ying Xue Date: Tue, 7 Jan 2014 17:02:43 -0500 Subject: [PATCH 3/4] tipc: introduce new spinlock to protect struct link_req Currently, only 'bearer_lock' is used to protect struct link_req in the function disc_timeout(). This is unsafe, since the member fields 'num_nodes' and 'timer_intv' might be accessed by below three different threads simultaneously, none of them grabbing bearer_lock in the critical region: link_activate() tipc_bearer_add_dest() tipc_disc_add_dest() req->num_nodes++; tipc_link_reset() tipc_bearer_remove_dest() tipc_disc_remove_dest() req->num_nodes-- disc_update() read req->num_nodes write req->timer_intv disc_timeout() read req->num_nodes read/write req->timer_intv Without lock protection, the only symptom of a race is that discovery messages occasionally may not be sent out. This is not fatal, since such messages are best-effort anyway. On the other hand, since discovery messages are not time critical, adding a protecting lock brings no serious overhead either. So we add a new, dedicated spinlock in order to guarantee absolute data consistency in link_req objects. This also helps reduce the overall role of the bearer_lock, which we want to remove completely in a later commit series. Signed-off-by: Ying Xue Reviewed-by: Paul Gortmaker Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/discover.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/net/tipc/discover.c b/net/tipc/discover.c index bc849f1efa1677..412ff41b861166 100644 --- a/net/tipc/discover.c +++ b/net/tipc/discover.c @@ -50,6 +50,7 @@ * @dest: destination address for request messages * @domain: network domain to which links can be established * @num_nodes: number of nodes currently discovered (i.e. with an active link) + * @lock: spinlock for controlling access to requests * @buf: request message to be (repeatedly) sent * @timer: timer governing period between requests * @timer_intv: current interval between requests (in ms) @@ -59,6 +60,7 @@ struct tipc_link_req { struct tipc_media_addr dest; u32 domain; int num_nodes; + spinlock_t lock; struct sk_buff *buf; struct timer_list timer; unsigned int timer_intv; @@ -274,7 +276,9 @@ static void disc_update(struct tipc_link_req *req) */ void tipc_disc_add_dest(struct tipc_link_req *req) { + spin_lock_bh(&req->lock); req->num_nodes++; + spin_unlock_bh(&req->lock); } /** @@ -283,8 +287,10 @@ void tipc_disc_add_dest(struct tipc_link_req *req) */ void tipc_disc_remove_dest(struct tipc_link_req *req) { + spin_lock_bh(&req->lock); req->num_nodes--; disc_update(req); + spin_unlock_bh(&req->lock); } /** @@ -297,7 +303,7 @@ static void disc_timeout(struct tipc_link_req *req) { int max_delay; - spin_lock_bh(&req->bearer->lock); + spin_lock_bh(&req->lock); /* Stop searching if only desired node has been found */ if (tipc_node(req->domain) && req->num_nodes) { @@ -325,7 +331,7 @@ static void disc_timeout(struct tipc_link_req *req) k_start_timer(&req->timer, req->timer_intv); exit: - spin_unlock_bh(&req->bearer->lock); + spin_unlock_bh(&req->lock); } /** @@ -356,6 +362,7 @@ int tipc_disc_create(struct tipc_bearer *b_ptr, struct tipc_media_addr *dest, req->domain = dest_domain; req->num_nodes = 0; req->timer_intv = TIPC_LINK_REQ_INIT; + spin_lock_init(&req->lock); k_init_timer(&req->timer, (Handler)disc_timeout, (unsigned long)req); k_start_timer(&req->timer, req->timer_intv); b_ptr->link_req = req; From 581465fa285863344efc233bc546823bfabd295f Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Tue, 7 Jan 2014 17:02:44 -0500 Subject: [PATCH 4/4] tipc: make link start event synchronous When a link is created we delay the start event by launching it to be executed later in a tasklet. As we hold all the necessary locks at the moment of creation, and there is no risk of deadlock or contention, this delay serves no purpose in the current code. We remove this obsolete indirection step, and the associated function link_start(). At the same time, we rename the function tipc_link_stop() to the more appropriate tipc_link_purge_queues(). Signed-off-by: Jon Maloy Reviewed-by: Paul Gortmaker Signed-off-by: David S. Miller --- net/tipc/bcast.c | 2 +- net/tipc/link.c | 16 +++++----------- net/tipc/link.h | 2 +- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 4c2a80b3c01e10..bf860d9e75af23 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -794,7 +794,7 @@ void tipc_bclink_init(void) void tipc_bclink_stop(void) { spin_lock_bh(&bc_lock); - tipc_link_stop(bcl); + tipc_link_purge_queues(bcl); spin_unlock_bh(&bc_lock); memset(bclink, 0, sizeof(*bclink)); diff --git a/net/tipc/link.c b/net/tipc/link.c index 9fb0f6b96b4574..471973ff134f89 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -87,7 +87,6 @@ static int link_send_sections_long(struct tipc_port *sender, static void link_state_event(struct tipc_link *l_ptr, u32 event); static void link_reset_statistics(struct tipc_link *l_ptr); static void link_print(struct tipc_link *l_ptr, const char *str); -static void link_start(struct tipc_link *l_ptr); static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); static void tipc_link_send_sync(struct tipc_link *l); static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf); @@ -281,7 +280,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); list_add_tail(&l_ptr->link_list, &b_ptr->links); - tipc_k_signal((Handler)link_start, (unsigned long)l_ptr); + + link_state_event(l_ptr, STARTING_EVT); return l_ptr; } @@ -306,19 +306,13 @@ void tipc_link_delete(struct tipc_link *l_ptr) tipc_node_lock(l_ptr->owner); tipc_link_reset(l_ptr); tipc_node_detach_link(l_ptr->owner, l_ptr); - tipc_link_stop(l_ptr); + tipc_link_purge_queues(l_ptr); list_del_init(&l_ptr->link_list); tipc_node_unlock(l_ptr->owner); k_term_timer(&l_ptr->timer); kfree(l_ptr); } -static void link_start(struct tipc_link *l_ptr) -{ - tipc_node_lock(l_ptr->owner); - link_state_event(l_ptr, STARTING_EVT); - tipc_node_unlock(l_ptr->owner); -} /** * link_schedule_port - schedule port for deferred sending @@ -404,10 +398,10 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr) } /** - * tipc_link_stop - purge all inbound and outbound messages associated with link + * tipc_link_purge_queues - purge all pkt queues associated with link * @l_ptr: pointer to link */ -void tipc_link_stop(struct tipc_link *l_ptr) +void tipc_link_purge_queues(struct tipc_link *l_ptr) { kfree_skb_list(l_ptr->oldest_deferred_in); kfree_skb_list(l_ptr->first_out); diff --git a/net/tipc/link.h b/net/tipc/link.h index 89ab89be848da3..3b6aa65b608c11 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -222,7 +222,7 @@ void tipc_link_dup_send_queue(struct tipc_link *l_ptr, void tipc_link_reset_fragments(struct tipc_link *l_ptr); int tipc_link_is_up(struct tipc_link *l_ptr); int tipc_link_is_active(struct tipc_link *l_ptr); -void tipc_link_stop(struct tipc_link *l_ptr); +void tipc_link_purge_queues(struct tipc_link *l_ptr); struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, u16 cmd);