Merge branch 'tipc'

Jon Maloy says:

====================
tipc: clean up media and bearer layer

This commit series aims at facilitating future changes to the
locking policy around nodes, links and bearers.

Currently, we have a big read/write lock (net_lock) that is used for
serializing all changes to the node, link and bearer lists, as well
as to their mutual pointers and references.

But, in order to allow for concurrent access to the contents of these
structures, net_lock is only used in read mode by the data path code,
and hence a finer granular locking policy must be applied inside the
scope of net_lock: a spinlock (node_lock) for each node structure,
and another one (bearer_lock) for protection of bearer structures.

This locking policy has proved hard to maintain. We have several
times encountered contention problems between node_lock and
bearer_lock, and with the advent of the RCU locking mechanism we
feel it is anyway obsolete and ripe for improvements.

We now plan to replace net_lock with an RCU lock, as well as
getting rid of bearer_lock altogether. This will both reduce data
path overhead and make the code more manageable, while reducing the
risk of future lock contention problems.

Prior to these changes, we need to do some necessary cleanup and
code consolidation. This is what we do with this commit series,
before we finally remove bearer_lock. In a later series we will
replace net_lock with an RCU lock.

v2:
 - Re-inserted a removed kerneldoc entry in commit#5, based on
   feedback from D. Miller.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
David S. Miller 2014-02-13 17:57:11 -05:00
commit ca52b6647f
7 changed files with 286 additions and 252 deletions

View File

@ -481,9 +481,9 @@ receive:
tipc_link_recv_bundle(buf); tipc_link_recv_bundle(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) { } else if (msg_user(msg) == MSG_FRAGMENTER) {
int ret; int ret;
ret = tipc_link_recv_fragment(&node->bclink.reasm_head, ret = tipc_link_frag_rcv(&node->bclink.reasm_head,
&node->bclink.reasm_tail, &node->bclink.reasm_tail,
&buf); &buf);
if (ret == LINK_REASM_ERROR) if (ret == LINK_REASM_ERROR)
goto unlock; goto unlock;
spin_lock_bh(&bc_lock); spin_lock_bh(&bc_lock);
@ -785,7 +785,6 @@ void tipc_bclink_init(void)
bcl->owner = &bclink->node; bcl->owner = &bclink->node;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
spin_lock_init(&bcbearer->bearer.lock);
bcl->b_ptr = &bcbearer->bearer; bcl->b_ptr = &bcbearer->bearer;
bcl->state = WORKING_WORKING; bcl->state = WORKING_WORKING;
strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);

View File

@ -51,7 +51,7 @@ static struct tipc_media * const media_info_array[] = {
struct tipc_bearer tipc_bearers[MAX_BEARERS]; struct tipc_bearer tipc_bearers[MAX_BEARERS];
static void bearer_disable(struct tipc_bearer *b_ptr); static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down);
/** /**
* tipc_media_find - locates specified media object by name * tipc_media_find - locates specified media object by name
@ -327,12 +327,10 @@ restart:
b_ptr->net_plane = bearer_id + 'A'; b_ptr->net_plane = bearer_id + 'A';
b_ptr->active = 1; b_ptr->active = 1;
b_ptr->priority = priority; b_ptr->priority = priority;
INIT_LIST_HEAD(&b_ptr->links);
spin_lock_init(&b_ptr->lock);
res = tipc_disc_create(b_ptr, &b_ptr->bcast_addr, disc_domain); res = tipc_disc_create(b_ptr, &b_ptr->bcast_addr, disc_domain);
if (res) { if (res) {
bearer_disable(b_ptr); bearer_disable(b_ptr, false);
pr_warn("Bearer <%s> rejected, discovery object creation failed\n", pr_warn("Bearer <%s> rejected, discovery object creation failed\n",
name); name);
goto exit; goto exit;
@ -350,20 +348,9 @@ exit:
*/ */
static int tipc_reset_bearer(struct tipc_bearer *b_ptr) static int tipc_reset_bearer(struct tipc_bearer *b_ptr)
{ {
struct tipc_link *l_ptr;
struct tipc_link *temp_l_ptr;
read_lock_bh(&tipc_net_lock); read_lock_bh(&tipc_net_lock);
pr_info("Resetting bearer <%s>\n", b_ptr->name); pr_info("Resetting bearer <%s>\n", b_ptr->name);
spin_lock_bh(&b_ptr->lock); tipc_link_reset_list(b_ptr->identity);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
struct tipc_node *n_ptr = l_ptr->owner;
spin_lock_bh(&n_ptr->lock);
tipc_link_reset(l_ptr);
spin_unlock_bh(&n_ptr->lock);
}
spin_unlock_bh(&b_ptr->lock);
read_unlock_bh(&tipc_net_lock); read_unlock_bh(&tipc_net_lock);
return 0; return 0;
} }
@ -373,25 +360,14 @@ static int tipc_reset_bearer(struct tipc_bearer *b_ptr)
* *
* Note: This routine assumes caller holds tipc_net_lock. * Note: This routine assumes caller holds tipc_net_lock.
*/ */
static void bearer_disable(struct tipc_bearer *b_ptr) static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down)
{ {
struct tipc_link *l_ptr;
struct tipc_link *temp_l_ptr;
struct tipc_link_req *temp_req;
pr_info("Disabling bearer <%s>\n", b_ptr->name); pr_info("Disabling bearer <%s>\n", b_ptr->name);
spin_lock_bh(&b_ptr->lock);
b_ptr->media->disable_media(b_ptr); b_ptr->media->disable_media(b_ptr);
list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
tipc_link_delete(l_ptr);
}
temp_req = b_ptr->link_req;
b_ptr->link_req = NULL;
spin_unlock_bh(&b_ptr->lock);
if (temp_req)
tipc_disc_delete(temp_req);
tipc_link_delete_list(b_ptr->identity, shutting_down);
if (b_ptr->link_req)
tipc_disc_delete(b_ptr->link_req);
memset(b_ptr, 0, sizeof(struct tipc_bearer)); memset(b_ptr, 0, sizeof(struct tipc_bearer));
} }
@ -406,7 +382,7 @@ int tipc_disable_bearer(const char *name)
pr_warn("Attempt to disable unknown bearer <%s>\n", name); pr_warn("Attempt to disable unknown bearer <%s>\n", name);
res = -EINVAL; res = -EINVAL;
} else { } else {
bearer_disable(b_ptr); bearer_disable(b_ptr, false);
res = 0; res = 0;
} }
write_unlock_bh(&tipc_net_lock); write_unlock_bh(&tipc_net_lock);
@ -626,6 +602,6 @@ void tipc_bearer_stop(void)
for (i = 0; i < MAX_BEARERS; i++) { for (i = 0; i < MAX_BEARERS; i++) {
if (tipc_bearers[i].active) if (tipc_bearers[i].active)
bearer_disable(&tipc_bearers[i]); bearer_disable(&tipc_bearers[i], true);
} }
} }

