diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/sched.c | 7 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 108 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 182 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 411 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 111 |
5 files changed, 472 insertions, 347 deletions
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index d20f2329eea3..b91fd9c597b4 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -844,10 +844,10 @@ static void rpc_async_schedule(struct work_struct *work) void *rpc_malloc(struct rpc_task *task, size_t size) { struct rpc_buffer *buf; - gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN; + gfp_t gfp = GFP_NOIO | __GFP_NOWARN; if (RPC_IS_SWAPPER(task)) - gfp |= __GFP_MEMALLOC; + gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; size += sizeof(struct rpc_buffer); if (size <= RPC_BUFFER_MAXSIZE) @@ -1069,7 +1069,8 @@ static int rpciod_start(void) * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 1); + /* Note: highpri because network receive is latency sensitive */ + wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); rpciod_workqueue = wq; return rpciod_workqueue != NULL; } diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index df01d124936c..7e9acd9361c5 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -209,9 +209,11 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, if (cur_rchunk) { /* read */ cur_rchunk->rc_discrim = xdr_one; /* all read chunks have the same "position" */ - cur_rchunk->rc_position = htonl(pos); - cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey); - cur_rchunk->rc_target.rs_length = htonl(seg->mr_len); + cur_rchunk->rc_position = cpu_to_be32(pos); + cur_rchunk->rc_target.rs_handle = + cpu_to_be32(seg->mr_rkey); + cur_rchunk->rc_target.rs_length = + cpu_to_be32(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_rchunk->rc_target.rs_offset, seg->mr_base); @@ -222,8 +224,10 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, cur_rchunk++; r_xprt->rx_stats.read_chunk_count++; } else { /* write/reply */ - cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey); - cur_wchunk->wc_target.rs_length = htonl(seg->mr_len); + cur_wchunk->wc_target.rs_handle = + cpu_to_be32(seg->mr_rkey); + cur_wchunk->wc_target.rs_length = + cpu_to_be32(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_wchunk->wc_target.rs_offset, seg->mr_base); @@ -257,7 +261,7 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, *iptr++ = xdr_zero; /* encode a NULL reply chunk */ } else { warray->wc_discrim = xdr_one; - warray->wc_nchunks = htonl(nchunks); + warray->wc_nchunks = cpu_to_be32(nchunks); iptr = (__be32 *) cur_wchunk; if (type == rpcrdma_writech) { *iptr++ = xdr_zero; /* finish the write chunk list */ @@ -290,7 +294,7 @@ ssize_t rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result) { struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base; + struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf); if (req->rl_rtype != rpcrdma_noch) result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, @@ -402,13 +406,12 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) base = rqst->rq_svec[0].iov_base; rpclen = rqst->rq_svec[0].iov_len; - /* build RDMA header in private area at front */ - headerp = (struct rpcrdma_msg *) req->rl_base; - /* don't htonl XID, it's already done in request */ + headerp = rdmab_to_msg(req->rl_rdmabuf); + /* don't byte-swap XID, it's already done in request */ headerp->rm_xid = rqst->rq_xid; - headerp->rm_vers = xdr_one; - headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests); - headerp->rm_type = htonl(RDMA_MSG); + headerp->rm_vers = rpcrdma_version; + headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); + headerp->rm_type = rdma_msg; /* * Chunks needed for results? @@ -468,7 +471,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) return -EIO; } - hdrlen = 28; /*sizeof *headerp;*/ + hdrlen = RPCRDMA_HDRLEN_MIN; padlen = 0; /* @@ -482,11 +485,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) RPCRDMA_INLINE_PAD_VALUE(rqst)); if (padlen) { - headerp->rm_type = htonl(RDMA_MSGP); + headerp->rm_type = rdma_msgp; headerp->rm_body.rm_padded.rm_align = - htonl(RPCRDMA_INLINE_PAD_VALUE(rqst)); + cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst)); headerp->rm_body.rm_padded.rm_thresh = - htonl(RPCRDMA_INLINE_PAD_THRESH); + cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH); headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero; headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero; headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero; @@ -524,7 +527,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd" " headerp 0x%p base 0x%p lkey 0x%x\n", __func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen, - headerp, base, req->rl_iov.lkey); + headerp, base, rdmab_lkey(req->rl_rdmabuf)); /* * initialize send_iov's - normally only two: rdma chunk header and @@ -533,26 +536,26 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * header and any write data. In all non-rdma cases, any following * data has been copied into the RPC header buffer. */ - req->rl_send_iov[0].addr = req->rl_iov.addr; + req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); req->rl_send_iov[0].length = hdrlen; - req->rl_send_iov[0].lkey = req->rl_iov.lkey; + req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); - req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base); + req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); req->rl_send_iov[1].length = rpclen; - req->rl_send_iov[1].lkey = req->rl_iov.lkey; + req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); req->rl_niovs = 2; if (padlen) { struct rpcrdma_ep *ep = &r_xprt->rx_ep; - req->rl_send_iov[2].addr = ep->rep_pad.addr; + req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf); req->rl_send_iov[2].length = padlen; - req->rl_send_iov[2].lkey = ep->rep_pad.lkey; + req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf); req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen; req->rl_send_iov[3].length = rqst->rq_slen - rpclen; - req->rl_send_iov[3].lkey = req->rl_iov.lkey; + req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf); req->rl_niovs = 4; } @@ -569,8 +572,9 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b { unsigned int i, total_len; struct rpcrdma_write_chunk *cur_wchunk; + char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf); - i = ntohl(**iptrp); /* get array count */ + i = be32_to_cpu(**iptrp); if (i > max) return -1; cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); @@ -582,11 +586,11 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b xdr_decode_hyper((__be32 *)&seg->rs_offset, &off); dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n", __func__, - ntohl(seg->rs_length), + be32_to_cpu(seg->rs_length), (unsigned long long)off, - ntohl(seg->rs_handle)); + be32_to_cpu(seg->rs_handle)); } - total_len += ntohl(seg->rs_length); + total_len += be32_to_cpu(seg->rs_length); ++cur_wchunk; } /* check and adjust for properly terminated write chunk */ @@ -596,7 +600,7 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b return -1; cur_wchunk = (struct rpcrdma_write_chunk *) w; } - if ((char *) cur_wchunk > rep->rr_base + rep->rr_len) + if ((char *)cur_wchunk > base + rep->rr_len) return -1; *iptrp = (__be32 *) cur_wchunk; @@ -691,7 +695,9 @@ rpcrdma_connect_worker(struct work_struct *work) { struct rpcrdma_ep *ep = container_of(work, struct rpcrdma_ep, rep_connect_worker.work); - struct rpc_xprt *xprt = ep->rep_xprt; + struct rpcrdma_xprt *r_xprt = + container_of(ep, struct rpcrdma_xprt, rx_ep); + struct rpc_xprt *xprt = &r_xprt->rx_xprt; spin_lock_bh(&xprt->transport_lock); if (++xprt->connect_cookie == 0) /* maintain a reserved value */ @@ -732,7 +738,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpc_xprt *xprt = rep->rr_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); __be32 *iptr; - int rdmalen, status; + int credits, rdmalen, status; unsigned long cwnd; /* Check status. If bad, signal disconnect and return rep to pool */ @@ -744,14 +750,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) } return; } - if (rep->rr_len < 28) { + if (rep->rr_len < RPCRDMA_HDRLEN_MIN) { dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; } - headerp = (struct rpcrdma_msg *) rep->rr_base; - if (headerp->rm_vers != xdr_one) { + headerp = rdmab_to_msg(rep->rr_rdmabuf); + if (headerp->rm_vers != rpcrdma_version) { dprintk("RPC: %s: invalid version %d\n", - __func__, ntohl(headerp->rm_vers)); + __func__, be32_to_cpu(headerp->rm_vers)); goto repost; } @@ -762,7 +768,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) spin_unlock(&xprt->transport_lock); dprintk("RPC: %s: reply 0x%p failed " "to match any request xid 0x%08x len %d\n", - __func__, rep, headerp->rm_xid, rep->rr_len); + __func__, rep, be32_to_cpu(headerp->rm_xid), + rep->rr_len); repost: r_xprt->rx_stats.bad_reply_count++; rep->rr_func = rpcrdma_reply_handler; @@ -778,13 +785,14 @@ repost: spin_unlock(&xprt->transport_lock); dprintk("RPC: %s: duplicate reply 0x%p to RPC " "request 0x%p: xid 0x%08x\n", __func__, rep, req, - headerp->rm_xid); + be32_to_cpu(headerp->rm_xid)); goto repost; } dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" " RPC request 0x%p xid 0x%08x\n", - __func__, rep, req, rqst, headerp->rm_xid); + __func__, rep, req, rqst, + be32_to_cpu(headerp->rm_xid)); /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; @@ -793,7 +801,7 @@ repost: /* check for expected message types */ /* The order of some of these tests is important. */ switch (headerp->rm_type) { - case htonl(RDMA_MSG): + case rdma_msg: /* never expect read chunks */ /* never expect reply chunks (two ways to check) */ /* never expect write chunks without having offered RDMA */ @@ -824,22 +832,24 @@ repost: } else { /* else ordinary inline */ rdmalen = 0; - iptr = (__be32 *)((unsigned char *)headerp + 28); - rep->rr_len -= 28; /*sizeof *headerp;*/ + iptr = (__be32 *)((unsigned char *)headerp + + RPCRDMA_HDRLEN_MIN); + rep->rr_len -= RPCRDMA_HDRLEN_MIN; status = rep->rr_len; } /* Fix up the rpc results for upper layer */ rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen); break; - case htonl(RDMA_NOMSG): + case rdma_nomsg: /* never expect read or write chunks, always reply chunks */ if (headerp->rm_body.rm_chunks[0] != xdr_zero || headerp->rm_body.rm_chunks[1] != xdr_zero || headerp->rm_body.rm_chunks[2] != xdr_one || req->rl_nchunks == 0) goto badheader; - iptr = (__be32 *)((unsigned char *)headerp + 28); + iptr = (__be32 *)((unsigned char *)headerp + + RPCRDMA_HDRLEN_MIN); rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr); if (rdmalen < 0) goto badheader; @@ -853,7 +863,7 @@ badheader: dprintk("%s: invalid rpcrdma reply header (type %d):" " chunks[012] == %d %d %d" " expected chunks <= %d\n", - __func__, ntohl(headerp->rm_type), + __func__, be32_to_cpu(headerp->rm_type), headerp->rm_body.rm_chunks[0], headerp->rm_body.rm_chunks[1], headerp->rm_body.rm_chunks[2], @@ -863,8 +873,14 @@ badheader: break; } + credits = be32_to_cpu(headerp->rm_credit); + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > r_xprt->rx_buf.rb_max_requests) + credits = r_xprt->rx_buf.rb_max_requests; + cwnd = xprt->cwnd; - xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; + xprt->cwnd = credits << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) xprt_release_rqst_cong(rqst->rq_task); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index bbd6155d3e34..2e192baa59f3 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -200,9 +200,9 @@ xprt_rdma_free_addresses(struct rpc_xprt *xprt) static void xprt_rdma_connect_worker(struct work_struct *work) { - struct rpcrdma_xprt *r_xprt = - container_of(work, struct rpcrdma_xprt, rdma_connect.work); - struct rpc_xprt *xprt = &r_xprt->xprt; + struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt, + rx_connect_worker.work); + struct rpc_xprt *xprt = &r_xprt->rx_xprt; int rc = 0; xprt_clear_connected(xprt); @@ -235,7 +235,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) dprintk("RPC: %s: called\n", __func__); - cancel_delayed_work_sync(&r_xprt->rdma_connect); + cancel_delayed_work_sync(&r_xprt->rx_connect_worker); xprt_clear_connected(xprt); @@ -364,8 +364,7 @@ xprt_setup_rdma(struct xprt_create *args) * any inline data. Also specify any padding which will be provided * from a preregistered zero buffer. */ - rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia, - &new_xprt->rx_data); + rc = rpcrdma_buffer_create(new_xprt); if (rc) goto out3; @@ -374,9 +373,8 @@ xprt_setup_rdma(struct xprt_create *args) * connection loss notification is async. We also catch connection loss * when reaping receives. */ - INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker); - new_ep->rep_func = rpcrdma_conn_func; - new_ep->rep_xprt = xprt; + INIT_DELAYED_WORK(&new_xprt->rx_connect_worker, + xprt_rdma_connect_worker); xprt_rdma_format_addresses(xprt); xprt->max_payload = rpcrdma_max_payload(new_xprt); @@ -434,94 +432,101 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) if (r_xprt->rx_ep.rep_connected != 0) { /* Reconnect */ - schedule_delayed_work(&r_xprt->rdma_connect, - xprt->reestablish_timeout); + schedule_delayed_work(&r_xprt->rx_connect_worker, + xprt->reestablish_timeout); xprt->reestablish_timeout <<= 1; if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; } else { - schedule_delayed_work(&r_xprt->rdma_connect, 0); + schedule_delayed_work(&r_xprt->rx_connect_worker, 0); if (!RPC_IS_ASYNC(task)) - flush_delayed_work(&r_xprt->rdma_connect); + flush_delayed_work(&r_xprt->rx_connect_worker); } } /* * The RDMA allocate/free functions need the task structure as a place * to hide the struct rpcrdma_req, which is necessary for the actual send/recv - * sequence. For this reason, the recv buffers are attached to send - * buffers for portions of the RPC. Note that the RPC layer allocates - * both send and receive buffers in the same call. We may register - * the receive buffer portion when using reply chunks. + * sequence. + * + * The RPC layer allocates both send and receive buffers in the same call + * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer). + * We may register rq_rcv_buf when using reply chunks. */ static void * xprt_rdma_allocate(struct rpc_task *task, size_t size) { struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt; - struct rpcrdma_req *req, *nreq; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_regbuf *rb; + struct rpcrdma_req *req; + size_t min_size; + gfp_t flags; - req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf); + req = rpcrdma_buffer_get(&r_xprt->rx_buf); if (req == NULL) return NULL; - if (size > req->rl_size) { - dprintk("RPC: %s: size %zd too large for buffer[%zd]: " - "prog %d vers %d proc %d\n", - __func__, size, req->rl_size, - task->tk_client->cl_prog, task->tk_client->cl_vers, - task->tk_msg.rpc_proc->p_proc); - /* - * Outgoing length shortage. Our inline write max must have - * been configured to perform direct i/o. - * - * This is therefore a large metadata operation, and the - * allocate call was made on the maximum possible message, - * e.g. containing long filename(s) or symlink data. In - * fact, while these metadata operations *might* carry - * large outgoing payloads, they rarely *do*. However, we - * have to commit to the request here, so reallocate and - * register it now. The data path will never require this - * reallocation. - * - * If the allocation or registration fails, the RPC framework - * will (doggedly) retry. - */ - if (task->tk_flags & RPC_TASK_SWAPPER) - nreq = kmalloc(sizeof *req + size, GFP_ATOMIC); - else - nreq = kmalloc(sizeof *req + size, GFP_NOFS); - if (nreq == NULL) - goto outfail; - - if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia, - nreq->rl_base, size + sizeof(struct rpcrdma_req) - - offsetof(struct rpcrdma_req, rl_base), - &nreq->rl_handle, &nreq->rl_iov)) { - kfree(nreq); - goto outfail; - } - rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size; - nreq->rl_size = size; - nreq->rl_niovs = 0; - nreq->rl_nchunks = 0; - nreq->rl_buffer = (struct rpcrdma_buffer *)req; - nreq->rl_reply = req->rl_reply; - memcpy(nreq->rl_segments, - req->rl_segments, sizeof nreq->rl_segments); - /* flag the swap with an unused field */ - nreq->rl_iov.length = 0; - req->rl_reply = NULL; - req = nreq; - } + flags = GFP_NOIO | __GFP_NOWARN; + if (RPC_IS_SWAPPER(task)) + flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; + + if (req->rl_rdmabuf == NULL) + goto out_rdmabuf; + if (req->rl_sendbuf == NULL) + goto out_sendbuf; + if (size > req->rl_sendbuf->rg_size) + goto out_sendbuf; + +out: dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); req->rl_connect_cookie = 0; /* our reserved value */ - return req->rl_xdr_buf; - -outfail: + return req->rl_sendbuf->rg_base; + +out_rdmabuf: + min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); + rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags); + if (IS_ERR(rb)) + goto out_fail; + req->rl_rdmabuf = rb; + +out_sendbuf: + /* XDR encoding and RPC/RDMA marshaling of this request has not + * yet occurred. Thus a lower bound is needed to prevent buffer + * overrun during marshaling. + * + * RPC/RDMA marshaling may choose to send payload bearing ops + * inline, if the result is smaller than the inline threshold. + * The value of the "size" argument accounts for header + * requirements but not for the payload in these cases. + * + * Likewise, allocate enough space to receive a reply up to the + * size of the inline threshold. + * + * It's unlikely that both the send header and the received + * reply will be large, but slush is provided here to allow + * flexibility when marshaling. + */ + min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp); + min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp); + if (size < min_size) + size = min_size; + + rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags); + if (IS_ERR(rb)) + goto out_fail; + rb->rg_owner = req; + + r_xprt->rx_stats.hardway_register_count += size; + rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf); + req->rl_sendbuf = rb; + goto out; + +out_fail: rpcrdma_buffer_put(req); - rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++; + r_xprt->rx_stats.failed_marshal_count++; return NULL; } @@ -533,47 +538,24 @@ xprt_rdma_free(void *buffer) { struct rpcrdma_req *req; struct rpcrdma_xprt *r_xprt; - struct rpcrdma_rep *rep; + struct rpcrdma_regbuf *rb; int i; if (buffer == NULL) return; - req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]); - if (req->rl_iov.length == 0) { /* see allocate above */ - r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer, - struct rpcrdma_xprt, rx_buf); - } else - r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); - rep = req->rl_reply; + rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]); + req = rb->rg_owner; + r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); - dprintk("RPC: %s: called on 0x%p%s\n", - __func__, rep, (rep && rep->rr_func) ? " (with waiter)" : ""); + dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - /* - * Finish the deregistration. The process is considered - * complete when the rr_func vector becomes NULL - this - * was put in place during rpcrdma_reply_handler() - the wait - * call below will not block if the dereg is "done". If - * interrupted, our framework will clean up. - */ for (i = 0; req->rl_nchunks;) { --req->rl_nchunks; i += rpcrdma_deregister_external( &req->rl_segments[i], r_xprt); } - if (req->rl_iov.length == 0) { /* see allocate above */ - struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer; - oreq->rl_reply = req->rl_reply; - (void) rpcrdma_deregister_internal(&r_xprt->rx_ia, - req->rl_handle, - &req->rl_iov); - kfree(req); - req = oreq; - } - - /* Put back request+reply buffers */ rpcrdma_buffer_put(req); } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index c98e40643910..124676c13780 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -49,6 +49,7 @@ #include <linux/interrupt.h> #include <linux/slab.h> +#include <linux/prefetch.h> #include <asm/bitops.h> #include "xprt_rdma.h" @@ -153,7 +154,7 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) event->device->name, context); if (ep->rep_connected == 1) { ep->rep_connected = -EIO; - ep->rep_func(ep); + rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); } } @@ -168,23 +169,59 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context) event->device->name, context); if (ep->rep_connected == 1) { ep->rep_connected = -EIO; - ep->rep_func(ep); + rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); } } +static const char * const wc_status[] = { + "success", + "local length error", + "local QP operation error", + "local EE context operation error", + "local protection error", + "WR flushed", + "memory management operation error", + "bad response error", + "local access error", + "remote invalid request error", + "remote access error", + "remote operation error", + "transport retry counter exceeded", + "RNR retrycounter exceeded", + "local RDD violation error", + "remove invalid RD request", + "operation aborted", + "invalid EE context number", + "invalid EE context state", + "fatal error", + "response timeout error", + "general error", +}; + +#define COMPLETION_MSG(status) \ + ((status) < ARRAY_SIZE(wc_status) ? \ + wc_status[(status)] : "unexpected completion error") + static void rpcrdma_sendcq_process_wc(struct ib_wc *wc) { - struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; + if (likely(wc->status == IB_WC_SUCCESS)) + return; - dprintk("RPC: %s: frmr %p status %X opcode %d\n", - __func__, frmr, wc->status, wc->opcode); + /* WARNING: Only wr_id and status are reliable at this point */ + if (wc->wr_id == 0ULL) { + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("RPC: %s: SEND: %s\n", + __func__, COMPLETION_MSG(wc->status)); + } else { + struct rpcrdma_mw *r; - if (wc->wr_id == 0ULL) - return; - if (wc->status != IB_WC_SUCCESS) - frmr->r.frmr.fr_state = FRMR_IS_STALE; + r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; + r->r.frmr.fr_state = FRMR_IS_STALE; + pr_err("RPC: %s: frmr %p (stale): %s\n", + __func__, r, COMPLETION_MSG(wc->status)); + } } static int @@ -248,33 +285,32 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) struct rpcrdma_rep *rep = (struct rpcrdma_rep *)(unsigned long)wc->wr_id; - dprintk("RPC: %s: rep %p status %X opcode %X length %u\n", - __func__, rep, wc->status, wc->opcode, wc->byte_len); + /* WARNING: Only wr_id and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) + goto out_fail; - if (wc->status != IB_WC_SUCCESS) { - rep->rr_len = ~0U; - goto out_schedule; - } + /* status == SUCCESS means all fields in wc are trustworthy */ if (wc->opcode != IB_WC_RECV) return; + dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n", + __func__, rep, wc->byte_len); + rep->rr_len = wc->byte_len; ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device, - rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE); - - if (rep->rr_len >= 16) { - struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base; - unsigned int credits = ntohl(p->rm_credit); - - if (credits == 0) - credits = 1; /* don't deadlock */ - else if (credits > rep->rr_buffer->rb_max_requests) - credits = rep->rr_buffer->rb_max_requests; - atomic_set(&rep->rr_buffer->rb_credits, credits); - } + rdmab_addr(rep->rr_rdmabuf), + rep->rr_len, DMA_FROM_DEVICE); + prefetch(rdmab_to_msg(rep->rr_rdmabuf)); out_schedule: list_add_tail(&rep->rr_list, sched_list); + return; +out_fail: + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("RPC: %s: rep %p: %s\n", + __func__, rep, COMPLETION_MSG(wc->status)); + rep->rr_len = ~0U; + goto out_schedule; } static int @@ -390,8 +426,8 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr; #endif - struct ib_qp_attr attr; - struct ib_qp_init_attr iattr; + struct ib_qp_attr *attr = &ia->ri_qp_attr; + struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr; int connstate = 0; switch (event->event) { @@ -414,12 +450,13 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) break; case RDMA_CM_EVENT_ESTABLISHED: connstate = 1; - ib_query_qp(ia->ri_id->qp, &attr, - IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC, - &iattr); + ib_query_qp(ia->ri_id->qp, attr, + IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC, + iattr); dprintk("RPC: %s: %d responder resources" " (%d initiator)\n", - __func__, attr.max_dest_rd_atomic, attr.max_rd_atomic); + __func__, attr->max_dest_rd_atomic, + attr->max_rd_atomic); goto connected; case RDMA_CM_EVENT_CONNECT_ERROR: connstate = -ENOTCONN; @@ -436,11 +473,10 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) case RDMA_CM_EVENT_DEVICE_REMOVAL: connstate = -ENODEV; connected: - atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1); dprintk("RPC: %s: %sconnected\n", __func__, connstate > 0 ? "" : "dis"); ep->rep_connected = connstate; - ep->rep_func(ep); + rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); /*FALLTHROUGH*/ default: @@ -453,7 +489,7 @@ connected: #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) if (connstate == 1) { - int ird = attr.max_dest_rd_atomic; + int ird = attr->max_dest_rd_atomic; int tird = ep->rep_remote_cma.responder_resources; printk(KERN_INFO "rpcrdma: connection to %pI4:%u " "on %s, memreg %d slots %d ird %d%s\n", @@ -554,8 +590,8 @@ int rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) { int rc, mem_priv; - struct ib_device_attr devattr; struct rpcrdma_ia *ia = &xprt->rx_ia; + struct ib_device_attr *devattr = &ia->ri_devattr; ia->ri_id = rpcrdma_create_id(xprt, ia, addr); if (IS_ERR(ia->ri_id)) { @@ -571,26 +607,21 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) goto out2; } - /* - * Query the device to determine if the requested memory - * registration strategy is supported. If it isn't, set the - * strategy to a globally supported model. - */ - rc = ib_query_device(ia->ri_id->device, &devattr); + rc = ib_query_device(ia->ri_id->device, devattr); if (rc) { dprintk("RPC: %s: ib_query_device failed %d\n", __func__, rc); - goto out2; + goto out3; } - if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) { + if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) { ia->ri_have_dma_lkey = 1; ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey; } if (memreg == RPCRDMA_FRMR) { /* Requires both frmr reg and local dma lkey */ - if ((devattr.device_cap_flags & + if ((devattr->device_cap_flags & (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) != (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) { dprintk("RPC: %s: FRMR registration " @@ -600,7 +631,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) /* Mind the ia limit on FRMR page list depth */ ia->ri_max_frmr_depth = min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - devattr.max_fast_reg_page_list_len); + devattr->max_fast_reg_page_list_len); } } if (memreg == RPCRDMA_MTHCAFMR) { @@ -638,14 +669,14 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) "phys register failed with %lX\n", __func__, PTR_ERR(ia->ri_bind_mem)); rc = -ENOMEM; - goto out2; + goto out3; } break; default: printk(KERN_ERR "RPC: Unsupported memory " "registration mode: %d\n", memreg); rc = -ENOMEM; - goto out2; + goto out3; } dprintk("RPC: %s: memory registration strategy is %d\n", __func__, memreg); @@ -655,6 +686,10 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) rwlock_init(&ia->ri_qplock); return 0; + +out3: + ib_dealloc_pd(ia->ri_pd); + ia->ri_pd = NULL; out2: rdma_destroy_id(ia->ri_id); ia->ri_id = NULL; @@ -698,20 +733,13 @@ int rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { - struct ib_device_attr devattr; + struct ib_device_attr *devattr = &ia->ri_devattr; struct ib_cq *sendcq, *recvcq; int rc, err; - rc = ib_query_device(ia->ri_id->device, &devattr); - if (rc) { - dprintk("RPC: %s: ib_query_device failed %d\n", - __func__, rc); - return rc; - } - /* check provider's send/recv wr limits */ - if (cdata->max_requests > devattr.max_qp_wr) - cdata->max_requests = devattr.max_qp_wr; + if (cdata->max_requests > devattr->max_qp_wr) + cdata->max_requests = devattr->max_qp_wr; ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; ep->rep_attr.qp_context = ep; @@ -746,8 +774,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, } ep->rep_attr.cap.max_send_wr *= depth; - if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) { - cdata->max_requests = devattr.max_qp_wr / depth; + if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) { + cdata->max_requests = devattr->max_qp_wr / depth; if (!cdata->max_requests) return -EINVAL; ep->rep_attr.cap.max_send_wr = cdata->max_requests * @@ -766,6 +794,14 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.qp_type = IB_QPT_RC; ep->rep_attr.port_num = ~0; + if (cdata->padding) { + ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding, + GFP_KERNEL); + if (IS_ERR(ep->rep_padbuf)) + return PTR_ERR(ep->rep_padbuf); + } else + ep->rep_padbuf = NULL; + dprintk("RPC: %s: requested max: dtos: send %d recv %d; " "iovs: send %d recv %d\n", __func__, @@ -781,7 +817,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, else if (ep->rep_cqinit <= 2) ep->rep_cqinit = 0; INIT_CQCOUNT(ep); - ep->rep_ia = ia; init_waitqueue_head(&ep->rep_connect_wait); INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); @@ -831,10 +866,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, /* Client offers RDMA Read but does not initiate */ ep->rep_remote_cma.initiator_depth = 0; - if (devattr.max_qp_rd_atom > 32) /* arbitrary but <= 255 */ + if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */ ep->rep_remote_cma.responder_resources = 32; else - ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom; + ep->rep_remote_cma.responder_resources = + devattr->max_qp_rd_atom; ep->rep_remote_cma.retry_count = 7; ep->rep_remote_cma.flow_control = 0; @@ -848,6 +884,7 @@ out2: dprintk("RPC: %s: ib_destroy_cq returned %i\n", __func__, err); out1: + rpcrdma_free_regbuf(ia, ep->rep_padbuf); return rc; } @@ -874,11 +911,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ia->ri_id->qp = NULL; } - /* padding - could be done in rpcrdma_buffer_destroy... */ - if (ep->rep_pad_mr) { - rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad); - ep->rep_pad_mr = NULL; - } + rpcrdma_free_regbuf(ia, ep->rep_padbuf); rpcrdma_clean_cq(ep->rep_attr.recv_cq); rc = ib_destroy_cq(ep->rep_attr.recv_cq); @@ -1048,6 +1081,48 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) } } +static struct rpcrdma_req * +rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_req *req; + + req = kzalloc(sizeof(*req), GFP_KERNEL); + if (req == NULL) + return ERR_PTR(-ENOMEM); + + req->rl_buffer = &r_xprt->rx_buf; + return req; +} + +static struct rpcrdma_rep * +rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_rep *rep; + int rc; + + rc = -ENOMEM; + rep = kzalloc(sizeof(*rep), GFP_KERNEL); + if (rep == NULL) + goto out; + + rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, + GFP_KERNEL); + if (IS_ERR(rep->rr_rdmabuf)) { + rc = PTR_ERR(rep->rr_rdmabuf); + goto out_free; + } + + rep->rr_buffer = &r_xprt->rx_buf; + return rep; + +out_free: + kfree(rep); +out: + return ERR_PTR(rc); +} + static int rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf) { @@ -1134,27 +1209,26 @@ out_free: } int -rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, - struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) +rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; char *p; - size_t len, rlen, wlen; + size_t len; int i, rc; buf->rb_max_requests = cdata->max_requests; spin_lock_init(&buf->rb_lock); - atomic_set(&buf->rb_credits, 1); /* Need to allocate: * 1. arrays for send and recv pointers * 2. arrays of struct rpcrdma_req to fill in pointers * 3. array of struct rpcrdma_rep for replies - * 4. padding, if any * Send/recv buffers in req/rep need to be registered */ len = buf->rb_max_requests * (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *)); - len += cdata->padding; p = kzalloc(len, GFP_KERNEL); if (p == NULL) { @@ -1170,17 +1244,6 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, buf->rb_recv_bufs = (struct rpcrdma_rep **) p; p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests]; - /* - * Register the zeroed pad buffer, if any. - */ - if (cdata->padding) { - rc = rpcrdma_register_internal(ia, p, cdata->padding, - &ep->rep_pad_mr, &ep->rep_pad); - if (rc) - goto out; - } - p += cdata->padding; - INIT_LIST_HEAD(&buf->rb_mws); INIT_LIST_HEAD(&buf->rb_all); switch (ia->ri_memreg_strategy) { @@ -1198,62 +1261,29 @@ rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep, break; } - /* - * Allocate/init the request/reply buffers. Doing this - * using kmalloc for now -- one for each buf. - */ - wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req)); - rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep)); - dprintk("RPC: %s: wlen = %zu, rlen = %zu\n", - __func__, wlen, rlen); - for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; struct rpcrdma_rep *rep; - req = kmalloc(wlen, GFP_KERNEL); - if (req == NULL) { + req = rpcrdma_create_req(r_xprt); + if (IS_ERR(req)) { dprintk("RPC: %s: request buffer %d alloc" " failed\n", __func__, i); - rc = -ENOMEM; + rc = PTR_ERR(req); goto out; } - memset(req, 0, sizeof(struct rpcrdma_req)); buf->rb_send_bufs[i] = req; - buf->rb_send_bufs[i]->rl_buffer = buf; - rc = rpcrdma_register_internal(ia, req->rl_base, - wlen - offsetof(struct rpcrdma_req, rl_base), - &buf->rb_send_bufs[i]->rl_handle, - &buf->rb_send_bufs[i]->rl_iov); - if (rc) - goto out; - - buf->rb_send_bufs[i]->rl_size = wlen - - sizeof(struct rpcrdma_req); - - rep = kmalloc(rlen, GFP_KERNEL); - if (rep == NULL) { + rep = rpcrdma_create_rep(r_xprt); + if (IS_ERR(rep)) { dprintk("RPC: %s: reply buffer %d alloc failed\n", __func__, i); - rc = -ENOMEM; + rc = PTR_ERR(rep); goto out; } - memset(rep, 0, sizeof(struct rpcrdma_rep)); buf->rb_recv_bufs[i] = rep; - buf->rb_recv_bufs[i]->rr_buffer = buf; - - rc = rpcrdma_register_internal(ia, rep->rr_base, - rlen - offsetof(struct rpcrdma_rep, rr_base), - &buf->rb_recv_bufs[i]->rr_handle, - &buf->rb_recv_bufs[i]->rr_iov); - if (rc) - goto out; - } - dprintk("RPC: %s: max_requests %d\n", - __func__, buf->rb_max_requests); - /* done */ + return 0; out: rpcrdma_buffer_destroy(buf); @@ -1261,6 +1291,27 @@ out: } static void +rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) +{ + if (!rep) + return; + + rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); + kfree(rep); +} + +static void +rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) +{ + if (!req) + return; + + rpcrdma_free_regbuf(ia, req->rl_sendbuf); + rpcrdma_free_regbuf(ia, req->rl_rdmabuf); + kfree(req); +} + +static void rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf) { struct rpcrdma_mw *r; @@ -1315,18 +1366,10 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) dprintk("RPC: %s: entering\n", __func__); for (i = 0; i < buf->rb_max_requests; i++) { - if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) { - rpcrdma_deregister_internal(ia, - buf->rb_recv_bufs[i]->rr_handle, - &buf->rb_recv_bufs[i]->rr_iov); - kfree(buf->rb_recv_bufs[i]); - } - if (buf->rb_send_bufs && buf->rb_send_bufs[i]) { - rpcrdma_deregister_internal(ia, - buf->rb_send_bufs[i]->rl_handle, - &buf->rb_send_bufs[i]->rl_iov); - kfree(buf->rb_send_bufs[i]); - } + if (buf->rb_recv_bufs) + rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]); + if (buf->rb_send_bufs) + rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]); } switch (ia->ri_memreg_strategy) { @@ -1450,8 +1493,8 @@ rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) int i; for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++) - rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf); - rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf); + rpcrdma_buffer_put_mr(&seg->rl_mw, buf); + rpcrdma_buffer_put_mr(&seg1->rl_mw, buf); } static void @@ -1537,7 +1580,7 @@ rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf, list_add(&r->mw_list, stale); continue; } - req->rl_segments[i].mr_chunk.rl_mw = r; + req->rl_segments[i].rl_mw = r; if (unlikely(i-- == 0)) return req; /* Success */ } @@ -1559,7 +1602,7 @@ rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) r = list_entry(buf->rb_mws.next, struct rpcrdma_mw, mw_list); list_del(&r->mw_list); - req->rl_segments[i].mr_chunk.rl_mw = r; + req->rl_segments[i].rl_mw = r; if (unlikely(i-- == 0)) return req; /* Success */ } @@ -1658,8 +1701,6 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) struct rpcrdma_buffer *buffers = req->rl_buffer; unsigned long flags; - if (req->rl_iov.length == 0) /* special case xprt_rdma_allocate() */ - buffers = ((struct rpcrdma_req *) buffers)->rl_buffer; spin_lock_irqsave(&buffers->rb_lock, flags); if (buffers->rb_recv_index < buffers->rb_max_requests) { req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; @@ -1688,7 +1729,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) * Wrappers for internal-use kmalloc memory registration, used by buffer code. */ -int +static int rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len, struct ib_mr **mrp, struct ib_sge *iov) { @@ -1739,7 +1780,7 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len, return rc; } -int +static int rpcrdma_deregister_internal(struct rpcrdma_ia *ia, struct ib_mr *mr, struct ib_sge *iov) { @@ -1757,6 +1798,61 @@ rpcrdma_deregister_internal(struct rpcrdma_ia *ia, return rc; } +/** + * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers + * @ia: controlling rpcrdma_ia + * @size: size of buffer to be allocated, in bytes + * @flags: GFP flags + * + * Returns pointer to private header of an area of internally + * registered memory, or an ERR_PTR. The registered buffer follows + * the end of the private header. + * + * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for + * receiving the payload of RDMA RECV operations. regbufs are not + * used for RDMA READ/WRITE operations, thus are registered only for + * LOCAL access. + */ +struct rpcrdma_regbuf * +rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags) +{ + struct rpcrdma_regbuf *rb; + int rc; + + rc = -ENOMEM; + rb = kmalloc(sizeof(*rb) + size, flags); + if (rb == NULL) + goto out; + + rb->rg_size = size; + rb->rg_owner = NULL; + rc = rpcrdma_register_internal(ia, rb->rg_base, size, + &rb->rg_mr, &rb->rg_iov); + if (rc) + goto out_free; + + return rb; + +out_free: + kfree(rb); +out: + return ERR_PTR(rc); +} + +/** + * rpcrdma_free_regbuf - deregister and free registered buffer + * @ia: controlling rpcrdma_ia + * @rb: regbuf to be deregistered and freed + */ +void +rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb) +{ + if (rb) { + rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov); + kfree(rb); + } +} + /* * Wrappers for chunk registration, shared by read/write chunk code. */ @@ -1799,7 +1895,7 @@ rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg, struct rpcrdma_xprt *r_xprt) { struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw; + struct rpcrdma_mw *mw = seg1->rl_mw; struct rpcrdma_frmr *frmr = &mw->r.frmr; struct ib_mr *mr = frmr->fr_mr; struct ib_send_wr fastreg_wr, *bad_wr; @@ -1888,12 +1984,12 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg, struct ib_send_wr invalidate_wr, *bad_wr; int rc; - seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID; + seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID; memset(&invalidate_wr, 0, sizeof invalidate_wr); - invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw; + invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw; invalidate_wr.opcode = IB_WR_LOCAL_INV; - invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey; + invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); read_lock(&ia->ri_qplock); @@ -1903,7 +1999,7 @@ rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg, read_unlock(&ia->ri_qplock); if (rc) { /* Force rpcrdma_buffer_get() to retry */ - seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE; + seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE; dprintk("RPC: %s: failed ib_post_send for invalidate," " status %i\n", __func__, rc); } @@ -1935,8 +2031,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr, - physaddrs, i, seg1->mr_dma); + rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma); if (rc) { dprintk("RPC: %s: failed ib_map_phys_fmr " "%u@0x%llx+%i (%d)... status %i\n", __func__, @@ -1945,7 +2040,7 @@ rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg, while (i--) rpcrdma_unmap_one(ia, --seg); } else { - seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey; + seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey; seg1->mr_base = seg1->mr_dma + pageoff; seg1->mr_nsegs = i; seg1->mr_len = len; @@ -1962,7 +2057,7 @@ rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg, LIST_HEAD(l); int rc; - list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l); + list_add(&seg1->rl_mw->r.fmr->list, &l); rc = ib_unmap_fmr(&l); read_lock(&ia->ri_qplock); while (seg1->mr_nsegs--) @@ -2104,11 +2199,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, recv_wr.next = NULL; recv_wr.wr_id = (u64) (unsigned long) rep; - recv_wr.sg_list = &rep->rr_iov; + recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; recv_wr.num_sge = 1; ib_dma_sync_single_for_cpu(ia->ri_id->device, - rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL); + rdmab_addr(rep->rr_rdmabuf), + rdmab_length(rep->rr_rdmabuf), + DMA_BIDIRECTIONAL); rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index b799041b75bf..c9d2a02f631b 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -70,6 +70,9 @@ struct rpcrdma_ia { int ri_async_rc; enum rpcrdma_memreg ri_memreg_strategy; unsigned int ri_max_frmr_depth; + struct ib_device_attr ri_devattr; + struct ib_qp_attr ri_qp_attr; + struct ib_qp_init_attr ri_qp_init_attr; }; /* @@ -83,13 +86,9 @@ struct rpcrdma_ep { atomic_t rep_cqcount; int rep_cqinit; int rep_connected; - struct rpcrdma_ia *rep_ia; struct ib_qp_init_attr rep_attr; wait_queue_head_t rep_connect_wait; - struct ib_sge rep_pad; /* holds zeroed pad */ - struct ib_mr *rep_pad_mr; /* holds zeroed pad */ - void (*rep_func)(struct rpcrdma_ep *); - struct rpc_xprt *rep_xprt; /* for rep_func */ + struct rpcrdma_regbuf *rep_padbuf; struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; @@ -106,6 +105,44 @@ struct rpcrdma_ep { #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount) +/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV + * + * The below structure appears at the front of a large region of kmalloc'd + * memory, which always starts on a good alignment boundary. + */ + +struct rpcrdma_regbuf { + size_t rg_size; + struct rpcrdma_req *rg_owner; + struct ib_mr *rg_mr; + struct ib_sge rg_iov; + __be32 rg_base[0] __attribute__ ((aligned(256))); +}; + +static inline u64 +rdmab_addr(struct rpcrdma_regbuf *rb) +{ + return rb->rg_iov.addr; +} + +static inline u32 +rdmab_length(struct rpcrdma_regbuf *rb) +{ + return rb->rg_iov.length; +} + +static inline u32 +rdmab_lkey(struct rpcrdma_regbuf *rb) +{ + return rb->rg_iov.lkey; +} + +static inline struct rpcrdma_msg * +rdmab_to_msg(struct rpcrdma_regbuf *rb) +{ + return (struct rpcrdma_msg *)rb->rg_base; +} + enum rpcrdma_chunktype { rpcrdma_noch = 0, rpcrdma_readch, @@ -134,22 +171,16 @@ enum rpcrdma_chunktype { /* temporary static scatter/gather max */ #define RPCRDMA_MAX_DATA_SEGS (64) /* max scatter/gather */ #define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ -#define MAX_RPCRDMAHDR (\ - /* max supported RPC/RDMA header */ \ - sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \ - (sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32)) struct rpcrdma_buffer; struct rpcrdma_rep { - unsigned int rr_len; /* actual received reply length */ - struct rpcrdma_buffer *rr_buffer; /* home base for this structure */ - struct rpc_xprt *rr_xprt; /* needed for request/reply matching */ - void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */ - struct list_head rr_list; /* tasklet list */ - struct ib_sge rr_iov; /* for posting */ - struct ib_mr *rr_handle; /* handle for mem in rr_iov */ - char rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */ + unsigned int rr_len; + struct rpcrdma_buffer *rr_buffer; + struct rpc_xprt *rr_xprt; + void (*rr_func)(struct rpcrdma_rep *); + struct list_head rr_list; + struct rpcrdma_regbuf *rr_rdmabuf; }; /* @@ -211,10 +242,7 @@ struct rpcrdma_mw { */ struct rpcrdma_mr_seg { /* chunk descriptors */ - union { /* chunk memory handles */ - struct ib_mr *rl_mr; /* if registered directly */ - struct rpcrdma_mw *rl_mw; /* if registered from region */ - } mr_chunk; + struct rpcrdma_mw *rl_mw; /* registered MR */ u64 mr_base; /* registration result */ u32 mr_rkey; /* registration result */ u32 mr_len; /* length of chunk or segment */ @@ -227,22 +255,26 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ }; struct rpcrdma_req { - size_t rl_size; /* actual length of buffer */ unsigned int rl_niovs; /* 0, 2 or 4 */ unsigned int rl_nchunks; /* non-zero if chunks */ unsigned int rl_connect_cookie; /* retry detection */ enum rpcrdma_chunktype rl_rtype, rl_wtype; struct rpcrdma_buffer *rl_buffer; /* home base for this structure */ struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ - struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */ struct ib_sge rl_send_iov[4]; /* for active requests */ - struct ib_sge rl_iov; /* for posting */ - struct ib_mr *rl_handle; /* handle for mem in rl_iov */ - char rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */ - __u32 rl_xdr_buf[0]; /* start of returned rpc rq_buffer */ + struct rpcrdma_regbuf *rl_rdmabuf; + struct rpcrdma_regbuf *rl_sendbuf; + struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; -#define rpcr_to_rdmar(r) \ - container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0]) + +static inline struct rpcrdma_req * +rpcr_to_rdmar(struct rpc_rqst *rqst) +{ + struct rpcrdma_regbuf *rb = container_of(rqst->rq_buffer, + struct rpcrdma_regbuf, + rg_base[0]); + return rb->rg_owner; +} /* * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for @@ -252,7 +284,6 @@ struct rpcrdma_req { */ struct rpcrdma_buffer { spinlock_t rb_lock; /* protects indexes */ - atomic_t rb_credits; /* most recent server credits */ int rb_max_requests;/* client max requests */ struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */ struct list_head rb_all; @@ -318,16 +349,16 @@ struct rpcrdma_stats { * during unmount. */ struct rpcrdma_xprt { - struct rpc_xprt xprt; + struct rpc_xprt rx_xprt; struct rpcrdma_ia rx_ia; struct rpcrdma_ep rx_ep; struct rpcrdma_buffer rx_buf; struct rpcrdma_create_data_internal rx_data; - struct delayed_work rdma_connect; + struct delayed_work rx_connect_worker; struct rpcrdma_stats rx_stats; }; -#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt) +#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt) #define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data) /* Setting this to 0 ensures interoperability with early servers. @@ -358,9 +389,7 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *, /* * Buffer calls - xprtrdma/verbs.c */ -int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *, - struct rpcrdma_ia *, - struct rpcrdma_create_data_internal *); +int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); @@ -368,16 +397,16 @@ void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_recv_buffer_get(struct rpcrdma_req *); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); -int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int, - struct ib_mr **, struct ib_sge *); -int rpcrdma_deregister_internal(struct rpcrdma_ia *, - struct ib_mr *, struct ib_sge *); - int rpcrdma_register_external(struct rpcrdma_mr_seg *, int, int, struct rpcrdma_xprt *); int rpcrdma_deregister_external(struct rpcrdma_mr_seg *, struct rpcrdma_xprt *); +struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, + size_t, gfp_t); +void rpcrdma_free_regbuf(struct rpcrdma_ia *, + struct rpcrdma_regbuf *); + /* * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c */ |