summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/sunrpc/auth_gss/svcauth_gss.c2
-rw-r--r--net/sunrpc/cache.c2
-rw-r--r--net/sunrpc/sunrpc.h13
-rw-r--r--net/sunrpc/svc_xprt.c5
-rw-r--r--net/sunrpc/svcauth.c2
-rw-r--r--net/sunrpc/svcsock.c17
-rw-r--r--net/sunrpc/xdr.c174
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c643
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c230
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c69
10 files changed, 547 insertions, 610 deletions
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 0f73f4507746..4ce5eccec1f6 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -1503,6 +1503,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)
if (unwrap_integ_data(rqstp, &rqstp->rq_arg,
gc->gc_seq, rsci->mechctx))
goto garbage_args;
+ rqstp->rq_auth_slack = RPC_MAX_AUTH_SIZE;
break;
case RPC_GSS_SVC_PRIVACY:
/* placeholders for length and seq. number: */
@@ -1511,6 +1512,7 @@ svcauth_gss_accept(struct svc_rqst *rqstp, __be32 *authp)
if (unwrap_priv_data(rqstp, &rqstp->rq_arg,
gc->gc_seq, rsci->mechctx))
goto garbage_args;
+ rqstp->rq_auth_slack = RPC_MAX_AUTH_SIZE * 2;
break;
default:
goto auth_err;
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index ae333c1845bb..066362141133 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -374,7 +374,7 @@ void sunrpc_destroy_cache_detail(struct cache_detail *cd)
}
return;
out:
- printk(KERN_ERR "nfsd: failed to unregister %s cache\n", cd->name);
+ printk(KERN_ERR "RPC: failed to unregister %s cache\n", cd->name);
}
EXPORT_SYMBOL_GPL(sunrpc_destroy_cache_detail);
diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
index 14c9f6d1c5ff..f2b7cb540e61 100644
--- a/net/sunrpc/sunrpc.h
+++ b/net/sunrpc/sunrpc.h
@@ -43,6 +43,19 @@ static inline int rpc_reply_expected(struct rpc_task *task)
(task->tk_msg.rpc_proc->p_decode != NULL);
}
+static inline int sock_is_loopback(struct sock *sk)
+{
+ struct dst_entry *dst;
+ int loopback = 0;
+ rcu_read_lock();
+ dst = rcu_dereference(sk->sk_dst_cache);
+ if (dst && dst->dev &&
+ (dst->dev->features & NETIF_F_LOOPBACK))
+ loopback = 1;
+ rcu_read_unlock();
+ return loopback;
+}
+
int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
struct page *headpage, unsigned long headoffset,
struct page *tailpage, unsigned long tailoffset);
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 06c6ff0cb911..b4737fbdec13 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -597,6 +597,7 @@ static int svc_alloc_arg(struct svc_rqst *rqstp)
}
rqstp->rq_pages[i] = p;
}
+ rqstp->rq_page_end = &rqstp->rq_pages[i];
rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
/* Make arg->head point to first page and arg->pages point to rest */
@@ -730,6 +731,8 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
newxpt = xprt->xpt_ops->xpo_accept(xprt);
if (newxpt)
svc_add_new_temp_xprt(serv, newxpt);
+ else
+ module_put(xprt->xpt_class->xcl_owner);
} else if (xprt->xpt_ops->xpo_has_wspace(xprt)) {
/* XPT_DATA|XPT_DEFERRED case: */
dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
@@ -793,7 +796,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
clear_bit(XPT_OLD, &xprt->xpt_flags);
- rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
+ rqstp->rq_secure = xprt->xpt_ops->xpo_secure_port(rqstp);
rqstp->rq_chandle.defer = svc_defer;
if (serv->sv_stats)
diff --git a/net/sunrpc/svcauth.c b/net/sunrpc/svcauth.c
index 2af7b0cba43a..79c0f3459b5c 100644
--- a/net/sunrpc/svcauth.c
+++ b/net/sunrpc/svcauth.c
@@ -54,6 +54,8 @@ svc_authenticate(struct svc_rqst *rqstp, __be32 *authp)
}
spin_unlock(&authtab_lock);
+ rqstp->rq_auth_slack = 0;
+
rqstp->rq_authop = aops;
return aops->accept(rqstp, authp);
}
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 43bcb4699d69..b507cd327d9b 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -400,6 +400,12 @@ static void svc_sock_setbufsize(struct socket *sock, unsigned int snd,
release_sock(sock->sk);
#endif
}
+
+static int svc_sock_secure_port(struct svc_rqst *rqstp)
+{
+ return svc_port_is_privileged(svc_addr(rqstp));
+}
+
/*
* INET callback when data has been received on the socket.
*/
@@ -678,6 +684,7 @@ static struct svc_xprt_ops svc_udp_ops = {
.xpo_prep_reply_hdr = svc_udp_prep_reply_hdr,
.xpo_has_wspace = svc_udp_has_wspace,
.xpo_accept = svc_udp_accept,
+ .xpo_secure_port = svc_sock_secure_port,
};
static struct svc_xprt_class svc_udp_class = {
@@ -842,8 +849,7 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
* tell us anything. For now just warn about unpriv connections.
*/
if (!svc_port_is_privileged(sin)) {
- dprintk(KERN_WARNING
- "%s: connect from unprivileged port: %s\n",
+ dprintk("%s: connect from unprivileged port: %s\n",
serv->sv_name,
__svc_print_addr(sin, buf, sizeof(buf)));
}
@@ -867,6 +873,10 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
}
svc_xprt_set_local(&newsvsk->sk_xprt, sin, slen);
+ if (sock_is_loopback(newsock->sk))
+ set_bit(XPT_LOCAL, &newsvsk->sk_xprt.xpt_flags);
+ else
+ clear_bit(XPT_LOCAL, &newsvsk->sk_xprt.xpt_flags);
if (serv->sv_stats)
serv->sv_stats->nettcpconn++;
@@ -1112,6 +1122,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
rqstp->rq_xprt_ctxt = NULL;
rqstp->rq_prot = IPPROTO_TCP;
+ rqstp->rq_local = !!test_bit(XPT_LOCAL, &svsk->sk_xprt.xpt_flags);
p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
calldir = p[1];
@@ -1234,6 +1245,7 @@ static struct svc_xprt_ops svc_tcp_bc_ops = {
.xpo_detach = svc_bc_tcp_sock_detach,
.xpo_free = svc_bc_sock_free,
.xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr,
+ .xpo_secure_port = svc_sock_secure_port,
};
static struct svc_xprt_class svc_tcp_bc_class = {
@@ -1272,6 +1284,7 @@ static struct svc_xprt_ops svc_tcp_ops = {
.xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr,
.xpo_has_wspace = svc_tcp_has_wspace,
.xpo_accept = svc_tcp_accept,
+ .xpo_secure_port = svc_sock_secure_port,
};
static struct svc_xprt_class svc_tcp_class = {
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index dd97ba3c4456..23fb4e75e245 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -462,6 +462,7 @@ void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
struct kvec *iov = buf->head;
int scratch_len = buf->buflen - buf->page_len - buf->tail[0].iov_len;
+ xdr_set_scratch_buffer(xdr, NULL, 0);
BUG_ON(scratch_len < 0);
xdr->buf = buf;
xdr->iov = iov;
@@ -482,6 +483,73 @@ void xdr_init_encode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p)
EXPORT_SYMBOL_GPL(xdr_init_encode);
/**
+ * xdr_commit_encode - Ensure all data is written to buffer
+ * @xdr: pointer to xdr_stream
+ *
+ * We handle encoding across page boundaries by giving the caller a
+ * temporary location to write to, then later copying the data into
+ * place; xdr_commit_encode does that copying.
+ *
+ * Normally the caller doesn't need to call this directly, as the
+ * following xdr_reserve_space will do it. But an explicit call may be
+ * required at the end of encoding, or any other time when the xdr_buf
+ * data might be read.
+ */
+void xdr_commit_encode(struct xdr_stream *xdr)
+{
+ int shift = xdr->scratch.iov_len;
+ void *page;
+
+ if (shift == 0)
+ return;
+ page = page_address(*xdr->page_ptr);
+ memcpy(xdr->scratch.iov_base, page, shift);
+ memmove(page, page + shift, (void *)xdr->p - page);
+ xdr->scratch.iov_len = 0;
+}
+EXPORT_SYMBOL_GPL(xdr_commit_encode);
+
+__be32 *xdr_get_next_encode_buffer(struct xdr_stream *xdr, size_t nbytes)
+{
+ static __be32 *p;
+ int space_left;
+ int frag1bytes, frag2bytes;
+
+ if (nbytes > PAGE_SIZE)
+ return NULL; /* Bigger buffers require special handling */
+ if (xdr->buf->len + nbytes > xdr->buf->buflen)
+ return NULL; /* Sorry, we're totally out of space */
+ frag1bytes = (xdr->end - xdr->p) << 2;
+ frag2bytes = nbytes - frag1bytes;
+ if (xdr->iov)
+ xdr->iov->iov_len += frag1bytes;
+ else
+ xdr->buf->page_len += frag1bytes;
+ xdr->page_ptr++;
+ xdr->iov = NULL;
+ /*
+ * If the last encode didn't end exactly on a page boundary, the
+ * next one will straddle boundaries. Encode into the next
+ * page, then copy it back later in xdr_commit_encode. We use
+ * the "scratch" iov to track any temporarily unused fragment of
+ * space at the end of the previous buffer:
+ */
+ xdr->scratch.iov_base = xdr->p;
+ xdr->scratch.iov_len = frag1bytes;
+ p = page_address(*xdr->page_ptr);
+ /*
+ * Note this is where the next encode will start after we've
+ * shifted this one back:
+ */
+ xdr->p = (void *)p + frag2bytes;
+ space_left = xdr->buf->buflen - xdr->buf->len;
+ xdr->end = (void *)p + min_t(int, space_left, PAGE_SIZE);
+ xdr->buf->page_len += frag2bytes;
+ xdr->buf->len += nbytes;
+ return p;
+}
+
+/**
* xdr_reserve_space - Reserve buffer space for sending
* @xdr: pointer to xdr_stream
* @nbytes: number of bytes to reserve
@@ -495,20 +563,122 @@ __be32 * xdr_reserve_space(struct xdr_stream *xdr, size_t nbytes)
__be32 *p = xdr->p;
__be32 *q;
+ xdr_commit_encode(xdr);
/* align nbytes on the next 32-bit boundary */
nbytes += 3;
nbytes &= ~3;
q = p + (nbytes >> 2);
if (unlikely(q > xdr->end || q < p))
- return NULL;
+ return xdr_get_next_encode_buffer(xdr, nbytes);
xdr->p = q;
- xdr->iov->iov_len += nbytes;
+ if (xdr->iov)
+ xdr->iov->iov_len += nbytes;
+ else
+ xdr->buf->page_len += nbytes;
xdr->buf->len += nbytes;
return p;
}
EXPORT_SYMBOL_GPL(xdr_reserve_space);
/**
+ * xdr_truncate_encode - truncate an encode buffer
+ * @xdr: pointer to xdr_stream
+ * @len: new length of buffer
+ *
+ * Truncates the xdr stream, so that xdr->buf->len == len,
+ * and xdr->p points at offset len from the start of the buffer, and
+ * head, tail, and page lengths are adjusted to correspond.
+ *
+ * If this means moving xdr->p to a different buffer, we assume that
+ * that the end pointer should be set to the end of the current page,
+ * except in the case of the head buffer when we assume the head
+ * buffer's current length represents the end of the available buffer.
+ *
+ * This is *not* safe to use on a buffer that already has inlined page
+ * cache pages (as in a zero-copy server read reply), except for the
+ * simple case of truncating from one position in the tail to another.
+ *
+ */
+void xdr_truncate_encode(struct xdr_stream *xdr, size_t len)
+{
+ struct xdr_buf *buf = xdr->buf;
+ struct kvec *head = buf->head;
+ struct kvec *tail = buf->tail;
+ int fraglen;
+ int new, old;
+
+ if (len > buf->len) {
+ WARN_ON_ONCE(1);
+ return;
+ }
+ xdr_commit_encode(xdr);
+
+ fraglen = min_t(int, buf->len - len, tail->iov_len);
+ tail->iov_len -= fraglen;
+ buf->len -= fraglen;
+ if (tail->iov_len && buf->len == len) {
+ xdr->p = tail->iov_base + tail->iov_len;
+ /* xdr->end, xdr->iov should be set already */
+ return;
+ }
+ WARN_ON_ONCE(fraglen);
+ fraglen = min_t(int, buf->len - len, buf->page_len);
+ buf->page_len -= fraglen;
+ buf->len -= fraglen;
+
+ new = buf->page_base + buf->page_len;
+ old = new + fraglen;
+ xdr->page_ptr -= (old >> PAGE_SHIFT) - (new >> PAGE_SHIFT);
+
+ if (buf->page_len && buf->len == len) {
+ xdr->p = page_address(*xdr->page_ptr);
+ xdr->end = (void *)xdr->p + PAGE_SIZE;
+ xdr->p = (void *)xdr->p + (new % PAGE_SIZE);
+ /* xdr->iov should already be NULL */
+ return;
+ }
+ if (fraglen) {
+ xdr->end = head->iov_base + head->iov_len;
+ xdr->page_ptr--;
+ }
+ /* (otherwise assume xdr->end is already set) */
+ head->iov_len = len;
+ buf->len = len;
+ xdr->p = head->iov_base + head->iov_len;
+ xdr->iov = buf->head;
+}
+EXPORT_SYMBOL(xdr_truncate_encode);
+
+/**
+ * xdr_restrict_buflen - decrease available buffer space
+ * @xdr: pointer to xdr_stream
+ * @newbuflen: new maximum number of bytes available
+ *
+ * Adjust our idea of how much space is available in the buffer.
+ * If we've already used too much space in the buffer, returns -1.
+ * If the available space is already smaller than newbuflen, returns 0
+ * and does nothing. Otherwise, adjusts xdr->buf->buflen to newbuflen
+ * and ensures xdr->end is set at most offset newbuflen from the start
+ * of the buffer.
+ */
+int xdr_restrict_buflen(struct xdr_stream *xdr, int newbuflen)
+{
+ struct xdr_buf *buf = xdr->buf;
+ int left_in_this_buf = (void *)xdr->end - (void *)xdr->p;
+ int end_offset = buf->len + left_in_this_buf;
+
+ if (newbuflen < 0 || newbuflen < buf->len)
+ return -1;
+ if (newbuflen > buf->buflen)
+ return 0;
+ if (newbuflen < end_offset)
+ xdr->end = (void *)xdr->end + newbuflen - end_offset;
+ buf->buflen = newbuflen;
+ return 0;
+}
+EXPORT_SYMBOL(xdr_restrict_buflen);
+
+/**
* xdr_write_pages - Insert a list of pages into an XDR buffer for sending
* @xdr: pointer to xdr_stream
* @pages: list of pages
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 8d904e4eef15..8f92a61ee2df 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
/* Set up the XDR head */
rqstp->rq_arg.head[0].iov_base = page_address(page);
- rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+ rqstp->rq_arg.head[0].iov_len =
+ min_t(size_t, byte_count, ctxt->sge[0].length);
rqstp->rq_arg.len = byte_count;
rqstp->rq_arg.buflen = byte_count;
@@ -85,7 +87,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
page = ctxt->pages[sge_no];
put_page(rqstp->rq_pages[sge_no]);
rqstp->rq_pages[sge_no] = page;
- bc -= min(bc, ctxt->sge[sge_no].length);
+ bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
sge_no++;
}
@@ -113,291 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
rqstp->rq_arg.tail[0].iov_len = 0;
}
-/* Encode a read-chunk-list as an array of IB SGE
- *
- * Assumptions:
- * - chunk[0]->position points to pages[0] at an offset of 0
- * - pages[] is not physically or virtually contiguous and consists of
- * PAGE_SIZE elements.
- *
- * Output:
- * - sge array pointing into pages[] array.
- * - chunk_sge array specifying sge index and count for each
- * chunk in the read list
- *
- */
-static int map_read_chunks(struct svcxprt_rdma *xprt,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *head,
- struct rpcrdma_msg *rmsgp,
- struct svc_rdma_req_map *rpl_map,
- struct svc_rdma_req_map *chl_map,
- int ch_count,
- int byte_count)
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
- int sge_no;
- int sge_bytes;
- int page_off;
- int page_no;
- int ch_bytes;
- int ch_no;
- struct rpcrdma_read_chunk *ch;
+ if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
+ RDMA_TRANSPORT_IWARP)
+ return 1;
+ else
+ return min_t(int, sge_count, xprt->sc_max_sge);
+}
- sge_no = 0;
- page_no = 0;
- page_off = 0;
- ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
- ch_no = 0;
- ch_bytes = ntohl(ch->rc_target.rs_length);
- head->arg.head[0] = rqstp->rq_arg.head[0];
- head->arg.tail[0] = rqstp->rq_arg.tail[0];
- head->arg.pages = &head->pages[head->count];
- head->hdr_count = head->count; /* save count of hdr pages */
- head->arg.page_base = 0;
- head->arg.page_len = ch_bytes;
- head->arg.len = rqstp->rq_arg.len + ch_bytes;
- head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
- head->count++;
- chl_map->ch[0].start = 0;
- while (byte_count) {
- rpl_map->sge[sge_no].iov_base =
- page_address(rqstp->rq_arg.pages[page_no]) + page_off;
- sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
- rpl_map->sge[sge_no].iov_len = sge_bytes;
- /*
- * Don't bump head->count here because the same page
- * may be used by multiple SGE.
- */
- head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
- rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ int *page_no,
+ u32 *page_offset,
+ u32 rs_handle,
+ u32 rs_length,
+ u64 rs_offset,
+ int last);
+
+/* Issue an RDMA_READ using the local lkey to map the data sink */
+static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ int *page_no,
+ u32 *page_offset,
+ u32 rs_handle,
+ u32 rs_length,
+ u64 rs_offset,
+ int last)
+{
+ struct ib_send_wr read_wr;
+ int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
+ struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
+ int ret, read, pno;
+ u32 pg_off = *page_offset;
+ u32 pg_no = *page_no;
+
+ ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->read_hdr = head;
+ pages_needed =
+ min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
+ read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
+
+ for (pno = 0; pno < pages_needed; pno++) {
+ int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
+
+ head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
+ head->arg.page_len += len;
+ head->arg.len += len;
+ if (!pg_off)
+ head->count++;
+ rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
rqstp->rq_next_page = rqstp->rq_respages + 1;
+ ctxt->sge[pno].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ head->arg.pages[pg_no], pg_off,
+ PAGE_SIZE - pg_off,
+ DMA_FROM_DEVICE);
+ ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
+ ctxt->sge[pno].addr);
+ if (ret)
+ goto err;
+ atomic_inc(&xprt->sc_dma_used);
- byte_count -= sge_bytes;
- ch_bytes -= sge_bytes;
- sge_no++;
- /*
- * If all bytes for this chunk have been mapped to an
- * SGE, move to the next SGE
- */
- if (ch_bytes == 0) {
- chl_map->ch[ch_no].count =
- sge_no - chl_map->ch[ch_no].start;
- ch_no++;
- ch++;
- chl_map->ch[ch_no].start = sge_no;
- ch_bytes = ntohl(ch->rc_target.rs_length);
- /* If bytes remaining account for next chunk */
- if (byte_count) {
- head->arg.page_len += ch_bytes;
- head->arg.len += ch_bytes;
- head->arg.buflen += ch_bytes;
- }
+ /* The lkey here is either a local dma lkey or a dma_mr lkey */
+ ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
+ ctxt->sge[pno].length = len;
+ ctxt->count++;
+
+ /* adjust offset and wrap to next page if needed */
+ pg_off += len;
+ if (pg_off == PAGE_SIZE) {
+ pg_off = 0;
+ pg_no++;
}
- /*
- * If this SGE consumed all of the page, move to the
- * next page
- */
- if ((sge_bytes + page_off) == PAGE_SIZE) {
- page_no++;
- page_off = 0;
- /*
- * If there are still bytes left to map, bump
- * the page count
- */
- if (byte_count)
- head->count++;
- } else
- page_off += sge_bytes;
+ rs_length -= len;
}
- BUG_ON(byte_count != 0);
- return sge_no;
+
+ if (last && rs_length == 0)
+ set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ else
+ clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+
+ memset(&read_wr, 0, sizeof(read_wr));
+ read_wr.wr_id = (unsigned long)ctxt;
+ read_wr.opcode = IB_WR_RDMA_READ;
+ ctxt->wr_op = read_wr.opcode;
+ read_wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.wr.rdma.rkey = rs_handle;
+ read_wr.wr.rdma.remote_addr = rs_offset;
+ read_wr.sg_list = ctxt->sge;
+ read_wr.num_sge = pages_needed;
+
+ ret = svc_rdma_send(xprt, &read_wr);
+ if (ret) {
+ pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ goto err;
+ }
+
+ /* return current location in page array */
+ *page_no = pg_no;
+ *page_offset = pg_off;
+ ret = read;
+ atomic_inc(&rdma_stat_read);
+ return ret;
+ err:
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+ return ret;
}
-/* Map a read-chunk-list to an XDR and fast register the page-list.
- *
- * Assumptions:
- * - chunk[0] position points to pages[0] at an offset of 0
- * - pages[] will be made physically contiguous by creating a one-off memory
- * region using the fastreg verb.
- * - byte_count is # of bytes in read-chunk-list
- * - ch_count is # of chunks in read-chunk-list
- *
- * Output:
- * - sge array pointing into pages[] array.
- * - chunk_sge array specifying sge index and count for each
- * chunk in the read list
- */
-static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
+/* Issue an RDMA_READ using an FRMR to map the data sink */
+static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *head,
- struct rpcrdma_msg *rmsgp,
- struct svc_rdma_req_map *rpl_map,
- struct svc_rdma_req_map *chl_map,
- int ch_count,
- int byte_count)
+ int *page_no,
+ u32 *page_offset,
+ u32 rs_handle,
+ u32 rs_length,
+ u64 rs_offset,
+ int last)
{
- int page_no;
- int ch_no;
- u32 offset;
- struct rpcrdma_read_chunk *ch;
- struct svc_rdma_fastreg_mr *frmr;
- int ret = 0;
+ struct ib_send_wr read_wr;
+ struct ib_send_wr inv_wr;
+ struct ib_send_wr fastreg_wr;
+ u8 key;
+ int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
+ struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
+ struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
+ int ret, read, pno;
+ u32 pg_off = *page_offset;
+ u32 pg_no = *page_no;
- frmr = svc_rdma_get_frmr(xprt);
if (IS_ERR(frmr))
return -ENOMEM;
- head->frmr = frmr;
- head->arg.head[0] = rqstp->rq_arg.head[0];
- head->arg.tail[0] = rqstp->rq_arg.tail[0];
- head->arg.pages = &head->pages[head->count];
- head->hdr_count = head->count; /* save count of hdr pages */
- head->arg.page_base = 0;
- head->arg.page_len = byte_count;
- head->arg.len = rqstp->rq_arg.len + byte_count;
- head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
+ ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->frmr = frmr;
+ pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
+ read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
- /* Fast register the page list */
- frmr->kva = page_address(rqstp->rq_arg.pages[0]);
+ frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
frmr->direction = DMA_FROM_DEVICE;
frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
- frmr->map_len = byte_count;
- frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
- for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
- frmr->page_list->page_list[page_no] =
+ frmr->map_len = pages_needed << PAGE_SHIFT;
+ frmr->page_list_len = pages_needed;
+
+ for (pno = 0; pno < pages_needed; pno++) {
+ int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
+
+ head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
+ head->arg.page_len += len;
+ head->arg.len += len;
+ if (!pg_off)
+ head->count++;
+ rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+ frmr->page_list->page_list[pno] =
ib_dma_map_page(xprt->sc_cm_id->device,
- rqstp->rq_arg.pages[page_no], 0,
+ head->arg.pages[pg_no], 0,
PAGE_SIZE, DMA_FROM_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device,
- frmr->page_list->page_list[page_no]))
- goto fatal_err;
+ ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
+ frmr->page_list->page_list[pno]);
+ if (ret)
+ goto err;
atomic_inc(&xprt->sc_dma_used);
- head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
- }
- head->count += page_no;
-
- /* rq_respages points one past arg pages */
- rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
- /* Create the reply and chunk maps */
- offset = 0;
- ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
- for (ch_no = 0; ch_no < ch_count; ch_no++) {
- int len = ntohl(ch->rc_target.rs_length);
- rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
- rpl_map->sge[ch_no].iov_len = len;
- chl_map->ch[ch_no].count = 1;
- chl_map->ch[ch_no].start = ch_no;
- offset += len;
- ch++;
+ /* adjust offset and wrap to next page if needed */
+ pg_off += len;
+ if (pg_off == PAGE_SIZE) {
+ pg_off = 0;
+ pg_no++;
+ }
+ rs_length -= len;
}
- ret = svc_rdma_fastreg(xprt, frmr);
- if (ret)
- goto fatal_err;
-
- return ch_no;
-
- fatal_err:
- printk("svcrdma: error fast registering xdr for xprt %p", xprt);
- svc_rdma_put_frmr(xprt, frmr);
- return -EIO;
-}
-
-static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
- struct svc_rdma_op_ctxt *ctxt,
- struct svc_rdma_fastreg_mr *frmr,
- struct kvec *vec,
- u64 *sgl_offset,
- int count)
-{
- int i;
- unsigned long off;
+ if (last && rs_length == 0)
+ set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ else
+ clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
- ctxt->count = count;
- ctxt->direction = DMA_FROM_DEVICE;
- for (i = 0; i < count; i++) {
- ctxt->sge[i].length = 0; /* in case map fails */
- if (!frmr) {
- BUG_ON(!virt_to_page(vec[i].iov_base));
- off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
- ctxt->sge[i].addr =
- ib_dma_map_page(xprt->sc_cm_id->device,
- virt_to_page(vec[i].iov_base),
- off,
- vec[i].iov_len,
- DMA_FROM_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device,
- ctxt->sge[i].addr))
- return -EINVAL;
- ctxt->sge[i].lkey = xprt->sc_dma_lkey;
- atomic_inc(&xprt->sc_dma_used);
- } else {
- ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
- ctxt->sge[i].lkey = frmr->mr->lkey;
- }
- ctxt->sge[i].length = vec[i].iov_len;
- *sgl_offset = *sgl_offset + vec[i].iov_len;
+ /* Bump the key */
+ key = (u8)(frmr->mr->lkey & 0x000000FF);
+ ib_update_fast_reg_key(frmr->mr, ++key);
+
+ ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
+ ctxt->sge[0].lkey = frmr->mr->lkey;
+ ctxt->sge[0].length = read;
+ ctxt->count = 1;
+ ctxt->read_hdr = head;
+
+ /* Prepare FASTREG WR */
+ memset(&fastreg_wr, 0, sizeof(fastreg_wr));
+ fastreg_wr.opcode = IB_WR_FAST_REG_MR;
+ fastreg_wr.send_flags = IB_SEND_SIGNALED;
+ fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
+ fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
+ fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
+ fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+ fastreg_wr.wr.fast_reg.length = frmr->map_len;
+ fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
+ fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
+ fastreg_wr.next = &read_wr;
+
+ /* Prepare RDMA_READ */
+ memset(&read_wr, 0, sizeof(read_wr));
+ read_wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.wr.rdma.rkey = rs_handle;
+ read_wr.wr.rdma.remote_addr = rs_offset;
+ read_wr.sg_list = ctxt->sge;
+ read_wr.num_sge = 1;
+ if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
+ read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
+ read_wr.wr_id = (unsigned long)ctxt;
+ read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
+ } else {
+ read_wr.opcode = IB_WR_RDMA_READ;
+ read_wr.next = &inv_wr;
+ /* Prepare invalidate */
+ memset(&inv_wr, 0, sizeof(inv_wr));
+ inv_wr.wr_id = (unsigned long)ctxt;
+ inv_wr.opcode = IB_WR_LOCAL_INV;
+ inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
+ inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
+ }
+ ctxt->wr_op = read_wr.opcode;
+
+ /* Post the chain */
+ ret = svc_rdma_send(xprt, &fastreg_wr);
+ if (ret) {
+ pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ goto err;
}
- return 0;
-}
-static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
-{
- if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
- RDMA_TRANSPORT_IWARP) &&
- sge_count > 1)
- return 1;
- else
- return min_t(int, sge_count, xprt->sc_max_sge);
+ /* return current location in page array */
+ *page_no = pg_no;
+ *page_offset = pg_off;
+ ret = read;
+ atomic_inc(&rdma_stat_read);
+ return ret;
+ err:
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+ svc_rdma_put_frmr(xprt, frmr);
+ return ret;
}
-/*
- * Use RDMA_READ to read data from the advertised client buffer into the
- * XDR stream starting at rq_arg.head[0].iov_base.
- * Each chunk in the array
- * contains the following fields:
- * discrim - '1', This isn't used for data placement
- * position - The xdr stream offset (the same for every chunk)
- * handle - RMR for client memory region
- * length - data transfer length
- * offset - 64 bit tagged offset in remote memory region
- *
- * On our side, we need to read into a pagelist. The first page immediately
- * follows the RPC header.
- *
- * This function returns:
- * 0 - No error and no read-list found.
- *
- * 1 - Successful read-list processing. The data is not yet in
- * the pagelist and therefore the RPC request must be deferred. The
- * I/O completion will enqueue the transport again and
- * svc_rdma_recvfrom will complete the request.
- *
- * <0 - Error processing/posting read-list.
- *
- * NOTE: The ctxt must not be touched after the last WR has been posted
- * because the I/O completion processing may occur on another
- * processor and free / modify the context. Ne touche pas!
- */
-static int rdma_read_xdr(struct svcxprt_rdma *xprt,
- struct rpcrdma_msg *rmsgp,
- struct svc_rqst *rqstp,
- struct svc_rdma_op_ctxt *hdr_ctxt)
+static int rdma_read_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head)
{
- struct ib_send_wr read_wr;
- struct ib_send_wr inv_wr;
- int err = 0;
- int ch_no;
- int ch_count;
- int byte_count;
- int sge_count;
- u64 sgl_offset;
+ int page_no, ch_count, ret;
struct rpcrdma_read_chunk *ch;
- struct svc_rdma_op_ctxt *ctxt = NULL;
- struct svc_rdma_req_map *rpl_map;
- struct svc_rdma_req_map *chl_map;
+ u32 page_offset, byte_count;
+ u64 rs_offset;
+ rdma_reader_fn reader;
/* If no read list is present, return 0 */
ch = svc_rdma_get_read_chunk(rmsgp);
@@ -408,122 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
if (ch_count > RPCSVC_MAXPAGES)
return -EINVAL;
- /* Allocate temporary reply and chunk maps */
- rpl_map = svc_rdma_get_req_map();
- chl_map = svc_rdma_get_req_map();
+ /* The request is completed when the RDMA_READs complete. The
+ * head context keeps all the pages that comprise the
+ * request.
+ */
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->arg.pages = &head->pages[head->count];
+ head->hdr_count = head->count;
+ head->arg.page_base = 0;
+ head->arg.page_len = 0;
+ head->arg.len = rqstp->rq_arg.len;
+ head->arg.buflen = rqstp->rq_arg.buflen;
- if (!xprt->sc_frmr_pg_list_len)
- sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
- rpl_map, chl_map, ch_count,
- byte_count);
+ /* Use FRMR if supported */
+ if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
+ reader = rdma_read_chunk_frmr;
else
- sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
- rpl_map, chl_map, ch_count,
- byte_count);
- if (sge_count < 0) {
- err = -EIO;
- goto out;
- }
-
- sgl_offset = 0;
- ch_no = 0;
+ reader = rdma_read_chunk_lcl;
+ page_no = 0; page_offset = 0;
for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
- ch->rc_discrim != 0; ch++, ch_no++) {
- u64 rs_offset;
-next_sge:
- ctxt = svc_rdma_get_context(xprt);
- ctxt->direction = DMA_FROM_DEVICE;
- ctxt->frmr = hdr_ctxt->frmr;
- ctxt->read_hdr = NULL;
- clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
- clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
+ ch->rc_discrim != 0; ch++) {
- /* Prepare READ WR */
- memset(&read_wr, 0, sizeof read_wr);
- read_wr.wr_id = (unsigned long)ctxt;
- read_wr.opcode = IB_WR_RDMA_READ;
- ctxt->wr_op = read_wr.opcode;
- read_wr.send_flags = IB_SEND_SIGNALED;
- read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
&rs_offset);
- read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
- read_wr.sg_list = ctxt->sge;
- read_wr.num_sge =
- rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
- err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
- &rpl_map->sge[chl_map->ch[ch_no].start],
- &sgl_offset,
- read_wr.num_sge);
- if (err) {
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 0);
- goto out;
- }
- if (((ch+1)->rc_discrim == 0) &&
- (read_wr.num_sge == chl_map->ch[ch_no].count)) {
- /*
- * Mark the last RDMA_READ with a bit to
- * indicate all RPC data has been fetched from
- * the client and the RPC needs to be enqueued.
- */
- set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
- if (hdr_ctxt->frmr) {
- set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
- /*
- * Invalidate the local MR used to map the data
- * sink.
- */
- if (xprt->sc_dev_caps &
- SVCRDMA_DEVCAP_READ_W_INV) {
- read_wr.opcode =
- IB_WR_RDMA_READ_WITH_INV;
- ctxt->wr_op = read_wr.opcode;
- read_wr.ex.invalidate_rkey =
- ctxt->frmr->mr->lkey;
- } else {
- /* Prepare INVALIDATE WR */
- memset(&inv_wr, 0, sizeof inv_wr);
- inv_wr.opcode = IB_WR_LOCAL_INV;
- inv_wr.send_flags = IB_SEND_SIGNALED;
- inv_wr.ex.invalidate_rkey =
- hdr_ctxt->frmr->mr->lkey;
- read_wr.next = &inv_wr;
- }
- }
- ctxt->read_hdr = hdr_ctxt;
- }
- /* Post the read */
- err = svc_rdma_send(xprt, &read_wr);
- if (err) {
- printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
- err);
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
- svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_context(ctxt, 0);
- goto out;
+ byte_count = ntohl(ch->rc_target.rs_length);
+
+ while (byte_count > 0) {
+ ret = reader(xprt, rqstp, head,
+ &page_no, &page_offset,
+ ntohl(ch->rc_target.rs_handle),
+ byte_count, rs_offset,
+ ((ch+1)->rc_discrim == 0) /* last */
+ );
+ if (ret < 0)
+ goto err;
+ byte_count -= ret;
+ rs_offset += ret;
+ head->arg.buflen += ret;
}
- atomic_inc(&rdma_stat_read);
-
- if (read_wr.num_sge < chl_map->ch[ch_no].count) {
- chl_map->ch[ch_no].count -= read_wr.num_sge;
- chl_map->ch[ch_no].start += read_wr.num_sge;
- goto next_sge;
- }
- sgl_offset = 0;
- err = 1;
}
-
- out:
- svc_rdma_put_req_map(rpl_map);
- svc_rdma_put_req_map(chl_map);
-
+ ret = 1;
+ err:
/* Detach arg pages. svc_recv will replenish them */
- for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
- rqstp->rq_pages[ch_no] = NULL;
+ for (page_no = 0;
+ &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
+ rqstp->rq_pages[page_no] = NULL;
- return err;
+ return ret;
}
static int rdma_read_complete(struct svc_rqst *rqstp,
@@ -595,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
struct svc_rdma_op_ctxt,
dto_q);
list_del_init(&ctxt->dto_q);
- }
- if (ctxt) {
spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
return rdma_read_complete(rqstp, ctxt);
- }
-
- if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+ } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
struct svc_rdma_op_ctxt,
dto_q);
@@ -621,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
goto close_out;
- BUG_ON(ret);
goto out;
}
dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -644,12 +548,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
}
/* Read read-list data. */
- ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
+ ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
if (ret > 0) {
/* read-list posted, defer until data received from client. */
goto defer;
- }
- if (ret < 0) {
+ } else if (ret < 0) {
/* Post of read-list failed, free context. */
svc_rdma_put_context(ctxt, 1);
return 0;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7e024a51617e..49fd21a5c215 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -49,152 +50,6 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
-/* Encode an XDR as an array of IB SGE
- *
- * Assumptions:
- * - head[0] is physically contiguous.
- * - tail[0] is physically contiguous.
- * - pages[] is not physically or virtually contiguous and consists of
- * PAGE_SIZE elements.
- *
- * Output:
- * SGE[0] reserved for RCPRDMA header
- * SGE[1] data from xdr->head[]
- * SGE[2..sge_count-2] data from xdr->pages[]
- * SGE[sge_count-1] data from xdr->tail.
- *
- * The max SGE we need is the length of the XDR / pagesize + one for
- * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
- * reserves a page for both the request and the reply header, and this
- * array is only concerned with the reply we are assured that we have
- * on extra page for the RPCRMDA header.
- */
-static int fast_reg_xdr(struct svcxprt_rdma *xprt,
- struct xdr_buf *xdr,
- struct svc_rdma_req_map *vec)
-{
- int sge_no;
- u32 sge_bytes;
- u32 page_bytes;
- u32 page_off;
- int page_no = 0;
- u8 *frva;
- struct svc_rdma_fastreg_mr *frmr;
-
- frmr = svc_rdma_get_frmr(xprt);
- if (IS_ERR(frmr))
- return -ENOMEM;
- vec->frmr = frmr;
-
- /* Skip the RPCRDMA header */
- sge_no = 1;
-
- /* Map the head. */
- frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
- vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
- vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
- vec->count = 2;
- sge_no++;
-
- /* Map the XDR head */
- frmr->kva = frva;
- frmr->direction = DMA_TO_DEVICE;
- frmr->access_flags = 0;
- frmr->map_len = PAGE_SIZE;
- frmr->page_list_len = 1;
- page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
- frmr->page_list->page_list[page_no] =
- ib_dma_map_page(xprt->sc_cm_id->device,
- virt_to_page(xdr->head[0].iov_base),
- page_off,
- PAGE_SIZE - page_off,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device,
- frmr->page_list->page_list[page_no]))
- goto fatal_err;
- atomic_inc(&xprt->sc_dma_used);
-
- /* Map the XDR page list */
- page_off = xdr->page_base;
- page_bytes = xdr->page_len + page_off;
- if (!page_bytes)
- goto encode_tail;
-
- /* Map the pages */
- vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
- vec->sge[sge_no].iov_len = page_bytes;
- sge_no++;
- while (page_bytes) {
- struct page *page;
-
- page = xdr->pages[page_no++];
- sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
- page_bytes -= sge_bytes;
-
- frmr->page_list->page_list[page_no] =
- ib_dma_map_page(xprt->sc_cm_id->device,
- page, page_off,
- sge_bytes, DMA_TO_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device,
- frmr->page_list->page_list[page_no]))
- goto fatal_err;
-
- atomic_inc(&xprt->sc_dma_used);
- page_off = 0; /* reset for next time through loop */
- frmr->map_len += PAGE_SIZE;
- frmr->page_list_len++;
- }
- vec->count++;
-
- encode_tail:
- /* Map tail */
- if (0 == xdr->tail[0].iov_len)
- goto done;
-
- vec->count++;
- vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
-
- if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
- ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
- /*
- * If head and tail use the same page, we don't need
- * to map it again.
- */
- vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
- } else {
- void *va;
-
- /* Map another page for the tail */
- page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
- va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
- vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
-
- frmr->page_list->page_list[page_no] =
- ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
- page_off,
- PAGE_SIZE,
- DMA_TO_DEVICE);
- if (ib_dma_mapping_error(xprt->sc_cm_id->device,
- frmr->page_list->page_list[page_no]))
- goto fatal_err;
- atomic_inc(&xprt->sc_dma_used);
- frmr->map_len += PAGE_SIZE;
- frmr->page_list_len++;
- }
-
- done:
- if (svc_rdma_fastreg(xprt, frmr))
- goto fatal_err;
-
- return 0;
-
- fatal_err:
- printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
- vec->frmr = NULL;
- svc_rdma_put_frmr(xprt, frmr);
- return -EIO;
-}
-
static int map_xdr(struct svcxprt_rdma *xprt,
struct xdr_buf *xdr,
struct svc_rdma_req_map *vec)
@@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,
BUG_ON(xdr->len !=
(xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
- if (xprt->sc_frmr_pg_list_len)
- return fast_reg_xdr(xprt, xdr, vec);
-
/* Skip the first sge, this is for the RPCRDMA header */
sge_no = 1;
@@ -282,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
}
/* Assumptions:
- * - We are using FRMR
- * - or -
* - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
*/
static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
@@ -327,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
sge_bytes = min_t(size_t,
bc, vec->sge[xdr_sge_no].iov_len-sge_off);
sge[sge_no].length = sge_bytes;
- if (!vec->frmr) {
- sge[sge_no].addr =
- dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
- sge_bytes, DMA_TO_DEVICE);
- xdr_off += sge_bytes;
- if (ib_dma_mapping_error(xprt->sc_cm_id->device,
- sge[sge_no].addr))
- goto err;
- atomic_inc(&xprt->sc_dma_used);
- sge[sge_no].lkey = xprt->sc_dma_lkey;
- } else {
- sge[sge_no].addr = (unsigned long)
- vec->sge[xdr_sge_no].iov_base + sge_off;
- sge[sge_no].lkey = vec->frmr->mr->lkey;
- }
+ sge[sge_no].addr =
+ dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
+ sge_bytes, DMA_TO_DEVICE);
+ xdr_off += sge_bytes;
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+ sge[sge_no].addr))
+ goto err;
+ atomic_inc(&xprt->sc_dma_used);
+ sge[sge_no].lkey = xprt->sc_dma_lkey;
ctxt->count++;
- ctxt->frmr = vec->frmr;
sge_off = 0;
sge_no++;
xdr_sge_no++;
@@ -369,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
return 0;
err:
svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_frmr(xprt, vec->frmr);
svc_rdma_put_context(ctxt, 0);
/* Fatal error, close transport */
return -EIO;
@@ -397,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[1];
- if (vec->frmr)
- max_write = vec->frmr->map_len;
- else
- max_write = xprt->sc_max_sge * PAGE_SIZE;
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
/* Write chunks start at the pagelist */
for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
@@ -472,10 +311,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
res_ary = (struct rpcrdma_write_array *)
&rdma_resp->rm_body.rm_chunks[2];
- if (vec->frmr)
- max_write = vec->frmr->map_len;
- else
- max_write = xprt->sc_max_sge * PAGE_SIZE;
+ max_write = xprt->sc_max_sge * PAGE_SIZE;
/* xdr offset starts at RPC message */
nchunks = ntohl(arg_ary->wc_nchunks);
@@ -545,7 +381,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
int byte_count)
{
struct ib_send_wr send_wr;
- struct ib_send_wr inv_wr;
int sge_no;
int sge_bytes;
int page_no;
@@ -559,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
"svcrdma: could not post a receive buffer, err=%d."
"Closing transport %p.\n", ret, rdma);
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
- svc_rdma_put_frmr(rdma, vec->frmr);
svc_rdma_put_context(ctxt, 0);
return -ENOTCONN;
}
@@ -567,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
/* Prepare the context */
ctxt->pages[0] = page;
ctxt->count = 1;
- ctxt->frmr = vec->frmr;
- if (vec->frmr)
- set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
- else
- clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
/* Prepare the SGE for the RPCRDMA Header */
ctxt->sge[0].lkey = rdma->sc_dma_lkey;
@@ -590,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,
int xdr_off = 0;
sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
byte_count -= sge_bytes;
- if (!vec->frmr) {
- ctxt->sge[sge_no].addr =
- dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
- sge_bytes, DMA_TO_DEVICE);
- xdr_off += sge_bytes;
- if (ib_dma_mapping_error(rdma->sc_cm_id->device,
- ctxt->sge[sge_no].addr))
- goto err;
- atomic_inc(&rdma->sc_dma_used);
- ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
- } else {
- ctxt->sge[sge_no].addr = (unsigned long)
- vec->sge[sge_no].iov_base;
- ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
- }
+ ctxt->sge[sge_no].addr =
+ dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
+ sge_bytes, DMA_TO_DEVICE);
+ xdr_off += sge_bytes;
+ if (ib_dma_mapping_error(rdma->sc_cm_id->device,
+ ctxt->sge[sge_no].addr))
+ goto err;
+ atomic_inc(&rdma->sc_dma_used);
+ ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
ctxt->sge[sge_no].length = sge_bytes;
}
BUG_ON(byte_count != 0);
@@ -627,6 +450,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
ctxt->sge[page_no+1].length = 0;
}
rqstp->rq_next_page = rqstp->rq_respages + 1;
+
BUG_ON(sge_no > rdma->sc_max_sge);
memset(&send_wr, 0, sizeof send_wr);
ctxt->wr_op = IB_WR_SEND;
@@ -635,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
send_wr.num_sge = sge_no;
send_wr.opcode = IB_WR_SEND;
send_wr.send_flags = IB_SEND_SIGNALED;
- if (vec->frmr) {
- /* Prepare INVALIDATE WR */
- memset(&inv_wr, 0, sizeof inv_wr);
- inv_wr.opcode = IB_WR_LOCAL_INV;
- inv_wr.send_flags = IB_SEND_SIGNALED;
- inv_wr.ex.invalidate_rkey =
- vec->frmr->mr->lkey;
- send_wr.next = &inv_wr;
- }
ret = svc_rdma_send(rdma, &send_wr);
if (ret)
@@ -653,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
err:
svc_rdma_unmap_dma(ctxt);
- svc_rdma_put_frmr(rdma, vec->frmr);
svc_rdma_put_context(ctxt, 1);
return -EIO;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 25688fa2207f..e7323fbbd348 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1,4 +1,5 @@
/*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
@@ -65,6 +66,7 @@ static void dto_tasklet_func(unsigned long data);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
+static int svc_rdma_secure_port(struct svc_rqst *);
static void rq_cq_reap(struct svcxprt_rdma *xprt);
static void sq_cq_reap(struct svcxprt_rdma *xprt);
@@ -82,6 +84,7 @@ static struct svc_xprt_ops svc_rdma_ops = {
.xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
.xpo_has_wspace = svc_rdma_has_wspace,
.xpo_accept = svc_rdma_accept,
+ .xpo_secure_port = svc_rdma_secure_port,
};
struct svc_xprt_class svc_rdma_class = {
@@ -160,7 +163,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)
schedule_timeout_uninterruptible(msecs_to_jiffies(500));
}
map->count = 0;
- map->frmr = NULL;
return map;
}
@@ -336,22 +338,21 @@ static void process_context(struct svcxprt_rdma *xprt,
switch (ctxt->wr_op) {
case IB_WR_SEND:
- if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
- svc_rdma_put_frmr(xprt, ctxt->frmr);
+ BUG_ON(ctxt->frmr);
svc_rdma_put_context(ctxt, 1);
break;
case IB_WR_RDMA_WRITE:
+ BUG_ON(ctxt->frmr);
svc_rdma_put_context(ctxt, 0);
break;
case IB_WR_RDMA_READ:
case IB_WR_RDMA_READ_WITH_INV:
+ svc_rdma_put_frmr(xprt, ctxt->frmr);
if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
BUG_ON(!read_hdr);
- if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
- svc_rdma_put_frmr(xprt, ctxt->frmr);
spin_lock_bh(&xprt->sc_rq_dto_lock);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
list_add_tail(&read_hdr->dto_q,
@@ -363,6 +364,7 @@ static void process_context(struct svcxprt_rdma *xprt,
break;
default:
+ BUG_ON(1);
printk(KERN_ERR "svcrdma: unexpected completion type, "
"opcode=%d\n",
ctxt->wr_op);
@@ -378,29 +380,42 @@ static void process_context(struct svcxprt_rdma *xprt,
static void sq_cq_reap(struct svcxprt_rdma *xprt)
{
struct svc_rdma_op_ctxt *ctxt = NULL;
- struct ib_wc wc;
+ struct ib_wc wc_a[6];
+ struct ib_wc *wc;
struct ib_cq *cq = xprt->sc_sq_cq;
int ret;
+ memset(wc_a, 0, sizeof(wc_a));
+
if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
return;
ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
atomic_inc(&rdma_stat_sq_poll);
- while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
- if (wc.status != IB_WC_SUCCESS)
- /* Close the transport */
- set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
+ int i;
- /* Decrement used SQ WR count */
- atomic_dec(&xprt->sc_sq_count);
- wake_up(&xprt->sc_send_wait);
+ for (i = 0; i < ret; i++) {
+ wc = &wc_a[i];
+ if (wc->status != IB_WC_SUCCESS) {
+ dprintk("svcrdma: sq wc err status %d\n",
+ wc->status);
- ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
- if (ctxt)
- process_context(xprt, ctxt);
+ /* Close the transport */
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ }
- svc_xprt_put(&xprt->sc_xprt);
+ /* Decrement used SQ WR count */
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+
+ ctxt = (struct svc_rdma_op_ctxt *)
+ (unsigned long)wc->wr_id;
+ if (ctxt)
+ process_context(xprt, ctxt);
+
+ svc_xprt_put(&xprt->sc_xprt);
+ }
}
if (ctxt)
@@ -993,7 +1008,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
need_dma_mr = 0;
break;
case RDMA_TRANSPORT_IB:
- if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
+ if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
+ need_dma_mr = 1;
+ dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
+ } else if (!(devattr.device_cap_flags &
+ IB_DEVICE_LOCAL_DMA_LKEY)) {
need_dma_mr = 1;
dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
} else
@@ -1190,14 +1209,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
container_of(xprt, struct svcxprt_rdma, sc_xprt);
/*
- * If there are fewer SQ WR available than required to send a
- * simple response, return false.
- */
- if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
- return 0;
-
- /*
- * ...or there are already waiters on the SQ,
+ * If there are already waiters on the SQ,
* return false.
*/
if (waitqueue_active(&rdma->sc_send_wait))
@@ -1207,6 +1219,11 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
return 1;
}
+static int svc_rdma_secure_port(struct svc_rqst *rqstp)
+{
+ return 1;
+}
+
/*
* Attempt to register the kvec representing the RPC memory with the
* device.