| author | Jon Paul Maloy <jon.maloy@ericsson.com> | 2015-10-22 08:51:42 -0400 |
| --- | --- | --- |
| committer | David S. Miller <davem@davemloft.net> | 2015-10-24 06:56:39 -0700 |
| commit | b06b281e79375fcbd9ffaec7c5fdc350b888d089 (patch) | |
| tree | 74227c840529e15377aad08e5dc4ddab21fba2cd /net/tipc/bcast.c | |
| parent | 5266698661401afc5e4a1a521cf9ba10724d10dd (diff) | |
tipc: simplify bearer level broadcast
Until now, we have been keeping track of the exact set of broadcast
destinations through the helper structure tipc_node_map. This forces us
to maintain a whole supporting infrastructure, including a pseudo-bearer
and a number of functions to manipulate both the bearers and the node
map correctly. Apart from the complexity, this approach is also
limiting: struct tipc_node_map can only support cluster-local broadcast
if we want to avoid it becoming excessively large. We want to eliminate
this limitation in order to enable the introduction of scoped multicast
in the future.
A closer analysis reveals that it is unnecessary to maintain this "full
set" overview; it is sufficient to keep a counter per bearer, indicating
how many nodes can currently be reached via that bearer. The protocol
is now robust enough to handle transitional discrepancies between the
nominal number of reachable destinations, as expected by the broadcast
protocol itself, and the number that is actually reachable at the
moment. The initial broadcast synchronization, in conjunction with the
retransmission mechanism, ensures that all packets will eventually be
acknowledged by the correct set of destinations.
This commit introduces these changes.
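To make the counter-per-bearer bookkeeping concrete, here is a minimal userspace sketch of the idea described above. It is not the kernel code itself: `MAX_BEARERS`, the `dests[]` array, `total_peers` and `select_primary()` are stand-ins invented for this illustration, loosely mirroring what the patch below does in `tipc_bcbase_select_primary()`.

```c
#include <stdio.h>

#define MAX_BEARERS    3
#define INVALID_BEARER -1

static int dests[MAX_BEARERS];  /* peers currently reachable per bearer */
static int total_peers;         /* peers attached to the broadcast link */

/* Pick any bearer whose counter covers every known peer, if one exists */
static int select_primary(unsigned int own_addr)
{
	int primary = INVALID_BEARER;

	if (!total_peers)
		return primary;

	for (int i = 0; i < MAX_BEARERS; i++) {
		if (dests[i] < total_peers)
			continue;
		primary = i;
		/* Spread the choice across nodes so they do not all
		 * pick the same bearer (the patch's (i ^ own_addr) & 1 test)
		 */
		if ((i ^ own_addr) & 1)
			break;
	}
	return primary;
}

int main(void)
{
	total_peers = 2;
	dests[0] = 1;   /* bearer 0 reaches only one of the two peers */
	dests[1] = 2;   /* bearer 1 reaches both */

	printf("primary bearer: %d\n", select_primary(0x1001));
	return 0;
}
```

The XOR with the node's own address is the same trick the patch uses to reduce the risk that every node in the cluster selects the same bearer as its primary.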
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/bcast.c')
| -rw-r--r-- | net/tipc/bcast.c | 143 |
1 file changed, 110 insertions(+), 33 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index ea28c2919b38..74ee09ac430d 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -90,10 +90,12 @@ struct tipc_bcbearer {
 
 /**
  * struct tipc_bc_base - link used for broadcast messages
- * @link: (non-standard) broadcast link structure
+ * @link: broadcast send link structure
  * @node: (non-standard) node structure representing b'cast link's peer node
  * @bcast_nodes: map of broadcast-capable nodes
  * @retransmit_to: node that most recently requested a retransmit
+ * @dest_nnt: array indicating number of reachable destinations per bearer
+ * @bearers: array of bearers, sorted by number of reachable destinations
  *
  * Handles sequence numbering, fragmentation, bundling, etc.
  */
@@ -103,6 +105,8 @@ struct tipc_bc_base {
 	struct sk_buff_head arrvq;
 	struct sk_buff_head inputq;
 	struct sk_buff_head namedq;
+	int dests[MAX_BEARERS];
+	int primary_bearer;
 	struct tipc_node_map bcast_nodes;
 	struct tipc_node *retransmit_to;
 };
@@ -164,6 +168,52 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
 	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
 }
 
+/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
+ *                               if any, and make it primary bearer
+ */
+static void tipc_bcbase_select_primary(struct net *net)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+	int all_dests = tipc_link_bc_peers(bb->link);
+	int i;
+
+	bb->primary_bearer = INVALID_BEARER_ID;
+
+	if (!all_dests)
+		return;
+
+	for (i = 0; i < MAX_BEARERS; i++) {
+		if (bb->dests[i] < all_dests)
+			continue;
+
+		bb->primary_bearer = i;
+
+		/* Reduce risk that all nodes select same primary */
+		if ((i ^ tipc_own_addr(net)) & 1)
+			break;
+	}
+}
+
+void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+
+	tipc_bcast_lock(net);
+	bb->dests[bearer_id]++;
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
+}
+
+void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
+{
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+
+	tipc_bcast_lock(net);
+	bb->dests[bearer_id]--;
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
+}
+
 static void bclink_set_last_sent(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -439,6 +489,51 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
 	tipc_node_put(n_ptr);
 }
 
+/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
+ *
+ * Note that number of reachable destinations, as indicated in the dests[]
+ * array, may transitionally differ from the number of destinations indicated
+ * in each sent buffer. We can sustain this. Excess destination nodes will
+ * drop and never acknowledge the unexpected packets, and missing destinations
+ * will either require retransmission (if they are just about to be added to
+ * the bearer), or be removed from the buffer's 'ackers' counter (if they
+ * just went down)
+ */
+static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
+{
+	int bearer_id;
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+	struct sk_buff *skb, *_skb;
+	struct sk_buff_head _xmitq;
+
+	if (skb_queue_empty(xmitq))
+		return;
+
+	/* The typical case: at least one bearer has links to all nodes */
+	bearer_id = bb->primary_bearer;
+	if (bearer_id >= 0) {
+		tipc_bearer_bc_xmit(net, bearer_id, xmitq);
+		return;
+	}
+
+	/* We have to transmit across all bearers */
+	skb_queue_head_init(&_xmitq);
+	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
+		if (!bb->dests[bearer_id])
+			continue;
+
+		skb_queue_walk(xmitq, skb) {
+			_skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
+			if (!_skb)
+				break;
+			__skb_queue_tail(&_xmitq, _skb);
+		}
+		tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
+	}
+	__skb_queue_purge(xmitq);
+	__skb_queue_purge(&_xmitq);
+}
+
 /* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
  * and to identified node local sockets
  * @net: the applicable net namespace
@@ -463,7 +558,6 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
 	tipc_bcast_lock(net);
 	if (tipc_link_bc_peers(l))
 		rc = tipc_link_xmit(l, list, &xmitq);
-	bclink_set_last_sent(net);
 	tipc_bcast_unlock(net);
 
 	/* Don't send to local node if adding to link failed */
@@ -473,7 +567,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
 	}
 
 	/* Broadcast to all nodes, inluding local node */
-	tipc_bcbearer_xmit(net, &xmitq);
+	tipc_bcbase_xmit(net, &xmitq);
 	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
 	__skb_queue_purge(list);
 	return 0;
@@ -504,8 +598,7 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
 		rc = tipc_link_rcv(l, skb, NULL);
 	tipc_bcast_unlock(net);
 
-	if (!skb_queue_empty(&xmitq))
-		tipc_bcbearer_xmit(net, &xmitq);
+	tipc_bcbase_xmit(net, &xmitq);
 
 	/* Any socket wakeup messages ? */
 	if (!skb_queue_empty(inputq))
@@ -529,7 +622,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
 	tipc_link_bc_ack_rcv(l, acked, &xmitq);
 	tipc_bcast_unlock(net);
 
-	tipc_bcbearer_xmit(net, &xmitq);
+	tipc_bcbase_xmit(net, &xmitq);
 
 	/* Any socket wakeup messages ? */
 	if (!skb_queue_empty(inputq))
@@ -557,7 +650,7 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 	}
 	tipc_bcast_unlock(net);
 
-	tipc_bcbearer_xmit(net, &xmitq);
+	tipc_bcbase_xmit(net, &xmitq);
 
 	/* Any socket wakeup messages ? */
 	if (!skb_queue_empty(inputq))
@@ -568,38 +661,35 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
  *
  * RCU is locked, node lock is set
  */
-void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
+void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
 			 struct sk_buff_head *xmitq)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_link *snd_l = tipc_bc_sndlink(net);
 
-	tipc_bclink_lock(net);
-	tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
+	tipc_bcast_lock(net);
 	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
-	tipc_bclink_unlock(net);
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
 }
 
 /* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
  *
 * RCU is locked, node lock is set
  */
-void tipc_bcast_remove_peer(struct net *net, u32 addr,
-			    struct tipc_link *rcv_l)
+void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
 	struct tipc_link *snd_l = tipc_bc_sndlink(net);
+	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
 	struct sk_buff_head xmitq;
 
 	__skb_queue_head_init(&xmitq);
 
-	tipc_bclink_lock(net);
-	tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
+	tipc_bcast_lock(net);
 	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
-	tipc_bclink_unlock(net);
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
 
-	tipc_bcbearer_xmit(net, &xmitq);
+	tipc_bcbase_xmit(net, &xmitq);
 
 	/* Any socket wakeup messages ? */
 	if (!skb_queue_empty(inputq))
@@ -869,19 +959,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
 	return 0;
 }
 
-static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
-{
-	struct sk_buff *skb, *tmp;
-
-	skb_queue_walk_safe(xmitq, skb, tmp) {
-		__skb_dequeue(xmitq);
-		tipc_bcbearer_send(net, skb, NULL, NULL);
-
-		/* Until we remove cloning in tipc_l2_send_msg(): */
-		kfree_skb(skb);
-	}
-}
-
 /**
  * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
  */
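For readers who want to see the transmission decision in isolation, the following is a simplified, self-contained sketch of what tipc_bcbase_xmit() above does: if a primary bearer covering all peers exists, the queue is sent once on it; otherwise the packets are replicated on every bearer that still has destinations. `bearer_send()`, `bcbase_xmit()`, the string "packets" and the plain arrays are hypothetical stand-ins for the kernel's sk_buff queues and tipc_bearer_bc_xmit(); this is an illustration, not the kernel implementation.

```c
#include <stdio.h>

#define MAX_BEARERS    3
#define INVALID_BEARER -1

static int dests[MAX_BEARERS] = { 1, 2, 0 };  /* reachable peers per bearer */

/* Stand-in for tipc_bearer_bc_xmit(): just log what would be sent where */
static void bearer_send(int bearer_id, const char *pkt)
{
	printf("bearer %d <- %s\n", bearer_id, pkt);
}

static void bcbase_xmit(int primary, const char **pkts, int n)
{
	/* Typical case: one bearer has links to all peers */
	if (primary >= 0) {
		for (int i = 0; i < n; i++)
			bearer_send(primary, pkts[i]);
		return;
	}

	/* Fallback: replicate the queue on every bearer with destinations */
	for (int b = 0; b < MAX_BEARERS; b++) {
		if (!dests[b])
			continue;
		for (int i = 0; i < n; i++)
			bearer_send(b, pkts[i]);  /* kernel copies the skb here */
	}
}

int main(void)
{
	const char *pkts[] = { "bcast pkt 1", "bcast pkt 2" };

	bcbase_xmit(INVALID_BEARER, pkts, 2);  /* no primary: fan out per bearer */
	bcbase_xmit(1, pkts, 2);               /* bearer 1 reaches every peer */
	return 0;
}
```

In the kernel version each packet is duplicated with pskb_copy_for_clone() before going out on additional bearers, since a single sk_buff cannot be placed on more than one transmit queue at a time.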