From a71a2651bdd3ad9ccae7d8e8c6782727c7ecba98 Mon Sep 17 00:00:00 2001 From: David Howells Date: Mon, 23 Jul 2018 17:18:37 +0100 Subject: rxrpc: Propose, but don't immediately transmit, the final ACK for a call The final ACK that closes out an rxrpc call needs to be transmitted by the client unless we're going to follow up with a DATA packet for a new call on the same channel (which implicitly ACK's the previous call, thereby saving an ACK). Currently, we don't do that, so if no follow on call is immediately forthcoming, the server will resend the last DATA packet - at which point rxrpc_conn_retransmit_call() will be triggered and will (re)send the final ACK. But the server has to hold on to the last packet until the ACK is received, thereby holding up its resources. Fix the client side to propose a delayed final ACK, to be transmitted after a short delay, assuming the call isn't superseded by a new one. Signed-off-by: David Howells --- net/rxrpc/recvmsg.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'net/rxrpc/recvmsg.c') diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index 7bff716e911e..02f1f768e16a 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -144,13 +144,11 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top); ASSERTCMP(call->rx_hard_ack, ==, call->rx_top); -#if 0 // TODO: May want to transmit final ACK under some circumstances anyway if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) { - rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false, + rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, false, true, rxrpc_propose_ack_terminal_ack); - rxrpc_send_ack_packet(call, false, NULL); + //rxrpc_send_ack_packet(call, false, NULL); } -#endif write_lock_bh(&call->state_lock); -- cgit v1.2.3 From d0b35a42031a3107a5735e0d0a605a68f530a96b Mon Sep 17 00:00:00 2001 From: David Howells Date: Mon, 23 Jul 2018 17:18:38 +0100 Subject: rxrpc: Transmit more ACKs during data reception Immediately flush any outstanding ACK on entry to rxrpc_recvmsg_data() - which transfers data to the target buffers - if we previously had an Rx underrun (ie. we returned -EAGAIN because we ran out of received data). This lets the server know what we've managed to receive something. Also flush any outstanding ACK after calling the function if it hit -EAGAIN to let the server know we processed some data. It might be better to send more ACKs, possibly on a time-based scheme, but that needs some more consideration. With this and some additional AFS patches, it is possible to get large unencrypted O_DIRECT reads to be almost as fast as NFS over TCP. It looks like it might be theoretically possible to improve performance yet more for a server running a single operation as investigation of packet timestamps indicates that the server keeps stalling. The issue appears to be that rxrpc runs in to trouble with ACK packets getting batched together (up to ~32 at a time) somewhere between the IP transmit queue on the client and the ethernet receive queue on the server. However, this case isn't too much of a worry as even a lightly loaded server should be receiving sufficient packet flux to flush the ACK packets to the UDP socket. Signed-off-by: David Howells --- net/rxrpc/ar-internal.h | 1 + net/rxrpc/recvmsg.c | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) (limited to 'net/rxrpc/recvmsg.c') diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h index e791d35ee34b..9d9278a13d91 100644 --- a/net/rxrpc/ar-internal.h +++ b/net/rxrpc/ar-internal.h @@ -479,6 +479,7 @@ enum rxrpc_call_flag { RXRPC_CALL_RETRANS_TIMEOUT, /* Retransmission due to timeout occurred */ RXRPC_CALL_BEGAN_RX_TIMER, /* We began the expect_rx_by timer */ RXRPC_CALL_RX_HEARD, /* The peer responded at least once to this call */ + RXRPC_CALL_RX_UNDERRUN, /* Got data underrun */ }; /* diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index 02f1f768e16a..a57ea96c84ea 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -313,6 +313,10 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, unsigned int rx_pkt_offset, rx_pkt_len; int ix, copy, ret = -EAGAIN, ret2; + if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) && + call->ackr_reason) + rxrpc_send_ack_packet(call, false, NULL); + rx_pkt_offset = call->rx_pkt_offset; rx_pkt_len = call->rx_pkt_len; @@ -412,6 +416,8 @@ out: done: trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq, rx_pkt_offset, rx_pkt_len, ret); + if (ret == -EAGAIN) + set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags); return ret; } @@ -684,6 +690,17 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, read_phase_complete: ret = 1; out: + switch (call->ackr_reason) { + case RXRPC_ACK_IDLE: + break; + case RXRPC_ACK_DELAY: + if (ret != -EAGAIN) + break; + /* Fall through */ + default: + rxrpc_send_ack_packet(call, false, NULL); + } + if (_service) *_service = call->service_id; mutex_unlock(&call->user_mutex); -- cgit v1.2.3 From eb9950eb31f56e57582a61c92073336d04a26542 Mon Sep 17 00:00:00 2001 From: David Howells Date: Fri, 3 Aug 2018 17:06:56 +0100 Subject: rxrpc: Push iov_iter up from rxrpc_kernel_recv_data() to caller Push iov_iter up from rxrpc_kernel_recv_data() to its caller to allow non-contiguous iovs to be passed down, thereby permitting file reading to be simplified in the AFS filesystem in a future patch. Signed-off-by: David Howells Signed-off-by: David S. Miller --- fs/afs/rxrpc.c | 28 +++++++++++++++++----------- include/net/af_rxrpc.h | 2 +- net/rxrpc/recvmsg.c | 33 +++++++++++---------------------- 3 files changed, 29 insertions(+), 34 deletions(-) (limited to 'net/rxrpc/recvmsg.c') diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c index a1b18082991b..19db5f672a9d 100644 --- a/fs/afs/rxrpc.c +++ b/fs/afs/rxrpc.c @@ -346,7 +346,6 @@ long afs_make_call(struct afs_addr_cursor *ac, struct afs_call *call, struct rxrpc_call *rxcall; struct msghdr msg; struct kvec iov[1]; - size_t offset; s64 tx_total_len; int ret; @@ -433,10 +432,10 @@ error_do_abort: rxrpc_kernel_abort_call(call->net->socket, rxcall, RX_USER_ABORT, ret, "KSD"); } else { - offset = 0; - rxrpc_kernel_recv_data(call->net->socket, rxcall, NULL, - 0, &offset, false, &call->abort_code, - &call->service_id); + iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, NULL, 0, 0); + rxrpc_kernel_recv_data(call->net->socket, rxcall, + &msg.msg_iter, false, + &call->abort_code, &call->service_id); ac->abort_code = call->abort_code; ac->responded = true; } @@ -467,13 +466,14 @@ static void afs_deliver_to_call(struct afs_call *call) state == AFS_CALL_SV_AWAIT_ACK ) { if (state == AFS_CALL_SV_AWAIT_ACK) { - size_t offset = 0; + struct iov_iter iter; + + iov_iter_kvec(&iter, READ | ITER_KVEC, NULL, 0, 0); ret = rxrpc_kernel_recv_data(call->net->socket, - call->rxcall, - NULL, 0, &offset, false, + call->rxcall, &iter, false, &remote_abort, &call->service_id); - trace_afs_recv_data(call, 0, offset, false, ret); + trace_afs_recv_data(call, 0, 0, false, ret); if (ret == -EINPROGRESS || ret == -EAGAIN) return; @@ -894,6 +894,8 @@ int afs_extract_data(struct afs_call *call, void *buf, size_t count, bool want_more) { struct afs_net *net = call->net; + struct iov_iter iter; + struct kvec iov; enum afs_call_state state; u32 remote_abort = 0; int ret; @@ -903,10 +905,14 @@ int afs_extract_data(struct afs_call *call, void *buf, size_t count, ASSERTCMP(call->offset, <=, count); - ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, - buf, count, &call->offset, + iov.iov_base = buf + call->offset; + iov.iov_len = count - call->offset; + iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, count - call->offset); + + ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, &iter, want_more, &remote_abort, &call->service_id); + call->offset += (count - call->offset) - iov_iter_count(&iter); trace_afs_recv_data(call, count, call->offset, want_more, ret); if (ret == 0 || ret == -EAGAIN) return ret; diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h index 8ae8ee004258..f53edb3754bc 100644 --- a/include/net/af_rxrpc.h +++ b/include/net/af_rxrpc.h @@ -61,7 +61,7 @@ int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *, struct msghdr *, size_t, rxrpc_notify_end_tx_t); int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *, - void *, size_t, size_t *, bool, u32 *, u16 *); + struct iov_iter *, bool, u32 *, u16 *); bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *, u32, int, const char *); void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *); diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c index a57ea96c84ea..816b19a78809 100644 --- a/net/rxrpc/recvmsg.c +++ b/net/rxrpc/recvmsg.c @@ -611,9 +611,7 @@ wait_error: * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info * @sock: The socket that the call exists on * @call: The call to send data through - * @buf: The buffer to receive into - * @size: The size of the buffer, including data already read - * @_offset: The running offset into the buffer. + * @iter: The buffer to receive into * @want_more: True if more data is expected to be read * @_abort: Where the abort code is stored if -ECONNABORTED is returned * @_service: Where to store the actual service ID (may be upgraded) @@ -626,39 +624,30 @@ wait_error: * Note that we may return -EAGAIN to drain empty packets at the end of the * data, even if we've already copied over the requested data. * - * This function adds the amount it transfers to *_offset, so this should be - * precleared as appropriate. Note that the amount remaining in the buffer is - * taken to be size - *_offset. - * * *_abort should also be initialised to 0. */ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, - void *buf, size_t size, size_t *_offset, + struct iov_iter *iter, bool want_more, u32 *_abort, u16 *_service) { - struct iov_iter iter; - struct kvec iov; + size_t offset = 0; int ret; - _enter("{%d,%s},%zu/%zu,%d", + _enter("{%d,%s},%zu,%d", call->debug_id, rxrpc_call_states[call->state], - *_offset, size, want_more); + iov_iter_count(iter), want_more); - ASSERTCMP(*_offset, <=, size); ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING); - iov.iov_base = buf + *_offset; - iov.iov_len = size - *_offset; - iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset); - mutex_lock(&call->user_mutex); switch (READ_ONCE(call->state)) { case RXRPC_CALL_CLIENT_RECV_REPLY: case RXRPC_CALL_SERVER_RECV_REQUEST: case RXRPC_CALL_SERVER_ACK_REQUEST: - ret = rxrpc_recvmsg_data(sock, call, NULL, &iter, size, 0, - _offset); + ret = rxrpc_recvmsg_data(sock, call, NULL, iter, + iov_iter_count(iter), 0, + &offset); if (ret < 0) goto out; @@ -667,7 +656,7 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, * full buffer or have been given -EAGAIN. */ if (ret == 1) { - if (*_offset < size) + if (iov_iter_count(iter) > 0) goto short_data; if (!want_more) goto read_phase_complete; @@ -704,7 +693,7 @@ out: if (_service) *_service = call->service_id; mutex_unlock(&call->user_mutex); - _leave(" = %d [%zu,%d]", ret, *_offset, *_abort); + _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort); return ret; short_data: @@ -720,7 +709,7 @@ call_complete: ret = call->error; if (call->completion == RXRPC_CALL_SUCCEEDED) { ret = 1; - if (size > 0) + if (iov_iter_count(iter) > 0) ret = -ECONNRESET; } goto out; -- cgit v1.2.3