View File

@ -107,10 +107,8 @@ struct tipc_media {
/** /**
* struct tipc_bearer - Generic TIPC bearer structure * struct tipc_bearer - Generic TIPC bearer structure
* @dev: ptr to associated network device * @media_ptr: pointer to additional media-specific information about bearer
* @usr_handle: pointer to additional media-specific information about bearer
* @mtu: max packet size bearer can support * @mtu: max packet size bearer can support
* @lock: spinlock for controlling access to bearer
* @addr: media-specific address associated with bearer * @addr: media-specific address associated with bearer
* @name: bearer name (format = media:interface) * @name: bearer name (format = media:interface)
* @media: ptr to media structure associated with bearer * @media: ptr to media structure associated with bearer
@ -120,7 +118,6 @@ struct tipc_media {
* @tolerance: default link tolerance for bearer * @tolerance: default link tolerance for bearer
* @identity: array index of this bearer within TIPC bearer array * @identity: array index of this bearer within TIPC bearer array
* @link_req: ptr to (optional) structure making periodic link setup requests * @link_req: ptr to (optional) structure making periodic link setup requests
* @links: list of non-congested links associated with bearer
* @active: non-zero if bearer structure is represents a bearer * @active: non-zero if bearer structure is represents a bearer
* @net_plane: network plane ('A' through 'H') currently associated with bearer * @net_plane: network plane ('A' through 'H') currently associated with bearer
* @nodes: indicates which nodes in cluster can be reached through bearer * @nodes: indicates which nodes in cluster can be reached through bearer
@ -134,7 +131,6 @@ struct tipc_bearer {
u32 mtu; /* initalized by media */ u32 mtu; /* initalized by media */
struct tipc_media_addr addr; /* initalized by media */ struct tipc_media_addr addr; /* initalized by media */
char name[TIPC_MAX_BEARER_NAME]; char name[TIPC_MAX_BEARER_NAME];
spinlock_t lock;
struct tipc_media *media; struct tipc_media *media;
struct tipc_media_addr bcast_addr; struct tipc_media_addr bcast_addr;
u32 priority; u32 priority;
@ -142,7 +138,6 @@ struct tipc_bearer {
u32 tolerance; u32 tolerance;
u32 identity; u32 identity;
struct tipc_link_req *link_req; struct tipc_link_req *link_req;
struct list_head links;
int active; int active;
char net_plane; char net_plane;
struct tipc_node_map nodes; struct tipc_node_map nodes;

View File

@ -1,7 +1,7 @@
/* /*
* net/tipc/core.c: TIPC module code * net/tipc/core.c: TIPC module code
* *
* Copyright (c) 2003-2006, Ericsson AB * Copyright (c) 2003-2006, 2013, Ericsson AB
* Copyright (c) 2005-2006, 2010-2013, Wind River Systems * Copyright (c) 2005-2006, 2010-2013, Wind River Systems
* All rights reserved. * All rights reserved.
* *

View File

@ -78,7 +78,7 @@ static const char *link_unk_evt = "Unknown link event ";
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr, static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
struct sk_buff *buf); struct sk_buff *buf);
static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf); static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
struct sk_buff **buf); struct sk_buff **buf);
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance); static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
static int link_send_sections_long(struct tipc_port *sender, static int link_send_sections_long(struct tipc_port *sender,
@ -147,11 +147,6 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
/** /**
* link_timeout - handle expiration of link timer * link_timeout - handle expiration of link timer
* @l_ptr: pointer to link * @l_ptr: pointer to link
*
* This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
* with tipc_link_delete(). (There is no risk that the node will be deleted by
* another thread because tipc_link_delete() always cancels the link timer before
* tipc_node_delete() is called.)
*/ */
static void link_timeout(struct tipc_link *l_ptr) static void link_timeout(struct tipc_link *l_ptr)
{ {
@ -213,8 +208,8 @@ static void link_set_timer(struct tipc_link *l_ptr, u32 time)
* Returns pointer to link. * Returns pointer to link.
*/ */
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
struct tipc_bearer *b_ptr, struct tipc_bearer *b_ptr,
const struct tipc_media_addr *media_addr) const struct tipc_media_addr *media_addr)
{ {
struct tipc_link *l_ptr; struct tipc_link *l_ptr;
struct tipc_msg *msg; struct tipc_msg *msg;
@ -279,41 +274,43 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
k_init_timer(&l_ptr->timer, (Handler)link_timeout, k_init_timer(&l_ptr->timer, (Handler)link_timeout,
(unsigned long)l_ptr); (unsigned long)l_ptr);
list_add_tail(&l_ptr->link_list, &b_ptr->links);
link_state_event(l_ptr, STARTING_EVT); link_state_event(l_ptr, STARTING_EVT);
return l_ptr; return l_ptr;
} }
/**
* tipc_link_delete - delete a link void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
* @l_ptr: pointer to link
*
* Note: 'tipc_net_lock' is write_locked, bearer is locked.
* This routine must not grab the node lock until after link timer cancellation
* to avoid a potential deadlock situation.
*/
void tipc_link_delete(struct tipc_link *l_ptr)
{ {
if (!l_ptr) { struct tipc_link *l_ptr;
pr_err("Attempt to delete non-existent link\n"); struct tipc_node *n_ptr;
return;
list_for_each_entry(n_ptr, &tipc_node_list, list) {
spin_lock_bh(&n_ptr->lock);
l_ptr = n_ptr->links[bearer_id];
if (l_ptr) {
tipc_link_reset(l_ptr);
if (shutting_down || !tipc_node_is_up(n_ptr)) {
tipc_node_detach_link(l_ptr->owner, l_ptr);
tipc_link_reset_fragments(l_ptr);
spin_unlock_bh(&n_ptr->lock);
/* Nobody else can access this link now: */
del_timer_sync(&l_ptr->timer);
kfree(l_ptr);
} else {
/* Detach/delete when failover is finished: */
l_ptr->flags |= LINK_STOPPED;
spin_unlock_bh(&n_ptr->lock);
del_timer_sync(&l_ptr->timer);
}
continue;
}
spin_unlock_bh(&n_ptr->lock);
} }
k_cancel_timer(&l_ptr->timer);
tipc_node_lock(l_ptr->owner);
tipc_link_reset(l_ptr);
tipc_node_detach_link(l_ptr->owner, l_ptr);
tipc_link_purge_queues(l_ptr);
list_del_init(&l_ptr->link_list);
tipc_node_unlock(l_ptr->owner);
k_term_timer(&l_ptr->timer);
kfree(l_ptr);
} }
/** /**
* link_schedule_port - schedule port for deferred sending * link_schedule_port - schedule port for deferred sending
* @l_ptr: pointer to link * @l_ptr: pointer to link
@ -461,6 +458,19 @@ void tipc_link_reset(struct tipc_link *l_ptr)
link_reset_statistics(l_ptr); link_reset_statistics(l_ptr);
} }
void tipc_link_reset_list(unsigned int bearer_id)
{
struct tipc_link *l_ptr;
struct tipc_node *n_ptr;
list_for_each_entry(n_ptr, &tipc_node_list, list) {
spin_lock_bh(&n_ptr->lock);
l_ptr = n_ptr->links[bearer_id];
if (l_ptr)
tipc_link_reset(l_ptr);
spin_unlock_bh(&n_ptr->lock);
}
}
static void link_activate(struct tipc_link *l_ptr) static void link_activate(struct tipc_link *l_ptr)
{ {
@ -479,7 +489,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
struct tipc_link *other; struct tipc_link *other;
u32 cont_intv = l_ptr->continuity_interval; u32 cont_intv = l_ptr->continuity_interval;
if (!l_ptr->started && (event != STARTING_EVT)) if (l_ptr->flags & LINK_STOPPED)
return;
if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
return; /* Not yet. */ return; /* Not yet. */
/* Check whether changeover is going on */ /* Check whether changeover is going on */
@ -605,7 +618,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
link_set_timer(l_ptr, cont_intv); link_set_timer(l_ptr, cont_intv);
break; break;
case STARTING_EVT: case STARTING_EVT:
l_ptr->started = 1; l_ptr->flags |= LINK_STARTED;
/* fall through */ /* fall through */
case TIMEOUT_EVT: case TIMEOUT_EVT:
tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
@ -1435,7 +1448,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
u32 seq_no; u32 seq_no;
u32 ackd; u32 ackd;
u32 released = 0; u32 released = 0;
int type;
head = head->next; head = head->next;
buf->next = NULL; buf->next = NULL;
@ -1483,7 +1495,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
if ((n_ptr->block_setup & WAIT_PEER_DOWN) && if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
msg_user(msg) == LINK_PROTOCOL && msg_user(msg) == LINK_PROTOCOL &&
(msg_type(msg) == RESET_MSG || (msg_type(msg) == RESET_MSG ||
msg_type(msg) == ACTIVATE_MSG) && msg_type(msg) == ACTIVATE_MSG) &&
!msg_redundant_link(msg)) !msg_redundant_link(msg))
n_ptr->block_setup &= ~WAIT_PEER_DOWN; n_ptr->block_setup &= ~WAIT_PEER_DOWN;
@ -1502,7 +1514,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
while ((crs != l_ptr->next_out) && while ((crs != l_ptr->next_out) &&
less_eq(buf_seqno(crs), ackd)) { less_eq(buf_seqno(crs), ackd)) {
struct sk_buff *next = crs->next; struct sk_buff *next = crs->next;
kfree_skb(crs); kfree_skb(crs);
crs = next; crs = next;
released++; released++;
@ -1515,15 +1526,17 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
/* Try sending any messages link endpoint has pending */ /* Try sending any messages link endpoint has pending */
if (unlikely(l_ptr->next_out)) if (unlikely(l_ptr->next_out))
tipc_link_push_queue(l_ptr); tipc_link_push_queue(l_ptr);
if (unlikely(!list_empty(&l_ptr->waiting_ports))) if (unlikely(!list_empty(&l_ptr->waiting_ports)))
tipc_link_wakeup_ports(l_ptr, 0); tipc_link_wakeup_ports(l_ptr, 0);
if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
l_ptr->stats.sent_acks++; l_ptr->stats.sent_acks++;
tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); tipc_link_send_proto_msg(l_ptr, STATE_MSG,
0, 0, 0, 0, 0);
} }
/* Now (finally!) process the incoming message */ /* Process the incoming packet */
protocol_check:
if (unlikely(!link_working_working(l_ptr))) { if (unlikely(!link_working_working(l_ptr))) {
if (msg_user(msg) == LINK_PROTOCOL) { if (msg_user(msg) == LINK_PROTOCOL) {
link_recv_proto_msg(l_ptr, buf); link_recv_proto_msg(l_ptr, buf);
@ -1555,14 +1568,40 @@ protocol_check:
l_ptr->next_in_no++; l_ptr->next_in_no++;
if (unlikely(l_ptr->oldest_deferred_in)) if (unlikely(l_ptr->oldest_deferred_in))
head = link_insert_deferred_queue(l_ptr, head); head = link_insert_deferred_queue(l_ptr, head);
deliver:
if (likely(msg_isdata(msg))) { /* Deliver packet/message to correct user: */
if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL)) {
if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
tipc_node_unlock(n_ptr);
continue;
}
msg = buf_msg(buf);
} else if (msg_user(msg) == MSG_FRAGMENTER) {
int rc;
l_ptr->stats.recv_fragments++;
rc = tipc_link_frag_rcv(&l_ptr->reasm_head,
&l_ptr->reasm_tail,
&buf);
if (rc == LINK_REASM_COMPLETE) {
l_ptr->stats.recv_fragmented++;
msg = buf_msg(buf);
} else {
if (rc == LINK_REASM_ERROR)
tipc_link_reset(l_ptr);
tipc_node_unlock(n_ptr);
continue;
}
}
switch (msg_user(msg)) {
case TIPC_LOW_IMPORTANCE:
case TIPC_MEDIUM_IMPORTANCE:
case TIPC_HIGH_IMPORTANCE:
case TIPC_CRITICAL_IMPORTANCE:
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
tipc_port_recv_msg(buf); tipc_port_recv_msg(buf);
continue; continue;
}
switch (msg_user(msg)) {
int ret;
case MSG_BUNDLER: case MSG_BUNDLER:
l_ptr->stats.recv_bundles++; l_ptr->stats.recv_bundles++;
l_ptr->stats.recv_bundled += msg_msgcnt(msg); l_ptr->stats.recv_bundled += msg_msgcnt(msg);
@ -1574,48 +1613,20 @@ deliver:
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
tipc_named_recv(buf); tipc_named_recv(buf);
continue; continue;
case BCAST_PROTOCOL:
tipc_link_recv_sync(n_ptr, buf);
tipc_node_unlock(n_ptr);
continue;
case CONN_MANAGER: case CONN_MANAGER:
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
tipc_port_recv_proto_msg(buf); tipc_port_recv_proto_msg(buf);
continue; continue;
case MSG_FRAGMENTER: case BCAST_PROTOCOL:
l_ptr->stats.recv_fragments++; tipc_link_recv_sync(n_ptr, buf);
ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
&l_ptr->reasm_tail,
&buf);
if (ret == LINK_REASM_COMPLETE) {
l_ptr->stats.recv_fragmented++;
msg = buf_msg(buf);
goto deliver;
}
if (ret == LINK_REASM_ERROR)
tipc_link_reset(l_ptr);
tipc_node_unlock(n_ptr);
continue;
case CHANGEOVER_PROTOCOL:
type = msg_type(msg);
if (tipc_link_tunnel_rcv(&l_ptr, &buf)) {
msg = buf_msg(buf);
seq_no = msg_seqno(msg);
if (type == ORIGINAL_MSG)
goto deliver;
goto protocol_check;
}
break; break;
default: default:
kfree_skb(buf); kfree_skb(buf);
buf = NULL;
break; break;
} }
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
tipc_net_route_msg(buf);
continue; continue;
unlock_discard: unlock_discard:
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
discard: discard:
kfree_skb(buf); kfree_skb(buf);
@ -2105,83 +2116,108 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
return eb; return eb;
} }
/* tipc_link_tunnel_rcv(): Receive a tunneled packet, sent
/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
* Owner node is locked.
*/
static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
struct sk_buff *t_buf)
{
struct sk_buff *buf;
if (!tipc_link_is_up(l_ptr))
return;
buf = buf_extract(t_buf, INT_H_SIZE);
if (buf == NULL) {
pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
return;
}
/* Add buffer to deferred queue, if applicable: */
link_handle_out_of_seq_msg(l_ptr, buf);
}
/* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
* Owner node is locked.
*/
static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
struct sk_buff *t_buf)
{
struct tipc_msg *t_msg = buf_msg(t_buf);
struct sk_buff *buf = NULL;
struct tipc_msg *msg;
if (tipc_link_is_up(l_ptr))
tipc_link_reset(l_ptr);
/* First failover packet? */
if (l_ptr->exp_msg_count == START_CHANGEOVER)
l_ptr->exp_msg_count = msg_msgcnt(t_msg);
/* Should there be an inner packet? */
if (l_ptr->exp_msg_count) {
l_ptr->exp_msg_count--;
buf = buf_extract(t_buf, INT_H_SIZE);
if (buf == NULL) {
pr_warn("%sno inner failover pkt\n", link_co_err);
goto exit;
}
msg = buf_msg(buf);
if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
kfree_skb(buf);
buf = NULL;
goto exit;
}
if (msg_user(msg) == MSG_FRAGMENTER) {
l_ptr->stats.recv_fragments++;
tipc_link_frag_rcv(&l_ptr->reasm_head,
&l_ptr->reasm_tail,
&buf);
}
}
exit:
if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) {
tipc_node_detach_link(l_ptr->owner, l_ptr);
kfree(l_ptr);
}
return buf;
}
/* tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
* via other link as result of a failover (ORIGINAL_MSG) or * via other link as result of a failover (ORIGINAL_MSG) or
* a new active link (DUPLICATE_MSG). Failover packets are * a new active link (DUPLICATE_MSG). Failover packets are
* returned to the active link for delivery upwards. * returned to the active link for delivery upwards.
* Owner node is locked. * Owner node is locked.
*/ */
static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr, static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
struct sk_buff **buf) struct sk_buff **buf)
{ {
struct sk_buff *tunnel_buf = *buf; struct sk_buff *t_buf = *buf;
struct tipc_link *dest_link; struct tipc_link *l_ptr;
struct tipc_msg *msg; struct tipc_msg *t_msg = buf_msg(t_buf);
struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf); u32 bearer_id = msg_bearer_id(t_msg);
u32 msg_typ = msg_type(tunnel_msg);
u32 msg_count = msg_msgcnt(tunnel_msg); *buf = NULL;
u32 bearer_id = msg_bearer_id(tunnel_msg);
if (bearer_id >= MAX_BEARERS) if (bearer_id >= MAX_BEARERS)
goto exit; goto exit;
dest_link = (*l_ptr)->owner->links[bearer_id];
if (!dest_link)
goto exit;
if (dest_link == *l_ptr) {
pr_err("Unexpected changeover message on link <%s>\n",
(*l_ptr)->name);
goto exit;
}
*l_ptr = dest_link;
msg = msg_get_wrapped(tunnel_msg);
if (msg_typ == DUPLICATE_MSG) { l_ptr = n_ptr->links[bearer_id];
if (less(msg_seqno(msg), mod(dest_link->next_in_no))) if (!l_ptr)
goto exit;
*buf = buf_extract(tunnel_buf, INT_H_SIZE);
if (*buf == NULL) {
pr_warn("%sduplicate msg dropped\n", link_co_err);
goto exit;
}
kfree_skb(tunnel_buf);
return 1;
}
/* First original message ?: */
if (tipc_link_is_up(dest_link)) {
pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg,
dest_link->name);
tipc_link_reset(dest_link);
dest_link->exp_msg_count = msg_count;
if (!msg_count)
goto exit;
} else if (dest_link->exp_msg_count == START_CHANGEOVER) {
dest_link->exp_msg_count = msg_count;
if (!msg_count)
goto exit;
}
/* Receive original message */
if (dest_link->exp_msg_count == 0) {
pr_warn("%sgot too many tunnelled messages\n", link_co_err);
goto exit; goto exit;
}
dest_link->exp_msg_count--; if (msg_type(t_msg) == DUPLICATE_MSG)
if (less(msg_seqno(msg), dest_link->reset_checkpoint)) { tipc_link_dup_rcv(l_ptr, t_buf);
goto exit; else if (msg_type(t_msg) == ORIGINAL_MSG)
} else { *buf = tipc_link_failover_rcv(l_ptr, t_buf);
*buf = buf_extract(tunnel_buf, INT_H_SIZE); else
if (*buf != NULL) { pr_warn("%sunknown tunnel pkt received\n", link_co_err);
kfree_skb(tunnel_buf);
return 1;
} else {
pr_warn("%soriginal msg dropped\n", link_co_err);
}
}
exit: exit:
*buf = NULL; kfree_skb(t_buf);
kfree_skb(tunnel_buf); return *buf != NULL;
return 0;
} }
/* /*
@ -2277,12 +2313,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
return dsz; return dsz;
} }
/* /* tipc_link_frag_rcv(): Called with node lock on. Returns
* tipc_link_recv_fragment(): Called with node lock on. Returns
* the reassembled buffer if message is complete. * the reassembled buffer if message is complete.
*/ */
int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail, int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail,
struct sk_buff **fbuf) struct sk_buff **fbuf)
{ {
struct sk_buff *frag = *fbuf; struct sk_buff *frag = *fbuf;
struct tipc_msg *msg = buf_msg(frag); struct tipc_msg *msg = buf_msg(frag);
@ -2296,6 +2331,7 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
goto out_free; goto out_free;
*head = frag; *head = frag;
skb_frag_list_init(*head); skb_frag_list_init(*head);
*fbuf = NULL;
return 0; return 0;
} else if (*head && } else if (*head &&
skb_try_coalesce(*head, frag, &headstolen, &delta)) { skb_try_coalesce(*head, frag, &headstolen, &delta)) {
@ -2315,10 +2351,12 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
*tail = *head = NULL; *tail = *head = NULL;
return LINK_REASM_COMPLETE; return LINK_REASM_COMPLETE;
} }
*fbuf = NULL;
return 0; return 0;
out_free: out_free:
pr_warn_ratelimited("Link unable to reassemble fragmented message\n"); pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
kfree_skb(*fbuf); kfree_skb(*fbuf);
*fbuf = NULL;
return LINK_REASM_ERROR; return LINK_REASM_ERROR;
} }
@ -2352,35 +2390,40 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
} }
/** /* tipc_link_find_owner - locate owner node of link by link's name
* link_find_link - locate link by name * @name: pointer to link name string
* @name: ptr to link name string * @bearer_id: pointer to index in 'node->links' array where the link was found.
* @node: ptr to area to be filled with ptr to associated node
*
* Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted; * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
* this also prevents link deletion. * this also prevents link deletion.
* *
* Returns pointer to link (or 0 if invalid link name). * Returns pointer to node owning the link, or 0 if no matching link is found.
*/ */
static struct tipc_link *link_find_link(const char *name, static struct tipc_node *tipc_link_find_owner(const char *link_name,
struct tipc_node **node) unsigned int *bearer_id)
{ {
struct tipc_link *l_ptr; struct tipc_link *l_ptr;
struct tipc_node *n_ptr; struct tipc_node *n_ptr;
struct tipc_node *tmp_n_ptr;
struct tipc_node *found_node = 0;
int i; int i;
list_for_each_entry(n_ptr, &tipc_node_list, list) { *bearer_id = 0;
list_for_each_entry_safe(n_ptr, tmp_n_ptr, &tipc_node_list, list) {
spin_lock(&n_ptr->lock);
for (i = 0; i < MAX_BEARERS; i++) { for (i = 0; i < MAX_BEARERS; i++) {
l_ptr = n_ptr->links[i]; l_ptr = n_ptr->links[i];
if (l_ptr && !strcmp(l_ptr->name, name)) if (l_ptr && !strcmp(l_ptr->name, link_name)) {
goto found; *bearer_id = i;
found_node = n_ptr;
break;
}
} }
spin_unlock(&n_ptr->lock);
if (found_node)
break;
} }
l_ptr = NULL; return found_node;
n_ptr = NULL;
found:
*node = n_ptr;
return l_ptr;
} }
/** /**
@ -2422,32 +2465,33 @@ static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
struct tipc_link *l_ptr; struct tipc_link *l_ptr;
struct tipc_bearer *b_ptr; struct tipc_bearer *b_ptr;
struct tipc_media *m_ptr; struct tipc_media *m_ptr;
int bearer_id;
int res = 0; int res = 0;
l_ptr = link_find_link(name, &node); node = tipc_link_find_owner(name, &bearer_id);
if (l_ptr) { if (node) {
/*
* acquire node lock for tipc_link_send_proto_msg().
* see "TIPC locking policy" in net.c.
*/
tipc_node_lock(node); tipc_node_lock(node);
switch (cmd) { l_ptr = node->links[bearer_id];
case TIPC_CMD_SET_LINK_TOL:
link_set_supervision_props(l_ptr, new_value); if (l_ptr) {
tipc_link_send_proto_msg(l_ptr, switch (cmd) {
STATE_MSG, 0, 0, new_value, 0, 0); case TIPC_CMD_SET_LINK_TOL:
break; link_set_supervision_props(l_ptr, new_value);
case TIPC_CMD_SET_LINK_PRI: tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0,
l_ptr->priority = new_value; 0, new_value, 0, 0);
tipc_link_send_proto_msg(l_ptr, break;
STATE_MSG, 0, 0, 0, new_value, 0); case TIPC_CMD_SET_LINK_PRI:
break; l_ptr->priority = new_value;
case TIPC_CMD_SET_LINK_WINDOW: tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0,
tipc_link_set_queue_limits(l_ptr, new_value); 0, 0, new_value, 0);
break; break;
default: case TIPC_CMD_SET_LINK_WINDOW:
res = -EINVAL; tipc_link_set_queue_limits(l_ptr, new_value);
break; break;
default:
res = -EINVAL;
break;
}
} }
tipc_node_unlock(node); tipc_node_unlock(node);
return res; return res;
@ -2542,6 +2586,7 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
char *link_name; char *link_name;
struct tipc_link *l_ptr; struct tipc_link *l_ptr;
struct tipc_node *node; struct tipc_node *node;
unsigned int bearer_id;
if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@ -2552,15 +2597,19 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
return tipc_cfg_reply_error_string("link not found"); return tipc_cfg_reply_error_string("link not found");
return tipc_cfg_reply_none(); return tipc_cfg_reply_none();
} }
read_lock_bh(&tipc_net_lock); read_lock_bh(&tipc_net_lock);
l_ptr = link_find_link(link_name, &node); node = tipc_link_find_owner(link_name, &bearer_id);
if (!l_ptr) { if (!node) {
read_unlock_bh(&tipc_net_lock);
return tipc_cfg_reply_error_string("link not found");
}
spin_lock(&node->lock);
l_ptr = node->links[bearer_id];
if (!l_ptr) {
tipc_node_unlock(node);
read_unlock_bh(&tipc_net_lock); read_unlock_bh(&tipc_net_lock);
return tipc_cfg_reply_error_string("link not found"); return tipc_cfg_reply_error_string("link not found");
} }
tipc_node_lock(node);
link_reset_statistics(l_ptr); link_reset_statistics(l_ptr);
tipc_node_unlock(node); tipc_node_unlock(node);
read_unlock_bh(&tipc_net_lock); read_unlock_bh(&tipc_net_lock);
@ -2590,18 +2639,27 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
struct tipc_node *node; struct tipc_node *node;
char *status; char *status;
u32 profile_total = 0; u32 profile_total = 0;
unsigned int bearer_id;
int ret; int ret;
if (!strcmp(name, tipc_bclink_name)) if (!strcmp(name, tipc_bclink_name))
return tipc_bclink_stats(buf, buf_size); return tipc_bclink_stats(buf, buf_size);
read_lock_bh(&tipc_net_lock); read_lock_bh(&tipc_net_lock);
l = link_find_link(name, &node); node = tipc_link_find_owner(name, &bearer_id);
if (!l) { if (!node) {
read_unlock_bh(&tipc_net_lock); read_unlock_bh(&tipc_net_lock);
return 0; return 0;
} }
tipc_node_lock(node); tipc_node_lock(node);
l = node->links[bearer_id];
if (!l) {
tipc_node_unlock(node);
read_unlock_bh(&tipc_net_lock);
return 0;
}
s = &l->stats; s = &l->stats;
if (tipc_link_is_active(l)) if (tipc_link_is_active(l))

