diff options
-rw-r--r-- | net/tipc/bcast.c | 143 | ||||
-rw-r--r-- | net/tipc/bcast.h | 8 | ||||
-rw-r--r-- | net/tipc/bearer.c | 35 | ||||
-rw-r--r-- | net/tipc/bearer.h | 3 | ||||
-rw-r--r-- | net/tipc/node.c | 7 |
5 files changed, 151 insertions, 45 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index ea28c2919b38..74ee09ac430d 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -90,10 +90,12 @@ struct tipc_bcbearer { /** * struct tipc_bc_base - link used for broadcast messages - * @link: (non-standard) broadcast link structure + * @link: broadcast send link structure * @node: (non-standard) node structure representing b'cast link's peer node * @bcast_nodes: map of broadcast-capable nodes * @retransmit_to: node that most recently requested a retransmit + * @dest_nnt: array indicating number of reachable destinations per bearer + * @bearers: array of bearers, sorted by number of reachable destinations * * Handles sequence numbering, fragmentation, bundling, etc. */ @@ -103,6 +105,8 @@ struct tipc_bc_base { struct sk_buff_head arrvq; struct sk_buff_head inputq; struct sk_buff_head namedq; + int dests[MAX_BEARERS]; + int primary_bearer; struct tipc_node_map bcast_nodes; struct tipc_node *retransmit_to; }; @@ -164,6 +168,52 @@ static void bcbuf_decr_acks(struct sk_buff *buf) bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); } +/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, + * if any, and make it primary bearer + */ +static void tipc_bcbase_select_primary(struct net *net) +{ + struct tipc_bc_base *bb = tipc_bc_base(net); + int all_dests = tipc_link_bc_peers(bb->link); + int i; + + bb->primary_bearer = INVALID_BEARER_ID; + + if (!all_dests) + return; + + for (i = 0; i < MAX_BEARERS; i++) { + if (bb->dests[i] < all_dests) + continue; + + bb->primary_bearer = i; + + /* Reduce risk that all nodes select same primary */ + if ((i ^ tipc_own_addr(net)) & 1) + break; + } +} + +void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) +{ + struct tipc_bc_base *bb = tipc_bc_base(net); + + tipc_bcast_lock(net); + bb->dests[bearer_id]++; + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); +} + +void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id) +{ + struct tipc_bc_base *bb = tipc_bc_base(net); + + tipc_bcast_lock(net); + bb->dests[bearer_id]--; + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); +} + static void bclink_set_last_sent(struct net *net) { struct tipc_net *tn = net_generic(net, tipc_net_id); @@ -439,6 +489,51 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg) tipc_node_put(n_ptr); } +/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers + * + * Note that number of reachable destinations, as indicated in the dests[] + * array, may transitionally differ from the number of destinations indicated + * in each sent buffer. We can sustain this. Excess destination nodes will + * drop and never acknowledge the unexpected packets, and missing destinations + * will either require retransmission (if they are just about to be added to + * the bearer), or be removed from the buffer's 'ackers' counter (if they + * just went down) + */ +static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) +{ + int bearer_id; + struct tipc_bc_base *bb = tipc_bc_base(net); + struct sk_buff *skb, *_skb; + struct sk_buff_head _xmitq; + + if (skb_queue_empty(xmitq)) + return; + + /* The typical case: at least one bearer has links to all nodes */ + bearer_id = bb->primary_bearer; + if (bearer_id >= 0) { + tipc_bearer_bc_xmit(net, bearer_id, xmitq); + return; + } + + /* We have to transmit across all bearers */ + skb_queue_head_init(&_xmitq); + for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { + if (!bb->dests[bearer_id]) + continue; + + skb_queue_walk(xmitq, skb) { + _skb = pskb_copy_for_clone(skb, GFP_ATOMIC); + if (!_skb) + break; + __skb_queue_tail(&_xmitq, _skb); + } + tipc_bearer_bc_xmit(net, bearer_id, &_xmitq); + } + __skb_queue_purge(xmitq); + __skb_queue_purge(&_xmitq); +} + /* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster * and to identified node local sockets * @net: the applicable net namespace @@ -463,7 +558,6 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) tipc_bcast_lock(net); if (tipc_link_bc_peers(l)) rc = tipc_link_xmit(l, list, &xmitq); - bclink_set_last_sent(net); tipc_bcast_unlock(net); /* Don't send to local node if adding to link failed */ @@ -473,7 +567,7 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list) } /* Broadcast to all nodes, inluding local node */ - tipc_bcbearer_xmit(net, &xmitq); + tipc_bcbase_xmit(net, &xmitq); tipc_sk_mcast_rcv(net, &rcvq, &inputq); __skb_queue_purge(list); return 0; @@ -504,8 +598,7 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb) rc = tipc_link_rcv(l, skb, NULL); tipc_bcast_unlock(net); - if (!skb_queue_empty(&xmitq)) - tipc_bcbearer_xmit(net, &xmitq); + tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? */ if (!skb_queue_empty(inputq)) @@ -529,7 +622,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked) tipc_link_bc_ack_rcv(l, acked, &xmitq); tipc_bcast_unlock(net); - tipc_bcbearer_xmit(net, &xmitq); + tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? */ if (!skb_queue_empty(inputq)) @@ -557,7 +650,7 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, } tipc_bcast_unlock(net); - tipc_bcbearer_xmit(net, &xmitq); + tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? */ if (!skb_queue_empty(inputq)) @@ -568,38 +661,35 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, * * RCU is locked, node lock is set */ -void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l, +void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, struct sk_buff_head *xmitq) { - struct tipc_net *tn = net_generic(net, tipc_net_id); struct tipc_link *snd_l = tipc_bc_sndlink(net); - tipc_bclink_lock(net); - tipc_nmap_add(&tn->bcbase->bcast_nodes, addr); + tipc_bcast_lock(net); tipc_link_add_bc_peer(snd_l, uc_l, xmitq); - tipc_bclink_unlock(net); + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); } /* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer * * RCU is locked, node lock is set */ -void tipc_bcast_remove_peer(struct net *net, u32 addr, - struct tipc_link *rcv_l) +void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) { - struct tipc_net *tn = net_generic(net, tipc_net_id); - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; struct tipc_link *snd_l = tipc_bc_sndlink(net); + struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; struct sk_buff_head xmitq; __skb_queue_head_init(&xmitq); - tipc_bclink_lock(net); - tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr); + tipc_bcast_lock(net); tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); - tipc_bclink_unlock(net); + tipc_bcbase_select_primary(net); + tipc_bcast_unlock(net); - tipc_bcbearer_xmit(net, &xmitq); + tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? */ if (!skb_queue_empty(inputq)) @@ -869,19 +959,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, return 0; } -static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq) -{ - struct sk_buff *skb, *tmp; - - skb_queue_walk_safe(xmitq, skb, tmp) { - __skb_dequeue(xmitq); - tipc_bcbearer_send(net, skb, NULL, NULL); - - /* Until we remove cloning in tipc_l2_send_msg(): */ - kfree_skb(skb); - } -} - /** * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer */ diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 568a57cd89e6..76b747a73b0b 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -47,11 +47,11 @@ struct tipc_node_map; int tipc_bcast_init(struct net *net); void tipc_bcast_reinit(struct net *net); void tipc_bcast_stop(struct net *net); -void tipc_bcast_add_peer(struct net *net, u32 addr, - struct tipc_link *l, +void tipc_bcast_add_peer(struct net *net, struct tipc_link *l, struct sk_buff_head *xmitq); -void tipc_bcast_remove_peer(struct net *net, u32 addr, - struct tipc_link *rcv_bcl); +void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl); +void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); +void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); struct tipc_node *tipc_bclink_retransmit_to(struct net *tn); void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); void tipc_bclink_rcv(struct net *net, struct sk_buff *buf); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 82b278668ab7..62f47ecc6b84 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest) rcu_read_lock(); b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); - if (b_ptr) { - tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true); + if (b_ptr) tipc_disc_add_dest(b_ptr->link_req); - } rcu_read_unlock(); } @@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest) rcu_read_lock(); b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); - if (b_ptr) { - tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false); + if (b_ptr) tipc_disc_remove_dest(b_ptr->link_req); - } rcu_read_unlock(); } @@ -494,6 +490,33 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id, rcu_read_unlock(); } +/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations + */ +void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq) +{ + struct tipc_net *tn = tipc_net(net); + int net_id = tn->net_id; + struct tipc_bearer *b; + struct sk_buff *skb, *tmp; + struct tipc_msg *hdr; + + rcu_read_lock(); + b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + if (likely(b)) { + skb_queue_walk_safe(xmitq, skb, tmp) { + hdr = buf_msg(skb); + msg_set_non_seq(hdr, 1); + msg_set_mc_netid(hdr, net_id); + __skb_dequeue(xmitq); + b->media->send_msg(net, skb, b, &b->bcast_addr); + /* Until we remove cloning in tipc_l2_send_msg(): */ + kfree_skb(skb); + } + } + rcu_read_unlock(); +} + /** * tipc_l2_rcv_msg - handle incoming TIPC message from an interface * @buf: the received packet diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 6426f242f626..9fc1e074f7c0 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -163,6 +163,7 @@ struct tipc_bearer { u32 identity; struct tipc_link_req *link_req; char net_plane; + int node_cnt; struct tipc_node_map nodes; }; @@ -220,5 +221,7 @@ void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, void tipc_bearer_xmit(struct net *net, u32 bearer_id, struct sk_buff_head *xmitq, struct tipc_media_addr *dst); +void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id, + struct sk_buff_head *xmitq); #endif /* _TIPC_BEARER_H */ diff --git a/net/tipc/node.c b/net/tipc/node.c index cd924552244b..b27439097d6c 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -346,6 +346,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; tipc_bearer_add_dest(n->net, bearer_id, n->addr); + tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id); pr_debug("Established link <%s> on network plane %c\n", nl->name, nl->net_plane); @@ -356,7 +357,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id, *slot1 = bearer_id; tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT); n->action_flags |= TIPC_NOTIFY_NODE_UP; - tipc_bcast_add_peer(n->net, n->addr, nl, xmitq); + tipc_bcast_add_peer(n->net, nl, xmitq); return; } @@ -443,8 +444,10 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, tipc_link_build_reset_msg(l, xmitq); *maddr = &n->links[*bearer_id].maddr; node_lost_contact(n, &le->inputq); + tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id); return; } + tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id); /* There is still a working link => initiate failover */ tnl = node_active_link(n, 0); @@ -860,7 +863,7 @@ static void node_lost_contact(struct tipc_node *n, tipc_addr_string_fill(addr_string, n->addr)); /* Clean up broadcast state */ - tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link); + tipc_bcast_remove_peer(n->net, n->bc_entry.link); /* Abort any ongoing link failover */ for (i = 0; i < MAX_BEARERS; i++) { |