summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/group.c45
-rw-r--r--net/tipc/group.h3
-rw-r--r--net/tipc/msg.h3
-rw-r--r--net/tipc/socket.c112
4 files changed, 150 insertions, 13 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index b8ed70abba01..18440be5b5fc 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -186,6 +186,17 @@ struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
return NULL;
}
+static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
+ u32 node, u32 port)
+{
+ struct tipc_member *m;
+
+ m = tipc_group_find_member(grp, node, port);
+ if (m && tipc_group_is_enabled(m))
+ return m;
+ return NULL;
+}
+
static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
u32 node)
{
@@ -318,9 +329,39 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len)
grp->bc_snd_nxt++;
}
-bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
+ int len, struct tipc_member **mbr)
{
+ struct sk_buff_head xmitq;
struct tipc_member *m;
+ int adv, state;
+
+ m = tipc_group_find_dest(grp, dnode, dport);
+ *mbr = m;
+ if (!m)
+ return false;
+ if (m->usr_pending)
+ return true;
+ if (m->window >= len)
+ return false;
+ m->usr_pending = true;
+
+ /* If not fully advertised, do it now to prevent mutual blocking */
+ adv = m->advertised;
+ state = m->state;
+ if (state < MBR_JOINED)
+ return true;
+ if (state == MBR_JOINED && adv == ADV_IDLE)
+ return true;
+ skb_queue_head_init(&xmitq);
+ tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
+ tipc_node_distr_xmit(grp->net, &xmitq);
+ return true;
+}
+
+bool tipc_group_bc_cong(struct tipc_group *grp, int len)
+{
+ struct tipc_member *m = NULL;
if (list_empty(&grp->congested))
return false;
@@ -329,7 +370,7 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
if (m->window >= len)
return false;
- return true;
+ return tipc_group_cong(grp, m->node, m->port, len, &m);
}
/* tipc_group_filter_msg() - determine if we should accept arriving message
diff --git a/net/tipc/group.h b/net/tipc/group.h
index 0e2740e1da90..8f77290bb415 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -61,9 +61,12 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
struct sk_buff_head *inputq,
struct sk_buff_head *xmitq);
void tipc_group_update_bc_members(struct tipc_group *grp, int len);
+bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
+ int len, struct tipc_member **m);
bool tipc_group_bc_cong(struct tipc_group *grp, int len);
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
u32 port, struct sk_buff_head *xmitq);
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
+void tipc_group_update_member(struct tipc_member *m, int len);
int tipc_group_size(struct tipc_group *grp);
#endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 237d007499f9..f5033f4a7951 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -67,6 +67,7 @@ struct plist;
#define TIPC_DIRECT_MSG 3
#define TIPC_GRP_MEMBER_EVT 4
#define TIPC_GRP_BCAST_MSG 5
+#define TIPC_GRP_UCAST_MSG 6
/*
* Internal message users
@@ -261,7 +262,7 @@ static inline int msg_in_group(struct tipc_msg *m)
{
int mtyp = msg_type(m);
- return (mtyp == TIPC_GRP_BCAST_MSG) || (mtyp == TIPC_GRP_MEMBER_EVT);
+ return mtyp >= TIPC_GRP_MEMBER_EVT && mtyp <= TIPC_GRP_UCAST_MSG;
}
static inline bool msg_is_grp_evt(struct tipc_msg *m)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 50145c95ac96..e71c8d23acb9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -818,6 +818,93 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
}
/**
+ * tipc_send_group_msg - send a message to a member in the group
+ * @net: network namespace
+ * @m: message to send
+ * @mb: group member
+ * @dnode: destination node
+ * @dport: destination port
+ * @dlen: total length of message data
+ */
+static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
+ struct msghdr *m, struct tipc_member *mb,
+ u32 dnode, u32 dport, int dlen)
+{
+ int blks = tsk_blocks(GROUP_H_SIZE + dlen);
+ struct tipc_msg *hdr = &tsk->phdr;
+ struct sk_buff_head pkts;
+ int mtu, rc;
+
+ /* Complete message header */
+ msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
+ msg_set_hdr_sz(hdr, GROUP_H_SIZE);
+ msg_set_destport(hdr, dport);
+ msg_set_destnode(hdr, dnode);
+
+ /* Build message as chain of buffers */
+ skb_queue_head_init(&pkts);
+ mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+ rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+ if (unlikely(rc != dlen))
+ return rc;
+
+ /* Send message */
+ rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+ if (unlikely(rc == -ELINKCONG)) {
+ tipc_dest_push(&tsk->cong_links, dnode, 0);
+ tsk->cong_link_cnt++;
+ }
+
+ /* Update send window and sequence number */
+ tipc_group_update_member(mb, blks);
+
+ return dlen;
+}
+
+/**
+ * tipc_send_group_unicast - send message to a member in the group
+ * @sock: socket structure
+ * @m: message to send
+ * @dlen: total length of message data
+ * @timeout: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
+ int dlen, long timeout)
+{
+ struct sock *sk = sock->sk;
+ DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+ int blks = tsk_blocks(GROUP_H_SIZE + dlen);
+ struct tipc_sock *tsk = tipc_sk(sk);
+ struct tipc_group *grp = tsk->group;
+ struct net *net = sock_net(sk);
+ struct tipc_member *mb = NULL;
+ u32 node, port;
+ int rc;
+
+ node = dest->addr.id.node;
+ port = dest->addr.id.ref;
+ if (!port && !node)
+ return -EHOSTUNREACH;
+
+ /* Block or return if destination link or member is congested */
+ rc = tipc_wait_for_cond(sock, &timeout,
+ !tipc_dest_find(&tsk->cong_links, node, 0) &&
+ !tipc_group_cong(grp, node, port, blks, &mb));
+ if (unlikely(rc))
+ return rc;
+
+ if (unlikely(!mb))
+ return -EHOSTUNREACH;
+
+ rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
+
+ return rc ? rc : dlen;
+}
+
+/**
* tipc_send_group_bcast - send message to all members in communication group
* @sk: socket structure
* @m: message to send
@@ -1030,8 +1117,20 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
return -EMSGSIZE;
- if (unlikely(grp && !dest))
- return tipc_send_group_bcast(sock, m, dlen, timeout);
+ if (likely(dest)) {
+ if (unlikely(m->msg_namelen < sizeof(*dest)))
+ return -EINVAL;
+ if (unlikely(dest->family != AF_TIPC))
+ return -EINVAL;
+ }
+
+ if (grp) {
+ if (!dest)
+ return tipc_send_group_bcast(sock, m, dlen, timeout);
+ if (dest->addrtype == TIPC_ADDR_ID)
+ return tipc_send_group_unicast(sock, m, dlen, timeout);
+ return -EINVAL;
+ }
if (unlikely(!dest)) {
dest = &tsk->peer;
@@ -1039,12 +1138,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
return -EDESTADDRREQ;
}
- if (unlikely(m->msg_namelen < sizeof(*dest)))
- return -EINVAL;
-
- if (unlikely(dest->family != AF_TIPC))
- return -EINVAL;
-
if (unlikely(syn)) {
if (sk->sk_state == TIPC_LISTEN)
return -EPIPE;
@@ -1077,7 +1170,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
msg_set_destport(hdr, dport);
if (unlikely(!dport && !dnode))
return -EHOSTUNREACH;
-
} else if (dest->addrtype == TIPC_ADDR_ID) {
dnode = dest->addr.id.node;
msg_set_type(hdr, TIPC_DIRECT_MSG);
@@ -1846,7 +1938,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
if (unlikely(!msg_isdata(hdr)))
tipc_sk_proto_rcv(sk, &inputq, xmitq);
- else if (unlikely(msg_type(hdr) > TIPC_GRP_BCAST_MSG))
+ else if (unlikely(msg_type(hdr) > TIPC_GRP_UCAST_MSG))
return kfree_skb(skb);
if (unlikely(grp))