summary | refs | log | tree | commit | diff | stats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
author: Jon Paul Maloy <jon.maloy@ericsson.com>, 2015-07-16 16:54:24 -0400
committer: David S. Miller <davem@davemloft.net>, 2015-07-20 20:41:15 -0700
commit: af9b028e270fda6fb812d70d17d902297df1ceb5 (patch)
tree: 1a204c6d10d597d5db18908dc2066e980a78120d /net/tipc/link.c
parent: 22d85c79428b8ca9a01623aa3e3a1fe29a30a119 (diff)
download: linux-af9b028e270fda6fb812d70d17d902297df1ceb5.tar.bz2
tipc: make media xmit call outside node spinlock context
Currently, message sending is performed through a deep call chain, where the node spinlock is grabbed and held during a significant part of the transmission time. This is clearly detrimental to overall throughput performance; it would be better if we could send the message after the spinlock has been released.

In this commit, we do instead let the call revert on the stack after the buffer chain has been added to the transmission queue, whereafter clones of the buffers are transmitted to the device layer outside the spinlock scope.

As a further step in our effort to separate the roles of the node and link entities we also move the function tipc_link_xmit() to node.c, and rename it to tipc_node_xmit().

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r-- net/tipc/link.c 132
1 files changed, 72 insertions, 60 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ea32679b6737..c052437a7cfa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -353,7 +353,6 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
/* This really cannot happen... */
if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
- tipc_link_reset(link);
return -ENOBUFS;
}
/* Non-blocking sender: */
@@ -701,6 +700,78 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
return 0;
}
+/**
+ * tipc_link_xmit(): enqueue buffer list according to queue situation
+ * @l: link to use
+ * @list: chain of buffers containing message; the header of its first
+ *        buffer is read unconditionally, so it must not be empty
+ * @xmitq: returned list of packets to be sent by caller
+ *
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
+ *
+ * The sequence number is advanced only for buffers placed on the transmit
+ * queue, and l->snd_nxt is stored only on the success path.
+ *
+ * NOTE(review): the -EMSGSIZE return and the -ENOBUFS return after a failed
+ * skb_clone() both leave buffers on @list, which does not match the
+ * "consumes" statement above - presumably the caller purges the chain;
+ * confirm against the caller.
+ */
+int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
+ struct sk_buff_head *xmitq)
+{
+ struct tipc_msg *hdr = buf_msg(skb_peek(list));
+ unsigned int maxwin = l->window;
+ unsigned int i, imp = msg_importance(hdr);
+ unsigned int mtu = l->mtu;
+ u16 ack = l->rcv_nxt - 1;
+ u16 seqno = l->snd_nxt;
+ u16 bc_last_in = l->owner->bclink.last_in;
+ struct sk_buff_head *transmq = &l->transmq;
+ struct sk_buff_head *backlogq = &l->backlogq;
+ struct sk_buff *skb, *_skb, *bskb;
+
+ /* Match msg importance against this and all higher backlog limits:
+  * if the backlog count at any level from the message's own importance
+  * upward has reached its limit, hand the chain to link_schedule_user()
+  * and return whatever it returns.
+  */
+ for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+ if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+ return link_schedule_user(l, list);
+ }
+ if (unlikely(msg_size(hdr) > mtu))
+ return -EMSGSIZE;
+
+ /* Prepare each packet for sending, and add to relevant queue:
+  * - while transmq holds fewer than maxwin buffers, the original buffer
+  *   is moved to transmq and a clone is appended to xmitq for the caller;
+  * - otherwise try to bundle the message into the buffer at the tail of
+  *   backlogq, or failing that into a newly made bundle buffer that is
+  *   appended to backlogq; in both cases the original buffer is freed;
+  * - otherwise everything still on list is spliced onto backlogq, which
+  *   leaves list empty and so ends the loop.
+  */
+ while (skb_queue_len(list)) {
+ skb = skb_peek(list);
+ hdr = buf_msg(skb);
+ msg_set_seqno(hdr, seqno);
+ msg_set_ack(hdr, ack);
+ msg_set_bcast_ack(hdr, bc_last_in);
+
+ if (likely(skb_queue_len(transmq) < maxwin)) {
+ _skb = skb_clone(skb, GFP_ATOMIC);
+ if (!_skb)
+ return -ENOBUFS;
+ __skb_dequeue(list);
+ __skb_queue_tail(transmq, skb);
+ __skb_queue_tail(xmitq, _skb);
+ l->rcv_unacked = 0;
+ seqno++;
+ continue;
+ }
+ if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+ kfree_skb(__skb_dequeue(list));
+ l->stats.sent_bundled++;
+ continue;
+ }
+ if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+ kfree_skb(__skb_dequeue(list));
+ __skb_queue_tail(backlogq, bskb);
+ l->backlog[msg_importance(buf_msg(bskb))].len++;
+ l->stats.sent_bundled++;
+ l->stats.sent_bundles++;
+ continue;
+ }
+ l->backlog[imp].len += skb_queue_len(list);
+ skb_queue_splice_tail_init(list, backlogq);
+ }
+ l->snd_nxt = seqno;
+ return 0;
+}
+
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
skb_queue_head_init(list);
@@ -715,65 +786,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
return __tipc_link_xmit(link->owner->net, link, &head);
}
-/* tipc_link_xmit_skb(): send single buffer to destination
- * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not cause link congestion
- * The only exception is datagram messages rerouted after secondary
- * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
- */
-int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
- u32 selector)
-{
- struct sk_buff_head head;
- int rc;
-
- skb2list(skb, &head);
- rc = tipc_link_xmit(net, &head, dnode, selector);
- if (rc)
- kfree_skb(skb);
- return 0;
-}
-
-/**
- * tipc_link_xmit() is the general link level function for message sending
- * @net: the applicable net namespace
- * @list: chain of buffers containing message
- * @dsz: amount of user data to be sent
- * @dnode: address of destination node
- * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning error
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
- */
-int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
- u32 selector)
-{
- struct tipc_link *link = NULL;
- struct tipc_node *node;
- int rc = -EHOSTUNREACH;
-
- node = tipc_node_find(net, dnode);
- if (node) {
- tipc_node_lock(node);
- link = node_active_link(node, selector & 1);
- if (link)
- rc = __tipc_link_xmit(net, link, list);
- tipc_node_unlock(node);
- tipc_node_put(node);
- }
- if (link)
- return rc;
-
- if (likely(in_own_node(net, dnode))) {
- tipc_sk_rcv(net, list);
- return 0;
- }
-
- __skb_queue_purge(list);
- return rc;
-}
-
/*
* tipc_link_sync_xmit - synchronize broadcast link endpoints.
*