From 17f5f7f506aaca985b95df7ef7fc2ff49c36a8e9 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:05:36 -0400 Subject: svcrdma: Move send_wr to svc_rdma_op_ctxt Clean up: Move the ib_send_wr off the stack, and move common code to post a Send Work Request into a helper. This is a refactoring change only. Signed-off-by: Chuck Lever Signed-off-by: J. Bruce Fields --- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 11 +---- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 64 ++++++++++++++++++------------ 2 files changed, 40 insertions(+), 35 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index ff1df40f0d26..f12f39c189c3 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -104,7 +104,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, struct xdr_buf *sndbuf = &rqst->rq_snd_buf; struct svc_rdma_op_ctxt *ctxt; struct svc_rdma_req_map *vec; - struct ib_send_wr send_wr; int ret; vec = svc_rdma_get_req_map(rdma); @@ -132,15 +131,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, } svc_rdma_count_mappings(rdma, ctxt); - memset(&send_wr, 0, sizeof(send_wr)); - ctxt->cqe.done = svc_rdma_wc_send; - send_wr.wr_cqe = &ctxt->cqe; - send_wr.sg_list = ctxt->sge; - send_wr.num_sge = 1; - send_wr.opcode = IB_WR_SEND; - send_wr.send_flags = IB_SEND_SIGNALED; - - ret = svc_rdma_send(rdma, &send_wr); + ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0); if (ret) { ret = -EIO; goto out_unmap; diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 515221b16d09..f90b40d0932f 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -435,6 +435,43 @@ out_err: return -EIO; } +/** + * svc_rdma_post_send_wr - Set up and post one Send Work Request + * @rdma: controlling transport + * @ctxt: op_ctxt for transmitting the Send WR + * @num_sge: number of SGEs to send + * @inv_rkey: R_key argument to Send With Invalidate, or zero + * + * Returns: + * %0 if the Send* was posted successfully, + * %-ENOTCONN if the connection was lost or dropped, + * %-EINVAL if there was a problem with the Send we built, + * %-ENOMEM if ib_post_send failed. + */ +int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, int num_sge, + u32 inv_rkey) +{ + struct ib_send_wr *send_wr = &ctxt->send_wr; + + dprintk("svcrdma: posting Send WR with %u sge(s)\n", num_sge); + + send_wr->next = NULL; + ctxt->cqe.done = svc_rdma_wc_send; + send_wr->wr_cqe = &ctxt->cqe; + send_wr->sg_list = ctxt->sge; + send_wr->num_sge = num_sge; + send_wr->send_flags = IB_SEND_SIGNALED; + if (inv_rkey) { + send_wr->opcode = IB_WR_SEND_WITH_INV; + send_wr->ex.invalidate_rkey = inv_rkey; + } else { + send_wr->opcode = IB_WR_SEND; + } + + return svc_rdma_send(rdma, send_wr); +} + /* This function prepares the portion of the RPCRDMA message to be * sent in the RDMA_SEND. This function is called after data sent via * RDMA has already been transmitted. There are three cases: @@ -460,7 +497,6 @@ static int send_reply(struct svcxprt_rdma *rdma, u32 inv_rkey) { struct svc_rdma_op_ctxt *ctxt; - struct ib_send_wr send_wr; u32 xdr_off; int sge_no; int sge_bytes; @@ -524,19 +560,8 @@ static int send_reply(struct svcxprt_rdma *rdma, pr_err("svcrdma: Too many sges (%d)\n", sge_no); goto err; } - memset(&send_wr, 0, sizeof send_wr); - ctxt->cqe.done = svc_rdma_wc_send; - send_wr.wr_cqe = &ctxt->cqe; - send_wr.sg_list = ctxt->sge; - send_wr.num_sge = sge_no; - if (inv_rkey) { - send_wr.opcode = IB_WR_SEND_WITH_INV; - send_wr.ex.invalidate_rkey = inv_rkey; - } else - send_wr.opcode = IB_WR_SEND; - send_wr.send_flags = IB_SEND_SIGNALED; - ret = svc_rdma_send(rdma, &send_wr); + ret = svc_rdma_post_send_wr(rdma, ctxt, sge_no, inv_rkey); if (ret) goto err; @@ -652,7 +677,6 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, int status) { - struct ib_send_wr err_wr; struct page *p; struct svc_rdma_op_ctxt *ctxt; enum rpcrdma_errcode err; @@ -692,17 +716,7 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, } svc_rdma_count_mappings(xprt, ctxt); - /* Prepare SEND WR */ - memset(&err_wr, 0, sizeof(err_wr)); - ctxt->cqe.done = svc_rdma_wc_send; - err_wr.wr_cqe = &ctxt->cqe; - err_wr.sg_list = ctxt->sge; - err_wr.num_sge = 1; - err_wr.opcode = IB_WR_SEND; - err_wr.send_flags = IB_SEND_SIGNALED; - - /* Post It */ - ret = svc_rdma_send(xprt, &err_wr); + ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0); if (ret) { dprintk("svcrdma: Error %d posting send for protocol error\n", ret); -- cgit v1.2.3 From 6e6092ca305ad785c605d7e313727aad96c228a5 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:05:44 -0400 Subject: svcrdma: Add svc_rdma_map_reply_hdr() Introduce a helper to DMA-map a reply's transport header before sending it. This will in part replace the map vector cache. Signed-off-by: Chuck Lever Reviewed-by: Christoph Hellwig Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 3 ++ net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 36 ++++++------------ net/sunrpc/xprtrdma/svc_rdma_sendto.c | 61 +++++++++++++++++++++++------- 3 files changed, 62 insertions(+), 38 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 287db5c179d8..002a46d1faa1 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -228,6 +228,9 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *, /* svc_rdma_sendto.c */ extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *, struct svc_rdma_req_map *, bool); +extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + __be32 *rdma_resp, unsigned int len); extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, struct svc_rdma_op_ctxt *ctxt, int num_sge, u32 inv_rkey); diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index f12f39c189c3..0305b33d482f 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -101,50 +101,36 @@ out_notfound: static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) { - struct xdr_buf *sndbuf = &rqst->rq_snd_buf; struct svc_rdma_op_ctxt *ctxt; - struct svc_rdma_req_map *vec; int ret; - vec = svc_rdma_get_req_map(rdma); - ret = svc_rdma_map_xdr(rdma, sndbuf, vec, false); - if (ret) + ctxt = svc_rdma_get_context(rdma); + + /* rpcrdma_bc_send_request builds the transport header and + * the backchannel RPC message in the same buffer. Thus only + * one SGE is needed to send both. + */ + ret = svc_rdma_map_reply_hdr(rdma, ctxt, rqst->rq_buffer, + rqst->rq_snd_buf.len); + if (ret < 0) goto out_err; ret = svc_rdma_repost_recv(rdma, GFP_NOIO); if (ret) goto out_err; - ctxt = svc_rdma_get_context(rdma); - ctxt->pages[0] = virt_to_page(rqst->rq_buffer); - ctxt->count = 1; - - ctxt->direction = DMA_TO_DEVICE; - ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->sge[0].length = sndbuf->len; - ctxt->sge[0].addr = - ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0, - sndbuf->len, DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) { - ret = -EIO; - goto out_unmap; - } - svc_rdma_count_mappings(rdma, ctxt); - ret = svc_rdma_post_send_wr(rdma, ctxt, 1, 0); - if (ret) { - ret = -EIO; + if (ret) goto out_unmap; - } out_err: - svc_rdma_put_req_map(rdma, vec); dprintk("svcrdma: %s returns %d\n", __func__, ret); return ret; out_unmap: svc_rdma_unmap_dma(ctxt); svc_rdma_put_context(ctxt, 1); + ret = -EIO; goto out_err; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index f90b40d0932f..a7dc71daa776 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -217,6 +217,49 @@ static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp, return 0; } +static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + unsigned int sge_no, + struct page *page, + unsigned int offset, + unsigned int len) +{ + struct ib_device *dev = rdma->sc_cm_id->device; + dma_addr_t dma_addr; + + dma_addr = ib_dma_map_page(dev, page, offset, len, DMA_TO_DEVICE); + if (ib_dma_mapping_error(dev, dma_addr)) + return -EIO; + + ctxt->sge[sge_no].addr = dma_addr; + ctxt->sge[sge_no].length = len; + ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; + svc_rdma_count_mappings(rdma, ctxt); + return 0; +} + +/** + * svc_rdma_map_reply_hdr - DMA map the transport header buffer + * @rdma: controlling transport + * @ctxt: op_ctxt for the Send WR + * @rdma_resp: buffer containing transport header + * @len: length of transport header + * + * Returns: + * %0 if the header is DMA mapped, + * %-EIO if DMA mapping failed. + */ +int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + __be32 *rdma_resp, + unsigned int len) +{ + ctxt->direction = DMA_TO_DEVICE; + ctxt->pages[0] = virt_to_page(rdma_resp); + ctxt->count = 1; + return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len); +} + /* Assumptions: * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE */ @@ -699,22 +742,14 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, err = ERR_VERS; length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va); + /* Map transport header; no RPC message payload */ ctxt = svc_rdma_get_context(xprt); - ctxt->direction = DMA_TO_DEVICE; - ctxt->count = 1; - ctxt->pages[0] = p; - - /* Prepare SGE for local address */ - ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey; - ctxt->sge[0].length = length; - ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device, - p, 0, length, DMA_TO_DEVICE); - if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) { - dprintk("svcrdma: Error mapping buffer for protocol error\n"); - svc_rdma_put_context(ctxt, 1); + ret = svc_rdma_map_reply_hdr(xprt, ctxt, &rmsgp->rm_xid, length); + if (ret) { + dprintk("svcrdma: Error %d mapping send for protocol error\n", + ret); return; } - svc_rdma_count_mappings(xprt, ctxt); ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0); if (ret) { -- cgit v1.2.3 From b623589dbacbc786c2fffc85113a1dc1a331e2ca Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:05:52 -0400 Subject: svcrdma: Eliminate RPCRDMA_SQ_DEPTH_MULT The Send Queue depth is temporarily reduced to 1 SQE per credit. The new rdma_rw API does an internal computation, during QP creation, to increase the depth of the Send Queue to handle RDMA Read and Write operations. This change has to come before the NFSD code paths are updated to use the rdma_rw API. Without this patch, rdma_rw_init_qp() increases the size of the SQ too much, resulting in memory allocation failures during QP creation. Signed-off-by: Chuck Lever Reviewed-by: Christoph Hellwig Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 1 - net/sunrpc/xprtrdma/svc_rdma.c | 2 -- net/sunrpc/xprtrdma/svc_rdma_transport.c | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 002a46d1faa1..11d5aa123f17 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -182,7 +182,6 @@ struct svcxprt_rdma { /* The default ORD value is based on two outstanding full-size writes with a * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ. */ #define RPCRDMA_ORD (64/4) -#define RPCRDMA_SQ_DEPTH_MULT 8 #define RPCRDMA_MAX_REQUESTS 32 #define RPCRDMA_MAX_REQ_SIZE 4096 diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index c846ca9f1eba..912444174647 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -247,8 +247,6 @@ int svc_rdma_init(void) dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); dprintk("\tmax_requests : %u\n", svcrdma_max_requests); - dprintk("\tsq_depth : %u\n", - svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT); dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index fc8f14c7bfec..e1097cc6d1eb 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -1014,7 +1014,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) svcrdma_max_bc_requests); newxprt->sc_rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests; - newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth; + newxprt->sc_sq_depth = newxprt->sc_rq_depth; atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); if (!svc_rdma_prealloc_ctxts(newxprt)) -- cgit v1.2.3 From c55ab0707b2817046ad48d6c87a6b764119a2458 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:00 -0400 Subject: svcrdma: Add helper to save pages under I/O Clean up: extract the logic to save pages under I/O into a helper to add a big documenting comment without adding clutter in the send path. This is a refactoring change only. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Reviewed-by: Christoph Hellwig Signed-off-by: J. Bruce Fields --- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index a7dc71daa776..2798f3ea0020 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -478,6 +478,23 @@ out_err: return -EIO; } +/* The svc_rqst and all resources it owns are released as soon as + * svc_rdma_sendto returns. Transfer pages under I/O to the ctxt + * so they are released by the Send completion handler. + */ +static void svc_rdma_save_io_pages(struct svc_rqst *rqstp, + struct svc_rdma_op_ctxt *ctxt) +{ + int i, pages = rqstp->rq_next_page - rqstp->rq_respages; + + ctxt->count += pages; + for (i = 0; i < pages; i++) { + ctxt->pages[i + 1] = rqstp->rq_respages[i]; + rqstp->rq_respages[i] = NULL; + } + rqstp->rq_next_page = rqstp->rq_respages + 1; +} + /** * svc_rdma_post_send_wr - Set up and post one Send Work Request * @rdma: controlling transport @@ -543,8 +560,6 @@ static int send_reply(struct svcxprt_rdma *rdma, u32 xdr_off; int sge_no; int sge_bytes; - int page_no; - int pages; int ret = -EIO; /* Prepare the context */ @@ -587,17 +602,7 @@ static int send_reply(struct svcxprt_rdma *rdma, goto err; } - /* Save all respages in the ctxt and remove them from the - * respages array. They are our pages until the I/O - * completes. - */ - pages = rqstp->rq_next_page - rqstp->rq_respages; - for (page_no = 0; page_no < pages; page_no++) { - ctxt->pages[page_no+1] = rqstp->rq_respages[page_no]; - ctxt->count++; - rqstp->rq_respages[page_no] = NULL; - } - rqstp->rq_next_page = rqstp->rq_respages + 1; + svc_rdma_save_io_pages(rqstp, ctxt); if (sge_no > rdma->sc_max_sge) { pr_err("svcrdma: Too many sges (%d)\n", sge_no); -- cgit v1.2.3 From c238c4c034f857d12d7efbf9934d96b8bb68fbc7 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:08 -0400 Subject: svcrdma: Clean up svc_rdma_get_inv_rkey() Replace C structure-based XDR decoding with more portable code that instead uses pointer arithmetic. This is a refactoring change only. Signed-off-by: Chuck Lever Signed-off-by: J. Bruce Fields --- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 39 +++++++++++++++-------------------- 1 file changed, 17 insertions(+), 22 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 2798f3ea0020..2eb3df698e11 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -189,32 +189,25 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, * Invalidate, and responder chooses one rkey to invalidate. * * Find a candidate rkey to invalidate when sending a reply. Picks the - * first rkey it finds in the chunks lists. + * first R_key it finds in the chunk lists. * * Returns zero if RPC's chunk lists are empty. */ -static u32 svc_rdma_get_inv_rkey(struct rpcrdma_msg *rdma_argp, - struct rpcrdma_write_array *wr_ary, - struct rpcrdma_write_array *rp_ary) +static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp, + __be32 *wr_lst, __be32 *rp_ch) { - struct rpcrdma_read_chunk *rd_ary; - struct rpcrdma_segment *arg_ch; - - rd_ary = (struct rpcrdma_read_chunk *)&rdma_argp->rm_body.rm_chunks[0]; - if (rd_ary->rc_discrim != xdr_zero) - return be32_to_cpu(rd_ary->rc_target.rs_handle); - - if (wr_ary && be32_to_cpu(wr_ary->wc_nchunks)) { - arg_ch = &wr_ary->wc_array[0].wc_target; - return be32_to_cpu(arg_ch->rs_handle); - } - - if (rp_ary && be32_to_cpu(rp_ary->wc_nchunks)) { - arg_ch = &rp_ary->wc_array[0].wc_target; - return be32_to_cpu(arg_ch->rs_handle); - } + __be32 *p; - return 0; + p = rdma_argp + rpcrdma_fixed_maxsz; + if (*p != xdr_zero) + p += 2; + else if (wr_lst && be32_to_cpup(wr_lst + 1)) + p = wr_lst + 2; + else if (rp_ch && be32_to_cpup(rp_ch + 1)) + p = rp_ch + 2; + else + return 0; + return be32_to_cpup(p); } static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, @@ -650,7 +643,9 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) inv_rkey = 0; if (rdma->sc_snd_w_inv) - inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_ary, rp_ary); + inv_rkey = svc_rdma_get_inv_rkey(&rdma_argp->rm_xid, + (__be32 *)wr_ary, + (__be32 *)rp_ary); /* Build an req vec for the XDR */ vec = svc_rdma_get_req_map(rdma); -- cgit v1.2.3 From f13193f50b64e2e0c87706b838d6b9895626a892 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:16 -0400 Subject: svcrdma: Introduce local rdma_rw API helpers The plan is to replace the local bespoke code that constructs and posts RDMA Read and Write Work Requests with calls to the rdma_rw API. This shares code with other RDMA-enabled ULPs that manages the gory details of buffer registration and posting Work Requests. Some design notes: o The structure of RPC-over-RDMA transport headers is flexible, allowing multiple segments per Reply with arbitrary alignment, each with a unique R_key. Write and Send WRs continue to be built and posted in separate code paths. However, one whole chunk (with one or more RDMA segments apiece) gets exactly one ib_post_send and one work completion. o svc_xprt reference counting is modified, since a chain of rdma_rw_ctx structs generates one completion, no matter how many Write WRs are posted. o The current code builds the transport header as it is construct- ing Write WRs. I've replaced that with marshaling of transport header data items in a separate step. This is because the exact structure of client-provided segments may not align with the components of the server's reply xdr_buf, or the pages in the page list. Thus parts of each client-provided segment may be written at different points in the send path. Signed-off-by: Chuck Lever Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 11 + net/sunrpc/Kconfig | 1 + net/sunrpc/xprtrdma/Makefile | 2 +- net/sunrpc/xprtrdma/svc_rdma_rw.c | 512 +++++++++++++++++++++++++++++++ net/sunrpc/xprtrdma/svc_rdma_transport.c | 4 + 5 files changed, 529 insertions(+), 1 deletion(-) create mode 100644 net/sunrpc/xprtrdma/svc_rdma_rw.c (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 11d5aa123f17..ca08671fb7e2 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -145,12 +145,15 @@ struct svcxprt_rdma { u32 sc_max_requests; /* Max requests */ u32 sc_max_bc_requests;/* Backward credits */ int sc_max_req_size; /* Size of each RQ WR buf */ + u8 sc_port_num; struct ib_pd *sc_pd; spinlock_t sc_ctxt_lock; struct list_head sc_ctxts; int sc_ctxt_used; + spinlock_t sc_rw_ctxt_lock; + struct list_head sc_rw_ctxts; spinlock_t sc_map_lock; struct list_head sc_maps; @@ -224,6 +227,14 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *, struct svc_rdma_op_ctxt *, int *, u32 *, u32, u32, u64, bool); +/* svc_rdma_rw.c */ +extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma); +extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, + __be32 *wr_ch, struct xdr_buf *xdr); +extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, + __be32 *rp_ch, bool writelist, + struct xdr_buf *xdr); + /* svc_rdma_sendto.c */ extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *, struct svc_rdma_req_map *, bool); diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig index 04ce2c0b660e..ac09ca803296 100644 --- a/net/sunrpc/Kconfig +++ b/net/sunrpc/Kconfig @@ -52,6 +52,7 @@ config SUNRPC_XPRT_RDMA tristate "RPC-over-RDMA transport" depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS default SUNRPC && INFINIBAND + select SG_POOL help This option allows the NFS client and server to use RDMA transports (InfiniBand, iWARP, or RoCE). diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index ef19fa42c50f..c1ae8142ab73 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \ fmr_ops.o frwr_ops.o \ svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ - module.o + svc_rdma_rw.o module.o rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c new file mode 100644 index 000000000000..0cf620277693 --- /dev/null +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c @@ -0,0 +1,512 @@ +/* + * Copyright (c) 2016 Oracle. All rights reserved. + * + * Use the core R/W API to move RPC-over-RDMA Read and Write chunks. + */ + +#include +#include +#include + +#include + +#define RPCDBG_FACILITY RPCDBG_SVCXPRT + +/* Each R/W context contains state for one chain of RDMA Read or + * Write Work Requests. + * + * Each WR chain handles a single contiguous server-side buffer, + * because scatterlist entries after the first have to start on + * page alignment. xdr_buf iovecs cannot guarantee alignment. + * + * Each WR chain handles only one R_key. Each RPC-over-RDMA segment + * from a client may contain a unique R_key, so each WR chain moves + * up to one segment at a time. + * + * The scatterlist makes this data structure over 4KB in size. To + * make it less likely to fail, and to handle the allocation for + * smaller I/O requests without disabling bottom-halves, these + * contexts are created on demand, but cached and reused until the + * controlling svcxprt_rdma is destroyed. + */ +struct svc_rdma_rw_ctxt { + struct list_head rw_list; + struct rdma_rw_ctx rw_ctx; + int rw_nents; + struct sg_table rw_sg_table; + struct scatterlist rw_first_sgl[0]; +}; + +static inline struct svc_rdma_rw_ctxt * +svc_rdma_next_ctxt(struct list_head *list) +{ + return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt, + rw_list); +} + +static struct svc_rdma_rw_ctxt * +svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) +{ + struct svc_rdma_rw_ctxt *ctxt; + + spin_lock(&rdma->sc_rw_ctxt_lock); + + ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts); + if (ctxt) { + list_del(&ctxt->rw_list); + spin_unlock(&rdma->sc_rw_ctxt_lock); + } else { + spin_unlock(&rdma->sc_rw_ctxt_lock); + ctxt = kmalloc(sizeof(*ctxt) + + SG_CHUNK_SIZE * sizeof(struct scatterlist), + GFP_KERNEL); + if (!ctxt) + goto out; + INIT_LIST_HEAD(&ctxt->rw_list); + } + + ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl; + if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges, + ctxt->rw_sg_table.sgl)) { + kfree(ctxt); + ctxt = NULL; + } +out: + return ctxt; +} + +static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma, + struct svc_rdma_rw_ctxt *ctxt) +{ + sg_free_table_chained(&ctxt->rw_sg_table, true); + + spin_lock(&rdma->sc_rw_ctxt_lock); + list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts); + spin_unlock(&rdma->sc_rw_ctxt_lock); +} + +/** + * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts + * @rdma: transport about to be destroyed + * + */ +void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma) +{ + struct svc_rdma_rw_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) { + list_del(&ctxt->rw_list); + kfree(ctxt); + } +} + +/* A chunk context tracks all I/O for moving one Read or Write + * chunk. This is a a set of rdma_rw's that handle data movement + * for all segments of one chunk. + * + * These are small, acquired with a single allocator call, and + * no more than one is needed per chunk. They are allocated on + * demand, and not cached. + */ +struct svc_rdma_chunk_ctxt { + struct ib_cqe cc_cqe; + struct svcxprt_rdma *cc_rdma; + struct list_head cc_rwctxts; + int cc_sqecount; + enum dma_data_direction cc_dir; +}; + +static void svc_rdma_cc_init(struct svcxprt_rdma *rdma, + struct svc_rdma_chunk_ctxt *cc, + enum dma_data_direction dir) +{ + cc->cc_rdma = rdma; + svc_xprt_get(&rdma->sc_xprt); + + INIT_LIST_HEAD(&cc->cc_rwctxts); + cc->cc_sqecount = 0; + cc->cc_dir = dir; +} + +static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc) +{ + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_rdma_rw_ctxt *ctxt; + + while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) { + list_del(&ctxt->rw_list); + + rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, ctxt->rw_sg_table.sgl, + ctxt->rw_nents, cc->cc_dir); + svc_rdma_put_rw_ctxt(rdma, ctxt); + } + svc_xprt_put(&rdma->sc_xprt); +} + +/* State for sending a Write or Reply chunk. + * - Tracks progress of writing one chunk over all its segments + * - Stores arguments for the SGL constructor functions + */ +struct svc_rdma_write_info { + /* write state of this chunk */ + unsigned int wi_seg_off; + unsigned int wi_seg_no; + unsigned int wi_nsegs; + __be32 *wi_segs; + + /* SGL constructor arguments */ + struct xdr_buf *wi_xdr; + unsigned char *wi_base; + unsigned int wi_next_off; + + struct svc_rdma_chunk_ctxt wi_cc; +}; + +static struct svc_rdma_write_info * +svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk) +{ + struct svc_rdma_write_info *info; + + info = kmalloc(sizeof(*info), GFP_KERNEL); + if (!info) + return info; + + info->wi_seg_off = 0; + info->wi_seg_no = 0; + info->wi_nsegs = be32_to_cpup(++chunk); + info->wi_segs = ++chunk; + svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE); + return info; +} + +static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) +{ + svc_rdma_cc_release(&info->wi_cc); + kfree(info); +} + +/** + * svc_rdma_write_done - Write chunk completion + * @cq: controlling Completion Queue + * @wc: Work Completion + * + * Pages under I/O are freed by a subsequent Send completion. + */ +static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct svc_rdma_chunk_ctxt *cc = + container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_rdma_write_info *info = + container_of(cc, struct svc_rdma_write_info, wi_cc); + + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + + if (unlikely(wc->status != IB_WC_SUCCESS)) { + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("svcrdma: write ctx: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); + } + + svc_rdma_write_info_free(info); +} + +/* This function sleeps when the transport's Send Queue is congested. + * + * Assumptions: + * - If ib_post_send() succeeds, only one completion is expected, + * even if one or more WRs are flushed. This is true when posting + * an rdma_rw_ctx or when posting a single signaled WR. + */ +static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc) +{ + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_xprt *xprt = &rdma->sc_xprt; + struct ib_send_wr *first_wr, *bad_wr; + struct list_head *tmp; + struct ib_cqe *cqe; + int ret; + + first_wr = NULL; + cqe = &cc->cc_cqe; + list_for_each(tmp, &cc->cc_rwctxts) { + struct svc_rdma_rw_ctxt *ctxt; + + ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list); + first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, cqe, first_wr); + cqe = NULL; + } + + do { + if (atomic_sub_return(cc->cc_sqecount, + &rdma->sc_sq_avail) > 0) { + ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); + if (ret) + break; + return 0; + } + + atomic_inc(&rdma_stat_sq_starve); + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); + wait_event(rdma->sc_send_wait, + atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount); + } while (1); + + pr_err("svcrdma: ib_post_send failed (%d)\n", ret); + set_bit(XPT_CLOSE, &xprt->xpt_flags); + + /* If even one was posted, there will be a completion. */ + if (bad_wr != first_wr) + return 0; + + atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); + wake_up(&rdma->sc_send_wait); + return -ENOTCONN; +} + +/* Build and DMA-map an SGL that covers one kvec in an xdr_buf + */ +static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info, + unsigned int len, + struct svc_rdma_rw_ctxt *ctxt) +{ + struct scatterlist *sg = ctxt->rw_sg_table.sgl; + + sg_set_buf(&sg[0], info->wi_base, len); + info->wi_base += len; + + ctxt->rw_nents = 1; +} + +/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist. + */ +static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info, + unsigned int remaining, + struct svc_rdma_rw_ctxt *ctxt) +{ + unsigned int sge_no, sge_bytes, page_off, page_no; + struct xdr_buf *xdr = info->wi_xdr; + struct scatterlist *sg; + struct page **page; + + page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK; + page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT; + page = xdr->pages + page_no; + info->wi_next_off += remaining; + sg = ctxt->rw_sg_table.sgl; + sge_no = 0; + do { + sge_bytes = min_t(unsigned int, remaining, + PAGE_SIZE - page_off); + sg_set_page(sg, *page, sge_bytes, page_off); + + remaining -= sge_bytes; + sg = sg_next(sg); + page_off = 0; + sge_no++; + page++; + } while (remaining); + + ctxt->rw_nents = sge_no; +} + +/* Construct RDMA Write WRs to send a portion of an xdr_buf containing + * an RPC Reply. + */ +static int +svc_rdma_build_writes(struct svc_rdma_write_info *info, + void (*constructor)(struct svc_rdma_write_info *info, + unsigned int len, + struct svc_rdma_rw_ctxt *ctxt), + unsigned int remaining) +{ + struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; + struct svcxprt_rdma *rdma = cc->cc_rdma; + struct svc_rdma_rw_ctxt *ctxt; + __be32 *seg; + int ret; + + cc->cc_cqe.done = svc_rdma_write_done; + seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz; + do { + unsigned int write_len; + u32 seg_length, seg_handle; + u64 seg_offset; + + if (info->wi_seg_no >= info->wi_nsegs) + goto out_overflow; + + seg_handle = be32_to_cpup(seg); + seg_length = be32_to_cpup(seg + 1); + xdr_decode_hyper(seg + 2, &seg_offset); + seg_offset += info->wi_seg_off; + + write_len = min(remaining, seg_length - info->wi_seg_off); + ctxt = svc_rdma_get_rw_ctxt(rdma, + (write_len >> PAGE_SHIFT) + 2); + if (!ctxt) + goto out_noctx; + + constructor(info, write_len, ctxt); + ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, + rdma->sc_port_num, ctxt->rw_sg_table.sgl, + ctxt->rw_nents, 0, seg_offset, + seg_handle, DMA_TO_DEVICE); + if (ret < 0) + goto out_initerr; + + list_add(&ctxt->rw_list, &cc->cc_rwctxts); + cc->cc_sqecount += ret; + if (write_len == seg_length - info->wi_seg_off) { + seg += 4; + info->wi_seg_no++; + info->wi_seg_off = 0; + } else { + info->wi_seg_off += write_len; + } + remaining -= write_len; + } while (remaining); + + return 0; + +out_overflow: + dprintk("svcrdma: inadequate space in Write chunk (%u)\n", + info->wi_nsegs); + return -E2BIG; + +out_noctx: + dprintk("svcrdma: no R/W ctxs available\n"); + return -ENOMEM; + +out_initerr: + svc_rdma_put_rw_ctxt(rdma, ctxt); + pr_err("svcrdma: failed to map pagelist (%d)\n", ret); + return -EIO; +} + +/* Send one of an xdr_buf's kvecs by itself. To send a Reply + * chunk, the whole RPC Reply is written back to the client. + * This function writes either the head or tail of the xdr_buf + * containing the Reply. + */ +static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info, + struct kvec *vec) +{ + info->wi_base = vec->iov_base; + return svc_rdma_build_writes(info, svc_rdma_vec_to_sg, + vec->iov_len); +} + +/* Send an xdr_buf's page list by itself. A Write chunk is + * just the page list. a Reply chunk is the head, page list, + * and tail. This function is shared between the two types + * of chunk. + */ +static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info, + struct xdr_buf *xdr) +{ + info->wi_xdr = xdr; + info->wi_next_off = 0; + return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg, + xdr->page_len); +} + +/** + * svc_rdma_send_write_chunk - Write all segments in a Write chunk + * @rdma: controlling RDMA transport + * @wr_ch: Write chunk provided by client + * @xdr: xdr_buf containing the data payload + * + * Returns a non-negative number of bytes the chunk consumed, or + * %-E2BIG if the payload was larger than the Write chunk, + * %-ENOMEM if rdma_rw context pool was exhausted, + * %-ENOTCONN if posting failed (connection is lost), + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). + */ +int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch, + struct xdr_buf *xdr) +{ + struct svc_rdma_write_info *info; + int ret; + + if (!xdr->page_len) + return 0; + + info = svc_rdma_write_info_alloc(rdma, wr_ch); + if (!info) + return -ENOMEM; + + ret = svc_rdma_send_xdr_pagelist(info, xdr); + if (ret < 0) + goto out_err; + + ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); + if (ret < 0) + goto out_err; + return xdr->page_len; + +out_err: + svc_rdma_write_info_free(info); + return ret; +} + +/** + * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk + * @rdma: controlling RDMA transport + * @rp_ch: Reply chunk provided by client + * @writelist: true if client provided a Write list + * @xdr: xdr_buf containing an RPC Reply + * + * Returns a non-negative number of bytes the chunk consumed, or + * %-E2BIG if the payload was larger than the Reply chunk, + * %-ENOMEM if rdma_rw context pool was exhausted, + * %-ENOTCONN if posting failed (connection is lost), + * %-EIO if rdma_rw initialization failed (DMA mapping, etc). + */ +int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch, + bool writelist, struct xdr_buf *xdr) +{ + struct svc_rdma_write_info *info; + int consumed, ret; + + info = svc_rdma_write_info_alloc(rdma, rp_ch); + if (!info) + return -ENOMEM; + + ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]); + if (ret < 0) + goto out_err; + consumed = xdr->head[0].iov_len; + + /* Send the page list in the Reply chunk only if the + * client did not provide Write chunks. + */ + if (!writelist && xdr->page_len) { + ret = svc_rdma_send_xdr_pagelist(info, xdr); + if (ret < 0) + goto out_err; + consumed += xdr->page_len; + } + + if (xdr->tail[0].iov_len) { + ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]); + if (ret < 0) + goto out_err; + consumed += xdr->tail[0].iov_len; + } + + ret = svc_rdma_post_chunk_ctxt(&info->wi_cc); + if (ret < 0) + goto out_err; + return consumed; + +out_err: + svc_rdma_write_info_free(info); + return ret; +} diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index e1097cc6d1eb..b25c50992a95 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -561,6 +561,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); INIT_LIST_HEAD(&cma_xprt->sc_frmr_q); INIT_LIST_HEAD(&cma_xprt->sc_ctxts); + INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_maps); init_waitqueue_head(&cma_xprt->sc_send_wait); @@ -568,6 +569,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, spin_lock_init(&cma_xprt->sc_rq_dto_lock); spin_lock_init(&cma_xprt->sc_frmr_q_lock); spin_lock_init(&cma_xprt->sc_ctxt_lock); + spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); spin_lock_init(&cma_xprt->sc_map_lock); /* @@ -999,6 +1001,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) newxprt, newxprt->sc_cm_id); dev = newxprt->sc_cm_id->device; + newxprt->sc_port_num = newxprt->sc_cm_id->port_num; /* Qualify the transport resource defaults with the * capabilities of this particular device */ @@ -1248,6 +1251,7 @@ static void __svc_rdma_free(struct work_struct *work) } rdma_dealloc_frmr_q(rdma); + svc_rdma_destroy_rw_ctxts(rdma); svc_rdma_destroy_ctxts(rdma); svc_rdma_destroy_maps(rdma); -- cgit v1.2.3 From 9a6a180b7867ceceeeab88a6f011bac23174b939 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:25 -0400 Subject: svcrdma: Use rdma_rw API in RPC reply path The current svcrdma sendto code path posts one RDMA Write WR at a time. Each of these Writes typically carries a small number of pages (for instance, up to 30 pages for mlx4 devices). That means a 1MB NFS READ reply requires 9 ib_post_send() calls for the Write WRs, and one for the Send WR carrying the actual RPC Reply message. Instead, use the new rdma_rw API. The details of Write WR chain construction and memory registration are taken care of in the RDMA core. svcrdma can focus on the details of the RPC-over-RDMA protocol. This gives three main benefits: 1. All Write WRs for one RDMA segment are posted in a single chain. As few as one ib_post_send() for each Write chunk. 2. The Write path can now use FRWR to register the Write buffers. If the device's maximum page list depth is large, this means a single Write WR is needed for each RPC's Write chunk data. 3. The new code introduces support for RPCs that carry both a Write list and a Reply chunk. This combination can be used for an NFSv4 READ where the data payload is large, and thus is removed from the Payload Stream, but the Payload Stream is still larger than the inline threshold. Signed-off-by: Chuck Lever Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 1 - net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 6 +- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 696 ++++++++++++++--------------- net/sunrpc/xprtrdma/svc_rdma_transport.c | 2 + 4 files changed, 350 insertions(+), 355 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index ca08671fb7e2..599ee03ee3fb 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -212,7 +212,6 @@ extern int svc_rdma_xdr_decode_req(struct xdr_buf *); extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *, struct rpcrdma_msg *, enum rpcrdma_errcode, __be32 *); -extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int); extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int); extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int, __be32, __be64, u32); diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index 0305b33d482f..bf185b79c98f 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -90,9 +90,9 @@ out_notfound: * Caller holds the connection's mutex and has already marshaled * the RPC/RDMA request. * - * This is similar to svc_rdma_reply, but takes an rpc_rqst - * instead, does not support chunks, and avoids blocking memory - * allocation. + * This is similar to svc_rdma_send_reply_msg, but takes a struct + * rpc_rqst instead, does not support chunks, and avoids blocking + * memory allocation. * * XXX: There is still an opportunity to block in svc_rdma_send() * if there are no SQ entries to post the Send. This may occur if diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 2eb3df698e11..ce62b78e5bc9 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -1,4 +1,5 @@ /* + * Copyright (c) 2016 Oracle. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. * @@ -40,6 +41,63 @@ * Author: Tom Tucker */ +/* Operation + * + * The main entry point is svc_rdma_sendto. This is called by the + * RPC server when an RPC Reply is ready to be transmitted to a client. + * + * The passed-in svc_rqst contains a struct xdr_buf which holds an + * XDR-encoded RPC Reply message. sendto must construct the RPC-over-RDMA + * transport header, post all Write WRs needed for this Reply, then post + * a Send WR conveying the transport header and the RPC message itself to + * the client. + * + * svc_rdma_sendto must fully transmit the Reply before returning, as + * the svc_rqst will be recycled as soon as sendto returns. Remaining + * resources referred to by the svc_rqst are also recycled at that time. + * Therefore any resources that must remain longer must be detached + * from the svc_rqst and released later. + * + * Page Management + * + * The I/O that performs Reply transmission is asynchronous, and may + * complete well after sendto returns. Thus pages under I/O must be + * removed from the svc_rqst before sendto returns. + * + * The logic here depends on Send Queue and completion ordering. Since + * the Send WR is always posted last, it will always complete last. Thus + * when it completes, it is guaranteed that all previous Write WRs have + * also completed. + * + * Write WRs are constructed and posted. Each Write segment gets its own + * svc_rdma_rw_ctxt, allowing the Write completion handler to find and + * DMA-unmap the pages under I/O for that Write segment. The Write + * completion handler does not release any pages. + * + * When the Send WR is constructed, it also gets its own svc_rdma_op_ctxt. + * The ownership of all of the Reply's pages are transferred into that + * ctxt, the Send WR is posted, and sendto returns. + * + * The svc_rdma_op_ctxt is presented when the Send WR completes. The + * Send completion handler finally releases the Reply's pages. + * + * This mechanism also assumes that completions on the transport's Send + * Completion Queue do not run in parallel. Otherwise a Write completion + * and Send completion running at the same time could release pages that + * are still DMA-mapped. + * + * Error Handling + * + * - If the Send WR is posted successfully, it will either complete + * successfully, or get flushed. Either way, the Send completion + * handler releases the Reply's pages. + * - If the Send WR cannot be not posted, the forward path releases + * the Reply's pages. + * + * This handles the case, without the use of page reference counting, + * where two different Write segments send portions of the same page. + */ + #include #include #include @@ -55,6 +113,133 @@ static u32 xdr_padsize(u32 len) return (len & 3) ? (4 - (len & 3)) : 0; } +/* Returns length of transport header, in bytes. + */ +static unsigned int svc_rdma_reply_hdr_len(__be32 *rdma_resp) +{ + unsigned int nsegs; + __be32 *p; + + p = rdma_resp; + + /* RPC-over-RDMA V1 replies never have a Read list. */ + p += rpcrdma_fixed_maxsz + 1; + + /* Skip Write list. */ + while (*p++ != xdr_zero) { + nsegs = be32_to_cpup(p++); + p += nsegs * rpcrdma_segment_maxsz; + } + + /* Skip Reply chunk. */ + if (*p++ != xdr_zero) { + nsegs = be32_to_cpup(p++); + p += nsegs * rpcrdma_segment_maxsz; + } + + return (unsigned long)p - (unsigned long)rdma_resp; +} + +/* One Write chunk is copied from Call transport header to Reply + * transport header. Each segment's length field is updated to + * reflect number of bytes consumed in the segment. + * + * Returns number of segments in this chunk. + */ +static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src, + unsigned int remaining) +{ + unsigned int i, nsegs; + u32 seg_len; + + /* Write list discriminator */ + *dst++ = *src++; + + /* number of segments in this chunk */ + nsegs = be32_to_cpup(src); + *dst++ = *src++; + + for (i = nsegs; i; i--) { + /* segment's RDMA handle */ + *dst++ = *src++; + + /* bytes returned in this segment */ + seg_len = be32_to_cpu(*src); + if (remaining >= seg_len) { + /* entire segment was consumed */ + *dst = *src; + remaining -= seg_len; + } else { + /* segment only partly filled */ + *dst = cpu_to_be32(remaining); + remaining = 0; + } + dst++; src++; + + /* segment's RDMA offset */ + *dst++ = *src++; + *dst++ = *src++; + } + + return nsegs; +} + +/* The client provided a Write list in the Call message. Fill in + * the segments in the first Write chunk in the Reply's transport + * header with the number of bytes consumed in each segment. + * Remaining chunks are returned unused. + * + * Assumptions: + * - Client has provided only one Write chunk + */ +static void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch, + unsigned int consumed) +{ + unsigned int nsegs; + __be32 *p, *q; + + /* RPC-over-RDMA V1 replies never have a Read list. */ + p = rdma_resp + rpcrdma_fixed_maxsz + 1; + + q = wr_ch; + while (*q != xdr_zero) { + nsegs = xdr_encode_write_chunk(p, q, consumed); + q += 2 + nsegs * rpcrdma_segment_maxsz; + p += 2 + nsegs * rpcrdma_segment_maxsz; + consumed = 0; + } + + /* Terminate Write list */ + *p++ = xdr_zero; + + /* Reply chunk discriminator; may be replaced later */ + *p = xdr_zero; +} + +/* The client provided a Reply chunk in the Call message. Fill in + * the segments in the Reply chunk in the Reply message with the + * number of bytes consumed in each segment. + * + * Assumptions: + * - Reply can always fit in the provided Reply chunk + */ +static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, + unsigned int consumed) +{ + __be32 *p; + + /* Find the Reply chunk in the Reply's xprt header. + * RPC-over-RDMA V1 replies never have a Read list. + */ + p = rdma_resp + rpcrdma_fixed_maxsz + 1; + + /* Skip past Write list */ + while (*p++ != xdr_zero) + p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz; + + xdr_encode_write_chunk(p, rp_ch, consumed); +} + int svc_rdma_map_xdr(struct svcxprt_rdma *xprt, struct xdr_buf *xdr, struct svc_rdma_req_map *vec, @@ -123,45 +308,14 @@ int svc_rdma_map_xdr(struct svcxprt_rdma *xprt, return 0; } -static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt, - struct xdr_buf *xdr, - u32 xdr_off, size_t len, int dir) -{ - struct page *page; - dma_addr_t dma_addr; - if (xdr_off < xdr->head[0].iov_len) { - /* This offset is in the head */ - xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK; - page = virt_to_page(xdr->head[0].iov_base); - } else { - xdr_off -= xdr->head[0].iov_len; - if (xdr_off < xdr->page_len) { - /* This offset is in the page list */ - xdr_off += xdr->page_base; - page = xdr->pages[xdr_off >> PAGE_SHIFT]; - xdr_off &= ~PAGE_MASK; - } else { - /* This offset is in the tail */ - xdr_off -= xdr->page_len; - xdr_off += (unsigned long) - xdr->tail[0].iov_base & ~PAGE_MASK; - page = virt_to_page(xdr->tail[0].iov_base); - } - } - dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off, - min_t(size_t, PAGE_SIZE, len), dir); - return dma_addr; -} - /* Parse the RPC Call's transport header. */ -static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, - struct rpcrdma_write_array **write, - struct rpcrdma_write_array **reply) +static void svc_rdma_get_write_arrays(__be32 *rdma_argp, + __be32 **write, __be32 **reply) { __be32 *p; - p = (__be32 *)&rmsgp->rm_body.rm_chunks[0]; + p = rdma_argp + rpcrdma_fixed_maxsz; /* Read list */ while (*p++ != xdr_zero) @@ -169,7 +323,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, /* Write list */ if (*p != xdr_zero) { - *write = (struct rpcrdma_write_array *)p; + *write = p; while (*p++ != xdr_zero) p += 1 + be32_to_cpu(*p) * 4; } else { @@ -179,7 +333,7 @@ static void svc_rdma_get_write_arrays(struct rpcrdma_msg *rmsgp, /* Reply chunk */ if (*p != xdr_zero) - *reply = (struct rpcrdma_write_array *)p; + *reply = p; else *reply = NULL; } @@ -210,6 +364,32 @@ static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp, return be32_to_cpup(p); } +/* ib_dma_map_page() is used here because svc_rdma_dma_unmap() + * is used during completion to DMA-unmap this memory, and + * it uses ib_dma_unmap_page() exclusively. + */ +static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + unsigned int sge_no, + unsigned char *base, + unsigned int len) +{ + unsigned long offset = (unsigned long)base & ~PAGE_MASK; + struct ib_device *dev = rdma->sc_cm_id->device; + dma_addr_t dma_addr; + + dma_addr = ib_dma_map_page(dev, virt_to_page(base), + offset, len, DMA_TO_DEVICE); + if (ib_dma_mapping_error(dev, dma_addr)) + return -EIO; + + ctxt->sge[sge_no].addr = dma_addr; + ctxt->sge[sge_no].length = len; + ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; + svc_rdma_count_mappings(rdma, ctxt); + return 0; +} + static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma, struct svc_rdma_op_ctxt *ctxt, unsigned int sge_no, @@ -253,222 +433,73 @@ int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, return svc_rdma_dma_map_page(rdma, ctxt, 0, ctxt->pages[0], 0, len); } -/* Assumptions: - * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE +/* Load the xdr_buf into the ctxt's sge array, and DMA map each + * element as it is added. + * + * Returns the number of sge elements loaded on success, or + * a negative errno on failure. */ -static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, - u32 rmr, u64 to, - u32 xdr_off, int write_len, - struct svc_rdma_req_map *vec) +static int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma, + struct svc_rdma_op_ctxt *ctxt, + struct xdr_buf *xdr, __be32 *wr_lst) { - struct ib_rdma_wr write_wr; - struct ib_sge *sge; - int xdr_sge_no; - int sge_no; - int sge_bytes; - int sge_off; - int bc; - struct svc_rdma_op_ctxt *ctxt; + unsigned int len, sge_no, remaining, page_off; + struct page **ppages; + unsigned char *base; + u32 xdr_pad; + int ret; - if (vec->count > RPCSVC_MAXPAGES) { - pr_err("svcrdma: Too many pages (%lu)\n", vec->count); - return -EIO; - } + sge_no = 1; - dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, " - "write_len=%d, vec->sge=%p, vec->count=%lu\n", - rmr, (unsigned long long)to, xdr_off, - write_len, vec->sge, vec->count); + ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, + xdr->head[0].iov_base, + xdr->head[0].iov_len); + if (ret < 0) + return ret; - ctxt = svc_rdma_get_context(xprt); - ctxt->direction = DMA_TO_DEVICE; - sge = ctxt->sge; - - /* Find the SGE associated with xdr_off */ - for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count; - xdr_sge_no++) { - if (vec->sge[xdr_sge_no].iov_len > bc) - break; - bc -= vec->sge[xdr_sge_no].iov_len; - } + /* If a Write chunk is present, the xdr_buf's page list + * is not included inline. However the Upper Layer may + * have added XDR padding in the tail buffer, and that + * should not be included inline. + */ + if (wr_lst) { + base = xdr->tail[0].iov_base; + len = xdr->tail[0].iov_len; + xdr_pad = xdr_padsize(xdr->page_len); - sge_off = bc; - bc = write_len; - sge_no = 0; - - /* Copy the remaining SGE */ - while (bc != 0) { - sge_bytes = min_t(size_t, - bc, vec->sge[xdr_sge_no].iov_len-sge_off); - sge[sge_no].length = sge_bytes; - sge[sge_no].addr = - dma_map_xdr(xprt, &rqstp->rq_res, xdr_off, - sge_bytes, DMA_TO_DEVICE); - xdr_off += sge_bytes; - if (ib_dma_mapping_error(xprt->sc_cm_id->device, - sge[sge_no].addr)) - goto err; - svc_rdma_count_mappings(xprt, ctxt); - sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; - ctxt->count++; - sge_off = 0; - sge_no++; - xdr_sge_no++; - if (xdr_sge_no > vec->count) { - pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no); - goto err; + if (len && xdr_pad) { + base += xdr_pad; + len -= xdr_pad; } - bc -= sge_bytes; - if (sge_no == xprt->sc_max_sge) - break; - } - - /* Prepare WRITE WR */ - memset(&write_wr, 0, sizeof write_wr); - ctxt->cqe.done = svc_rdma_wc_write; - write_wr.wr.wr_cqe = &ctxt->cqe; - write_wr.wr.sg_list = &sge[0]; - write_wr.wr.num_sge = sge_no; - write_wr.wr.opcode = IB_WR_RDMA_WRITE; - write_wr.wr.send_flags = IB_SEND_SIGNALED; - write_wr.rkey = rmr; - write_wr.remote_addr = to; - - /* Post It */ - atomic_inc(&rdma_stat_write); - if (svc_rdma_send(xprt, &write_wr.wr)) - goto err; - return write_len - bc; - err: - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 0); - return -EIO; -} -noinline -static int send_write_chunks(struct svcxprt_rdma *xprt, - struct rpcrdma_write_array *wr_ary, - struct rpcrdma_msg *rdma_resp, - struct svc_rqst *rqstp, - struct svc_rdma_req_map *vec) -{ - u32 xfer_len = rqstp->rq_res.page_len; - int write_len; - u32 xdr_off; - int chunk_off; - int chunk_no; - int nchunks; - struct rpcrdma_write_array *res_ary; - int ret; - - res_ary = (struct rpcrdma_write_array *) - &rdma_resp->rm_body.rm_chunks[1]; - - /* Write chunks start at the pagelist */ - nchunks = be32_to_cpu(wr_ary->wc_nchunks); - for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0; - xfer_len && chunk_no < nchunks; - chunk_no++) { - struct rpcrdma_segment *arg_ch; - u64 rs_offset; - - arg_ch = &wr_ary->wc_array[chunk_no].wc_target; - write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length)); - - /* Prepare the response chunk given the length actually - * written */ - xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset); - svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, - arg_ch->rs_handle, - arg_ch->rs_offset, - write_len); - chunk_off = 0; - while (write_len) { - ret = send_write(xprt, rqstp, - be32_to_cpu(arg_ch->rs_handle), - rs_offset + chunk_off, - xdr_off, - write_len, - vec); - if (ret <= 0) - goto out_err; - chunk_off += ret; - xdr_off += ret; - xfer_len -= ret; - write_len -= ret; - } + goto tail; } - /* Update the req with the number of chunks actually used */ - svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no); - return rqstp->rq_res.page_len; + ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT); + page_off = xdr->page_base & ~PAGE_MASK; + remaining = xdr->page_len; + while (remaining) { + len = min_t(u32, PAGE_SIZE - page_off, remaining); -out_err: - pr_err("svcrdma: failed to send write chunks, rc=%d\n", ret); - return -EIO; -} - -noinline -static int send_reply_chunks(struct svcxprt_rdma *xprt, - struct rpcrdma_write_array *rp_ary, - struct rpcrdma_msg *rdma_resp, - struct svc_rqst *rqstp, - struct svc_rdma_req_map *vec) -{ - u32 xfer_len = rqstp->rq_res.len; - int write_len; - u32 xdr_off; - int chunk_no; - int chunk_off; - int nchunks; - struct rpcrdma_segment *ch; - struct rpcrdma_write_array *res_ary; - int ret; + ret = svc_rdma_dma_map_page(rdma, ctxt, sge_no++, + *ppages++, page_off, len); + if (ret < 0) + return ret; - /* XXX: need to fix when reply lists occur with read-list and or - * write-list */ - res_ary = (struct rpcrdma_write_array *) - &rdma_resp->rm_body.rm_chunks[2]; - - /* xdr offset starts at RPC message */ - nchunks = be32_to_cpu(rp_ary->wc_nchunks); - for (xdr_off = 0, chunk_no = 0; - xfer_len && chunk_no < nchunks; - chunk_no++) { - u64 rs_offset; - ch = &rp_ary->wc_array[chunk_no].wc_target; - write_len = min(xfer_len, be32_to_cpu(ch->rs_length)); - - /* Prepare the reply chunk given the length actually - * written */ - xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset); - svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, - ch->rs_handle, ch->rs_offset, - write_len); - chunk_off = 0; - while (write_len) { - ret = send_write(xprt, rqstp, - be32_to_cpu(ch->rs_handle), - rs_offset + chunk_off, - xdr_off, - write_len, - vec); - if (ret <= 0) - goto out_err; - chunk_off += ret; - xdr_off += ret; - xfer_len -= ret; - write_len -= ret; - } + remaining -= len; + page_off = 0; } - /* Update the req with the number of chunks actually used */ - svc_rdma_xdr_encode_reply_array(res_ary, chunk_no); - return rqstp->rq_res.len; + base = xdr->tail[0].iov_base; + len = xdr->tail[0].iov_len; +tail: + if (len) { + ret = svc_rdma_dma_map_buf(rdma, ctxt, sge_no++, base, len); + if (ret < 0) + return ret; + } -out_err: - pr_err("svcrdma: failed to send reply chunks, rc=%d\n", ret); - return -EIO; + return sge_no - 1; } /* The svc_rqst and all resources it owns are released as soon as @@ -525,90 +556,66 @@ int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, return svc_rdma_send(rdma, send_wr); } -/* This function prepares the portion of the RPCRDMA message to be - * sent in the RDMA_SEND. This function is called after data sent via - * RDMA has already been transmitted. There are three cases: - * - The RPCRDMA header, RPC header, and payload are all sent in a - * single RDMA_SEND. This is the "inline" case. - * - The RPCRDMA header and some portion of the RPC header and data - * are sent via this RDMA_SEND and another portion of the data is - * sent via RDMA. - * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC - * header and data are all transmitted via RDMA. - * In all three cases, this function prepares the RPCRDMA header in - * sge[0], the 'type' parameter indicates the type to place in the - * RPCRDMA header, and the 'byte_count' field indicates how much of - * the XDR to include in this RDMA_SEND. NB: The offset of the payload - * to send is zero in the XDR. +/* Prepare the portion of the RPC Reply that will be transmitted + * via RDMA Send. The RPC-over-RDMA transport header is prepared + * in sge[0], and the RPC xdr_buf is prepared in following sges. + * + * Depending on whether a Write list or Reply chunk is present, + * the server may send all, a portion of, or none of the xdr_buf. + * In the latter case, only the transport header (sge[0]) is + * transmitted. + * + * RDMA Send is the last step of transmitting an RPC reply. Pages + * involved in the earlier RDMA Writes are here transferred out + * of the rqstp and into the ctxt's page array. These pages are + * DMA unmapped by each Write completion, but the subsequent Send + * completion finally releases these pages. + * + * Assumptions: + * - The Reply's transport header will never be larger than a page. */ -static int send_reply(struct svcxprt_rdma *rdma, - struct svc_rqst *rqstp, - struct page *page, - struct rpcrdma_msg *rdma_resp, - struct svc_rdma_req_map *vec, - int byte_count, - u32 inv_rkey) +static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma, + __be32 *rdma_argp, __be32 *rdma_resp, + struct svc_rqst *rqstp, + __be32 *wr_lst, __be32 *rp_ch) { struct svc_rdma_op_ctxt *ctxt; - u32 xdr_off; - int sge_no; - int sge_bytes; - int ret = -EIO; + u32 inv_rkey; + int ret; + + dprintk("svcrdma: sending %s reply: head=%zu, pagelen=%u, tail=%zu\n", + (rp_ch ? "RDMA_NOMSG" : "RDMA_MSG"), + rqstp->rq_res.head[0].iov_len, + rqstp->rq_res.page_len, + rqstp->rq_res.tail[0].iov_len); - /* Prepare the context */ ctxt = svc_rdma_get_context(rdma); - ctxt->direction = DMA_TO_DEVICE; - ctxt->pages[0] = page; - ctxt->count = 1; - /* Prepare the SGE for the RPCRDMA Header */ - ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->sge[0].length = - svc_rdma_xdr_get_reply_hdr_len((__be32 *)rdma_resp); - ctxt->sge[0].addr = - ib_dma_map_page(rdma->sc_cm_id->device, page, 0, - ctxt->sge[0].length, DMA_TO_DEVICE); - if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) + ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, + svc_rdma_reply_hdr_len(rdma_resp)); + if (ret < 0) goto err; - svc_rdma_count_mappings(rdma, ctxt); - - ctxt->direction = DMA_TO_DEVICE; - /* Map the payload indicated by 'byte_count' */ - xdr_off = 0; - for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) { - sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count); - byte_count -= sge_bytes; - ctxt->sge[sge_no].addr = - dma_map_xdr(rdma, &rqstp->rq_res, xdr_off, - sge_bytes, DMA_TO_DEVICE); - xdr_off += sge_bytes; - if (ib_dma_mapping_error(rdma->sc_cm_id->device, - ctxt->sge[sge_no].addr)) + if (!rp_ch) { + ret = svc_rdma_map_reply_msg(rdma, ctxt, + &rqstp->rq_res, wr_lst); + if (ret < 0) goto err; - svc_rdma_count_mappings(rdma, ctxt); - ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; - ctxt->sge[sge_no].length = sge_bytes; - } - if (byte_count != 0) { - pr_err("svcrdma: Could not map %d bytes\n", byte_count); - goto err; } svc_rdma_save_io_pages(rqstp, ctxt); - if (sge_no > rdma->sc_max_sge) { - pr_err("svcrdma: Too many sges (%d)\n", sge_no); - goto err; - } - - ret = svc_rdma_post_send_wr(rdma, ctxt, sge_no, inv_rkey); + inv_rkey = 0; + if (rdma->sc_snd_w_inv) + inv_rkey = svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch); + ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, inv_rkey); if (ret) goto err; return 0; - err: +err: + pr_err("svcrdma: failed to post Send WR (%d)\n", ret); svc_rdma_unmap_dma(ctxt); svc_rdma_put_context(ctxt, 1); return ret; @@ -618,41 +625,36 @@ void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) { } +/** + * svc_rdma_sendto - Transmit an RPC reply + * @rqstp: processed RPC request, reply XDR already in ::rq_res + * + * Any resources still associated with @rqstp are released upon return. + * If no reply message was possible, the connection is closed. + * + * Returns: + * %0 if an RPC reply has been successfully posted, + * %-ENOMEM if a resource shortage occurred (connection is lost), + * %-ENOTCONN if posting failed (connection is lost). + */ int svc_rdma_sendto(struct svc_rqst *rqstp) { struct svc_xprt *xprt = rqstp->rq_xprt; struct svcxprt_rdma *rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); - struct rpcrdma_msg *rdma_argp; - struct rpcrdma_msg *rdma_resp; - struct rpcrdma_write_array *wr_ary, *rp_ary; - int ret; - int inline_bytes; + __be32 *p, *rdma_argp, *rdma_resp, *wr_lst, *rp_ch; + struct xdr_buf *xdr = &rqstp->rq_res; struct page *res_page; - struct svc_rdma_req_map *vec; - u32 inv_rkey; - __be32 *p; - - dprintk("svcrdma: sending response for rqstp=%p\n", rqstp); + int ret; - /* Get the RDMA request header. The receive logic always - * places this at the start of page 0. + /* Find the call's chunk lists to decide how to send the reply. + * Receive places the Call's xprt header at the start of page 0. */ rdma_argp = page_address(rqstp->rq_pages[0]); - svc_rdma_get_write_arrays(rdma_argp, &wr_ary, &rp_ary); + svc_rdma_get_write_arrays(rdma_argp, &wr_lst, &rp_ch); - inv_rkey = 0; - if (rdma->sc_snd_w_inv) - inv_rkey = svc_rdma_get_inv_rkey(&rdma_argp->rm_xid, - (__be32 *)wr_ary, - (__be32 *)rp_ary); - - /* Build an req vec for the XDR */ - vec = svc_rdma_get_req_map(rdma); - ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL); - if (ret) - goto err0; - inline_bytes = rqstp->rq_res.len; + dprintk("svcrdma: preparing response for XID 0x%08x\n", + be32_to_cpup(rdma_argp)); /* Create the RDMA response header. xprt->xpt_mutex, * acquired in svc_send(), serializes RPC replies. The @@ -666,54 +668,46 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) goto err0; rdma_resp = page_address(res_page); - p = &rdma_resp->rm_xid; - *p++ = rdma_argp->rm_xid; - *p++ = rdma_argp->rm_vers; + p = rdma_resp; + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); *p++ = rdma->sc_fc_credits; - *p++ = rp_ary ? rdma_nomsg : rdma_msg; + *p++ = rp_ch ? rdma_nomsg : rdma_msg; /* Start with empty chunks */ *p++ = xdr_zero; *p++ = xdr_zero; *p = xdr_zero; - /* Send any write-chunk data and build resp write-list */ - if (wr_ary) { - ret = send_write_chunks(rdma, wr_ary, rdma_resp, rqstp, vec); + if (wr_lst) { + /* XXX: Presume the client sent only one Write chunk */ + ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr); if (ret < 0) goto err1; - inline_bytes -= ret + xdr_padsize(ret); + svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); } - - /* Send any reply-list data and update resp reply-list */ - if (rp_ary) { - ret = send_reply_chunks(rdma, rp_ary, rdma_resp, rqstp, vec); + if (rp_ch) { + ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr); if (ret < 0) goto err1; - inline_bytes -= ret; + svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); } - /* Post a fresh Receive buffer _before_ sending the reply */ ret = svc_rdma_post_recv(rdma, GFP_KERNEL); if (ret) goto err1; - - ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec, - inline_bytes, inv_rkey); + ret = svc_rdma_send_reply_msg(rdma, rdma_argp, rdma_resp, rqstp, + wr_lst, rp_ch); if (ret < 0) goto err0; - - svc_rdma_put_req_map(rdma, vec); - dprintk("svcrdma: send_reply returns %d\n", ret); - return ret; + return 0; err1: put_page(res_page); err0: - svc_rdma_put_req_map(rdma, vec); pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n", ret); - set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + set_bit(XPT_CLOSE, &xprt->xpt_flags); return -ENOTCONN; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index b25c50992a95..237c377c1e06 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -1053,6 +1053,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) memset(&qp_attr, 0, sizeof qp_attr); qp_attr.event_handler = qp_event_handler; qp_attr.qp_context = &newxprt->sc_xprt; + qp_attr.port_num = newxprt->sc_cm_id->port_num; + qp_attr.cap.max_rdma_ctxs = newxprt->sc_max_requests; qp_attr.cap.max_send_wr = newxprt->sc_sq_depth; qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; qp_attr.cap.max_send_sge = newxprt->sc_max_sge; -- cgit v1.2.3 From 6b19cc5ca2f78ebc88f5d39ba6a94197bb392fcc Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:33 -0400 Subject: svcrdma: Clean up RDMA_ERROR path Now that svc_rdma_sendto has been renovated, svc_rdma_send_error can be refactored to reduce code duplication and remove C structure- based XDR encoding. It is also relocated to the source file that contains its only caller. This is a refactoring change only. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/rpc_rdma.h | 3 ++ include/linux/sunrpc/svc_rdma.h | 5 ---- net/sunrpc/xprtrdma/svc_rdma_marshal.c | 19 ------------ net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 52 ++++++++++++++++++++++++++++++++- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 43 --------------------------- 5 files changed, 54 insertions(+), 68 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h index 245fc59b7324..b7e85b341a54 100644 --- a/include/linux/sunrpc/rpc_rdma.h +++ b/include/linux/sunrpc/rpc_rdma.h @@ -143,6 +143,9 @@ enum rpcrdma_proc { #define rdma_done cpu_to_be32(RDMA_DONE) #define rdma_error cpu_to_be32(RDMA_ERROR) +#define err_vers cpu_to_be32(ERR_VERS) +#define err_chunk cpu_to_be32(ERR_CHUNK) + /* * Private extension to RPC-over-RDMA Version One. * Message passed during RDMA-CM connection set-up. diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 599ee03ee3fb..a770d200f607 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -209,9 +209,6 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, /* svc_rdma_marshal.c */ extern int svc_rdma_xdr_decode_req(struct xdr_buf *); -extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *, - struct rpcrdma_msg *, - enum rpcrdma_errcode, __be32 *); extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int); extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int, __be32, __be64, u32); @@ -244,8 +241,6 @@ extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, struct svc_rdma_op_ctxt *ctxt, int num_sge, u32 inv_rkey); extern int svc_rdma_sendto(struct svc_rqst *); -extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *, - int); /* svc_rdma_transport.c */ extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *); diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c index 1c4aabf0f657..ae58a897eca0 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c @@ -167,25 +167,6 @@ out_inval: return -EINVAL; } -int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt, - struct rpcrdma_msg *rmsgp, - enum rpcrdma_errcode err, __be32 *va) -{ - __be32 *startp = va; - - *va++ = rmsgp->rm_xid; - *va++ = rmsgp->rm_vers; - *va++ = xprt->sc_fc_credits; - *va++ = rdma_error; - *va++ = cpu_to_be32(err); - if (err == ERR_VERS) { - *va++ = rpcrdma_version; - *va++ = rpcrdma_version; - } - - return (int)((unsigned long)va - (unsigned long)startp); -} - /** * svc_rdma_xdr_get_reply_hdr_length - Get length of Reply transport header * @rdma_resp: buffer containing Reply transport header diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index f7b2daf72a86..7435cb666f42 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -558,6 +558,56 @@ static void rdma_read_complete(struct svc_rqst *rqstp, rqstp->rq_arg.buflen = head->arg.buflen; } +static void svc_rdma_send_error(struct svcxprt_rdma *xprt, + __be32 *rdma_argp, int status) +{ + struct svc_rdma_op_ctxt *ctxt; + __be32 *p, *err_msgp; + unsigned int length; + struct page *page; + int ret; + + ret = svc_rdma_repost_recv(xprt, GFP_KERNEL); + if (ret) + return; + + page = alloc_page(GFP_KERNEL); + if (!page) + return; + err_msgp = page_address(page); + + p = err_msgp; + *p++ = *rdma_argp; + *p++ = *(rdma_argp + 1); + *p++ = xprt->sc_fc_credits; + *p++ = rdma_error; + if (status == -EPROTONOSUPPORT) { + *p++ = err_vers; + *p++ = rpcrdma_version; + *p++ = rpcrdma_version; + } else { + *p++ = err_chunk; + } + length = (unsigned long)p - (unsigned long)err_msgp; + + /* Map transport header; no RPC message payload */ + ctxt = svc_rdma_get_context(xprt); + ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length); + if (ret) { + dprintk("svcrdma: Error %d mapping send for protocol error\n", + ret); + return; + } + + ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0); + if (ret) { + dprintk("svcrdma: Error %d posting send for protocol error\n", + ret); + svc_rdma_unmap_dma(ctxt); + svc_rdma_put_context(ctxt, 1); + } +} + /* By convention, backchannel calls arrive via rdma_msg type * messages, and never populate the chunk lists. This makes * the RPC/RDMA header small and fixed in size, so it is @@ -686,7 +736,7 @@ complete: return ret; out_err: - svc_rdma_send_error(rdma_xprt, rmsgp, ret); + svc_rdma_send_error(rdma_xprt, &rmsgp->rm_xid, ret); svc_rdma_put_context(ctxt, 0); return 0; diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index ce62b78e5bc9..0b646e8f23c7 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -710,46 +710,3 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) set_bit(XPT_CLOSE, &xprt->xpt_flags); return -ENOTCONN; } - -void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, - int status) -{ - struct page *p; - struct svc_rdma_op_ctxt *ctxt; - enum rpcrdma_errcode err; - __be32 *va; - int length; - int ret; - - ret = svc_rdma_repost_recv(xprt, GFP_KERNEL); - if (ret) - return; - - p = alloc_page(GFP_KERNEL); - if (!p) - return; - va = page_address(p); - - /* XDR encode an error reply */ - err = ERR_CHUNK; - if (status == -EPROTONOSUPPORT) - err = ERR_VERS; - length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va); - - /* Map transport header; no RPC message payload */ - ctxt = svc_rdma_get_context(xprt); - ret = svc_rdma_map_reply_hdr(xprt, ctxt, &rmsgp->rm_xid, length); - if (ret) { - dprintk("svcrdma: Error %d mapping send for protocol error\n", - ret); - return; - } - - ret = svc_rdma_post_send_wr(xprt, ctxt, 1, 0); - if (ret) { - dprintk("svcrdma: Error %d posting send for protocol error\n", - ret); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 1); - } -} -- cgit v1.2.3 From 4757d90b15d851b9986bfca745eacc176359c13d Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:41 -0400 Subject: svcrdma: Report Write/Reply chunk overruns Observed at Connectathon 2017. If a client has underestimated the size of a Write or Reply chunk, the Linux server writes as much payload data as it can, then it recognizes there was a problem and closes the connection without sending the transport header. This creates a couple of problems: <> The client never receives indication of the server-side failure, so it continues to retransmit the bad RPC. Forward progress on the transport is blocked. <> The reply payload pages are not moved out of the svc_rqst, thus they can be released by the RPC server before the RDMA Writes have completed. The new rdma_rw-ized helpers return a distinct error code when a Write/Reply chunk overrun occurs, so it's now easy for the caller (svc_rdma_sendto) to recognize this case. Instead of dropping the connection, post an RDMA_ERROR message. The client now sees an RDMA_ERROR and can properly terminate the RPC transaction. As part of the new logic, set up the same delayed release for these payload pages as would have occurred in the normal case. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: J. Bruce Fields --- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 58 +++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 0b646e8f23c7..e514f6864a93 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -621,6 +621,48 @@ err: return ret; } +/* Given the client-provided Write and Reply chunks, the server was not + * able to form a complete reply. Return an RDMA_ERROR message so the + * client can retire this RPC transaction. As above, the Send completion + * routine releases payload pages that were part of a previous RDMA Write. + * + * Remote Invalidation is skipped for simplicity. + */ +static int svc_rdma_send_error_msg(struct svcxprt_rdma *rdma, + __be32 *rdma_resp, struct svc_rqst *rqstp) +{ + struct svc_rdma_op_ctxt *ctxt; + __be32 *p; + int ret; + + ctxt = svc_rdma_get_context(rdma); + + /* Replace the original transport header with an + * RDMA_ERROR response. XID etc are preserved. + */ + p = rdma_resp + 3; + *p++ = rdma_error; + *p = err_chunk; + + ret = svc_rdma_map_reply_hdr(rdma, ctxt, rdma_resp, 20); + if (ret < 0) + goto err; + + svc_rdma_save_io_pages(rqstp, ctxt); + + ret = svc_rdma_post_send_wr(rdma, ctxt, 1 + ret, 0); + if (ret) + goto err; + + return 0; + +err: + pr_err("svcrdma: failed to post Send WR (%d)\n", ret); + svc_rdma_unmap_dma(ctxt); + svc_rdma_put_context(ctxt, 1); + return ret; +} + void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) { } @@ -683,13 +725,13 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) /* XXX: Presume the client sent only one Write chunk */ ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr); if (ret < 0) - goto err1; + goto err2; svc_rdma_xdr_encode_write_list(rdma_resp, wr_lst, ret); } if (rp_ch) { ret = svc_rdma_send_reply_chunk(rdma, rp_ch, wr_lst, xdr); if (ret < 0) - goto err1; + goto err2; svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch, ret); } @@ -702,6 +744,18 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) goto err0; return 0; + err2: + if (ret != -E2BIG) + goto err1; + + ret = svc_rdma_post_recv(rdma, GFP_KERNEL); + if (ret) + goto err1; + ret = svc_rdma_send_error_msg(rdma, rdma_resp, rqstp); + if (ret < 0) + goto err0; + return 0; + err1: put_page(res_page); err0: -- cgit v1.2.3 From f5821c76b2c9c2fb98b276c0bf6a101bfe9050a3 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:49 -0400 Subject: svcrdma: Clean up RPC-over-RDMA backchannel reply processing Replace C structure-based XDR decoding with pointer arithmetic. Pointer arithmetic is considered more portable. Signed-off-by: Chuck Lever Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 2 +- net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 18 ++++++++++++++---- net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 27 +++++++++++++++------------ 3 files changed, 30 insertions(+), 17 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index a770d200f607..44d642bbfce6 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -204,7 +204,7 @@ static inline void svc_rdma_count_mappings(struct svcxprt_rdma *rdma, /* svc_rdma_backchannel.c */ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, - struct rpcrdma_msg *rmsgp, + __be32 *rdma_resp, struct xdr_buf *rcvbuf); /* svc_rdma_marshal.c */ diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c index bf185b79c98f..c676ed0efb5a 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -12,7 +12,17 @@ #undef SVCRDMA_BACKCHANNEL_DEBUG -int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, +/** + * svc_rdma_handle_bc_reply - Process incoming backchannel reply + * @xprt: controlling backchannel transport + * @rdma_resp: pointer to incoming transport header + * @rcvbuf: XDR buffer into which to decode the reply + * + * Returns: + * %0 if @rcvbuf is filled in, xprt_complete_rqst called, + * %-EAGAIN if server should call ->recvfrom again. + */ +int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, struct xdr_buf *rcvbuf) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); @@ -27,13 +37,13 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, p = (__be32 *)src->iov_base; len = src->iov_len; - xid = rmsgp->rm_xid; + xid = *rdma_resp; #ifdef SVCRDMA_BACKCHANNEL_DEBUG pr_info("%s: xid=%08x, length=%zu\n", __func__, be32_to_cpu(xid), len); pr_info("%s: RPC/RDMA: %*ph\n", - __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp); + __func__, (int)RPCRDMA_HDRLEN_MIN, rdma_resp); pr_info("%s: RPC: %*ph\n", __func__, (int)len, p); #endif @@ -53,7 +63,7 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, goto out_unlock; memcpy(dst->iov_base, p, len); - credits = be32_to_cpu(rmsgp->rm_credit); + credits = be32_to_cpup(rdma_resp + 2); if (credits == 0) credits = 1; /* don't deadlock */ else if (credits > r_xprt->rx_buf.rb_bc_max_requests) diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 7435cb666f42..27a99bf5b1a6 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -613,28 +613,30 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt, * the RPC/RDMA header small and fixed in size, so it is * straightforward to check the RPC header's direction field. */ -static bool -svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp) +static bool svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, + __be32 *rdma_resp) { - __be32 *p = (__be32 *)rmsgp; + __be32 *p; if (!xprt->xpt_bc_xprt) return false; - if (rmsgp->rm_type != rdma_msg) + p = rdma_resp + 3; + if (*p++ != rdma_msg) return false; - if (rmsgp->rm_body.rm_chunks[0] != xdr_zero) + + if (*p++ != xdr_zero) return false; - if (rmsgp->rm_body.rm_chunks[1] != xdr_zero) + if (*p++ != xdr_zero) return false; - if (rmsgp->rm_body.rm_chunks[2] != xdr_zero) + if (*p++ != xdr_zero) return false; - /* sanity */ - if (p[7] != rmsgp->rm_xid) + /* XID sanity */ + if (*p++ != *rdma_resp) return false; /* call direction */ - if (p[8] == cpu_to_be32(RPC_CALL)) + if (*p == cpu_to_be32(RPC_CALL)) return false; return true; @@ -700,8 +702,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) goto out_drop; rqstp->rq_xprt_hlen = ret; - if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) { - ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp, + if (svc_rdma_is_backchannel_reply(xprt, &rmsgp->rm_xid)) { + ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, + &rmsgp->rm_xid, &rqstp->rq_arg); svc_rdma_put_context(ctxt, 0); if (ret) -- cgit v1.2.3 From ded8d19641a605232ab48f5d27f542648beba3cc Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:06:57 -0400 Subject: svcrdma: Reduce size of sge array in struct svc_rdma_op_ctxt The sge array in struct svc_rdma_op_ctxt is no longer used for sending RDMA Write WRs. It need only accommodate the construction of Send and Receive WRs. The maximum inline size is the largest payload it needs to handle now. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 9 +++++++-- net/sunrpc/xprtrdma/svc_rdma.c | 6 +++--- 2 files changed, 10 insertions(+), 5 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 44d642bbfce6..e84b77556784 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -48,6 +48,12 @@ #include #define SVCRDMA_DEBUG +/* Default and maximum inline threshold sizes */ +enum { + RPCRDMA_DEF_INLINE_THRESH = 4096, + RPCRDMA_MAX_INLINE_THRESH = 65536 +}; + /* RPC/RDMA parameters and stats */ extern unsigned int svcrdma_ord; extern unsigned int svcrdma_max_requests; @@ -86,7 +92,7 @@ struct svc_rdma_op_ctxt { int count; unsigned int mapped_sges; struct ib_send_wr send_wr; - struct ib_sge sge[RPCSVC_MAXPAGES]; + struct ib_sge sge[1 + RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE]; struct page *pages[RPCSVC_MAXPAGES]; }; @@ -186,7 +192,6 @@ struct svcxprt_rdma { * page size of 4k, or 32k * 2 ops / 4k = 16 outstanding RDMA_READ. */ #define RPCRDMA_ORD (64/4) #define RPCRDMA_MAX_REQUESTS 32 -#define RPCRDMA_MAX_REQ_SIZE 4096 /* Typical ULP usage of BC requests is NFSv4.1 backchannel. Our * current NFSv4.1 implementation supports one backchannel slot. diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index 912444174647..a4a8f6989ee7 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -58,9 +58,9 @@ unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS; unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS; static unsigned int min_max_requests = 4; static unsigned int max_max_requests = 16384; -unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE; -static unsigned int min_max_inline = 4096; -static unsigned int max_max_inline = 65536; +unsigned int svcrdma_max_req_size = RPCRDMA_DEF_INLINE_THRESH; +static unsigned int min_max_inline = RPCRDMA_DEF_INLINE_THRESH; +static unsigned int max_max_inline = RPCRDMA_MAX_INLINE_THRESH; atomic_t rdma_stat_recv; atomic_t rdma_stat_read; -- cgit v1.2.3 From 68cc4636bbbca89b9fedcf46d8b6bee444fc5e4e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:07:05 -0400 Subject: svcrdma: Remove unused RDMA Write completion handler Clean up. All RDMA Write completions are now handled by svc_rdma_wc_write_ctx. Signed-off-by: Chuck Lever Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 1 - net/sunrpc/xprtrdma/svc_rdma_transport.c | 18 ------------------ 2 files changed, 19 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index e84b77556784..f58c5349beb7 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -249,7 +249,6 @@ extern int svc_rdma_sendto(struct svc_rqst *); /* svc_rdma_transport.c */ extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *); -extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *); extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *); extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *); extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *); diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 237c377c1e06..1597ca8ba3c0 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -473,24 +473,6 @@ void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) svc_rdma_put_context(ctxt, 1); } -/** - * svc_rdma_wc_write - Invoked by RDMA provider for each polled Write WC - * @cq: completion queue - * @wc: completed WR - * - */ -void svc_rdma_wc_write(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct svc_rdma_op_ctxt *ctxt; - - svc_rdma_send_wc_common_put(cq, wc, "write"); - - ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe); - svc_rdma_unmap_dma(ctxt); - svc_rdma_put_context(ctxt, 0); -} - /** * svc_rdma_wc_reg - Invoked by RDMA provider for each polled FASTREG WC * @cq: completion queue -- cgit v1.2.3 From 2cf32924c68a22783e6f630e1b5345a80aa1a376 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:07:13 -0400 Subject: svcrdma: Remove the req_map cache req_maps are no longer used by the send path and can thus be removed. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 34 +------------ net/sunrpc/xprtrdma/svc_rdma_sendto.c | 68 -------------------------- net/sunrpc/xprtrdma/svc_rdma_transport.c | 84 -------------------------------- 3 files changed, 1 insertion(+), 185 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index f58c5349beb7..479bb7f65233 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -96,23 +96,6 @@ struct svc_rdma_op_ctxt { struct page *pages[RPCSVC_MAXPAGES]; }; -/* - * NFS_ requests are mapped on the client side by the chunk lists in - * the RPCRDMA header. During the fetching of the RPC from the client - * and the writing of the reply to the client, the memory in the - * client and the memory in the server must be mapped as contiguous - * vaddr/len for access by the hardware. These data strucures keep - * these mappings. - * - * For an RDMA_WRITE, the 'sge' maps the RPC REPLY. For RDMA_READ, the - * 'sge' in the svc_rdma_req_map maps the server side RPC reply and the - * 'ch' field maps the read-list of the RPCRDMA header to the 'sge' - * mapping of the reply. - */ -struct svc_rdma_chunk_sge { - int start; /* sge no for this chunk */ - int count; /* sge count for this chunk */ -}; struct svc_rdma_fastreg_mr { struct ib_mr *mr; struct scatterlist *sg; @@ -121,15 +104,7 @@ struct svc_rdma_fastreg_mr { enum dma_data_direction direction; struct list_head frmr_list; }; -struct svc_rdma_req_map { - struct list_head free; - unsigned long count; - union { - struct kvec sge[RPCSVC_MAXPAGES]; - struct svc_rdma_chunk_sge ch[RPCSVC_MAXPAGES]; - unsigned long lkey[RPCSVC_MAXPAGES]; - }; -}; + #define RDMACTXT_F_LAST_CTXT 2 #define SVCRDMA_DEVCAP_FAST_REG 1 /* fast mr registration */ @@ -160,8 +135,6 @@ struct svcxprt_rdma { int sc_ctxt_used; spinlock_t sc_rw_ctxt_lock; struct list_head sc_rw_ctxts; - spinlock_t sc_map_lock; - struct list_head sc_maps; struct list_head sc_rq_dto_q; spinlock_t sc_rq_dto_lock; @@ -237,8 +210,6 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, struct xdr_buf *xdr); /* svc_rdma_sendto.c */ -extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *, - struct svc_rdma_req_map *, bool); extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, struct svc_rdma_op_ctxt *ctxt, __be32 *rdma_resp, unsigned int len); @@ -259,9 +230,6 @@ extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *); extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *); extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int); extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt); -extern struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *); -extern void svc_rdma_put_req_map(struct svcxprt_rdma *, - struct svc_rdma_req_map *); extern struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *); extern void svc_rdma_put_frmr(struct svcxprt_rdma *, struct svc_rdma_fastreg_mr *); diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index e514f6864a93..1736337f3a55 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -240,74 +240,6 @@ static void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch, xdr_encode_write_chunk(p, rp_ch, consumed); } -int svc_rdma_map_xdr(struct svcxprt_rdma *xprt, - struct xdr_buf *xdr, - struct svc_rdma_req_map *vec, - bool write_chunk_present) -{ - int sge_no; - u32 sge_bytes; - u32 page_bytes; - u32 page_off; - int page_no; - - if (xdr->len != - (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) { - pr_err("svcrdma: %s: XDR buffer length error\n", __func__); - return -EIO; - } - - /* Skip the first sge, this is for the RPCRDMA header */ - sge_no = 1; - - /* Head SGE */ - vec->sge[sge_no].iov_base = xdr->head[0].iov_base; - vec->sge[sge_no].iov_len = xdr->head[0].iov_len; - sge_no++; - - /* pages SGE */ - page_no = 0; - page_bytes = xdr->page_len; - page_off = xdr->page_base; - while (page_bytes) { - vec->sge[sge_no].iov_base = - page_address(xdr->pages[page_no]) + page_off; - sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off)); - page_bytes -= sge_bytes; - vec->sge[sge_no].iov_len = sge_bytes; - - sge_no++; - page_no++; - page_off = 0; /* reset for next time through loop */ - } - - /* Tail SGE */ - if (xdr->tail[0].iov_len) { - unsigned char *base = xdr->tail[0].iov_base; - size_t len = xdr->tail[0].iov_len; - u32 xdr_pad = xdr_padsize(xdr->page_len); - - if (write_chunk_present && xdr_pad) { - base += xdr_pad; - len -= xdr_pad; - } - - if (len) { - vec->sge[sge_no].iov_base = base; - vec->sge[sge_no].iov_len = len; - sge_no++; - } - } - - dprintk("svcrdma: %s: sge_no %d page_no %d " - "page_base %u page_len %u head_len %zu tail_len %zu\n", - __func__, sge_no, page_no, xdr->page_base, xdr->page_len, - xdr->head[0].iov_len, xdr->tail[0].iov_len); - - vec->count = sge_no; - return 0; -} - /* Parse the RPC Call's transport header. */ static void svc_rdma_get_write_arrays(__be32 *rdma_argp, diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 1597ca8ba3c0..a9d9cb1ba4c6 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -272,85 +272,6 @@ static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt) } } -static struct svc_rdma_req_map *alloc_req_map(gfp_t flags) -{ - struct svc_rdma_req_map *map; - - map = kmalloc(sizeof(*map), flags); - if (map) - INIT_LIST_HEAD(&map->free); - return map; -} - -static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt) -{ - unsigned int i; - - /* One for each receive buffer on this connection. */ - i = xprt->sc_max_requests; - - while (i--) { - struct svc_rdma_req_map *map; - - map = alloc_req_map(GFP_KERNEL); - if (!map) { - dprintk("svcrdma: No memory for request map\n"); - return false; - } - list_add(&map->free, &xprt->sc_maps); - } - return true; -} - -struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt) -{ - struct svc_rdma_req_map *map = NULL; - - spin_lock(&xprt->sc_map_lock); - if (list_empty(&xprt->sc_maps)) - goto out_empty; - - map = list_first_entry(&xprt->sc_maps, - struct svc_rdma_req_map, free); - list_del_init(&map->free); - spin_unlock(&xprt->sc_map_lock); - -out: - map->count = 0; - return map; - -out_empty: - spin_unlock(&xprt->sc_map_lock); - - /* Pre-allocation amount was incorrect */ - map = alloc_req_map(GFP_NOIO); - if (map) - goto out; - - WARN_ONCE(1, "svcrdma: empty request map list?\n"); - return NULL; -} - -void svc_rdma_put_req_map(struct svcxprt_rdma *xprt, - struct svc_rdma_req_map *map) -{ - spin_lock(&xprt->sc_map_lock); - list_add(&map->free, &xprt->sc_maps); - spin_unlock(&xprt->sc_map_lock); -} - -static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt) -{ - while (!list_empty(&xprt->sc_maps)) { - struct svc_rdma_req_map *map; - - map = list_first_entry(&xprt->sc_maps, - struct svc_rdma_req_map, free); - list_del(&map->free); - kfree(map); - } -} - /* QP event handler */ static void qp_event_handler(struct ib_event *event, void *context) { @@ -544,7 +465,6 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, INIT_LIST_HEAD(&cma_xprt->sc_frmr_q); INIT_LIST_HEAD(&cma_xprt->sc_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); - INIT_LIST_HEAD(&cma_xprt->sc_maps); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); @@ -552,7 +472,6 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, spin_lock_init(&cma_xprt->sc_frmr_q_lock); spin_lock_init(&cma_xprt->sc_ctxt_lock); spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); - spin_lock_init(&cma_xprt->sc_map_lock); /* * Note that this implies that the underlying transport support @@ -1004,8 +923,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) if (!svc_rdma_prealloc_ctxts(newxprt)) goto errout; - if (!svc_rdma_prealloc_maps(newxprt)) - goto errout; /* * Limit ORD based on client limit, local device limit, and @@ -1237,7 +1154,6 @@ static void __svc_rdma_free(struct work_struct *work) rdma_dealloc_frmr_q(rdma); svc_rdma_destroy_rw_ctxts(rdma); svc_rdma_destroy_ctxts(rdma); - svc_rdma_destroy_maps(rdma); /* Destroy the QP if present (not a listener) */ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) -- cgit v1.2.3 From dadf3e435debb85dfcf28c157012047153a21a97 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Sun, 9 Apr 2017 13:07:21 -0400 Subject: svcrdma: Clean out old XDR encoders Clean up: These have been replaced and are no longer used. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: J. Bruce Fields --- include/linux/sunrpc/svc_rdma.h | 4 -- net/sunrpc/xprtrdma/svc_rdma_marshal.c | 70 ---------------------------------- 2 files changed, 74 deletions(-) (limited to 'net') diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h index 479bb7f65233..f3787d800ba4 100644 --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@ -187,10 +187,6 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, /* svc_rdma_marshal.c */ extern int svc_rdma_xdr_decode_req(struct xdr_buf *); -extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int); -extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int, - __be32, __be64, u32); -extern unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp); /* svc_rdma_recvfrom.c */ extern int svc_rdma_recvfrom(struct svc_rqst *); diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c index ae58a897eca0..bdcf7d85a3dc 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c @@ -166,73 +166,3 @@ out_inval: dprintk("svcrdma: failed to parse transport header\n"); return -EINVAL; } - -/** - * svc_rdma_xdr_get_reply_hdr_length - Get length of Reply transport header - * @rdma_resp: buffer containing Reply transport header - * - * Returns length of transport header, in bytes. - */ -unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp) -{ - unsigned int nsegs; - __be32 *p; - - p = rdma_resp; - - /* RPC-over-RDMA V1 replies never have a Read list. */ - p += rpcrdma_fixed_maxsz + 1; - - /* Skip Write list. */ - while (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; - } - - /* Skip Reply chunk. */ - if (*p++ != xdr_zero) { - nsegs = be32_to_cpup(p++); - p += nsegs * rpcrdma_segment_maxsz; - } - - return (unsigned long)p - (unsigned long)rdma_resp; -} - -void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks) -{ - struct rpcrdma_write_array *ary; - - /* no read-list */ - rmsgp->rm_body.rm_chunks[0] = xdr_zero; - - /* write-array discrim */ - ary = (struct rpcrdma_write_array *) - &rmsgp->rm_body.rm_chunks[1]; - ary->wc_discrim = xdr_one; - ary->wc_nchunks = cpu_to_be32(chunks); - - /* write-list terminator */ - ary->wc_array[chunks].wc_target.rs_handle = xdr_zero; - - /* reply-array discriminator */ - ary->wc_array[chunks].wc_target.rs_length = xdr_zero; -} - -void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary, - int chunks) -{ - ary->wc_discrim = xdr_one; - ary->wc_nchunks = cpu_to_be32(chunks); -} - -void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary, - int chunk_no, - __be32 rs_handle, - __be64 rs_offset, - u32 write_len) -{ - struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target; - seg->rs_handle = rs_handle; - seg->rs_offset = rs_offset; - seg->rs_length = cpu_to_be32(write_len); -} -- cgit v1.2.3 From 9e0d87680d689f1758185851c3da6eafb16e71e1 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 26 Apr 2017 11:55:26 -0400 Subject: SUNRPC: Refactor svc_set_num_threads() Refactor to separate out the functions of starting and stopping threads so that they can be used in other helpers. Signed-off-by: Trond Myklebust Tested-and-reviewed-by: Kinglong Mee Signed-off-by: J. Bruce Fields --- net/sunrpc/svc.c | 96 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 38 deletions(-) (limited to 'net') diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index a08aeb56b8e4..98dc33ae738b 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -702,59 +702,32 @@ found_pool: return task; } -/* - * Create or destroy enough new threads to make the number - * of threads the given number. If `pool' is non-NULL, applies - * only to threads in that pool, otherwise round-robins between - * all pools. Caller must ensure that mutual exclusion between this and - * server startup or shutdown. - * - * Destroying threads relies on the service threads filling in - * rqstp->rq_task, which only the nfs ones do. Assumes the serv - * has been created using svc_create_pooled(). - * - * Based on code that used to be in nfsd_svc() but tweaked - * to be pool-aware. - */ -int -svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +/* create new threads */ +static int +svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) { struct svc_rqst *rqstp; struct task_struct *task; struct svc_pool *chosen_pool; - int error = 0; unsigned int state = serv->sv_nrthreads-1; int node; - if (pool == NULL) { - /* The -1 assumes caller has done a svc_get() */ - nrservs -= (serv->sv_nrthreads-1); - } else { - spin_lock_bh(&pool->sp_lock); - nrservs -= pool->sp_nrthreads; - spin_unlock_bh(&pool->sp_lock); - } - - /* create new threads */ - while (nrservs > 0) { + do { nrservs--; chosen_pool = choose_pool(serv, pool, &state); node = svc_pool_map_get_node(chosen_pool->sp_id); rqstp = svc_prepare_thread(serv, chosen_pool, node); - if (IS_ERR(rqstp)) { - error = PTR_ERR(rqstp); - break; - } + if (IS_ERR(rqstp)) + return PTR_ERR(rqstp); __module_get(serv->sv_ops->svo_module); task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp, node, "%s", serv->sv_name); if (IS_ERR(task)) { - error = PTR_ERR(task); module_put(serv->sv_ops->svo_module); svc_exit_thread(rqstp); - break; + return PTR_ERR(task); } rqstp->rq_task = task; @@ -763,15 +736,62 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) svc_sock_update_bufs(serv); wake_up_process(task); - } + } while (nrservs > 0); + + return 0; +} + + +/* destroy old threads */ +static int +svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + struct task_struct *task; + unsigned int state = serv->sv_nrthreads-1; + /* destroy old threads */ - while (nrservs < 0 && - (task = choose_victim(serv, pool, &state)) != NULL) { + do { + task = choose_victim(serv, pool, &state); + if (task == NULL) + break; send_sig(SIGINT, task, 1); nrservs++; + } while (nrservs < 0); + + return 0; +} + +/* + * Create or destroy enough new threads to make the number + * of threads the given number. If `pool' is non-NULL, applies + * only to threads in that pool, otherwise round-robins between + * all pools. Caller must ensure that mutual exclusion between this and + * server startup or shutdown. + * + * Destroying threads relies on the service threads filling in + * rqstp->rq_task, which only the nfs ones do. Assumes the serv + * has been created using svc_create_pooled(). + * + * Based on code that used to be in nfsd_svc() but tweaked + * to be pool-aware. + */ +int +svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + if (pool == NULL) { + /* The -1 assumes caller has done a svc_get() */ + nrservs -= (serv->sv_nrthreads-1); + } else { + spin_lock_bh(&pool->sp_lock); + nrservs -= pool->sp_nrthreads; + spin_unlock_bh(&pool->sp_lock); } - return error; + if (nrservs > 0) + return svc_start_kthreads(serv, pool, nrservs); + if (nrservs < 0) + return svc_signal_kthreads(serv, pool, nrservs); + return 0; } EXPORT_SYMBOL_GPL(svc_set_num_threads); -- cgit v1.2.3 From ed6473ddc704a2005b9900ca08e236ebb2d8540a Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Wed, 26 Apr 2017 11:55:27 -0400 Subject: NFSv4: Fix callback server shutdown We want to use kthread_stop() in order to ensure the threads are shut down before we tear down the nfs_callback_info in nfs_callback_down. Tested-and-reviewed-by: Kinglong Mee Reported-by: Kinglong Mee Fixes: bb6aeba736ba9 ("NFSv4.x: Switch to using svc_set_num_threads()...") Signed-off-by: Trond Myklebust Signed-off-by: J. Bruce Fields --- fs/nfs/callback.c | 24 ++++++++++++++++-------- include/linux/sunrpc/svc.h | 1 + net/sunrpc/svc.c | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 8 deletions(-) (limited to 'net') diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c index c5e27ebd8da8..73a1f928226c 100644 --- a/fs/nfs/callback.c +++ b/fs/nfs/callback.c @@ -76,7 +76,10 @@ nfs4_callback_svc(void *vrqstp) set_freezable(); - while (!kthread_should_stop()) { + while (!kthread_freezable_should_stop(NULL)) { + + if (signal_pending(current)) + flush_signals(current); /* * Listen for a request on the socket */ @@ -85,6 +88,8 @@ nfs4_callback_svc(void *vrqstp) continue; svc_process(rqstp); } + svc_exit_thread(rqstp); + module_put_and_exit(0); return 0; } @@ -103,9 +108,10 @@ nfs41_callback_svc(void *vrqstp) set_freezable(); - while (!kthread_should_stop()) { - if (try_to_freeze()) - continue; + while (!kthread_freezable_should_stop(NULL)) { + + if (signal_pending(current)) + flush_signals(current); prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_INTERRUPTIBLE); spin_lock_bh(&serv->sv_cb_lock); @@ -121,11 +127,13 @@ nfs41_callback_svc(void *vrqstp) error); } else { spin_unlock_bh(&serv->sv_cb_lock); - schedule(); + if (!kthread_should_stop()) + schedule(); finish_wait(&serv->sv_cb_waitq, &wq); } - flush_signals(current); } + svc_exit_thread(rqstp); + module_put_and_exit(0); return 0; } @@ -221,14 +229,14 @@ err_bind: static struct svc_serv_ops nfs40_cb_sv_ops = { .svo_function = nfs4_callback_svc, .svo_enqueue_xprt = svc_xprt_do_enqueue, - .svo_setup = svc_set_num_threads, + .svo_setup = svc_set_num_threads_sync, .svo_module = THIS_MODULE, }; #if defined(CONFIG_NFS_V4_1) static struct svc_serv_ops nfs41_cb_sv_ops = { .svo_function = nfs41_callback_svc, .svo_enqueue_xprt = svc_xprt_do_enqueue, - .svo_setup = svc_set_num_threads, + .svo_setup = svc_set_num_threads_sync, .svo_module = THIS_MODULE, }; diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h index 6ef19cf658b4..94631026f79c 100644 --- a/include/linux/sunrpc/svc.h +++ b/include/linux/sunrpc/svc.h @@ -473,6 +473,7 @@ void svc_pool_map_put(void); struct svc_serv * svc_create_pooled(struct svc_program *, unsigned int, struct svc_serv_ops *); int svc_set_num_threads(struct svc_serv *, struct svc_pool *, int); +int svc_set_num_threads_sync(struct svc_serv *, struct svc_pool *, int); int svc_pool_stats_open(struct svc_serv *serv, struct file *file); void svc_destroy(struct svc_serv *); void svc_shutdown_net(struct svc_serv *, struct net *); diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index 98dc33ae738b..bc0f5a0ecbdc 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -795,6 +795,44 @@ svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) } EXPORT_SYMBOL_GPL(svc_set_num_threads); +/* destroy old threads */ +static int +svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + struct task_struct *task; + unsigned int state = serv->sv_nrthreads-1; + + /* destroy old threads */ + do { + task = choose_victim(serv, pool, &state); + if (task == NULL) + break; + kthread_stop(task); + nrservs++; + } while (nrservs < 0); + return 0; +} + +int +svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs) +{ + if (pool == NULL) { + /* The -1 assumes caller has done a svc_get() */ + nrservs -= (serv->sv_nrthreads-1); + } else { + spin_lock_bh(&pool->sp_lock); + nrservs -= pool->sp_nrthreads; + spin_unlock_bh(&pool->sp_lock); + } + + if (nrservs > 0) + return svc_start_kthreads(serv, pool, nrservs); + if (nrservs < 0) + return svc_stop_kthreads(serv, pool, nrservs); + return 0; +} +EXPORT_SYMBOL_GPL(svc_set_num_threads_sync); + /* * Called from a server thread as it's exiting. Caller must hold the "service * mutex" for the service. -- cgit v1.2.3