From d9ba131d8f58c0d2ff5029e7002ab43f913b36f9 Mon Sep 17 00:00:00 2001
From: Trond Myklebust <Trond.Myklebust@netapp.com>
Date: Sun, 17 Jul 2011 18:11:30 -0400
Subject: SUNRPC: Support dynamic slot allocation for TCP connections

Allow the number of available slots to grow with the TCP window size.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
---
 include/linux/sunrpc/xprt.h     |  9 ++++--
 net/sunrpc/xprt.c               | 70 ++++++++++++++++++++++++++++++++++-------
 net/sunrpc/xprtrdma/transport.c |  1 +
 net/sunrpc/xprtsock.c           | 49 ++++++++++++++++++++++++-----
 4 files changed, 109 insertions(+), 20 deletions(-)

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 05047250f218..d02762d1de27 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -22,6 +22,7 @@
 #define RPC_MIN_SLOT_TABLE	(2U)
 #define RPC_DEF_SLOT_TABLE	(16U)
 #define RPC_MAX_SLOT_TABLE	(128U)
+#define RPC_MAX_SLOT_TABLE_LIMIT	(65536U)
 
 /*
  * This describes a timeout strategy
@@ -168,7 +169,9 @@ struct rpc_xprt {
 	struct rpc_wait_queue	pending;	/* requests in flight */
 	struct rpc_wait_queue	backlog;	/* waiting for slot */
 	struct list_head	free;		/* free slots */
-	unsigned int		max_reqs;	/* total slots */
+	unsigned int		max_reqs;	/* max number of slots */
+	unsigned int		min_reqs;	/* min number of slots */
+	atomic_t		num_reqs;	/* total slots */
 	unsigned long		state;		/* transport state */
 	unsigned char		shutdown   : 1,	/* being shut down */
 				resvport   : 1; /* use a reserved port */
@@ -281,7 +284,9 @@ void			xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
 void			xprt_release(struct rpc_task *task);
 struct rpc_xprt *	xprt_get(struct rpc_xprt *xprt);
 void			xprt_put(struct rpc_xprt *xprt);
-struct rpc_xprt *	xprt_alloc(struct net *net, int size, int max_req);
+struct rpc_xprt *	xprt_alloc(struct net *net, size_t size,
+				unsigned int num_prealloc,
+				unsigned int max_req);
 void			xprt_free(struct rpc_xprt *);
 
 static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ea7b3c16cddd..be85cf04a479 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -935,25 +935,66 @@ void xprt_transmit(struct rpc_task *task)
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{
+	struct rpc_rqst *req = ERR_PTR(-EAGAIN);
+
+	if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))
+		goto out;
+	req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+	if (req != NULL)
+		goto out;
+	atomic_dec(&xprt->num_reqs);
+	req = ERR_PTR(-ENOMEM);
+out:
+	return req;
+}
+
+static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+	if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {
+		kfree(req);
+		return true;
+	}
+	return false;
+}
+
 static void xprt_alloc_slot(struct rpc_task *task)
 {
 	struct rpc_xprt	*xprt = task->tk_xprt;
+	struct rpc_rqst *req;
 
-	task->tk_status = 0;
 	if (!list_empty(&xprt->free)) {
-		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
-		list_del_init(&req->rq_list);
-		task->tk_rqstp = req;
-		xprt_request_init(task, xprt);
-		return;
+		req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
+		list_del(&req->rq_list);
+		goto out_init_req;
+	}
+	req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);
+	if (!IS_ERR(req))
+		goto out_init_req;
+	switch (PTR_ERR(req)) {
+	case -ENOMEM:
+		rpc_delay(task, HZ >> 2);
+		dprintk("RPC:       dynamic allocation of request slot "
+				"failed! Retrying\n");
+		break;
+	case -EAGAIN:
+		rpc_sleep_on(&xprt->backlog, task, NULL);
+		dprintk("RPC:       waiting for request slot\n");
 	}
-	dprintk("RPC:       waiting for request slot\n");
 	task->tk_status = -EAGAIN;
-	rpc_sleep_on(&xprt->backlog, task, NULL);
+	return;
+out_init_req:
+	task->tk_status = 0;
+	task->tk_rqstp = req;
+	xprt_request_init(task, xprt);
 }
 
 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
+	if (xprt_dynamic_free_slot(xprt, req))
+		return;
+
 	memset(req, 0, sizeof(*req));	/* mark unused */
 
 	spin_lock(&xprt->reserve_lock);
@@ -972,7 +1013,9 @@ static void xprt_free_all_slots(struct rpc_xprt *xprt)
 	}
 }
 
-struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc)
+struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
+		unsigned int num_prealloc,
+		unsigned int max_alloc)
 {
 	struct rpc_xprt *xprt;
 	struct rpc_rqst *req;
@@ -992,7 +1035,12 @@ struct rpc_xprt *xprt_alloc(struct net *net, int size, int num_prealloc)
 	}
 	if (i < num_prealloc)
 		goto out_free;
-	xprt->max_reqs = num_prealloc;
+	if (max_alloc > num_prealloc)
+		xprt->max_reqs = max_alloc;
+	else
+		xprt->max_reqs = num_prealloc;
+	xprt->min_reqs = num_prealloc;
+	atomic_set(&xprt->num_reqs, num_prealloc);
 
 	return xprt;
 
@@ -1036,7 +1084,6 @@ void xprt_reserve(struct rpc_task *task)
 	if (!xprt_lock_write(xprt, task))
 		return;
 
-	task->tk_status = -EIO;
 	spin_lock(&xprt->reserve_lock);
 	xprt_alloc_slot(task);
 	spin_unlock(&xprt->reserve_lock);
@@ -1057,6 +1104,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
 {
 	struct rpc_rqst	*req = task->tk_rqstp;
 
+	INIT_LIST_HEAD(&req->rq_list);
 	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
 	req->rq_task	= task;
 	req->rq_xprt    = xprt;
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 674a49224450..b446e100286f 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -283,6 +283,7 @@ xprt_setup_rdma(struct xprt_create *args)
 	}
 
 	xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
+			xprt_rdma_slot_table_entries,
 			xprt_rdma_slot_table_entries);
 	if (xprt == NULL) {
 		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index adaa54c6a09a..d7f97ef26590 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -54,7 +54,8 @@ static void xs_close(struct rpc_xprt *xprt);
  * xprtsock tunables
  */
 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
-unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
+unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
+unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
 
 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
@@ -75,6 +76,7 @@ static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
 
 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
+static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
 
@@ -103,6 +105,15 @@ static ctl_table xs_tunables_table[] = {
 		.extra1		= &min_slot_table_size,
 		.extra2		= &max_slot_table_size
 	},
+	{
+		.procname	= "tcp_max_slot_table_entries",
+		.data		= &xprt_max_tcp_slot_table_entries,
+		.maxlen		= sizeof(unsigned int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec_minmax,
+		.extra1		= &min_slot_table_size,
+		.extra2		= &max_tcp_slot_table_limit
+	},
 	{
 		.procname	= "min_resvport",
 		.data		= &xprt_min_resvport,
@@ -2491,7 +2502,8 @@ static int xs_init_anyaddr(const int family, struct sockaddr *sap)
 }
 
 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
-				      unsigned int slot_table_size)
+				      unsigned int slot_table_size,
+				      unsigned int max_slot_table_size)
 {
 	struct rpc_xprt *xprt;
 	struct sock_xprt *new;
@@ -2501,7 +2513,8 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
 		return ERR_PTR(-EBADF);
 	}
 
-	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size);
+	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
+			max_slot_table_size);
 	if (xprt == NULL) {
 		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
 				"rpc_xprt\n");
@@ -2543,7 +2556,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
 	struct rpc_xprt *xprt;
 	struct rpc_xprt *ret;
 
-	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+			xprt_max_tcp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2607,7 +2621,8 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
 	struct sock_xprt *transport;
 	struct rpc_xprt *ret;
 
-	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
+			xprt_udp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2683,7 +2698,8 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
 	struct sock_xprt *transport;
 	struct rpc_xprt *ret;
 
-	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+			xprt_max_tcp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2762,7 +2778,8 @@ static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
 		 */
 		 return args->bc_xprt->xpt_bc_xprt;
 	}
-	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
+			xprt_tcp_slot_table_entries);
 	if (IS_ERR(xprt))
 		return xprt;
 	transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2949,8 +2966,26 @@ static struct kernel_param_ops param_ops_slot_table_size = {
 #define param_check_slot_table_size(name, p) \
 	__param_check(name, p, unsigned int);
 
+static int param_set_max_slot_table_size(const char *val,
+				     const struct kernel_param *kp)
+{
+	return param_set_uint_minmax(val, kp,
+			RPC_MIN_SLOT_TABLE,
+			RPC_MAX_SLOT_TABLE_LIMIT);
+}
+
+static struct kernel_param_ops param_ops_max_slot_table_size = {
+	.set = param_set_max_slot_table_size,
+	.get = param_get_uint,
+};
+
+#define param_check_max_slot_table_size(name, p) \
+	__param_check(name, p, unsigned int);
+
 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
 		   slot_table_size, 0644);
+module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
+		   max_slot_table_size, 0644);
 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
 		   slot_table_size, 0644);
 
-- 
cgit v1.2.3