diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 3b7bd2174330..08d64e7527cb 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -112,6 +112,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net) return tipc_net(net)->bcbase; } +static struct tipc_link *tipc_bc_sndlink(struct net *net) +{ + return tipc_net(net)->bcl; +} + /** * tipc_nmap_equal - test for equality of node maps */ @@ -121,6 +126,7 @@ static int tipc_nmap_equal(struct tipc_node_map *nm_a, return !memcmp(nm_a, nm_b, sizeof(*nm_a)); } +static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq); static void tipc_nmap_diff(struct tipc_node_map *nm_a, struct tipc_node_map *nm_b, struct tipc_node_map *nm_diff); @@ -148,14 +154,14 @@ uint tipc_bcast_get_mtu(void) return MAX_PKT_DEFAULT_MCAST; } -static u32 bcbuf_acks(struct sk_buff *buf) +static u16 bcbuf_acks(struct sk_buff *skb) { - return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; + return TIPC_SKB_CB(skb)->ackers; } -static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) +static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers) { - TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; + TIPC_SKB_CB(buf)->ackers = ackers; } static void bcbuf_decr_acks(struct sk_buff *buf) @@ -166,9 +172,10 @@ static void bcbuf_decr_acks(struct sk_buff *buf) void tipc_bclink_add_node(struct net *net, u32 addr) { struct tipc_net *tn = net_generic(net, tipc_net_id); - + struct tipc_link *l = tipc_bc_sndlink(net); tipc_bclink_lock(net); tipc_nmap_add(&tn->bcbase->bcast_nodes, addr); + tipc_link_add_bc_peer(l); tipc_bclink_unlock(net); } @@ -178,6 +185,7 @@ void tipc_bclink_remove_node(struct net *net, u32 addr) tipc_bclink_lock(net); tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr); + tn->bcl->ackers--; /* Last node? => reset backlog queue */ if (!tn->bcbase->bcast_nodes.count) @@ -295,7 +303,6 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) if (unlikely(!n_ptr->bclink.recv_permitted)) return; - tipc_bclink_lock(net); /* Bail out if tx queue is empty (no clean up is required) */ @@ -324,13 +331,11 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) less_eq(acked, n_ptr->bclink.acked)) goto exit; } - /* Skip over packets that node has previously acknowledged */ skb_queue_walk(&tn->bcl->transmq, skb) { if (more(buf_seqno(skb), n_ptr->bclink.acked)) break; } - /* Update packets that node is now acknowledging */ skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { if (more(buf_seqno(skb), acked)) @@ -367,6 +372,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, struct sk_buff *buf; struct net *net = n_ptr->net; struct tipc_net *tn = net_generic(net, tipc_net_id); + struct tipc_link *bcl = tn->bcl; /* Ignore "stale" link state info */ if (less_eq(last_sent, n_ptr->bclink.last_in)) @@ -375,6 +381,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, /* Update link synchronization state; quit if in sync */ bclink_update_last_sent(n_ptr, last_sent); + /* This is a good location for statistical profiling */ + bcl->stats.queue_sz_counts++; + bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq); + if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) return; @@ -468,52 +478,35 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg) */ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct tipc_link *bcl = tn->bcl; - struct tipc_bc_base *bclink = tn->bcbase; + struct tipc_link *l = tipc_bc_sndlink(net); + struct sk_buff_head xmitq, inputq, rcvq; int rc = 0; - int bc = 0; - struct sk_buff *skb; - struct sk_buff_head arrvq; - struct sk_buff_head inputq; - /* Prepare clone of message for local node */ - skb = tipc_msg_reassemble(list); - if (unlikely(!skb)) + __skb_queue_head_init(&rcvq); + __skb_queue_head_init(&xmitq); + skb_queue_head_init(&inputq); + + /* Prepare message clone for local node */ + if (unlikely(!tipc_msg_reassemble(list, &rcvq))) return -EHOSTUNREACH; - /* Broadcast to all nodes */ - if (likely(bclink)) { - tipc_bclink_lock(net); - if (likely(bclink->bcast_nodes.count)) { - rc = __tipc_link_xmit(net, bcl, list); - if (likely(!rc)) { - u32 len = skb_queue_len(&bcl->transmq); - - bclink_set_last_sent(net); - bcl->stats.queue_sz_counts++; - bcl->stats.accu_queue_sz += len; - } - bc = 1; - } - tipc_bclink_unlock(net); - } - - if (unlikely(!bc)) - __skb_queue_purge(list); + tipc_bcast_lock(net); + if (tipc_link_bc_peers(l)) + rc = tipc_link_xmit(l, list, &xmitq); + bclink_set_last_sent(net); + tipc_bcast_unlock(net); + /* Don't send to local node if adding to link failed */ if (unlikely(rc)) { - kfree_skb(skb); + __skb_queue_purge(&rcvq); return rc; } - /* Deliver message clone */ - __skb_queue_head_init(&arrvq); - skb_queue_head_init(&inputq); - __skb_queue_tail(&arrvq, skb); - tipc_sk_mcast_rcv(net, &arrvq, &inputq); - return rc; + /* Broadcast to all nodes, inluding local node */ + tipc_bcbearer_xmit(net, &xmitq); + tipc_sk_mcast_rcv(net, &rcvq, &inputq); + __skb_queue_purge(list); + return 0; } - /** * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet * @@ -564,7 +557,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) node = tipc_node_find(net, msg_prevnode(msg)); if (unlikely(!node)) goto exit; - tipc_node_lock(node); if (unlikely(!node->bclink.recv_permitted)) goto unlock; @@ -589,7 +581,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) tipc_node_put(node); goto exit; } - /* Handle in-sequence broadcast message */ seqno = msg_seqno(msg); next_in = mod(node->bclink.last_in + 1); @@ -778,6 +769,19 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, return 0; } +static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq) +{ + struct sk_buff *skb, *tmp; + + skb_queue_walk_safe(xmitq, skb, tmp) { + __skb_dequeue(xmitq); + tipc_bcbearer_send(net, skb, NULL, NULL); + + /* Until we remove cloning in tipc_l2_send_msg(): */ + kfree_skb(skb); + } +} + /** * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer */ diff --git a/net/tipc/link.c b/net/tipc/link.c index dfc738e5cff9..363da5f85704 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -158,6 +158,21 @@ int tipc_link_is_active(struct tipc_link *l) return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); } +void tipc_link_add_bc_peer(struct tipc_link *l) +{ + l->ackers++; +} + +void tipc_link_remove_bc_peer(struct tipc_link *l) +{ + l->ackers--; +} + +int tipc_link_bc_peers(struct tipc_link *l) +{ + return l->ackers; +} + static u32 link_own_addr(struct tipc_link *l) { return msg_prevnode(l->pmsg); @@ -258,6 +273,7 @@ bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, l = *link; strcpy(l->name, tipc_bclink_name); tipc_link_reset(l); + l->ackers = 0; return true; } @@ -898,8 +914,7 @@ static void link_retransmit_failure(struct tipc_link *l_ptr, char addr_string[16]; pr_info("Msg seq number: %u, ", msg_seqno(msg)); - pr_cont("Outstanding acks: %lu\n", - (unsigned long) TIPC_SKB_CB(buf)->handle); + pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers); n_ptr = tipc_bclink_retransmit_to(net); diff --git a/net/tipc/link.h b/net/tipc/link.h index be24d1fd5132..9c4acc26b3b1 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -262,5 +262,9 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq); +void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq); +void tipc_link_add_bc_peer(struct tipc_link *l); +void tipc_link_remove_bc_peer(struct tipc_link *l); +int tipc_link_bc_peers(struct tipc_link *l); #endif diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 26d38b3d8760..5b47468739e7 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -565,18 +565,22 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err) /* tipc_msg_reassemble() - clone a buffer chain of fragments and * reassemble the clones into one message */ -struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) +bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq) { - struct sk_buff *skb; + struct sk_buff *skb, *_skb; struct sk_buff *frag = NULL; struct sk_buff *head = NULL; - int hdr_sz; + int hdr_len; /* Copy header if single buffer */ if (skb_queue_len(list) == 1) { skb = skb_peek(list); - hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); - return __pskb_copy(skb, hdr_sz, GFP_ATOMIC); + hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); + _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC); + if (!_skb) + return false; + __skb_queue_tail(rcvq, _skb); + return true; } /* Clone all fragments and reassemble */ @@ -590,11 +594,12 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) if (!head) goto error; } - return frag; + __skb_queue_tail(rcvq, frag); + return true; error: pr_warn("Failed do clone local mcast rcv buffer\n"); kfree_skb(head); - return NULL; + return false; } /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 799782c47f6c..fbf51fa1075d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -790,7 +790,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); -struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); +bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, struct sk_buff *skb);