diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2018-06-12 09:49:33 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2018-06-12 09:49:33 -0700 |
commit | 89e255678fec5a1a9ed59664a62212d19873aedc (patch) | |
tree | eac17093cf2122f54d599f6ffbda0206064a324a /net | |
parent | 8efcf34a263965e471e3999904f94d1f6799d42a (diff) | |
parent | 692ad280bff3e81721ab138b9455948ab5289acf (diff) | |
download | linux-89e255678fec5a1a9ed59664a62212d19873aedc.tar.bz2 |
Merge tag 'nfsd-4.18' of git://linux-nfs.org/~bfields/linux
Pull nfsd updates from Bruce Fields:
"A relatively quiet cycle for nfsd.
The largest piece is an RDMA update from Chuck Lever with new trace
points, miscellaneous cleanups, and streamlining of the send and
receive paths.
Other than that, some miscellaneous bugfixes"
* tag 'nfsd-4.18' of git://linux-nfs.org/~bfields/linux: (26 commits)
nfsd: fix error handling in nfs4_set_delegation()
nfsd: fix potential use-after-free in nfsd4_decode_getdeviceinfo
Fix 16-byte memory leak in gssp_accept_sec_context_upcall
svcrdma: Fix incorrect return value/type in svc_rdma_post_recvs
svcrdma: Remove unused svc_rdma_op_ctxt
svcrdma: Persistently allocate and DMA-map Send buffers
svcrdma: Simplify svc_rdma_send()
svcrdma: Remove post_send_wr
svcrdma: Don't overrun the SGE array in svc_rdma_send_ctxt
svcrdma: Introduce svc_rdma_send_ctxt
svcrdma: Clean up Send SGE accounting
svcrdma: Refactor svc_rdma_dma_map_buf
svcrdma: Allocate recv_ctxt's on CPU handling Receives
svcrdma: Persistently allocate and DMA-map Receive buffers
svcrdma: Preserve Receive buffer until svc_rdma_sendto
svcrdma: Simplify svc_rdma_recv_ctxt_put
svcrdma: Remove sc_rq_depth
svcrdma: Introduce svc_rdma_recv_ctxt
svcrdma: Trace key RDMA API events
svcrdma: Trace key RPC/RDMA protocol events
...
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/auth_gss/gss_rpc_upcall.c | 4 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 3 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/module.c | 4 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 7 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma.c | 3 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 54 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 439 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_rw.c | 133 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 510 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 481 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 4 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 1 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 2 |
15 files changed, 830 insertions, 819 deletions
diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.c b/net/sunrpc/auth_gss/gss_rpc_upcall.c index 46b295e4f2b8..d98e2b610ce8 100644 --- a/net/sunrpc/auth_gss/gss_rpc_upcall.c +++ b/net/sunrpc/auth_gss/gss_rpc_upcall.c @@ -298,9 +298,11 @@ int gssp_accept_sec_context_upcall(struct net *net, if (res.context_handle) { data->out_handle = rctxh.exported_context_token; data->mech_oid.len = rctxh.mech.len; - if (rctxh.mech.data) + if (rctxh.mech.data) { memcpy(data->mech_oid.data, rctxh.mech.data, data->mech_oid.len); + kfree(rctxh.mech.data); + } client_name = rctxh.src_name.display_name; } diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 47ebac949769..dbedc872ec10 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -9,8 +9,10 @@ #include <linux/sunrpc/xprt.h> #include <linux/sunrpc/svc.h> #include <linux/sunrpc/svc_xprt.h> +#include <linux/sunrpc/svc_rdma.h> #include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index f2f63959fddd..3c8d19f6e320 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -20,7 +20,10 @@ * verb (fmr_op_unmap). */ +#include <linux/sunrpc/svc_rdma.h> + #include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index c59c5c788db0..2d2fefbb810b 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -71,8 +71,10 @@ */ #include <linux/sunrpc/rpc_rdma.h> +#include <linux/sunrpc/svc_rdma.h> #include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS diff --git a/net/sunrpc/xprtrdma/module.c b/net/sunrpc/xprtrdma/module.c index a762d192372b..d95ac0736b7f 100644 --- a/net/sunrpc/xprtrdma/module.c +++ b/net/sunrpc/xprtrdma/module.c @@ -13,9 +13,11 @@ #include <asm/swab.h> -#define CREATE_TRACE_POINTS #include "xprt_rdma.h" +#define CREATE_TRACE_POINTS +#include <trace/events/rpcrdma.h> + MODULE_AUTHOR("Open Grid Computing and Network Appliance, Inc."); MODULE_DESCRIPTION("RPC/RDMA Transport"); MODULE_LICENSE("Dual BSD/GPL"); diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index e8adad33d0bb..b942d7e0aef5 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -46,10 +46,13 @@ * to the Linux RPC framework lives. */ -#include "xprt_rdma.h" - #include <linux/highmem.h> +#include <linux/sunrpc/svc_rdma.h> + +#include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> + #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS #endif diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index dd8a431dc2ae..357ba90c382d 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -1,4 +1,6 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* + * Copyright (c) 2015-2018 Oracle. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two @@ -46,7 +48,6 @@ #include <linux/sunrpc/clnt.h> #include <linux/sunrpc/sched.h> #include <linux/sunrpc/svc_rdma.h> -#include "xprt_rdma.h" #define RPCDBG_FACILITY RPCDBG_SVCXPRT diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index a73632ca9048..343e7add672c 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -1,13 +1,16 @@ // SPDX-License-Identifier: GPL-2.0 /* - * Copyright (c) 2015 Oracle. All rights reserved. + * Copyright (c) 2015-2018 Oracle. All rights reserved. * * Support for backward direction RPCs on RPC/RDMA (server-side). */ #include <linux/module.h> + #include <linux/sunrpc/svc_rdma.h> + #include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> #define RPCDBG_FACILITY RPCDBG_SVCXPRT @@ -112,39 +115,21 @@ out_notfound: * the adapter has a small maximum SQ depth. */ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, - struct rpc_rqst *rqst) + struct rpc_rqst *rqst, + struct svc_rdma_send_ctxt *ctxt) { - struct svc_rdma_op_ctxt *ctxt; int ret; - ctxt = svc_rdma_get_context(rdma); - - /* rpcrdma_bc_send_request builds the transport header and - * the backchannel RPC message in the same buffer. Thus only - * one SGE is needed to send both. - */ - ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer, - rqst->rq_snd_buf.len); + ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL); if (ret < 0) - goto out_err; + return -EIO; /* Bump page refcnt so Send completion doesn't release * the rq_buffer before all retransmits are complete. */ get_page(virt_to_page(rqst->rq_buffer)); - ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0); - if (ret) - goto out_unmap; - -out_err: - dprintk("svcrdma: %s returns %d\n", __func__, ret); - return ret; - -out_unmap: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - ret = -EIO; - goto out_err; + ctxt->sc_send_wr.opcode = IB_WR_SEND; + return svc_rdma_send(rdma, &ctxt->sc_send_wr); } /* Server-side transport endpoint wants a whole page for its send @@ -191,13 +176,15 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) { struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct svc_rdma_send_ctxt *ctxt; __be32 *p; int rc; - /* Space in the send buffer for an RPC/RDMA header is reserved - * via xprt->tsh_size. - */ - p = rqst->rq_buffer; + ctxt = svc_rdma_send_ctxt_get(rdma); + if (!ctxt) + goto drop_connection; + + p = ctxt->sc_xprt_buf; *p++ = rqst->rq_xid; *p++ = rpcrdma_version; *p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests); @@ -205,14 +192,17 @@ rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) *p++ = xdr_zero; *p++ = xdr_zero; *p = xdr_zero; + svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_MIN); #ifdef SVCRDMA_BACKCHANNEL_DEBUG pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer); #endif - rc = svc_rdma_bc_sendto(rdma, rqst); - if (rc) + rc = svc_rdma_bc_sendto(rdma, rqst, ctxt); + if (rc) { + svc_rdma_send_ctxt_put(rdma, ctxt); goto drop_connection; + } return rc; drop_connection: @@ -320,7 +310,7 @@ xprt_setup_rdma_bc(struct xprt_create *args) xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; xprt->prot = XPRT_TRANSPORT_BC_RDMA; - xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32); + xprt->tsh_size = 0; xprt->ops = &xprt_rdma_bc_procs; memcpy(&xprt->addr, args->dstaddr, args->addrlen); diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 3d45015dca97..841fca143804 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -1,5 +1,6 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* - * Copyright (c) 2016, 2017 Oracle. All rights reserved. + * Copyright (c) 2016-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * @@ -60,7 +61,7 @@ * svc_rdma_recvfrom must post RDMA Reads to pull the RPC Call's * data payload from the client. svc_rdma_recvfrom sets up the * RDMA Reads using pages in svc_rqst::rq_pages, which are - * transferred to an svc_rdma_op_ctxt for the duration of the + * transferred to an svc_rdma_recv_ctxt for the duration of the * I/O. svc_rdma_recvfrom then returns zero, since the RPC message * is still not yet ready. * @@ -69,18 +70,18 @@ * svc_rdma_recvfrom again. This second call may use a different * svc_rqst than the first one, thus any information that needs * to be preserved across these two calls is kept in an - * svc_rdma_op_ctxt. + * svc_rdma_recv_ctxt. * * The second call to svc_rdma_recvfrom performs final assembly * of the RPC Call message, using the RDMA Read sink pages kept in - * the svc_rdma_op_ctxt. The xdr_buf is copied from the - * svc_rdma_op_ctxt to the second svc_rqst. The second call returns + * the svc_rdma_recv_ctxt. The xdr_buf is copied from the + * svc_rdma_recv_ctxt to the second svc_rqst. The second call returns * the length of the completed RPC Call message. * * Page Management * * Pages under I/O must be transferred from the first svc_rqst to an - * svc_rdma_op_ctxt before the first svc_rdma_recvfrom call returns. + * svc_rdma_recv_ctxt before the first svc_rdma_recvfrom call returns. * * The first svc_rqst supplies pages for RDMA Reads. These are moved * from rqstp::rq_pages into ctxt::pages. The consumed elements of @@ -88,78 +89,286 @@ * svc_rdma_recvfrom call returns. * * During the second svc_rdma_recvfrom call, RDMA Read sink pages - * are transferred from the svc_rdma_op_ctxt to the second svc_rqst + * are transferred from the svc_rdma_recv_ctxt to the second svc_rqst * (see rdma_read_complete() below). */ +#include <linux/spinlock.h> #include <asm/unaligned.h> #include <rdma/ib_verbs.h> #include <rdma/rdma_cm.h> -#include <linux/spinlock.h> - #include <linux/sunrpc/xdr.h> #include <linux/sunrpc/debug.h> #include <linux/sunrpc/rpc_rdma.h> #include <linux/sunrpc/svc_rdma.h> +#include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> + #define RPCDBG_FACILITY RPCDBG_SVCXPRT -/* - * Replace the pages in the rq_argpages array with the pages from the SGE in - * the RDMA_RECV completion. The SGL should contain full pages up until the - * last one. +static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc); + +static inline struct svc_rdma_recv_ctxt * +svc_rdma_next_recv_ctxt(struct list_head *list) +{ + return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt, + rc_list); +} + +static struct svc_rdma_recv_ctxt * +svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + dma_addr_t addr; + void *buffer; + + ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); + if (!ctxt) + goto fail0; + buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL); + if (!buffer) + goto fail1; + addr = ib_dma_map_single(rdma->sc_pd->device, buffer, + rdma->sc_max_req_size, DMA_FROM_DEVICE); + if (ib_dma_mapping_error(rdma->sc_pd->device, addr)) + goto fail2; + + ctxt->rc_recv_wr.next = NULL; + ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe; + ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge; + ctxt->rc_recv_wr.num_sge = 1; + ctxt->rc_cqe.done = svc_rdma_wc_receive; + ctxt->rc_recv_sge.addr = addr; + ctxt->rc_recv_sge.length = rdma->sc_max_req_size; + ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey; + ctxt->rc_recv_buf = buffer; + ctxt->rc_temp = false; + return ctxt; + +fail2: + kfree(buffer); +fail1: + kfree(ctxt); +fail0: + return NULL; +} + +static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma, + struct svc_rdma_recv_ctxt *ctxt) +{ + ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr, + ctxt->rc_recv_sge.length, DMA_FROM_DEVICE); + kfree(ctxt->rc_recv_buf); + kfree(ctxt); +} + +/** + * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt + * @rdma: svcxprt_rdma being torn down + * */ -static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *ctxt) +void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma) { - struct page *page; - int sge_no; - u32 len; + struct svc_rdma_recv_ctxt *ctxt; - /* The reply path assumes the Call's transport header resides - * in rqstp->rq_pages[0]. - */ - page = ctxt->pages[0]; - put_page(rqstp->rq_pages[0]); - rqstp->rq_pages[0] = page; - - /* Set up the XDR head */ - rqstp->rq_arg.head[0].iov_base = page_address(page); - rqstp->rq_arg.head[0].iov_len = - min_t(size_t, ctxt->byte_len, ctxt->sge[0].length); - rqstp->rq_arg.len = ctxt->byte_len; - rqstp->rq_arg.buflen = ctxt->byte_len; - - /* Compute bytes past head in the SGL */ - len = ctxt->byte_len - rqstp->rq_arg.head[0].iov_len; - - /* If data remains, store it in the pagelist */ - rqstp->rq_arg.page_len = len; - rqstp->rq_arg.page_base = 0; - - sge_no = 1; - while (len && sge_no < ctxt->count) { - page = ctxt->pages[sge_no]; - put_page(rqstp->rq_pages[sge_no]); - rqstp->rq_pages[sge_no] = page; - len -= min_t(u32, len, ctxt->sge[sge_no].length); - sge_no++; + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts))) { + list_del(&ctxt->rc_list); + svc_rdma_recv_ctxt_destroy(rdma, ctxt); } - rqstp->rq_respages = &rqstp->rq_pages[sge_no]; - rqstp->rq_next_page = rqstp->rq_respages + 1; +} + +static struct svc_rdma_recv_ctxt * +svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + + spin_lock(&rdma->sc_recv_lock); + ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_recv_ctxts); + if (!ctxt) + goto out_empty; + list_del(&ctxt->rc_list); + spin_unlock(&rdma->sc_recv_lock); + +out: + ctxt->rc_page_count = 0; + return ctxt; + +out_empty: + spin_unlock(&rdma->sc_recv_lock); + + ctxt = svc_rdma_recv_ctxt_alloc(rdma); + if (!ctxt) + return NULL; + goto out; +} + +/** + * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list + * + */ +void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_recv_ctxt *ctxt) +{ + unsigned int i; + + for (i = 0; i < ctxt->rc_page_count; i++) + put_page(ctxt->rc_pages[i]); + + if (!ctxt->rc_temp) { + spin_lock(&rdma->sc_recv_lock); + list_add(&ctxt->rc_list, &rdma->sc_recv_ctxts); + spin_unlock(&rdma->sc_recv_lock); + } else + svc_rdma_recv_ctxt_destroy(rdma, ctxt); +} + +static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct ib_recv_wr *bad_recv_wr; + int ret; + + svc_xprt_get(&rdma->sc_xprt); + ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, &bad_recv_wr); + trace_svcrdma_post_recv(&ctxt->rc_recv_wr, ret); + if (ret) + goto err_post; + return 0; + +err_post: + svc_rdma_recv_ctxt_put(rdma, ctxt); + svc_xprt_put(&rdma->sc_xprt); + return ret; +} - /* If not all pages were used from the SGL, free the remaining ones */ - len = sge_no; - while (sge_no < ctxt->count) { - page = ctxt->pages[sge_no++]; - put_page(page); +static int svc_rdma_post_recv(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + + ctxt = svc_rdma_recv_ctxt_get(rdma); + if (!ctxt) + return -ENOMEM; + return __svc_rdma_post_recv(rdma, ctxt); +} + +/** + * svc_rdma_post_recvs - Post initial set of Recv WRs + * @rdma: fresh svcxprt_rdma + * + * Returns true if successful, otherwise false. + */ +bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + unsigned int i; + int ret; + + for (i = 0; i < rdma->sc_max_requests; i++) { + ctxt = svc_rdma_recv_ctxt_get(rdma); + if (!ctxt) + return false; + ctxt->rc_temp = true; + ret = __svc_rdma_post_recv(rdma, ctxt); + if (ret) { + pr_err("svcrdma: failure posting recv buffers: %d\n", + ret); + return false; + } } - ctxt->count = len; + return true; +} - /* Set up tail */ - rqstp->rq_arg.tail[0].iov_base = NULL; - rqstp->rq_arg.tail[0].iov_len = 0; +/** + * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC + * @cq: Completion Queue context + * @wc: Work Completion object + * + * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that + * the Receive completion handler could be running. + */ +static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) +{ + struct svcxprt_rdma *rdma = cq->cq_context; + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_recv_ctxt *ctxt; + + trace_svcrdma_wc_receive(wc); + + /* WARNING: Only wc->wr_cqe and wc->status are reliable */ + ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe); + + if (wc->status != IB_WC_SUCCESS) + goto flushed; + + if (svc_rdma_post_recv(rdma)) + goto post_err; + + /* All wc fields are now known to be valid */ + ctxt->rc_byte_len = wc->byte_len; + ib_dma_sync_single_for_cpu(rdma->sc_pd->device, + ctxt->rc_recv_sge.addr, + wc->byte_len, DMA_FROM_DEVICE); + + spin_lock(&rdma->sc_rq_dto_lock); + list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q); + spin_unlock(&rdma->sc_rq_dto_lock); + set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags); + if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags)) + svc_xprt_enqueue(&rdma->sc_xprt); + goto out; + +flushed: + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: Recv: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); +post_err: + svc_rdma_recv_ctxt_put(rdma, ctxt); + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + svc_xprt_enqueue(&rdma->sc_xprt); +out: + svc_xprt_put(&rdma->sc_xprt); +} + +/** + * svc_rdma_flush_recv_queues - Drain pending Receive work + * @rdma: svcxprt_rdma being shut down + * + */ +void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_recv_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) { + list_del(&ctxt->rc_list); + svc_rdma_recv_ctxt_put(rdma, ctxt); + } + while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) { + list_del(&ctxt->rc_list); + svc_rdma_recv_ctxt_put(rdma, ctxt); + } +} + +static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp, + struct svc_rdma_recv_ctxt *ctxt) +{ + struct xdr_buf *arg = &rqstp->rq_arg; + + arg->head[0].iov_base = ctxt->rc_recv_buf; + arg->head[0].iov_len = ctxt->rc_byte_len; + arg->tail[0].iov_base = NULL; + arg->tail[0].iov_len = 0; + arg->page_len = 0; + arg->page_base = 0; + arg->buflen = ctxt->rc_byte_len; + arg->len = ctxt->rc_byte_len; + + rqstp->rq_respages = &rqstp->rq_pages[0]; + rqstp->rq_next_page = rqstp->rq_respages + 1; } /* This accommodates the largest possible Write chunk, @@ -294,7 +503,6 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) { __be32 *p, *end, *rdma_argp; unsigned int hdr_len; - char *proc; /* Verify that there's enough bytes for header + something */ if (rq_arg->len <= RPCRDMA_HDRLEN_ERR) @@ -306,10 +514,8 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) switch (*(rdma_argp + 3)) { case rdma_msg: - proc = "RDMA_MSG"; break; case rdma_nomsg: - proc = "RDMA_NOMSG"; break; case rdma_done: @@ -339,103 +545,94 @@ static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) hdr_len = (unsigned long)p - (unsigned long)rdma_argp; rq_arg->head[0].iov_len -= hdr_len; rq_arg->len -= hdr_len; - dprintk("svcrdma: received %s request for XID 0x%08x, hdr_len=%u\n", - proc, be32_to_cpup(rdma_argp), hdr_len); + trace_svcrdma_decode_rqst(rdma_argp, hdr_len); return hdr_len; out_short: - dprintk("svcrdma: header too short = %d\n", rq_arg->len); + trace_svcrdma_decode_short(rq_arg->len); return -EINVAL; out_version: - dprintk("svcrdma: bad xprt version: %u\n", - be32_to_cpup(rdma_argp + 1)); + trace_svcrdma_decode_badvers(rdma_argp); return -EPROTONOSUPPORT; out_drop: - dprintk("svcrdma: dropping RDMA_DONE/ERROR message\n"); + trace_svcrdma_decode_drop(rdma_argp); return 0; out_proc: - dprintk("svcrdma: bad rdma procedure (%u)\n", - be32_to_cpup(rdma_argp + 3)); + trace_svcrdma_decode_badproc(rdma_argp); return -EINVAL; out_inval: - dprintk("svcrdma: failed to parse transport header\n"); + trace_svcrdma_decode_parse(rdma_argp); return -EINVAL; } static void rdma_read_complete(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head) + struct svc_rdma_recv_ctxt *head) { int page_no; - /* Copy RPC pages */ - for (page_no = 0; page_no < head->count; page_no++) { + /* Move Read chunk pages to rqstp so that they will be released + * when svc_process is done with them. + */ + for (page_no = 0; page_no < head->rc_page_count; page_no++) { put_page(rqstp->rq_pages[page_no]); - rqstp->rq_pages[page_no] = head->pages[page_no]; + rqstp->rq_pages[page_no] = head->rc_pages[page_no]; } + head->rc_page_count = 0; /* Point rq_arg.pages past header */ - rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count]; - rqstp->rq_arg.page_len = head->arg.page_len; + rqstp->rq_arg.pages = &rqstp->rq_pages[head->rc_hdr_count]; + rqstp->rq_arg.page_len = head->rc_arg.page_len; /* rq_respages starts after the last arg page */ rqstp->rq_respages = &rqstp->rq_pages[page_no]; rqstp->rq_next_page = rqstp->rq_respages + 1; /* Rebuild rq_arg head and tail. */ - rqstp->rq_arg.head[0] = head->arg.head[0]; - rqstp->rq_arg.tail[0] = head->arg.tail[0]; - rqstp->rq_arg.len = head->arg.len; - rqstp->rq_arg.buflen = head->arg.buflen; + rqstp->rq_arg.head[0] = head->rc_arg.head[0]; + rqstp->rq_arg.tail[0] = head->rc_arg.tail[0]; + rqstp->rq_arg.len = head->rc_arg.len; + rqstp->rq_arg.buflen = head->rc_arg.buflen; } static void svc_rdma_send_error(struct svcxprt_rdma *xprt, __be32 *rdma_argp, int status) { - struct svc_rdma_op_ctxt *ctxt; - __be32 *p, *err_msgp; + struct svc_rdma_send_ctxt *ctxt; unsigned int length; - struct page *page; + __be32 *p; int ret; - page = alloc_page(GFP_KERNEL); - if (!page) + ctxt = svc_rdma_send_ctxt_get(xprt); + if (!ctxt) return; - err_msgp = page_address(page); - p = err_msgp; + p = ctxt->sc_xprt_buf; *p++ = *rdma_argp; *p++ = *(rdma_argp + 1); *p++ = xprt->sc_fc_credits; *p++ = rdma_error; - if (status == -EPROTONOSUPPORT) { + switch (status) { + case -EPROTONOSUPPORT: *p++ = err_vers; *p++ = rpcrdma_version; *p++ = rpcrdma_version; - } else { + trace_svcrdma_err_vers(*rdma_argp); + break; + default: *p++ = err_chunk; + trace_svcrdma_err_chunk(*rdma_argp); } - length = (unsigned long)p - (unsigned long)err_msgp; - - /* Map transport header; no RPC message payload */ - ctxt = svc_rdma_get_context(xprt); - ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length); - if (ret) { - dprintk("svcrdma: Error %d mapping send for protocol error\n", - ret); - return; - } + length = (unsigned long)p - (unsigned long)ctxt->sc_xprt_buf; + svc_rdma_sync_reply_hdr(xprt, ctxt, length); - ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0); - if (ret) { - dprintk("svcrdma: Error %d posting send for protocol error\n", - ret); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - } + ctxt->sc_send_wr.opcode = IB_WR_SEND; + ret = svc_rdma_send(xprt, &ctxt->sc_send_wr); + if (ret) + svc_rdma_send_ctxt_put(xprt, ctxt); } /* By convention, backchannel calls arrive via rdma_msg type @@ -507,32 +704,28 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) struct svc_xprt *xprt = rqstp->rq_xprt; struct svcxprt_rdma *rdma_xprt = container_of(xprt, struct svcxprt_rdma, sc_xprt); - struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_recv_ctxt *ctxt; __be32 *p; int ret; spin_lock(&rdma_xprt->sc_rq_dto_lock); - if (!list_empty(&rdma_xprt->sc_read_complete_q)) { - ctxt = list_first_entry(&rdma_xprt->sc_read_complete_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q); + if (ctxt) { + list_del(&ctxt->rc_list); spin_unlock(&rdma_xprt->sc_rq_dto_lock); rdma_read_complete(rqstp, ctxt); goto complete; - } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { - ctxt = list_first_entry(&rdma_xprt->sc_rq_dto_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - } else { + } + ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q); + if (!ctxt) { /* No new incoming requests, terminate the loop */ clear_bit(XPT_DATA, &xprt->xpt_flags); spin_unlock(&rdma_xprt->sc_rq_dto_lock); return 0; } + list_del(&ctxt->rc_list); spin_unlock(&rdma_xprt->sc_rq_dto_lock); - dprintk("svcrdma: recvfrom: ctxt=%p on xprt=%p, rqstp=%p\n", - ctxt, rdma_xprt, rqstp); atomic_inc(&rdma_stat_recv); svc_rdma_build_arg_xdr(rqstp, ctxt); @@ -548,7 +741,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) if (svc_rdma_is_backchannel_reply(xprt, p)) { ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, p, &rqstp->rq_arg); - svc_rdma_put_context(ctxt, 0); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); return ret; } @@ -557,9 +750,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) goto out_readchunk; complete: - svc_rdma_put_context(ctxt, 0); - dprintk("svcrdma: recvfrom: xprt=%p, rqstp=%p, rq_arg.len=%u\n", - rdma_xprt, rqstp, rqstp->rq_arg.len); + rqstp->rq_xprt_ctxt = ctxt; rqstp->rq_prot = IPPROTO_MAX; svc_xprt_copy_addrs(rqstp, xprt); return rqstp->rq_arg.len; @@ -572,16 +763,16 @@ out_readchunk: out_err: svc_rdma_send_error(rdma_xprt, p, ret); - svc_rdma_put_context(ctxt, 0); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); return 0; out_postfail: if (ret == -EINVAL) svc_rdma_send_error(rdma_xprt, p, ret); - svc_rdma_put_context(ctxt, 1); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); return ret; out_drop: - svc_rdma_put_context(ctxt, 1); + svc_rdma_recv_ctxt_put(rdma_xprt, ctxt); return 0; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c index 12b9a7e0b6d2..ce3ea8419704 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_rw.c +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -1,15 +1,18 @@ // SPDX-License-Identifier: GPL-2.0 /* - * Copyright (c) 2016 Oracle. All rights reserved. + * Copyright (c) 2016-2018 Oracle. All rights reserved. * * Use the core R/W API to move RPC-over-RDMA Read and Write chunks. */ +#include <rdma/rw.h> + #include <linux/sunrpc/rpc_rdma.h> #include <linux/sunrpc/svc_rdma.h> #include <linux/sunrpc/debug.h> -#include <rdma/rw.h> +#include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> #define RPCDBG_FACILITY RPCDBG_SVCXPRT @@ -205,6 +208,8 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) struct svc_rdma_write_info *info = container_of(cc, struct svc_rdma_write_info, wi_cc); + trace_svcrdma_wc_write(wc); + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); wake_up(&rdma->sc_send_wait); @@ -222,7 +227,7 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) /* State for pulling a Read chunk. */ struct svc_rdma_read_info { - struct svc_rdma_op_ctxt *ri_readctxt; + struct svc_rdma_recv_ctxt *ri_readctxt; unsigned int ri_position; unsigned int ri_pageno; unsigned int ri_pageoff; @@ -266,6 +271,8 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) struct svc_rdma_read_info *info = container_of(cc, struct svc_rdma_read_info, ri_cc); + trace_svcrdma_wc_read(wc); + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); wake_up(&rdma->sc_send_wait); @@ -275,10 +282,10 @@ static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) pr_err("svcrdma: read ctx: %s (%u/0x%x)\n", ib_wc_status_msg(wc->status), wc->status, wc->vendor_err); - svc_rdma_put_context(info->ri_readctxt, 1); + svc_rdma_recv_ctxt_put(rdma, info->ri_readctxt); } else { spin_lock(&rdma->sc_rq_dto_lock); - list_add_tail(&info->ri_readctxt->list, + list_add_tail(&info->ri_readctxt->rc_list, &rdma->sc_read_complete_q); spin_unlock(&rdma->sc_rq_dto_lock); @@ -323,18 +330,20 @@ static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc) if (atomic_sub_return(cc->cc_sqecount, &rdma->sc_sq_avail) > 0) { ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); + trace_svcrdma_post_rw(&cc->cc_cqe, + cc->cc_sqecount, ret); if (ret) break; return 0; } - atomic_inc(&rdma_stat_sq_starve); + trace_svcrdma_sq_full(rdma); atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); wait_event(rdma->sc_send_wait, atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount); + trace_svcrdma_sq_retry(rdma); } while (1); - pr_err("svcrdma: ib_post_send failed (%d)\n", ret); set_bit(XPT_CLOSE, &xprt->xpt_flags); /* If even one was posted, there will be a completion. */ @@ -437,6 +446,7 @@ svc_rdma_build_writes(struct svc_rdma_write_info *info, if (ret < 0) goto out_initerr; + trace_svcrdma_encode_wseg(seg_handle, write_len, seg_offset); list_add(&ctxt->rw_list, &cc->cc_rwctxts); cc->cc_sqecount += ret; if (write_len == seg_length - info->wi_seg_off) { @@ -462,7 +472,7 @@ out_noctx: out_initerr: svc_rdma_put_rw_ctxt(rdma, ctxt); - pr_err("svcrdma: failed to map pagelist (%d)\n", ret); + trace_svcrdma_dma_map_rwctx(rdma, ret); return -EIO; } @@ -526,6 +536,8 @@ int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); if (ret < 0) goto out_err; + + trace_svcrdma_encode_write(xdr->page_len); return xdr->page_len; out_err: @@ -582,6 +594,8 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch, ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); if (ret < 0) goto out_err; + + trace_svcrdma_encode_reply(consumed); return consumed; out_err: @@ -593,7 +607,7 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, struct svc_rqst *rqstp, u32 rkey, u32 len, u64 offset) { - struct svc_rdma_op_ctxt *head = info->ri_readctxt; + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; struct svc_rdma_chunk_ctxt *cc = &info->ri_cc; struct svc_rdma_rw_ctxt *ctxt; unsigned int sge_no, seg_len; @@ -606,18 +620,15 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, goto out_noctx; ctxt->rw_nents = sge_no; - dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n", - len, offset, rkey, sge_no); - sg = ctxt->rw_sg_table.sgl; for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) { seg_len = min_t(unsigned int, len, PAGE_SIZE - info->ri_pageoff); - head->arg.pages[info->ri_pageno] = + head->rc_arg.pages[info->ri_pageno] = rqstp->rq_pages[info->ri_pageno]; if (!info->ri_pageoff) - head->count++; + head->rc_page_count++; sg_set_page(sg, rqstp->rq_pages[info->ri_pageno], seg_len, info->ri_pageoff); @@ -656,8 +667,8 @@ out_overrun: return -EINVAL; out_initerr: + trace_svcrdma_dma_map_rwctx(cc->cc_rdma, ret); svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt); - pr_err("svcrdma: failed to map pagelist (%d)\n", ret); return -EIO; } @@ -686,6 +697,7 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, if (ret < 0) break; + trace_svcrdma_encode_rseg(rs_handle, rs_length, rs_offset); info->ri_chunklen += rs_length; } @@ -693,9 +705,9 @@ static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp, } /* Construct RDMA Reads to pull over a normal Read chunk. The chunk - * data lands in the page list of head->arg.pages. + * data lands in the page list of head->rc_arg.pages. * - * Currently NFSD does not look at the head->arg.tail[0] iovec. + * Currently NFSD does not look at the head->rc_arg.tail[0] iovec. * Therefore, XDR round-up of the Read chunk and trailing * inline content must both be added at the end of the pagelist. */ @@ -703,29 +715,27 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, struct svc_rdma_read_info *info, __be32 *p) { - struct svc_rdma_op_ctxt *head = info->ri_readctxt; + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; int ret; - dprintk("svcrdma: Reading Read chunk at position %u\n", - info->ri_position); - - info->ri_pageno = head->hdr_count; - info->ri_pageoff = 0; - ret = svc_rdma_build_read_chunk(rqstp, info, p); if (ret < 0) goto out; + trace_svcrdma_encode_read(info->ri_chunklen, info->ri_position); + + head->rc_hdr_count = 0; + /* Split the Receive buffer between the head and tail * buffers at Read chunk's position. XDR roundup of the * chunk is not included in either the pagelist or in * the tail. */ - head->arg.tail[0].iov_base = - head->arg.head[0].iov_base + info->ri_position; - head->arg.tail[0].iov_len = - head->arg.head[0].iov_len - info->ri_position; - head->arg.head[0].iov_len = info->ri_position; + head->rc_arg.tail[0].iov_base = + head->rc_arg.head[0].iov_base + info->ri_position; + head->rc_arg.tail[0].iov_len = + head->rc_arg.head[0].iov_len - info->ri_position; + head->rc_arg.head[0].iov_len = info->ri_position; /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). * @@ -738,9 +748,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, */ info->ri_chunklen = XDR_QUADLEN(info->ri_chunklen) << 2; - head->arg.page_len = info->ri_chunklen; - head->arg.len += info->ri_chunklen; - head->arg.buflen += info->ri_chunklen; + head->rc_arg.page_len = info->ri_chunklen; + head->rc_arg.len += info->ri_chunklen; + head->rc_arg.buflen += info->ri_chunklen; out: return ret; @@ -749,7 +759,7 @@ out: /* Construct RDMA Reads to pull over a Position Zero Read chunk. * The start of the data lands in the first page just after * the Transport header, and the rest lands in the page list of - * head->arg.pages. + * head->rc_arg.pages. * * Assumptions: * - A PZRC has an XDR-aligned length (no implicit round-up). @@ -761,35 +771,25 @@ static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, struct svc_rdma_read_info *info, __be32 *p) { - struct svc_rdma_op_ctxt *head = info->ri_readctxt; + struct svc_rdma_recv_ctxt *head = info->ri_readctxt; int ret; - dprintk("svcrdma: Reading Position Zero Read chunk\n"); - - info->ri_pageno = head->hdr_count - 1; - info->ri_pageoff = offset_in_page(head->byte_len); - ret = svc_rdma_build_read_chunk(rqstp, info, p); if (ret < 0) goto out; - head->arg.len += info->ri_chunklen; - head->arg.buflen += info->ri_chunklen; + trace_svcrdma_encode_pzr(info->ri_chunklen); - if (head->arg.buflen <= head->sge[0].length) { - /* Transport header and RPC message fit entirely - * in page where head iovec resides. - */ - head->arg.head[0].iov_len = info->ri_chunklen; - } else { - /* Transport header and part of RPC message reside - * in the head iovec's page. - */ - head->arg.head[0].iov_len = - head->sge[0].length - head->byte_len; - head->arg.page_len = - info->ri_chunklen - head->arg.head[0].iov_len; - } + head->rc_arg.len += info->ri_chunklen; + head->rc_arg.buflen += info->ri_chunklen; + + head->rc_hdr_count = 1; + head->rc_arg.head[0].iov_base = page_address(head->rc_pages[0]); + head->rc_arg.head[0].iov_len = min_t(size_t, PAGE_SIZE, + info->ri_chunklen); + + head->rc_arg.page_len = info->ri_chunklen - + head->rc_arg.head[0].iov_len; out: return ret; @@ -813,29 +813,30 @@ out: * - All Read segments in @p have the same Position value. */ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head, __be32 *p) + struct svc_rdma_recv_ctxt *head, __be32 *p) { struct svc_rdma_read_info *info; struct page **page; int ret; /* The request (with page list) is constructed in - * head->arg. Pages involved with RDMA Read I/O are + * head->rc_arg. Pages involved with RDMA Read I/O are * transferred there. */ - head->hdr_count = head->count; - head->arg.head[0] = rqstp->rq_arg.head[0]; - head->arg.tail[0] = rqstp->rq_arg.tail[0]; - head->arg.pages = head->pages; - head->arg.page_base = 0; - head->arg.page_len = 0; - head->arg.len = rqstp->rq_arg.len; - head->arg.buflen = rqstp->rq_arg.buflen; + head->rc_arg.head[0] = rqstp->rq_arg.head[0]; + head->rc_arg.tail[0] = rqstp->rq_arg.tail[0]; + head->rc_arg.pages = head->rc_pages; + head->rc_arg.page_base = 0; + head->rc_arg.page_len = 0; + head->rc_arg.len = rqstp->rq_arg.len; + head->rc_arg.buflen = rqstp->rq_arg.buflen; info = svc_rdma_read_info_alloc(rdma); if (!info) return -ENOMEM; info->ri_readctxt = head; + info->ri_pageno = 0; + info->ri_pageoff = 0; info->ri_position = be32_to_cpup(p + 1); if (info->ri_position) @@ -856,7 +857,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, out: /* Read sink pages have been moved from rqstp->rq_pages to - * head->arg.pages. Force svc_recv to refill those slots + * head->rc_arg.pages. Force svc_recv to refill those slots * in rq_pages. */ for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++) diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 649441d5087d..4a3efaea277c 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -1,5 +1,6 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* - * Copyright (c) 2016 Oracle. All rights reserved. + * Copyright (c) 2016-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * @@ -74,11 +75,11 @@ * DMA-unmap the pages under I/O for that Write segment. The Write * completion handler does not release any pages. * - * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt. + * When the Send WR is constructed, it also gets its own svc_rdma_send_ctxt. * The ownership of all of the Reply's pages are transferred into that * ctxt, the Send WR is posted, and sendto returns. * - * The svc_rdma_op_ctxt is presented when the Send WR completes. The + * The svc_rdma_send_ctxt is presented when the Send WR completes. The * Send completion handler finally releases the Reply's pages. * * This mechanism also assumes that completions on the transport's Send @@ -98,16 +99,230 @@ * where two different Write segments send portions of the same page. */ -#include <linux/sunrpc/debug.h> -#include <linux/sunrpc/rpc_rdma.h> #include <linux/spinlock.h> #include <asm/unaligned.h> + #include <rdma/ib_verbs.h> #include <rdma/rdma_cm.h> + +#include <linux/sunrpc/debug.h> +#include <linux/sunrpc/rpc_rdma.h> #include <linux/sunrpc/svc_rdma.h> +#include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> + #define RPCDBG_FACILITY RPCDBG_SVCXPRT +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc); + +static inline struct svc_rdma_send_ctxt * +svc_rdma_next_send_ctxt(struct list_head *list) +{ + return list_first_entry_or_null(list, struct svc_rdma_send_ctxt, + sc_list); +} + +static struct svc_rdma_send_ctxt * +svc_rdma_send_ctxt_alloc(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + dma_addr_t addr; + void *buffer; + size_t size; + int i; + + size = sizeof(*ctxt); + size += rdma->sc_max_send_sges * sizeof(struct ib_sge); + ctxt = kmalloc(size, GFP_KERNEL); + if (!ctxt) + goto fail0; + buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL); + if (!buffer) + goto fail1; + addr = ib_dma_map_single(rdma->sc_pd->device, buffer, + rdma->sc_max_req_size, DMA_TO_DEVICE); + if (ib_dma_mapping_error(rdma->sc_pd->device, addr)) + goto fail2; + + ctxt->sc_send_wr.next = NULL; + ctxt->sc_send_wr.wr_cqe = &ctxt->sc_cqe; + ctxt->sc_send_wr.sg_list = ctxt->sc_sges; + ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED; + ctxt->sc_cqe.done = svc_rdma_wc_send; + ctxt->sc_xprt_buf = buffer; + ctxt->sc_sges[0].addr = addr; + + for (i = 0; i < rdma->sc_max_send_sges; i++) + ctxt->sc_sges[i].lkey = rdma->sc_pd->local_dma_lkey; + return ctxt; + +fail2: + kfree(buffer); +fail1: + kfree(ctxt); +fail0: + return NULL; +} + +/** + * svc_rdma_send_ctxts_destroy - Release all send_ctxt's for an xprt + * @rdma: svcxprt_rdma being torn down + * + */ +void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts))) { + list_del(&ctxt->sc_list); + ib_dma_unmap_single(rdma->sc_pd->device, + ctxt->sc_sges[0].addr, + rdma->sc_max_req_size, + DMA_TO_DEVICE); + kfree(ctxt->sc_xprt_buf); + kfree(ctxt); + } +} + +/** + * svc_rdma_send_ctxt_get - Get a free send_ctxt + * @rdma: controlling svcxprt_rdma + * + * Returns a ready-to-use send_ctxt, or NULL if none are + * available and a fresh one cannot be allocated. + */ +struct svc_rdma_send_ctxt *svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_send_ctxt *ctxt; + + spin_lock(&rdma->sc_send_lock); + ctxt = svc_rdma_next_send_ctxt(&rdma->sc_send_ctxts); + if (!ctxt) + goto out_empty; + list_del(&ctxt->sc_list); + spin_unlock(&rdma->sc_send_lock); + +out: + ctxt->sc_send_wr.num_sge = 0; + ctxt->sc_cur_sge_no = 0; + ctxt->sc_page_count = 0; + return ctxt; + +out_empty: + spin_unlock(&rdma->sc_send_lock); + ctxt = svc_rdma_send_ctxt_alloc(rdma); + if (!ctxt) + return NULL; + goto out; +} + +/** + * svc_rdma_send_ctxt_put - Return send_ctxt to free list + * @rdma: controlling svcxprt_rdma + * @ctxt: object to return to the free list + * + * Pages left in sc_pages are DMA unmapped and released. + */ +void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt) +{ + struct ib_device *device = rdma->sc_cm_id->device; + unsigned int i; + + /* The first SGE contains the transport header, which + * remains mapped until @ctxt is destroyed. + */ + for (i = 1; i < ctxt->sc_send_wr.num_sge; i++) + ib_dma_unmap_page(device, + ctxt->sc_sges[i].addr, + ctxt->sc_sges[i].length, + DMA_TO_DEVICE); + + for (i = 0; i < ctxt->sc_page_count; ++i) + put_page(ctxt->sc_pages[i]); + + spin_lock(&rdma->sc_send_lock); + list_add(&ctxt->sc_list, &rdma->sc_send_ctxts); + spin_unlock(&rdma->sc_send_lock); +} + +/** + * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC + * @cq: Completion Queue context + * @wc: Work Completion object + * + * NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that + * the Send completion handler could be running. + */ +static void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) +{ + struct svcxprt_rdma *rdma = cq->cq_context; + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_send_ctxt *ctxt; + + trace_svcrdma_wc_send(wc); + + atomic_inc(&rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + + ctxt = container_of(cqe, struct svc_rdma_send_ctxt, sc_cqe); + svc_rdma_send_ctxt_put(rdma, ctxt); + + if (unlikely(wc->status != IB_WC_SUCCESS)) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + svc_xprt_enqueue(&rdma->sc_xprt); + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: Send: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); + } + + svc_xprt_put(&rdma->sc_xprt); +} + +/** + * svc_rdma_send - Post a single Send WR + * @rdma: transport on which to post the WR + * @wr: prepared Send WR to post + * + * Returns zero the Send WR was posted successfully. Otherwise, a + * negative errno is returned. + */ +int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr) +{ + struct ib_send_wr *bad_wr; + int ret; + + might_sleep(); + + /* If the SQ is full, wait until an SQ entry is available */ + while (1) { + if ((atomic_dec_return(&rdma->sc_sq_avail) < 0)) { + atomic_inc(&rdma_stat_sq_starve); + trace_svcrdma_sq_full(rdma); + atomic_inc(&rdma->sc_sq_avail); + wait_event(rdma->sc_send_wait, + atomic_read(&rdma->sc_sq_avail) > 1); + if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) + return -ENOTCONN; + trace_svcrdma_sq_retry(rdma); + continue; + } + + svc_xprt_get(&rdma->sc_xprt); + ret = ib_post_send(rdma->sc_qp, wr, &bad_wr); + trace_svcrdma_post_send(wr, ret); + if (ret) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + svc_xprt_put(&rdma->sc_xprt); + wake_up(&rdma->sc_send_wait); + } + break; + } + return ret; +} + static u32 xdr_padsize(u32 len) { return (len & 3) ? (4 - (len & 3)) : 0; @@ -296,41 +511,10 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp, return be32_to_cpup(p); } -/* ib_dma_map_page() is used here because svc_rdma_dma_unmap() - * is used during completion to DMA-unmap this memory, and - * it uses ib_dma_unmap_page() exclusively. - */ -static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - unsigned int sge_no, - unsigned char *base, - unsigned int len) -{ - unsigned long offset = (unsigned long)base & ~PAGE_MASK; - struct ib_device *dev = rdma->sc_cm_id->device; - dma_addr_t dma_addr; - - dma_addr = ib_dma_map_page(dev, virt_to_page(base), - offset, len, DMA_TO_DEVICE); - if (ib_dma_mapping_error(dev, dma_addr)) - goto out_maperr; - - ctxt->sge[sge_no].addr = dma_addr; - ctxt->sge[sge_no].length = len; - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - svc_rdma_count_mappings(rdma, ctxt); - return 0; - -out_maperr: - pr_err("svcrdma: failed to map buffer\n"); - return -EIO; -} - static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - unsigned int sge_no, + struct svc_rdma_send_ctxt *ctxt, struct page *page, - unsigned int offset, + unsigned long offset, unsigned int len) { struct ib_device *dev = rdma->sc_cm_id->device; @@ -340,58 +524,71 @@ static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, if (ib_dma_mapping_error(dev, dma_addr)) goto out_maperr; - ctxt->sge[sge_no].addr = dma_addr; - ctxt->sge[sge_no].length = len; - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - svc_rdma_count_mappings(rdma, ctxt); + ctxt->sc_sges[ctxt->sc_cur_sge_no].addr = dma_addr; + ctxt->sc_sges[ctxt->sc_cur_sge_no].length = len; + ctxt->sc_send_wr.num_sge++; return 0; out_maperr: - pr_err("svcrdma: failed to map page\n"); + trace_svcrdma_dma_map_page(rdma, page); return -EIO; } +/* ib_dma_map_page() is used here because svc_rdma_dma_unmap() + * handles DMA-unmap and it uses ib_dma_unmap_page() exclusively. + */ +static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt, + unsigned char *base, + unsigned int len) +{ + return svc_rdma_dma_map_page(rdma, ctxt, virt_to_page(base), + offset_in_page(base), len); +} + /** - * svc_rdma_map_reply_hdr - DMA map the transport header buffer + * svc_rdma_sync_reply_hdr - DMA sync the transport header buffer * @rdma: controlling transport - * @ctxt: op_ctxt for the Send WR - * @rdma_resp: buffer containing transport header + * @ctxt: send_ctxt for the Send WR * @len: length of transport header * - * Returns: - * %0 if the header is DMA mapped, - * %-EIO if DMA mapping failed. */ -int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - __be32 *rdma_resp, - unsigned int len) +void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt, + unsigned int len) { - ctxt->direction = DMA_TO_DEVICE; - ctxt->pages[0] = virt_to_page(rdma_resp); - ctxt->count = 1; - return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len); + ctxt->sc_sges[0].length = len; + ctxt->sc_send_wr.num_sge++; + ib_dma_sync_single_for_device(rdma->sc_pd->device, + ctxt->sc_sges[0].addr, len, + DMA_TO_DEVICE); } -/* Load the xdr_buf into the ctxt's sge array, and DMA map each +/* svc_rdma_map_reply_msg - Map the buffer holding RPC message + * @rdma: controlling transport + * @ctxt: send_ctxt for the Send WR + * @xdr: prepared xdr_buf containing RPC message + * @wr_lst: pointer to Call header's Write list, or NULL + * + * Load the xdr_buf into the ctxt's sge array, and DMA map each * element as it is added. * - * Returns the number of sge elements loaded on success, or - * a negative errno on failure. + * Returns zero on success, or a negative errno on failure. */ -static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, - struct xdr_buf *xdr, __be32 *wr_lst) +int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_send_ctxt *ctxt, + struct xdr_buf *xdr, __be32 *wr_lst) { - unsigned int len, sge_no, remaining, page_off; + unsigned int len, remaining; + unsigned long page_off; struct page **ppages; unsigned char *base; u32 xdr_pad; int ret; - sge_no = 1; - - ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, + if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges) + return -EIO; + ret = svc_rdma_dma_map_buf(rdma, ctxt, xdr->head[0].iov_base, xdr->head[0].iov_len); if (ret < 0) @@ -421,8 +618,10 @@ static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, while (remaining) { len = min_t(u32, PAGE_SIZE - page_off, remaining); - ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++, - *ppages++, page_off, len); + if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges) + return -EIO; + ret = svc_rdma_dma_map_page(rdma, ctxt, *ppages++, + page_off, len); if (ret < 0) return ret; @@ -434,12 +633,14 @@ static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, len = xdr->tail[0].iov_len; tail: if (len) { - ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len); + if (++ctxt->sc_cur_sge_no >= rdma->sc_max_send_sges) + return -EIO; + ret = svc_rdma_dma_map_buf(rdma, ctxt, base, len); if (ret < 0) return ret; } - return sge_no - 1; + return 0; } /* The svc_rqst and all resources it owns are released as soon as @@ -447,62 +648,25 @@ tail: * so they are released by the Send completion handler. */ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *ctxt) + struct svc_rdma_send_ctxt *ctxt) { int i, pages = rqstp->rq_next_page - rqstp->rq_respages; - ctxt->count += pages; + ctxt->sc_page_count += pages; for (i = 0; i < pages; i++) { - ctxt->pages[i + 1] = rqstp->rq_respages[i]; + ctxt->sc_pages[i] = rqstp->rq_respages[i]; rqstp->rq_respages[i] = NULL; } rqstp->rq_next_page = rqstp->rq_respages + 1; } -/** - * svc_rdma_post_send_wr - Set up and post one Send Work Request - * @rdma: controlling transport - * @ctxt: op_ctxt for transmitting the Send WR - * @num_sge: number of SGEs to send - * @inv_rkey: R_key argument to Send With Invalidate, or zero - * - * Returns: - * %0 if the Send* was posted successfully, - * %-ENOTCONN if the connection was lost or dropped, - * %-EINVAL if there was a problem with the Send we built, - * %-ENOMEM if ib_post_send failed. - */ -int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, - struct svc_rdma_op_ctxt *ctxt, int num_sge, - u32 inv_rkey) -{ - struct ib_send_wr *send_wr = &ctxt->send_wr; - - dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge); - - send_wr->next = NULL; - ctxt->cqe.done = svc_rdma_wc_send; - send_wr->wr_cqe = &ctxt->cqe; - send_wr->sg_list = ctxt->sge; - send_wr->num_sge = num_sge; - send_wr->send_flags = IB_SEND_SIGNALED; - if (inv_rkey) { - send_wr->opcode = IB_WR_SEND_WITH_INV; - send_wr->ex.invalidate_rkey = inv_rkey; - } else { - send_wr->opcode = IB_WR_SEND; - } - - return svc_rdma_send(rdma, send_wr); -} - /* Prepare the portion of the RPC Reply that will be transmitted * via RDMA Send. The RPC-over-RDMA transport header is prepared - * in sge[0], and the RPC xdr_buf is prepared in following sges. + * in sc_sges[0], and the RPC xdr_buf is prepared in following sges. * * Depending on whether a Write list or Reply chunk is present, * the server may send all, a portion of, or none of the xdr_buf. - * In the latter case, only the transport header (sge[0]) is + * In the latter case, only the transport header (sc_sges[0]) is * transmitted. * * RDMA Send is the last step of transmitting an RPC reply. Pages @@ -515,49 +679,32 @@ int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, * - The Reply's transport header will never be larger than a page. */ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, - __be32 *rdma_argp, __be32 *rdma_resp, + struct svc_rdma_send_ctxt *ctxt, + __be32 *rdma_argp, struct svc_rqst *rqstp, __be32 *wr_lst, __be32 *rp_ch) { - struct svc_rdma_op_ctxt *ctxt; - u32 inv_rkey; int ret; - dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n", - (rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"), - rqstp->rq_res.head[0].iov_len, - rqstp->rq_res.page_len, - rqstp->rq_res.tail[0].iov_len); - - ctxt = svc_rdma_get_context(rdma); - - ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, - svc_rdma_reply_hdr_len(rdma_resp)); - if (ret < 0) - goto err; - if (!rp_ch) { ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqstp->rq_res, wr_lst); if (ret < 0) - goto err; + return ret; } svc_rdma_save_io_pages(rqstp, ctxt); - inv_rkey = 0; - if (rdma->sc_snd_w_inv) - inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch); - ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey); - if (ret) - goto err; - - return 0; - -err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - return ret; + ctxt->sc_send_wr.opcode = IB_WR_SEND; + if (rdma->sc_snd_w_inv) { + ctxt->sc_send_wr.ex.invalidate_rkey = + svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch); + if (ctxt->sc_send_wr.ex.invalidate_rkey) + ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV; + } + dprintk("svcrdma: posting Send WR with %u sge(s)\n", + ctxt->sc_send_wr.num_sge); + return svc_rdma_send(rdma, &ctxt->sc_send_wr); } /* Given the client-provided Write and Reply chunks, the server was not @@ -568,38 +715,29 @@ err: * Remote Invalidation is skipped for simplicity. */ static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, - __be32 *rdma_resp, struct svc_rqst *rqstp) + struct svc_rdma_send_ctxt *ctxt, + struct svc_rqst *rqstp) { - struct svc_rdma_op_ctxt *ctxt; __be32 *p; int ret; - ctxt = svc_rdma_get_context(rdma); - - /* Replace the original transport header with an - * RDMA_ERROR response. XID etc are preserved. - */ - p = rdma_resp + 3; + p = ctxt->sc_xprt_buf; + trace_svcrdma_err_chunk(*p); + p += 3; *p++ = rdma_error; *p = err_chunk; - - ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20); - if (ret < 0) - goto err; + svc_rdma_sync_reply_hdr(rdma, ctxt, RPCRDMA_HDRLEN_ERR); svc_rdma_save_io_pages(rqstp, ctxt); - ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0); - if (ret) - goto err; + ctxt->sc_send_wr.opcode = IB_WR_SEND; + ret = svc_rdma_send(rdma, &ctxt->sc_send_wr); + if (ret) { + svc_rdma_send_ctxt_put(rdma, ctxt); + return ret; + } return 0; - -err: - pr_err("svcrdma: failed to post Send WR (%d)\n", ret); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - return ret; } void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) @@ -623,20 +761,15 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) struct svc_xprt *xprt = rqstp->rq_xprt; struct svcxprt_rdma *rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); + struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt; __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch; struct xdr_buf *xdr = &rqstp->rq_res; - struct page *res_page; + struct svc_rdma_send_ctxt *sctxt; int ret; - /* Find the call's chunk lists to decide how to send the reply. - * Receive places the Call's xprt header at the start of page 0. - */ - rdma_argp = page_address(rqstp->rq_pages[0]); + rdma_argp = rctxt->rc_recv_buf; svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch); - dprintk("svcrdma: preparing response for XID 0x%08x\n", - be32_to_cpup(rdma_argp)); - /* Create the RDMA response header. xprt->xpt_mutex, * acquired in svc_send(), serializes RPC replies. The * code path below that inserts the credit grant value @@ -644,10 +777,10 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) * critical section. */ ret = -ENOMEM; - res_page = alloc_page(GFP_KERNEL); - if (!res_page) + sctxt = svc_rdma_send_ctxt_get(rdma); + if (!sctxt) goto err0; - rdma_resp = page_address(res_page); + rdma_resp = sctxt->sc_xprt_buf; p = rdma_resp; *p++ = *rdma_argp; @@ -674,26 +807,33 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); } - ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp, + svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp)); + ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp, wr_lst, rp_ch); if (ret < 0) - goto err0; - return 0; + goto err1; + ret = 0; + +out: + rqstp->rq_xprt_ctxt = NULL; + svc_rdma_recv_ctxt_put(rdma, rctxt); + return ret; err2: if (ret != -E2BIG && ret != -EINVAL) goto err1; - ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp); + ret = svc_rdma_send_error_msg(rdma, sctxt, rqstp); if (ret < 0) - goto err0; - return 0; + goto err1; + ret = 0; + goto out; err1: - put_page(res_page); + svc_rdma_send_ctxt_put(rdma, sctxt); err0: - pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n", - ret); + trace_svcrdma_send_failed(rqstp, ret); set_bit(XPT_CLOSE, &xprt->xpt_flags); - return -ENOTCONN; + ret = -ENOTCONN; + goto out; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 96cc8f6597d3..e9535a66bab0 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -1,4 +1,6 @@ +// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause /* + * Copyright (c) 2015-2018 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved. * @@ -40,26 +42,30 @@ * Author: Tom Tucker <tom@opengridcomputing.com> */ -#include <linux/sunrpc/svc_xprt.h> -#include <linux/sunrpc/addr.h> -#include <linux/sunrpc/debug.h> -#include <linux/sunrpc/rpc_rdma.h> #include <linux/interrupt.h> #include <linux/sched.h> #include <linux/slab.h> #include <linux/spinlock.h> #include <linux/workqueue.h> +#include <linux/export.h> + #include <rdma/ib_verbs.h> #include <rdma/rdma_cm.h> #include <rdma/rw.h> + +#include <linux/sunrpc/addr.h> +#include <linux/sunrpc/debug.h> +#include <linux/sunrpc/rpc_rdma.h> +#include <linux/sunrpc/svc_xprt.h> #include <linux/sunrpc/svc_rdma.h> -#include <linux/export.h> + #include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> #define RPCDBG_FACILITY RPCDBG_SVCXPRT -static int svc_rdma_post_recv(struct svcxprt_rdma *xprt); -static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int); +static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, + struct net *net); static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, struct net *net, struct sockaddr *sa, int salen, @@ -123,7 +129,7 @@ static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv, struct svcxprt_rdma *cma_xprt; struct svc_xprt *xprt; - cma_xprt = rdma_create_xprt(serv, 0); + cma_xprt = svc_rdma_create_xprt(serv, net); if (!cma_xprt) return ERR_PTR(-ENOMEM); xprt = &cma_xprt->sc_xprt; @@ -152,133 +158,20 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt) } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt, - gfp_t flags) -{ - struct svc_rdma_op_ctxt *ctxt; - - ctxt = kmalloc(sizeof(*ctxt), flags); - if (ctxt) { - ctxt->xprt = xprt; - INIT_LIST_HEAD(&ctxt->list); - } - return ctxt; -} - -static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt) -{ - unsigned int i; - - /* Each RPC/RDMA credit can consume one Receive and - * one Send WQE at the same time. - */ - i = xprt->sc_sq_depth + xprt->sc_rq_depth; - - while (i--) { - struct svc_rdma_op_ctxt *ctxt; - - ctxt = alloc_ctxt(xprt, GFP_KERNEL); - if (!ctxt) { - dprintk("svcrdma: No memory for RDMA ctxt\n"); - return false; - } - list_add(&ctxt->list, &xprt->sc_ctxts); - } - return true; -} - -struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) -{ - struct svc_rdma_op_ctxt *ctxt = NULL; - - spin_lock(&xprt->sc_ctxt_lock); - xprt->sc_ctxt_used++; - if (list_empty(&xprt->sc_ctxts)) - goto out_empty; - - ctxt = list_first_entry(&xprt->sc_ctxts, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - spin_unlock(&xprt->sc_ctxt_lock); - -out: - ctxt->count = 0; - ctxt->mapped_sges = 0; - return ctxt; - -out_empty: - /* Either pre-allocation missed the mark, or send - * queue accounting is broken. - */ - spin_unlock(&xprt->sc_ctxt_lock); - - ctxt = alloc_ctxt(xprt, GFP_NOIO); - if (ctxt) - goto out; - - spin_lock(&xprt->sc_ctxt_lock); - xprt->sc_ctxt_used--; - spin_unlock(&xprt->sc_ctxt_lock); - WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n"); - return NULL; -} - -void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt) -{ - struct svcxprt_rdma *xprt = ctxt->xprt; - struct ib_device *device = xprt->sc_cm_id->device; - unsigned int i; - - for (i = 0; i < ctxt->mapped_sges; i++) - ib_dma_unmap_page(device, - ctxt->sge[i].addr, - ctxt->sge[i].length, - ctxt->direction); - ctxt->mapped_sges = 0; -} - -void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) -{ - struct svcxprt_rdma *xprt = ctxt->xprt; - int i; - - if (free_pages) - for (i = 0; i < ctxt->count; i++) - put_page(ctxt->pages[i]); - - spin_lock(&xprt->sc_ctxt_lock); - xprt->sc_ctxt_used--; - list_add(&ctxt->list, &xprt->sc_ctxts); - spin_unlock(&xprt->sc_ctxt_lock); -} - -static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt) -{ - while (!list_empty(&xprt->sc_ctxts)) { - struct svc_rdma_op_ctxt *ctxt; - - ctxt = list_first_entry(&xprt->sc_ctxts, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - kfree(ctxt); - } -} - /* QP event handler */ static void qp_event_handler(struct ib_event *event, void *context) { struct svc_xprt *xprt = context; + trace_svcrdma_qp_error(event, (struct sockaddr *)&xprt->xpt_remote); switch (event->event) { /* These are considered benign events */ case IB_EVENT_PATH_MIG: case IB_EVENT_COMM_EST: case IB_EVENT_SQ_DRAINED: case IB_EVENT_QP_LAST_WQE_REACHED: - dprintk("svcrdma: QP event %s (%d) received for QP=%p\n", - ib_event_msg(event->event), event->event, - event->element.qp); break; + /* These are considered fatal events */ case IB_EVENT_PATH_MIG_ERR: case IB_EVENT_QP_FATAL: @@ -286,111 +179,34 @@ static void qp_event_handler(struct ib_event *event, void *context) case IB_EVENT_QP_ACCESS_ERR: case IB_EVENT_DEVICE_FATAL: default: - dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, " - "closing transport\n", - ib_event_msg(event->event), event->event, - event->element.qp); set_bit(XPT_CLOSE, &xprt->xpt_flags); svc_xprt_enqueue(xprt); break; } } -/** - * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC - * @cq: completion queue - * @wc: completed WR - * - */ -static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) -{ - struct svcxprt_rdma *xprt = cq->cq_context; - struct ib_cqe *cqe = wc->wr_cqe; - struct svc_rdma_op_ctxt *ctxt; - - /* WARNING: Only wc->wr_cqe and wc->status are reliable */ - ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); - svc_rdma_unmap_dma(ctxt); - - if (wc->status != IB_WC_SUCCESS) - goto flushed; - - /* All wc fields are now known to be valid */ - ctxt->byte_len = wc->byte_len; - spin_lock(&xprt->sc_rq_dto_lock); - list_add_tail(&ctxt->list, &xprt->sc_rq_dto_q); - spin_unlock(&xprt->sc_rq_dto_lock); - - svc_rdma_post_recv(xprt); - - set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); - if (test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) - goto out; - goto out_enqueue; - -flushed: - if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("svcrdma: Recv: %s (%u/0x%x)\n", - ib_wc_status_msg(wc->status), - wc->status, wc->vendor_err); - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - svc_rdma_put_context(ctxt, 1); - -out_enqueue: - svc_xprt_enqueue(&xprt->sc_xprt); -out: - svc_xprt_put(&xprt->sc_xprt); -} - -/** - * svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC - * @cq: completion queue - * @wc: completed WR - * - */ -void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) -{ - struct svcxprt_rdma *xprt = cq->cq_context; - struct ib_cqe *cqe = wc->wr_cqe; - struct svc_rdma_op_ctxt *ctxt; - - atomic_inc(&xprt->sc_sq_avail); - wake_up(&xprt->sc_send_wait); - - ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - - if (unlikely(wc->status != IB_WC_SUCCESS)) { - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - svc_xprt_enqueue(&xprt->sc_xprt); - if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("svcrdma: Send: %s (%u/0x%x)\n", - ib_wc_status_msg(wc->status), - wc->status, wc->vendor_err); - } - - svc_xprt_put(&xprt->sc_xprt); -} - -static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, - int listener) +static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, + struct net *net) { struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL); - if (!cma_xprt) + if (!cma_xprt) { + dprintk("svcrdma: failed to create new transport\n"); return NULL; - svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv); + } + svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv); INIT_LIST_HEAD(&cma_xprt->sc_accept_q); INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); - INIT_LIST_HEAD(&cma_xprt->sc_ctxts); + INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts); + INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock); - spin_lock_init(&cma_xprt->sc_ctxt_lock); + spin_lock_init(&cma_xprt->sc_send_lock); + spin_lock_init(&cma_xprt->sc_recv_lock); spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); /* @@ -401,70 +217,9 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, */ set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags); - if (listener) { - strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener"); - set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags); - } - return cma_xprt; } -static int -svc_rdma_post_recv(struct svcxprt_rdma *xprt) -{ - struct ib_recv_wr recv_wr, *bad_recv_wr; - struct svc_rdma_op_ctxt *ctxt; - struct page *page; - dma_addr_t pa; - int sge_no; - int buflen; - int ret; - - ctxt = svc_rdma_get_context(xprt); - buflen = 0; - ctxt->direction = DMA_FROM_DEVICE; - ctxt->cqe.done = svc_rdma_wc_receive; - for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) { - if (sge_no >= xprt->sc_max_sge) { - pr_err("svcrdma: Too many sges (%d)\n", sge_no); - goto err_put_ctxt; - } - page = alloc_page(GFP_KERNEL); - if (!page) - goto err_put_ctxt; - ctxt->pages[sge_no] = page; - pa = ib_dma_map_page(xprt->sc_cm_id->device, - page, 0, PAGE_SIZE, - DMA_FROM_DEVICE); - if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa)) - goto err_put_ctxt; - svc_rdma_count_mappings(xprt, ctxt); - ctxt->sge[sge_no].addr = pa; - ctxt->sge[sge_no].length = PAGE_SIZE; - ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; - ctxt->count = sge_no + 1; - buflen += PAGE_SIZE; - } - recv_wr.next = NULL; - recv_wr.sg_list = &ctxt->sge[0]; - recv_wr.num_sge = ctxt->count; - recv_wr.wr_cqe = &ctxt->cqe; - - svc_xprt_get(&xprt->sc_xprt); - ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); - if (ret) { - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - svc_xprt_put(&xprt->sc_xprt); - } - return ret; - - err_put_ctxt: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - return -ENOMEM; -} - static void svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt, struct rdma_conn_param *param) @@ -504,15 +259,12 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id, struct sockaddr *sa; /* Create a new transport */ - newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0); - if (!newxprt) { - dprintk("svcrdma: failed to create new transport\n"); + newxprt = svc_rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, + listen_xprt->sc_xprt.xpt_net); + if (!newxprt) return; - } newxprt->sc_cm_id = new_cma_id; new_cma_id->context = newxprt; - dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", - newxprt, newxprt->sc_cm_id, listen_xprt); svc_rdma_parse_connect_private(newxprt, param); /* Save client advertised inbound read limit for use later in accept. */ @@ -543,9 +295,11 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id, static int rdma_listen_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event) { - struct svcxprt_rdma *xprt = cma_id->context; + struct sockaddr *sap = (struct sockaddr *)&cma_id->route.addr.src_addr; int ret = 0; + trace_svcrdma_cm_event(event, sap); + switch (event->event) { case RDMA_CM_EVENT_CONNECT_REQUEST: dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, " @@ -553,23 +307,8 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id, rdma_event_msg(event->event), event->event); handle_connect_req(cma_id, &event->param.conn); break; - - case RDMA_CM_EVENT_ESTABLISHED: - /* Accept complete */ - dprintk("svcrdma: Connection completed on LISTEN xprt=%p, " - "cm_id=%p\n", xprt, cma_id); - break; - - case RDMA_CM_EVENT_DEVICE_REMOVAL: - dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n", - xprt, cma_id); - if (xprt) { - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - svc_xprt_enqueue(&xprt->sc_xprt); - } - break; - default: + /* NB: No device removal upcall for INADDR_ANY listeners */ dprintk("svcrdma: Unexpected event on listening endpoint %p, " "event = %s (%d)\n", cma_id, rdma_event_msg(event->event), event->event); @@ -582,9 +321,12 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id, static int rdma_cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event) { - struct svc_xprt *xprt = cma_id->context; - struct svcxprt_rdma *rdma = - container_of(xprt, struct svcxprt_rdma, sc_xprt); + struct sockaddr *sap = (struct sockaddr *)&cma_id->route.addr.dst_addr; + struct svcxprt_rdma *rdma = cma_id->context; + struct svc_xprt *xprt = &rdma->sc_xprt; + + trace_svcrdma_cm_event(event, sap); + switch (event->event) { case RDMA_CM_EVENT_ESTABLISHED: /* Accept complete */ @@ -597,21 +339,17 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id, case RDMA_CM_EVENT_DISCONNECTED: dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n", xprt, cma_id); - if (xprt) { - set_bit(XPT_CLOSE, &xprt->xpt_flags); - svc_xprt_enqueue(xprt); - svc_xprt_put(xprt); - } + set_bit(XPT_CLOSE, &xprt->xpt_flags); + svc_xprt_enqueue(xprt); + svc_xprt_put(xprt); break; case RDMA_CM_EVENT_DEVICE_REMOVAL: dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, " "event = %s (%d)\n", cma_id, xprt, rdma_event_msg(event->event), event->event); - if (xprt) { - set_bit(XPT_CLOSE, &xprt->xpt_flags); - svc_xprt_enqueue(xprt); - svc_xprt_put(xprt); - } + set_bit(XPT_CLOSE, &xprt->xpt_flags); + svc_xprt_enqueue(xprt); + svc_xprt_put(xprt); break; default: dprintk("svcrdma: Unexpected event on DTO endpoint %p, " @@ -634,16 +372,18 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, struct svcxprt_rdma *cma_xprt; int ret; - dprintk("svcrdma: Creating RDMA socket\n"); + dprintk("svcrdma: Creating RDMA listener\n"); if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) { dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family); return ERR_PTR(-EAFNOSUPPORT); } - cma_xprt = rdma_create_xprt(serv, 1); + cma_xprt = svc_rdma_create_xprt(serv, net); if (!cma_xprt) return ERR_PTR(-ENOMEM); + set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags); + strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener"); - listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt, + listen_id = rdma_create_id(net, rdma_listen_handler, cma_xprt, RDMA_PS_TCP, IB_QPT_RC); if (IS_ERR(listen_id)) { ret = PTR_ERR(listen_id); @@ -708,9 +448,9 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) struct rdma_conn_param conn_param; struct rpcrdma_connect_private pmsg; struct ib_qp_init_attr qp_attr; + unsigned int ctxts, rq_depth; struct ib_device *dev; struct sockaddr *sap; - unsigned int i, ctxts; int ret = 0; listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); @@ -736,24 +476,28 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) /* Qualify the transport resource defaults with the * capabilities of this particular device */ - newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge, - (size_t)RPCSVC_MAXPAGES); + newxprt->sc_max_send_sges = dev->attrs.max_sge; + /* transport hdr, head iovec, one page list entry, tail iovec */ + if (newxprt->sc_max_send_sges < 4) { + pr_err("svcrdma: too few Send SGEs available (%d)\n", + newxprt->sc_max_send_sges); + goto errout; + } newxprt->sc_max_req_size = svcrdma_max_req_size; newxprt->sc_max_requests = svcrdma_max_requests; newxprt->sc_max_bc_requests = svcrdma_max_bc_requests; - newxprt->sc_rq_depth = newxprt->sc_max_requests + - newxprt->sc_max_bc_requests; - if (newxprt->sc_rq_depth > dev->attrs.max_qp_wr) { + rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests; + if (rq_depth > dev->attrs.max_qp_wr) { pr_warn("svcrdma: reducing receive depth to %d\n", dev->attrs.max_qp_wr); - newxprt->sc_rq_depth = dev->attrs.max_qp_wr; - newxprt->sc_max_requests = newxprt->sc_rq_depth - 2; + rq_depth = dev->attrs.max_qp_wr; + newxprt->sc_max_requests = rq_depth - 2; newxprt->sc_max_bc_requests = 2; } newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests); ctxts = rdma_rw_mr_factor(dev, newxprt->sc_port_num, RPCSVC_MAXPAGES); ctxts *= newxprt->sc_max_requests; - newxprt->sc_sq_depth = newxprt->sc_rq_depth + ctxts; + newxprt->sc_sq_depth = rq_depth + ctxts; if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr) { pr_warn("svcrdma: reducing send depth to %d\n", dev->attrs.max_qp_wr); @@ -761,9 +505,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) } atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); - if (!svc_rdma_prealloc_ctxts(newxprt)) - goto errout; - newxprt->sc_pd = ib_alloc_pd(dev, 0); if (IS_ERR(newxprt->sc_pd)) { dprintk("svcrdma: error creating PD for connect request\n"); @@ -775,7 +516,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dprintk("svcrdma: error creating SQ CQ for connect request\n"); goto errout; } - newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, newxprt->sc_rq_depth, + newxprt->sc_rq_cq = ib_alloc_cq(dev, newxprt, rq_depth, 0, IB_POLL_WORKQUEUE); if (IS_ERR(newxprt->sc_rq_cq)) { dprintk("svcrdma: error creating RQ CQ for connect request\n"); @@ -788,9 +529,9 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) qp_attr.port_num = newxprt->sc_port_num; qp_attr.cap.max_rdma_ctxs = ctxts; qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts; - qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; - qp_attr.cap.max_send_sge = newxprt->sc_max_sge; - qp_attr.cap.max_recv_sge = newxprt->sc_max_sge; + qp_attr.cap.max_recv_wr = rq_depth; + qp_attr.cap.max_send_sge = newxprt->sc_max_send_sges; + qp_attr.cap.max_recv_sge = 1; qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR; qp_attr.qp_type = IB_QPT_RC; qp_attr.send_cq = newxprt->sc_sq_cq; @@ -815,14 +556,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) !rdma_ib_or_roce(dev, newxprt->sc_port_num)) goto errout; - /* Post receive buffers */ - for (i = 0; i < newxprt->sc_max_requests; i++) { - ret = svc_rdma_post_recv(newxprt); - if (ret) { - dprintk("svcrdma: failure posting receive buffers\n"); - goto errout; - } - } + if (!svc_rdma_post_recvs(newxprt)) + goto errout; /* Swap out the handler */ newxprt->sc_cm_id->event_handler = rdma_cma_handler; @@ -856,16 +591,18 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dprintk(" local address : %pIS:%u\n", sap, rpc_get_port(sap)); sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr; dprintk(" remote address : %pIS:%u\n", sap, rpc_get_port(sap)); - dprintk(" max_sge : %d\n", newxprt->sc_max_sge); + dprintk(" max_sge : %d\n", newxprt->sc_max_send_sges); dprintk(" sq_depth : %d\n", newxprt->sc_sq_depth); dprintk(" rdma_rw_ctxs : %d\n", ctxts); dprintk(" max_requests : %d\n", newxprt->sc_max_requests); dprintk(" ord : %d\n", conn_param.initiator_depth); + trace_svcrdma_xprt_accept(&newxprt->sc_xprt); return &newxprt->sc_xprt; errout: dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret); + trace_svcrdma_xprt_fail(&newxprt->sc_xprt); /* Take a reference in case the DTO handler runs */ svc_xprt_get(&newxprt->sc_xprt); if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) @@ -896,7 +633,6 @@ static void svc_rdma_detach(struct svc_xprt *xprt) { struct svcxprt_rdma *rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); - dprintk("svc: svc_rdma_detach(%p)\n", xprt); /* Disconnect and flush posted WQE */ rdma_disconnect(rdma->sc_cm_id); @@ -908,7 +644,7 @@ static void __svc_rdma_free(struct work_struct *work) container_of(work, struct svcxprt_rdma, sc_work); struct svc_xprt *xprt = &rdma->sc_xprt; - dprintk("svcrdma: %s(%p)\n", __func__, rdma); + trace_svcrdma_xprt_free(xprt); if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) ib_drain_qp(rdma->sc_qp); @@ -918,25 +654,7 @@ static void __svc_rdma_free(struct work_struct *work) pr_err("svcrdma: sc_xprt still in use? (%d)\n", kref_read(&xprt->xpt_ref)); - while (!list_empty(&rdma->sc_read_complete_q)) { - struct svc_rdma_op_ctxt *ctxt; - ctxt = list_first_entry(&rdma->sc_read_complete_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - svc_rdma_put_context(ctxt, 1); - } - while (!list_empty(&rdma->sc_rq_dto_q)) { - struct svc_rdma_op_ctxt *ctxt; - ctxt = list_first_entry(&rdma->sc_rq_dto_q, - struct svc_rdma_op_ctxt, list); - list_del(&ctxt->list); - svc_rdma_put_context(ctxt, 1); - } - - /* Warn if we leaked a resource or under-referenced */ - if (rdma->sc_ctxt_used != 0) - pr_err("svcrdma: ctxt still in use? (%d)\n", - rdma->sc_ctxt_used); + svc_rdma_flush_recv_queues(rdma); /* Final put of backchannel client transport */ if (xprt->xpt_bc_xprt) { @@ -945,7 +663,8 @@ static void __svc_rdma_free(struct work_struct *work) } svc_rdma_destroy_rw_ctxts(rdma); - svc_rdma_destroy_ctxts(rdma); + svc_rdma_send_ctxts_destroy(rdma); + svc_rdma_recv_ctxts_destroy(rdma); /* Destroy the QP if present (not a listener) */ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) @@ -998,51 +717,3 @@ static void svc_rdma_secure_port(struct svc_rqst *rqstp) static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt) { } - -int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) -{ - struct ib_send_wr *bad_wr, *n_wr; - int wr_count; - int i; - int ret; - - if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) - return -ENOTCONN; - - wr_count = 1; - for (n_wr = wr->next; n_wr; n_wr = n_wr->next) - wr_count++; - - /* If the SQ is full, wait until an SQ entry is available */ - while (1) { - if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) { - atomic_inc(&rdma_stat_sq_starve); - - /* Wait until SQ WR available if SQ still full */ - atomic_add(wr_count, &xprt->sc_sq_avail); - wait_event(xprt->sc_send_wait, - atomic_read(&xprt->sc_sq_avail) > wr_count); - if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) - return -ENOTCONN; - continue; - } - /* Take a transport ref for each WR posted */ - for (i = 0; i < wr_count; i++) - svc_xprt_get(&xprt->sc_xprt); - - /* Bump used SQ WR count and post */ - ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); - if (ret) { - set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); - for (i = 0; i < wr_count; i ++) - svc_xprt_put(&xprt->sc_xprt); - dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret); - dprintk(" sc_sq_avail=%d, sc_sq_depth=%d\n", - atomic_read(&xprt->sc_sq_avail), - xprt->sc_sq_depth); - wake_up(&xprt->sc_send_wait); - } - break; - } - return ret; -} diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index cc1aad325496..caca977e3755 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -51,9 +51,13 @@ #include <linux/module.h> #include <linux/slab.h> #include <linux/seq_file.h> +#include <linux/smp.h> + #include <linux/sunrpc/addr.h> +#include <linux/sunrpc/svc_rdma.h> #include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) # define RPCDBG_FACILITY RPCDBG_TRANS diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index c345d365af88..7f913ece5038 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -59,6 +59,7 @@ #include <rdma/ib_cm.h> #include "xprt_rdma.h" +#include <trace/events/rpcrdma.h> /* * Globals/Macros diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index cb41b12a3bf8..f845b71793e2 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -675,5 +675,3 @@ void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); extern struct xprt_class xprt_rdma_bc; #endif /* _LINUX_SUNRPC_XPRT_RDMA_H */ - -#include <trace/events/rpcrdma.h> |