diff options
Diffstat (limited to 'net/sunrpc/xprtrdma/frwr_ops.c')
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 209 |
1 files changed, 90 insertions, 119 deletions
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 766a1048a48a..229fcc9a9064 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -49,20 +49,13 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -/** - * frwr_release_mr - Destroy one MR - * @mr: MR allocated by frwr_mr_init - * - */ -void frwr_release_mr(struct rpcrdma_mr *mr) +static void frwr_cid_init(struct rpcrdma_ep *ep, + struct rpcrdma_mr *mr) { - int rc; + struct rpc_rdma_cid *cid = &mr->mr_cid; - rc = ib_dereg_mr(mr->frwr.fr_mr); - if (rc) - trace_xprtrdma_frwr_dereg(mr, rc); - kfree(mr->mr_sg); - kfree(mr); + cid->ci_queue_id = ep->re_attr.send_cq->res.id; + cid->ci_completion_id = mr->mr_ibmr->res.id; } static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) @@ -75,20 +68,22 @@ static void frwr_mr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) } } -static void frwr_mr_recycle(struct rpcrdma_mr *mr) +/** + * frwr_mr_release - Destroy one MR + * @mr: MR allocated by frwr_mr_init + * + */ +void frwr_mr_release(struct rpcrdma_mr *mr) { - struct rpcrdma_xprt *r_xprt = mr->mr_xprt; - - trace_xprtrdma_mr_recycle(mr); - - frwr_mr_unmap(r_xprt, mr); + int rc; - spin_lock(&r_xprt->rx_buf.rb_lock); - list_del(&mr->mr_all); - r_xprt->rx_stats.mrs_recycled++; - spin_unlock(&r_xprt->rx_buf.rb_lock); + frwr_mr_unmap(mr->mr_xprt, mr); - frwr_release_mr(mr); + rc = ib_dereg_mr(mr->mr_ibmr); + if (rc) + trace_xprtrdma_frwr_dereg(mr, rc); + kfree(mr->mr_sg); + kfree(mr); } static void frwr_mr_put(struct rpcrdma_mr *mr) @@ -144,10 +139,11 @@ int frwr_mr_init(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr *mr) goto out_list_err; mr->mr_xprt = r_xprt; - mr->frwr.fr_mr = frmr; + mr->mr_ibmr = frmr; mr->mr_device = NULL; INIT_LIST_HEAD(&mr->mr_list); - init_completion(&mr->frwr.fr_linv_done); + init_completion(&mr->mr_linv_done); + frwr_cid_init(ep, mr); sg_init_table(sg, depth); mr->mr_sg = sg; @@ -257,6 +253,7 @@ int frwr_query_device(struct rpcrdma_ep *ep, const struct ib_device *device) ep->re_attr.cap.max_send_wr += 1; /* for ib_drain_sq */ ep->re_attr.cap.max_recv_wr = ep->re_max_requests; ep->re_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; + ep->re_attr.cap.max_recv_wr += RPCRDMA_MAX_RECV_BATCH; ep->re_attr.cap.max_recv_wr += 1; /* for ib_drain_rq */ ep->re_max_rdma_segs = @@ -326,7 +323,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, goto out_dmamap_err; mr->mr_device = ep->re_id->device; - ibmr = mr->frwr.fr_mr; + ibmr = mr->mr_ibmr; n = ib_map_mr_sg(ibmr, mr->mr_sg, dma_nents, NULL, PAGE_SIZE); if (n != dma_nents) goto out_mapmr_err; @@ -336,7 +333,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, key = (u8)(ibmr->rkey & 0x000000FF); ib_update_fast_reg_key(ibmr, ++key); - reg_wr = &mr->frwr.fr_regwr; + reg_wr = &mr->mr_regwr; reg_wr->mr = ibmr; reg_wr->key = ibmr->rkey; reg_wr->access = writing ? @@ -364,29 +361,19 @@ out_mapmr_err: * @cq: completion queue * @wc: WCE for a completed FastReg WR * + * Each flushed MR gets destroyed after the QP has drained. */ static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_fastreg(wc, &frwr->fr_cid); - /* The MR will get recycled when the associated req is retransmitted */ + trace_xprtrdma_wc_fastreg(wc, &mr->mr_cid); rpcrdma_flush_disconnect(cq->cq_context, wc); } -static void frwr_cid_init(struct rpcrdma_ep *ep, - struct rpcrdma_frwr *frwr) -{ - struct rpc_rdma_cid *cid = &frwr->fr_cid; - - cid->ci_queue_id = ep->re_attr.send_cq->res.id; - cid->ci_completion_id = frwr->fr_mr->res.id; -} - /** * frwr_send - post Send WRs containing the RPC Call message * @r_xprt: controlling transport instance @@ -403,27 +390,36 @@ static void frwr_cid_init(struct rpcrdma_ep *ep, */ int frwr_send(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { + struct ib_send_wr *post_wr, *send_wr = &req->rl_wr; struct rpcrdma_ep *ep = r_xprt->rx_ep; - struct ib_send_wr *post_wr; struct rpcrdma_mr *mr; + unsigned int num_wrs; - post_wr = &req->rl_wr; + num_wrs = 1; + post_wr = send_wr; list_for_each_entry(mr, &req->rl_registered, mr_list) { - struct rpcrdma_frwr *frwr; - - frwr = &mr->frwr; - - frwr->fr_cqe.done = frwr_wc_fastreg; - frwr_cid_init(ep, frwr); - frwr->fr_regwr.wr.next = post_wr; - frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe; - frwr->fr_regwr.wr.num_sge = 0; - frwr->fr_regwr.wr.opcode = IB_WR_REG_MR; - frwr->fr_regwr.wr.send_flags = 0; + trace_xprtrdma_mr_fastreg(mr); + + mr->mr_cqe.done = frwr_wc_fastreg; + mr->mr_regwr.wr.next = post_wr; + mr->mr_regwr.wr.wr_cqe = &mr->mr_cqe; + mr->mr_regwr.wr.num_sge = 0; + mr->mr_regwr.wr.opcode = IB_WR_REG_MR; + mr->mr_regwr.wr.send_flags = 0; + post_wr = &mr->mr_regwr.wr; + ++num_wrs; + } - post_wr = &frwr->fr_regwr.wr; + if ((kref_read(&req->rl_kref) > 1) || num_wrs > ep->re_send_count) { + send_wr->send_flags |= IB_SEND_SIGNALED; + ep->re_send_count = min_t(unsigned int, ep->re_send_batch, + num_wrs - ep->re_send_count); + } else { + send_wr->send_flags &= ~IB_SEND_SIGNALED; + ep->re_send_count -= num_wrs; } + trace_xprtrdma_post_send(req); return ib_post_send(ep->re_id->qp, post_wr, NULL); } @@ -440,6 +436,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) list_for_each_entry(mr, mrs, mr_list) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); + trace_xprtrdma_mr_reminv(mr); frwr_mr_put(mr); break; /* only one invalidated MR per RPC */ } @@ -447,9 +444,7 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr) { - if (wc->status != IB_WC_SUCCESS) - frwr_mr_recycle(mr); - else + if (likely(wc->status == IB_WC_SUCCESS)) frwr_mr_put(mr); } @@ -462,12 +457,10 @@ static void frwr_mr_done(struct ib_wc *wc, struct rpcrdma_mr *mr) static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li(wc, &frwr->fr_cid); + trace_xprtrdma_wc_li(wc, &mr->mr_cid); frwr_mr_done(wc, mr); rpcrdma_flush_disconnect(cq->cq_context, wc); @@ -483,14 +476,12 @@ static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li_wake(wc, &frwr->fr_cid); + trace_xprtrdma_wc_li_wake(wc, &mr->mr_cid); frwr_mr_done(wc, mr); - complete(&frwr->fr_linv_done); + complete(&mr->mr_linv_done); rpcrdma_flush_disconnect(cq->cq_context, wc); } @@ -511,7 +502,6 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) struct ib_send_wr *first, **prev, *last; struct rpcrdma_ep *ep = r_xprt->rx_ep; const struct ib_send_wr *bad_wr; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; int rc; @@ -520,35 +510,34 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ - frwr = NULL; prev = &first; while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; - frwr = &mr->frwr; - frwr->fr_cqe.done = frwr_wc_localinv; - frwr_cid_init(ep, frwr); - last = &frwr->fr_invwr; + last = &mr->mr_invwr; last->next = NULL; - last->wr_cqe = &frwr->fr_cqe; + last->wr_cqe = &mr->mr_cqe; last->sg_list = NULL; last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; + last->wr_cqe->done = frwr_wc_localinv; + *prev = last; prev = &last->next; } + mr = container_of(last, struct rpcrdma_mr, mr_invwr); /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ - frwr->fr_cqe.done = frwr_wc_localinv_wake; - reinit_completion(&frwr->fr_linv_done); + last->wr_cqe->done = frwr_wc_localinv_wake; + reinit_completion(&mr->mr_linv_done); /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us @@ -562,22 +551,12 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * not happen, so don't wait in that case. */ if (bad_wr != first) - wait_for_completion(&frwr->fr_linv_done); + wait_for_completion(&mr->mr_linv_done); if (!rc) return; - /* Recycle MRs in the LOCAL_INV chain that did not get posted. - */ + /* On error, the MRs get destroyed once the QP has drained. */ trace_xprtrdma_post_linv_err(req, rc); - while (bad_wr) { - frwr = container_of(bad_wr, struct rpcrdma_frwr, - fr_invwr); - mr = container_of(frwr, struct rpcrdma_mr, frwr); - bad_wr = bad_wr->next; - - list_del_init(&mr->mr_list); - frwr_mr_recycle(mr); - } } /** @@ -589,20 +568,24 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) { struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); - struct rpcrdma_rep *rep = mr->mr_req->rl_reply; + struct rpcrdma_mr *mr = container_of(cqe, struct rpcrdma_mr, mr_cqe); + struct rpcrdma_rep *rep; /* WARNING: Only wr_cqe and status are reliable at this point */ - trace_xprtrdma_wc_li_done(wc, &frwr->fr_cid); - frwr_mr_done(wc, mr); + trace_xprtrdma_wc_li_done(wc, &mr->mr_cid); - /* Ensure @rep is generated before frwr_mr_done */ + /* Ensure that @rep is generated before the MR is released */ + rep = mr->mr_req->rl_reply; smp_rmb(); - rpcrdma_complete_rqst(rep); - rpcrdma_flush_disconnect(cq->cq_context, wc); + if (wc->status != IB_WC_SUCCESS) { + if (rep) + rpcrdma_unpin_rqst(rep); + rpcrdma_flush_disconnect(cq->cq_context, wc); + return; + } + frwr_mr_put(mr); + rpcrdma_complete_rqst(rep); } /** @@ -619,33 +602,29 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *first, *last, **prev; struct rpcrdma_ep *ep = r_xprt->rx_ep; - const struct ib_send_wr *bad_wr; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; int rc; /* Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ - frwr = NULL; prev = &first; while ((mr = rpcrdma_mr_pop(&req->rl_registered))) { trace_xprtrdma_mr_localinv(mr); r_xprt->rx_stats.local_inv_needed++; - frwr = &mr->frwr; - frwr->fr_cqe.done = frwr_wc_localinv; - frwr_cid_init(ep, frwr); - last = &frwr->fr_invwr; + last = &mr->mr_invwr; last->next = NULL; - last->wr_cqe = &frwr->fr_cqe; + last->wr_cqe = &mr->mr_cqe; last->sg_list = NULL; last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; + last->wr_cqe->done = frwr_wc_localinv; + *prev = last; prev = &last->next; } @@ -655,31 +634,23 @@ void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * are complete. The last completion will wake up the * RPC waiter. */ - frwr->fr_cqe.done = frwr_wc_localinv_done; + last->wr_cqe->done = frwr_wc_localinv_done; /* Transport disconnect drains the receive CQ before it * replaces the QP. The RPC reply handler won't call us * unless re_id->qp is a valid pointer. */ - bad_wr = NULL; - rc = ib_post_send(ep->re_id->qp, first, &bad_wr); + rc = ib_post_send(ep->re_id->qp, first, NULL); if (!rc) return; - /* Recycle MRs in the LOCAL_INV chain that did not get posted. - */ + /* On error, the MRs get destroyed once the QP has drained. */ trace_xprtrdma_post_linv_err(req, rc); - while (bad_wr) { - frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); - mr = container_of(frwr, struct rpcrdma_mr, frwr); - bad_wr = bad_wr->next; - - frwr_mr_recycle(mr); - } /* The final LOCAL_INV WR in the chain is supposed to - * do the wake. If it was never posted, the wake will - * not happen, so wake here in that case. + * do the wake. If it was never posted, the wake does + * not happen. Unpin the rqst in preparation for its + * retransmission. */ - rpcrdma_complete_rqst(req->rl_reply); + rpcrdma_unpin_rqst(req->rl_reply); } |