Diffstat (limited to 'io_uring')
-rw-r--r--  io_uring/filetable.c     2
-rw-r--r--  io_uring/io_uring.c    379
-rw-r--r--  io_uring/io_uring.h     84
-rw-r--r--  io_uring/kbuf.c         16
-rw-r--r--  io_uring/msg_ring.c    168
-rw-r--r--  io_uring/msg_ring.h      1
-rw-r--r--  io_uring/net.c         110
-rw-r--r--  io_uring/notif.c        57
-rw-r--r--  io_uring/notif.h        15
-rw-r--r--  io_uring/opdef.c         8
-rw-r--r--  io_uring/opdef.h         2
-rw-r--r--  io_uring/poll.c        181
-rw-r--r--  io_uring/rsrc.c         71
-rw-r--r--  io_uring/rsrc.h          1
-rw-r--r--  io_uring/rw.c           16
-rw-r--r--  io_uring/timeout.c      10
-rw-r--r--  io_uring/uring_cmd.c     2
-rw-r--r--  io_uring/xattr.c         8
18 files changed, 737 insertions(+), 394 deletions(-)
diff --git a/io_uring/filetable.c b/io_uring/filetable.c
index 7b473259f3f4..68dfc6936aa7 100644
--- a/io_uring/filetable.c
+++ b/io_uring/filetable.c
@@ -101,8 +101,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file,
err:
if (needs_switch)
io_rsrc_node_switch(ctx, ctx->file_data);
- if (ret)
- fput(file);
return ret;
}
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 6cc16e39b27f..b521186efa5c 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -149,6 +149,7 @@ static void io_clean_op(struct io_kiocb *req);
static void io_queue_sqe(struct io_kiocb *req);
static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
+static __cold void io_fallback_tw(struct io_uring_task *tctx);
static struct kmem_cache *req_cachep;
@@ -167,7 +168,8 @@ EXPORT_SYMBOL(io_uring_get_socket);
static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
{
- if (!wq_list_empty(&ctx->submit_state.compl_reqs))
+ if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
+ ctx->submit_state.cqes_count)
__io_submit_flush_completions(ctx);
}
@@ -176,6 +178,11 @@ static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
}
+static inline unsigned int __io_cqring_events_user(struct io_ring_ctx *ctx)
+{
+ return READ_ONCE(ctx->rings->cq.tail) - READ_ONCE(ctx->rings->cq.head);
+}
+
static bool io_match_linked(struct io_kiocb *head)
{
struct io_kiocb *req;
@@ -320,6 +327,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
spin_lock_init(&ctx->rsrc_ref_lock);
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
+ init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw);
init_llist_head(&ctx->rsrc_put_llist);
init_llist_head(&ctx->work_llist);
INIT_LIST_HEAD(&ctx->tctx_list);
@@ -490,7 +498,7 @@ static void io_eventfd_ops(struct rcu_head *rcu)
int ops = atomic_xchg(&ev_fd->ops, 0);
if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
- eventfd_signal(ev_fd->cq_ev_fd, 1);
+ eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
/* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
* ordering in a race but if references are 0 we know we have to free
@@ -526,7 +534,7 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx)
goto out;
if (likely(eventfd_signal_allowed())) {
- eventfd_signal(ev_fd->cq_ev_fd, 1);
+ eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
} else {
atomic_inc(&ev_fd->refs);
if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
@@ -576,33 +584,63 @@ void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
io_eventfd_flush_signal(ctx);
}
-static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
+static inline void __io_cq_lock(struct io_ring_ctx *ctx)
+ __acquires(ctx->completion_lock)
+{
+ if (!ctx->task_complete)
+ spin_lock(&ctx->completion_lock);
+}
+
+static inline void __io_cq_unlock(struct io_ring_ctx *ctx)
+{
+ if (!ctx->task_complete)
+ spin_unlock(&ctx->completion_lock);
+}
+
+/* keep it inlined for io_submit_flush_completions() */
+static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
+ __releases(ctx->completion_lock)
{
+ io_commit_cqring(ctx);
+ __io_cq_unlock(ctx);
io_commit_cqring_flush(ctx);
io_cqring_wake(ctx);
}
-static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
+void io_cq_unlock_post(struct io_ring_ctx *ctx)
__releases(ctx->completion_lock)
{
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
- io_cqring_ev_posted(ctx);
+ io_commit_cqring_flush(ctx);
+ io_cqring_wake(ctx);
}
-void io_cq_unlock_post(struct io_ring_ctx *ctx)
+/* Returns true if there are no backlogged entries after the flush */
+static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
- __io_cq_unlock_post(ctx);
+ struct io_overflow_cqe *ocqe;
+ LIST_HEAD(list);
+
+ io_cq_lock(ctx);
+ list_splice_init(&ctx->cq_overflow_list, &list);
+ clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
+ io_cq_unlock(ctx);
+
+ while (!list_empty(&list)) {
+ ocqe = list_first_entry(&list, struct io_overflow_cqe, list);
+ list_del(&ocqe->list);
+ kfree(ocqe);
+ }
}
/* Returns true if there are no backlogged entries after the flush */
-static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
+static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
{
- bool all_flushed;
size_t cqe_size = sizeof(struct io_uring_cqe);
- if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
- return false;
+ if (__io_cqring_events(ctx) == ctx->cq_entries)
+ return;
if (ctx->flags & IORING_SETUP_CQE32)
cqe_size <<= 1;
@@ -612,43 +650,32 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true);
struct io_overflow_cqe *ocqe;
- if (!cqe && !force)
+ if (!cqe)
break;
ocqe = list_first_entry(&ctx->cq_overflow_list,
struct io_overflow_cqe, list);
- if (cqe)
- memcpy(cqe, &ocqe->cqe, cqe_size);
- else
- io_account_cq_overflow(ctx);
-
+ memcpy(cqe, &ocqe->cqe, cqe_size);
list_del(&ocqe->list);
kfree(ocqe);
}
- all_flushed = list_empty(&ctx->cq_overflow_list);
- if (all_flushed) {
+ if (list_empty(&ctx->cq_overflow_list)) {
clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
}
-
io_cq_unlock_post(ctx);
- return all_flushed;
}
-static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
+static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
{
- bool ret = true;
-
if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
/* iopoll syncs against uring_lock, not completion_lock */
if (ctx->flags & IORING_SETUP_IOPOLL)
mutex_lock(&ctx->uring_lock);
- ret = __io_cqring_overflow_flush(ctx, false);
+ __io_cqring_overflow_flush(ctx);
if (ctx->flags & IORING_SETUP_IOPOLL)
mutex_unlock(&ctx->uring_lock);
}
-
- return ret;
}
void __io_put_task(struct task_struct *task, int nr)
@@ -773,11 +800,14 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
return &rings->cqes[off];
}
-bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
- bool allow_overflow)
+static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+ u32 cflags)
{
struct io_uring_cqe *cqe;
+ if (!ctx->task_complete)
+ lockdep_assert_held(&ctx->completion_lock);
+
ctx->cq_extra++;
/*
@@ -799,34 +829,100 @@ bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags
}
return true;
}
-
- if (allow_overflow)
- return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
-
return false;
}
-bool io_post_aux_cqe(struct io_ring_ctx *ctx,
- u64 user_data, s32 res, u32 cflags,
- bool allow_overflow)
+static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
+ __must_hold(&ctx->uring_lock)
+{
+ struct io_submit_state *state = &ctx->submit_state;
+ unsigned int i;
+
+ lockdep_assert_held(&ctx->uring_lock);
+ for (i = 0; i < state->cqes_count; i++) {
+ struct io_uring_cqe *cqe = &state->cqes[i];
+
+ if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
+ if (ctx->task_complete) {
+ spin_lock(&ctx->completion_lock);
+ io_cqring_event_overflow(ctx, cqe->user_data,
+ cqe->res, cqe->flags, 0, 0);
+ spin_unlock(&ctx->completion_lock);
+ } else {
+ io_cqring_event_overflow(ctx, cqe->user_data,
+ cqe->res, cqe->flags, 0, 0);
+ }
+ }
+ }
+ state->cqes_count = 0;
+}
+
+static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
+ bool allow_overflow)
{
bool filled;
io_cq_lock(ctx);
- filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow);
+ filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
+ if (!filled && allow_overflow)
+ filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
+
io_cq_unlock_post(ctx);
return filled;
}
-static void __io_req_complete_put(struct io_kiocb *req)
+bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
+{
+ return __io_post_aux_cqe(ctx, user_data, res, cflags, true);
+}
+
+bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags,
+ bool allow_overflow)
+{
+ struct io_uring_cqe *cqe;
+ unsigned int length;
+
+ if (!defer)
+ return __io_post_aux_cqe(ctx, user_data, res, cflags, allow_overflow);
+
+ length = ARRAY_SIZE(ctx->submit_state.cqes);
+
+ lockdep_assert_held(&ctx->uring_lock);
+
+ if (ctx->submit_state.cqes_count == length) {
+ __io_cq_lock(ctx);
+ __io_flush_post_cqes(ctx);
+ /* no need to flush - flush is deferred */
+ __io_cq_unlock_post(ctx);
+ }
+
+ /* For deferred completions this is not as strict as it is otherwise,
+ * however its main job is to prevent unbounded posted completions,
+ * and in that it works just as well.
+ */
+ if (!allow_overflow && test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
+ return false;
+
+ cqe = &ctx->submit_state.cqes[ctx->submit_state.cqes_count++];
+ cqe->user_data = user_data;
+ cqe->res = res;
+ cqe->flags = cflags;
+ return true;
+}
+
+static void __io_req_complete_post(struct io_kiocb *req)
{
+ struct io_ring_ctx *ctx = req->ctx;
+
+ io_cq_lock(ctx);
+ if (!(req->flags & REQ_F_CQE_SKIP))
+ __io_fill_cqe_req(ctx, req);
+
/*
* If we're the last reference to this request, add to our locked
* free_list cache.
*/
if (req_ref_put_and_test(req)) {
- struct io_ring_ctx *ctx = req->ctx;
-
if (req->flags & IO_REQ_LINK_FLAGS) {
if (req->flags & IO_DISARM_MASK)
io_disarm_next(req);
@@ -847,38 +943,38 @@ static void __io_req_complete_put(struct io_kiocb *req)
wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
ctx->locked_free_nr++;
}
-}
-
-void __io_req_complete_post(struct io_kiocb *req)
-{
- if (!(req->flags & REQ_F_CQE_SKIP))
- __io_fill_cqe_req(req->ctx, req);
- __io_req_complete_put(req);
-}
-
-void io_req_complete_post(struct io_kiocb *req)
-{
- struct io_ring_ctx *ctx = req->ctx;
-
- io_cq_lock(ctx);
- __io_req_complete_post(req);
io_cq_unlock_post(ctx);
}
-inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
+void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
- io_req_complete_post(req);
+ if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) {
+ req->io_task_work.func = io_req_task_complete;
+ io_req_task_work_add(req);
+ } else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
+ !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
+ __io_req_complete_post(req);
+ } else {
+ struct io_ring_ctx *ctx = req->ctx;
+
+ mutex_lock(&ctx->uring_lock);
+ __io_req_complete_post(req);
+ mutex_unlock(&ctx->uring_lock);
+ }
}
-void io_req_complete_failed(struct io_kiocb *req, s32 res)
+void io_req_defer_failed(struct io_kiocb *req, s32 res)
+ __must_hold(&ctx->uring_lock)
{
const struct io_op_def *def = &io_op_defs[req->opcode];
+ lockdep_assert_held(&req->ctx->uring_lock);
+
req_set_fail(req);
io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
if (def->fail)
def->fail(req);
- io_req_complete_post(req);
+ io_req_complete_defer(req);
}
/*
@@ -1079,10 +1175,17 @@ void tctx_task_work(struct callback_head *cb)
struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
task_work);
struct llist_node fake = {};
- struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake);
+ struct llist_node *node;
unsigned int loops = 1;
- unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL);
+ unsigned int count;
+
+ if (unlikely(current->flags & PF_EXITING)) {
+ io_fallback_tw(tctx);
+ return;
+ }
+ node = io_llist_xchg(&tctx->task_list, &fake);
+ count = handle_tw_list(node, &ctx, &uring_locked, NULL);
node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
while (node != &fake) {
loops++;
@@ -1100,6 +1203,20 @@ void tctx_task_work(struct callback_head *cb)
trace_io_uring_task_work_run(tctx, count, loops);
}
+static __cold void io_fallback_tw(struct io_uring_task *tctx)
+{
+ struct llist_node *node = llist_del_all(&tctx->task_list);
+ struct io_kiocb *req;
+
+ while (node) {
+ req = container_of(node, struct io_kiocb, io_task_work.node);
+ node = node->next;
+ if (llist_add(&req->io_task_work.node,
+ &req->ctx->fallback_llist))
+ schedule_delayed_work(&req->ctx->fallback_work, 1);
+ }
+}
+
static void io_req_local_work_add(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
@@ -1122,11 +1239,10 @@ static void io_req_local_work_add(struct io_kiocb *req)
__io_cqring_wake(ctx);
}
-static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
+void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
{
struct io_uring_task *tctx = req->task->io_uring;
struct io_ring_ctx *ctx = req->ctx;
- struct llist_node *node;
if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
io_req_local_work_add(req);
@@ -1143,20 +1259,7 @@ static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local
if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
return;
- node = llist_del_all(&tctx->task_list);
-
- while (node) {
- req = container_of(node, struct io_kiocb, io_task_work.node);
- node = node->next;
- if (llist_add(&req->io_task_work.node,
- &req->ctx->fallback_llist))
- schedule_delayed_work(&req->ctx->fallback_work, 1);
- }
-}
-
-void io_req_task_work_add(struct io_kiocb *req)
-{
- __io_req_task_work_add(req, true);
+ io_fallback_tw(tctx);
}
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
@@ -1173,7 +1276,7 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
}
}
-int __io_run_local_work(struct io_ring_ctx *ctx, bool locked)
+int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
{
struct llist_node *node;
struct llist_node fake;
@@ -1192,7 +1295,7 @@ again:
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
prefetch(container_of(next, struct io_kiocb, io_task_work.node));
- req->io_task_work.func(req, &locked);
+ req->io_task_work.func(req, locked);
ret++;
node = next;
}
@@ -1208,7 +1311,7 @@ again:
goto again;
}
- if (locked)
+ if (*locked)
io_submit_flush_completions(ctx);
trace_io_uring_local_work_run(ctx, ret, loops);
return ret;
@@ -1225,30 +1328,17 @@ int io_run_local_work(struct io_ring_ctx *ctx)
__set_current_state(TASK_RUNNING);
locked = mutex_trylock(&ctx->uring_lock);
- ret = __io_run_local_work(ctx, locked);
+ ret = __io_run_local_work(ctx, &locked);
if (locked)
mutex_unlock(&ctx->uring_lock);
return ret;
}
-static void io_req_tw_post(struct io_kiocb *req, bool *locked)
-{
- io_req_complete_post(req);
-}
-
-void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags)
-{
- io_req_set_res(req, res, cflags);
- req->io_task_work.func = io_req_tw_post;
- io_req_task_work_add(req);
-}
-
static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
{
- /* not needed for normal modes, but SQPOLL depends on it */
io_tw_lock(req->ctx, locked);
- io_req_complete_failed(req, req->cqe.res);
+ io_req_defer_failed(req, req->cqe.res);
}
void io_req_task_submit(struct io_kiocb *req, bool *locked)
@@ -1258,7 +1348,7 @@ void io_req_task_submit(struct io_kiocb *req, bool *locked)
if (likely(!(req->task->flags & PF_EXITING)))
io_queue_sqe(req);
else
- io_req_complete_failed(req, -EFAULT);
+ io_req_defer_failed(req, -EFAULT);
}
void io_req_task_queue_fail(struct io_kiocb *req, int ret)
@@ -1338,18 +1428,31 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
struct io_wq_work_node *node, *prev;
struct io_submit_state *state = &ctx->submit_state;
- io_cq_lock(ctx);
+ __io_cq_lock(ctx);
+ /* must come first to preserve CQE ordering in failure cases */
+ if (state->cqes_count)
+ __io_flush_post_cqes(ctx);
wq_list_for_each(node, prev, &state->compl_reqs) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
comp_list);
- if (!(req->flags & REQ_F_CQE_SKIP))
- __io_fill_cqe_req(ctx, req);
+ if (!(req->flags & REQ_F_CQE_SKIP) &&
+ unlikely(!__io_fill_cqe_req(ctx, req))) {
+ if (ctx->task_complete) {
+ spin_lock(&ctx->completion_lock);
+ io_req_cqe_overflow(req);
+ spin_unlock(&ctx->completion_lock);
+ } else {
+ io_req_cqe_overflow(req);
+ }
+ }
}
__io_cq_unlock_post(ctx);
- io_free_batch_list(ctx, state->compl_reqs.first);
- INIT_WQ_LIST(&state->compl_reqs);
+ if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
+ io_free_batch_list(ctx, state->compl_reqs.first);
+ INIT_WQ_LIST(&state->compl_reqs);
+ }
}
/*
@@ -1415,7 +1518,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
check_cq = READ_ONCE(ctx->check_cq);
if (unlikely(check_cq)) {
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
- __io_cqring_overflow_flush(ctx, false);
+ __io_cqring_overflow_flush(ctx);
/*
* Similarly do not spin if we have not informed the user of any
* dropped CQE.
@@ -1446,8 +1549,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
io_task_work_pending(ctx)) {
u32 tail = ctx->cached_cq_tail;
- if (!llist_empty(&ctx->work_llist))
- __io_run_local_work(ctx, true);
+ (void) io_run_local_work_locked(ctx);
if (task_work_pending(current) ||
wq_list_empty(&ctx->iopoll_list)) {
@@ -1472,16 +1574,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
void io_req_task_complete(struct io_kiocb *req, bool *locked)
{
- if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
- unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
-
- req->cqe.flags |= io_put_kbuf(req, issue_flags);
- }
-
if (*locked)
io_req_complete_defer(req);
else
- io_req_complete_post(req);
+ io_req_complete_post(req, IO_URING_F_UNLOCKED);
}
/*
@@ -1631,6 +1727,7 @@ static u32 io_get_sequence(struct io_kiocb *req)
}
static __cold void io_drain_req(struct io_kiocb *req)
+ __must_hold(&ctx->uring_lock)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_defer_entry *de;
@@ -1651,7 +1748,7 @@ queue:
ret = io_req_prep_async(req);
if (ret) {
fail:
- io_req_complete_failed(req, ret);
+ io_req_defer_failed(req, ret);
return;
}
io_prep_async_link(req);
@@ -1748,12 +1845,12 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
if (issue_flags & IO_URING_F_COMPLETE_DEFER)
io_req_complete_defer(req);
else
- io_req_complete_post(req);
+ io_req_complete_post(req, issue_flags);
} else if (ret != IOU_ISSUE_SKIP_COMPLETE)
return ret;
/* If the op doesn't have a file, we're not polling for it */
- if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file)
+ if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
io_iopoll_req_issued(req, issue_flags);
return 0;
@@ -1762,9 +1859,8 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
int io_poll_issue(struct io_kiocb *req, bool *locked)
{
io_tw_lock(req->ctx, locked);
- if (unlikely(req->task->flags & PF_EXITING))
- return -EFAULT;
- return io_issue_sqe(req, IO_URING_F_NONBLOCK);
+ return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
+ IO_URING_F_COMPLETE_DEFER);
}
struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
@@ -1779,11 +1875,11 @@ void io_wq_submit_work(struct io_wq_work *work)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
const struct io_op_def *def = &io_op_defs[req->opcode];
- unsigned int issue_flags = IO_URING_F_UNLOCKED;
+ unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
bool needs_poll = false;
int ret = 0, err = -ECANCELED;
- /* one will be dropped by ->io_free_work() after returning to io-wq */
+ /* one will be dropped by ->io_wq_free_work() after returning to io-wq */
if (!(req->flags & REQ_F_REFCOUNT))
__io_req_set_refcount(req, 2);
else
@@ -1881,7 +1977,7 @@ static void io_queue_async(struct io_kiocb *req, int ret)
struct io_kiocb *linked_timeout;
if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
- io_req_complete_failed(req, ret);
+ io_req_defer_failed(req, ret);
return;
}
@@ -1931,14 +2027,14 @@ static void io_queue_sqe_fallback(struct io_kiocb *req)
*/
req->flags &= ~REQ_F_HARDLINK;
req->flags |= REQ_F_LINK;
- io_req_complete_failed(req, req->cqe.res);
+ io_req_defer_failed(req, req->cqe.res);
} else if (unlikely(req->ctx->drain_active)) {
io_drain_req(req);
} else {
int ret = io_req_prep_async(req);
if (unlikely(ret))
- io_req_complete_failed(req, ret);
+ io_req_defer_failed(req, ret);
else
io_queue_iowq(req, NULL);
}
@@ -2316,7 +2412,7 @@ static inline bool io_has_work(struct io_ring_ctx *ctx)
static inline bool io_should_wake(struct io_wait_queue *iowq)
{
struct io_ring_ctx *ctx = iowq->ctx;
- int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
+ int dist = READ_ONCE(ctx->rings->cq.tail) - (int) iowq->cq_tail;
/*
* Wake up if we have enough events, or if a timeout occurred since we
@@ -2400,7 +2496,8 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
return ret;
io_cqring_overflow_flush(ctx);
- if (io_cqring_events(ctx) >= min_events)
+ /* if user messes with these they will just get an early return */
+ if (__io_cqring_events_user(ctx) >= min_events)
return 0;
} while (ret > 0);
@@ -2434,11 +2531,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
trace_io_uring_cqring_wait(ctx, min_events);
do {
- /* if we can't even flush overflow, don't wait for more */
- if (!io_cqring_overflow_flush(ctx)) {
- ret = -EBUSY;
- break;
- }
+ io_cqring_overflow_flush(ctx);
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
TASK_INTERRUPTIBLE);
ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
@@ -2589,8 +2682,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
__io_sqe_buffers_unregister(ctx);
if (ctx->file_data)
__io_sqe_files_unregister(ctx);
- if (ctx->rings)
- __io_cqring_overflow_flush(ctx, true);
+ io_cqring_overflow_kill(ctx);
io_eventfd_unregister(ctx);
io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
@@ -2665,7 +2757,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
* lock(&ep->mtx);
*
* Users may get EPOLLIN meanwhile seeing nothing in cqring, this
- * pushs them to do the flush.
+ * pushes them to do the flush.
*/
if (io_cqring_events(ctx) || io_has_work(ctx))
@@ -2702,8 +2794,10 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb)
/*
* When @in_idle, we're in cancellation and it's racy to remove the
* node. It'll be removed by the end of cancellation, just ignore it.
+ * tctx can be NULL if the queueing of this task_work raced with
+ * work cancelation off the exec path.
*/
- if (!atomic_read(&tctx->in_idle))
+ if (tctx && !atomic_read(&tctx->in_idle))
io_uring_del_tctx_node((unsigned long)work->ctx);
complete(&work->completion);
}
@@ -2731,6 +2825,12 @@ static __cold void io_ring_exit_work(struct work_struct *work)
* as nobody else will be looking for them.
*/
do {
+ if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
+ mutex_lock(&ctx->uring_lock);
+ io_cqring_overflow_kill(ctx);
+ mutex_unlock(&ctx->uring_lock);
+ }
+
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
io_move_task_work_from_local(ctx);
@@ -2796,8 +2896,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
mutex_lock(&ctx->uring_lock);
percpu_ref_kill(&ctx->refs);
- if (ctx->rings)
- __io_cqring_overflow_flush(ctx, true);
xa_for_each(&ctx->personalities, index, creds)
io_unregister_personality(ctx, index);
if (ctx->rings)
@@ -2864,7 +2962,7 @@ static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
while (!list_empty(&list)) {
de = list_first_entry(&list, struct io_defer_entry, list);
list_del_init(&de->list);
- io_req_complete_failed(de->req, -ECANCELED);
+ io_req_task_queue_fail(de->req, -ECANCELED);
kfree(de);
}
return true;
@@ -3439,6 +3537,11 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
if (!ctx)
return -ENOMEM;
+ if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+ !(ctx->flags & IORING_SETUP_IOPOLL) &&
+ !(ctx->flags & IORING_SETUP_SQPOLL))
+ ctx->task_complete = true;
+
/*
* When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
* space applications don't need to do io completion events
@@ -4057,8 +4160,6 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
ctx = f.file->private_data;
- io_run_task_work_ctx(ctx);
-
mutex_lock(&ctx->uring_lock);
ret = __io_uring_register(ctx, opcode, arg, nr_args);
mutex_unlock(&ctx->uring_lock);
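A minimal userspace model of the deferred auxiliary-CQE batching introduced above (io_aux_cqe() filling submit_state.cqes and __io_flush_post_cqes() draining it under the CQ lock). This is only a sketch of the batching idea; struct ring, post_locked() and the other names are made up and nothing here is io_uring API.

#include <stdio.h>

#define BATCH 4                 /* stands in for ARRAY_SIZE(submit_state.cqes) */

struct cqe { unsigned long long user_data; int res; unsigned flags; };

struct ring {
        struct cqe cache[BATCH]; /* deferred completions, private to the submitter */
        int cached;
        int posted;              /* CQEs that reached the "real" CQ */
};

/* Stand-in for writing one CQE while holding the completion lock. */
static void post_locked(struct ring *r, const struct cqe *c)
{
        r->posted++;
        printf("posted user_data=%llu res=%d\n", c->user_data, c->res);
}

/* Drain the private cache; the kernel does this with the CQ lock held. */
static void flush_cache(struct ring *r)
{
        for (int i = 0; i < r->cached; i++)
                post_locked(r, &r->cache[i]);
        r->cached = 0;
}

/* Deferred aux CQE: lock-free append, flush only when the array fills. */
static void aux_cqe_defer(struct ring *r, unsigned long long ud, int res)
{
        if (r->cached == BATCH)
                flush_cache(r);
        r->cache[r->cached++] = (struct cqe){ .user_data = ud, .res = res };
}

int main(void)
{
        struct ring r = { .cached = 0 };

        for (unsigned long long i = 0; i < 10; i++)
                aux_cqe_defer(&r, i, 0);
        flush_cache(&r);        /* mirrors the flush in __io_submit_flush_completions() */
        printf("total posted: %d\n", r.posted);
        return 0;
}

As the "must come first to preserve CQE ordering in failure cases" comment in __io_submit_flush_completions() notes, the cached entries are always flushed before the per-request CQEs.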
diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h
index ef77d2aa3172..1b2f0b2cc888 100644
--- a/io_uring/io_uring.h
+++ b/io_uring/io_uring.h
@@ -4,6 +4,7 @@
#include <linux/errno.h>
#include <linux/lockdep.h>
#include <linux/io_uring_types.h>
+#include <uapi/linux/eventpoll.h>
#include "io-wq.h"
#include "slist.h"
#include "filetable.h"
@@ -17,8 +18,8 @@ enum {
IOU_ISSUE_SKIP_COMPLETE = -EIOCBQUEUED,
/*
- * Intended only when both REQ_F_POLLED and REQ_F_APOLL_MULTISHOT
- * are set to indicate to the poll runner that multishot should be
+ * Intended only when both IO_URING_F_MULTISHOT is passed
+ * to indicate to the poll runner that multishot should be
* removed and the result is set on req->cqe.res.
*/
IOU_STOP_MULTISHOT = -ECANCELED,
@@ -27,16 +28,13 @@ enum {
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow);
bool io_req_cqe_overflow(struct io_kiocb *req);
int io_run_task_work_sig(struct io_ring_ctx *ctx);
-int __io_run_local_work(struct io_ring_ctx *ctx, bool locked);
+int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked);
int io_run_local_work(struct io_ring_ctx *ctx);
-void io_req_complete_failed(struct io_kiocb *req, s32 res);
-void __io_req_complete(struct io_kiocb *req, unsigned issue_flags);
-void io_req_complete_post(struct io_kiocb *req);
-void __io_req_complete_post(struct io_kiocb *req);
-bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
- bool allow_overflow);
-bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
- bool allow_overflow);
+void io_req_defer_failed(struct io_kiocb *req, s32 res);
+void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags);
+bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
+bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags,
+ bool allow_overflow);
void __io_commit_cqring_flush(struct io_ring_ctx *ctx);
struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages);
@@ -50,10 +48,9 @@ static inline bool io_req_ffs_set(struct io_kiocb *req)
return req->flags & REQ_F_FIXED_FILE;
}
+void __io_req_task_work_add(struct io_kiocb *req, bool allow_local);
bool io_is_uring_fops(struct file *file);
bool io_alloc_async_data(struct io_kiocb *req);
-void io_req_task_work_add(struct io_kiocb *req);
-void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags);
void io_req_task_queue(struct io_kiocb *req);
void io_queue_iowq(struct io_kiocb *req, bool *dont_use);
void io_req_task_complete(struct io_kiocb *req, bool *locked);
@@ -82,6 +79,11 @@ bool __io_alloc_req_refill(struct io_ring_ctx *ctx);
bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
bool cancel_all);
+static inline void io_req_task_work_add(struct io_kiocb *req)
+{
+ __io_req_task_work_add(req, true);
+}
+
#define io_for_each_link(pos, head) \
for (pos = (head); pos; pos = pos->link)
@@ -91,6 +93,11 @@ static inline void io_cq_lock(struct io_ring_ctx *ctx)
spin_lock(&ctx->completion_lock);
}
+static inline void io_cq_unlock(struct io_ring_ctx *ctx)
+{
+ spin_unlock(&ctx->completion_lock);
+}
+
void io_cq_unlock_post(struct io_ring_ctx *ctx);
static inline struct io_uring_cqe *io_get_cqe_overflow(struct io_ring_ctx *ctx,
@@ -126,7 +133,7 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
*/
cqe = io_get_cqe(ctx);
if (unlikely(!cqe))
- return io_req_cqe_overflow(req);
+ return false;
trace_io_uring_complete(req->ctx, req, req->cqe.user_data,
req->cqe.res, req->cqe.flags,
@@ -149,6 +156,14 @@ static inline bool __io_fill_cqe_req(struct io_ring_ctx *ctx,
return true;
}
+static inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
+ struct io_kiocb *req)
+{
+ if (likely(__io_fill_cqe_req(ctx, req)))
+ return true;
+ return io_req_cqe_overflow(req);
+}
+
static inline void req_set_fail(struct io_kiocb *req)
{
req->flags |= REQ_F_FAIL;
@@ -207,12 +222,18 @@ static inline void io_commit_cqring(struct io_ring_ctx *ctx)
static inline void __io_cqring_wake(struct io_ring_ctx *ctx)
{
/*
- * wake_up_all() may seem excessive, but io_wake_function() and
- * io_should_wake() handle the termination of the loop and only
- * wake as many waiters as we need to.
+ * Trigger waitqueue handler on all waiters on our waitqueue. This
+ * won't necessarily wake up all the tasks, io_should_wake() will make
+ * that decision.
+ *
+ * Pass in EPOLLIN|EPOLL_URING_WAKE as the poll wakeup key. The latter
+ * set in the mask so that if we recurse back into our own poll
+ * waitqueue handlers, we know we have a dependency between eventfd or
+ * epoll and should terminate multishot poll at that point.
*/
if (waitqueue_active(&ctx->cq_wait))
- wake_up_all(&ctx->cq_wait);
+ __wake_up(&ctx->cq_wait, TASK_NORMAL, 0,
+ poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
}
static inline void io_cqring_wake(struct io_ring_ctx *ctx)
@@ -238,9 +259,14 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
static inline int io_run_task_work(void)
{
+ /*
+ * Always check-and-clear the task_work notification signal. With how
+ * signaling works for task_work, we can find it set with nothing to
+ * run. We need to clear it for that case, like get_signal() does.
+ */
+ if (test_thread_flag(TIF_NOTIFY_SIGNAL))
+ clear_notify_signal();
if (task_work_pending(current)) {
- if (test_thread_flag(TIF_NOTIFY_SIGNAL))
- clear_notify_signal();
__set_current_state(TASK_RUNNING);
task_work_run();
return 1;
@@ -277,9 +303,18 @@ static inline int io_run_task_work_ctx(struct io_ring_ctx *ctx)
static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
{
+ bool locked;
+ int ret;
+
if (llist_empty(&ctx->work_llist))
return 0;
- return __io_run_local_work(ctx, true);
+
+ locked = true;
+ ret = __io_run_local_work(ctx, &locked);
+ /* shouldn't happen! */
+ if (WARN_ON_ONCE(!locked))
+ mutex_lock(&ctx->uring_lock);
+ return ret;
}
static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
@@ -355,4 +390,11 @@ static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx)
ctx->submitter_task == current);
}
+static inline void io_req_queue_tw_complete(struct io_kiocb *req, s32 res)
+{
+ io_req_set_res(req, res, 0);
+ req->io_task_work.func = io_req_task_complete;
+ io_req_task_work_add(req);
+}
+
#endif
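A small model of the fill-or-overflow split added above: __io_fill_cqe_req() now just reports a full CQ ring, and the new io_fill_cqe_req() wrapper falls back to io_req_cqe_overflow(). The sketch below only illustrates that try-fast-path-then-spill shape; cq_try_push(), cq_overflow() and the spill list are hypothetical, not the kernel's structures.

#include <stdio.h>
#include <stdlib.h>

#define CQ_ENTRIES 4

struct entry { int value; struct entry *next; };

struct cq {
        int ring[CQ_ENTRIES];
        unsigned head, tail;    /* tail - head == number of queued entries */
        struct entry *overflow; /* unbounded spill list, like cq_overflow_list */
};

/* Fast path: only succeeds while the fixed-size ring has room. */
static int cq_try_push(struct cq *cq, int value)
{
        if (cq->tail - cq->head == CQ_ENTRIES)
                return 0;
        cq->ring[cq->tail++ % CQ_ENTRIES] = value;
        return 1;
}

/* Slow path chosen by the caller, the role io_req_cqe_overflow() plays. */
static void cq_overflow(struct cq *cq, int value)
{
        struct entry *e = malloc(sizeof(*e));

        e->value = value;
        e->next = cq->overflow;
        cq->overflow = e;       /* entries intentionally leaked in this short demo */
}

/* Combined helper, the shape of the new io_fill_cqe_req(). */
static void cq_push(struct cq *cq, int value)
{
        if (!cq_try_push(cq, value))
                cq_overflow(cq, value);
}

int main(void)
{
        struct cq cq = { .head = 0 };
        int spilled = 0;

        for (int i = 0; i < 7; i++)
                cq_push(&cq, i);
        for (struct entry *e = cq.overflow; e; e = e->next)
                spilled++;
        printf("in ring: %u, spilled: %d\n", cq.tail - cq.head, spilled);
        return 0;
}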
diff --git a/io_uring/kbuf.c b/io_uring/kbuf.c
index 25cd724ade18..4a6401080c1f 100644
--- a/io_uring/kbuf.c
+++ b/io_uring/kbuf.c
@@ -306,14 +306,11 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
if (!bl->buf_nr_pages)
ret = __io_remove_buffers(ctx, bl, p->nbufs);
}
+ io_ring_submit_unlock(ctx, issue_flags);
if (ret < 0)
req_set_fail(req);
-
- /* complete before unlock, IOPOLL may need the lock */
io_req_set_res(req, ret, 0);
- __io_req_complete(req, issue_flags);
- io_ring_submit_unlock(ctx, issue_flags);
- return IOU_ISSUE_SKIP_COMPLETE;
+ return IOU_OK;
}
int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
@@ -346,6 +343,8 @@ int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe
tmp = READ_ONCE(sqe->off);
if (tmp > USHRT_MAX)
return -E2BIG;
+ if (tmp + p->nbufs >= USHRT_MAX)
+ return -EINVAL;
p->bid = tmp;
return 0;
}
@@ -456,13 +455,12 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
ret = io_add_buffers(ctx, p, bl);
err:
+ io_ring_submit_unlock(ctx, issue_flags);
+
if (ret < 0)
req_set_fail(req);
- /* complete before unlock, IOPOLL may need the lock */
io_req_set_res(req, ret, 0);
- __io_req_complete(req, issue_flags);
- io_ring_submit_unlock(ctx, issue_flags);
- return IOU_ISSUE_SKIP_COMPLETE;
+ return IOU_OK;
}
int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
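The io_provide_buffers_prep() hunk above adds a second bound so that the starting buffer id plus the number of buffers cannot run past the 16-bit id space. A standalone sketch of that validation, assuming the same USHRT_MAX limit (validate_buf_range() is a made-up name):

#include <limits.h>
#include <stdio.h>

/*
 * Returns 0 when 'nbufs' buffers starting at id 'bid' all fit inside the
 * 16-bit id space, mirroring the two checks in io_provide_buffers_prep().
 */
static int validate_buf_range(unsigned long bid, unsigned long nbufs)
{
        if (bid > USHRT_MAX)
                return -1;              /* -E2BIG in the hunk above */
        if (bid + nbufs >= USHRT_MAX)
                return -1;              /* new check: ids would run past the limit */
        return 0;
}

int main(void)
{
        printf("%d\n", validate_buf_range(100, 10));        /* 0: fits */
        printf("%d\n", validate_buf_range(65000, 1000));    /* -1: would overflow */
        return 0;
}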
diff --git a/io_uring/msg_ring.c b/io_uring/msg_ring.c
index 90d2fc6fd80e..2d3cd945a531 100644
--- a/io_uring/msg_ring.c
+++ b/io_uring/msg_ring.c
@@ -15,6 +15,8 @@
struct io_msg {
struct file *file;
+ struct file *src_file;
+ struct callback_head tw;
u64 user_data;
u32 len;
u32 cmd;
@@ -23,6 +25,34 @@ struct io_msg {
u32 flags;
};
+void io_msg_ring_cleanup(struct io_kiocb *req)
+{
+ struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+
+ if (WARN_ON_ONCE(!msg->src_file))
+ return;
+
+ fput(msg->src_file);
+ msg->src_file = NULL;
+}
+
+static void io_msg_tw_complete(struct callback_head *head)
+{
+ struct io_msg *msg = container_of(head, struct io_msg, tw);
+ struct io_kiocb *req = cmd_to_io_kiocb(msg);
+ struct io_ring_ctx *target_ctx = req->file->private_data;
+ int ret = 0;
+
+ if (current->flags & PF_EXITING)
+ ret = -EOWNERDEAD;
+ else if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
+ ret = -EOVERFLOW;
+
+ if (ret < 0)
+ req_set_fail(req);
+ io_req_queue_tw_complete(req, ret);
+}
+
static int io_msg_ring_data(struct io_kiocb *req)
{
struct io_ring_ctx *target_ctx = req->file->private_data;
@@ -31,23 +61,29 @@ static int io_msg_ring_data(struct io_kiocb *req)
if (msg->src_fd || msg->dst_fd || msg->flags)
return -EINVAL;
- if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0, true))
+ if (target_ctx->task_complete && current != target_ctx->submitter_task) {
+ init_task_work(&msg->tw, io_msg_tw_complete);
+ if (task_work_add(target_ctx->submitter_task, &msg->tw,
+ TWA_SIGNAL_NO_IPI))
+ return -EOWNERDEAD;
+
+ atomic_or(IORING_SQ_TASKRUN, &target_ctx->rings->sq_flags);
+ return IOU_ISSUE_SKIP_COMPLETE;
+ }
+
+ if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
return 0;
return -EOVERFLOW;
}
-static void io_double_unlock_ctx(struct io_ring_ctx *ctx,
- struct io_ring_ctx *octx,
+static void io_double_unlock_ctx(struct io_ring_ctx *octx,
unsigned int issue_flags)
{
- if (issue_flags & IO_URING_F_UNLOCKED)
- mutex_unlock(&ctx->uring_lock);
mutex_unlock(&octx->uring_lock);
}
-static int io_double_lock_ctx(struct io_ring_ctx *ctx,
- struct io_ring_ctx *octx,
+static int io_double_lock_ctx(struct io_ring_ctx *octx,
unsigned int issue_flags)
{
/*
@@ -60,69 +96,103 @@ static int io_double_lock_ctx(struct io_ring_ctx *ctx,
return -EAGAIN;
return 0;
}
-
- /* Always grab smallest value ctx first. We know ctx != octx. */
- if (ctx < octx) {
- mutex_lock(&ctx->uring_lock);
- mutex_lock(&octx->uring_lock);
- } else {
- mutex_lock(&octx->uring_lock);
- mutex_lock(&ctx->uring_lock);
- }
-
+ mutex_lock(&octx->uring_lock);
return 0;
}
-static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
+static struct file *io_msg_grab_file(struct io_kiocb *req, unsigned int issue_flags)
{
- struct io_ring_ctx *target_ctx = req->file->private_data;
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
struct io_ring_ctx *ctx = req->ctx;
+ struct file *file = NULL;
unsigned long file_ptr;
- struct file *src_file;
- int ret;
-
- if (target_ctx == ctx)
- return -EINVAL;
-
- ret = io_double_lock_ctx(ctx, target_ctx, issue_flags);
- if (unlikely(ret))
- return ret;
-
- ret = -EBADF;
- if (unlikely(msg->src_fd >= ctx->nr_user_files))
- goto out_unlock;
+ int idx = msg->src_fd;
+
+ io_ring_submit_lock(ctx, issue_flags);
+ if (likely(idx < ctx->nr_user_files)) {
+ idx = array_index_nospec(idx, ctx->nr_user_files);
+ file_ptr = io_fixed_file_slot(&ctx->file_table, idx)->file_ptr;
+ file = (struct file *) (file_ptr & FFS_MASK);
+ if (file)
+ get_file(file);
+ }
+ io_ring_submit_unlock(ctx, issue_flags);
+ return file;
+}
- msg->src_fd = array_index_nospec(msg->src_fd, ctx->nr_user_files);
- file_ptr = io_fixed_file_slot(&ctx->file_table, msg->src_fd)->file_ptr;
- if (!file_ptr)
- goto out_unlock;
+static int io_msg_install_complete(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_ring_ctx *target_ctx = req->file->private_data;
+ struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+ struct file *src_file = msg->src_file;
+ int ret;
- src_file = (struct file *) (file_ptr & FFS_MASK);
- get_file(src_file);
+ if (unlikely(io_double_lock_ctx(target_ctx, issue_flags)))
+ return -EAGAIN;
ret = __io_fixed_fd_install(target_ctx, src_file, msg->dst_fd);
- if (ret < 0) {
- fput(src_file);
+ if (ret < 0)
goto out_unlock;
- }
+
+ msg->src_file = NULL;
+ req->flags &= ~REQ_F_NEED_CLEANUP;
if (msg->flags & IORING_MSG_RING_CQE_SKIP)
goto out_unlock;
-
/*
* If this fails, the target still received the file descriptor but
* wasn't notified of the fact. This means that if this request
* completes with -EOVERFLOW, then the sender must ensure that a
* later IORING_OP_MSG_RING delivers the message.
*/
- if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0, true))
+ if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
ret = -EOVERFLOW;
out_unlock:
- io_double_unlock_ctx(ctx, target_ctx, issue_flags);
+ io_double_unlock_ctx(target_ctx, issue_flags);
return ret;
}
+static void io_msg_tw_fd_complete(struct callback_head *head)
+{
+ struct io_msg *msg = container_of(head, struct io_msg, tw);
+ struct io_kiocb *req = cmd_to_io_kiocb(msg);
+ int ret = -EOWNERDEAD;
+
+ if (!(current->flags & PF_EXITING))
+ ret = io_msg_install_complete(req, IO_URING_F_UNLOCKED);
+ if (ret < 0)
+ req_set_fail(req);
+ io_req_queue_tw_complete(req, ret);
+}
+
+static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
+{
+ struct io_ring_ctx *target_ctx = req->file->private_data;
+ struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
+ struct io_ring_ctx *ctx = req->ctx;
+ struct file *src_file = msg->src_file;
+
+ if (target_ctx == ctx)
+ return -EINVAL;
+ if (!src_file) {
+ src_file = io_msg_grab_file(req, issue_flags);
+ if (!src_file)
+ return -EBADF;
+ msg->src_file = src_file;
+ req->flags |= REQ_F_NEED_CLEANUP;
+ }
+
+ if (target_ctx->task_complete && current != target_ctx->submitter_task) {
+ init_task_work(&msg->tw, io_msg_tw_fd_complete);
+ if (task_work_add(target_ctx->submitter_task, &msg->tw,
+ TWA_SIGNAL))
+ return -EOWNERDEAD;
+
+ return IOU_ISSUE_SKIP_COMPLETE;
+ }
+ return io_msg_install_complete(req, issue_flags);
+}
+
int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
@@ -130,6 +200,7 @@ int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (unlikely(sqe->buf_index || sqe->personality))
return -EINVAL;
+ msg->src_file = NULL;
msg->user_data = READ_ONCE(sqe->off);
msg->len = READ_ONCE(sqe->len);
msg->cmd = READ_ONCE(sqe->addr);
@@ -164,12 +235,11 @@ int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags)
}
done:
- if (ret < 0)
+ if (ret < 0) {
+ if (ret == -EAGAIN || ret == IOU_ISSUE_SKIP_COMPLETE)
+ return ret;
req_set_fail(req);
+ }
io_req_set_res(req, ret, 0);
- /* put file to avoid an attempt to IOPOLL the req */
- if (!(req->flags & REQ_F_FIXED_FILE))
- io_put_file(req->file);
- req->file = NULL;
return IOU_OK;
}
diff --git a/io_uring/msg_ring.h b/io_uring/msg_ring.h
index fb9601f202d0..3987ee6c0e5f 100644
--- a/io_uring/msg_ring.h
+++ b/io_uring/msg_ring.h
@@ -2,3 +2,4 @@
int io_msg_ring_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
int io_msg_ring(struct io_kiocb *req, unsigned int issue_flags);
+void io_msg_ring_cleanup(struct io_kiocb *req);
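The msg_ring hunks drop the ordered double-lock and, judging by the remaining context, only trylock the target ring in the path where the sender already holds its own lock, returning -EAGAIN so the request can be retried from a context that may block. A pthread sketch of that trylock-or-retry shape under those assumptions (target_ring and try_send() are hypothetical names):

#include <errno.h>
#include <pthread.h>
#include <stdio.h>

struct target_ring {
        pthread_mutex_t lock;
        int delivered;
};

/*
 * Nonblocking attempt: if the target ring's lock is contended, report
 * -EAGAIN instead of sleeping; the caller can then retry from a context
 * that is allowed to block (the kernel punts such requests to io-wq).
 */
static int try_send(struct target_ring *t, int nonblock)
{
        if (nonblock) {
                if (pthread_mutex_trylock(&t->lock) != 0)
                        return -EAGAIN;
        } else {
                pthread_mutex_lock(&t->lock);
        }
        t->delivered++;
        pthread_mutex_unlock(&t->lock);
        return 0;
}

int main(void)
{
        struct target_ring t = { PTHREAD_MUTEX_INITIALIZER, 0 };
        int ret = try_send(&t, 1);

        if (ret == -EAGAIN)
                ret = try_send(&t, 0);  /* retry where blocking is fine */
        printf("ret=%d delivered=%d\n", ret, t.delivered);
        return 0;
}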
diff --git a/io_uring/net.c b/io_uring/net.c
index 15dea91625e2..5229976cb582 100644
--- a/io_uring/net.c
+++ b/io_uring/net.c
@@ -67,7 +67,18 @@ struct io_sr_msg {
struct io_kiocb *notif;
};
-#define IO_APOLL_MULTI_POLLED (REQ_F_APOLL_MULTISHOT | REQ_F_POLLED)
+static inline bool io_check_multishot(struct io_kiocb *req,
+ unsigned int issue_flags)
+{
+ /*
+ * When ->locked_cq is set we only allow to post CQEs from the original
+ * task context. Usual request completions will be handled in other
+ * generic paths but multipoll may decide to post extra cqes.
+ */
+ return !(issue_flags & IO_URING_F_IOWQ) ||
+ !(issue_flags & IO_URING_F_MULTISHOT) ||
+ !req->ctx->task_complete;
+}
int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
@@ -127,13 +138,15 @@ static struct io_async_msghdr *io_msg_alloc_async(struct io_kiocb *req,
struct io_cache_entry *entry;
struct io_async_msghdr *hdr;
- if (!(issue_flags & IO_URING_F_UNLOCKED) &&
- (entry = io_alloc_cache_get(&ctx->netmsg_cache)) != NULL) {
- hdr = container_of(entry, struct io_async_msghdr, cache);
- hdr->free_iov = NULL;
- req->flags |= REQ_F_ASYNC_DATA;
- req->async_data = hdr;
- return hdr;
+ if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+ entry = io_alloc_cache_get(&ctx->netmsg_cache);
+ if (entry) {
+ hdr = container_of(entry, struct io_async_msghdr, cache);
+ hdr->free_iov = NULL;
+ req->flags |= REQ_F_ASYNC_DATA;
+ req->async_data = hdr;
+ return hdr;
+ }
}
if (!io_alloc_async_data(req)) {
@@ -365,7 +378,7 @@ int io_send(struct io_kiocb *req, unsigned int issue_flags)
if (unlikely(!sock))
return -ENOTSOCK;
- ret = import_single_range(WRITE, sr->buf, sr->len, &iov, &msg.msg_iter);
+ ret = import_single_range(ITER_SOURCE, sr->buf, sr->len, &iov, &msg.msg_iter);
if (unlikely(ret))
return ret;
@@ -451,7 +464,7 @@ static int __io_recvmsg_copy_hdr(struct io_kiocb *req,
}
} else {
iomsg->free_iov = iomsg->fast_iov;
- ret = __import_iovec(READ, msg.msg_iov, msg.msg_iovlen, UIO_FASTIOV,
+ ret = __import_iovec(ITER_DEST, msg.msg_iov, msg.msg_iovlen, UIO_FASTIOV,
&iomsg->free_iov, &iomsg->msg.msg_iter,
false);
if (ret > 0)
@@ -503,7 +516,7 @@ static int __io_compat_recvmsg_copy_hdr(struct io_kiocb *req,
}
} else {
iomsg->free_iov = iomsg->fast_iov;
- ret = __import_iovec(READ, (struct iovec __user *)uiov, msg.msg_iovlen,
+ ret = __import_iovec(ITER_DEST, (struct iovec __user *)uiov, msg.msg_iovlen,
UIO_FASTIOV, &iomsg->free_iov,
&iomsg->msg.msg_iter, true);
if (ret < 0)
@@ -591,7 +604,8 @@ static inline void io_recv_prep_retry(struct io_kiocb *req)
* again (for multishot).
*/
static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
- unsigned int cflags, bool mshot_finished)
+ unsigned int cflags, bool mshot_finished,
+ unsigned issue_flags)
{
if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
io_req_set_res(req, *ret, cflags);
@@ -600,21 +614,17 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
}
if (!mshot_finished) {
- if (io_post_aux_cqe(req->ctx, req->cqe.user_data, *ret,
- cflags | IORING_CQE_F_MORE, false)) {
+ if (io_aux_cqe(req->ctx, issue_flags & IO_URING_F_COMPLETE_DEFER,
+ req->cqe.user_data, *ret, cflags | IORING_CQE_F_MORE, true)) {
io_recv_prep_retry(req);
return false;
}
- /*
- * Otherwise stop multishot but use the current result.
- * Probably will end up going into overflow, but this means
- * we cannot trust the ordering anymore
- */
+ /* Otherwise stop multishot but use the current result. */
}
io_req_set_res(req, *ret, cflags);
- if (req->flags & REQ_F_POLLED)
+ if (issue_flags & IO_URING_F_MULTISHOT)
*ret = IOU_STOP_MULTISHOT;
else
*ret = IOU_OK;
@@ -733,6 +743,9 @@ int io_recvmsg(struct io_kiocb *req, unsigned int issue_flags)
(sr->flags & IORING_RECVSEND_POLL_FIRST))
return io_setup_async_msg(req, kmsg, issue_flags);
+ if (!io_check_multishot(req, issue_flags))
+ return io_setup_async_msg(req, kmsg, issue_flags);
+
retry_multishot:
if (io_do_buffer_select(req)) {
void __user *buf;
@@ -752,7 +765,7 @@ retry_multishot:
kmsg->fast_iov[0].iov_base = buf;
kmsg->fast_iov[0].iov_len = len;
- iov_iter_init(&kmsg->msg.msg_iter, READ, kmsg->fast_iov, 1,
+ iov_iter_init(&kmsg->msg.msg_iter, ITER_DEST, kmsg->fast_iov, 1,
len);
}
@@ -773,8 +786,7 @@ retry_multishot:
if (ret < min_ret) {
if (ret == -EAGAIN && force_nonblock) {
ret = io_setup_async_msg(req, kmsg, issue_flags);
- if (ret == -EAGAIN && (req->flags & IO_APOLL_MULTI_POLLED) ==
- IO_APOLL_MULTI_POLLED) {
+ if (ret == -EAGAIN && (issue_flags & IO_URING_F_MULTISHOT)) {
io_kbuf_recycle(req, issue_flags);
return IOU_ISSUE_SKIP_COMPLETE;
}
@@ -803,7 +815,7 @@ retry_multishot:
if (kmsg->msg.msg_inq)
cflags |= IORING_CQE_F_SOCK_NONEMPTY;
- if (!io_recv_finish(req, &ret, cflags, mshot_finished))
+ if (!io_recv_finish(req, &ret, cflags, mshot_finished, issue_flags))
goto retry_multishot;
if (mshot_finished) {
@@ -833,6 +845,9 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
(sr->flags & IORING_RECVSEND_POLL_FIRST))
return -EAGAIN;
+ if (!io_check_multishot(req, issue_flags))
+ return -EAGAIN;
+
sock = sock_from_file(req->file);
if (unlikely(!sock))
return -ENOTSOCK;
@@ -847,7 +862,7 @@ retry_multishot:
sr->buf = buf;
}
- ret = import_single_range(READ, sr->buf, len, &iov, &msg.msg_iter);
+ ret = import_single_range(ITER_DEST, sr->buf, len, &iov, &msg.msg_iter);
if (unlikely(ret))
goto out_free;
@@ -869,7 +884,7 @@ retry_multishot:
ret = sock_recvmsg(sock, &msg, flags);
if (ret < min_ret) {
if (ret == -EAGAIN && force_nonblock) {
- if ((req->flags & IO_APOLL_MULTI_POLLED) == IO_APOLL_MULTI_POLLED) {
+ if (issue_flags & IO_URING_F_MULTISHOT) {
io_kbuf_recycle(req, issue_flags);
return IOU_ISSUE_SKIP_COMPLETE;
}
@@ -902,7 +917,7 @@ out_free:
if (msg.msg_inq)
cflags |= IORING_CQE_F_SOCK_NONEMPTY;
- if (!io_recv_finish(req, &ret, cflags, ret <= 0))
+ if (!io_recv_finish(req, &ret, cflags, ret <= 0, issue_flags))
goto retry_multishot;
return ret;
@@ -925,6 +940,9 @@ void io_send_zc_cleanup(struct io_kiocb *req)
}
}
+#define IO_ZC_FLAGS_COMMON (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_FIXED_BUF)
+#define IO_ZC_FLAGS_VALID (IO_ZC_FLAGS_COMMON | IORING_SEND_ZC_REPORT_USAGE)
+
int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
{
struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -937,10 +955,6 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
if (req->flags & REQ_F_CQE_SKIP)
return -EINVAL;
- zc->flags = READ_ONCE(sqe->ioprio);
- if (zc->flags & ~(IORING_RECVSEND_POLL_FIRST |
- IORING_RECVSEND_FIXED_BUF))
- return -EINVAL;
notif = zc->notif = io_alloc_notif(ctx);
if (!notif)
return -ENOMEM;
@@ -948,6 +962,17 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
notif->cqe.res = 0;
notif->cqe.flags = IORING_CQE_F_NOTIF;
req->flags |= REQ_F_NEED_CLEANUP;
+
+ zc->flags = READ_ONCE(sqe->ioprio);
+ if (unlikely(zc->flags & ~IO_ZC_FLAGS_COMMON)) {
+ if (zc->flags & ~IO_ZC_FLAGS_VALID)
+ return -EINVAL;
+ if (zc->flags & IORING_SEND_ZC_REPORT_USAGE) {
+ io_notif_set_extended(notif);
+ io_notif_to_data(notif)->zc_report = true;
+ }
+ }
+
if (zc->flags & IORING_RECVSEND_FIXED_BUF) {
unsigned idx = READ_ONCE(sqe->buf_index);
@@ -1083,13 +1108,14 @@ int io_send_zc(struct io_kiocb *req, unsigned int issue_flags)
return io_setup_async_addr(req, &__address, issue_flags);
if (zc->flags & IORING_RECVSEND_FIXED_BUF) {
- ret = io_import_fixed(WRITE, &msg.msg_iter, req->imu,
+ ret = io_import_fixed(ITER_SOURCE, &msg.msg_iter, req->imu,
(u64)(uintptr_t)zc->buf, zc->len);
if (unlikely(ret))
return ret;
msg.sg_from_iter = io_sg_from_iter;
} else {
- ret = import_single_range(WRITE, zc->buf, zc->len, &iov,
+ io_notif_set_extended(zc->notif);
+ ret = import_single_range(ITER_SOURCE, zc->buf, zc->len, &iov,
&msg.msg_iter);
if (unlikely(ret))
return ret;
@@ -1150,6 +1176,8 @@ int io_sendmsg_zc(struct io_kiocb *req, unsigned int issue_flags)
unsigned flags;
int ret, min_ret = 0;
+ io_notif_set_extended(sr->notif);
+
sock = sock_from_file(req->file);
if (unlikely(!sock))
return -ENOTSOCK;
@@ -1271,6 +1299,8 @@ int io_accept(struct io_kiocb *req, unsigned int issue_flags)
struct file *file;
int ret, fd;
+ if (!io_check_multishot(req, issue_flags))
+ return -EAGAIN;
retry:
if (!fixed) {
fd = __get_unused_fd_flags(accept->flags, accept->nofile);
@@ -1289,8 +1319,7 @@ retry:
* return EAGAIN to arm the poll infra since it
* has already been done
*/
- if ((req->flags & IO_APOLL_MULTI_POLLED) ==
- IO_APOLL_MULTI_POLLED)
+ if (issue_flags & IO_URING_F_MULTISHOT)
ret = IOU_ISSUE_SKIP_COMPLETE;
return ret;
}
@@ -1310,14 +1339,13 @@ retry:
return IOU_OK;
}
- if (ret >= 0 &&
- io_post_aux_cqe(ctx, req->cqe.user_data, ret, IORING_CQE_F_MORE, false))
+ if (ret < 0)
+ return ret;
+ if (io_aux_cqe(ctx, issue_flags & IO_URING_F_COMPLETE_DEFER,
+ req->cqe.user_data, ret, IORING_CQE_F_MORE, true))
goto retry;
- io_req_set_res(req, ret, 0);
- if (req->flags & REQ_F_POLLED)
- return IOU_STOP_MULTISHOT;
- return IOU_OK;
+ return -ECANCELED;
}
int io_socket_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
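io_send_zc_prep() above validates flags against a common mask first and only takes the unlikely path when an uncommon bit such as IORING_SEND_ZC_REPORT_USAGE is set. A self-contained sketch of that two-mask pattern with made-up flag names:

#include <stdio.h>

#define FLAG_POLL_FIRST   (1u << 0)     /* stand-ins for the common zc flags */
#define FLAG_FIXED_BUF    (1u << 1)
#define FLAG_REPORT_USAGE (1u << 2)     /* the extended, rarely set flag */

#define FLAGS_COMMON (FLAG_POLL_FIRST | FLAG_FIXED_BUF)
#define FLAGS_VALID  (FLAGS_COMMON | FLAG_REPORT_USAGE)

struct req { unsigned flags; int report_usage; };

/* Fast-path the common flags, validate and enable the rest only when needed. */
static int prep_flags(struct req *r, unsigned flags)
{
        r->flags = flags;
        if (flags & ~FLAGS_COMMON) {
                if (flags & ~FLAGS_VALID)
                        return -1;      /* unknown bit: reject, like -EINVAL */
                if (flags & FLAG_REPORT_USAGE)
                        r->report_usage = 1;
        }
        return 0;
}

int main(void)
{
        struct req r = { 0 };
        int ret;

        printf("%d\n", prep_flags(&r, FLAG_FIXED_BUF));       /* 0: common flag */
        ret = prep_flags(&r, FLAG_REPORT_USAGE);
        printf("%d %d\n", ret, r.report_usage);               /* 0 1: extended path */
        printf("%d\n", prep_flags(&r, 1u << 5));              /* -1: unknown bit */
        return 0;
}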
diff --git a/io_uring/notif.c b/io_uring/notif.c
index e37c6569d82e..c4bb793ebf0e 100644
--- a/io_uring/notif.c
+++ b/io_uring/notif.c
@@ -9,11 +9,14 @@
#include "notif.h"
#include "rsrc.h"
-static void __io_notif_complete_tw(struct io_kiocb *notif, bool *locked)
+static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked)
{
struct io_notif_data *nd = io_notif_to_data(notif);
struct io_ring_ctx *ctx = notif->ctx;
+ if (nd->zc_report && (nd->zc_copied || !nd->zc_used))
+ notif->cqe.res |= IORING_NOTIF_USAGE_ZC_COPIED;
+
if (nd->account_pages && ctx->user) {
__io_unaccount_mem(ctx->user, nd->account_pages);
nd->account_pages = 0;
@@ -21,16 +24,41 @@ static void __io_notif_complete_tw(struct io_kiocb *notif, bool *locked)
io_req_task_complete(notif, locked);
}
-static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
- struct ubuf_info *uarg,
- bool success)
+static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
+ bool success)
{
struct io_notif_data *nd = container_of(uarg, struct io_notif_data, uarg);
struct io_kiocb *notif = cmd_to_io_kiocb(nd);
- if (refcount_dec_and_test(&uarg->refcnt)) {
- notif->io_task_work.func = __io_notif_complete_tw;
+ if (refcount_dec_and_test(&uarg->refcnt))
io_req_task_work_add(notif);
+}
+
+static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg,
+ bool success)
+{
+ struct io_notif_data *nd = container_of(uarg, struct io_notif_data, uarg);
+
+ if (nd->zc_report) {
+ if (success && !nd->zc_used && skb)
+ WRITE_ONCE(nd->zc_used, true);
+ else if (!success && !nd->zc_copied)
+ WRITE_ONCE(nd->zc_copied, true);
+ }
+ io_tx_ubuf_callback(skb, uarg, success);
+}
+
+void io_notif_set_extended(struct io_kiocb *notif)
+{
+ struct io_notif_data *nd = io_notif_to_data(notif);
+
+ if (nd->uarg.callback != io_tx_ubuf_callback_ext) {
+ nd->account_pages = 0;
+ nd->zc_report = false;
+ nd->zc_used = false;
+ nd->zc_copied = false;
+ nd->uarg.callback = io_tx_ubuf_callback_ext;
+ notif->io_task_work.func = io_notif_complete_tw_ext;
}
}
@@ -49,24 +77,11 @@ struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx)
notif->task = current;
io_get_task_refs(1);
notif->rsrc_node = NULL;
- io_req_set_rsrc_node(notif, ctx, 0);
+ notif->io_task_work.func = io_req_task_complete;
nd = io_notif_to_data(notif);
- nd->account_pages = 0;
nd->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN;
- nd->uarg.callback = io_uring_tx_zerocopy_callback;
+ nd->uarg.callback = io_tx_ubuf_callback;
refcount_set(&nd->uarg.refcnt, 1);
return notif;
}
-
-void io_notif_flush(struct io_kiocb *notif)
- __must_hold(&slot->notif->ctx->uring_lock)
-{
- struct io_notif_data *nd = io_notif_to_data(notif);
-
- /* drop slot's master ref */
- if (refcount_dec_and_test(&nd->uarg.refcnt)) {
- notif->io_task_work.func = __io_notif_complete_tw;
- io_req_task_work_add(notif);
- }
-}
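io_notif_set_extended() above upgrades a notification lazily: the cheap callback stays in place unless zero-copy usage reporting was requested, in which case a heavier callback records whether the data was ever copied. A function-pointer sketch of that lazy upgrade (struct notif, basic_done() and extended_done() are invented names, not the kernel types):

#include <stdbool.h>
#include <stdio.h>

#define RESULT_ZC_COPIED 0x1

struct notif {
        void (*done)(struct notif *); /* completion callback, swapped lazily */
        bool zc_report;
        bool zc_used;
        bool zc_copied;
        int  result_flags;
};

static void basic_done(struct notif *n)
{
        (void)n;                        /* cheap default: nothing to report */
}

static void extended_done(struct notif *n)
{
        /* Report a copy if the stack ever fell back, or never went zero-copy. */
        if (n->zc_report && (n->zc_copied || !n->zc_used))
                n->result_flags |= RESULT_ZC_COPIED;
}

/* Upgrade only when reporting is requested, like io_notif_set_extended(). */
static void set_extended(struct notif *n)
{
        if (n->done != extended_done) {
                n->zc_report = true;
                n->done = extended_done;
        }
}

int main(void)
{
        struct notif n = { .done = basic_done };

        set_extended(&n);
        n.zc_copied = true;             /* pretend the stack had to copy */
        n.done(&n);
        printf("copied reported: %d\n", !!(n.result_flags & RESULT_ZC_COPIED));
        return 0;
}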
diff --git a/io_uring/notif.h b/io_uring/notif.h
index 5b4d710c8ca5..c88c800cd89d 100644
--- a/io_uring/notif.h
+++ b/io_uring/notif.h
@@ -13,16 +13,29 @@ struct io_notif_data {
struct file *file;
struct ubuf_info uarg;
unsigned long account_pages;
+ bool zc_report;
+ bool zc_used;
+ bool zc_copied;
};
-void io_notif_flush(struct io_kiocb *notif);
struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx);
+void io_notif_set_extended(struct io_kiocb *notif);
static inline struct io_notif_data *io_notif_to_data(struct io_kiocb *notif)
{
return io_kiocb_to_cmd(notif, struct io_notif_data);
}
+static inline void io_notif_flush(struct io_kiocb *notif)
+ __must_hold(&notif->ctx->uring_lock)
+{
+ struct io_notif_data *nd = io_notif_to_data(notif);
+
+ /* drop slot's master ref */
+ if (refcount_dec_and_test(&nd->uarg.refcnt))
+ io_req_task_work_add(notif);
+}
+
static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len)
{
struct io_ring_ctx *ctx = notif->ctx;
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index 83dc0f9ad3b2..3aa0d65c50e3 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -63,6 +63,7 @@ const struct io_op_def io_op_defs[] = {
.audit_skip = 1,
.ioprio = 1,
.iopoll = 1,
+ .iopoll_queue = 1,
.async_size = sizeof(struct io_async_rw),
.name = "READV",
.prep = io_prep_rw,
@@ -80,6 +81,7 @@ const struct io_op_def io_op_defs[] = {
.audit_skip = 1,
.ioprio = 1,
.iopoll = 1,
+ .iopoll_queue = 1,
.async_size = sizeof(struct io_async_rw),
.name = "WRITEV",
.prep = io_prep_rw,
@@ -103,6 +105,7 @@ const struct io_op_def io_op_defs[] = {
.audit_skip = 1,
.ioprio = 1,
.iopoll = 1,
+ .iopoll_queue = 1,
.async_size = sizeof(struct io_async_rw),
.name = "READ_FIXED",
.prep = io_prep_rw,
@@ -118,6 +121,7 @@ const struct io_op_def io_op_defs[] = {
.audit_skip = 1,
.ioprio = 1,
.iopoll = 1,
+ .iopoll_queue = 1,
.async_size = sizeof(struct io_async_rw),
.name = "WRITE_FIXED",
.prep = io_prep_rw,
@@ -277,6 +281,7 @@ const struct io_op_def io_op_defs[] = {
.audit_skip = 1,
.ioprio = 1,
.iopoll = 1,
+ .iopoll_queue = 1,
.async_size = sizeof(struct io_async_rw),
.name = "READ",
.prep = io_prep_rw,
@@ -292,6 +297,7 @@ const struct io_op_def io_op_defs[] = {
.audit_skip = 1,
.ioprio = 1,
.iopoll = 1,
+ .iopoll_queue = 1,
.async_size = sizeof(struct io_async_rw),
.name = "WRITE",
.prep = io_prep_rw,
@@ -439,6 +445,7 @@ const struct io_op_def io_op_defs[] = {
.name = "MSG_RING",
.prep = io_msg_ring_prep,
.issue = io_msg_ring,
+ .cleanup = io_msg_ring_cleanup,
},
[IORING_OP_FSETXATTR] = {
.needs_file = 1,
@@ -481,6 +488,7 @@ const struct io_op_def io_op_defs[] = {
.plug = 1,
.name = "URING_CMD",
.iopoll = 1,
+ .iopoll_queue = 1,
.async_size = uring_cmd_pdu_size(1),
.prep = io_uring_cmd_prep,
.issue = io_uring_cmd,
diff --git a/io_uring/opdef.h b/io_uring/opdef.h
index 3efe06d25473..df7e13d9bfba 100644
--- a/io_uring/opdef.h
+++ b/io_uring/opdef.h
@@ -25,6 +25,8 @@ struct io_op_def {
unsigned ioprio : 1;
/* supports iopoll */
unsigned iopoll : 1;
+ /* have to be put into the iopoll list */
+ unsigned iopoll_queue : 1;
/* opcode specific path will handle ->async_data allocation if needed */
unsigned manual_alloc : 1;
/* size of async data needed, if any */
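The opdef hunks add a per-opcode iopoll_queue bit so the issue path asks the opcode table, rather than the presence of req->file, whether a request belongs on the iopoll list. A tiny capability-table sketch with made-up opcodes:

#include <stdio.h>

enum op { OP_NOP, OP_READ, OP_MSG_RING, OP_LAST };

struct op_def {
        unsigned iopoll       : 1;      /* opcode supports completion polling */
        unsigned iopoll_queue : 1;      /* must go onto the iopoll list when issued */
        const char *name;
};

static const struct op_def op_defs[OP_LAST] = {
        [OP_NOP]      = { .name = "NOP" },
        [OP_READ]     = { .iopoll = 1, .iopoll_queue = 1, .name = "READ" },
        [OP_MSG_RING] = { .name = "MSG_RING" },
};

/* The issue path consults the table instead of inspecting the request. */
static void issue(enum op opcode, int iopoll_ring)
{
        const struct op_def *def = &op_defs[opcode];

        if (iopoll_ring && def->iopoll_queue)
                printf("%s: queued for iopoll\n", def->name);
        else
                printf("%s: completes via the normal path\n", def->name);
}

int main(void)
{
        issue(OP_READ, 1);
        issue(OP_MSG_RING, 1);
        return 0;
}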
diff --git a/io_uring/poll.c b/io_uring/poll.c
index 0d9f49c575e0..ee7da6150ec4 100644
--- a/io_uring/poll.c
+++ b/io_uring/poll.c
@@ -40,7 +40,14 @@ struct io_poll_table {
};
#define IO_POLL_CANCEL_FLAG BIT(31)
-#define IO_POLL_REF_MASK GENMASK(30, 0)
+#define IO_POLL_RETRY_FLAG BIT(30)
+#define IO_POLL_REF_MASK GENMASK(29, 0)
+
+/*
+ * We usually have 1-2 refs taken, 128 is more than enough and we want to
+ * maximise the margin between this amount and the moment when it overflows.
+ */
+#define IO_POLL_REF_BIAS 128
#define IO_WQE_F_DOUBLE 1
@@ -58,6 +65,21 @@ static inline bool wqe_is_double(struct wait_queue_entry *wqe)
return priv & IO_WQE_F_DOUBLE;
}
+static bool io_poll_get_ownership_slowpath(struct io_kiocb *req)
+{
+ int v;
+
+ /*
+ * poll_refs are already elevated and we don't have much hope for
+ * grabbing the ownership. Instead of incrementing set a retry flag
+ * to notify the loop that there might have been some change.
+ */
+ v = atomic_fetch_or(IO_POLL_RETRY_FLAG, &req->poll_refs);
+ if (v & IO_POLL_REF_MASK)
+ return false;
+ return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
+}
+
/*
* If refs part of ->poll_refs (see IO_POLL_REF_MASK) is 0, it's free. We can
* bump it and acquire ownership. It's disallowed to modify requests while not
@@ -66,6 +88,8 @@ static inline bool wqe_is_double(struct wait_queue_entry *wqe)
*/
static inline bool io_poll_get_ownership(struct io_kiocb *req)
{
+ if (unlikely(atomic_read(&req->poll_refs) >= IO_POLL_REF_BIAS))
+ return io_poll_get_ownership_slowpath(req);
return !(atomic_fetch_inc(&req->poll_refs) & IO_POLL_REF_MASK);
}
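The poll hunks above shrink the reference mask to bits 0-29, reserve bit 30 as a retry flag, and push heavily contended wakeups into io_poll_get_ownership_slowpath() so poll_refs cannot creep toward overflow. A C11-atomics sketch of that ownership scheme; it is a simplified userspace illustration, not the kernel's synchronization.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define CANCEL_FLAG (1u << 31)          /* kept for shape; unused in this demo */
#define RETRY_FLAG  (1u << 30)
#define REF_MASK    ((1u << 30) - 1)    /* GENMASK(29, 0) */
#define REF_BIAS    128u

/*
 * Whoever bumps the masked refcount from zero owns the request; once the
 * count is suspiciously high, contenders only leave a retry hint instead
 * of inflating it further.
 */
static bool get_ownership(atomic_uint *refs)
{
        if (atomic_load(refs) >= REF_BIAS) {
                unsigned v = atomic_fetch_or(refs, RETRY_FLAG);

                if (v & REF_MASK)
                        return false;
                return !(atomic_fetch_add(refs, 1) & REF_MASK);
        }
        return !(atomic_fetch_add(refs, 1) & REF_MASK);
}

int main(void)
{
        atomic_uint refs = 0;
        int owns;

        printf("first waker owns: %d\n", get_ownership(&refs));   /* 1 */
        printf("second waker owns: %d\n", get_ownership(&refs));  /* 0 */

        atomic_store(&refs, REF_BIAS);          /* simulate heavy contention */
        owns = get_ownership(&refs);
        printf("contended waker owns: %d, retry flagged: %d\n",
               owns, !!(atomic_load(&refs) & RETRY_FLAG));        /* 0, 1 */
        return 0;
}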
@@ -116,6 +140,8 @@ static void io_poll_req_insert_locked(struct io_kiocb *req)
struct io_hash_table *table = &req->ctx->cancel_table_locked;
u32 index = hash_long(req->cqe.user_data, table->hash_bits);
+ lockdep_assert_held(&req->ctx->uring_lock);
+
hlist_add_head(&req->hash_node, &table->hbs[index].list);
}
@@ -211,7 +237,6 @@ enum {
*/
static int io_poll_check_events(struct io_kiocb *req, bool *locked)
{
- struct io_ring_ctx *ctx = req->ctx;
int v, ret;
/* req->task == current here, checking PF_EXITING is safe */
@@ -221,11 +246,31 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
do {
v = atomic_read(&req->poll_refs);
- /* tw handler should be the owner, and so have some references */
- if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
- return IOU_POLL_DONE;
- if (v & IO_POLL_CANCEL_FLAG)
- return -ECANCELED;
+ if (unlikely(v != 1)) {
+ /* tw should be the owner and so have some refs */
+ if (WARN_ON_ONCE(!(v & IO_POLL_REF_MASK)))
+ return IOU_POLL_NO_ACTION;
+ if (v & IO_POLL_CANCEL_FLAG)
+ return -ECANCELED;
+ /*
+			 * cqe.res contains only the events from the first wake up;
+			 * any later ones would be lost. Redo vfs_poll() to get the
+			 * up-to-date state.
+ */
+ if ((v & IO_POLL_REF_MASK) != 1)
+ req->cqe.res = 0;
+
+ if (v & IO_POLL_RETRY_FLAG) {
+ req->cqe.res = 0;
+ /*
+ * We won't find new events that came in between
+ * vfs_poll and the ref put unless we clear the
+ * flag in advance.
+ */
+ atomic_andnot(IO_POLL_RETRY_FLAG, &req->poll_refs);
+ v &= ~IO_POLL_RETRY_FLAG;
+ }
+ }
/* the mask was stashed in __io_poll_execute */
if (!req->cqe.res) {
@@ -243,8 +288,8 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
__poll_t mask = mangle_poll(req->cqe.res &
req->apoll_events);
- if (!io_post_aux_cqe(ctx, req->cqe.user_data,
- mask, IORING_CQE_F_MORE, false)) {
+ if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data,
+ mask, IORING_CQE_F_MORE, false)) {
io_req_set_res(req, mask, 0);
return IOU_POLL_REMOVE_POLL_USE_RES;
}
@@ -256,11 +301,15 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
return ret;
}
+ /* force the next iteration to vfs_poll() */
+ req->cqe.res = 0;
+
/*
* Release all references, retry if someone tried to restart
* task_work while we were executing it.
*/
- } while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs));
+ } while (atomic_sub_return(v & IO_POLL_REF_MASK, &req->poll_refs) &
+ IO_POLL_REF_MASK);
return IOU_POLL_NO_ACTION;
}
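
The loop above now terminates on the masked result of atomic_sub_return(), so a stray RETRY or CANCEL bit alone no longer keeps it spinning; only surviving references within IO_POLL_REF_MASK do. A hedged stand-alone sketch of that drop-and-retest step, with demo names:

#include <stdatomic.h>
#include <stdbool.h>

#define DEMO_REF_MASK	((1u << 30) - 1)

/*
 * Drop the references we consumed and report whether more references
 * (i.e. more queued work) arrived while we were processing events.
 */
static bool demo_drop_refs_and_retest(atomic_uint *refs, unsigned int taken)
{
	return (atomic_fetch_sub(refs, taken) - taken) & DEMO_REF_MASK;
}
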
@@ -272,54 +321,38 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
ret = io_poll_check_events(req, locked);
if (ret == IOU_POLL_NO_ACTION)
return;
-
- if (ret == IOU_POLL_DONE) {
- struct io_poll *poll = io_kiocb_to_cmd(req, struct io_poll);
- req->cqe.res = mangle_poll(req->cqe.res & poll->events);
- } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
- req->cqe.res = ret;
- req_set_fail(req);
- }
-
io_poll_remove_entries(req);
io_poll_tw_hash_eject(req, locked);
- io_req_set_res(req, req->cqe.res, 0);
- io_req_task_complete(req, locked);
-}
-
-static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
-{
- int ret;
-
- ret = io_poll_check_events(req, locked);
- if (ret == IOU_POLL_NO_ACTION)
- return;
+ if (req->opcode == IORING_OP_POLL_ADD) {
+ if (ret == IOU_POLL_DONE) {
+ struct io_poll *poll;
- io_poll_remove_entries(req);
- io_poll_tw_hash_eject(req, locked);
+ poll = io_kiocb_to_cmd(req, struct io_poll);
+ req->cqe.res = mangle_poll(req->cqe.res & poll->events);
+ } else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
+ req->cqe.res = ret;
+ req_set_fail(req);
+ }
- if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
- io_req_complete_post(req);
- else if (ret == IOU_POLL_DONE)
- io_req_task_submit(req, locked);
- else
- io_req_complete_failed(req, ret);
+ io_req_set_res(req, req->cqe.res, 0);
+ io_req_task_complete(req, locked);
+ } else {
+ io_tw_lock(req->ctx, locked);
+
+ if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
+ io_req_task_complete(req, locked);
+ else if (ret == IOU_POLL_DONE)
+ io_req_task_submit(req, locked);
+ else
+ io_req_defer_failed(req, ret);
+ }
}
static void __io_poll_execute(struct io_kiocb *req, int mask)
{
io_req_set_res(req, mask, 0);
- /*
- * This is useful for poll that is armed on behalf of another
- * request, and where the wakeup path could be on a different
- * CPU. We want to avoid pulling in req->apoll->events for that
- * case.
- */
- if (req->opcode == IORING_OP_POLL_ADD)
- req->io_task_work.func = io_poll_task_func;
- else
- req->io_task_work.func = io_apoll_task_func;
+ req->io_task_work.func = io_poll_task_func;
trace_io_uring_task_add(req, mask);
io_req_task_work_add(req);
@@ -380,6 +413,14 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
return 0;
if (io_poll_get_ownership(req)) {
+ /*
+ * If we trigger a multishot poll off our own wakeup path,
+ * disable multishot as there is a circular dependency between
+ * CQ posting and triggering the event.
+ */
+ if (mask & EPOLL_URING_WAKE)
+ poll->events |= EPOLLONESHOT;
+
/* optional, saves extra locking for removal in tw handler */
if (mask && poll->events & EPOLLONESHOT) {
list_del_init(&poll->wait.entry);
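
The EPOLL_URING_WAKE check relies on the poll mask delivered through the wait-queue key. A minimal, illustrative callback showing how that mask is extracted (standard wait-queue/poll API; EPOLL_URING_WAKE is the flag introduced alongside these changes, and the callback itself is only a sketch):

#include <linux/wait.h>
#include <linux/poll.h>
#include <linux/eventpoll.h>

/* Illustrative wait-queue callback: pull the poll mask out of the key. */
static int demo_poll_wake(struct wait_queue_entry *wqe, unsigned mode,
			  int sync, void *key)
{
	__poll_t mask = key_to_poll(key);

	/* a wakeup coming from io_uring's own CQ posting path */
	if (mask & EPOLL_URING_WAKE)
		return 0;	/* e.g. avoid feeding back into CQ posting */
	return 1;
}
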
@@ -394,7 +435,8 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
return 1;
}
-static void io_poll_double_prepare(struct io_kiocb *req)
+/* fails only when the poll is already being completed by the first entry */
+static bool io_poll_double_prepare(struct io_kiocb *req)
{
struct wait_queue_head *head;
struct io_poll *poll = io_poll_get_single(req);
@@ -403,20 +445,20 @@ static void io_poll_double_prepare(struct io_kiocb *req)
rcu_read_lock();
head = smp_load_acquire(&poll->head);
/*
- * poll arm may not hold ownership and so race with
- * io_poll_wake() by modifying req->flags. There is only one
- * poll entry queued, serialise with it by taking its head lock.
+ * poll arm might not hold ownership and so race for req->flags with
+ * io_poll_wake(). There is only one poll entry queued, serialise with
+	 * it by taking its head lock. As we're still arming, the tw handler
+	 * is not going to be run, so there are no races with it.
*/
- if (head)
+ if (head) {
spin_lock_irq(&head->lock);
-
- req->flags |= REQ_F_DOUBLE_POLL;
- if (req->opcode == IORING_OP_POLL_ADD)
- req->flags |= REQ_F_ASYNC_DATA;
-
- if (head)
+ req->flags |= REQ_F_DOUBLE_POLL;
+ if (req->opcode == IORING_OP_POLL_ADD)
+ req->flags |= REQ_F_ASYNC_DATA;
spin_unlock_irq(&head->lock);
+ }
rcu_read_unlock();
+ return !!head;
}
static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
@@ -454,7 +496,11 @@ static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
/* mark as double wq entry */
wqe_private |= IO_WQE_F_DOUBLE;
io_init_poll_iocb(poll, first->events, first->wait.func);
- io_poll_double_prepare(req);
+ if (!io_poll_double_prepare(req)) {
+ /* the request is completing, just back off */
+ kfree(poll);
+ return;
+ }
*poll_ptr = poll;
} else {
/* fine to modify, there is no poll queued to race with us */
@@ -499,7 +545,6 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
unsigned issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
- int v;
INIT_HLIST_NODE(&req->hash_node);
req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
@@ -567,11 +612,10 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
if (ipt->owning) {
/*
- * Release ownership. If someone tried to queue a tw while it was
- * locked, kick it off for them.
+ * Try to release ownership. If we see a change of state, e.g.
+		 * the poll was woken up, queue up a tw; it'll deal with it.
*/
- v = atomic_dec_return(&req->poll_refs);
- if (unlikely(v & IO_POLL_REF_MASK))
+ if (atomic_cmpxchg(&req->poll_refs, 1, 0) != 1)
__io_poll_execute(req, 0);
}
return 0;
@@ -596,10 +640,13 @@ static struct async_poll *io_req_alloc_apoll(struct io_kiocb *req,
if (req->flags & REQ_F_POLLED) {
apoll = req->apoll;
kfree(apoll->double_poll);
- } else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
- (entry = io_alloc_cache_get(&ctx->apoll_cache)) != NULL) {
+ } else if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+ entry = io_alloc_cache_get(&ctx->apoll_cache);
+ if (entry == NULL)
+ goto alloc_apoll;
apoll = container_of(entry, struct async_poll, cache);
} else {
+alloc_apoll:
apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
if (unlikely(!apoll))
return NULL;
diff --git a/io_uring/rsrc.c b/io_uring/rsrc.c
index 55d4ab96fb92..18de10c68a15 100644
--- a/io_uring/rsrc.c
+++ b/io_uring/rsrc.c
@@ -170,10 +170,10 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
if (prsrc->tag) {
if (ctx->flags & IORING_SETUP_IOPOLL) {
mutex_lock(&ctx->uring_lock);
- io_post_aux_cqe(ctx, prsrc->tag, 0, 0, true);
+ io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
mutex_unlock(&ctx->uring_lock);
} else {
- io_post_aux_cqe(ctx, prsrc->tag, 0, 0, true);
+ io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
}
}
@@ -204,6 +204,14 @@ void io_rsrc_put_work(struct work_struct *work)
}
}
+void io_rsrc_put_tw(struct callback_head *cb)
+{
+ struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
+ rsrc_put_tw);
+
+ io_rsrc_put_work(&ctx->rsrc_put_work.work);
+}
+
void io_wait_rsrc_data(struct io_rsrc_data *data)
{
if (data && !atomic_dec_and_test(&data->refs))
@@ -242,8 +250,15 @@ static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref)
}
spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags);
- if (first_add)
- mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
+ if (!first_add)
+ return;
+
+ if (ctx->submitter_task) {
+ if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw,
+ ctx->notify_method))
+ return;
+ }
+ mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
}
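
The ref-zero path above now prefers handing the put work to the submitter task as task_work and only falls back to the delayed workqueue when that is not possible. A hedged sketch of that prefer-task-work pattern, with hypothetical names (TWA_SIGNAL stands in for the ring's actual notify method):

#include <linux/sched.h>
#include <linux/task_work.h>
#include <linux/workqueue.h>

/* Illustrative only: queue to the task if we can, otherwise punt to a wq. */
static void demo_queue_put_work(struct task_struct *task,
				struct callback_head *tw,
				struct delayed_work *fallback,
				unsigned long delay)
{
	if (task && !task_work_add(task, tw, TWA_SIGNAL))
		return;		/* will run from the task's context */
	mod_delayed_work(system_wq, fallback, delay);
}
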
static struct io_rsrc_node *io_rsrc_node_alloc(void)
@@ -309,41 +324,41 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
/* As we may drop ->uring_lock, other task may have started quiesce */
if (data->quiesce)
return -ENXIO;
+ ret = io_rsrc_node_switch_start(ctx);
+ if (ret)
+ return ret;
+ io_rsrc_node_switch(ctx, data);
+
+ /* kill initial ref, already quiesced if zero */
+ if (atomic_dec_and_test(&data->refs))
+ return 0;
data->quiesce = true;
+ mutex_unlock(&ctx->uring_lock);
do {
- ret = io_rsrc_node_switch_start(ctx);
- if (ret)
+ ret = io_run_task_work_sig(ctx);
+ if (ret < 0) {
+ atomic_inc(&data->refs);
+			/* wait for any work that might be completing data->done */
+ flush_delayed_work(&ctx->rsrc_put_work);
+ reinit_completion(&data->done);
+ mutex_lock(&ctx->uring_lock);
break;
- io_rsrc_node_switch(ctx, data);
+ }
- /* kill initial ref, already quiesced if zero */
- if (atomic_dec_and_test(&data->refs))
- break;
- mutex_unlock(&ctx->uring_lock);
flush_delayed_work(&ctx->rsrc_put_work);
ret = wait_for_completion_interruptible(&data->done);
if (!ret) {
mutex_lock(&ctx->uring_lock);
- if (atomic_read(&data->refs) > 0) {
- /*
- * it has been revived by another thread while
- * we were unlocked
- */
- mutex_unlock(&ctx->uring_lock);
- } else {
+ if (atomic_read(&data->refs) <= 0)
break;
- }
+ /*
+ * it has been revived by another thread while
+ * we were unlocked
+ */
+ mutex_unlock(&ctx->uring_lock);
}
-
- atomic_inc(&data->refs);
- /* wait for all works potentially completing data->done */
- flush_delayed_work(&ctx->rsrc_put_work);
- reinit_completion(&data->done);
-
- ret = io_run_task_work_sig(ctx);
- mutex_lock(&ctx->uring_lock);
- } while (ret >= 0);
+ } while (1);
data->quiesce = false;
return ret;
diff --git a/io_uring/rsrc.h b/io_uring/rsrc.h
index 81445a477622..2b8743645efc 100644
--- a/io_uring/rsrc.h
+++ b/io_uring/rsrc.h
@@ -53,6 +53,7 @@ struct io_mapped_ubuf {
struct bio_vec bvec[];
};
+void io_rsrc_put_tw(struct callback_head *cb);
void io_rsrc_put_work(struct work_struct *work);
void io_rsrc_refs_refill(struct io_ring_ctx *ctx);
void io_wait_rsrc_data(struct io_rsrc_data *data);
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 5c91cc80b348..b9cac5706e8d 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -286,6 +286,12 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
static void io_req_rw_complete(struct io_kiocb *req, bool *locked)
{
io_req_io_end(req);
+
+ if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
+ unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
+
+ req->cqe.flags |= io_put_kbuf(req, issue_flags);
+ }
io_req_task_complete(req, locked);
}
@@ -548,12 +554,12 @@ static inline int io_rw_prep_async(struct io_kiocb *req, int rw)
int io_readv_prep_async(struct io_kiocb *req)
{
- return io_rw_prep_async(req, READ);
+ return io_rw_prep_async(req, ITER_DEST);
}
int io_writev_prep_async(struct io_kiocb *req)
{
- return io_rw_prep_async(req, WRITE);
+ return io_rw_prep_async(req, ITER_SOURCE);
}
/*
@@ -705,7 +711,7 @@ int io_read(struct io_kiocb *req, unsigned int issue_flags)
loff_t *ppos;
if (!req_has_async_data(req)) {
- ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
+ ret = io_import_iovec(ITER_DEST, req, &iovec, s, issue_flags);
if (unlikely(ret < 0))
return ret;
} else {
@@ -717,7 +723,7 @@ int io_read(struct io_kiocb *req, unsigned int issue_flags)
* buffers, as we dropped the selected one before retry.
*/
if (io_do_buffer_select(req)) {
- ret = io_import_iovec(READ, req, &iovec, s, issue_flags);
+ ret = io_import_iovec(ITER_DEST, req, &iovec, s, issue_flags);
if (unlikely(ret < 0))
return ret;
}
@@ -852,7 +858,7 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
loff_t *ppos;
if (!req_has_async_data(req)) {
- ret = io_import_iovec(WRITE, req, &iovec, s, issue_flags);
+ ret = io_import_iovec(ITER_SOURCE, req, &iovec, s, issue_flags);
if (unlikely(ret < 0))
return ret;
} else {
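
Throughout rw.c the iov_iter direction is now expressed with ITER_DEST/ITER_SOURCE instead of the overloaded READ/WRITE constants. For context, a hedged example of initialising an iterator with the new constants (function and argument names are illustrative):

#include <linux/uio.h>

/* A read fills the user buffer, so the iterator is a destination. */
static void demo_init_read_iter(struct iov_iter *iter, const struct iovec *iov,
				unsigned long nr_segs, size_t count)
{
	iov_iter_init(iter, ITER_DEST, iov, nr_segs, count);
}
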
diff --git a/io_uring/timeout.c b/io_uring/timeout.c
index e8a8c2099480..5b4bc93fd6e0 100644
--- a/io_uring/timeout.c
+++ b/io_uring/timeout.c
@@ -63,7 +63,7 @@ static bool io_kill_timeout(struct io_kiocb *req, int status)
atomic_set(&req->ctx->cq_timeouts,
atomic_read(&req->ctx->cq_timeouts) + 1);
list_del_init(&timeout->list);
- io_req_tw_post_queue(req, status, 0);
+ io_req_queue_tw_complete(req, status);
return true;
}
return false;
@@ -159,7 +159,7 @@ void io_disarm_next(struct io_kiocb *req)
req->flags &= ~REQ_F_ARM_LTIMEOUT;
if (link && link->opcode == IORING_OP_LINK_TIMEOUT) {
io_remove_next_linked(req);
- io_req_tw_post_queue(link, -ECANCELED, 0);
+ io_req_queue_tw_complete(link, -ECANCELED);
}
} else if (req->flags & REQ_F_LINK_TIMEOUT) {
struct io_ring_ctx *ctx = req->ctx;
@@ -168,7 +168,7 @@ void io_disarm_next(struct io_kiocb *req)
link = io_disarm_linked_timeout(req);
spin_unlock_irq(&ctx->timeout_lock);
if (link)
- io_req_tw_post_queue(link, -ECANCELED, 0);
+ io_req_queue_tw_complete(link, -ECANCELED);
}
if (unlikely((req->flags & REQ_F_FAIL) &&
!(req->flags & REQ_F_HARDLINK)))
@@ -282,11 +282,11 @@ static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
ret = io_try_cancel(req->task->io_uring, &cd, issue_flags);
}
io_req_set_res(req, ret ?: -ETIME, 0);
- io_req_complete_post(req);
+ io_req_task_complete(req, locked);
io_put_req(prev);
} else {
io_req_set_res(req, -ETIME, 0);
- io_req_complete_post(req);
+ io_req_task_complete(req, locked);
}
}
diff --git a/io_uring/uring_cmd.c b/io_uring/uring_cmd.c
index e50de0b6b9f8..446a189b78b0 100644
--- a/io_uring/uring_cmd.c
+++ b/io_uring/uring_cmd.c
@@ -56,7 +56,7 @@ void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2)
/* order with io_iopoll_req_issued() checking ->iopoll_complete */
smp_store_release(&req->iopoll_completed, 1);
else
- __io_req_complete(req, 0);
+ io_req_complete_post(req, 0);
}
EXPORT_SYMBOL_GPL(io_uring_cmd_done);
diff --git a/io_uring/xattr.c b/io_uring/xattr.c
index 99df641594d7..6201a9f442c6 100644
--- a/io_uring/xattr.c
+++ b/io_uring/xattr.c
@@ -112,7 +112,7 @@ int io_fgetxattr(struct io_kiocb *req, unsigned int issue_flags)
if (issue_flags & IO_URING_F_NONBLOCK)
return -EAGAIN;
- ret = do_getxattr(mnt_user_ns(req->file->f_path.mnt),
+ ret = do_getxattr(mnt_idmap(req->file->f_path.mnt),
req->file->f_path.dentry,
&ix->ctx);
@@ -133,9 +133,7 @@ int io_getxattr(struct io_kiocb *req, unsigned int issue_flags)
retry:
ret = filename_lookup(AT_FDCWD, ix->filename, lookup_flags, &path, NULL);
if (!ret) {
- ret = do_getxattr(mnt_user_ns(path.mnt),
- path.dentry,
- &ix->ctx);
+ ret = do_getxattr(mnt_idmap(path.mnt), path.dentry, &ix->ctx);
path_put(&path);
if (retry_estale(ret, lookup_flags)) {
@@ -213,7 +211,7 @@ static int __io_setxattr(struct io_kiocb *req, unsigned int issue_flags,
ret = mnt_want_write(path->mnt);
if (!ret) {
- ret = do_setxattr(mnt_user_ns(path->mnt), path->dentry, &ix->ctx);
+ ret = do_setxattr(mnt_idmap(path->mnt), path->dentry, &ix->ctx);
mnt_drop_write(path->mnt);
}