From 10183a69551f76702ac68bc74a437b25419c6de0 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 27 Apr 2015 15:33:28 +0800 Subject: ceph: check OSD caps before read/write Signed-off-by: Yan, Zheng --- fs/ceph/inode.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index e876e1944519..1a68c0e38a52 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -753,7 +753,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { + if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; ci->i_layout = info->layout; + queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), le64_to_cpu(info->truncate_size), -- cgit v1.2.3 From 604d1b0245b97738cde4341944ad93edff4b2827 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 1 May 2015 17:49:16 +0800 Subject: ceph: take snap_rwsem when accessing snap realm's cached_context When ceph inode's i_head_snapc is NULL, __ceph_mark_dirty_caps() accesses snap realm's cached_context. So we need take read lock of snap_rwsem. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 4 +++- fs/ceph/inode.c | 15 +++++++++++++++ fs/ceph/xattr.c | 45 +++++++++++++++++++++++++++++++++++++++------ 3 files changed, 57 insertions(+), 7 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index f1dbcae7c75c..900c05fd77d8 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1413,9 +1413,11 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { - if (!ci->i_head_snapc) + if (!ci->i_head_snapc) { + WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); ci->i_head_snapc = ceph_get_snap_context( ci->i_snap_realm->cached_context); + } dout(" inode %p now dirty snapc %p auth cap %p\n", &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 1a68c0e38a52..1c991df276c9 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1727,6 +1727,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) int mask = 0; int err = 0; int inode_dirty_flags = 0; + bool lock_snap_rwsem = false; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; @@ -1742,6 +1743,18 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); + + if (!ci->i_head_snapc && + (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + issued = __ceph_caps_issued(ci, NULL); + } + } + dout("setattr %p issued %s\n", inode, ceph_cap_string(issued)); if (ia_valid & ATTR_UID) { @@ -1890,6 +1903,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) release &= issued; spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index cd7ffad4041d..c6f7d9b82085 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -911,6 +911,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, struct inode *inode = d_inode(dentry); struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; int issued; int err; int dirty = 0; @@ -920,6 +921,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, char *newval = NULL; struct ceph_inode_xattr *xattr = NULL; int required_blob_size; + bool lock_snap_rwsem = false; if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -951,9 +953,20 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); - dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; + + if (!lock_snap_rwsem && !ci->i_head_snapc) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + goto retry; + } + } + + dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); __build_xattrs(inode); required_blob_size = __get_required_blob_size(ci, name_len, val_len); @@ -966,7 +979,7 @@ retry: dout(" preaallocating new blob size=%d\n", required_blob_size); blob = ceph_buffer_new(required_blob_size, GFP_NOFS); if (!blob) - goto out; + goto do_sync_unlocked; spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); @@ -984,6 +997,8 @@ retry: } spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); return err; @@ -991,6 +1006,8 @@ retry: do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); err = ceph_sync_setxattr(dentry, name, value, size, flags); out: kfree(newname); @@ -1044,10 +1061,12 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) struct inode *inode = d_inode(dentry); struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; int issued; int err; int required_blob_size; int dirty; + bool lock_snap_rwsem = false; if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -1064,10 +1083,21 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); - dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); - if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; + + if (!lock_snap_rwsem && !ci->i_head_snapc) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + goto retry; + } + } + + dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); + __build_xattrs(inode); required_blob_size = __get_required_blob_size(ci, 0, 0); @@ -1080,7 +1110,7 @@ retry: dout(" preaallocating new blob size=%d\n", required_blob_size); blob = ceph_buffer_new(required_blob_size, GFP_NOFS); if (!blob) - goto out; + goto do_sync_unlocked; spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); @@ -1094,14 +1124,17 @@ retry: ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); return err; do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); err = ceph_send_removexattr(dentry, name); -out: return err; } -- cgit v1.2.3 From 553adfd941f8ca622965ef809553d918ea039929 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 9 Jun 2015 15:48:57 +0800 Subject: ceph: track pending caps flushing accurately Previously we do not trace accurate TID for flushing caps. when MDS failovers, we have no choice but to re-send all flushing caps with a new TID. This can cause problem because MDS can has already flushed some caps and has issued the same caps to other client. The re-sent cap flush has a new TID, which makes MDS unable to detect if it has already processed the cap flush. This patch adds code to track pending caps flushing accurately. When re-sending cap flush is needed, we use its original flush TID. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 245 +++++++++++++++++++++++++++++++++------------------ fs/ceph/inode.c | 3 +- fs/ceph/mds_client.c | 20 +++++ fs/ceph/mds_client.h | 1 + fs/ceph/super.h | 11 ++- 5 files changed, 192 insertions(+), 88 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index dc988337f841..9a25f8d66fbc 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1097,7 +1097,8 @@ void ceph_queue_caps_release(struct inode *inode) * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, int used, int want, int retain, int flushing) + int op, int used, int want, int retain, int flushing, + u64 flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; @@ -1115,8 +1116,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, u64 xattr_version = 0; struct ceph_buffer *xattr_blob = NULL; int delayed = 0; - u64 flush_tid = 0; - int i; int ret; bool inline_data; @@ -1160,24 +1159,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, cap->implemented &= cap->issued | used; cap->mds_wanted = want; - if (flushing) { - /* - * assign a tid for flush operations so we can avoid - * flush1 -> dirty1 -> flush2 -> flushack1 -> mark - * clean type races. track latest tid for every bit - * so we can handle flush AxFw, flush Fw, and have the - * first ack clean Ax. - */ - flush_tid = ++ci->i_cap_flush_last_tid; - dout(" cap_flush_tid %d\n", (int)flush_tid); - for (i = 0; i < CEPH_CAP_BITS; i++) - if (flushing & (1 << i)) - ci->i_cap_flush_tid[i] = flush_tid; - - follows = ci->i_head_snapc->seq; - } else { - follows = 0; - } + follows = flushing ? ci->i_head_snapc->seq : 0; keep = cap->implemented; seq = cap->seq; @@ -1311,7 +1293,10 @@ retry: goto retry; } - capsnap->flush_tid = ++ci->i_cap_flush_last_tid; + spin_lock(&mdsc->cap_dirty_lock); + capsnap->flush_tid = ++mdsc->last_cap_flush_tid; + spin_unlock(&mdsc->cap_dirty_lock); + atomic_inc(&capsnap->nref); if (list_empty(&capsnap->flushing_item)) list_add_tail(&capsnap->flushing_item, @@ -1407,6 +1392,29 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) return dirty; } +static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &ci->i_cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, i_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->i_node, parent, p); + rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1414,10 +1422,12 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) * Called under i_ceph_lock. */ static int __mark_caps_flushing(struct inode *inode, - struct ceph_mds_session *session) + struct ceph_mds_session *session, + u64 *flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap_flush *cf; int flushing; BUG_ON(ci->i_dirty_caps == 0); @@ -1432,9 +1442,14 @@ static int __mark_caps_flushing(struct inode *inode, ci->i_dirty_caps = 0; dout(" inode %p now !dirty\n", inode); + cf = kmalloc(sizeof(*cf), GFP_ATOMIC); + cf->caps = flushing; + spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); + cf->tid = ++mdsc->last_cap_flush_tid; + if (list_empty(&ci->i_flushing_item)) { ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); @@ -1448,6 +1463,9 @@ static int __mark_caps_flushing(struct inode *inode, } spin_unlock(&mdsc->cap_dirty_lock); + __add_cap_flushing_to_inode(ci, cf); + + *flush_tid = cf->tid; return flushing; } @@ -1493,6 +1511,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; + u64 flush_tid; int file_wanted, used, cap_used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; @@ -1711,17 +1730,20 @@ ack: took_snap_rwsem = 1; } - if (cap == ci->i_auth_cap && ci->i_dirty_caps) - flushing = __mark_caps_flushing(inode, session); - else + if (cap == ci->i_auth_cap && ci->i_dirty_caps) { + flushing = __mark_caps_flushing(inode, session, + &flush_tid); + } else { flushing = 0; + flush_tid = 0; + } mds = cap->mds; /* remember mds, so we don't repeat */ sent++; /* __send_cap drops i_ceph_lock */ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing); + want, retain, flushing, flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1750,12 +1772,13 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, u16 flush_tid[]) +static int try_flush_caps(struct inode *inode, u64 *ptid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_session *session = NULL; int flushing = 0; + u64 flush_tid = 0; retry: spin_lock(&ci->i_ceph_lock); @@ -1780,46 +1803,52 @@ retry: if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; - flushing = __mark_caps_flushing(inode, session); + flushing = __mark_caps_flushing(inode, session, &flush_tid); /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - cap->issued | cap->implemented, flushing); + (cap->issued | cap->implemented), + flushing, flush_tid); - spin_lock(&ci->i_ceph_lock); - if (delayed) + if (delayed) { + spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); + spin_unlock(&ci->i_ceph_lock); + } + } else { + struct rb_node *n = rb_last(&ci->i_cap_flush_tree); + if (n) { + struct ceph_cap_flush *cf = + rb_entry(n, struct ceph_cap_flush, i_node); + flush_tid = cf->tid; + } + flushing = ci->i_flushing_caps; + spin_unlock(&ci->i_ceph_lock); } - - flushing = ci->i_flushing_caps; - if (flushing) - memcpy(flush_tid, ci->i_cap_flush_tid, - sizeof(ci->i_cap_flush_tid)); out: - spin_unlock(&ci->i_ceph_lock); if (session) mutex_unlock(&session->s_mutex); + + *ptid = flush_tid; return flushing; } /* * Return true if we've flushed caps through the given flush_tid. */ -static int caps_are_flushed(struct inode *inode, u16 flush_tid[]) +static int caps_are_flushed(struct inode *inode, u64 flush_tid) { struct ceph_inode_info *ci = ceph_inode(inode); - int i, ret = 1; + struct ceph_cap_flush *cf; + struct rb_node *n; + int ret = 1; spin_lock(&ci->i_ceph_lock); - for (i = 0; i < CEPH_CAP_BITS; i++) { - if (!(ci->i_flushing_caps & (1 << i))) - continue; - // tid only has 16 bits. we need to handle wrapping - if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) { - /* still flushing this bit */ + n = rb_first(&ci->i_cap_flush_tree); + if (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid <= flush_tid) ret = 0; - break; - } } spin_unlock(&ci->i_ceph_lock); return ret; @@ -1922,7 +1951,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) { struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); - u16 flush_tid[CEPH_CAP_BITS]; + u64 flush_tid; int ret; int dirty; @@ -1938,7 +1967,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) mutex_lock(&inode->i_mutex); - dirty = try_flush_caps(inode, flush_tid); + dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); ret = unsafe_dirop_wait(inode); @@ -1967,14 +1996,14 @@ out: int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) { struct ceph_inode_info *ci = ceph_inode(inode); - u16 flush_tid[CEPH_CAP_BITS]; + u64 flush_tid; int err = 0; int dirty; int wait = wbc->sync_mode == WB_SYNC_ALL; dout("write_inode %p wait=%d\n", inode, wait); if (wait) { - dirty = try_flush_caps(inode, flush_tid); + dirty = try_flush_caps(inode, &flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); @@ -2022,6 +2051,51 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, } } +static int __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + int delayed = 0; + u64 first_tid = 0; + + while (true) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", inode, + cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + break; + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid >= first_tid) + break; + } + if (!n) { + spin_unlock(&ci->i_ceph_lock); + break; + } + + cf = rb_entry(n, struct ceph_cap_flush, i_node); + first_tid = cf->tid + 1; + + dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, + cap, cf->tid, ceph_cap_string(cf->caps)); + delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, + __ceph_caps_used(ci), + __ceph_caps_wanted(ci), + cap->issued | cap->implemented, + cf->caps, cf->tid); + } + return delayed; +} + void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { @@ -2031,28 +2105,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, dout("kick_flushing_caps mds%d\n", session->s_mds); list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - struct inode *inode = &ci->vfs_inode; - struct ceph_cap *cap; - int delayed = 0; - - spin_lock(&ci->i_ceph_lock); - cap = ci->i_auth_cap; - if (cap && cap->session == session) { - dout("kick_flushing_caps %p cap %p %s\n", inode, - cap, ceph_cap_string(ci->i_flushing_caps)); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); - } - } else { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); + int delayed = __kick_flushing_caps(mdsc, session, ci); + if (delayed) { + spin_lock(&ci->i_ceph_lock); + __cap_delay_requeue(mdsc, ci); spin_unlock(&ci->i_ceph_lock); } } @@ -2064,7 +2120,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; - int delayed = 0; spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; @@ -2074,16 +2129,16 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, __ceph_flush_snaps(ci, &session, 1); if (ci->i_flushing_caps) { + int delayed; + spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &cap->session->s_cap_flushing); spin_unlock(&mdsc->cap_dirty_lock); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps); + spin_unlock(&ci->i_ceph_lock); + + delayed = __kick_flushing_caps(mdsc, session, ci); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); @@ -2836,16 +2891,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_cap_flush *cf; + struct rb_node *n; + LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; int drop = 0; - int i; - for (i = 0; i < CEPH_CAP_BITS; i++) - if ((dirty & (1 << i)) && - (u16)flush_tid == ci->i_cap_flush_tid[i]) - cleaned |= 1 << i; + n = rb_first(&ci->i_cap_flush_tree); + while (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + n = rb_next(&cf->i_node); + if (cf->tid == flush_tid) + cleaned = cf->caps; + if (cf->tid <= flush_tid) { + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add_tail(&cf->list, &to_remove); + } else { + cleaned &= ~cf->caps; + if (!cleaned) + break; + } + } dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," " flushing %s -> %s\n", @@ -2890,6 +2958,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, out: spin_unlock(&ci->i_ceph_lock); + + while (!list_empty(&to_remove)) { + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + kfree(cf); + } if (drop) iput(inode); } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 1c991df276c9..6d3f19db8c8a 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -417,8 +417,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); ci->i_cap_flush_seq = 0; - ci->i_cap_flush_last_tid = 0; - memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid)); + ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 8080d486a991..839901f51512 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1142,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + LIST_HEAD(to_remove); int drop = 0; dout("removing cap %p, ci is %p, inode is %p\n", @@ -1149,9 +1150,19 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_lock(&ci->i_ceph_lock); __ceph_remove_cap(cap, false); if (!ci->i_auth_cap) { + struct ceph_cap_flush *cf; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + while (true) { + struct rb_node *n = rb_first(&ci->i_cap_flush_tree); + if (!n) + break; + cf = rb_entry(n, struct ceph_cap_flush, i_node); + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add(&cf->list, &to_remove); + } + spin_lock(&mdsc->cap_dirty_lock); if (!list_empty(&ci->i_dirty_item)) { pr_warn_ratelimited( @@ -1173,8 +1184,16 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, drop = 1; } spin_unlock(&mdsc->cap_dirty_lock); + } spin_unlock(&ci->i_ceph_lock); + while (!list_empty(&to_remove)) { + struct ceph_cap_flush *cf; + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + kfree(cf); + } while (drop--) iput(inode); return 0; @@ -3408,6 +3427,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); mdsc->cap_flush_seq = 0; + mdsc->last_cap_flush_tid = 1; INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 509d6822e9b1..19f6084203f0 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -307,6 +307,7 @@ struct ceph_mds_client { spinlock_t snap_flush_lock; u64 cap_flush_seq; + u64 last_cap_flush_tid; struct list_head cap_dirty; /* inodes with dirty caps */ struct list_head cap_dirty_migrating; /* ...that are migration... */ int num_cap_flushing; /* # caps we are flushing */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index c4961353d058..cc597f52e046 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -186,6 +186,15 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) } } +struct ceph_cap_flush { + u64 tid; + int caps; + union { + struct rb_node i_node; + struct list_head list; + }; +}; + /* * The frag tree describes how a directory is fragmented, potentially across * multiple metadata servers. It is also used to indicate points where @@ -299,7 +308,7 @@ struct ceph_inode_info { /* we need to track cap writeback on a per-cap-bit basis, to allow * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ - u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS]; + struct rb_root i_cap_flush_tree; wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ -- cgit v1.2.3 From 8310b08913eca8aee98744c9aff1ec0d1f603b19 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 9 Jun 2015 17:20:12 +0800 Subject: ceph: track pending caps flushing globally So we know TID of the oldest pending caps flushing. Later patch will send this information to MDS, so that MDS can trim its completed caps flush list. Tracking pending caps flushing globally also simplifies syncfs code. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 50 +++++++++++++++++++++++----- fs/ceph/inode.c | 1 - fs/ceph/mds_client.c | 93 +++++++++++++++++++++++++++------------------------- fs/ceph/mds_client.h | 2 +- fs/ceph/super.h | 2 +- 5 files changed, 91 insertions(+), 57 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 9a25f8d66fbc..0295048724d2 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1415,6 +1415,29 @@ static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); } +static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &mdsc->cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, g_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->g_node, parent, p); + rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1449,17 +1472,16 @@ static int __mark_caps_flushing(struct inode *inode, list_del_init(&ci->i_dirty_item); cf->tid = ++mdsc->last_cap_flush_tid; + __add_cap_flushing_to_mdsc(mdsc, cf); if (list_empty(&ci->i_flushing_item)) { - ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); mdsc->num_cap_flushing++; - dout(" inode %p now flushing seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing tid %llu\n", inode, cf->tid); } else { list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing); - dout(" inode %p now flushing (more) seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing (more) tid %llu\n", + inode, cf->tid); } spin_unlock(&mdsc->cap_dirty_lock); @@ -2123,8 +2145,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; - dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode, - ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq); + dout("kick_flushing_inode_caps %p flushing %s\n", inode, + ceph_cap_string(ci->i_flushing_caps)); __ceph_flush_snaps(ci, &session, 1); @@ -2921,12 +2943,23 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps), ceph_cap_string(ci->i_flushing_caps & ~cleaned)); - if (ci->i_flushing_caps == (ci->i_flushing_caps & ~cleaned)) + if (list_empty(&to_remove) && !cleaned) goto out; ci->i_flushing_caps &= ~cleaned; spin_lock(&mdsc->cap_dirty_lock); + + if (!list_empty(&to_remove)) { + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (!cf || cf->tid > flush_tid) + wake_up_all(&mdsc->cap_flushing_wq); + } + if (ci->i_flushing_caps == 0) { list_del_init(&ci->i_flushing_item); if (!list_empty(&session->s_cap_flushing)) @@ -2936,7 +2969,6 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_inode_info, i_flushing_item)->vfs_inode); mdsc->num_cap_flushing--; - wake_up_all(&mdsc->cap_flushing_wq); dout(" inode %p now !flushing\n", inode); if (ci->i_dirty_caps == 0) { diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 6d3f19db8c8a..3326302f5884 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -416,7 +416,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_flushing_caps = 0; INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); - ci->i_cap_flush_seq = 0; ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 839901f51512..31f6a78caa0a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1164,6 +1164,10 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, } spin_lock(&mdsc->cap_dirty_lock); + + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + if (!list_empty(&ci->i_dirty_item)) { pr_warn_ratelimited( " dropping dirty %s state for %p %lld\n", @@ -1467,39 +1471,56 @@ static int trim_caps(struct ceph_mds_client *mdsc, return 0; } -static int check_cap_flush(struct ceph_inode_info *ci, - u64 want_flush_seq, u64 want_snap_seq) +static int check_capsnap_flush(struct ceph_inode_info *ci, + u64 want_snap_seq) { - int ret1 = 1, ret2 = 1; + int ret = 1; spin_lock(&ci->i_ceph_lock); - if (want_flush_seq > 0 && ci->i_flushing_caps) - ret1 = ci->i_cap_flush_seq >= want_flush_seq; - if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { struct ceph_cap_snap *capsnap = list_first_entry(&ci->i_cap_snaps, struct ceph_cap_snap, ci_item); - ret2 = capsnap->follows >= want_snap_seq; + ret = capsnap->follows >= want_snap_seq; } spin_unlock(&ci->i_ceph_lock); - return ret1 && ret2; + return ret; +} + +static int check_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid) +{ + struct rb_node *n; + struct ceph_cap_flush *cf; + int ret = 1; + + spin_lock(&mdsc->cap_dirty_lock); + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (cf && cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid %llu <= %llu\n", + cf->tid, want_flush_tid); + ret = 0; + } + spin_unlock(&mdsc->cap_dirty_lock); + return ret; } /* * flush all dirty inode data to disk. * - * returns true if we've flushed through want_flush_seq + * returns true if we've flushed through want_flush_tid */ static void wait_caps_flush(struct ceph_mds_client *mdsc, - u64 want_flush_seq, u64 want_snap_seq) + u64 want_flush_tid, u64 want_snap_seq) { int mds; - dout("check_cap_flush want %lld\n", want_flush_seq); + dout("check_caps_flush want %llu snap want %llu\n", + want_flush_tid, want_snap_seq); mutex_lock(&mdsc->mutex); for (mds = 0; mds < mdsc->max_sessions; ) { struct ceph_mds_session *session = mdsc->sessions[mds]; - struct inode *inode1 = NULL, *inode2 = NULL; + struct inode *inode = NULL; if (!session) { mds++; @@ -1509,58 +1530,40 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); - if (!list_empty(&session->s_cap_flushing)) { - struct ceph_inode_info *ci = - list_first_entry(&session->s_cap_flushing, - struct ceph_inode_info, - i_flushing_item); - - if (!check_cap_flush(ci, want_flush_seq, 0)) { - dout("check_cap_flush still flushing %p " - "seq %lld <= %lld to mds%d\n", - &ci->vfs_inode, ci->i_cap_flush_seq, - want_flush_seq, mds); - inode1 = igrab(&ci->vfs_inode); - } - } if (!list_empty(&session->s_cap_snaps_flushing)) { struct ceph_cap_snap *capsnap = list_first_entry(&session->s_cap_snaps_flushing, struct ceph_cap_snap, flushing_item); struct ceph_inode_info *ci = capsnap->ci; - if (!check_cap_flush(ci, 0, want_snap_seq)) { + if (!check_capsnap_flush(ci, want_snap_seq)) { dout("check_cap_flush still flushing snap %p " "follows %lld <= %lld to mds%d\n", &ci->vfs_inode, capsnap->follows, want_snap_seq, mds); - inode2 = igrab(&ci->vfs_inode); + inode = igrab(&ci->vfs_inode); } } mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); - if (inode1) { - wait_event(mdsc->cap_flushing_wq, - check_cap_flush(ceph_inode(inode1), - want_flush_seq, 0)); - iput(inode1); - } - if (inode2) { + if (inode) { wait_event(mdsc->cap_flushing_wq, - check_cap_flush(ceph_inode(inode2), - 0, want_snap_seq)); - iput(inode2); - } - - if (!inode1 && !inode2) + check_capsnap_flush(ceph_inode(inode), + want_snap_seq)); + iput(inode); + } else { mds++; + } mutex_lock(&mdsc->mutex); } - mutex_unlock(&mdsc->mutex); - dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); + + wait_event(mdsc->cap_flushing_wq, + check_caps_flush(mdsc, want_flush_tid)); + + dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); } /* @@ -3426,8 +3429,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->cap_delay_lock); INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); - mdsc->cap_flush_seq = 0; mdsc->last_cap_flush_tid = 1; + mdsc->cap_flush_tree = RB_ROOT; INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; @@ -3554,7 +3557,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ceph_flush_dirty_caps(mdsc); spin_lock(&mdsc->cap_dirty_lock); - want_flush = mdsc->cap_flush_seq; + want_flush = mdsc->last_cap_flush_tid; spin_unlock(&mdsc->cap_dirty_lock); down_read(&mdsc->snap_rwsem); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 19f6084203f0..470be4eb25f3 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -306,8 +306,8 @@ struct ceph_mds_client { struct list_head snap_flush_list; /* cap_snaps ready to flush */ spinlock_t snap_flush_lock; - u64 cap_flush_seq; u64 last_cap_flush_tid; + struct rb_root cap_flush_tree; struct list_head cap_dirty; /* inodes with dirty caps */ struct list_head cap_dirty_migrating; /* ...that are migration... */ int num_cap_flushing; /* # caps we are flushing */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index cc597f52e046..94d91471165f 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -189,6 +189,7 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) struct ceph_cap_flush { u64 tid; int caps; + struct rb_node g_node; union { struct rb_node i_node; struct list_head list; @@ -304,7 +305,6 @@ struct ceph_inode_info { struct ceph_cap *i_auth_cap; /* authoritative cap, if any */ unsigned i_dirty_caps, i_flushing_caps; /* mask of dirtied fields */ struct list_head i_dirty_item, i_flushing_item; - u64 i_cap_flush_seq; /* we need to track cap writeback on a per-cap-bit basis, to allow * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ -- cgit v1.2.3 From f66fd9f0952187d274c13c136b74548f792c1925 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 10 Jun 2015 17:26:13 +0800 Subject: ceph: pre-allocate data structure that tracks caps flushing Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 19 +++++++++++++++---- fs/ceph/caps.c | 26 ++++++++++++++++++++++---- fs/ceph/file.c | 18 ++++++++++++++++-- fs/ceph/inode.c | 15 +++++++++++++-- fs/ceph/mds_client.c | 6 +++++- fs/ceph/super.c | 8 ++++++++ fs/ceph/super.h | 6 +++++- fs/ceph/xattr.c | 20 ++++++++++++++++++-- include/linux/ceph/libceph.h | 1 + 9 files changed, 103 insertions(+), 16 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5f53ac0d9d7c..7edf3c49e661 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1308,12 +1308,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) struct inode *inode = file_inode(vma->vm_file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; + struct ceph_cap_flush *prealloc_cf; struct page *page = vmf->page; loff_t off = page_offset(page); loff_t size = i_size_read(inode); size_t len; int want, got, ret; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return VM_FAULT_SIGBUS; + if (ci->i_inline_version != CEPH_INLINE_NONE) { struct page *locked_page = NULL; if (off == 0) { @@ -1323,8 +1328,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = ceph_uninline_data(vma->vm_file, locked_page); if (locked_page) unlock_page(locked_page); - if (ret < 0) - return VM_FAULT_SIGBUS; + if (ret < 0) { + ret = VM_FAULT_SIGBUS; + goto out_free; + } } if (off + PAGE_CACHE_SIZE <= size) @@ -1346,7 +1353,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) break; if (ret != -ERESTARTSYS) { WARN_ON(1); - return VM_FAULT_SIGBUS; + ret = VM_FAULT_SIGBUS; + goto out_free; } } dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", @@ -1381,7 +1389,8 @@ out: int dirty; spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1390,6 +1399,8 @@ out: dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, len, ceph_cap_string(got), ret); ceph_put_cap_refs(ci, got); +out_free: + ceph_free_cap_flush(prealloc_cf); return ret; } diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 69a16044ec41..dd7b20adf1d4 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1356,7 +1356,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) * Caller is then responsible for calling __mark_inode_dirty with the * returned flags value. */ -int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) +int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, + struct ceph_cap_flush **pcf) { struct ceph_mds_client *mdsc = ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; @@ -1376,6 +1377,9 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { + WARN_ON_ONCE(ci->i_prealloc_cap_flush); + swap(ci->i_prealloc_cap_flush, *pcf); + if (!ci->i_head_snapc) { WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); ci->i_head_snapc = ceph_get_snap_context( @@ -1391,6 +1395,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ihold(inode); dirty |= I_DIRTY_SYNC; } + } else { + WARN_ON_ONCE(!ci->i_prealloc_cap_flush); } BUG_ON(list_empty(&ci->i_dirty_item)); if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && @@ -1446,6 +1452,17 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); } +struct ceph_cap_flush *ceph_alloc_cap_flush(void) +{ + return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); +} + +void ceph_free_cap_flush(struct ceph_cap_flush *cf) +{ + if (cf) + kmem_cache_free(ceph_cap_flush_cachep, cf); +} + static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) { struct rb_node *n = rb_first(&mdsc->cap_flush_tree); @@ -1469,11 +1486,12 @@ static int __mark_caps_flushing(struct inode *inode, { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_cap_flush *cf; + struct ceph_cap_flush *cf = NULL; int flushing; BUG_ON(ci->i_dirty_caps == 0); BUG_ON(list_empty(&ci->i_dirty_item)); + BUG_ON(!ci->i_prealloc_cap_flush); flushing = ci->i_dirty_caps; dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n", @@ -1484,7 +1502,7 @@ static int __mark_caps_flushing(struct inode *inode, ci->i_dirty_caps = 0; dout(" inode %p now !dirty\n", inode); - cf = kmalloc(sizeof(*cf), GFP_ATOMIC); + swap(cf, ci->i_prealloc_cap_flush); cf->caps = flushing; cf->kick = false; @@ -3075,7 +3093,7 @@ out: cf = list_first_entry(&to_remove, struct ceph_cap_flush, list); list_del(&cf->list); - kfree(cf); + ceph_free_cap_flush(cf); } if (drop) iput(inode); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0a76a370d798..8a4eb4d21d3c 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -939,6 +939,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; + struct ceph_cap_flush *prealloc_cf; ssize_t count, written = 0; int err, want, got; loff_t pos; @@ -946,6 +947,10 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + mutex_lock(&inode->i_mutex); /* We can write back this queue in page reclaim */ @@ -1050,7 +1055,8 @@ retry_snap: int dirty; spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1074,6 +1080,7 @@ retry_snap: out: mutex_unlock(&inode->i_mutex); out_unlocked: + ceph_free_cap_flush(prealloc_cf); current->backing_dev_info = NULL; return written ? written : err; } @@ -1270,6 +1277,7 @@ static long ceph_fallocate(struct file *file, int mode, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; + struct ceph_cap_flush *prealloc_cf; int want, got = 0; int dirty; int ret = 0; @@ -1282,6 +1290,10 @@ static long ceph_fallocate(struct file *file, int mode, if (!S_ISREG(inode->i_mode)) return -EOPNOTSUPP; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + mutex_lock(&inode->i_mutex); if (ceph_snap(inode) != CEPH_NOSNAP) { @@ -1328,7 +1340,8 @@ static long ceph_fallocate(struct file *file, int mode, if (!ret) { spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1337,6 +1350,7 @@ static long ceph_fallocate(struct file *file, int mode, ceph_put_cap_refs(ci, got); unlock: mutex_unlock(&inode->i_mutex); + ceph_free_cap_flush(prealloc_cf); return ret; } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 3326302f5884..e86d1a4efc46 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -416,6 +416,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_flushing_caps = 0; INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); + ci->i_prealloc_cap_flush = NULL; ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; @@ -1720,6 +1721,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) const unsigned int ia_valid = attr->ia_valid; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf; int issued; int release = 0, dirtied = 0; int mask = 0; @@ -1734,10 +1736,16 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (err != 0) return err; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR, USE_AUTH_MDS); - if (IS_ERR(req)) + if (IS_ERR(req)) { + ceph_free_cap_flush(prealloc_cf); return PTR_ERR(req); + } spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); @@ -1895,7 +1903,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) dout("setattr %p ATTR_FILE ... hrm!\n", inode); if (dirtied) { - inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied); + inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied, + &prealloc_cf); inode->i_ctime = CURRENT_TIME; } @@ -1927,9 +1936,11 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ceph_mdsc_put_request(req); if (mask & CEPH_SETATTR_SIZE) __ceph_do_pending_vmtruncate(inode); + ceph_free_cap_flush(prealloc_cf); return err; out_put: ceph_mdsc_put_request(req); + ceph_free_cap_flush(prealloc_cf); return err; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 89e4305a94d4..8d73fe9d488b 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1189,6 +1189,10 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, } spin_unlock(&mdsc->cap_dirty_lock); + if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { + list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + ci->i_prealloc_cap_flush = NULL; + } } spin_unlock(&ci->i_ceph_lock); while (!list_empty(&to_remove)) { @@ -1196,7 +1200,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, cf = list_first_entry(&to_remove, struct ceph_cap_flush, list); list_del(&cf->list); - kfree(cf); + ceph_free_cap_flush(cf); } while (drop--) iput(inode); diff --git a/fs/ceph/super.c b/fs/ceph/super.c index edeb83c43112..d1c833c321b9 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -622,6 +622,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) */ struct kmem_cache *ceph_inode_cachep; struct kmem_cache *ceph_cap_cachep; +struct kmem_cache *ceph_cap_flush_cachep; struct kmem_cache *ceph_dentry_cachep; struct kmem_cache *ceph_file_cachep; @@ -647,6 +648,10 @@ static int __init init_caches(void) SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); if (ceph_cap_cachep == NULL) goto bad_cap; + ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush, + SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); + if (ceph_cap_flush_cachep == NULL) + goto bad_cap_flush; ceph_dentry_cachep = KMEM_CACHE(ceph_dentry_info, SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); @@ -665,6 +670,8 @@ static int __init init_caches(void) bad_file: kmem_cache_destroy(ceph_dentry_cachep); bad_dentry: + kmem_cache_destroy(ceph_cap_flush_cachep); +bad_cap_flush: kmem_cache_destroy(ceph_cap_cachep); bad_cap: kmem_cache_destroy(ceph_inode_cachep); @@ -681,6 +688,7 @@ static void destroy_caches(void) kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_cap_cachep); + kmem_cache_destroy(ceph_cap_flush_cachep); kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_file_cachep); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index e7f13f742357..4415e977d72b 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -309,6 +309,7 @@ struct ceph_inode_info { /* we need to track cap writeback on a per-cap-bit basis, to allow * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ + struct ceph_cap_flush *i_prealloc_cap_flush; struct rb_root i_cap_flush_tree; wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ unsigned long i_hold_caps_min; /* jiffies */ @@ -578,7 +579,10 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci) { return ci->i_dirty_caps | ci->i_flushing_caps; } -extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); +extern struct ceph_cap_flush *ceph_alloc_cap_flush(void); +extern void ceph_free_cap_flush(struct ceph_cap_flush *cf); +extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, + struct ceph_cap_flush **pcf); extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci, struct ceph_cap *ocap, int mask); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index c6f7d9b82085..819163d8313b 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -912,6 +912,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf = NULL; int issued; int err; int dirty = 0; @@ -950,6 +951,10 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, if (!xattr) goto out; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + goto out; + spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); @@ -991,7 +996,8 @@ retry: flags, value ? 1 : -1, &xattr); if (!err) { - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, + &prealloc_cf); ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; } @@ -1001,6 +1007,7 @@ retry: up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); + ceph_free_cap_flush(prealloc_cf); return err; do_sync: @@ -1010,6 +1017,7 @@ do_sync_unlocked: up_read(&mdsc->snap_rwsem); err = ceph_sync_setxattr(dentry, name, value, size, flags); out: + ceph_free_cap_flush(prealloc_cf); kfree(newname); kfree(newval); kfree(xattr); @@ -1062,6 +1070,7 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf = NULL; int issued; int err; int required_blob_size; @@ -1079,6 +1088,10 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN)) goto do_sync_unlocked; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + err = -ENOMEM; spin_lock(&ci->i_ceph_lock); retry: @@ -1120,7 +1133,8 @@ retry: err = __remove_xattr_by_name(ceph_inode(inode), name); - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, + &prealloc_cf); ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; spin_unlock(&ci->i_ceph_lock); @@ -1128,12 +1142,14 @@ retry: up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); + ceph_free_cap_flush(prealloc_cf); return err; do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: if (lock_snap_rwsem) up_read(&mdsc->snap_rwsem); + ceph_free_cap_flush(prealloc_cf); err = ceph_send_removexattr(dentry, name); return err; } diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index d73a569f9bf5..9ebee53d3bf5 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -174,6 +174,7 @@ static inline int calc_pages_for(u64 off, u64 len) extern struct kmem_cache *ceph_inode_cachep; extern struct kmem_cache *ceph_cap_cachep; +extern struct kmem_cache *ceph_cap_flush_cachep; extern struct kmem_cache *ceph_dentry_cachep; extern struct kmem_cache *ceph_file_cachep; -- cgit v1.2.3 From fdd4e15838e59c394a1ec4963b57c22c12608685 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 16 Jun 2015 20:48:56 +0800 Subject: ceph: rework dcache readdir Previously our dcache readdir code relies on that child dentries in directory dentry's d_subdir list are sorted by dentry's offset in descending order. When adding dentries to the dcache, if a dentry already exists, our readdir code moves it to head of directory dentry's d_subdir list. This design relies on dcache internals. Al Viro suggests using ncpfs's approach: keeping array of pointers to dentries in page cache of directory inode. the validity of those pointers are presented by directory inode's complete and ordered flags. When a dentry gets pruned, we clear directory inode's complete flag in the d_prune() callback. Before moving a dentry to other directory, we clear the ordered flag for both old and new directory. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 14 ++- fs/ceph/dir.c | 313 ++++++++++++++++++++++++++------------------------- fs/ceph/file.c | 2 +- fs/ceph/inode.c | 118 ++++++++++++++----- fs/ceph/mds_client.h | 3 + fs/ceph/super.h | 60 +++++----- 6 files changed, 295 insertions(+), 215 deletions(-) (limited to 'fs/ceph/inode.c') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index dd7b20adf1d4..dc10c9dd36c1 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci) used |= CEPH_CAP_PIN; if (ci->i_rd_ref) used |= CEPH_CAP_FILE_RD; - if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages) + if (ci->i_rdcache_ref || + (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ + ci->vfs_inode.i_data.nrpages)) used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref) used |= CEPH_CAP_FILE_WR; @@ -1651,9 +1653,10 @@ retry_locked: * If we fail, it's because pages are locked.... try again later. */ if ((!is_delayed || mdsc->stopping) && - ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ - inode->i_data.nrpages && /* have cached pages */ - (file_wanted == 0 || /* no open files */ + !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ + ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ + inode->i_data.nrpages && /* have cached pages */ + (file_wanted == 0 || /* no open files */ (revoking & (CEPH_CAP_FILE_CACHE| CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ !tried_invalidate) { @@ -2805,7 +2808,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, * try to invalidate (once). (If there are dirty buffers, we * will invalidate _after_ writeback.) */ - if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && + if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ + ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { if (try_nonblocking_invalidate(inode)) { diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index b99f2ff8189d..9314b4ea2375 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -106,6 +106,27 @@ static int fpos_cmp(loff_t l, loff_t r) return (int)(fpos_off(l) - fpos_off(r)); } +/* + * make note of the last dentry we read, so we can + * continue at the same lexicographical point, + * regardless of what dir changes take place on the + * server. + */ +static int note_last_dentry(struct ceph_file_info *fi, const char *name, + int len, unsigned next_offset) +{ + char *buf = kmalloc(len+1, GFP_KERNEL); + if (!buf) + return -ENOMEM; + kfree(fi->last_name); + fi->last_name = buf; + memcpy(fi->last_name, name, len); + fi->last_name[len] = 0; + fi->next_offset = next_offset; + dout("note_last_dentry '%s'\n", fi->last_name); + return 0; +} + /* * When possible, we try to satisfy a readdir by peeking at the * dcache. We make this work by carefully ordering dentries on @@ -123,123 +144,113 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, struct ceph_file_info *fi = file->private_data; struct dentry *parent = file->f_path.dentry; struct inode *dir = d_inode(parent); - struct list_head *p; - struct dentry *dentry, *last; + struct dentry *dentry, *last = NULL; struct ceph_dentry_info *di; + unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry *); int err = 0; + loff_t ptr_pos = 0; + struct ceph_readdir_cache_control cache_ctl = {}; - /* claim ref on last dentry we returned */ - last = fi->dentry; - fi->dentry = NULL; - - dout("__dcache_readdir %p v%u at %llu (last %p)\n", - dir, shared_gen, ctx->pos, last); - - spin_lock(&parent->d_lock); + dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos); - /* start at beginning? */ - if (ctx->pos == 2 || last == NULL || - fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) { - if (list_empty(&parent->d_subdirs)) - goto out_unlock; - p = parent->d_subdirs.prev; - dout(" initial p %p/%p\n", p->prev, p->next); - } else { - p = last->d_child.prev; + /* we can calculate cache index for the first dirfrag */ + if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) { + cache_ctl.index = fpos_off(ctx->pos) - 2; + BUG_ON(cache_ctl.index < 0); + ptr_pos = cache_ctl.index * sizeof(struct dentry *); } -more: - dentry = list_entry(p, struct dentry, d_child); - di = ceph_dentry(dentry); - while (1) { - dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next, - d_unhashed(dentry) ? "!hashed" : "hashed", - parent->d_subdirs.prev, parent->d_subdirs.next); - if (p == &parent->d_subdirs) { + while (true) { + pgoff_t pgoff; + bool emit_dentry; + + if (ptr_pos >= i_size_read(dir)) { fi->flags |= CEPH_F_ATEND; - goto out_unlock; + err = 0; + break; } - spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); + + err = -EAGAIN; + pgoff = ptr_pos >> PAGE_CACHE_SHIFT; + if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) { + ceph_readdir_cache_release(&cache_ctl); + cache_ctl.page = find_lock_page(&dir->i_data, pgoff); + if (!cache_ctl.page) { + dout(" page %lu not found\n", pgoff); + break; + } + /* reading/filling the cache are serialized by + * i_mutex, no need to use page lock */ + unlock_page(cache_ctl.page); + cache_ctl.dentries = kmap(cache_ctl.page); + } + + rcu_read_lock(); + spin_lock(&parent->d_lock); + /* check i_size again here, because empty directory can be + * marked as complete while not holding the i_mutex. */ + if (ceph_dir_is_complete_ordered(dir) && + ptr_pos < i_size_read(dir)) + dentry = cache_ctl.dentries[cache_ctl.index % nsize]; + else + dentry = NULL; + spin_unlock(&parent->d_lock); + if (dentry && !lockref_get_not_dead(&dentry->d_lockref)) + dentry = NULL; + rcu_read_unlock(); + if (!dentry) + break; + + emit_dentry = false; + di = ceph_dentry(dentry); + spin_lock(&dentry->d_lock); if (di->lease_shared_gen == shared_gen && - !d_unhashed(dentry) && d_really_is_positive(dentry) && + d_really_is_positive(dentry) && ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR && ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH && - fpos_cmp(ctx->pos, di->offset) <= 0) - break; - dout(" skipping %p %pd at %llu (%llu)%s%s\n", dentry, - dentry, di->offset, - ctx->pos, d_unhashed(dentry) ? " unhashed" : "", - !d_inode(dentry) ? " null" : ""); + fpos_cmp(ctx->pos, di->offset) <= 0) { + emit_dentry = true; + } spin_unlock(&dentry->d_lock); - p = p->prev; - dentry = list_entry(p, struct dentry, d_child); - di = ceph_dentry(dentry); - } - - dget_dlock(dentry); - spin_unlock(&dentry->d_lock); - spin_unlock(&parent->d_lock); - /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_dir_is_complete_ordered(dir)) { - dout(" lost dir complete on %p; falling back to mds\n", dir); - dput(dentry); - err = -EAGAIN; - goto out; - } + if (emit_dentry) { + dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, + dentry, dentry, d_inode(dentry)); + ctx->pos = di->offset; + if (!dir_emit(ctx, dentry->d_name.name, + dentry->d_name.len, + ceph_translate_ino(dentry->d_sb, + d_inode(dentry)->i_ino), + d_inode(dentry)->i_mode >> 12)) { + dput(dentry); + err = 0; + break; + } + ctx->pos++; - dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, - dentry, dentry, d_inode(dentry)); - if (!dir_emit(ctx, dentry->d_name.name, - dentry->d_name.len, - ceph_translate_ino(dentry->d_sb, d_inode(dentry)->i_ino), - d_inode(dentry)->i_mode >> 12)) { - if (last) { - /* remember our position */ - fi->dentry = last; - fi->next_offset = fpos_off(di->offset); + if (last) + dput(last); + last = dentry; + } else { + dput(dentry); } - dput(dentry); - return 0; - } - - ctx->pos = di->offset + 1; - - if (last) - dput(last); - last = dentry; - - spin_lock(&parent->d_lock); - p = p->prev; /* advance to next dentry */ - goto more; -out_unlock: - spin_unlock(&parent->d_lock); -out: - if (last) + cache_ctl.index++; + ptr_pos += sizeof(struct dentry *); + } + ceph_readdir_cache_release(&cache_ctl); + if (last) { + int ret; + di = ceph_dentry(last); + ret = note_last_dentry(fi, last->d_name.name, last->d_name.len, + fpos_off(di->offset) + 1); + if (ret < 0) + err = ret; dput(last); + } return err; } -/* - * make note of the last dentry we read, so we can - * continue at the same lexicographical point, - * regardless of what dir changes take place on the - * server. - */ -static int note_last_dentry(struct ceph_file_info *fi, const char *name, - int len) -{ - kfree(fi->last_name); - fi->last_name = kmalloc(len+1, GFP_KERNEL); - if (!fi->last_name) - return -ENOMEM; - memcpy(fi->last_name, name, len); - fi->last_name[len] = 0; - dout("note_last_dentry '%s'\n", fi->last_name); - return 0; -} - static int ceph_readdir(struct file *file, struct dir_context *ctx) { struct ceph_file_info *fi = file->private_data; @@ -280,8 +291,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* can we use the dcache? */ spin_lock(&ci->i_ceph_lock); - if ((ctx->pos == 2 || fi->dentry) && - ceph_test_mount_opt(fsc, DCACHE) && + if (ceph_test_mount_opt(fsc, DCACHE) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && __ceph_dir_is_complete_ordered(ci) && @@ -296,24 +306,8 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) } else { spin_unlock(&ci->i_ceph_lock); } - if (fi->dentry) { - err = note_last_dentry(fi, fi->dentry->d_name.name, - fi->dentry->d_name.len); - if (err) - return err; - dput(fi->dentry); - fi->dentry = NULL; - } /* proceed with a normal readdir */ - - if (ctx->pos == 2) { - /* note dir version at start of readdir so we can tell - * if any dentries get dropped */ - fi->dir_release_count = atomic_read(&ci->i_release_count); - fi->dir_ordered_count = ci->i_ordered_count; - } - more: /* do we have the correct frag content buffered? */ if (fi->frag != frag || fi->last_readdir == NULL) { @@ -348,6 +342,9 @@ more: return -ENOMEM; } } + req->r_dir_release_cnt = fi->dir_release_count; + req->r_dir_ordered_cnt = fi->dir_ordered_count; + req->r_readdir_cache_idx = fi->readdir_cache_idx; req->r_readdir_offset = fi->next_offset; req->r_args.readdir.frag = cpu_to_le32(frag); @@ -364,26 +361,38 @@ more: (int)req->r_reply_info.dir_end, (int)req->r_reply_info.dir_complete); - if (!req->r_did_prepopulate) { - dout("readdir !did_prepopulate"); - /* preclude from marking dir complete */ - fi->dir_release_count--; - } /* note next offset and last dentry name */ rinfo = &req->r_reply_info; if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { frag = le32_to_cpu(rinfo->dir_dir->frag); - if (ceph_frag_is_leftmost(frag)) - fi->next_offset = 2; - else - fi->next_offset = 0; - off = fi->next_offset; + off = req->r_readdir_offset; + fi->next_offset = off; } + fi->frag = frag; fi->offset = fi->next_offset; fi->last_readdir = req; + if (req->r_did_prepopulate) { + fi->readdir_cache_idx = req->r_readdir_cache_idx; + if (fi->readdir_cache_idx < 0) { + /* preclude from marking dir ordered */ + fi->dir_ordered_count = 0; + } else if (ceph_frag_is_leftmost(frag) && off == 2) { + /* note dir version at start of readdir so + * we can tell if any dentries get dropped */ + fi->dir_release_count = req->r_dir_release_cnt; + fi->dir_ordered_count = req->r_dir_ordered_cnt; + } + } else { + dout("readdir !did_prepopulate"); + /* disable readdir cache */ + fi->readdir_cache_idx = -1; + /* preclude from marking dir complete */ + fi->dir_release_count = 0; + } + if (req->r_reply_info.dir_end) { kfree(fi->last_name); fi->last_name = NULL; @@ -394,10 +403,10 @@ more: } else { err = note_last_dentry(fi, rinfo->dir_dname[rinfo->dir_nr-1], - rinfo->dir_dname_len[rinfo->dir_nr-1]); + rinfo->dir_dname_len[rinfo->dir_nr-1], + fi->next_offset + rinfo->dir_nr); if (err) return err; - fi->next_offset += rinfo->dir_nr; } } @@ -453,16 +462,22 @@ more: * were released during the whole readdir, and we should have * the complete dir contents in our cache. */ - spin_lock(&ci->i_ceph_lock); - if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { - if (ci->i_ordered_count == fi->dir_ordered_count) + if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) { + spin_lock(&ci->i_ceph_lock); + if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) { dout(" marking %p complete and ordered\n", inode); - else + /* use i_size to track number of entries in + * readdir cache */ + BUG_ON(fi->readdir_cache_idx < 0); + i_size_write(inode, fi->readdir_cache_idx * + sizeof(struct dentry*)); + } else { dout(" marking %p complete\n", inode); + } __ceph_dir_set_complete(ci, fi->dir_release_count, fi->dir_ordered_count); + spin_unlock(&ci->i_ceph_lock); } - spin_unlock(&ci->i_ceph_lock); dout("readdir %p file %p done.\n", inode, file); return 0; @@ -476,14 +491,12 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag) } kfree(fi->last_name); fi->last_name = NULL; + fi->dir_release_count = 0; + fi->readdir_cache_idx = -1; if (ceph_frag_is_leftmost(frag)) fi->next_offset = 2; /* compensate for . and .. */ else fi->next_offset = 0; - if (fi->dentry) { - dput(fi->dentry); - fi->dentry = NULL; - } fi->flags &= ~CEPH_F_ATEND; } @@ -497,13 +510,12 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) mutex_lock(&inode->i_mutex); retval = -EINVAL; switch (whence) { - case SEEK_END: - offset += inode->i_size + 2; /* FIXME */ - break; case SEEK_CUR: offset += file->f_pos; case SEEK_SET: break; + case SEEK_END: + retval = -EOPNOTSUPP; default: goto out; } @@ -516,20 +528,18 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) } retval = offset; - /* - * discard buffered readdir content on seekdir(0), or - * seek to new frag, or seek prior to current chunk. - */ if (offset == 0 || fpos_frag(offset) != fi->frag || fpos_off(offset) < fi->offset) { + /* discard buffered readdir content on seekdir(0), or + * seek to new frag, or seek prior to current chunk */ dout("dir_llseek dropping %p content\n", file); reset_readdir(fi, fpos_frag(offset)); + } else if (fpos_cmp(offset, old_offset) > 0) { + /* reset dir_release_count if we did a forward seek */ + fi->dir_release_count = 0; + fi->readdir_cache_idx = -1; } - - /* bump dir_release_count if we did a forward seek */ - if (fpos_cmp(offset, old_offset) > 0) - fi->dir_release_count--; } out: mutex_unlock(&inode->i_mutex); @@ -985,16 +995,15 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, * to do it here. */ + /* d_move screws up sibling dentries' offsets */ + ceph_dir_clear_complete(old_dir); + ceph_dir_clear_complete(new_dir); + d_move(old_dentry, new_dentry); /* ensure target dentry is invalidated, despite rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(new_dentry); - - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_complete(old_dir); - ceph_dir_clear_complete(new_dir); - } ceph_mdsc_put_request(req); return err; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 424b5b540207..faf92095e105 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -96,6 +96,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) } cf->fmode = fmode; cf->next_offset = 2; + cf->readdir_cache_idx = -1; file->private_data = cf; BUG_ON(inode->i_fop->release != ceph_release); break; @@ -324,7 +325,6 @@ int ceph_release(struct inode *inode, struct file *file) ceph_mdsc_put_request(cf->last_readdir); kfree(cf->last_name); kfree(cf->dir_info); - dput(cf->dentry); kmem_cache_free(ceph_file_cachep, cf); /* wake up anyone waiting for caps on this inode */ diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index e86d1a4efc46..2a6d93befbae 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -390,9 +390,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_inline_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; - ci->i_ordered_count = 0; - atomic_set(&ci->i_release_count, 1); - atomic_set(&ci->i_complete_count, 0); + atomic64_set(&ci->i_ordered_count, 1); + atomic64_set(&ci->i_release_count, 1); + atomic64_set(&ci->i_complete_seq[0], 0); + atomic64_set(&ci->i_complete_seq[1], 0); ci->i_symlink = NULL; memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); @@ -860,9 +861,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); + i_size_write(inode, 0); __ceph_dir_set_complete(ci, - atomic_read(&ci->i_release_count), - ci->i_ordered_count); + atomic64_read(&ci->i_release_count), + atomic64_read(&ci->i_ordered_count)); } wake = true; @@ -1214,6 +1216,10 @@ retry_lookup: dout("fill_trace doing d_move %p -> %p\n", req->r_old_dentry, dn); + /* d_move screws up sibling dentries' offsets */ + ceph_dir_clear_ordered(dir); + ceph_dir_clear_ordered(olddir); + d_move(req->r_old_dentry, dn); dout(" src %p '%pd' dst %p '%pd'\n", req->r_old_dentry, @@ -1224,10 +1230,6 @@ retry_lookup: rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(dn); - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_ordered(dir); - ceph_dir_clear_ordered(olddir); - dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); @@ -1335,6 +1337,49 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, return err; } +void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl) +{ + if (ctl->page) { + kunmap(ctl->page); + page_cache_release(ctl->page); + ctl->page = NULL; + } +} + +static int fill_readdir_cache(struct inode *dir, struct dentry *dn, + struct ceph_readdir_cache_control *ctl, + struct ceph_mds_request *req) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*); + unsigned idx = ctl->index % nsize; + pgoff_t pgoff = ctl->index / nsize; + + if (!ctl->page || pgoff != page_index(ctl->page)) { + ceph_readdir_cache_release(ctl); + ctl->page = grab_cache_page(&dir->i_data, pgoff); + if (!ctl->page) { + ctl->index = -1; + return -ENOMEM; + } + /* reading/filling the cache are serialized by + * i_mutex, no need to use page lock */ + unlock_page(ctl->page); + ctl->dentries = kmap(ctl->page); + } + + if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) && + req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) { + dout("readdir cache dn %p idx %d\n", dn, ctl->index); + ctl->dentries[idx] = dn; + ctl->index++; + } else { + dout("disable readdir cache\n"); + ctl->index = -1; + } + return 0; +} + int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct ceph_mds_session *session) { @@ -1347,8 +1392,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct inode *snapdir = NULL; struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; struct ceph_dentry_info *di; - u64 r_readdir_offset = req->r_readdir_offset; u32 frag = le32_to_cpu(rhead->args.readdir.frag); + struct ceph_readdir_cache_control cache_ctl = {}; + + if (req->r_aborted) + return readdir_prepopulate_inodes_only(req, session); if (rinfo->dir_dir && le32_to_cpu(rinfo->dir_dir->frag) != frag) { @@ -1356,14 +1404,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, frag, le32_to_cpu(rinfo->dir_dir->frag)); frag = le32_to_cpu(rinfo->dir_dir->frag); if (ceph_frag_is_leftmost(frag)) - r_readdir_offset = 2; + req->r_readdir_offset = 2; else - r_readdir_offset = 0; + req->r_readdir_offset = 0; } - if (req->r_aborted) - return readdir_prepopulate_inodes_only(req, session); - if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) { snapdir = ceph_get_snapdir(d_inode(parent)); parent = d_find_alias(snapdir); @@ -1376,6 +1421,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir); } + if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) { + /* note dir version at start of readdir so we can tell + * if any dentries get dropped */ + struct ceph_inode_info *ci = ceph_inode(d_inode(parent)); + req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); + req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count); + req->r_readdir_cache_idx = 0; + } + + cache_ctl.index = req->r_readdir_cache_idx; + /* FIXME: release caps/leases if error occurs */ for (i = 0; i < rinfo->dir_nr; i++) { struct ceph_vino vino; @@ -1415,13 +1471,6 @@ retry_lookup: d_delete(dn); dput(dn); goto retry_lookup; - } else { - /* reorder parent's d_subdirs */ - spin_lock(&parent->d_lock); - spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED); - list_move(&dn->d_child, &parent->d_subdirs); - spin_unlock(&dn->d_lock); - spin_unlock(&parent->d_lock); } /* inode */ @@ -1438,13 +1487,15 @@ retry_lookup: } } - if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, - req->r_request_started, -1, - &req->r_caps_reservation) < 0) { + ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, + req->r_request_started, -1, + &req->r_caps_reservation); + if (ret < 0) { pr_err("fill_inode badness on %p\n", in); if (d_really_is_negative(dn)) iput(in); d_drop(dn); + err = ret; goto next_item; } @@ -1460,19 +1511,28 @@ retry_lookup: } di = dn->d_fsdata; - di->offset = ceph_make_fpos(frag, i + r_readdir_offset); + di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); update_dentry_lease(dn, rinfo->dir_dlease[i], req->r_session, req->r_request_started); + + if (err == 0 && cache_ctl.index >= 0) { + ret = fill_readdir_cache(d_inode(parent), dn, + &cache_ctl, req); + if (ret < 0) + err = ret; + } next_item: if (dn) dput(dn); } - if (err == 0) - req->r_did_prepopulate = true; - out: + if (err == 0) { + req->r_did_prepopulate = true; + req->r_readdir_cache_idx = cache_ctl.index; + } + ceph_readdir_cache_release(&cache_ctl); if (snapdir) { iput(snapdir); dput(parent); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 470be4eb25f3..762757e6cebf 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -253,6 +253,9 @@ struct ceph_mds_request { bool r_got_unsafe, r_got_safe, r_got_result; bool r_did_prepopulate; + long long r_dir_release_cnt; + long long r_dir_ordered_cnt; + int r_readdir_cache_idx; u32 r_readdir_offset; struct ceph_cap_reservation r_caps_reservation; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 4415e977d72b..860cc016e70d 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -282,9 +282,9 @@ struct ceph_inode_info { u32 i_time_warp_seq; unsigned i_ceph_flags; - int i_ordered_count; - atomic_t i_release_count; - atomic_t i_complete_count; + atomic64_t i_release_count; + atomic64_t i_ordered_count; + atomic64_t i_complete_seq[2]; struct ceph_dir_layout i_dir_layout; struct ceph_file_layout i_layout; @@ -471,30 +471,36 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, - int release_count, int ordered_count) + long long release_count, + long long ordered_count) { - atomic_set(&ci->i_complete_count, release_count); - if (ci->i_ordered_count == ordered_count) - ci->i_ceph_flags |= CEPH_I_DIR_ORDERED; - else - ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; + smp_mb__before_atomic(); + atomic64_set(&ci->i_complete_seq[0], release_count); + atomic64_set(&ci->i_complete_seq[1], ordered_count); } static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) { - atomic_inc(&ci->i_release_count); + atomic64_inc(&ci->i_release_count); +} + +static inline void __ceph_dir_clear_ordered(struct ceph_inode_info *ci) +{ + atomic64_inc(&ci->i_ordered_count); } static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci) { - return atomic_read(&ci->i_complete_count) == - atomic_read(&ci->i_release_count); + return atomic64_read(&ci->i_complete_seq[0]) == + atomic64_read(&ci->i_release_count); } static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci) { - return __ceph_dir_is_complete(ci) && - (ci->i_ceph_flags & CEPH_I_DIR_ORDERED); + return atomic64_read(&ci->i_complete_seq[0]) == + atomic64_read(&ci->i_release_count) && + atomic64_read(&ci->i_complete_seq[1]) == + atomic64_read(&ci->i_ordered_count); } static inline void ceph_dir_clear_complete(struct inode *inode) @@ -504,20 +510,13 @@ static inline void ceph_dir_clear_complete(struct inode *inode) static inline void ceph_dir_clear_ordered(struct inode *inode) { - struct ceph_inode_info *ci = ceph_inode(inode); - spin_lock(&ci->i_ceph_lock); - ci->i_ordered_count++; - ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; - spin_unlock(&ci->i_ceph_lock); + __ceph_dir_clear_ordered(ceph_inode(inode)); } static inline bool ceph_dir_is_complete_ordered(struct inode *inode) { - struct ceph_inode_info *ci = ceph_inode(inode); - bool ret; - spin_lock(&ci->i_ceph_lock); - ret = __ceph_dir_is_complete_ordered(ci); - spin_unlock(&ci->i_ceph_lock); + bool ret = __ceph_dir_is_complete_ordered(ceph_inode(inode)); + smp_rmb(); return ret; } @@ -636,16 +635,20 @@ struct ceph_file_info { unsigned offset; /* offset of last chunk, adjusted for . and .. */ unsigned next_offset; /* offset of next chunk (last_name's + 1) */ char *last_name; /* last entry in previous chunk */ - struct dentry *dentry; /* next dentry (for dcache readdir) */ - int dir_release_count; - int dir_ordered_count; + long long dir_release_count; + long long dir_ordered_count; + int readdir_cache_idx; /* used for -o dirstat read() on directory thing */ char *dir_info; int dir_info_len; }; - +struct ceph_readdir_cache_control { + struct page *page; + struct dentry **dentries; + int index; +}; /* * A "snap realm" describes a subset of the file hierarchy sharing @@ -944,6 +947,7 @@ extern void ceph_dentry_lru_del(struct dentry *dn); extern void ceph_invalidate_dentry_lease(struct dentry *dentry); extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry); +extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl); /* * our d_ops vary depending on whether the inode is live, -- cgit v1.2.3