diff options
Diffstat (limited to 'fs/ceph/mds_client.c')
-rw-r--r-- | fs/ceph/mds_client.c | 472 |
1 files changed, 386 insertions, 86 deletions
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index bbbbddf71326..4a26862d7667 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -10,6 +10,7 @@ #include <linux/seq_file.h> #include <linux/ratelimit.h> #include <linux/bits.h> +#include <linux/ktime.h> #include "super.h" #include "mds_client.h" @@ -415,21 +416,121 @@ bad: return -EIO; } + +#if BITS_PER_LONG == 64 + +#define DELEGATED_INO_AVAILABLE xa_mk_value(1) + +static int ceph_parse_deleg_inos(void **p, void *end, + struct ceph_mds_session *s) +{ + u32 sets; + + ceph_decode_32_safe(p, end, sets, bad); + dout("got %u sets of delegated inodes\n", sets); + while (sets--) { + u64 start, len, ino; + + ceph_decode_64_safe(p, end, start, bad); + ceph_decode_64_safe(p, end, len, bad); + while (len--) { + int err = xa_insert(&s->s_delegated_inos, ino = start++, + DELEGATED_INO_AVAILABLE, + GFP_KERNEL); + if (!err) { + dout("added delegated inode 0x%llx\n", + start - 1); + } else if (err == -EBUSY) { + pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", + start - 1); + } else { + return err; + } + } + } + return 0; +bad: + return -EIO; +} + +u64 ceph_get_deleg_ino(struct ceph_mds_session *s) +{ + unsigned long ino; + void *val; + + xa_for_each(&s->s_delegated_inos, ino, val) { + val = xa_erase(&s->s_delegated_inos, ino); + if (val == DELEGATED_INO_AVAILABLE) + return ino; + } + return 0; +} + +int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) +{ + return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, + GFP_KERNEL); +} +#else /* BITS_PER_LONG == 64 */ +/* + * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just + * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top + * and bottom words? + */ +static int ceph_parse_deleg_inos(void **p, void *end, + struct ceph_mds_session *s) +{ + u32 sets; + + ceph_decode_32_safe(p, end, sets, bad); + if (sets) + ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); + return 0; +bad: + return -EIO; +} + +u64 ceph_get_deleg_ino(struct ceph_mds_session *s) +{ + return 0; +} + +int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) +{ + return 0; +} +#endif /* BITS_PER_LONG == 64 */ + /* * parse create results */ static int parse_reply_info_create(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - u64 features) + u64 features, struct ceph_mds_session *s) { + int ret; + if (features == (u64)-1 || (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { - /* Malformed reply? */ if (*p == end) { + /* Malformed reply? */ info->has_create_ino = false; - } else { + } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { + u8 struct_v, struct_compat; + u32 len; + info->has_create_ino = true; + ceph_decode_8_safe(p, end, struct_v, bad); + ceph_decode_8_safe(p, end, struct_compat, bad); + ceph_decode_32_safe(p, end, len, bad); + ceph_decode_64_safe(p, end, info->ino, bad); + ret = ceph_parse_deleg_inos(p, end, s); + if (ret) + return ret; + } else { + /* legacy */ ceph_decode_64_safe(p, end, info->ino, bad); + info->has_create_ino = true; } } else { if (*p != end) @@ -448,7 +549,7 @@ bad: */ static int parse_reply_info_extra(void **p, void *end, struct ceph_mds_reply_info_parsed *info, - u64 features) + u64 features, struct ceph_mds_session *s) { u32 op = le32_to_cpu(info->head->op); @@ -457,7 +558,7 @@ static int parse_reply_info_extra(void **p, void *end, else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) return parse_reply_info_readdir(p, end, info, features); else if (op == CEPH_MDS_OP_CREATE) - return parse_reply_info_create(p, end, info, features); + return parse_reply_info_create(p, end, info, features, s); else return -EIO; } @@ -465,7 +566,7 @@ static int parse_reply_info_extra(void **p, void *end, /* * parse entire mds reply */ -static int parse_reply_info(struct ceph_msg *msg, +static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, struct ceph_mds_reply_info_parsed *info, u64 features) { @@ -490,7 +591,7 @@ static int parse_reply_info(struct ceph_msg *msg, ceph_decode_32_safe(&p, end, len, bad); if (len > 0) { ceph_decode_need(&p, end, len, bad); - err = parse_reply_info_extra(&p, p+len, info, features); + err = parse_reply_info_extra(&p, p+len, info, features, s); if (err < 0) goto out_bad; } @@ -558,6 +659,8 @@ void ceph_put_mds_session(struct ceph_mds_session *s) if (refcount_dec_and_test(&s->s_ref)) { if (s->s_auth.authorizer) ceph_auth_destroy_authorizer(s->s_auth.authorizer); + WARN_ON(mutex_is_locked(&s->s_mutex)); + xa_destroy(&s->s_delegated_inos); kfree(s); } } @@ -645,12 +748,14 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, refcount_set(&s->s_ref, 1); INIT_LIST_HEAD(&s->s_waiting); INIT_LIST_HEAD(&s->s_unsafe); + xa_init(&s->s_delegated_inos); s->s_num_cap_releases = 0; s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); + INIT_LIST_HEAD(&s->s_cap_dirty); INIT_LIST_HEAD(&s->s_cap_flushing); mdsc->sessions[mds] = s; @@ -699,6 +804,7 @@ void ceph_mdsc_release_request(struct kref *kref) struct ceph_mds_request *req = container_of(kref, struct ceph_mds_request, r_kref); + ceph_mdsc_release_dir_caps_no_check(req); destroy_reply_info(&req->r_reply_info); if (req->r_request) ceph_msg_put(req->r_request); @@ -736,7 +842,7 @@ void ceph_mdsc_release_request(struct kref *kref) put_request_session(req); ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); WARN_ON_ONCE(!list_empty(&req->r_wait)); - kfree(req); + kmem_cache_free(ceph_mds_request_cachep, req); } DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) @@ -793,8 +899,13 @@ static void __register_request(struct ceph_mds_client *mdsc, mdsc->oldest_tid = req->r_tid; if (dir) { + struct ceph_inode_info *ci = ceph_inode(dir); + ihold(dir); req->r_unsafe_dir = dir; + spin_lock(&ci->i_unsafe_lock); + list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); + spin_unlock(&ci->i_unsafe_lock); } } @@ -822,8 +933,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc, erase_request(&mdsc->request_tree, req); - if (req->r_unsafe_dir && - test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { + if (req->r_unsafe_dir) { struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); spin_lock(&ci->i_unsafe_lock); list_del_init(&req->r_unsafe_dir_item); @@ -993,8 +1103,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, frag.frag, mds); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= CEPH_MDS_STATE_ACTIVE) { - if (mode == USE_ANY_MDS && - !ceph_mdsmap_is_laggy(mdsc->mdsmap, + if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) goto out; } @@ -1058,7 +1167,7 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq) static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) -static void encode_supported_features(void **p, void *end) +static int encode_supported_features(void **p, void *end) { static const size_t count = ARRAY_SIZE(feature_bits); @@ -1066,16 +1175,64 @@ static void encode_supported_features(void **p, void *end) size_t i; size_t size = FEATURE_BYTES(count); - BUG_ON(*p + 4 + size > end); + if (WARN_ON_ONCE(*p + 4 + size > end)) + return -ERANGE; + ceph_encode_32(p, size); memset(*p, 0, size); for (i = 0; i < count; i++) ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); *p += size; } else { - BUG_ON(*p + 4 > end); + if (WARN_ON_ONCE(*p + 4 > end)) + return -ERANGE; + ceph_encode_32(p, 0); } + + return 0; +} + +static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; +#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) +static int encode_metric_spec(void **p, void *end) +{ + static const size_t count = ARRAY_SIZE(metric_bits); + + /* header */ + if (WARN_ON_ONCE(*p + 2 > end)) + return -ERANGE; + + ceph_encode_8(p, 1); /* version */ + ceph_encode_8(p, 1); /* compat */ + + if (count > 0) { + size_t i; + size_t size = METRIC_BYTES(count); + + if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) + return -ERANGE; + + /* metric spec info length */ + ceph_encode_32(p, 4 + size); + + /* metric spec */ + ceph_encode_32(p, size); + memset(*p, 0, size); + for (i = 0; i < count; i++) + ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); + *p += size; + } else { + if (WARN_ON_ONCE(*p + 4 + 4 > end)) + return -ERANGE; + + /* metric spec info length */ + ceph_encode_32(p, 4); + /* metric spec */ + ceph_encode_32(p, 0); + } + + return 0; } /* @@ -1093,6 +1250,7 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; size_t size, count; void *p, *end; + int ret; const char* metadata[][2] = { {"hostname", mdsc->nodename}, @@ -1117,12 +1275,19 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 size = FEATURE_BYTES(count); extra_bytes += 4 + size; + /* metric spec */ + size = 0; + count = ARRAY_SIZE(metric_bits); + if (count > 0) + size = METRIC_BYTES(count); + extra_bytes += 2 + 4 + 4 + size; + /* Allocate the message */ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, GFP_NOFS, false); if (!msg) { pr_err("create_session_msg ENOMEM creating msg\n"); - return NULL; + return ERR_PTR(-ENOMEM); } p = msg->front.iov_base; end = p + msg->front.iov_len; @@ -1135,9 +1300,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 * Serialize client metadata into waiting buffer space, using * the format that userspace expects for map<string, string> * - * ClientSession messages with metadata are v3 + * ClientSession messages with metadata are v4 */ - msg->hdr.version = cpu_to_le16(3); + msg->hdr.version = cpu_to_le16(4); msg->hdr.compat_version = cpu_to_le16(1); /* The write pointer, following the session_head structure */ @@ -1159,7 +1324,20 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 p += val_len; } - encode_supported_features(&p, end); + ret = encode_supported_features(&p, end); + if (ret) { + pr_err("encode_supported_features failed!\n"); + ceph_msg_put(msg); + return ERR_PTR(ret); + } + + ret = encode_metric_spec(&p, end); + if (ret) { + pr_err("encode_metric_spec failed!\n"); + ceph_msg_put(msg); + return ERR_PTR(ret); + } + msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); @@ -1187,8 +1365,8 @@ static int __open_session(struct ceph_mds_client *mdsc, /* send connect message */ msg = create_session_open_msg(mdsc, session->s_seq); - if (!msg) - return -ENOMEM; + if (IS_ERR(msg)) + return PTR_ERR(msg); ceph_con_send(&session->s_con, msg); return 0; } @@ -1202,6 +1380,7 @@ static struct ceph_mds_session * __open_export_target_session(struct ceph_mds_client *mdsc, int target) { struct ceph_mds_session *session; + int ret; session = __ceph_lookup_mds_session(mdsc, target); if (!session) { @@ -1210,8 +1389,11 @@ __open_export_target_session(struct ceph_mds_client *mdsc, int target) return session; } if (session->s_state == CEPH_MDS_SESSION_NEW || - session->s_state == CEPH_MDS_SESSION_CLOSING) - __open_session(mdsc, session); + session->s_state == CEPH_MDS_SESSION_CLOSING) { + ret = __open_session(mdsc, session); + if (ret) + return ERR_PTR(ret); + } return session; } @@ -1375,6 +1557,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; + atomic64_dec(&session->s_mdsc->metric.total_caps); if (cap->queue_release) __ceph_queue_cap_release(session, cap); else @@ -1407,8 +1590,6 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, dout("removing cap %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); spin_lock(&ci->i_ceph_lock); - if (cap->mds_wanted | cap->issued) - ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; __ceph_remove_cap(cap, false); if (!ci->i_auth_cap) { struct ceph_cap_flush *cf; @@ -1574,9 +1755,6 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, /* mds did not re-issue stale cap */ spin_lock(&ci->i_ceph_lock); cap->issued = cap->implemented = CEPH_CAP_PIN; - /* make sure mds knows what we want */ - if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted) - ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; spin_unlock(&ci->i_ceph_lock); } } else if (ev == FORCE_RO) { @@ -1680,8 +1858,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc, /* * send a session close request */ -static int request_close_session(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) +static int request_close_session(struct ceph_mds_session *session) { struct ceph_msg *msg; @@ -1704,7 +1881,7 @@ static int __close_session(struct ceph_mds_client *mdsc, if (session->s_state >= CEPH_MDS_SESSION_CLOSING) return 0; session->s_state = CEPH_MDS_SESSION_CLOSING; - return request_close_session(mdsc, session); + return request_close_session(session); } static bool drop_negative_children(struct dentry *dentry) @@ -1772,7 +1949,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) } /* The inode has cached pages, but it's no longer used. * we can safely drop it */ - if (wanted == 0 && used == CEPH_CAP_FILE_CACHE && + if (S_ISREG(inode->i_mode) && + wanted == 0 && used == CEPH_CAP_FILE_CACHE && !(oissued & CEPH_CAP_FILE_CACHE)) { used = 0; oissued = 0; @@ -2089,14 +2267,16 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, struct ceph_mds_request * ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) { - struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); + struct ceph_mds_request *req; + req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); mutex_init(&req->r_fill_mutex); req->r_mdsc = mdsc; req->r_started = jiffies; + req->r_start_latency = ktime_get(); req->r_resend_mds = -1; INIT_LIST_HEAD(&req->r_unsafe_dir_item); INIT_LIST_HEAD(&req->r_unsafe_target_item); @@ -2368,7 +2548,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, head->op = cpu_to_le32(req->r_op); head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); - head->ino = 0; + head->ino = cpu_to_le64(req->r_deleg_ino); head->args = req->r_args; ceph_encode_filepath(&p, end, ino1, path1); @@ -2382,7 +2562,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, if (req->r_inode_drop) releases += ceph_encode_inode_release(&p, req->r_inode ? req->r_inode : d_inode(req->r_dentry), - mds, req->r_inode_drop, req->r_inode_unless, 0); + mds, req->r_inode_drop, req->r_inode_unless, + req->r_op == CEPH_MDS_OP_READDIR); if (req->r_dentry_drop) releases += ceph_encode_dentry_release(&p, req->r_dentry, req->r_parent, mds, req->r_dentry_drop, @@ -2411,7 +2592,12 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ceph_encode_copy(&p, &ts, sizeof(ts)); } - BUG_ON(p > end); + if (WARN_ON_ONCE(p > end)) { + ceph_msg_put(msg); + msg = ERR_PTR(-ERANGE); + goto out_free2; + } + msg->front.iov_len = p - msg->front.iov_base; msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); @@ -2442,6 +2628,8 @@ out: static void complete_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { + req->r_end_latency = ktime_get(); + if (req->r_callback) req->r_callback(mdsc, req); complete_all(&req->r_completion); @@ -2522,12 +2710,13 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_REPLAY; + if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) + flags |= CEPH_MDS_FLAG_ASYNC; if (req->r_parent) flags |= CEPH_MDS_FLAG_WANT_DENTRY; rhead->flags = cpu_to_le32(flags); rhead->num_fwd = req->r_num_fwd; rhead->num_retry = req->r_attempts - 1; - rhead->ino = 0; dout(" r_parent = %p\n", req->r_parent); return 0; @@ -2573,7 +2762,7 @@ static void __do_request(struct ceph_mds_client *mdsc, if (req->r_timeout && time_after_eq(jiffies, req->r_started + req->r_timeout)) { dout("do_request timed out\n"); - err = -EIO; + err = -ETIMEDOUT; goto finish; } if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { @@ -2605,6 +2794,10 @@ static void __do_request(struct ceph_mds_client *mdsc, mds = __choose_mds(mdsc, req, &random); if (mds < 0 || ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { + if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { + err = -EJUKEBOX; + goto finish; + } dout("do_request no mds or not active, waiting for map\n"); list_add(&req->r_wait, &mdsc->waiting_for_map); return; @@ -2629,9 +2822,20 @@ static void __do_request(struct ceph_mds_client *mdsc, err = -EACCES; goto out_session; } + /* + * We cannot queue async requests since the caps and delegated + * inodes are bound to the session. Just return -EJUKEBOX and + * let the caller retry a sync request in that case. + */ + if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { + err = -EJUKEBOX; + goto out_session; + } if (session->s_state == CEPH_MDS_SESSION_NEW || session->s_state == CEPH_MDS_SESSION_CLOSING) { - __open_session(mdsc, session); + err = __open_session(mdsc, session); + if (err) + goto out_session; /* retry the same mds later */ if (random) req->r_resend_mds = mds; @@ -2709,19 +2913,43 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, struct ceph_mds_request *req) { - int err; + int err = 0; /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ if (req->r_inode) ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); if (req->r_parent) { - ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); + struct ceph_inode_info *ci = ceph_inode(req->r_parent); + int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? + CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; + spin_lock(&ci->i_ceph_lock); + ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); + __ceph_touch_fmode(ci, mdsc, fmode); + spin_unlock(&ci->i_ceph_lock); ihold(req->r_parent); } if (req->r_old_dentry_dir) ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); + if (req->r_inode) { + err = ceph_wait_on_async_create(req->r_inode); + if (err) { + dout("%s: wait for async create returned: %d\n", + __func__, err); + return err; + } + } + + if (!err && req->r_old_inode) { + err = ceph_wait_on_async_create(req->r_old_inode); + if (err) { + dout("%s: wait for async create returned: %d\n", + __func__, err); + return err; + } + } + dout("submit_request on %p for inode %p\n", req, dir); mutex_lock(&mdsc->mutex); __register_request(mdsc, req, dir); @@ -2747,7 +2975,7 @@ static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, if (timeleft > 0) err = 0; else if (!timeleft) - err = -EIO; /* timed out */ + err = -ETIMEDOUT; /* timed out */ else err = timeleft; /* killed */ } @@ -2935,22 +3163,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } else { set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); - if (req->r_unsafe_dir) { - struct ceph_inode_info *ci = - ceph_inode(req->r_unsafe_dir); - spin_lock(&ci->i_unsafe_lock); - list_add_tail(&req->r_unsafe_dir_item, - &ci->i_unsafe_dirops); - spin_unlock(&ci->i_unsafe_lock); - } } dout("handle_reply tid %lld result %d\n", tid, result); rinfo = &req->r_reply_info; if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) - err = parse_reply_info(msg, rinfo, (u64)-1); + err = parse_reply_info(session, msg, rinfo, (u64)-1); else - err = parse_reply_info(msg, rinfo, session->s_con.peer_features); + err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); @@ -3020,6 +3240,9 @@ out_err: /* kick calling process */ complete_request(mdsc, req); + + ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, + req->r_end_latency, err); out: ceph_mdsc_put_request(req); return; @@ -3116,8 +3339,7 @@ static void handle_session(struct ceph_mds_session *session, void *end = p + msg->front.iov_len; struct ceph_mds_session_head *h; u32 op; - u64 seq; - unsigned long features = 0; + u64 seq, features = 0; int wake = 0; bool blacklisted = false; @@ -3136,9 +3358,10 @@ static void handle_session(struct ceph_mds_session *session, goto bad; /* version >= 3, feature bits */ ceph_decode_32_safe(&p, end, len, bad); - ceph_decode_need(&p, end, len, bad); - memcpy(&features, p, min_t(size_t, len, sizeof(features))); - p += len; + if (len) { + ceph_decode_64_safe(&p, end, features, bad); + p += len - sizeof(features); + } } mutex_lock(&mdsc->mutex); @@ -3168,6 +3391,8 @@ static void handle_session(struct ceph_mds_session *session, session->s_state = CEPH_MDS_SESSION_OPEN; session->s_features = features; renewed_caps(mdsc, session, 0); + if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) + metric_schedule_delayed(&mdsc->metric); wake = 1; if (mdsc->stopping) __close_session(mdsc, session); @@ -3249,6 +3474,29 @@ bad: return; } +void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) +{ + int dcaps; + + dcaps = xchg(&req->r_dir_caps, 0); + if (dcaps) { + dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); + ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); + } +} + +void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) +{ + int dcaps; + + dcaps = xchg(&req->r_dir_caps, 0); + if (dcaps) { + dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); + ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), + dcaps); + } +} + /* * called under session->mutex. */ @@ -3276,9 +3524,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, continue; if (req->r_attempts == 0) continue; /* only old requests */ - if (req->r_session && - req->r_session->s_mds == session->s_mds) - __send_request(mdsc, session, req, true); + if (!req->r_session) + continue; + if (req->r_session->s_mds != session->s_mds) + continue; + + ceph_mdsc_release_dir_caps_no_check(req); + + __send_request(mdsc, session, req, true); } mutex_unlock(&mdsc->mutex); } @@ -3362,7 +3615,7 @@ fail_msg: /* * Encode information about a cap for a reconnect with the MDS. */ -static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, +static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { union { @@ -3385,6 +3638,15 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, cap->mseq = 0; /* and migrate_seq */ cap->cap_gen = cap->session->s_cap_gen; + /* These are lost when the session goes away */ + if (S_ISDIR(inode->i_mode)) { + if (cap->issued & CEPH_CAP_DIR_CREATE) { + ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); + memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); + } + cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; + } + if (recon_state->msg_version >= 2) { rec.v2.cap_id = cpu_to_le64(cap->cap_id); rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); @@ -3602,8 +3864,6 @@ fail: * recovering MDS might have. * * This is a relatively heavyweight operation, but it's rare. - * - * called with mdsc->mutex held. */ static void send_mds_reconnect(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) @@ -3626,6 +3886,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, if (!reply) goto fail_nomsg; + xa_destroy(&session->s_delegated_inos); + mutex_lock(&session->s_mutex); session->s_state = CEPH_MDS_SESSION_RECONNECTING; session->s_seq = 0; @@ -3681,7 +3943,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, recon_state.msg_version = 2; } /* trsaverse this session's caps */ - err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state); + err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); spin_lock(&session->s_cap_lock); session->s_cap_reconnect = 0; @@ -3855,7 +4117,11 @@ static void check_new_map(struct ceph_mds_client *mdsc, oldstate != CEPH_MDS_STATE_STARTING) pr_info("mds%d recovery completed\n", s->s_mds); kick_requests(mdsc, i); + mutex_unlock(&mdsc->mutex); + mutex_lock(&s->s_mutex); + mutex_lock(&mdsc->mutex); ceph_kick_flushing_caps(mdsc, s); + mutex_unlock(&s->s_mutex); wake_up_session_caps(s, RECONNECT); } } @@ -4080,6 +4346,30 @@ static void maybe_recover_session(struct ceph_mds_client *mdsc) ceph_force_reconnect(fsc->sb); } +bool check_session_state(struct ceph_mds_session *s) +{ + if (s->s_state == CEPH_MDS_SESSION_CLOSING) { + dout("resending session close request for mds%d\n", + s->s_mds); + request_close_session(s); + return false; + } + if (s->s_ttl && time_after(jiffies, s->s_ttl)) { + if (s->s_state == CEPH_MDS_SESSION_OPEN) { + s->s_state = CEPH_MDS_SESSION_HUNG; + pr_info("mds%d hung\n", s->s_mds); + } + } + if (s->s_state == CEPH_MDS_SESSION_NEW || + s->s_state == CEPH_MDS_SESSION_RESTARTING || + s->s_state == CEPH_MDS_SESSION_CLOSED || + s->s_state == CEPH_MDS_SESSION_REJECTED) + /* this mds is failed or recovering, just wait */ + return false; + + return true; +} + /* * delayed work -- periodically trim expired leases, renew caps with mds */ @@ -4100,6 +4390,9 @@ static void delayed_work(struct work_struct *work) dout("mdsc delayed_work\n"); + if (mdsc->stopping) + return; + mutex_lock(&mdsc->mutex); renew_interval = mdsc->mdsmap->m_session_timeout >> 2; renew_caps = time_after_eq(jiffies, HZ*renew_interval + @@ -4111,23 +4404,8 @@ static void delayed_work(struct work_struct *work) struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); if (!s) continue; - if (s->s_state == CEPH_MDS_SESSION_CLOSING) { - dout("resending session close request for mds%d\n", - s->s_mds); - request_close_session(mdsc, s); - ceph_put_mds_session(s); - continue; - } - if (s->s_ttl && time_after(jiffies, s->s_ttl)) { - if (s->s_state == CEPH_MDS_SESSION_OPEN) { - s->s_state = CEPH_MDS_SESSION_HUNG; - pr_info("mds%d hung\n", s->s_mds); - } - } - if (s->s_state == CEPH_MDS_SESSION_NEW || - s->s_state == CEPH_MDS_SESSION_RESTARTING || - s->s_state == CEPH_MDS_SESSION_REJECTED) { - /* this mds is failed or recovering, just wait */ + + if (!check_session_state(s)) { ceph_put_mds_session(s); continue; } @@ -4163,6 +4441,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) { struct ceph_mds_client *mdsc; + int err; mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); if (!mdsc) @@ -4171,11 +4450,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) mutex_init(&mdsc->mutex); mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); if (!mdsc->mdsmap) { - kfree(mdsc); - return -ENOMEM; + err = -ENOMEM; + goto err_mdsc; } - fsc->mdsc = mdsc; init_completion(&mdsc->safe_umount_waiters); init_waitqueue_head(&mdsc->session_close_wq); INIT_LIST_HEAD(&mdsc->waiting_for_map); @@ -4204,13 +4482,15 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->snap_flush_lock); mdsc->last_cap_flush_tid = 1; INIT_LIST_HEAD(&mdsc->cap_flush_list); - INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; spin_lock_init(&mdsc->cap_dirty_lock); init_waitqueue_head(&mdsc->cap_flushing_wq); INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); atomic_set(&mdsc->cap_reclaim_pending, 0); + err = ceph_metric_init(&mdsc->metric); + if (err) + goto err_mdsmap; spin_lock_init(&mdsc->dentry_list_lock); INIT_LIST_HEAD(&mdsc->dentry_leases); @@ -4228,7 +4508,15 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) strscpy(mdsc->nodename, utsname()->nodename, sizeof(mdsc->nodename)); + + fsc->mdsc = mdsc; return 0; + +err_mdsmap: + kfree(mdsc->mdsmap); +err_mdsc: + kfree(mdsc); + return err; } /* @@ -4465,7 +4753,16 @@ void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) { dout("stop\n"); - cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ + /* + * Make sure the delayed work stopped before releasing + * the resources. + * + * Because the cancel_delayed_work_sync() will only + * guarantee that the work finishes executing. But the + * delayed work will re-arm itself again after that. + */ + flush_delayed_work(&mdsc->delayed_work); + if (mdsc->mdsmap) ceph_mdsmap_destroy(mdsc->mdsmap); kfree(mdsc->sessions); @@ -4486,6 +4783,9 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc) ceph_mdsc_stop(mdsc); + ceph_metric_destroy(&mdsc->metric); + + flush_delayed_work(&mdsc->metric.delayed_work); fsc->mdsc = NULL; kfree(mdsc); dout("mdsc_destroy %p done\n", mdsc); |