tipc: adapt link failover for new Gap-ACK algorithm

In commit 0ae955e2656d ("tipc: improve TIPC throughput by Gap ACK
blocks"), we enhanced the link transmq by releasing as many packets as
possible with the multi-ACKs from the peer node. This also means the
queue is now non-linear and the peer link's deferdq becomes vital.
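
To make the "non-linear" point concrete, here is a small userspace
illustration (made-up sequence numbers, not kernel code, and ignoring
seqno wrap-around): once Gap-ACK blocks release packets out of order,
the number of buffers left in the transmq no longer matches the
sequence-number span they cover, which is what the SYNCH pktcnt change
in the diff below accounts for.

  /* Illustration only: userspace model, invented numbers. */
  #include <stdio.h>

  int main(void)
  {
  	/* seqnos still in transmq; 11 was released early by a Gap-ACK block */
  	unsigned short transmq[] = { 10, 12, 13 };
  	unsigned short snd_nxt = 14;	/* next seqno to be sent */
  	unsigned int len = sizeof(transmq) / sizeof(transmq[0]);

  	printf("queue length : %u\n", len);			/* 3 */
  	printf("seqno span   : %u\n", snd_nxt - transmq[0]);	/* 4 */
  	return 0;
  }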

However, in the case of link failover, all messages in the link transmq
need to be transmitted as tunnel messages in such a way that message
sequentiality and cardinality per sender are preserved. This requires us
to maintain the link deferdq somehow, so that when the tunnel messages
arrive, the inner user messages along with the ones in the deferdq are
delivered to the upper layer correctly.
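
The ordering rule the new receive path applies (tipc_link_tnl_rcv() in
the diff below) can be sketched in userspace as follows. The array, the
helper names (tnl_rcv_model, deliver) and the sequence numbers are all
invented for illustration, the array stands in for the failover
deferdq, and seqno wrap-around is ignored:

  #include <stdbool.h>
  #include <stdio.h>

  #define MAXSEQ 16

  static bool deferred[MAXSEQ];		/* stand-in for the failover deferdq */
  static unsigned int drop_point = 3;	/* first seqno the old link still owed us */

  static void deliver(unsigned int seqno)
  {
  	printf("deliver #%u\n", seqno);
  }

  /* one inner user message unpacked from a FAILOVER tunnel packet */
  static void tnl_rcv_model(unsigned int seqno)
  {
  	if (seqno < drop_point)		/* already delivered before failover */
  		return;
  	if (seqno != drop_point) {	/* gap: park it, like the deferdq */
  		deferred[seqno] = true;
  		return;
  	}
  	deliver(drop_point++);
  	/* drain parked messages while they continue the sequence */
  	while (drop_point < MAXSEQ && deferred[drop_point]) {
  		deferred[drop_point] = false;
  		deliver(drop_point++);
  	}
  }

  int main(void)
  {
  	/* tunnel packets may carry the old transmq contents out of order */
  	unsigned int arrivals[] = { 5, 2, 4, 3, 6 };
  	unsigned int i;

  	for (i = 0; i < sizeof(arrivals) / sizeof(arrivals[0]); i++)
  		tnl_rcv_model(arrivals[i]);
  	return 0;			/* prints 3, 4, 5, 6 in order */
  }

Here message 2 is discarded as already delivered, 5 and 4 are parked
until 3 arrives, and delivery then proceeds strictly in sequence.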

This commit accomplishes that by adding a new queue to the TIPC link
structure that holds the old link's deferdq when link failover happens,
and by processing that queue upon receipt of tunnel messages.
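
The handover itself is done in tipc_link_tnl_prepare() (last hunk
below): whatever the old link had parked out of order in its deferdq is
spliced, still sorted, onto the new link's failover_deferdq with
skb_queue_splice_init(). A trivial userspace stand-in for that step,
with an invented struct and array in place of the sk_buff queues:

  #include <stdio.h>
  #include <string.h>

  #define QLEN 8

  struct defq {
  	unsigned short seqno[QLEN];
  	unsigned int len;
  };

  /* move everything from @from onto @to and leave @from empty, roughly
   * what skb_queue_splice_init() does for the sk_buff queues in the patch
   */
  static void splice_init(struct defq *from, struct defq *to)
  {
  	memcpy(&to->seqno[to->len], from->seqno,
  	       from->len * sizeof(from->seqno[0]));
  	to->len += from->len;
  	from->len = 0;
  }

  int main(void)
  {
  	struct defq old_deferdq = { .seqno = { 7, 9 }, .len = 2 };
  	struct defq failover_deferdq = { .len = 0 };

  	splice_init(&old_deferdq, &failover_deferdq);
  	printf("old: %u, failover: %u\n",
  	       old_deferdq.len, failover_deferdq.len);	/* 0 and 2 */
  	return 0;
  }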

Also, in the case of link synching, the link deferdq is no longer
purged. This avoids unnecessary retransmissions that, in the worst
case, would fail because the packets might already have been freed on
the sending side.

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>

--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -151,6 +151,7 @@ struct tipc_link {
 	/* Failover/synch */
 	u16 drop_point;
 	struct sk_buff *failover_reasm_skb;
+	struct sk_buff_head failover_deferdq;
 
 	/* Max packet negotiation */
 	u16 mtu;
@@ -498,6 +499,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
 	__skb_queue_head_init(&l->transmq);
 	__skb_queue_head_init(&l->backlogq);
 	__skb_queue_head_init(&l->deferdq);
+	__skb_queue_head_init(&l->failover_deferdq);
 	skb_queue_head_init(&l->wakeupq);
 	skb_queue_head_init(l->inputq);
 	return true;
@@ -888,6 +890,7 @@ void tipc_link_reset(struct tipc_link *l)
 	__skb_queue_purge(&l->transmq);
 	__skb_queue_purge(&l->deferdq);
 	__skb_queue_purge(&l->backlogq);
+	__skb_queue_purge(&l->failover_deferdq);
 	l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
 	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
 	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
@@ -1159,34 +1162,14 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
  * Consumes buffer
  */
 static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
-			   struct sk_buff_head *inputq)
+			   struct sk_buff_head *inputq,
+			   struct sk_buff **reasm_skb)
 {
 	struct tipc_msg *hdr = buf_msg(skb);
-	struct sk_buff **reasm_skb = &l->reasm_buf;
 	struct sk_buff *iskb;
 	struct sk_buff_head tmpq;
 	int usr = msg_user(hdr);
-	int rc = 0;
 	int pos = 0;
-	int ipos = 0;
-
-	if (unlikely(usr == TUNNEL_PROTOCOL)) {
-		if (msg_type(hdr) == SYNCH_MSG) {
-			__skb_queue_purge(&l->deferdq);
-			goto drop;
-		}
-		if (!tipc_msg_extract(skb, &iskb, &ipos))
-			return rc;
-		kfree_skb(skb);
-		skb = iskb;
-		hdr = buf_msg(skb);
-		if (less(msg_seqno(hdr), l->drop_point))
-			goto drop;
-		if (tipc_data_input(l, skb, inputq))
-			return rc;
-		usr = msg_user(hdr);
-		reasm_skb = &l->failover_reasm_skb;
-	}
 
 	if (usr == MSG_BUNDLER) {
 		skb_queue_head_init(&tmpq);
@@ -1211,11 +1194,66 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
 		tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
 		tipc_bcast_unlock(l->net);
 	}
-drop:
+
 	kfree_skb(skb);
 	return 0;
 }
 
+/* tipc_link_tnl_rcv() - receive TUNNEL_PROTOCOL message, drop or process the
+ *			 inner message along with the ones in the old link's
+ *			 deferdq
+ * @l: tunnel link
+ * @skb: TUNNEL_PROTOCOL message
+ * @inputq: queue to put messages ready for delivery
+ */
+static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
+			     struct sk_buff_head *inputq)
+{
+	struct sk_buff **reasm_skb = &l->failover_reasm_skb;
+	struct sk_buff_head *fdefq = &l->failover_deferdq;
+	struct tipc_msg *hdr = buf_msg(skb);
+	struct sk_buff *iskb;
+	int ipos = 0;
+	int rc = 0;
+	u16 seqno;
+
+	/* SYNCH_MSG */
+	if (msg_type(hdr) == SYNCH_MSG)
+		goto drop;
+
+	/* FAILOVER_MSG */
+	if (!tipc_msg_extract(skb, &iskb, &ipos)) {
+		pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n",
+				    skb_queue_len(fdefq));
+		return rc;
+	}
+
+	do {
+		seqno = buf_seqno(iskb);
+
+		if (unlikely(less(seqno, l->drop_point))) {
+			kfree_skb(iskb);
+			continue;
+		}
+
+		if (unlikely(seqno != l->drop_point)) {
+			__tipc_skb_queue_sorted(fdefq, seqno, iskb);
+			continue;
+		}
+
+		l->drop_point++;
+
+		if (!tipc_data_input(l, iskb, inputq))
+			rc |= tipc_link_input(l, iskb, inputq, reasm_skb);
+		if (unlikely(rc))
+			break;
+	} while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point)));
+
+drop:
+	kfree_skb(skb);
+	return rc;
+}
+
 static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
 {
 	bool released = false;
@@ -1457,8 +1495,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 		/* Deliver packet */
 		l->rcv_nxt++;
 		l->stats.recv_pkts++;
-		if (!tipc_data_input(l, skb, l->inputq))
-			rc |= tipc_link_input(l, skb, l->inputq);
+
+		if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL))
+			rc |= tipc_link_tnl_rcv(l, skb, l->inputq);
+		else if (!tipc_data_input(l, skb, l->inputq))
+			rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf);
 		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
 			rc |= tipc_link_build_state_msg(l, xmitq);
 		if (unlikely(rc & ~TIPC_LINK_SND_STATE))
@@ -1588,6 +1629,7 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l,
 void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 			   int mtyp, struct sk_buff_head *xmitq)
 {
+	struct sk_buff_head *fdefq = &tnl->failover_deferdq;
 	struct sk_buff *skb, *tnlskb;
 	struct tipc_msg *hdr, tnlhdr;
 	struct sk_buff_head *queue = &l->transmq;
@@ -1615,7 +1657,11 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 	/* Initialize reusable tunnel packet header */
 	tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL,
 		      mtyp, INT_H_SIZE, l->addr);
-	pktcnt = skb_queue_len(&l->transmq) + skb_queue_len(&l->backlogq);
+	if (mtyp == SYNCH_MSG)
+		pktcnt = l->snd_nxt - buf_seqno(skb_peek(&l->transmq));
+	else
+		pktcnt = skb_queue_len(&l->transmq);
+	pktcnt += skb_queue_len(&l->backlogq);
 	msg_set_msgcnt(&tnlhdr, pktcnt);
 	msg_set_bearer_id(&tnlhdr, l->peer_bearer_id);
 tnl:
@@ -1646,6 +1692,14 @@ tnl:
 		tnl->drop_point = l->rcv_nxt;
 		tnl->failover_reasm_skb = l->reasm_buf;
 		l->reasm_buf = NULL;
+
+		/* Failover the link's deferdq */
+		if (unlikely(!skb_queue_empty(fdefq))) {
+			pr_warn("Link failover deferdq not empty: %d!\n",
+				skb_queue_len(fdefq));
+			__skb_queue_purge(fdefq);
+		}
+		skb_queue_splice_init(&l->deferdq, fdefq);
 	}
 }