From 58f1369216367d27e047bead7221ff977431acaa Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 30 Jan 2016 16:45:47 -0500 Subject: SUNRPC: Remove unused function rpc_task_reset_client Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 8 -------- 1 file changed, 8 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index b7f21044f4d8..2b4ad7aa40c6 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -900,14 +900,6 @@ void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt) } } -void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt) -{ - rpc_task_release_client(task); - rpc_task_set_client(task, clnt); -} -EXPORT_SYMBOL_GPL(rpc_task_reset_client); - - static void rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg) { -- cgit v1.2.3 From 30c5116b113689c87a711a0963753adadd702c04 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 24 Feb 2015 20:31:39 -0500 Subject: SUNRPC: Uninline xprt_get(); It isn't performance critical. Also allow callers to pass NULL arguments to xprt_get() and xprt_put(). Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 16 +++------------- net/sunrpc/xprt.c | 24 +++++++++++++++++++++--- 2 files changed, 24 insertions(+), 16 deletions(-) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 69ef5b3ab038..1bdb59a2efe8 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -166,7 +167,7 @@ enum xprt_transports { }; struct rpc_xprt { - atomic_t count; /* Reference count */ + struct kref kref; /* Reference count */ struct rpc_xprt_ops * ops; /* transport methods */ const struct rpc_timeout *timeout; /* timeout parms */ @@ -318,24 +319,13 @@ int xprt_adjust_timeout(struct rpc_rqst *req); void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_release(struct rpc_task *task); +struct rpc_xprt * xprt_get(struct rpc_xprt *xprt); void xprt_put(struct rpc_xprt *xprt); struct rpc_xprt * xprt_alloc(struct net *net, size_t size, unsigned int num_prealloc, unsigned int max_req); void xprt_free(struct rpc_xprt *); -/** - * xprt_get - return a reference to an RPC transport. - * @xprt: pointer to the transport - * - */ -static inline struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) -{ - if (atomic_inc_not_zero(&xprt->count)) - return xprt; - return NULL; -} - static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p) { return p + xprt->tsh_size; diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 37edea6fa92d..d8fd84c0cbba 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1307,7 +1307,7 @@ void xprt_release(struct rpc_task *task) static void xprt_init(struct rpc_xprt *xprt, struct net *net) { - atomic_set(&xprt->count, 1); + kref_init(&xprt->kref); spin_lock_init(&xprt->transport_lock); spin_lock_init(&xprt->reserve_lock); @@ -1415,6 +1415,24 @@ static void xprt_destroy(struct rpc_xprt *xprt) xprt->ops->destroy(xprt); } +static void xprt_destroy_kref(struct kref *kref) +{ + xprt_destroy(container_of(kref, struct rpc_xprt, kref)); +} + +/** + * xprt_get - return a reference to an RPC transport. + * @xprt: pointer to the transport + * + */ +struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) +{ + if (xprt != NULL && kref_get_unless_zero(&xprt->kref)) + return xprt; + return NULL; +} +EXPORT_SYMBOL_GPL(xprt_get); + /** * xprt_put - release a reference to an RPC transport. * @xprt: pointer to the transport @@ -1422,7 +1440,7 @@ static void xprt_destroy(struct rpc_xprt *xprt) */ void xprt_put(struct rpc_xprt *xprt) { - if (atomic_dec_and_test(&xprt->count)) - xprt_destroy(xprt); + if (xprt != NULL) + kref_put(&xprt->kref, xprt_destroy_kref); } EXPORT_SYMBOL_GPL(xprt_put); -- cgit v1.2.3 From fda1bfef9e465b28260d27cd9e538dd601c4cdc1 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 14 Feb 2015 17:48:49 -0500 Subject: SUNRPC: Make freeing of struct xprt rcu-safe Have it call kfree_rcu() to ensure that we can use it on rcu-protected lists. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 1 + net/sunrpc/xprt.c | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 1bdb59a2efe8..83218129ff28 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -257,6 +257,7 @@ struct rpc_xprt { struct dentry *debugfs; /* debugfs directory */ atomic_t inject_disconnect; #endif + struct rcu_head rcu; }; #if defined(CONFIG_SUNRPC_BACKCHANNEL) diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index d8fd84c0cbba..605858699f6c 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -48,6 +48,7 @@ #include #include #include +#include #include @@ -1166,7 +1167,7 @@ void xprt_free(struct rpc_xprt *xprt) { put_net(xprt->xprt_net); xprt_free_all_slots(xprt); - kfree(xprt); + kfree_rcu(xprt, rcu); } EXPORT_SYMBOL_GPL(xprt_free); -- cgit v1.2.3 From 80b14d5e61ca6d08e46b4fc72baf6e4f738b30ce Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 14 Feb 2015 20:31:59 -0500 Subject: SUNRPC: Add a structure to track multiple transports In order to support multipathing/trunking we will need the ability to track multiple transports. This patch sets up a basic structure for doing so. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprt.h | 5 + include/linux/sunrpc/xprtmultipath.h | 69 +++++ net/sunrpc/Makefile | 3 +- net/sunrpc/xprt.c | 1 + net/sunrpc/xprtmultipath.c | 475 +++++++++++++++++++++++++++++++++++ 5 files changed, 552 insertions(+), 1 deletion(-) create mode 100644 include/linux/sunrpc/xprtmultipath.h create mode 100644 net/sunrpc/xprtmultipath.c (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index 83218129ff28..fb0d212e0d3a 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -197,6 +197,11 @@ struct rpc_xprt { transport */ unsigned int bind_index; /* bind function index */ + /* + * Multipath + */ + struct list_head xprt_switch; + /* * Connection of transports */ diff --git a/include/linux/sunrpc/xprtmultipath.h b/include/linux/sunrpc/xprtmultipath.h new file mode 100644 index 000000000000..5a9acffa41be --- /dev/null +++ b/include/linux/sunrpc/xprtmultipath.h @@ -0,0 +1,69 @@ +/* + * RPC client multipathing definitions + * + * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. + * + * Trond Myklebust + */ +#ifndef _NET_SUNRPC_XPRTMULTIPATH_H +#define _NET_SUNRPC_XPRTMULTIPATH_H + +struct rpc_xprt_iter_ops; +struct rpc_xprt_switch { + spinlock_t xps_lock; + struct kref xps_kref; + + unsigned int xps_nxprts; + struct list_head xps_xprt_list; + + struct net * xps_net; + + const struct rpc_xprt_iter_ops *xps_iter_ops; + + struct rcu_head xps_rcu; +}; + +struct rpc_xprt_iter { + struct rpc_xprt_switch __rcu *xpi_xpswitch; + struct rpc_xprt * xpi_cursor; + + const struct rpc_xprt_iter_ops *xpi_ops; +}; + + +struct rpc_xprt_iter_ops { + void (*xpi_rewind)(struct rpc_xprt_iter *); + struct rpc_xprt *(*xpi_xprt)(struct rpc_xprt_iter *); + struct rpc_xprt *(*xpi_next)(struct rpc_xprt_iter *); +}; + +extern struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, + gfp_t gfp_flags); + +extern struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps); +extern void xprt_switch_put(struct rpc_xprt_switch *xps); + +extern void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps); + +extern void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt); +extern void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt); + +extern void xprt_iter_init(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps); + +extern void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps); + +extern void xprt_iter_destroy(struct rpc_xprt_iter *xpi); + +extern struct rpc_xprt_switch *xprt_iter_xchg_switch( + struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *newswitch); + +extern struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi); +extern struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi); +extern struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi); + +#endif diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile index b512fbd9d79a..ea7ffa12e0f9 100644 --- a/net/sunrpc/Makefile +++ b/net/sunrpc/Makefile @@ -12,7 +12,8 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \ svc.o svcsock.o svcauth.o svcauth_unix.o \ addr.o rpcb_clnt.o timer.o xdr.o \ sunrpc_syms.o cache.o rpc_pipe.o \ - svc_xprt.o + svc_xprt.o \ + xprtmultipath.o sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o sunrpc-$(CONFIG_PROC_FS) += stats.o diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 605858699f6c..323b332f8f7c 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1319,6 +1319,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net) spin_lock_init(&xprt->bc_pa_lock); INIT_LIST_HEAD(&xprt->bc_pa_list); #endif /* CONFIG_SUNRPC_BACKCHANNEL */ + INIT_LIST_HEAD(&xprt->xprt_switch); xprt->last_used = jiffies; xprt->cwnd = RPC_INITCWND; diff --git a/net/sunrpc/xprtmultipath.c b/net/sunrpc/xprtmultipath.c new file mode 100644 index 000000000000..e7fd76975d86 --- /dev/null +++ b/net/sunrpc/xprtmultipath.c @@ -0,0 +1,475 @@ +/* + * Multipath support for RPC + * + * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved. + * + * Trond Myklebust + * + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head, + const struct rpc_xprt *cur); + +static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular; +static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin; +static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall; + +static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + if (unlikely(xprt_get(xprt) == NULL)) + return; + list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list); + smp_wmb(); + if (xps->xps_nxprts == 0) + xps->xps_net = xprt->xprt_net; + xps->xps_nxprts++; +} + +/** + * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * @xprt: pointer to struct rpc_xprt + * + * Adds xprt to the end of the list of struct rpc_xprt in xps. + */ +void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + if (xprt == NULL) + return; + spin_lock(&xps->xps_lock); + if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL) + xprt_switch_add_xprt_locked(xps, xprt); + spin_unlock(&xps->xps_lock); +} + +static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + if (unlikely(xprt == NULL)) + return; + xps->xps_nxprts--; + if (xps->xps_nxprts == 0) + xps->xps_net = NULL; + smp_wmb(); + list_del_rcu(&xprt->xprt_switch); +} + +/** + * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * @xprt: pointer to struct rpc_xprt + * + * Removes xprt from the list of struct rpc_xprt in xps. + */ +void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt) +{ + spin_lock(&xps->xps_lock); + xprt_switch_remove_xprt_locked(xps, xprt); + spin_unlock(&xps->xps_lock); + xprt_put(xprt); +} + +/** + * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch + * @xprt: pointer to struct rpc_xprt + * @gfp_flags: allocation flags + * + * On success, returns an initialised struct rpc_xprt_switch, containing + * the entry xprt. Returns NULL on failure. + */ +struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt, + gfp_t gfp_flags) +{ + struct rpc_xprt_switch *xps; + + xps = kmalloc(sizeof(*xps), gfp_flags); + if (xps != NULL) { + spin_lock_init(&xps->xps_lock); + kref_init(&xps->xps_kref); + xps->xps_nxprts = 0; + INIT_LIST_HEAD(&xps->xps_xprt_list); + xps->xps_iter_ops = &rpc_xprt_iter_singular; + xprt_switch_add_xprt_locked(xps, xprt); + } + + return xps; +} + +static void xprt_switch_free_entries(struct rpc_xprt_switch *xps) +{ + spin_lock(&xps->xps_lock); + while (!list_empty(&xps->xps_xprt_list)) { + struct rpc_xprt *xprt; + + xprt = list_first_entry(&xps->xps_xprt_list, + struct rpc_xprt, xprt_switch); + xprt_switch_remove_xprt_locked(xps, xprt); + spin_unlock(&xps->xps_lock); + xprt_put(xprt); + spin_lock(&xps->xps_lock); + } + spin_unlock(&xps->xps_lock); +} + +static void xprt_switch_free(struct kref *kref) +{ + struct rpc_xprt_switch *xps = container_of(kref, + struct rpc_xprt_switch, xps_kref); + + xprt_switch_free_entries(xps); + kfree_rcu(xps, xps_rcu); +} + +/** + * xprt_switch_get - Return a reference to a rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * + * Returns a reference to xps unless the refcount is already zero. + */ +struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps) +{ + if (xps != NULL && kref_get_unless_zero(&xps->xps_kref)) + return xps; + return NULL; +} + +/** + * xprt_switch_put - Release a reference to a rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * + * Release the reference to xps, and free it once the refcount is zero. + */ +void xprt_switch_put(struct rpc_xprt_switch *xps) +{ + if (xps != NULL) + kref_put(&xps->xps_kref, xprt_switch_free); +} + +/** + * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch + * @xps: pointer to struct rpc_xprt_switch + * + * Sets a round-robin default policy for iterators acting on xps. + */ +void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps) +{ + if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin) + WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin); +} + +static +const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi) +{ + if (xpi->xpi_ops != NULL) + return xpi->xpi_ops; + return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops; +} + +static +void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi) +{ +} + +static +void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi) +{ + WRITE_ONCE(xpi->xpi_cursor, NULL); +} + +static +struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head) +{ + return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch); +} + +static +struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + + if (xps == NULL) + return NULL; + return xprt_switch_find_first_entry(&xps->xps_xprt_list); +} + +static +struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head, + const struct rpc_xprt *cur) +{ + struct rpc_xprt *pos; + + list_for_each_entry_rcu(pos, head, xprt_switch) { + if (cur == pos) + return pos; + } + return NULL; +} + +static +struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + struct list_head *head; + + if (xps == NULL) + return NULL; + head = &xps->xps_xprt_list; + if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2) + return xprt_switch_find_first_entry(head); + return xprt_switch_find_current_entry(head, xpi->xpi_cursor); +} + +static +struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head, + const struct rpc_xprt *cur) +{ + struct rpc_xprt *pos, *prev = NULL; + + list_for_each_entry_rcu(pos, head, xprt_switch) { + if (cur == prev) + return pos; + prev = pos; + } + return NULL; +} + +static +struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head, + struct rpc_xprt **cursor, + xprt_switch_find_xprt_t find_next) +{ + struct rpc_xprt *cur, *pos, *old; + + cur = READ_ONCE(*cursor); + for (;;) { + old = cur; + pos = find_next(head, old); + if (pos == NULL) + break; + cur = cmpxchg_relaxed(cursor, old, pos); + if (cur == old) + break; + } + return pos; +} + +static +struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi, + xprt_switch_find_xprt_t find_next) +{ + struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch); + struct list_head *head; + + if (xps == NULL) + return NULL; + head = &xps->xps_xprt_list; + if (xps->xps_nxprts < 2) + return xprt_switch_find_first_entry(head); + return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next); +} + +static +struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head, + const struct rpc_xprt *cur) +{ + struct rpc_xprt *ret; + + ret = xprt_switch_find_next_entry(head, cur); + if (ret != NULL) + return ret; + return xprt_switch_find_first_entry(head); +} + +static +struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi) +{ + return xprt_iter_next_entry_multiple(xpi, + xprt_switch_find_next_entry_roundrobin); +} + +static +struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi) +{ + return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry); +} + +/* + * xprt_iter_rewind - Resets the xprt iterator + * @xpi: pointer to rpc_xprt_iter + * + * Resets xpi to ensure that it points to the first entry in the list + * of transports. + */ +static +void xprt_iter_rewind(struct rpc_xprt_iter *xpi) +{ + rcu_read_lock(); + xprt_iter_ops(xpi)->xpi_rewind(xpi); + rcu_read_unlock(); +} + +static void __xprt_iter_init(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps, + const struct rpc_xprt_iter_ops *ops) +{ + rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps)); + xpi->xpi_cursor = NULL; + xpi->xpi_ops = ops; +} + +/** + * xprt_iter_init - Initialise an xprt iterator + * @xpi: pointer to rpc_xprt_iter + * @xps: pointer to rpc_xprt_switch + * + * Initialises the iterator to use the default iterator ops + * as set in xps. This function is mainly intended for internal + * use in the rpc_client. + */ +void xprt_iter_init(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps) +{ + __xprt_iter_init(xpi, xps, NULL); +} + +/** + * xprt_iter_init_listall - Initialise an xprt iterator + * @xpi: pointer to rpc_xprt_iter + * @xps: pointer to rpc_xprt_switch + * + * Initialises the iterator to iterate once through the entire list + * of entries in xps. + */ +void xprt_iter_init_listall(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *xps) +{ + __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall); +} + +/** + * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch + * @xpi: pointer to rpc_xprt_iter + * @xps: pointer to a new rpc_xprt_switch or NULL + * + * Swaps out the existing xpi->xpi_xpswitch with a new value. + */ +struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi, + struct rpc_xprt_switch *newswitch) +{ + struct rpc_xprt_switch __rcu *oldswitch; + + /* Atomically swap out the old xpswitch */ + oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch)); + if (newswitch != NULL) + xprt_iter_rewind(xpi); + return rcu_dereference_protected(oldswitch, true); +} + +/** + * xprt_iter_destroy - Destroys the xprt iterator + * @xpi pointer to rpc_xprt_iter + */ +void xprt_iter_destroy(struct rpc_xprt_iter *xpi) +{ + xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL)); +} + +/** + * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor + * @xpi: pointer to rpc_xprt_iter + * + * Returns a pointer to the struct rpc_xprt that is currently + * pointed to by the cursor. + * Caller must be holding rcu_read_lock(). + */ +struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi) +{ + WARN_ON_ONCE(!rcu_read_lock_held()); + return xprt_iter_ops(xpi)->xpi_xprt(xpi); +} + +static +struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi, + struct rpc_xprt *(*fn)(struct rpc_xprt_iter *)) +{ + struct rpc_xprt *ret; + + do { + ret = fn(xpi); + if (ret == NULL) + break; + ret = xprt_get(ret); + } while (ret == NULL); + return ret; +} + +/** + * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor + * @xpi: pointer to rpc_xprt_iter + * + * Returns a reference to the struct rpc_xprt that is currently + * pointed to by the cursor. + */ +struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt *xprt; + + rcu_read_lock(); + xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt); + rcu_read_unlock(); + return xprt; +} + +/** + * xprt_iter_get_next - Returns the next rpc_xprt following the cursor + * @xpi: pointer to rpc_xprt_iter + * + * Returns a reference to the struct rpc_xprt that immediately follows the + * entry pointed to by the cursor. + */ +struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt *xprt; + + rcu_read_lock(); + xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next); + rcu_read_unlock(); + return xprt; +} + +/* Policy for always returning the first entry in the rpc_xprt_switch */ +static +const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = { + .xpi_rewind = xprt_iter_no_rewind, + .xpi_xprt = xprt_iter_first_entry, + .xpi_next = xprt_iter_first_entry, +}; + +/* Policy for round-robin iteration of entries in the rpc_xprt_switch */ +static +const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = { + .xpi_rewind = xprt_iter_default_rewind, + .xpi_xprt = xprt_iter_current_entry, + .xpi_next = xprt_iter_next_entry_roundrobin, +}; + +/* Policy for once-through iteration of entries in the rpc_xprt_switch */ +static +const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = { + .xpi_rewind = xprt_iter_default_rewind, + .xpi_xprt = xprt_iter_current_entry, + .xpi_next = xprt_iter_next_entry_all, +}; -- cgit v1.2.3 From ad01b2c68d0ac5f3d4a3f807bec77b720b1c62a0 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 30 Jan 2016 14:17:26 -0500 Subject: SUNRPC: Make rpc_clnt store the multipath iterators This is a pre-patch for the RPC multipath code. It sets up the storage in struct rpc_clnt for the multipath code. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 2 ++ net/sunrpc/auth_gss/auth_gss.c | 4 ++-- net/sunrpc/clnt.c | 32 +++++++++++++++++++++++++++++--- net/sunrpc/rpcb_clnt.c | 4 ++-- 4 files changed, 35 insertions(+), 7 deletions(-) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index 0c5c2cbe96c9..1713e41d65ae 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -25,6 +25,7 @@ #include #include #include +#include struct rpc_inode; @@ -67,6 +68,7 @@ struct rpc_clnt { #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) struct dentry *cl_debugfs; /* debugfs directory */ #endif + struct rpc_xprt_iter cl_xpi; }; /* diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 799e65b944b9..3ce391cb80c4 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1181,12 +1181,12 @@ static struct rpc_auth * gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt) { struct gss_auth *gss_auth; - struct rpc_xprt *xprt = rcu_access_pointer(clnt->cl_xprt); + struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch); while (clnt != clnt->cl_parent) { struct rpc_clnt *parent = clnt->cl_parent; /* Find the original parent for this transport */ - if (rcu_access_pointer(parent->cl_xprt) != xprt) + if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps) break; clnt = parent; } diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 2b4ad7aa40c6..625fb8a184c6 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -354,6 +354,7 @@ static void rpc_free_clid(struct rpc_clnt *clnt) } static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, + struct rpc_xprt_switch *xps, struct rpc_xprt *xprt, struct rpc_clnt *parent) { @@ -411,6 +412,8 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, } rpc_clnt_set_transport(clnt, xprt, timeout); + xprt_iter_init(&clnt->cl_xpi, xps); + xprt_switch_put(xps); clnt->cl_rtt = &clnt->cl_rtt_default; rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval); @@ -438,6 +441,7 @@ out_no_clid: out_err: rpciod_down(); out_no_rpciod: + xprt_switch_put(xps); xprt_put(xprt); return ERR_PTR(err); } @@ -446,8 +450,13 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args, struct rpc_xprt *xprt) { struct rpc_clnt *clnt = NULL; + struct rpc_xprt_switch *xps; - clnt = rpc_new_client(args, xprt, NULL); + xps = xprt_switch_alloc(xprt, GFP_KERNEL); + if (xps == NULL) + return ERR_PTR(-ENOMEM); + + clnt = rpc_new_client(args, xps, xprt, NULL); if (IS_ERR(clnt)) return clnt; @@ -564,6 +573,7 @@ EXPORT_SYMBOL_GPL(rpc_create); static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args, struct rpc_clnt *clnt) { + struct rpc_xprt_switch *xps; struct rpc_xprt *xprt; struct rpc_clnt *new; int err; @@ -571,13 +581,17 @@ static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args, err = -ENOMEM; rcu_read_lock(); xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); + xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch)); rcu_read_unlock(); - if (xprt == NULL) + if (xprt == NULL || xps == NULL) { + xprt_put(xprt); + xprt_switch_put(xps); goto out_err; + } args->servername = xprt->servername; args->nodename = clnt->cl_nodename; - new = rpc_new_client(args, xprt, clnt); + new = rpc_new_client(args, xps, xprt, clnt); if (IS_ERR(new)) { err = PTR_ERR(new); goto out_err; @@ -657,6 +671,7 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt, { const struct rpc_timeout *old_timeo; rpc_authflavor_t pseudoflavor; + struct rpc_xprt_switch *xps, *oldxps; struct rpc_xprt *xprt, *old; struct rpc_clnt *parent; int err; @@ -668,10 +683,17 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt, return PTR_ERR(xprt); } + xps = xprt_switch_alloc(xprt, GFP_KERNEL); + if (xps == NULL) { + xprt_put(xprt); + return -ENOMEM; + } + pseudoflavor = clnt->cl_auth->au_flavor; old_timeo = clnt->cl_timeout; old = rpc_clnt_set_transport(clnt, xprt, timeout); + oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps); rpc_unregister_client(clnt); __rpc_clnt_remove_pipedir(clnt); @@ -697,14 +719,17 @@ int rpc_switch_client_transport(struct rpc_clnt *clnt, synchronize_rcu(); if (parent != clnt) rpc_release_client(parent); + xprt_switch_put(oldxps); xprt_put(old); dprintk("RPC: replaced xprt for clnt %p\n", clnt); return 0; out_revert: + xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps); rpc_clnt_set_transport(clnt, old, old_timeo); clnt->cl_parent = parent; rpc_client_register(clnt, pseudoflavor, NULL); + xprt_switch_put(xps); xprt_put(xprt); dprintk("RPC: failed to switch xprt for clnt %p\n", clnt); return err; @@ -783,6 +808,7 @@ rpc_free_client(struct rpc_clnt *clnt) rpc_free_iostats(clnt->cl_metrics); clnt->cl_metrics = NULL; xprt_put(rcu_dereference_raw(clnt->cl_xprt)); + xprt_iter_destroy(&clnt->cl_xpi); rpciod_down(); rpc_free_clid(clnt); kfree(clnt); diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c index cf5770d8f49a..44f025c150d8 100644 --- a/net/sunrpc/rpcb_clnt.c +++ b/net/sunrpc/rpcb_clnt.c @@ -648,10 +648,10 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt) { struct rpc_clnt *parent = clnt->cl_parent; - struct rpc_xprt *xprt = rcu_dereference(clnt->cl_xprt); + struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch); while (parent != clnt) { - if (rcu_dereference(parent->cl_xprt) != xprt) + if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps) break; if (clnt->cl_autobind) break; -- cgit v1.2.3 From fb43d17210baa538e58fc83d2d0f8a32399db73b Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 30 Jan 2016 16:39:26 -0500 Subject: SUNRPC: Use the multipath iterator to assign a transport to each task Signed-off-by: Trond Myklebust --- include/linux/sunrpc/sched.h | 2 ++ net/sunrpc/clnt.c | 38 ++++++++++++++++++++------------------ net/sunrpc/rpcb_clnt.c | 6 ++---- net/sunrpc/xprt.c | 14 +++----------- net/sunrpc/xprtsock.c | 4 +--- 5 files changed, 28 insertions(+), 36 deletions(-) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h index ee0fbcf9b02e..0b248e98ee3b 100644 --- a/include/linux/sunrpc/sched.h +++ b/include/linux/sunrpc/sched.h @@ -69,6 +69,8 @@ struct rpc_task { const struct rpc_call_ops *tk_ops; /* Caller callbacks */ struct rpc_clnt * tk_client; /* RPC client */ + struct rpc_xprt * tk_xprt; /* Transport */ + struct rpc_rqst * tk_rqstp; /* RPC request */ struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 625fb8a184c6..8e46fa5a2ab1 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -894,6 +894,7 @@ EXPORT_SYMBOL_GPL(rpc_bind_new_program); void rpc_task_release_client(struct rpc_task *task) { struct rpc_clnt *clnt = task->tk_client; + struct rpc_xprt *xprt = task->tk_xprt; if (clnt != NULL) { /* Remove from client task list */ @@ -904,13 +905,22 @@ void rpc_task_release_client(struct rpc_task *task) rpc_release_client(clnt); } + + if (xprt != NULL) { + task->tk_xprt = NULL; + + xprt_put(xprt); + } } static void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt) { + if (clnt != NULL) { rpc_task_release_client(task); + if (task->tk_xprt == NULL) + task->tk_xprt = xprt_iter_get_next(&clnt->cl_xpi); task->tk_client = clnt; atomic_inc(&clnt->cl_count); if (clnt->cl_softrtry) @@ -2122,11 +2132,9 @@ call_timeout(struct rpc_task *task) } if (RPC_IS_SOFT(task)) { if (clnt->cl_chatty) { - rcu_read_lock(); printk(KERN_NOTICE "%s: server %s not responding, timed out\n", clnt->cl_program->name, - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); } if (task->tk_flags & RPC_TASK_TIMEOUT) rpc_exit(task, -ETIMEDOUT); @@ -2138,11 +2146,9 @@ call_timeout(struct rpc_task *task) if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) { task->tk_flags |= RPC_CALL_MAJORSEEN; if (clnt->cl_chatty) { - rcu_read_lock(); printk(KERN_NOTICE "%s: server %s not responding, still trying\n", clnt->cl_program->name, - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); } } rpc_force_rebind(clnt); @@ -2172,11 +2178,9 @@ call_decode(struct rpc_task *task) if (task->tk_flags & RPC_CALL_MAJORSEEN) { if (clnt->cl_chatty) { - rcu_read_lock(); printk(KERN_NOTICE "%s: server %s OK\n", clnt->cl_program->name, - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); } task->tk_flags &= ~RPC_CALL_MAJORSEEN; } @@ -2330,11 +2334,9 @@ rpc_verify_header(struct rpc_task *task) task->tk_action = call_bind; goto out_retry; case RPC_AUTH_TOOWEAK: - rcu_read_lock(); printk(KERN_NOTICE "RPC: server %s requires stronger " "authentication.\n", - rcu_dereference(clnt->cl_xprt)->servername); - rcu_read_unlock(); + task->tk_xprt->servername); break; default: dprintk("RPC: %5u %s: unknown auth error: %x\n", @@ -2359,27 +2361,27 @@ rpc_verify_header(struct rpc_task *task) case RPC_SUCCESS: return p; case RPC_PROG_UNAVAIL: - dprintk_rcu("RPC: %5u %s: program %u is unsupported " + dprintk("RPC: %5u %s: program %u is unsupported " "by server %s\n", task->tk_pid, __func__, (unsigned int)clnt->cl_prog, - rcu_dereference(clnt->cl_xprt)->servername); + task->tk_xprt->servername); error = -EPFNOSUPPORT; goto out_err; case RPC_PROG_MISMATCH: - dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported " + dprintk("RPC: %5u %s: program %u, version %u unsupported " "by server %s\n", task->tk_pid, __func__, (unsigned int)clnt->cl_prog, (unsigned int)clnt->cl_vers, - rcu_dereference(clnt->cl_xprt)->servername); + task->tk_xprt->servername); error = -EPROTONOSUPPORT; goto out_err; case RPC_PROC_UNAVAIL: - dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, " + dprintk("RPC: %5u %s: proc %s unsupported by program %u, " "version %u on server %s\n", task->tk_pid, __func__, rpc_proc_name(task), clnt->cl_prog, clnt->cl_vers, - rcu_dereference(clnt->cl_xprt)->servername); + task->tk_xprt->servername); error = -EOPNOTSUPP; goto out_err; case RPC_GARBAGE_ARGS: diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c index 44f025c150d8..5b30603596d0 100644 --- a/net/sunrpc/rpcb_clnt.c +++ b/net/sunrpc/rpcb_clnt.c @@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task) int status; rcu_read_lock(); - do { - clnt = rpcb_find_transport_owner(task->tk_client); - xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); - } while (xprt == NULL); + clnt = rpcb_find_transport_owner(task->tk_client); rcu_read_unlock(); + xprt = xprt_get(task->tk_xprt); dprintk("RPC: %5u %s(%s, %u, %u, %d)\n", task->tk_pid, __func__, diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 323b332f8f7c..216a1385718a 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1181,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free); */ void xprt_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt; + struct rpc_xprt *xprt = task->tk_xprt; task->tk_status = 0; if (task->tk_rqstp != NULL) @@ -1189,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; - rcu_read_lock(); - xprt = rcu_dereference(task->tk_client->cl_xprt); if (!xprt_throttle_congested(xprt, task)) xprt->ops->alloc_slot(xprt, task); - rcu_read_unlock(); } /** @@ -1207,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task) */ void xprt_retry_reserve(struct rpc_task *task) { - struct rpc_xprt *xprt; + struct rpc_xprt *xprt = task->tk_xprt; task->tk_status = 0; if (task->tk_rqstp != NULL) @@ -1215,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task) task->tk_timeout = 0; task->tk_status = -EAGAIN; - rcu_read_lock(); - xprt = rcu_dereference(task->tk_client->cl_xprt); xprt->ops->alloc_slot(xprt, task); - rcu_read_unlock(); } static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt) @@ -1265,11 +1259,9 @@ void xprt_release(struct rpc_task *task) if (req == NULL) { if (task->tk_client) { - rcu_read_lock(); - xprt = rcu_dereference(task->tk_client->cl_xprt); + xprt = task->tk_xprt; if (xprt->snd_task == task) xprt_release_write(xprt, task); - rcu_read_unlock(); } return; } diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index fde2138b81e7..65e759569e48 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1844,9 +1844,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock) */ static void xs_local_rpcbind(struct rpc_task *task) { - rcu_read_lock(); - xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt)); - rcu_read_unlock(); + xprt_set_bound(task->tk_xprt); } static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) -- cgit v1.2.3 From 9d61498d5f6cde68a708781bf2cd33cae21121dc Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 30 Jan 2016 18:13:05 -0500 Subject: SUNRPC: Allow caller to specify the transport to use This is needed in order to allow the NFSv4.1 backchannel and BIND_CONN_TO_SESSION function to work. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/sched.h | 1 + net/sunrpc/sched.c | 2 ++ 2 files changed, 3 insertions(+) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h index 0b248e98ee3b..05a1809c44d9 100644 --- a/include/linux/sunrpc/sched.h +++ b/include/linux/sunrpc/sched.h @@ -103,6 +103,7 @@ struct rpc_call_ops { struct rpc_task_setup { struct rpc_task *task; struct rpc_clnt *rpc_client; + struct rpc_xprt *rpc_xprt; const struct rpc_message *rpc_message; const struct rpc_call_ops *callback_ops; void *callback_data; diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 73ad57a59989..fcfd48d263f6 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -909,6 +909,8 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta /* Initialize workqueue for async tasks */ task->tk_workqueue = task_setup_data->workqueue; + task->tk_xprt = xprt_get(task_setup_data->rpc_xprt); + if (task->tk_ops->rpc_call_prepare != NULL) task->tk_action = rpc_prepare_task; -- cgit v1.2.3 From 3227886c6500e76c17f3b864ff9b7dd84f636a99 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 30 Jan 2016 20:39:19 -0500 Subject: SUNRPC: Add a helper to apply a function to all the rpc_clnt's transports Add a helper for tasks that require us to apply a function to all the transports in an rpc_clnt. An example of a usecase would be BIND_CONN_TO_SESSION, where we want to send one RPC call down each transport. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 4 ++++ net/sunrpc/clnt.c | 51 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index 1713e41d65ae..d6510f64a361 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -182,6 +182,10 @@ size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t); const char *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t); int rpc_localaddr(struct rpc_clnt *, struct sockaddr *, size_t); +int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt, + int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *), + void *data); + const char *rpc_proc_name(const struct rpc_task *task); #endif /* __KERNEL__ */ #endif /* _LINUX_SUNRPC_CLNT_H */ diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 8e46fa5a2ab1..19feb4d5d796 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -736,6 +736,57 @@ out_revert: } EXPORT_SYMBOL_GPL(rpc_switch_client_transport); +static +int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi) +{ + struct rpc_xprt_switch *xps; + + rcu_read_lock(); + xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch)); + rcu_read_unlock(); + if (xps == NULL) + return -EAGAIN; + xprt_iter_init_listall(xpi, xps); + xprt_switch_put(xps); + return 0; +} + +/** + * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports + * @clnt: pointer to client + * @fn: function to apply + * @data: void pointer to function data + * + * Iterates through the list of RPC transports currently attached to the + * client and applies the function fn(clnt, xprt, data). + * + * On error, the iteration stops, and the function returns the error value. + */ +int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt, + int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *), + void *data) +{ + struct rpc_xprt_iter xpi; + int ret; + + ret = rpc_clnt_xprt_iter_init(clnt, &xpi); + if (ret) + return ret; + for (;;) { + struct rpc_xprt *xprt = xprt_iter_get_next(&xpi); + + if (!xprt) + break; + ret = fn(clnt, xprt, data); + xprt_put(xprt); + if (ret < 0) + break; + } + xprt_iter_destroy(&xpi); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt); + /* * Kill all tasks for the given client. * XXX: kill their descendants as well? -- cgit v1.2.3 From 15001e5a7e1e207b6bd258cd8f187814cd15b6dc Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 30 Jan 2016 20:05:34 -0500 Subject: SUNRPC: Make NFS swap work with multipath Signed-off-by: Trond Myklebust --- net/sunrpc/clnt.c | 66 ++++++++++++++++++++----------------------------------- 1 file changed, 24 insertions(+), 42 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 19feb4d5d796..4b6ceb7c5e1d 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -2554,57 +2554,39 @@ void rpc_show_tasks(struct net *net) #endif #if IS_ENABLED(CONFIG_SUNRPC_SWAP) +static int +rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, + void *dummy) +{ + return xprt_enable_swap(xprt); +} + int rpc_clnt_swap_activate(struct rpc_clnt *clnt) { - int ret = 0; - struct rpc_xprt *xprt; - - if (atomic_inc_return(&clnt->cl_swapper) == 1) { -retry: - rcu_read_lock(); - xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); - rcu_read_unlock(); - if (!xprt) { - /* - * If we didn't get a reference, then we likely are - * racing with a migration event. Wait for a grace - * period and try again. - */ - synchronize_rcu(); - goto retry; - } - - ret = xprt_enable_swap(xprt); - xprt_put(xprt); - } - return ret; + if (atomic_inc_return(&clnt->cl_swapper) == 1) + return rpc_clnt_iterate_for_each_xprt(clnt, + rpc_clnt_swap_activate_callback, NULL); + return 0; } EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate); +static int +rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, + void *dummy) +{ + xprt_disable_swap(xprt); + return 0; +} + void rpc_clnt_swap_deactivate(struct rpc_clnt *clnt) { - struct rpc_xprt *xprt; - - if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) { -retry: - rcu_read_lock(); - xprt = xprt_get(rcu_dereference(clnt->cl_xprt)); - rcu_read_unlock(); - if (!xprt) { - /* - * If we didn't get a reference, then we likely are - * racing with a migration event. Wait for a grace - * period and try again. - */ - synchronize_rcu(); - goto retry; - } - - xprt_disable_swap(xprt); - xprt_put(xprt); - } + if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) + rpc_clnt_iterate_for_each_xprt(clnt, + rpc_clnt_swap_deactivate_callback, NULL); } EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate); #endif /* CONFIG_SUNRPC_SWAP */ -- cgit v1.2.3 From 7f554890587c63ca4c087d6bcc4a9fe368e57484 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Sat, 30 Jan 2016 23:43:35 -0500 Subject: SUNRPC: Allow addition of new transports to a struct rpc_clnt Add a function to allow creation and addition of a new transport to an existing rpc_clnt Signed-off-by: Trond Myklebust --- include/linux/sunrpc/clnt.h | 11 ++++ net/sunrpc/clnt.c | 133 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 142 insertions(+), 2 deletions(-) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index d6510f64a361..9a7ddbaf116e 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -186,6 +186,17 @@ int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt, int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *), void *data); +int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt, + struct rpc_xprt_switch *xps, + struct rpc_xprt *xprt, + void *dummy); +int rpc_clnt_add_xprt(struct rpc_clnt *, struct xprt_create *, + int (*setup)(struct rpc_clnt *, + struct rpc_xprt_switch *, + struct rpc_xprt *, + void *), + void *data); + const char *rpc_proc_name(const struct rpc_task *task); #endif /* __KERNEL__ */ #endif /* _LINUX_SUNRPC_CLNT_H */ diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 4b6ceb7c5e1d..7e0c9bf22df8 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -2492,7 +2492,10 @@ static int rpc_ping(struct rpc_clnt *clnt) return err; } -struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags) +static +struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, struct rpc_cred *cred, int flags, + const struct rpc_call_ops *ops, void *data) { struct rpc_message msg = { .rpc_proc = &rpcproc_null, @@ -2500,14 +2503,140 @@ struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int }; struct rpc_task_setup task_setup_data = { .rpc_client = clnt, + .rpc_xprt = xprt, .rpc_message = &msg, - .callback_ops = &rpc_default_ops, + .callback_ops = (ops != NULL) ? ops : &rpc_default_ops, + .callback_data = data, .flags = flags, }; + return rpc_run_task(&task_setup_data); } + +struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags) +{ + return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL); +} EXPORT_SYMBOL_GPL(rpc_call_null); +struct rpc_cb_add_xprt_calldata { + struct rpc_xprt_switch *xps; + struct rpc_xprt *xprt; +}; + +static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata) +{ + struct rpc_cb_add_xprt_calldata *data = calldata; + + if (task->tk_status == 0) + rpc_xprt_switch_add_xprt(data->xps, data->xprt); +} + +static void rpc_cb_add_xprt_release(void *calldata) +{ + struct rpc_cb_add_xprt_calldata *data = calldata; + + xprt_put(data->xprt); + xprt_switch_put(data->xps); + kfree(data); +} + +const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = { + .rpc_call_done = rpc_cb_add_xprt_done, + .rpc_release = rpc_cb_add_xprt_release, +}; + +/** + * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt + * @clnt: pointer to struct rpc_clnt + * @xps: pointer to struct rpc_xprt_switch, + * @xprt: pointer struct rpc_xprt + * @dummy: unused + */ +int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt, + struct rpc_xprt_switch *xps, struct rpc_xprt *xprt, + void *dummy) +{ + struct rpc_cb_add_xprt_calldata *data; + struct rpc_cred *cred; + struct rpc_task *task; + + data = kmalloc(sizeof(*data), GFP_NOFS); + if (!data) + return -ENOMEM; + data->xps = xprt_switch_get(xps); + data->xprt = xprt_get(xprt); + + cred = authnull_ops.lookup_cred(NULL, NULL, 0); + task = rpc_call_null_helper(clnt, xprt, cred, + RPC_TASK_SOFT|RPC_TASK_SOFTCONN|RPC_TASK_ASYNC, + &rpc_cb_add_xprt_call_ops, data); + put_rpccred(cred); + if (IS_ERR(task)) + return PTR_ERR(task); + rpc_put_task(task); + return 1; +} +EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt); + +/** + * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt + * @clnt: pointer to struct rpc_clnt + * @xprtargs: pointer to struct xprt_create + * @setup: callback to test and/or set up the connection + * @data: pointer to setup function data + * + * Creates a new transport using the parameters set in args and + * adds it to clnt. + * If ping is set, then test that connectivity succeeds before + * adding the new transport. + * + */ +int rpc_clnt_add_xprt(struct rpc_clnt *clnt, + struct xprt_create *xprtargs, + int (*setup)(struct rpc_clnt *, + struct rpc_xprt_switch *, + struct rpc_xprt *, + void *), + void *data) +{ + struct rpc_xprt_switch *xps; + struct rpc_xprt *xprt; + unsigned char resvport; + int ret = 0; + + rcu_read_lock(); + xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch)); + xprt = xprt_iter_xprt(&clnt->cl_xpi); + if (xps == NULL || xprt == NULL) { + rcu_read_unlock(); + return -EAGAIN; + } + resvport = xprt->resvport; + rcu_read_unlock(); + + xprt = xprt_create_transport(xprtargs); + if (IS_ERR(xprt)) { + ret = PTR_ERR(xprt); + goto out_put_switch; + } + xprt->resvport = resvport; + + rpc_xprt_switch_set_roundrobin(xps); + if (setup) { + ret = setup(clnt, xps, xprt, data); + if (ret != 0) + goto out_put_xprt; + } + rpc_xprt_switch_add_xprt(xps, xprt); +out_put_xprt: + xprt_put(xprt); +out_put_switch: + xprt_switch_put(xps); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt); + #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static void rpc_show_header(void) { -- cgit v1.2.3 From 4d97291b6fb46e31fcb03876595d49b477ac196d Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:27:35 -0500 Subject: xprtrdma: Clean up physical_op_map() physical_op_unmap{_sync} don't use mr_nsegs, so don't bother to set it in physical_op_map. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/physical_ops.c | 1 - 1 file changed, 1 deletion(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index dbb302ecf590..481b9b6f4a15 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -68,7 +68,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing)); seg->mr_rkey = ia->ri_dma_mr->rkey; seg->mr_base = seg->mr_dma; - seg->mr_nsegs = 1; return 1; } -- cgit v1.2.3 From af0f16e825cebd53a3460adc8391acb0d85dc913 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:27:43 -0500 Subject: xprtrdma: Clean up dprintk format string containing a newline Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 0f28f2d743ed..e9dfd6a826f4 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -809,10 +809,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) */ list_del_init(&rqst->rq_list); spin_unlock_bh(&xprt->transport_lock); - dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" - " RPC request 0x%p xid 0x%08x\n", - __func__, rep, req, rqst, - be32_to_cpu(headerp->rm_xid)); + dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n", + __func__, rep, req, be32_to_cpu(headerp->rm_xid)); /* from here on, the reply is no longer an orphan */ req->rl_reply = rep; -- cgit v1.2.3 From 821c791a0bde997499384733fc98dba76baac41e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:27:52 -0500 Subject: xprtrdma: Segment head and tail XDR buffers on page boundaries A single memory allocation is used for the pair of buffers wherein the RPC client builds an RPC call message and decodes its matching reply. These buffers are sized based on the maximum possible size of the RPC call and reply messages for the operation in progress. This means that as the call buffer increases in size, the start of the reply buffer is pushed farther into the memory allocation. RPC requests are growing in size. It used to be that both the call and reply buffers fit inside a single page. But these days, thanks to NFSv4 (and especially security labels in NFSv4.2) the maximum call and reply sizes are large. NFSv4.0 OPEN, for example, now requires a 6KB allocation for a pair of call and reply buffers, and NFSv4 LOOKUP is not far behind. As the maximum size of a call increases, the reply buffer is pushed far enough into the buffer's memory allocation that a page boundary can appear in the middle of it. When the maximum possible reply size is larger than the client's RDMA receive buffers (currently 1KB), the client has to register a Reply chunk for the server to RDMA Write the reply into. The logic in rpcrdma_convert_iovs() assumes that xdr_buf head and tail buffers would always be contained on a single page. It supplies just one segment for the head and one for the tail. FMR, for example, registers up to a page boundary (only a portion of the reply buffer in the OPEN case above). But without additional segments, it doesn't register the rest of the buffer. When the server tries to write the OPEN reply, the RDMA Write fails with a remote access error since the client registered only part of the Reply chunk. rpcrdma_convert_iovs() must split the XDR buffer into multiple segments, each of which are guaranteed not to contain a page boundary. That way fmr_op_map is given the proper number of segments to register the whole reply buffer. Signed-off-by: Chuck Lever Reviewed-by: Devesh Sharma Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index e9dfd6a826f4..060739144552 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf) return tlen; } +/* Split "vec" on page boundaries into segments. FMR registers pages, + * not a byte range. Other modes coalesce these segments into a single + * MR when they can. + */ +static int +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, + int n, int nsegs) +{ + size_t page_offset; + u32 remaining; + char *base; + + base = vec->iov_base; + page_offset = offset_in_page(base); + remaining = vec->iov_len; + while (remaining && n < nsegs) { + seg[n].mr_page = NULL; + seg[n].mr_offset = base; + seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); + remaining -= seg[n].mr_len; + base += seg[n].mr_len; + ++n; + page_offset = 0; + } + return n; +} + /* * Chunk assembly from upper layer xdr_buf. * @@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, int page_base; struct page **ppages; - if (pos == 0 && xdrbuf->head[0].iov_len) { - seg[n].mr_page = NULL; - seg[n].mr_offset = xdrbuf->head[0].iov_base; - seg[n].mr_len = xdrbuf->head[0].iov_len; - ++n; + if (pos == 0) { + n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs); + if (n == nsegs) + return -EIO; } len = xdrbuf->page_len; @@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * xdr pad bytes, saving the server an RDMA operation. */ if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize) return n; + n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs); if (n == nsegs) - /* Tail remains, but we're out of segments */ return -EIO; - seg[n].mr_page = NULL; - seg[n].mr_offset = xdrbuf->tail[0].iov_base; - seg[n].mr_len = xdrbuf->tail[0].iov_len; - ++n; } return n; -- cgit v1.2.3 From b892a699cecc36ca373def4bc5ddc5fa3d46ba3b Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:28:01 -0500 Subject: xprtrdma: Do not wait if ib_post_send() fails If ib_post_send() in ro_unmap_sync() fails, the WRs have not been posted, no completions will fire, and wait_for_completion() will wait forever. Skip the wait in that case. To ensure the MRs are invalid, disconnect. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index e16567389e28..ecb005f871f1 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -520,14 +520,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * unless ri_id->qp is a valid pointer. */ rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); - if (rc) + if (rc) { pr_warn("%s: ib_post_send failed %i\n", __func__, rc); + rdma_disconnect(ia->ri_id); + goto unmap; + } wait_for_completion(&f->fr_linv_done); /* ORDER: Now DMA unmap all of the req's MRs, and return * them to the free MW list. */ +unmap: for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { seg = &req->rl_segments[i]; -- cgit v1.2.3 From 59aa1f9a3cce388b4d7d842d6963df11d92a407e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:28:18 -0500 Subject: xprtrdma: Properly handle RDMA_ERROR replies These are shorter than RPCRDMA_HDRLEN_MIN, and they need to complete the waiting RPC. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- include/linux/sunrpc/rpc_rdma.h | 11 +++++---- net/sunrpc/xprtrdma/rpc_rdma.c | 51 ++++++++++++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 13 deletions(-) (limited to 'net/sunrpc') diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h index 8c6d23cb0cae..3b1ff38f0c37 100644 --- a/include/linux/sunrpc/rpc_rdma.h +++ b/include/linux/sunrpc/rpc_rdma.h @@ -93,6 +93,12 @@ struct rpcrdma_msg { __be32 rm_pempty[3]; /* 3 empty chunk lists */ } rm_padded; + struct { + __be32 rm_err; + __be32 rm_vers_low; + __be32 rm_vers_high; + } rm_error; + __be32 rm_chunks[0]; /* read, write and reply chunks */ } rm_body; @@ -109,11 +115,6 @@ enum rpcrdma_errcode { ERR_CHUNK = 2 }; -struct rpcrdma_err_vers { - uint32_t rdma_vers_low; /* Version range supported by peer */ - uint32_t rdma_vers_high; -}; - enum rpcrdma_proc { RDMA_MSG = 0, /* An RPC call or reply msg */ RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */ diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 060739144552..35f810899729 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -795,7 +795,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; struct rpc_xprt *xprt = &r_xprt->rx_xprt; __be32 *iptr; - int rdmalen, status; + int rdmalen, status, rmerr; unsigned long cwnd; u32 credits; @@ -803,12 +803,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (rep->rr_len == RPCRDMA_BAD_LEN) goto out_badstatus; - if (rep->rr_len < RPCRDMA_HDRLEN_MIN) + if (rep->rr_len < RPCRDMA_HDRLEN_ERR) goto out_shortreply; headerp = rdmab_to_msg(rep->rr_rdmabuf); - if (headerp->rm_vers != rpcrdma_version) - goto out_badversion; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (rpcrdma_is_bcall(headerp)) goto out_bcall; @@ -838,6 +836,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) req->rl_reply = rep; xprt->reestablish_timeout = 0; + if (headerp->rm_vers != rpcrdma_version) + goto out_badversion; + /* check for expected message types */ /* The order of some of these tests is important. */ switch (headerp->rm_type) { @@ -898,6 +899,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) status = rdmalen; break; + case rdma_error: + goto out_rdmaerr; + badheader: default: dprintk("%s: invalid rpcrdma reply header (type %d):" @@ -913,6 +917,7 @@ badheader: break; } +out: /* Invalidate and flush the data payloads before waking the * waiting application. This guarantees the memory region is * properly fenced from the server before the application @@ -955,13 +960,43 @@ out_bcall: return; #endif -out_shortreply: - dprintk("RPC: %s: short/invalid reply\n", __func__); - goto repost; - +/* If the incoming reply terminated a pending RPC, the next + * RPC call will post a replacement receive buffer as it is + * being marshaled. + */ out_badversion: dprintk("RPC: %s: invalid version %d\n", __func__, be32_to_cpu(headerp->rm_vers)); + status = -EIO; + r_xprt->rx_stats.bad_reply_count++; + goto out; + +out_rdmaerr: + rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err); + switch (rmerr) { + case ERR_VERS: + pr_err("%s: server reports header version error (%u-%u)\n", + __func__, + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low), + be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high)); + break; + case ERR_CHUNK: + pr_err("%s: server reports header decoding error\n", + __func__); + break; + default: + pr_err("%s: server reports unknown error %d\n", + __func__, rmerr); + } + status = -EREMOTEIO; + r_xprt->rx_stats.bad_reply_count++; + goto out; + +/* If no pending RPC transaction was matched, post a replacement + * receive buffer before returning. + */ +out_shortreply: + dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; out_nomatch: -- cgit v1.2.3 From 23826c7aeac7e333bfee6f10a3407a23c58b6147 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:28:27 -0500 Subject: xprtrdma: Serialize credit accounting again Commit fe97b47cd623 ("xprtrdma: Use workqueue to process RPC/RDMA replies") replaced the reply tasklet with a workqueue that allows RPC replies to be processed in parallel. Thus the credit values in RPC-over-RDMA replies can be applied in a different order than in which the server sent them. To fix this, revert commit eba8ff660b2d ("xprtrdma: Move credit update to RPC reply handler"). Reverting is done by hand to accommodate code changes that have occurred since then. Fixes: fe97b47cd623 ("xprtrdma: Use workqueue to process . . .") Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 9 +-------- net/sunrpc/xprtrdma/verbs.c | 27 ++++++++++++++++++++++++++- net/sunrpc/xprtrdma/xprt_rdma.h | 1 + 3 files changed, 28 insertions(+), 9 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 35f810899729..888823bb6dae 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -797,7 +797,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) __be32 *iptr; int rdmalen, status, rmerr; unsigned long cwnd; - u32 credits; dprintk("RPC: %s: incoming rep %p\n", __func__, rep); @@ -928,15 +927,9 @@ out: if (req->rl_nchunks) r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); - credits = be32_to_cpu(headerp->rm_credit); - if (credits == 0) - credits = 1; /* don't deadlock */ - else if (credits > r_xprt->rx_buf.rb_max_requests) - credits = r_xprt->rx_buf.rb_max_requests; - spin_lock_bh(&xprt->transport_lock); cwnd = xprt->cwnd; - xprt->cwnd = credits << RPC_CWNDSHIFT; + xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) xprt_release_rqst_cong(rqst->rq_task); diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 878f1bfb1db9..fc1ef5f144b8 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -190,6 +190,28 @@ rpcrdma_receive_worker(struct work_struct *work) rpcrdma_reply_handler(rep); } +/* Perform basic sanity checking to avoid using garbage + * to update the credit grant value. + */ +static void +rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) +{ + struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf); + struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf; + u32 credits; + + if (rep->rr_len < RPCRDMA_HDRLEN_ERR) + return; + + credits = be32_to_cpu(rmsgp->rm_credit); + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > buffer->rb_max_requests) + credits = buffer->rb_max_requests; + + atomic_set(&buffer->rb_credits, credits); +} + static void rpcrdma_recvcq_process_wc(struct ib_wc *wc) { @@ -211,7 +233,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc) ib_dma_sync_single_for_cpu(rep->rr_device, rdmab_addr(rep->rr_rdmabuf), rep->rr_len, DMA_FROM_DEVICE); - prefetch(rdmab_to_msg(rep->rr_rdmabuf)); + + rpcrdma_update_granted_credits(rep); out_schedule: queue_work(rpcrdma_receive_wq, &rep->rr_work); @@ -330,6 +353,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) connected: dprintk("RPC: %s: %sconnected\n", __func__, connstate > 0 ? "" : "dis"); + atomic_set(&xprt->rx_buf.rb_credits, 1); ep->rep_connected = connstate; rpcrdma_conn_func(ep); wake_up_all(&ep->rep_connect_wait); @@ -943,6 +967,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) buf->rb_max_requests = r_xprt->rx_data.max_requests; buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_lock); + atomic_set(&buf->rb_credits, 1); rc = ia->ri_ops->ro_init(r_xprt); if (rc) diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 38fe11b09875..7bf6f43fa4b9 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -311,6 +311,7 @@ struct rpcrdma_buffer { struct list_head rb_send_bufs; struct list_head rb_recv_bufs; u32 rb_max_requests; + atomic_t rb_credits; /* most recent credit grant */ u32 rb_bc_srv_max_requests; spinlock_t rb_reqslock; /* protect rb_allreqs */ -- cgit v1.2.3 From 552bf225281f96e7a02e1a1b874966fdb6b997e0 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:28:36 -0500 Subject: xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs Calling ib_poll_cq() to sort through WCs during a completion is a common pattern amongst RDMA consumers. Since commit 14d3a3b2498e ("IB: add a proper completion queue abstraction"), WC sorting can be handled by the IB core. By converting to this new API, xprtrdma is made a better neighbor to other RDMA consumers, as it allows the core to schedule the delivery of completions more fairly amongst all active consumers. Because each ib_cqe carries a pointer to a completion method, the core can now post its own operations on a consumer's QP, and handle the completions itself, without changes to the consumer. xprtrdma's reply processing is already handled in a work queue, but there is some initial order-dependent processing that is done in the soft IRQ context before a work item is scheduled. IB_POLL_SOFTIRQ is a direct replacement for the current xprtrdma receive code path. Signed-off-by: Chuck Lever Reviewed-by: Devesh Sharma Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 78 +++++++++++------------------------------ net/sunrpc/xprtrdma/xprt_rdma.h | 1 + 2 files changed, 21 insertions(+), 58 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index fc1ef5f144b8..05779f48745b 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -212,11 +212,18 @@ rpcrdma_update_granted_credits(struct rpcrdma_rep *rep) atomic_set(&buffer->rb_credits, credits); } +/** + * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ static void -rpcrdma_recvcq_process_wc(struct ib_wc *wc) +rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_rep *rep = - (struct rpcrdma_rep *)(unsigned long)wc->wr_id; + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep, + rr_cqe); /* WARNING: Only wr_id and status are reliable at this point */ if (wc->status != IB_WC_SUCCESS) @@ -242,55 +249,20 @@ out_schedule: out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("RPC: %s: rep %p: %s\n", - __func__, rep, ib_wc_status_msg(wc->status)); + pr_err("rpcrdma: Recv: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); rep->rr_len = RPCRDMA_BAD_LEN; goto out_schedule; } -/* The wc array is on stack: automatic memory is always CPU-local. - * - * struct ib_wc is 64 bytes, making the poll array potentially - * large. But this is at the bottom of the call chain. Further - * substantial work is done in another thread. - */ -static void -rpcrdma_recvcq_poll(struct ib_cq *cq) -{ - struct ib_wc *pos, wcs[4]; - int count, rc; - - do { - pos = wcs; - - rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); - if (rc < 0) - break; - - count = rc; - while (count-- > 0) - rpcrdma_recvcq_process_wc(pos++); - } while (rc == ARRAY_SIZE(wcs)); -} - -/* Handle provider receive completion upcalls. - */ -static void -rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) -{ - do { - rpcrdma_recvcq_poll(cq); - } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | - IB_CQ_REPORT_MISSED_EVENTS) > 0); -} - static void rpcrdma_flush_cqs(struct rpcrdma_ep *ep) { struct ib_wc wc; while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_recvcq_process_wc(&wc); + rpcrdma_receive_wc(NULL, &wc); while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) rpcrdma_sendcq_process_wc(&wc); } @@ -655,9 +627,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, goto out2; } - cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; - recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, - rpcrdma_cq_async_error_upcall, NULL, &cq_attr); + recvcq = ib_alloc_cq(ia->ri_device, NULL, + ep->rep_attr.cap.max_recv_wr + 1, + 0, IB_POLL_SOFTIRQ); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", @@ -665,14 +637,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, goto out2; } - rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP); - if (rc) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - ib_destroy_cq(recvcq); - goto out2; - } - ep->rep_attr.send_cq = sendcq; ep->rep_attr.recv_cq = recvcq; @@ -735,10 +699,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ia->ri_id->qp = NULL; } - rc = ib_destroy_cq(ep->rep_attr.recv_cq); - if (rc) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, rc); + ib_free_cq(ep->rep_attr.recv_cq); rc = ib_destroy_cq(ep->rep_attr.send_cq); if (rc) @@ -947,6 +908,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) } rep->rr_device = ia->ri_device; + rep->rr_cqe.done = rpcrdma_receive_wc; rep->rr_rxprt = r_xprt; INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); return rep; @@ -1322,7 +1284,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, int rc; recv_wr.next = NULL; - recv_wr.wr_id = (u64) (unsigned long) rep; + recv_wr.wr_cqe = &rep->rr_cqe; recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; recv_wr.num_sge = 1; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 7bf6f43fa4b9..d60feb9aa631 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -171,6 +171,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) struct rpcrdma_buffer; struct rpcrdma_rep { + struct ib_cqe rr_cqe; unsigned int rr_len; struct ib_device *rr_device; struct rpcrdma_xprt *rr_rxprt; -- cgit v1.2.3 From c882a655b78d23eb7037cfd6ebf94af1d7068a58 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:28:45 -0500 Subject: xprtrdma: Use an anonymous union in struct rpcrdma_mw Clean up: Make code more readable. Signed-off-by: Chuck Lever Reviewed-by: Devesh Sharma Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/fmr_ops.c | 28 +++++++++++++-------------- net/sunrpc/xprtrdma/frwr_ops.c | 42 ++++++++++++++++++++--------------------- net/sunrpc/xprtrdma/xprt_rdma.h | 2 +- 3 files changed, 36 insertions(+), 36 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index c14f3a4bff68..b289e106540b 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) if (!r) goto out; - r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * - sizeof(u64), GFP_KERNEL); - if (!r->r.fmr.physaddrs) + r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * + sizeof(u64), GFP_KERNEL); + if (!r->fmr.physaddrs) goto out_free; - r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); - if (IS_ERR(r->r.fmr.fmr)) + r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); + if (IS_ERR(r->fmr.fmr)) goto out_fmr_err; list_add(&r->mw_list, &buf->rb_mws); @@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) return 0; out_fmr_err: - rc = PTR_ERR(r->r.fmr.fmr); + rc = PTR_ERR(r->fmr.fmr); dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc); - kfree(r->r.fmr.physaddrs); + kfree(r->fmr.physaddrs); out_free: kfree(r); out: @@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r) { LIST_HEAD(l); - list_add(&r->r.fmr.fmr->list, &l); + list_add(&r->fmr.fmr->list, &l); return ib_unmap_fmr(&l); } @@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, nsegs = RPCRDMA_MAX_FMR_SGES; for (i = 0; i < nsegs;) { rpcrdma_map_one(device, seg, direction); - mw->r.fmr.physaddrs[i] = seg->mr_dma; + mw->fmr.physaddrs[i] = seg->mr_dma; len += seg->mr_len; ++seg; ++i; @@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, break; } - rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs, + rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs, i, seg1->mr_dma); if (rc) goto out_maperr; seg1->rl_mw = mw; - seg1->mr_rkey = mw->r.fmr.fmr->rkey; + seg1->mr_rkey = mw->fmr.fmr->rkey; seg1->mr_base = seg1->mr_dma + pageoff; seg1->mr_nsegs = i; seg1->mr_len = len; @@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) seg = &req->rl_segments[i]; mw = seg->rl_mw; - list_add(&mw->r.fmr.fmr->list, &unmap_list); + list_add(&mw->fmr.fmr->list, &unmap_list); i += seg->mr_nsegs; } @@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) while (!list_empty(&buf->rb_all)) { r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); list_del(&r->mw_all); - kfree(r->r.fmr.physaddrs); + kfree(r->fmr.physaddrs); - rc = ib_dealloc_fmr(r->r.fmr.fmr); + rc = ib_dealloc_fmr(r->fmr.fmr); if (rc) dprintk("RPC: %s: ib_dealloc_fmr failed %i\n", __func__, rc); diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index ecb005f871f1..0cb9efa281cc 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -109,20 +109,20 @@ static void __frwr_recovery_worker(struct work_struct *work) { struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - r.frmr.fr_work); - struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt; + frmr.fr_work); + struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt; unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - if (ib_dereg_mr(r->r.frmr.fr_mr)) + if (ib_dereg_mr(r->frmr.fr_mr)) goto out_fail; - r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(r->r.frmr.fr_mr)) + r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); + if (IS_ERR(r->frmr.fr_mr)) goto out_fail; dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); - r->r.frmr.fr_state = FRMR_IS_INVALID; + r->frmr.fr_state = FRMR_IS_INVALID; rpcrdma_put_mw(r_xprt, r); return; @@ -137,15 +137,15 @@ out_fail: static void __frwr_queue_recovery(struct rpcrdma_mw *r) { - INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->r.frmr.fr_work); + INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); + queue_work(frwr_recovery_wq, &r->frmr.fr_work); } static int __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, unsigned int depth) { - struct rpcrdma_frmr *f = &r->r.frmr; + struct rpcrdma_frmr *f = &r->frmr; int rc; f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); @@ -179,11 +179,11 @@ __frwr_release(struct rpcrdma_mw *r) { int rc; - rc = ib_dereg_mr(r->r.frmr.fr_mr); + rc = ib_dereg_mr(r->frmr.fr_mr); if (rc) dprintk("RPC: %s: ib_dereg_mr status %i\n", __func__, rc); - kfree(r->r.frmr.sg); + kfree(r->frmr.sg); } static int @@ -263,14 +263,14 @@ __frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r) pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", __func__, r, ib_wc_status_msg(wc->status), wc->status); - r->r.frmr.fr_state = FRMR_IS_STALE; + r->frmr.fr_state = FRMR_IS_STALE; } static void frwr_sendcompletion(struct ib_wc *wc) { struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - struct rpcrdma_frmr *f = &r->r.frmr; + struct rpcrdma_frmr *f = &r->frmr; if (unlikely(wc->status != IB_WC_SUCCESS)) __frwr_sendcompletion_flush(wc, r); @@ -314,7 +314,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); r->mw_sendcompletion = frwr_sendcompletion; - r->r.frmr.fr_xprt = r_xprt; + r->frmr.fr_xprt = r_xprt; } return 0; @@ -347,8 +347,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, mw = rpcrdma_get_mw(r_xprt); if (!mw) return -ENOMEM; - } while (mw->r.frmr.fr_state != FRMR_IS_INVALID); - frmr = &mw->r.frmr; + } while (mw->frmr.fr_state != FRMR_IS_INVALID); + frmr = &mw->frmr; frmr->fr_state = FRMR_IS_VALID; frmr->fr_waiter = false; mr = frmr->fr_mr; @@ -434,7 +434,7 @@ static struct ib_send_wr * __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) { struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->r.frmr; + struct rpcrdma_frmr *f = &mw->frmr; struct ib_send_wr *invalidate_wr; f->fr_waiter = false; @@ -455,7 +455,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, { struct ib_device *device = r_xprt->rx_ia.ri_device; struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->r.frmr; + struct rpcrdma_frmr *f = &mw->frmr; seg->rl_mw = NULL; @@ -504,7 +504,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) i += seg->mr_nsegs; } - f = &seg->rl_mw->r.frmr; + f = &seg->rl_mw->frmr; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain @@ -553,7 +553,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) struct rpcrdma_mr_seg *seg1 = seg; struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_mw *mw = seg1->rl_mw; - struct rpcrdma_frmr *frmr = &mw->r.frmr; + struct rpcrdma_frmr *frmr = &mw->frmr; struct ib_send_wr *invalidate_wr, *bad_wr; int rc, nsegs = seg->mr_nsegs; @@ -561,7 +561,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) seg1->rl_mw = NULL; frmr->fr_state = FRMR_IS_INVALID; - invalidate_wr = &mw->r.frmr.fr_invwr; + invalidate_wr = &mw->frmr.fr_invwr; memset(invalidate_wr, 0, sizeof(*invalidate_wr)); invalidate_wr->wr_id = (uintptr_t)mw; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index d60feb9aa631..b3c4472a3d6a 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -225,7 +225,7 @@ struct rpcrdma_mw { union { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; - } r; + }; void (*mw_sendcompletion)(struct ib_wc *); struct list_head mw_list; struct list_head mw_all; -- cgit v1.2.3 From 2fa8f88d8892507ecff0126fbc67906740491d31 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Fri, 4 Mar 2016 11:28:53 -0500 Subject: xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs Calling ib_poll_cq() to sort through WCs during a completion is a common pattern amongst RDMA consumers. Since commit 14d3a3b2498e ("IB: add a proper completion queue abstraction"), WC sorting can be handled by the IB core. By converting to this new API, xprtrdma is made a better neighbor to other RDMA consumers, as it allows the core to schedule the delivery of completions more fairly amongst all active consumers. Because each ib_cqe carries a pointer to a completion method, the core can now post its own operations on a consumer's QP, and handle the completions itself, without changes to the consumer. Send completions were previously handled entirely in the completion upcall handler (ie, deferring to a process context is unneeded). Thus IB_POLL_SOFTIRQ is a direct replacement for the current xprtrdma send code path. Signed-off-by: Chuck Lever Reviewed-by: Devesh Sharma Reviewed-by: Sagi Grimberg Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 99 ++++++++++++++++++++++++++----------- net/sunrpc/xprtrdma/verbs.c | 107 +++++++--------------------------------- net/sunrpc/xprtrdma/xprt_rdma.h | 10 ++-- 3 files changed, 91 insertions(+), 125 deletions(-) (limited to 'net/sunrpc') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 0cb9efa281cc..c250924a9fd3 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -158,6 +158,8 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, sg_init_table(f->sg, depth); + init_completion(&f->fr_linv_done); + return 0; out_mr_err: @@ -244,39 +246,76 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); } -/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs - * to be reset. +static void +__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr, + const char *wr) +{ + frmr->fr_state = FRMR_IS_STALE; + if (wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("rpcrdma: %s: %s (%u/0x%x)\n", + wr, ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); +} + +/** + * frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC + * @cq: completion queue (ignored) + * @wc: completed WR * - * WARNING: Only wr_id and status are reliable at this point */ static void -__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r) +frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) { - if (likely(wc->status == IB_WC_SUCCESS)) - return; - - /* WARNING: Only wr_id and status are reliable at this point */ - r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - if (wc->status == IB_WC_WR_FLUSH_ERR) - dprintk("RPC: %s: frmr %p flushed\n", __func__, r); - else - pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", - __func__, r, ib_wc_status_msg(wc->status), wc->status); + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; - r->frmr.fr_state = FRMR_IS_STALE; + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) { + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + __frwr_sendcompletion_flush(wc, frmr, "fastreg"); + } } +/** + * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ static void -frwr_sendcompletion(struct ib_wc *wc) +frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) { - struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - struct rpcrdma_frmr *f = &r->frmr; + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; - if (unlikely(wc->status != IB_WC_SUCCESS)) - __frwr_sendcompletion_flush(wc, r); + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS) { + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + __frwr_sendcompletion_flush(wc, frmr, "localinv"); + } +} - if (f->fr_waiter) - complete(&f->fr_linv_done); +/** + * frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + * Awaken anyone waiting for an MR to finish being fenced. + */ +static void +frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) +{ + struct rpcrdma_frmr *frmr; + struct ib_cqe *cqe; + + /* WARNING: Only wr_cqe and status are reliable at this point */ + cqe = wc->wr_cqe; + frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe); + if (wc->status != IB_WC_SUCCESS) + __frwr_sendcompletion_flush(wc, frmr, "localinv"); + complete_all(&frmr->fr_linv_done); } static int @@ -313,7 +352,6 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); - r->mw_sendcompletion = frwr_sendcompletion; r->frmr.fr_xprt = r_xprt; } @@ -350,7 +388,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, } while (mw->frmr.fr_state != FRMR_IS_INVALID); frmr = &mw->frmr; frmr->fr_state = FRMR_IS_VALID; - frmr->fr_waiter = false; mr = frmr->fr_mr; reg_wr = &frmr->fr_regwr; @@ -400,7 +437,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, reg_wr->wr.next = NULL; reg_wr->wr.opcode = IB_WR_REG_MR; - reg_wr->wr.wr_id = (uintptr_t)mw; + frmr->fr_cqe.done = frwr_wc_fastreg; + reg_wr->wr.wr_cqe = &frmr->fr_cqe; reg_wr->wr.num_sge = 0; reg_wr->wr.send_flags = 0; reg_wr->mr = mr; @@ -437,12 +475,12 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) struct rpcrdma_frmr *f = &mw->frmr; struct ib_send_wr *invalidate_wr; - f->fr_waiter = false; f->fr_state = FRMR_IS_INVALID; invalidate_wr = &f->fr_invwr; memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - invalidate_wr->wr_id = (unsigned long)(void *)mw; + f->fr_cqe.done = frwr_wc_localinv; + invalidate_wr->wr_cqe = &f->fr_cqe; invalidate_wr->opcode = IB_WR_LOCAL_INV; invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey; @@ -511,8 +549,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * are complete. */ f->fr_invwr.send_flags = IB_SEND_SIGNALED; - f->fr_waiter = true; - init_completion(&f->fr_linv_done); + f->fr_cqe.done = frwr_wc_localinv_wake; + reinit_completion(&f->fr_linv_done); INIT_CQCOUNT(&r_xprt->rx_ep); /* Transport disconnect drains the receive CQ before it @@ -564,7 +602,8 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) invalidate_wr = &mw->frmr.fr_invwr; memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - invalidate_wr->wr_id = (uintptr_t)mw; + frmr->fr_cqe.done = frwr_wc_localinv; + invalidate_wr->wr_cqe = &frmr->fr_cqe; invalidate_wr->opcode = IB_WR_LOCAL_INV; invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 05779f48745b..f5ed9f982cd7 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -112,73 +112,20 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context) } } -static void -rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context) -{ - struct rpcrdma_ep *ep = context; - - pr_err("RPC: %s: %s on device %s ep %p\n", - __func__, ib_event_msg(event->event), - event->device->name, context); - if (ep->rep_connected == 1) { - ep->rep_connected = -EIO; - rpcrdma_conn_func(ep); - wake_up_all(&ep->rep_connect_wait); - } -} - -static void -rpcrdma_sendcq_process_wc(struct ib_wc *wc) -{ - /* WARNING: Only wr_id and status are reliable at this point */ - if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) { - if (wc->status != IB_WC_SUCCESS && - wc->status != IB_WC_WR_FLUSH_ERR) - pr_err("RPC: %s: SEND: %s\n", - __func__, ib_wc_status_msg(wc->status)); - } else { - struct rpcrdma_mw *r; - - r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - r->mw_sendcompletion(wc); - } -} - -/* The common case is a single send completion is waiting. By - * passing two WC entries to ib_poll_cq, a return code of 1 - * means there is exactly one WC waiting and no more. We don't - * have to invoke ib_poll_cq again to know that the CQ has been - * properly drained. - */ -static void -rpcrdma_sendcq_poll(struct ib_cq *cq) -{ - struct ib_wc *pos, wcs[2]; - int count, rc; - - do { - pos = wcs; - - rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); - if (rc < 0) - break; - - count = rc; - while (count-- > 0) - rpcrdma_sendcq_process_wc(pos++); - } while (rc == ARRAY_SIZE(wcs)); - return; -} - -/* Handle provider send completion upcalls. +/** + * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC + * @cq: completion queue (ignored) + * @wc: completed WR + * */ static void -rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) +rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc) { - do { - rpcrdma_sendcq_poll(cq); - } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | - IB_CQ_REPORT_MISSED_EVENTS) > 0); + /* WARNING: Only wr_cqe and status are reliable at this point */ + if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR) + pr_err("rpcrdma: Send: %s (%u/0x%x)\n", + ib_wc_status_msg(wc->status), + wc->status, wc->vendor_err); } static void @@ -263,8 +210,6 @@ rpcrdma_flush_cqs(struct rpcrdma_ep *ep) while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) rpcrdma_receive_wc(NULL, &wc); - while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) - rpcrdma_sendcq_process_wc(&wc); } static int @@ -556,9 +501,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { struct ib_cq *sendcq, *recvcq; - struct ib_cq_init_attr cq_attr = {}; unsigned int max_qp_wr; - int rc, err; + int rc; if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) { dprintk("RPC: %s: insufficient sge's available\n", @@ -610,9 +554,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, init_waitqueue_head(&ep->rep_connect_wait); INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); - cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; - sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, - rpcrdma_cq_async_error_upcall, NULL, &cq_attr); + sendcq = ib_alloc_cq(ia->ri_device, NULL, + ep->rep_attr.cap.max_send_wr + 1, + 0, IB_POLL_SOFTIRQ); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@ -620,13 +564,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, goto out1; } - rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP); - if (rc) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - goto out2; - } - recvcq = ib_alloc_cq(ia->ri_device, NULL, ep->rep_attr.cap.max_recv_wr + 1, 0, IB_POLL_SOFTIRQ); @@ -661,10 +598,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, return 0; out2: - err = ib_destroy_cq(sendcq); - if (err) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, err); + ib_free_cq(sendcq); out1: if (ia->ri_dma_mr) ib_dereg_mr(ia->ri_dma_mr); @@ -700,11 +634,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) } ib_free_cq(ep->rep_attr.recv_cq); - - rc = ib_destroy_cq(ep->rep_attr.send_cq); - if (rc) - dprintk("RPC: %s: ib_destroy_cq returned %i\n", - __func__, rc); + ib_free_cq(ep->rep_attr.send_cq); if (ia->ri_dma_mr) { rc = ib_dereg_mr(ia->ri_dma_mr); @@ -883,6 +813,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) spin_lock(&buffer->rb_reqslock); list_add(&req->rl_all, &buffer->rb_allreqs); spin_unlock(&buffer->rb_reqslock); + req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; return req; } @@ -1246,7 +1177,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, } send_wr.next = NULL; - send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION; + send_wr.wr_cqe = &req->rl_cqe; send_wr.sg_list = iov; send_wr.num_sge = req->rl_niovs; send_wr.opcode = IB_WR_SEND; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index b3c4472a3d6a..2ebc743cb96f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -95,10 +95,6 @@ struct rpcrdma_ep { #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount) -/* Force completion handler to ignore the signal - */ -#define RPCRDMA_IGNORE_COMPLETION (0ULL) - /* Pre-allocate extra Work Requests for handling backward receives * and sends. This is a fixed value because the Work Queues are * allocated when the forward channel is set up. @@ -205,11 +201,11 @@ struct rpcrdma_frmr { struct scatterlist *sg; int sg_nents; struct ib_mr *fr_mr; + struct ib_cqe fr_cqe; enum rpcrdma_frmr_state fr_state; + struct completion fr_linv_done; struct work_struct fr_work; struct rpcrdma_xprt *fr_xprt; - bool fr_waiter; - struct completion fr_linv_done;; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -226,7 +222,6 @@ struct rpcrdma_mw { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; }; - void (*mw_sendcompletion)(struct ib_wc *); struct list_head mw_list; struct list_head mw_all; }; @@ -282,6 +277,7 @@ struct rpcrdma_req { struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + struct ib_cqe rl_cqe; struct list_head rl_all; bool rl_backchannel; }; -- cgit v1.2.3