diff options
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- | net/tipc/socket.c | 212 |
1 files changed, 142 insertions, 70 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 4d420bb27396..c49b8df438cb 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -42,6 +42,7 @@ #include "name_distr.h" #include "socket.h" #include "bcast.h" +#include "netlink.h" #define SS_LISTENING -1 /* socket is listening */ #define SS_READY -2 /* socket is connectionless */ @@ -95,8 +96,11 @@ struct tipc_sock { uint conn_timeout; atomic_t dupl_rcvcnt; bool link_cong; - uint sent_unacked; - uint rcv_unacked; + u16 snt_unacked; + u16 snd_win; + u16 peer_caps; + u16 rcv_unacked; + u16 rcv_win; struct sockaddr_tipc remote; struct rhash_head node; struct rcu_head rcu; @@ -126,14 +130,6 @@ static const struct proto_ops stream_ops; static const struct proto_ops msg_ops; static struct proto tipc_proto; -static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = { - [TIPC_NLA_SOCK_UNSPEC] = { .type = NLA_UNSPEC }, - [TIPC_NLA_SOCK_ADDR] = { .type = NLA_U32 }, - [TIPC_NLA_SOCK_REF] = { .type = NLA_U32 }, - [TIPC_NLA_SOCK_CON] = { .type = NLA_NESTED }, - [TIPC_NLA_SOCK_HAS_PUBL] = { .type = NLA_FLAG } -}; - static const struct rhashtable_params tsk_rht_params; /* @@ -234,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk) return container_of(sk, struct tipc_sock, sk); } -static int tsk_conn_cong(struct tipc_sock *tsk) +static bool tsk_conn_cong(struct tipc_sock *tsk) +{ + return tsk->snt_unacked >= tsk->snd_win; +} + +/* tsk_blocks(): translate a buffer size in bytes to number of + * advertisable blocks, taking into account the ratio truesize(len)/len + * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ + */ +static u16 tsk_adv_blocks(int len) +{ + return len / FLOWCTL_BLK_SZ / 4; +} + +/* tsk_inc(): increment counter for sent or received data + * - If block based flow control is not supported by peer we + * fall back to message based ditto, incrementing the counter + */ +static u16 tsk_inc(struct tipc_sock *tsk, int msglen) { - return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; + if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL)) + return ((msglen / FLOWCTL_BLK_SZ) + 1); + return 1; } /** @@ -373,7 +389,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock, sock->state = state; sock_init_data(sock, sk); if (tipc_sk_insert(tsk)) { - pr_warn("Socket create failed; port numbrer exhausted\n"); + pr_warn("Socket create failed; port number exhausted\n"); return -EINVAL; } msg_set_origport(msg, tsk->portid); @@ -384,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock, sk->sk_write_space = tipc_write_space; sk->sk_destruct = tipc_sock_destruct; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; - tsk->sent_unacked = 0; atomic_set(&tsk->dupl_rcvcnt, 0); + /* Start out with safe limits until we receive an advertised window */ + tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN); + tsk->rcv_win = tsk->snd_win; + if (sock->state == SS_READY) { tsk_set_unreturnable(tsk, true); if (sock->type == SOCK_DGRAM) @@ -777,12 +796,14 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, * @tsk: receiving socket * @skb: pointer to message buffer. */ -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb) +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { struct sock *sk = &tsk->sk; + u32 onode = tsk_own_node(tsk); struct tipc_msg *hdr = buf_msg(skb); int mtyp = msg_type(hdr); - int conn_cong; + bool conn_cong; /* Ignore if connection cannot be validated: */ if (!tsk_peer_msg(tsk, hdr)) @@ -792,11 +813,14 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb) if (mtyp == CONN_PROBE) { msg_set_type(hdr, CONN_PROBE_REPLY); - tipc_sk_respond(sk, skb, TIPC_OK); + if (tipc_msg_reverse(onode, &skb, TIPC_OK)) + __skb_queue_tail(xmitq, skb); return; } else if (mtyp == CONN_ACK) { conn_cong = tsk_conn_cong(tsk); - tsk->sent_unacked -= msg_msgcnt(hdr); + tsk->snt_unacked -= msg_conn_ack(hdr); + if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) + tsk->snd_win = msg_adv_win(hdr); if (conn_cong) sk->sk_write_space(sk); } else if (mtyp != CONN_PROBE_REPLY) { @@ -1027,12 +1051,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) u32 dnode; uint mtu, send, sent = 0; struct iov_iter save; + int hlen = MIN_H_SIZE; /* Handle implied connection establishment */ if (unlikely(dest)) { rc = __tipc_sendmsg(sock, m, dsz); + hlen = msg_hdr_sz(mhdr); if (dsz && (dsz == rc)) - tsk->sent_unacked = 1; + tsk->snt_unacked = tsk_inc(tsk, dsz + hlen); return rc; } if (dsz > (uint)INT_MAX) @@ -1061,7 +1087,7 @@ next: if (likely(!tsk_conn_cong(tsk))) { rc = tipc_node_xmit(net, &pktchain, dnode, portid); if (likely(!rc)) { - tsk->sent_unacked++; + tsk->snt_unacked += tsk_inc(tsk, send + hlen); sent += send; if (sent == dsz) return dsz; @@ -1125,6 +1151,13 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port, sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv); tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid); + tsk->peer_caps = tipc_node_get_capabilities(net, peer_node); + if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) + return; + + /* Fall back to message based flow control */ + tsk->rcv_win = FLOWCTL_MSG_WIN; + tsk->snd_win = FLOWCTL_MSG_WIN; } /** @@ -1221,7 +1254,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg, return 0; } -static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) +static void tipc_sk_send_ack(struct tipc_sock *tsk) { struct net *net = sock_net(&tsk->sk); struct sk_buff *skb = NULL; @@ -1237,7 +1270,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) if (!skb) return; msg = buf_msg(skb); - msg_set_msgcnt(msg, ack); + msg_set_conn_ack(msg, tsk->rcv_unacked); + tsk->rcv_unacked = 0; + + /* Adjust to and advertize the correct window limit */ + if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) { + tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf); + msg_set_adv_win(msg, tsk->rcv_win); + } tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg)); } @@ -1295,7 +1335,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len, long timeo; unsigned int sz; u32 err; - int res; + int res, hlen; /* Catch invalid receive requests */ if (unlikely(!buf_len)) @@ -1320,6 +1360,7 @@ restart: buf = skb_peek(&sk->sk_receive_queue); msg = buf_msg(buf); sz = msg_data_sz(msg); + hlen = msg_hdr_sz(msg); err = msg_errcode(msg); /* Discard an empty non-errored message & try again */ @@ -1342,7 +1383,7 @@ restart: sz = buf_len; m->msg_flags |= MSG_TRUNC; } - res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz); + res = skb_copy_datagram_msg(buf, hlen, m, sz); if (res) goto exit; res = sz; @@ -1354,15 +1395,15 @@ restart: res = -ECONNRESET; } - /* Consume received message (optional) */ - if (likely(!(flags & MSG_PEEK))) { - if ((sock->state != SS_READY) && - (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { - tipc_sk_send_ack(tsk, tsk->rcv_unacked); - tsk->rcv_unacked = 0; - } - tsk_advance_rx_queue(sk); + if (unlikely(flags & MSG_PEEK)) + goto exit; + + if (likely(sock->state != SS_READY)) { + tsk->rcv_unacked += tsk_inc(tsk, hlen + sz); + if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4))) + tipc_sk_send_ack(tsk); } + tsk_advance_rx_queue(sk); exit: release_sock(sk); return res; @@ -1391,7 +1432,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m, int sz_to_copy, target, needed; int sz_copied = 0; u32 err; - int res = 0; + int res = 0, hlen; /* Catch invalid receive attempts */ if (unlikely(!buf_len)) @@ -1417,6 +1458,7 @@ restart: buf = skb_peek(&sk->sk_receive_queue); msg = buf_msg(buf); sz = msg_data_sz(msg); + hlen = msg_hdr_sz(msg); err = msg_errcode(msg); /* Discard an empty non-errored message & try again */ @@ -1441,8 +1483,7 @@ restart: needed = (buf_len - sz_copied); sz_to_copy = (sz <= needed) ? sz : needed; - res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset, - m, sz_to_copy); + res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy); if (res) goto exit; @@ -1464,20 +1505,18 @@ restart: res = -ECONNRESET; } - /* Consume received message (optional) */ - if (likely(!(flags & MSG_PEEK))) { - if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) { - tipc_sk_send_ack(tsk, tsk->rcv_unacked); - tsk->rcv_unacked = 0; - } - tsk_advance_rx_queue(sk); - } + if (unlikely(flags & MSG_PEEK)) + goto exit; + + tsk->rcv_unacked += tsk_inc(tsk, hlen + sz); + if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4))) + tipc_sk_send_ack(tsk); + tsk_advance_rx_queue(sk); /* Loop around if more data is required */ if ((sz_copied < buf_len) && /* didn't get all requested data */ (!skb_queue_empty(&sk->sk_receive_queue) || (sz_copied < target)) && /* and more is ready or required */ - (!(flags & MSG_PEEK)) && /* and aren't just peeking at data */ (!err)) /* and haven't reached a FIN */ goto restart; @@ -1609,30 +1648,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) /** * rcvbuf_limit - get proper overload limit of socket receive queue * @sk: socket - * @buf: message + * @skb: message * - * For all connection oriented messages, irrespective of importance, - * the default overload value (i.e. 67MB) is set as limit. + * For connection oriented messages, irrespective of importance, + * default queue limit is 2 MB. * - * For all connectionless messages, by default new queue limits are - * as belows: + * For connectionless messages, queue limits are based on message + * importance as follows: * - * TIPC_LOW_IMPORTANCE (4 MB) - * TIPC_MEDIUM_IMPORTANCE (8 MB) - * TIPC_HIGH_IMPORTANCE (16 MB) - * TIPC_CRITICAL_IMPORTANCE (32 MB) + * TIPC_LOW_IMPORTANCE (2 MB) + * TIPC_MEDIUM_IMPORTANCE (4 MB) + * TIPC_HIGH_IMPORTANCE (8 MB) + * TIPC_CRITICAL_IMPORTANCE (16 MB) * * Returns overload limit according to corresponding message importance */ -static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) +static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb) { - struct tipc_msg *msg = buf_msg(buf); + struct tipc_sock *tsk = tipc_sk(sk); + struct tipc_msg *hdr = buf_msg(skb); - if (msg_connected(msg)) - return sysctl_tipc_rmem[2]; + if (unlikely(!msg_connected(hdr))) + return sk->sk_rcvbuf << msg_importance(hdr); - return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << - msg_importance(msg); + if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL)) + return sk->sk_rcvbuf; + + return FLOWCTL_MSG_LIM; } /** @@ -1647,7 +1689,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf) * * Returns true if message was added to socket receive queue, otherwise false */ -static bool filter_rcv(struct sock *sk, struct sk_buff *skb) +static bool filter_rcv(struct sock *sk, struct sk_buff *skb, + struct sk_buff_head *xmitq) { struct socket *sock = sk->sk_socket; struct tipc_sock *tsk = tipc_sk(sk); @@ -1657,7 +1700,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb) int usr = msg_user(hdr); if (unlikely(msg_user(hdr) == CONN_MANAGER)) { - tipc_sk_proto_rcv(tsk, skb); + tipc_sk_proto_rcv(tsk, skb, xmitq); return false; } @@ -1700,7 +1743,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb) return true; reject: - tipc_sk_respond(sk, skb, err); + if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err)) + __skb_queue_tail(xmitq, skb); return false; } @@ -1716,9 +1760,24 @@ reject: static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) { unsigned int truesize = skb->truesize; + struct sk_buff_head xmitq; + u32 dnode, selector; + + __skb_queue_head_init(&xmitq); - if (likely(filter_rcv(sk, skb))) + if (likely(filter_rcv(sk, skb, &xmitq))) { atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); + return 0; + } + + if (skb_queue_empty(&xmitq)) + return 0; + + /* Send response/rejected message */ + skb = __skb_dequeue(&xmitq); + dnode = msg_destnode(buf_msg(skb)); + selector = msg_origport(buf_msg(skb)); + tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector); return 0; } @@ -1732,12 +1791,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) * Caller must hold socket lock */ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, - u32 dport) + u32 dport, struct sk_buff_head *xmitq) { + unsigned long time_limit = jiffies + 2; + struct sk_buff *skb; unsigned int lim; atomic_t *dcnt; - struct sk_buff *skb; - unsigned long time_limit = jiffies + 2; + u32 onode; while (skb_queue_len(inputq)) { if (unlikely(time_after_eq(jiffies, time_limit))) @@ -1749,20 +1809,22 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, /* Add message directly to receive queue if possible */ if (!sock_owned_by_user(sk)) { - filter_rcv(sk, skb); + filter_rcv(sk, skb, xmitq); continue; } /* Try backlog, compensating for double-counted bytes */ dcnt = &tipc_sk(sk)->dupl_rcvcnt; - if (sk->sk_backlog.len) + if (!sk->sk_backlog.len) atomic_set(dcnt, 0); lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); if (likely(!sk_add_backlog(sk, skb, lim))) continue; /* Overload => reject message back to sender */ - tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD); + onode = tipc_own_addr(sock_net(sk)); + if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) + __skb_queue_tail(xmitq, skb); break; } } @@ -1775,12 +1837,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, */ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) { + struct sk_buff_head xmitq; u32 dnode, dport = 0; int err; struct tipc_sock *tsk; struct sock *sk; struct sk_buff *skb; + __skb_queue_head_init(&xmitq); while (skb_queue_len(inputq)) { dport = tipc_skb_peek_port(inputq, dport); tsk = tipc_sk_lookup(net, dport); @@ -1788,9 +1852,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) if (likely(tsk)) { sk = &tsk->sk; if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { - tipc_sk_enqueue(inputq, sk, dport); + tipc_sk_enqueue(inputq, sk, dport, &xmitq); spin_unlock_bh(&sk->sk_lock.slock); } + /* Send pending response/rejected messages, if any */ + while ((skb = __skb_dequeue(&xmitq))) { + dnode = msg_destnode(buf_msg(skb)); + tipc_node_xmit_skb(net, skb, dnode, dport); + } sock_put(sk); continue; } @@ -2814,6 +2883,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb) if (err) return err; + if (!attrs[TIPC_NLA_SOCK]) + return -EINVAL; + err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX, attrs[TIPC_NLA_SOCK], tipc_nl_sock_policy); |