From 663ae2cc04773608e1e741f693e41200fd4faf14 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 16 May 2016 13:18:57 +0200 Subject: rbd: get/put img_request in rbd_img_request_submit() By the time we get to checking for_each_obj_request_safe(img_request) terminating condition, all obj_requests may be complete and img_request ref, that rbd_img_request_submit() takes away from its caller, may be put. Moving the next_obj_request cursor is then a use-after-free on img_request. It's totally benign, as the value that's read is never used, but I think it's still worth fixing. Cc: Alex Elder Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 0ede6d7e2568..c3089f32a392 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2973,17 +2973,20 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request) { struct rbd_obj_request *obj_request; struct rbd_obj_request *next_obj_request; + int ret = 0; dout("%s: img %p\n", __func__, img_request); - for_each_obj_request_safe(img_request, obj_request, next_obj_request) { - int ret; + rbd_img_request_get(img_request); + for_each_obj_request_safe(img_request, obj_request, next_obj_request) { ret = rbd_img_obj_request_submit(obj_request); if (ret) - return ret; + goto out_put_ireq; } - return 0; +out_put_ireq: + rbd_img_request_put(img_request); + return ret; } static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) -- cgit v1.2.3 From 13d1ad16d05eebb4db977eb955716b9da2c19fbd Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 27 Apr 2016 14:15:51 +0200 Subject: libceph: move message allocation out of ceph_osdc_alloc_request() The size of ->r_request and ->r_reply messages depends on the size of the object name (ceph_object_id), while the size of ceph_osd_request is fixed. Move message allocation into a separate function that would have to be called after ceph_object_id and ceph_object_locator (which is also going to become variable in size with RADOS namespaces) have been filled in: req = ceph_osdc_alloc_request(...); r_base_oid> r_base_oloc> ceph_osdc_alloc_messages(req); Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 18 ++++++++- fs/ceph/addr.c | 8 ++++ fs/ceph/file.c | 7 ++++ include/linux/ceph/osd_client.h | 1 + net/ceph/osd_client.c | 88 +++++++++++++++++++++++------------------ 5 files changed, 82 insertions(+), 40 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c3089f32a392..bda4deade82e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1954,7 +1954,7 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); if (!osd_req) - return NULL; /* ENOMEM */ + goto fail; if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; @@ -1967,7 +1967,14 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); + if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) + goto fail; + return osd_req; + +fail: + ceph_osdc_put_request(osd_req); + return NULL; } /* @@ -2003,7 +2010,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops, false, GFP_NOIO); if (!osd_req) - return NULL; /* ENOMEM */ + goto fail; osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; osd_req->r_callback = rbd_osd_req_callback; @@ -2012,7 +2019,14 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); + if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) + goto fail; + return osd_req; + +fail: + ceph_osdc_put_request(osd_req); + return NULL; } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 3e61fc8bb371..6fee7e0b8931 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1762,6 +1762,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) "%llx.00000000", ci->i_vino.ino); rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); + err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS); + if (err) + goto out_unlock; + wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL, 1, false, GFP_NOFS); if (!wr_req) { @@ -1775,6 +1779,10 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) wr_req->r_base_oloc.pool = pool; wr_req->r_base_oid = rd_req->r_base_oid; + err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS); + if (err) + goto out_unlock; + /* one page should be large enough for STAT data */ pages = ceph_alloc_page_vector(1, GFP_KERNEL); if (IS_ERR(pages)) { diff --git a/fs/ceph/file.c b/fs/ceph/file.c index a79f9269831e..5d46d106bbb7 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -717,6 +717,13 @@ static void ceph_aio_retry_work(struct work_struct *work) req->r_base_oloc = orig_req->r_base_oloc; req->r_base_oid = orig_req->r_base_oid; + ret = ceph_osdc_alloc_messages(req, GFP_NOFS); + if (ret) { + ceph_osdc_put_request(req); + req = orig_req; + goto out; + } + req->r_ops[0] = orig_req->r_ops[0]; osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index cbf460927c42..66a1fcd5bff7 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -322,6 +322,7 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client * unsigned int num_ops, bool use_mempool, gfp_t gfp_flags); +int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp); extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, struct ceph_snap_context *snapc, diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ccb9539dc780..d66dacc9d0d4 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -369,8 +369,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, gfp_t gfp_flags) { struct ceph_osd_request *req; - struct ceph_msg *msg; - size_t msg_size; if (use_mempool) { BUG_ON(num_ops > CEPH_OSD_SLAB_OPS); @@ -407,53 +405,59 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, req->r_base_oloc.pool = -1; req->r_target_oloc.pool = -1; - msg_size = OSD_OPREPLY_FRONT_LEN; - if (num_ops > CEPH_OSD_SLAB_OPS) { - /* ceph_osd_op and rval */ - msg_size += (num_ops - CEPH_OSD_SLAB_OPS) * - (sizeof(struct ceph_osd_op) + 4); - } + dout("%s req %p\n", __func__, req); + return req; +} +EXPORT_SYMBOL(ceph_osdc_alloc_request); - /* create reply message */ - if (use_mempool) - msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); - else - msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, - gfp_flags, true); - if (!msg) { - ceph_osdc_put_request(req); - return NULL; - } - req->r_reply = msg; +int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) +{ + struct ceph_osd_client *osdc = req->r_osdc; + struct ceph_msg *msg; + int msg_size; + /* create request message */ msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pgid */ - msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ - msg_size += 2 + num_ops * sizeof(struct ceph_osd_op); + msg_size += 4 + req->r_base_oid.name_len; /* oid */ + msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op); msg_size += 8; /* snapid */ msg_size += 8; /* snap_seq */ - msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ + msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0); msg_size += 4; /* retry_attempt */ - /* create request message; allow space for oid */ - if (use_mempool) + if (req->r_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else - msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); - if (!msg) { - ceph_osdc_put_request(req); - return NULL; - } + msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); + if (!msg) + return -ENOMEM; memset(msg->front.iov_base, 0, msg->front.iov_len); - req->r_request = msg; - return req; + /* create reply message */ + msg_size = OSD_OPREPLY_FRONT_LEN; + if (req->r_num_ops > CEPH_OSD_SLAB_OPS) { + /* ceph_osd_op and rval */ + msg_size += (req->r_num_ops - CEPH_OSD_SLAB_OPS) * + (sizeof(struct ceph_osd_op) + 4); + } + + if (req->r_mempool) + msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); + else + msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); + if (!msg) + return -ENOMEM; + + req->r_reply = msg; + + return 0; } -EXPORT_SYMBOL(ceph_osdc_alloc_request); +EXPORT_SYMBOL(ceph_osdc_alloc_messages); static bool osd_req_opcode_valid(u16 opcode) { @@ -828,17 +832,17 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, GFP_NOFS); - if (!req) - return ERR_PTR(-ENOMEM); + if (!req) { + r = -ENOMEM; + goto fail; + } req->r_flags = flags; /* calculate max write size */ r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); - if (r < 0) { - ceph_osdc_put_request(req); - return ERR_PTR(r); - } + if (r) + goto fail; if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { osd_req_op_init(req, which, opcode, 0); @@ -864,7 +868,15 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, "%llx.%08llx", vino.ino, objnum); req->r_base_oid.name_len = strlen(req->r_base_oid.name); + r = ceph_osdc_alloc_messages(req, GFP_NOFS); + if (r) + goto fail; + return req; + +fail: + ceph_osdc_put_request(req); + return ERR_PTR(r); } EXPORT_SYMBOL(ceph_osdc_new_request); -- cgit v1.2.3 From d30291b985d1854565d7f2c82a4457869d5265e8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 29 Apr 2016 19:54:20 +0200 Subject: libceph: variable-sized ceph_object_id Currently ceph_object_id can hold object names of up to 100 (CEPH_MAX_OID_NAME_LEN) characters. This is enough for all use cases, expect one - long rbd image names: - a format 1 header is named ".rbd" - an object that points to a format 2 header is named "rbd_id." We operate on these potentially long-named objects during rbd map, and, for format 1 images, during header refresh. (A format 2 header name is a small system-generated string.) Lift this 100 character limit by making ceph_object_id be able to point to an externally-allocated string. Apart from being able to work with almost arbitrarily-long named objects, this allows us to reduce the size of ceph_object_id from >100 bytes to 64 bytes. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 8 +++- fs/ceph/addr.c | 6 +-- fs/ceph/file.c | 2 +- fs/ceph/ioctl.c | 2 +- include/linux/ceph/osdmap.h | 62 ++++++++++++++++++------------ net/ceph/debugfs.c | 2 +- net/ceph/osd_client.c | 16 +++++--- net/ceph/osdmap.c | 93 ++++++++++++++++++++++++++++++++++++++++++++- 8 files changed, 150 insertions(+), 41 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index bda4deade82e..3bf93a2a20f0 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1965,7 +1965,9 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req->r_priv = obj_request; osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); - ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); + if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", + obj_request->object_name)) + goto fail; if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) goto fail; @@ -2017,7 +2019,9 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) osd_req->r_priv = obj_request; osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); - ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name); + if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", + obj_request->object_name)) + goto fail; if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) goto fail; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 6fee7e0b8931..6f28dd9bacb2 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1758,9 +1758,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) rd_req->r_flags = CEPH_OSD_FLAG_READ; osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); rd_req->r_base_oloc.pool = pool; - snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name), - "%llx.00000000", ci->i_vino.ino); - rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); + ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino); err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS); if (err) @@ -1777,7 +1775,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); wr_req->r_base_oloc.pool = pool; - wr_req->r_base_oid = rd_req->r_base_oid; + ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid); err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS); if (err) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 5d46d106bbb7..9d470397e249 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -715,7 +715,7 @@ static void ceph_aio_retry_work(struct work_struct *work) CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE; req->r_base_oloc = orig_req->r_base_oloc; - req->r_base_oid = orig_req->r_base_oid; + ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); ret = ceph_osdc_alloc_messages(req, GFP_NOFS); if (ret) { diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index f851d8d70158..db296709784a 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -213,7 +213,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg) ceph_ino(inode), dl.object_no); oloc.pool = ceph_file_layout_pg_pool(ci->i_layout); - ceph_oid_set_name(&oid, dl.object_name); + ceph_oid_printf(&oid, "%s", dl.object_name); r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid); if (r < 0) { diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index e55c08bc3a96..777a29412706 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -64,11 +64,47 @@ struct ceph_object_locator { */ #define CEPH_MAX_OID_NAME_LEN 100 +/* + * 51-char inline_name is long enough for all cephfs and all but one + * rbd requests: in ".rbd"/"rbd_id." can be + * arbitrarily long (~PAGE_SIZE). It's done once during rbd map; all + * other rbd requests fit into inline_name. + * + * Makes ceph_object_id 64 bytes on 64-bit. + */ +#define CEPH_OID_INLINE_LEN 52 + +/* + * Both inline and external buffers have space for a NUL-terminator, + * which is carried around. It's not required though - RADOS object + * names don't have to be NUL-terminated and may contain NULs. + */ struct ceph_object_id { - char name[CEPH_MAX_OID_NAME_LEN]; + char *name; + char inline_name[CEPH_OID_INLINE_LEN]; int name_len; }; +static inline void ceph_oid_init(struct ceph_object_id *oid) +{ + oid->name = oid->inline_name; + oid->name_len = 0; +} + +static inline bool ceph_oid_empty(const struct ceph_object_id *oid) +{ + return oid->name == oid->inline_name && !oid->name_len; +} + +void ceph_oid_copy(struct ceph_object_id *dest, + const struct ceph_object_id *src); +__printf(2, 3) +void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...); +__printf(3, 4) +int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp, + const char *fmt, ...); +void ceph_oid_destroy(struct ceph_object_id *oid); + struct ceph_pg_mapping { struct rb_node node; struct ceph_pg pgid; @@ -113,30 +149,6 @@ struct ceph_osdmap { int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3]; }; -static inline void ceph_oid_set_name(struct ceph_object_id *oid, - const char *name) -{ - int len; - - len = strlen(name); - if (len > sizeof(oid->name)) { - WARN(1, "ceph_oid_set_name '%s' len %d vs %zu, truncating\n", - name, len, sizeof(oid->name)); - len = sizeof(oid->name); - } - - memcpy(oid->name, name, len); - oid->name_len = len; -} - -static inline void ceph_oid_copy(struct ceph_object_id *dest, - struct ceph_object_id *src) -{ - BUG_ON(src->name_len > sizeof(dest->name)); - memcpy(dest->name, src->name, src->name_len); - dest->name_len = src->name_len; -} - static inline int ceph_osd_exists(struct ceph_osdmap *map, int osd) { return osd >= 0 && osd < map->max_osd && diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index b902fbc7863e..6f8413293d15 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -161,7 +161,7 @@ static int osdc_show(struct seq_file *s, void *pp) req->r_osd ? req->r_osd->o_osd : -1, req->r_pgid.pool, req->r_pgid.seed); - seq_printf(s, "%.*s", req->r_base_oid.name_len, + seq_printf(s, "%*pE", req->r_base_oid.name_len, req->r_base_oid.name); if (req->r_reassert_version.epoch) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 75e27bd3d372..95910aed8e2e 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -334,7 +334,10 @@ static void ceph_osdc_release_request(struct kref *kref) for (which = 0; which < req->r_num_ops; which++) osd_req_op_data_release(req, which); + ceph_oid_destroy(&req->r_base_oid); + ceph_oid_destroy(&req->r_target_oid); ceph_put_snap_context(req->r_snapc); + if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS) @@ -401,7 +404,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); + ceph_oid_init(&req->r_base_oid); req->r_base_oloc.pool = -1; + ceph_oid_init(&req->r_target_oid); req->r_target_oloc.pool = -1; dout("%s req %p\n", __func__, req); @@ -415,6 +420,8 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) struct ceph_msg *msg; int msg_size; + WARN_ON(ceph_oid_empty(&req->r_base_oid)); + /* create request message */ msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ @@ -859,10 +866,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); - - snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), - "%llx.%08llx", vino.ino, objnum); - req->r_base_oid.name_len = strlen(req->r_base_oid.name); + ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); r = ceph_osdc_alloc_messages(req, GFP_NOFS); if (r) @@ -1410,7 +1414,7 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap, req->r_target_oloc = req->r_base_oloc; /* struct */ need_check_tiering = true; } - if (req->r_target_oid.name_len == 0) { + if (ceph_oid_empty(&req->r_target_oid)) { ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); need_check_tiering = true; } @@ -2501,7 +2505,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, /* oid */ ceph_encode_32(&p, req->r_base_oid.name_len); memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); - dout("oid '%.*s' len %d\n", req->r_base_oid.name_len, + dout("oid %*pE len %d\n", req->r_base_oid.name_len, req->r_base_oid.name, req->r_base_oid.name_len); p += req->r_base_oid.name_len; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 243574c8cf33..4668b871ca47 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -1381,8 +1381,99 @@ bad: return ERR_PTR(err); } +void ceph_oid_copy(struct ceph_object_id *dest, + const struct ceph_object_id *src) +{ + WARN_ON(!ceph_oid_empty(dest)); + + if (src->name != src->inline_name) { + /* very rare, see ceph_object_id definition */ + dest->name = kmalloc(src->name_len + 1, + GFP_NOIO | __GFP_NOFAIL); + } + memcpy(dest->name, src->name, src->name_len + 1); + dest->name_len = src->name_len; +} +EXPORT_SYMBOL(ceph_oid_copy); +static __printf(2, 0) +int oid_printf_vargs(struct ceph_object_id *oid, const char *fmt, va_list ap) +{ + int len; + + WARN_ON(!ceph_oid_empty(oid)); + + len = vsnprintf(oid->inline_name, sizeof(oid->inline_name), fmt, ap); + if (len >= sizeof(oid->inline_name)) + return len; + + oid->name_len = len; + return 0; +} + +/* + * If oid doesn't fit into inline buffer, BUG. + */ +void ceph_oid_printf(struct ceph_object_id *oid, const char *fmt, ...) +{ + va_list ap; + + va_start(ap, fmt); + BUG_ON(oid_printf_vargs(oid, fmt, ap)); + va_end(ap); +} +EXPORT_SYMBOL(ceph_oid_printf); + +static __printf(3, 0) +int oid_aprintf_vargs(struct ceph_object_id *oid, gfp_t gfp, + const char *fmt, va_list ap) +{ + va_list aq; + int len; + + va_copy(aq, ap); + len = oid_printf_vargs(oid, fmt, aq); + va_end(aq); + + if (len) { + char *external_name; + + external_name = kmalloc(len + 1, gfp); + if (!external_name) + return -ENOMEM; + + oid->name = external_name; + WARN_ON(vsnprintf(oid->name, len + 1, fmt, ap) != len); + oid->name_len = len; + } + + return 0; +} + +/* + * If oid doesn't fit into inline buffer, allocate. + */ +int ceph_oid_aprintf(struct ceph_object_id *oid, gfp_t gfp, + const char *fmt, ...) +{ + va_list ap; + int ret; + + va_start(ap, fmt); + ret = oid_aprintf_vargs(oid, gfp, fmt, ap); + va_end(ap); + + return ret; +} +EXPORT_SYMBOL(ceph_oid_aprintf); + +void ceph_oid_destroy(struct ceph_object_id *oid) +{ + if (oid->name != oid->inline_name) + kfree(oid->name); +} +EXPORT_SYMBOL(ceph_oid_destroy); /* * calculate file layout from given offset, length. @@ -1474,7 +1565,7 @@ int ceph_oloc_oid_to_pg(struct ceph_osdmap *osdmap, pg_out->seed = ceph_str_hash(pi->object_hash, oid->name, oid->name_len); - dout("%s '%.*s' pgid %llu.%x\n", __func__, oid->name_len, oid->name, + dout("%s %*pE pgid %llu.%x\n", __func__, oid->name_len, oid->name, pg_out->pool, pg_out->seed); return 0; } -- cgit v1.2.3 From c41d13a31fefed303f734c0c5106f6dcd262168e Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 29 Apr 2016 20:01:25 +0200 Subject: rbd: use header_oid instead of header_name Switch to ceph_object_id and use ceph_oid_aprintf() instead of a bare const char *. This reduces noise in rbd_dev_header_name(). Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 57 ++++++++++++++++++++++------------------------------- 1 file changed, 24 insertions(+), 33 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3bf93a2a20f0..f3ea927f93de 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -350,7 +350,7 @@ struct rbd_device { struct rbd_spec *spec; struct rbd_options *opts; - char *header_name; + struct ceph_object_id header_oid; struct ceph_file_layout layout; @@ -3117,7 +3117,7 @@ static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id) struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; - obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0, OBJ_REQUEST_NODATA); if (!obj_request) return -ENOMEM; @@ -3148,7 +3148,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) int ret; dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, - rbd_dev->header_name, (unsigned long long)notify_id, + rbd_dev->header_oid.name, (unsigned long long)notify_id, (unsigned int)opcode); /* @@ -3179,7 +3179,7 @@ static struct rbd_obj_request *rbd_obj_watch_request_helper( struct rbd_obj_request *obj_request; int ret; - obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, + obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0, OBJ_REQUEST_NODATA); if (!obj_request) return ERR_PTR(-ENOMEM); @@ -3612,7 +3612,7 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) if (!ondisk) return -ENOMEM; - ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name, 0, size, ondisk); if (ret < 0) goto out; @@ -4054,6 +4054,8 @@ static void rbd_dev_release(struct device *dev) struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); bool need_put = !!rbd_dev->opts; + ceph_oid_destroy(&rbd_dev->header_oid); + rbd_put_client(rbd_dev->rbd_client); rbd_spec_put(rbd_dev->spec); kfree(rbd_dev->opts); @@ -4084,6 +4086,8 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); + ceph_oid_init(&rbd_dev->header_oid); + rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; @@ -4132,7 +4136,7 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, __le64 size; } __attribute__ ((packed)) size_buf = { 0 }; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, "rbd", "get_size", &snapid, sizeof (snapid), &size_buf, sizeof (size_buf)); @@ -4172,7 +4176,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) if (!reply_buf) return -ENOMEM; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, "rbd", "get_object_prefix", NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); @@ -4207,7 +4211,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 unsup; int ret; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, "rbd", "get_features", &snapid, sizeof (snapid), &features_buf, sizeof (features_buf)); @@ -4269,7 +4273,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) } snapid = cpu_to_le64(rbd_dev->spec->snap_id); - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, "rbd", "get_parent", &snapid, sizeof (snapid), reply_buf, size); @@ -4372,7 +4376,7 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) u64 stripe_count; int ret; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, "rbd", "get_stripe_unit_count", NULL, 0, (char *)&striping_info_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); @@ -4620,7 +4624,7 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) if (!reply_buf) return -ENOMEM; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, "rbd", "get_snapcontext", NULL, 0, reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); @@ -4685,7 +4689,7 @@ static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, return ERR_PTR(-ENOMEM); snapid = cpu_to_le64(snap_id); - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, + ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, "rbd", "get_snapshot_name", &snapid, sizeof (snapid), reply_buf, size); @@ -5281,35 +5285,25 @@ err_out_unlock: static int rbd_dev_header_name(struct rbd_device *rbd_dev) { struct rbd_spec *spec = rbd_dev->spec; - size_t size; + int ret; /* Record the header object name for this rbd image. */ rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); if (rbd_dev->image_format == 1) - size = strlen(spec->image_name) + sizeof (RBD_SUFFIX); + ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", + spec->image_name, RBD_SUFFIX); else - size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id); - - rbd_dev->header_name = kmalloc(size, GFP_KERNEL); - if (!rbd_dev->header_name) - return -ENOMEM; + ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", + RBD_HEADER_PREFIX, spec->image_id); - if (rbd_dev->image_format == 1) - sprintf(rbd_dev->header_name, "%s%s", - spec->image_name, RBD_SUFFIX); - else - sprintf(rbd_dev->header_name, "%s%s", - RBD_HEADER_PREFIX, spec->image_id); - return 0; + return ret; } static void rbd_dev_image_release(struct rbd_device *rbd_dev) { rbd_dev_unprobe(rbd_dev); - kfree(rbd_dev->header_name); - rbd_dev->header_name = NULL; rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); rbd_dev->spec->image_id = NULL; @@ -5348,7 +5342,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) pr_info("image %s/%s does not exist\n", rbd_dev->spec->pool_name, rbd_dev->spec->image_name); - goto out_header_name; + goto err_out_format; } } @@ -5394,7 +5388,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) goto err_out_probe; dout("discovered format %u image, header name is %s\n", - rbd_dev->image_format, rbd_dev->header_name); + rbd_dev->image_format, rbd_dev->header_oid.name); return 0; err_out_probe: @@ -5402,9 +5396,6 @@ err_out_probe: err_out_watch: if (!depth) rbd_dev_header_unwatch_sync(rbd_dev); -out_header_name: - kfree(rbd_dev->header_name); - rbd_dev->header_name = NULL; err_out_format: rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); -- cgit v1.2.3 From bb873b539154ab51893430b4ad6ba4051775276a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 26 May 2016 00:29:52 +0200 Subject: libceph: switch to calc_target(), part 2 The crux of this is getting rid of ceph_osdc_build_request(), so that MOSDOp can be encoded not before but after calc_target() calculates the actual target. Encoding now happens within ceph_osdc_start_request(). Also nuked is the accompanying bunch of pointers into the encoded buffer that was used to update fields on each send - instead, the entire front is re-encoded. If we want to support target->name_len != base->name_len in the future, there is no other way, because oid is surrounded by other fields in the encoded buffer. Encoding OSD ops and adding data items to the request message were mixed together in osd_req_encode_op(). While we want to re-encode OSD ops, we don't want to add duplicate data items to the message when resending, so all call to ceph_osdc_msg_data_add() are factored out into a new setup_request_data(). Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 18 +- fs/ceph/addr.c | 16 +- fs/ceph/file.c | 16 +- include/linux/ceph/osd_client.h | 29 ++-- include/linux/ceph/rados.h | 7 + net/ceph/debugfs.c | 61 ++++--- net/ceph/osd_client.c | 355 ++++++++++++++++++++-------------------- 7 files changed, 247 insertions(+), 255 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index f3ea927f93de..0e598916e048 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1896,27 +1896,17 @@ static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request = obj_request->img_request; struct ceph_osd_request *osd_req = obj_request->osd_req; - u64 snap_id; - - rbd_assert(osd_req != NULL); - snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP; - ceph_osdc_build_request(osd_req, obj_request->offset, - NULL, snap_id, NULL); + if (img_request) + osd_req->r_snapid = img_request->snap_id; } static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = obj_request->img_request; struct ceph_osd_request *osd_req = obj_request->osd_req; - struct ceph_snap_context *snapc; - struct timespec mtime = CURRENT_TIME; - - rbd_assert(osd_req != NULL); - snapc = img_request ? img_request->snapc : NULL; - ceph_osdc_build_request(osd_req, obj_request->offset, - snapc, CEPH_NOSNAP, &mtime); + osd_req->r_mtime = CURRENT_TIME; + osd_req->r_data_offset = obj_request->offset; } /* diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index c5d75486823b..59b3c3fbd3bd 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -376,8 +376,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) req->r_callback = finish_read; req->r_inode = inode; - ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); - dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); ret = ceph_osdc_start_request(osdc, req, false); if (ret < 0) @@ -1063,10 +1061,7 @@ new_request: pages = NULL; } - vino = ceph_vino(inode); - ceph_osdc_build_request(req, offset, snapc, vino.snap, - &inode->i_mtime); - + req->r_mtime = inode->i_mtime; rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); BUG_ON(rc); req = NULL; @@ -1614,7 +1609,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) goto out; } - ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); + req->r_mtime = inode->i_mtime; err = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (!err) err = ceph_osdc_wait_request(&fsc->client->osdc, req); @@ -1657,7 +1652,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) goto out_put; } - ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); + req->r_mtime = inode->i_mtime; err = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (!err) err = ceph_osdc_wait_request(&fsc->client->osdc, req); @@ -1790,12 +1785,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE, 0, false, true); - ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP, - &ci->vfs_inode.i_mtime); err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); - ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP, - &ci->vfs_inode.i_mtime); + wr_req->r_mtime = ci->vfs_inode.i_mtime; err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); if (!err) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 36b4a41dfa67..52e4b72dd5de 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -727,8 +727,8 @@ static void ceph_aio_retry_work(struct work_struct *work) req->r_ops[0] = orig_req->r_ops[0]; osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); - ceph_osdc_build_request(req, req->r_ops[0].extent.offset, - snapc, CEPH_NOSNAP, &aio_req->mtime); + req->r_mtime = aio_req->mtime; + req->r_data_offset = req->r_ops[0].extent.offset; ceph_osdc_put_request(orig_req); @@ -882,14 +882,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, (pos+len) | (PAGE_SIZE - 1)); osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); + req->r_mtime = mtime; } - osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, false, false); - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - if (aio_req) { aio_req->total_len += len; aio_req->num_reqs++; @@ -1074,9 +1072,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, true); - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - + req->r_mtime = mtime; ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (!ret) ret = ceph_osdc_wait_request(&fsc->client->osdc, req); @@ -1532,9 +1528,7 @@ static int ceph_zero_partial_object(struct inode *inode, goto out; } - ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap, - &inode->i_mtime); - + req->r_mtime = inode->i_mtime; ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (!ret) { ret = ceph_osdc_wait_request(&fsc->client->osdc, req); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 03bf9d9e1517..67a37d98e0ca 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -104,7 +104,7 @@ struct ceph_osd_req_op { struct ceph_osd_data response_data; __u8 class_len; __u8 method_len; - __u8 argc; + u32 indata_len; } cls; struct { u64 cookie; @@ -162,14 +162,6 @@ struct ceph_osd_request { /* request osd ops array */ unsigned int r_num_ops; - /* these are updated on each send */ - __le32 *r_request_osdmap_epoch; - __le32 *r_request_flags; - __le64 *r_request_pool; - void *r_request_pgid; - __le32 *r_request_attempts; - struct ceph_eversion *r_request_reassert_version; - int r_result; int r_got_reply; int r_linger; @@ -180,16 +172,22 @@ struct ceph_osd_request { struct completion r_completion, r_safe_completion; ceph_osdc_callback_t r_callback; ceph_osdc_unsafe_callback_t r_unsafe_callback; - struct ceph_eversion r_reassert_version; struct list_head r_unsafe_item; struct inode *r_inode; /* for use by callbacks */ void *r_priv; /* ditto */ - u64 r_snapid; - unsigned long r_stamp; /* send OR check time */ + /* set by submitter */ + u64 r_snapid; /* for reads, CEPH_NOSNAP o/w */ + struct ceph_snap_context *r_snapc; /* for writes */ + struct timespec r_mtime; /* ditto */ + u64 r_data_offset; /* ditto */ - struct ceph_snap_context *r_snapc; /* snap context for writes */ + /* internal */ + unsigned long r_stamp; /* jiffies, send or check time */ + int r_attempts; + struct ceph_eversion r_replay_version; /* aka reassert_version */ + u32 r_last_force_resend; struct ceph_osd_req_op r_ops[]; }; @@ -334,11 +332,6 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client * gfp_t gfp_flags); int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp); -extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, - struct ceph_snap_context *snapc, - u64 snap_id, - struct timespec *mtime); - extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_file_layout *layout, struct ceph_vino vino, diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index f28ed864e682..28740a58f32c 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -394,6 +394,13 @@ enum { CEPH_OSD_FLAG_SKIPRWLOCKS = 0x10000, /* skip rw locks */ CEPH_OSD_FLAG_IGNORE_OVERLAY = 0x20000, /* ignore pool overlay */ CEPH_OSD_FLAG_FLUSH = 0x40000, /* this is part of flush */ + CEPH_OSD_FLAG_MAP_SNAP_CLONE = 0x80000, /* map snap direct to clone id */ + CEPH_OSD_FLAG_ENFORCE_SNAPC = 0x100000, /* use snapc provided even if + pool uses pool snaps */ + CEPH_OSD_FLAG_REDIRECTED = 0x200000, /* op has been redirected */ + CEPH_OSD_FLAG_KNOWN_REDIR = 0x400000, /* redirect bit is authoritative */ + CEPH_OSD_FLAG_FULL_TRY = 0x800000, /* try op despite full flag */ + CEPH_OSD_FLAG_FULL_FORCE = 0x1000000, /* force op despite full flag */ }; enum { diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 0c11ab5f8c30..6d3ff713edeb 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -145,6 +145,43 @@ static int monc_show(struct seq_file *s, void *p) return 0; } +static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t) +{ + int i; + + seq_printf(s, "osd%d\t%llu.%x\t[", t->osd, t->pgid.pool, t->pgid.seed); + for (i = 0; i < t->up.size; i++) + seq_printf(s, "%s%d", (!i ? "" : ","), t->up.osds[i]); + seq_printf(s, "]/%d\t[", t->up.primary); + for (i = 0; i < t->acting.size; i++) + seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]); + seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary, + t->target_oid.name_len, t->target_oid.name, t->flags); + if (t->paused) + seq_puts(s, "\tP"); +} + +static void dump_request(struct seq_file *s, struct ceph_osd_request *req) +{ + int i; + + seq_printf(s, "%llu\t", req->r_tid); + dump_target(s, &req->r_t); + + seq_printf(s, "\t%d\t%u'%llu", req->r_attempts, + le32_to_cpu(req->r_replay_version.epoch), + le64_to_cpu(req->r_replay_version.version)); + + for (i = 0; i < req->r_num_ops; i++) { + struct ceph_osd_req_op *op = &req->r_ops[i]; + + seq_printf(s, "%s%s", (i == 0 ? "\t" : ","), + ceph_osd_op_name(op->op)); + } + + seq_putc(s, '\n'); +} + static int osdc_show(struct seq_file *s, void *pp) { struct ceph_client *client = s->private; @@ -154,32 +191,10 @@ static int osdc_show(struct seq_file *s, void *pp) mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { struct ceph_osd_request *req; - unsigned int i; - int opcode; req = rb_entry(p, struct ceph_osd_request, r_node); - seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1, - req->r_t.pgid.pool, req->r_t.pgid.seed); - - seq_printf(s, "%*pE", req->r_base_oid.name_len, - req->r_base_oid.name); - - if (req->r_reassert_version.epoch) - seq_printf(s, "\t%u'%llu", - (unsigned int)le32_to_cpu(req->r_reassert_version.epoch), - le64_to_cpu(req->r_reassert_version.version)); - else - seq_printf(s, "\t"); - - for (i = 0; i < req->r_num_ops; i++) { - opcode = req->r_ops[i].op; - seq_printf(s, "%s%s", (i == 0 ? "\t" : ","), - ceph_osd_op_name(opcode)); - } - - seq_printf(s, "\n"); + dump_request(s, req); } mutex_unlock(&osdc->request_mutex); return 0; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 013101598c41..8a008f083283 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -34,8 +34,6 @@ static void __unregister_request(struct ceph_osd_client *osdc, static void __unregister_linger_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); static void __enqueue_request(struct ceph_osd_request *req); -static void __send_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); /* * Implement client access to distributed object storage cluster. @@ -209,6 +207,8 @@ void osd_req_op_cls_request_data_pagelist( osd_data = osd_req_op_data(osd_req, which, cls, request_data); ceph_osd_data_pagelist_init(osd_data, pagelist); + osd_req->r_ops[which].cls.indata_len += pagelist->length; + osd_req->r_ops[which].indata_len += pagelist->length; } EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); @@ -221,6 +221,8 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, osd_data = osd_req_op_data(osd_req, which, cls, request_data); ceph_osd_data_pages_init(osd_data, pages, length, alignment, pages_from_pool, own_pages); + osd_req->r_ops[which].cls.indata_len += length; + osd_req->r_ops[which].indata_len += length; } EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); @@ -610,8 +612,6 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); - op->cls.argc = 0; /* currently unused */ - op->indata_len = payload_len; } EXPORT_SYMBOL(osd_req_op_cls_init); @@ -709,16 +709,9 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg, } } -static u64 osd_req_encode_op(struct ceph_osd_request *req, - struct ceph_osd_op *dst, unsigned int which) +static u32 osd_req_encode_op(struct ceph_osd_op *dst, + const struct ceph_osd_req_op *src) { - struct ceph_osd_req_op *src; - struct ceph_osd_data *osd_data; - u64 request_data_len = 0; - u64 data_length; - - BUG_ON(which >= req->r_num_ops); - src = &req->r_ops[which]; if (WARN_ON(!osd_req_opcode_valid(src->op))) { pr_err("unrecognized osd opcode %d\n", src->op); @@ -727,49 +720,23 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, switch (src->op) { case CEPH_OSD_OP_STAT: - osd_data = &src->raw_data_in; - ceph_osdc_msg_data_add(req->r_reply, osd_data); break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_ZERO: case CEPH_OSD_OP_TRUNCATE: - if (src->op == CEPH_OSD_OP_WRITE || - src->op == CEPH_OSD_OP_WRITEFULL) - request_data_len = src->extent.length; dst->extent.offset = cpu_to_le64(src->extent.offset); dst->extent.length = cpu_to_le64(src->extent.length); dst->extent.truncate_size = cpu_to_le64(src->extent.truncate_size); dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); - osd_data = &src->extent.osd_data; - if (src->op == CEPH_OSD_OP_WRITE || - src->op == CEPH_OSD_OP_WRITEFULL) - ceph_osdc_msg_data_add(req->r_request, osd_data); - else - ceph_osdc_msg_data_add(req->r_reply, osd_data); break; case CEPH_OSD_OP_CALL: dst->cls.class_len = src->cls.class_len; dst->cls.method_len = src->cls.method_len; - osd_data = &src->cls.request_info; - ceph_osdc_msg_data_add(req->r_request, osd_data); - BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST); - request_data_len = osd_data->pagelist->length; - - osd_data = &src->cls.request_data; - data_length = ceph_osd_data_length(osd_data); - if (data_length) { - BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); - dst->cls.indata_len = cpu_to_le32(data_length); - ceph_osdc_msg_data_add(req->r_request, osd_data); - src->indata_len += data_length; - request_data_len += data_length; - } - osd_data = &src->cls.response_data; - ceph_osdc_msg_data_add(req->r_reply, osd_data); + dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); break; case CEPH_OSD_OP_STARTSYNC: break; @@ -791,9 +758,6 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); dst->xattr.cmp_op = src->xattr.cmp_op; dst->xattr.cmp_mode = src->xattr.cmp_mode; - osd_data = &src->xattr.osd_data; - ceph_osdc_msg_data_add(req->r_request, osd_data); - request_data_len = osd_data->pagelist->length; break; case CEPH_OSD_OP_CREATE: case CEPH_OSD_OP_DELETE: @@ -810,7 +774,7 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->flags = cpu_to_le32(src->flags); dst->payload_len = cpu_to_le32(src->indata_len); - return request_data_len; + return src->indata_len; } /* @@ -852,8 +816,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, goto fail; } - req->r_flags = flags; - /* calculate max write size */ r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); if (r) @@ -877,9 +839,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size, truncate_seq); } + req->r_flags = flags; req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); + req->r_snapid = vino.snap; + if (flags & CEPH_OSD_FLAG_WRITE) + req->r_data_offset = off; + r = ceph_osdc_alloc_messages(req, GFP_NOFS); if (r) goto fail; @@ -1509,37 +1476,173 @@ out: return err; } -/* - * caller should hold map_sem (for read) and request_mutex - */ -static void __send_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) +static void setup_request_data(struct ceph_osd_request *req, + struct ceph_msg *msg) { - void *p; + u32 data_len = 0; + int i; + + if (!list_empty(&msg->data)) + return; - dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", - req, req->r_tid, req->r_osd->o_osd, req->r_flags, - req->r_t.pgid.pool, req->r_t.pgid.seed); + WARN_ON(msg->data_length); + for (i = 0; i < req->r_num_ops; i++) { + struct ceph_osd_req_op *op = &req->r_ops[i]; + + switch (op->op) { + /* request */ + case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: + WARN_ON(op->indata_len != op->extent.length); + ceph_osdc_msg_data_add(msg, &op->extent.osd_data); + break; + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_CMPXATTR: + WARN_ON(op->indata_len != op->xattr.name_len + + op->xattr.value_len); + ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); + break; + + /* reply */ + case CEPH_OSD_OP_STAT: + ceph_osdc_msg_data_add(req->r_reply, + &op->raw_data_in); + break; + case CEPH_OSD_OP_READ: + ceph_osdc_msg_data_add(req->r_reply, + &op->extent.osd_data); + break; + + /* both */ + case CEPH_OSD_OP_CALL: + WARN_ON(op->indata_len != op->cls.class_len + + op->cls.method_len + + op->cls.indata_len); + ceph_osdc_msg_data_add(msg, &op->cls.request_info); + /* optional, can be NONE */ + ceph_osdc_msg_data_add(msg, &op->cls.request_data); + /* optional, can be NONE */ + ceph_osdc_msg_data_add(req->r_reply, + &op->cls.response_data); + break; + } + + data_len += op->indata_len; + } - /* fill in message content that changes each time we send it */ - put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); - put_unaligned_le32(req->r_flags, req->r_request_flags); - put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool); - p = req->r_request_pgid; + WARN_ON(data_len != msg->data_length); +} + +static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg) +{ + void *p = msg->front.iov_base; + void *const end = p + msg->front_alloc_len; + u32 data_len = 0; + int i; + + if (req->r_flags & CEPH_OSD_FLAG_WRITE) { + /* snapshots aren't writeable */ + WARN_ON(req->r_snapid != CEPH_NOSNAP); + } else { + WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec || + req->r_data_offset || req->r_snapc); + } + + setup_request_data(req, msg); + + ceph_encode_32(&p, 1); /* client_inc, always 1 */ + ceph_encode_32(&p, req->r_osdc->osdmap->epoch); + ceph_encode_32(&p, req->r_flags); + ceph_encode_timespec(p, &req->r_mtime); + p += sizeof(struct ceph_timespec); + /* aka reassert_version */ + memcpy(p, &req->r_replay_version, sizeof(req->r_replay_version)); + p += sizeof(req->r_replay_version); + + /* oloc */ + ceph_encode_8(&p, 4); + ceph_encode_8(&p, 4); + ceph_encode_32(&p, 8 + 4 + 4); + ceph_encode_64(&p, req->r_t.target_oloc.pool); + ceph_encode_32(&p, -1); /* preferred */ + ceph_encode_32(&p, 0); /* key len */ + + /* pgid */ + ceph_encode_8(&p, 1); ceph_encode_64(&p, req->r_t.pgid.pool); ceph_encode_32(&p, req->r_t.pgid.seed); - put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ - memcpy(req->r_request_reassert_version, &req->r_reassert_version, - sizeof(req->r_reassert_version)); + ceph_encode_32(&p, -1); /* preferred */ - req->r_stamp = jiffies; - list_move_tail(&req->r_req_lru_item, &osdc->req_lru); + /* oid */ + ceph_encode_32(&p, req->r_t.target_oid.name_len); + memcpy(p, req->r_t.target_oid.name, req->r_t.target_oid.name_len); + p += req->r_t.target_oid.name_len; - ceph_msg_get(req->r_request); /* send consumes a ref */ + /* ops, can imply data */ + ceph_encode_16(&p, req->r_num_ops); + for (i = 0; i < req->r_num_ops; i++) { + data_len += osd_req_encode_op(p, &req->r_ops[i]); + p += sizeof(struct ceph_osd_op); + } - req->r_sent = req->r_osd->o_incarnation; + ceph_encode_64(&p, req->r_snapid); /* snapid */ + if (req->r_snapc) { + ceph_encode_64(&p, req->r_snapc->seq); + ceph_encode_32(&p, req->r_snapc->num_snaps); + for (i = 0; i < req->r_snapc->num_snaps; i++) + ceph_encode_64(&p, req->r_snapc->snaps[i]); + } else { + ceph_encode_64(&p, 0); /* snap_seq */ + ceph_encode_32(&p, 0); /* snaps len */ + } + + ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ + + BUG_ON(p > end); + msg->front.iov_len = p - msg->front.iov_base; + msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */ + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + msg->hdr.data_len = cpu_to_le32(data_len); + /* + * The header "data_off" is a hint to the receiver allowing it + * to align received data into its buffers such that there's no + * need to re-copy it before writing it to disk (direct I/O). + */ + msg->hdr.data_off = cpu_to_le16(req->r_data_offset); - ceph_con_send(&req->r_osd->o_con, req->r_request); + dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__, + req, req->r_t.target_oid.name_len, req->r_t.target_oid.name, + req->r_t.target_oid.name_len, msg->front.iov_len, data_len); +} + +/* + * @req has to be assigned a tid and registered. + */ +static void send_request(struct ceph_osd_request *req) +{ + struct ceph_osd *osd = req->r_osd; + + WARN_ON(osd->o_osd != req->r_t.osd); + + req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; + if (req->r_attempts) + req->r_flags |= CEPH_OSD_FLAG_RETRY; + else + WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); + + encode_request(req, req->r_request); + + dout("%s req %p tid %llu to pg %llu.%x osd%d flags 0x%x attempt %d\n", + __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, + req->r_t.osd, req->r_flags, req->r_attempts); + + req->r_t.paused = false; + req->r_stamp = jiffies; + req->r_attempts++; + + req->r_sent = osd->o_incarnation; + req->r_request->hdr.tid = cpu_to_le64(req->r_tid); + ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); } /* @@ -1550,8 +1653,10 @@ static void __send_queued(struct ceph_osd_client *osdc) struct ceph_osd_request *req, *tmp; dout("__send_queued\n"); - list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) - __send_request(osdc, req); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + list_move_tail(&req->r_req_lru_item, &osdc->req_lru); + send_request(req); + } } /* @@ -1915,8 +2020,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) req->r_result = bytes; /* in case this is a write and we need to replay, */ - req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); - req->r_reassert_version.version = cpu_to_le64(reassert_version); + req->r_replay_version.epoch = cpu_to_le32(reassert_epoch); + req->r_replay_version.version = cpu_to_le64(reassert_version); req->r_got_reply = 1; } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { @@ -2432,105 +2537,6 @@ bad: pr_err("osdc handle_watch_notify corrupt msg\n"); } -/* - * build new request AND message - * - */ -void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, - struct ceph_snap_context *snapc, u64 snap_id, - struct timespec *mtime) -{ - struct ceph_msg *msg = req->r_request; - void *p; - size_t msg_size; - int flags = req->r_flags; - u64 data_len; - unsigned int i; - - req->r_snapid = snap_id; - WARN_ON(snapc != req->r_snapc); - - /* encode request */ - msg->hdr.version = cpu_to_le16(4); - - p = msg->front.iov_base; - ceph_encode_32(&p, 1); /* client_inc is always 1 */ - req->r_request_osdmap_epoch = p; - p += 4; - req->r_request_flags = p; - p += 4; - if (req->r_flags & CEPH_OSD_FLAG_WRITE) - ceph_encode_timespec(p, mtime); - p += sizeof(struct ceph_timespec); - req->r_request_reassert_version = p; - p += sizeof(struct ceph_eversion); /* will get filled in */ - - /* oloc */ - ceph_encode_8(&p, 4); - ceph_encode_8(&p, 4); - ceph_encode_32(&p, 8 + 4 + 4); - req->r_request_pool = p; - p += 8; - ceph_encode_32(&p, -1); /* preferred */ - ceph_encode_32(&p, 0); /* key len */ - - ceph_encode_8(&p, 1); - req->r_request_pgid = p; - p += 8 + 4; - ceph_encode_32(&p, -1); /* preferred */ - - /* oid */ - ceph_encode_32(&p, req->r_base_oid.name_len); - memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); - dout("oid %*pE len %d\n", req->r_base_oid.name_len, - req->r_base_oid.name, req->r_base_oid.name_len); - p += req->r_base_oid.name_len; - - /* ops--can imply data */ - ceph_encode_16(&p, (u16)req->r_num_ops); - data_len = 0; - for (i = 0; i < req->r_num_ops; i++) { - data_len += osd_req_encode_op(req, p, i); - p += sizeof(struct ceph_osd_op); - } - - /* snaps */ - ceph_encode_64(&p, req->r_snapid); - ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); - ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); - if (req->r_snapc) { - for (i = 0; i < req->r_snapc->num_snaps; i++) { - ceph_encode_64(&p, req->r_snapc->snaps[i]); - } - } - - req->r_request_attempts = p; - p += 4; - - /* data */ - if (flags & CEPH_OSD_FLAG_WRITE) { - u16 data_off; - - /* - * The header "data_off" is a hint to the receiver - * allowing it to align received data into its - * buffers such that there's no need to re-copy - * it before writing it to disk (direct I/O). - */ - data_off = (u16) (off & 0xffff); - req->r_request->hdr.data_off = cpu_to_le16(data_off); - } - req->r_request->hdr.data_len = cpu_to_le32(data_len); - - BUG_ON(p > msg->front.iov_base + msg->front.iov_len); - msg_size = p - msg->front.iov_base; - msg->front.iov_len = msg_size; - msg->hdr.front_len = cpu_to_le32(msg_size); - - dout("build_request msg_size was %d\n", (int)msg_size); -} -EXPORT_SYMBOL(ceph_osdc_build_request); - /* * Register request, send initial attempt. */ @@ -2749,15 +2755,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, return PTR_ERR(req); /* it may be a short read due to an object boundary */ - osd_req_op_extent_osd_data_pages(req, 0, pages, *plen, page_align, false, false); dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", off, *plen, *plen, page_align); - ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); - rc = ceph_osdc_start_request(osdc, req, false); if (!rc) rc = ceph_osdc_wait_request(osdc, req); @@ -2783,7 +2786,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, int rc = 0; int page_align = off & ~PAGE_MASK; - BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, @@ -2797,8 +2799,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, false, false); dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); - ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); - + req->r_mtime = *mtime; rc = ceph_osdc_start_request(osdc, req, true); if (!rc) rc = ceph_osdc_wait_request(osdc, req); -- cgit v1.2.3 From 85e084feb47349d62989efe1713a8723af95f4ea Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 28 Apr 2016 16:07:24 +0200 Subject: libceph: drop msg argument from ceph_osdc_callback_t finish_read(), its only user, uses it to get to hdr.data_len, which is what ->r_result is set to on success. This gains us the ability to safely call callbacks from contexts other than reply, e.g. map check. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 5 ++--- fs/ceph/addr.c | 9 ++++----- fs/ceph/file.c | 7 +++---- include/linux/ceph/osd_client.h | 3 +-- net/ceph/osd_client.c | 4 ++-- 5 files changed, 12 insertions(+), 16 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 0e598916e048..82b03aa509e6 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1828,13 +1828,12 @@ static void rbd_osd_call_callback(struct rbd_obj_request *obj_request) obj_request_done_set(obj_request); } -static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, - struct ceph_msg *msg) +static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) { struct rbd_obj_request *obj_request = osd_req->r_priv; u16 opcode; - dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); + dout("%s: osd_req %p\n", __func__, osd_req); rbd_assert(osd_req == obj_request->osd_req); if (obj_request_img_data_test(obj_request)) { rbd_assert(obj_request->img_request); diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 59b3c3fbd3bd..a11756a39471 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -257,12 +257,12 @@ static int ceph_readpage(struct file *filp, struct page *page) /* * Finish an async read(ahead) op. */ -static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) +static void finish_read(struct ceph_osd_request *req) { struct inode *inode = req->r_inode; struct ceph_osd_data *osd_data; - int rc = req->r_result; - int bytes = le32_to_cpu(msg->hdr.data_len); + int rc = req->r_result <= 0 ? req->r_result : 0; + int bytes = req->r_result >= 0 ? req->r_result : 0; int num_pages; int i; @@ -598,8 +598,7 @@ static void ceph_release_pages(struct page **pages, int num) * If we get an error, set the mapping error bit, but not the individual * page error bits. */ -static void writepages_finish(struct ceph_osd_request *req, - struct ceph_msg *msg) +static void writepages_finish(struct ceph_osd_request *req) { struct inode *inode = req->r_inode; struct ceph_inode_info *ci = ceph_inode(inode); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 52e4b72dd5de..e75fd0b028e9 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -616,8 +616,7 @@ static void ceph_aio_complete(struct inode *inode, kfree(aio_req); } -static void ceph_aio_complete_req(struct ceph_osd_request *req, - struct ceph_msg *msg) +static void ceph_aio_complete_req(struct ceph_osd_request *req) { int rc = req->r_result; struct inode *inode = req->r_inode; @@ -740,7 +739,7 @@ static void ceph_aio_retry_work(struct work_struct *work) out: if (ret < 0) { req->r_result = ret; - ceph_aio_complete_req(req, NULL); + ceph_aio_complete_req(req); } ceph_put_snap_context(snapc); @@ -961,7 +960,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, req, false); if (ret < 0) { req->r_result = ret; - ceph_aio_complete_req(req, NULL); + ceph_aio_complete_req(req); } } return -EIOCBQUEUED; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 67a37d98e0ca..3bebd60e7f9f 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -20,8 +20,7 @@ struct ceph_osd_client; /* * completion callback for async writepages */ -typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *, - struct ceph_msg *); +typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool); #define CEPH_HOMELESS_OSD -1 diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 8a008f083283..2a30c0bb3045 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2048,7 +2048,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK)) req->r_unsafe_callback(req, true); if (req->r_callback) - req->r_callback(req, msg); + req->r_callback(req); else complete_all(&req->r_completion); } @@ -2072,7 +2072,7 @@ bad_put: req->r_result = -EIO; __unregister_request(osdc, req); if (req->r_callback) - req->r_callback(req, msg); + req->r_callback(req); else complete_all(&req->r_completion); complete_request(req); -- cgit v1.2.3 From c525f03601f52c83ded046624138f2a45e0ba56c Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 28 Apr 2016 16:07:26 +0200 Subject: rbd: rbd_dev_header_unwatch_sync() variant Introduce __rbd_dev_header_unwatch_sync(), which doesn't flush notify callbacks. This is for the new rados_watcherrcb_t, which would be called from a notify callback. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 82b03aa509e6..fce23dc908e3 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3246,10 +3246,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) return 0; } -/* - * Tear down a watch request, synchronously. - */ -static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) { struct rbd_obj_request *obj_request; @@ -3269,6 +3266,14 @@ static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) ceph_osdc_cancel_event(rbd_dev->watch_event); rbd_dev->watch_event = NULL; +} + +/* + * Tear down a watch request, synchronously. + */ +static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +{ + __rbd_dev_header_unwatch_sync(rbd_dev); dout("%s flushing notifies\n", __func__); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); -- cgit v1.2.3 From 922dab6134178cae317ae00de86376cba59f3147 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 26 May 2016 01:15:02 +0200 Subject: libceph, rbd: ceph_osd_linger_request, watch/notify v2 This adds support and switches rbd to a new, more reliable version of watch/notify protocol. As with the OSD client update, this is mostly about getting the right structures linked into the right places so that reconnects are properly sent when needed. watch/notify v2 also requires sending regular pings to the OSDs - send_linger_ping(). A major change from the old watch/notify implementation is the introduction of ceph_osd_linger_request - linger requests no longer piggy back on ceph_osd_request. ceph_osd_event has been merged into ceph_osd_linger_request. All the details are now hidden within libceph, the interface consists of a simple pair of watch/unwatch functions and ceph_osdc_notify_ack(). ceph_osdc_watch() does return ceph_osd_linger_request, but only to keep the lifetime management simple. ceph_osdc_notify_ack() accepts an optional data payload, which is relayed back to the notifier. Portions of this patch are loosely based on work by Douglas Fuller and Mike Christie . Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 179 ++---- include/linux/ceph/ceph_fs.h | 5 +- include/linux/ceph/osd_client.h | 97 ++-- include/linux/ceph/rados.h | 17 +- net/ceph/ceph_strings.c | 16 + net/ceph/debugfs.c | 36 ++ net/ceph/osd_client.c | 1148 ++++++++++++++++++++++++++++++--------- 7 files changed, 1067 insertions(+), 431 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index fce23dc908e3..d0834c477f96 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -351,11 +351,11 @@ struct rbd_device { struct rbd_options *opts; struct ceph_object_id header_oid; + struct ceph_object_locator header_oloc; struct ceph_file_layout layout; - struct ceph_osd_event *watch_event; - struct rbd_obj_request *watch_request; + struct ceph_osd_linger_request *watch_handle; struct rbd_spec *parent_spec; u64 parent_overlap; @@ -1596,12 +1596,6 @@ static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) return __rbd_obj_request_wait(obj_request, 0); } -static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request, - unsigned long timeout) -{ - return __rbd_obj_request_wait(obj_request, timeout); -} - static void rbd_img_request_complete(struct rbd_img_request *img_request) { @@ -1751,12 +1745,6 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) complete_all(&obj_request->completion); } -static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) -{ - dout("%s: obj %p\n", __func__, obj_request); - obj_request_done_set(obj_request); -} - static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request = NULL; @@ -1877,10 +1865,6 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) case CEPH_OSD_OP_CALL: rbd_osd_call_callback(obj_request); break; - case CEPH_OSD_OP_NOTIFY_ACK: - case CEPH_OSD_OP_WATCH: - rbd_osd_trivial_callback(obj_request); - break; default: rbd_warn(NULL, "%s: unsupported op %hu", obj_request->object_name, (unsigned short) opcode); @@ -3100,45 +3084,18 @@ out_err: obj_request_done_set(obj_request); } -static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id) -{ - struct rbd_obj_request *obj_request; - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - int ret; - - obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0, - OBJ_REQUEST_NODATA); - if (!obj_request) - return -ENOMEM; +static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev); +static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev); - ret = -ENOMEM; - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - obj_request); - if (!obj_request->osd_req) - goto out; - - osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, - notify_id, 0, 0); - rbd_osd_req_format_read(obj_request); - - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; - ret = rbd_obj_request_wait(obj_request); -out: - rbd_obj_request_put(obj_request); - - return ret; -} - -static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) +static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len) { - struct rbd_device *rbd_dev = (struct rbd_device *)data; + struct rbd_device *rbd_dev = arg; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; - dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, - rbd_dev->header_oid.name, (unsigned long long)notify_id, - (unsigned int)opcode); + dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev, + cookie, notify_id); /* * Until adequate refresh error handling is in place, there is @@ -3150,63 +3107,31 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) if (ret) rbd_warn(rbd_dev, "refresh failed: %d", ret); - ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id); + ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, notify_id, cookie, + NULL, 0); if (ret) rbd_warn(rbd_dev, "notify_ack ret %d", ret); } -/* - * Send a (un)watch request and wait for the ack. Return a request - * with a ref held on success or error. - */ -static struct rbd_obj_request *rbd_obj_watch_request_helper( - struct rbd_device *rbd_dev, - bool watch) +static void rbd_watch_errcb(void *arg, u64 cookie, int err) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - struct ceph_options *opts = osdc->client->options; - struct rbd_obj_request *obj_request; + struct rbd_device *rbd_dev = arg; int ret; - obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0, - OBJ_REQUEST_NODATA); - if (!obj_request) - return ERR_PTR(-ENOMEM); - - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1, - obj_request); - if (!obj_request->osd_req) { - ret = -ENOMEM; - goto out; - } - - osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, - rbd_dev->watch_event->cookie, 0, watch); - rbd_osd_req_format_write(obj_request); - - if (watch) - ceph_osdc_set_request_linger(osdc, obj_request->osd_req); - - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_warn(rbd_dev, "encountered watch error: %d", err); - ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout); - if (ret) - goto out; + __rbd_dev_header_unwatch_sync(rbd_dev); - ret = obj_request->result; + ret = rbd_dev_header_watch_sync(rbd_dev); if (ret) { - if (watch) - rbd_obj_request_end(obj_request); - goto out; + rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); + return; } - return obj_request; - -out: - rbd_obj_request_put(obj_request); - return ERR_PTR(ret); + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); } /* @@ -3215,57 +3140,33 @@ out: static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; - struct rbd_obj_request *obj_request; - int ret; + struct ceph_osd_linger_request *handle; - rbd_assert(!rbd_dev->watch_event); - rbd_assert(!rbd_dev->watch_request); - - ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, - &rbd_dev->watch_event); - if (ret < 0) - return ret; - - obj_request = rbd_obj_watch_request_helper(rbd_dev, true); - if (IS_ERR(obj_request)) { - ceph_osdc_cancel_event(rbd_dev->watch_event); - rbd_dev->watch_event = NULL; - return PTR_ERR(obj_request); - } + rbd_assert(!rbd_dev->watch_handle); - /* - * A watch request is set to linger, so the underlying osd - * request won't go away until we unregister it. We retain - * a pointer to the object request during that time (in - * rbd_dev->watch_request), so we'll keep a reference to it. - * We'll drop that reference after we've unregistered it in - * rbd_dev_header_unwatch_sync(). - */ - rbd_dev->watch_request = obj_request; + handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, rbd_watch_cb, + rbd_watch_errcb, rbd_dev); + if (IS_ERR(handle)) + return PTR_ERR(handle); + rbd_dev->watch_handle = handle; return 0; } static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) { - struct rbd_obj_request *obj_request; - - rbd_assert(rbd_dev->watch_event); - rbd_assert(rbd_dev->watch_request); + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int ret; - rbd_obj_request_end(rbd_dev->watch_request); - rbd_obj_request_put(rbd_dev->watch_request); - rbd_dev->watch_request = NULL; + if (!rbd_dev->watch_handle) + return; - obj_request = rbd_obj_watch_request_helper(rbd_dev, false); - if (!IS_ERR(obj_request)) - rbd_obj_request_put(obj_request); - else - rbd_warn(rbd_dev, "unable to tear down watch request (%ld)", - PTR_ERR(obj_request)); + ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); + if (ret) + rbd_warn(rbd_dev, "failed to unwatch: %d", ret); - ceph_osdc_cancel_event(rbd_dev->watch_event); - rbd_dev->watch_event = NULL; + rbd_dev->watch_handle = NULL; } /* @@ -4081,6 +3982,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, init_rwsem(&rbd_dev->header_rwsem); ceph_oid_init(&rbd_dev->header_oid); + ceph_oloc_init(&rbd_dev->header_oloc); rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; @@ -5285,6 +5187,7 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); + rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout); if (rbd_dev->image_format == 1) ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", spec->image_name, RBD_SUFFIX); diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 37f28bf55ce4..3b911ff889dd 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -153,8 +153,9 @@ struct ceph_dir_layout { /* watch-notify operations */ enum { - WATCH_NOTIFY = 1, /* notifying watcher */ - WATCH_NOTIFY_COMPLETE = 2, /* notifier notified when done */ + CEPH_WATCH_EVENT_NOTIFY = 1, /* notifying watcher */ + CEPH_WATCH_EVENT_NOTIFY_COMPLETE = 2, /* notifier notified when done */ + CEPH_WATCH_EVENT_DISCONNECT = 3, /* we were disconnected */ }; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 342f22f1f040..cd2dcb8939de 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -34,7 +34,7 @@ struct ceph_osd { struct rb_node o_node; struct ceph_connection o_con; struct rb_root o_requests; - struct list_head o_linger_requests; + struct rb_root o_linger_requests; struct list_head o_osd_lru; struct ceph_auth_handshake o_auth; unsigned long lru_ttl; @@ -108,11 +108,12 @@ struct ceph_osd_req_op { } cls; struct { u64 cookie; - u64 ver; - u32 prot_ver; - u32 timeout; - __u8 flag; + __u8 op; /* CEPH_OSD_WATCH_OP_ */ + u32 gen; } watch; + struct { + struct ceph_osd_data request_data; + } notify_ack; struct { u64 expected_object_size; u64 expected_write_size; @@ -145,8 +146,6 @@ struct ceph_osd_request_target { struct ceph_osd_request { u64 r_tid; /* unique for this client */ struct rb_node r_node; - struct list_head r_linger_item; - struct list_head r_linger_osd_item; struct ceph_osd *r_osd; struct ceph_osd_request_target r_t; @@ -162,7 +161,6 @@ struct ceph_osd_request { int r_result; bool r_got_reply; - int r_linger; struct ceph_osd_client *r_osdc; struct kref r_kref; @@ -181,6 +179,7 @@ struct ceph_osd_request { struct ceph_snap_context *r_snapc; /* for writes */ struct timespec r_mtime; /* ditto */ u64 r_data_offset; /* ditto */ + bool r_linger; /* don't resend on failure */ /* internal */ unsigned long r_stamp; /* jiffies, send or check time */ @@ -195,23 +194,40 @@ struct ceph_request_redirect { struct ceph_object_locator oloc; }; -struct ceph_osd_event { - u64 cookie; - int one_shot; +typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len); +typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err); + +struct ceph_osd_linger_request { struct ceph_osd_client *osdc; - void (*cb)(u64, u64, u8, void *); - void *data; - struct rb_node node; - struct list_head osd_node; + u64 linger_id; + bool committed; + + struct ceph_osd *osd; + struct ceph_osd_request *reg_req; + struct ceph_osd_request *ping_req; + unsigned long ping_sent; + + struct ceph_osd_request_target t; + u32 last_force_resend; + + struct timespec mtime; + struct kref kref; -}; + struct mutex lock; + struct rb_node node; /* osd */ + struct rb_node osdc_node; /* osdc */ + struct list_head scan_item; + + struct completion reg_commit_wait; + int reg_commit_error; + int last_error; + + u32 register_gen; -struct ceph_osd_event_work { - struct work_struct work; - struct ceph_osd_event *event; - u64 ver; - u64 notify_id; - u8 opcode; + rados_watchcb2_t wcb; + rados_watcherrcb_t errcb; + void *data; }; struct ceph_osd_client { @@ -223,9 +239,10 @@ struct ceph_osd_client { struct rb_root osds; /* osds */ struct list_head osd_lru; /* idle osds */ spinlock_t osd_lru_lock; - struct list_head req_linger; /* lingering requests */ struct ceph_osd homeless_osd; atomic64_t last_tid; /* tid of last request */ + u64 last_linger_id; + struct rb_root linger_requests; /* lingering requests */ atomic_t num_requests; atomic_t num_homeless; struct delayed_work timeout_work; @@ -239,10 +256,6 @@ struct ceph_osd_client { struct ceph_msgpool msgpool_op; struct ceph_msgpool msgpool_op_reply; - spinlock_t event_lock; - struct rb_root event_tree; - u64 event_count; - struct workqueue_struct *notify_wq; }; @@ -314,9 +327,6 @@ extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *name, const void *value, size_t size, u8 cmp_op, u8 cmp_mode); -extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode, - u64 cookie, u64 version, int flag); extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, unsigned int which, u64 expected_object_size, @@ -339,9 +349,6 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, u32 truncate_seq, u64 truncate_size, bool use_mempool); -extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, - struct ceph_osd_request *req); - extern void ceph_osdc_get_request(struct ceph_osd_request *req); extern void ceph_osdc_put_request(struct ceph_osd_request *req); @@ -372,11 +379,23 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct timespec *mtime, struct page **pages, int nr_pages); -/* watch/notify events */ -extern int ceph_osdc_create_event(struct ceph_osd_client *osdc, - void (*event_cb)(u64, u64, u8, void *), - void *data, struct ceph_osd_event **pevent); -extern void ceph_osdc_cancel_event(struct ceph_osd_event *event); -extern void ceph_osdc_put_event(struct ceph_osd_event *event); +/* watch/notify */ +struct ceph_osd_linger_request * +ceph_osdc_watch(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + rados_watchcb2_t wcb, + rados_watcherrcb_t errcb, + void *data); +int ceph_osdc_unwatch(struct ceph_osd_client *osdc, + struct ceph_osd_linger_request *lreq); + +int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 notify_id, + u64 cookie, + void *payload, + size_t payload_len); #endif diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 28740a58f32c..204c8c944703 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -427,7 +427,17 @@ enum { CEPH_OSD_CMPXATTR_MODE_U64 = 2 }; -#define RADOS_NOTIFY_VER 1 +enum { + CEPH_OSD_WATCH_OP_UNWATCH = 0, + CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1, + /* note: use only ODD ids to prevent pre-giant code from + interpreting the op as UNWATCH */ + CEPH_OSD_WATCH_OP_WATCH = 3, + CEPH_OSD_WATCH_OP_RECONNECT = 5, + CEPH_OSD_WATCH_OP_PING = 7, +}; + +const char *ceph_osd_watch_op_name(int o); /* * an individual object operation. each may be accompanied by some data @@ -462,8 +472,9 @@ struct ceph_osd_op { } __attribute__ ((packed)) snap; struct { __le64 cookie; - __le64 ver; - __u8 flag; /* 0 = unwatch, 1 = watch */ + __le64 ver; /* no longer used */ + __u8 op; /* CEPH_OSD_WATCH_OP_* */ + __le32 gen; /* registration generation */ } __attribute__ ((packed)) watch; struct { __le64 offset, length; diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 139a9cb19b0c..3773a4fa11e3 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -27,6 +27,22 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) } } +const char *ceph_osd_watch_op_name(int o) +{ + switch (o) { + case CEPH_OSD_WATCH_OP_UNWATCH: + return "unwatch"; + case CEPH_OSD_WATCH_OP_WATCH: + return "watch"; + case CEPH_OSD_WATCH_OP_RECONNECT: + return "reconnect"; + case CEPH_OSD_WATCH_OP_PING: + return "ping"; + default: + return "???"; + } +} + const char *ceph_osd_state_name(int s) { switch (s) { diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 61dbd9de4650..e64cb8583533 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -177,6 +177,9 @@ static void dump_request(struct seq_file *s, struct ceph_osd_request *req) seq_printf(s, "%s%s", (i == 0 ? "\t" : ","), ceph_osd_op_name(op->op)); + if (op->op == CEPH_OSD_OP_WATCH) + seq_printf(s, "-%s", + ceph_osd_watch_op_name(op->watch.op)); } seq_putc(s, '\n'); @@ -197,6 +200,31 @@ static void dump_requests(struct seq_file *s, struct ceph_osd *osd) mutex_unlock(&osd->lock); } +static void dump_linger_request(struct seq_file *s, + struct ceph_osd_linger_request *lreq) +{ + seq_printf(s, "%llu\t", lreq->linger_id); + dump_target(s, &lreq->t); + + seq_printf(s, "\t%u\t%s/%d\n", lreq->register_gen, + lreq->committed ? "C" : "", lreq->last_error); +} + +static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd) +{ + struct rb_node *n; + + mutex_lock(&osd->lock); + for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + dump_linger_request(s, lreq); + } + + mutex_unlock(&osd->lock); +} + static int osdc_show(struct seq_file *s, void *pp) { struct ceph_client *client = s->private; @@ -214,6 +242,14 @@ static int osdc_show(struct seq_file *s, void *pp) } dump_requests(s, &osdc->homeless_osd); + seq_puts(s, "LINGER REQUESTS\n"); + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + + dump_linger_requests(s, osd); + } + dump_linger_requests(s, &osdc->homeless_osd); + up_read(&osdc->lock); return 0; } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ef1bcbe9af2d..ca0a7b58ba4f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -45,6 +45,10 @@ static const struct ceph_connection_operations osd_con_ops; static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req); static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req); +static void link_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq); +static void unlink_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq); #if 1 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) @@ -74,10 +78,15 @@ static inline void verify_osd_locked(struct ceph_osd *osd) rwsem_is_locked(&osdc->lock)) && !rwsem_is_wrlocked(&osdc->lock)); } +static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) +{ + WARN_ON(!mutex_is_locked(&lreq->lock)); +} #else static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { } static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { } static inline void verify_osd_locked(struct ceph_osd *osd) { } +static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { } #endif /* @@ -322,6 +331,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_STAT: ceph_osd_data_release(&op->raw_data_in); break; + case CEPH_OSD_OP_NOTIFY_ACK: + ceph_osd_data_release(&op->notify_ack.request_data); + break; default: break; } @@ -345,6 +357,29 @@ static void target_init(struct ceph_osd_request_target *t) t->osd = CEPH_HOMELESS_OSD; } +static void target_copy(struct ceph_osd_request_target *dest, + const struct ceph_osd_request_target *src) +{ + ceph_oid_copy(&dest->base_oid, &src->base_oid); + ceph_oloc_copy(&dest->base_oloc, &src->base_oloc); + ceph_oid_copy(&dest->target_oid, &src->target_oid); + ceph_oloc_copy(&dest->target_oloc, &src->target_oloc); + + dest->pgid = src->pgid; /* struct */ + dest->pg_num = src->pg_num; + dest->pg_num_mask = src->pg_num_mask; + ceph_osds_copy(&dest->acting, &src->acting); + ceph_osds_copy(&dest->up, &src->up); + dest->size = src->size; + dest->min_size = src->min_size; + dest->sort_bitwise = src->sort_bitwise; + + dest->flags = src->flags; + dest->paused = src->paused; + + dest->osd = src->osd; +} + static void target_destroy(struct ceph_osd_request_target *t) { ceph_oid_destroy(&t->base_oid); @@ -357,8 +392,6 @@ static void target_destroy(struct ceph_osd_request_target *t) static void request_release_checks(struct ceph_osd_request *req) { WARN_ON(!RB_EMPTY_NODE(&req->r_node)); - WARN_ON(!list_empty(&req->r_linger_item)); - WARN_ON(!list_empty(&req->r_linger_osd_item)); WARN_ON(!list_empty(&req->r_unsafe_item)); WARN_ON(req->r_osd); } @@ -419,13 +452,48 @@ static void request_init(struct ceph_osd_request *req) init_completion(&req->r_completion); init_completion(&req->r_safe_completion); RB_CLEAR_NODE(&req->r_node); - INIT_LIST_HEAD(&req->r_linger_item); - INIT_LIST_HEAD(&req->r_linger_osd_item); INIT_LIST_HEAD(&req->r_unsafe_item); target_init(&req->r_t); } +/* + * This is ugly, but it allows us to reuse linger registration and ping + * requests, keeping the structure of the code around send_linger{_ping}() + * reasonable. Setting up a min_nr=2 mempool for each linger request + * and dealing with copying ops (this blasts req only, watch op remains + * intact) isn't any better. + */ +static void request_reinit(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + bool mempool = req->r_mempool; + unsigned int num_ops = req->r_num_ops; + u64 snapid = req->r_snapid; + struct ceph_snap_context *snapc = req->r_snapc; + bool linger = req->r_linger; + struct ceph_msg *request_msg = req->r_request; + struct ceph_msg *reply_msg = req->r_reply; + + dout("%s req %p\n", __func__, req); + WARN_ON(atomic_read(&req->r_kref.refcount) != 1); + request_release_checks(req); + + WARN_ON(atomic_read(&request_msg->kref.refcount) != 1); + WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1); + target_destroy(&req->r_t); + + request_init(req); + req->r_osdc = osdc; + req->r_mempool = mempool; + req->r_num_ops = num_ops; + req->r_snapid = snapid; + req->r_snapc = snapc; + req->r_linger = linger; + req->r_request = request_msg; + req->r_reply = reply_msg; +} + struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, unsigned int num_ops, @@ -681,21 +749,19 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, } EXPORT_SYMBOL(osd_req_op_xattr_init); -void osd_req_op_watch_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode, - u64 cookie, u64 version, int flag) +/* + * @watch_opcode: CEPH_OSD_WATCH_OP_* + */ +static void osd_req_op_watch_init(struct ceph_osd_request *req, int which, + u64 cookie, u8 watch_opcode) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - opcode, 0); - - BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); + struct ceph_osd_req_op *op; + op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0); op->watch.cookie = cookie; - op->watch.ver = version; - if (opcode == CEPH_OSD_OP_WATCH && flag) - op->watch.flag = (u8)1; + op->watch.op = watch_opcode; + op->watch.gen = 0; } -EXPORT_SYMBOL(osd_req_op_watch_init); void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, unsigned int which, @@ -771,11 +837,13 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, break; case CEPH_OSD_OP_STARTSYNC: break; - case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_WATCH: dst->watch.cookie = cpu_to_le64(src->watch.cookie); - dst->watch.ver = cpu_to_le64(src->watch.ver); - dst->watch.flag = src->watch.flag; + dst->watch.ver = cpu_to_le64(0); + dst->watch.op = src->watch.op; + dst->watch.gen = cpu_to_le32(src->watch.gen); + break; + case CEPH_OSD_OP_NOTIFY_ACK: break; case CEPH_OSD_OP_SETALLOCHINT: dst->alloc_hint.expected_object_size = @@ -915,7 +983,7 @@ static void osd_init(struct ceph_osd *osd) atomic_set(&osd->o_ref, 1); RB_CLEAR_NODE(&osd->o_node); osd->o_requests = RB_ROOT; - INIT_LIST_HEAD(&osd->o_linger_requests); + osd->o_linger_requests = RB_ROOT; INIT_LIST_HEAD(&osd->o_osd_lru); INIT_LIST_HEAD(&osd->o_keepalive_item); osd->o_incarnation = 1; @@ -926,7 +994,7 @@ static void osd_cleanup(struct ceph_osd *osd) { WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); - WARN_ON(!list_empty(&osd->o_linger_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); WARN_ON(!list_empty(&osd->o_osd_lru)); WARN_ON(!list_empty(&osd->o_keepalive_item)); @@ -996,7 +1064,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd) static void maybe_move_osd_to_lru(struct ceph_osd *osd) { if (RB_EMPTY_ROOT(&osd->o_requests) && - list_empty(&osd->o_linger_requests)) + RB_EMPTY_ROOT(&osd->o_linger_requests)) __move_osd_to_lru(osd); } @@ -1036,6 +1104,17 @@ static void close_osd(struct ceph_osd *osd) unlink_request(osd, req); link_request(&osdc->homeless_osd, req); } + for (n = rb_first(&osd->o_linger_requests); n; ) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + n = rb_next(n); /* unlink_linger() */ + + dout(" reassigning lreq %p linger_id %llu\n", lreq, + lreq->linger_id); + unlink_linger(osd, lreq); + link_linger(&osdc->homeless_osd, lreq); + } __remove_osd_from_lru(osd); erase_osd(&osdc->osds, osd); @@ -1052,7 +1131,7 @@ static int reopen_osd(struct ceph_osd *osd) dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); if (RB_EMPTY_ROOT(&osd->o_requests) && - list_empty(&osd->o_linger_requests)) { + RB_EMPTY_ROOT(&osd->o_linger_requests)) { close_osd(osd); return -ENODEV; } @@ -1148,52 +1227,6 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) atomic_dec(&osd->o_osdc->num_homeless); } -static void __register_linger_request(struct ceph_osd *osd, - struct ceph_osd_request *req) -{ - dout("%s %p tid %llu\n", __func__, req, req->r_tid); - WARN_ON(!req->r_linger); - - ceph_osdc_get_request(req); - list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger); - list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests); - __remove_osd_from_lru(osd); - req->r_osd = osd; -} - -static void __unregister_linger_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - WARN_ON(!req->r_linger); - - if (list_empty(&req->r_linger_item)) { - dout("%s %p tid %llu not registered\n", __func__, req, - req->r_tid); - return; - } - - dout("%s %p tid %llu\n", __func__, req, req->r_tid); - list_del_init(&req->r_linger_item); - - if (req->r_osd) { - list_del_init(&req->r_linger_osd_item); - maybe_move_osd_to_lru(req->r_osd); - if (RB_EMPTY_ROOT(&req->r_osd->o_requests)) - req->r_osd = NULL; - } - ceph_osdc_put_request(req); -} - -void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - if (!req->r_linger) { - dout("set_request_linger %p\n", req); - req->r_linger = 1; - } -} -EXPORT_SYMBOL(ceph_osdc_set_request_linger); - static bool __pool_full(struct ceph_pg_pool_info *pi) { return pi->flags & CEPH_POOL_FLAG_FULL; @@ -1379,6 +1412,10 @@ static void setup_request_data(struct ceph_osd_request *req, op->xattr.value_len); ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); break; + case CEPH_OSD_OP_NOTIFY_ACK: + ceph_osdc_msg_data_add(msg, + &op->notify_ack.request_data); + break; /* reply */ case CEPH_OSD_OP_STAT: @@ -1683,6 +1720,460 @@ static void cancel_request(struct ceph_osd_request *req) finish_request(req); } +/* + * lingering requests, watch/notify v2 infrastructure + */ +static void linger_release(struct kref *kref) +{ + struct ceph_osd_linger_request *lreq = + container_of(kref, struct ceph_osd_linger_request, kref); + + dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq, + lreq->reg_req, lreq->ping_req); + WARN_ON(!RB_EMPTY_NODE(&lreq->node)); + WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); + WARN_ON(!list_empty(&lreq->scan_item)); + WARN_ON(lreq->osd); + + if (lreq->reg_req) + ceph_osdc_put_request(lreq->reg_req); + if (lreq->ping_req) + ceph_osdc_put_request(lreq->ping_req); + target_destroy(&lreq->t); + kfree(lreq); +} + +static void linger_put(struct ceph_osd_linger_request *lreq) +{ + if (lreq) + kref_put(&lreq->kref, linger_release); +} + +static struct ceph_osd_linger_request * +linger_get(struct ceph_osd_linger_request *lreq) +{ + kref_get(&lreq->kref); + return lreq; +} + +static struct ceph_osd_linger_request * +linger_alloc(struct ceph_osd_client *osdc) +{ + struct ceph_osd_linger_request *lreq; + + lreq = kzalloc(sizeof(*lreq), GFP_NOIO); + if (!lreq) + return NULL; + + kref_init(&lreq->kref); + mutex_init(&lreq->lock); + RB_CLEAR_NODE(&lreq->node); + RB_CLEAR_NODE(&lreq->osdc_node); + INIT_LIST_HEAD(&lreq->scan_item); + init_completion(&lreq->reg_commit_wait); + + lreq->osdc = osdc; + target_init(&lreq->t); + + dout("%s lreq %p\n", __func__, lreq); + return lreq; +} + +DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) +DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) + +/* + * Create linger request <-> OSD session relation. + * + * @lreq has to be registered, @osd may be homeless. + */ +static void link_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq) +{ + verify_osd_locked(osd); + WARN_ON(!lreq->linger_id || lreq->osd); + dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, + osd->o_osd, lreq, lreq->linger_id); + + if (!osd_homeless(osd)) + __remove_osd_from_lru(osd); + else + atomic_inc(&osd->o_osdc->num_homeless); + + get_osd(osd); + insert_linger(&osd->o_linger_requests, lreq); + lreq->osd = osd; +} + +static void unlink_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq) +{ + verify_osd_locked(osd); + WARN_ON(lreq->osd != osd); + dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, + osd->o_osd, lreq, lreq->linger_id); + + lreq->osd = NULL; + erase_linger(&osd->o_linger_requests, lreq); + put_osd(osd); + + if (!osd_homeless(osd)) + maybe_move_osd_to_lru(osd); + else + atomic_dec(&osd->o_osdc->num_homeless); +} + +static bool __linger_registered(struct ceph_osd_linger_request *lreq) +{ + verify_osdc_locked(lreq->osdc); + + return !RB_EMPTY_NODE(&lreq->osdc_node); +} + +static bool linger_registered(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + bool registered; + + down_read(&osdc->lock); + registered = __linger_registered(lreq); + up_read(&osdc->lock); + + return registered; +} + +static void linger_register(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + verify_osdc_wrlocked(osdc); + WARN_ON(lreq->linger_id); + + linger_get(lreq); + lreq->linger_id = ++osdc->last_linger_id; + insert_linger_osdc(&osdc->linger_requests, lreq); +} + +static void linger_unregister(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + verify_osdc_wrlocked(osdc); + + erase_linger_osdc(&osdc->linger_requests, lreq); + linger_put(lreq); +} + +static void cancel_linger_request(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + WARN_ON(!req->r_linger); + cancel_request(req); + linger_put(lreq); +} + +struct linger_work { + struct work_struct work; + struct ceph_osd_linger_request *lreq; + + union { + struct { + u64 notify_id; + u64 notifier_id; + void *payload; /* points into @msg front */ + size_t payload_len; + + struct ceph_msg *msg; /* for ceph_msg_put() */ + } notify; + struct { + int err; + } error; + }; +}; + +static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq, + work_func_t workfn) +{ + struct linger_work *lwork; + + lwork = kzalloc(sizeof(*lwork), GFP_NOIO); + if (!lwork) + return NULL; + + INIT_WORK(&lwork->work, workfn); + lwork->lreq = linger_get(lreq); + + return lwork; +} + +static void lwork_free(struct linger_work *lwork) +{ + struct ceph_osd_linger_request *lreq = lwork->lreq; + + linger_put(lreq); + kfree(lwork); +} + +static void lwork_queue(struct linger_work *lwork) +{ + struct ceph_osd_linger_request *lreq = lwork->lreq; + struct ceph_osd_client *osdc = lreq->osdc; + + verify_lreq_locked(lreq); + queue_work(osdc->notify_wq, &lwork->work); +} + +static void do_watch_notify(struct work_struct *w) +{ + struct linger_work *lwork = container_of(w, struct linger_work, work); + struct ceph_osd_linger_request *lreq = lwork->lreq; + + if (!linger_registered(lreq)) { + dout("%s lreq %p not registered\n", __func__, lreq); + goto out; + } + + dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n", + __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id, + lwork->notify.payload_len); + lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id, + lwork->notify.notifier_id, lwork->notify.payload, + lwork->notify.payload_len); + +out: + ceph_msg_put(lwork->notify.msg); + lwork_free(lwork); +} + +static void do_watch_error(struct work_struct *w) +{ + struct linger_work *lwork = container_of(w, struct linger_work, work); + struct ceph_osd_linger_request *lreq = lwork->lreq; + + if (!linger_registered(lreq)) { + dout("%s lreq %p not registered\n", __func__, lreq); + goto out; + } + + dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err); + lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err); + +out: + lwork_free(lwork); +} + +static void queue_watch_error(struct ceph_osd_linger_request *lreq) +{ + struct linger_work *lwork; + + lwork = lwork_alloc(lreq, do_watch_error); + if (!lwork) { + pr_err("failed to allocate error-lwork\n"); + return; + } + + lwork->error.err = lreq->last_error; + lwork_queue(lwork); +} + +static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq, + int result) +{ + if (!completion_done(&lreq->reg_commit_wait)) { + lreq->reg_commit_error = (result <= 0 ? result : 0); + complete_all(&lreq->reg_commit_wait); + } +} + +static void linger_commit_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, + lreq->linger_id, req->r_result); + WARN_ON(!__linger_registered(lreq)); + linger_reg_commit_complete(lreq, req->r_result); + lreq->committed = true; + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static int normalize_watch_error(int err) +{ + /* + * Translate ENOENT -> ENOTCONN so that a delete->disconnection + * notification and a failure to reconnect because we raced with + * the delete appear the same to the user. + */ + if (err == -ENOENT) + err = -ENOTCONN; + + return err; +} + +static void linger_reconnect_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__, + lreq, lreq->linger_id, req->r_result, lreq->last_error); + if (req->r_result < 0) { + if (!lreq->last_error) { + lreq->last_error = normalize_watch_error(req->r_result); + queue_watch_error(lreq); + } + } + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static void send_linger(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_request *req = lreq->reg_req; + struct ceph_osd_req_op *op = &req->r_ops[0]; + + verify_osdc_wrlocked(req->r_osdc); + dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); + + if (req->r_osd) + cancel_linger_request(req); + + request_reinit(req); + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + req->r_flags = lreq->t.flags; + req->r_mtime = lreq->mtime; + + mutex_lock(&lreq->lock); + if (lreq->committed) { + WARN_ON(op->op != CEPH_OSD_OP_WATCH || + op->watch.cookie != lreq->linger_id); + op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT; + op->watch.gen = ++lreq->register_gen; + dout("lreq %p reconnect register_gen %u\n", lreq, + op->watch.gen); + req->r_callback = linger_reconnect_cb; + } else { + WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH); + dout("lreq %p register\n", lreq); + req->r_callback = linger_commit_cb; + } + mutex_unlock(&lreq->lock); + + req->r_priv = linger_get(lreq); + req->r_linger = true; + + submit_request(req, true); +} + +static void linger_ping_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n", + __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent, + lreq->last_error); + if (lreq->register_gen == req->r_ops[0].watch.gen) { + if (req->r_result && !lreq->last_error) { + lreq->last_error = normalize_watch_error(req->r_result); + queue_watch_error(lreq); + } + } else { + dout("lreq %p register_gen %u ignoring old pong %u\n", lreq, + lreq->register_gen, req->r_ops[0].watch.gen); + } + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static void send_linger_ping(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd_request *req = lreq->ping_req; + struct ceph_osd_req_op *op = &req->r_ops[0]; + + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) { + dout("%s PAUSERD\n", __func__); + return; + } + + lreq->ping_sent = jiffies; + dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n", + __func__, lreq, lreq->linger_id, lreq->ping_sent, + lreq->register_gen); + + if (req->r_osd) + cancel_linger_request(req); + + request_reinit(req); + target_copy(&req->r_t, &lreq->t); + + WARN_ON(op->op != CEPH_OSD_OP_WATCH || + op->watch.cookie != lreq->linger_id || + op->watch.op != CEPH_OSD_WATCH_OP_PING); + op->watch.gen = lreq->register_gen; + req->r_callback = linger_ping_cb; + req->r_priv = linger_get(lreq); + req->r_linger = true; + + ceph_osdc_get_request(req); + account_request(req); + req->r_tid = atomic64_inc_return(&osdc->last_tid); + link_request(lreq->osd, req); + send_request(req); +} + +static void linger_submit(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd *osd; + + calc_target(osdc, &lreq->t, &lreq->last_force_resend, false); + osd = lookup_create_osd(osdc, lreq->t.osd, true); + link_linger(osd, lreq); + + send_linger(lreq); +} + +/* + * @lreq has to be both registered and linked. + */ +static void __linger_cancel(struct ceph_osd_linger_request *lreq) +{ + if (lreq->ping_req->r_osd) + cancel_linger_request(lreq->ping_req); + if (lreq->reg_req->r_osd) + cancel_linger_request(lreq->reg_req); + unlink_linger(lreq->osd, lreq); + linger_unregister(lreq); +} + +static void linger_cancel(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + down_write(&osdc->lock); + if (__linger_registered(lreq)) + __linger_cancel(lreq); + up_write(&osdc->lock); +} + +static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) +{ + int ret; + + dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); + ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); + return ret ?: lreq->reg_commit_error; +} + /* * Timeout callback, called every N seconds. When 1 or more OSD * requests has been active for more than N seconds, we send a keepalive @@ -1720,6 +2211,19 @@ static void handle_timeout(struct work_struct *work) found = true; } } + for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { + struct ceph_osd_linger_request *lreq = + rb_entry(p, struct ceph_osd_linger_request, node); + + dout(" lreq %p linger_id %llu is served by osd%d\n", + lreq, lreq->linger_id, osd->o_osd); + found = true; + + mutex_lock(&lreq->lock); + if (lreq->committed && !lreq->last_error) + send_linger_ping(lreq); + mutex_unlock(&lreq->lock); + } if (found) list_move_tail(&osd->o_keepalive_item, &slow_osds); @@ -1756,7 +2260,7 @@ static void handle_osds_timeout(struct work_struct *work) break; WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); - WARN_ON(!list_empty(&osd->o_linger_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); close_osd(osd); } @@ -2082,7 +2586,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) __finish_request(req); if (req->r_linger) { WARN_ON(req->r_unsafe_callback); - __register_linger_request(osd, req); + dout("req %p tid %llu cb (locked)\n", req, req->r_tid); + __complete_request(req); } } @@ -2093,7 +2598,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) if (already_acked && req->r_unsafe_callback) { dout("req %p tid %llu safe-cb\n", req, req->r_tid); req->r_unsafe_callback(req, false); - } else { + } else if (!req->r_linger) { dout("req %p tid %llu cb\n", req, req->r_tid); __complete_request(req); } @@ -2145,6 +2650,26 @@ static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) return pi->was_full && !__pool_full(pi); } +static enum calc_target_result +recalc_linger_target(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + enum calc_target_result ct_res; + + ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true); + if (ct_res == CALC_TARGET_NEED_RESEND) { + struct ceph_osd *osd; + + osd = lookup_create_osd(osdc, lreq->t.osd, true); + if (osd != lreq->osd) { + unlink_linger(lreq->osd, lreq); + link_linger(osd, lreq); + } + } + + return ct_res; +} + /* * Requeue requests whose mapping to an OSD has changed. */ @@ -2159,6 +2684,39 @@ static void scan_requests(struct ceph_osd *osd, struct rb_node *n; bool force_resend_writes; + for (n = rb_first(&osd->o_linger_requests); n; ) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + enum calc_target_result ct_res; + + n = rb_next(n); /* recalc_linger_target() */ + + dout("%s lreq %p linger_id %llu\n", __func__, lreq, + lreq->linger_id); + ct_res = recalc_linger_target(lreq); + switch (ct_res) { + case CALC_TARGET_NO_ACTION: + force_resend_writes = cleared_full || + (check_pool_cleared_full && + pool_cleared_full(osdc, lreq->t.base_oloc.pool)); + if (!force_resend && !force_resend_writes) + break; + + /* fall through */ + case CALC_TARGET_NEED_RESEND: + /* + * scan_requests() for the previous epoch(s) + * may have already added it to the list, since + * it's not unlinked here. + */ + if (list_empty(&lreq->scan_item)) + list_add_tail(&lreq->scan_item, need_resend_linger); + break; + case CALC_TARGET_POOL_DNE: + break; + } + } + for (n = rb_first(&osd->o_requests); n; ) { struct ceph_osd_request *req = rb_entry(n, struct ceph_osd_request, r_node); @@ -2263,6 +2821,7 @@ static void kick_requests(struct ceph_osd_client *osdc, struct rb_root *need_resend, struct list_head *need_resend_linger) { + struct ceph_osd_linger_request *lreq, *nlreq; struct rb_node *n; for (n = rb_first(need_resend); n; ) { @@ -2280,8 +2839,17 @@ static void kick_requests(struct ceph_osd_client *osdc, if (!req->r_linger) { if (!osd_homeless(osd) && !req->r_t.paused) send_request(req); + } else { + cancel_linger_request(req); } } + + list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) { + if (!osd_homeless(lreq->osd)) + send_linger(lreq); + + list_del_init(&lreq->scan_item); + } } /* @@ -2406,15 +2974,25 @@ static void kick_osd_requests(struct ceph_osd *osd) { struct rb_node *n; - for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { + for (n = rb_first(&osd->o_requests); n; ) { struct ceph_osd_request *req = rb_entry(n, struct ceph_osd_request, r_node); + n = rb_next(n); /* cancel_linger_request() */ + if (!req->r_linger) { if (!req->r_t.paused) send_request(req); + } else { + cancel_linger_request(req); } } + for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + send_linger(lreq); + } } /* @@ -2441,193 +3019,77 @@ out_unlock: up_write(&osdc->lock); } -/* - * watch/notify callback event infrastructure - * - * These callbacks are used both for watch and notify operations. - */ -static void __release_event(struct kref *kref) -{ - struct ceph_osd_event *event = - container_of(kref, struct ceph_osd_event, kref); - - dout("__release_event %p\n", event); - kfree(event); -} - -static void get_event(struct ceph_osd_event *event) -{ - kref_get(&event->kref); -} - -void ceph_osdc_put_event(struct ceph_osd_event *event) -{ - kref_put(&event->kref, __release_event); -} -EXPORT_SYMBOL(ceph_osdc_put_event); - -static void __insert_event(struct ceph_osd_client *osdc, - struct ceph_osd_event *new) -{ - struct rb_node **p = &osdc->event_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_osd_event *event = NULL; - - while (*p) { - parent = *p; - event = rb_entry(parent, struct ceph_osd_event, node); - if (new->cookie < event->cookie) - p = &(*p)->rb_left; - else if (new->cookie > event->cookie) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&new->node, parent, p); - rb_insert_color(&new->node, &osdc->event_tree); -} - -static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, - u64 cookie) -{ - struct rb_node **p = &osdc->event_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_osd_event *event = NULL; - - while (*p) { - parent = *p; - event = rb_entry(parent, struct ceph_osd_event, node); - if (cookie < event->cookie) - p = &(*p)->rb_left; - else if (cookie > event->cookie) - p = &(*p)->rb_right; - else - return event; - } - return NULL; -} - -static void __remove_event(struct ceph_osd_event *event) -{ - struct ceph_osd_client *osdc = event->osdc; - - if (!RB_EMPTY_NODE(&event->node)) { - dout("__remove_event removed %p\n", event); - rb_erase(&event->node, &osdc->event_tree); - ceph_osdc_put_event(event); - } else { - dout("__remove_event didn't remove %p\n", event); - } -} - -int ceph_osdc_create_event(struct ceph_osd_client *osdc, - void (*event_cb)(u64, u64, u8, void *), - void *data, struct ceph_osd_event **pevent) -{ - struct ceph_osd_event *event; - - event = kmalloc(sizeof(*event), GFP_NOIO); - if (!event) - return -ENOMEM; - - dout("create_event %p\n", event); - event->cb = event_cb; - event->one_shot = 0; - event->data = data; - event->osdc = osdc; - INIT_LIST_HEAD(&event->osd_node); - RB_CLEAR_NODE(&event->node); - kref_init(&event->kref); /* one ref for us */ - kref_get(&event->kref); /* one ref for the caller */ - - spin_lock(&osdc->event_lock); - event->cookie = ++osdc->event_count; - __insert_event(osdc, event); - spin_unlock(&osdc->event_lock); - - *pevent = event; - return 0; -} -EXPORT_SYMBOL(ceph_osdc_create_event); - -void ceph_osdc_cancel_event(struct ceph_osd_event *event) -{ - struct ceph_osd_client *osdc = event->osdc; - - dout("cancel_event %p\n", event); - spin_lock(&osdc->event_lock); - __remove_event(event); - spin_unlock(&osdc->event_lock); - ceph_osdc_put_event(event); /* caller's */ -} -EXPORT_SYMBOL(ceph_osdc_cancel_event); - - -static void do_event_work(struct work_struct *work) -{ - struct ceph_osd_event_work *event_work = - container_of(work, struct ceph_osd_event_work, work); - struct ceph_osd_event *event = event_work->event; - u64 ver = event_work->ver; - u64 notify_id = event_work->notify_id; - u8 opcode = event_work->opcode; - - dout("do_event_work completing %p\n", event); - event->cb(ver, notify_id, opcode, event->data); - dout("do_event_work completed %p\n", event); - ceph_osdc_put_event(event); - kfree(event_work); -} - - /* * Process osd watch notifications */ static void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) { - void *p, *end; - u8 proto_ver; - u64 cookie, ver, notify_id; - u8 opcode; - struct ceph_osd_event *event; - struct ceph_osd_event_work *event_work; - - p = msg->front.iov_base; - end = p + msg->front.iov_len; + void *p = msg->front.iov_base; + void *const end = p + msg->front.iov_len; + struct ceph_osd_linger_request *lreq; + struct linger_work *lwork; + u8 proto_ver, opcode; + u64 cookie, notify_id; + u64 notifier_id = 0; + void *payload = NULL; + u32 payload_len = 0; ceph_decode_8_safe(&p, end, proto_ver, bad); ceph_decode_8_safe(&p, end, opcode, bad); ceph_decode_64_safe(&p, end, cookie, bad); - ceph_decode_64_safe(&p, end, ver, bad); + p += 8; /* skip ver */ ceph_decode_64_safe(&p, end, notify_id, bad); - spin_lock(&osdc->event_lock); - event = __find_event(osdc, cookie); - if (event) { - BUG_ON(event->one_shot); - get_event(event); - } - spin_unlock(&osdc->event_lock); - dout("handle_watch_notify cookie %lld ver %lld event %p\n", - cookie, ver, event); - if (event) { - event_work = kmalloc(sizeof(*event_work), GFP_NOIO); - if (!event_work) { - pr_err("couldn't allocate event_work\n"); - ceph_osdc_put_event(event); - return; + if (proto_ver >= 1) { + ceph_decode_32_safe(&p, end, payload_len, bad); + ceph_decode_need(&p, end, payload_len, bad); + payload = p; + p += payload_len; + } + + if (le16_to_cpu(msg->hdr.version) >= 2) + p += 4; /* skip return_code */ + + if (le16_to_cpu(msg->hdr.version) >= 3) + ceph_decode_64_safe(&p, end, notifier_id, bad); + + down_read(&osdc->lock); + lreq = lookup_linger_osdc(&osdc->linger_requests, cookie); + if (!lreq) { + dout("%s opcode %d cookie %llu dne\n", __func__, opcode, + cookie); + goto out_unlock_osdc; + } + + mutex_lock(&lreq->lock); + dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie, + lreq); + if (opcode == CEPH_WATCH_EVENT_DISCONNECT) { + if (!lreq->last_error) { + lreq->last_error = -ENOTCONN; + queue_watch_error(lreq); + } + } else { + /* CEPH_WATCH_EVENT_NOTIFY */ + lwork = lwork_alloc(lreq, do_watch_notify); + if (!lwork) { + pr_err("failed to allocate notify-lwork\n"); + goto out_unlock_lreq; } - INIT_WORK(&event_work->work, do_event_work); - event_work->event = event; - event_work->ver = ver; - event_work->notify_id = notify_id; - event_work->opcode = opcode; - queue_work(osdc->notify_wq, &event_work->work); + lwork->notify.notify_id = notify_id; + lwork->notify.notifier_id = notifier_id; + lwork->notify.payload = payload; + lwork->notify.payload_len = payload_len; + lwork->notify.msg = ceph_msg_get(msg); + lwork_queue(lwork); } +out_unlock_lreq: + mutex_unlock(&lreq->lock); +out_unlock_osdc: + up_read(&osdc->lock); return; bad: @@ -2659,8 +3121,6 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req) struct ceph_osd_client *osdc = req->r_osdc; down_write(&osdc->lock); - if (req->r_linger) - __unregister_linger_request(osdc, req); if (req->r_osd) cancel_request(req); up_write(&osdc->lock); @@ -2743,6 +3203,198 @@ again: } EXPORT_SYMBOL(ceph_osdc_sync); +static struct ceph_osd_request * +alloc_linger_request(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_request *req; + + req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return NULL; + + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + + if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { + ceph_osdc_put_request(req); + return NULL; + } + + return req; +} + +/* + * Returns a handle, caller owns a ref. + */ +struct ceph_osd_linger_request * +ceph_osdc_watch(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + rados_watchcb2_t wcb, + rados_watcherrcb_t errcb, + void *data) +{ + struct ceph_osd_linger_request *lreq; + int ret; + + lreq = linger_alloc(osdc); + if (!lreq) + return ERR_PTR(-ENOMEM); + + lreq->wcb = wcb; + lreq->errcb = errcb; + lreq->data = data; + + ceph_oid_copy(&lreq->t.base_oid, oid); + ceph_oloc_copy(&lreq->t.base_oloc, oloc); + lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + lreq->mtime = CURRENT_TIME; + + lreq->reg_req = alloc_linger_request(lreq); + if (!lreq->reg_req) { + ret = -ENOMEM; + goto err_put_lreq; + } + + lreq->ping_req = alloc_linger_request(lreq); + if (!lreq->ping_req) { + ret = -ENOMEM; + goto err_put_lreq; + } + + down_write(&osdc->lock); + linger_register(lreq); /* before osd_req_op_* */ + osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_WATCH); + osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_PING); + linger_submit(lreq); + up_write(&osdc->lock); + + ret = linger_reg_commit_wait(lreq); + if (ret) { + linger_cancel(lreq); + goto err_put_lreq; + } + + return lreq; + +err_put_lreq: + linger_put(lreq); + return ERR_PTR(ret); +} +EXPORT_SYMBOL(ceph_osdc_watch); + +/* + * Releases a ref. + * + * Times out after mount_timeout to preserve rbd unmap behaviour + * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap + * with mount_timeout"). + */ +int ceph_osdc_unwatch(struct ceph_osd_client *osdc, + struct ceph_osd_linger_request *lreq) +{ + struct ceph_options *opts = osdc->client->options; + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + req->r_mtime = CURRENT_TIME; + osd_req_op_watch_init(req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_UNWATCH); + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + ceph_osdc_start_request(osdc, req, false); + linger_cancel(lreq); + linger_put(lreq); + ret = wait_request_timeout(req, opts->mount_timeout); + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_unwatch); + +static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, + u64 notify_id, u64 cookie, void *payload, + size_t payload_len) +{ + struct ceph_osd_req_op *op; + struct ceph_pagelist *pl; + int ret; + + op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); + + pl = kmalloc(sizeof(*pl), GFP_NOIO); + if (!pl) + return -ENOMEM; + + ceph_pagelist_init(pl); + ret = ceph_pagelist_encode_64(pl, notify_id); + ret |= ceph_pagelist_encode_64(pl, cookie); + if (payload) { + ret |= ceph_pagelist_encode_32(pl, payload_len); + ret |= ceph_pagelist_append(pl, payload, payload_len); + } else { + ret |= ceph_pagelist_encode_32(pl, 0); + } + if (ret) { + ceph_pagelist_release(pl); + return -ENOMEM; + } + + ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl); + op->indata_len = pl->length; + return 0; +} + +int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 notify_id, + u64 cookie, + void *payload, + size_t payload_len) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, + payload_len); + if (ret) + goto out_put_req; + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_notify_ack); + /* * Call all pending notify callbacks - for use after a watch is * unregistered, to make sure no more callbacks for it will be invoked @@ -2767,15 +3419,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->osds = RB_ROOT; INIT_LIST_HEAD(&osdc->osd_lru); spin_lock_init(&osdc->osd_lru_lock); - INIT_LIST_HEAD(&osdc->req_linger); osd_init(&osdc->homeless_osd); osdc->homeless_osd.o_osdc = osdc; osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; + osdc->linger_requests = RB_ROOT; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); - spin_lock_init(&osdc->event_lock); - osdc->event_tree = RB_ROOT; - osdc->event_count = 0; err = -ENOMEM; osdc->osdmap = ceph_osdmap_alloc(); @@ -2838,6 +3487,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) osd_cleanup(&osdc->homeless_osd); WARN_ON(!list_empty(&osdc->osd_lru)); + WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); WARN_ON(atomic_read(&osdc->num_requests)); WARN_ON(atomic_read(&osdc->num_homeless)); -- cgit v1.2.3 From d0b19705e99939f5ae5aa9b57bfe41dd4777d951 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 28 Apr 2016 16:07:27 +0200 Subject: libceph: async MON client generic requests For map check, we are going to need to send CEPH_MSG_MON_GET_VERSION messages asynchronously and get a callback on completion. Refactor MON client to allow firing off generic requests asynchronously and add an async variant of ceph_monc_get_version(). ceph_monc_do_statfs() is switched over and remains sync. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 4 +- include/linux/ceph/mon_client.h | 19 ++- net/ceph/mon_client.c | 316 ++++++++++++++++++++++++++-------------- 3 files changed, 228 insertions(+), 111 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index d0834c477f96..8eae6f56194d 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4896,8 +4896,8 @@ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) again: ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name); if (ret == -ENOENT && tries++ < 1) { - ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap", - &newest_epoch); + ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap", + &newest_epoch); if (ret < 0) return ret; diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h index c14e9d861cda..19800d9b45f3 100644 --- a/include/linux/ceph/mon_client.h +++ b/include/linux/ceph/mon_client.h @@ -39,20 +39,31 @@ struct ceph_mon_request { ceph_monc_request_func_t do_request; }; +typedef void (*ceph_monc_callback_t)(struct ceph_mon_generic_request *); + /* * ceph_mon_generic_request is being used for the statfs and * mon_get_version requests which are being done a bit differently * because we need to get data back to the caller */ struct ceph_mon_generic_request { + struct ceph_mon_client *monc; struct kref kref; u64 tid; struct rb_node node; int result; - void *buf; + struct completion completion; + ceph_monc_callback_t complete_cb; + u64 private_data; /* r_tid/linger_id */ + struct ceph_msg *request; /* original request */ struct ceph_msg *reply; /* and reply */ + + union { + struct ceph_statfs *st; + u64 newest; + } u; }; struct ceph_mon_client { @@ -124,8 +135,10 @@ extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, extern int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf); -extern int ceph_monc_do_get_version(struct ceph_mon_client *monc, - const char *what, u64 *newest); +int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + u64 *newest); +int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data); extern int ceph_monc_open_session(struct ceph_mon_client *monc); diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 98bfbe1f6807..4e49b2296920 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -493,6 +493,10 @@ static void release_generic_request(struct kref *kref) struct ceph_mon_generic_request *req = container_of(kref, struct ceph_mon_generic_request, kref); + dout("%s greq %p request %p reply %p\n", __func__, req, req->request, + req->reply); + WARN_ON(!RB_EMPTY_NODE(&req->node)); + if (req->reply) ceph_msg_put(req->reply); if (req->request) @@ -503,7 +507,8 @@ static void release_generic_request(struct kref *kref) static void put_generic_request(struct ceph_mon_generic_request *req) { - kref_put(&req->kref, release_generic_request); + if (req) + kref_put(&req->kref, release_generic_request); } static void get_generic_request(struct ceph_mon_generic_request *req) @@ -511,6 +516,103 @@ static void get_generic_request(struct ceph_mon_generic_request *req) kref_get(&req->kref); } +static struct ceph_mon_generic_request * +alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp) +{ + struct ceph_mon_generic_request *req; + + req = kzalloc(sizeof(*req), gfp); + if (!req) + return NULL; + + req->monc = monc; + kref_init(&req->kref); + RB_CLEAR_NODE(&req->node); + init_completion(&req->completion); + + dout("%s greq %p\n", __func__, req); + return req; +} + +static void register_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + + WARN_ON(req->tid); + + get_generic_request(req); + req->tid = ++monc->last_tid; + insert_generic_request(&monc->generic_request_tree, req); +} + +static void send_generic_request(struct ceph_mon_client *monc, + struct ceph_mon_generic_request *req) +{ + WARN_ON(!req->tid); + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + req->request->hdr.tid = cpu_to_le64(req->tid); + ceph_con_send(&monc->con, ceph_msg_get(req->request)); +} + +static void __finish_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + erase_generic_request(&monc->generic_request_tree, req); + + ceph_msg_revoke(req->request); + ceph_msg_revoke_incoming(req->reply); +} + +static void finish_generic_request(struct ceph_mon_generic_request *req) +{ + __finish_generic_request(req); + put_generic_request(req); +} + +static void complete_generic_request(struct ceph_mon_generic_request *req) +{ + if (req->complete_cb) + req->complete_cb(req); + else + complete_all(&req->completion); + put_generic_request(req); +} + +void cancel_generic_request(struct ceph_mon_generic_request *req) +{ + struct ceph_mon_client *monc = req->monc; + struct ceph_mon_generic_request *lookup_req; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + + mutex_lock(&monc->mutex); + lookup_req = lookup_generic_request(&monc->generic_request_tree, + req->tid); + if (lookup_req) { + WARN_ON(lookup_req != req); + finish_generic_request(req); + } + + mutex_unlock(&monc->mutex); +} + +static int wait_generic_request(struct ceph_mon_generic_request *req) +{ + int ret; + + dout("%s greq %p tid %llu\n", __func__, req, req->tid); + ret = wait_for_completion_interruptible(&req->completion); + if (ret) + cancel_generic_request(req); + else + ret = req->result; /* completed */ + + return ret; +} + static struct ceph_msg *get_generic_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip) @@ -540,40 +642,6 @@ static struct ceph_msg *get_generic_reply(struct ceph_connection *con, return m; } -static int __do_generic_request(struct ceph_mon_client *monc, u64 tid, - struct ceph_mon_generic_request *req) -{ - int err; - - /* register request */ - req->tid = tid != 0 ? tid : ++monc->last_tid; - req->request->hdr.tid = cpu_to_le64(req->tid); - insert_generic_request(&monc->generic_request_tree, req); - ceph_con_send(&monc->con, ceph_msg_get(req->request)); - mutex_unlock(&monc->mutex); - - err = wait_for_completion_interruptible(&req->completion); - - mutex_lock(&monc->mutex); - erase_generic_request(&monc->generic_request_tree, req); - - if (!err) - err = req->result; - return err; -} - -static int do_generic_request(struct ceph_mon_client *monc, - struct ceph_mon_generic_request *req) -{ - int err; - - mutex_lock(&monc->mutex); - err = __do_generic_request(monc, 0, req); - mutex_unlock(&monc->mutex); - - return err; -} - /* * statfs */ @@ -584,22 +652,24 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, struct ceph_mon_statfs_reply *reply = msg->front.iov_base; u64 tid = le64_to_cpu(msg->hdr.tid); + dout("%s msg %p tid %llu\n", __func__, msg, tid); + if (msg->front.iov_len != sizeof(*reply)) goto bad; - dout("handle_statfs_reply %p tid %llu\n", msg, tid); mutex_lock(&monc->mutex); req = lookup_generic_request(&monc->generic_request_tree, tid); - if (req) { - *(struct ceph_statfs *)req->buf = reply->st; - req->result = 0; - get_generic_request(req); + if (!req) { + mutex_unlock(&monc->mutex); + return; } + + req->result = 0; + *req->u.st = reply->st; /* struct */ + __finish_generic_request(req); mutex_unlock(&monc->mutex); - if (req) { - complete_all(&req->completion); - put_generic_request(req); - } + + complete_generic_request(req); return; bad: @@ -614,39 +684,38 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) { struct ceph_mon_generic_request *req; struct ceph_mon_statfs *h; - int err; + int ret = -ENOMEM; - req = kzalloc(sizeof(*req), GFP_NOFS); + req = alloc_generic_request(monc, GFP_NOFS); if (!req) - return -ENOMEM; - - kref_init(&req->kref); - RB_CLEAR_NODE(&req->node); - req->buf = buf; - init_completion(&req->completion); + goto out; - err = -ENOMEM; req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, true); if (!req->request) goto out; - req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, - true); + + req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true); if (!req->reply) goto out; + req->u.st = buf; + + mutex_lock(&monc->mutex); + register_generic_request(req); /* fill out request */ h = req->request->front.iov_base; h->monhdr.have_version = 0; h->monhdr.session_mon = cpu_to_le16(-1); h->monhdr.session_mon_tid = 0; h->fsid = monc->monmap->fsid; + send_generic_request(monc, req); + mutex_unlock(&monc->mutex); - err = do_generic_request(monc, req); - + ret = wait_generic_request(req); out: put_generic_request(req); - return err; + return ret; } EXPORT_SYMBOL(ceph_monc_do_statfs); @@ -659,7 +728,7 @@ static void handle_get_version_reply(struct ceph_mon_client *monc, void *end = p + msg->front_alloc_len; u64 handle; - dout("%s %p tid %llu\n", __func__, msg, tid); + dout("%s msg %p tid %llu\n", __func__, msg, tid); ceph_decode_need(&p, end, 2*sizeof(u64), bad); handle = ceph_decode_64(&p); @@ -668,77 +737,110 @@ static void handle_get_version_reply(struct ceph_mon_client *monc, mutex_lock(&monc->mutex); req = lookup_generic_request(&monc->generic_request_tree, handle); - if (req) { - *(u64 *)req->buf = ceph_decode_64(&p); - req->result = 0; - get_generic_request(req); + if (!req) { + mutex_unlock(&monc->mutex); + return; } + + req->result = 0; + req->u.newest = ceph_decode_64(&p); + __finish_generic_request(req); mutex_unlock(&monc->mutex); - if (req) { - complete_all(&req->completion); - put_generic_request(req); - } + complete_generic_request(req); return; + bad: pr_err("corrupt mon_get_version reply, tid %llu\n", tid); ceph_msg_dump(msg); } -/* - * Send MMonGetVersion and wait for the reply. - * - * @what: one of "mdsmap", "osdmap" or "monmap" - */ -int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, - u64 *newest) +static struct ceph_mon_generic_request * +__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data) { struct ceph_mon_generic_request *req; - void *p, *end; - u64 tid; - int err; - req = kzalloc(sizeof(*req), GFP_NOFS); + req = alloc_generic_request(monc, GFP_NOIO); if (!req) - return -ENOMEM; - - kref_init(&req->kref); - RB_CLEAR_NODE(&req->node); - req->buf = newest; - init_completion(&req->completion); + goto err_put_req; req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, sizeof(u64) + sizeof(u32) + strlen(what), - GFP_NOFS, true); - if (!req->request) { - err = -ENOMEM; - goto out; - } + GFP_NOIO, true); + if (!req->request) + goto err_put_req; - req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024, - GFP_NOFS, true); - if (!req->reply) { - err = -ENOMEM; - goto out; - } + req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO, + true); + if (!req->reply) + goto err_put_req; - p = req->request->front.iov_base; - end = p + req->request->front_alloc_len; + req->complete_cb = cb; + req->private_data = private_data; - /* fill out request */ mutex_lock(&monc->mutex); - tid = ++monc->last_tid; - ceph_encode_64(&p, tid); /* handle */ - ceph_encode_string(&p, end, what, strlen(what)); + register_generic_request(req); + { + void *p = req->request->front.iov_base; + void *const end = p + req->request->front_alloc_len; + + ceph_encode_64(&p, req->tid); /* handle */ + ceph_encode_string(&p, end, what, strlen(what)); + WARN_ON(p != end); + } + send_generic_request(monc, req); + mutex_unlock(&monc->mutex); - err = __do_generic_request(monc, tid, req); + return req; - mutex_unlock(&monc->mutex); -out: +err_put_req: put_generic_request(req); - return err; + return ERR_PTR(-ENOMEM); +} + +/* + * Send MMonGetVersion and wait for the reply. + * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, + u64 *newest) +{ + struct ceph_mon_generic_request *req; + int ret; + + req = __ceph_monc_get_version(monc, what, NULL, 0); + if (IS_ERR(req)) + return PTR_ERR(req); + + ret = wait_generic_request(req); + if (!ret) + *newest = req->u.newest; + + put_generic_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_monc_get_version); + +/* + * Send MMonGetVersion, + * + * @what: one of "mdsmap", "osdmap" or "monmap" + */ +int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, + ceph_monc_callback_t cb, u64 private_data) +{ + struct ceph_mon_generic_request *req; + + req = __ceph_monc_get_version(monc, what, cb, private_data); + if (IS_ERR(req)) + return PTR_ERR(req); + + put_generic_request(req); + return 0; } -EXPORT_SYMBOL(ceph_monc_do_get_version); +EXPORT_SYMBOL(ceph_monc_get_version_async); /* * Resend pending generic requests. @@ -923,6 +1025,8 @@ void ceph_monc_stop(struct ceph_mon_client *monc) ceph_auth_destroy(monc->auth); + WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree)); + ceph_msg_put(monc->m_auth); ceph_msg_put(monc->m_auth_reply); ceph_msg_put(monc->m_subscribe); -- cgit v1.2.3 From 7cca78c9dcd1afa243e46edc31896730df85d2b5 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 28 Apr 2016 16:07:28 +0200 Subject: libceph: replace ceph_monc_request_next_osdmap() ... with a wrapper around maybe_request_map() - no need for two osdmap-specific functions. Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 2 +- include/linux/ceph/mon_client.h | 1 - include/linux/ceph/osd_client.h | 1 + net/ceph/mon_client.c | 14 -------------- net/ceph/osd_client.c | 7 +++++++ 5 files changed, 9 insertions(+), 16 deletions(-) (limited to 'drivers') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 8eae6f56194d..81666a56415e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4902,7 +4902,7 @@ again: return ret; if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { - ceph_monc_request_next_osdmap(&rbdc->client->monc); + ceph_osdc_maybe_request_map(&rbdc->client->osdc); (void) ceph_monc_wait_osdmap(&rbdc->client->monc, newest_epoch, opts->mount_timeout); diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h index 19800d9b45f3..1d730993c3f8 100644 --- a/include/linux/ceph/mon_client.h +++ b/include/linux/ceph/mon_client.h @@ -128,7 +128,6 @@ bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch); void ceph_monc_renew_subs(struct ceph_mon_client *monc); -extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc); extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, unsigned long timeout); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 3e7bf72e4078..19b14862d3e0 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -381,6 +381,7 @@ extern int ceph_osdc_wait_request(struct ceph_osd_client *osdc, extern void ceph_osdc_sync(struct ceph_osd_client *osdc); extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc); +void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc); extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct ceph_vino vino, diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 4e49b2296920..72a910bf7819 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -384,20 +384,6 @@ void ceph_monc_renew_subs(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_renew_subs); -/* - * Register interest in the next osdmap - */ -void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) -{ - dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have); - mutex_lock(&monc->mutex); - if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, - monc->subs[CEPH_SUB_OSDMAP].have + 1, false)) - __send_subscribe(monc); - mutex_unlock(&monc->mutex); -} -EXPORT_SYMBOL(ceph_monc_request_next_osdmap); - /* * Wait for an osdmap with a given epoch. * diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ece2d10a1208..55cafd3a2ff0 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -3869,6 +3869,13 @@ void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) } EXPORT_SYMBOL(ceph_osdc_flush_notifies); +void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) +{ + down_read(&osdc->lock); + maybe_request_map(osdc); + up_read(&osdc->lock); +} +EXPORT_SYMBOL(ceph_osdc_maybe_request_map); /* * init, shutdown -- cgit v1.2.3