summaryrefslogtreecommitdiffstats
path: root/net/sunrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/xprtrdma/Makefile2
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_pcl.c306
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c196
3 files changed, 422 insertions, 82 deletions
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 8ed0377d7a18..55b21bae866d 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -4,5 +4,5 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
rpcrdma-y := transport.o rpc_rdma.o verbs.o frwr_ops.o \
svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
svc_rdma_sendto.o svc_rdma_recvfrom.o svc_rdma_rw.o \
- module.o
+ svc_rdma_pcl.o module.o
rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma_pcl.c b/net/sunrpc/xprtrdma/svc_rdma_pcl.c
new file mode 100644
index 000000000000..b63cfeaa2923
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_pcl.c
@@ -0,0 +1,306 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Copyright (c) 2020 Oracle. All rights reserved.
+ */
+
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/sunrpc/rpc_rdma.h>
+
+#include "xprt_rdma.h"
+#include <trace/events/rpcrdma.h>
+
+/**
+ * pcl_free - Release all memory associated with a parsed chunk list
+ * @pcl: parsed chunk list
+ *
+ */
+void pcl_free(struct svc_rdma_pcl *pcl)
+{
+ while (!list_empty(&pcl->cl_chunks)) {
+ struct svc_rdma_chunk *chunk;
+
+ chunk = pcl_first_chunk(pcl);
+ list_del(&chunk->ch_list);
+ kfree(chunk);
+ }
+}
+
+static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
+{
+ struct svc_rdma_chunk *chunk;
+
+ chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
+ if (!chunk)
+ return NULL;
+
+ chunk->ch_position = position;
+ chunk->ch_length = 0;
+ chunk->ch_payload_length = 0;
+ chunk->ch_segcount = 0;
+ return chunk;
+}
+
+static struct svc_rdma_chunk *
+pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
+{
+ struct svc_rdma_chunk *pos;
+
+ pcl_for_each_chunk(pos, pcl) {
+ if (pos->ch_position == position)
+ return pos;
+ }
+ return NULL;
+}
+
+static void pcl_insert_position(struct svc_rdma_pcl *pcl,
+ struct svc_rdma_chunk *chunk)
+{
+ struct svc_rdma_chunk *pos;
+
+ pcl_for_each_chunk(pos, pcl) {
+ if (pos->ch_position > chunk->ch_position)
+ break;
+ }
+ __list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
+ pcl->cl_count++;
+}
+
+static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_chunk *chunk,
+ u32 handle, u32 length, u64 offset)
+{
+ struct svc_rdma_segment *segment;
+
+ segment = &chunk->ch_segments[chunk->ch_segcount];
+ segment->rs_handle = handle;
+ segment->rs_length = length;
+ segment->rs_offset = offset;
+
+ trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, segment);
+
+ chunk->ch_length += length;
+ chunk->ch_segcount++;
+}
+
+/**
+ * pcl_alloc_call - Construct a parsed chunk list for the Call body
+ * @rctxt: Ingress receive context
+ * @p: Start of an un-decoded Read list
+ *
+ * Assumptions:
+ * - The incoming Read list has already been sanity checked.
+ * - cl_count is already set to the number of segments in
+ * the un-decoded list.
+ * - The list might not be in order by position.
+ *
+ * Return values:
+ * %true: Parsed chunk list was successfully constructed, and
+ * cl_count is updated to be the number of chunks (ie.
+ * unique positions) in the Read list.
+ * %false: Memory allocation failed.
+ */
+bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
+{
+ struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
+ unsigned int i, segcount = pcl->cl_count;
+
+ pcl->cl_count = 0;
+ for (i = 0; i < segcount; i++) {
+ struct svc_rdma_chunk *chunk;
+ u32 position, handle, length;
+ u64 offset;
+
+ p++; /* skip the list discriminator */
+ p = xdr_decode_read_segment(p, &position, &handle,
+ &length, &offset);
+ if (position != 0)
+ continue;
+
+ if (pcl_is_empty(pcl)) {
+ chunk = pcl_alloc_chunk(segcount, position);
+ if (!chunk)
+ return false;
+ pcl_insert_position(pcl, chunk);
+ } else {
+ chunk = list_first_entry(&pcl->cl_chunks,
+ struct svc_rdma_chunk,
+ ch_list);
+ }
+
+ pcl_set_read_segment(rctxt, chunk, handle, length, offset);
+ }
+
+ return true;
+}
+
+/**
+ * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
+ * @rctxt: Ingress receive context
+ * @p: Start of an un-decoded Read list
+ *
+ * Assumptions:
+ * - The incoming Read list has already been sanity checked.
+ * - cl_count is already set to the number of segments in
+ * the un-decoded list.
+ * - The list might not be in order by position.
+ *
+ * Return values:
+ * %true: Parsed chunk list was successfully constructed, and
+ * cl_count is updated to be the number of chunks (ie.
+ * unique position values) in the Read list.
+ * %false: Memory allocation failed.
+ *
+ * TODO:
+ * - Check for chunk range overlaps
+ */
+bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
+{
+ struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
+ unsigned int i, segcount = pcl->cl_count;
+
+ pcl->cl_count = 0;
+ for (i = 0; i < segcount; i++) {
+ struct svc_rdma_chunk *chunk;
+ u32 position, handle, length;
+ u64 offset;
+
+ p++; /* skip the list discriminator */
+ p = xdr_decode_read_segment(p, &position, &handle,
+ &length, &offset);
+ if (position == 0)
+ continue;
+
+ chunk = pcl_lookup_position(pcl, position);
+ if (!chunk) {
+ chunk = pcl_alloc_chunk(segcount, position);
+ if (!chunk)
+ return false;
+ pcl_insert_position(pcl, chunk);
+ }
+
+ pcl_set_read_segment(rctxt, chunk, handle, length, offset);
+ }
+
+ return true;
+}
+
+/**
+ * pcl_alloc_write - Construct a parsed chunk list from a Write list
+ * @rctxt: Ingress receive context
+ * @pcl: Parsed chunk list to populate
+ * @p: Start of an un-decoded Write list
+ *
+ * Assumptions:
+ * - The incoming Write list has already been sanity checked, and
+ * - cl_count is set to the number of chunks in the un-decoded list.
+ *
+ * Return values:
+ * %true: Parsed chunk list was successfully constructed.
+ * %false: Memory allocation failed.
+ */
+bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_pcl *pcl, __be32 *p)
+{
+ struct svc_rdma_segment *segment;
+ struct svc_rdma_chunk *chunk;
+ unsigned int i, j;
+ u32 segcount;
+
+ for (i = 0; i < pcl->cl_count; i++) {
+ p++; /* skip the list discriminator */
+ segcount = be32_to_cpup(p++);
+
+ chunk = pcl_alloc_chunk(segcount, 0);
+ if (!chunk)
+ return false;
+ list_add_tail(&chunk->ch_list, &pcl->cl_chunks);
+
+ for (j = 0; j < segcount; j++) {
+ segment = &chunk->ch_segments[j];
+ p = xdr_decode_rdma_segment(p, &segment->rs_handle,
+ &segment->rs_length,
+ &segment->rs_offset);
+ trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);
+
+ chunk->ch_length += segment->rs_length;
+ chunk->ch_segcount++;
+ }
+ }
+ return true;
+}
+
+static int pcl_process_region(const struct xdr_buf *xdr,
+ unsigned int offset, unsigned int length,
+ int (*actor)(const struct xdr_buf *, void *),
+ void *data)
+{
+ struct xdr_buf subbuf;
+
+ if (!length)
+ return 0;
+ if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
+ return -EMSGSIZE;
+ return actor(&subbuf, data);
+}
+
+/**
+ * pcl_process_nonpayloads - Process non-payload regions inside @xdr
+ * @pcl: Chunk list to process
+ * @xdr: xdr_buf to process
+ * @actor: Function to invoke on each non-payload region
+ * @data: Arguments for @actor
+ *
+ * This mechanism must ignore not only result payloads that were already
+ * sent via RDMA Write, but also XDR padding for those payloads that
+ * the upper layer has added.
+ *
+ * Assumptions:
+ * The xdr->len and ch_position fields are aligned to 4-byte multiples.
+ *
+ * Returns:
+ * On success, zero,
+ * %-EMSGSIZE on XDR buffer overflow, or
+ * The return value of @actor
+ */
+int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
+ const struct xdr_buf *xdr,
+ int (*actor)(const struct xdr_buf *, void *),
+ void *data)
+{
+ struct svc_rdma_chunk *chunk, *next;
+ unsigned int start;
+ int ret;
+
+ chunk = pcl_first_chunk(pcl);
+
+ /* No result payloads were generated */
+ if (!chunk || !chunk->ch_payload_length)
+ return actor(xdr, data);
+
+ /* Process the region before the first result payload */
+ ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
+ if (ret < 0)
+ return ret;
+
+ /* Process the regions between each middle result payload */
+ while ((next = pcl_next_chunk(pcl, chunk))) {
+ if (!next->ch_payload_length)
+ break;
+
+ start = pcl_chunk_end_offset(chunk);
+ ret = pcl_process_region(xdr, start, next->ch_position - start,
+ actor, data);
+ if (ret < 0)
+ return ret;
+
+ chunk = next;
+ }
+
+ /* Process the region after the last result payload */
+ start = pcl_chunk_end_offset(chunk);
+ ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c6ea2903c21a..ec9d259b149c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -93,6 +93,7 @@
* (see rdma_read_complete() below).
*/
+#include <linux/slab.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
@@ -143,6 +144,10 @@ svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
goto fail2;
svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
+ pcl_init(&ctxt->rc_call_pcl);
+ pcl_init(&ctxt->rc_read_pcl);
+ pcl_init(&ctxt->rc_write_pcl);
+ pcl_init(&ctxt->rc_reply_pcl);
ctxt->rc_recv_wr.next = NULL;
ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
@@ -226,6 +231,11 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
for (i = 0; i < ctxt->rc_page_count; i++)
put_page(ctxt->rc_pages[i]);
+ pcl_free(&ctxt->rc_call_pcl);
+ pcl_free(&ctxt->rc_read_pcl);
+ pcl_free(&ctxt->rc_write_pcl);
+ pcl_free(&ctxt->rc_reply_pcl);
+
if (!ctxt->rc_temp)
llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
else
@@ -385,100 +395,123 @@ static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
arg->len = ctxt->rc_byte_len;
}
-/* This accommodates the largest possible Write chunk.
- */
-#define MAX_BYTES_WRITE_CHUNK ((u32)(RPCSVC_MAXPAGES << PAGE_SHIFT))
-
-/* This accommodates the largest possible Position-Zero
- * Read chunk or Reply chunk.
- */
-#define MAX_BYTES_SPECIAL_CHUNK ((u32)((RPCSVC_MAXPAGES + 2) << PAGE_SHIFT))
-
-/* Sanity check the Read list.
+/**
+ * xdr_count_read_segments - Count number of Read segments in Read list
+ * @rctxt: Ingress receive context
+ * @p: Start of an un-decoded Read list
*
- * Implementation limits:
- * - This implementation supports only one Read chunk.
+ * Before allocating anything, ensure the ingress Read list is safe
+ * to use.
*
- * Sanity checks:
- * - Read list does not overflow Receive buffer.
- * - Segment size limited by largest NFS data payload.
- *
- * The segment count is limited to how many segments can
- * fit in the transport header without overflowing the
- * buffer. That's about 40 Read segments for a 1KB inline
- * threshold.
+ * The segment count is limited to how many segments can fit in the
+ * transport header without overflowing the buffer. That's about 40
+ * Read segments for a 1KB inline threshold.
*
* Return values:
- * %true: Read list is valid. @rctxt's xdr_stream is updated
- * to point to the first byte past the Read list.
- * %false: Read list is corrupt. @rctxt's xdr_stream is left
- * in an unknown state.
+ * %true: Read list is valid. @rctxt's xdr_stream is updated to point
+ * to the first byte past the Read list. rc_read_pcl and
+ * rc_call_pcl cl_count fields are set to the number of
+ * Read segments in the list.
+ * %false: Read list is corrupt. @rctxt's xdr_stream is left in an
+ * unknown state.
*/
-static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
+static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
- u32 position, len;
- bool first;
- __be32 *p;
-
- p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
- if (!p)
- return false;
-
- len = 0;
- first = true;
+ rctxt->rc_call_pcl.cl_count = 0;
+ rctxt->rc_read_pcl.cl_count = 0;
while (xdr_item_is_present(p)) {
+ u32 position, handle, length;
+ u64 offset;
+
p = xdr_inline_decode(&rctxt->rc_stream,
rpcrdma_readseg_maxsz * sizeof(*p));
if (!p)
return false;
- if (first) {
- position = be32_to_cpup(p);
- first = false;
- } else if (be32_to_cpup(p) != position) {
- return false;
+ xdr_decode_read_segment(p, &position, &handle,
+ &length, &offset);
+ if (position) {
+ if (position & 3)
+ return false;
+ ++rctxt->rc_read_pcl.cl_count;
+ } else {
+ ++rctxt->rc_call_pcl.cl_count;
}
- p += 2;
- len += be32_to_cpup(p);
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
if (!p)
return false;
}
- return len <= MAX_BYTES_SPECIAL_CHUNK;
+ return true;
}
-/* The segment count is limited to how many segments can
- * fit in the transport header without overflowing the
- * buffer. That's about 60 Write segments for a 1KB inline
- * threshold.
+/* Sanity check the Read list.
+ *
+ * Sanity checks:
+ * - Read list does not overflow Receive buffer.
+ * - Chunk size limited by largest NFS data payload.
+ *
+ * Return values:
+ * %true: Read list is valid. @rctxt's xdr_stream is updated
+ * to point to the first byte past the Read list.
+ * %false: Read list is corrupt. @rctxt's xdr_stream is left
+ * in an unknown state.
*/
-static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
+static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
{
- u32 i, segcount, total;
__be32 *p;
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
if (!p)
return false;
- segcount = be32_to_cpup(p);
+ if (!xdr_count_read_segments(rctxt, p))
+ return false;
+ if (!pcl_alloc_call(rctxt, p))
+ return false;
+ return pcl_alloc_read(rctxt, p);
+}
- total = 0;
- for (i = 0; i < segcount; i++) {
- u32 handle, length;
- u64 offset;
+static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt)
+{
+ u32 segcount;
+ __be32 *p;
- p = xdr_inline_decode(&rctxt->rc_stream,
- rpcrdma_segment_maxsz * sizeof(*p));
- if (!p)
- return false;
+ if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount))
+ return false;
- xdr_decode_rdma_segment(p, &handle, &length, &offset);
- trace_svcrdma_decode_wseg(handle, length, offset);
+ /* A bogus segcount causes this buffer overflow check to fail. */
+ p = xdr_inline_decode(&rctxt->rc_stream,
+ segcount * rpcrdma_segment_maxsz * sizeof(*p));
+ return p != NULL;
+}
- total += length;
+/**
+ * xdr_count_write_chunks - Count number of Write chunks in Write list
+ * @rctxt: Received header and decoding state
+ * @p: start of an un-decoded Write list
+ *
+ * Before allocating anything, ensure the ingress Write list is
+ * safe to use.
+ *
+ * Return values:
+ * %true: Write list is valid. @rctxt's xdr_stream is updated
+ * to point to the first byte past the Write list, and
+ * the number of Write chunks is in rc_write_pcl.cl_count.
+ * %false: Write list is corrupt. @rctxt's xdr_stream is left
+ * in an indeterminate state.
+ */
+static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
+{
+ rctxt->rc_write_pcl.cl_count = 0;
+ while (xdr_item_is_present(p)) {
+ if (!xdr_check_write_chunk(rctxt))
+ return false;
+ ++rctxt->rc_write_pcl.cl_count;
+ p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
+ if (!p)
+ return false;
}
- return total <= maxlen;
+ return true;
}
/* Sanity check the Write list.
@@ -498,24 +531,22 @@ static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
*/
static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
{
- u32 chcount = 0;
__be32 *p;
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
if (!p)
return false;
- rctxt->rc_write_list = p;
- while (xdr_item_is_present(p)) {
- if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK))
- return false;
- ++chcount;
- p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
- if (!p)
- return false;
- }
- if (!chcount)
- rctxt->rc_write_list = NULL;
- return chcount < 2;
+
+ rctxt->rc_write_list = NULL;
+ if (!xdr_count_write_chunks(rctxt, p))
+ return false;
+ if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p))
+ return false;
+
+ if (!pcl_is_empty(&rctxt->rc_write_pcl))
+ rctxt->rc_write_list = p;
+ rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl);
+ return rctxt->rc_write_pcl.cl_count < 2;
}
/* Sanity check the Reply chunk.
@@ -537,13 +568,16 @@ static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
if (!p)
return false;
+
rctxt->rc_reply_chunk = NULL;
- if (xdr_item_is_present(p)) {
- if (!xdr_check_write_chunk(rctxt, MAX_BYTES_SPECIAL_CHUNK))
- return false;
- rctxt->rc_reply_chunk = p;
- }
- return true;
+ if (!xdr_item_is_present(p))
+ return true;
+ if (!xdr_check_write_chunk(rctxt))
+ return false;
+
+ rctxt->rc_reply_chunk = p;
+ rctxt->rc_reply_pcl.cl_count = 1;
+ return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p);
}
/* RPC-over-RDMA Version One private extension: Remote Invalidation.