diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2015-07-02 11:35:00 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2015-07-02 11:35:00 -0700 |
commit | 0c76c6ba246043bbc5c0f9620a0645ae78217421 (patch) | |
tree | 644a4db58706c4e97478951f0a3a0087ddf26e5e /fs/ceph/inode.c | |
parent | 8688d9540cc6e17df4cba71615e27f04e0378fe6 (diff) | |
parent | 5a60e87603c4c533492c515b7f62578189b03c9c (diff) | |
download | linux-0c76c6ba246043bbc5c0f9620a0645ae78217421.tar.bz2 |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
"We have a pile of bug fixes from Ilya, including a few patches that
sync up the CRUSH code with the latest from userspace.
There is also a long series from Zheng that fixes various issues with
snapshots, inline data, and directory fsync, some simplification and
improvement in the cap release code, and a rework of the caching of
directory contents.
To top it off there are a few small fixes and cleanups from Benoit and
Hong"
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (40 commits)
rbd: use GFP_NOIO in rbd_obj_request_create()
crush: fix a bug in tree bucket decode
libceph: Fix ceph_tcp_sendpage()'s more boolean usage
libceph: Remove spurious kunmap() of the zero page
rbd: queue_depth map option
rbd: store rbd_options in rbd_device
rbd: terminate rbd_opts_tokens with Opt_err
ceph: fix ceph_writepages_start()
rbd: bump queue_max_segments
ceph: rework dcache readdir
crush: sync up with userspace
crush: fix crash from invalid 'take' argument
ceph: switch some GFP_NOFS memory allocation to GFP_KERNEL
ceph: pre-allocate data structure that tracks caps flushing
ceph: re-send flushing caps (which are revoked) in reconnect stage
ceph: send TID of the oldest pending caps flush to MDS
ceph: track pending caps flushing globally
ceph: track pending caps flushing accurately
libceph: fix wrong name "Ceph filesystem for Linux"
ceph: fix directory fsync
...
Diffstat (limited to 'fs/ceph/inode.c')
-rw-r--r-- | fs/ceph/inode.c | 155 |
1 files changed, 121 insertions, 34 deletions
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 571acd88606c..96d2bd829902 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -389,9 +389,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_inline_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; - ci->i_ordered_count = 0; - atomic_set(&ci->i_release_count, 1); - atomic_set(&ci->i_complete_count, 0); + atomic64_set(&ci->i_ordered_count, 1); + atomic64_set(&ci->i_release_count, 1); + atomic64_set(&ci->i_complete_seq[0], 0); + atomic64_set(&ci->i_complete_seq[1], 0); ci->i_symlink = NULL; memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); @@ -415,9 +416,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_flushing_caps = 0; INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); - ci->i_cap_flush_seq = 0; - ci->i_cap_flush_last_tid = 0; - memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid)); + ci->i_prealloc_cap_flush = NULL; + ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; @@ -752,7 +752,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { + if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; ci->i_layout = info->layout; + queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), le64_to_cpu(info->truncate_size), @@ -858,9 +861,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); + i_size_write(inode, 0); __ceph_dir_set_complete(ci, - atomic_read(&ci->i_release_count), - ci->i_ordered_count); + atomic64_read(&ci->i_release_count), + atomic64_read(&ci->i_ordered_count)); } wake = true; @@ -1212,6 +1216,10 @@ retry_lookup: dout("fill_trace doing d_move %p -> %p\n", req->r_old_dentry, dn); + /* d_move screws up sibling dentries' offsets */ + ceph_dir_clear_ordered(dir); + ceph_dir_clear_ordered(olddir); + d_move(req->r_old_dentry, dn); dout(" src %p '%pd' dst %p '%pd'\n", req->r_old_dentry, @@ -1222,10 +1230,6 @@ retry_lookup: rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(dn); - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_ordered(dir); - ceph_dir_clear_ordered(olddir); - dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); @@ -1333,6 +1337,49 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, return err; } +void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl) +{ + if (ctl->page) { + kunmap(ctl->page); + page_cache_release(ctl->page); + ctl->page = NULL; + } +} + +static int fill_readdir_cache(struct inode *dir, struct dentry *dn, + struct ceph_readdir_cache_control *ctl, + struct ceph_mds_request *req) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*); + unsigned idx = ctl->index % nsize; + pgoff_t pgoff = ctl->index / nsize; + + if (!ctl->page || pgoff != page_index(ctl->page)) { + ceph_readdir_cache_release(ctl); + ctl->page = grab_cache_page(&dir->i_data, pgoff); + if (!ctl->page) { + ctl->index = -1; + return -ENOMEM; + } + /* reading/filling the cache are serialized by + * i_mutex, no need to use page lock */ + unlock_page(ctl->page); + ctl->dentries = kmap(ctl->page); + } + + if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) && + req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) { + dout("readdir cache dn %p idx %d\n", dn, ctl->index); + ctl->dentries[idx] = dn; + ctl->index++; + } else { + dout("disable readdir cache\n"); + ctl->index = -1; + } + return 0; +} + int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct ceph_mds_session *session) { @@ -1345,8 +1392,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct inode *snapdir = NULL; struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; struct ceph_dentry_info *di; - u64 r_readdir_offset = req->r_readdir_offset; u32 frag = le32_to_cpu(rhead->args.readdir.frag); + struct ceph_readdir_cache_control cache_ctl = {}; + + if (req->r_aborted) + return readdir_prepopulate_inodes_only(req, session); if (rinfo->dir_dir && le32_to_cpu(rinfo->dir_dir->frag) != frag) { @@ -1354,14 +1404,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, frag, le32_to_cpu(rinfo->dir_dir->frag)); frag = le32_to_cpu(rinfo->dir_dir->frag); if (ceph_frag_is_leftmost(frag)) - r_readdir_offset = 2; + req->r_readdir_offset = 2; else - r_readdir_offset = 0; + req->r_readdir_offset = 0; } - if (req->r_aborted) - return readdir_prepopulate_inodes_only(req, session); - if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) { snapdir = ceph_get_snapdir(d_inode(parent)); parent = d_find_alias(snapdir); @@ -1374,6 +1421,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir); } + if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) { + /* note dir version at start of readdir so we can tell + * if any dentries get dropped */ + struct ceph_inode_info *ci = ceph_inode(d_inode(parent)); + req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); + req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count); + req->r_readdir_cache_idx = 0; + } + + cache_ctl.index = req->r_readdir_cache_idx; + /* FIXME: release caps/leases if error occurs */ for (i = 0; i < rinfo->dir_nr; i++) { struct ceph_vino vino; @@ -1413,13 +1471,6 @@ retry_lookup: d_delete(dn); dput(dn); goto retry_lookup; - } else { - /* reorder parent's d_subdirs */ - spin_lock(&parent->d_lock); - spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED); - list_move(&dn->d_child, &parent->d_subdirs); - spin_unlock(&dn->d_lock); - spin_unlock(&parent->d_lock); } /* inode */ @@ -1436,13 +1487,15 @@ retry_lookup: } } - if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, - req->r_request_started, -1, - &req->r_caps_reservation) < 0) { + ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, + req->r_request_started, -1, + &req->r_caps_reservation); + if (ret < 0) { pr_err("fill_inode badness on %p\n", in); if (d_really_is_negative(dn)) iput(in); d_drop(dn); + err = ret; goto next_item; } @@ -1458,19 +1511,28 @@ retry_lookup: } di = dn->d_fsdata; - di->offset = ceph_make_fpos(frag, i + r_readdir_offset); + di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); update_dentry_lease(dn, rinfo->dir_dlease[i], req->r_session, req->r_request_started); + + if (err == 0 && cache_ctl.index >= 0) { + ret = fill_readdir_cache(d_inode(parent), dn, + &cache_ctl, req); + if (ret < 0) + err = ret; + } next_item: if (dn) dput(dn); } - if (err == 0) - req->r_did_prepopulate = true; - out: + if (err == 0) { + req->r_did_prepopulate = true; + req->r_readdir_cache_idx = cache_ctl.index; + } + ceph_readdir_cache_release(&cache_ctl); if (snapdir) { iput(snapdir); dput(parent); @@ -1712,11 +1774,13 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) const unsigned int ia_valid = attr->ia_valid; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf; int issued; int release = 0, dirtied = 0; int mask = 0; int err = 0; int inode_dirty_flags = 0; + bool lock_snap_rwsem = false; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; @@ -1725,13 +1789,31 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (err != 0) return err; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR, USE_AUTH_MDS); - if (IS_ERR(req)) + if (IS_ERR(req)) { + ceph_free_cap_flush(prealloc_cf); return PTR_ERR(req); + } spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); + + if (!ci->i_head_snapc && + (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + issued = __ceph_caps_issued(ci, NULL); + } + } + dout("setattr %p issued %s\n", inode, ceph_cap_string(issued)); if (ia_valid & ATTR_UID) { @@ -1874,12 +1956,15 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) dout("setattr %p ATTR_FILE ... hrm!\n", inode); if (dirtied) { - inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied); + inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied, + &prealloc_cf); inode->i_ctime = CURRENT_TIME; } release &= issued; spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); @@ -1904,9 +1989,11 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ceph_mdsc_put_request(req); if (mask & CEPH_SETATTR_SIZE) __ceph_do_pending_vmtruncate(inode); + ceph_free_cap_flush(prealloc_cf); return err; out_put: ceph_mdsc_put_request(req); + ceph_free_cap_flush(prealloc_cf); return err; } |