View File

@ -1,7 +1,7 @@
/* /*
* net/tipc/link.h: Include file for TIPC link code * net/tipc/link.h: Include file for TIPC link code
* *
* Copyright (c) 1995-2006, Ericsson AB * Copyright (c) 1995-2006, 2013, Ericsson AB
* Copyright (c) 2004-2005, 2010-2011, Wind River Systems * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *
@ -40,27 +40,28 @@
#include "msg.h" #include "msg.h"
#include "node.h" #include "node.h"
/* /* Link reassembly status codes
* Link reassembly status codes
*/ */
#define LINK_REASM_ERROR -1 #define LINK_REASM_ERROR -1
#define LINK_REASM_COMPLETE 1 #define LINK_REASM_COMPLETE 1
/* /* Out-of-range value for link sequence numbers
* Out-of-range value for link sequence numbers
*/ */
#define INVALID_LINK_SEQ 0x10000 #define INVALID_LINK_SEQ 0x10000
/* /* Link working states
* Link states
*/ */
#define WORKING_WORKING 560810u #define WORKING_WORKING 560810u
#define WORKING_UNKNOWN 560811u #define WORKING_UNKNOWN 560811u
#define RESET_UNKNOWN 560812u #define RESET_UNKNOWN 560812u
#define RESET_RESET 560813u #define RESET_RESET 560813u
/* /* Link endpoint execution states
* Starting value for maximum packet size negotiation on unicast links */
#define LINK_STARTED 0x0001
#define LINK_STOPPED 0x0002
/* Starting value for maximum packet size negotiation on unicast links
* (unless bearer MTU is less) * (unless bearer MTU is less)
*/ */
#define MAX_PKT_DEFAULT 1500 #define MAX_PKT_DEFAULT 1500
@ -102,8 +103,7 @@ struct tipc_stats {
* @media_addr: media address to use when sending messages over link * @media_addr: media address to use when sending messages over link
* @timer: link timer * @timer: link timer
* @owner: pointer to peer node * @owner: pointer to peer node
* @link_list: adjacent links in bearer's list of links * @flags: execution state flags for link endpoint instance
* @started: indicates if link has been started
* @checkpoint: reference point for triggering link continuity checking * @checkpoint: reference point for triggering link continuity checking
* @peer_session: link session # being used by peer end of link * @peer_session: link session # being used by peer end of link
* @peer_bearer_id: bearer id used by link's peer endpoint * @peer_bearer_id: bearer id used by link's peer endpoint
@ -149,10 +149,9 @@ struct tipc_link {
struct tipc_media_addr media_addr; struct tipc_media_addr media_addr;
struct timer_list timer; struct timer_list timer;
struct tipc_node *owner; struct tipc_node *owner;
struct list_head link_list;
/* Management and link supervision data */ /* Management and link supervision data */
int started; unsigned int flags;
u32 checkpoint; u32 checkpoint;
u32 peer_session; u32 peer_session;
u32 peer_bearer_id; u32 peer_bearer_id;
@ -215,7 +214,7 @@ struct tipc_port;
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr, struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
struct tipc_bearer *b_ptr, struct tipc_bearer *b_ptr,
const struct tipc_media_addr *media_addr); const struct tipc_media_addr *media_addr);
void tipc_link_delete(struct tipc_link *l_ptr); void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down);
void tipc_link_failover_send_queue(struct tipc_link *l_ptr); void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
void tipc_link_dup_send_queue(struct tipc_link *l_ptr, void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
struct tipc_link *dest); struct tipc_link *dest);
@ -231,6 +230,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area,
struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
int req_tlv_space); int req_tlv_space);
void tipc_link_reset(struct tipc_link *l_ptr); void tipc_link_reset(struct tipc_link *l_ptr);
void tipc_link_reset_list(unsigned int bearer_id);
int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector); int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector);
void tipc_link_send_names(struct list_head *message_list, u32 dest); void tipc_link_send_names(struct list_head *message_list, u32 dest);
int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf); int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
@ -239,9 +239,9 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
struct iovec const *msg_sect, struct iovec const *msg_sect,
unsigned int len, u32 destnode); unsigned int len, u32 destnode);
void tipc_link_recv_bundle(struct sk_buff *buf); void tipc_link_recv_bundle(struct sk_buff *buf);
int tipc_link_recv_fragment(struct sk_buff **reasm_head, int tipc_link_frag_rcv(struct sk_buff **reasm_head,
struct sk_buff **reasm_tail, struct sk_buff **reasm_tail,
struct sk_buff **fbuf); struct sk_buff **fbuf);
void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob, void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob,
u32 gap, u32 tolerance, u32 priority, u32 gap, u32 tolerance, u32 priority,
u32 acked_mtu); u32 acked_mtu);

View File

@ -249,7 +249,13 @@ void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr) void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
{ {
n_ptr->links[l_ptr->b_ptr->identity] = NULL; int i;
for (i = 0; i < MAX_BEARERS; i++) {
if (l_ptr == n_ptr->links[i])
break;
}
n_ptr->links[i] = NULL;
atomic_dec(&tipc_num_links); atomic_dec(&tipc_num_links);
n_ptr->link_cnt--; n_ptr->link_cnt--;
} }