tipc: clean up handling of message priorities

Messages transferred by TIPC are assigned an "importance priority", -an
integer value indicating how to treat the message when there is link or
destination socket congestion.

There is no separate header field for this value. Instead, the message
user values have been chosen in ascending order according to perceived
importance, so that the message user field can be used for this.

This is not a good solution. First, we have many more users than the
needed priority levels, so we end up with treating more priority
levels than necessary. Second, the user field cannot always
accurately reflect the priority of the message. E.g., a message
fragment packet should really have the priority of the enveloped
user data message, and not the priority of the MSG_FRAGMENTER user.
Until now, we have been working around this problem in different ways,
but it is now time to implement a consistent way of handling such
priorities, although still within the constraint that we cannot
allocate any more bits in the regular data message header for this.

In this commit, we define a new priority level, TIPC_SYSTEM_IMPORTANCE,
that will be the only one used apart from the four (lower) user data
levels. All non-data messages map down to this priority. Furthermore,
we take some free bits from the MSG_FRAGMENTER header and allocate
them to store the priority of the enveloped message. We then adjust
the functions msg_importance()/msg_set_importance() so that they
read/set the correct header fields depending on user type.

This small protocol change is fully compatible, because the code at
the receiving end of a link currently reads the importance level
only from user data messages, where there is no change.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Jon Paul Maloy 2015-03-13 16:08:11 -04:00 committed by David S. Miller
parent 05dcc5aa4d
commit e3eea1eb47
4 changed files with 49 additions and 60 deletions

View File

