From 598411d70f85dcf5b5c6c2369cc48637c251b656 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 30 Jul 2015 18:24:23 -0400 Subject: [PATCH] tipc: make resetting of links non-atomic In order to facilitate future improvements to the locking structure, we want to make resetting and establishing of links non-atomic. I.e., the functions tipc_node_link_up() and tipc_node_link_down() should be called from outside the node lock context, and grab/release the node lock themselves. This requires that we can freeze the link state from the moment it is set to RESETTING or PEER_RESET in one lock context until it is set to RESET or ESTABLISHING in a later context. The recently introduced link FSM makes this possible, so we are now ready to introduce the above change. This commit implements this. Tested-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 2 +- net/tipc/msg.h | 29 +++++++++ net/tipc/node.c | 166 ++++++++++++++++++++++++++++-------------------- 3 files changed, 127 insertions(+), 70 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 9840b03348e1..3a92924711a1 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -489,8 +489,8 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) xmit = true; mtyp = ACTIVATE_MSG; break; - case LINK_RESETTING: case LINK_PEER_RESET: + case LINK_RESETTING: case LINK_FAILINGOVER: break; default: diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 115bb2aa6bed..53d98ef78650 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -916,4 +916,33 @@ static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list, return false; } +/* tipc_skb_queue_splice_tail - append an skb list to lock protected list + * @list: the new list to append. Not lock protected + * @head: target list. Lock protected. + */ +static inline void tipc_skb_queue_splice_tail(struct sk_buff_head *list, + struct sk_buff_head *head) +{ + spin_lock_bh(&head->lock); + skb_queue_splice_tail(list, head); + spin_unlock_bh(&head->lock); +} + +/* tipc_skb_queue_splice_tail_init - merge two lock protected skb lists + * @list: the new list to add. Lock protected. Will be reinitialized + * @head: target list. Lock protected. + */ +static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list, + struct sk_buff_head *head) +{ + struct sk_buff_head tmp; + + __skb_queue_head_init(&tmp); + + spin_lock_bh(&list->lock); + skb_queue_splice_tail_init(list, &tmp); + spin_unlock_bh(&list->lock); + tipc_skb_queue_splice_tail(&tmp, head); +} + #endif diff --git a/net/tipc/node.c b/net/tipc/node.c index d03e88f2273b..cdca57be85bf 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -66,8 +66,12 @@ enum { NODE_SYNCH_END_EVT = 0xcee }; -static void tipc_node_link_down(struct tipc_node *n, int bearer_id); -static void node_lost_contact(struct tipc_node *n_ptr); +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr); +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, + bool delete); +static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); static void node_established_contact(struct tipc_node *n_ptr); static void tipc_node_delete(struct tipc_node *node); static void tipc_node_timeout(unsigned long data); @@ -275,9 +279,8 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port) static void tipc_node_timeout(unsigned long data) { struct tipc_node *n = (struct tipc_node *)data; + struct tipc_link_entry *le; struct sk_buff_head xmitq; - struct tipc_link *l; - struct tipc_media_addr *maddr; int bearer_id; int rc = 0; @@ -285,17 +288,16 @@ static void tipc_node_timeout(unsigned long data) for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { tipc_node_lock(n); - l = n->links[bearer_id].link; - if (l) { + le = &n->links[bearer_id]; + if (le->link) { /* Link tolerance may change asynchronously: */ - tipc_node_calculate_timer(n, l); - rc = tipc_link_timeout(l, &xmitq); - if (rc & TIPC_LINK_DOWN_EVT) - tipc_node_link_down(n, bearer_id); + tipc_node_calculate_timer(n, le->link); + rc = tipc_link_timeout(le->link, &xmitq); } tipc_node_unlock(n); - maddr = &n->links[bearer_id].maddr; - tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); + if (rc & TIPC_LINK_DOWN_EVT) + tipc_node_link_down(n, bearer_id, false); } if (!mod_timer(&n->timer, jiffies + n->keepalive_intv)) tipc_node_get(n); @@ -303,18 +305,21 @@ static void tipc_node_timeout(unsigned long data) } /** - * tipc_node_link_up - handle addition of link - * + * __tipc_node_link_up - handle addition of link + * Node lock must be held by caller * Link becomes active (alone or shared) or standby, depending on its priority. */ -static void tipc_node_link_up(struct tipc_node *n, int bearer_id, - struct sk_buff_head *xmitq) +static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; struct tipc_link *ol = node_active_link(n, 0); struct tipc_link *nl = n->links[bearer_id].link; + if (!nl || !tipc_link_is_up(nl)) + return; + if (n->working_links > 1) { pr_warn("Attempt to establish 3rd link to %x\n", n->addr); return; @@ -356,28 +361,40 @@ static void tipc_node_link_up(struct tipc_node *n, int bearer_id, } /** - * tipc_node_link_down - handle loss of link + * tipc_node_link_up - handle addition of link + * + * Link becomes active (alone or shared) or standby, depending on its priority. */ -static void tipc_node_link_down(struct tipc_node *n, int bearer_id) +static void tipc_node_link_up(struct tipc_node *n, int bearer_id, + struct sk_buff_head *xmitq) { + tipc_node_lock(n); + __tipc_node_link_up(n, bearer_id, xmitq); + tipc_node_unlock(n); +} + +/** + * __tipc_node_link_down - handle loss of link + */ +static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, + struct sk_buff_head *xmitq, + struct tipc_media_addr **maddr) +{ + struct tipc_link_entry *le = &n->links[*bearer_id]; int *slot0 = &n->active_links[0]; int *slot1 = &n->active_links[1]; - struct tipc_media_addr *maddr = &n->links[bearer_id].maddr; int i, highest = 0; struct tipc_link *l, *_l, *tnl; - struct sk_buff_head xmitq; - l = n->links[bearer_id].link; + l = n->links[*bearer_id].link; if (!l || tipc_link_is_reset(l)) return; - __skb_queue_head_init(&xmitq); - n->working_links--; n->action_flags |= TIPC_NOTIFY_LINK_DOWN; - n->link_id = l->peer_bearer_id << 16 | bearer_id; + n->link_id = l->peer_bearer_id << 16 | *bearer_id; - tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr); + tipc_bearer_remove_dest(n->net, *bearer_id, n->addr); pr_debug("Lost link <%s> on network plane %c\n", l->name, l->net_plane); @@ -404,18 +421,40 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id) if (!tipc_node_is_up(n)) { tipc_link_reset(l); - node_lost_contact(n); + node_lost_contact(n, &le->inputq); return; } /* There is still a working link => initiate failover */ tnl = node_active_link(n, 0); - tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1); - tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq); + tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); tipc_link_reset(l); tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); - tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr); + tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT); + *maddr = &n->links[tnl->bearer_id].maddr; + *bearer_id = tnl->bearer_id; +} + +static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) +{ + struct tipc_link_entry *le = &n->links[bearer_id]; + struct tipc_media_addr *maddr; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + tipc_node_lock(n); + __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); + if (delete && le->link) { + kfree(le->link); + le->link = NULL; + n->link_cnt--; + } + tipc_node_unlock(n); + + tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); + tipc_sk_rcv(n->net, &le->inputq); } bool tipc_node_is_up(struct tipc_node *n) @@ -437,7 +476,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, bool sign_match = false; bool link_up = false; bool accept_addr = false; - + bool reset = true; *dupl_addr = false; *respond = false; @@ -460,6 +499,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, if (sign_match && addr_match && link_up) { /* All is fine. Do nothing. */ + reset = false; } else if (sign_match && addr_match && !link_up) { /* Respond. The link will come up in due time */ *respond = true; @@ -531,29 +571,21 @@ void tipc_node_check_dest(struct net *net, u32 onode, } memcpy(&l->media_addr, maddr, sizeof(*maddr)); memcpy(curr_maddr, maddr, sizeof(*maddr)); - tipc_node_link_down(n, b->identity); exit: tipc_node_unlock(n); + if (reset) + tipc_node_link_down(n, b->identity, false); tipc_node_put(n); } void tipc_node_delete_links(struct net *net, int bearer_id) { struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *l; struct tipc_node *n; rcu_read_lock(); list_for_each_entry_rcu(n, &tn->node_list, list) { - tipc_node_lock(n); - l = n->links[bearer_id].link; - if (l) { - tipc_node_link_down(n, bearer_id); - n->links[bearer_id].link = NULL; - n->link_cnt--; - } - tipc_node_unlock(n); - kfree(l); + tipc_node_link_down(n, bearer_id, true); } rcu_read_unlock(); } @@ -561,19 +593,14 @@ void tipc_node_delete_links(struct net *net, int bearer_id) static void tipc_node_reset_links(struct tipc_node *n) { char addr_string[16]; - u32 i; - - tipc_node_lock(n); + int i; pr_warn("Resetting all links to %s\n", tipc_addr_string_fill(addr_string, n->addr)); for (i = 0; i < MAX_BEARERS; i++) { - if (!n->links[i].link) - continue; - tipc_node_link_down(n, i); + tipc_node_link_down(n, i, false); } - tipc_node_unlock(n); } void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) @@ -798,10 +825,12 @@ static void node_established_contact(struct tipc_node *n_ptr) tipc_bclink_add_node(n_ptr->net, n_ptr->addr); } -static void node_lost_contact(struct tipc_node *n_ptr) +static void node_lost_contact(struct tipc_node *n_ptr, + struct sk_buff_head *inputq) { char addr_string[16]; struct tipc_sock_conn *conn, *safe; + struct tipc_link *l; struct list_head *conns = &n_ptr->conn_sks; struct sk_buff *skb; struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id); @@ -827,14 +856,11 @@ static void node_lost_contact(struct tipc_node *n_ptr) /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { - struct tipc_link *l_ptr = n_ptr->links[i].link; - if (!l_ptr) - continue; - tipc_link_fsm_evt(l_ptr, LINK_FAILOVER_END_EVT); - kfree_skb(l_ptr->failover_reasm_skb); - l_ptr->failover_reasm_skb = NULL; - tipc_link_reset_fragments(l_ptr); + l = n_ptr->links[i].link; + if (l) + tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); } + /* Prevent re-contact with node until cleanup is done */ tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT); @@ -848,7 +874,7 @@ static void node_lost_contact(struct tipc_node *n_ptr) conn->peer_node, conn->port, conn->peer_port, TIPC_ERR_NO_NODE); if (likely(skb)) { - skb_queue_tail(n_ptr->inputq, skb); + skb_queue_tail(inputq, skb); n_ptr->action_flags |= TIPC_MSG_EVT; } list_del(&conn->list); @@ -1025,9 +1051,9 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, l = tipc_node_select_link(n, selector, &bearer_id, &maddr); if (likely(l)) rc = tipc_link_xmit(l, list, &xmitq); - if (unlikely(rc == -ENOBUFS)) - tipc_node_link_down(n, bearer_id); tipc_node_unlock(n); + if (unlikely(rc == -ENOBUFS)) + tipc_node_link_down(n, bearer_id, false); tipc_node_put(n); } if (likely(!rc)) { @@ -1081,8 +1107,8 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, u16 rcv_nxt, syncpt, dlv_nxt; int state = n->state; struct tipc_link *l, *pl = NULL; - struct sk_buff_head; - int i; + struct tipc_media_addr *maddr; + int i, pb_id; l = n->links[bearer_id].link; if (!l) @@ -1123,9 +1149,11 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Initiate or update failover mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { syncpt = oseqno + exp_pkts - 1; - if (pl && tipc_link_is_up(pl)) - tipc_node_link_down(n, pl->bearer_id); - + if (pl && tipc_link_is_up(pl)) { + pb_id = pl->bearer_id; + __tipc_node_link_down(n, &pb_id, xmitq, &maddr); + tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq); + } /* If pkts arrive out of order, use lowest calculated syncpt */ if (less(syncpt, n->sync_point)) n->sync_point = syncpt; @@ -1146,7 +1174,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, syncpt = iseqno + exp_pkts - 1; if (!tipc_link_is_up(l)) { tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT); - tipc_node_link_up(n, bearer_id, xmitq); + __tipc_node_link_up(n, bearer_id, xmitq); } if (n->state == SELF_UP_PEER_UP) { n->sync_point = syncpt; @@ -1224,7 +1252,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) tipc_bclink_sync_state(n, hdr); - /* Release acked broadcast messages */ + /* Release acked broadcast packets */ if (unlikely(n->bclink.acked != msg_bcast_ack(hdr))) tipc_bclink_acknowledge(n, msg_bcast_ack(hdr)); @@ -1233,14 +1261,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) rc = tipc_link_rcv(le->link, skb, &xmitq); skb = NULL; } +unlock: + tipc_node_unlock(n); if (unlikely(rc & TIPC_LINK_UP_EVT)) tipc_node_link_up(n, bearer_id, &xmitq); if (unlikely(rc & TIPC_LINK_DOWN_EVT)) - tipc_node_link_down(n, bearer_id); -unlock: - tipc_node_unlock(n); + tipc_node_link_down(n, bearer_id, false); if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq);