diff options
Diffstat (limited to 'kernel/padata.c')
-rw-r--r-- | kernel/padata.c | 307 |
1 files changed, 130 insertions, 177 deletions
diff --git a/kernel/padata.c b/kernel/padata.c index 15a8ad63f4ff..c3fec1413295 100644 --- a/kernel/padata.c +++ b/kernel/padata.c @@ -46,18 +46,13 @@ static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index) return target_cpu; } -static int padata_cpu_hash(struct parallel_data *pd) +static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr) { - unsigned int seq_nr; - int cpu_index; - /* * Hash the sequence numbers to the cpus by taking * seq_nr mod. number of cpus in use. */ - - seq_nr = atomic_inc_return(&pd->seq_nr); - cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu); + int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu); return padata_index_to_cpu(pd, cpu_index); } @@ -94,17 +89,19 @@ static void padata_parallel_worker(struct work_struct *parallel_work) * * @pinst: padata instance * @padata: object to be parallelized - * @cb_cpu: cpu the serialization callback function will run on, - * must be in the serial cpumask of padata(i.e. cpumask.cbcpu). + * @cb_cpu: pointer to the CPU that the serialization callback function should + * run on. If it's not in the serial cpumask of @pinst + * (i.e. cpumask.cbcpu), this function selects a fallback CPU and if + * none found, returns -EINVAL. * * The parallelization callback function will run with BHs off. * Note: Every object which is parallelized by padata_do_parallel * must be seen by padata_do_serial. */ int padata_do_parallel(struct padata_instance *pinst, - struct padata_priv *padata, int cb_cpu) + struct padata_priv *padata, int *cb_cpu) { - int target_cpu, err; + int i, cpu, cpu_index, target_cpu, err; struct padata_parallel_queue *queue; struct parallel_data *pd; @@ -116,8 +113,19 @@ int padata_do_parallel(struct padata_instance *pinst, if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID) goto out; - if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu)) - goto out; + if (!cpumask_test_cpu(*cb_cpu, pd->cpumask.cbcpu)) { + if (!cpumask_weight(pd->cpumask.cbcpu)) + goto out; + + /* Select an alternate fallback CPU and notify the caller. */ + cpu_index = *cb_cpu % cpumask_weight(pd->cpumask.cbcpu); + + cpu = cpumask_first(pd->cpumask.cbcpu); + for (i = 0; i < cpu_index; i++) + cpu = cpumask_next(cpu, pd->cpumask.cbcpu); + + *cb_cpu = cpu; + } err = -EBUSY; if ((pinst->flags & PADATA_RESET)) @@ -129,9 +137,10 @@ int padata_do_parallel(struct padata_instance *pinst, err = 0; atomic_inc(&pd->refcnt); padata->pd = pd; - padata->cb_cpu = cb_cpu; + padata->cb_cpu = *cb_cpu; - target_cpu = padata_cpu_hash(pd); + padata->seq_nr = atomic_inc_return(&pd->seq_nr); + target_cpu = padata_cpu_hash(pd, padata->seq_nr); padata->cpu = target_cpu; queue = per_cpu_ptr(pd->pqueue, target_cpu); @@ -139,7 +148,7 @@ int padata_do_parallel(struct padata_instance *pinst, list_add_tail(&padata->list, &queue->parallel.list); spin_unlock(&queue->parallel.lock); - queue_work_on(target_cpu, pinst->wq, &queue->work); + queue_work(pinst->parallel_wq, &queue->work); out: rcu_read_unlock_bh(); @@ -149,63 +158,53 @@ out: EXPORT_SYMBOL(padata_do_parallel); /* - * padata_get_next - Get the next object that needs serialization. + * padata_find_next - Find the next object that needs serialization. * * Return values are: * * A pointer to the control struct of the next object that needs * serialization, if present in one of the percpu reorder queues. * - * -EINPROGRESS, if the next object that needs serialization will + * NULL, if the next object that needs serialization will * be parallel processed by another cpu and is not yet present in * the cpu's reorder queue. - * - * -ENODATA, if this cpu has to do the parallel processing for - * the next object. */ -static struct padata_priv *padata_get_next(struct parallel_data *pd) +static struct padata_priv *padata_find_next(struct parallel_data *pd, + bool remove_object) { - int cpu, num_cpus; - unsigned int next_nr, next_index; struct padata_parallel_queue *next_queue; struct padata_priv *padata; struct padata_list *reorder; + int cpu = pd->cpu; - num_cpus = cpumask_weight(pd->cpumask.pcpu); - - /* - * Calculate the percpu reorder queue and the sequence - * number of the next object. - */ - next_nr = pd->processed; - next_index = next_nr % num_cpus; - cpu = padata_index_to_cpu(pd, next_index); next_queue = per_cpu_ptr(pd->pqueue, cpu); - reorder = &next_queue->reorder; spin_lock(&reorder->lock); - if (!list_empty(&reorder->list)) { - padata = list_entry(reorder->list.next, - struct padata_priv, list); - - list_del_init(&padata->list); - atomic_dec(&pd->reorder_objects); + if (list_empty(&reorder->list)) { + spin_unlock(&reorder->lock); + return NULL; + } - pd->processed++; + padata = list_entry(reorder->list.next, struct padata_priv, list); + /* + * Checks the rare case where two or more parallel jobs have hashed to + * the same CPU and one of the later ones finishes first. + */ + if (padata->seq_nr != pd->processed) { spin_unlock(&reorder->lock); - goto out; + return NULL; } - spin_unlock(&reorder->lock); - if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) { - padata = ERR_PTR(-ENODATA); - goto out; + if (remove_object) { + list_del_init(&padata->list); + atomic_dec(&pd->reorder_objects); + ++pd->processed; + pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1, false); } - padata = ERR_PTR(-EINPROGRESS); -out: + spin_unlock(&reorder->lock); return padata; } @@ -215,6 +214,7 @@ static void padata_reorder(struct parallel_data *pd) struct padata_priv *padata; struct padata_serial_queue *squeue; struct padata_instance *pinst = pd->pinst; + struct padata_parallel_queue *next_queue; /* * We need to ensure that only one cpu can work on dequeueing of @@ -230,27 +230,16 @@ static void padata_reorder(struct parallel_data *pd) return; while (1) { - padata = padata_get_next(pd); + padata = padata_find_next(pd, true); /* * If the next object that needs serialization is parallel * processed by another cpu and is still on it's way to the * cpu's reorder queue, nothing to do for now. */ - if (PTR_ERR(padata) == -EINPROGRESS) + if (!padata) break; - /* - * This cpu has to do the parallel processing of the next - * object. It's waiting in the cpu's parallelization queue, - * so exit immediately. - */ - if (PTR_ERR(padata) == -ENODATA) { - del_timer(&pd->timer); - spin_unlock_bh(&pd->lock); - return; - } - cb_cpu = padata->cb_cpu; squeue = per_cpu_ptr(pd->squeue, cb_cpu); @@ -258,77 +247,37 @@ static void padata_reorder(struct parallel_data *pd) list_add_tail(&padata->list, &squeue->serial.list); spin_unlock(&squeue->serial.lock); - queue_work_on(cb_cpu, pinst->wq, &squeue->work); + queue_work_on(cb_cpu, pinst->serial_wq, &squeue->work); } spin_unlock_bh(&pd->lock); /* * The next object that needs serialization might have arrived to - * the reorder queues in the meantime, we will be called again - * from the timer function if no one else cares for it. + * the reorder queues in the meantime. * - * Ensure reorder_objects is read after pd->lock is dropped so we see - * an increment from another task in padata_do_serial. Pairs with + * Ensure reorder queue is read after pd->lock is dropped so we see + * new objects from another task in padata_do_serial. Pairs with * smp_mb__after_atomic in padata_do_serial. */ smp_mb(); - if (atomic_read(&pd->reorder_objects) - && !(pinst->flags & PADATA_RESET)) - mod_timer(&pd->timer, jiffies + HZ); - else - del_timer(&pd->timer); - return; + next_queue = per_cpu_ptr(pd->pqueue, pd->cpu); + if (!list_empty(&next_queue->reorder.list) && + padata_find_next(pd, false)) + queue_work(pinst->serial_wq, &pd->reorder_work); } static void invoke_padata_reorder(struct work_struct *work) { - struct padata_parallel_queue *pqueue; struct parallel_data *pd; local_bh_disable(); - pqueue = container_of(work, struct padata_parallel_queue, reorder_work); - pd = pqueue->pd; + pd = container_of(work, struct parallel_data, reorder_work); padata_reorder(pd); local_bh_enable(); } -static void padata_reorder_timer(struct timer_list *t) -{ - struct parallel_data *pd = from_timer(pd, t, timer); - unsigned int weight; - int target_cpu, cpu; - - cpu = get_cpu(); - - /* We don't lock pd here to not interfere with parallel processing - * padata_reorder() calls on other CPUs. We just need any CPU out of - * the cpumask.pcpu set. It would be nice if it's the right one but - * it doesn't matter if we're off to the next one by using an outdated - * pd->processed value. - */ - weight = cpumask_weight(pd->cpumask.pcpu); - target_cpu = padata_index_to_cpu(pd, pd->processed % weight); - - /* ensure to call the reorder callback on the correct CPU */ - if (cpu != target_cpu) { - struct padata_parallel_queue *pqueue; - struct padata_instance *pinst; - - /* The timer function is serialized wrt itself -- no locking - * needed. - */ - pinst = pd->pinst; - pqueue = per_cpu_ptr(pd->pqueue, target_cpu); - queue_work_on(target_cpu, pinst->wq, &pqueue->reorder_work); - } else { - padata_reorder(pd); - } - - put_cpu(); -} - static void padata_serial_worker(struct work_struct *serial_work) { struct padata_serial_queue *squeue; @@ -367,47 +316,28 @@ static void padata_serial_worker(struct work_struct *serial_work) */ void padata_do_serial(struct padata_priv *padata) { - int cpu; - struct padata_parallel_queue *pqueue; - struct parallel_data *pd; - int reorder_via_wq = 0; - - pd = padata->pd; - - cpu = get_cpu(); - - /* We need to run on the same CPU padata_do_parallel(.., padata, ..) - * was called on -- or, at least, enqueue the padata object into the - * correct per-cpu queue. - */ - if (cpu != padata->cpu) { - reorder_via_wq = 1; - cpu = padata->cpu; - } - - pqueue = per_cpu_ptr(pd->pqueue, cpu); + struct parallel_data *pd = padata->pd; + struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue, + padata->cpu); + struct padata_priv *cur; spin_lock(&pqueue->reorder.lock); + /* Sort in ascending order of sequence number. */ + list_for_each_entry_reverse(cur, &pqueue->reorder.list, list) + if (cur->seq_nr < padata->seq_nr) + break; + list_add(&padata->list, &cur->list); atomic_inc(&pd->reorder_objects); - list_add_tail(&padata->list, &pqueue->reorder.list); spin_unlock(&pqueue->reorder.lock); /* - * Ensure the atomic_inc of reorder_objects above is ordered correctly + * Ensure the addition to the reorder list is ordered correctly * with the trylock of pd->lock in padata_reorder. Pairs with smp_mb * in padata_reorder. */ smp_mb__after_atomic(); - put_cpu(); - - /* If we're running on the wrong CPU, call padata_reorder() via a - * kernel worker. - */ - if (reorder_via_wq) - queue_work_on(cpu, pd->pinst->wq, &pqueue->reorder_work); - else - padata_reorder(pd); + padata_reorder(pd); } EXPORT_SYMBOL(padata_do_serial); @@ -415,17 +345,36 @@ static int padata_setup_cpumasks(struct parallel_data *pd, const struct cpumask *pcpumask, const struct cpumask *cbcpumask) { - if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL)) - return -ENOMEM; + struct workqueue_attrs *attrs; + int err = -ENOMEM; + if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL)) + goto out; cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask); - if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) { - free_cpumask_var(pd->cpumask.pcpu); - return -ENOMEM; - } + if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) + goto free_pcpu_mask; cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask); + + attrs = alloc_workqueue_attrs(); + if (!attrs) + goto free_cbcpu_mask; + + /* Restrict parallel_wq workers to pd->cpumask.pcpu. */ + cpumask_copy(attrs->cpumask, pd->cpumask.pcpu); + err = apply_workqueue_attrs(pd->pinst->parallel_wq, attrs); + free_workqueue_attrs(attrs); + if (err < 0) + goto free_cbcpu_mask; + return 0; + +free_cbcpu_mask: + free_cpumask_var(pd->cpumask.cbcpu); +free_pcpu_mask: + free_cpumask_var(pd->cpumask.pcpu); +out: + return err; } static void __padata_list_init(struct padata_list *pd_list) @@ -451,26 +400,15 @@ static void padata_init_squeues(struct parallel_data *pd) /* Initialize all percpu queues used by parallel workers */ static void padata_init_pqueues(struct parallel_data *pd) { - int cpu_index, cpu; + int cpu; struct padata_parallel_queue *pqueue; - cpu_index = 0; - for_each_possible_cpu(cpu) { + for_each_cpu(cpu, pd->cpumask.pcpu) { pqueue = per_cpu_ptr(pd->pqueue, cpu); - if (!cpumask_test_cpu(cpu, pd->cpumask.pcpu)) { - pqueue->cpu_index = -1; - continue; - } - - pqueue->pd = pd; - pqueue->cpu_index = cpu_index; - cpu_index++; - __padata_list_init(&pqueue->reorder); __padata_list_init(&pqueue->parallel); INIT_WORK(&pqueue->work, padata_parallel_worker); - INIT_WORK(&pqueue->reorder_work, invoke_padata_reorder); atomic_set(&pqueue->num_obj, 0); } } @@ -493,17 +431,19 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst, pd->squeue = alloc_percpu(struct padata_serial_queue); if (!pd->squeue) goto err_free_pqueue; + + pd->pinst = pinst; if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0) goto err_free_squeue; padata_init_pqueues(pd); padata_init_squeues(pd); - timer_setup(&pd->timer, padata_reorder_timer, 0); atomic_set(&pd->seq_nr, -1); atomic_set(&pd->reorder_objects, 0); atomic_set(&pd->refcnt, 0); - pd->pinst = pinst; spin_lock_init(&pd->lock); + pd->cpu = cpumask_first(pd->cpumask.pcpu); + INIT_WORK(&pd->reorder_work, invoke_padata_reorder); return pd; @@ -538,8 +478,6 @@ static void padata_flush_queues(struct parallel_data *pd) flush_work(&pqueue->work); } - del_timer_sync(&pd->timer); - if (atomic_read(&pd->reorder_objects)) padata_reorder(pd); @@ -883,6 +821,8 @@ static void __padata_free(struct padata_instance *pinst) padata_free_pd(pinst->pd); free_cpumask_var(pinst->cpumask.pcpu); free_cpumask_var(pinst->cpumask.cbcpu); + destroy_workqueue(pinst->serial_wq); + destroy_workqueue(pinst->parallel_wq); kfree(pinst); } @@ -1016,13 +956,11 @@ static struct kobj_type padata_attr_type = { * padata_alloc - allocate and initialize a padata instance and specify * cpumasks for serial and parallel workers. * - * @wq: workqueue to use for the allocated padata instance + * @name: used to identify the instance * @pcpumask: cpumask that will be used for padata parallelization * @cbcpumask: cpumask that will be used for padata serialization - * - * Must be called from a cpus_read_lock() protected region */ -static struct padata_instance *padata_alloc(struct workqueue_struct *wq, +static struct padata_instance *padata_alloc(const char *name, const struct cpumask *pcpumask, const struct cpumask *cbcpumask) { @@ -1033,11 +971,23 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq, if (!pinst) goto err; - if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL)) + pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0, + name); + if (!pinst->parallel_wq) goto err_free_inst; + + get_online_cpus(); + + pinst->serial_wq = alloc_workqueue("%s_serial", WQ_MEM_RECLAIM | + WQ_CPU_INTENSIVE, 1, name); + if (!pinst->serial_wq) + goto err_put_cpus; + + if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL)) + goto err_free_serial_wq; if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) { free_cpumask_var(pinst->cpumask.pcpu); - goto err_free_inst; + goto err_free_serial_wq; } if (!padata_validate_cpumask(pinst, pcpumask) || !padata_validate_cpumask(pinst, cbcpumask)) @@ -1049,8 +999,6 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq, rcu_assign_pointer(pinst->pd, pd); - pinst->wq = wq; - cpumask_copy(pinst->cpumask.pcpu, pcpumask); cpumask_copy(pinst->cpumask.cbcpu, cbcpumask); @@ -1063,11 +1011,19 @@ static struct padata_instance *padata_alloc(struct workqueue_struct *wq, #ifdef CONFIG_HOTPLUG_CPU cpuhp_state_add_instance_nocalls_cpuslocked(hp_online, &pinst->node); #endif + + put_online_cpus(); + return pinst; err_free_masks: free_cpumask_var(pinst->cpumask.pcpu); free_cpumask_var(pinst->cpumask.cbcpu); +err_free_serial_wq: + destroy_workqueue(pinst->serial_wq); +err_put_cpus: + put_online_cpus(); + destroy_workqueue(pinst->parallel_wq); err_free_inst: kfree(pinst); err: @@ -1079,14 +1035,11 @@ err: * Use the cpu_possible_mask for serial and * parallel workers. * - * @wq: workqueue to use for the allocated padata instance - * - * Must be called from a cpus_read_lock() protected region + * @name: used to identify the instance */ -struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq) +struct padata_instance *padata_alloc_possible(const char *name) { - lockdep_assert_cpus_held(); - return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask); + return padata_alloc(name, cpu_possible_mask, cpu_possible_mask); } EXPORT_SYMBOL(padata_alloc_possible); |