tipc: simplify bearer level broadcast

Until now, we have been keeping track of the exact set of broadcast
destinations though the help structure tipc_node_map. This leads us to
have to maintain a whole infrastructure for supporting this, including
a pseudo-bearer and a number of functions to manipulate both the bearers
and the node map correctly. Apart from the complexity, this approach is
also limiting, as struct tipc_node_map only can support cluster local
broadcast if we want to avoid it becoming excessively large. We want to
eliminate this limitation, in order to enable introduction of scoped
multicast in the future.

A closer analysis reveals that it is unnecessary maintaining this "full
set" overview; it is sufficient to keep a counter per bearer, indicating
how many nodes can be reached via this bearer at the moment. The protocol
is now robust enough to handle transitional discrepancies between the
nominal number of reachable destinations, as expected by the broadcast
protocol itself, and the number which is actually reachable at the
moment. The initial broadcast synchronization, in conjunction with the
retransmission mechanism, ensures that all packets will eventually be
acknowledged by the correct set of destinations.

This commit introduces these changes.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Jon Paul Maloy 2015-10-22 08:51:42 -04:00 committed by David S. Miller
parent 5266698661
commit b06b281e79
5 changed files with 151 additions and 45 deletions

View File

@ -90,10 +90,12 @@ struct tipc_bcbearer {
/** /**
* struct tipc_bc_base - link used for broadcast messages * struct tipc_bc_base - link used for broadcast messages
* @link: (non-standard) broadcast link structure * @link: broadcast send link structure
* @node: (non-standard) node structure representing b'cast link's peer node * @node: (non-standard) node structure representing b'cast link's peer node
* @bcast_nodes: map of broadcast-capable nodes * @bcast_nodes: map of broadcast-capable nodes
* @retransmit_to: node that most recently requested a retransmit * @retransmit_to: node that most recently requested a retransmit
* @dest_nnt: array indicating number of reachable destinations per bearer
* @bearers: array of bearers, sorted by number of reachable destinations
* *
* Handles sequence numbering, fragmentation, bundling, etc. * Handles sequence numbering, fragmentation, bundling, etc.
*/ */
@ -103,6 +105,8 @@ struct tipc_bc_base {
struct sk_buff_head arrvq; struct sk_buff_head arrvq;
struct sk_buff_head inputq; struct sk_buff_head inputq;
struct sk_buff_head namedq; struct sk_buff_head namedq;
int dests[MAX_BEARERS];
int primary_bearer;
struct tipc_node_map bcast_nodes; struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to; struct tipc_node *retransmit_to;
}; };
@ -164,6 +168,52 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
} }
/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
* if any, and make it primary bearer
*/
static void tipc_bcbase_select_primary(struct net *net)
{
struct tipc_bc_base *bb = tipc_bc_base(net);
int all_dests = tipc_link_bc_peers(bb->link);
int i;
bb->primary_bearer = INVALID_BEARER_ID;
if (!all_dests)
return;
for (i = 0; i < MAX_BEARERS; i++) {
if (bb->dests[i] < all_dests)
continue;
bb->primary_bearer = i;
/* Reduce risk that all nodes select same primary */
if ((i ^ tipc_own_addr(net)) & 1)
break;
}
}
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
{
struct tipc_bc_base *bb = tipc_bc_base(net);
tipc_bcast_lock(net);
bb->dests[bearer_id]++;
tipc_bcbase_select_primary(net);
tipc_bcast_unlock(net);
}
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
{
struct tipc_bc_base *bb = tipc_bc_base(net);
tipc_bcast_lock(net);
bb->dests[bearer_id]--;
tipc_bcbase_select_primary(net);
tipc_bcast_unlock(net);
}
static void bclink_set_last_sent(struct net *net) static void bclink_set_last_sent(struct net *net)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_net *tn = net_generic(net, tipc_net_id);
@ -439,6 +489,51 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
tipc_node_put(n_ptr); tipc_node_put(n_ptr);
} }
/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
*
* Note that number of reachable destinations, as indicated in the dests[]
* array, may transitionally differ from the number of destinations indicated
* in each sent buffer. We can sustain this. Excess destination nodes will
* drop and never acknowledge the unexpected packets, and missing destinations
* will either require retransmission (if they are just about to be added to
* the bearer), or be removed from the buffer's 'ackers' counter (if they
* just went down)
*/
static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
{
int bearer_id;
struct tipc_bc_base *bb = tipc_bc_base(net);
struct sk_buff *skb, *_skb;
struct sk_buff_head _xmitq;
if (skb_queue_empty(xmitq))
return;
/* The typical case: at least one bearer has links to all nodes */
bearer_id = bb->primary_bearer;
if (bearer_id >= 0) {
tipc_bearer_bc_xmit(net, bearer_id, xmitq);
return;
}
/* We have to transmit across all bearers */
skb_queue_head_init(&_xmitq);
for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
if (!bb->dests[bearer_id])
continue;
skb_queue_walk(xmitq, skb) {
_skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
if (!_skb)
break;
__skb_queue_tail(&_xmitq, _skb);
}
tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
}
__skb_queue_purge(xmitq);
__skb_queue_purge(&_xmitq);
}
/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster /* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
* and to identified node local sockets * and to identified node local sockets
* @net: the applicable net namespace * @net: the applicable net namespace
@ -463,7 +558,6 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
tipc_bcast_lock(net); tipc_bcast_lock(net);
if (tipc_link_bc_peers(l)) if (tipc_link_bc_peers(l))
rc = tipc_link_xmit(l, list, &xmitq); rc = tipc_link_xmit(l, list, &xmitq);
bclink_set_last_sent(net);
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
/* Don't send to local node if adding to link failed */ /* Don't send to local node if adding to link failed */
@ -473,7 +567,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
} }
/* Broadcast to all nodes, inluding local node */ /* Broadcast to all nodes, inluding local node */
tipc_bcbearer_xmit(net, &xmitq); tipc_bcbase_xmit(net, &xmitq);
tipc_sk_mcast_rcv(net, &rcvq, &inputq); tipc_sk_mcast_rcv(net, &rcvq, &inputq);
__skb_queue_purge(list); __skb_queue_purge(list);
return 0; return 0;
@ -504,8 +598,7 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
rc = tipc_link_rcv(l, skb, NULL); rc = tipc_link_rcv(l, skb, NULL);
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
if (!skb_queue_empty(&xmitq)) tipc_bcbase_xmit(net, &xmitq);
tipc_bcbearer_xmit(net, &xmitq);
/* Any socket wakeup messages ? */ /* Any socket wakeup messages ? */
if (!skb_queue_empty(inputq)) if (!skb_queue_empty(inputq))
@ -529,7 +622,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
tipc_link_bc_ack_rcv(l, acked, &xmitq); tipc_link_bc_ack_rcv(l, acked, &xmitq);
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
tipc_bcbearer_xmit(net, &xmitq); tipc_bcbase_xmit(net, &xmitq);
/* Any socket wakeup messages ? */ /* Any socket wakeup messages ? */
if (!skb_queue_empty(inputq)) if (!skb_queue_empty(inputq))
@ -557,7 +650,7 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
} }
tipc_bcast_unlock(net); tipc_bcast_unlock(net);
tipc_bcbearer_xmit(net, &xmitq); tipc_bcbase_xmit(net, &xmitq);
/* Any socket wakeup messages ? */ /* Any socket wakeup messages ? */
if (!skb_queue_empty(inputq)) if (!skb_queue_empty(inputq))
@ -568,38 +661,35 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
* *
* RCU is locked, node lock is set * RCU is locked, node lock is set
*/ */
void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l, void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
struct sk_buff_head *xmitq) struct sk_buff_head *xmitq)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct tipc_link *snd_l = tipc_bc_sndlink(net); struct tipc_link *snd_l = tipc_bc_sndlink(net);
tipc_bclink_lock(net); tipc_bcast_lock(net);
tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
tipc_link_add_bc_peer(snd_l, uc_l, xmitq); tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
tipc_bclink_unlock(net); tipc_bcbase_select_primary(net);
tipc_bcast_unlock(net);
} }
/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer /* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
* *
* RCU is locked, node lock is set * RCU is locked, node lock is set
*/ */
void tipc_bcast_remove_peer(struct net *net, u32 addr, void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
struct tipc_link *rcv_l)
{ {
struct tipc_net *tn = net_generic(net, tipc_net_id);
struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
struct tipc_link *snd_l = tipc_bc_sndlink(net); struct tipc_link *snd_l = tipc_bc_sndlink(net);
struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
struct sk_buff_head xmitq; struct sk_buff_head xmitq;
__skb_queue_head_init(&xmitq); __skb_queue_head_init(&xmitq);
tipc_bclink_lock(net); tipc_bcast_lock(net);
tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
tipc_bclink_unlock(net); tipc_bcbase_select_primary(net);
tipc_bcast_unlock(net);
tipc_bcbearer_xmit(net, &xmitq); tipc_bcbase_xmit(net, &xmitq);
/* Any socket wakeup messages ? */ /* Any socket wakeup messages ? */
if (!skb_queue_empty(inputq)) if (!skb_queue_empty(inputq))
@ -869,19 +959,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
return 0; return 0;
} }
static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
{
struct sk_buff *skb, *tmp;
skb_queue_walk_safe(xmitq, skb, tmp) {
__skb_dequeue(xmitq);
tipc_bcbearer_send(net, skb, NULL, NULL);
/* Until we remove cloning in tipc_l2_send_msg(): */
kfree_skb(skb);
}
}
/** /**
* tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
*/ */

