diff options
-rw-r--r-- | Documentation/filesystems/ceph.txt | 5 | ||||
-rw-r--r-- | drivers/block/rbd.c | 28 | ||||
-rw-r--r-- | fs/ceph/acl.c | 13 | ||||
-rw-r--r-- | fs/ceph/addr.c | 2 | ||||
-rw-r--r-- | fs/ceph/caps.c | 21 | ||||
-rw-r--r-- | fs/ceph/file.c | 573 | ||||
-rw-r--r-- | fs/ceph/inode.c | 13 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 9 | ||||
-rw-r--r-- | fs/ceph/super.c | 13 | ||||
-rw-r--r-- | fs/ceph/super.h | 3 | ||||
-rw-r--r-- | fs/ceph/xattr.c | 3 | ||||
-rw-r--r-- | include/linux/ceph/libceph.h | 8 | ||||
-rw-r--r-- | include/linux/ceph/messenger.h | 24 | ||||
-rw-r--r-- | include/linux/ceph/msgpool.h | 11 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 22 | ||||
-rw-r--r-- | include/linux/ceph/pagelist.h | 11 | ||||
-rw-r--r-- | include/linux/ceph/rados.h | 28 | ||||
-rw-r--r-- | net/ceph/messenger.c | 107 | ||||
-rw-r--r-- | net/ceph/msgpool.c | 27 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 363 | ||||
-rw-r--r-- | net/ceph/pagelist.c | 20 |
21 files changed, 900 insertions, 404 deletions
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt index 8bf62240e10d..1177052701e1 100644 --- a/Documentation/filesystems/ceph.txt +++ b/Documentation/filesystems/ceph.txt @@ -151,6 +151,11 @@ Mount Options Report overall filesystem usage in statfs instead of using the root directory quota. + nocopyfrom + Don't use the RADOS 'copy-from' operation to perform remote object + copies. Currently, it's only used in copy_file_range, which will revert + to the default VFS implementation if this option is used. + More Information ================ diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 73ed5f3a862d..8e5140bbf241 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) goto err_req; - if (ceph_osdc_alloc_messages(req, GFP_NOIO)) - goto err_req; - return req; err_req: @@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req) } if (ret) return ret; + + ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + if (ret) + return ret; } return 0; @@ -2374,8 +2375,7 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) if (!obj_req->osd_req) return -ENOMEM; - ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", - "copyup"); + ret = osd_req_op_cls_init(obj_req->osd_req, 0, "rbd", "copyup"); if (ret) return ret; @@ -2405,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) rbd_assert(0); } + ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO); + if (ret) + return ret; + rbd_obj_request_submit(obj_req); return 0; } @@ -3784,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); - if (ret) - goto out_req; - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -3798,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, true); + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); + if (ret) + goto out_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) @@ -6067,7 +6071,7 @@ static ssize_t rbd_remove_single_major(struct bus_type *bus, * create control files in sysfs * /sys/bus/rbd/... */ -static int rbd_sysfs_init(void) +static int __init rbd_sysfs_init(void) { int ret; @@ -6082,13 +6086,13 @@ static int rbd_sysfs_init(void) return ret; } -static void rbd_sysfs_cleanup(void) +static void __exit rbd_sysfs_cleanup(void) { bus_unregister(&rbd_bus_type); device_unregister(&rbd_root_dev); } -static int rbd_slab_init(void) +static int __init rbd_slab_init(void) { rbd_assert(!rbd_img_request_cache); rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0); diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 027408d55aee..5f0103f40079 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -104,6 +104,11 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type) struct timespec64 old_ctime = inode->i_ctime; umode_t new_mode = inode->i_mode, old_mode = inode->i_mode; + if (ceph_snap(inode) != CEPH_NOSNAP) { + ret = -EROFS; + goto out; + } + switch (type) { case ACL_TYPE_ACCESS: name = XATTR_NAME_POSIX_ACL_ACCESS; @@ -138,11 +143,6 @@ int ceph_set_acl(struct inode *inode, struct posix_acl *acl, int type) goto out_free; } - if (ceph_snap(inode) != CEPH_NOSNAP) { - ret = -EROFS; - goto out_free; - } - if (new_mode != old_mode) { newattrs.ia_ctime = current_time(inode); newattrs.ia_mode = new_mode; @@ -206,10 +206,9 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode, tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL); if (!tmp_buf) goto out_err; - pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL); + pagelist = ceph_pagelist_alloc(GFP_KERNEL); if (!pagelist) goto out_err; - ceph_pagelist_init(pagelist); err = ceph_pagelist_reserve(pagelist, PAGE_SIZE); if (err) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 9c332a6f6667..8eade7a993c1 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -322,7 +322,7 @@ static int start_read(struct inode *inode, struct ceph_rw_context *rw_ctx, /* caller of readpages does not hold buffer and read caps * (fadvise, madvise and readahead cases) */ int want = CEPH_CAP_FILE_CACHE; - ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got); + ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, true, &got); if (ret < 0) { dout("start_read %p, error getting cap\n", inode); } else if (!(got & want)) { diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index dd7dfdd2ba13..f3496db4bb3e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -519,9 +519,9 @@ static void __cap_set_timeouts(struct ceph_mds_client *mdsc, * -> we take mdsc->cap_delay_lock */ static void __cap_delay_requeue(struct ceph_mds_client *mdsc, - struct ceph_inode_info *ci) + struct ceph_inode_info *ci, + bool set_timeout) { - __cap_set_timeouts(mdsc, ci); dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode, ci->i_ceph_flags, ci->i_hold_caps_max); if (!mdsc->stopping) { @@ -531,6 +531,8 @@ static void __cap_delay_requeue(struct ceph_mds_client *mdsc, goto no_change; list_del_init(&ci->i_cap_delay_list); } + if (set_timeout) + __cap_set_timeouts(mdsc, ci); list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list); no_change: spin_unlock(&mdsc->cap_delay_lock); @@ -720,7 +722,7 @@ void ceph_add_cap(struct inode *inode, dout(" issued %s, mds wanted %s, actual %s, queueing\n", ceph_cap_string(issued), ceph_cap_string(wanted), ceph_cap_string(actual_wanted)); - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, true); } if (flags & CEPH_CAP_FLAG_AUTH) { @@ -1647,7 +1649,7 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && (mask & CEPH_CAP_FILE_BUFFER)) dirty |= I_DIRTY_DATASYNC; - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, true); return dirty; } @@ -2065,7 +2067,7 @@ ack: /* Reschedule delayed caps release if we delayed anything */ if (delayed) - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, false); spin_unlock(&ci->i_ceph_lock); @@ -2125,7 +2127,7 @@ retry: if (delayed) { spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); + __cap_delay_requeue(mdsc, ci, true); spin_unlock(&ci->i_ceph_lock); } } else { @@ -2671,17 +2673,18 @@ static void check_max_size(struct inode *inode, loff_t endoff) ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); } -int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) +int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, + bool nonblock, int *got) { int ret, err = 0; BUG_ON(need & ~CEPH_CAP_FILE_RD); - BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); + BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED)); ret = ceph_pool_perm_check(ci, need); if (ret < 0) return ret; - ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); + ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err); if (ret) { if (err == -EAGAIN) { ret = 0; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 92ab20433682..f788496fafcc 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -1,5 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 #include <linux/ceph/ceph_debug.h> +#include <linux/ceph/striper.h> #include <linux/module.h> #include <linux/sched.h> @@ -557,90 +558,26 @@ enum { }; /* - * Read a range of bytes striped over one or more objects. Iterate over - * objects we stripe over. (That's not atomic, but good enough for now.) - * - * If we get a short result from the OSD, check against i_size; we need to - * only return a short read to the caller if we hit EOF. - */ -static int striped_read(struct inode *inode, - u64 pos, u64 len, - struct page **pages, int num_pages, - int page_align, int *checkeof) -{ - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_inode_info *ci = ceph_inode(inode); - u64 this_len; - loff_t i_size; - int page_idx; - int ret, read = 0; - bool hit_stripe, was_short; - - /* - * we may need to do multiple reads. not atomic, unfortunately. - */ -more: - this_len = len; - page_idx = (page_align + read) >> PAGE_SHIFT; - ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), - &ci->i_layout, pos, &this_len, - ci->i_truncate_seq, ci->i_truncate_size, - pages + page_idx, num_pages - page_idx, - ((page_align + read) & ~PAGE_MASK)); - if (ret == -ENOENT) - ret = 0; - hit_stripe = this_len < len; - was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read, - ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); - - i_size = i_size_read(inode); - if (ret >= 0) { - if (was_short && (pos + ret < i_size)) { - int zlen = min(this_len - ret, i_size - pos - ret); - int zoff = page_align + read + ret; - dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + zlen); - ceph_zero_page_vector_range(zoff, zlen, pages); - ret += zlen; - } - - read += ret; - pos += ret; - len -= ret; - - /* hit stripe and need continue*/ - if (len && hit_stripe && pos < i_size) - goto more; - } - - if (read > 0) { - ret = read; - /* did we bounce off eof? */ - if (pos + len > i_size) - *checkeof = CHECK_EOF; - } - - dout("striped_read returns %d\n", ret); - return ret; -} - -/* * Completely synchronous read and write methods. Direct from __user * buffer to osd, or directly to user pages (if O_DIRECT). * - * If the read spans object boundary, just do multiple reads. + * If the read spans object boundary, just do multiple reads. (That's not + * atomic, but good enough for now.) + * + * If we get a short result from the OSD, check against i_size; we need to + * only return a short read to the caller if we hit EOF. */ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, - int *checkeof) + int *retry_op) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); - struct page **pages; - u64 off = iocb->ki_pos; - int num_pages; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_client *osdc = &fsc->client->osdc; ssize_t ret; - size_t len = iov_iter_count(to); + u64 off = iocb->ki_pos; + u64 len = iov_iter_count(to); dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); @@ -653,61 +590,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait_range(inode->i_mapping, off, - off + len); + ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len); if (ret < 0) return ret; - if (unlikely(to->type & ITER_PIPE)) { + ret = 0; + while ((len = iov_iter_count(to)) > 0) { + struct ceph_osd_request *req; + struct page **pages; + int num_pages; size_t page_off; - ret = iov_iter_get_pages_alloc(to, &pages, len, - &page_off); - if (ret <= 0) - return -ENOMEM; - num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + u64 i_size; + bool more; + + req = ceph_osdc_new_request(osdc, &ci->i_layout, + ci->i_vino, off, &len, 0, 1, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, ci->i_truncate_seq, + ci->i_truncate_size, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + break; + } + + more = len < iov_iter_count(to); - ret = striped_read(inode, off, ret, pages, num_pages, - page_off, checkeof); - if (ret > 0) { - iov_iter_advance(to, ret); - off += ret; + if (unlikely(to->type & ITER_PIPE)) { + ret = iov_iter_get_pages_alloc(to, &pages, len, + &page_off); + if (ret <= 0) { + ceph_osdc_put_request(req); + ret = -ENOMEM; + break; + } + num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + if (ret < len) { + len = ret; + osd_req_op_extent_update(req, 0, len); + more = false; + } } else { - iov_iter_advance(to, 0); + num_pages = calc_pages_for(off, len); + page_off = off & ~PAGE_MASK; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) { + ceph_osdc_put_request(req); + ret = PTR_ERR(pages); + break; + } } - ceph_put_page_vector(pages, num_pages, false); - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - ret = striped_read(inode, off, len, pages, num_pages, - (off & ~PAGE_MASK), checkeof); - if (ret > 0) { - int l, k = 0; - size_t left = ret; - - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, left, - PAGE_SIZE - page_off); - l = copy_page_to_iter(pages[k++], page_off, - copy, to); - off += l; - left -= l; - if (l < copy) + + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, + false, false); + ret = ceph_osdc_start_request(osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(osdc, req); + ceph_osdc_put_request(req); + + i_size = i_size_read(inode); + dout("sync_read %llu~%llu got %zd i_size %llu%s\n", + off, len, ret, i_size, (more ? " MORE" : "")); + + if (ret == -ENOENT) + ret = 0; + if (ret >= 0 && ret < len && (off + ret < i_size)) { + int zlen = min(len - ret, i_size - off - ret); + int zoff = page_off + ret; + dout("sync_read zero gap %llu~%llu\n", + off + ret, off + ret + zlen); + ceph_zero_page_vector_range(zoff, zlen, pages); + ret += zlen; + } + + if (unlikely(to->type & ITER_PIPE)) { + if (ret > 0) { + iov_iter_advance(to, ret); + off += ret; + } else { + iov_iter_advance(to, 0); + } + ceph_put_page_vector(pages, num_pages, false); + } else { + int idx = 0; + size_t left = ret > 0 ? ret : 0; + while (left > 0) { + size_t len, copied; + page_off = off & ~PAGE_MASK; + len = min_t(size_t, left, PAGE_SIZE - page_off); + copied = copy_page_to_iter(pages[idx++], + page_off, len, to); + off += copied; + left -= copied; + if (copied < len) { + ret = -EFAULT; break; + } } + ceph_release_page_vector(pages, num_pages); } - ceph_release_page_vector(pages, num_pages); + + if (ret <= 0 || off >= i_size || !more) + break; } if (off > iocb->ki_pos) { + if (ret >= 0 && + iov_iter_count(to) > 0 && off >= i_size_read(inode)) + *retry_op = CHECK_EOF; ret = off - iocb->ki_pos; iocb->ki_pos = off; } - dout("sync_read result %zd\n", ret); + dout("sync_read result %zd retry_op %d\n", ret, *retry_op); return ret; } @@ -865,7 +859,7 @@ static void ceph_aio_retry_work(struct work_struct *work) } spin_unlock(&ci->i_ceph_lock); - req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2, + req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1, false, GFP_NOFS); if (!req) { ret = -ENOMEM; @@ -877,6 +871,11 @@ static void ceph_aio_retry_work(struct work_struct *work) ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); + req->r_ops[0] = orig_req->r_ops[0]; + + req->r_mtime = aio_req->mtime; + req->r_data_offset = req->r_ops[0].extent.offset; + ret = ceph_osdc_alloc_messages(req, GFP_NOFS); if (ret) { ceph_osdc_put_request(req); @@ -884,11 +883,6 @@ static void ceph_aio_retry_work(struct work_struct *work) goto out; } - req->r_ops[0] = orig_req->r_ops[0]; - - req->r_mtime = aio_req->mtime; - req->r_data_offset = req->r_ops[0].extent.offset; - ceph_osdc_put_request(orig_req); req->r_callback = ceph_aio_complete_req; @@ -1735,7 +1729,6 @@ static long ceph_fallocate(struct file *file, int mode, struct ceph_file_info *fi = file->private_data; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_cap_flush *prealloc_cf; int want, got = 0; int dirty; @@ -1743,10 +1736,7 @@ static long ceph_fallocate(struct file *file, int mode, loff_t endoff = 0; loff_t size; - if ((offset + length) > max(i_size_read(inode), fsc->max_file_size)) - return -EFBIG; - - if (mode & ~(FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE)) + if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE)) return -EOPNOTSUPP; if (!S_ISREG(inode->i_mode)) @@ -1763,18 +1753,6 @@ static long ceph_fallocate(struct file *file, int mode, goto unlock; } - if (!(mode & (FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE)) && - ceph_quota_is_max_bytes_exceeded(inode, offset + length)) { - ret = -EDQUOT; - goto unlock; - } - - if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL) && - !(mode & FALLOC_FL_PUNCH_HOLE)) { - ret = -ENOSPC; - goto unlock; - } - if (ci->i_inline_version != CEPH_INLINE_NONE) { ret = ceph_uninline_data(file, NULL); if (ret < 0) @@ -1782,12 +1760,12 @@ static long ceph_fallocate(struct file *file, int mode, } size = i_size_read(inode); - if (!(mode & FALLOC_FL_KEEP_SIZE)) { - endoff = offset + length; - ret = inode_newsize_ok(inode, endoff); - if (ret) - goto unlock; - } + + /* Are we punching a hole beyond EOF? */ + if (offset >= size) + goto unlock; + if ((offset + length) > size) + length = size - offset; if (fi->fmode & CEPH_FILE_MODE_LAZY) want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; @@ -1798,16 +1776,8 @@ static long ceph_fallocate(struct file *file, int mode, if (ret < 0) goto unlock; - if (mode & FALLOC_FL_PUNCH_HOLE) { - if (offset < size) - ceph_zero_pagecache_range(inode, offset, length); - ret = ceph_zero_objects(inode, offset, length); - } else if (endoff > size) { - truncate_pagecache_range(inode, size, -1); - if (ceph_inode_set_size(inode, endoff)) - ceph_check_caps(ceph_inode(inode), - CHECK_CAPS_AUTHONLY, NULL); - } + ceph_zero_pagecache_range(inode, offset, length); + ret = ceph_zero_objects(inode, offset, length); if (!ret) { spin_lock(&ci->i_ceph_lock); @@ -1817,9 +1787,6 @@ static long ceph_fallocate(struct file *file, int mode, spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); - if ((endoff > size) && - ceph_quota_is_max_bytes_approaching(inode, endoff)) - ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL); } ceph_put_cap_refs(ci, got); @@ -1829,6 +1796,300 @@ unlock: return ret; } +/* + * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for + * src_ci. Two attempts are made to obtain both caps, and an error is return if + * this fails; zero is returned on success. + */ +static int get_rd_wr_caps(struct ceph_inode_info *src_ci, + loff_t src_endoff, int *src_got, + struct ceph_inode_info *dst_ci, + loff_t dst_endoff, int *dst_got) +{ + int ret = 0; + bool retrying = false; + +retry_caps: + ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, + dst_endoff, dst_got, NULL); + if (ret < 0) + return ret; + + /* + * Since we're already holding the FILE_WR capability for the dst file, + * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some + * retry dance instead to try to get both capabilities. + */ + ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED, + false, src_got); + if (ret <= 0) { + /* Start by dropping dst_ci caps and getting src_ci caps */ + ceph_put_cap_refs(dst_ci, *dst_got); + if (retrying) { + if (!ret) + /* ceph_try_get_caps masks EAGAIN */ + ret = -EAGAIN; + return ret; + } + ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD, + CEPH_CAP_FILE_SHARED, src_endoff, + src_got, NULL); + if (ret < 0) + return ret; + /*... drop src_ci caps too, and retry */ + ceph_put_cap_refs(src_ci, *src_got); + retrying = true; + goto retry_caps; + } + return ret; +} + +static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got, + struct ceph_inode_info *dst_ci, int dst_got) +{ + ceph_put_cap_refs(src_ci, src_got); + ceph_put_cap_refs(dst_ci, dst_got); +} + +/* + * This function does several size-related checks, returning an error if: + * - source file is smaller than off+len + * - destination file size is not OK (inode_newsize_ok()) + * - max bytes quotas is exceeded + */ +static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode, + loff_t src_off, loff_t dst_off, size_t len) +{ + loff_t size, endoff; + + size = i_size_read(src_inode); + /* + * Don't copy beyond source file EOF. Instead of simply setting length + * to (size - src_off), just drop to VFS default implementation, as the + * local i_size may be stale due to other clients writing to the source + * inode. + */ + if (src_off + len > size) { + dout("Copy beyond EOF (%llu + %zu > %llu)\n", + src_off, len, size); + return -EOPNOTSUPP; + } + size = i_size_read(dst_inode); + + endoff = dst_off + len; + if (inode_newsize_ok(dst_inode, endoff)) + return -EOPNOTSUPP; + + if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) + return -EDQUOT; + + return 0; +} + +static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, + struct file *dst_file, loff_t dst_off, + size_t len, unsigned int flags) +{ + struct inode *src_inode = file_inode(src_file); + struct inode *dst_inode = file_inode(dst_file); + struct ceph_inode_info *src_ci = ceph_inode(src_inode); + struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); + struct ceph_cap_flush *prealloc_cf; + struct ceph_object_locator src_oloc, dst_oloc; + struct ceph_object_id src_oid, dst_oid; + loff_t endoff = 0, size; + ssize_t ret = -EIO; + u64 src_objnum, dst_objnum, src_objoff, dst_objoff; + u32 src_objlen, dst_objlen, object_size; + int src_got = 0, dst_got = 0, err, dirty; + bool do_final_copy = false; + + if (src_inode == dst_inode) + return -EINVAL; + if (ceph_snap(dst_inode) != CEPH_NOSNAP) + return -EROFS; + + /* + * Some of the checks below will return -EOPNOTSUPP, which will force a + * fallback to the default VFS copy_file_range implementation. This is + * desirable in several cases (for ex, the 'len' is smaller than the + * size of the objects, or in cases where that would be more + * efficient). + */ + + if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM)) + return -EOPNOTSUPP; + + if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || + (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) || + (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) + return -EOPNOTSUPP; + + if (len < src_ci->i_layout.object_size) + return -EOPNOTSUPP; /* no remote copy will be done */ + + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + + /* Start by sync'ing the source file */ + ret = file_write_and_wait_range(src_file, src_off, (src_off + len)); + if (ret < 0) + goto out; + + /* + * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other + * clients may have dirty data in their caches. And OSDs know nothing + * about caps, so they can't safely do the remote object copies. + */ + err = get_rd_wr_caps(src_ci, (src_off + len), &src_got, + dst_ci, (dst_off + len), &dst_got); + if (err < 0) { + dout("get_rd_wr_caps returned %d\n", err); + ret = -EOPNOTSUPP; + goto out; + } + + ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); + if (ret < 0) + goto out_caps; + + size = i_size_read(dst_inode); + endoff = dst_off + len; + + /* Drop dst file cached pages */ + ret = invalidate_inode_pages2_range(dst_inode->i_mapping, + dst_off >> PAGE_SHIFT, + endoff >> PAGE_SHIFT); + if (ret < 0) { + dout("Failed to invalidate inode pages (%zd)\n", ret); + ret = 0; /* XXX */ + } + src_oloc.pool = src_ci->i_layout.pool_id; + src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); + dst_oloc.pool = dst_ci->i_layout.pool_id; + dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); + + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, + src_ci->i_layout.object_size, + &src_objnum, &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + dst_ci->i_layout.object_size, + &dst_objnum, &dst_objoff, &dst_objlen); + /* object-level offsets need to the same */ + if (src_objoff != dst_objoff) { + ret = -EOPNOTSUPP; + goto out_caps; + } + + /* + * Do a manual copy if the object offset isn't object aligned. + * 'src_objlen' contains the bytes left until the end of the object, + * starting at the src_off + */ + if (src_objoff) { + /* + * we need to temporarily drop all caps as we'll be calling + * {read,write}_iter, which will get caps again. + */ + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + ret = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, src_objlen, flags); + if (ret < 0) { + dout("do_splice_direct returned %d\n", err); + goto out; + } + len -= ret; + err = get_rd_wr_caps(src_ci, (src_off + len), + &src_got, dst_ci, + (dst_off + len), &dst_got); + if (err < 0) + goto out; + err = is_file_size_ok(src_inode, dst_inode, + src_off, dst_off, len); + if (err < 0) + goto out_caps; + } + object_size = src_ci->i_layout.object_size; + while (len >= object_size) { + ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, + object_size, &src_objnum, + &src_objoff, &src_objlen); + ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, + object_size, &dst_objnum, + &dst_objoff, &dst_objlen); + ceph_oid_init(&src_oid); + ceph_oid_printf(&src_oid, "%llx.%08llx", + src_ci->i_vino.ino, src_objnum); + ceph_oid_init(&dst_oid); + ceph_oid_printf(&dst_oid, "%llx.%08llx", + dst_ci->i_vino.ino, dst_objnum); + /* Do an object remote copy */ + err = ceph_osdc_copy_from( + &ceph_inode_to_client(src_inode)->client->osdc, + src_ci->i_vino.snap, 0, + &src_oid, &src_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_NOCACHE, + &dst_oid, &dst_oloc, + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0); + if (err) { + dout("ceph_osdc_copy_from returned %d\n", err); + if (!ret) + ret = err; + goto out_caps; + } + len -= object_size; + src_off += object_size; + dst_off += object_size; + ret += object_size; + } + + if (len) + /* We still need one final local copy */ + do_final_copy = true; + + file_update_time(dst_file); + if (endoff > size) { + int caps_flags = 0; + + /* Let the MDS know about dst file size change */ + if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff)) + caps_flags |= CHECK_CAPS_NODELAY; + if (ceph_inode_set_size(dst_inode, endoff)) + caps_flags |= CHECK_CAPS_AUTHONLY; + if (caps_flags) + ceph_check_caps(dst_ci, caps_flags, NULL); + } + /* Mark Fw dirty */ + spin_lock(&dst_ci->i_ceph_lock); + dst_ci->i_inline_version = CEPH_INLINE_NONE; + dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); + spin_unlock(&dst_ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(dst_inode, dirty); + +out_caps: + put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); + + if (do_final_copy) { + err = do_splice_direct(src_file, &src_off, dst_file, + &dst_off, len, flags); + if (err < 0) { + dout("do_splice_direct returned %d\n", err); + goto out; + } + len -= err; + ret += err; + } + +out: + ceph_free_cap_flush(prealloc_cf); + + return ret; +} + const struct file_operations ceph_file_fops = { .open = ceph_open, .release = ceph_release, @@ -1844,5 +2105,5 @@ const struct file_operations ceph_file_fops = { .unlocked_ioctl = ceph_ioctl, .compat_ioctl = ceph_ioctl, .fallocate = ceph_fallocate, + .copy_file_range = ceph_copy_file_range, }; - diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index ebc7bdaed2d0..79dd5e6ed755 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1132,8 +1132,12 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in) if (IS_ERR(realdn)) { pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n", PTR_ERR(realdn), dn, in, ceph_vinop(in)); - dput(dn); - dn = realdn; /* note realdn contains the error */ + dn = realdn; + /* + * Caller should release 'dn' in the case of error. + * If 'req->r_dentry' is passed to this function, + * caller should leave 'req->r_dentry' untouched. + */ goto out; } else if (realdn) { dout("dn %p (%d) spliced with %p (%d) " @@ -1196,7 +1200,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) WARN_ON_ONCE(1); } - if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME) { + if (dir && req->r_op == CEPH_MDS_OP_LOOKUPNAME && + test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) && + !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { struct qstr dname; struct dentry *dn, *parent; @@ -1677,7 +1683,6 @@ retry_lookup: if (IS_ERR(realdn)) { err = PTR_ERR(realdn); d_drop(dn); - dn = NULL; goto next_item; } dn = realdn; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index bc43c822426a..67a9aeb2f4ec 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2071,7 +2071,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_old_dentry_drop) len += req->r_old_dentry->d_name.len; - msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false); + msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); if (!msg) { msg = ERR_PTR(-ENOMEM); goto out_free2; @@ -2136,7 +2136,6 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_pagelist) { struct ceph_pagelist *pagelist = req->r_pagelist; - refcount_inc(&pagelist->refcnt); ceph_msg_data_add_pagelist(msg, pagelist); msg->hdr.data_len = cpu_to_le32(pagelist->length); } else { @@ -3126,12 +3125,11 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, pr_info("mds%d reconnect start\n", mds); - pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) goto fail_nopagelist; - ceph_pagelist_init(pagelist); - reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); + reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); if (!reply) goto fail_nomsg; @@ -3241,6 +3239,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); up_read(&mdsc->snap_rwsem); + ceph_pagelist_release(pagelist); return; fail: diff --git a/fs/ceph/super.c b/fs/ceph/super.c index eab1359d0553..b5ecd6f50360 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -165,6 +165,8 @@ enum { Opt_noacl, Opt_quotadf, Opt_noquotadf, + Opt_copyfrom, + Opt_nocopyfrom, }; static match_table_t fsopt_tokens = { @@ -203,6 +205,8 @@ static match_table_t fsopt_tokens = { {Opt_noacl, "noacl"}, {Opt_quotadf, "quotadf"}, {Opt_noquotadf, "noquotadf"}, + {Opt_copyfrom, "copyfrom"}, + {Opt_nocopyfrom, "nocopyfrom"}, {-1, NULL} }; @@ -355,6 +359,12 @@ static int parse_fsopt_token(char *c, void *private) case Opt_noquotadf: fsopt->flags |= CEPH_MOUNT_OPT_NOQUOTADF; break; + case Opt_copyfrom: + fsopt->flags &= ~CEPH_MOUNT_OPT_NOCOPYFROM; + break; + case Opt_nocopyfrom: + fsopt->flags |= CEPH_MOUNT_OPT_NOCOPYFROM; + break; #ifdef CONFIG_CEPH_FS_POSIX_ACL case Opt_acl: fsopt->sb_flags |= SB_POSIXACL; @@ -553,6 +563,9 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",noacl"); #endif + if (fsopt->flags & CEPH_MOUNT_OPT_NOCOPYFROM) + seq_puts(m, ",nocopyfrom"); + if (fsopt->mds_namespace) seq_show_option(m, "mds_namespace", fsopt->mds_namespace); if (fsopt->wsize != CEPH_MAX_WRITE_SIZE) diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 582e28fd1b7b..c005a5400f2e 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -40,6 +40,7 @@ #define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ #define CEPH_MOUNT_OPT_MOUNTWAIT (1<<12) /* mount waits if no mds is up */ #define CEPH_MOUNT_OPT_NOQUOTADF (1<<13) /* no root dir quota in statfs */ +#define CEPH_MOUNT_OPT_NOCOPYFROM (1<<14) /* don't use RADOS 'copy-from' op */ #define CEPH_MOUNT_OPT_DEFAULT CEPH_MOUNT_OPT_DCACHE @@ -1008,7 +1009,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn, extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page); extern int ceph_try_get_caps(struct ceph_inode_info *ci, - int need, int want, int *got); + int need, int want, bool nonblock, int *got); /* for counting open files by mode */ extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 5cc8b94f8206..316f6ad10644 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -951,11 +951,10 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name, if (size > 0) { /* copy value into pagelist */ - pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); err = ceph_pagelist_append(pagelist, value, size); if (err) goto out; diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 49c93b9308d7..68bb09c29ce8 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -81,7 +81,13 @@ struct ceph_options { #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) -#define CEPH_MSG_MAX_DATA_LEN (16*1024*1024) + +/* + * Handle the largest possible rbd object in one message. + * There is no limit on the size of cephfs objects, but it has to obey + * rsize and wsize mount options anyway. + */ +#define CEPH_MSG_MAX_DATA_LEN (32*1024*1024) #define CEPH_AUTH_NAME_DEFAULT "guest" diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index fc2b4491ee0a..800a2128d411 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -82,22 +82,6 @@ enum ceph_msg_data_type { CEPH_MSG_DATA_BVECS, /* data source/destination is a bio_vec array */ }; -static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) -{ - switch (type) { - case CEPH_MSG_DATA_NONE: - case CEPH_MSG_DATA_PAGES: - case CEPH_MSG_DATA_PAGELIST: -#ifdef CONFIG_BLOCK - case CEPH_MSG_DATA_BIO: -#endif /* CONFIG_BLOCK */ - case CEPH_MSG_DATA_BVECS: - return true; - default: - return false; - } -} - #ifdef CONFIG_BLOCK struct ceph_bio_iter { @@ -181,7 +165,6 @@ struct ceph_bvec_iter { } while (0) struct ceph_msg_data { - struct list_head links; /* ceph_msg->data */ enum ceph_msg_data_type type; union { #ifdef CONFIG_BLOCK @@ -202,7 +185,6 @@ struct ceph_msg_data { struct ceph_msg_data_cursor { size_t total_resid; /* across all data items */ - struct list_head *data_head; /* = &ceph_msg->data */ struct ceph_msg_data *data; /* current data item */ size_t resid; /* bytes not yet consumed */ @@ -240,7 +222,9 @@ struct ceph_msg { struct ceph_buffer *middle; size_t data_length; - struct list_head data; + struct ceph_msg_data *data; + int num_data_items; + int max_data_items; struct ceph_msg_data_cursor cursor; struct ceph_connection *con; @@ -381,6 +365,8 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, void ceph_msg_data_add_bvecs(struct ceph_msg *msg, struct ceph_bvec_iter *bvec_pos); +struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, + gfp_t flags, bool can_fail); extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, bool can_fail); diff --git a/include/linux/ceph/msgpool.h b/include/linux/ceph/msgpool.h index 76c98a512758..729cdf700eae 100644 --- a/include/linux/ceph/msgpool.h +++ b/include/linux/ceph/msgpool.h @@ -13,14 +13,15 @@ struct ceph_msgpool { mempool_t *pool; int type; /* preallocated message type */ int front_len; /* preallocated payload size */ + int max_data_items; }; -extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type, - int front_len, int size, bool blocking, - const char *name); +int ceph_msgpool_init(struct ceph_msgpool *pool, int type, + int front_len, int max_data_items, int size, + const char *name); extern void ceph_msgpool_destroy(struct ceph_msgpool *pool); -extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *, - int front_len); +struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len, + int max_data_items); extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *); #endif diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 02096da01845..7a2af5034278 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -136,6 +136,13 @@ struct ceph_osd_req_op { u64 expected_object_size; u64 expected_write_size; } alloc_hint; + struct { + u64 snapid; + u64 src_version; + u8 flags; + u32 src_fadvise_flags; + struct ceph_osd_data osd_data; + } copy_from; }; }; @@ -444,9 +451,8 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages); -extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode, - const char *class, const char *method); +int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, + const char *class, const char *method); extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *name, const void *value, size_t size, u8 cmp_op, u8 cmp_mode); @@ -511,6 +517,16 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct timespec64 *mtime, struct page **pages, int nr_pages); +int ceph_osdc_copy_from(struct ceph_osd_client *osdc, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + struct ceph_object_id *dst_oid, + struct ceph_object_locator *dst_oloc, + u32 dst_fadvise_flags, + u8 copy_from_flags); + /* watch/notify */ struct ceph_osd_linger_request * ceph_osdc_watch(struct ceph_osd_client *osdc, diff --git a/include/linux/ceph/pagelist.h b/include/linux/ceph/pagelist.h index d0223364349f..5dead8486fd8 100644 --- a/include/linux/ceph/pagelist.h +++ b/include/linux/ceph/pagelist.h @@ -23,16 +23,7 @@ struct ceph_pagelist_cursor { size_t room; /* room remaining to reset to */ }; -static inline void ceph_pagelist_init(struct ceph_pagelist *pl) -{ - INIT_LIST_HEAD(&pl->head); - pl->mapped_tail = NULL; - pl->length = 0; - pl->room = 0; - INIT_LIST_HEAD(&pl->free_list); - pl->num_pages_free = 0; - refcount_set(&pl->refcnt, 1); -} +struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags); extern void ceph_pagelist_release(struct ceph_pagelist *pl); diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index f1988387c5ad..3eb0e55665b4 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -410,6 +410,14 @@ enum { enum { CEPH_OSD_OP_FLAG_EXCL = 1, /* EXCL object create */ CEPH_OSD_OP_FLAG_FAILOK = 2, /* continue despite failure */ + CEPH_OSD_OP_FLAG_FADVISE_RANDOM = 0x4, /* the op is random */ + CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL = 0x8, /* the op is sequential */ + CEPH_OSD_OP_FLAG_FADVISE_WILLNEED = 0x10,/* data will be accessed in + the near future */ + CEPH_OSD_OP_FLAG_FADVISE_DONTNEED = 0x20,/* data will not be accessed + in the near future */ + CEPH_OSD_OP_FLAG_FADVISE_NOCACHE = 0x40,/* data will be accessed only + once by this client */ }; #define EOLDSNAPC ERESTART /* ORDERSNAP flag set; writer has old snapc*/ @@ -432,6 +440,15 @@ enum { }; enum { + CEPH_OSD_COPY_FROM_FLAG_FLUSH = 1, /* part of a flush operation */ + CEPH_OSD_COPY_FROM_FLAG_IGNORE_OVERLAY = 2, /* ignore pool overlay */ + CEPH_OSD_COPY_FROM_FLAG_IGNORE_CACHE = 4, /* ignore osd cache logic */ + CEPH_OSD_COPY_FROM_FLAG_MAP_SNAP_CLONE = 8, /* map snap direct to + * cloneid */ + CEPH_OSD_COPY_FROM_FLAG_RWORDERED = 16, /* order with write */ +}; + +enum { CEPH_OSD_WATCH_OP_UNWATCH = 0, CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1, /* note: use only ODD ids to prevent pre-giant code from @@ -497,6 +514,17 @@ struct ceph_osd_op { __le64 expected_object_size; __le64 expected_write_size; } __attribute__ ((packed)) alloc_hint; + struct { + __le64 snapid; + __le64 src_version; + __u8 flags; /* CEPH_OSD_COPY_FROM_FLAG_* */ + /* + * CEPH_OSD_OP_FLAG_FADVISE_*: fadvise flags + * for src object, flags for dest object are in + * ceph_osd_op::flags. + */ + __le32 src_fadvise_flags; + } __attribute__ ((packed)) copy_from; }; __le32 payload_len; } __attribute__ ((packed)); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 0a187196aeed..88e35830198c 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -156,7 +156,6 @@ static bool con_flag_test_and_set(struct ceph_connection *con, /* Slab caches for frequently-allocated structures */ static struct kmem_cache *ceph_msg_cache; -static struct kmem_cache *ceph_msg_data_cache; /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; @@ -235,23 +234,11 @@ static int ceph_msgr_slab_init(void) if (!ceph_msg_cache) return -ENOMEM; - BUG_ON(ceph_msg_data_cache); - ceph_msg_data_cache = KMEM_CACHE(ceph_msg_data, 0); - if (ceph_msg_data_cache) - return 0; - - kmem_cache_destroy(ceph_msg_cache); - ceph_msg_cache = NULL; - - return -ENOMEM; + return 0; } static void ceph_msgr_slab_exit(void) { - BUG_ON(!ceph_msg_data_cache); - kmem_cache_destroy(ceph_msg_data_cache); - ceph_msg_data_cache = NULL; - BUG_ON(!ceph_msg_cache); kmem_cache_destroy(ceph_msg_cache); ceph_msg_cache = NULL; @@ -1141,16 +1128,13 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor) static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length) { struct ceph_msg_data_cursor *cursor = &msg->cursor; - struct ceph_msg_data *data; BUG_ON(!length); BUG_ON(length > msg->data_length); - BUG_ON(list_empty(&msg->data)); + BUG_ON(!msg->num_data_items); - cursor->data_head = &msg->data; cursor->total_resid = length; - data = list_first_entry(&msg->data, struct ceph_msg_data, links); - cursor->data = data; + cursor->data = msg->data; __ceph_msg_data_cursor_init(cursor); } @@ -1231,8 +1215,7 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, if (!cursor->resid && cursor->total_resid) { WARN_ON(!cursor->last_piece); - BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); - cursor->data = list_next_entry(cursor->data, links); + cursor->data++; __ceph_msg_data_cursor_init(cursor); new_piece = true; } @@ -1248,9 +1231,6 @@ static size_t sizeof_footer(struct ceph_connection *con) static void prepare_message_data(struct ceph_msg *msg, u32 data_len) { - BUG_ON(!msg); - BUG_ON(!data_len); - /* Initialize data cursor */ ceph_msg_data_cursor_init(msg, (size_t)data_len); @@ -1590,7 +1570,7 @@ static int write_partial_message_data(struct ceph_connection *con) dout("%s %p msg %p\n", __func__, con, msg); - if (list_empty(&msg->data)) + if (!msg->num_data_items) return -EINVAL; /* @@ -2347,8 +2327,7 @@ static int read_partial_msg_data(struct ceph_connection *con) u32 crc = 0; int ret; - BUG_ON(!msg); - if (list_empty(&msg->data)) + if (!msg->num_data_items) return -EIO; if (do_datacrc) @@ -3256,32 +3235,16 @@ bool ceph_con_keepalive_expired(struct ceph_connection *con, return false; } -static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) +static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg) { - struct ceph_msg_data *data; - - if (WARN_ON(!ceph_msg_data_type_valid(type))) - return NULL; - - data = kmem_cache_zalloc(ceph_msg_data_cache, GFP_NOFS); - if (!data) - return NULL; - - data->type = type; - INIT_LIST_HEAD(&data->links); - - return data; + BUG_ON(msg->num_data_items >= msg->max_data_items); + return &msg->data[msg->num_data_items++]; } static void ceph_msg_data_destroy(struct ceph_msg_data *data) { - if (!data) - return; - - WARN_ON(!list_empty(&data->links)); if (data->type == CEPH_MSG_DATA_PAGELIST) ceph_pagelist_release(data->pagelist); - kmem_cache_free(ceph_msg_data_cache, data); } void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, @@ -3292,13 +3255,12 @@ void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages, BUG_ON(!pages); BUG_ON(!length); - data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_PAGES; data->pages = pages; data->length = length; data->alignment = alignment & ~PAGE_MASK; - list_add_tail(&data->links, &msg->data); msg->data_length += length; } EXPORT_SYMBOL(ceph_msg_data_add_pages); @@ -3311,11 +3273,11 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg, BUG_ON(!pagelist); BUG_ON(!pagelist->length); - data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_PAGELIST; + refcount_inc(&pagelist->refcnt); data->pagelist = pagelist; - list_add_tail(&data->links, &msg->data); msg->data_length += pagelist->length; } EXPORT_SYMBOL(ceph_msg_data_add_pagelist); @@ -3326,12 +3288,11 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos, { struct ceph_msg_data *data; - data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_BIO; data->bio_pos = *bio_pos; data->bio_length = length; - list_add_tail(&data->links, &msg->data); msg->data_length += length; } EXPORT_SYMBOL(ceph_msg_data_add_bio); @@ -3342,11 +3303,10 @@ void ceph_msg_data_add_bvecs(struct ceph_msg *msg, { struct ceph_msg_data *data; - data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS); - BUG_ON(!data); + data = ceph_msg_data_add(msg); + data->type = CEPH_MSG_DATA_BVECS; data->bvec_pos = *bvec_pos; - list_add_tail(&data->links, &msg->data); msg->data_length += bvec_pos->iter.bi_size; } EXPORT_SYMBOL(ceph_msg_data_add_bvecs); @@ -3355,8 +3315,8 @@ EXPORT_SYMBOL(ceph_msg_data_add_bvecs); * construct a new message with given type, size * the new msg has a ref count of 1. */ -struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, - bool can_fail) +struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items, + gfp_t flags, bool can_fail) { struct ceph_msg *m; @@ -3370,7 +3330,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, INIT_LIST_HEAD(&m->list_head); kref_init(&m->kref); - INIT_LIST_HEAD(&m->data); /* front */ if (front_len) { @@ -3385,6 +3344,15 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, } m->front_alloc_len = m->front.iov_len = front_len; + if (max_data_items) { + m->data = kmalloc_array(max_data_items, sizeof(*m->data), + flags); + if (!m->data) + goto out2; + + m->max_data_items = max_data_items; + } + dout("ceph_msg_new %p front %d\n", m, front_len); return m; @@ -3401,6 +3369,13 @@ out: } return NULL; } +EXPORT_SYMBOL(ceph_msg_new2); + +struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, + bool can_fail) +{ + return ceph_msg_new2(type, front_len, 0, flags, can_fail); +} EXPORT_SYMBOL(ceph_msg_new); /* @@ -3496,13 +3471,14 @@ static void ceph_msg_free(struct ceph_msg *m) { dout("%s %p\n", __func__, m); kvfree(m->front.iov_base); + kfree(m->data); kmem_cache_free(ceph_msg_cache, m); } static void ceph_msg_release(struct kref *kref) { struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); - struct ceph_msg_data *data, *next; + int i; dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); @@ -3515,11 +3491,8 @@ static void ceph_msg_release(struct kref *kref) m->middle = NULL; } - list_for_each_entry_safe(data, next, &m->data, links) { - list_del_init(&data->links); - ceph_msg_data_destroy(data); - } - m->data_length = 0; + for (i = 0; i < m->num_data_items; i++) + ceph_msg_data_destroy(&m->data[i]); if (m->pool) ceph_msgpool_put(m->pool, m); diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c index 72571535883f..e3ecb80cd182 100644 --- a/net/ceph/msgpool.c +++ b/net/ceph/msgpool.c @@ -14,7 +14,8 @@ static void *msgpool_alloc(gfp_t gfp_mask, void *arg) struct ceph_msgpool *pool = arg; struct ceph_msg *msg; - msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true); + msg = ceph_msg_new2(pool->type, pool->front_len, pool->max_data_items, + gfp_mask, true); if (!msg) { dout("msgpool_alloc %s failed\n", pool->name); } else { @@ -35,11 +36,13 @@ static void msgpool_free(void *element, void *arg) } int ceph_msgpool_init(struct ceph_msgpool *pool, int type, - int front_len, int size, bool blocking, const char *name) + int front_len, int max_data_items, int size, + const char *name) { dout("msgpool %s init\n", name); pool->type = type; pool->front_len = front_len; + pool->max_data_items = max_data_items; pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool); if (!pool->pool) return -ENOMEM; @@ -53,18 +56,21 @@ void ceph_msgpool_destroy(struct ceph_msgpool *pool) mempool_destroy(pool->pool); } -struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, - int front_len) +struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len, + int max_data_items) { struct ceph_msg *msg; - if (front_len > pool->front_len) { - dout("msgpool_get %s need front %d, pool size is %d\n", - pool->name, front_len, pool->front_len); - WARN_ON(1); + if (front_len > pool->front_len || + max_data_items > pool->max_data_items) { + pr_warn_ratelimited("%s need %d/%d, pool %s has %d/%d\n", + __func__, front_len, max_data_items, pool->name, + pool->front_len, pool->max_data_items); + WARN_ON_ONCE(1); /* try to alloc a fresh message */ - return ceph_msg_new(pool->type, front_len, GFP_NOFS, false); + return ceph_msg_new2(pool->type, front_len, max_data_items, + GFP_NOFS, false); } msg = mempool_alloc(pool->pool, GFP_NOFS); @@ -80,6 +86,9 @@ void ceph_msgpool_put(struct ceph_msgpool *pool, struct ceph_msg *msg) msg->front.iov_len = pool->front_len; msg->hdr.front_len = cpu_to_le32(pool->front_len); + msg->data_length = 0; + msg->num_data_items = 0; + kref_init(&msg->kref); /* retake single ref */ mempool_free(msg, pool->pool); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 60934bd8796c..d23a9f81f3d7 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -126,6 +126,9 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data) osd_data->type = CEPH_OSD_DATA_TYPE_NONE; } +/* + * Consumes @pages if @own_pages is true. + */ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages) @@ -138,6 +141,9 @@ static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, osd_data->own_pages = own_pages; } +/* + * Consumes a ref on @pagelist. + */ static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, struct ceph_pagelist *pagelist) { @@ -362,6 +368,8 @@ static void ceph_osd_data_release(struct ceph_osd_data *osd_data) num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); ceph_release_page_vector(osd_data->pages, num_pages); + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { + ceph_pagelist_release(osd_data->pagelist); } ceph_osd_data_init(osd_data); } @@ -402,6 +410,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_LIST_WATCHERS: ceph_osd_data_release(&op->list_watchers.response_data); break; + case CEPH_OSD_OP_COPY_FROM: + ceph_osd_data_release(&op->copy_from.osd_data); + break; default: break; } @@ -606,12 +617,15 @@ static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc) return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0); } -int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) +static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp, + int num_request_data_items, + int num_reply_data_items) { struct ceph_osd_client *osdc = req->r_osdc; struct ceph_msg *msg; int msg_size; + WARN_ON(req->r_request || req->r_reply); WARN_ON(ceph_oid_empty(&req->r_base_oid)); WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); @@ -633,9 +647,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += 4 + 8; /* retry_attempt, features */ if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op, 0); + msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size, + num_request_data_items); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); + msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size, + num_request_data_items, gfp, true); if (!msg) return -ENOMEM; @@ -648,9 +664,11 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); if (req->r_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); + msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size, + num_reply_data_items); else - msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); + msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size, + num_reply_data_items, gfp, true); if (!msg) return -ENOMEM; @@ -658,7 +676,6 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) return 0; } -EXPORT_SYMBOL(ceph_osdc_alloc_messages); static bool osd_req_opcode_valid(u16 opcode) { @@ -671,6 +688,65 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) } } +static void get_num_data_items(struct ceph_osd_request *req, + int *num_request_data_items, + int *num_reply_data_items) +{ + struct ceph_osd_req_op *op; + + *num_request_data_items = 0; + *num_reply_data_items = 0; + + for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) { + switch (op->op) { + /* request */ + case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_NOTIFY_ACK: + case CEPH_OSD_OP_COPY_FROM: + *num_request_data_items += 1; + break; + + /* reply */ + case CEPH_OSD_OP_STAT: + case CEPH_OSD_OP_READ: + case CEPH_OSD_OP_LIST_WATCHERS: + *num_reply_data_items += 1; + break; + + /* both */ + case CEPH_OSD_OP_NOTIFY: + *num_request_data_items += 1; + *num_reply_data_items += 1; + break; + case CEPH_OSD_OP_CALL: + *num_request_data_items += 2; + *num_reply_data_items += 1; + break; + + default: + WARN_ON(!osd_req_opcode_valid(op->op)); + break; + } + } +} + +/* + * oid, oloc and OSD op opcode(s) must be filled in before this function + * is called. + */ +int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) +{ + int num_request_data_items, num_reply_data_items; + + get_num_data_items(req, &num_request_data_items, &num_reply_data_items); + return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items, + num_reply_data_items); +} +EXPORT_SYMBOL(ceph_osdc_alloc_messages); + /* * This is an osd op init function for opcodes that have no data or * other information associated with them. It also serves as a @@ -767,22 +843,19 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, EXPORT_SYMBOL(osd_req_op_extent_dup_last); int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, - u16 opcode, const char *class, const char *method) + const char *class, const char *method) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - opcode, 0); + struct ceph_osd_req_op *op; struct ceph_pagelist *pagelist; size_t payload_len = 0; size_t size; - BUG_ON(opcode != CEPH_OSD_OP_CALL); + op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0); - pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); - op->cls.class_name = class; size = strlen(class); BUG_ON(size > (size_t) U8_MAX); @@ -815,12 +888,10 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); - pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); + pagelist = ceph_pagelist_alloc(GFP_NOFS); if (!pagelist) return -ENOMEM; - ceph_pagelist_init(pagelist); - payload_len = strlen(name); op->xattr.name_len = payload_len; ceph_pagelist_append(pagelist, name, payload_len); @@ -900,12 +971,6 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, static u32 osd_req_encode_op(struct ceph_osd_op *dst, const struct ceph_osd_req_op *src) { - if (WARN_ON(!osd_req_opcode_valid(src->op))) { - pr_err("unrecognized osd opcode %d\n", src->op); - - return 0; - } - switch (src->op) { case CEPH_OSD_OP_STAT: break; @@ -955,6 +1020,14 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_CREATE: case CEPH_OSD_OP_DELETE: break; + case CEPH_OSD_OP_COPY_FROM: + dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid); + dst->copy_from.src_version = + cpu_to_le64(src->copy_from.src_version); + dst->copy_from.flags = src->copy_from.flags; + dst->copy_from.src_fadvise_flags = + cpu_to_le32(src->copy_from.src_fadvise_flags); + break; default: pr_err("unsupported osd opcode %s\n", ceph_osd_op_name(src->op)); @@ -1038,7 +1111,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, if (flags & CEPH_OSD_FLAG_WRITE) req->r_data_offset = off; - r = ceph_osdc_alloc_messages(req, GFP_NOFS); + if (num_ops > 1) + /* + * This is a special case for ceph_writepages_start(), but it + * also covers ceph_uninline_data(). If more multi-op request + * use cases emerge, we will need a separate helper. + */ + r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0); + else + r = ceph_osdc_alloc_messages(req, GFP_NOFS); if (r) goto fail; @@ -1845,48 +1926,55 @@ static bool should_plug_request(struct ceph_osd_request *req) return true; } -static void setup_request_data(struct ceph_osd_request *req, - struct ceph_msg *msg) +/* + * Keep get_num_data_items() in sync with this function. + */ +static void setup_request_data(struct ceph_osd_request *req) { - u32 data_len = 0; - int i; + struct ceph_msg *request_msg = req->r_request; + struct ceph_msg *reply_msg = req->r_reply; + struct ceph_osd_req_op *op; - if (!list_empty(&msg->data)) + if (req->r_request->num_data_items || req->r_reply->num_data_items) return; - WARN_ON(msg->data_length); - for (i = 0; i < req->r_num_ops; i++) { - struct ceph_osd_req_op *op = &req->r_ops[i]; - + WARN_ON(request_msg->data_length || reply_msg->data_length); + for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) { switch (op->op) { /* request */ case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: WARN_ON(op->indata_len != op->extent.length); - ceph_osdc_msg_data_add(msg, &op->extent.osd_data); + ceph_osdc_msg_data_add(request_msg, + &op->extent.osd_data); break; case CEPH_OSD_OP_SETXATTR: case CEPH_OSD_OP_CMPXATTR: WARN_ON(op->indata_len != op->xattr.name_len + op->xattr.value_len); - ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); + ceph_osdc_msg_data_add(request_msg, + &op->xattr.osd_data); break; case CEPH_OSD_OP_NOTIFY_ACK: - ceph_osdc_msg_data_add(msg, + ceph_osdc_msg_data_add(request_msg, &op->notify_ack.request_data); break; + case CEPH_OSD_OP_COPY_FROM: + ceph_osdc_msg_data_add(request_msg, + &op->copy_from.osd_data); + break; /* reply */ case CEPH_OSD_OP_STAT: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->raw_data_in); break; case CEPH_OSD_OP_READ: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->extent.osd_data); break; case CEPH_OSD_OP_LIST_WATCHERS: - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->list_watchers.response_data); break; @@ -1895,25 +1983,23 @@ static void setup_request_data(struct ceph_osd_request *req, WARN_ON(op->indata_len != op->cls.class_len + op->cls.method_len + op->cls.indata_len); - ceph_osdc_msg_data_add(msg, &op->cls.request_info); + ceph_osdc_msg_data_add(request_msg, + &op->cls.request_info); /* optional, can be NONE */ - ceph_osdc_msg_data_add(msg, &op->cls.request_data); + ceph_osdc_msg_data_add(request_msg, + &op->cls.request_data); /* optional, can be NONE */ - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->cls.response_data); break; case CEPH_OSD_OP_NOTIFY: - ceph_osdc_msg_data_add(msg, + ceph_osdc_msg_data_add(request_msg, &op->notify.request_data); - ceph_osdc_msg_data_add(req->r_reply, + ceph_osdc_msg_data_add(reply_msg, &op->notify.response_data); break; } - - data_len += op->indata_len; } - - WARN_ON(data_len != msg->data_length); } static void encode_pgid(void **p, const struct ceph_pg *pgid) @@ -1961,7 +2047,7 @@ static void encode_request_partial(struct ceph_osd_request *req, req->r_data_offset || req->r_snapc); } - setup_request_data(req, msg); + setup_request_data(req); encode_spgid(&p, &req->r_t.spgid); /* actual spg */ ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ @@ -3001,11 +3087,21 @@ static void linger_submit(struct ceph_osd_linger_request *lreq) struct ceph_osd_client *osdc = lreq->osdc; struct ceph_osd *osd; + down_write(&osdc->lock); + linger_register(lreq); + if (lreq->is_watch) { + lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id; + lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id; + } else { + lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id; + } + calc_target(osdc, &lreq->t, NULL, false); osd = lookup_create_osd(osdc, lreq->t.osd, true); link_linger(osd, lreq); send_linger(lreq); + up_write(&osdc->lock); } static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) @@ -4318,9 +4414,7 @@ static void handle_watch_notify(struct ceph_osd_client *osdc, lreq->notify_id, notify_id); } else if (!completion_done(&lreq->notify_finish_wait)) { struct ceph_msg_data *data = - list_first_entry_or_null(&msg->data, - struct ceph_msg_data, - links); + msg->num_data_items ? &msg->data[0] : NULL; if (data) { if (lreq->preply_pages) { @@ -4476,6 +4570,23 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq) ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + return req; +} + +static struct ceph_osd_request * +alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode) +{ + struct ceph_osd_request *req; + + req = alloc_linger_request(lreq); + if (!req) + return NULL; + + /* + * Pass 0 for cookie because we don't know it yet, it will be + * filled in by linger_submit(). + */ + osd_req_op_watch_init(req, 0, 0, watch_opcode); if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { ceph_osdc_put_request(req); @@ -4514,27 +4625,19 @@ ceph_osdc_watch(struct ceph_osd_client *osdc, lreq->t.flags = CEPH_OSD_FLAG_WRITE; ktime_get_real_ts64(&lreq->mtime); - lreq->reg_req = alloc_linger_request(lreq); + lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH); if (!lreq->reg_req) { ret = -ENOMEM; goto err_put_lreq; } - lreq->ping_req = alloc_linger_request(lreq); + lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING); if (!lreq->ping_req) { ret = -ENOMEM; goto err_put_lreq; } - down_write(&osdc->lock); - linger_register(lreq); /* before osd_req_op_* */ - osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, - CEPH_OSD_WATCH_OP_WATCH); - osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, - CEPH_OSD_WATCH_OP_PING); linger_submit(lreq); - up_write(&osdc->lock); - ret = linger_reg_commit_wait(lreq); if (ret) { linger_cancel(lreq); @@ -4599,11 +4702,10 @@ static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); - pl = kmalloc(sizeof(*pl), GFP_NOIO); + pl = ceph_pagelist_alloc(GFP_NOIO); if (!pl) return -ENOMEM; - ceph_pagelist_init(pl); ret = ceph_pagelist_encode_64(pl, notify_id); ret |= ceph_pagelist_encode_64(pl, cookie); if (payload) { @@ -4641,12 +4743,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, + payload_len); if (ret) goto out_put_req; - ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, - payload_len); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); if (ret) goto out_put_req; @@ -4670,11 +4772,10 @@ static int osd_req_op_notify_init(struct ceph_osd_request *req, int which, op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); op->notify.cookie = cookie; - pl = kmalloc(sizeof(*pl), GFP_NOIO); + pl = ceph_pagelist_alloc(GFP_NOIO); if (!pl) return -ENOMEM; - ceph_pagelist_init(pl); ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ ret |= ceph_pagelist_encode_32(pl, timeout); ret |= ceph_pagelist_encode_32(pl, payload_len); @@ -4733,29 +4834,30 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, goto out_put_lreq; } + /* + * Pass 0 for cookie because we don't know it yet, it will be + * filled in by linger_submit(). + */ + ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout, + payload, payload_len); + if (ret) + goto out_put_lreq; + /* for notify_id */ pages = ceph_alloc_page_vector(1, GFP_NOIO); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out_put_lreq; } - - down_write(&osdc->lock); - linger_register(lreq); /* before osd_req_op_* */ - ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1, - timeout, payload, payload_len); - if (ret) { - linger_unregister(lreq); - up_write(&osdc->lock); - ceph_release_page_vector(pages, 1); - goto out_put_lreq; - } ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, response_data), pages, PAGE_SIZE, 0, false, true); - linger_submit(lreq); - up_write(&osdc->lock); + ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO); + if (ret) + goto out_put_lreq; + + linger_submit(lreq); ret = linger_reg_commit_wait(lreq); if (!ret) ret = linger_notify_finish_wait(lreq); @@ -4881,10 +4983,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = CEPH_OSD_FLAG_READ; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); - if (ret) - goto out_put_req; - pages = ceph_alloc_page_vector(1, GFP_NOIO); if (IS_ERR(pages)) { ret = PTR_ERR(pages); @@ -4896,6 +4994,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, response_data), pages, PAGE_SIZE, 0, false, true); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { @@ -4958,11 +5060,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, ceph_oloc_copy(&req->r_base_oloc, oloc); req->r_flags = flags; - ret = ceph_osdc_alloc_messages(req, GFP_NOIO); - if (ret) - goto out_put_req; - - ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + ret = osd_req_op_cls_init(req, 0, class, method); if (ret) goto out_put_req; @@ -4973,6 +5071,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, osd_req_op_cls_response_data_pages(req, 0, &resp_page, *resp_len, 0, false, false); + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); if (ret >= 0) { @@ -5021,11 +5123,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) goto out_map; err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, - PAGE_SIZE, 10, true, "osd_op"); + PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op"); if (err < 0) goto out_mempool; err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, - PAGE_SIZE, 10, true, "osd_op_reply"); + PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, + "osd_op_reply"); if (err < 0) goto out_msgpool; @@ -5168,6 +5271,80 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, } EXPORT_SYMBOL(ceph_osdc_writepages); +static int osd_req_op_copy_from_init(struct ceph_osd_request *req, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + u32 dst_fadvise_flags, + u8 copy_from_flags) +{ + struct ceph_osd_req_op *op; + struct page **pages; + void *p, *end; + + pages = ceph_alloc_page_vector(1, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags); + op->copy_from.snapid = src_snapid; + op->copy_from.src_version = src_version; + op->copy_from.flags = copy_from_flags; + op->copy_from.src_fadvise_flags = src_fadvise_flags; + + p = page_address(pages[0]); + end = p + PAGE_SIZE; + ceph_encode_string(&p, end, src_oid->name, src_oid->name_len); + encode_oloc(&p, end, src_oloc); + op->indata_len = PAGE_SIZE - (end - p); + + ceph_osd_data_pages_init(&op->copy_from.osd_data, pages, + op->indata_len, 0, false, true); + return 0; +} + +int ceph_osdc_copy_from(struct ceph_osd_client *osdc, + u64 src_snapid, u64 src_version, + struct ceph_object_id *src_oid, + struct ceph_object_locator *src_oloc, + u32 src_fadvise_flags, + struct ceph_object_id *dst_oid, + struct ceph_object_locator *dst_oloc, + u32 dst_fadvise_flags, + u8 copy_from_flags) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); + if (!req) + return -ENOMEM; + + req->r_flags = CEPH_OSD_FLAG_WRITE; + + ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc); + ceph_oid_copy(&req->r_t.base_oid, dst_oid); + + ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid, + src_oloc, src_fadvise_flags, + dst_fadvise_flags, copy_from_flags); + if (ret) + goto out; + + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); + if (ret) + goto out; + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + +out: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_copy_from); + int __init ceph_osdc_setup(void) { size_t size = sizeof(struct ceph_osd_request) + @@ -5295,7 +5472,7 @@ static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr) u32 front_len = le32_to_cpu(hdr->front_len); u32 data_len = le32_to_cpu(hdr->data_len); - m = ceph_msg_new(type, front_len, GFP_NOIO, false); + m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false); if (!m) return NULL; diff --git a/net/ceph/pagelist.c b/net/ceph/pagelist.c index 2ea0564771d2..65e34f78b05d 100644 --- a/net/ceph/pagelist.c +++ b/net/ceph/pagelist.c @@ -6,6 +6,26 @@ #include <linux/highmem.h> #include <linux/ceph/pagelist.h> +struct ceph_pagelist *ceph_pagelist_alloc(gfp_t gfp_flags) +{ + struct ceph_pagelist *pl; + + pl = kmalloc(sizeof(*pl), gfp_flags); + if (!pl) + return NULL; + + INIT_LIST_HEAD(&pl->head); + pl->mapped_tail = NULL; + pl->length = 0; + pl->room = 0; + INIT_LIST_HEAD(&pl->free_list); + pl->num_pages_free = 0; + refcount_set(&pl->refcnt, 1); + + return pl; +} +EXPORT_SYMBOL(ceph_pagelist_alloc); + static void ceph_pagelist_unmap_tail(struct ceph_pagelist *pl) { if (pl->mapped_tail) { |