diff options
-rw-r--r-- | include/linux/srcu.h | 10 | ||||
-rw-r--r-- | kernel/rcutorture.c | 2 | ||||
-rw-r--r-- | kernel/srcu.c | 284 |
3 files changed, 198 insertions, 98 deletions
diff --git a/include/linux/srcu.h b/include/linux/srcu.h index d3d5fa54f25e..a478c8eb8479 100644 --- a/include/linux/srcu.h +++ b/include/linux/srcu.h @@ -31,13 +31,19 @@ #include <linux/rcupdate.h> struct srcu_struct_array { - int c[2]; + unsigned long c[2]; }; +/* Bit definitions for field ->c above and ->snap below. */ +#define SRCU_USAGE_BITS 2 +#define SRCU_REF_MASK (ULONG_MAX >> SRCU_USAGE_BITS) +#define SRCU_USAGE_COUNT (SRCU_REF_MASK + 1) + struct srcu_struct { - int completed; + unsigned completed; struct srcu_struct_array __percpu *per_cpu_ref; struct mutex mutex; + unsigned long snap[NR_CPUS]; #ifdef CONFIG_DEBUG_LOCK_ALLOC struct lockdep_map dep_map; #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ diff --git a/kernel/rcutorture.c b/kernel/rcutorture.c index 8cd262b41499..d10b179dea83 100644 --- a/kernel/rcutorture.c +++ b/kernel/rcutorture.c @@ -639,7 +639,7 @@ static int srcu_torture_stats(char *page) cnt += sprintf(&page[cnt], "%s%s per-CPU(idx=%d):", torture_type, TORTURE_FLAG, idx); for_each_possible_cpu(cpu) { - cnt += sprintf(&page[cnt], " %d(%d,%d)", cpu, + cnt += sprintf(&page[cnt], " %d(%lu,%lu)", cpu, per_cpu_ptr(srcu_ctl.per_cpu_ref, cpu)->c[!idx], per_cpu_ptr(srcu_ctl.per_cpu_ref, cpu)->c[idx]); } diff --git a/kernel/srcu.c b/kernel/srcu.c index ba35f3a4a1f4..84c9b97dc3d9 100644 --- a/kernel/srcu.c +++ b/kernel/srcu.c @@ -73,19 +73,102 @@ EXPORT_SYMBOL_GPL(init_srcu_struct); #endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */ /* - * srcu_readers_active_idx -- returns approximate number of readers - * active on the specified rank of per-CPU counters. + * Returns approximate number of readers active on the specified rank + * of per-CPU counters. Also snapshots each counter's value in the + * corresponding element of sp->snap[] for later use validating + * the sum. */ +static unsigned long srcu_readers_active_idx(struct srcu_struct *sp, int idx) +{ + int cpu; + unsigned long sum = 0; + unsigned long t; -static int srcu_readers_active_idx(struct srcu_struct *sp, int idx) + for_each_possible_cpu(cpu) { + t = ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]); + sum += t; + sp->snap[cpu] = t; + } + return sum & SRCU_REF_MASK; +} + +/* + * To be called from the update side after an index flip. Returns true + * if the modulo sum of the counters is stably zero, false if there is + * some possibility of non-zero. + */ +static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx) { int cpu; - int sum; - sum = 0; + /* + * Note that srcu_readers_active_idx() can incorrectly return + * zero even though there is a pre-existing reader throughout. + * To see this, suppose that task A is in a very long SRCU + * read-side critical section that started on CPU 0, and that + * no other reader exists, so that the modulo sum of the counters + * is equal to one. Then suppose that task B starts executing + * srcu_readers_active_idx(), summing up to CPU 1, and then that + * task C starts reading on CPU 0, so that its increment is not + * summed, but finishes reading on CPU 2, so that its decrement + * -is- summed. Then when task B completes its sum, it will + * incorrectly get zero, despite the fact that task A has been + * in its SRCU read-side critical section the whole time. + * + * We therefore do a validation step should srcu_readers_active_idx() + * return zero. + */ + if (srcu_readers_active_idx(sp, idx) != 0) + return false; + + /* + * Since the caller recently flipped ->completed, we can see at + * most one increment of each CPU's counter from this point + * forward. The reason for this is that the reader CPU must have + * fetched the index before srcu_readers_active_idx checked + * that CPU's counter, but not yet incremented its counter. + * Its eventual counter increment will follow the read in + * srcu_readers_active_idx(), and that increment is immediately + * followed by smp_mb() B. Because smp_mb() D is between + * the ->completed flip and srcu_readers_active_idx()'s read, + * that CPU's subsequent load of ->completed must see the new + * value, and therefore increment the counter in the other rank. + */ + smp_mb(); /* A */ + + /* + * Now, we check the ->snap array that srcu_readers_active_idx() + * filled in from the per-CPU counter values. Since both + * __srcu_read_lock() and __srcu_read_unlock() increment the + * upper bits of the per-CPU counter, an increment/decrement + * pair will change the value of the counter. Since there is + * only one possible increment, the only way to wrap the counter + * is to have a huge number of counter decrements, which requires + * a huge number of tasks and huge SRCU read-side critical-section + * nesting levels, even on 32-bit systems. + * + * All of the ways of confusing the readings require that the scan + * in srcu_readers_active_idx() see the read-side task's decrement, + * but not its increment. However, between that decrement and + * increment are smb_mb() B and C. Either or both of these pair + * with smp_mb() A above to ensure that the scan below will see + * the read-side tasks's increment, thus noting a difference in + * the counter values between the two passes. + * + * Therefore, if srcu_readers_active_idx() returned zero, and + * none of the counters changed, we know that the zero was the + * correct sum. + * + * Of course, it is possible that a task might be delayed + * for a very long time in __srcu_read_lock() after fetching + * the index but before incrementing its counter. This + * possibility will be dealt with in __synchronize_srcu(). + */ for_each_possible_cpu(cpu) - sum += per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx]; - return sum; + if (sp->snap[cpu] != + ACCESS_ONCE(per_cpu_ptr(sp->per_cpu_ref, cpu)->c[idx])) + return false; /* False zero reading! */ + return true; } /** @@ -131,10 +214,11 @@ int __srcu_read_lock(struct srcu_struct *sp) int idx; preempt_disable(); - idx = sp->completed & 0x1; - barrier(); /* ensure compiler looks -once- at sp->completed. */ - per_cpu_ptr(sp->per_cpu_ref, smp_processor_id())->c[idx]++; - srcu_barrier(); /* ensure compiler won't misorder critical section. */ + idx = rcu_dereference_index_check(sp->completed, + rcu_read_lock_sched_held()) & 0x1; + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += + SRCU_USAGE_COUNT + 1; + smp_mb(); /* B */ /* Avoid leaking the critical section. */ preempt_enable(); return idx; } @@ -149,8 +233,9 @@ EXPORT_SYMBOL_GPL(__srcu_read_lock); void __srcu_read_unlock(struct srcu_struct *sp, int idx) { preempt_disable(); - srcu_barrier(); /* ensure compiler won't misorder critical section. */ - per_cpu_ptr(sp->per_cpu_ref, smp_processor_id())->c[idx]--; + smp_mb(); /* C */ /* Avoid leaking the critical section. */ + ACCESS_ONCE(this_cpu_ptr(sp->per_cpu_ref)->c[idx]) += + SRCU_USAGE_COUNT - 1; preempt_enable(); } EXPORT_SYMBOL_GPL(__srcu_read_unlock); @@ -163,12 +248,65 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock); * we repeatedly block for 1-millisecond time periods. This approach * has done well in testing, so there is no need for a config parameter. */ -#define SYNCHRONIZE_SRCU_READER_DELAY 10 +#define SYNCHRONIZE_SRCU_READER_DELAY 5 + +/* + * Flip the readers' index by incrementing ->completed, then wait + * until there are no more readers using the counters referenced by + * the old index value. (Recall that the index is the bottom bit + * of ->completed.) + * + * Of course, it is possible that a reader might be delayed for the + * full duration of flip_idx_and_wait() between fetching the + * index and incrementing its counter. This possibility is handled + * by __synchronize_srcu() invoking flip_idx_and_wait() twice. + */ +static void flip_idx_and_wait(struct srcu_struct *sp, bool expedited) +{ + int idx; + int trycount = 0; + + idx = sp->completed++ & 0x1; + + /* + * If a reader fetches the index before the above increment, + * but increments its counter after srcu_readers_active_idx_check() + * sums it, then smp_mb() D will pair with __srcu_read_lock()'s + * smp_mb() B to ensure that the SRCU read-side critical section + * will see any updates that the current task performed before its + * call to synchronize_srcu(), or to synchronize_srcu_expedited(), + * as the case may be. + */ + smp_mb(); /* D */ + + /* + * SRCU read-side critical sections are normally short, so wait + * a small amount of time before possibly blocking. + */ + if (!srcu_readers_active_idx_check(sp, idx)) { + udelay(SYNCHRONIZE_SRCU_READER_DELAY); + while (!srcu_readers_active_idx_check(sp, idx)) { + if (expedited && ++ trycount < 10) + udelay(SYNCHRONIZE_SRCU_READER_DELAY); + else + schedule_timeout_interruptible(1); + } + } + + /* + * The following smp_mb() E pairs with srcu_read_unlock()'s + * smp_mb C to ensure that if srcu_readers_active_idx_check() + * sees srcu_read_unlock()'s counter decrement, then any + * of the current task's subsequent code will happen after + * that SRCU read-side critical section. + */ + smp_mb(); /* E */ +} /* * Helper function for synchronize_srcu() and synchronize_srcu_expedited(). */ -static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) +static void __synchronize_srcu(struct srcu_struct *sp, bool expedited) { int idx; @@ -178,90 +316,53 @@ static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) !lock_is_held(&rcu_sched_lock_map), "Illegal synchronize_srcu() in same-type SRCU (or RCU) read-side critical section"); - idx = sp->completed; + smp_mb(); /* Ensure prior action happens before grace period. */ + idx = ACCESS_ONCE(sp->completed); + smp_mb(); /* Access to ->completed before lock acquisition. */ mutex_lock(&sp->mutex); /* * Check to see if someone else did the work for us while we were - * waiting to acquire the lock. We need -two- advances of + * waiting to acquire the lock. We need -three- advances of * the counter, not just one. If there was but one, we might have * shown up -after- our helper's first synchronize_sched(), thus * having failed to prevent CPU-reordering races with concurrent - * srcu_read_unlock()s on other CPUs (see comment below). So we - * either (1) wait for two or (2) supply the second ourselves. + * srcu_read_unlock()s on other CPUs (see comment below). If there + * was only two, we are guaranteed to have waited through only one + * full index-flip phase. So we either (1) wait for three or + * (2) supply the additional ones we need. */ - if ((sp->completed - idx) >= 2) { + if (sp->completed == idx + 2) + idx = 1; + else if (sp->completed == idx + 3) { mutex_unlock(&sp->mutex); return; - } - - sync_func(); /* Force memory barrier on all CPUs. */ + } else + idx = 0; /* - * The preceding synchronize_sched() ensures that any CPU that - * sees the new value of sp->completed will also see any preceding - * changes to data structures made by this CPU. This prevents - * some other CPU from reordering the accesses in its SRCU - * read-side critical section to precede the corresponding - * srcu_read_lock() -- ensuring that such references will in - * fact be protected. + * If there were no helpers, then we need to do two flips of + * the index. The first flip is required if there are any + * outstanding SRCU readers even if there are no new readers + * running concurrently with the first counter flip. * - * So it is now safe to do the flip. - */ - - idx = sp->completed & 0x1; - sp->completed++; - - sync_func(); /* Force memory barrier on all CPUs. */ - - /* - * At this point, because of the preceding synchronize_sched(), - * all srcu_read_lock() calls using the old counters have completed. - * Their corresponding critical sections might well be still - * executing, but the srcu_read_lock() primitives themselves - * will have finished executing. We initially give readers - * an arbitrarily chosen 10 microseconds to get out of their - * SRCU read-side critical sections, then loop waiting 1/HZ - * seconds per iteration. The 10-microsecond value has done - * very well in testing. - */ - - if (srcu_readers_active_idx(sp, idx)) - udelay(SYNCHRONIZE_SRCU_READER_DELAY); - while (srcu_readers_active_idx(sp, idx)) - schedule_timeout_interruptible(1); - - sync_func(); /* Force memory barrier on all CPUs. */ - - /* - * The preceding synchronize_sched() forces all srcu_read_unlock() - * primitives that were executing concurrently with the preceding - * for_each_possible_cpu() loop to have completed by this point. - * More importantly, it also forces the corresponding SRCU read-side - * critical sections to have also completed, and the corresponding - * references to SRCU-protected data items to be dropped. + * The second flip is required when a new reader picks up + * the old value of the index, but does not increment its + * counter until after its counters is summed/rechecked by + * srcu_readers_active_idx_check(). In this case, the current SRCU + * grace period would be OK because the SRCU read-side critical + * section started after this SRCU grace period started, so the + * grace period is not required to wait for the reader. * - * Note: - * - * Despite what you might think at first glance, the - * preceding synchronize_sched() -must- be within the - * critical section ended by the following mutex_unlock(). - * Otherwise, a task taking the early exit can race - * with a srcu_read_unlock(), which might have executed - * just before the preceding srcu_readers_active() check, - * and whose CPU might have reordered the srcu_read_unlock() - * with the preceding critical section. In this case, there - * is nothing preventing the synchronize_sched() task that is - * taking the early exit from freeing a data structure that - * is still being referenced (out of order) by the task - * doing the srcu_read_unlock(). - * - * Alternatively, the comparison with "2" on the early exit - * could be changed to "3", but this increases synchronize_srcu() - * latency for bulk loads. So the current code is preferred. + * However, the next SRCU grace period would be waiting for the + * other set of counters to go to zero, and therefore would not + * wait for the reader, which would be very bad. To avoid this + * bad scenario, we flip and wait twice, clearing out both sets + * of counters. */ - + for (; idx < 2; idx++) + flip_idx_and_wait(sp, expedited); mutex_unlock(&sp->mutex); } @@ -281,7 +382,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, void (*sync_func)(void)) */ void synchronize_srcu(struct srcu_struct *sp) { - __synchronize_srcu(sp, synchronize_sched); + __synchronize_srcu(sp, 0); } EXPORT_SYMBOL_GPL(synchronize_srcu); @@ -289,18 +390,11 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); * synchronize_srcu_expedited - Brute-force SRCU grace period * @sp: srcu_struct with which to synchronize. * - * Wait for an SRCU grace period to elapse, but use a "big hammer" - * approach to force the grace period to end quickly. This consumes - * significant time on all CPUs and is unfriendly to real-time workloads, - * so is thus not recommended for any sort of common-case code. In fact, - * if you are using synchronize_srcu_expedited() in a loop, please - * restructure your code to batch your updates, and then use a single - * synchronize_srcu() instead. + * Wait for an SRCU grace period to elapse, but be more aggressive about + * spinning rather than blocking when waiting. * * Note that it is illegal to call this function while holding any lock - * that is acquired by a CPU-hotplug notifier. And yes, it is also illegal - * to call this function from a CPU-hotplug notifier. Failing to observe - * these restriction will result in deadlock. It is also illegal to call + * that is acquired by a CPU-hotplug notifier. It is also illegal to call * synchronize_srcu_expedited() from the corresponding SRCU read-side * critical section; doing so will result in deadlock. However, it is * perfectly legal to call synchronize_srcu_expedited() on one srcu_struct @@ -309,7 +403,7 @@ EXPORT_SYMBOL_GPL(synchronize_srcu); */ void synchronize_srcu_expedited(struct srcu_struct *sp) { - __synchronize_srcu(sp, synchronize_sched_expedited); + __synchronize_srcu(sp, 1); } EXPORT_SYMBOL_GPL(synchronize_srcu_expedited); |