View File

@ -47,11 +47,11 @@ struct tipc_node_map;
int tipc_bcast_init(struct net *net); int tipc_bcast_init(struct net *net);
void tipc_bcast_reinit(struct net *net); void tipc_bcast_reinit(struct net *net);
void tipc_bcast_stop(struct net *net); void tipc_bcast_stop(struct net *net);
void tipc_bcast_add_peer(struct net *net, u32 addr, void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
struct tipc_link *l,
struct sk_buff_head *xmitq); struct sk_buff_head *xmitq);
void tipc_bcast_remove_peer(struct net *net, u32 addr, void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
struct tipc_link *rcv_bcl); void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
struct tipc_node *tipc_bclink_retransmit_to(struct net *tn); struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf); void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);

View File

@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest)
rcu_read_lock(); rcu_read_lock();
b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (b_ptr) { if (b_ptr)
tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true);
tipc_disc_add_dest(b_ptr->link_req); tipc_disc_add_dest(b_ptr->link_req);
}
rcu_read_unlock(); rcu_read_unlock();
} }
@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)
rcu_read_lock(); rcu_read_lock();
b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (b_ptr) { if (b_ptr)
tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false);
tipc_disc_remove_dest(b_ptr->link_req); tipc_disc_remove_dest(b_ptr->link_req);
}
rcu_read_unlock(); rcu_read_unlock();
} }
@ -494,6 +490,33 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
rcu_read_unlock(); rcu_read_unlock();
} }
/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations
*/
void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq)
{
struct tipc_net *tn = tipc_net(net);
int net_id = tn->net_id;
struct tipc_bearer *b;
struct sk_buff *skb, *tmp;
struct tipc_msg *hdr;
rcu_read_lock();
b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
if (likely(b)) {
skb_queue_walk_safe(xmitq, skb, tmp) {
hdr = buf_msg(skb);
msg_set_non_seq(hdr, 1);
msg_set_mc_netid(hdr, net_id);
__skb_dequeue(xmitq);
b->media->send_msg(net, skb, b, &b->bcast_addr);
/* Until we remove cloning in tipc_l2_send_msg(): */
kfree_skb(skb);
}
}
rcu_read_unlock();
}
/** /**
* tipc_l2_rcv_msg - handle incoming TIPC message from an interface * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
* @buf: the received packet * @buf: the received packet

View File

@ -163,6 +163,7 @@ struct tipc_bearer {
u32 identity; u32 identity;
struct tipc_link_req *link_req; struct tipc_link_req *link_req;
char net_plane; char net_plane;
int node_cnt;
struct tipc_node_map nodes; struct tipc_node_map nodes;
}; };
@ -220,5 +221,7 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
void tipc_bearer_xmit(struct net *net, u32 bearer_id, void tipc_bearer_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq, struct sk_buff_head *xmitq,
struct tipc_media_addr *dst); struct tipc_media_addr *dst);
void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
struct sk_buff_head *xmitq);
#endif /* _TIPC_BEARER_H */ #endif /* _TIPC_BEARER_H */