@ -383,7 +383,6 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
__skb_queue_purge(list);
return -EHOSTUNREACH;
}
/* Broadcast to all nodes */
if (likely(bclink)) {
tipc_bclink_lock(net);

View File

@ -35,6 +35,7 @@
*/
#include "core.h"
#include "subscr.h"
#include "link.h"
#include "bcast.h"
#include "socket.h"
@ -305,12 +306,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
msg_set_session(msg, (tn->random & 0xffff));
msg_set_bearer_id(msg, b_ptr->identity);
strcpy((char *)msg_data(msg), if_name);
l_ptr->priority = b_ptr->priority;
tipc_link_set_queue_limits(l_ptr, b_ptr->window);
l_ptr->net_plane = b_ptr->net_plane;
link_init_max_pkt(l_ptr);
l_ptr->priority = b_ptr->priority;
tipc_link_set_queue_limits(l_ptr, b_ptr->window);
l_ptr->next_out_no = 1;
__skb_queue_head_init(&l_ptr->transmq);
@ -708,7 +707,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
{
struct sk_buff *skb = skb_peek(list);
struct tipc_msg *msg = buf_msg(skb);
uint imp = tipc_msg_tot_importance(msg);
int imp = msg_importance(msg);
u32 oport = msg_tot_origport(msg);
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
@ -745,7 +744,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
{
struct tipc_msg *msg = buf_msg(skb_peek(list));
unsigned int maxwin = link->window;
uint imp = tipc_msg_tot_importance(msg);
unsigned int imp = msg_importance(msg);
uint mtu = link->max_pkt;
uint ack = mod(link->next_in_no - 1);
uint seqno = link->next_out_no;
@ -755,7 +754,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
struct sk_buff_head *backlogq = &link->backlogq;
struct sk_buff *skb, *tmp;
/* Match queue limits against msg importance: */
/* Match queue limit against msg importance: */
if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
return tipc_link_cong(link, list);
@ -1811,25 +1810,16 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);
}
void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{
l_ptr->window = window;
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
/* Data messages from this node, inclusive FIRST_FRAGM */
l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
/* Transiting data messages,inclusive FIRST_FRAGM */
l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
l_ptr->queue_limit[CONN_MANAGER] = 1200;
l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
/* FRAGMENT and LAST_FRAGMENT packets */
l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
l->window = win;
l->queue_limit[TIPC_LOW_IMPORTANCE] = win / 2;
l->queue_limit[TIPC_MEDIUM_IMPORTANCE] = win;
l->queue_limit[TIPC_HIGH_IMPORTANCE] = win / 2 * 3;
l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
l->queue_limit[TIPC_SYSTEM_IMPORTANCE] = max_bulk;
}
/* tipc_link_find_owner - locate owner node of link by link's name

View File

@ -272,6 +272,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
msg_set_size(&pkthdr, pktmax);
msg_set_fragm_no(&pkthdr, pktno);
msg_set_importance(&pkthdr, msg_importance(mhdr));
/* Prepare first fragment */
skb = tipc_buf_acquire(pktmax);
@ -467,7 +468,6 @@ bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
int err)
{
struct tipc_msg *msg = buf_msg(buf);
uint imp = msg_importance(msg);
struct tipc_msg ohdr;
uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
@ -479,9 +479,6 @@ bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
if (msg_errcode(msg))
goto exit;
memcpy(&ohdr, msg, msg_hdr_sz(msg));
imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
if (msg_isdata(msg))
msg_set_importance(msg, imp);
msg_set_errcode(msg, err);
msg_set_origport(msg, msg_destport(&ohdr));
msg_set_destport(msg, msg_origport(&ohdr));

View File

@ -54,6 +54,8 @@ struct plist;
* - TIPC_HIGH_IMPORTANCE
* - TIPC_CRITICAL_IMPORTANCE
*/
#define TIPC_SYSTEM_IMPORTANCE 4
/*
* Payload message types
@ -63,6 +65,19 @@ struct plist;
#define TIPC_NAMED_MSG 2
#define TIPC_DIRECT_MSG 3
/*
* Internal message users
*/
#define BCAST_PROTOCOL 5
#define MSG_BUNDLER 6
#define LINK_PROTOCOL 7
#define CONN_MANAGER 8
#define CHANGEOVER_PROTOCOL 10
#define NAME_DISTRIBUTOR 11
#define MSG_FRAGMENTER 12
#define LINK_CONFIG 13
#define SOCK_WAKEUP 14 /* pseudo user */
/*
* Message header sizes
*/
@ -170,16 +185,6 @@ static inline void msg_set_user(struct tipc_msg *m, u32 n)
msg_set_bits(m, 0, 25, 0xf, n);
}
static inline u32 msg_importance(struct tipc_msg *m)
{
return msg_bits(m, 0, 25, 0xf);
}
static inline void msg_set_importance(struct tipc_msg *m, u32 i)
{
msg_set_user(m, i);
}
static inline u32 msg_hdr_sz(struct tipc_msg *m)
{
return msg_bits(m, 0, 21, 0xf) << 2;
@ -336,6 +341,25 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n)
/*
* Words 3-10
*/
static inline u32 msg_importance(struct tipc_msg *m)
{
if (unlikely(msg_user(m) == MSG_FRAGMENTER))
return msg_bits(m, 5, 13, 0x7);
if (likely(msg_isdata(m) && !msg_errcode(m)))
return msg_user(m);
return TIPC_SYSTEM_IMPORTANCE;
}
static inline void msg_set_importance(struct tipc_msg *m, u32 i)
{
if (unlikely(msg_user(m) == MSG_FRAGMENTER))
msg_set_bits(m, 5, 13, 0x7, i);
else if (likely(i < TIPC_SYSTEM_IMPORTANCE))
msg_set_user(m, i);
else
pr_warn("Trying to set illegal importance in message\n");
}
static inline u32 msg_prevnode(struct tipc_msg *m)
{
return msg_word(m, 3);
@ -457,20 +481,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
* Constants and routines used to read and write TIPC internal message headers
*/
/*
* Internal message users
*/
#define BCAST_PROTOCOL 5
#define MSG_BUNDLER 6
#define LINK_PROTOCOL 7
#define CONN_MANAGER 8
#define ROUTE_DISTRIBUTOR 9 /* obsoleted */
#define CHANGEOVER_PROTOCOL 10
#define NAME_DISTRIBUTOR 11
#define MSG_FRAGMENTER 12
#define LINK_CONFIG 13
#define SOCK_WAKEUP 14 /* pseudo user */
/*
* Connection management protocol message types
*/
@ -743,13 +753,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
msg_set_bits(m, 9, 0, 0xffff, n);
}
static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
{
if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
return msg_importance(msg_get_wrapped(m));
return msg_importance(m);
}
static inline u32 msg_tot_origport(struct tipc_msg *m)
{
if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))