diff options
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r-- | fs/io-wq.c | 187 |
1 files changed, 108 insertions, 79 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c index 9174007ce107..91b85df0861e 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -33,6 +33,7 @@ enum { enum { IO_WQ_BIT_EXIT = 0, /* wq exiting */ IO_WQ_BIT_CANCEL = 1, /* cancel work on list */ + IO_WQ_BIT_ERROR = 2, /* error on setup */ }; enum { @@ -56,6 +57,7 @@ struct io_worker { struct rcu_head rcu; struct mm_struct *mm; + const struct cred *creds; struct files_struct *restore_files; }; @@ -82,7 +84,7 @@ enum { struct io_wqe { struct { spinlock_t lock; - struct list_head work_list; + struct io_wq_work_list work_list; unsigned long hash_map; unsigned flags; } ____cacheline_aligned_in_smp; @@ -103,13 +105,13 @@ struct io_wqe { struct io_wq { struct io_wqe **wqes; unsigned long state; - unsigned nr_wqes; get_work_fn *get_work; put_work_fn *put_work; struct task_struct *manager; struct user_struct *user; + struct cred *creds; struct mm_struct *mm; refcount_t refs; struct completion done; @@ -135,6 +137,11 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker) { bool dropped_lock = false; + if (worker->creds) { + revert_creds(worker->creds); + worker->creds = NULL; + } + if (current->files != worker->restore_files) { __acquire(&wqe->lock); spin_unlock_irq(&wqe->lock); @@ -229,7 +236,8 @@ static void io_worker_exit(struct io_worker *worker) static inline bool io_wqe_run_queue(struct io_wqe *wqe) __must_hold(wqe->lock) { - if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED)) + if (!wq_list_empty(&wqe->work_list) && + !(wqe->flags & IO_WQE_FLAG_STALLED)) return true; return false; } @@ -327,9 +335,9 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, * If worker is moving from bound to unbound (or vice versa), then * ensure we update the running accounting. */ - worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0; - work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0; - if (worker_bound != work_bound) { + worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0; + work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0; + if (worker_bound != work_bound) { io_wqe_dec_running(wqe, worker); if (work_bound) { worker->flags |= IO_WORKER_F_BOUND; @@ -368,12 +376,15 @@ static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) __must_hold(wqe->lock) { + struct io_wq_work_node *node, *prev; struct io_wq_work *work; - list_for_each_entry(work, &wqe->work_list, list) { + wq_list_for_each(node, prev, &wqe->work_list) { + work = container_of(node, struct io_wq_work, list); + /* not hashed, can run anytime */ if (!(work->flags & IO_WQ_WORK_HASHED)) { - list_del(&work->list); + wq_node_del(&wqe->work_list, node, prev); return work; } @@ -381,7 +392,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash) *hash = work->flags >> IO_WQ_HASH_SHIFT; if (!(wqe->hash_map & BIT_ULL(*hash))) { wqe->hash_map |= BIT_ULL(*hash); - list_del(&work->list); + wq_node_del(&wqe->work_list, node, prev); return work; } } @@ -409,7 +420,7 @@ static void io_worker_handle_work(struct io_worker *worker) work = io_get_next_work(wqe, &hash); if (work) __io_worker_busy(wqe, worker, work); - else if (!list_empty(&wqe->work_list)) + else if (!wq_list_empty(&wqe->work_list)) wqe->flags |= IO_WQE_FLAG_STALLED; spin_unlock_irq(&wqe->lock); @@ -426,6 +437,9 @@ next: worker->cur_work = work; spin_unlock_irq(&worker->lock); + if (work->flags & IO_WQ_WORK_CB) + work->func(&work); + if ((work->flags & IO_WQ_WORK_NEEDS_FILES) && current->files != work->files) { task_lock(current); @@ -438,6 +452,8 @@ next: set_fs(USER_DS); worker->mm = wq->mm; } + if (!worker->creds) + worker->creds = override_creds(wq->creds); if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) work->flags |= IO_WQ_WORK_CANCEL; if (worker->mm) @@ -514,7 +530,7 @@ static int io_wqe_worker(void *data) if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) { spin_lock_irq(&wqe->lock); - if (!list_empty(&wqe->work_list)) + if (!wq_list_empty(&wqe->work_list)) io_worker_handle_work(worker); else spin_unlock_irq(&wqe->lock); @@ -562,14 +578,14 @@ void io_wq_worker_sleeping(struct task_struct *tsk) spin_unlock_irq(&wqe->lock); } -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) +static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) { struct io_wqe_acct *acct =&wqe->acct[index]; struct io_worker *worker; - worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node); + worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); if (!worker) - return; + return false; refcount_set(&worker->ref, 1); worker->nulls_node.pprev = NULL; @@ -581,7 +597,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) "io_wqe_worker-%d/%d", index, wqe->node); if (IS_ERR(worker->task)) { kfree(worker); - return; + return false; } spin_lock_irq(&wqe->lock); @@ -599,6 +615,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) atomic_inc(&wq->user->processes); wake_up_process(worker->task); + return true; } static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) @@ -606,9 +623,6 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) { struct io_wqe_acct *acct = &wqe->acct[index]; - /* always ensure we have one bounded worker */ - if (index == IO_WQ_ACCT_BOUND && !acct->nr_workers) - return true; /* if we have available workers or no work, no need */ if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe)) return false; @@ -621,12 +635,22 @@ static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) static int io_wq_manager(void *data) { struct io_wq *wq = data; + int workers_to_create = num_possible_nodes(); + int node; - while (!kthread_should_stop()) { - int i; + /* create fixed workers */ + refcount_set(&wq->refs, workers_to_create); + for_each_node(node) { + if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND)) + goto err; + workers_to_create--; + } + + complete(&wq->done); - for (i = 0; i < wq->nr_wqes; i++) { - struct io_wqe *wqe = wq->wqes[i]; + while (!kthread_should_stop()) { + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; bool fork_worker[2] = { false, false }; spin_lock_irq(&wqe->lock); @@ -645,6 +669,12 @@ static int io_wq_manager(void *data) } return 0; +err: + set_bit(IO_WQ_BIT_ERROR, &wq->state); + set_bit(IO_WQ_BIT_EXIT, &wq->state); + if (refcount_sub_and_test(workers_to_create, &wq->refs)) + complete(&wq->done); + return 0; } static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, @@ -688,7 +718,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) } spin_lock_irqsave(&wqe->lock, flags); - list_add_tail(&work->list, &wqe->work_list); + wq_list_add_tail(&work->list, &wqe->work_list); wqe->flags &= ~IO_WQE_FLAG_STALLED; spin_unlock_irqrestore(&wqe->lock, flags); @@ -750,7 +780,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe, void io_wq_cancel_all(struct io_wq *wq) { - int i; + int node; set_bit(IO_WQ_BIT_CANCEL, &wq->state); @@ -759,8 +789,8 @@ void io_wq_cancel_all(struct io_wq *wq) * to a worker and the worker putting itself on the busy_list */ rcu_read_lock(); - for (i = 0; i < wq->nr_wqes; i++) { - struct io_wqe *wqe = wq->wqes[i]; + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL); } @@ -803,14 +833,17 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, .cancel = cancel, .caller_data = cancel_data, }; + struct io_wq_work_node *node, *prev; struct io_wq_work *work; unsigned long flags; bool found = false; spin_lock_irqsave(&wqe->lock, flags); - list_for_each_entry(work, &wqe->work_list, list) { + wq_list_for_each(node, prev, &wqe->work_list) { + work = container_of(node, struct io_wq_work, list); + if (cancel(work, cancel_data)) { - list_del(&work->list); + wq_node_del(&wqe->work_list, node, prev); found = true; break; } @@ -833,10 +866,10 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, void *data) { enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; - int i; + int node; - for (i = 0; i < wq->nr_wqes; i++) { - struct io_wqe *wqe = wq->wqes[i]; + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; ret = io_wqe_cancel_cb_work(wqe, cancel, data); if (ret != IO_WQ_CANCEL_NOTFOUND) @@ -868,6 +901,7 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data) static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, struct io_wq_work *cwork) { + struct io_wq_work_node *node, *prev; struct io_wq_work *work; unsigned long flags; bool found = false; @@ -880,9 +914,11 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, * no completion will be posted for it. */ spin_lock_irqsave(&wqe->lock, flags); - list_for_each_entry(work, &wqe->work_list, list) { + wq_list_for_each(node, prev, &wqe->work_list) { + work = container_of(node, struct io_wq_work, list); + if (work == cwork) { - list_del(&work->list); + wq_node_del(&wqe->work_list, node, prev); found = true; break; } @@ -910,10 +946,10 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork) { enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND; - int i; + int node; - for (i = 0; i < wq->nr_wqes; i++) { - struct io_wqe *wqe = wq->wqes[i]; + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; ret = io_wqe_cancel_work(wqe, cwork); if (ret != IO_WQ_CANCEL_NOTFOUND) @@ -944,10 +980,10 @@ static void io_wq_flush_func(struct io_wq_work **workptr) void io_wq_flush(struct io_wq *wq) { struct io_wq_flush_data data; - int i; + int node; - for (i = 0; i < wq->nr_wqes; i++) { - struct io_wqe *wqe = wq->wqes[i]; + for_each_node(node) { + struct io_wqe *wqe = wq->wqes[node]; init_completion(&data.done); INIT_IO_WORK(&data.work, io_wq_flush_func); @@ -957,43 +993,39 @@ void io_wq_flush(struct io_wq *wq) } } -struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm, - struct user_struct *user, get_work_fn *get_work, - put_work_fn *put_work) +struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) { - int ret = -ENOMEM, i, node; + int ret = -ENOMEM, node; struct io_wq *wq; - wq = kcalloc(1, sizeof(*wq), GFP_KERNEL); + wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return ERR_PTR(-ENOMEM); - wq->nr_wqes = num_online_nodes(); - wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL); + wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL); if (!wq->wqes) { kfree(wq); return ERR_PTR(-ENOMEM); } - wq->get_work = get_work; - wq->put_work = put_work; + wq->get_work = data->get_work; + wq->put_work = data->put_work; /* caller must already hold a reference to this */ - wq->user = user; + wq->user = data->user; + wq->creds = data->creds; - i = 0; - refcount_set(&wq->refs, wq->nr_wqes); - for_each_online_node(node) { + for_each_node(node) { struct io_wqe *wqe; - wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node); + wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node); if (!wqe) - break; - wq->wqes[i] = wqe; + goto err; + wq->wqes[node] = wqe; wqe->node = node; wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); - if (user) { + if (wq->user) { wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = task_rlimit(current, RLIMIT_NPROC); } @@ -1001,33 +1033,36 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm, wqe->node = node; wqe->wq = wq; spin_lock_init(&wqe->lock); - INIT_LIST_HEAD(&wqe->work_list); + INIT_WQ_LIST(&wqe->work_list); INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0); INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1); INIT_LIST_HEAD(&wqe->all_list); - - i++; } init_completion(&wq->done); - if (i != wq->nr_wqes) - goto err; - /* caller must have already done mmgrab() on this mm */ - wq->mm = mm; + wq->mm = data->mm; wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager"); if (!IS_ERR(wq->manager)) { wake_up_process(wq->manager); + wait_for_completion(&wq->done); + if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) { + ret = -ENOMEM; + goto err; + } + reinit_completion(&wq->done); return wq; } ret = PTR_ERR(wq->manager); - wq->manager = NULL; -err: complete(&wq->done); - io_wq_destroy(wq); +err: + for_each_node(node) + kfree(wq->wqes[node]); + kfree(wq->wqes); + kfree(wq); return ERR_PTR(ret); } @@ -1039,27 +1074,21 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) void io_wq_destroy(struct io_wq *wq) { - int i; + int node; - if (wq->manager) { - set_bit(IO_WQ_BIT_EXIT, &wq->state); + set_bit(IO_WQ_BIT_EXIT, &wq->state); + if (wq->manager) kthread_stop(wq->manager); - } rcu_read_lock(); - for (i = 0; i < wq->nr_wqes; i++) { - struct io_wqe *wqe = wq->wqes[i]; - - if (!wqe) - continue; - io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL); - } + for_each_node(node) + io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); rcu_read_unlock(); wait_for_completion(&wq->done); - for (i = 0; i < wq->nr_wqes; i++) - kfree(wq->wqes[i]); + for_each_node(node) + kfree(wq->wqes[node]); kfree(wq->wqes); kfree(wq); } |