View File

@ -346,6 +346,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
tipc_bearer_add_dest(n->net, bearer_id, n->addr); tipc_bearer_add_dest(n->net, bearer_id, n->addr);
tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
pr_debug("Established link <%s> on network plane %c\n", pr_debug("Established link <%s> on network plane %c\n",
nl->name, nl->net_plane); nl->name, nl->net_plane);
@ -356,7 +357,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
*slot1 = bearer_id; *slot1 = bearer_id;
tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT); tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
n->action_flags |= TIPC_NOTIFY_NODE_UP; n->action_flags |= TIPC_NOTIFY_NODE_UP;
tipc_bcast_add_peer(n->net, n->addr, nl, xmitq); tipc_bcast_add_peer(n->net, nl, xmitq);
return; return;
} }
@ -443,8 +444,10 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
tipc_link_build_reset_msg(l, xmitq); tipc_link_build_reset_msg(l, xmitq);
*maddr = &n->links[*bearer_id].maddr; *maddr = &n->links[*bearer_id].maddr;
node_lost_contact(n, &le->inputq); node_lost_contact(n, &le->inputq);
tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
return; return;
} }
tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
/* There is still a working link => initiate failover */ /* There is still a working link => initiate failover */
tnl = node_active_link(n, 0); tnl = node_active_link(n, 0);
@ -860,7 +863,7 @@ static void node_lost_contact(struct tipc_node *n,
tipc_addr_string_fill(addr_string, n->addr)); tipc_addr_string_fill(addr_string, n->addr));
/* Clean up broadcast state */ /* Clean up broadcast state */
tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link); tipc_bcast_remove_peer(n->net, n->bc_entry.link);
/* Abort any ongoing link failover */ /* Abort any ongoing link failover */
for (i = 0; i < MAX_BEARERS; i++) { for (i = 0; i < MAX_BEARERS; i++) {