Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--	net/sunrpc/xprtrdma/rpc_rdma.c	| 125
1 file changed, 81 insertions(+), 44 deletions(-)
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 694e9b13ecf0..ca4d6e4528f3 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -141,7 +141,7 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
 
 	if (xdr->page_len) {
 		remaining = xdr->page_len;
-		offset = xdr->page_base & ~PAGE_MASK;
+		offset = offset_in_page(xdr->page_base);
 		count = 0;
 		while (remaining) {
 			remaining -= min_t(unsigned int,
@@ -222,7 +222,7 @@ rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
 
 	len = xdrbuf->page_len;
 	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
-	page_base = xdrbuf->page_base & ~PAGE_MASK;
+	page_base = offset_in_page(xdrbuf->page_base);
 	p = 0;
 	while (len && n < RPCRDMA_MAX_SEGS) {
 		if (!ppages[p]) {
@@ -540,7 +540,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 			goto out;
 
 		page = virt_to_page(xdr->tail[0].iov_base);
-		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+		page_base = offset_in_page(xdr->tail[0].iov_base);
 
 		/* If the content in the page list is an odd length,
 		 * xdr_write_pages() has added a pad at the beginning
@@ -557,7 +557,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 	 */
 	if (xdr->page_len) {
 		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-		page_base = xdr->page_base & ~PAGE_MASK;
+		page_base = offset_in_page(xdr->page_base);
 		remaining = xdr->page_len;
 		while (remaining) {
 			sge_no++;
@@ -587,7 +587,7 @@ rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
 	 */
 	if (xdr->tail[0].iov_len) {
 		page = virt_to_page(xdr->tail[0].iov_base);
-		page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+		page_base = offset_in_page(xdr->tail[0].iov_base);
 		len = xdr->tail[0].iov_len;
 
 map_tail:
@@ -734,6 +734,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		rpclen = 0;
 	}
 
+	req->rl_xid = rqst->rq_xid;
+	rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
 	/* This implementation supports the following combinations
 	 * of chunk lists in one RPC-over-RDMA Call message:
 	 *
@@ -875,9 +878,9 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 	srcp += curlen;
 	copy_len -= curlen;
 
-	page_base = rqst->rq_rcv_buf.page_base;
-	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
-	page_base &= ~PAGE_MASK;
+	ppages = rqst->rq_rcv_buf.pages +
+		(rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
+	page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
 	fixup_copy_count = 0;
 	if (copy_len && rqst->rq_rcv_buf.page_len) {
 		int pagelist_len;
@@ -928,6 +931,24 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
 	return fixup_copy_count;
 }
 
+/* Caller must guarantee @rep remains stable during this call.
+ */
+static void
+rpcrdma_mark_remote_invalidation(struct list_head *mws,
+				 struct rpcrdma_rep *rep)
+{
+	struct rpcrdma_mw *mw;
+
+	if (!(rep->rr_wc_flags & IB_WC_WITH_INVALIDATE))
+		return;
+
+	list_for_each_entry(mw, mws, mw_list)
+		if (mw->mw_handle == rep->rr_inv_rkey) {
+			mw->mw_flags = RPCRDMA_MW_F_RI;
+			break; /* only one invalidated MR per RPC */
+		}
+}
+
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 /* By convention, backchannel calls arrive via rdma_msg type
  * messages, and never populate the chunk lists. This makes
@@ -969,14 +990,16 @@ rpcrdma_reply_handler(struct work_struct *work)
 {
 	struct rpcrdma_rep *rep =
 			container_of(work, struct rpcrdma_rep, rr_work);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	__be32 *iptr;
 	int rdmalen, status, rmerr;
 	unsigned long cwnd;
+	struct list_head mws;
 
 	dprintk("RPC:       %s: incoming rep %p\n", __func__, rep);
 
@@ -994,27 +1017,45 @@ rpcrdma_reply_handler(struct work_struct *work)
 	/* Match incoming rpcrdma_rep to an rpcrdma_req to
 	 * get context for handling any incoming chunks.
 	 */
-	spin_lock_bh(&xprt->transport_lock);
-	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-	if (!rqst)
+	spin_lock(&buf->rb_lock);
+	req = rpcrdma_lookup_req_locked(&r_xprt->rx_buf,
+					headerp->rm_xid);
+	if (!req)
 		goto out_nomatch;
-
-	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply)
 		goto out_duplicate;
 
-	/* Sanity checking has passed. We are now committed
-	 * to complete this transaction.
+	list_replace_init(&req->rl_registered, &mws);
+	rpcrdma_mark_remote_invalidation(&mws, rep);
+
+	/* Avoid races with signals and duplicate replies
+	 * by marking this req as matched.
 	 */
-	list_del_init(&rqst->rq_list);
-	spin_unlock_bh(&xprt->transport_lock);
+	req->rl_reply = rep;
+	spin_unlock(&buf->rb_lock);
+
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-	/* from here on, the reply is no longer an orphan */
-	req->rl_reply = rep;
-	xprt->reestablish_timeout = 0;
+	/* Invalidate and unmap the data payloads before waking the
+	 * waiting application. This guarantees the memory regions
+	 * are properly fenced from the server before the application
+	 * accesses the data. It also ensures proper send flow control:
+	 * waking the next RPC waits until this RPC has relinquished
+	 * all its Send Queue entries.
+	 */
+	if (!list_empty(&mws))
+		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+	/* Perform XID lookup, reconstruction of the RPC reply, and
+	 * RPC completion while holding the transport lock to ensure
+	 * the rep, rqst, and rq_task pointers remain stable.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
+	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+	if (!rqst)
+		goto out_norqst;
+	xprt->reestablish_timeout = 0;
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
 
@@ -1024,12 +1065,9 @@
 	case rdma_msg:
 		/* never expect read chunks */
 		/* never expect reply chunks (two ways to check) */
-		/* never expect write chunks without having offered RDMA */
 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
 		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
-		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
-		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
-		     list_empty(&req->rl_registered)))
+		     headerp->rm_body.rm_chunks[2] != xdr_zero))
 			goto badheader;
 		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
 			/* count any expected write chunks in read reply */
@@ -1066,8 +1104,7 @@
 		/* never expect read or write chunks, always reply chunks */
 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
 		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
-		    headerp->rm_body.rm_chunks[2] != xdr_one ||
-		    list_empty(&req->rl_registered))
+		    headerp->rm_body.rm_chunks[2] != xdr_one)
 			goto badheader;
 		iptr = (__be32 *)((unsigned char *)headerp +
 							RPCRDMA_HDRLEN_MIN);
@@ -1093,17 +1130,6 @@ badheader:
 	}
 
 out:
-	/* Invalidate and flush the data payloads before waking the
-	 * waiting application. This guarantees the memory region is
-	 * properly fenced from the server before the application
-	 * accesses the data. It also ensures proper send flow
-	 * control: waking the next RPC waits until this RPC has
-	 * relinquished all its Send Queue entries.
-	 */
-	if (!list_empty(&req->rl_registered))
-		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
-
-	spin_lock_bh(&xprt->transport_lock);
 	cwnd = xprt->cwnd;
 	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
@@ -1112,7 +1138,7 @@ out:
 	xprt_complete_rqst(rqst->rq_task, status);
 	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-			__func__, xprt, rqst, status);
+		__func__, xprt, rqst, status);
 	return;
 
 out_badstatus:
@@ -1161,26 +1187,37 @@ out_rdmaerr:
 	r_xprt->rx_stats.bad_reply_count++;
 	goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+	spin_unlock_bh(&xprt->transport_lock);
+	rpcrdma_buffer_put(req);
+	dprintk("RPC:       %s: race, no rqst left for req %p\n",
+		__func__, req);
+	return;
+
 out_shortreply:
 	dprintk("RPC:       %s: short/invalid reply\n", __func__);
 	goto repost;
 
 out_nomatch:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
 		__func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
 	goto repost;
 
 out_duplicate:
-	spin_unlock_bh(&xprt->transport_lock);
+	spin_unlock(&buf->rb_lock);
 	dprintk("RPC:       %s: "
 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
 	r_xprt->rx_stats.bad_reply_count++;
 	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
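
The offset_in_page() conversions in the first five hunks are mechanical: include/linux/mm.h defines the helper as ((unsigned long)(p) & ~PAGE_MASK), so it accepts either a pointer or an integral page_base and replaces the open-coded masking, including the (unsigned long) cast the tail[0].iov_base call sites needed. A minimal standalone sketch of the equivalence (the PAGE_SHIFT value and main() harness here are illustrative, not kernel code):

	#include <stdio.h>

	#define PAGE_SHIFT		12
	#define PAGE_SIZE		(1UL << PAGE_SHIFT)
	#define PAGE_MASK		(~(PAGE_SIZE - 1))
	/* same shape as the definition in include/linux/mm.h */
	#define offset_in_page(p)	((unsigned long)(p) & ~PAGE_MASK)

	int main(void)
	{
		unsigned long page_base = (2UL << PAGE_SHIFT) + 100; /* two pages in, offset 100 */

		/* the old open-coded form and the helper agree */
		printf("%lu == %lu\n",
		       page_base & ~PAGE_MASK,
		       offset_in_page(page_base));	/* prints "100 == 100" */
		return 0;
	}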
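rpcrdma_mark_remote_invalidation() consumes completion metadata that must have been captured when the reply arrived: IB_WC_WITH_INVALIDATE in the work completion flags signals that the server used Send With Invalidate, and ex.invalidate_rkey names the MR it invalidated. A sketch of how the receive completion handler would fill the rr_* fields used above; the handler itself is not in this diff, so treat the names and shape as assumptions:

	/* Sketch: capture Remote Invalidation metadata at receive
	 * completion time (modeled on the receive handler in verbs.c).
	 */
	static void rpcrdma_wc_receive_sketch(struct ib_cq *cq, struct ib_wc *wc)
	{
		struct rpcrdma_rep *rep =
			container_of(wc->wr_cqe, struct rpcrdma_rep, rr_cqe);

		if (wc->status != IB_WC_SUCCESS)
			return;	/* flushed completion; nothing to save */

		rep->rr_len = wc->byte_len;
		rep->rr_wc_flags = wc->wc_flags;	/* may carry IB_WC_WITH_INVALIDATE */
		rep->rr_inv_rkey = wc->ex.invalidate_rkey;
	}

Setting RPCRDMA_MW_F_RI on the matching rpcrdma_mw lets the ro_unmap_sync path skip a redundant local invalidation for that one already-invalidated MR.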
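The reply handler now matches replies under buf->rb_lock via rpcrdma_lookup_req_locked(), paired with the rpcrdma_insert_req() call added to rpcrdma_marshal_req(). Both helpers live outside this file, so the definitions below are only a plausible shape keyed by the new rl_xid field, not code from this diff:

	/* Sketch: track each marshaled req on a pending list so the
	 * reply handler can match it without taking the transport lock.
	 */
	void rpcrdma_insert_req(struct rpcrdma_buffer *buffers,
				struct rpcrdma_req *req)
	{
		spin_lock(&buffers->rb_lock);
		if (list_empty(&req->rl_list))	/* tolerate re-marshaling */
			list_add_tail(&req->rl_list, &buffers->rb_pending);
		spin_unlock(&buffers->rb_lock);
	}

	/* Caller holds buffers->rb_lock. */
	struct rpcrdma_req *
	rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
	{
		struct rpcrdma_req *req;

		list_for_each_entry(req, &buffers->rb_pending, rl_list)
			if (req->rl_xid == xid)
				return req;
		return NULL;
	}

Splitting the matching step out this way lets the handler mark the req as claimed (req->rl_reply = rep) and drop rb_lock before the potentially long ro_unmap_sync, taking the transport_lock only for the final xprt_lookup_rqst()/xprt_complete_rqst() step.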