diff options
32 files changed, 4783 insertions, 146 deletions
diff --git a/arch/x86/entry/syscalls/syscall_32.tbl b/arch/x86/entry/syscalls/syscall_32.tbl index 955ab6a3b61f..8da78595d69d 100644 --- a/arch/x86/entry/syscalls/syscall_32.tbl +++ b/arch/x86/entry/syscalls/syscall_32.tbl @@ -429,3 +429,6 @@ 421 i386 rt_sigtimedwait_time64 sys_rt_sigtimedwait __ia32_compat_sys_rt_sigtimedwait_time64 422 i386 futex_time64 sys_futex __ia32_sys_futex 423 i386 sched_rr_get_interval_time64 sys_sched_rr_get_interval __ia32_sys_sched_rr_get_interval +425 i386 io_uring_setup sys_io_uring_setup __ia32_sys_io_uring_setup +426 i386 io_uring_enter sys_io_uring_enter __ia32_sys_io_uring_enter +427 i386 io_uring_register sys_io_uring_register __ia32_sys_io_uring_register diff --git a/arch/x86/entry/syscalls/syscall_64.tbl b/arch/x86/entry/syscalls/syscall_64.tbl index 2ae92fddb6d5..c768447f97ec 100644 --- a/arch/x86/entry/syscalls/syscall_64.tbl +++ b/arch/x86/entry/syscalls/syscall_64.tbl @@ -345,6 +345,9 @@ 334 common rseq __x64_sys_rseq # don't use numbers 387 through 423, add new calls after the last # 'common' entry +425 common io_uring_setup __x64_sys_io_uring_setup +426 common io_uring_enter __x64_sys_io_uring_enter +427 common io_uring_register __x64_sys_io_uring_register # # x32-specific system call numbers start at 512 to avoid cache impact diff --git a/block/bio.c b/block/bio.c index 83a2dfa417ca..71a78d9fb8b7 100644 --- a/block/bio.c +++ b/block/bio.c @@ -836,6 +836,40 @@ int bio_add_page(struct bio *bio, struct page *page, } EXPORT_SYMBOL(bio_add_page); +static int __bio_iov_bvec_add_pages(struct bio *bio, struct iov_iter *iter) +{ + const struct bio_vec *bv = iter->bvec; + unsigned int len; + size_t size; + + if (WARN_ON_ONCE(iter->iov_offset > bv->bv_len)) + return -EINVAL; + + len = min_t(size_t, bv->bv_len - iter->iov_offset, iter->count); + size = bio_add_page(bio, bv->bv_page, len, + bv->bv_offset + iter->iov_offset); + if (size == len) { + struct page *page; + int i; + + /* + * For the normal O_DIRECT case, we could skip grabbing this + * reference and then not have to put them again when IO + * completes. But this breaks some in-kernel users, like + * splicing to/from a loop device, where we release the pipe + * pages unconditionally. If we can fix that case, we can + * get rid of the get here and the need to call + * bio_release_pages() at IO completion time. + */ + mp_bvec_for_each_page(page, bv, i) + get_page(page); + iov_iter_advance(iter, size); + return 0; + } + + return -EINVAL; +} + #define PAGE_PTRS_PER_BVEC (sizeof(struct bio_vec) / sizeof(struct page *)) /** @@ -884,23 +918,35 @@ static int __bio_iov_iter_get_pages(struct bio *bio, struct iov_iter *iter) } /** - * bio_iov_iter_get_pages - pin user or kernel pages and add them to a bio + * bio_iov_iter_get_pages - add user or kernel pages to a bio * @bio: bio to add pages to - * @iter: iov iterator describing the region to be mapped + * @iter: iov iterator describing the region to be added + * + * This takes either an iterator pointing to user memory, or one pointing to + * kernel pages (BVEC iterator). If we're adding user pages, we pin them and + * map them into the kernel. On IO completion, the caller should put those + * pages. For now, when adding kernel pages, we still grab a reference to the + * page. This isn't strictly needed for the common case, but some call paths + * end up releasing pages from eg a pipe and we can't easily control these. + * See comment in __bio_iov_bvec_add_pages(). * - * Pins pages from *iter and appends them to @bio's bvec array. The - * pages will have to be released using put_page() when done. * The function tries, but does not guarantee, to pin as many pages as - * fit into the bio, or are requested in *iter, whatever is smaller. - * If MM encounters an error pinning the requested pages, it stops. - * Error is returned only if 0 pages could be pinned. + * fit into the bio, or are requested in *iter, whatever is smaller. If + * MM encounters an error pinning the requested pages, it stops. Error + * is returned only if 0 pages could be pinned. */ int bio_iov_iter_get_pages(struct bio *bio, struct iov_iter *iter) { + const bool is_bvec = iov_iter_is_bvec(iter); unsigned short orig_vcnt = bio->bi_vcnt; do { - int ret = __bio_iov_iter_get_pages(bio, iter); + int ret; + + if (is_bvec) + ret = __bio_iov_bvec_add_pages(bio, iter); + else + ret = __bio_iov_iter_get_pages(bio, iter); if (unlikely(ret)) return bio->bi_vcnt > orig_vcnt ? 0 : ret; diff --git a/fs/Makefile b/fs/Makefile index 23fcd8c164a3..ffeaa6632ab4 100644 --- a/fs/Makefile +++ b/fs/Makefile @@ -31,6 +31,7 @@ obj-$(CONFIG_TIMERFD) += timerfd.o obj-$(CONFIG_EVENTFD) += eventfd.o obj-$(CONFIG_USERFAULTFD) += userfaultfd.o obj-$(CONFIG_AIO) += aio.o +obj-$(CONFIG_IO_URING) += io_uring.o obj-$(CONFIG_FS_DAX) += dax.o obj-$(CONFIG_FS_ENCRYPTION) += crypto/ obj-$(CONFIG_FILE_LOCKING) += locks.o diff --git a/fs/file.c b/fs/file.c index a10487aa0a84..3da91a112bab 100644 --- a/fs/file.c +++ b/fs/file.c @@ -706,7 +706,7 @@ void do_close_on_exec(struct files_struct *files) spin_unlock(&files->file_lock); } -static struct file *__fget(unsigned int fd, fmode_t mask) +static struct file *__fget(unsigned int fd, fmode_t mask, unsigned int refs) { struct files_struct *files = current->files; struct file *file; @@ -721,7 +721,7 @@ loop: */ if (file->f_mode & mask) file = NULL; - else if (!get_file_rcu(file)) + else if (!get_file_rcu_many(file, refs)) goto loop; } rcu_read_unlock(); @@ -729,15 +729,20 @@ loop: return file; } +struct file *fget_many(unsigned int fd, unsigned int refs) +{ + return __fget(fd, FMODE_PATH, refs); +} + struct file *fget(unsigned int fd) { - return __fget(fd, FMODE_PATH); + return __fget(fd, FMODE_PATH, 1); } EXPORT_SYMBOL(fget); struct file *fget_raw(unsigned int fd) { - return __fget(fd, 0); + return __fget(fd, 0, 1); } EXPORT_SYMBOL(fget_raw); @@ -768,7 +773,7 @@ static unsigned long __fget_light(unsigned int fd, fmode_t mask) return 0; return (unsigned long)file; } else { - file = __fget(fd, mask); + file = __fget(fd, mask, 1); if (!file) return 0; return FDPUT_FPUT | (unsigned long)file; diff --git a/fs/file_table.c b/fs/file_table.c index 5679e7fcb6b0..155d7514a094 100644 --- a/fs/file_table.c +++ b/fs/file_table.c @@ -326,9 +326,9 @@ void flush_delayed_fput(void) static DECLARE_DELAYED_WORK(delayed_fput_work, delayed_fput); -void fput(struct file *file) +void fput_many(struct file *file, unsigned int refs) { - if (atomic_long_dec_and_test(&file->f_count)) { + if (atomic_long_sub_and_test(refs, &file->f_count)) { struct task_struct *task = current; if (likely(!in_interrupt() && !(task->flags & PF_KTHREAD))) { @@ -347,6 +347,11 @@ void fput(struct file *file) } } +void fput(struct file *file) +{ + fput_many(file, 1); +} + /* * synchronous analog of fput(); for kernel threads that might be needed * in some umount() (and thus can't use flush_delayed_fput() without diff --git a/fs/io_uring.c b/fs/io_uring.c new file mode 100644 index 000000000000..5d99376d2369 --- /dev/null +++ b/fs/io_uring.c @@ -0,0 +1,2971 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * Shared application/kernel submission and completion ring pairs, for + * supporting fast/efficient IO. + * + * A note on the read/write ordering memory barriers that are matched between + * the application and kernel side. When the application reads the CQ ring + * tail, it must use an appropriate smp_rmb() to order with the smp_wmb() + * the kernel uses after writing the tail. Failure to do so could cause a + * delay in when the application notices that completion events available. + * This isn't a fatal condition. Likewise, the application must use an + * appropriate smp_wmb() both before writing the SQ tail, and after writing + * the SQ tail. The first one orders the sqe writes with the tail write, and + * the latter is paired with the smp_rmb() the kernel will issue before + * reading the SQ tail on submission. + * + * Also see the examples in the liburing library: + * + * git://git.kernel.dk/liburing + * + * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens + * from data shared between the kernel and application. This is done both + * for ordering purposes, but also to ensure that once a value is loaded from + * data that the application could potentially modify, it remains stable. + * + * Copyright (C) 2018-2019 Jens Axboe + * Copyright (c) 2018-2019 Christoph Hellwig + */ +#include <linux/kernel.h> +#include <linux/init.h> +#include <linux/errno.h> +#include <linux/syscalls.h> +#include <linux/compat.h> +#include <linux/refcount.h> +#include <linux/uio.h> + +#include <linux/sched/signal.h> +#include <linux/fs.h> +#include <linux/file.h> +#include <linux/fdtable.h> +#include <linux/mm.h> +#include <linux/mman.h> +#include <linux/mmu_context.h> +#include <linux/percpu.h> +#include <linux/slab.h> +#include <linux/workqueue.h> +#include <linux/kthread.h> +#include <linux/blkdev.h> +#include <linux/bvec.h> +#include <linux/net.h> +#include <net/sock.h> +#include <net/af_unix.h> +#include <net/scm.h> +#include <linux/anon_inodes.h> +#include <linux/sched/mm.h> +#include <linux/uaccess.h> +#include <linux/nospec.h> +#include <linux/sizes.h> +#include <linux/hugetlb.h> + +#include <uapi/linux/io_uring.h> + +#include "internal.h" + +#define IORING_MAX_ENTRIES 4096 +#define IORING_MAX_FIXED_FILES 1024 + +struct io_uring { + u32 head ____cacheline_aligned_in_smp; + u32 tail ____cacheline_aligned_in_smp; +}; + +struct io_sq_ring { + struct io_uring r; + u32 ring_mask; + u32 ring_entries; + u32 dropped; + u32 flags; + u32 array[]; +}; + +struct io_cq_ring { + struct io_uring r; + u32 ring_mask; + u32 ring_entries; + u32 overflow; + struct io_uring_cqe cqes[]; +}; + +struct io_mapped_ubuf { + u64 ubuf; + size_t len; + struct bio_vec *bvec; + unsigned int nr_bvecs; +}; + +struct async_list { + spinlock_t lock; + atomic_t cnt; + struct list_head list; + + struct file *file; + off_t io_end; + size_t io_pages; +}; + +struct io_ring_ctx { + struct { + struct percpu_ref refs; + } ____cacheline_aligned_in_smp; + + struct { + unsigned int flags; + bool compat; + bool account_mem; + + /* SQ ring */ + struct io_sq_ring *sq_ring; + unsigned cached_sq_head; + unsigned sq_entries; + unsigned sq_mask; + unsigned sq_thread_idle; + struct io_uring_sqe *sq_sqes; + } ____cacheline_aligned_in_smp; + + /* IO offload */ + struct workqueue_struct *sqo_wq; + struct task_struct *sqo_thread; /* if using sq thread polling */ + struct mm_struct *sqo_mm; + wait_queue_head_t sqo_wait; + unsigned sqo_stop; + + struct { + /* CQ ring */ + struct io_cq_ring *cq_ring; + unsigned cached_cq_tail; + unsigned cq_entries; + unsigned cq_mask; + struct wait_queue_head cq_wait; + struct fasync_struct *cq_fasync; + } ____cacheline_aligned_in_smp; + + /* + * If used, fixed file set. Writers must ensure that ->refs is dead, + * readers must ensure that ->refs is alive as long as the file* is + * used. Only updated through io_uring_register(2). + */ + struct file **user_files; + unsigned nr_user_files; + + /* if used, fixed mapped user buffers */ + unsigned nr_user_bufs; + struct io_mapped_ubuf *user_bufs; + + struct user_struct *user; + + struct completion ctx_done; + + struct { + struct mutex uring_lock; + wait_queue_head_t wait; + } ____cacheline_aligned_in_smp; + + struct { + spinlock_t completion_lock; + bool poll_multi_file; + /* + * ->poll_list is protected by the ctx->uring_lock for + * io_uring instances that don't use IORING_SETUP_SQPOLL. + * For SQPOLL, only the single threaded io_sq_thread() will + * manipulate the list, hence no extra locking is needed there. + */ + struct list_head poll_list; + struct list_head cancel_list; + } ____cacheline_aligned_in_smp; + + struct async_list pending_async[2]; + +#if defined(CONFIG_UNIX) + struct socket *ring_sock; +#endif +}; + +struct sqe_submit { + const struct io_uring_sqe *sqe; + unsigned short index; + bool has_user; + bool needs_lock; + bool needs_fixed_file; +}; + +struct io_poll_iocb { + struct file *file; + struct wait_queue_head *head; + __poll_t events; + bool woken; + bool canceled; + struct wait_queue_entry wait; +}; + +struct io_kiocb { + union { + struct kiocb rw; + struct io_poll_iocb poll; + }; + + struct sqe_submit submit; + + struct io_ring_ctx *ctx; + struct list_head list; + unsigned int flags; + refcount_t refs; +#define REQ_F_FORCE_NONBLOCK 1 /* inline submission attempt */ +#define REQ_F_IOPOLL_COMPLETED 2 /* polled IO has completed */ +#define REQ_F_FIXED_FILE 4 /* ctx owns file */ +#define REQ_F_SEQ_PREV 8 /* sequential with previous */ + u64 user_data; + u64 error; + + struct work_struct work; +}; + +#define IO_PLUG_THRESHOLD 2 +#define IO_IOPOLL_BATCH 8 + +struct io_submit_state { + struct blk_plug plug; + + /* + * io_kiocb alloc cache + */ + void *reqs[IO_IOPOLL_BATCH]; + unsigned int free_reqs; + unsigned int cur_req; + + /* + * File reference cache + */ + struct file *file; + unsigned int fd; + unsigned int has_refs; + unsigned int used_refs; + unsigned int ios_left; +}; + +static struct kmem_cache *req_cachep; + +static const struct file_operations io_uring_fops; + +struct sock *io_uring_get_socket(struct file *file) +{ +#if defined(CONFIG_UNIX) + if (file->f_op == &io_uring_fops) { + struct io_ring_ctx *ctx = file->private_data; + + return ctx->ring_sock->sk; + } +#endif + return NULL; +} +EXPORT_SYMBOL(io_uring_get_socket); + +static void io_ring_ctx_ref_free(struct percpu_ref *ref) +{ + struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs); + + complete(&ctx->ctx_done); +} + +static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) +{ + struct io_ring_ctx *ctx; + int i; + + ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); + if (!ctx) + return NULL; + + if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) { + kfree(ctx); + return NULL; + } + + ctx->flags = p->flags; + init_waitqueue_head(&ctx->cq_wait); + init_completion(&ctx->ctx_done); + mutex_init(&ctx->uring_lock); + init_waitqueue_head(&ctx->wait); + for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) { + spin_lock_init(&ctx->pending_async[i].lock); + INIT_LIST_HEAD(&ctx->pending_async[i].list); + atomic_set(&ctx->pending_async[i].cnt, 0); + } + spin_lock_init(&ctx->completion_lock); + INIT_LIST_HEAD(&ctx->poll_list); + INIT_LIST_HEAD(&ctx->cancel_list); + return ctx; +} + +static void io_commit_cqring(struct io_ring_ctx *ctx) +{ + struct io_cq_ring *ring = ctx->cq_ring; + + if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) { + /* order cqe stores with ring update */ + smp_store_release(&ring->r.tail, ctx->cached_cq_tail); + + /* + * Write sider barrier of tail update, app has read side. See + * comment at the top of this file. + */ + smp_wmb(); + + if (wq_has_sleeper(&ctx->cq_wait)) { + wake_up_interruptible(&ctx->cq_wait); + kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN); + } + } +} + +static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) +{ + struct io_cq_ring *ring = ctx->cq_ring; + unsigned tail; + + tail = ctx->cached_cq_tail; + /* See comment at the top of the file */ + smp_rmb(); + if (tail + 1 == READ_ONCE(ring->r.head)) + return NULL; + + ctx->cached_cq_tail++; + return &ring->cqes[tail & ctx->cq_mask]; +} + +static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data, + long res, unsigned ev_flags) +{ + struct io_uring_cqe *cqe; + + /* + * If we can't get a cq entry, userspace overflowed the + * submission (by quite a lot). Increment the overflow count in + * the ring. + */ + cqe = io_get_cqring(ctx); + if (cqe) { + WRITE_ONCE(cqe->user_data, ki_user_data); + WRITE_ONCE(cqe->res, res); + WRITE_ONCE(cqe->flags, ev_flags); + } else { + unsigned overflow = READ_ONCE(ctx->cq_ring->overflow); + + WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1); + } +} + +static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data, + long res, unsigned ev_flags) +{ + unsigned long flags; + + spin_lock_irqsave(&ctx->completion_lock, flags); + io_cqring_fill_event(ctx, ki_user_data, res, ev_flags); + io_commit_cqring(ctx); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + + if (waitqueue_active(&ctx->wait)) + wake_up(&ctx->wait); + if (waitqueue_active(&ctx->sqo_wait)) + wake_up(&ctx->sqo_wait); +} + +static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs) +{ + percpu_ref_put_many(&ctx->refs, refs); + + if (waitqueue_active(&ctx->wait)) + wake_up(&ctx->wait); +} + +static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx, + struct io_submit_state *state) +{ + struct io_kiocb *req; + + if (!percpu_ref_tryget(&ctx->refs)) + return NULL; + + if (!state) { + req = kmem_cache_alloc(req_cachep, __GFP_NOWARN); + if (unlikely(!req)) + goto out; + } else if (!state->free_reqs) { + size_t sz; + int ret; + + sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs)); + ret = kmem_cache_alloc_bulk(req_cachep, __GFP_NOWARN, sz, + state->reqs); + if (unlikely(ret <= 0)) + goto out; + state->free_reqs = ret - 1; + state->cur_req = 1; + req = state->reqs[0]; + } else { + req = state->reqs[state->cur_req]; + state->free_reqs--; + state->cur_req++; + } + + req->ctx = ctx; + req->flags = 0; + refcount_set(&req->refs, 0); + return req; +out: + io_ring_drop_ctx_refs(ctx, 1); + return NULL; +} + +static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr) +{ + if (*nr) { + kmem_cache_free_bulk(req_cachep, *nr, reqs); + io_ring_drop_ctx_refs(ctx, *nr); + *nr = 0; + } +} + +static void io_free_req(struct io_kiocb *req) +{ + if (!refcount_read(&req->refs) || refcount_dec_and_test(&req->refs)) { + io_ring_drop_ctx_refs(req->ctx, 1); + kmem_cache_free(req_cachep, req); + } +} + +/* + * Find and free completed poll iocbs + */ +static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events, + struct list_head *done) +{ + void *reqs[IO_IOPOLL_BATCH]; + int file_count, to_free; + struct file *file = NULL; + struct io_kiocb *req; + + file_count = to_free = 0; + while (!list_empty(done)) { + req = list_first_entry(done, struct io_kiocb, list); + list_del(&req->list); + + io_cqring_fill_event(ctx, req->user_data, req->error, 0); + + reqs[to_free++] = req; + (*nr_events)++; + + /* + * Batched puts of the same file, to avoid dirtying the + * file usage count multiple times, if avoidable. + */ + if (!(req->flags & REQ_F_FIXED_FILE)) { + if (!file) { + file = req->rw.ki_filp; + file_count = 1; + } else if (file == req->rw.ki_filp) { + file_count++; + } else { + fput_many(file, file_count); + file = req->rw.ki_filp; + file_count = 1; + } + } + + if (to_free == ARRAY_SIZE(reqs)) + io_free_req_many(ctx, reqs, &to_free); + } + io_commit_cqring(ctx); + + if (file) + fput_many(file, file_count); + io_free_req_many(ctx, reqs, &to_free); +} + +static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events, + long min) +{ + struct io_kiocb *req, *tmp; + LIST_HEAD(done); + bool spin; + int ret; + + /* + * Only spin for completions if we don't have multiple devices hanging + * off our complete list, and we're under the requested amount. + */ + spin = !ctx->poll_multi_file && *nr_events < min; + + ret = 0; + list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) { + struct kiocb *kiocb = &req->rw; + + /* + * Move completed entries to our local list. If we find a + * request that requires polling, break out and complete + * the done list first, if we have entries there. + */ + if (req->flags & REQ_F_IOPOLL_COMPLETED) { + list_move_tail(&req->list, &done); + continue; + } + if (!list_empty(&done)) + break; + + ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin); + if (ret < 0) + break; + + if (ret && spin) + spin = false; + ret = 0; + } + + if (!list_empty(&done)) + io_iopoll_complete(ctx, nr_events, &done); + + return ret; +} + +/* + * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a + * non-spinning poll check - we'll still enter the driver poll loop, but only + * as a non-spinning completion check. + */ +static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events, + long min) +{ + while (!list_empty(&ctx->poll_list)) { + int ret; + + ret = io_do_iopoll(ctx, nr_events, min); + if (ret < 0) + return ret; + if (!min || *nr_events >= min) + return 0; + } + + return 1; +} + +/* + * We can't just wait for polled events to come to us, we have to actively + * find and complete them. + */ +static void io_iopoll_reap_events(struct io_ring_ctx *ctx) +{ + if (!(ctx->flags & IORING_SETUP_IOPOLL)) + return; + + mutex_lock(&ctx->uring_lock); + while (!list_empty(&ctx->poll_list)) { + unsigned int nr_events = 0; + + io_iopoll_getevents(ctx, &nr_events, 1); + } + mutex_unlock(&ctx->uring_lock); +} + +static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, + long min) +{ + int ret = 0; + + do { + int tmin = 0; + + if (*nr_events < min) + tmin = min - *nr_events; + + ret = io_iopoll_getevents(ctx, nr_events, tmin); + if (ret <= 0) + break; + ret = 0; + } while (min && !*nr_events && !need_resched()); + + return ret; +} + +static void kiocb_end_write(struct kiocb *kiocb) +{ + if (kiocb->ki_flags & IOCB_WRITE) { + struct inode *inode = file_inode(kiocb->ki_filp); + + /* + * Tell lockdep we inherited freeze protection from submission + * thread. + */ + if (S_ISREG(inode->i_mode)) + __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE); + file_end_write(kiocb->ki_filp); + } +} + +static void io_fput(struct io_kiocb *req) +{ + if (!(req->flags & REQ_F_FIXED_FILE)) + fput(req->rw.ki_filp); +} + +static void io_complete_rw(struct kiocb *kiocb, long res, long res2) +{ + struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); + + kiocb_end_write(kiocb); + + io_fput(req); + io_cqring_add_event(req->ctx, req->user_data, res, 0); + io_free_req(req); +} + +static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2) +{ + struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw); + + kiocb_end_write(kiocb); + + req->error = res; + if (res != -EAGAIN) + req->flags |= REQ_F_IOPOLL_COMPLETED; +} + +/* + * After the iocb has been issued, it's safe to be found on the poll list. + * Adding the kiocb to the list AFTER submission ensures that we don't + * find it from a io_iopoll_getevents() thread before the issuer is done + * accessing the kiocb cookie. + */ +static void io_iopoll_req_issued(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + + /* + * Track whether we have multiple files in our lists. This will impact + * how we do polling eventually, not spinning if we're on potentially + * different devices. + */ + if (list_empty(&ctx->poll_list)) { + ctx->poll_multi_file = false; + } else if (!ctx->poll_multi_file) { + struct io_kiocb *list_req; + + list_req = list_first_entry(&ctx->poll_list, struct io_kiocb, + list); + if (list_req->rw.ki_filp != req->rw.ki_filp) + ctx->poll_multi_file = true; + } + + /* + * For fast devices, IO may have already completed. If it has, add + * it to the front so we find it first. + */ + if (req->flags & REQ_F_IOPOLL_COMPLETED) + list_add(&req->list, &ctx->poll_list); + else + list_add_tail(&req->list, &ctx->poll_list); +} + +static void io_file_put(struct io_submit_state *state, struct file *file) +{ + if (!state) { + fput(file); + } else if (state->file) { + int diff = state->has_refs - state->used_refs; + + if (diff) + fput_many(state->file, diff); + state->file = NULL; + } +} + +/* + * Get as many references to a file as we have IOs left in this submission, + * assuming most submissions are for one file, or at least that each file + * has more than one submission. + */ +static struct file *io_file_get(struct io_submit_state *state, int fd) +{ + if (!state) + return fget(fd); + + if (state->file) { + if (state->fd == fd) { + state->used_refs++; + state->ios_left--; + return state->file; + } + io_file_put(state, NULL); + } + state->file = fget_many(fd, state->ios_left); + if (!state->file) + return NULL; + + state->fd = fd; + state->has_refs = state->ios_left; + state->used_refs = 1; + state->ios_left--; + return state->file; +} + +/* + * If we tracked the file through the SCM inflight mechanism, we could support + * any file. For now, just ensure that anything potentially problematic is done + * inline. + */ +static bool io_file_supports_async(struct file *file) +{ + umode_t mode = file_inode(file)->i_mode; + + if (S_ISBLK(mode) || S_ISCHR(mode)) + return true; + if (S_ISREG(mode) && file->f_op != &io_uring_fops) + return true; + + return false; +} + +static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s, + bool force_nonblock, struct io_submit_state *state) +{ + const struct io_uring_sqe *sqe = s->sqe; + struct io_ring_ctx *ctx = req->ctx; + struct kiocb *kiocb = &req->rw; + unsigned ioprio, flags; + int fd, ret; + + /* For -EAGAIN retry, everything is already prepped */ + if (kiocb->ki_filp) + return 0; + + flags = READ_ONCE(sqe->flags); + fd = READ_ONCE(sqe->fd); + + if (flags & IOSQE_FIXED_FILE) { + if (unlikely(!ctx->user_files || + (unsigned) fd >= ctx->nr_user_files)) + return -EBADF; + kiocb->ki_filp = ctx->user_files[fd]; + req->flags |= REQ_F_FIXED_FILE; + } else { + if (s->needs_fixed_file) + return -EBADF; + kiocb->ki_filp = io_file_get(state, fd); + if (unlikely(!kiocb->ki_filp)) + return -EBADF; + if (force_nonblock && !io_file_supports_async(kiocb->ki_filp)) + force_nonblock = false; + } + kiocb->ki_pos = READ_ONCE(sqe->off); + kiocb->ki_flags = iocb_flags(kiocb->ki_filp); + kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp)); + + ioprio = READ_ONCE(sqe->ioprio); + if (ioprio) { + ret = ioprio_check_cap(ioprio); + if (ret) + goto out_fput; + + kiocb->ki_ioprio = ioprio; + } else + kiocb->ki_ioprio = get_current_ioprio(); + + ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags)); + if (unlikely(ret)) + goto out_fput; + if (force_nonblock) { + kiocb->ki_flags |= IOCB_NOWAIT; + req->flags |= REQ_F_FORCE_NONBLOCK; + } + if (ctx->flags & IORING_SETUP_IOPOLL) { + ret = -EOPNOTSUPP; + if (!(kiocb->ki_flags & IOCB_DIRECT) || + !kiocb->ki_filp->f_op->iopoll) + goto out_fput; + + req->error = 0; + kiocb->ki_flags |= IOCB_HIPRI; + kiocb->ki_complete = io_complete_rw_iopoll; + } else { + if (kiocb->ki_flags & IOCB_HIPRI) { + ret = -EINVAL; + goto out_fput; + } + kiocb->ki_complete = io_complete_rw; + } + return 0; +out_fput: + if (!(flags & IOSQE_FIXED_FILE)) { + /* + * in case of error, we didn't use this file reference. drop it. + */ + if (state) + state->used_refs--; + io_file_put(state, kiocb->ki_filp); + } + return ret; +} + +static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret) +{ + switch (ret) { + case -EIOCBQUEUED: + break; + case -ERESTARTSYS: + case -ERESTARTNOINTR: + case -ERESTARTNOHAND: + case -ERESTART_RESTARTBLOCK: + /* + * We can't just restart the syscall, since previously + * submitted sqes may already be in progress. Just fail this + * IO with EINTR. + */ + ret = -EINTR; + /* fall through */ + default: + kiocb->ki_complete(kiocb, ret, 0); + } +} + +static int io_import_fixed(struct io_ring_ctx *ctx, int rw, + const struct io_uring_sqe *sqe, + struct iov_iter *iter) +{ + size_t len = READ_ONCE(sqe->len); + struct io_mapped_ubuf *imu; + unsigned index, buf_index; + size_t offset; + u64 buf_addr; + + /* attempt to use fixed buffers without having provided iovecs */ + if (unlikely(!ctx->user_bufs)) + return -EFAULT; + + buf_index = READ_ONCE(sqe->buf_index); + if (unlikely(buf_index >= ctx->nr_user_bufs)) + return -EFAULT; + + index = array_index_nospec(buf_index, ctx->nr_user_bufs); + imu = &ctx->user_bufs[index]; + buf_addr = READ_ONCE(sqe->addr); + + /* overflow */ + if (buf_addr + len < buf_addr) + return -EFAULT; + /* not inside the mapped region */ + if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len) + return -EFAULT; + + /* + * May not be a start of buffer, set size appropriately + * and advance us to the beginning. + */ + offset = buf_addr - imu->ubuf; + iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len); + if (offset) + iov_iter_advance(iter, offset); + return 0; +} + +static int io_import_iovec(struct io_ring_ctx *ctx, int rw, + const struct sqe_submit *s, struct iovec **iovec, + struct iov_iter *iter) +{ + const struct io_uring_sqe *sqe = s->sqe; + void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr)); + size_t sqe_len = READ_ONCE(sqe->len); + u8 opcode; + + /* + * We're reading ->opcode for the second time, but the first read + * doesn't care whether it's _FIXED or not, so it doesn't matter + * whether ->opcode changes concurrently. The first read does care + * about whether it is a READ or a WRITE, so we don't trust this read + * for that purpose and instead let the caller pass in the read/write + * flag. + */ + opcode = READ_ONCE(sqe->opcode); + if (opcode == IORING_OP_READ_FIXED || + opcode == IORING_OP_WRITE_FIXED) { + ssize_t ret = io_import_fixed(ctx, rw, sqe, iter); + *iovec = NULL; + return ret; + } + + if (!s->has_user) + return -EFAULT; + +#ifdef CONFIG_COMPAT + if (ctx->compat) + return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV, + iovec, iter); +#endif + + return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter); +} + +/* + * Make a note of the last file/offset/direction we punted to async + * context. We'll use this information to see if we can piggy back a + * sequential request onto the previous one, if it's still hasn't been + * completed by the async worker. + */ +static void io_async_list_note(int rw, struct io_kiocb *req, size_t len) +{ + struct async_list *async_list = &req->ctx->pending_async[rw]; + struct kiocb *kiocb = &req->rw; + struct file *filp = kiocb->ki_filp; + off_t io_end = kiocb->ki_pos + len; + + if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) { + unsigned long max_pages; + + /* Use 8x RA size as a decent limiter for both reads/writes */ + max_pages = filp->f_ra.ra_pages; + if (!max_pages) + max_pages = VM_MAX_READAHEAD >> (PAGE_SHIFT - 10); + max_pages *= 8; + + /* If max pages are exceeded, reset the state */ + len >>= PAGE_SHIFT; + if (async_list->io_pages + len <= max_pages) { + req->flags |= REQ_F_SEQ_PREV; + async_list->io_pages += len; + } else { + io_end = 0; + async_list->io_pages = 0; + } + } + + /* New file? Reset state. */ + if (async_list->file != filp) { + async_list->io_pages = 0; + async_list->file = filp; + } + async_list->io_end = io_end; +} + +static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s, + bool force_nonblock, struct io_submit_state *state) +{ + struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; + struct kiocb *kiocb = &req->rw; + struct iov_iter iter; + struct file *file; + size_t iov_count; + ssize_t ret; + + ret = io_prep_rw(req, s, force_nonblock, state); + if (ret) + return ret; + file = kiocb->ki_filp; + + ret = -EBADF; + if (unlikely(!(file->f_mode & FMODE_READ))) + goto out_fput; + ret = -EINVAL; + if (unlikely(!file->f_op->read_iter)) + goto out_fput; + + ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter); + if (ret) + goto out_fput; + + iov_count = iov_iter_count(&iter); + ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count); + if (!ret) { + ssize_t ret2; + + /* Catch -EAGAIN return for forced non-blocking submission */ + ret2 = call_read_iter(file, kiocb, &iter); + if (!force_nonblock || ret2 != -EAGAIN) { + io_rw_done(kiocb, ret2); + } else { + /* + * If ->needs_lock is true, we're already in async + * context. + */ + if (!s->needs_lock) + io_async_list_note(READ, req, iov_count); + ret = -EAGAIN; + } + } + kfree(iovec); +out_fput: + /* Hold on to the file for -EAGAIN */ + if (unlikely(ret && ret != -EAGAIN)) + io_fput(req); + return ret; +} + +static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s, + bool force_nonblock, struct io_submit_state *state) +{ + struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs; + struct kiocb *kiocb = &req->rw; + struct iov_iter iter; + struct file *file; + size_t iov_count; + ssize_t ret; + + ret = io_prep_rw(req, s, force_nonblock, state); + if (ret) + return ret; + + ret = -EBADF; + file = kiocb->ki_filp; + if (unlikely(!(file->f_mode & FMODE_WRITE))) + goto out_fput; + ret = -EINVAL; + if (unlikely(!file->f_op->write_iter)) + goto out_fput; + + ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter); + if (ret) + goto out_fput; + + iov_count = iov_iter_count(&iter); + + ret = -EAGAIN; + if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) { + /* If ->needs_lock is true, we're already in async context. */ + if (!s->needs_lock) + io_async_list_note(WRITE, req, iov_count); + goto out_free; + } + + ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count); + if (!ret) { + /* + * Open-code file_start_write here to grab freeze protection, + * which will be released by another thread in + * io_complete_rw(). Fool lockdep by telling it the lock got + * released so that it doesn't complain about the held lock when + * we return to userspace. + */ + if (S_ISREG(file_inode(file)->i_mode)) { + __sb_start_write(file_inode(file)->i_sb, + SB_FREEZE_WRITE, true); + __sb_writers_release(file_inode(file)->i_sb, + SB_FREEZE_WRITE); + } + kiocb->ki_flags |= IOCB_WRITE; + io_rw_done(kiocb, call_write_iter(file, kiocb, &iter)); + } +out_free: + kfree(iovec); +out_fput: + /* Hold on to the file for -EAGAIN */ + if (unlikely(ret && ret != -EAGAIN)) + io_fput(req); + return ret; +} + +/* + * IORING_OP_NOP just posts a completion event, nothing else. + */ +static int io_nop(struct io_kiocb *req, u64 user_data) +{ + struct io_ring_ctx *ctx = req->ctx; + long err = 0; + + if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) + return -EINVAL; + + /* + * Twilight zone - it's possible that someone issued an opcode that + * has a file attached, then got -EAGAIN on submission, and changed + * the sqe before we retried it from async context. Avoid dropping + * a file reference for this malicious case, and flag the error. + */ + if (req->rw.ki_filp) { + err = -EBADF; + io_fput(req); + } + io_cqring_add_event(ctx, user_data, err, 0); + io_free_req(req); + return 0; +} + +static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_ring_ctx *ctx = req->ctx; + unsigned flags; + int fd; + + /* Prep already done */ + if (req->rw.ki_filp) + return 0; + + if (unlikely(ctx->flags & IORING_SETUP_IOPOLL)) + return -EINVAL; + if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index)) + return -EINVAL; + + fd = READ_ONCE(sqe->fd); + flags = READ_ONCE(sqe->flags); + + if (flags & IOSQE_FIXED_FILE) { + if (unlikely(!ctx->user_files || fd >= ctx->nr_user_files)) + return -EBADF; + req->rw.ki_filp = ctx->user_files[fd]; + req->flags |= REQ_F_FIXED_FILE; + } else { + req->rw.ki_filp = fget(fd); + if (unlikely(!req->rw.ki_filp)) + return -EBADF; + } + + return 0; +} + +static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe, + bool force_nonblock) +{ + loff_t sqe_off = READ_ONCE(sqe->off); + loff_t sqe_len = READ_ONCE(sqe->len); + loff_t end = sqe_off + sqe_len; + unsigned fsync_flags; + int ret; + + fsync_flags = READ_ONCE(sqe->fsync_flags); + if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC)) + return -EINVAL; + + ret = io_prep_fsync(req, sqe); + if (ret) + return ret; + + /* fsync always requires a blocking context */ + if (force_nonblock) + return -EAGAIN; + + ret = vfs_fsync_range(req->rw.ki_filp, sqe_off, + end > 0 ? end : LLONG_MAX, + fsync_flags & IORING_FSYNC_DATASYNC); + + io_fput(req); + io_cqring_add_event(req->ctx, sqe->user_data, ret, 0); + io_free_req(req); + return 0; +} + +static void io_poll_remove_one(struct io_kiocb *req) +{ + struct io_poll_iocb *poll = &req->poll; + + spin_lock(&poll->head->lock); + WRITE_ONCE(poll->canceled, true); + if (!list_empty(&poll->wait.entry)) { + list_del_init(&poll->wait.entry); + queue_work(req->ctx->sqo_wq, &req->work); + } + spin_unlock(&poll->head->lock); + + list_del_init(&req->list); +} + +static void io_poll_remove_all(struct io_ring_ctx *ctx) +{ + struct io_kiocb *req; + + spin_lock_irq(&ctx->completion_lock); + while (!list_empty(&ctx->cancel_list)) { + req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list); + io_poll_remove_one(req); + } + spin_unlock_irq(&ctx->completion_lock); +} + +/* + * Find a running poll command that matches one specified in sqe->addr, + * and remove it if found. + */ +static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_ring_ctx *ctx = req->ctx; + struct io_kiocb *poll_req, *next; + int ret = -ENOENT; + + if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) + return -EINVAL; + if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index || + sqe->poll_events) + return -EINVAL; + + spin_lock_irq(&ctx->completion_lock); + list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) { + if (READ_ONCE(sqe->addr) == poll_req->user_data) { + io_poll_remove_one(poll_req); + ret = 0; + break; + } + } + spin_unlock_irq(&ctx->completion_lock); + + io_cqring_add_event(req->ctx, sqe->user_data, ret, 0); + io_free_req(req); + return 0; +} + +static void io_poll_complete(struct io_kiocb *req, __poll_t mask) +{ + io_cqring_add_event(req->ctx, req->user_data, mangle_poll(mask), 0); + io_fput(req); + io_free_req(req); +} + +static void io_poll_complete_work(struct work_struct *work) +{ + struct io_kiocb *req = container_of(work, struct io_kiocb, work); + struct io_poll_iocb *poll = &req->poll; + struct poll_table_struct pt = { ._key = poll->events }; + struct io_ring_ctx *ctx = req->ctx; + __poll_t mask = 0; + + if (!READ_ONCE(poll->canceled)) + mask = vfs_poll(poll->file, &pt) & poll->events; + + /* + * Note that ->ki_cancel callers also delete iocb from active_reqs after + * calling ->ki_cancel. We need the ctx_lock roundtrip here to + * synchronize with them. In the cancellation case the list_del_init + * itself is not actually needed, but harmless so we keep it in to + * avoid further branches in the fast path. + */ + spin_lock_irq(&ctx->completion_lock); + if (!mask && !READ_ONCE(poll->canceled)) { + add_wait_queue(poll->head, &poll->wait); + spin_unlock_irq(&ctx->completion_lock); + return; + } + list_del_init(&req->list); + spin_unlock_irq(&ctx->completion_lock); + + io_poll_complete(req, mask); +} + +static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync, + void *key) +{ + struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb, + wait); + struct io_kiocb *req = container_of(poll, struct io_kiocb, poll); + struct io_ring_ctx *ctx = req->ctx; + __poll_t mask = key_to_poll(key); + + poll->woken = true; + + /* for instances that support it check for an event match first: */ + if (mask) { + unsigned long flags; + + if (!(mask & poll->events)) + return 0; + + /* try to complete the iocb inline if we can: */ + if (spin_trylock_irqsave(&ctx->completion_lock, flags)) { + list_del(&req->list); + spin_unlock_irqrestore(&ctx->completion_lock, flags); + + list_del_init(&poll->wait.entry); + io_poll_complete(req, mask); + return 1; + } + } + + list_del_init(&poll->wait.entry); + queue_work(ctx->sqo_wq, &req->work); + return 1; +} + +struct io_poll_table { + struct poll_table_struct pt; + struct io_kiocb *req; + int error; +}; + +static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head, + struct poll_table_struct *p) +{ + struct io_poll_table *pt = container_of(p, struct io_poll_table, pt); + + if (unlikely(pt->req->poll.head)) { + pt->error = -EINVAL; + return; + } + + pt->error = 0; + pt->req->poll.head = head; + add_wait_queue(head, &pt->req->poll.wait); +} + +static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe) +{ + struct io_poll_iocb *poll = &req->poll; + struct io_ring_ctx *ctx = req->ctx; + struct io_poll_table ipt; + unsigned flags; + __poll_t mask; + u16 events; + int fd; + + if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL)) + return -EINVAL; + if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index) + return -EINVAL; + + INIT_WORK(&req->work, io_poll_complete_work); + events = READ_ONCE(sqe->poll_events); + poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP; + + flags = READ_ONCE(sqe->flags); + fd = READ_ONCE(sqe->fd); + + if (flags & IOSQE_FIXED_FILE) { + if (unlikely(!ctx->user_files || fd >= ctx->nr_user_files)) + return -EBADF; + poll->file = ctx->user_files[fd]; + req->flags |= REQ_F_FIXED_FILE; + } else { + poll->file = fget(fd); + } + if (unlikely(!poll->file)) + return -EBADF; + + poll->head = NULL; + poll->woken = false; + poll->canceled = false; + + ipt.pt._qproc = io_poll_queue_proc; + ipt.pt._key = poll->events; + ipt.req = req; + ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */ + + /* initialized the list so that we can do list_empty checks */ + INIT_LIST_HEAD(&poll->wait.entry); + init_waitqueue_func_entry(&poll->wait, io_poll_wake); + + /* one for removal from waitqueue, one for this function */ + refcount_set(&req->refs, 2); + + mask = vfs_poll(poll->file, &ipt.pt) & poll->events; + if (unlikely(!poll->head)) { + /* we did not manage to set up a waitqueue, done */ + goto out; + } + + spin_lock_irq(&ctx->completion_lock); + spin_lock(&poll->head->lock); + if (poll->woken) { + /* wake_up context handles the rest */ + mask = 0; + ipt.error = 0; + } else if (mask || ipt.error) { + /* if we get an error or a mask we are done */ + WARN_ON_ONCE(list_empty(&poll->wait.entry)); + list_del_init(&poll->wait.entry); + } else { + /* actually waiting for an event */ + list_add_tail(&req->list, &ctx->cancel_list); + } + spin_unlock(&poll->head->lock); + spin_unlock_irq(&ctx->completion_lock); + +out: + if (unlikely(ipt.error)) { + if (!(flags & IOSQE_FIXED_FILE)) + fput(poll->file); + /* + * Drop one of our refs to this req, __io_submit_sqe() will + * drop the other one since we're returning an error. + */ + io_free_req(req); + return ipt.error; + } + + if (mask) + io_poll_complete(req, mask); + io_free_req(req); + return 0; +} + +static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, + const struct sqe_submit *s, bool force_nonblock, + struct io_submit_state *state) +{ + ssize_t ret; + int opcode; + + if (unlikely(s->index >= ctx->sq_entries)) + return -EINVAL; + req->user_data = READ_ONCE(s->sqe->user_data); + + opcode = READ_ONCE(s->sqe->opcode); + switch (opcode) { + case IORING_OP_NOP: + ret = io_nop(req, req->user_data); + break; + case IORING_OP_READV: + if (unlikely(s->sqe->buf_index)) + return -EINVAL; + ret = io_read(req, s, force_nonblock, state); + break; + case IORING_OP_WRITEV: + if (unlikely(s->sqe->buf_index)) + return -EINVAL; + ret = io_write(req, s, force_nonblock, state); + break; + case IORING_OP_READ_FIXED: + ret = io_read(req, s, force_nonblock, state); + break; + case IORING_OP_WRITE_FIXED: + ret = io_write(req, s, force_nonblock, state); + break; + case IORING_OP_FSYNC: + ret = io_fsync(req, s->sqe, force_nonblock); + break; + case IORING_OP_POLL_ADD: + ret = io_poll_add(req, s->sqe); + break; + case IORING_OP_POLL_REMOVE: + ret = io_poll_remove(req, s->sqe); + break; + default: + ret = -EINVAL; + break; + } + + if (ret) + return ret; + + if (ctx->flags & IORING_SETUP_IOPOLL) { + if (req->error == -EAGAIN) + return -EAGAIN; + + /* workqueue context doesn't hold uring_lock, grab it now */ + if (s->needs_lock) + mutex_lock(&ctx->uring_lock); + io_iopoll_req_issued(req); + if (s->needs_lock) + mutex_unlock(&ctx->uring_lock); + } + + return 0; +} + +static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx, + const struct io_uring_sqe *sqe) +{ + switch (sqe->opcode) { + case IORING_OP_READV: + case IORING_OP_READ_FIXED: + return &ctx->pending_async[READ]; + case IORING_OP_WRITEV: + case IORING_OP_WRITE_FIXED: + return &ctx->pending_async[WRITE]; + default: + return NULL; + } +} + +static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe) +{ + u8 opcode = READ_ONCE(sqe->opcode); + + return !(opcode == IORING_OP_READ_FIXED || + opcode == IORING_OP_WRITE_FIXED); +} + +static void io_sq_wq_submit_work(struct work_struct *work) +{ + struct io_kiocb *req = container_of(work, struct io_kiocb, work); + struct io_ring_ctx *ctx = req->ctx; + struct mm_struct *cur_mm = NULL; + struct async_list *async_list; + LIST_HEAD(req_list); + mm_segment_t old_fs; + int ret; + + async_list = io_async_list_from_sqe(ctx, req->submit.sqe); +restart: + do { + struct sqe_submit *s = &req->submit; + const struct io_uring_sqe *sqe = s->sqe; + + /* Ensure we clear previously set forced non-block flag */ + req->flags &= ~REQ_F_FORCE_NONBLOCK; + req->rw.ki_flags &= ~IOCB_NOWAIT; + + ret = 0; + if (io_sqe_needs_user(sqe) && !cur_mm) { + if (!mmget_not_zero(ctx->sqo_mm)) { + ret = -EFAULT; + } else { + cur_mm = ctx->sqo_mm; + use_mm(cur_mm); + old_fs = get_fs(); + set_fs(USER_DS); + } + } + + if (!ret) { + s->has_user = cur_mm != NULL; + s->needs_lock = true; + do { + ret = __io_submit_sqe(ctx, req, s, false, NULL); + /* + * We can get EAGAIN for polled IO even though + * we're forcing a sync submission from here, + * since we can't wait for request slots on the + * block side. + */ + if (ret != -EAGAIN) + break; + cond_resched(); + } while (1); + } + if (ret) { + io_cqring_add_event(ctx, sqe->user_data, ret, 0); + io_free_req(req); + } + + /* async context always use a copy of the sqe */ + kfree(sqe); + + if (!async_list) + break; + if (!list_empty(&req_list)) { + req = list_first_entry(&req_list, struct io_kiocb, + list); + list_del(&req->list); + continue; + } + if (list_empty(&async_list->list)) + break; + + req = NULL; + spin_lock(&async_list->lock); + if (list_empty(&async_list->list)) { + spin_unlock(&async_list->lock); + break; + } + list_splice_init(&async_list->list, &req_list); + spin_unlock(&async_list->lock); + + req = list_first_entry(&req_list, struct io_kiocb, list); + list_del(&req->list); + } while (req); + + /* + * Rare case of racing with a submitter. If we find the count has + * dropped to zero AND we have pending work items, then restart + * the processing. This is a tiny race window. + */ + if (async_list) { + ret = atomic_dec_return(&async_list->cnt); + while (!ret && !list_empty(&async_list->list)) { + spin_lock(&async_list->lock); + atomic_inc(&async_list->cnt); + list_splice_init(&async_list->list, &req_list); + spin_unlock(&async_list->lock); + + if (!list_empty(&req_list)) { + req = list_first_entry(&req_list, + struct io_kiocb, list); + list_del(&req->list); + goto restart; + } + ret = atomic_dec_return(&async_list->cnt); + } + } + + if (cur_mm) { + set_fs(old_fs); + unuse_mm(cur_mm); + mmput(cur_mm); + } +} + +/* + * See if we can piggy back onto previously submitted work, that is still + * running. We currently only allow this if the new request is sequential + * to the previous one we punted. + */ +static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req) +{ + bool ret = false; + + if (!list) + return false; + if (!(req->flags & REQ_F_SEQ_PREV)) + return false; + if (!atomic_read(&list->cnt)) + return false; + + ret = true; + spin_lock(&list->lock); + list_add_tail(&req->list, &list->list); + if (!atomic_read(&list->cnt)) { + list_del_init(&req->list); + ret = false; + } + spin_unlock(&list->lock); + return ret; +} + +static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, + struct io_submit_state *state) +{ + struct io_kiocb *req; + ssize_t ret; + + /* enforce forwards compatibility on users */ + if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE)) + return -EINVAL; + + req = io_get_req(ctx, state); + if (unlikely(!req)) + return -EAGAIN; + + req->rw.ki_filp = NULL; + + ret = __io_submit_sqe(ctx, req, s, true, state); + if (ret == -EAGAIN) { + struct io_uring_sqe *sqe_copy; + + sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL); + if (sqe_copy) { + struct async_list *list; + + memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy)); + s->sqe = sqe_copy; + + memcpy(&req->submit, s, sizeof(*s)); + list = io_async_list_from_sqe(ctx, s->sqe); + if (!io_add_to_prev_work(list, req)) { + if (list) + atomic_inc(&list->cnt); + INIT_WORK(&req->work, io_sq_wq_submit_work); + queue_work(ctx->sqo_wq, &req->work); + } + ret = 0; + } + } + if (ret) + io_free_req(req); + + return ret; +} + +/* + * Batched submission is done, ensure local IO is flushed out. + */ +static void io_submit_state_end(struct io_submit_state *state) +{ + blk_finish_plug(&state->plug); + io_file_put(state, NULL); + if (state->free_reqs) + kmem_cache_free_bulk(req_cachep, state->free_reqs, + &state->reqs[state->cur_req]); +} + +/* + * Start submission side cache. + */ +static void io_submit_state_start(struct io_submit_state *state, + struct io_ring_ctx *ctx, unsigned max_ios) +{ + blk_start_plug(&state->plug); + state->free_reqs = 0; + state->file = NULL; + state->ios_left = max_ios; +} + +static void io_commit_sqring(struct io_ring_ctx *ctx) +{ + struct io_sq_ring *ring = ctx->sq_ring; + + if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) { + /* + * Ensure any loads from the SQEs are done at this point, + * since once we write the new head, the application could + * write new data to them. + */ + smp_store_release(&ring->r.head, ctx->cached_sq_head); + + /* + * write side barrier of head update, app has read side. See + * comment at the top of this file + */ + smp_wmb(); + } +} + +/* + * Undo last io_get_sqring() + */ +static void io_drop_sqring(struct io_ring_ctx *ctx) +{ + ctx->cached_sq_head--; +} + +/* + * Fetch an sqe, if one is available. Note that s->sqe will point to memory + * that is mapped by userspace. This means that care needs to be taken to + * ensure that reads are stable, as we cannot rely on userspace always + * being a good citizen. If members of the sqe are validated and then later + * used, it's important that those reads are done through READ_ONCE() to + * prevent a re-load down the line. + */ +static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s) +{ + struct io_sq_ring *ring = ctx->sq_ring; + unsigned head; + + /* + * The cached sq head (or cq tail) serves two purposes: + * + * 1) allows us to batch the cost of updating the user visible + * head updates. + * 2) allows the kernel side to track the head on its own, even + * though the application is the one updating it. + */ + head = ctx->cached_sq_head; + /* See comment at the top of this file */ + smp_rmb(); + if (head == READ_ONCE(ring->r.tail)) + return false; + + head = READ_ONCE(ring->array[head & ctx->sq_mask]); + if (head < ctx->sq_entries) { + s->index = head; + s->sqe = &ctx->sq_sqes[head]; + ctx->cached_sq_head++; + return true; + } + + /* drop invalid entries */ + ctx->cached_sq_head++; + ring->dropped++; + /* See comment at the top of this file */ + smp_wmb(); + return false; +} + +static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, + unsigned int nr, bool has_user, bool mm_fault) +{ + struct io_submit_state state, *statep = NULL; + int ret, i, submitted = 0; + + if (nr > IO_PLUG_THRESHOLD) { + io_submit_state_start(&state, ctx, nr); + statep = &state; + } + + for (i = 0; i < nr; i++) { + if (unlikely(mm_fault)) { + ret = -EFAULT; + } else { + sqes[i].has_user = has_user; + sqes[i].needs_lock = true; + sqes[i].needs_fixed_file = true; + ret = io_submit_sqe(ctx, &sqes[i], statep); + } + if (!ret) { + submitted++; + continue; + } + + io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0); + } + + if (statep) + io_submit_state_end(&state); + + return submitted; +} + +static int io_sq_thread(void *data) +{ + struct sqe_submit sqes[IO_IOPOLL_BATCH]; + struct io_ring_ctx *ctx = data; + struct mm_struct *cur_mm = NULL; + mm_segment_t old_fs; + DEFINE_WAIT(wait); + unsigned inflight; + unsigned long timeout; + + old_fs = get_fs(); + set_fs(USER_DS); + + timeout = inflight = 0; + while (!kthread_should_stop() && !ctx->sqo_stop) { + bool all_fixed, mm_fault = false; + int i; + + if (inflight) { + unsigned nr_events = 0; + + if (ctx->flags & IORING_SETUP_IOPOLL) { + /* + * We disallow the app entering submit/complete + * with polling, but we still need to lock the + * ring to prevent racing with polled issue + * that got punted to a workqueue. + */ + mutex_lock(&ctx->uring_lock); + io_iopoll_check(ctx, &nr_events, 0); + mutex_unlock(&ctx->uring_lock); + } else { + /* + * Normal IO, just pretend everything completed. + * We don't have to poll completions for that. + */ + nr_events = inflight; + } + + inflight -= nr_events; + if (!inflight) + timeout = jiffies + ctx->sq_thread_idle; + } + + if (!io_get_sqring(ctx, &sqes[0])) { + /* + * We're polling. If we're within the defined idle + * period, then let us spin without work before going + * to sleep. + */ + if (inflight || !time_after(jiffies, timeout)) { + cpu_relax(); + continue; + } + + /* + * Drop cur_mm before scheduling, we can't hold it for + * long periods (or over schedule()). Do this before + * adding ourselves to the waitqueue, as the unuse/drop + * may sleep. + */ + if (cur_mm) { + unuse_mm(cur_mm); + mmput(cur_mm); + cur_mm = NULL; + } + + prepare_to_wait(&ctx->sqo_wait, &wait, + TASK_INTERRUPTIBLE); + + /* Tell userspace we may need a wakeup call */ + ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP; + smp_wmb(); + + if (!io_get_sqring(ctx, &sqes[0])) { + if (kthread_should_stop()) { + finish_wait(&ctx->sqo_wait, &wait); + break; + } + if (signal_pending(current)) + flush_signals(current); + schedule(); + finish_wait(&ctx->sqo_wait, &wait); + + ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; + smp_wmb(); + continue; + } + finish_wait(&ctx->sqo_wait, &wait); + + ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP; + smp_wmb(); + } + + i = 0; + all_fixed = true; + do { + if (all_fixed && io_sqe_needs_user(sqes[i].sqe)) + all_fixed = false; + + i++; + if (i == ARRAY_SIZE(sqes)) + break; + } while (io_get_sqring(ctx, &sqes[i])); + + /* Unless all new commands are FIXED regions, grab mm */ + if (!all_fixed && !cur_mm) { + mm_fault = !mmget_not_zero(ctx->sqo_mm); + if (!mm_fault) { + use_mm(ctx->sqo_mm); + cur_mm = ctx->sqo_mm; + } + } + + inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL, + mm_fault); + + /* Commit SQ ring head once we've consumed all SQEs */ + io_commit_sqring(ctx); + } + + set_fs(old_fs); + if (cur_mm) { + unuse_mm(cur_mm); + mmput(cur_mm); + } + return 0; +} + +static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) +{ + struct io_submit_state state, *statep = NULL; + int i, ret = 0, submit = 0; + + if (to_submit > IO_PLUG_THRESHOLD) { + io_submit_state_start(&state, ctx, to_submit); + statep = &state; + } + + for (i = 0; i < to_submit; i++) { + struct sqe_submit s; + + if (!io_get_sqring(ctx, &s)) + break; + + s.has_user = true; + s.needs_lock = false; + s.needs_fixed_file = false; + + ret = io_submit_sqe(ctx, &s, statep); + if (ret) { + io_drop_sqring(ctx); + break; + } + + submit++; + } + io_commit_sqring(ctx); + + if (statep) + io_submit_state_end(statep); + + return submit ? submit : ret; +} + +static unsigned io_cqring_events(struct io_cq_ring *ring) +{ + return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head); +} + +/* + * Wait until events become available, if we don't already have some. The + * application must reap them itself, as they reside on the shared cq ring. + */ +static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, + const sigset_t __user *sig, size_t sigsz) +{ + struct io_cq_ring *ring = ctx->cq_ring; + sigset_t ksigmask, sigsaved; + DEFINE_WAIT(wait); + int ret; + + /* See comment at the top of this file */ + smp_rmb(); + if (io_cqring_events(ring) >= min_events) + return 0; + + if (sig) { + ret = set_user_sigmask(sig, &ksigmask, &sigsaved, sigsz); + if (ret) + return ret; + } + + do { + prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE); + + ret = 0; + /* See comment at the top of this file */ + smp_rmb(); + if (io_cqring_events(ring) >= min_events) + break; + + schedule(); + + ret = -EINTR; + if (signal_pending(current)) + break; + } while (1); + + finish_wait(&ctx->wait, &wait); + + if (sig) + restore_user_sigmask(sig, &sigsaved); + + return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0; +} + +static void __io_sqe_files_unregister(struct io_ring_ctx *ctx) +{ +#if defined(CONFIG_UNIX) + if (ctx->ring_sock) { + struct sock *sock = ctx->ring_sock->sk; + struct sk_buff *skb; + + while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL) + kfree_skb(skb); + } +#else + int i; + + for (i = 0; i < ctx->nr_user_files; i++) + fput(ctx->user_files[i]); +#endif +} + +static int io_sqe_files_unregister(struct io_ring_ctx *ctx) +{ + if (!ctx->user_files) + return -ENXIO; + + __io_sqe_files_unregister(ctx); + kfree(ctx->user_files); + ctx->user_files = NULL; + ctx->nr_user_files = 0; + return 0; +} + +static void io_sq_thread_stop(struct io_ring_ctx *ctx) +{ + if (ctx->sqo_thread) { + ctx->sqo_stop = 1; + mb(); + kthread_stop(ctx->sqo_thread); + ctx->sqo_thread = NULL; + } +} + +static void io_finish_async(struct io_ring_ctx *ctx) +{ + io_sq_thread_stop(ctx); + + if (ctx->sqo_wq) { + destroy_workqueue(ctx->sqo_wq); + ctx->sqo_wq = NULL; + } +} + +#if defined(CONFIG_UNIX) +static void io_destruct_skb(struct sk_buff *skb) +{ + struct io_ring_ctx *ctx = skb->sk->sk_user_data; + + io_finish_async(ctx); + unix_destruct_scm(skb); +} + +/* + * Ensure the UNIX gc is aware of our file set, so we are certain that + * the io_uring can be safely unregistered on process exit, even if we have + * loops in the file referencing. + */ +static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset) +{ + struct sock *sk = ctx->ring_sock->sk; + struct scm_fp_list *fpl; + struct sk_buff *skb; + int i; + + if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) { + unsigned long inflight = ctx->user->unix_inflight + nr; + + if (inflight > task_rlimit(current, RLIMIT_NOFILE)) + return -EMFILE; + } + + fpl = kzalloc(sizeof(*fpl), GFP_KERNEL); + if (!fpl) + return -ENOMEM; + + skb = alloc_skb(0, GFP_KERNEL); + if (!skb) { + kfree(fpl); + return -ENOMEM; + } + + skb->sk = sk; + skb->destructor = io_destruct_skb; + + fpl->user = get_uid(ctx->user); + for (i = 0; i < nr; i++) { + fpl->fp[i] = get_file(ctx->user_files[i + offset]); + unix_inflight(fpl->user, fpl->fp[i]); + } + + fpl->max = fpl->count = nr; + UNIXCB(skb).fp = fpl; + refcount_add(skb->truesize, &sk->sk_wmem_alloc); + skb_queue_head(&sk->sk_receive_queue, skb); + + for (i = 0; i < nr; i++) + fput(fpl->fp[i]); + + return 0; +} + +/* + * If UNIX sockets are enabled, fd passing can cause a reference cycle which + * causes regular reference counting to break down. We rely on the UNIX + * garbage collection to take care of this problem for us. + */ +static int io_sqe_files_scm(struct io_ring_ctx *ctx) +{ + unsigned left, total; + int ret = 0; + + total = 0; + left = ctx->nr_user_files; + while (left) { + unsigned this_files = min_t(unsigned, left, SCM_MAX_FD); + int ret; + + ret = __io_sqe_files_scm(ctx, this_files, total); + if (ret) + break; + left -= this_files; + total += this_files; + } + + if (!ret) + return 0; + + while (total < ctx->nr_user_files) { + fput(ctx->user_files[total]); + total++; + } + + return ret; +} +#else +static int io_sqe_files_scm(struct io_ring_ctx *ctx) +{ + return 0; +} +#endif + +static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg, + unsigned nr_args) +{ + __s32 __user *fds = (__s32 __user *) arg; + int fd, ret = 0; + unsigned i; + + if (ctx->user_files) + return -EBUSY; + if (!nr_args) + return -EINVAL; + if (nr_args > IORING_MAX_FIXED_FILES) + return -EMFILE; + + ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL); + if (!ctx->user_files) + return -ENOMEM; + + for (i = 0; i < nr_args; i++) { + ret = -EFAULT; + if (copy_from_user(&fd, &fds[i], sizeof(fd))) + break; + + ctx->user_files[i] = fget(fd); + + ret = -EBADF; + if (!ctx->user_files[i]) + break; + /* + * Don't allow io_uring instances to be registered. If UNIX + * isn't enabled, then this causes a reference cycle and this + * instance can never get freed. If UNIX is enabled we'll + * handle it just fine, but there's still no point in allowing + * a ring fd as it doesn't support regular read/write anyway. + */ + if (ctx->user_files[i]->f_op == &io_uring_fops) { + fput(ctx->user_files[i]); + break; + } + ctx->nr_user_files++; + ret = 0; + } + + if (ret) { + for (i = 0; i < ctx->nr_user_files; i++) + fput(ctx->user_files[i]); + + kfree(ctx->user_files); + ctx->nr_user_files = 0; + return ret; + } + + ret = io_sqe_files_scm(ctx); + if (ret) + io_sqe_files_unregister(ctx); + + return ret; +} + +static int io_sq_offload_start(struct io_ring_ctx *ctx, + struct io_uring_params *p) +{ + int ret; + + init_waitqueue_head(&ctx->sqo_wait); + mmgrab(current->mm); + ctx->sqo_mm = current->mm; + + ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle); + if (!ctx->sq_thread_idle) + ctx->sq_thread_idle = HZ; + + ret = -EINVAL; + if (!cpu_possible(p->sq_thread_cpu)) + goto err; + + if (ctx->flags & IORING_SETUP_SQPOLL) { + if (p->flags & IORING_SETUP_SQ_AFF) { + int cpu; + + cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS); + ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread, + ctx, cpu, + "io_uring-sq"); + } else { + ctx->sqo_thread = kthread_create(io_sq_thread, ctx, + "io_uring-sq"); + } + if (IS_ERR(ctx->sqo_thread)) { + ret = PTR_ERR(ctx->sqo_thread); + ctx->sqo_thread = NULL; + goto err; + } + wake_up_process(ctx->sqo_thread); + } else if (p->flags & IORING_SETUP_SQ_AFF) { + /* Can't have SQ_AFF without SQPOLL */ + ret = -EINVAL; + goto err; + } + + /* Do QD, or 2 * CPUS, whatever is smallest */ + ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE, + min(ctx->sq_entries - 1, 2 * num_online_cpus())); + if (!ctx->sqo_wq) { + ret = -ENOMEM; + goto err; + } + + return 0; +err: + io_sq_thread_stop(ctx); + mmdrop(ctx->sqo_mm); + ctx->sqo_mm = NULL; + return ret; +} + +static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages) +{ + atomic_long_sub(nr_pages, &user->locked_vm); +} + +static int io_account_mem(struct user_struct *user, unsigned long nr_pages) +{ + unsigned long page_limit, cur_pages, new_pages; + + /* Don't allow more pages than we can safely lock */ + page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT; + + do { + cur_pages = atomic_long_read(&user->locked_vm); + new_pages = cur_pages + nr_pages; + if (new_pages > page_limit) + return -ENOMEM; + } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages, + new_pages) != cur_pages); + + return 0; +} + +static void io_mem_free(void *ptr) +{ + struct page *page = virt_to_head_page(ptr); + + if (put_page_testzero(page)) + free_compound_page(page); +} + +static void *io_mem_alloc(size_t size) +{ + gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP | + __GFP_NORETRY; + + return (void *) __get_free_pages(gfp_flags, get_order(size)); +} + +static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries) +{ + struct io_sq_ring *sq_ring; + struct io_cq_ring *cq_ring; + size_t bytes; + + bytes = struct_size(sq_ring, array, sq_entries); + bytes += array_size(sizeof(struct io_uring_sqe), sq_entries); + bytes += struct_size(cq_ring, cqes, cq_entries); + + return (bytes + PAGE_SIZE - 1) / PAGE_SIZE; +} + +static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx) +{ + int i, j; + + if (!ctx->user_bufs) + return -ENXIO; + + for (i = 0; i < ctx->nr_user_bufs; i++) { + struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; + + for (j = 0; j < imu->nr_bvecs; j++) + put_page(imu->bvec[j].bv_page); + + if (ctx->account_mem) + io_unaccount_mem(ctx->user, imu->nr_bvecs); + kfree(imu->bvec); + imu->nr_bvecs = 0; + } + + kfree(ctx->user_bufs); + ctx->user_bufs = NULL; + ctx->nr_user_bufs = 0; + return 0; +} + +static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst, + void __user *arg, unsigned index) +{ + struct iovec __user *src; + +#ifdef CONFIG_COMPAT + if (ctx->compat) { + struct compat_iovec __user *ciovs; + struct compat_iovec ciov; + + ciovs = (struct compat_iovec __user *) arg; + if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov))) + return -EFAULT; + + dst->iov_base = (void __user *) (unsigned long) ciov.iov_base; + dst->iov_len = ciov.iov_len; + return 0; + } +#endif + src = (struct iovec __user *) arg; + if (copy_from_user(dst, &src[index], sizeof(*dst))) + return -EFAULT; + return 0; +} + +static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg, + unsigned nr_args) +{ + struct vm_area_struct **vmas = NULL; + struct page **pages = NULL; + int i, j, got_pages = 0; + int ret = -EINVAL; + + if (ctx->user_bufs) + return -EBUSY; + if (!nr_args || nr_args > UIO_MAXIOV) + return -EINVAL; + + ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf), + GFP_KERNEL); + if (!ctx->user_bufs) + return -ENOMEM; + + for (i = 0; i < nr_args; i++) { + struct io_mapped_ubuf *imu = &ctx->user_bufs[i]; + unsigned long off, start, end, ubuf; + int pret, nr_pages; + struct iovec iov; + size_t size; + + ret = io_copy_iov(ctx, &iov, arg, i); + if (ret) + break; + + /* + * Don't impose further limits on the size and buffer + * constraints here, we'll -EINVAL later when IO is + * submitted if they are wrong. + */ + ret = -EFAULT; + if (!iov.iov_base || !iov.iov_len) + goto err; + + /* arbitrary limit, but we need something */ + if (iov.iov_len > SZ_1G) + goto err; + + ubuf = (unsigned long) iov.iov_base; + end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT; + start = ubuf >> PAGE_SHIFT; + nr_pages = end - start; + + if (ctx->account_mem) { + ret = io_account_mem(ctx->user, nr_pages); + if (ret) + goto err; + } + + ret = 0; + if (!pages || nr_pages > got_pages) { + kfree(vmas); + kfree(pages); + pages = kmalloc_array(nr_pages, sizeof(struct page *), + GFP_KERNEL); + vmas = kmalloc_array(nr_pages, + sizeof(struct vm_area_struct *), + GFP_KERNEL); + if (!pages || !vmas) { + ret = -ENOMEM; + if (ctx->account_mem) + io_unaccount_mem(ctx->user, nr_pages); + goto err; + } + got_pages = nr_pages; + } + + imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec), + GFP_KERNEL); + ret = -ENOMEM; + if (!imu->bvec) { + if (ctx->account_mem) + io_unaccount_mem(ctx->user, nr_pages); + goto err; + } + + ret = 0; + down_read(¤t->mm->mmap_sem); + pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE, + pages, vmas); + if (pret == nr_pages) { + /* don't support file backed memory */ + for (j = 0; j < nr_pages; j++) { + struct vm_area_struct *vma = vmas[j]; + + if (vma->vm_file && + !is_file_hugepages(vma->vm_file)) { + ret = -EOPNOTSUPP; + break; + } + } + } else { + ret = pret < 0 ? pret : -EFAULT; + } + up_read(¤t->mm->mmap_sem); + if (ret) { + /* + * if we did partial map, or found file backed vmas, + * release any pages we did get + */ + if (pret > 0) { + for (j = 0; j < pret; j++) + put_page(pages[j]); + } + if (ctx->account_mem) + io_unaccount_mem(ctx->user, nr_pages); + goto err; + } + + off = ubuf & ~PAGE_MASK; + size = iov.iov_len; + for (j = 0; j < nr_pages; j++) { + size_t vec_len; + + vec_len = min_t(size_t, size, PAGE_SIZE - off); + imu->bvec[j].bv_page = pages[j]; + imu->bvec[j].bv_len = vec_len; + imu->bvec[j].bv_offset = off; + off = 0; + size -= vec_len; + } + /* store original address for later verification */ + imu->ubuf = ubuf; + imu->len = iov.iov_len; + imu->nr_bvecs = nr_pages; + + ctx->nr_user_bufs++; + } + kfree(pages); + kfree(vmas); + return 0; +err: + kfree(pages); + kfree(vmas); + io_sqe_buffer_unregister(ctx); + return ret; +} + +static void io_ring_ctx_free(struct io_ring_ctx *ctx) +{ + io_finish_async(ctx); + if (ctx->sqo_mm) + mmdrop(ctx->sqo_mm); + + io_iopoll_reap_events(ctx); + io_sqe_buffer_unregister(ctx); + io_sqe_files_unregister(ctx); + +#if defined(CONFIG_UNIX) + if (ctx->ring_sock) + sock_release(ctx->ring_sock); +#endif + + io_mem_free(ctx->sq_ring); + io_mem_free(ctx->sq_sqes); + io_mem_free(ctx->cq_ring); + + percpu_ref_exit(&ctx->refs); + if (ctx->account_mem) + io_unaccount_mem(ctx->user, + ring_pages(ctx->sq_entries, ctx->cq_entries)); + free_uid(ctx->user); + kfree(ctx); +} + +static __poll_t io_uring_poll(struct file *file, poll_table *wait) +{ + struct io_ring_ctx *ctx = file->private_data; + __poll_t mask = 0; + + poll_wait(file, &ctx->cq_wait, wait); + /* See comment at the top of this file */ + smp_rmb(); + if (READ_ONCE(ctx->sq_ring->r.tail) + 1 != ctx->cached_sq_head) + mask |= EPOLLOUT | EPOLLWRNORM; + if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail) + mask |= EPOLLIN | EPOLLRDNORM; + + return mask; +} + +static int io_uring_fasync(int fd, struct file *file, int on) +{ + struct io_ring_ctx *ctx = file->private_data; + + return fasync_helper(fd, file, on, &ctx->cq_fasync); +} + +static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) +{ + mutex_lock(&ctx->uring_lock); + percpu_ref_kill(&ctx->refs); + mutex_unlock(&ctx->uring_lock); + + io_poll_remove_all(ctx); + io_iopoll_reap_events(ctx); + wait_for_completion(&ctx->ctx_done); + io_ring_ctx_free(ctx); +} + +static int io_uring_release(struct inode *inode, struct file *file) +{ + struct io_ring_ctx *ctx = file->private_data; + + file->private_data = NULL; + io_ring_ctx_wait_and_kill(ctx); + return 0; +} + +static int io_uring_mmap(struct file *file, struct vm_area_struct *vma) +{ + loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT; + unsigned long sz = vma->vm_end - vma->vm_start; + struct io_ring_ctx *ctx = file->private_data; + unsigned long pfn; + struct page *page; + void *ptr; + + switch (offset) { + case IORING_OFF_SQ_RING: + ptr = ctx->sq_ring; + break; + case IORING_OFF_SQES: + ptr = ctx->sq_sqes; + break; + case IORING_OFF_CQ_RING: + ptr = ctx->cq_ring; + break; + default: + return -EINVAL; + } + + page = virt_to_head_page(ptr); + if (sz > (PAGE_SIZE << compound_order(page))) + return -EINVAL; + + pfn = virt_to_phys(ptr) >> PAGE_SHIFT; + return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot); +} + +SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, + u32, min_complete, u32, flags, const sigset_t __user *, sig, + size_t, sigsz) +{ + struct io_ring_ctx *ctx; + long ret = -EBADF; + int submitted = 0; + struct fd f; + + if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP)) + return -EINVAL; + + f = fdget(fd); + if (!f.file) + return -EBADF; + + ret = -EOPNOTSUPP; + if (f.file->f_op != &io_uring_fops) + goto out_fput; + + ret = -ENXIO; + ctx = f.file->private_data; + if (!percpu_ref_tryget(&ctx->refs)) + goto out_fput; + + /* + * For SQ polling, the thread will do all submissions and completions. + * Just return the requested submit count, and wake the thread if + * we were asked to. + */ + if (ctx->flags & IORING_SETUP_SQPOLL) { + if (flags & IORING_ENTER_SQ_WAKEUP) + wake_up(&ctx->sqo_wait); + submitted = to_submit; + goto out_ctx; + } + + ret = 0; + if (to_submit) { + to_submit = min(to_submit, ctx->sq_entries); + + mutex_lock(&ctx->uring_lock); + submitted = io_ring_submit(ctx, to_submit); + mutex_unlock(&ctx->uring_lock); + + if (submitted < 0) + goto out_ctx; + } + if (flags & IORING_ENTER_GETEVENTS) { + unsigned nr_events = 0; + + min_complete = min(min_complete, ctx->cq_entries); + + /* + * The application could have included the 'to_submit' count + * in how many events it wanted to wait for. If we failed to + * submit the desired count, we may need to adjust the number + * of events to poll/wait for. + */ + if (submitted < to_submit) + min_complete = min_t(unsigned, submitted, min_complete); + + if (ctx->flags & IORING_SETUP_IOPOLL) { + mutex_lock(&ctx->uring_lock); + ret = io_iopoll_check(ctx, &nr_events, min_complete); + mutex_unlock(&ctx->uring_lock); + } else { + ret = io_cqring_wait(ctx, min_complete, sig, sigsz); + } + } + +out_ctx: + io_ring_drop_ctx_refs(ctx, 1); +out_fput: + fdput(f); + return submitted ? submitted : ret; +} + +static const struct file_operations io_uring_fops = { + .release = io_uring_release, + .mmap = io_uring_mmap, + .poll = io_uring_poll, + .fasync = io_uring_fasync, +}; + +static int io_allocate_scq_urings(struct io_ring_ctx *ctx, + struct io_uring_params *p) +{ + struct io_sq_ring *sq_ring; + struct io_cq_ring *cq_ring; + size_t size; + + sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries)); + if (!sq_ring) + return -ENOMEM; + + ctx->sq_ring = sq_ring; + sq_ring->ring_mask = p->sq_entries - 1; + sq_ring->ring_entries = p->sq_entries; + ctx->sq_mask = sq_ring->ring_mask; + ctx->sq_entries = sq_ring->ring_entries; + + size = array_size(sizeof(struct io_uring_sqe), p->sq_entries); + if (size == SIZE_MAX) + return -EOVERFLOW; + + ctx->sq_sqes = io_mem_alloc(size); + if (!ctx->sq_sqes) { + io_mem_free(ctx->sq_ring); + return -ENOMEM; + } + + cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries)); + if (!cq_ring) { + io_mem_free(ctx->sq_ring); + io_mem_free(ctx->sq_sqes); + return -ENOMEM; + } + + ctx->cq_ring = cq_ring; + cq_ring->ring_mask = p->cq_entries - 1; + cq_ring->ring_entries = p->cq_entries; + ctx->cq_mask = cq_ring->ring_mask; + ctx->cq_entries = cq_ring->ring_entries; + return 0; +} + +/* + * Allocate an anonymous fd, this is what constitutes the application + * visible backing of an io_uring instance. The application mmaps this + * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled, + * we have to tie this fd to a socket for file garbage collection purposes. + */ +static int io_uring_get_fd(struct io_ring_ctx *ctx) +{ + struct file *file; + int ret; + +#if defined(CONFIG_UNIX) + ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP, + &ctx->ring_sock); + if (ret) + return ret; +#endif + + ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC); + if (ret < 0) + goto err; + + file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx, + O_RDWR | O_CLOEXEC); + if (IS_ERR(file)) { + put_unused_fd(ret); + ret = PTR_ERR(file); + goto err; + } + +#if defined(CONFIG_UNIX) + ctx->ring_sock->file = file; + ctx->ring_sock->sk->sk_user_data = ctx; +#endif + fd_install(ret, file); + return ret; +err: +#if defined(CONFIG_UNIX) + sock_release(ctx->ring_sock); + ctx->ring_sock = NULL; +#endif + return ret; +} + +static int io_uring_create(unsigned entries, struct io_uring_params *p) +{ + struct user_struct *user = NULL; + struct io_ring_ctx *ctx; + bool account_mem; + int ret; + + if (!entries || entries > IORING_MAX_ENTRIES) + return -EINVAL; + + /* + * Use twice as many entries for the CQ ring. It's possible for the + * application to drive a higher depth than the size of the SQ ring, + * since the sqes are only used at submission time. This allows for + * some flexibility in overcommitting a bit. + */ + p->sq_entries = roundup_pow_of_two(entries); + p->cq_entries = 2 * p->sq_entries; + + user = get_uid(current_user()); + account_mem = !capable(CAP_IPC_LOCK); + + if (account_mem) { + ret = io_account_mem(user, + ring_pages(p->sq_entries, p->cq_entries)); + if (ret) { + free_uid(user); + return ret; + } + } + + ctx = io_ring_ctx_alloc(p); + if (!ctx) { + if (account_mem) + io_unaccount_mem(user, ring_pages(p->sq_entries, + p->cq_entries)); + free_uid(user); + return -ENOMEM; + } + ctx->compat = in_compat_syscall(); + ctx->account_mem = account_mem; + ctx->user = user; + + ret = io_allocate_scq_urings(ctx, p); + if (ret) + goto err; + + ret = io_sq_offload_start(ctx, p); + if (ret) + goto err; + + ret = io_uring_get_fd(ctx); + if (ret < 0) + goto err; + + memset(&p->sq_off, 0, sizeof(p->sq_off)); + p->sq_off.head = offsetof(struct io_sq_ring, r.head); + p->sq_off.tail = offsetof(struct io_sq_ring, r.tail); + p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask); + p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries); + p->sq_off.flags = offsetof(struct io_sq_ring, flags); + p->sq_off.dropped = offsetof(struct io_sq_ring, dropped); + p->sq_off.array = offsetof(struct io_sq_ring, array); + + memset(&p->cq_off, 0, sizeof(p->cq_off)); + p->cq_off.head = offsetof(struct io_cq_ring, r.head); + p->cq_off.tail = offsetof(struct io_cq_ring, r.tail); + p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask); + p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries); + p->cq_off.overflow = offsetof(struct io_cq_ring, overflow); + p->cq_off.cqes = offsetof(struct io_cq_ring, cqes); + return ret; +err: + io_ring_ctx_wait_and_kill(ctx); + return ret; +} + +/* + * Sets up an aio uring context, and returns the fd. Applications asks for a + * ring size, we return the actual sq/cq ring sizes (among other things) in the + * params structure passed in. + */ +static long io_uring_setup(u32 entries, struct io_uring_params __user *params) +{ + struct io_uring_params p; + long ret; + int i; + + if (copy_from_user(&p, params, sizeof(p))) + return -EFAULT; + for (i = 0; i < ARRAY_SIZE(p.resv); i++) { + if (p.resv[i]) + return -EINVAL; + } + + if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL | + IORING_SETUP_SQ_AFF)) + return -EINVAL; + + ret = io_uring_create(entries, &p); + if (ret < 0) + return ret; + + if (copy_to_user(params, &p, sizeof(p))) + return -EFAULT; + + return ret; +} + +SYSCALL_DEFINE2(io_uring_setup, u32, entries, + struct io_uring_params __user *, params) +{ + return io_uring_setup(entries, params); +} + +static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, + void __user *arg, unsigned nr_args) +{ + int ret; + + percpu_ref_kill(&ctx->refs); + wait_for_completion(&ctx->ctx_done); + + switch (opcode) { + case IORING_REGISTER_BUFFERS: + ret = io_sqe_buffer_register(ctx, arg, nr_args); + break; + case IORING_UNREGISTER_BUFFERS: + ret = -EINVAL; + if (arg || nr_args) + break; + ret = io_sqe_buffer_unregister(ctx); + break; + case IORING_REGISTER_FILES: + ret = io_sqe_files_register(ctx, arg, nr_args); + break; + case IORING_UNREGISTER_FILES: + ret = -EINVAL; + if (arg || nr_args) + break; + ret = io_sqe_files_unregister(ctx); + break; + default: + ret = -EINVAL; + break; + } + + /* bring the ctx back to life */ + reinit_completion(&ctx->ctx_done); + percpu_ref_reinit(&ctx->refs); + return ret; +} + +SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, + void __user *, arg, unsigned int, nr_args) +{ + struct io_ring_ctx *ctx; + long ret = -EBADF; + struct fd f; + + f = fdget(fd); + if (!f.file) + return -EBADF; + + ret = -EOPNOTSUPP; + if (f.file->f_op != &io_uring_fops) + goto out_fput; + + ctx = f.file->private_data; + + mutex_lock(&ctx->uring_lock); + ret = __io_uring_register(ctx, opcode, arg, nr_args); + mutex_unlock(&ctx->uring_lock); +out_fput: + fdput(f); + return ret; +} + +static int __init io_uring_init(void) +{ + req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC); + return 0; +}; +__initcall(io_uring_init); diff --git a/include/linux/file.h b/include/linux/file.h index 6b2fb032416c..3fcddff56bc4 100644 --- a/include/linux/file.h +++ b/include/linux/file.h @@ -13,6 +13,7 @@ struct file; extern void fput(struct file *); +extern void fput_many(struct file *, unsigned int); struct file_operations; struct vfsmount; @@ -44,6 +45,7 @@ static inline void fdput(struct fd fd) } extern struct file *fget(unsigned int fd); +extern struct file *fget_many(unsigned int fd, unsigned int refs); extern struct file *fget_raw(unsigned int fd); extern unsigned long __fdget(unsigned int fd); extern unsigned long __fdget_raw(unsigned int fd); diff --git a/include/linux/fs.h b/include/linux/fs.h index 7442329a0011..0a257d89208e 100644 --- a/include/linux/fs.h +++ b/include/linux/fs.h @@ -961,7 +961,9 @@ static inline struct file *get_file(struct file *f) atomic_long_inc(&f->f_count); return f; } -#define get_file_rcu(x) atomic_long_inc_not_zero(&(x)->f_count) +#define get_file_rcu_many(x, cnt) \ + atomic_long_add_unless(&(x)->f_count, (cnt), 0) +#define get_file_rcu(x) get_file_rcu_many((x), 1) #define fput_atomic(x) atomic_long_add_unless(&(x)->f_count, -1, 1) #define file_count(x) atomic_long_read(&(x)->f_count) @@ -3511,4 +3513,13 @@ extern void inode_nohighmem(struct inode *inode); extern int vfs_fadvise(struct file *file, loff_t offset, loff_t len, int advice); +#if defined(CONFIG_IO_URING) +extern struct sock *io_uring_get_socket(struct file *file); +#else +static inline struct sock *io_uring_get_socket(struct file *file) +{ + return NULL; +} +#endif + #endif /* _LINUX_FS_H */ diff --git a/include/linux/sched/user.h b/include/linux/sched/user.h index 39ad98c09c58..c7b5f86b91a1 100644 --- a/include/linux/sched/user.h +++ b/include/linux/sched/user.h @@ -40,7 +40,7 @@ struct user_struct { kuid_t uid; #if defined(CONFIG_PERF_EVENTS) || defined(CONFIG_BPF_SYSCALL) || \ - defined(CONFIG_NET) + defined(CONFIG_NET) || defined(CONFIG_IO_URING) atomic_long_t locked_vm; #endif diff --git a/include/linux/syscalls.h b/include/linux/syscalls.h index 94369f5bd8e5..c2962953bf11 100644 --- a/include/linux/syscalls.h +++ b/include/linux/syscalls.h @@ -69,6 +69,7 @@ struct file_handle; struct sigaltstack; struct rseq; union bpf_attr; +struct io_uring_params; #include <linux/types.h> #include <linux/aio_abi.h> @@ -314,6 +315,13 @@ asmlinkage long sys_io_pgetevents_time32(aio_context_t ctx_id, struct io_event __user *events, struct old_timespec32 __user *timeout, const struct __aio_sigset *sig); +asmlinkage long sys_io_uring_setup(u32 entries, + struct io_uring_params __user *p); +asmlinkage long sys_io_uring_enter(unsigned int fd, u32 to_submit, + u32 min_complete, u32 flags, + const sigset_t __user *sig, size_t sigsz); +asmlinkage long sys_io_uring_register(unsigned int fd, unsigned int op, + void __user *arg, unsigned int nr_args); /* fs/xattr.c */ asmlinkage long sys_setxattr(const char __user *path, const char __user *name, diff --git a/include/net/af_unix.h b/include/net/af_unix.h index ddbba838d048..3426d6dacc45 100644 --- a/include/net/af_unix.h +++ b/include/net/af_unix.h @@ -10,6 +10,7 @@ void unix_inflight(struct user_struct *user, struct file *fp); void unix_notinflight(struct user_struct *user, struct file *fp); +void unix_destruct_scm(struct sk_buff *skb); void unix_gc(void); void wait_for_unix_gc(void); struct sock *unix_get_socket(struct file *filp); diff --git a/include/uapi/asm-generic/unistd.h b/include/uapi/asm-generic/unistd.h index 12cdf611d217..bf4624efe5e6 100644 --- a/include/uapi/asm-generic/unistd.h +++ b/include/uapi/asm-generic/unistd.h @@ -824,8 +824,15 @@ __SYSCALL(__NR_futex_time64, sys_futex) __SYSCALL(__NR_sched_rr_get_interval_time64, sys_sched_rr_get_interval) #endif +#define __NR_io_uring_setup 425 +__SYSCALL(__NR_io_uring_setup, sys_io_uring_setup) +#define __NR_io_uring_enter 426 +__SYSCALL(__NR_io_uring_enter, sys_io_uring_enter) +#define __NR_io_uring_register 427 +__SYSCALL(__NR_io_uring_register, sys_io_uring_register) + #undef __NR_syscalls -#define __NR_syscalls 424 +#define __NR_syscalls 428 /* * 32 bit systems traditionally used different diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h new file mode 100644 index 000000000000..e23408692118 --- /dev/null +++ b/include/uapi/linux/io_uring.h @@ -0,0 +1,137 @@ +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ +/* + * Header file for the io_uring interface. + * + * Copyright (C) 2019 Jens Axboe + * Copyright (C) 2019 Christoph Hellwig + */ +#ifndef LINUX_IO_URING_H +#define LINUX_IO_URING_H + +#include <linux/fs.h> +#include <linux/types.h> + +/* + * IO submission data structure (Submission Queue Entry) + */ +struct io_uring_sqe { + __u8 opcode; /* type of operation for this sqe */ + __u8 flags; /* IOSQE_ flags */ + __u16 ioprio; /* ioprio for the request */ + __s32 fd; /* file descriptor to do IO on */ + __u64 off; /* offset into file */ + __u64 addr; /* pointer to buffer or iovecs */ + __u32 len; /* buffer size or number of iovecs */ + union { + __kernel_rwf_t rw_flags; + __u32 fsync_flags; + __u16 poll_events; + }; + __u64 user_data; /* data to be passed back at completion time */ + union { + __u16 buf_index; /* index into fixed buffers, if used */ + __u64 __pad2[3]; + }; +}; + +/* + * sqe->flags + */ +#define IOSQE_FIXED_FILE (1U << 0) /* use fixed fileset */ + +/* + * io_uring_setup() flags + */ +#define IORING_SETUP_IOPOLL (1U << 0) /* io_context is polled */ +#define IORING_SETUP_SQPOLL (1U << 1) /* SQ poll thread */ +#define IORING_SETUP_SQ_AFF (1U << 2) /* sq_thread_cpu is valid */ + +#define IORING_OP_NOP 0 +#define IORING_OP_READV 1 +#define IORING_OP_WRITEV 2 +#define IORING_OP_FSYNC 3 +#define IORING_OP_READ_FIXED 4 +#define IORING_OP_WRITE_FIXED 5 +#define IORING_OP_POLL_ADD 6 +#define IORING_OP_POLL_REMOVE 7 + +/* + * sqe->fsync_flags + */ +#define IORING_FSYNC_DATASYNC (1U << 0) + +/* + * IO completion data structure (Completion Queue Entry) + */ +struct io_uring_cqe { + __u64 user_data; /* sqe->data submission passed back */ + __s32 res; /* result code for this event */ + __u32 flags; +}; + +/* + * Magic offsets for the application to mmap the data it needs + */ +#define IORING_OFF_SQ_RING 0ULL +#define IORING_OFF_CQ_RING 0x8000000ULL +#define IORING_OFF_SQES 0x10000000ULL + +/* + * Filled with the offset for mmap(2) + */ +struct io_sqring_offsets { + __u32 head; + __u32 tail; + __u32 ring_mask; + __u32 ring_entries; + __u32 flags; + __u32 dropped; + __u32 array; + __u32 resv1; + __u64 resv2; +}; + +/* + * sq_ring->flags + */ +#define IORING_SQ_NEED_WAKEUP (1U << 0) /* needs io_uring_enter wakeup */ + +struct io_cqring_offsets { + __u32 head; + __u32 tail; + __u32 ring_mask; + __u32 ring_entries; + __u32 overflow; + __u32 cqes; + __u64 resv[2]; +}; + +/* + * io_uring_enter(2) flags + */ +#define IORING_ENTER_GETEVENTS (1U << 0) +#define IORING_ENTER_SQ_WAKEUP (1U << 1) + +/* + * Passed in for io_uring_setup(2). Copied back with updated info on success + */ +struct io_uring_params { + __u32 sq_entries; + __u32 cq_entries; + __u32 flags; + __u32 sq_thread_cpu; + __u32 sq_thread_idle; + __u32 resv[5]; + struct io_sqring_offsets sq_off; + struct io_cqring_offsets cq_off; +}; + +/* + * io_uring_register(2) opcodes and arguments + */ +#define IORING_REGISTER_BUFFERS 0 +#define IORING_UNREGISTER_BUFFERS 1 +#define IORING_REGISTER_FILES 2 +#define IORING_UNREGISTER_FILES 3 + +#endif diff --git a/init/Kconfig b/init/Kconfig index c9386a365eea..53b54214a36e 100644 --- a/init/Kconfig +++ b/init/Kconfig @@ -1414,6 +1414,15 @@ config AIO by some high performance threaded applications. Disabling this option saves about 7k. +config IO_URING + bool "Enable IO uring support" if EXPERT + select ANON_INODES + default y + help + This option enables support for the io_uring interface, enabling + applications to submit and complete IO through submission and + completion rings that are shared between the kernel and application. + config ADVISE_SYSCALLS bool "Enable madvise/fadvise syscalls" if EXPERT default y diff --git a/kernel/sys_ni.c b/kernel/sys_ni.c index 62a6c8707799..51d7c6794bf1 100644 --- a/kernel/sys_ni.c +++ b/kernel/sys_ni.c @@ -48,6 +48,9 @@ COND_SYSCALL(io_pgetevents_time32); COND_SYSCALL(io_pgetevents); COND_SYSCALL_COMPAT(io_pgetevents_time32); COND_SYSCALL_COMPAT(io_pgetevents); +COND_SYSCALL(io_uring_setup); +COND_SYSCALL(io_uring_enter); +COND_SYSCALL(io_uring_register); /* fs/xattr.c */ diff --git a/net/Makefile b/net/Makefile index bdaf53925acd..449fc0b221f8 100644 --- a/net/Makefile +++ b/net/Makefile @@ -18,7 +18,7 @@ obj-$(CONFIG_NETFILTER) += netfilter/ obj-$(CONFIG_INET) += ipv4/ obj-$(CONFIG_TLS) += tls/ obj-$(CONFIG_XFRM) += xfrm/ -obj-$(CONFIG_UNIX) += unix/ +obj-$(CONFIG_UNIX_SCM) += unix/ obj-$(CONFIG_NET) += ipv6/ obj-$(CONFIG_BPFILTER) += bpfilter/ obj-$(CONFIG_PACKET) += packet/ diff --git a/net/unix/Kconfig b/net/unix/Kconfig index 8b31ab85d050..3b9e450656a4 100644 --- a/net/unix/Kconfig +++ b/net/unix/Kconfig @@ -19,6 +19,11 @@ config UNIX Say Y unless you know what you are doing. +config UNIX_SCM + bool + depends on UNIX + default y + config UNIX_DIAG tristate "UNIX: socket monitoring interface" depends on UNIX diff --git a/net/unix/Makefile b/net/unix/Makefile index ffd0a275c3a7..54e58cc4f945 100644 --- a/net/unix/Makefile +++ b/net/unix/Makefile @@ -10,3 +10,5 @@ unix-$(CONFIG_SYSCTL) += sysctl_net_unix.o obj-$(CONFIG_UNIX_DIAG) += unix_diag.o unix_diag-y := diag.o + +obj-$(CONFIG_UNIX_SCM) += scm.o diff --git a/net/unix/af_unix.c b/net/unix/af_unix.c index a95d479caeea..ddb838a1b74c 100644 --- a/net/unix/af_unix.c +++ b/net/unix/af_unix.c @@ -119,6 +119,8 @@ #include <linux/freezer.h> #include <linux/file.h> +#include "scm.h" + struct hlist_head unix_socket_table[2 * UNIX_HASH_SIZE]; EXPORT_SYMBOL_GPL(unix_socket_table); DEFINE_SPINLOCK(unix_table_lock); @@ -1496,67 +1498,6 @@ out: return err; } -static void unix_detach_fds(struct scm_cookie *scm, struct sk_buff *skb) -{ - int i; - - scm->fp = UNIXCB(skb).fp; - UNIXCB(skb).fp = NULL; - - for (i = scm->fp->count-1; i >= 0; i--) - unix_notinflight(scm->fp->user, scm->fp->fp[i]); -} - -static void unix_destruct_scm(struct sk_buff *skb) -{ - struct scm_cookie scm; - memset(&scm, 0, sizeof(scm)); - scm.pid = UNIXCB(skb).pid; - if (UNIXCB(skb).fp) - unix_detach_fds(&scm, skb); - - /* Alas, it calls VFS */ - /* So fscking what? fput() had been SMP-safe since the last Summer */ - scm_destroy(&scm); - sock_wfree(skb); -} - -/* - * The "user->unix_inflight" variable is protected by the garbage - * collection lock, and we just read it locklessly here. If you go - * over the limit, there might be a tiny race in actually noticing - * it across threads. Tough. - */ -static inline bool too_many_unix_fds(struct task_struct *p) -{ - struct user_struct *user = current_user(); - - if (unlikely(user->unix_inflight > task_rlimit(p, RLIMIT_NOFILE))) - return !capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN); - return false; -} - -static int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb) -{ - int i; - - if (too_many_unix_fds(current)) - return -ETOOMANYREFS; - - /* - * Need to duplicate file references for the sake of garbage - * collection. Otherwise a socket in the fps might become a - * candidate for GC while the skb is not yet queued. - */ - UNIXCB(skb).fp = scm_fp_dup(scm->fp); - if (!UNIXCB(skb).fp) - return -ENOMEM; - - for (i = scm->fp->count - 1; i >= 0; i--) - unix_inflight(scm->fp->user, scm->fp->fp[i]); - return 0; -} - static int unix_scm_to_skb(struct scm_cookie *scm, struct sk_buff *skb, bool send_fds) { int err = 0; diff --git a/net/unix/garbage.c b/net/unix/garbage.c index c36757e72844..8bbe1b8e4ff7 100644 --- a/net/unix/garbage.c +++ b/net/unix/garbage.c @@ -86,77 +86,13 @@ #include <net/scm.h> #include <net/tcp_states.h> +#include "scm.h" + /* Internal data structures and random procedures: */ -static LIST_HEAD(gc_inflight_list); static LIST_HEAD(gc_candidates); -static DEFINE_SPINLOCK(unix_gc_lock); static DECLARE_WAIT_QUEUE_HEAD(unix_gc_wait); -unsigned int unix_tot_inflight; - -struct sock *unix_get_socket(struct file *filp) -{ - struct sock *u_sock = NULL; - struct inode *inode = file_inode(filp); - - /* Socket ? */ - if (S_ISSOCK(inode->i_mode) && !(filp->f_mode & FMODE_PATH)) { - struct socket *sock = SOCKET_I(inode); - struct sock *s = sock->sk; - - /* PF_UNIX ? */ - if (s && sock->ops && sock->ops->family == PF_UNIX) - u_sock = s; - } - return u_sock; -} - -/* Keep the number of times in flight count for the file - * descriptor if it is for an AF_UNIX socket. - */ - -void unix_inflight(struct user_struct *user, struct file *fp) -{ - struct sock *s = unix_get_socket(fp); - - spin_lock(&unix_gc_lock); - - if (s) { - struct unix_sock *u = unix_sk(s); - - if (atomic_long_inc_return(&u->inflight) == 1) { - BUG_ON(!list_empty(&u->link)); - list_add_tail(&u->link, &gc_inflight_list); - } else { - BUG_ON(list_empty(&u->link)); - } - unix_tot_inflight++; - } - user->unix_inflight++; - spin_unlock(&unix_gc_lock); -} - -void unix_notinflight(struct user_struct *user, struct file *fp) -{ - struct sock *s = unix_get_socket(fp); - - spin_lock(&unix_gc_lock); - - if (s) { - struct unix_sock *u = unix_sk(s); - - BUG_ON(!atomic_long_read(&u->inflight)); - BUG_ON(list_empty(&u->link)); - - if (atomic_long_dec_and_test(&u->inflight)) - list_del_init(&u->link); - unix_tot_inflight--; - } - user->unix_inflight--; - spin_unlock(&unix_gc_lock); -} - static void scan_inflight(struct sock *x, void (*func)(struct unix_sock *), struct sk_buff_head *hitlist) { diff --git a/net/unix/scm.c b/net/unix/scm.c new file mode 100644 index 000000000000..8c40f2b32392 --- /dev/null +++ b/net/unix/scm.c @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: GPL-2.0 +#include <linux/module.h> +#include <linux/kernel.h> +#include <linux/string.h> +#include <linux/socket.h> +#include <linux/net.h> +#include <linux/fs.h> +#include <net/af_unix.h> +#include <net/scm.h> +#include <linux/init.h> + +#include "scm.h" + +unsigned int unix_tot_inflight; +EXPORT_SYMBOL(unix_tot_inflight); + +LIST_HEAD(gc_inflight_list); +EXPORT_SYMBOL(gc_inflight_list); + +DEFINE_SPINLOCK(unix_gc_lock); +EXPORT_SYMBOL(unix_gc_lock); + +struct sock *unix_get_socket(struct file *filp) +{ + struct sock *u_sock = NULL; + struct inode *inode = file_inode(filp); + + /* Socket ? */ + if (S_ISSOCK(inode->i_mode) && !(filp->f_mode & FMODE_PATH)) { + struct socket *sock = SOCKET_I(inode); + struct sock *s = sock->sk; + + /* PF_UNIX ? */ + if (s && sock->ops && sock->ops->family == PF_UNIX) + u_sock = s; + } else { + /* Could be an io_uring instance */ + u_sock = io_uring_get_socket(filp); + } + return u_sock; +} +EXPORT_SYMBOL(unix_get_socket); + +/* Keep the number of times in flight count for the file + * descriptor if it is for an AF_UNIX socket. + */ +void unix_inflight(struct user_struct *user, struct file *fp) +{ + struct sock *s = unix_get_socket(fp); + + spin_lock(&unix_gc_lock); + + if (s) { + struct unix_sock *u = unix_sk(s); + + if (atomic_long_inc_return(&u->inflight) == 1) { + BUG_ON(!list_empty(&u->link)); + list_add_tail(&u->link, &gc_inflight_list); + } else { + BUG_ON(list_empty(&u->link)); + } + unix_tot_inflight++; + } + user->unix_inflight++; + spin_unlock(&unix_gc_lock); +} + +void unix_notinflight(struct user_struct *user, struct file *fp) +{ + struct sock *s = unix_get_socket(fp); + + spin_lock(&unix_gc_lock); + + if (s) { + struct unix_sock *u = unix_sk(s); + + BUG_ON(!atomic_long_read(&u->inflight)); + BUG_ON(list_empty(&u->link)); + + if (atomic_long_dec_and_test(&u->inflight)) + list_del_init(&u->link); + unix_tot_inflight--; + } + user->unix_inflight--; + spin_unlock(&unix_gc_lock); +} + +/* + * The "user->unix_inflight" variable is protected by the garbage + * collection lock, and we just read it locklessly here. If you go + * over the limit, there might be a tiny race in actually noticing + * it across threads. Tough. + */ +static inline bool too_many_unix_fds(struct task_struct *p) +{ + struct user_struct *user = current_user(); + + if (unlikely(user->unix_inflight > task_rlimit(p, RLIMIT_NOFILE))) + return !capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN); + return false; +} + +int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb) +{ + int i; + + if (too_many_unix_fds(current)) + return -ETOOMANYREFS; + + /* + * Need to duplicate file references for the sake of garbage + * collection. Otherwise a socket in the fps might become a + * candidate for GC while the skb is not yet queued. + */ + UNIXCB(skb).fp = scm_fp_dup(scm->fp); + if (!UNIXCB(skb).fp) + return -ENOMEM; + + for (i = scm->fp->count - 1; i >= 0; i--) + unix_inflight(scm->fp->user, scm->fp->fp[i]); + return 0; +} +EXPORT_SYMBOL(unix_attach_fds); + +void unix_detach_fds(struct scm_cookie *scm, struct sk_buff *skb) +{ + int i; + + scm->fp = UNIXCB(skb).fp; + UNIXCB(skb).fp = NULL; + + for (i = scm->fp->count-1; i >= 0; i--) + unix_notinflight(scm->fp->user, scm->fp->fp[i]); +} +EXPORT_SYMBOL(unix_detach_fds); + +void unix_destruct_scm(struct sk_buff *skb) +{ + struct scm_cookie scm; + + memset(&scm, 0, sizeof(scm)); + scm.pid = UNIXCB(skb).pid; + if (UNIXCB(skb).fp) + unix_detach_fds(&scm, skb); + + /* Alas, it calls VFS */ + /* So fscking what? fput() had been SMP-safe since the last Summer */ + scm_destroy(&scm); + sock_wfree(skb); +} +EXPORT_SYMBOL(unix_destruct_scm); diff --git a/net/unix/scm.h b/net/unix/scm.h new file mode 100644 index 000000000000..5a255a477f16 --- /dev/null +++ b/net/unix/scm.h @@ -0,0 +1,10 @@ +#ifndef NET_UNIX_SCM_H +#define NET_UNIX_SCM_H + +extern struct list_head gc_inflight_list; +extern spinlock_t unix_gc_lock; + +int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb); +void unix_detach_fds(struct scm_cookie *scm, struct sk_buff *skb); + +#endif diff --git a/tools/io_uring/Makefile b/tools/io_uring/Makefile new file mode 100644 index 000000000000..f79522fc37b5 --- /dev/null +++ b/tools/io_uring/Makefile @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: GPL-2.0 +# Makefile for io_uring test tools +CFLAGS += -Wall -Wextra -g -D_GNU_SOURCE +LDLIBS += -lpthread + +all: io_uring-cp io_uring-bench +%: %.c + $(CC) $(CFLAGS) -o $@ $^ + +io_uring-bench: syscall.o io_uring-bench.o + $(CC) $(CFLAGS) $(LDLIBS) -o $@ $^ + +io_uring-cp: setup.o syscall.o queue.o + +clean: + $(RM) io_uring-cp io_uring-bench *.o + +.PHONY: all clean diff --git a/tools/io_uring/README b/tools/io_uring/README new file mode 100644 index 000000000000..67fd70115cff --- /dev/null +++ b/tools/io_uring/README @@ -0,0 +1,29 @@ +This directory includes a few programs that demonstrate how to use io_uring +in an application. The examples are: + +io_uring-cp + A very basic io_uring implementation of cp(1). It takes two + arguments, copies the first argument to the second. This example + is part of liburing, and hence uses the simplified liburing API + for setting up an io_uring instance, submitting IO, completing IO, + etc. The support functions in queue.c and setup.c are straight + out of liburing. + +io_uring-bench + Benchmark program that does random reads on a number of files. This + app demonstrates the various features of io_uring, like fixed files, + fixed buffers, and polled IO. There are options in the program to + control which features to use. Arguments is the file (or files) that + io_uring-bench should operate on. This uses the raw io_uring + interface. + +liburing can be cloned with git here: + + git://git.kernel.dk/liburing + +and contains a number of unit tests as well for testing io_uring. It also +comes with man pages for the three system calls. + +Fio includes an io_uring engine, you can clone fio here: + + git://git.kernel.dk/fio diff --git a/tools/io_uring/barrier.h b/tools/io_uring/barrier.h new file mode 100644 index 000000000000..ef00f6722ba9 --- /dev/null +++ b/tools/io_uring/barrier.h @@ -0,0 +1,16 @@ +#ifndef LIBURING_BARRIER_H +#define LIBURING_BARRIER_H + +#if defined(__x86_64) || defined(__i386__) +#define read_barrier() __asm__ __volatile__("":::"memory") +#define write_barrier() __asm__ __volatile__("":::"memory") +#else +/* + * Add arch appropriate definitions. Be safe and use full barriers for + * archs we don't have support for. + */ +#define read_barrier() __sync_synchronize() +#define write_barrier() __sync_synchronize() +#endif + +#endif diff --git a/tools/io_uring/io_uring-bench.c b/tools/io_uring/io_uring-bench.c new file mode 100644 index 000000000000..512306a37531 --- /dev/null +++ b/tools/io_uring/io_uring-bench.c @@ -0,0 +1,616 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * Simple benchmark program that uses the various features of io_uring + * to provide fast random access to a device/file. It has various + * options that are control how we use io_uring, see the OPTIONS section + * below. This uses the raw io_uring interface. + * + * Copyright (C) 2018-2019 Jens Axboe + */ +#include <stdio.h> +#include <errno.h> +#include <assert.h> +#include <stdlib.h> +#include <stddef.h> +#include <signal.h> +#include <inttypes.h> + +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/ioctl.h> +#include <sys/syscall.h> +#include <sys/resource.h> +#include <sys/mman.h> +#include <sys/uio.h> +#include <linux/fs.h> +#include <fcntl.h> +#include <unistd.h> +#include <string.h> +#include <pthread.h> +#include <sched.h> + +#include "liburing.h" +#include "barrier.h" + +#ifndef IOCQE_FLAG_CACHEHIT +#define IOCQE_FLAG_CACHEHIT (1U << 0) +#endif + +#define min(a, b) ((a < b) ? (a) : (b)) + +struct io_sq_ring { + unsigned *head; + unsigned *tail; + unsigned *ring_mask; + unsigned *ring_entries; + unsigned *flags; + unsigned *array; +}; + +struct io_cq_ring { + unsigned *head; + unsigned *tail; + unsigned *ring_mask; + unsigned *ring_entries; + struct io_uring_cqe *cqes; +}; + +#define DEPTH 128 + +#define BATCH_SUBMIT 32 +#define BATCH_COMPLETE 32 + +#define BS 4096 + +#define MAX_FDS 16 + +static unsigned sq_ring_mask, cq_ring_mask; + +struct file { + unsigned long max_blocks; + unsigned pending_ios; + int real_fd; + int fixed_fd; +}; + +struct submitter { + pthread_t thread; + int ring_fd; + struct drand48_data rand; + struct io_sq_ring sq_ring; + struct io_uring_sqe *sqes; + struct iovec iovecs[DEPTH]; + struct io_cq_ring cq_ring; + int inflight; + unsigned long reaps; + unsigned long done; + unsigned long calls; + unsigned long cachehit, cachemiss; + volatile int finish; + + __s32 *fds; + + struct file files[MAX_FDS]; + unsigned nr_files; + unsigned cur_file; +}; + +static struct submitter submitters[1]; +static volatile int finish; + +/* + * OPTIONS: Set these to test the various features of io_uring. + */ +static int polled = 1; /* use IO polling */ +static int fixedbufs = 1; /* use fixed user buffers */ +static int register_files = 1; /* use fixed files */ +static int buffered = 0; /* use buffered IO, not O_DIRECT */ +static int sq_thread_poll = 0; /* use kernel submission/poller thread */ +static int sq_thread_cpu = -1; /* pin above thread to this CPU */ +static int do_nop = 0; /* no-op SQ ring commands */ + +static int io_uring_register_buffers(struct submitter *s) +{ + if (do_nop) + return 0; + + return io_uring_register(s->ring_fd, IORING_REGISTER_BUFFERS, s->iovecs, + DEPTH); +} + +static int io_uring_register_files(struct submitter *s) +{ + unsigned i; + + if (do_nop) + return 0; + + s->fds = calloc(s->nr_files, sizeof(__s32)); + for (i = 0; i < s->nr_files; i++) { + s->fds[i] = s->files[i].real_fd; + s->files[i].fixed_fd = i; + } + + return io_uring_register(s->ring_fd, IORING_REGISTER_FILES, s->fds, + s->nr_files); +} + +static int gettid(void) +{ + return syscall(__NR_gettid); +} + +static unsigned file_depth(struct submitter *s) +{ + return (DEPTH + s->nr_files - 1) / s->nr_files; +} + +static void init_io(struct submitter *s, unsigned index) +{ + struct io_uring_sqe *sqe = &s->sqes[index]; + unsigned long offset; + struct file *f; + long r; + + if (do_nop) { + sqe->opcode = IORING_OP_NOP; + return; + } + + if (s->nr_files == 1) { + f = &s->files[0]; + } else { + f = &s->files[s->cur_file]; + if (f->pending_ios >= file_depth(s)) { + s->cur_file++; + if (s->cur_file == s->nr_files) + s->cur_file = 0; + f = &s->files[s->cur_file]; + } + } + f->pending_ios++; + + lrand48_r(&s->rand, &r); + offset = (r % (f->max_blocks - 1)) * BS; + + if (register_files) { + sqe->flags = IOSQE_FIXED_FILE; + sqe->fd = f->fixed_fd; + } else { + sqe->flags = 0; + sqe->fd = f->real_fd; + } + if (fixedbufs) { + sqe->opcode = IORING_OP_READ_FIXED; + sqe->addr = (unsigned long) s->iovecs[index].iov_base; + sqe->len = BS; + sqe->buf_index = index; + } else { + sqe->opcode = IORING_OP_READV; + sqe->addr = (unsigned long) &s->iovecs[index]; + sqe->len = 1; + sqe->buf_index = 0; + } + sqe->ioprio = 0; + sqe->off = offset; + sqe->user_data = (unsigned long) f; +} + +static int prep_more_ios(struct submitter *s, unsigned max_ios) +{ + struct io_sq_ring *ring = &s->sq_ring; + unsigned index, tail, next_tail, prepped = 0; + + next_tail = tail = *ring->tail; + do { + next_tail++; + read_barrier(); + if (next_tail == *ring->head) + break; + + index = tail & sq_ring_mask; + init_io(s, index); + ring->array[index] = index; + prepped++; + tail = next_tail; + } while (prepped < max_ios); + + if (*ring->tail != tail) { + /* order tail store with writes to sqes above */ + write_barrier(); + *ring->tail = tail; + write_barrier(); + } + return prepped; +} + +static int get_file_size(struct file *f) +{ + struct stat st; + + if (fstat(f->real_fd, &st) < 0) + return -1; + if (S_ISBLK(st.st_mode)) { + unsigned long long bytes; + + if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0) + return -1; + + f->max_blocks = bytes / BS; + return 0; + } else if (S_ISREG(st.st_mode)) { + f->max_blocks = st.st_size / BS; + return 0; + } + + return -1; +} + +static int reap_events(struct submitter *s) +{ + struct io_cq_ring *ring = &s->cq_ring; + struct io_uring_cqe *cqe; + unsigned head, reaped = 0; + + head = *ring->head; + do { + struct file *f; + + read_barrier(); + if (head == *ring->tail) + break; + cqe = &ring->cqes[head & cq_ring_mask]; + if (!do_nop) { + f = (struct file *) (uintptr_t) cqe->user_data; + f->pending_ios--; + if (cqe->res != BS) { + printf("io: unexpected ret=%d\n", cqe->res); + if (polled && cqe->res == -EOPNOTSUPP) + printf("Your filesystem doesn't support poll\n"); + return -1; + } + } + if (cqe->flags & IOCQE_FLAG_CACHEHIT) + s->cachehit++; + else + s->cachemiss++; + reaped++; + head++; + } while (1); + + s->inflight -= reaped; + *ring->head = head; + write_barrier(); + return reaped; +} + +static void *submitter_fn(void *data) +{ + struct submitter *s = data; + struct io_sq_ring *ring = &s->sq_ring; + int ret, prepped; + + printf("submitter=%d\n", gettid()); + + srand48_r(pthread_self(), &s->rand); + + prepped = 0; + do { + int to_wait, to_submit, this_reap, to_prep; + + if (!prepped && s->inflight < DEPTH) { + to_prep = min(DEPTH - s->inflight, BATCH_SUBMIT); + prepped = prep_more_ios(s, to_prep); + } + s->inflight += prepped; +submit_more: + to_submit = prepped; +submit: + if (to_submit && (s->inflight + to_submit <= DEPTH)) + to_wait = 0; + else + to_wait = min(s->inflight + to_submit, BATCH_COMPLETE); + + /* + * Only need to call io_uring_enter if we're not using SQ thread + * poll, or if IORING_SQ_NEED_WAKEUP is set. + */ + if (!sq_thread_poll || (*ring->flags & IORING_SQ_NEED_WAKEUP)) { + unsigned flags = 0; + + if (to_wait) + flags = IORING_ENTER_GETEVENTS; + if ((*ring->flags & IORING_SQ_NEED_WAKEUP)) + flags |= IORING_ENTER_SQ_WAKEUP; + ret = io_uring_enter(s->ring_fd, to_submit, to_wait, + flags, NULL); + s->calls++; + } + + /* + * For non SQ thread poll, we already got the events we needed + * through the io_uring_enter() above. For SQ thread poll, we + * need to loop here until we find enough events. + */ + this_reap = 0; + do { + int r; + r = reap_events(s); + if (r == -1) { + s->finish = 1; + break; + } else if (r > 0) + this_reap += r; + } while (sq_thread_poll && this_reap < to_wait); + s->reaps += this_reap; + + if (ret >= 0) { + if (!ret) { + to_submit = 0; + if (s->inflight) + goto submit; + continue; + } else if (ret < to_submit) { + int diff = to_submit - ret; + + s->done += ret; + prepped -= diff; + goto submit_more; + } + s->done += ret; + prepped = 0; + continue; + } else if (ret < 0) { + if (errno == EAGAIN) { + if (s->finish) + break; + if (this_reap) + goto submit; + to_submit = 0; + goto submit; + } + printf("io_submit: %s\n", strerror(errno)); + break; + } + } while (!s->finish); + + finish = 1; + return NULL; +} + +static void sig_int(int sig) +{ + printf("Exiting on signal %d\n", sig); + submitters[0].finish = 1; + finish = 1; +} + +static void arm_sig_int(void) +{ + struct sigaction act; + + memset(&act, 0, sizeof(act)); + act.sa_handler = sig_int; + act.sa_flags = SA_RESTART; + sigaction(SIGINT, &act, NULL); +} + +static int setup_ring(struct submitter *s) +{ + struct io_sq_ring *sring = &s->sq_ring; + struct io_cq_ring *cring = &s->cq_ring; + struct io_uring_params p; + int ret, fd; + void *ptr; + + memset(&p, 0, sizeof(p)); + + if (polled && !do_nop) + p.flags |= IORING_SETUP_IOPOLL; + if (sq_thread_poll) { + p.flags |= IORING_SETUP_SQPOLL; + if (sq_thread_cpu != -1) { + p.flags |= IORING_SETUP_SQ_AFF; + p.sq_thread_cpu = sq_thread_cpu; + } + } + + fd = io_uring_setup(DEPTH, &p); + if (fd < 0) { + perror("io_uring_setup"); + return 1; + } + s->ring_fd = fd; + + if (fixedbufs) { + ret = io_uring_register_buffers(s); + if (ret < 0) { + perror("io_uring_register_buffers"); + return 1; + } + } + + if (register_files) { + ret = io_uring_register_files(s); + if (ret < 0) { + perror("io_uring_register_files"); + return 1; + } + } + + ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32), + PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, + IORING_OFF_SQ_RING); + printf("sq_ring ptr = 0x%p\n", ptr); + sring->head = ptr + p.sq_off.head; + sring->tail = ptr + p.sq_off.tail; + sring->ring_mask = ptr + p.sq_off.ring_mask; + sring->ring_entries = ptr + p.sq_off.ring_entries; + sring->flags = ptr + p.sq_off.flags; + sring->array = ptr + p.sq_off.array; + sq_ring_mask = *sring->ring_mask; + + s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe), + PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, + IORING_OFF_SQES); + printf("sqes ptr = 0x%p\n", s->sqes); + + ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe), + PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, + IORING_OFF_CQ_RING); + printf("cq_ring ptr = 0x%p\n", ptr); + cring->head = ptr + p.cq_off.head; + cring->tail = ptr + p.cq_off.tail; + cring->ring_mask = ptr + p.cq_off.ring_mask; + cring->ring_entries = ptr + p.cq_off.ring_entries; + cring->cqes = ptr + p.cq_off.cqes; + cq_ring_mask = *cring->ring_mask; + return 0; +} + +static void file_depths(char *buf) +{ + struct submitter *s = &submitters[0]; + unsigned i; + char *p; + + buf[0] = '\0'; + p = buf; + for (i = 0; i < s->nr_files; i++) { + struct file *f = &s->files[i]; + + if (i + 1 == s->nr_files) + p += sprintf(p, "%d", f->pending_ios); + else + p += sprintf(p, "%d, ", f->pending_ios); + } +} + +int main(int argc, char *argv[]) +{ + struct submitter *s = &submitters[0]; + unsigned long done, calls, reap, cache_hit, cache_miss; + int err, i, flags, fd; + char *fdepths; + void *ret; + + if (!do_nop && argc < 2) { + printf("%s: filename\n", argv[0]); + return 1; + } + + flags = O_RDONLY | O_NOATIME; + if (!buffered) + flags |= O_DIRECT; + + i = 1; + while (!do_nop && i < argc) { + struct file *f; + + if (s->nr_files == MAX_FDS) { + printf("Max number of files (%d) reached\n", MAX_FDS); + break; + } + fd = open(argv[i], flags); + if (fd < 0) { + perror("open"); + return 1; + } + + f = &s->files[s->nr_files]; + f->real_fd = fd; + if (get_file_size(f)) { + printf("failed getting size of device/file\n"); + return 1; + } + if (f->max_blocks <= 1) { + printf("Zero file/device size?\n"); + return 1; + } + f->max_blocks--; + + printf("Added file %s\n", argv[i]); + s->nr_files++; + i++; + } + + if (fixedbufs) { + struct rlimit rlim; + + rlim.rlim_cur = RLIM_INFINITY; + rlim.rlim_max = RLIM_INFINITY; + if (setrlimit(RLIMIT_MEMLOCK, &rlim) < 0) { + perror("setrlimit"); + return 1; + } + } + + arm_sig_int(); + + for (i = 0; i < DEPTH; i++) { + void *buf; + + if (posix_memalign(&buf, BS, BS)) { + printf("failed alloc\n"); + return 1; + } + s->iovecs[i].iov_base = buf; + s->iovecs[i].iov_len = BS; + } + + err = setup_ring(s); + if (err) { + printf("ring setup failed: %s, %d\n", strerror(errno), err); + return 1; + } + printf("polled=%d, fixedbufs=%d, buffered=%d", polled, fixedbufs, buffered); + printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", DEPTH, *s->sq_ring.ring_entries, *s->cq_ring.ring_entries); + + pthread_create(&s->thread, NULL, submitter_fn, s); + + fdepths = malloc(8 * s->nr_files); + cache_hit = cache_miss = reap = calls = done = 0; + do { + unsigned long this_done = 0; + unsigned long this_reap = 0; + unsigned long this_call = 0; + unsigned long this_cache_hit = 0; + unsigned long this_cache_miss = 0; + unsigned long rpc = 0, ipc = 0; + double hit = 0.0; + + sleep(1); + this_done += s->done; + this_call += s->calls; + this_reap += s->reaps; + this_cache_hit += s->cachehit; + this_cache_miss += s->cachemiss; + if (this_cache_hit && this_cache_miss) { + unsigned long hits, total; + + hits = this_cache_hit - cache_hit; + total = hits + this_cache_miss - cache_miss; + hit = (double) hits / (double) total; + hit *= 100.0; + } + if (this_call - calls) { + rpc = (this_done - done) / (this_call - calls); + ipc = (this_reap - reap) / (this_call - calls); + } else + rpc = ipc = -1; + file_depths(fdepths); + printf("IOPS=%lu, IOS/call=%ld/%ld, inflight=%u (%s), Cachehit=%0.2f%%\n", + this_done - done, rpc, ipc, s->inflight, + fdepths, hit); + done = this_done; + calls = this_call; + reap = this_reap; + cache_hit = s->cachehit; + cache_miss = s->cachemiss; + } while (!finish); + + pthread_join(s->thread, &ret); + close(s->ring_fd); + free(fdepths); + return 0; +} diff --git a/tools/io_uring/io_uring-cp.c b/tools/io_uring/io_uring-cp.c new file mode 100644 index 000000000000..633f65bb43a7 --- /dev/null +++ b/tools/io_uring/io_uring-cp.c @@ -0,0 +1,251 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * Simple test program that demonstrates a file copy through io_uring. This + * uses the API exposed by liburing. + * + * Copyright (C) 2018-2019 Jens Axboe + */ +#include <stdio.h> +#include <fcntl.h> +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <assert.h> +#include <errno.h> +#include <inttypes.h> +#include <sys/stat.h> +#include <sys/ioctl.h> + +#include "liburing.h" + +#define QD 64 +#define BS (32*1024) + +static int infd, outfd; + +struct io_data { + int read; + off_t first_offset, offset; + size_t first_len; + struct iovec iov; +}; + +static int setup_context(unsigned entries, struct io_uring *ring) +{ + int ret; + + ret = io_uring_queue_init(entries, ring, 0); + if (ret < 0) { + fprintf(stderr, "queue_init: %s\n", strerror(-ret)); + return -1; + } + + return 0; +} + +static int get_file_size(int fd, off_t *size) +{ + struct stat st; + + if (fstat(fd, &st) < 0) + return -1; + if (S_ISREG(st.st_mode)) { + *size = st.st_size; + return 0; + } else if (S_ISBLK(st.st_mode)) { + unsigned long long bytes; + + if (ioctl(fd, BLKGETSIZE64, &bytes) != 0) + return -1; + + *size = bytes; + return 0; + } + + return -1; +} + +static void queue_prepped(struct io_uring *ring, struct io_data *data) +{ + struct io_uring_sqe *sqe; + + sqe = io_uring_get_sqe(ring); + assert(sqe); + + if (data->read) + io_uring_prep_readv(sqe, infd, &data->iov, 1, data->offset); + else + io_uring_prep_writev(sqe, outfd, &data->iov, 1, data->offset); + + io_uring_sqe_set_data(sqe, data); +} + +static int queue_read(struct io_uring *ring, off_t size, off_t offset) +{ + struct io_uring_sqe *sqe; + struct io_data *data; + + sqe = io_uring_get_sqe(ring); + if (!sqe) + return 1; + + data = malloc(size + sizeof(*data)); + data->read = 1; + data->offset = data->first_offset = offset; + + data->iov.iov_base = data + 1; + data->iov.iov_len = size; + data->first_len = size; + + io_uring_prep_readv(sqe, infd, &data->iov, 1, offset); + io_uring_sqe_set_data(sqe, data); + return 0; +} + +static void queue_write(struct io_uring *ring, struct io_data *data) +{ + data->read = 0; + data->offset = data->first_offset; + + data->iov.iov_base = data + 1; + data->iov.iov_len = data->first_len; + + queue_prepped(ring, data); + io_uring_submit(ring); +} + +static int copy_file(struct io_uring *ring, off_t insize) +{ + unsigned long reads, writes; + struct io_uring_cqe *cqe; + off_t write_left, offset; + int ret; + + write_left = insize; + writes = reads = offset = 0; + + while (insize || write_left) { + unsigned long had_reads; + int got_comp; + + /* + * Queue up as many reads as we can + */ + had_reads = reads; + while (insize) { + off_t this_size = insize; + + if (reads + writes >= QD) + break; + if (this_size > BS) + this_size = BS; + else if (!this_size) + break; + + if (queue_read(ring, this_size, offset)) + break; + + insize -= this_size; + offset += this_size; + reads++; + } + + if (had_reads != reads) { + ret = io_uring_submit(ring); + if (ret < 0) { + fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret)); + break; + } + } + + /* + * Queue is full at this point. Find at least one completion. + */ + got_comp = 0; + while (write_left) { + struct io_data *data; + + if (!got_comp) { + ret = io_uring_wait_completion(ring, &cqe); + got_comp = 1; + } else + ret = io_uring_get_completion(ring, &cqe); + if (ret < 0) { + fprintf(stderr, "io_uring_get_completion: %s\n", + strerror(-ret)); + return 1; + } + if (!cqe) + break; + + data = (struct io_data *) (uintptr_t) cqe->user_data; + if (cqe->res < 0) { + if (cqe->res == -EAGAIN) { + queue_prepped(ring, data); + continue; + } + fprintf(stderr, "cqe failed: %s\n", + strerror(-cqe->res)); + return 1; + } else if ((size_t) cqe->res != data->iov.iov_len) { + /* Short read/write, adjust and requeue */ + data->iov.iov_base += cqe->res; + data->iov.iov_len -= cqe->res; + data->offset += cqe->res; + queue_prepped(ring, data); + continue; + } + + /* + * All done. if write, nothing else to do. if read, + * queue up corresponding write. + */ + if (data->read) { + queue_write(ring, data); + write_left -= data->first_len; + reads--; + writes++; + } else { + free(data); + writes--; + } + } + } + + return 0; +} + +int main(int argc, char *argv[]) +{ + struct io_uring ring; + off_t insize; + int ret; + + if (argc < 3) { + printf("%s: infile outfile\n", argv[0]); + return 1; + } + + infd = open(argv[1], O_RDONLY); + if (infd < 0) { + perror("open infile"); + return 1; + } + outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (outfd < 0) { + perror("open outfile"); + return 1; + } + + if (setup_context(QD, &ring)) + return 1; + if (get_file_size(infd, &insize)) + return 1; + + ret = copy_file(&ring, insize); + + close(infd); + close(outfd); + io_uring_queue_exit(&ring); + return ret; +} diff --git a/tools/io_uring/liburing.h b/tools/io_uring/liburing.h new file mode 100644 index 000000000000..cab0f50257ba --- /dev/null +++ b/tools/io_uring/liburing.h @@ -0,0 +1,143 @@ +#ifndef LIB_URING_H +#define LIB_URING_H + +#include <sys/uio.h> +#include <signal.h> +#include <string.h> +#include "../../include/uapi/linux/io_uring.h" + +/* + * Library interface to io_uring + */ +struct io_uring_sq { + unsigned *khead; + unsigned *ktail; + unsigned *kring_mask; + unsigned *kring_entries; + unsigned *kflags; + unsigned *kdropped; + unsigned *array; + struct io_uring_sqe *sqes; + + unsigned sqe_head; + unsigned sqe_tail; + + size_t ring_sz; +}; + +struct io_uring_cq { + unsigned *khead; + unsigned *ktail; + unsigned *kring_mask; + unsigned *kring_entries; + unsigned *koverflow; + struct io_uring_cqe *cqes; + + size_t ring_sz; +}; + +struct io_uring { + struct io_uring_sq sq; + struct io_uring_cq cq; + int ring_fd; +}; + +/* + * System calls + */ +extern int io_uring_setup(unsigned entries, struct io_uring_params *p); +extern int io_uring_enter(unsigned fd, unsigned to_submit, + unsigned min_complete, unsigned flags, sigset_t *sig); +extern int io_uring_register(int fd, unsigned int opcode, void *arg, + unsigned int nr_args); + +/* + * Library interface + */ +extern int io_uring_queue_init(unsigned entries, struct io_uring *ring, + unsigned flags); +extern int io_uring_queue_mmap(int fd, struct io_uring_params *p, + struct io_uring *ring); +extern void io_uring_queue_exit(struct io_uring *ring); +extern int io_uring_get_completion(struct io_uring *ring, + struct io_uring_cqe **cqe_ptr); +extern int io_uring_wait_completion(struct io_uring *ring, + struct io_uring_cqe **cqe_ptr); +extern int io_uring_submit(struct io_uring *ring); +extern struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring); + +/* + * Command prep helpers + */ +static inline void io_uring_sqe_set_data(struct io_uring_sqe *sqe, void *data) +{ + sqe->user_data = (unsigned long) data; +} + +static inline void io_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd, + void *addr, unsigned len, off_t offset) +{ + memset(sqe, 0, sizeof(*sqe)); + sqe->opcode = op; + sqe->fd = fd; + sqe->off = offset; + sqe->addr = (unsigned long) addr; + sqe->len = len; +} + +static inline void io_uring_prep_readv(struct io_uring_sqe *sqe, int fd, + struct iovec *iovecs, unsigned nr_vecs, + off_t offset) +{ + io_uring_prep_rw(IORING_OP_READV, sqe, fd, iovecs, nr_vecs, offset); +} + +static inline void io_uring_prep_read_fixed(struct io_uring_sqe *sqe, int fd, + void *buf, unsigned nbytes, + off_t offset) +{ + io_uring_prep_rw(IORING_OP_READ_FIXED, sqe, fd, buf, nbytes, offset); +} + +static inline void io_uring_prep_writev(struct io_uring_sqe *sqe, int fd, + struct iovec *iovecs, unsigned nr_vecs, + off_t offset) +{ + io_uring_prep_rw(IORING_OP_WRITEV, sqe, fd, iovecs, nr_vecs, offset); +} + +static inline void io_uring_prep_write_fixed(struct io_uring_sqe *sqe, int fd, + void *buf, unsigned nbytes, + off_t offset) +{ + io_uring_prep_rw(IORING_OP_WRITE_FIXED, sqe, fd, buf, nbytes, offset); +} + +static inline void io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, + short poll_mask) +{ + memset(sqe, 0, sizeof(*sqe)); + sqe->opcode = IORING_OP_POLL_ADD; + sqe->fd = fd; + sqe->poll_events = poll_mask; +} + +static inline void io_uring_prep_poll_remove(struct io_uring_sqe *sqe, + void *user_data) +{ + memset(sqe, 0, sizeof(*sqe)); + sqe->opcode = IORING_OP_POLL_REMOVE; + sqe->addr = (unsigned long) user_data; +} + +static inline void io_uring_prep_fsync(struct io_uring_sqe *sqe, int fd, + int datasync) +{ + memset(sqe, 0, sizeof(*sqe)); + sqe->opcode = IORING_OP_FSYNC; + sqe->fd = fd; + if (datasync) + sqe->fsync_flags = IORING_FSYNC_DATASYNC; +} + +#endif diff --git a/tools/io_uring/queue.c b/tools/io_uring/queue.c new file mode 100644 index 000000000000..88505e873ad9 --- /dev/null +++ b/tools/io_uring/queue.c @@ -0,0 +1,164 @@ +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/mman.h> +#include <unistd.h> +#include <errno.h> +#include <string.h> + +#include "liburing.h" +#include "barrier.h" + +static int __io_uring_get_completion(struct io_uring *ring, + struct io_uring_cqe **cqe_ptr, int wait) +{ + struct io_uring_cq *cq = &ring->cq; + const unsigned mask = *cq->kring_mask; + unsigned head; + int ret; + + *cqe_ptr = NULL; + head = *cq->khead; + do { + /* + * It's necessary to use a read_barrier() before reading + * the CQ tail, since the kernel updates it locklessly. The + * kernel has the matching store barrier for the update. The + * kernel also ensures that previous stores to CQEs are ordered + * with the tail update. + */ + read_barrier(); + if (head != *cq->ktail) { + *cqe_ptr = &cq->cqes[head & mask]; + break; + } + if (!wait) + break; + ret = io_uring_enter(ring->ring_fd, 0, 1, + IORING_ENTER_GETEVENTS, NULL); + if (ret < 0) + return -errno; + } while (1); + + if (*cqe_ptr) { + *cq->khead = head + 1; + /* + * Ensure that the kernel sees our new head, the kernel has + * the matching read barrier. + */ + write_barrier(); + } + + return 0; +} + +/* + * Return an IO completion, if one is readily available + */ +int io_uring_get_completion(struct io_uring *ring, + struct io_uring_cqe **cqe_ptr) +{ + return __io_uring_get_completion(ring, cqe_ptr, 0); +} + +/* + * Return an IO completion, waiting for it if necessary + */ +int io_uring_wait_completion(struct io_uring *ring, + struct io_uring_cqe **cqe_ptr) +{ + return __io_uring_get_completion(ring, cqe_ptr, 1); +} + +/* + * Submit sqes acquired from io_uring_get_sqe() to the kernel. + * + * Returns number of sqes submitted + */ +int io_uring_submit(struct io_uring *ring) +{ + struct io_uring_sq *sq = &ring->sq; + const unsigned mask = *sq->kring_mask; + unsigned ktail, ktail_next, submitted; + int ret; + + /* + * If we have pending IO in the kring, submit it first. We need a + * read barrier here to match the kernels store barrier when updating + * the SQ head. + */ + read_barrier(); + if (*sq->khead != *sq->ktail) { + submitted = *sq->kring_entries; + goto submit; + } + + if (sq->sqe_head == sq->sqe_tail) + return 0; + + /* + * Fill in sqes that we have queued up, adding them to the kernel ring + */ + submitted = 0; + ktail = ktail_next = *sq->ktail; + while (sq->sqe_head < sq->sqe_tail) { + ktail_next++; + read_barrier(); + + sq->array[ktail & mask] = sq->sqe_head & mask; + ktail = ktail_next; + + sq->sqe_head++; + submitted++; + } + + if (!submitted) + return 0; + + if (*sq->ktail != ktail) { + /* + * First write barrier ensures that the SQE stores are updated + * with the tail update. This is needed so that the kernel + * will never see a tail update without the preceeding sQE + * stores being done. + */ + write_barrier(); + *sq->ktail = ktail; + /* + * The kernel has the matching read barrier for reading the + * SQ tail. + */ + write_barrier(); + } + +submit: + ret = io_uring_enter(ring->ring_fd, submitted, 0, + IORING_ENTER_GETEVENTS, NULL); + if (ret < 0) + return -errno; + + return 0; +} + +/* + * Return an sqe to fill. Application must later call io_uring_submit() + * when it's ready to tell the kernel about it. The caller may call this + * function multiple times before calling io_uring_submit(). + * + * Returns a vacant sqe, or NULL if we're full. + */ +struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring) +{ + struct io_uring_sq *sq = &ring->sq; + unsigned next = sq->sqe_tail + 1; + struct io_uring_sqe *sqe; + + /* + * All sqes are used + */ + if (next - sq->sqe_head > *sq->kring_entries) + return NULL; + + sqe = &sq->sqes[sq->sqe_tail & *sq->kring_mask]; + sq->sqe_tail = next; + return sqe; +} diff --git a/tools/io_uring/setup.c b/tools/io_uring/setup.c new file mode 100644 index 000000000000..4da19a77132c --- /dev/null +++ b/tools/io_uring/setup.c @@ -0,0 +1,103 @@ +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/mman.h> +#include <unistd.h> +#include <errno.h> +#include <string.h> + +#include "liburing.h" + +static int io_uring_mmap(int fd, struct io_uring_params *p, + struct io_uring_sq *sq, struct io_uring_cq *cq) +{ + size_t size; + void *ptr; + int ret; + + sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned); + ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); + if (ptr == MAP_FAILED) + return -errno; + sq->khead = ptr + p->sq_off.head; + sq->ktail = ptr + p->sq_off.tail; + sq->kring_mask = ptr + p->sq_off.ring_mask; + sq->kring_entries = ptr + p->sq_off.ring_entries; + sq->kflags = ptr + p->sq_off.flags; + sq->kdropped = ptr + p->sq_off.dropped; + sq->array = ptr + p->sq_off.array; + + size = p->sq_entries * sizeof(struct io_uring_sqe), + sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, fd, + IORING_OFF_SQES); + if (sq->sqes == MAP_FAILED) { + ret = -errno; +err: + munmap(sq->khead, sq->ring_sz); + return ret; + } + + cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe); + ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); + if (ptr == MAP_FAILED) { + ret = -errno; + munmap(sq->sqes, p->sq_entries * sizeof(struct io_uring_sqe)); + goto err; + } + cq->khead = ptr + p->cq_off.head; + cq->ktail = ptr + p->cq_off.tail; + cq->kring_mask = ptr + p->cq_off.ring_mask; + cq->kring_entries = ptr + p->cq_off.ring_entries; + cq->koverflow = ptr + p->cq_off.overflow; + cq->cqes = ptr + p->cq_off.cqes; + return 0; +} + +/* + * For users that want to specify sq_thread_cpu or sq_thread_idle, this + * interface is a convenient helper for mmap()ing the rings. + * Returns -1 on error, or zero on success. On success, 'ring' + * contains the necessary information to read/write to the rings. + */ +int io_uring_queue_mmap(int fd, struct io_uring_params *p, struct io_uring *ring) +{ + int ret; + + memset(ring, 0, sizeof(*ring)); + ret = io_uring_mmap(fd, p, &ring->sq, &ring->cq); + if (!ret) + ring->ring_fd = fd; + return ret; +} + +/* + * Returns -1 on error, or zero on success. On success, 'ring' + * contains the necessary information to read/write to the rings. + */ +int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags) +{ + struct io_uring_params p; + int fd; + + memset(&p, 0, sizeof(p)); + p.flags = flags; + + fd = io_uring_setup(entries, &p); + if (fd < 0) + return fd; + + return io_uring_queue_mmap(fd, &p, ring); +} + +void io_uring_queue_exit(struct io_uring *ring) +{ + struct io_uring_sq *sq = &ring->sq; + struct io_uring_cq *cq = &ring->cq; + + munmap(sq->sqes, *sq->kring_entries * sizeof(struct io_uring_sqe)); + munmap(sq->khead, sq->ring_sz); + munmap(cq->khead, cq->ring_sz); + close(ring->ring_fd); +} diff --git a/tools/io_uring/syscall.c b/tools/io_uring/syscall.c new file mode 100644 index 000000000000..6b835e5c6a5b --- /dev/null +++ b/tools/io_uring/syscall.c @@ -0,0 +1,40 @@ +/* + * Will go away once libc support is there + */ +#include <unistd.h> +#include <sys/syscall.h> +#include <sys/uio.h> +#include <signal.h> +#include "liburing.h" + +#if defined(__x86_64) || defined(__i386__) +#ifndef __NR_sys_io_uring_setup +#define __NR_sys_io_uring_setup 425 +#endif +#ifndef __NR_sys_io_uring_enter +#define __NR_sys_io_uring_enter 426 +#endif +#ifndef __NR_sys_io_uring_register +#define __NR_sys_io_uring_register 427 +#endif +#else +#error "Arch not supported yet" +#endif + +int io_uring_register(int fd, unsigned int opcode, void *arg, + unsigned int nr_args) +{ + return syscall(__NR_sys_io_uring_register, fd, opcode, arg, nr_args); +} + +int io_uring_setup(unsigned entries, struct io_uring_params *p) +{ + return syscall(__NR_sys_io_uring_setup, entries, p); +} + +int io_uring_enter(unsigned fd, unsigned to_submit, unsigned min_complete, + unsigned flags, sigset_t *sig) +{ + return syscall(__NR_sys_io_uring_enter, fd, to_submit, min_complete, + flags, sig, _NSIG / 8); +} |