diff options
author | David Howells <dhowells@redhat.com> | 2020-01-23 13:13:41 +0000 |
---|---|---|
committer | David Howells <dhowells@redhat.com> | 2022-12-01 13:36:42 +0000 |
commit | 5e6ef4f1017c7f844e305283bbd8875af475e2fc (patch) | |
tree | a1af5b9ab3f538d84a50214be6c41fd0700d4bca /net/rxrpc/call_accept.c | |
parent | 393a2a2007d13df7ae54c94328b45b6c2269b6a9 (diff) | |
download | linux-5e6ef4f1017c7f844e305283bbd8875af475e2fc.tar.bz2 |
rxrpc: Make the I/O thread take over the call and local processor work
Move the functions from the call->processor and local->processor work items
into the domain of the I/O thread.
The call event processor, now called from the I/O thread, then takes over
the job of cranking the call state machine, processing incoming packets and
transmitting DATA, ACK and ABORT packets. In a future patch,
rxrpc_send_ACK() will transmit the ACK on the spot rather than queuing it
for later transmission.
The call event processor becomes purely received-skb driven. It only
transmits things in response to events. We use "pokes" to queue a dummy
skb to make it do things like start/resume transmitting data. Timer expiry
also results in pokes.
The connection event processor, becomes similar, though crypto events, such
as dealing with CHALLENGE and RESPONSE packets is offloaded to a work item
to avoid doing crypto in the I/O thread.
The local event processor is removed and VERSION response packets are
generated directly from the packet parser. Similarly, ABORTs generated in
response to protocol errors will be transmitted immediately rather than
being pushed onto a queue for later transmission.
Changes:
========
ver #2)
- Fix a couple of introduced lock context imbalances.
Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/call_accept.c')
-rw-r--r-- | net/rxrpc/call_accept.c | 126 |
1 files changed, 57 insertions, 69 deletions
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c index 11134b7cec17..87b46c2a1985 100644 --- a/net/rxrpc/call_accept.c +++ b/net/rxrpc/call_accept.c @@ -100,6 +100,7 @@ static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx, return -ENOMEM; call->flags |= (1 << RXRPC_CALL_IS_SERVICE); call->state = RXRPC_CALL_SERVER_PREALLOC; + __set_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events); trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), user_call_ID, rxrpc_call_new_prealloc_service); @@ -235,21 +236,6 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx) } /* - * Ping the other end to fill our RTT cache and to retrieve the rwind - * and MTU parameters. - */ -static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb) -{ - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); - ktime_t now = skb->tstamp; - - if (call->peer->rtt_count < 3 || - ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now)) - rxrpc_send_ACK(call, RXRPC_ACK_PING, sp->hdr.serial, - rxrpc_propose_ack_ping_for_params); -} - -/* * Allocate a new incoming call from the prealloc pool, along with a connection * and a peer as necessary. */ @@ -330,33 +316,56 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx, } /* - * Set up a new incoming call. Called in BH context with the RCU read lock - * held. + * Set up a new incoming call. Called from the I/O thread. * * If this is for a kernel service, when we allocate the call, it will have * three refs on it: (1) the kernel service, (2) the user_call_ID tree, (3) the * retainer ref obtained from the backlog buffer. Prealloc calls for userspace - * services only have the ref from the backlog buffer. We pass this ref to the - * caller. + * services only have the ref from the backlog buffer. * * If we want to report an error, we mark the skb with the packet type and - * abort code and return NULL. - * - * The call is returned with the user access mutex held and a ref on it. + * abort code and return false. */ -struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local, - struct rxrpc_sock *rx, - struct sockaddr_rxrpc *peer_srx, - struct sk_buff *skb) +bool rxrpc_new_incoming_call(struct rxrpc_local *local, + struct rxrpc_peer *peer, + struct rxrpc_connection *conn, + struct sockaddr_rxrpc *peer_srx, + struct sk_buff *skb) { - struct rxrpc_skb_priv *sp = rxrpc_skb(skb); const struct rxrpc_security *sec = NULL; - struct rxrpc_connection *conn; - struct rxrpc_peer *peer = NULL; + struct rxrpc_skb_priv *sp = rxrpc_skb(skb); struct rxrpc_call *call = NULL; + struct rxrpc_sock *rx; _enter(""); + /* Don't set up a call for anything other than the first DATA packet. */ + if (sp->hdr.seq != 1 || + sp->hdr.type != RXRPC_PACKET_TYPE_DATA) + return true; /* Just discard */ + + rcu_read_lock(); + + /* Weed out packets to services we're not offering. Packets that would + * begin a call are explicitly rejected and the rest are just + * discarded. + */ + rx = rcu_dereference(local->service); + if (!rx || (sp->hdr.serviceId != rx->srx.srx_service && + sp->hdr.serviceId != rx->second_service) + ) { + if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA && + sp->hdr.seq == 1) + goto unsupported_service; + goto discard; + } + + if (!conn) { + sec = rxrpc_get_incoming_security(rx, skb); + if (!sec) + goto reject; + } + spin_lock(&rx->incoming_lock); if (rx->sk.sk_state == RXRPC_SERVER_LISTEN_DISABLED || rx->sk.sk_state == RXRPC_CLOSE) { @@ -367,19 +376,6 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local, goto no_call; } - /* The peer, connection and call may all have sprung into existence due - * to a duplicate packet being handled on another CPU in parallel, so - * we have to recheck the routing. However, we're now holding - * rx->incoming_lock, so the values should remain stable. - */ - conn = rxrpc_find_connection_rcu(local, peer_srx, skb, &peer); - - if (!conn) { - sec = rxrpc_get_incoming_security(rx, skb); - if (!sec) - goto no_call; - } - call = rxrpc_alloc_incoming_call(rx, local, peer, conn, sec, peer_srx, skb); if (!call) { @@ -398,35 +394,15 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local, rx->notify_new_call(&rx->sk, call, call->user_call_ID); spin_lock(&conn->state_lock); - switch (conn->state) { - case RXRPC_CONN_SERVICE_UNSECURED: + if (conn->state == RXRPC_CONN_SERVICE_UNSECURED) { conn->state = RXRPC_CONN_SERVICE_CHALLENGING; set_bit(RXRPC_CONN_EV_CHALLENGE, &call->conn->events); rxrpc_queue_conn(call->conn, rxrpc_conn_queue_challenge); - break; - - case RXRPC_CONN_SERVICE: - write_lock(&call->state_lock); - if (call->state < RXRPC_CALL_COMPLETE) - call->state = RXRPC_CALL_SERVER_RECV_REQUEST; - write_unlock(&call->state_lock); - break; - - case RXRPC_CONN_REMOTELY_ABORTED: - rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED, - conn->abort_code, conn->error); - break; - case RXRPC_CONN_LOCALLY_ABORTED: - rxrpc_abort_call("CON", call, sp->hdr.seq, - conn->abort_code, conn->error); - break; - default: - BUG(); } spin_unlock(&conn->state_lock); - spin_unlock(&rx->incoming_lock); - rxrpc_send_ping(call, skb); + spin_unlock(&rx->incoming_lock); + rcu_read_unlock(); if (hlist_unhashed(&call->error_link)) { spin_lock(&call->peer->lock); @@ -435,12 +411,24 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local, } _leave(" = %p{%d}", call, call->debug_id); - return call; - + rxrpc_input_call_event(call, skb); + rxrpc_put_call(call, rxrpc_call_put_input); + return true; + +unsupported_service: + trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, + RX_INVALID_OPERATION, EOPNOTSUPP); + skb->priority = RX_INVALID_OPERATION; + goto reject; no_call: spin_unlock(&rx->incoming_lock); - _leave(" = NULL [%u]", skb->mark); - return NULL; +reject: + rcu_read_unlock(); + _leave(" = f [%u]", skb->mark); + return false; +discard: + rcu_read_unlock(); + return true; } /* |