summaryrefslogtreecommitdiffstats
path: root/kernel/rcu
diff options
context:
space:
mode:
authorByungchul Park <byungchul.park@lge.com>2019-08-05 18:22:27 -0400
committerPaul E. McKenney <paulmck@kernel.org>2020-01-24 10:17:03 -0800
commita35d16905efc6ad5523d864a5c6efcb1e657e386 (patch)
tree5e28a249d7f5a8bb07d61b07f0030b2fb97635ef /kernel/rcu
parente42617b825f8073569da76dc4510bfa019b1c35a (diff)
downloadlinux-a35d16905efc6ad5523d864a5c6efcb1e657e386.tar.bz2
rcu: Add basic support for kfree_rcu() batching
Recently a discussion about stability and performance of a system involving a high rate of kfree_rcu() calls surfaced on the list [1] which led to another discussion how to prepare for this situation. This patch adds basic batching support for kfree_rcu(). It is "basic" because we do none of the slab management, dynamic allocation, code moving or any of the other things, some of which previous attempts did [2]. These fancier improvements can be follow-up patches and there are different ideas being discussed in those regards. This is an effort to start simple, and build up from there. In the future, an extension to use kfree_bulk and possibly per-slab batching could be done to further improve performance due to cache-locality and slab-specific bulk free optimizations. By using an array of pointers, the worker thread processing the work would need to read lesser data since it does not need to deal with large rcu_head(s) any longer. Torture tests follow in the next patch and show improvements of around 5x reduction in number of grace periods on a 16 CPU system. More details and test data are in that patch. There is an implication with rcu_barrier() with this patch. Since the kfree_rcu() calls can be batched, and may not be handed yet to the RCU machinery in fact, the monitor may not have even run yet to do the queue_rcu_work(), there seems no easy way of implementing rcu_barrier() to wait for those kfree_rcu()s that are already made. So this means a kfree_rcu() followed by an rcu_barrier() does not imply that memory will be freed once rcu_barrier() returns. Another implication is higher active memory usage (although not run-away..) until the kfree_rcu() flooding ends, in comparison to without batching. More details about this are in the second patch which adds an rcuperf test. Finally, in the near future we will get rid of kfree_rcu() special casing within RCU such as in rcu_do_batch and switch everything to just batching. Currently we don't do that since timer subsystem is not yet up and we cannot schedule the kfree_rcu() monitor as the timer subsystem's lock are not initialized. That would also mean getting rid of kfree_call_rcu_nobatch() entirely. [1] http://lore.kernel.org/lkml/20190723035725-mutt-send-email-mst@kernel.org [2] https://lkml.org/lkml/2017/12/19/824 Cc: kernel-team@android.com Cc: kernel-team@lge.com Co-developed-by: Byungchul Park <byungchul.park@lge.com> Signed-off-by: Byungchul Park <byungchul.park@lge.com> Signed-off-by: Joel Fernandes (Google) <joel@joelfernandes.org> [ paulmck: Applied 0day and Paul Walmsley feedback on ->monitor_todo. ] [ paulmck: Make it work during early boot. ] [ paulmck: Add a crude early boot self-test. ] [ paulmck: Style adjustments and experimental docbook structure header. ] Link: https://lore.kernel.org/lkml/alpine.DEB.2.21.9999.1908161931110.32497@viisi.sifive.com/T/#me9956f66cb611b95d26ae92700e1d901f46e8c59 Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
Diffstat (limited to 'kernel/rcu')
-rw-r--r--kernel/rcu/tree.c194
-rw-r--r--kernel/rcu/update.c10
2 files changed, 198 insertions, 6 deletions
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 1694a6b57ad8..0af016fdbf19 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -2683,19 +2683,187 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
}
EXPORT_SYMBOL_GPL(call_rcu);
+
+/* Maximum number of jiffies to wait before draining a batch. */
+#define KFREE_DRAIN_JIFFIES (HZ / 50)
+
+/**
+ * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
+ * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
+ * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @head_free: List of kfree_rcu() objects already waiting for a grace period
+ * @lock: Synchronize access to this structure
+ * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
+ * @monitor_todo: Tracks whether a @monitor_work delayed work is pending
+ * @initialized: The @lock and @rcu_work fields have been initialized
+ *
+ * This is a per-CPU structure. The reason that it is not included in
+ * the rcu_data structure is to permit this code to be extracted from
+ * the RCU files. Such extraction could allow further optimization of
+ * the interactions with the slab allocators.
+ */
+struct kfree_rcu_cpu {
+ struct rcu_work rcu_work;
+ struct rcu_head *head;
+ struct rcu_head *head_free;
+ spinlock_t lock;
+ struct delayed_work monitor_work;
+ int monitor_todo;
+ bool initialized;
+};
+
+static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
+
/*
- * Queue an RCU callback for lazy invocation after a grace period.
- * This will likely be later named something like "call_rcu_lazy()",
- * but this change will require some way of tagging the lazy RCU
- * callbacks in the list of pending callbacks. Until then, this
- * function may only be called from __kfree_rcu().
+ * This function is invoked in workqueue context after a grace period.
+ * It frees all the objects queued on ->head_free.
*/
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static void kfree_rcu_work(struct work_struct *work)
+{
+ unsigned long flags;
+ struct rcu_head *head, *next;
+ struct kfree_rcu_cpu *krcp;
+
+ krcp = container_of(to_rcu_work(work), struct kfree_rcu_cpu, rcu_work);
+ spin_lock_irqsave(&krcp->lock, flags);
+ head = krcp->head_free;
+ krcp->head_free = NULL;
+ spin_unlock_irqrestore(&krcp->lock, flags);
+
+ // List "head" is now private, so traverse locklessly.
+ for (; head; head = next) {
+ next = head->next;
+ // Potentially optimize with kfree_bulk in future.
+ __rcu_reclaim(rcu_state.name, head);
+ cond_resched_tasks_rcu_qs();
+ }
+}
+
+/*
+ * Schedule the kfree batch RCU work to run in workqueue context after a GP.
+ *
+ * This function is invoked by kfree_rcu_monitor() when the KFREE_DRAIN_JIFFIES
+ * timeout has been reached.
+ */
+static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
+{
+ lockdep_assert_held(&krcp->lock);
+
+ // If a previous RCU batch is in progress, we cannot immediately
+ // queue another one, so return false to tell caller to retry.
+ if (krcp->head_free)
+ return false;
+
+ krcp->head_free = krcp->head;
+ krcp->head = NULL;
+ INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work);
+ queue_rcu_work(system_wq, &krcp->rcu_work);
+ return true;
+}
+
+static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
+ unsigned long flags)
+{
+ // Attempt to start a new batch.
+ if (queue_kfree_rcu_work(krcp)) {
+ // Success! Our job is done here.
+ spin_unlock_irqrestore(&krcp->lock, flags);
+ return;
+ }
+
+ // Previous RCU batch still in progress, try again later.
+ if (!xchg(&krcp->monitor_todo, true))
+ schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+ spin_unlock_irqrestore(&krcp->lock, flags);
+}
+
+/*
+ * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
+ * It invokes kfree_rcu_drain_unlock() to attempt to start another batch.
+ */
+static void kfree_rcu_monitor(struct work_struct *work)
+{
+ unsigned long flags;
+ struct kfree_rcu_cpu *krcp = container_of(work, struct kfree_rcu_cpu,
+ monitor_work.work);
+
+ spin_lock_irqsave(&krcp->lock, flags);
+ if (xchg(&krcp->monitor_todo, false))
+ kfree_rcu_drain_unlock(krcp, flags);
+ else
+ spin_unlock_irqrestore(&krcp->lock, flags);
+}
+
+/*
+ * This version of kfree_call_rcu does not do batching of kfree_rcu() requests.
+ * Used only by rcuperf torture test for comparison with kfree_rcu_batch().
+ */
+void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
{
__call_rcu(head, func, 1);
}
+EXPORT_SYMBOL_GPL(kfree_call_rcu_nobatch);
+
+/*
+ * Queue a request for lazy invocation of kfree() after a grace period.
+ *
+ * Each kfree_call_rcu() request is added to a batch. The batch will be drained
+ * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch
+ * will be kfree'd in workqueue context. This allows us to:
+ *
+ * 1. Batch requests together to reduce the number of grace periods during
+ * heavy kfree_rcu() load.
+ *
+ * 2. It makes it possible to use kfree_bulk() on a large number of
+ * kfree_rcu() requests thus reducing cache misses and the per-object
+ * overhead of kfree().
+ */
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+ unsigned long flags;
+ struct kfree_rcu_cpu *krcp;
+
+ head->func = func;
+
+ local_irq_save(flags); // For safely calling this_cpu_ptr().
+ krcp = this_cpu_ptr(&krc);
+ if (krcp->initialized)
+ spin_lock(&krcp->lock);
+
+ // Queue the object but don't yet schedule the batch.
+ head->func = func;
+ head->next = krcp->head;
+ krcp->head = head;
+
+ // Set timer to drain after KFREE_DRAIN_JIFFIES.
+ if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
+ !xchg(&krcp->monitor_todo, true))
+ schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+
+ if (krcp->initialized)
+ spin_unlock(&krcp->lock);
+ local_irq_restore(flags);
+}
EXPORT_SYMBOL_GPL(kfree_call_rcu);
+void __init kfree_rcu_scheduler_running(void)
+{
+ int cpu;
+ unsigned long flags;
+
+ for_each_online_cpu(cpu) {
+ struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+ spin_lock_irqsave(&krcp->lock, flags);
+ if (!krcp->head || xchg(&krcp->monitor_todo, true)) {
+ spin_unlock_irqrestore(&krcp->lock, flags);
+ continue;
+ }
+ schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+ spin_unlock_irqrestore(&krcp->lock, flags);
+ }
+}
+
/*
* During early boot, any blocking grace-period wait automatically
* implies a grace period. Later on, this is never the case for PREEMPT.
@@ -3557,12 +3725,26 @@ static void __init rcu_dump_rcu_node_tree(void)
struct workqueue_struct *rcu_gp_wq;
struct workqueue_struct *rcu_par_gp_wq;
+static void __init kfree_rcu_batch_init(void)
+{
+ int cpu;
+
+ for_each_possible_cpu(cpu) {
+ struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+ spin_lock_init(&krcp->lock);
+ INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
+ krcp->initialized = true;
+ }
+}
+
void __init rcu_init(void)
{
int cpu;
rcu_early_boot_tests();
+ kfree_rcu_batch_init();
rcu_bootup_announce();
rcu_init_geometry();
rcu_init_one();
diff --git a/kernel/rcu/update.c b/kernel/rcu/update.c
index 1861103662db..196487762b96 100644
--- a/kernel/rcu/update.c
+++ b/kernel/rcu/update.c
@@ -40,6 +40,7 @@
#include <linux/rcupdate_wait.h>
#include <linux/sched/isolation.h>
#include <linux/kprobes.h>
+#include <linux/slab.h>
#define CREATE_TRACE_POINTS
@@ -218,6 +219,7 @@ static int __init rcu_set_runtime_mode(void)
{
rcu_test_sync_prims();
rcu_scheduler_active = RCU_SCHEDULER_RUNNING;
+ kfree_rcu_scheduler_running();
rcu_test_sync_prims();
return 0;
}
@@ -853,14 +855,22 @@ static void test_callback(struct rcu_head *r)
DEFINE_STATIC_SRCU(early_srcu);
+struct early_boot_kfree_rcu {
+ struct rcu_head rh;
+};
+
static void early_boot_test_call_rcu(void)
{
static struct rcu_head head;
static struct rcu_head shead;
+ struct early_boot_kfree_rcu *rhp;
call_rcu(&head, test_callback);
if (IS_ENABLED(CONFIG_SRCU))
call_srcu(&early_srcu, &shead, test_callback);
+ rhp = kmalloc(sizeof(*rhp), GFP_KERNEL);
+ if (!WARN_ON_ONCE(!rhp))
+ kfree_rcu(rhp, rh);
}
void rcu_early_boot_tests(void)