diff options
author | Trond Myklebust <Trond.Myklebust@netapp.com> | 2009-06-17 17:59:58 -0700 |
---|---|---|
committer | Trond Myklebust <Trond.Myklebust@netapp.com> | 2009-06-17 17:59:58 -0700 |
commit | 301933a0acfdec837fd8b4884093b3f0fff01d8a (patch) | |
tree | 1f2412a30d710493179b1b3743cf30302872df15 /net | |
parent | 3fe0344faf7fdcb158bd5c1a9aec960a8d70c8e8 (diff) | |
parent | 68f3f90133d56e0c38f04f991e662c2b21592b31 (diff) | |
download | linux-301933a0acfdec837fd8b4884093b3f0fff01d8a.tar.bz2 |
Merge commit 'linux-pnfs/nfs41-for-2.6.31' into nfsv41-for-2.6.31
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/Makefile | 1 | ||||
-rw-r--r-- | net/sunrpc/backchannel_rqst.c | 278 | ||||
-rw-r--r-- | net/sunrpc/bc_svc.c | 81 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 143 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 2 | ||||
-rw-r--r-- | net/sunrpc/stats.c | 8 | ||||
-rw-r--r-- | net/sunrpc/sunrpc.h | 37 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 134 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 39 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 60 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 216 |
11 files changed, 924 insertions, 75 deletions
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index 5369aa369b35..db73fd2a3f0e 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile @@ -13,5 +13,6 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ rpcb_clnt.o timer.o xdr.o \ sunrpc_syms.o cache.o rpc_pipe.o \ svc_xprt.o +sunrpc-$(CONFIG_NFS_V4_1) += backchannel_rqst.o bc_svc.o sunrpc-$(CONFIG_PROC_FS) += stats.o sunrpc-$(CONFIG_SYSCTL) += sysctl.o diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c new file mode 100644 index 000000000000..5a7d342e3087 --- /dev/null +++ b/net/sunrpc/backchannel_rqst.c @@ -0,0 +1,278 @@ +/****************************************************************************** + +(c) 2007 Network Appliance, Inc. All Rights Reserved. +(c) 2009 NetApp. All Rights Reserved. + +NetApp provides this source code under the GPL v2 License. +The GPL v2 license is available at +http://opensource.org/licenses/gpl-license.php. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +******************************************************************************/ + +#include <linux/tcp.h> +#include <linux/sunrpc/xprt.h> + +#ifdef RPC_DEBUG +#define RPCDBG_FACILITY RPCDBG_TRANS +#endif + +#if defined(CONFIG_NFS_V4_1) + +/* + * Helper routines that track the number of preallocation elements + * on the transport. + */ +static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) +{ + return xprt->bc_alloc_count > 0; +} + +static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n) +{ + xprt->bc_alloc_count += n; +} + +static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n) +{ + return xprt->bc_alloc_count -= n; +} + +/* + * Free the preallocated rpc_rqst structure and the memory + * buffers hanging off of it. + */ +static void xprt_free_allocation(struct rpc_rqst *req) +{ + struct xdr_buf *xbufp; + + dprintk("RPC: free allocations for req= %p\n", req); + BUG_ON(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); + xbufp = &req->rq_private_buf; + free_page((unsigned long)xbufp->head[0].iov_base); + xbufp = &req->rq_snd_buf; + free_page((unsigned long)xbufp->head[0].iov_base); + list_del(&req->rq_bc_pa_list); + kfree(req); +} + +/* + * Preallocate up to min_reqs structures and related buffers for use + * by the backchannel. This function can be called multiple times + * when creating new sessions that use the same rpc_xprt. The + * preallocated buffers are added to the pool of resources used by + * the rpc_xprt. Anyone of these resources may be used used by an + * incoming callback request. It's up to the higher levels in the + * stack to enforce that the maximum number of session slots is not + * being exceeded. + * + * Some callback arguments can be large. For example, a pNFS server + * using multiple deviceids. The list can be unbound, but the client + * has the ability to tell the server the maximum size of the callback + * requests. Each deviceID is 16 bytes, so allocate one page + * for the arguments to have enough room to receive a number of these + * deviceIDs. The NFS client indicates to the pNFS server that its + * callback requests can be up to 4096 bytes in size. + */ +int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs) +{ + struct page *page_rcv = NULL, *page_snd = NULL; + struct xdr_buf *xbufp = NULL; + struct rpc_rqst *req, *tmp; + struct list_head tmp_list; + int i; + + dprintk("RPC: setup backchannel transport\n"); + + /* + * We use a temporary list to keep track of the preallocated + * buffers. Once we're done building the list we splice it + * into the backchannel preallocation list off of the rpc_xprt + * struct. This helps minimize the amount of time the list + * lock is held on the rpc_xprt struct. It also makes cleanup + * easier in case of memory allocation errors. + */ + INIT_LIST_HEAD(&tmp_list); + for (i = 0; i < min_reqs; i++) { + /* Pre-allocate one backchannel rpc_rqst */ + req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); + if (req == NULL) { + printk(KERN_ERR "Failed to create bc rpc_rqst\n"); + goto out_free; + } + + /* Add the allocated buffer to the tmp list */ + dprintk("RPC: adding req= %p\n", req); + list_add(&req->rq_bc_pa_list, &tmp_list); + + req->rq_xprt = xprt; + INIT_LIST_HEAD(&req->rq_list); + INIT_LIST_HEAD(&req->rq_bc_list); + + /* Preallocate one XDR receive buffer */ + page_rcv = alloc_page(GFP_KERNEL); + if (page_rcv == NULL) { + printk(KERN_ERR "Failed to create bc receive xbuf\n"); + goto out_free; + } + xbufp = &req->rq_rcv_buf; + xbufp->head[0].iov_base = page_address(page_rcv); + xbufp->head[0].iov_len = PAGE_SIZE; + xbufp->tail[0].iov_base = NULL; + xbufp->tail[0].iov_len = 0; + xbufp->page_len = 0; + xbufp->len = PAGE_SIZE; + xbufp->buflen = PAGE_SIZE; + + /* Preallocate one XDR send buffer */ + page_snd = alloc_page(GFP_KERNEL); + if (page_snd == NULL) { + printk(KERN_ERR "Failed to create bc snd xbuf\n"); + goto out_free; + } + + xbufp = &req->rq_snd_buf; + xbufp->head[0].iov_base = page_address(page_snd); + xbufp->head[0].iov_len = 0; + xbufp->tail[0].iov_base = NULL; + xbufp->tail[0].iov_len = 0; + xbufp->page_len = 0; + xbufp->len = 0; + xbufp->buflen = PAGE_SIZE; + } + + /* + * Add the temporary list to the backchannel preallocation list + */ + spin_lock_bh(&xprt->bc_pa_lock); + list_splice(&tmp_list, &xprt->bc_pa_list); + xprt_inc_alloc_count(xprt, min_reqs); + spin_unlock_bh(&xprt->bc_pa_lock); + + dprintk("RPC: setup backchannel transport done\n"); + return 0; + +out_free: + /* + * Memory allocation failed, free the temporary list + */ + list_for_each_entry_safe(req, tmp, &tmp_list, rq_bc_pa_list) + xprt_free_allocation(req); + + dprintk("RPC: setup backchannel transport failed\n"); + return -1; +} +EXPORT_SYMBOL(xprt_setup_backchannel); + +/* + * Destroys the backchannel preallocated structures. + * Since these structures may have been allocated by multiple calls + * to xprt_setup_backchannel, we only destroy up to the maximum number + * of reqs specified by the caller. + * @xprt: the transport holding the preallocated strucures + * @max_reqs the maximum number of preallocated structures to destroy + */ +void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs) +{ + struct rpc_rqst *req = NULL, *tmp = NULL; + + dprintk("RPC: destroy backchannel transport\n"); + + BUG_ON(max_reqs == 0); + spin_lock_bh(&xprt->bc_pa_lock); + xprt_dec_alloc_count(xprt, max_reqs); + list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { + dprintk("RPC: req=%p\n", req); + xprt_free_allocation(req); + if (--max_reqs == 0) + break; + } + spin_unlock_bh(&xprt->bc_pa_lock); + + dprintk("RPC: backchannel list empty= %s\n", + list_empty(&xprt->bc_pa_list) ? "true" : "false"); +} +EXPORT_SYMBOL(xprt_destroy_backchannel); + +/* + * One or more rpc_rqst structure have been preallocated during the + * backchannel setup. Buffer space for the send and private XDR buffers + * has been preallocated as well. Use xprt_alloc_bc_request to allocate + * to this request. Use xprt_free_bc_request to return it. + * + * Return an available rpc_rqst, otherwise NULL if non are available. + */ +struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt) +{ + struct rpc_rqst *req; + + dprintk("RPC: allocate a backchannel request\n"); + spin_lock_bh(&xprt->bc_pa_lock); + if (!list_empty(&xprt->bc_pa_list)) { + req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, + rq_bc_pa_list); + list_del(&req->rq_bc_pa_list); + } else { + req = NULL; + } + spin_unlock_bh(&xprt->bc_pa_lock); + + if (req != NULL) { + set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); + req->rq_reply_bytes_recvd = 0; + req->rq_bytes_sent = 0; + memcpy(&req->rq_private_buf, &req->rq_rcv_buf, + sizeof(req->rq_private_buf)); + } + dprintk("RPC: backchannel req=%p\n", req); + return req; +} + +/* + * Return the preallocated rpc_rqst structure and XDR buffers + * associated with this rpc_task. + */ +void xprt_free_bc_request(struct rpc_rqst *req) +{ + struct rpc_xprt *xprt = req->rq_xprt; + + dprintk("RPC: free backchannel req=%p\n", req); + + smp_mb__before_clear_bit(); + BUG_ON(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state)); + clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state); + smp_mb__after_clear_bit(); + + if (!xprt_need_to_requeue(xprt)) { + /* + * The last remaining session was destroyed while this + * entry was in use. Free the entry and don't attempt + * to add back to the list because there is no need to + * have anymore preallocated entries. + */ + dprintk("RPC: Last session removed req=%p\n", req); + xprt_free_allocation(req); + return; + } + + /* + * Return it to the list of preallocations so that it + * may be reused by a new callback request. + */ + spin_lock_bh(&xprt->bc_pa_lock); + list_add(&req->rq_bc_pa_list, &xprt->bc_pa_list); + spin_unlock_bh(&xprt->bc_pa_lock); +} + +#endif /* CONFIG_NFS_V4_1 */ diff --git a/net/sunrpc/bc_svc.c b/net/sunrpc/bc_svc.c new file mode 100644 index 000000000000..13f214f53120 --- /dev/null +++ b/net/sunrpc/bc_svc.c @@ -0,0 +1,81 @@ +/****************************************************************************** + +(c) 2007 Network Appliance, Inc. All Rights Reserved. +(c) 2009 NetApp. All Rights Reserved. + +NetApp provides this source code under the GPL v2 License. +The GPL v2 license is available at +http://opensource.org/licenses/gpl-license.php. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +******************************************************************************/ + +/* + * The NFSv4.1 callback service helper routines. + * They implement the transport level processing required to send the + * reply over an existing open connection previously established by the client. + */ + +#if defined(CONFIG_NFS_V4_1) + +#include <linux/module.h> + +#include <linux/sunrpc/xprt.h> +#include <linux/sunrpc/sched.h> +#include <linux/sunrpc/bc_xprt.h> + +#define RPCDBG_FACILITY RPCDBG_SVCDSP + +void bc_release_request(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + + dprintk("RPC: bc_release_request: task= %p\n", task); + + /* + * Release this request only if it's a backchannel + * preallocated request + */ + if (!bc_prealloc(req)) + return; + xprt_free_bc_request(req); +} + +/* Empty callback ops */ +static const struct rpc_call_ops nfs41_callback_ops = { +}; + + +/* + * Send the callback reply + */ +int bc_send(struct rpc_rqst *req) +{ + struct rpc_task *task; + int ret; + + dprintk("RPC: bc_send req= %p\n", req); + task = rpc_run_bc_task(req, &nfs41_callback_ops); + if (IS_ERR(task)) + ret = PTR_ERR(task); + else { + BUG_ON(atomic_read(&task->tk_count) != 1); + ret = task->tk_status; + rpc_put_task(task); + } + return ret; + dprintk("RPC: bc_send ret= %d \n", ret); +} + +#endif /* CONFIG_NFS_V4_1 */ diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 5abab094441f..5bc2f45bddf0 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -36,7 +36,9 @@ #include <linux/sunrpc/clnt.h> #include <linux/sunrpc/rpc_pipe_fs.h> #include <linux/sunrpc/metrics.h> +#include <linux/sunrpc/bc_xprt.h> +#include "sunrpc.h" #ifdef RPC_DEBUG # define RPCDBG_FACILITY RPCDBG_CALL @@ -63,6 +65,9 @@ static void call_decode(struct rpc_task *task); static void call_bind(struct rpc_task *task); static void call_bind_status(struct rpc_task *task); static void call_transmit(struct rpc_task *task); +#if defined(CONFIG_NFS_V4_1) +static void call_bc_transmit(struct rpc_task *task); +#endif /* CONFIG_NFS_V4_1 */ static void call_status(struct rpc_task *task); static void call_transmit_status(struct rpc_task *task); static void call_refresh(struct rpc_task *task); @@ -613,6 +618,50 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags, } EXPORT_SYMBOL_GPL(rpc_call_async); +#if defined(CONFIG_NFS_V4_1) +/** + * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run + * rpc_execute against it + * @ops: RPC call ops + */ +struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req, + const struct rpc_call_ops *tk_ops) +{ + struct rpc_task *task; + struct xdr_buf *xbufp = &req->rq_snd_buf; + struct rpc_task_setup task_setup_data = { + .callback_ops = tk_ops, + }; + + dprintk("RPC: rpc_run_bc_task req= %p\n", req); + /* + * Create an rpc_task to send the data + */ + task = rpc_new_task(&task_setup_data); + if (!task) { + xprt_free_bc_request(req); + goto out; + } + task->tk_rqstp = req; + + /* + * Set up the xdr_buf length. + * This also indicates that the buffer is XDR encoded already. + */ + xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + + xbufp->tail[0].iov_len; + + task->tk_action = call_bc_transmit; + atomic_inc(&task->tk_count); + BUG_ON(atomic_read(&task->tk_count) != 2); + rpc_execute(task); + +out: + dprintk("RPC: rpc_run_bc_task: task= %p\n", task); + return task; +} +#endif /* CONFIG_NFS_V4_1 */ + void rpc_call_start(struct rpc_task *task) { @@ -695,6 +744,19 @@ void rpc_force_rebind(struct rpc_clnt *clnt) EXPORT_SYMBOL_GPL(rpc_force_rebind); /* + * Restart an (async) RPC call from the call_prepare state. + * Usually called from within the exit handler. + */ +void +rpc_restart_call_prepare(struct rpc_task *task) +{ + if (RPC_ASSASSINATED(task)) + return; + task->tk_action = rpc_prepare_task; +} +EXPORT_SYMBOL_GPL(rpc_restart_call_prepare); + +/* * Restart an (async) RPC call. Usually called from within the * exit handler. */ @@ -1085,7 +1147,7 @@ call_transmit(struct rpc_task *task) * in order to allow access to the socket to other RPC requests. */ call_transmit_status(task); - if (task->tk_msg.rpc_proc->p_decode != NULL) + if (rpc_reply_expected(task)) return; task->tk_action = rpc_exit_task; rpc_wake_up_queued_task(&task->tk_xprt->pending, task); @@ -1120,6 +1182,72 @@ call_transmit_status(struct rpc_task *task) } } +#if defined(CONFIG_NFS_V4_1) +/* + * 5b. Send the backchannel RPC reply. On error, drop the reply. In + * addition, disconnect on connectivity errors. + */ +static void +call_bc_transmit(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + + BUG_ON(task->tk_status != 0); + task->tk_status = xprt_prepare_transmit(task); + if (task->tk_status == -EAGAIN) { + /* + * Could not reserve the transport. Try again after the + * transport is released. + */ + task->tk_status = 0; + task->tk_action = call_bc_transmit; + return; + } + + task->tk_action = rpc_exit_task; + if (task->tk_status < 0) { + printk(KERN_NOTICE "RPC: Could not send backchannel reply " + "error: %d\n", task->tk_status); + return; + } + + xprt_transmit(task); + xprt_end_transmit(task); + dprint_status(task); + switch (task->tk_status) { + case 0: + /* Success */ + break; + case -EHOSTDOWN: + case -EHOSTUNREACH: + case -ENETUNREACH: + case -ETIMEDOUT: + /* + * Problem reaching the server. Disconnect and let the + * forechannel reestablish the connection. The server will + * have to retransmit the backchannel request and we'll + * reprocess it. Since these ops are idempotent, there's no + * need to cache our reply at this time. + */ + printk(KERN_NOTICE "RPC: Could not send backchannel reply " + "error: %d\n", task->tk_status); + xprt_conditional_disconnect(task->tk_xprt, + req->rq_connect_cookie); + break; + default: + /* + * We were unable to reply and will have to drop the + * request. The server should reconnect and retransmit. + */ + BUG_ON(task->tk_status == -EAGAIN); + printk(KERN_NOTICE "RPC: Could not send backchannel reply " + "error: %d\n", task->tk_status); + break; + } + rpc_wake_up_queued_task(&req->rq_xprt->pending, task); +} +#endif /* CONFIG_NFS_V4_1 */ + /* * 6. Sort out the RPC call status */ @@ -1130,8 +1258,8 @@ call_status(struct rpc_task *task) struct rpc_rqst *req = task->tk_rqstp; int status; - if (req->rq_received > 0 && !req->rq_bytes_sent) - task->tk_status = req->rq_received; + if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent) + task->tk_status = req->rq_reply_bytes_recvd; dprint_status(task); @@ -1248,7 +1376,7 @@ call_decode(struct rpc_task *task) /* * Ensure that we see all writes made by xprt_complete_rqst() - * before it changed req->rq_received. + * before it changed req->rq_reply_bytes_recvd. */ smp_rmb(); req->rq_rcv_buf.len = req->rq_private_buf.len; @@ -1289,7 +1417,7 @@ out_retry: task->tk_status = 0; /* Note: rpc_verify_header() may have freed the RPC slot */ if (task->tk_rqstp == req) { - req->rq_received = req->rq_rcv_buf.len = 0; + req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0; if (task->tk_client->cl_discrtry) xprt_conditional_disconnect(task->tk_xprt, req->rq_connect_cookie); @@ -1377,13 +1505,14 @@ rpc_verify_header(struct rpc_task *task) } if ((len -= 3) < 0) goto out_overflow; - p += 1; /* skip XID */ + p += 1; /* skip XID */ if ((n = ntohl(*p++)) != RPC_REPLY) { dprintk("RPC: %5u %s: not an RPC reply: %x\n", - task->tk_pid, __func__, n); + task->tk_pid, __func__, n); goto out_garbage; } + if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) { if (--len < 0) goto out_overflow; diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index ff50a0546865..1102ce1251f7 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -569,7 +569,7 @@ EXPORT_SYMBOL_GPL(rpc_delay); /* * Helper to call task->tk_ops->rpc_call_prepare */ -static void rpc_prepare_task(struct rpc_task *task) +void rpc_prepare_task(struct rpc_task *task) { task->tk_ops->rpc_call_prepare(task, task->tk_calldata); } diff --git a/net/sunrpc/stats.c b/net/sunrpc/stats.c index 1ef6e46d9da2..1b4e6791ecf3 100644 --- a/net/sunrpc/stats.c +++ b/net/sunrpc/stats.c @@ -141,12 +141,14 @@ EXPORT_SYMBOL_GPL(rpc_free_iostats); void rpc_count_iostats(struct rpc_task *task) { struct rpc_rqst *req = task->tk_rqstp; - struct rpc_iostats *stats = task->tk_client->cl_metrics; + struct rpc_iostats *stats; struct rpc_iostats *op_metrics; long rtt, execute, queue; - if (!stats || !req) + if (!task->tk_client || !task->tk_client->cl_metrics || !req) return; + + stats = task->tk_client->cl_metrics; op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx]; op_metrics->om_ops++; @@ -154,7 +156,7 @@ void rpc_count_iostats(struct rpc_task *task) op_metrics->om_timeouts += task->tk_timeouts; op_metrics->om_bytes_sent += task->tk_bytes_sent; - op_metrics->om_bytes_recv += req->rq_received; + op_metrics->om_bytes_recv += req->rq_reply_bytes_recvd; queue = (long)req->rq_xtime - task->tk_start; if (queue < 0) diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h new file mode 100644 index 000000000000..5d9dd742264b --- /dev/null +++ b/net/sunrpc/sunrpc.h @@ -0,0 +1,37 @@ +/****************************************************************************** + +(c) 2008 NetApp. All Rights Reserved. + +NetApp provides this source code under the GPL v2 License. +The GPL v2 license is available at +http://opensource.org/licenses/gpl-license.php. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +******************************************************************************/ + +/* + * Functions and macros used internally by RPC + */ + +#ifndef _NET_SUNRPC_SUNRPC_H +#define _NET_SUNRPC_SUNRPC_H + +static inline int rpc_reply_expected(struct rpc_task *task) +{ + return (task->tk_msg.rpc_proc != NULL) && + (task->tk_msg.rpc_proc->p_decode != NULL); +} + +#endif /* _NET_SUNRPC_SUNRPC_H */ + diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 5ed8931dfe98..952f206ff307 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -25,6 +25,7 @@ #include <linux/sunrpc/stats.h> #include <linux/sunrpc/svcsock.h> #include <linux/sunrpc/clnt.h> +#include <linux/sunrpc/bc_xprt.h> #define RPCDBG_FACILITY RPCDBG_SVCDSP @@ -486,6 +487,10 @@ svc_destroy(struct svc_serv *serv) if (svc_serv_is_pooled(serv)) svc_pool_map_put(); +#if defined(CONFIG_NFS_V4_1) + svc_sock_destroy(serv->bc_xprt); +#endif /* CONFIG_NFS_V4_1 */ + svc_unregister(serv); kfree(serv->sv_pools); kfree(serv); @@ -970,20 +975,18 @@ svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) } /* - * Process the RPC request. + * Common routine for processing the RPC request. */ -int -svc_process(struct svc_rqst *rqstp) +static int +svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv) { struct svc_program *progp; struct svc_version *versp = NULL; /* compiler food */ struct svc_procedure *procp = NULL; - struct kvec * argv = &rqstp->rq_arg.head[0]; - struct kvec * resv = &rqstp->rq_res.head[0]; struct svc_serv *serv = rqstp->rq_server; kxdrproc_t xdr; __be32 *statp; - u32 dir, prog, vers, proc; + u32 prog, vers, proc; __be32 auth_stat, rpc_stat; int auth_res; __be32 *reply_statp; @@ -993,19 +996,6 @@ svc_process(struct svc_rqst *rqstp) if (argv->iov_len < 6*4) goto err_short_len; - /* setup response xdr_buf. - * Initially it has just one page - */ - rqstp->rq_resused = 1; - resv->iov_base = page_address(rqstp->rq_respages[0]); - resv->iov_len = 0; - rqstp->rq_res.pages = rqstp->rq_respages + 1; - rqstp->rq_res.len = 0; - rqstp->rq_res.page_base = 0; - rqstp->rq_res.page_len = 0; - rqstp->rq_res.buflen = PAGE_SIZE; - rqstp->rq_res.tail[0].iov_base = NULL; - rqstp->rq_res.tail[0].iov_len = 0; /* Will be turned off only in gss privacy case: */ rqstp->rq_splice_ok = 1; /* Will be turned off only when NFSv4 Sessions are used */ @@ -1014,17 +1004,13 @@ svc_process(struct svc_rqst *rqstp) /* Setup reply header */ rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp); - rqstp->rq_xid = svc_getu32(argv); svc_putu32(resv, rqstp->rq_xid); - dir = svc_getnl(argv); vers = svc_getnl(argv); /* First words of reply: */ svc_putnl(resv, 1); /* REPLY */ - if (dir != 0) /* direction != CALL */ - goto err_bad_dir; if (vers != 2) /* RPC version number */ goto err_bad_rpc; @@ -1147,7 +1133,7 @@ svc_process(struct svc_rqst *rqstp) sendit: if (svc_authorise(rqstp)) goto dropit; - return svc_send(rqstp); + return 1; /* Caller can now send it */ dropit: svc_authorise(rqstp); /* doesn't hurt to call this twice */ @@ -1161,12 +1147,6 @@ err_short_len: goto dropit; /* drop request */ -err_bad_dir: - svc_printk(rqstp, "bad direction %d, dropping request\n", dir); - - serv->sv_stats->rpcbadfmt++; - goto dropit; /* drop request */ - err_bad_rpc: serv->sv_stats->rpcbadfmt++; svc_putnl(resv, 1); /* REJECT */ @@ -1220,6 +1200,100 @@ err_bad: EXPORT_SYMBOL_GPL(svc_process); /* + * Process the RPC request. + */ +int +svc_process(struct svc_rqst *rqstp) +{ + struct kvec *argv = &rqstp->rq_arg.head[0]; + struct kvec *resv = &rqstp->rq_res.head[0]; + struct svc_serv *serv = rqstp->rq_server; + u32 dir; + int error; + + /* + * Setup response xdr_buf. + * Initially it has just one page + */ + rqstp->rq_resused = 1; + resv->iov_base = page_address(rqstp->rq_respages[0]); + resv->iov_len = 0; + rqstp->rq_res.pages = rqstp->rq_respages + 1; + rqstp->rq_res.len = 0; + rqstp->rq_res.page_base = 0; + rqstp->rq_res.page_len = 0; + rqstp->rq_res.buflen = PAGE_SIZE; + rqstp->rq_res.tail[0].iov_base = NULL; + rqstp->rq_res.tail[0].iov_len = 0; + + rqstp->rq_xid = svc_getu32(argv); + + dir = svc_getnl(argv); + if (dir != 0) { + /* direction != CALL */ + svc_printk(rqstp, "bad direction %d, dropping request\n", dir); + serv->sv_stats->rpcbadfmt++; + svc_drop(rqstp); + return 0; + } + + error = svc_process_common(rqstp, argv, resv); + if (error <= 0) + return error; + + return svc_send(rqstp); +} + +#if defined(CONFIG_NFS_V4_1) +/* + * Process a backchannel RPC request that arrived over an existing + * outbound connection + */ +int +bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, + struct svc_rqst *rqstp) +{ + struct kvec *argv = &rqstp->rq_arg.head[0]; + struct kvec *resv = &rqstp->rq_res.head[0]; + int error; + + /* Build the svc_rqst used by the common processing routine */ + rqstp->rq_xprt = serv->bc_xprt; + rqstp->rq_xid = req->rq_xid; + rqstp->rq_prot = req->rq_xprt->prot; + rqstp->rq_server = serv; + + rqstp->rq_addrlen = sizeof(req->rq_xprt->addr); + memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen); + memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg)); + memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res)); + + /* reset result send buffer "put" position */ + resv->iov_len = 0; + + if (rqstp->rq_prot != IPPROTO_TCP) { + printk(KERN_ERR "No support for Non-TCP transports!\n"); + BUG(); + } + + /* + * Skip the next two words because they've already been + * processed in the trasport + */ + svc_getu32(argv); /* XID */ + svc_getnl(argv); /* CALLDIR */ + + error = svc_process_common(rqstp, argv, resv); + if (error <= 0) + return error; + + memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf)); + return bc_send(req); +} +EXPORT_SYMBOL(bc_svc_process); +#endif /* CONFIG_NFS_V4_1 */ + +/* * Return (transport-specific) limit on the rpc payload. */ u32 svc_max_payload(const struct svc_rqst *rqstp) diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 9d504234af4a..a2a03e500533 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -1327,3 +1327,42 @@ static void svc_sock_free(struct svc_xprt *xprt) sock_release(svsk->sk_sock); kfree(svsk); } + +/* + * Create a svc_xprt. + * + * For internal use only (e.g. nfsv4.1 backchannel). + * Callers should typically use the xpo_create() method. + */ +struct svc_xprt *svc_sock_create(struct svc_serv *serv, int prot) +{ + struct svc_sock *svsk; + struct svc_xprt *xprt = NULL; + + dprintk("svc: %s\n", __func__); + svsk = kzalloc(sizeof(*svsk), GFP_KERNEL); + if (!svsk) + goto out; + + xprt = &svsk->sk_xprt; + if (prot == IPPROTO_TCP) + svc_xprt_init(&svc_tcp_class, xprt, serv); + else if (prot == IPPROTO_UDP) + svc_xprt_init(&svc_udp_class, xprt, serv); + else + BUG(); +out: + dprintk("svc: %s return %p\n", __func__, xprt); + return xprt; +} +EXPORT_SYMBOL_GPL(svc_sock_create); + +/* + * Destroy a svc_sock. + */ +void svc_sock_destroy(struct svc_xprt *xprt) +{ + if (xprt) + kfree(container_of(xprt, struct svc_sock, sk_xprt)); +} +EXPORT_SYMBOL_GPL(svc_sock_destroy); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 06ca058572f2..f412a852bc73 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -12,8 +12,9 @@ * - Next, the caller puts together the RPC message, stuffs it into * the request struct, and calls xprt_transmit(). * - xprt_transmit sends the message and installs the caller on the - * transport's wait list. At the same time, it installs a timer that - * is run after the packet's timeout has expired. + * transport's wait list. At the same time, if a reply is expected, + * it installs a timer that is run after the packet's timeout has + * expired. * - When a packet arrives, the data_ready handler walks the list of * pending requests for that transport. If a matching XID is found, the * caller is woken up, and the timer removed. @@ -46,6 +47,8 @@ #include <linux/sunrpc/clnt.h> #include <linux/sunrpc/metrics.h> +#include "sunrpc.h" + /* * Local variables */ @@ -192,8 +195,8 @@ EXPORT_SYMBOL_GPL(xprt_load_transport); */ int xprt_reserve_xprt(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) @@ -803,9 +806,10 @@ void xprt_complete_rqst(struct rpc_task *task, int copied) list_del_init(&req->rq_list); req->rq_private_buf.len = copied; - /* Ensure all writes are done before we update req->rq_received */ + /* Ensure all writes are done before we update */ + /* req->rq_reply_bytes_recvd */ smp_wmb(); - req->rq_received = copied; + req->rq_reply_bytes_recvd = copied; rpc_wake_up_queued_task(&xprt->pending, task); } EXPORT_SYMBOL_GPL(xprt_complete_rqst); @@ -820,7 +824,7 @@ static void xprt_timer(struct rpc_task *task) dprintk("RPC: %5u xprt_timer\n", task->tk_pid); spin_lock_bh(&xprt->transport_lock); - if (!req->rq_received) { + if (!req->rq_reply_bytes_recvd) { if (xprt->ops->timer) xprt->ops->timer(task); } else @@ -842,8 +846,8 @@ int xprt_prepare_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); spin_lock_bh(&xprt->transport_lock); - if (req->rq_received && !req->rq_bytes_sent) { - err = req->rq_received; + if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) { + err = req->rq_reply_bytes_recvd; goto out_unlock; } if (!xprt->ops->reserve_xprt(task)) @@ -855,7 +859,7 @@ out_unlock: void xprt_end_transmit(struct rpc_task *task) { - xprt_release_write(task->tk_xprt, task); + xprt_release_write(task->tk_rqstp->rq_xprt, task); } /** @@ -872,8 +876,11 @@ void xprt_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); - if (!req->rq_received) { - if (list_empty(&req->rq_list)) { + if (!req->rq_reply_bytes_recvd) { + if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { + /* + * Add to the list only if we're expecting a reply + */ spin_lock_bh(&xprt->transport_lock); /* Update the softirq receive buffer */ memcpy(&req->rq_private_buf, &req->rq_rcv_buf, @@ -908,8 +915,13 @@ void xprt_transmit(struct rpc_task *task) /* Don't race with disconnect */ if (!xprt_connected(xprt)) task->tk_status = -ENOTCONN; - else if (!req->rq_received) + else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) { + /* + * Sleep on the pending queue since + * we're expecting a reply. + */ rpc_sleep_on(&xprt->pending, task, xprt_timer); + } spin_unlock_bh(&xprt->transport_lock); } @@ -982,11 +994,17 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) */ void xprt_release(struct rpc_task *task) { - struct rpc_xprt *xprt = task->tk_xprt; + struct rpc_xprt *xprt; struct rpc_rqst *req; + int is_bc_request; if (!(req = task->tk_rqstp)) return; + + /* Preallocated backchannel request? */ + is_bc_request = bc_prealloc(req); + + xprt = req->rq_xprt; rpc_count_iostats(task); spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); @@ -999,10 +1017,19 @@ void xprt_release(struct rpc_task *task) mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); spin_unlock_bh(&xprt->transport_lock); - xprt->ops->buf_free(req->rq_buffer); + if (!bc_prealloc(req)) + xprt->ops->buf_free(req->rq_buffer); task->tk_rqstp = NULL; if (req->rq_release_snd_buf) req->rq_release_snd_buf(req); + + /* + * Early exit if this is a backchannel preallocated request. + * There is no need to have it added to the RPC slot list. + */ + if (is_bc_request) + return; + memset(req, 0, sizeof(*req)); /* mark unused */ dprintk("RPC: %5u release request %p\n", task->tk_pid, req); @@ -1049,6 +1076,11 @@ found: INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv); +#if defined(CONFIG_NFS_V4_1) + spin_lock_init(&xprt->bc_pa_lock); + INIT_LIST_HEAD(&xprt->bc_pa_list); +#endif /* CONFIG_NFS_V4_1 */ + INIT_WORK(&xprt->task_cleanup, xprt_autoclose); setup_timer(&xprt->timer, xprt_init_autodisconnect, (unsigned long)xprt); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 6c2d61586551..9111d11c09fd 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -34,6 +34,9 @@ #include <linux/sunrpc/sched.h> #include <linux/sunrpc/xprtsock.h> #include <linux/file.h> +#ifdef CONFIG_NFS_V4_1 +#include <linux/sunrpc/bc_xprt.h> +#endif #include <net/sock.h> #include <net/checksum.h> @@ -270,6 +273,13 @@ struct sock_xprt { #define TCP_RCV_COPY_FRAGHDR (1UL << 1) #define TCP_RCV_COPY_XID (1UL << 2) #define TCP_RCV_COPY_DATA (1UL << 3) +#define TCP_RCV_READ_CALLDIR (1UL << 4) +#define TCP_RCV_COPY_CALLDIR (1UL << 5) + +/* + * TCP RPC flags + */ +#define TCP_RPC_REPLY (1UL << 6) static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) { @@ -956,7 +966,7 @@ static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_rea transport->tcp_offset = 0; /* Sanity check of the record length */ - if (unlikely(transport->tcp_reclen < 4)) { + if (unlikely(transport->tcp_reclen < 8)) { dprintk("RPC: invalid TCP record fragment length\n"); xprt_force_disconnect(xprt); return; @@ -991,33 +1001,77 @@ static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_r if (used != len) return; transport->tcp_flags &= ~TCP_RCV_COPY_XID; - transport->tcp_flags |= TCP_RCV_COPY_DATA; + transport->tcp_flags |= TCP_RCV_READ_CALLDIR; transport->tcp_copied = 4; - dprintk("RPC: reading reply for XID %08x\n", + dprintk("RPC: reading %s XID %08x\n", + (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" + : "request with", ntohl(transport->tcp_xid)); xs_tcp_check_fraghdr(transport); } -static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) +static inline void xs_tcp_read_calldir(struct sock_xprt *transport, + struct xdr_skb_reader *desc) { - struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - struct rpc_rqst *req; + size_t len, used; + u32 offset; + __be32 calldir; + + /* + * We want transport->tcp_offset to be 8 at the end of this routine + * (4 bytes for the xid and 4 bytes for the call/reply flag). + * When this function is called for the first time, + * transport->tcp_offset is 4 (after having already read the xid). + */ + offset = transport->tcp_offset - sizeof(transport->tcp_xid); + len = sizeof(calldir) - offset; + dprintk("RPC: reading CALL/REPLY flag (%Zu bytes)\n", len); + used = xdr_skb_read_bits(desc, &calldir, len); + transport->tcp_offset += used; + if (used != len) + return; + transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; + transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; + transport->tcp_flags |= TCP_RCV_COPY_DATA; + /* + * We don't yet have the XDR buffer, so we will write the calldir + * out after we get the buffer from the 'struct rpc_rqst' + */ + if (ntohl(calldir) == RPC_REPLY) + transport->tcp_flags |= TCP_RPC_REPLY; + else + transport->tcp_flags &= ~TCP_RPC_REPLY; + dprintk("RPC: reading %s CALL/REPLY flag %08x\n", + (transport->tcp_flags & TCP_RPC_REPLY) ? + "reply for" : "request with", calldir); + xs_tcp_check_fraghdr(transport); +} + +static inline void xs_tcp_read_common(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc, + struct rpc_rqst *req) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *rcvbuf; size_t len; ssize_t r; - /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); - req = xprt_lookup_rqst(xprt, transport->tcp_xid); - if (!req) { - transport->tcp_flags &= ~TCP_RCV_COPY_DATA; - dprintk("RPC: XID %08x request not found!\n", - ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); - return; + rcvbuf = &req->rq_private_buf; + + if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { + /* + * Save the RPC direction in the XDR buffer + */ + __be32 calldir = transport->tcp_flags & TCP_RPC_REPLY ? + htonl(RPC_REPLY) : 0; + + memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, + &calldir, sizeof(calldir)); + transport->tcp_copied += sizeof(calldir); + transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; } - rcvbuf = &req->rq_private_buf; len = desc->count; if (len > transport->tcp_reclen - transport->tcp_offset) { struct xdr_skb_reader my_desc; @@ -1054,7 +1108,7 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea "tcp_offset = %u, tcp_reclen = %u\n", xprt, transport->tcp_copied, transport->tcp_offset, transport->tcp_reclen); - goto out; + return; } dprintk("RPC: XID %08x read %Zd bytes\n", @@ -1070,11 +1124,125 @@ static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_rea transport->tcp_flags &= ~TCP_RCV_COPY_DATA; } -out: + return; +} + +/* + * Finds the request corresponding to the RPC xid and invokes the common + * tcp read code to read the data. + */ +static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct rpc_rqst *req; + + dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); + + /* Find and lock the request corresponding to this xid */ + spin_lock(&xprt->transport_lock); + req = xprt_lookup_rqst(xprt, transport->tcp_xid); + if (!req) { + dprintk("RPC: XID %08x request not found!\n", + ntohl(transport->tcp_xid)); + spin_unlock(&xprt->transport_lock); + return -1; + } + + xs_tcp_read_common(xprt, desc, req); + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); + spin_unlock(&xprt->transport_lock); - xs_tcp_check_fraghdr(transport); + return 0; +} + +#if defined(CONFIG_NFS_V4_1) +/* + * Obtains an rpc_rqst previously allocated and invokes the common + * tcp read code to read the data. The result is placed in the callback + * queue. + * If we're unable to obtain the rpc_rqst we schedule the closing of the + * connection and return -1. + */ +static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + struct rpc_rqst *req; + + req = xprt_alloc_bc_request(xprt); + if (req == NULL) { + printk(KERN_WARNING "Callback slot table overflowed\n"); + xprt_force_disconnect(xprt); + return -1; + } + + req->rq_xid = transport->tcp_xid; + dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); + xs_tcp_read_common(xprt, desc, req); + + if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { + struct svc_serv *bc_serv = xprt->bc_serv; + + /* + * Add callback request to callback list. The callback + * service sleeps on the sv_cb_waitq waiting for new + * requests. Wake it up after adding enqueing the + * request. + */ + dprintk("RPC: add callback request to list\n"); + spin_lock(&bc_serv->sv_cb_lock); + list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); + spin_unlock(&bc_serv->sv_cb_lock); + wake_up(&bc_serv->sv_cb_waitq); + } + + req->rq_private_buf.len = transport->tcp_copied; + + return 0; +} + +static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + + return (transport->tcp_flags & TCP_RPC_REPLY) ? + xs_tcp_read_reply(xprt, desc) : + xs_tcp_read_callback(xprt, desc); +} +#else +static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + return xs_tcp_read_reply(xprt, desc); +} +#endif /* CONFIG_NFS_V4_1 */ + +/* + * Read data off the transport. This can be either an RPC_CALL or an + * RPC_REPLY. Relay the processing to helper functions. + */ +static void xs_tcp_read_data(struct rpc_xprt *xprt, + struct xdr_skb_reader *desc) +{ + struct sock_xprt *transport = + container_of(xprt, struct sock_xprt, xprt); + + if (_xs_tcp_read_data(xprt, desc) == 0) + xs_tcp_check_fraghdr(transport); + else { + /* + * The transport_lock protects the request handling. + * There's no need to hold it to update the tcp_flags. + */ + transport->tcp_flags &= ~TCP_RCV_COPY_DATA; + } } static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) @@ -1114,9 +1282,14 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns xs_tcp_read_xid(transport, &desc); continue; } + /* Read in the call/reply flag */ + if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { + xs_tcp_read_calldir(transport, &desc); + continue; + } /* Read in the request data */ if (transport->tcp_flags & TCP_RCV_COPY_DATA) { - xs_tcp_read_request(xprt, &desc); + xs_tcp_read_data(xprt, &desc); continue; } /* Skip over any trailing bytes on short reads */ @@ -2010,6 +2183,9 @@ static struct rpc_xprt_ops xs_tcp_ops = { .buf_free = rpc_free, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, +#if defined(CONFIG_NFS_V4_1) + .release_request = bc_release_request, +#endif /* CONFIG_NFS_V4_1 */ .close = xs_tcp_close, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats, |