diff options
author | Trond Myklebust <trond.myklebust@hammerspace.com> | 2018-08-09 23:33:21 -0400 |
---|---|---|
committer | Trond Myklebust <trond.myklebust@hammerspace.com> | 2018-09-30 15:35:15 -0400 |
commit | 944b042921a17d1a4e51bb05f8edf2b93d26e36f (patch) | |
tree | 8a1b6271aae6794793c136d2b12b1386f739b25d /net | |
parent | ef3f54347f690d06649c0d7a1f63d3410b3d08d3 (diff) | |
download | linux-944b042921a17d1a4e51bb05f8edf2b93d26e36f.tar.bz2 |
SUNRPC: Add a transmission queue for RPC requests
Add the queue that will enforce the ordering of RPC task transmission.
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
Diffstat (limited to 'net')
-rw-r--r-- | net/sunrpc/clnt.c | 6 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 84 |
2 files changed, 77 insertions, 13 deletions
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index be0f06a8156b..c1a19a3e1356 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1156,11 +1156,11 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req) */ xbufp->len = xbufp->head[0].iov_len + xbufp->page_len + xbufp->tail[0].iov_len; - set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); task->tk_action = call_bc_transmit; atomic_inc(&task->tk_count); WARN_ON_ONCE(atomic_read(&task->tk_count) != 2); + xprt_request_enqueue_transmit(task); rpc_execute(task); dprintk("RPC: rpc_run_bc_task: task= %p\n", task); @@ -1759,8 +1759,6 @@ rpc_xdr_encode(struct rpc_task *task) task->tk_status = rpcauth_wrap_req(task, encode, req, p, task->tk_msg.rpc_argp); - if (task->tk_status == 0) - set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); } /* @@ -1964,6 +1962,7 @@ call_transmit(struct rpc_task *task) /* Add task to reply queue before transmission to avoid races */ if (rpc_reply_expected(task)) xprt_request_enqueue_receive(task); + xprt_request_enqueue_transmit(task); if (!xprt_prepare_transmit(task)) return; @@ -1998,7 +1997,6 @@ call_transmit_status(struct rpc_task *task) xprt_end_transmit(task); break; case -EBADMSG: - clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); task->tk_action = call_transmit; task->tk_status = 0; xprt_end_transmit(task); diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index d527dc08540e..1f69d9f219af 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1058,6 +1058,72 @@ void xprt_request_wait_receive(struct rpc_task *task) spin_unlock(&xprt->queue_lock); } +static bool +xprt_request_need_transmit(struct rpc_task *task) +{ + return !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) || + xprt_request_retransmit_after_disconnect(task); +} + +static bool +xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req) +{ + return xprt_request_need_transmit(task) && + !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); +} + +/** + * xprt_request_enqueue_transmit - queue a task for transmission + * @task: pointer to rpc_task + * + * Add a task to the transmission queue. + */ +void +xprt_request_enqueue_transmit(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + if (xprt_request_need_enqueue_transmit(task, req)) { + spin_lock(&xprt->queue_lock); + list_add_tail(&req->rq_xmit, &xprt->xmit_queue); + set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); + spin_unlock(&xprt->queue_lock); + } +} + +/** + * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue + * @task: pointer to rpc_task + * + * Remove a task from the transmission queue + * Caller must hold xprt->queue_lock + */ +static void +xprt_request_dequeue_transmit_locked(struct rpc_task *task) +{ + xprt_task_clear_bytes_sent(task); + if (test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) + list_del(&task->tk_rqstp->rq_xmit); +} + +/** + * xprt_request_dequeue_transmit - remove a task from the transmission queue + * @task: pointer to rpc_task + * + * Remove a task from the transmission queue + */ +static void +xprt_request_dequeue_transmit(struct rpc_task *task) +{ + struct rpc_rqst *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + + spin_lock(&xprt->queue_lock); + xprt_request_dequeue_transmit_locked(task); + spin_unlock(&xprt->queue_lock); +} + /** * xprt_prepare_transmit - reserve the transport before sending a request * @task: RPC task about to send a request @@ -1077,12 +1143,8 @@ bool xprt_prepare_transmit(struct rpc_task *task) task->tk_status = req->rq_reply_bytes_recvd; goto out_unlock; } - if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) && - !xprt_request_retransmit_after_disconnect(task)) { - xprt->ops->set_retrans_timeout(task); - rpc_sleep_on(&xprt->pending, task, xprt_timer); + if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) goto out_unlock; - } } if (!xprt->ops->reserve_xprt(xprt, task)) { task->tk_status = -EAGAIN; @@ -1116,11 +1178,11 @@ void xprt_transmit(struct rpc_task *task) if (!req->rq_bytes_sent) { if (xprt_request_data_received(task)) - return; + goto out_dequeue; /* Verify that our message lies in the RPCSEC_GSS window */ if (rpcauth_xmit_need_reencode(task)) { task->tk_status = -EBADMSG; - return; + goto out_dequeue; } } @@ -1135,7 +1197,6 @@ void xprt_transmit(struct rpc_task *task) xprt_inject_disconnect(xprt); dprintk("RPC: %5u xmit complete\n", task->tk_pid); - clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate); task->tk_flags |= RPC_TASK_SENT; spin_lock_bh(&xprt->transport_lock); @@ -1147,6 +1208,8 @@ void xprt_transmit(struct rpc_task *task) spin_unlock_bh(&xprt->transport_lock); req->rq_connect_cookie = connect_cookie; +out_dequeue: + xprt_request_dequeue_transmit(task); } static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) @@ -1420,9 +1483,11 @@ xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req) { struct rpc_xprt *xprt = req->rq_xprt; - if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || + if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) || + test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) || xprt_is_pinned_rqst(req)) { spin_lock(&xprt->queue_lock); + xprt_request_dequeue_transmit_locked(task); xprt_request_dequeue_receive_locked(task); while (xprt_is_pinned_rqst(req)) { set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate); @@ -1493,6 +1558,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) INIT_LIST_HEAD(&xprt->free); INIT_LIST_HEAD(&xprt->recv_queue); + INIT_LIST_HEAD(&xprt->xmit_queue); #if defined(CONFIG_SUNRPC_BACKCHANNEL) spin_lock_init(&xprt->bc_pa_lock); INIT_LIST_HEAD(&xprt->bc_pa_list); |