author    David Howells <dhowells@redhat.com>  2020-01-30 21:48:13 +0000
committer David Howells <dhowells@redhat.com>  2022-11-08 16:42:28 +0000
commit    72f0c6fb057971864fe4d42b289b8e6ede836ef1 (patch)
tree      235027c34ecdd59d28f77b12f084e9f83df95996  /net/rxrpc/output.c
parent    02a1935640f8f8539b8f2dbd6eeb539de93b2ce4 (diff)
rxrpc: Allocate ACK records at proposal and queue for transmission
Allocate rxrpc_txbuf records for ACKs and put onto a queue for the transmitter thread to dispatch.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
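The shape of the change is a produce/queue/drain pattern: an ACK is built into a record at proposal time, queued on the local endpoint, and a transmitter later detaches the whole queue under a lock and sends the records one at a time. The following is only a rough userspace sketch of that pattern, not the kernel code: the names (ack_record, propose_ack, transmit_acks) are invented for illustration, a pthread mutex stands in for the ack_tx_lock spinlock, printf() stands in for the sendmsg() call, and the error-requeue path of rxrpc_transmit_ack_packets() in the diff below is omitted.

	#include <pthread.h>
	#include <stdio.h>
	#include <stdlib.h>

	/* Illustrative stand-in for an rxrpc_txbuf carrying a prebuilt ACK. */
	struct ack_record {
		struct ack_record *next;
		unsigned int serial;	/* filled in at transmit time in the real code */
	};

	/* Per-endpoint queue, analogous to local->ack_tx_queue + ack_tx_lock. */
	static struct ack_record *ack_queue_head, **ack_queue_tail = &ack_queue_head;
	static pthread_mutex_t ack_queue_lock = PTHREAD_MUTEX_INITIALIZER;

	/* Proposal path: allocate the record up front and queue it for the
	 * transmitter instead of building and sending the packet inline. */
	static void propose_ack(unsigned int serial)
	{
		struct ack_record *rec = calloc(1, sizeof(*rec));

		if (!rec)
			return;
		rec->serial = serial;
		pthread_mutex_lock(&ack_queue_lock);
		*ack_queue_tail = rec;
		ack_queue_tail = &rec->next;
		pthread_mutex_unlock(&ack_queue_lock);
	}

	/* Transmitter path: detach the whole queue under the lock, then send
	 * the records one at a time outside it, since the socket serialises
	 * transmission anyway. */
	static void transmit_acks(void)
	{
		struct ack_record *rec, *queue;

		pthread_mutex_lock(&ack_queue_lock);
		queue = ack_queue_head;
		ack_queue_head = NULL;
		ack_queue_tail = &ack_queue_head;
		pthread_mutex_unlock(&ack_queue_lock);

		while ((rec = queue)) {
			queue = rec->next;
			printf("tx ACK serial %u\n", rec->serial);	/* stand-in for sendmsg() */
			free(rec);
		}
	}

	int main(void)
	{
		propose_ack(1);
		propose_ack(2);
		transmit_acks();
		return 0;
	}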
Diffstat (limited to 'net/rxrpc/output.c')
-rw-r--r--  net/rxrpc/output.c  157
1 file changed, 72 insertions, 85 deletions
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 71885d741987..1edbc678e8be 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -16,14 +16,6 @@
#include <net/udp.h>
#include "ar-internal.h"
-struct rxrpc_ack_buffer {
- struct rxrpc_wire_header whdr;
- struct rxrpc_ackpacket ack;
- u8 acks[255];
- u8 pad[3];
- struct rxrpc_ackinfo ackinfo;
-};
-
extern int udpv6_sendmsg(struct sock *sk, struct msghdr *msg, size_t len);
static ssize_t do_udp_sendmsg(struct socket *sk, struct msghdr *msg, size_t len)
@@ -82,22 +74,21 @@ static void rxrpc_set_keepalive(struct rxrpc_call *call)
*/
static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
struct rxrpc_call *call,
- struct rxrpc_ack_buffer *pkt,
+ struct rxrpc_txbuf *txb,
rxrpc_seq_t *_hard_ack,
- rxrpc_seq_t *_top,
- u8 reason)
+ rxrpc_seq_t *_top)
{
- rxrpc_serial_t serial;
+ struct rxrpc_ackinfo ackinfo;
unsigned int tmp;
rxrpc_seq_t hard_ack, top, seq;
int ix;
u32 mtu, jmax;
- u8 *ackp = pkt->acks;
+ u8 *ackp = txb->acks;
tmp = atomic_xchg(&call->ackr_nr_unacked, 0);
tmp |= atomic_xchg(&call->ackr_nr_consumed, 0);
- if (!tmp && (reason == RXRPC_ACK_DELAY ||
- reason == RXRPC_ACK_IDLE)) {
+ if (!tmp && (txb->ack.reason == RXRPC_ACK_DELAY ||
+ txb->ack.reason == RXRPC_ACK_IDLE)) {
rxrpc_inc_stat(call->rxnet, stat_tx_ack_skip);
return 0;
}
@@ -105,24 +96,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
/* Barrier against rxrpc_input_data(). */
- serial = call->ackr_serial;
hard_ack = READ_ONCE(call->rx_hard_ack);
top = smp_load_acquire(&call->rx_top);
*_hard_ack = hard_ack;
*_top = top;
- pkt->ack.bufferSpace = htons(0);
- pkt->ack.maxSkew = htons(0);
- pkt->ack.firstPacket = htonl(hard_ack + 1);
- pkt->ack.previousPacket = htonl(call->ackr_highest_seq);
- pkt->ack.serial = htonl(serial);
- pkt->ack.reason = reason;
- pkt->ack.nAcks = top - hard_ack;
-
- if (reason == RXRPC_ACK_PING)
- pkt->whdr.flags |= RXRPC_REQUEST_ACK;
+ txb->ack.firstPacket = htonl(hard_ack + 1);
+ txb->ack.previousPacket = htonl(call->ackr_highest_seq);
+ txb->ack.nAcks = top - hard_ack;
- if (after(top, hard_ack)) {
+ if (txb->ack.nAcks) {
seq = hard_ack + 1;
do {
ix = seq & RXRPC_RXTX_BUFF_MASK;
@@ -137,15 +120,16 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
mtu = conn->params.peer->if_mtu;
mtu -= conn->params.peer->hdrsize;
jmax = (call->nr_jumbo_bad > 3) ? 1 : rxrpc_rx_jumbo_max;
- pkt->ackinfo.rxMTU = htonl(rxrpc_rx_mtu);
- pkt->ackinfo.maxMTU = htonl(mtu);
- pkt->ackinfo.rwind = htonl(call->rx_winsize);
- pkt->ackinfo.jumbo_max = htonl(jmax);
+ ackinfo.rxMTU = htonl(rxrpc_rx_mtu);
+ ackinfo.maxMTU = htonl(mtu);
+ ackinfo.rwind = htonl(call->rx_winsize);
+ ackinfo.jumbo_max = htonl(jmax);
*ackp++ = 0;
*ackp++ = 0;
*ackp++ = 0;
- return top - hard_ack + 3;
+ memcpy(ackp, &ackinfo, sizeof(ackinfo));
+ return top - hard_ack + 3 + sizeof(ackinfo);
}
/*
@@ -194,26 +178,21 @@ static void rxrpc_cancel_rtt_probe(struct rxrpc_call *call,
/*
* Send an ACK call packet.
*/
-int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
- rxrpc_serial_t *_serial)
+static int rxrpc_send_ack_packet(struct rxrpc_local *local, struct rxrpc_txbuf *txb)
{
struct rxrpc_connection *conn;
struct rxrpc_ack_buffer *pkt;
+ struct rxrpc_call *call = txb->call;
struct msghdr msg;
- struct kvec iov[2];
+ struct kvec iov[1];
rxrpc_serial_t serial;
rxrpc_seq_t hard_ack, top;
size_t len, n;
int ret, rtt_slot = -1;
- u8 reason;
if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags))
return -ECONNRESET;
- pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
- if (!pkt)
- return -ENOMEM;
-
conn = call->conn;
msg.msg_name = &call->peer->srx.transport;
@@ -222,86 +201,94 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
msg.msg_controllen = 0;
msg.msg_flags = 0;
- pkt->whdr.epoch = htonl(conn->proto.epoch);
- pkt->whdr.cid = htonl(call->cid);
- pkt->whdr.callNumber = htonl(call->call_id);
- pkt->whdr.seq = 0;
- pkt->whdr.type = RXRPC_PACKET_TYPE_ACK;
- pkt->whdr.flags = RXRPC_SLOW_START_OK | conn->out_clientflag;
- pkt->whdr.userStatus = 0;
- pkt->whdr.securityIndex = call->security_ix;
- pkt->whdr._rsvd = 0;
- pkt->whdr.serviceId = htons(call->service_id);
+ if (txb->ack.reason == RXRPC_ACK_PING)
+ txb->wire.flags |= RXRPC_REQUEST_ACK;
spin_lock_bh(&call->lock);
- if (ping) {
- reason = RXRPC_ACK_PING;
- } else {
- reason = call->ackr_reason;
- if (!call->ackr_reason) {
- spin_unlock_bh(&call->lock);
- ret = 0;
- goto out;
- }
- call->ackr_reason = 0;
- }
- n = rxrpc_fill_out_ack(conn, call, pkt, &hard_ack, &top, reason);
-
+ n = rxrpc_fill_out_ack(conn, call, txb, &hard_ack, &top);
spin_unlock_bh(&call->lock);
if (n == 0) {
kfree(pkt);
return 0;
}
- iov[0].iov_base = pkt;
- iov[0].iov_len = sizeof(pkt->whdr) + sizeof(pkt->ack) + n;
- iov[1].iov_base = &pkt->ackinfo;
- iov[1].iov_len = sizeof(pkt->ackinfo);
- len = iov[0].iov_len + iov[1].iov_len;
+ iov[0].iov_base = &txb->wire;
+ iov[0].iov_len = sizeof(txb->wire) + sizeof(txb->ack) + n;
+ len = iov[0].iov_len;
serial = atomic_inc_return(&conn->serial);
- pkt->whdr.serial = htonl(serial);
+ txb->wire.serial = htonl(serial);
trace_rxrpc_tx_ack(call->debug_id, serial,
- ntohl(pkt->ack.firstPacket),
- ntohl(pkt->ack.serial),
- pkt->ack.reason, pkt->ack.nAcks);
- if (_serial)
- *_serial = serial;
+ ntohl(txb->ack.firstPacket),
+ ntohl(txb->ack.serial), txb->ack.reason, txb->ack.nAcks);
+ if (txb->ack_why == rxrpc_propose_ack_ping_for_lost_ack)
+ call->acks_lost_ping = serial;
- if (ping)
+ if (txb->ack.reason == RXRPC_ACK_PING)
rtt_slot = rxrpc_begin_rtt_probe(call, serial, rxrpc_rtt_tx_ping);
rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);
- iov_iter_kvec(&msg.msg_iter, WRITE, iov, 2, len);
+ iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
ret = do_udp_sendmsg(conn->params.local->socket, &msg, len);
call->peer->last_tx_at = ktime_get_seconds();
if (ret < 0)
trace_rxrpc_tx_fail(call->debug_id, serial, ret,
rxrpc_tx_point_call_ack);
else
- trace_rxrpc_tx_packet(call->debug_id, &pkt->whdr,
+ trace_rxrpc_tx_packet(call->debug_id, &txb->wire,
rxrpc_tx_point_call_ack);
rxrpc_tx_backoff(call, ret);
if (call->state < RXRPC_CALL_COMPLETE) {
- if (ret < 0) {
+ if (ret < 0)
rxrpc_cancel_rtt_probe(call, serial, rtt_slot);
- rxrpc_propose_ACK(call, pkt->ack.reason,
- ntohl(pkt->ack.serial),
- false, true,
- rxrpc_propose_ack_retry_tx);
- }
-
rxrpc_set_keepalive(call);
}
-out:
kfree(pkt);
return ret;
}
/*
+ * ACK transmitter for a local endpoint. The UDP socket locks around each
+ * transmission, so we can only transmit one packet at a time, ACK, DATA or
+ * otherwise.
+ */
+void rxrpc_transmit_ack_packets(struct rxrpc_local *local)
+{
+ LIST_HEAD(queue);
+ int ret;
+
+ trace_rxrpc_local(local->debug_id, rxrpc_local_tx_ack,
+ refcount_read(&local->ref), NULL);
+
+ if (list_empty(&local->ack_tx_queue))
+ return;
+
+ spin_lock_bh(&local->ack_tx_lock);
+ list_splice_tail_init(&local->ack_tx_queue, &queue);
+ spin_unlock_bh(&local->ack_tx_lock);
+
+ while (!list_empty(&queue)) {
+ struct rxrpc_txbuf *txb =
+ list_entry(queue.next, struct rxrpc_txbuf, tx_link);
+
+ ret = rxrpc_send_ack_packet(local, txb);
+ if (ret < 0 && ret != -ECONNRESET) {
+ spin_lock_bh(&local->ack_tx_lock);
+ list_splice_init(&queue, &local->ack_tx_queue);
+ spin_unlock_bh(&local->ack_tx_lock);
+ break;
+ }
+
+ list_del_init(&txb->tx_link);
+ rxrpc_put_call(txb->call, rxrpc_call_put);
+ rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx);
+ }
+}
+
+/*
* Send an ABORT call packet.
*/
int rxrpc_send_abort_packet(struct rxrpc_call *call)