From 14c04493cb77bc38404dbcb39d5ccbb667831ad7 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:17 +0200 Subject: tipc: add ability to order and receive topology events in driver As preparation for introducing communication groups, we add the ability to issue topology subscriptions and receive topology events from kernel space. This will make it possible for group member sockets to keep track of other group members. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/msg.h | 1 + 1 file changed, 1 insertion(+) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/msg.h b/net/tipc/msg.h index c843fd2bc48d..d058b1c464e9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -78,6 +78,7 @@ struct plist; #define MSG_FRAGMENTER 12 #define LINK_CONFIG 13 #define SOCK_WAKEUP 14 /* pseudo user */ +#define TOP_SRV 15 /* pseudo user */ /* * Message header sizes -- cgit v1.2.3 From 64ac5f5977df5b276374fb2f051082129f5cdb22 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:20 +0200 Subject: tipc: refactor function filter_rcv() In the following commits we will need to handle multiple incoming and rejected/returned buffers in the function socket.c::filter_rcv(). As a preparation for this, we generalize the function by handling buffer queues instead of individual buffers. We also introduce a help function tipc_skb_reject(), and rename filter_rcv() to tipc_sk_filter_rcv() in line with other functions in socket.c. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/msg.c | 7 +++ net/tipc/msg.h | 2 + net/tipc/socket.c | 161 +++++++++++++++++++++++++++--------------------------- 3 files changed, 89 insertions(+), 81 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 17146c16ee2d..1649d456e22d 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -666,3 +666,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, } kfree_skb(skb); } + +void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) + __skb_queue_tail(xmitq, skb); +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index d058b1c464e9..be3e38aa9dd2 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -819,6 +819,8 @@ static inline bool msg_is_reset(struct tipc_msg *hdr) struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp); bool tipc_msg_validate(struct sk_buff *skb); bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err); +void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb, + struct sk_buff_head *xmitq); void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, u32 hsize, u32 destnode); struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 7659e792ecdb..bc226f5a1be3 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -111,7 +111,7 @@ struct tipc_sock { struct rcu_head rcu; }; -static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); +static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); static void tipc_data_ready(struct sock *sk); static void tipc_write_space(struct sock *sk); static void tipc_sock_destruct(struct sock *sk); @@ -453,7 +453,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, msg_set_origport(msg, tsk->portid); setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); sk->sk_shutdown = 0; - sk->sk_backlog_rcv = tipc_backlog_rcv; + sk->sk_backlog_rcv = tipc_sk_backlog_rcv; sk->sk_rcvbuf = sysctl_tipc_rmem[1]; sk->sk_data_ready = tipc_data_ready; sk->sk_write_space = tipc_write_space; @@ -850,16 +850,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, } /** - * tipc_sk_proto_rcv - receive a connection mng protocol message + * tipc_sk_conn_proto_rcv - receive a connection mng protocol message * @tsk: receiving socket * @skb: pointer to message buffer. */ -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, - struct sk_buff_head *xmitq) +static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { - struct sock *sk = &tsk->sk; - u32 onode = tsk_own_node(tsk); struct tipc_msg *hdr = buf_msg(skb); + u32 onode = tsk_own_node(tsk); + struct sock *sk = &tsk->sk; int mtyp = msg_type(hdr); bool conn_cong; @@ -1536,14 +1536,41 @@ static void tipc_sock_destruct(struct sock *sk) __skb_queue_purge(&sk->sk_receive_queue); } +static void tipc_sk_proto_rcv(struct sock *sk, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = __skb_dequeue(inputq); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = buf_msg(skb); + + switch (msg_user(hdr)) { + case CONN_MANAGER: + tipc_sk_conn_proto_rcv(tsk, skb, xmitq); + return; + case SOCK_WAKEUP: + u32_del(&tsk->cong_links, msg_orignode(hdr)); + tsk->cong_link_cnt--; + sk->sk_write_space(sk); + break; + case TOP_SRV: + tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); + break; + default: + break; + } + + kfree_skb(skb); +} + /** - * filter_connect - Handle all incoming messages for a connection-based socket + * tipc_filter_connect - Handle incoming message for a connection-based socket * @tsk: TIPC socket * @skb: pointer to message buffer. Set to NULL if buffer is consumed * * Returns true if everything ok, false otherwise */ -static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) +static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) { struct sock *sk = &tsk->sk; struct net *net = sock_net(sk); @@ -1657,7 +1684,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) } /** - * filter_rcv - validate incoming message + * tipc_sk_filter_rcv - validate incoming message * @sk: socket * @skb: pointer to message. * @@ -1666,75 +1693,49 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) * * Called with socket lock already taken * - * Returns true if message was added to socket receive queue, otherwise false */ -static bool filter_rcv(struct sock *sk, struct sk_buff *skb, - struct sk_buff_head *xmitq) +static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { + bool sk_conn = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); - unsigned int limit = rcvbuf_limit(sk, skb); - int err = TIPC_OK; + struct net *net = sock_net(sk); + struct sk_buff_head inputq; + int limit, err = TIPC_OK; - if (unlikely(!msg_isdata(hdr))) { - switch (msg_user(hdr)) { - case CONN_MANAGER: - tipc_sk_proto_rcv(tsk, skb, xmitq); - return false; - case SOCK_WAKEUP: - u32_del(&tsk->cong_links, msg_orignode(hdr)); - tsk->cong_link_cnt--; - sk->sk_write_space(sk); - break; - case TOP_SRV: - tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); - break; - default: - break; - } - kfree_skb(skb); - return false; - } + TIPC_SKB_CB(skb)->bytes_read = 0; + __skb_queue_head_init(&inputq); + __skb_queue_tail(&inputq, skb); - /* Drop if illegal message type */ - if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { - kfree_skb(skb); - return false; - } + if (unlikely(!msg_isdata(hdr))) + tipc_sk_proto_rcv(sk, &inputq, xmitq); + else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) + return kfree_skb(skb); - /* Reject if wrong message type for current socket state */ - if (tipc_sk_type_connectionless(sk)) { - if (msg_connected(hdr)) { + /* Validate and add to receive buffer if there is space */ + while ((skb = __skb_dequeue(&inputq))) { + hdr = buf_msg(skb); + limit = rcvbuf_limit(sk, skb); + if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || + (!sk_conn && msg_connected(hdr))) err = TIPC_ERR_NO_PORT; - goto reject; - } - } else if (unlikely(!filter_connect(tsk, skb))) { - err = TIPC_ERR_NO_PORT; - goto reject; - } + else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) + err = TIPC_ERR_OVERLOAD; - /* Reject message if there isn't room to queue it */ - if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { - err = TIPC_ERR_OVERLOAD; - goto reject; + if (unlikely(err)) { + tipc_skb_reject(net, err, skb, xmitq); + err = TIPC_OK; + continue; + } + __skb_queue_tail(&sk->sk_receive_queue, skb); + skb_set_owner_r(skb, sk); + sk->sk_data_ready(sk); } - - /* Enqueue message */ - TIPC_SKB_CB(skb)->bytes_read = 0; - __skb_queue_tail(&sk->sk_receive_queue, skb); - skb_set_owner_r(skb, sk); - - sk->sk_data_ready(sk); - return true; - -reject: - if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err)) - __skb_queue_tail(xmitq, skb); - return false; } /** - * tipc_backlog_rcv - handle incoming message from backlog queue + * tipc_sk_backlog_rcv - handle incoming message from backlog queue * @sk: socket * @skb: message * @@ -1742,27 +1743,25 @@ reject: * * Returns 0 */ -static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) +static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb) { - unsigned int truesize = skb->truesize; + unsigned int before = sk_rmem_alloc_get(sk); struct sk_buff_head xmitq; u32 dnode, selector; + unsigned int added; __skb_queue_head_init(&xmitq); - if (likely(filter_rcv(sk, skb, &xmitq))) { - atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); - return 0; - } - - if (skb_queue_empty(&xmitq)) - return 0; + tipc_sk_filter_rcv(sk, skb, &xmitq); + added = sk_rmem_alloc_get(sk) - before; + atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt); - /* Send response/rejected message */ - skb = __skb_dequeue(&xmitq); - dnode = msg_destnode(buf_msg(skb)); - selector = msg_origport(buf_msg(skb)); - tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); + /* Send pending response/rejected messages, if any */ + while ((skb = __skb_dequeue(&xmitq))) { + selector = msg_origport(buf_msg(skb)); + dnode = msg_destnode(buf_msg(skb)); + tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); + } return 0; } @@ -1794,7 +1793,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, /* Add message directly to receive queue if possible */ if (!sock_owned_by_user(sk)) { - filter_rcv(sk, skb, xmitq); + tipc_sk_filter_rcv(sk, skb, xmitq); continue; } -- cgit v1.2.3 From 75da2163dbb6af9f2dce1d80056d11d290dd19a5 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:23 +0200 Subject: tipc: introduce communication groups As a preparation for introducing flow control for multicast and datagram messaging we need a more strictly defined framework than we have now. A socket must be able keep track of exactly how many and which other sockets it is allowed to communicate with at any moment, and keep the necessary state for those. We therefore introduce a new concept we have named Communication Group. Sockets can join a group via a new setsockopt() call TIPC_GROUP_JOIN. The call takes four parameters: 'type' serves as group identifier, 'instance' serves as an logical member identifier, and 'scope' indicates the visibility of the group (node/cluster/zone). Finally, 'flags' makes it possible to set certain properties for the member. For now, there is only one flag, indicating if the creator of the socket wants to receive a copy of broadcast or multicast messages it is sending via the socket, and if wants to be eligible as destination for its own anycasts. A group is closed, i.e., sockets which have not joined a group will not be able to send messages to or receive messages from members of the group, and vice versa. Any member of a group can send multicast ('group broadcast') messages to all group members, optionally including itself, using the primitive send(). The messages are received via the recvmsg() primitive. A socket can only be member of one group at a time. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- include/uapi/linux/tipc.h | 14 ++ net/tipc/Makefile | 2 +- net/tipc/group.c | 404 ++++++++++++++++++++++++++++++++++++++++++++++ net/tipc/group.h | 64 ++++++++ net/tipc/link.c | 3 +- net/tipc/msg.h | 50 +++++- net/tipc/name_table.c | 44 +++-- net/tipc/name_table.h | 3 + net/tipc/node.h | 3 +- net/tipc/socket.c | 209 ++++++++++++++++++++---- 10 files changed, 748 insertions(+), 48 deletions(-) create mode 100644 net/tipc/group.c create mode 100644 net/tipc/group.h (limited to 'net/tipc/msg.h') diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h index 5351b08c897a..5f7b2c4a09ab 100644 --- a/include/uapi/linux/tipc.h +++ b/include/uapi/linux/tipc.h @@ -231,6 +231,20 @@ struct sockaddr_tipc { #define TIPC_SOCK_RECVQ_DEPTH 132 /* Default: none (read only) */ #define TIPC_MCAST_BROADCAST 133 /* Default: TIPC selects. No arg */ #define TIPC_MCAST_REPLICAST 134 /* Default: TIPC selects. No arg */ +#define TIPC_GROUP_JOIN 135 /* Takes struct tipc_group_req* */ +#define TIPC_GROUP_LEAVE 136 /* No argument */ + +/* + * Flag values + */ +#define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */ + +struct tipc_group_req { + __u32 type; /* group id */ + __u32 instance; /* member id */ + __u32 scope; /* zone/cluster/node */ + __u32 flags; +}; /* * Maximum sizes of TIPC bearer-related names (including terminating NULL) diff --git a/net/tipc/Makefile b/net/tipc/Makefile index 31b9f9c52974..a3af73ec0b78 100644 --- a/net/tipc/Makefile +++ b/net/tipc/Makefile @@ -8,7 +8,7 @@ tipc-y += addr.o bcast.o bearer.o \ core.o link.o discover.o msg.o \ name_distr.o subscr.o monitor.o name_table.o net.o \ netlink.o netlink_compat.o node.o socket.o eth_media.o \ - server.o socket.o + server.o socket.o group.o tipc-$(CONFIG_TIPC_MEDIA_UDP) += udp_media.o tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o diff --git a/net/tipc/group.c b/net/tipc/group.c new file mode 100644 index 000000000000..3f0e1ce1e3b9 --- /dev/null +++ b/net/tipc/group.c @@ -0,0 +1,404 @@ +/* + * net/tipc/group.c: TIPC group messaging code + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "core.h" +#include "addr.h" +#include "group.h" +#include "bcast.h" +#include "server.h" +#include "msg.h" +#include "socket.h" +#include "node.h" +#include "name_table.h" +#include "subscr.h" + +#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) +#define ADV_IDLE ADV_UNIT + +enum mbr_state { + MBR_QUARANTINED, + MBR_DISCOVERED, + MBR_JOINING, + MBR_PUBLISHED, + MBR_JOINED, + MBR_LEAVING +}; + +struct tipc_member { + struct rb_node tree_node; + struct list_head list; + u32 node; + u32 port; + enum mbr_state state; + u16 bc_rcv_nxt; +}; + +struct tipc_group { + struct rb_root members; + struct tipc_nlist dests; + struct net *net; + int subid; + u32 type; + u32 instance; + u32 domain; + u32 scope; + u32 portid; + u16 member_cnt; + u16 bc_snd_nxt; + bool loopback; +}; + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq); + +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) +{ + return grp->bc_snd_nxt; +} + +static bool tipc_group_is_receiver(struct tipc_member *m) +{ + return m && m->state >= MBR_JOINED; +} + +int tipc_group_size(struct tipc_group *grp) +{ + return grp->member_cnt; +} + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq) +{ + struct tipc_group *grp; + u32 type = mreq->type; + + grp = kzalloc(sizeof(*grp), GFP_ATOMIC); + if (!grp) + return NULL; + tipc_nlist_init(&grp->dests, tipc_own_addr(net)); + grp->members = RB_ROOT; + grp->net = net; + grp->portid = portid; + grp->domain = addr_domain(net, mreq->scope); + grp->type = type; + grp->instance = mreq->instance; + grp->scope = mreq->scope; + grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; + if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) + return grp; + kfree(grp); + return NULL; +} + +void tipc_group_delete(struct net *net, struct tipc_group *grp) +{ + struct rb_root *tree = &grp->members; + struct tipc_member *m, *tmp; + struct sk_buff_head xmitq; + + __skb_queue_head_init(&xmitq); + + rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { + tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); + list_del(&m->list); + kfree(m); + } + tipc_node_distr_xmit(net, &xmitq); + tipc_nlist_purge(&grp->dests); + tipc_topsrv_kern_unsubscr(net, grp->subid); + kfree(grp); +} + +struct tipc_member *tipc_group_find_member(struct tipc_group *grp, + u32 node, u32 port) +{ + struct rb_node *n = grp->members.rb_node; + u64 nkey, key = (u64)node << 32 | port; + struct tipc_member *m; + + while (n) { + m = container_of(n, struct tipc_member, tree_node); + nkey = (u64)m->node << 32 | m->port; + if (key < nkey) + n = n->rb_left; + else if (key > nkey) + n = n->rb_right; + else + return m; + } + return NULL; +} + +static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, + u32 node) +{ + struct tipc_member *m; + struct rb_node *n; + + for (n = rb_first(&grp->members); n; n = rb_next(n)) { + m = container_of(n, struct tipc_member, tree_node); + if (m->node == node) + return m; + } + return NULL; +} + +static void tipc_group_add_to_tree(struct tipc_group *grp, + struct tipc_member *m) +{ + u64 nkey, key = (u64)m->node << 32 | m->port; + struct rb_node **n, *parent = NULL; + struct tipc_member *tmp; + + n = &grp->members.rb_node; + while (*n) { + tmp = container_of(*n, struct tipc_member, tree_node); + parent = *n; + tmp = container_of(parent, struct tipc_member, tree_node); + nkey = (u64)tmp->node << 32 | tmp->port; + if (key < nkey) + n = &(*n)->rb_left; + else if (key > nkey) + n = &(*n)->rb_right; + else + return; + } + rb_link_node(&m->tree_node, parent, n); + rb_insert_color(&m->tree_node, &grp->members); +} + +static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, + u32 node, u32 port, + int state) +{ + struct tipc_member *m; + + m = kzalloc(sizeof(*m), GFP_ATOMIC); + if (!m) + return NULL; + INIT_LIST_HEAD(&m->list); + m->node = node; + m->port = port; + grp->member_cnt++; + tipc_group_add_to_tree(grp, m); + tipc_nlist_add(&grp->dests, m->node); + m->state = state; + return m; +} + +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) +{ + tipc_group_create_member(grp, node, port, MBR_DISCOVERED); +} + +static void tipc_group_delete_member(struct tipc_group *grp, + struct tipc_member *m) +{ + rb_erase(&m->tree_node, &grp->members); + grp->member_cnt--; + list_del_init(&m->list); + + /* If last member on a node, remove node from dest list */ + if (!tipc_group_find_node(grp, m->node)) + tipc_nlist_del(&grp->dests, m->node); + + kfree(m); +} + +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) +{ + return &grp->dests; +} + +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope) +{ + seq->type = grp->type; + seq->lower = grp->instance; + seq->upper = grp->instance; + *scope = grp->scope; +} + +void tipc_group_update_bc_members(struct tipc_group *grp) +{ + grp->bc_snd_nxt++; +} + +/* tipc_group_filter_msg() - determine if we should accept arriving message + */ +void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, + struct sk_buff_head *xmitq) +{ + struct sk_buff *skb = __skb_dequeue(inputq); + struct tipc_member *m; + struct tipc_msg *hdr; + u32 node, port; + int mtyp; + + if (!skb) + return; + + hdr = buf_msg(skb); + mtyp = msg_type(hdr); + node = msg_orignode(hdr); + port = msg_origport(hdr); + + if (!msg_in_group(hdr)) + goto drop; + + m = tipc_group_find_member(grp, node, port); + if (!tipc_group_is_receiver(m)) + goto drop; + + __skb_queue_tail(inputq, skb); + + m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + return; +drop: + kfree_skb(skb); +} + +static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, + int mtyp, struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr; + struct sk_buff *skb; + + skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, + m->node, tipc_own_addr(grp->net), + m->port, grp->portid, 0); + if (!skb) + return; + + hdr = buf_msg(skb); + if (mtyp == GRP_JOIN_MSG) + msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); + __skb_queue_tail(xmitq, skb); +} + +void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, + struct sk_buff_head *xmitq) +{ + u32 node = msg_orignode(hdr); + u32 port = msg_origport(hdr); + struct tipc_member *m; + + if (!grp) + return; + + m = tipc_group_find_member(grp, node, port); + + switch (msg_type(hdr)) { + case GRP_JOIN_MSG: + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_QUARANTINED); + if (!m) + return; + m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); + + /* Wait until PUBLISH event is received */ + if (m->state == MBR_DISCOVERED) + m->state = MBR_JOINING; + else if (m->state == MBR_PUBLISHED) + m->state = MBR_JOINED; + return; + case GRP_LEAVE_MSG: + if (!m) + return; + + /* Wait until WITHDRAW event is received */ + if (m->state != MBR_LEAVING) { + m->state = MBR_LEAVING; + return; + } + /* Otherwise deliver already received WITHDRAW event */ + tipc_group_delete_member(grp, m); + return; + default: + pr_warn("Received unknown GROUP_PROTO message\n"); + } +} + +/* tipc_group_member_evt() - receive and handle a member up/down event + */ +void tipc_group_member_evt(struct tipc_group *grp, + struct sk_buff *skb, + struct sk_buff_head *xmitq) +{ + struct tipc_msg *hdr = buf_msg(skb); + struct tipc_event *evt = (void *)msg_data(hdr); + u32 node = evt->port.node; + u32 port = evt->port.ref; + struct tipc_member *m; + struct net *net; + u32 self; + + if (!grp) + goto drop; + + net = grp->net; + self = tipc_own_addr(net); + if (!grp->loopback && node == self && port == grp->portid) + goto drop; + + m = tipc_group_find_member(grp, node, port); + + if (evt->event == TIPC_PUBLISHED) { + if (!m) + m = tipc_group_create_member(grp, node, port, + MBR_DISCOVERED); + if (!m) + goto drop; + + /* Wait if JOIN message not yet received */ + if (m->state == MBR_DISCOVERED) + m->state = MBR_PUBLISHED; + else + m->state = MBR_JOINED; + tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); + } else if (evt->event == TIPC_WITHDRAWN) { + if (!m) + goto drop; + + /* Keep back event if more messages might be expected */ + if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) + m->state = MBR_LEAVING; + else + tipc_group_delete_member(grp, m); + } +drop: + kfree_skb(skb); +} diff --git a/net/tipc/group.h b/net/tipc/group.h new file mode 100644 index 000000000000..9bdf4479fc03 --- /dev/null +++ b/net/tipc/group.h @@ -0,0 +1,64 @@ +/* + * net/tipc/group.h: Include file for TIPC group unicast/multicast functions + * + * Copyright (c) 2017, Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_GROUP_H +#define _TIPC_GROUP_H + +#include "core.h" + +struct tipc_group; +struct tipc_member; +struct tipc_msg; + +struct tipc_group *tipc_group_create(struct net *net, u32 portid, + struct tipc_group_req *mreq); +void tipc_group_delete(struct net *net, struct tipc_group *grp); +void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); +struct tipc_nlist *tipc_group_dests(struct tipc_group *grp); +void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, + int *scope); +void tipc_group_filter_msg(struct tipc_group *grp, + struct sk_buff_head *inputq, + struct sk_buff_head *xmitq); +void tipc_group_member_evt(struct tipc_group *grp, + struct sk_buff *skb, + struct sk_buff_head *xmitq); +void tipc_group_proto_rcv(struct tipc_group *grp, + struct tipc_msg *hdr, + struct sk_buff_head *xmitq); +void tipc_group_update_bc_members(struct tipc_group *grp); +u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); +int tipc_group_size(struct tipc_group *grp); +#endif diff --git a/net/tipc/link.c b/net/tipc/link.c index ac0144f532aa..bd25bff63925 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1046,11 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: - if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) { + if (unlikely(msg_mcast(hdr))) { skb_queue_tail(l->bc_rcvlink->inputq, skb); return true; } case CONN_MANAGER: + case GROUP_PROTOCOL: skb_queue_tail(inputq, skb); return true; case NAME_DISTRIBUTOR: diff --git a/net/tipc/msg.h b/net/tipc/msg.h index be3e38aa9dd2..dad400935405 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1,7 +1,7 @@ /* * net/tipc/msg.h: Include file for TIPC message header routines * - * Copyright (c) 2000-2007, 2014-2015 Ericsson AB + * Copyright (c) 2000-2007, 2014-2017 Ericsson AB * Copyright (c) 2005-2008, 2010-2011, Wind River Systems * All rights reserved. * @@ -61,10 +61,11 @@ struct plist; /* * Payload message types */ -#define TIPC_CONN_MSG 0 -#define TIPC_MCAST_MSG 1 -#define TIPC_NAMED_MSG 2 -#define TIPC_DIRECT_MSG 3 +#define TIPC_CONN_MSG 0 +#define TIPC_MCAST_MSG 1 +#define TIPC_NAMED_MSG 2 +#define TIPC_DIRECT_MSG 3 +#define TIPC_GRP_BCAST_MSG 4 /* * Internal message users @@ -73,6 +74,7 @@ struct plist; #define MSG_BUNDLER 6 #define LINK_PROTOCOL 7 #define CONN_MANAGER 8 +#define GROUP_PROTOCOL 9 #define TUNNEL_PROTOCOL 10 #define NAME_DISTRIBUTOR 11 #define MSG_FRAGMENTER 12 @@ -87,6 +89,7 @@ struct plist; #define BASIC_H_SIZE 32 /* Basic payload message */ #define NAMED_H_SIZE 40 /* Named payload message */ #define MCAST_H_SIZE 44 /* Multicast payload message */ +#define GROUP_H_SIZE 44 /* Group payload message */ #define INT_H_SIZE 40 /* Internal messages */ #define MIN_H_SIZE 24 /* Smallest legal TIPC header size */ #define MAX_H_SIZE 60 /* Largest possible TIPC header size */ @@ -252,6 +255,11 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 29, 0x7, n); } +static inline int msg_in_group(struct tipc_msg *m) +{ + return (msg_type(m) == TIPC_GRP_BCAST_MSG); +} + static inline u32 msg_named(struct tipc_msg *m) { return msg_type(m) == TIPC_NAMED_MSG; @@ -259,7 +267,9 @@ static inline u32 msg_named(struct tipc_msg *m) static inline u32 msg_mcast(struct tipc_msg *m) { - return msg_type(m) == TIPC_MCAST_MSG; + int mtyp = msg_type(m); + + return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG)); } static inline u32 msg_connected(struct tipc_msg *m) @@ -514,6 +524,12 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define DSC_REQ_MSG 0 #define DSC_RESP_MSG 1 +/* + * Group protocol message types + */ +#define GRP_JOIN_MSG 0 +#define GRP_LEAVE_MSG 1 + /* * Word 1 */ @@ -795,6 +811,28 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 0, 0xffff, n); } +static inline u16 msg_grp_bc_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + +/* Word 10 + */ +static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) +{ + return msg_bits(m, 10, 16, 0xffff); +} + +static inline void msg_set_grp_bc_seqno(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 10, 16, 0xffff, n); +} + static inline bool msg_peer_link_is_up(struct tipc_msg *m) { if (likely(msg_user(m) != LINK_PROTOCOL)) diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c index 76bd2777baaf..114d72bab827 100644 --- a/net/tipc/name_table.c +++ b/net/tipc/name_table.c @@ -43,6 +43,7 @@ #include "bcast.h" #include "addr.h" #include "node.h" +#include "group.h" #include #define TIPC_NAMETBL_SIZE 1024 /* must be a power of 2 */ @@ -596,18 +597,6 @@ not_found: return ref; } -/** - * tipc_nametbl_mc_translate - find multicast destinations - * - * Creates list of all local ports that overlap the given multicast address; - * also determines if any off-node ports overlap. - * - * Note: Publications with a scope narrower than 'limit' are ignored. - * (i.e. local node-scope publications mustn't receive messages arriving - * from another node, even if the multcast link brought it here) - * - * Returns non-zero if any off-node ports overlap - */ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports) { @@ -679,6 +668,37 @@ exit: rcu_read_unlock(); } +/* tipc_nametbl_build_group - build list of communication group members + */ +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain) +{ + struct sub_seq *sseq, *stop; + struct name_info *info; + struct publication *p; + struct name_seq *seq; + + rcu_read_lock(); + seq = nametbl_find_seq(net, type); + if (!seq) + goto exit; + + spin_lock_bh(&seq->lock); + sseq = seq->sseqs; + stop = seq->sseqs + seq->first_free; + for (; sseq != stop; sseq++) { + info = sseq->info; + list_for_each_entry(p, &info->zone_list, zone_list) { + if (!tipc_in_scope(domain, p->node)) + continue; + tipc_group_add_member(grp, p->node, p->ref); + } + } + spin_unlock_bh(&seq->lock); +exit: + rcu_read_unlock(); +} + /* * tipc_nametbl_publish - add name publication to network name tables */ diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h index d121175a92b5..97646b17a4a2 100644 --- a/net/tipc/name_table.h +++ b/net/tipc/name_table.h @@ -40,6 +40,7 @@ struct tipc_subscription; struct tipc_plist; struct tipc_nlist; +struct tipc_group; /* * TIPC name types reserved for internal TIPC use (both current and planned) @@ -101,6 +102,8 @@ int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, u32 limit, struct list_head *dports); +void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, + u32 type, u32 domain); void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, u32 upper, u32 domain, struct tipc_nlist *nodes); diff --git a/net/tipc/node.h b/net/tipc/node.h index df2f2197c4ad..acd58d23a70e 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -48,7 +48,8 @@ enum { TIPC_BCAST_SYNCH = (1 << 1), TIPC_BCAST_STATE_NACK = (1 << 2), TIPC_BLOCK_FLOWCTL = (1 << 3), - TIPC_BCAST_RCAST = (1 << 4) + TIPC_BCAST_RCAST = (1 << 4), + TIPC_MCAST_GROUPS = (1 << 5) }; #define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \ diff --git a/net/tipc/socket.c b/net/tipc/socket.c index daf7c4df4531..64bbf9d03629 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1,7 +1,7 @@ /* * net/tipc/socket.c: TIPC socket API * - * Copyright (c) 2001-2007, 2012-2016, Ericsson AB + * Copyright (c) 2001-2007, 2012-2017, Ericsson AB * Copyright (c) 2004-2008, 2010-2013, Wind River Systems * All rights reserved. * @@ -45,6 +45,7 @@ #include "socket.h" #include "bcast.h" #include "netlink.h" +#include "group.h" #define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ #define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ @@ -78,7 +79,7 @@ enum { * @conn_timeout: the time we can wait for an unresponded setup request * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue * @cong_link_cnt: number of congested links - * @sent_unacked: # messages sent by socket, and not yet acked by peer + * @snt_unacked: # messages sent by socket, and not yet acked by peer * @rcv_unacked: # messages read by user, but not yet acked back to peer * @peer: 'connected' peer for dgram/rdm * @node: hash table node @@ -109,6 +110,7 @@ struct tipc_sock { struct rhash_head node; struct tipc_mc_method mc_method; struct rcu_head rcu; + struct tipc_group *group; }; static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb); @@ -123,6 +125,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, struct tipc_name_seq const *seq); +static int tipc_sk_leave(struct tipc_sock *tsk); static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); static int tipc_sk_insert(struct tipc_sock *tsk); static void tipc_sk_remove(struct tipc_sock *tsk); @@ -559,6 +562,7 @@ static int tipc_release(struct socket *sock) __tipc_shutdown(sock, TIPC_ERR_NO_PORT); sk->sk_shutdown = SHUTDOWN_MASK; + tipc_sk_leave(tsk); tipc_sk_withdraw(tsk, 0, NULL); sk_stop_timer(sk, &sk->sk_timer); tipc_sk_remove(tsk); @@ -601,7 +605,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr, res = tipc_sk_withdraw(tsk, 0, NULL); goto exit; } - + if (tsk->group) { + res = -EACCES; + goto exit; + } if (uaddr_len < sizeof(struct sockaddr_tipc)) { res = -EINVAL; goto exit; @@ -698,6 +705,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; u32 mask = 0; sock_poll_wait(file, sk_sleep(sk), wait); @@ -718,8 +726,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, mask |= (POLLIN | POLLRDNORM); break; case TIPC_OPEN: - if (!tsk->cong_link_cnt) - mask |= POLLOUT; + if (!grp || tipc_group_size(grp)) + if (!tsk->cong_link_cnt) + mask |= POLLOUT; if (tipc_sk_type_connectionless(sk) && (!skb_queue_empty(&sk->sk_receive_queue))) mask |= (POLLIN | POLLRDNORM); @@ -757,6 +766,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, struct tipc_nlist dsts; int rc; + if (tsk->group) + return -EACCES; + /* Block or return if any destination link is congested */ rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); if (unlikely(rc)) @@ -793,6 +805,64 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, return rc ? rc : dlen; } +/** + * tipc_send_group_bcast - send message to all members in communication group + * @sk: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + struct net *net = sock_net(sk); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct tipc_nlist *dsts = tipc_group_dests(grp); + struct tipc_mc_method *method = &tsk->mc_method; + struct tipc_msg *hdr = &tsk->phdr; + int mtu = tipc_bcast_get_mtu(net); + struct sk_buff_head pkts; + int rc = -EHOSTUNREACH; + + if (!dsts->local && !dsts->remote) + return -EHOSTUNREACH; + + /* Block or return if any destination link is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; + + /* Complete message header */ + msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + msg_set_hdr_sz(hdr, MCAST_H_SIZE); + msg_set_destport(hdr, 0); + msg_set_destnode(hdr, 0); + msg_set_nameinst(hdr, 0); + msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); + + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; + + /* Send message */ + rc = tipc_mcast_xmit(net, &pkts, method, dsts, + &tsk->cong_link_cnt); + if (unlikely(rc)) + return rc; + + /* Update broadcast sequence number */ + tipc_group_update_bc_members(tsk->group); + + return dlen; +} + /** * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets * @arrvq: queue with arriving messages, to be cloned after destination lookup @@ -803,13 +873,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq) { - struct tipc_msg *msg; - struct list_head dports; - u32 portid; u32 scope = TIPC_CLUSTER_SCOPE; - struct sk_buff_head tmpq; - uint hsz; + u32 self = tipc_own_addr(net); struct sk_buff *skb, *_skb; + u32 lower = 0, upper = ~0; + struct sk_buff_head tmpq; + u32 portid, oport, onode; + struct list_head dports; + struct tipc_msg *msg; + int hsz; __skb_queue_head_init(&tmpq); INIT_LIST_HEAD(&dports); @@ -818,14 +890,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { msg = buf_msg(skb); hsz = skb_headroom(skb) + msg_hdr_sz(msg); - - if (in_own_node(net, msg_orignode(msg))) + oport = msg_origport(msg); + onode = msg_orignode(msg); + if (onode == self) scope = TIPC_NODE_SCOPE; /* Create destination port list and message clones: */ - tipc_nametbl_mc_translate(net, - msg_nametype(msg), msg_namelower(msg), - msg_nameupper(msg), scope, &dports); + if (!msg_in_group(msg)) { + lower = msg_namelower(msg); + upper = msg_nameupper(msg); + } + tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, + scope, &dports); while (tipc_dest_pop(&dports, NULL, &portid)) { _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); if (_skb) { @@ -895,10 +971,6 @@ exit: kfree_skb(skb); } -static void tipc_sk_top_evt(struct tipc_sock *tsk, struct tipc_event *evt) -{ -} - /** * tipc_sendmsg - send message in connectionless manner * @sock: socket structure @@ -934,6 +1006,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); struct list_head *clinks = &tsk->cong_links; bool syn = !tipc_sk_type_connectionless(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = &tsk->phdr; struct tipc_name_seq *seq; struct sk_buff_head pkts; @@ -944,6 +1017,9 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; + if (unlikely(grp)) + return tipc_send_group_bcast(sock, m, dlen, timeout); + if (unlikely(!dest)) { dest = &tsk->peer; if (!syn || dest->family != AF_TIPC) @@ -1543,6 +1619,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, struct sk_buff *skb = __skb_dequeue(inputq); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); + struct tipc_group *grp = tsk->group; switch (msg_user(hdr)) { case CONN_MANAGER: @@ -1553,8 +1630,12 @@ static void tipc_sk_proto_rcv(struct sock *sk, tsk->cong_link_cnt--; sk->sk_write_space(sk); break; + case GROUP_PROTOCOL: + tipc_group_proto_rcv(grp, hdr, xmitq); + break; case TOP_SRV: - tipc_sk_top_evt(tsk, (void *)msg_data(hdr)); + tipc_group_member_evt(tsk->group, skb, xmitq); + skb = NULL; break; default: break; @@ -1699,6 +1780,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, { bool sk_conn = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; struct tipc_msg *hdr = buf_msg(skb); struct net *net = sock_net(sk); struct sk_buff_head inputq; @@ -1710,15 +1792,19 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(!msg_isdata(hdr))) tipc_sk_proto_rcv(sk, &inputq, xmitq); - else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) + else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG)) return kfree_skb(skb); + if (unlikely(grp)) + tipc_group_filter_msg(grp, &inputq, xmitq); + /* Validate and add to receive buffer if there is space */ while ((skb = __skb_dequeue(&inputq))) { hdr = buf_msg(skb); limit = rcvbuf_limit(sk, skb); if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) || - (!sk_conn && msg_connected(hdr))) + (!sk_conn && msg_connected(hdr)) || + (!grp && msg_in_group(hdr))) err = TIPC_ERR_NO_PORT; else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) err = TIPC_ERR_OVERLOAD; @@ -1837,7 +1923,6 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) sock_put(sk); continue; } - /* No destination socket => dequeue skb if still there */ skb = tipc_skb_dequeue(inputq, dport); if (!skb) @@ -1905,6 +1990,11 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest, lock_sock(sk); + if (tsk->group) { + res = -EINVAL; + goto exit; + } + if (dst->family == AF_UNSPEC) { memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc)); if (!tipc_sk_type_connectionless(sk)) @@ -2341,6 +2431,52 @@ void tipc_sk_rht_destroy(struct net *net) rhashtable_destroy(&tn->sk_rht); } +static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) +{ + struct net *net = sock_net(&tsk->sk); + u32 domain = addr_domain(net, mreq->scope); + struct tipc_group *grp = tsk->group; + struct tipc_msg *hdr = &tsk->phdr; + struct tipc_name_seq seq; + int rc; + + if (mreq->type < TIPC_RESERVED_TYPES) + return -EACCES; + if (grp) + return -EACCES; + grp = tipc_group_create(net, tsk->portid, mreq); + if (!grp) + return -ENOMEM; + tsk->group = grp; + msg_set_lookup_scope(hdr, mreq->scope); + msg_set_nametype(hdr, mreq->type); + msg_set_dest_droppable(hdr, true); + seq.type = mreq->type; + seq.lower = mreq->instance; + seq.upper = seq.lower; + tipc_nametbl_build_group(net, grp, mreq->type, domain); + rc = tipc_sk_publish(tsk, mreq->scope, &seq); + if (rc) + tipc_group_delete(net, grp); + return rc; +} + +static int tipc_sk_leave(struct tipc_sock *tsk) +{ + struct net *net = sock_net(&tsk->sk); + struct tipc_group *grp = tsk->group; + struct tipc_name_seq seq; + int scope; + + if (!grp) + return -EINVAL; + tipc_group_self(grp, &seq, &scope); + tipc_group_delete(net, grp); + tsk->group = NULL; + tipc_sk_withdraw(tsk, scope, &seq); + return 0; +} + /** * tipc_setsockopt - set socket option * @sock: socket structure @@ -2359,6 +2495,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group_req mreq; u32 value = 0; int res = 0; @@ -2374,9 +2511,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, case TIPC_CONN_TIMEOUT: if (ol < sizeof(value)) return -EINVAL; - res = get_user(value, (u32 __user *)ov); - if (res) - return res; + if (get_user(value, (u32 __user *)ov)) + return -EFAULT; + break; + case TIPC_GROUP_JOIN: + if (ol < sizeof(mreq)) + return -EINVAL; + if (copy_from_user(&mreq, ov, sizeof(mreq))) + return -EFAULT; break; default: if (ov || ol) @@ -2409,6 +2551,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt, tsk->mc_method.rcast = true; tsk->mc_method.mandatory = true; break; + case TIPC_GROUP_JOIN: + res = tipc_sk_join(tsk, &mreq); + break; + case TIPC_GROUP_LEAVE: + res = tipc_sk_leave(tsk); + break; default: res = -EINVAL; } @@ -2436,7 +2584,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, { struct sock *sk = sock->sk; struct tipc_sock *tsk = tipc_sk(sk); - int len; + struct tipc_name_seq seq; + int len, scope; u32 value; int res; @@ -2470,6 +2619,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt, case TIPC_SOCK_RECVQ_DEPTH: value = skb_queue_len(&sk->sk_receive_queue); break; + case TIPC_GROUP_JOIN: + seq.type = 0; + if (tsk->group) + tipc_group_self(tsk->group, &seq, &scope); + value = seq.type; + break; default: res = -EINVAL; } -- cgit v1.2.3 From 31c82a2d9d51fccbb85cbd2be983eb115225301c Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:24 +0200 Subject: tipc: add second source address to recvmsg()/recvfrom() With group communication, it becomes important for a message receiver to identify not only from which socket (identfied by a node:port tuple) the message was sent, but also the logical identity (type:instance) of the sending member. We fix this by adding a second instance of struct sockaddr_tipc to the source address area when a message is read. The extra address struct is filled in with data found in the received message header (type,) and in the local member representation struct (instance.) Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 3 +++ net/tipc/msg.h | 1 + net/tipc/socket.c | 53 +++++++++++++++++++++++++++++++++++------------------ 3 files changed, 39 insertions(+), 18 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/group.c b/net/tipc/group.c index 3f0e1ce1e3b9..beb214a3420c 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -61,6 +61,7 @@ struct tipc_member { struct list_head list; u32 node; u32 port; + u32 instance; enum mbr_state state; u16 bc_rcv_nxt; }; @@ -282,6 +283,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!tipc_group_is_receiver(m)) goto drop; + TIPC_SKB_CB(skb)->orig_member = m->instance; __skb_queue_tail(inputq, skb); m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; @@ -388,6 +390,7 @@ void tipc_group_member_evt(struct tipc_group *grp, m->state = MBR_PUBLISHED; else m->state = MBR_JOINED; + m->instance = evt->found_lower; tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); } else if (evt->event == TIPC_WITHDRAWN) { if (!m) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index dad400935405..e438716d2372 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -100,6 +100,7 @@ struct plist; struct tipc_skb_cb { u32 bytes_read; + u32 orig_member; struct sk_buff *tail; bool validated; u16 chain_imp; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 64bbf9d03629..25ecf1201527 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -62,6 +62,11 @@ enum { TIPC_CONNECTING = TCP_SYN_SENT, }; +struct sockaddr_pair { + struct sockaddr_tipc sock; + struct sockaddr_tipc member; +}; + /** * struct tipc_sock - TIPC socket structure * @sk: socket - interacts with 'port' and with user via the socket API @@ -1222,26 +1227,38 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, } /** - * set_orig_addr - capture sender's address for received message + * tipc_sk_set_orig_addr - capture sender's address for received message * @m: descriptor for message info - * @msg: received message header + * @hdr: received message header * * Note: Address is not captured if not requested by receiver. */ -static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) -{ - DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name); - - if (addr) { - addr->family = AF_TIPC; - addr->addrtype = TIPC_ADDR_ID; - memset(&addr->addr, 0, sizeof(addr->addr)); - addr->addr.id.ref = msg_origport(msg); - addr->addr.id.node = msg_orignode(msg); - addr->addr.name.domain = 0; /* could leave uninitialized */ - addr->scope = 0; /* could leave uninitialized */ - m->msg_namelen = sizeof(struct sockaddr_tipc); - } +static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb) +{ + DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name); + struct tipc_msg *hdr = buf_msg(skb); + + if (!srcaddr) + return; + + srcaddr->sock.family = AF_TIPC; + srcaddr->sock.addrtype = TIPC_ADDR_ID; + srcaddr->sock.addr.id.ref = msg_origport(hdr); + srcaddr->sock.addr.id.node = msg_orignode(hdr); + srcaddr->sock.addr.name.domain = 0; + srcaddr->sock.scope = 0; + m->msg_namelen = sizeof(struct sockaddr_tipc); + + if (!msg_in_group(hdr)) + return; + + /* Group message users may also want to know sending member's id */ + srcaddr->member.family = AF_TIPC; + srcaddr->member.addrtype = TIPC_ADDR_NAME; + srcaddr->member.addr.name.name.type = msg_nametype(hdr); + srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member; + srcaddr->member.addr.name.domain = 0; + m->msg_namelen = sizeof(*srcaddr); } /** @@ -1432,7 +1449,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, } while (1); /* Collect msg meta data, including error code and rejected data */ - set_orig_addr(m, hdr); + tipc_sk_set_orig_addr(m, skb); rc = tipc_sk_anc_data_recv(m, hdr, tsk); if (unlikely(rc)) goto exit; @@ -1526,7 +1543,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m, /* Collect msg meta data, incl. error code and rejected data */ if (!copied) { - set_orig_addr(m, hdr); + tipc_sk_set_orig_addr(m, skb); rc = tipc_sk_anc_data_recv(m, hdr, tsk); if (rc) break; -- cgit v1.2.3 From ae236fb208a6fbbd2e7a6913385e8fb78ac807f8 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:25 +0200 Subject: tipc: receive group membership events via member socket Like with any other service, group members' availability can be subscribed for by connecting to be topology server. However, because the events arrive via a different socket than the member socket, there is a real risk that membership events my arrive out of synch with the actual JOIN/LEAVE action. I.e., it is possible to receive the first messages from a new member before the corresponding JOIN event arrives, just as it is possible to receive the last messages from a leaving member after the LEAVE event has already been received. Since each member socket is internally also subscribing for membership events, we now fix this problem by passing those events on to the user via the member socket. We leverage the already present member synch- ronization protocol to guarantee correct message/event order. An event is delivered to the user as an empty message where the two source addresses identify the new/lost member. Furthermore, we set the MSG_OOB bit in the message flags to mark it as an event. If the event is an indication about a member loss we also set the MSG_EOR bit, so it can be distinguished from a member addition event. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- include/uapi/linux/tipc.h | 1 + net/tipc/group.c | 60 +++++++++++++++++++++++++++++++++++++---------- net/tipc/group.h | 2 ++ net/tipc/msg.h | 22 +++++++++++++++-- net/tipc/socket.c | 49 ++++++++++++++++++++++++-------------- 5 files changed, 101 insertions(+), 33 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h index 5f7b2c4a09ab..ef41c11a7f38 100644 --- a/include/uapi/linux/tipc.h +++ b/include/uapi/linux/tipc.h @@ -238,6 +238,7 @@ struct sockaddr_tipc { * Flag values */ #define TIPC_GROUP_LOOPBACK 0x1 /* Receive copy of sent msg when match */ +#define TIPC_GROUP_MEMBER_EVTS 0x2 /* Receive membership events in socket */ struct tipc_group_req { __u32 type; /* group id */ diff --git a/net/tipc/group.c b/net/tipc/group.c index beb214a3420c..1bfa9348b26d 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -59,6 +59,7 @@ enum mbr_state { struct tipc_member { struct rb_node tree_node; struct list_head list; + struct sk_buff *event_msg; u32 node; u32 port; u32 instance; @@ -79,6 +80,7 @@ struct tipc_group { u16 member_cnt; u16 bc_snd_nxt; bool loopback; + bool events; }; static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, @@ -117,6 +119,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, grp->instance = mreq->instance; grp->scope = mreq->scope; grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; + grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) return grp; kfree(grp); @@ -279,6 +282,13 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!msg_in_group(hdr)) goto drop; + if (mtyp == TIPC_GRP_MEMBER_EVT) { + if (!grp->events) + goto drop; + __skb_queue_tail(inputq, skb); + return; + } + m = tipc_group_find_member(grp, node, port); if (!tipc_group_is_receiver(m)) goto drop; @@ -311,6 +321,7 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, } void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { u32 node = msg_orignode(hdr); @@ -332,10 +343,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); /* Wait until PUBLISH event is received */ - if (m->state == MBR_DISCOVERED) + if (m->state == MBR_DISCOVERED) { m->state = MBR_JOINING; - else if (m->state == MBR_PUBLISHED) + } else if (m->state == MBR_PUBLISHED) { m->state = MBR_JOINED; + __skb_queue_tail(inputq, m->event_msg); + } return; case GRP_LEAVE_MSG: if (!m) @@ -347,6 +360,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, return; } /* Otherwise deliver already received WITHDRAW event */ + __skb_queue_tail(inputq, m->event_msg); tipc_group_delete_member(grp, m); return; default: @@ -354,16 +368,17 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, } } -/* tipc_group_member_evt() - receive and handle a member up/down event - */ void tipc_group_member_evt(struct tipc_group *grp, struct sk_buff *skb, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { struct tipc_msg *hdr = buf_msg(skb); struct tipc_event *evt = (void *)msg_data(hdr); + u32 instance = evt->found_lower; u32 node = evt->port.node; u32 port = evt->port.ref; + int event = evt->event; struct tipc_member *m; struct net *net; u32 self; @@ -376,32 +391,51 @@ void tipc_group_member_evt(struct tipc_group *grp, if (!grp->loopback && node == self && port == grp->portid) goto drop; + /* Convert message before delivery to user */ + msg_set_hdr_sz(hdr, GROUP_H_SIZE); + msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); + msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); + msg_set_origport(hdr, port); + msg_set_orignode(hdr, node); + msg_set_nametype(hdr, grp->type); + msg_set_grp_evt(hdr, event); + m = tipc_group_find_member(grp, node, port); - if (evt->event == TIPC_PUBLISHED) { + if (event == TIPC_PUBLISHED) { if (!m) m = tipc_group_create_member(grp, node, port, MBR_DISCOVERED); if (!m) goto drop; - /* Wait if JOIN message not yet received */ - if (m->state == MBR_DISCOVERED) + /* Hold back event if JOIN message not yet received */ + if (m->state == MBR_DISCOVERED) { + m->event_msg = skb; m->state = MBR_PUBLISHED; - else + } else { + __skb_queue_tail(inputq, skb); m->state = MBR_JOINED; - m->instance = evt->found_lower; + } + m->instance = instance; + TIPC_SKB_CB(skb)->orig_member = m->instance; tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); - } else if (evt->event == TIPC_WITHDRAWN) { + } else if (event == TIPC_WITHDRAWN) { if (!m) goto drop; - /* Keep back event if more messages might be expected */ - if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) + TIPC_SKB_CB(skb)->orig_member = m->instance; + + /* Hold back event if more messages might be expected */ + if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { + m->event_msg = skb; m->state = MBR_LEAVING; - else + } else { + __skb_queue_tail(inputq, skb); tipc_group_delete_member(grp, m); + } } + return; drop: kfree_skb(skb); } diff --git a/net/tipc/group.h b/net/tipc/group.h index 9bdf4479fc03..5d3f10d28967 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -54,9 +54,11 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *xmitq); void tipc_group_member_evt(struct tipc_group *grp, struct sk_buff *skb, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq); void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, + struct sk_buff_head *inputq, struct sk_buff_head *xmitq); void tipc_group_update_bc_members(struct tipc_group *grp); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index e438716d2372..1b527b154e46 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -65,7 +65,8 @@ struct plist; #define TIPC_MCAST_MSG 1 #define TIPC_NAMED_MSG 2 #define TIPC_DIRECT_MSG 3 -#define TIPC_GRP_BCAST_MSG 4 +#define TIPC_GRP_MEMBER_EVT 4 +#define TIPC_GRP_BCAST_MSG 5 /* * Internal message users @@ -258,7 +259,14 @@ static inline void msg_set_type(struct tipc_msg *m, u32 n) static inline int msg_in_group(struct tipc_msg *m) { - return (msg_type(m) == TIPC_GRP_BCAST_MSG); + int mtyp = msg_type(m); + + return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT); +} + +static inline bool msg_is_grp_evt(struct tipc_msg *m) +{ + return msg_type(m) == TIPC_GRP_MEMBER_EVT; } static inline u32 msg_named(struct tipc_msg *m) @@ -824,6 +832,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) /* Word 10 */ +static inline u16 msg_grp_evt(struct tipc_msg *m) +{ + return msg_bits(m, 10, 0, 0x3); +} + +static inline void msg_set_grp_evt(struct tipc_msg *m, int n) +{ + msg_set_bits(m, 10, 0, 0x3, n); +} + static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) { return msg_bits(m, 10, 16, 0xffff); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 25ecf1201527..0a2eac309177 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -709,41 +709,43 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock, poll_table *wait) { struct sock *sk = sock->sk; + struct sk_buff *skb = skb_peek(&sk->sk_receive_queue); struct tipc_sock *tsk = tipc_sk(sk); struct tipc_group *grp = tsk->group; - u32 mask = 0; + u32 revents = 0; sock_poll_wait(file, sk_sleep(sk), wait); if (sk->sk_shutdown & RCV_SHUTDOWN) - mask |= POLLRDHUP | POLLIN | POLLRDNORM; + revents |= POLLRDHUP | POLLIN | POLLRDNORM; if (sk->sk_shutdown == SHUTDOWN_MASK) - mask |= POLLHUP; + revents |= POLLHUP; switch (sk->sk_state) { case TIPC_ESTABLISHED: if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) - mask |= POLLOUT; + revents |= POLLOUT; /* fall thru' */ case TIPC_LISTEN: case TIPC_CONNECTING: - if (!skb_queue_empty(&sk->sk_receive_queue)) - mask |= (POLLIN | POLLRDNORM); + if (skb) + revents |= POLLIN | POLLRDNORM; break; case TIPC_OPEN: if (!grp || tipc_group_size(grp)) if (!tsk->cong_link_cnt) - mask |= POLLOUT; - if (tipc_sk_type_connectionless(sk) && - (!skb_queue_empty(&sk->sk_receive_queue))) - mask |= (POLLIN | POLLRDNORM); + revents |= POLLOUT; + if (!tipc_sk_type_connectionless(sk)) + break; + if (!skb) + break; + revents |= POLLIN | POLLRDNORM; break; case TIPC_DISCONNECTING: - mask = (POLLIN | POLLRDNORM | POLLHUP); + revents = POLLIN | POLLRDNORM | POLLHUP; break; } - - return mask; + return revents; } /** @@ -1415,11 +1417,12 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buflen, int flags) { struct sock *sk = sock->sk; - struct tipc_sock *tsk = tipc_sk(sk); - struct sk_buff *skb; - struct tipc_msg *hdr; bool connected = !tipc_sk_type_connectionless(sk); + struct tipc_sock *tsk = tipc_sk(sk); int rc, err, hlen, dlen, copy; + struct tipc_msg *hdr; + struct sk_buff *skb; + bool grp_evt; long timeout; /* Catch invalid receive requests */ @@ -1443,6 +1446,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, dlen = msg_data_sz(hdr); hlen = msg_hdr_sz(hdr); err = msg_errcode(hdr); + grp_evt = msg_is_grp_evt(hdr); if (likely(dlen || err)) break; tsk_advance_rx_queue(sk); @@ -1469,11 +1473,20 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, if (unlikely(rc)) goto exit; + /* Mark message as group event if applicable */ + if (unlikely(grp_evt)) { + if (msg_grp_evt(hdr) == TIPC_WITHDRAWN) + m->msg_flags |= MSG_EOR; + m->msg_flags |= MSG_OOB; + copy = 0; + } + /* Caption of data or error code/rejected data was successful */ if (unlikely(flags & MSG_PEEK)) goto exit; tsk_advance_rx_queue(sk); + if (likely(!connected)) goto exit; @@ -1648,10 +1661,10 @@ static void tipc_sk_proto_rcv(struct sock *sk, sk->sk_write_space(sk); break; case GROUP_PROTOCOL: - tipc_group_proto_rcv(grp, hdr, xmitq); + tipc_group_proto_rcv(grp, hdr, inputq, xmitq); break; case TOP_SRV: - tipc_group_member_evt(tsk->group, skb, xmitq); + tipc_group_member_evt(tsk->group, skb, inputq, xmitq); skb = NULL; break; default: -- cgit v1.2.3 From b7d42635517fde2b095deddd0fba37be2302a285 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:26 +0200 Subject: tipc: introduce flow control for group broadcast messages We introduce an end-to-end flow control mechanism for group broadcast messages. This ensures that no messages are ever lost because of destination receive buffer overflow, with minimal impact on performance. For now, the algorithm is based on the assumption that there is only one active transmitter at any moment in time. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- net/tipc/group.h | 11 ++-- net/tipc/msg.h | 5 +- net/tipc/socket.c | 48 +++++++++++++----- 4 files changed, 190 insertions(+), 22 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/group.c b/net/tipc/group.c index 1bfa9348b26d..b8ed70abba01 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -46,6 +46,7 @@ #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) #define ADV_IDLE ADV_UNIT +#define ADV_ACTIVE (ADV_UNIT * 12) enum mbr_state { MBR_QUARANTINED, @@ -59,16 +60,22 @@ enum mbr_state { struct tipc_member { struct rb_node tree_node; struct list_head list; + struct list_head congested; struct sk_buff *event_msg; + struct tipc_group *group; u32 node; u32 port; u32 instance; enum mbr_state state; + u16 advertised; + u16 window; u16 bc_rcv_nxt; + bool usr_pending; }; struct tipc_group { struct rb_root members; + struct list_head congested; struct tipc_nlist dests; struct net *net; int subid; @@ -86,11 +93,24 @@ struct tipc_group { static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, int mtyp, struct sk_buff_head *xmitq); +static int tipc_group_rcvbuf_limit(struct tipc_group *grp) +{ + int mcnt = grp->member_cnt + 1; + + /* Scale to bytes, considering worst-case truesize/msgsize ratio */ + return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4; +} + u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) { return grp->bc_snd_nxt; } +static bool tipc_group_is_enabled(struct tipc_member *m) +{ + return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; +} + static bool tipc_group_is_receiver(struct tipc_member *m) { return m && m->state >= MBR_JOINED; @@ -111,6 +131,7 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, if (!grp) return NULL; tipc_nlist_init(&grp->dests, tipc_own_addr(net)); + INIT_LIST_HEAD(&grp->congested); grp->members = RB_ROOT; grp->net = net; grp->portid = portid; @@ -213,6 +234,8 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, if (!m) return NULL; INIT_LIST_HEAD(&m->list); + INIT_LIST_HEAD(&m->congested); + m->group = grp; m->node = node; m->port = port; grp->member_cnt++; @@ -233,6 +256,7 @@ static void tipc_group_delete_member(struct tipc_group *grp, rb_erase(&m->tree_node, &grp->members); grp->member_cnt--; list_del_init(&m->list); + list_del_init(&m->congested); /* If last member on a node, remove node from dest list */ if (!tipc_group_find_node(grp, m->node)) @@ -255,11 +279,59 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, *scope = grp->scope; } -void tipc_group_update_bc_members(struct tipc_group *grp) +void tipc_group_update_member(struct tipc_member *m, int len) +{ + struct tipc_group *grp = m->group; + struct tipc_member *_m, *tmp; + + if (!tipc_group_is_enabled(m)) + return; + + m->window -= len; + + if (m->window >= ADV_IDLE) + return; + + if (!list_empty(&m->congested)) + return; + + /* Sort member into congested members' list */ + list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { + if (m->window > _m->window) + continue; + list_add_tail(&m->congested, &_m->congested); + return; + } + list_add_tail(&m->congested, &grp->congested); +} + +void tipc_group_update_bc_members(struct tipc_group *grp, int len) { + struct tipc_member *m; + struct rb_node *n; + + for (n = rb_first(&grp->members); n; n = rb_next(n)) { + m = container_of(n, struct tipc_member, tree_node); + if (tipc_group_is_enabled(m)) + tipc_group_update_member(m, len); + } grp->bc_snd_nxt++; } +bool tipc_group_bc_cong(struct tipc_group *grp, int len) +{ + struct tipc_member *m; + + if (list_empty(&grp->congested)) + return false; + + m = list_first_entry(&grp->congested, struct tipc_member, congested); + if (m->window >= len) + return false; + + return true; +} + /* tipc_group_filter_msg() - determine if we should accept arriving message */ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, @@ -302,11 +374,36 @@ drop: kfree_skb(skb); } +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, + u32 port, struct sk_buff_head *xmitq) +{ + struct tipc_member *m; + + m = tipc_group_find_member(grp, node, port); + if (!m) + return; + + m->advertised -= blks; + + switch (m->state) { + case MBR_JOINED: + if (m->advertised <= (ADV_ACTIVE - ADV_UNIT)) + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + break; + case MBR_DISCOVERED: + case MBR_JOINING: + case MBR_LEAVING: + default: + break; + } +} + static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, int mtyp, struct sk_buff_head *xmitq) { struct tipc_msg *hdr; struct sk_buff *skb; + int adv = 0; skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, m->node, tipc_own_addr(grp->net), @@ -314,14 +411,24 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, if (!skb) return; + if (m->state == MBR_JOINED) + adv = ADV_ACTIVE - m->advertised; + hdr = buf_msg(skb); - if (mtyp == GRP_JOIN_MSG) + + if (mtyp == GRP_JOIN_MSG) { msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); + msg_set_adv_win(hdr, adv); + m->advertised += adv; + } else if (mtyp == GRP_ADV_MSG) { + msg_set_adv_win(hdr, adv); + m->advertised += adv; + } __skb_queue_tail(xmitq, skb); } -void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, - struct sk_buff_head *inputq, +void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, + struct tipc_msg *hdr, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { u32 node = msg_orignode(hdr); @@ -341,14 +448,22 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, if (!m) return; m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr); + m->window += msg_adv_win(hdr); /* Wait until PUBLISH event is received */ if (m->state == MBR_DISCOVERED) { m->state = MBR_JOINING; } else if (m->state == MBR_PUBLISHED) { m->state = MBR_JOINED; + *usr_wakeup = true; + m->usr_pending = false; + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); __skb_queue_tail(inputq, m->event_msg); } + if (m->window < ADV_IDLE) + tipc_group_update_member(m, 0); + else + list_del_init(&m->congested); return; case GRP_LEAVE_MSG: if (!m) @@ -361,14 +476,28 @@ void tipc_group_proto_rcv(struct tipc_group *grp, struct tipc_msg *hdr, } /* Otherwise deliver already received WITHDRAW event */ __skb_queue_tail(inputq, m->event_msg); + *usr_wakeup = m->usr_pending; tipc_group_delete_member(grp, m); + list_del_init(&m->congested); + return; + case GRP_ADV_MSG: + if (!m) + return; + m->window += msg_adv_win(hdr); + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + list_del_init(&m->congested); return; default: pr_warn("Received unknown GROUP_PROTO message\n"); } } +/* tipc_group_member_evt() - receive and handle a member up/down event + */ void tipc_group_member_evt(struct tipc_group *grp, + bool *usr_wakeup, + int *sk_rcvbuf, struct sk_buff *skb, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) @@ -416,16 +545,25 @@ void tipc_group_member_evt(struct tipc_group *grp, } else { __skb_queue_tail(inputq, skb); m->state = MBR_JOINED; + *usr_wakeup = true; + m->usr_pending = false; } m->instance = instance; TIPC_SKB_CB(skb)->orig_member = m->instance; tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); + if (m->window < ADV_IDLE) + tipc_group_update_member(m, 0); + else + list_del_init(&m->congested); } else if (event == TIPC_WITHDRAWN) { if (!m) goto drop; TIPC_SKB_CB(skb)->orig_member = m->instance; + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + /* Hold back event if more messages might be expected */ if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) { m->event_msg = skb; @@ -434,7 +572,9 @@ void tipc_group_member_evt(struct tipc_group *grp, __skb_queue_tail(inputq, skb); tipc_group_delete_member(grp, m); } + list_del_init(&m->congested); } + *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); return; drop: kfree_skb(skb); diff --git a/net/tipc/group.h b/net/tipc/group.h index 5d3f10d28967..0e2740e1da90 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -52,15 +52,18 @@ void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_member_evt(struct tipc_group *grp, - struct sk_buff *skb, +void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup, + int *sk_rcvbuf, struct sk_buff *skb, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_proto_rcv(struct tipc_group *grp, +void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, struct tipc_msg *hdr, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_update_bc_members(struct tipc_group *grp); +void tipc_group_update_bc_members(struct tipc_group *grp, int len); +bool tipc_group_bc_cong(struct tipc_group *grp, int len); +void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, + u32 port, struct sk_buff_head *xmitq); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); int tipc_group_size(struct tipc_group *grp); #endif diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 1b527b154e46..237d007499f9 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -538,6 +538,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) */ #define GRP_JOIN_MSG 0 #define GRP_LEAVE_MSG 1 +#define GRP_ADV_MSG 2 /* * Word 1 @@ -790,12 +791,12 @@ static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n) msg_set_bits(m, 9, 16, 0xffff, n); } -static inline u32 msg_adv_win(struct tipc_msg *m) +static inline u16 msg_adv_win(struct tipc_msg *m) { return msg_bits(m, 9, 0, 0xffff); } -static inline void msg_set_adv_win(struct tipc_msg *m, u32 n) +static inline void msg_set_adv_win(struct tipc_msg *m, u16 n) { msg_set_bits(m, 9, 0, 0xffff, n); } diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 0a2eac309177..50145c95ac96 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -201,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk) return tsk->snt_unacked > tsk->snd_win; } +static u16 tsk_blocks(int len) +{ + return ((len / FLOWCTL_BLK_SZ) + 1); +} + /* tsk_blocks(): translate a buffer size in bytes to number of * advertisable blocks, taking into account the ratio truesize(len)/len * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ @@ -831,6 +836,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, struct tipc_group *grp = tsk->group; struct tipc_nlist *dsts = tipc_group_dests(grp); struct tipc_mc_method *method = &tsk->mc_method; + int blks = tsk_blocks(MCAST_H_SIZE + dlen); struct tipc_msg *hdr = &tsk->phdr; int mtu = tipc_bcast_get_mtu(net); struct sk_buff_head pkts; @@ -839,14 +845,15 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, if (!dsts->local && !dsts->remote) return -EHOSTUNREACH; - /* Block or return if any destination link is congested */ - rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); + /* Block or return if any destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt && + !tipc_group_bc_cong(grp, blks)); if (unlikely(rc)) return rc; /* Complete message header */ msg_set_type(hdr, TIPC_GRP_BCAST_MSG); - msg_set_hdr_sz(hdr, MCAST_H_SIZE); + msg_set_hdr_sz(hdr, GROUP_H_SIZE); msg_set_destport(hdr, 0); msg_set_destnode(hdr, 0); msg_set_nameinst(hdr, 0); @@ -864,9 +871,8 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, if (unlikely(rc)) return rc; - /* Update broadcast sequence number */ - tipc_group_update_bc_members(tsk->group); - + /* Update broadcast sequence number and send windows */ + tipc_group_update_bc_members(tsk->group, blks); return dlen; } @@ -1024,7 +1030,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; - if (unlikely(grp)) + if (unlikely(grp && !dest)) return tipc_send_group_bcast(sock, m, dlen, timeout); if (unlikely(!dest)) { @@ -1420,6 +1426,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, bool connected = !tipc_sk_type_connectionless(sk); struct tipc_sock *tsk = tipc_sk(sk); int rc, err, hlen, dlen, copy; + struct sk_buff_head xmitq; struct tipc_msg *hdr; struct sk_buff *skb; bool grp_evt; @@ -1436,8 +1443,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, } timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); + /* Step rcv queue to first msg with data or error; wait if necessary */ do { - /* Look at first msg in receive queue; wait if necessary */ rc = tipc_wait_for_rcvmsg(sock, &timeout); if (unlikely(rc)) goto exit; @@ -1485,12 +1492,21 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, if (unlikely(flags & MSG_PEEK)) goto exit; + /* Send group flow control advertisement when applicable */ + if (tsk->group && msg_in_group(hdr) && !grp_evt) { + skb_queue_head_init(&xmitq); + tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen), + msg_orignode(hdr), msg_origport(hdr), + &xmitq); + tipc_node_distr_xmit(sock_net(sk), &xmitq); + } + tsk_advance_rx_queue(sk); if (likely(!connected)) goto exit; - /* Send connection flow control ack when applicable */ + /* Send connection flow control advertisement when applicable */ tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) tipc_sk_send_ack(tsk); @@ -1650,6 +1666,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); struct tipc_group *grp = tsk->group; + bool wakeup = false; switch (msg_user(hdr)) { case CONN_MANAGER: @@ -1658,19 +1675,23 @@ static void tipc_sk_proto_rcv(struct sock *sk, case SOCK_WAKEUP: tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0); tsk->cong_link_cnt--; - sk->sk_write_space(sk); + wakeup = true; break; case GROUP_PROTOCOL: - tipc_group_proto_rcv(grp, hdr, inputq, xmitq); + tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); break; case TOP_SRV: - tipc_group_member_evt(tsk->group, skb, inputq, xmitq); + tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, + skb, inputq, xmitq); skb = NULL; break; default: break; } + if (wakeup) + sk->sk_write_space(sk); + kfree_skb(skb); } @@ -1785,6 +1806,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) struct tipc_sock *tsk = tipc_sk(sk); struct tipc_msg *hdr = buf_msg(skb); + if (unlikely(msg_in_group(hdr))) + return sk->sk_rcvbuf; + if (unlikely(!msg_connected(hdr))) return sk->sk_rcvbuf << msg_importance(hdr); -- cgit v1.2.3 From 27bd9ec027f396457d1a147043c92ff22fc4c71e Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:27 +0200 Subject: tipc: introduce group unicast messaging We now make it possible to send connectionless unicast messages within a communication group. To send a message, the sender can use either a direct port address, aka port identity, or an indirect port name to be looked up. This type of messages are subject to the same start synchronization and flow control mechanism as group broadcast messages. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 45 +++++++++++++++++++++- net/tipc/group.h | 3 ++ net/tipc/msg.h | 3 +- net/tipc/socket.c | 112 +++++++++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 150 insertions(+), 13 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/group.c b/net/tipc/group.c index b8ed70abba01..18440be5b5fc 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -186,6 +186,17 @@ struct tipc_member *tipc_group_find_member(struct tipc_group *grp, return NULL; } +static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, + u32 node, u32 port) +{ + struct tipc_member *m; + + m = tipc_group_find_member(grp, node, port); + if (m && tipc_group_is_enabled(m)) + return m; + return NULL; +} + static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, u32 node) { @@ -318,9 +329,39 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len) grp->bc_snd_nxt++; } -bool tipc_group_bc_cong(struct tipc_group *grp, int len) +bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, + int len, struct tipc_member **mbr) { + struct sk_buff_head xmitq; struct tipc_member *m; + int adv, state; + + m = tipc_group_find_dest(grp, dnode, dport); + *mbr = m; + if (!m) + return false; + if (m->usr_pending) + return true; + if (m->window >= len) + return false; + m->usr_pending = true; + + /* If not fully advertised, do it now to prevent mutual blocking */ + adv = m->advertised; + state = m->state; + if (state < MBR_JOINED) + return true; + if (state == MBR_JOINED && adv == ADV_IDLE) + return true; + skb_queue_head_init(&xmitq); + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); + tipc_node_distr_xmit(grp->net, &xmitq); + return true; +} + +bool tipc_group_bc_cong(struct tipc_group *grp, int len) +{ + struct tipc_member *m = NULL; if (list_empty(&grp->congested)) return false; @@ -329,7 +370,7 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len) if (m->window >= len) return false; - return true; + return tipc_group_cong(grp, m->node, m->port, len, &m); } /* tipc_group_filter_msg() - determine if we should accept arriving message diff --git a/net/tipc/group.h b/net/tipc/group.h index 0e2740e1da90..8f77290bb415 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -61,9 +61,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); void tipc_group_update_bc_members(struct tipc_group *grp, int len); +bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, + int len, struct tipc_member **m); bool tipc_group_bc_cong(struct tipc_group *grp, int len); void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, u32 port, struct sk_buff_head *xmitq); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); +void tipc_group_update_member(struct tipc_member *m, int len); int tipc_group_size(struct tipc_group *grp); #endif diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 237d007499f9..f5033f4a7951 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -67,6 +67,7 @@ struct plist; #define TIPC_DIRECT_MSG 3 #define TIPC_GRP_MEMBER_EVT 4 #define TIPC_GRP_BCAST_MSG 5 +#define TIPC_GRP_UCAST_MSG 6 /* * Internal message users @@ -261,7 +262,7 @@ static inline int msg_in_group(struct tipc_msg *m) { int mtyp = msg_type(m); - return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT); + return mtyp >= TIPC_GRP_MEMBER_EVT && mtyp <= TIPC_GRP_UCAST_MSG; } static inline bool msg_is_grp_evt(struct tipc_msg *m) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 50145c95ac96..e71c8d23acb9 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -817,6 +817,93 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, return rc ? rc : dlen; } +/** + * tipc_send_group_msg - send a message to a member in the group + * @net: network namespace + * @m: message to send + * @mb: group member + * @dnode: destination node + * @dport: destination port + * @dlen: total length of message data + */ +static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, + struct msghdr *m, struct tipc_member *mb, + u32 dnode, u32 dport, int dlen) +{ + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_msg *hdr = &tsk->phdr; + struct sk_buff_head pkts; + int mtu, rc; + + /* Complete message header */ + msg_set_type(hdr, TIPC_GRP_UCAST_MSG); + msg_set_hdr_sz(hdr, GROUP_H_SIZE); + msg_set_destport(hdr, dport); + msg_set_destnode(hdr, dnode); + + /* Build message as chain of buffers */ + skb_queue_head_init(&pkts); + mtu = tipc_node_get_mtu(net, dnode, tsk->portid); + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + if (unlikely(rc != dlen)) + return rc; + + /* Send message */ + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); + if (unlikely(rc == -ELINKCONG)) { + tipc_dest_push(&tsk->cong_links, dnode, 0); + tsk->cong_link_cnt++; + } + + /* Update send window and sequence number */ + tipc_group_update_member(mb, blks); + + return dlen; +} + +/** + * tipc_send_group_unicast - send message to a member in the group + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + int blks = tsk_blocks(GROUP_H_SIZE + dlen); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct net *net = sock_net(sk); + struct tipc_member *mb = NULL; + u32 node, port; + int rc; + + node = dest->addr.id.node; + port = dest->addr.id.ref; + if (!port && !node) + return -EHOSTUNREACH; + + /* Block or return if destination link or member is congested */ + rc = tipc_wait_for_cond(sock, &timeout, + !tipc_dest_find(&tsk->cong_links, node, 0) && + !tipc_group_cong(grp, node, port, blks, &mb)); + if (unlikely(rc)) + return rc; + + if (unlikely(!mb)) + return -EHOSTUNREACH; + + rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen); + + return rc ? rc : dlen; +} + /** * tipc_send_group_bcast - send message to all members in communication group * @sk: socket structure @@ -1030,8 +1117,20 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) return -EMSGSIZE; - if (unlikely(grp && !dest)) - return tipc_send_group_bcast(sock, m, dlen, timeout); + if (likely(dest)) { + if (unlikely(m->msg_namelen < sizeof(*dest))) + return -EINVAL; + if (unlikely(dest->family != AF_TIPC)) + return -EINVAL; + } + + if (grp) { + if (!dest) + return tipc_send_group_bcast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_ID) + return tipc_send_group_unicast(sock, m, dlen, timeout); + return -EINVAL; + } if (unlikely(!dest)) { dest = &tsk->peer; @@ -1039,12 +1138,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) return -EDESTADDRREQ; } - if (unlikely(m->msg_namelen < sizeof(*dest))) - return -EINVAL; - - if (unlikely(dest->family != AF_TIPC)) - return -EINVAL; - if (unlikely(syn)) { if (sk->sk_state == TIPC_LISTEN) return -EPIPE; @@ -1077,7 +1170,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) msg_set_destport(hdr, dport); if (unlikely(!dport && !dnode)) return -EHOSTUNREACH; - } else if (dest->addrtype == TIPC_ADDR_ID) { dnode = dest->addr.id.node; msg_set_type(hdr, TIPC_DIRECT_MSG); @@ -1846,7 +1938,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(!msg_isdata(hdr))) tipc_sk_proto_rcv(sk, &inputq, xmitq); - else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG)) + else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG)) return kfree_skb(skb); if (unlikely(grp)) -- cgit v1.2.3 From 5b8dddb63769587badc50725ec9857caaeba4de0 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:29 +0200 Subject: tipc: introduce group multicast messaging The previously introduced message transport to all group members is based on the tipc multicast service, but is logically a broadcast service within the group, and that is what we call it. We now add functionality for sending messages to all group members having a certain identity. Correspondingly, we call this feature 'group multicast'. The service is using unicast when only one destination is found, otherwise it will use the bearer broadcast service to transfer the messages. In the latter case, the receiving members filter arriving messages by looking at the intended destination instance. If there is no match, the message will be dropped, while still being considered received and read as seen by the flow control mechanism. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 14 +++++++++++++- net/tipc/msg.h | 11 +++++++++-- net/tipc/socket.c | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 74 insertions(+), 7 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/group.c b/net/tipc/group.c index 16aaaa97a005..ffac2f33fce2 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -413,10 +413,22 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, if (!tipc_group_is_receiver(m)) goto drop; + m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + + /* Drop multicast here if not for this member */ + if (mtyp == TIPC_GRP_MCAST_MSG) { + if (msg_nameinst(hdr) != grp->instance) { + m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; + tipc_group_update_rcv_win(grp, msg_blocks(hdr), + node, port, xmitq); + kfree_skb(skb); + return; + } + } + TIPC_SKB_CB(skb)->orig_member = m->instance; __skb_queue_tail(inputq, skb); - m->bc_rcv_nxt = msg_grp_bc_seqno(hdr) + 1; return; drop: kfree_skb(skb); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index f5033f4a7951..d6f98215267e 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -67,7 +67,8 @@ struct plist; #define TIPC_DIRECT_MSG 3 #define TIPC_GRP_MEMBER_EVT 4 #define TIPC_GRP_BCAST_MSG 5 -#define TIPC_GRP_UCAST_MSG 6 +#define TIPC_GRP_MCAST_MSG 6 +#define TIPC_GRP_UCAST_MSG 7 /* * Internal message users @@ -195,6 +196,11 @@ static inline u32 msg_size(struct tipc_msg *m) return msg_bits(m, 0, 0, 0x1ffff); } +static inline u32 msg_blocks(struct tipc_msg *m) +{ + return (msg_size(m) / 1024) + 1; +} + static inline u32 msg_data_sz(struct tipc_msg *m) { return msg_size(m) - msg_hdr_sz(m); @@ -279,7 +285,8 @@ static inline u32 msg_mcast(struct tipc_msg *m) { int mtyp = msg_type(m); - return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG)); + return ((mtyp == TIPC_MCAST_MSG) || (mtyp == TIPC_GRP_BCAST_MSG) || + (mtyp == TIPC_GRP_MCAST_MSG)); } static inline u32 msg_connected(struct tipc_msg *m) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 66165e12d2f4..8fdd969e12bd 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -999,6 +999,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m, static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, int dlen, long timeout) { + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); struct sock *sk = sock->sk; struct net *net = sock_net(sk); struct tipc_sock *tsk = tipc_sk(sk); @@ -1021,11 +1022,16 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, return rc; /* Complete message header */ - msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + if (dest) { + msg_set_type(hdr, TIPC_GRP_MCAST_MSG); + msg_set_nameinst(hdr, dest->addr.name.name.instance); + } else { + msg_set_type(hdr, TIPC_GRP_BCAST_MSG); + msg_set_nameinst(hdr, 0); + } msg_set_hdr_sz(hdr, GROUP_H_SIZE); msg_set_destport(hdr, 0); msg_set_destnode(hdr, 0); - msg_set_nameinst(hdr, 0); msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); /* Build message as chain of buffers */ @@ -1045,6 +1051,48 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, return dlen; } +/** + * tipc_send_group_mcast - send message to all members with given identity + * @sock: socket structure + * @m: message to send + * @dlen: total length of message data + * @timeout: timeout to wait for wakeup + * + * Called from function tipc_sendmsg(), which has done all sanity checks + * Returns the number of bytes sent on success, or errno + */ +static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m, + int dlen, long timeout) +{ + struct sock *sk = sock->sk; + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); + struct tipc_name_seq *seq = &dest->addr.nameseq; + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_group *grp = tsk->group; + struct net *net = sock_net(sk); + u32 domain, exclude, dstcnt; + struct list_head dsts; + + INIT_LIST_HEAD(&dsts); + + if (seq->lower != seq->upper) + return -ENOTSUPP; + + domain = addr_domain(net, dest->scope); + exclude = tipc_group_exclude(grp); + if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain, + &dsts, &dstcnt, exclude, true)) + return -EHOSTUNREACH; + + if (dstcnt == 1) { + tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref); + return tipc_send_group_unicast(sock, m, dlen, timeout); + } + + tipc_dest_list_purge(&dsts); + return tipc_send_group_bcast(sock, m, dlen, timeout); +} + /** * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets * @arrvq: queue with arriving messages, to be cloned after destination lookup @@ -1213,6 +1261,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) return tipc_send_group_anycast(sock, m, dlen, timeout); if (dest->addrtype == TIPC_ADDR_ID) return tipc_send_group_unicast(sock, m, dlen, timeout); + if (dest->addrtype == TIPC_ADDR_MCAST) + return tipc_send_group_mcast(sock, m, dlen, timeout); return -EINVAL; } @@ -2022,8 +2072,6 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(!msg_isdata(hdr))) tipc_sk_proto_rcv(sk, &inputq, xmitq); - else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG)) - return kfree_skb(skb); if (unlikely(grp)) tipc_group_filter_msg(grp, &inputq, xmitq); -- cgit v1.2.3 From 2f487712b89376fce267223bbb0db93d393d4b09 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:31 +0200 Subject: tipc: guarantee that group broadcast doesn't bypass group unicast We need a mechanism guaranteeing that group unicasts sent out from a socket are not bypassed by later sent broadcasts from the same socket. We do this as follows: - Each time a unicast is sent, we set a the broadcast method for the socket to "replicast" and "mandatory". This forces the first subsequent broadcast message to follow the same network and data path as the preceding unicast to a destination, hence preventing it from overtaking the latter. - In order to make the 'same data path' statement above true, we let group unicasts pass through the multicast link input queue, instead of as previously through the unicast link input queue. - In the first broadcast following a unicast, we set a new header flag, requiring all recipients to immediately acknowledge its reception. - During the period before all the expected acknowledges are received, the socket refuses to accept any more broadcast attempts, i.e., by blocking or returning EAGAIN. This period should typically not be longer than a few microseconds. - When all acknowledges have been received, the sending socket will open up for subsequent broadcasts, this time giving the link layer freedom to itself select the best transmission method. - The forced and/or abrupt transmission method changes described above may lead to broadcasts arriving out of order to the recipients. We remedy this by introducing code that checks and if necessary re-orders such messages at the receiving end. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 47 +++++++++++++++++++++++++++++++++++++++++------ net/tipc/group.h | 4 +--- net/tipc/link.c | 5 ++--- net/tipc/msg.h | 21 +++++++++++++++++++++ net/tipc/socket.c | 34 +++++++++++++++++++++++++++++----- 5 files changed, 94 insertions(+), 17 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/group.c b/net/tipc/group.c index 985e0ce32e8e..eab862e047dc 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -71,6 +71,7 @@ struct tipc_member { u16 advertised; u16 window; u16 bc_rcv_nxt; + u16 bc_acked; bool usr_pending; }; @@ -87,6 +88,7 @@ struct tipc_group { u32 portid; u16 member_cnt; u16 bc_snd_nxt; + u16 bc_ackers; bool loopback; bool events; }; @@ -258,6 +260,7 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, m->group = grp; m->node = node; m->port = port; + m->bc_acked = grp->bc_snd_nxt - 1; grp->member_cnt++; tipc_group_add_to_tree(grp, m); tipc_nlist_add(&grp->dests, m->node); @@ -275,6 +278,11 @@ static void tipc_group_delete_member(struct tipc_group *grp, { rb_erase(&m->tree_node, &grp->members); grp->member_cnt--; + + /* Check if we were waiting for replicast ack from this member */ + if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) + grp->bc_ackers--; + list_del_init(&m->list); list_del_init(&m->congested); @@ -325,16 +333,23 @@ void tipc_group_update_member(struct tipc_member *m, int len) list_add_tail(&m->congested, &grp->congested); } -void tipc_group_update_bc_members(struct tipc_group *grp, int len) +void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) { + u16 prev = grp->bc_snd_nxt - 1; struct tipc_member *m; struct rb_node *n; for (n = rb_first(&grp->members); n; n = rb_next(n)) { m = container_of(n, struct tipc_member, tree_node); - if (tipc_group_is_enabled(m)) + if (tipc_group_is_enabled(m)) { tipc_group_update_member(m, len); + m->bc_acked = prev; + } } + + /* Mark number of acknowledges to expect, if any */ + if (ack) + grp->bc_ackers = grp->member_cnt; grp->bc_snd_nxt++; } @@ -372,6 +387,10 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len) { struct tipc_member *m = NULL; + /* If prev bcast was replicast, reject until all receivers have acked */ + if (grp->bc_ackers) + return true; + if (list_empty(&grp->congested)) return false; @@ -391,7 +410,7 @@ static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) struct sk_buff *_skb, *tmp; int mtyp = msg_type(hdr); - /* Bcast may be bypassed by unicast, - sort it in */ + /* Bcast may be bypassed by unicast or other bcast, - sort it in */ if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { skb_queue_walk_safe(defq, _skb, tmp) { _hdr = buf_msg(_skb); @@ -412,10 +431,10 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, struct sk_buff_head *xmitq) { struct sk_buff *skb = __skb_dequeue(inputq); + bool ack, deliver, update; struct sk_buff_head *defq; struct tipc_member *m; struct tipc_msg *hdr; - bool deliver, update; u32 node, port; int mtyp, blks; @@ -451,6 +470,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, hdr = buf_msg(skb); mtyp = msg_type(hdr); deliver = true; + ack = false; update = false; if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) @@ -466,6 +486,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, /* Fall thru */ case TIPC_GRP_BCAST_MSG: m->bc_rcv_nxt++; + ack = msg_grp_bc_ack_req(hdr); break; case TIPC_GRP_UCAST_MSG: break; @@ -480,6 +501,9 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, else kfree_skb(skb); + if (ack) + tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); + if (!update) continue; @@ -540,6 +564,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, } else if (mtyp == GRP_ADV_MSG) { msg_set_adv_win(hdr, adv); m->advertised += adv; + } else if (mtyp == GRP_ACK_MSG) { + msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); } __skb_queue_tail(xmitq, skb); } @@ -593,7 +619,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, } /* Otherwise deliver already received WITHDRAW event */ __skb_queue_tail(inputq, m->event_msg); - *usr_wakeup = m->usr_pending; + *usr_wakeup = true; tipc_group_delete_member(grp, m); list_del_init(&m->congested); return; @@ -605,6 +631,15 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, m->usr_pending = false; list_del_init(&m->congested); return; + case GRP_ACK_MSG: + if (!m) + return; + m->bc_acked = msg_grp_bc_acked(hdr); + if (--grp->bc_ackers) + break; + *usr_wakeup = true; + m->usr_pending = false; + return; default: pr_warn("Received unknown GROUP_PROTO message\n"); } @@ -678,7 +713,7 @@ void tipc_group_member_evt(struct tipc_group *grp, TIPC_SKB_CB(skb)->orig_member = m->instance; - *usr_wakeup = m->usr_pending; + *usr_wakeup = true; m->usr_pending = false; /* Hold back event if more messages might be expected */ diff --git a/net/tipc/group.h b/net/tipc/group.h index e432066a211e..d525e1cd7de5 100644 --- a/net/tipc/group.h +++ b/net/tipc/group.h @@ -61,7 +61,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, struct tipc_msg *hdr, struct sk_buff_head *inputq, struct sk_buff_head *xmitq); -void tipc_group_update_bc_members(struct tipc_group *grp, int len); +void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack); bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, int len, struct tipc_member **m); bool tipc_group_bc_cong(struct tipc_group *grp, int len); @@ -69,7 +69,5 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, u32 port, struct sk_buff_head *xmitq); u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); void tipc_group_update_member(struct tipc_member *m, int len); -struct tipc_member *tipc_group_find_sender(struct tipc_group *grp, - u32 node, u32 port); int tipc_group_size(struct tipc_group *grp); #endif diff --git a/net/tipc/link.c b/net/tipc/link.c index bd25bff63925..70a21499804d 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1046,13 +1046,12 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, case TIPC_MEDIUM_IMPORTANCE: case TIPC_HIGH_IMPORTANCE: case TIPC_CRITICAL_IMPORTANCE: - if (unlikely(msg_mcast(hdr))) { + if (unlikely(msg_in_group(hdr) || msg_mcast(hdr))) { skb_queue_tail(l->bc_rcvlink->inputq, skb); return true; } - case CONN_MANAGER: case GROUP_PROTOCOL: - skb_queue_tail(inputq, skb); + case CONN_MANAGER: return true; case NAME_DISTRIBUTOR: l->bc_rcvlink->state = LINK_ESTABLISHED; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index d6f98215267e..52c6a2e01995 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -547,6 +547,7 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define GRP_JOIN_MSG 0 #define GRP_LEAVE_MSG 1 #define GRP_ADV_MSG 2 +#define GRP_ACK_MSG 3 /* * Word 1 @@ -839,6 +840,16 @@ static inline void msg_set_grp_bc_syncpt(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_grp_bc_acked(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + /* Word 10 */ static inline u16 msg_grp_evt(struct tipc_msg *m) @@ -851,6 +862,16 @@ static inline void msg_set_grp_evt(struct tipc_msg *m, int n) msg_set_bits(m, 10, 0, 0x3, n); } +static inline u16 msg_grp_bc_ack_req(struct tipc_msg *m) +{ + return msg_bits(m, 10, 0, 0x1); +} + +static inline void msg_set_grp_bc_ack_req(struct tipc_msg *m, bool n) +{ + msg_set_bits(m, 10, 0, 0x1, n); +} + static inline u16 msg_grp_bc_seqno(struct tipc_msg *m) { return msg_bits(m, 10, 16, 0xffff); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 3276b7a0d445..b1f1c3c2b1e2 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -831,6 +831,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, u32 dnode, u32 dport, int dlen) { u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group); + struct tipc_mc_method *method = &tsk->mc_method; int blks = tsk_blocks(GROUP_H_SIZE + dlen); struct tipc_msg *hdr = &tsk->phdr; struct sk_buff_head pkts; @@ -857,9 +858,12 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, tsk->cong_link_cnt++; } - /* Update send window and sequence number */ + /* Update send window */ tipc_group_update_member(mb, blks); + /* A broadcast sent within next EXPIRE period must follow same path */ + method->rcast = true; + method->mandatory = true; return dlen; } @@ -1008,6 +1012,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, struct tipc_group *grp = tsk->group; struct tipc_nlist *dsts = tipc_group_dests(grp); struct tipc_mc_method *method = &tsk->mc_method; + bool ack = method->mandatory && method->rcast; int blks = tsk_blocks(MCAST_H_SIZE + dlen); struct tipc_msg *hdr = &tsk->phdr; int mtu = tipc_bcast_get_mtu(net); @@ -1036,6 +1041,9 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, msg_set_destnode(hdr, 0); msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp)); + /* Avoid getting stuck with repeated forced replicasts */ + msg_set_grp_bc_ack_req(hdr, ack); + /* Build message as chain of buffers */ skb_queue_head_init(&pkts); rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); @@ -1043,13 +1051,17 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, return rc; /* Send message */ - rc = tipc_mcast_xmit(net, &pkts, method, dsts, - &tsk->cong_link_cnt); + rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt); if (unlikely(rc)) return rc; /* Update broadcast sequence number and send windows */ - tipc_group_update_bc_members(tsk->group, blks); + tipc_group_update_bc_members(tsk->group, blks, ack); + + /* Broadcast link is now free to choose method for next broadcast */ + method->mandatory = false; + method->expires = jiffies; + return dlen; } @@ -1113,7 +1125,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, u32 portid, oport, onode; struct list_head dports; struct tipc_msg *msg; - int hsz; + int user, mtyp, hsz; __skb_queue_head_init(&tmpq); INIT_LIST_HEAD(&dports); @@ -1121,6 +1133,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, skb = tipc_skb_peek(arrvq, &inputq->lock); for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { msg = buf_msg(skb); + user = msg_user(msg); + mtyp = msg_type(msg); + if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) { + spin_lock_bh(&inputq->lock); + if (skb_peek(arrvq) == skb) { + __skb_dequeue(arrvq); + __skb_queue_tail(inputq, skb); + } + refcount_dec(&skb->users); + spin_unlock_bh(&inputq->lock); + continue; + } hsz = skb_headroom(skb) + msg_hdr_sz(msg); oport = msg_origport(msg); onode = msg_orignode(msg); -- cgit v1.2.3 From 04d7b574b245c66001a33cb9da2c0311063af73f Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Fri, 13 Oct 2017 11:04:34 +0200 Subject: tipc: add multipoint-to-point flow control We already have point-to-multipoint flow control within a group. But we even need the opposite; -a scheme which can handle that potentially hundreds of sources may try to send messages to the same destination simultaneously without causing buffer overflow at the recipient. This commit adds such a mechanism. The algorithm works as follows: - When a member detects a new, joining member, it initially set its state to JOINED and advertises a minimum window to the new member. This window is chosen so that the new member can send exactly one maximum sized message, or several smaller ones, to the recipient before it must stop and wait for an additional advertisement. This minimum window ADV_IDLE is set to 65 1kB blocks. - When a member receives the first data message from a JOINED member, it changes the state of the latter to ACTIVE, and advertises a larger window ADV_ACTIVE = 12 x ADV_IDLE blocks to the sender, so it can continue sending with minimal disturbances to the data flow. - The active members are kept in a dedicated linked list. Each time a message is received from an active member, it will be moved to the tail of that list. This way, we keep a record of which members have been most (tail) and least (head) recently active. - There is a maximum number (16) of permitted simultaneous active senders per receiver. When this limit is reached, the receiver will not advertise anything immediately to a new sender, but instead put it in a PENDING state, and add it to a corresponding queue. At the same time, it will pick the least recently active member, send it an advertisement RECLAIM message, and set this member to state RECLAIMING. - The reclaimee member has to respond with a REMIT message, meaning that it goes back to a send window of ADV_IDLE, and returns its unused advertised blocks beyond that value to the reclaiming member. - When the reclaiming member receives the REMIT message, it unlinks the reclaimee from its active list, resets its state to JOINED, and notes that it is now back at ADV_IDLE advertised blocks to that member. If there are still unread data messages sent out by reclaimee before the REMIT, the member goes into an intermediate state REMITTED, where it stays until the said messages have been consumed. - The returned advertised blocks can now be re-advertised to the pending member, which is now set to state ACTIVE and added to the active member list. - To be proactive, i.e., to minimize the risk that any member will end up in the pending queue, we start reclaiming resources already when the number of active members exceeds 3/4 of the permitted maximum. Signed-off-by: Jon Maloy Acked-by: Ying Xue Signed-off-by: David S. Miller --- net/tipc/group.c | 129 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- net/tipc/msg.h | 12 ++++++ 2 files changed, 136 insertions(+), 5 deletions(-) (limited to 'net/tipc/msg.h') diff --git a/net/tipc/group.c b/net/tipc/group.c index 8f0eb5d22e8f..7821085a7dd8 100644 --- a/net/tipc/group.c +++ b/net/tipc/group.c @@ -54,6 +54,10 @@ enum mbr_state { MBR_JOINING, MBR_PUBLISHED, MBR_JOINED, + MBR_PENDING, + MBR_ACTIVE, + MBR_RECLAIMING, + MBR_REMITTED, MBR_LEAVING }; @@ -79,6 +83,9 @@ struct tipc_member { struct tipc_group { struct rb_root members; struct list_head congested; + struct list_head pending; + struct list_head active; + struct list_head reclaiming; struct tipc_nlist dests; struct net *net; int subid; @@ -88,6 +95,8 @@ struct tipc_group { u32 scope; u32 portid; u16 member_cnt; + u16 active_cnt; + u16 max_active; u16 bc_snd_nxt; u16 bc_ackers; bool loopback; @@ -97,12 +106,29 @@ struct tipc_group { static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, int mtyp, struct sk_buff_head *xmitq); +static void tipc_group_decr_active(struct tipc_group *grp, + struct tipc_member *m) +{ + if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) + grp->active_cnt--; +} + static int tipc_group_rcvbuf_limit(struct tipc_group *grp) { + int max_active, active_pool, idle_pool; int mcnt = grp->member_cnt + 1; + /* Limit simultaneous reception from other members */ + max_active = min(mcnt / 8, 64); + max_active = max(max_active, 16); + grp->max_active = max_active; + + /* Reserve blocks for active and idle members */ + active_pool = max_active * ADV_ACTIVE; + idle_pool = (mcnt - max_active) * ADV_IDLE; + /* Scale to bytes, considering worst-case truesize/msgsize ratio */ - return mcnt * ADV_ACTIVE * FLOWCTL_BLK_SZ * 4; + return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; } u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) @@ -143,6 +169,9 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid, return NULL; tipc_nlist_init(&grp->dests, tipc_own_addr(net)); INIT_LIST_HEAD(&grp->congested); + INIT_LIST_HEAD(&grp->active); + INIT_LIST_HEAD(&grp->pending); + INIT_LIST_HEAD(&grp->reclaiming); grp->members = RB_ROOT; grp->net = net; grp->portid = portid; @@ -286,6 +315,7 @@ static void tipc_group_delete_member(struct tipc_group *grp, list_del_init(&m->list); list_del_init(&m->congested); + tipc_group_decr_active(grp, m); /* If last member on a node, remove node from dest list */ if (!tipc_group_find_node(grp, m->node)) @@ -378,6 +408,10 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, return true; if (state == MBR_JOINED && adv == ADV_IDLE) return true; + if (state == MBR_ACTIVE && adv == ADV_ACTIVE) + return true; + if (state == MBR_PENDING && adv == ADV_IDLE) + return true; skb_queue_head_init(&xmitq); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); tipc_node_distr_xmit(grp->net, &xmitq); @@ -523,7 +557,11 @@ drop: void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, u32 port, struct sk_buff_head *xmitq) { - struct tipc_member *m; + struct list_head *active = &grp->active; + int max_active = grp->max_active; + int reclaim_limit = max_active * 3 / 4; + int active_cnt = grp->active_cnt; + struct tipc_member *m, *rm; m = tipc_group_find_member(grp, node, port); if (!m) @@ -533,9 +571,41 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, switch (m->state) { case MBR_JOINED: - if (m->advertised <= (ADV_ACTIVE - ADV_UNIT)) + /* Reclaim advertised space from least active member */ + if (!list_empty(active) && active_cnt >= reclaim_limit) { + rm = list_first_entry(active, struct tipc_member, list); + rm->state = MBR_RECLAIMING; + list_move_tail(&rm->list, &grp->reclaiming); + tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); + } + /* If max active, become pending and wait for reclaimed space */ + if (active_cnt >= max_active) { + m->state = MBR_PENDING; + list_add_tail(&m->list, &grp->pending); + break; + } + /* Otherwise become active */ + m->state = MBR_ACTIVE; + list_add_tail(&m->list, &grp->active); + grp->active_cnt++; + /* Fall through */ + case MBR_ACTIVE: + if (!list_is_last(&m->list, &grp->active)) + list_move_tail(&m->list, &grp->active); + if (m->advertised > (ADV_ACTIVE * 3 / 4)) + break; + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + break; + case MBR_REMITTED: + if (m->advertised > ADV_IDLE) + break; + m->state = MBR_JOINED; + if (m->advertised < ADV_IDLE) { + pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + } break; + case MBR_RECLAIMING: case MBR_DISCOVERED: case MBR_JOINING: case MBR_LEAVING: @@ -557,8 +627,10 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, if (!skb) return; - if (m->state == MBR_JOINED) + if (m->state == MBR_ACTIVE) adv = ADV_ACTIVE - m->advertised; + else if (m->state == MBR_JOINED || m->state == MBR_PENDING) + adv = ADV_IDLE - m->advertised; hdr = buf_msg(skb); @@ -573,6 +645,8 @@ static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, m->advertised += adv; } else if (mtyp == GRP_ACK_MSG) { msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); + } else if (mtyp == GRP_REMIT_MSG) { + msg_set_grp_remitted(hdr, m->window); } __skb_queue_tail(xmitq, skb); } @@ -583,8 +657,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, { u32 node = msg_orignode(hdr); u32 port = msg_origport(hdr); - struct tipc_member *m; + struct tipc_member *m, *pm; struct tipc_msg *ehdr; + u16 remitted, in_flight; if (!grp) return; @@ -626,6 +701,7 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, /* Wait until WITHDRAW event is received */ if (m->state != MBR_LEAVING) { + tipc_group_decr_active(grp, m); m->state = MBR_LEAVING; return; } @@ -653,6 +729,48 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, *usr_wakeup = true; m->usr_pending = false; return; + case GRP_RECLAIM_MSG: + if (!m) + return; + *usr_wakeup = m->usr_pending; + m->usr_pending = false; + tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); + m->window = ADV_IDLE; + return; + case GRP_REMIT_MSG: + if (!m || m->state != MBR_RECLAIMING) + return; + + list_del_init(&m->list); + grp->active_cnt--; + remitted = msg_grp_remitted(hdr); + + /* Messages preceding the REMIT still in receive queue */ + if (m->advertised > remitted) { + m->state = MBR_REMITTED; + in_flight = m->advertised - remitted; + } + /* All messages preceding the REMIT have been read */ + if (m->advertised <= remitted) { + m->state = MBR_JOINED; + in_flight = 0; + } + /* ..and the REMIT overtaken by more messages => re-advertise */ + if (m->advertised < remitted) + tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); + + m->advertised = ADV_IDLE + in_flight; + + /* Set oldest pending member to active and advertise */ + if (list_empty(&grp->pending)) + return; + pm = list_first_entry(&grp->pending, struct tipc_member, list); + pm->state = MBR_ACTIVE; + list_move_tail(&pm->list, &grp->active); + grp->active_cnt++; + if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) + tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); + return; default: pr_warn("Received unknown GROUP_PROTO message\n"); } @@ -735,6 +853,7 @@ void tipc_group_member_evt(struct tipc_group *grp, /* Hold back event if more messages might be expected */ if (m->state != MBR_LEAVING && node_up) { m->event_msg = skb; + tipc_group_decr_active(grp, m); m->state = MBR_LEAVING; } else { if (node_up) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 52c6a2e01995..cedf811317fb 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -548,6 +548,8 @@ static inline void msg_set_nameupper(struct tipc_msg *m, u32 n) #define GRP_LEAVE_MSG 1 #define GRP_ADV_MSG 2 #define GRP_ACK_MSG 3 +#define GRP_RECLAIM_MSG 4 +#define GRP_REMIT_MSG 5 /* * Word 1 @@ -850,6 +852,16 @@ static inline void msg_set_grp_bc_acked(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_grp_remitted(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_grp_remitted(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + /* Word 10 */ static inline u16 msg_grp_evt(struct tipc_msg *m) -- cgit v1.2.3 From 8d6e79d3ce13e34957de87f7584cbf1bcde74c57 Mon Sep 17 00:00:00 2001 From: Jon Maloy Date: Wed, 8 Nov 2017 09:59:26 +0100 Subject: tipc: improve link resiliency when rps is activated Currently, the TIPC RPS dissector is based only on the incoming packets' source node address, hence steering all traffic from a node to the same core. We have seen that this makes the links vulnerable to starvation and unnecessary resets when we turn down the link tolerance to very low values. To reduce the risk of this happening, we exempt probe and probe replies packets from the convergence to one core per source node. Instead, we do the opposite, - we try to diverge those packets across as many cores as possible, by randomizing the flow selector key. To make such packets identifiable to the dissector, we add a new 'is_keepalive' bit to word 0 of the LINK_PROTOCOL header. This bit is set both for PROBE and PROBE_REPLY messages, and only for those. It should be noted that these packets are not part of any flow anyway, and only constitute a minuscule fraction of all packets sent across a link. Hence, there is no risk that this will affect overall performance. Acked-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- include/net/flow_dissector.h | 12 ++++----- include/net/tipc.h | 62 ++++++++++++++++++++++++++++++++++++++++++++ net/core/flow_dissector.c | 30 ++++++++++----------- net/tipc/link.c | 26 +++++++++++-------- net/tipc/msg.h | 10 +++++++ 5 files changed, 108 insertions(+), 32 deletions(-) create mode 100644 include/net/tipc.h (limited to 'net/tipc/msg.h') diff --git a/include/net/flow_dissector.h b/include/net/flow_dissector.h index 22aba321282d..9a074776f70b 100644 --- a/include/net/flow_dissector.h +++ b/include/net/flow_dissector.h @@ -84,11 +84,11 @@ struct flow_dissector_key_ipv6_addrs { }; /** - * struct flow_dissector_key_tipc_addrs: - * @srcnode: source node address + * struct flow_dissector_key_tipc: + * @key: source node address combined with selector */ -struct flow_dissector_key_tipc_addrs { - __be32 srcnode; +struct flow_dissector_key_tipc { + __be32 key; }; /** @@ -100,7 +100,7 @@ struct flow_dissector_key_addrs { union { struct flow_dissector_key_ipv4_addrs v4addrs; struct flow_dissector_key_ipv6_addrs v6addrs; - struct flow_dissector_key_tipc_addrs tipcaddrs; + struct flow_dissector_key_tipc tipckey; }; }; @@ -192,7 +192,7 @@ enum flow_dissector_key_id { FLOW_DISSECTOR_KEY_PORTS, /* struct flow_dissector_key_ports */ FLOW_DISSECTOR_KEY_ICMP, /* struct flow_dissector_key_icmp */ FLOW_DISSECTOR_KEY_ETH_ADDRS, /* struct flow_dissector_key_eth_addrs */ - FLOW_DISSECTOR_KEY_TIPC_ADDRS, /* struct flow_dissector_key_tipc_addrs */ + FLOW_DISSECTOR_KEY_TIPC, /* struct flow_dissector_key_tipc */ FLOW_DISSECTOR_KEY_ARP, /* struct flow_dissector_key_arp */ FLOW_DISSECTOR_KEY_VLAN, /* struct flow_dissector_key_flow_vlan */ FLOW_DISSECTOR_KEY_FLOW_LABEL, /* struct flow_dissector_key_flow_tags */ diff --git a/include/net/tipc.h b/include/net/tipc.h new file mode 100644 index 000000000000..07670ec022a7 --- /dev/null +++ b/include/net/tipc.h @@ -0,0 +1,62 @@ +/* + * include/net/tipc.h: Include file for TIPC message header routines + * + * Copyright (c) 2017 Ericsson AB + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the names of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * Alternatively, this software may be distributed under the terms of the + * GNU General Public License ("GPL") version 2 as published by the Free + * Software Foundation. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _TIPC_HDR_H +#define _TIPC_HDR_H + +#include + +#define KEEPALIVE_MSG_MASK 0x0e080000 /* LINK_PROTOCOL + MSG_IS_KEEPALIVE */ + +struct tipc_basic_hdr { + __be32 w[4]; +}; + +static inline u32 tipc_hdr_rps_key(struct tipc_basic_hdr *hdr) +{ + u32 w0 = ntohl(hdr->w[0]); + bool keepalive_msg = (w0 & KEEPALIVE_MSG_MASK) == KEEPALIVE_MSG_MASK; + int key; + + /* Return source node identity as key */ + if (likely(!keepalive_msg)) + return hdr->w[3]; + + /* Spread PROBE/PROBE_REPLY messages across the cores */ + get_random_bytes(&key, sizeof(key)); + return key; +} + +#endif diff --git a/net/core/flow_dissector.c b/net/core/flow_dissector.c index 1f5caafb4492..15ce30063765 100644 --- a/net/core/flow_dissector.c +++ b/net/core/flow_dissector.c @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -772,23 +773,22 @@ proto_again: break; } case htons(ETH_P_TIPC): { - struct { - __be32 pre[3]; - __be32 srcnode; - } *hdr, _hdr; - hdr = __skb_header_pointer(skb, nhoff, sizeof(_hdr), data, hlen, &_hdr); + struct tipc_basic_hdr *hdr, _hdr; + + hdr = __skb_header_pointer(skb, nhoff, sizeof(_hdr), + data, hlen, &_hdr); if (!hdr) { fdret = FLOW_DISSECT_RET_OUT_BAD; break; } if (dissector_uses_key(flow_dissector, - FLOW_DISSECTOR_KEY_TIPC_ADDRS)) { + FLOW_DISSECTOR_KEY_TIPC)) { key_addrs = skb_flow_dissector_target(flow_dissector, - FLOW_DISSECTOR_KEY_TIPC_ADDRS, + FLOW_DISSECTOR_KEY_TIPC, target_container); - key_addrs->tipcaddrs.srcnode = hdr->srcnode; - key_control->addr_type = FLOW_DISSECTOR_KEY_TIPC_ADDRS; + key_addrs->tipckey.key = tipc_hdr_rps_key(hdr); + key_control->addr_type = FLOW_DISSECTOR_KEY_TIPC; } fdret = FLOW_DISSECT_RET_OUT_GOOD; break; @@ -1024,8 +1024,8 @@ static inline size_t flow_keys_hash_length(const struct flow_keys *flow) case FLOW_DISSECTOR_KEY_IPV6_ADDRS: diff -= sizeof(flow->addrs.v6addrs); break; - case FLOW_DISSECTOR_KEY_TIPC_ADDRS: - diff -= sizeof(flow->addrs.tipcaddrs); + case FLOW_DISSECTOR_KEY_TIPC: + diff -= sizeof(flow->addrs.tipckey); break; } return (sizeof(*flow) - diff) / sizeof(u32); @@ -1039,8 +1039,8 @@ __be32 flow_get_u32_src(const struct flow_keys *flow) case FLOW_DISSECTOR_KEY_IPV6_ADDRS: return (__force __be32)ipv6_addr_hash( &flow->addrs.v6addrs.src); - case FLOW_DISSECTOR_KEY_TIPC_ADDRS: - return flow->addrs.tipcaddrs.srcnode; + case FLOW_DISSECTOR_KEY_TIPC: + return flow->addrs.tipckey.key; default: return 0; } @@ -1321,8 +1321,8 @@ static const struct flow_dissector_key flow_keys_dissector_keys[] = { .offset = offsetof(struct flow_keys, addrs.v6addrs), }, { - .key_id = FLOW_DISSECTOR_KEY_TIPC_ADDRS, - .offset = offsetof(struct flow_keys, addrs.tipcaddrs), + .key_id = FLOW_DISSECTOR_KEY_TIPC, + .offset = offsetof(struct flow_keys, addrs.tipckey), }, { .key_id = FLOW_DISSECTOR_KEY_PORTS, diff --git a/net/tipc/link.c b/net/tipc/link.c index 870b9b8f877a..6bce0b1117bd 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -239,7 +239,8 @@ static int link_is_up(struct tipc_link *l) static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *xmitq); static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, - u16 rcvgap, int tolerance, int priority, + bool probe_reply, u16 rcvgap, + int tolerance, int priority, struct sk_buff_head *xmitq); static void link_print(struct tipc_link *l, const char *str); static int tipc_link_build_nack_msg(struct tipc_link *l, @@ -773,7 +774,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) } if (state || probe || setup) - tipc_link_build_proto_msg(l, mtyp, probe, 0, 0, 0, xmitq); + tipc_link_build_proto_msg(l, mtyp, probe, 0, 0, 0, 0, xmitq); return rc; } @@ -1174,7 +1175,7 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq) /* Unicast ACK */ l->rcv_unacked = 0; l->stats.sent_acks++; - tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq); return 0; } @@ -1188,7 +1189,7 @@ void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq) if (l->state == LINK_ESTABLISHING) mtyp = ACTIVATE_MSG; - tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, xmitq); + tipc_link_build_proto_msg(l, mtyp, 0, 0, 0, 0, 0, xmitq); /* Inform peer that this endpoint is going down if applicable */ skb = skb_peek_tail(xmitq); @@ -1215,7 +1216,7 @@ static int tipc_link_build_nack_msg(struct tipc_link *l, } if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) - tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, 0, xmitq); return 0; } @@ -1289,7 +1290,8 @@ drop: } static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, - u16 rcvgap, int tolerance, int priority, + bool probe_reply, u16 rcvgap, + int tolerance, int priority, struct sk_buff_head *xmitq) { struct tipc_link *bcl = l->bc_rcvlink; @@ -1337,6 +1339,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, msg_set_seq_gap(hdr, rcvgap); msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl)); msg_set_probe(hdr, probe); + msg_set_is_keepalive(hdr, probe || probe_reply); tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id); msg_set_size(hdr, INT_H_SIZE + dlen); skb_trim(skb, INT_H_SIZE + dlen); @@ -1442,6 +1445,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, u16 rcv_nxt = l->rcv_nxt; u16 dlen = msg_data_sz(hdr); int mtyp = msg_type(hdr); + bool reply = msg_probe(hdr); void *data; char *if_name; int rc = 0; @@ -1528,9 +1532,9 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, /* Send NACK if peer has sent pkts we haven't received yet */ if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l)) rcvgap = peers_snd_nxt - l->rcv_nxt; - if (rcvgap || (msg_probe(hdr))) - tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, - 0, 0, xmitq); + if (rcvgap || reply) + tipc_link_build_proto_msg(l, STATE_MSG, 0, reply, + rcvgap, 0, 0, xmitq); tipc_link_release_pkts(l, ack); /* If NACK, retransmit will now start at right position */ @@ -2122,14 +2126,14 @@ void tipc_link_set_tolerance(struct tipc_link *l, u32 tol, struct sk_buff_head *xmitq) { l->tolerance = tol; - tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, tol, 0, xmitq); + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, tol, 0, xmitq); } void tipc_link_set_prio(struct tipc_link *l, u32 prio, struct sk_buff_head *xmitq) { l->priority = prio; - tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, prio, xmitq); + tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, prio, xmitq); } void tipc_link_set_abort_limit(struct tipc_link *l, u32 limit) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index cedf811317fb..bf8f57ccc70c 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -226,6 +226,16 @@ static inline void msg_set_dest_droppable(struct tipc_msg *m, u32 d) msg_set_bits(m, 0, 19, 1, d); } +static inline int msg_is_keepalive(struct tipc_msg *m) +{ + return msg_bits(m, 0, 19, 1); +} + +static inline void msg_set_is_keepalive(struct tipc_msg *m, u32 d) +{ + msg_set_bits(m, 0, 19, 1, d); +} + static inline int msg_src_droppable(struct tipc_msg *m) { return msg_bits(m, 0, 18, 1); -- cgit v1.2.3