diff options
-rw-r--r-- | Documentation/ABI/testing/sysfs-bus-rbd | 29 | ||||
-rw-r--r-- | Documentation/filesystems/ceph.txt | 4 | ||||
-rw-r--r-- | drivers/block/rbd.c | 1432 | ||||
-rw-r--r-- | drivers/block/rbd_types.h | 11 | ||||
-rw-r--r-- | fs/ceph/addr.c | 24 | ||||
-rw-r--r-- | fs/ceph/file.c | 4 | ||||
-rw-r--r-- | fs/ceph/locks.c | 4 | ||||
-rw-r--r-- | fs/ceph/mds_client.c | 30 | ||||
-rw-r--r-- | fs/ceph/mds_client.h | 1 | ||||
-rw-r--r-- | fs/ceph/strings.c | 2 | ||||
-rw-r--r-- | fs/ceph/super.c | 49 | ||||
-rw-r--r-- | include/linux/ceph/auth.h | 2 | ||||
-rw-r--r-- | include/linux/ceph/ceph_fs.h | 12 | ||||
-rw-r--r-- | include/linux/ceph/cls_lock_client.h | 49 | ||||
-rw-r--r-- | include/linux/ceph/libceph.h | 3 | ||||
-rw-r--r-- | include/linux/ceph/mon_client.h | 3 | ||||
-rw-r--r-- | include/linux/ceph/osd_client.h | 23 | ||||
-rw-r--r-- | net/ceph/Makefile | 1 | ||||
-rw-r--r-- | net/ceph/auth.c | 13 | ||||
-rw-r--r-- | net/ceph/auth_none.c | 2 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 13 | ||||
-rw-r--r-- | net/ceph/ceph_strings.c | 1 | ||||
-rw-r--r-- | net/ceph/cls_lock_client.c | 325 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 17 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 82 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 169 |
26 files changed, 1966 insertions, 339 deletions
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd index 2ddd680929d8..f208ac58d613 100644 --- a/Documentation/ABI/testing/sysfs-bus-rbd +++ b/Documentation/ABI/testing/sysfs-bus-rbd @@ -6,7 +6,7 @@ Description: Being used for adding and removing rbd block devices. -Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name] +Usage: <mon ip addr> <options> <pool name> <rbd image name> [<snap name>] $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add @@ -14,9 +14,13 @@ The snapshot name can be "-" or omitted to map the image read/write. A <dev-id> will be assigned for any registered block device. If snapshot is used, it will be mapped read-only. -Removal of a device: +Usage: <dev-id> [force] - $ echo <dev-id> > /sys/bus/rbd/remove + $ echo 2 > /sys/bus/rbd/remove + +Optional "force" argument which when passed will wait for running requests and +then unmap the image. Requests sent to the driver after initiating the removal +will be failed. (August 2016, since 4.9.) What: /sys/bus/rbd/add_single_major Date: December 2013 @@ -43,10 +47,25 @@ Description: Available only if rbd module is inserted with single_major Entries under /sys/bus/rbd/devices/<dev-id>/ -------------------------------------------- +client_addr + + The ceph unique client entity_addr_t (address + nonce). + The format is <address>:<port>/<nonce>: '1.2.3.4:1234/5678' or + '[1:2:3:4:5:6:7:8]:1234/5678'. (August 2016, since 4.9.) + client_id The ceph unique client id that was assigned for this specific session. +cluster_fsid + + The ceph cluster UUID. (August 2016, since 4.9.) + +config_info + + The string written into /sys/bus/rbd/add{,_single_major}. (August + 2016, since 4.9.) + features A hexadecimal encoding of the feature bits for this image. @@ -92,6 +111,10 @@ current_snap The current snapshot for which the device is mapped. +snap_id + + The current snapshot's id. (August 2016, since 4.9.) + parent Information identifying the chain of parent images in a layered rbd diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt index d6030aa33376..f5306ee40ea9 100644 --- a/Documentation/filesystems/ceph.txt +++ b/Documentation/filesystems/ceph.txt @@ -98,6 +98,10 @@ Mount Options size. rsize=X + Specify the maximum read size in bytes. By default there is no + maximum. + + rasize=X Specify the maximum readahead. mount_timeout=X diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c1f84df7838b..abb71628ab61 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -31,6 +31,7 @@ #include <linux/ceph/libceph.h> #include <linux/ceph/osd_client.h> #include <linux/ceph/mon_client.h> +#include <linux/ceph/cls_lock_client.h> #include <linux/ceph/decode.h> #include <linux/parser.h> #include <linux/bsearch.h> @@ -114,12 +115,17 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_OBJ_PREFIX_LEN_MAX 64 +#define RBD_NOTIFY_TIMEOUT 5 /* seconds */ +#define RBD_RETRY_DELAY msecs_to_jiffies(1000) + /* Feature bits */ #define RBD_FEATURE_LAYERING (1<<0) #define RBD_FEATURE_STRIPINGV2 (1<<1) -#define RBD_FEATURES_ALL \ - (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2) +#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2) +#define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ + RBD_FEATURE_STRIPINGV2 | \ + RBD_FEATURE_EXCLUSIVE_LOCK) /* Features supported by this (client software) implementation. */ @@ -128,11 +134,8 @@ static int atomic_dec_return_safe(atomic_t *v) /* * An RBD device name will be "rbd#", where the "rbd" comes from * RBD_DRV_NAME above, and # is a unique integer identifier. - * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big - * enough to hold all possible device names. */ #define DEV_NAME_LEN 32 -#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1) /* * block device image metadata (in-memory version) @@ -322,6 +325,24 @@ struct rbd_img_request { #define for_each_obj_request_safe(ireq, oreq, n) \ list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links) +enum rbd_watch_state { + RBD_WATCH_STATE_UNREGISTERED, + RBD_WATCH_STATE_REGISTERED, + RBD_WATCH_STATE_ERROR, +}; + +enum rbd_lock_state { + RBD_LOCK_STATE_UNLOCKED, + RBD_LOCK_STATE_LOCKED, + RBD_LOCK_STATE_RELEASING, +}; + +/* WatchNotify::ClientId */ +struct rbd_client_id { + u64 gid; + u64 handle; +}; + struct rbd_mapping { u64 size; u64 features; @@ -349,13 +370,29 @@ struct rbd_device { unsigned long flags; /* possibly lock protected */ struct rbd_spec *spec; struct rbd_options *opts; + char *config_info; /* add{,_single_major} string */ struct ceph_object_id header_oid; struct ceph_object_locator header_oloc; - struct ceph_file_layout layout; + struct ceph_file_layout layout; /* used for all rbd requests */ + struct mutex watch_mutex; + enum rbd_watch_state watch_state; struct ceph_osd_linger_request *watch_handle; + u64 watch_cookie; + struct delayed_work watch_dwork; + + struct rw_semaphore lock_rwsem; + enum rbd_lock_state lock_state; + struct rbd_client_id owner_cid; + struct work_struct acquired_lock_work; + struct work_struct released_lock_work; + struct delayed_work lock_dwork; + struct work_struct unlock_work; + wait_queue_head_t lock_waitq; + + struct workqueue_struct *task_wq; struct rbd_spec *parent_spec; u64 parent_overlap; @@ -439,6 +476,29 @@ static int minor_to_rbd_dev_id(int minor) return minor >> RBD_SINGLE_MAJOR_PART_SHIFT; } +static bool rbd_is_lock_supported(struct rbd_device *rbd_dev) +{ + return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) && + rbd_dev->spec->snap_id == CEPH_NOSNAP && + !rbd_dev->mapping.read_only; +} + +static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED || + rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING; +} + +static bool rbd_is_lock_owner(struct rbd_device *rbd_dev) +{ + bool is_lock_owner; + + down_read(&rbd_dev->lock_rwsem); + is_lock_owner = __rbd_is_lock_owner(rbd_dev); + up_read(&rbd_dev->lock_rwsem); + return is_lock_owner; +} + static BUS_ATTR(add, S_IWUSR, NULL, rbd_add); static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove); static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major); @@ -735,6 +795,7 @@ enum { /* string args above */ Opt_read_only, Opt_read_write, + Opt_lock_on_read, Opt_err }; @@ -746,16 +807,19 @@ static match_table_t rbd_opts_tokens = { {Opt_read_only, "ro"}, /* Alternate spelling */ {Opt_read_write, "read_write"}, {Opt_read_write, "rw"}, /* Alternate spelling */ + {Opt_lock_on_read, "lock_on_read"}, {Opt_err, NULL} }; struct rbd_options { int queue_depth; bool read_only; + bool lock_on_read; }; #define RBD_QUEUE_DEPTH_DEFAULT BLKDEV_MAX_RQ #define RBD_READ_ONLY_DEFAULT false +#define RBD_LOCK_ON_READ_DEFAULT false static int parse_rbd_opts_token(char *c, void *private) { @@ -791,6 +855,9 @@ static int parse_rbd_opts_token(char *c, void *private) case Opt_read_write: rbd_opts->read_only = false; break; + case Opt_lock_on_read: + rbd_opts->lock_on_read = true; + break; default: /* libceph prints "bad option" msg */ return -EINVAL; @@ -919,7 +986,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, char *snap_names = NULL; u64 *snap_sizes = NULL; u32 snap_count; - size_t size; int ret = -ENOMEM; u32 i; @@ -957,9 +1023,9 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, goto out_err; /* ...as well as the array of their sizes. */ - - size = snap_count * sizeof (*header->snap_sizes); - snap_sizes = kmalloc(size, GFP_KERNEL); + snap_sizes = kmalloc_array(snap_count, + sizeof(*header->snap_sizes), + GFP_KERNEL); if (!snap_sizes) goto out_err; @@ -1551,11 +1617,18 @@ static bool obj_request_type_valid(enum obj_request_type type) } } -static int rbd_obj_request_submit(struct ceph_osd_client *osdc, - struct rbd_obj_request *obj_request) +static void rbd_img_obj_callback(struct rbd_obj_request *obj_request); + +static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { - dout("%s %p\n", __func__, obj_request); - return ceph_osdc_start_request(osdc, obj_request->osd_req, false); + struct ceph_osd_request *osd_req = obj_request->osd_req; + + dout("%s %p osd_req %p\n", __func__, obj_request, osd_req); + if (obj_request_img_data_test(obj_request)) { + WARN_ON(obj_request->callback != rbd_img_obj_callback); + rbd_img_request_get(obj_request->img_request); + } + ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } static void rbd_obj_request_end(struct rbd_obj_request *obj_request) @@ -1745,6 +1818,22 @@ static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) complete_all(&obj_request->completion); } +static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err) +{ + obj_request->result = err; + obj_request->xferred = 0; + /* + * kludge - mirror rbd_obj_request_submit() to match a put in + * rbd_img_obj_callback() + */ + if (obj_request_img_data_test(obj_request)) { + WARN_ON(obj_request->callback != rbd_img_obj_callback); + rbd_img_request_get(obj_request->img_request); + } + obj_request_done_set(obj_request); + rbd_obj_request_complete(obj_request); +} + static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request = NULL; @@ -1877,11 +1966,10 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = obj_request->img_request; struct ceph_osd_request *osd_req = obj_request->osd_req; - if (img_request) - osd_req->r_snapid = img_request->snap_id; + rbd_assert(obj_request_img_data_test(obj_request)); + osd_req->r_snapid = obj_request->img_request->snap_id; } static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) @@ -2074,7 +2162,9 @@ static void rbd_obj_request_destroy(struct kref *kref) bio_chain_put(obj_request->bio_list); break; case OBJ_REQUEST_PAGES: - if (obj_request->pages) + /* img_data requests don't own their page array */ + if (obj_request->pages && + !obj_request_img_data_test(obj_request)) ceph_release_page_vector(obj_request->pages, obj_request->page_count); break; @@ -2295,13 +2385,6 @@ static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) xferred = obj_request->length; } - /* Image object requests don't own their page array */ - - if (obj_request->type == OBJ_REQUEST_PAGES) { - obj_request->pages = NULL; - obj_request->page_count = 0; - } - if (img_request_child_test(img_request)) { rbd_assert(img_request->obj_request != NULL); more = obj_request->which < img_request->obj_request_count - 1; @@ -2520,8 +2603,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0); - rbd_img_request_get(img_request); - img_offset += length; resid -= length; } @@ -2579,7 +2660,6 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) { struct rbd_obj_request *orig_request; struct ceph_osd_request *osd_req; - struct ceph_osd_client *osdc; struct rbd_device *rbd_dev; struct page **pages; enum obj_operation_type op_type; @@ -2603,7 +2683,7 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) rbd_assert(obj_request_type_valid(orig_request->type)); img_result = img_request->result; parent_length = img_request->length; - rbd_assert(parent_length == img_request->xferred); + rbd_assert(img_result || parent_length == img_request->xferred); rbd_img_request_put(img_request); rbd_assert(orig_request->img_request); @@ -2616,13 +2696,9 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) * and re-submit the original write request. */ if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - ceph_release_page_vector(pages, page_count); - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, orig_request); - if (!img_result) - return; + rbd_obj_request_submit(orig_request); + return; } if (img_result) @@ -2656,17 +2732,12 @@ rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) /* All set, send it off. */ - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, orig_request); - if (!img_result) - return; -out_err: - /* Record the error code and complete the request */ + rbd_obj_request_submit(orig_request); + return; - orig_request->result = img_result; - orig_request->xferred = 0; - obj_request_done_set(orig_request); - rbd_obj_request_complete(orig_request); +out_err: + ceph_release_page_vector(pages, page_count); + rbd_obj_request_error(orig_request, img_result); } /* @@ -2680,26 +2751,19 @@ out_err: * When the read completes, this page array will be transferred to * the original object request for the copyup operation. * - * If an error occurs, record it as the result of the original - * object request and mark it done so it gets completed. + * If an error occurs, it is recorded as the result of the original + * object request in rbd_img_obj_exists_callback(). */ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request = NULL; + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; struct rbd_img_request *parent_request = NULL; - struct rbd_device *rbd_dev; u64 img_offset; u64 length; struct page **pages = NULL; u32 page_count; int result; - rbd_assert(obj_request_img_data_test(obj_request)); - rbd_assert(obj_request_type_valid(obj_request->type)); - - img_request = obj_request->img_request; - rbd_assert(img_request != NULL); - rbd_dev = img_request->rbd_dev; rbd_assert(rbd_dev->parent != NULL); /* @@ -2740,10 +2804,11 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); if (result) goto out_err; + parent_request->copyup_pages = pages; parent_request->copyup_page_count = page_count; - parent_request->callback = rbd_img_obj_parent_read_full_callback; + result = rbd_img_request_submit(parent_request); if (!result) return 0; @@ -2757,10 +2822,6 @@ out_err: ceph_release_page_vector(pages, page_count); if (parent_request) rbd_img_request_put(parent_request); - obj_request->result = result; - obj_request->xferred = 0; - obj_request_done_set(obj_request); - return result; } @@ -2793,17 +2854,13 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) /* * If the overlap has become 0 (most likely because the - * image has been flattened) we need to free the pages - * and re-submit the original write request. + * image has been flattened) we need to re-submit the + * original request. */ rbd_dev = orig_request->img_request->rbd_dev; if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - - osdc = &rbd_dev->rbd_client->client->osdc; - result = rbd_obj_request_submit(osdc, orig_request); - if (!result) - return; + rbd_obj_request_submit(orig_request); + return; } /* @@ -2816,31 +2873,45 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) obj_request_existence_set(orig_request, true); } else if (result == -ENOENT) { obj_request_existence_set(orig_request, false); - } else if (result) { - orig_request->result = result; - goto out; + } else { + goto fail_orig_request; } /* * Resubmit the original request now that we have recorded * whether the target object exists. */ - orig_request->result = rbd_img_obj_request_submit(orig_request); -out: - if (orig_request->result) - rbd_obj_request_complete(orig_request); + result = rbd_img_obj_request_submit(orig_request); + if (result) + goto fail_orig_request; + + return; + +fail_orig_request: + rbd_obj_request_error(orig_request, result); } static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) { + struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev; struct rbd_obj_request *stat_request; - struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; - struct page **pages = NULL; + struct page **pages; u32 page_count; size_t size; int ret; + stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + OBJ_REQUEST_PAGES); + if (!stat_request) + return -ENOMEM; + + stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, + stat_request); + if (!stat_request->osd_req) { + ret = -ENOMEM; + goto fail_stat_request; + } + /* * The response data for a STAT call consists of: * le64 length; @@ -2852,52 +2923,33 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); page_count = (u32)calc_pages_for(0, size); pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto fail_stat_request; + } - ret = -ENOMEM; - stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, - OBJ_REQUEST_PAGES); - if (!stat_request) - goto out; + osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); + osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, + false, false); rbd_obj_request_get(obj_request); stat_request->obj_request = obj_request; stat_request->pages = pages; stat_request->page_count = page_count; - - rbd_assert(obj_request->img_request); - rbd_dev = obj_request->img_request->rbd_dev; - stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - stat_request); - if (!stat_request->osd_req) - goto out; stat_request->callback = rbd_img_obj_exists_callback; - osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0); - osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, - false, false); - rbd_osd_req_format_read(stat_request); - - osdc = &rbd_dev->rbd_client->client->osdc; - ret = rbd_obj_request_submit(osdc, stat_request); -out: - if (ret) - rbd_obj_request_put(obj_request); + rbd_obj_request_submit(stat_request); + return 0; +fail_stat_request: + rbd_obj_request_put(stat_request); return ret; } static bool img_obj_request_simple(struct rbd_obj_request *obj_request) { - struct rbd_img_request *img_request; - struct rbd_device *rbd_dev; - - rbd_assert(obj_request_img_data_test(obj_request)); - - img_request = obj_request->img_request; - rbd_assert(img_request); - rbd_dev = img_request->rbd_dev; + struct rbd_img_request *img_request = obj_request->img_request; + struct rbd_device *rbd_dev = img_request->rbd_dev; /* Reads */ if (!img_request_write_test(img_request) && @@ -2936,14 +2988,13 @@ static bool img_obj_request_simple(struct rbd_obj_request *obj_request) static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request) { - if (img_obj_request_simple(obj_request)) { - struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; - - rbd_dev = obj_request->img_request->rbd_dev; - osdc = &rbd_dev->rbd_client->client->osdc; + rbd_assert(obj_request_img_data_test(obj_request)); + rbd_assert(obj_request_type_valid(obj_request->type)); + rbd_assert(obj_request->img_request); - return rbd_obj_request_submit(osdc, obj_request); + if (img_obj_request_simple(obj_request)) { + rbd_obj_request_submit(obj_request); + return 0; } /* @@ -3006,12 +3057,8 @@ static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) rbd_assert(obj_request->img_request); rbd_dev = obj_request->img_request->rbd_dev; if (!rbd_dev->parent_overlap) { - struct ceph_osd_client *osdc; - - osdc = &rbd_dev->rbd_client->client->osdc; - img_result = rbd_obj_request_submit(osdc, obj_request); - if (!img_result) - return; + rbd_obj_request_submit(obj_request); + return; } obj_request->result = img_result; @@ -3084,65 +3131,724 @@ out_err: obj_request_done_set(obj_request); } -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev); -static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev); +static const struct rbd_client_id rbd_empty_cid; -static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, - u64 notifier_id, void *data, size_t data_len) +static bool rbd_cid_equal(const struct rbd_client_id *lhs, + const struct rbd_client_id *rhs) +{ + return lhs->gid == rhs->gid && lhs->handle == rhs->handle; +} + +static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev) +{ + struct rbd_client_id cid; + + mutex_lock(&rbd_dev->watch_mutex); + cid.gid = ceph_client_gid(rbd_dev->rbd_client->client); + cid.handle = rbd_dev->watch_cookie; + mutex_unlock(&rbd_dev->watch_mutex); + return cid; +} + +/* + * lock_rwsem must be held for write + */ +static void rbd_set_owner_cid(struct rbd_device *rbd_dev, + const struct rbd_client_id *cid) +{ + dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle, + cid->gid, cid->handle); + rbd_dev->owner_cid = *cid; /* struct */ +} + +static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf) +{ + mutex_lock(&rbd_dev->watch_mutex); + sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie); + mutex_unlock(&rbd_dev->watch_mutex); +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_lock(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = arg; struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + char cookie[32]; + int ret; + + WARN_ON(__rbd_is_lock_owner(rbd_dev)); + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie, + RBD_LOCK_TAG, "", 0); + if (ret) + return ret; + + rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED; + rbd_set_owner_cid(rbd_dev, &cid); + queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work); + return 0; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_unlock(struct rbd_device *rbd_dev) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + char cookie[32]; + int ret; + + WARN_ON(!__rbd_is_lock_owner(rbd_dev)); + + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + + format_lock_cookie(rbd_dev, cookie); + ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, + RBD_LOCK_NAME, cookie); + if (ret && ret != -ENOENT) { + rbd_warn(rbd_dev, "cls_unlock failed: %d", ret); + return ret; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work); + return 0; +} + +static int __rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op, + struct page ***preply_pages, + size_t *preply_len) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct rbd_client_id cid = rbd_get_cid(rbd_dev); + int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + void *p = buf; + + dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op); + + /* encode *LockPayload NotifyMessage (op + ClientId) */ + ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, notify_op); + ceph_encode_64(&p, cid.gid); + ceph_encode_64(&p, cid.handle); + + return ceph_osdc_notify(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, buf, buf_size, + RBD_NOTIFY_TIMEOUT, preply_pages, preply_len); +} + +static void rbd_notify_op_lock(struct rbd_device *rbd_dev, + enum rbd_notify_op notify_op) +{ + struct page **reply_pages; + size_t reply_len; + + __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len); + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); +} + +static void rbd_notify_acquired_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + acquired_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK); +} + +static void rbd_notify_released_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + released_lock_work); + + rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK); +} + +static int rbd_request_lock(struct rbd_device *rbd_dev) +{ + struct page **reply_pages; + size_t reply_len; + bool lock_owner_responded = false; int ret; - dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev, - cookie, notify_id); + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK, + &reply_pages, &reply_len); + if (ret && ret != -ETIMEDOUT) { + rbd_warn(rbd_dev, "failed to request lock: %d", ret); + goto out; + } + + if (reply_len > 0 && reply_len <= PAGE_SIZE) { + void *p = page_address(reply_pages[0]); + void *const end = p + reply_len; + u32 n; + + ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */ + while (n--) { + u8 struct_v; + u32 len; + ceph_decode_need(&p, end, 8 + 8, e_inval); + p += 8 + 8; /* skip gid and cookie */ + + ceph_decode_32_safe(&p, end, len, e_inval); + if (!len) + continue; + + if (lock_owner_responded) { + rbd_warn(rbd_dev, + "duplicate lock owners detected"); + ret = -EIO; + goto out; + } + + lock_owner_responded = true; + ret = ceph_start_decoding(&p, end, 1, "ResponseMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, + "failed to decode ResponseMessage: %d", + ret); + goto e_inval; + } + + ret = ceph_decode_32(&p); + } + } + + if (!lock_owner_responded) { + rbd_warn(rbd_dev, "no lock owners detected"); + ret = -ETIMEDOUT; + } + +out: + ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)); + return ret; + +e_inval: + ret = -EINVAL; + goto out; +} + +static void wake_requests(struct rbd_device *rbd_dev, bool wake_all) +{ + dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all); + + cancel_delayed_work(&rbd_dev->lock_dwork); + if (wake_all) + wake_up_all(&rbd_dev->lock_waitq); + else + wake_up(&rbd_dev->lock_waitq); +} + +static int get_lock_owner_info(struct rbd_device *rbd_dev, + struct ceph_locker **lockers, u32 *num_lockers) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + u8 lock_type; + char *lock_tag; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + &lock_type, &lock_tag, lockers, num_lockers); + if (ret) + return ret; + + if (*num_lockers == 0) { + dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev); + goto out; + } + + if (strcmp(lock_tag, RBD_LOCK_TAG)) { + rbd_warn(rbd_dev, "locked by external mechanism, tag %s", + lock_tag); + ret = -EBUSY; + goto out; + } + + if (lock_type == CEPH_CLS_LOCK_SHARED) { + rbd_warn(rbd_dev, "shared lock type detected"); + ret = -EBUSY; + goto out; + } + + if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX, + strlen(RBD_LOCK_COOKIE_PREFIX))) { + rbd_warn(rbd_dev, "locked by external mechanism, cookie %s", + (*lockers)[0].id.cookie); + ret = -EBUSY; + goto out; + } + +out: + kfree(lock_tag); + return ret; +} + +static int find_watcher(struct rbd_device *rbd_dev, + const struct ceph_locker *locker) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_watch_item *watchers; + u32 num_watchers; + u64 cookie; + int i; + int ret; + + ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, &watchers, + &num_watchers); + if (ret) + return ret; + + sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie); + for (i = 0; i < num_watchers; i++) { + if (!memcmp(&watchers[i].addr, &locker->info.addr, + sizeof(locker->info.addr)) && + watchers[i].cookie == cookie) { + struct rbd_client_id cid = { + .gid = le64_to_cpu(watchers[i].name.num), + .handle = cookie, + }; + + dout("%s rbd_dev %p found cid %llu-%llu\n", __func__, + rbd_dev, cid.gid, cid.handle); + rbd_set_owner_cid(rbd_dev, &cid); + ret = 1; + goto out; + } + } + + dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev); + ret = 0; +out: + kfree(watchers); + return ret; +} + +/* + * lock_rwsem must be held for write + */ +static int rbd_try_lock(struct rbd_device *rbd_dev) +{ + struct ceph_client *client = rbd_dev->rbd_client->client; + struct ceph_locker *lockers; + u32 num_lockers; + int ret; + + for (;;) { + ret = rbd_lock(rbd_dev); + if (ret != -EBUSY) + return ret; + + /* determine if the current lock holder is still alive */ + ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers); + if (ret) + return ret; + + if (num_lockers == 0) + goto again; + + ret = find_watcher(rbd_dev, lockers); + if (ret) { + if (ret > 0) + ret = 0; /* have to request lock */ + goto out; + } + + rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock", + ENTITY_NAME(lockers[0].id.name)); + + ret = ceph_monc_blacklist_add(&client->monc, + &lockers[0].info.addr); + if (ret) { + rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d", + ENTITY_NAME(lockers[0].id.name), ret); + goto out; + } + + ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid, + &rbd_dev->header_oloc, RBD_LOCK_NAME, + lockers[0].id.cookie, + &lockers[0].id.name); + if (ret && ret != -ENOENT) + goto out; + +again: + ceph_free_lockers(lockers, num_lockers); + } + +out: + ceph_free_lockers(lockers, num_lockers); + return ret; +} + +/* + * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED + */ +static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev, + int *pret) +{ + enum rbd_lock_state lock_state; + + down_read(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (__rbd_is_lock_owner(rbd_dev)) { + lock_state = rbd_dev->lock_state; + up_read(&rbd_dev->lock_rwsem); + return lock_state; + } + + up_read(&rbd_dev->lock_rwsem); + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (!__rbd_is_lock_owner(rbd_dev)) { + *pret = rbd_try_lock(rbd_dev); + if (*pret) + rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret); + } + + lock_state = rbd_dev->lock_state; + up_write(&rbd_dev->lock_rwsem); + return lock_state; +} + +static void rbd_acquire_lock(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(to_delayed_work(work), + struct rbd_device, lock_dwork); + enum rbd_lock_state lock_state; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); +again: + lock_state = rbd_try_acquire_lock(rbd_dev, &ret); + if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) { + if (lock_state == RBD_LOCK_STATE_LOCKED) + wake_requests(rbd_dev, true); + dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__, + rbd_dev, lock_state, ret); + return; + } + + ret = rbd_request_lock(rbd_dev); + if (ret == -ETIMEDOUT) { + goto again; /* treat this as a dead client */ + } else if (ret < 0) { + rbd_warn(rbd_dev, "error requesting lock: %d", ret); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + RBD_RETRY_DELAY); + } else { + /* + * lock owner acked, but resend if we don't see them + * release the lock + */ + dout("%s rbd_dev %p requeueing lock_dwork\n", __func__, + rbd_dev); + mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, + msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC)); + } +} + +/* + * lock_rwsem must be held for write + */ +static bool rbd_release_lock(struct rbd_device *rbd_dev) +{ + dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) + return false; + + rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING; + downgrade_write(&rbd_dev->lock_rwsem); /* - * Until adequate refresh error handling is in place, there is - * not much we can do here, except warn. + * Ensure that all in-flight IO is flushed. * - * See http://tracker.ceph.com/issues/5040 + * FIXME: ceph_osdc_sync() flushes the entire OSD client, which + * may be shared with other devices. */ - ret = rbd_dev_refresh(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "refresh failed: %d", ret); + ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc); + up_read(&rbd_dev->lock_rwsem); + + down_write(&rbd_dev->lock_rwsem); + dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev, + rbd_dev->lock_state); + if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING) + return false; + + if (!rbd_unlock(rbd_dev)) + /* + * Give others a chance to grab the lock - we would re-acquire + * almost immediately if we got new IO during ceph_osdc_sync() + * otherwise. We need to ack our own notifications, so this + * lock_dwork will be requeued from rbd_wait_state_locked() + * after wake_requests() in rbd_handle_released_lock(). + */ + cancel_delayed_work(&rbd_dev->lock_dwork); + + return true; +} + +static void rbd_release_lock_work(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(work, struct rbd_device, + unlock_work); + + down_write(&rbd_dev->lock_rwsem); + rbd_release_lock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + /* + * we already know that the remote client is + * the owner + */ + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id cid = { 0 }; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (!rbd_cid_equal(&cid, &rbd_empty_cid)) { + down_write(&rbd_dev->lock_rwsem); + if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) { + dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n", + __func__, rbd_dev, cid.gid, cid.handle, + rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle); + up_write(&rbd_dev->lock_rwsem); + return; + } + + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + downgrade_write(&rbd_dev->lock_rwsem); + } else { + down_read(&rbd_dev->lock_rwsem); + } + + if (!__rbd_is_lock_owner(rbd_dev)) + wake_requests(rbd_dev, false); + up_read(&rbd_dev->lock_rwsem); +} + +static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v, + void **p) +{ + struct rbd_client_id my_cid = rbd_get_cid(rbd_dev); + struct rbd_client_id cid = { 0 }; + bool need_to_send; + + if (struct_v >= 2) { + cid.gid = ceph_decode_64(p); + cid.handle = ceph_decode_64(p); + } + + dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid, + cid.handle); + if (rbd_cid_equal(&cid, &my_cid)) + return false; + + down_read(&rbd_dev->lock_rwsem); + need_to_send = __rbd_is_lock_owner(rbd_dev); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) { + if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) { + dout("%s rbd_dev %p queueing unlock_work\n", __func__, + rbd_dev); + queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work); + } + } + up_read(&rbd_dev->lock_rwsem); + return need_to_send; +} + +static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 *result) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN; + char buf[buf_size]; + int ret; + + if (result) { + void *p = buf; + + /* encode ResponseMessage */ + ceph_start_encoding(&p, 1, 1, + buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_32(&p, *result); + } else { + buf_size = 0; + } ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, notify_id, cookie, - NULL, 0); + buf, buf_size); if (ret) - rbd_warn(rbd_dev, "notify_ack ret %d", ret); + rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret); } -static void rbd_watch_errcb(void *arg, u64 cookie, int err) +static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id, + u64 cookie) +{ + dout("%s rbd_dev %p\n", __func__, rbd_dev); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL); +} + +static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev, + u64 notify_id, u64 cookie, s32 result) +{ + dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result); + __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result); +} + +static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie, + u64 notifier_id, void *data, size_t data_len) { struct rbd_device *rbd_dev = arg; + void *p = data; + void *const end = p + data_len; + u8 struct_v; + u32 len; + u32 notify_op; int ret; - rbd_warn(rbd_dev, "encountered watch error: %d", err); + dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n", + __func__, rbd_dev, cookie, notify_id, data_len); + if (data_len) { + ret = ceph_start_decoding(&p, end, 1, "NotifyMessage", + &struct_v, &len); + if (ret) { + rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d", + ret); + return; + } - __rbd_dev_header_unwatch_sync(rbd_dev); + notify_op = ceph_decode_32(&p); + } else { + /* legacy notification for header updates */ + notify_op = RBD_NOTIFY_OP_HEADER_UPDATE; + len = 0; + } - ret = rbd_dev_header_watch_sync(rbd_dev); - if (ret) { - rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); - return; + dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op); + switch (notify_op) { + case RBD_NOTIFY_OP_ACQUIRED_LOCK: + rbd_handle_acquired_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_RELEASED_LOCK: + rbd_handle_released_lock(rbd_dev, struct_v, &p); + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_REQUEST_LOCK: + if (rbd_handle_request_lock(rbd_dev, struct_v, &p)) + /* + * send ResponseMessage(0) back so the client + * can detect a missing owner + */ + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, 0); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + case RBD_NOTIFY_OP_HEADER_UPDATE: + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "refresh failed: %d", ret); + + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; + default: + if (rbd_is_lock_owner(rbd_dev)) + rbd_acknowledge_notify_result(rbd_dev, notify_id, + cookie, -EOPNOTSUPP); + else + rbd_acknowledge_notify(rbd_dev, notify_id, cookie); + break; } +} - ret = rbd_dev_refresh(rbd_dev); - if (ret) - rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); +static void __rbd_unregister_watch(struct rbd_device *rbd_dev); + +static void rbd_watch_errcb(void *arg, u64 cookie, int err) +{ + struct rbd_device *rbd_dev = arg; + + rbd_warn(rbd_dev, "encountered watch error: %d", err); + + down_write(&rbd_dev->lock_rwsem); + rbd_set_owner_cid(rbd_dev, &rbd_empty_cid); + up_write(&rbd_dev->lock_rwsem); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) { + __rbd_unregister_watch(rbd_dev); + rbd_dev->watch_state = RBD_WATCH_STATE_ERROR; + + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0); + } + mutex_unlock(&rbd_dev->watch_mutex); } /* - * Initiate a watch request, synchronously. + * watch_mutex must be locked */ -static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) +static int __rbd_register_watch(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_linger_request *handle; rbd_assert(!rbd_dev->watch_handle); + dout("%s rbd_dev %p\n", __func__, rbd_dev); handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc, rbd_watch_cb, @@ -3154,13 +3860,16 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev) return 0; } -static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +/* + * watch_mutex must be locked + */ +static void __rbd_unregister_watch(struct rbd_device *rbd_dev) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; int ret; - if (!rbd_dev->watch_handle) - return; + rbd_assert(rbd_dev->watch_handle); + dout("%s rbd_dev %p\n", __func__, rbd_dev); ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle); if (ret) @@ -3169,17 +3878,100 @@ static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) rbd_dev->watch_handle = NULL; } -/* - * Tear down a watch request, synchronously. - */ -static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev) +static int rbd_register_watch(struct rbd_device *rbd_dev) +{ + int ret; + + mutex_lock(&rbd_dev->watch_mutex); + rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED); + ret = __rbd_register_watch(rbd_dev); + if (ret) + goto out; + + rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; + rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; + +out: + mutex_unlock(&rbd_dev->watch_mutex); + return ret; +} + +static void cancel_tasks_sync(struct rbd_device *rbd_dev) { - __rbd_dev_header_unwatch_sync(rbd_dev); + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + cancel_delayed_work_sync(&rbd_dev->watch_dwork); + cancel_work_sync(&rbd_dev->acquired_lock_work); + cancel_work_sync(&rbd_dev->released_lock_work); + cancel_delayed_work_sync(&rbd_dev->lock_dwork); + cancel_work_sync(&rbd_dev->unlock_work); +} + +static void rbd_unregister_watch(struct rbd_device *rbd_dev) +{ + WARN_ON(waitqueue_active(&rbd_dev->lock_waitq)); + cancel_tasks_sync(rbd_dev); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) + __rbd_unregister_watch(rbd_dev); + rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; + mutex_unlock(&rbd_dev->watch_mutex); - dout("%s flushing notifies\n", __func__); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc); } +static void rbd_reregister_watch(struct work_struct *work) +{ + struct rbd_device *rbd_dev = container_of(to_delayed_work(work), + struct rbd_device, watch_dwork); + bool was_lock_owner = false; + int ret; + + dout("%s rbd_dev %p\n", __func__, rbd_dev); + + down_write(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) + was_lock_owner = rbd_release_lock(rbd_dev); + + mutex_lock(&rbd_dev->watch_mutex); + if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) + goto fail_unlock; + + ret = __rbd_register_watch(rbd_dev); + if (ret) { + rbd_warn(rbd_dev, "failed to reregister watch: %d", ret); + if (ret != -EBLACKLISTED) + queue_delayed_work(rbd_dev->task_wq, + &rbd_dev->watch_dwork, + RBD_RETRY_DELAY); + goto fail_unlock; + } + + rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED; + rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id; + mutex_unlock(&rbd_dev->watch_mutex); + + ret = rbd_dev_refresh(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret); + + if (was_lock_owner) { + ret = rbd_try_lock(rbd_dev); + if (ret) + rbd_warn(rbd_dev, "reregisteration lock failed: %d", + ret); + } + + up_write(&rbd_dev->lock_rwsem); + wake_requests(rbd_dev, true); + return; + +fail_unlock: + mutex_unlock(&rbd_dev->watch_mutex); + up_write(&rbd_dev->lock_rwsem); +} + /* * Synchronous osd object method call. Returns the number of bytes * returned in the outbound buffer, or a negative error code. @@ -3193,7 +3985,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, void *inbound, size_t inbound_size) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct page **pages; u32 page_count; @@ -3242,11 +4033,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, obj_request->pages, inbound_size, 0, false, false); - rbd_osd_req_format_read(obj_request); - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); if (ret) goto out; @@ -3267,6 +4055,29 @@ out: return ret; } +/* + * lock_rwsem must be held for read + */ +static void rbd_wait_state_locked(struct rbd_device *rbd_dev) +{ + DEFINE_WAIT(wait); + + do { + /* + * Note the use of mod_delayed_work() in rbd_acquire_lock() + * and cancel_delayed_work() in wake_requests(). + */ + dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev); + queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0); + prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait, + TASK_UNINTERRUPTIBLE); + up_read(&rbd_dev->lock_rwsem); + schedule(); + down_read(&rbd_dev->lock_rwsem); + } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED); + finish_wait(&rbd_dev->lock_waitq, &wait); +} + static void rbd_queue_workfn(struct work_struct *work) { struct request *rq = blk_mq_rq_from_pdu(work); @@ -3277,6 +4088,7 @@ static void rbd_queue_workfn(struct work_struct *work) u64 length = blk_rq_bytes(rq); enum obj_operation_type op_type; u64 mapping_size; + bool must_be_locked; int result; if (rq->cmd_type != REQ_TYPE_FS) { @@ -3338,6 +4150,10 @@ static void rbd_queue_workfn(struct work_struct *work) if (op_type != OBJ_OP_READ) { snapc = rbd_dev->header.snapc; ceph_get_snap_context(snapc); + must_be_locked = rbd_is_lock_supported(rbd_dev); + } else { + must_be_locked = rbd_dev->opts->lock_on_read && + rbd_is_lock_supported(rbd_dev); } up_read(&rbd_dev->header_rwsem); @@ -3348,11 +4164,17 @@ static void rbd_queue_workfn(struct work_struct *work) goto err_rq; } + if (must_be_locked) { + down_read(&rbd_dev->lock_rwsem); + if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED) + rbd_wait_state_locked(rbd_dev); + } + img_request = rbd_img_request_create(rbd_dev, offset, length, op_type, snapc); if (!img_request) { result = -ENOMEM; - goto err_rq; + goto err_unlock; } img_request->rq = rq; snapc = NULL; /* img_request consumes a ref */ @@ -3370,10 +4192,15 @@ static void rbd_queue_workfn(struct work_struct *work) if (result) goto err_img_request; + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); return; err_img_request: rbd_img_request_put(img_request); +err_unlock: + if (must_be_locked) + up_read(&rbd_dev->lock_rwsem); err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", @@ -3415,7 +4242,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, u64 offset, u64 length, void *buf) { - struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; struct page **pages = NULL; u32 page_count; @@ -3448,11 +4274,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, obj_request->length, obj_request->offset & ~PAGE_MASK, false, false); - rbd_osd_req_format_read(obj_request); - ret = rbd_obj_request_submit(osdc, obj_request); - if (ret) - goto out; + rbd_obj_request_submit(obj_request); ret = rbd_obj_request_wait(obj_request); if (ret) goto out; @@ -3751,13 +4574,40 @@ static ssize_t rbd_minor_show(struct device *dev, return sprintf(buf, "%d\n", rbd_dev->minor); } +static ssize_t rbd_client_addr_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + struct ceph_entity_addr *client_addr = + ceph_client_addr(rbd_dev->rbd_client->client); + + return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr, + le32_to_cpu(client_addr->nonce)); +} + static ssize_t rbd_client_id_show(struct device *dev, struct device_attribute *attr, char *buf) { struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); return sprintf(buf, "client%lld\n", - ceph_client_id(rbd_dev->rbd_client->client)); + ceph_client_gid(rbd_dev->rbd_client->client)); +} + +static ssize_t rbd_cluster_fsid_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid); +} + +static ssize_t rbd_config_info_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%s\n", rbd_dev->config_info); } static ssize_t rbd_pool_show(struct device *dev, @@ -3809,6 +4659,14 @@ static ssize_t rbd_snap_show(struct device *dev, return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); } +static ssize_t rbd_snap_id_show(struct device *dev, + struct device_attribute *attr, char *buf) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + + return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id); +} + /* * For a v2 image, shows the chain of parent images, separated by empty * lines. For v1 images or if there is no parent, shows "(no parent @@ -3861,13 +4719,17 @@ static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL); +static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL); static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); +static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL); +static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL); static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); +static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); static struct attribute *rbd_attrs[] = { @@ -3875,12 +4737,16 @@ static struct attribute *rbd_attrs[] = { &dev_attr_features.attr, &dev_attr_major.attr, &dev_attr_minor.attr, + &dev_attr_client_addr.attr, &dev_attr_client_id.attr, + &dev_attr_cluster_fsid.attr, + &dev_attr_config_info.attr, &dev_attr_pool.attr, &dev_attr_pool_id.attr, &dev_attr_name.attr, &dev_attr_image_id.attr, &dev_attr_current_snap.attr, + &dev_attr_snap_id.attr, &dev_attr_parent.attr, &dev_attr_refresh.attr, NULL @@ -3943,18 +4809,32 @@ static void rbd_spec_free(struct kref *kref) kfree(spec); } -static void rbd_dev_release(struct device *dev) +static void rbd_dev_free(struct rbd_device *rbd_dev) { - struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); - bool need_put = !!rbd_dev->opts; + WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED); + WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED); ceph_oid_destroy(&rbd_dev->header_oid); ceph_oloc_destroy(&rbd_dev->header_oloc); + kfree(rbd_dev->config_info); rbd_put_client(rbd_dev->rbd_client); rbd_spec_put(rbd_dev->spec); kfree(rbd_dev->opts); kfree(rbd_dev); +} + +static void rbd_dev_release(struct device *dev) +{ + struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); + bool need_put = !!rbd_dev->opts; + + if (need_put) { + destroy_workqueue(rbd_dev->task_wq); + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); + } + + rbd_dev_free(rbd_dev); /* * This is racy, but way better than putting module outside of @@ -3965,25 +4845,34 @@ static void rbd_dev_release(struct device *dev) module_put(THIS_MODULE); } -static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, - struct rbd_spec *spec, - struct rbd_options *opts) +static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, + struct rbd_spec *spec) { struct rbd_device *rbd_dev; - rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); + rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL); if (!rbd_dev) return NULL; spin_lock_init(&rbd_dev->lock); - rbd_dev->flags = 0; - atomic_set(&rbd_dev->parent_ref, 0); INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); ceph_oid_init(&rbd_dev->header_oid); ceph_oloc_init(&rbd_dev->header_oloc); + mutex_init(&rbd_dev->watch_mutex); + rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; + INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch); + + init_rwsem(&rbd_dev->lock_rwsem); + rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED; + INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock); + INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock); + INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock); + INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work); + init_waitqueue_head(&rbd_dev->lock_waitq); + rbd_dev->dev.bus = &rbd_bus_type; rbd_dev->dev.type = &rbd_device_type; rbd_dev->dev.parent = &rbd_root_dev; @@ -3991,9 +4880,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->rbd_client = rbdc; rbd_dev->spec = spec; - rbd_dev->opts = opts; - - /* Initialize the layout used for all rbd requests */ rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; rbd_dev->layout.stripe_count = 1; @@ -4001,15 +4887,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, rbd_dev->layout.pool_id = spec->pool_id; RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); - /* - * If this is a mapping rbd_dev (as opposed to a parent one), - * pin our module. We have a ref from do_rbd_add(), so use - * __module_get(). - */ - if (rbd_dev->opts) - __module_get(THIS_MODULE); + return rbd_dev; +} +/* + * Create a mapping rbd_dev. + */ +static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, + struct rbd_spec *spec, + struct rbd_options *opts) +{ + struct rbd_device *rbd_dev; + + rbd_dev = __rbd_dev_create(rbdc, spec); + if (!rbd_dev) + return NULL; + + rbd_dev->opts = opts; + + /* get an id and fill in device name */ + rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0, + minor_to_rbd_dev_id(1 << MINORBITS), + GFP_KERNEL); + if (rbd_dev->dev_id < 0) + goto fail_rbd_dev; + + sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id); + rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM, + rbd_dev->name); + if (!rbd_dev->task_wq) + goto fail_dev_id; + + /* we have a ref from do_rbd_add() */ + __module_get(THIS_MODULE); + + dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id); return rbd_dev; + +fail_dev_id: + ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); +fail_rbd_dev: + rbd_dev_free(rbd_dev); + return NULL; } static void rbd_dev_destroy(struct rbd_device *rbd_dev) @@ -4645,46 +5564,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev) } /* - * Get a unique rbd identifier for the given new rbd_dev, and add - * the rbd_dev to the global list. - */ -static int rbd_dev_id_get(struct rbd_device *rbd_dev) -{ - int new_dev_id; - - new_dev_id = ida_simple_get(&rbd_dev_id_ida, - 0, minor_to_rbd_dev_id(1 << MINORBITS), - GFP_KERNEL); - if (new_dev_id < 0) - return new_dev_id; - - rbd_dev->dev_id = new_dev_id; - - spin_lock(&rbd_dev_list_lock); - list_add_tail(&rbd_dev->node, &rbd_dev_list); - spin_unlock(&rbd_dev_list_lock); - - dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id); - - return 0; -} - -/* - * Remove an rbd_dev from the global list, and record that its - * identifier is no longer in use. - */ -static void rbd_dev_id_put(struct rbd_device *rbd_dev) -{ - spin_lock(&rbd_dev_list_lock); - list_del_init(&rbd_dev->node); - spin_unlock(&rbd_dev_list_lock); - - ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id); - - dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id); -} - -/* * Skips over white space at *buf, and updates *buf to point to the * first found non-space character (if any). Returns the length of * the token (string of non-white space characters) found. Note @@ -4859,6 +5738,7 @@ static int rbd_add_parse_args(const char *buf, rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT; + rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT; copts = ceph_parse_options(options, mon_addrs, mon_addrs + mon_addrs_size - 1, @@ -5076,8 +5956,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth) goto out_err; } - parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec, - NULL); + parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec); if (!parent) { ret = -ENOMEM; goto out_err; @@ -5112,22 +5991,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) { int ret; - /* Get an id and fill in device name. */ - - ret = rbd_dev_id_get(rbd_dev); - if (ret) - goto err_out_unlock; - - BUILD_BUG_ON(DEV_NAME_LEN - < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); - sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); - /* Record our major and minor device numbers. */ if (!single_major) { ret = register_blkdev(0, rbd_dev->name); if (ret < 0) - goto err_out_id; + goto err_out_unlock; rbd_dev->major = ret; rbd_dev->minor = 0; @@ -5159,9 +6028,14 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev) set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); up_write(&rbd_dev->header_rwsem); + spin_lock(&rbd_dev_list_lock); + list_add_tail(&rbd_dev->node, &rbd_dev_list); + spin_unlock(&rbd_dev_list_lock); + add_disk(rbd_dev->disk); - pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, - (unsigned long long) rbd_dev->mapping.size); + pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name, + (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT, + rbd_dev->header.features); return ret; @@ -5172,8 +6046,6 @@ err_out_disk: err_out_blkdev: if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); -err_out_id: - rbd_dev_id_put(rbd_dev); err_out_unlock: up_write(&rbd_dev->header_rwsem); return ret; @@ -5234,7 +6106,7 @@ static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth) goto err_out_format; if (!depth) { - ret = rbd_dev_header_watch_sync(rbd_dev); + ret = rbd_register_watch(rbd_dev); if (ret) { if (ret == -ENOENT) pr_info("image %s/%s does not exist\n", @@ -5293,7 +6165,7 @@ err_out_probe: rbd_dev_unprobe(rbd_dev); err_out_watch: if (!depth) - rbd_dev_header_unwatch_sync(rbd_dev); + rbd_unregister_watch(rbd_dev); err_out_format: rbd_dev->image_format = 0; kfree(rbd_dev->spec->image_id); @@ -5345,10 +6217,18 @@ static ssize_t do_rbd_add(struct bus_type *bus, spec = NULL; /* rbd_dev now owns this */ rbd_opts = NULL; /* rbd_dev now owns this */ + rbd_dev->config_info = kstrdup(buf, GFP_KERNEL); + if (!rbd_dev->config_info) { + rc = -ENOMEM; + goto err_out_rbd_dev; + } + down_write(&rbd_dev->header_rwsem); rc = rbd_dev_image_probe(rbd_dev, 0); - if (rc < 0) + if (rc < 0) { + up_write(&rbd_dev->header_rwsem); goto err_out_rbd_dev; + } /* If we are mapping a snapshot it must be marked read-only */ @@ -5360,11 +6240,11 @@ static ssize_t do_rbd_add(struct bus_type *bus, rc = rbd_dev_device_setup(rbd_dev); if (rc) { /* - * rbd_dev_header_unwatch_sync() can't be moved into + * rbd_unregister_watch() can't be moved into * rbd_dev_image_release() without refactoring, see * commit 1f3ef78861ac. */ - rbd_dev_header_unwatch_sync(rbd_dev); + rbd_unregister_watch(rbd_dev); rbd_dev_image_release(rbd_dev); goto out; } @@ -5375,7 +6255,6 @@ out: return rc; err_out_rbd_dev: - up_write(&rbd_dev->header_rwsem); rbd_dev_destroy(rbd_dev); err_out_client: rbd_put_client(rbdc); @@ -5405,12 +6284,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus, static void rbd_dev_device_release(struct rbd_device *rbd_dev) { rbd_free_disk(rbd_dev); + + spin_lock(&rbd_dev_list_lock); + list_del_init(&rbd_dev->node); + spin_unlock(&rbd_dev_list_lock); + clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); device_del(&rbd_dev->dev); rbd_dev_mapping_clear(rbd_dev); if (!single_major) unregister_blkdev(rbd_dev->major, rbd_dev->name); - rbd_dev_id_put(rbd_dev); } static void rbd_dev_remove_parent(struct rbd_device *rbd_dev) @@ -5446,18 +6329,26 @@ static ssize_t do_rbd_remove(struct bus_type *bus, struct rbd_device *rbd_dev = NULL; struct list_head *tmp; int dev_id; - unsigned long ul; + char opt_buf[6]; bool already = false; + bool force = false; int ret; - ret = kstrtoul(buf, 10, &ul); - if (ret) - return ret; - - /* convert to int; abort if we lost anything in the conversion */ - dev_id = (int)ul; - if (dev_id != ul) + dev_id = -1; + opt_buf[0] = '\0'; + sscanf(buf, "%d %5s", &dev_id, opt_buf); + if (dev_id < 0) { + pr_err("dev_id out of range\n"); return -EINVAL; + } + if (opt_buf[0] != '\0') { + if (!strcmp(opt_buf, "force")) { + force = true; + } else { + pr_err("bad remove option at '%s'\n", opt_buf); + return -EINVAL; + } + } ret = -ENOENT; spin_lock(&rbd_dev_list_lock); @@ -5470,7 +6361,7 @@ static ssize_t do_rbd_remove(struct bus_type *bus, } if (!ret) { spin_lock_irq(&rbd_dev->lock); - if (rbd_dev->open_count) + if (rbd_dev->open_count && !force) ret = -EBUSY; else already = test_and_set_bit(RBD_DEV_FLAG_REMOVING, @@ -5481,7 +6372,20 @@ static ssize_t do_rbd_remove(struct bus_type *bus, if (ret < 0 || already) return ret; - rbd_dev_header_unwatch_sync(rbd_dev); + if (force) { + /* + * Prevent new IO from being queued and wait for existing + * IO to complete/fail. + */ + blk_mq_freeze_queue(rbd_dev->disk->queue); + blk_set_queue_dying(rbd_dev->disk->queue); + } + + down_write(&rbd_dev->lock_rwsem); + if (__rbd_is_lock_owner(rbd_dev)) + rbd_unlock(rbd_dev); + up_write(&rbd_dev->lock_rwsem); + rbd_unregister_watch(rbd_dev); /* * Don't free anything from rbd_dev->disk until after all diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h index 49d77cbcf8bd..94f367db27b0 100644 --- a/drivers/block/rbd_types.h +++ b/drivers/block/rbd_types.h @@ -28,6 +28,17 @@ #define RBD_DATA_PREFIX "rbd_data." #define RBD_ID_PREFIX "rbd_id." +#define RBD_LOCK_NAME "rbd_lock" +#define RBD_LOCK_TAG "internal" +#define RBD_LOCK_COOKIE_PREFIX "auto" + +enum rbd_notify_op { + RBD_NOTIFY_OP_ACQUIRED_LOCK = 0, + RBD_NOTIFY_OP_RELEASED_LOCK = 1, + RBD_NOTIFY_OP_REQUEST_LOCK = 2, + RBD_NOTIFY_OP_HEADER_UPDATE = 3, +}; + /* * For format version 1, rbd image 'foo' consists of objects * foo.rbd - image metadata diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index d5b6f959a3c3..ef3ebd780aff 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -175,9 +175,8 @@ static void ceph_invalidatepage(struct page *page, unsigned int offset, static int ceph_releasepage(struct page *page, gfp_t g) { - dout("%p releasepage %p idx %lu\n", page->mapping->host, - page, page->index); - WARN_ON(PageDirty(page)); + dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host, + page, page->index, PageDirty(page) ? "" : "not "); /* Can we release the page from the cache? */ if (!ceph_release_fscache_page(page, g)) @@ -298,14 +297,6 @@ unlock: kfree(osd_data->pages); } -static void ceph_unlock_page_vector(struct page **pages, int num_pages) -{ - int i; - - for (i = 0; i < num_pages; i++) - unlock_page(pages[i]); -} - /* * start an async read(ahead) operation. return nr_pages we submitted * a read for on success, or negative error code. @@ -370,6 +361,10 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) dout("start_read %p add_to_page_cache failed %p\n", inode, page); nr_pages = i; + if (nr_pages > 0) { + len = nr_pages << PAGE_SHIFT; + break; + } goto out_pages; } pages[i] = page; @@ -386,8 +381,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) return nr_pages; out_pages: - ceph_unlock_page_vector(pages, nr_pages); - ceph_release_page_vector(pages, nr_pages); + for (i = 0; i < nr_pages; ++i) { + ceph_fscache_readpage_cancel(inode, pages[i]); + unlock_page(pages[i]); + } + ceph_put_page_vector(pages, nr_pages, false); out: ceph_osdc_put_request(req); return ret; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0f5375d8e030..395c7fcb1cea 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -902,10 +902,10 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, return ret; if (write) { - ret = invalidate_inode_pages2_range(inode->i_mapping, + int ret2 = invalidate_inode_pages2_range(inode->i_mapping, pos >> PAGE_SHIFT, (pos + count) >> PAGE_SHIFT); - if (ret < 0) + if (ret2 < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); flags = CEPH_OSD_FLAG_ORDERSNAP | diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index a2cb0c254060..6806dbeaee19 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -210,8 +210,8 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) if (!(fl->fl_flags & FL_FLOCK)) return -ENOLCK; /* No mandatory locks */ - if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK) - return -ENOLCK; + if (fl->fl_type & LOCK_MAND) + return -EOPNOTSUPP; dout("ceph_flock, fl_file: %p", fl->fl_file); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index f72d4ae303b2..815acd1a56d4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -370,6 +370,7 @@ const char *ceph_session_state_name(int s) case CEPH_MDS_SESSION_CLOSING: return "closing"; case CEPH_MDS_SESSION_RESTARTING: return "restarting"; case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; + case CEPH_MDS_SESSION_REJECTED: return "rejected"; default: return "???"; } } @@ -1150,8 +1151,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, while (!list_empty(&ci->i_cap_flush_list)) { cf = list_first_entry(&ci->i_cap_flush_list, struct ceph_cap_flush, i_list); - list_del(&cf->i_list); - list_add(&cf->i_list, &to_remove); + list_move(&cf->i_list, &to_remove); } spin_lock(&mdsc->cap_dirty_lock); @@ -1378,7 +1378,7 @@ static int request_close_session(struct ceph_mds_client *mdsc, if (!msg) return -ENOMEM; ceph_con_send(&session->s_con, msg); - return 0; + return 1; } /* @@ -2131,6 +2131,10 @@ static int __do_request(struct ceph_mds_client *mdsc, ceph_session_state_name(session->s_state)); if (session->s_state != CEPH_MDS_SESSION_OPEN && session->s_state != CEPH_MDS_SESSION_HUNG) { + if (session->s_state == CEPH_MDS_SESSION_REJECTED) { + err = -EACCES; + goto out_session; + } if (session->s_state == CEPH_MDS_SESSION_NEW || session->s_state == CEPH_MDS_SESSION_CLOSING) __open_session(mdsc, session); @@ -2652,6 +2656,15 @@ static void handle_session(struct ceph_mds_session *session, wake_up_session_caps(session, 0); break; + case CEPH_SESSION_REJECT: + WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); + pr_info("mds%d rejected session\n", session->s_mds); + session->s_state = CEPH_MDS_SESSION_REJECTED; + cleanup_session_requests(mdsc, session); + remove_session_caps(session); + wake = 2; /* for good measure */ + break; + default: pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); WARN_ON(1); @@ -3557,11 +3570,11 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) /* * true if all sessions are closed, or we force unmount */ -static bool done_closing_sessions(struct ceph_mds_client *mdsc) +static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) { if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) return true; - return atomic_read(&mdsc->num_sessions) == 0; + return atomic_read(&mdsc->num_sessions) <= skipped; } /* @@ -3572,6 +3585,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_session *session; int i; + int skipped = 0; dout("close_sessions\n"); @@ -3583,7 +3597,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) continue; mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); - __close_session(mdsc, session); + if (__close_session(mdsc, session) <= 0) + skipped++; mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); mutex_lock(&mdsc->mutex); @@ -3591,7 +3606,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) mutex_unlock(&mdsc->mutex); dout("waiting for sessions to close\n"); - wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), + wait_event_timeout(mdsc->session_close_wq, + done_closing_sessions(mdsc, skipped), ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining sessions */ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 6b3679737d4a..3c6f77b7bb02 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -121,6 +121,7 @@ enum { CEPH_MDS_SESSION_CLOSING = 5, CEPH_MDS_SESSION_RESTARTING = 6, CEPH_MDS_SESSION_RECONNECTING = 7, + CEPH_MDS_SESSION_REJECTED = 8, }; struct ceph_mds_session { diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c index 89e6bc321df3..913dea163d5c 100644 --- a/fs/ceph/strings.c +++ b/fs/ceph/strings.c @@ -43,6 +43,8 @@ const char *ceph_session_op_name(int op) case CEPH_SESSION_RECALL_STATE: return "recall_state"; case CEPH_SESSION_FLUSHMSG: return "flushmsg"; case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack"; + case CEPH_SESSION_FORCE_RO: return "force_ro"; + case CEPH_SESSION_REJECT: return "reject"; } return "???"; } diff --git a/fs/ceph/super.c b/fs/ceph/super.c index e247f6f0feb7..a29ffce98187 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -396,10 +396,12 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt, */ dev_name_end = strchr(dev_name, '/'); if (dev_name_end) { - fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL); - if (!fsopt->server_path) { - err = -ENOMEM; - goto out; + if (strlen(dev_name_end) > 1) { + fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL); + if (!fsopt->server_path) { + err = -ENOMEM; + goto out; + } } } else { dev_name_end = dev_name + strlen(dev_name); @@ -788,15 +790,10 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc, struct inode *inode = req->r_target_inode; req->r_target_inode = NULL; dout("open_root_inode success\n"); - if (ceph_ino(inode) == CEPH_INO_ROOT && - fsc->sb->s_root == NULL) { - root = d_make_root(inode); - if (!root) { - root = ERR_PTR(-ENOMEM); - goto out; - } - } else { - root = d_obtain_root(inode); + root = d_make_root(inode); + if (!root) { + root = ERR_PTR(-ENOMEM); + goto out; } ceph_init_dentry(root); dout("open_root_inode success, root dentry is %p\n", root); @@ -825,17 +822,24 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc) mutex_lock(&fsc->client->mount_mutex); if (!fsc->sb->s_root) { + const char *path; err = __ceph_open_session(fsc->client, started); if (err < 0) goto out; - dout("mount opening root\n"); - root = open_root_dentry(fsc, "", started); + if (!fsc->mount_options->server_path) { + path = ""; + dout("mount opening path \\t\n"); + } else { + path = fsc->mount_options->server_path + 1; + dout("mount opening path %s\n", path); + } + root = open_root_dentry(fsc, path, started); if (IS_ERR(root)) { err = PTR_ERR(root); goto out; } - fsc->sb->s_root = root; + fsc->sb->s_root = dget(root); first = 1; err = ceph_fs_debugfs_init(fsc); @@ -843,19 +847,6 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc) goto fail; } - if (!fsc->mount_options->server_path) { - root = fsc->sb->s_root; - dget(root); - } else { - const char *path = fsc->mount_options->server_path + 1; - dout("mount opening path %s\n", path); - root = open_root_dentry(fsc, path, started); - if (IS_ERR(root)) { - err = PTR_ERR(root); - goto fail; - } - } - fsc->mount_state = CEPH_MOUNT_MOUNTED; dout("mount success\n"); mutex_unlock(&fsc->client->mount_mutex); diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h index 1563265d2097..374bb1c4ef52 100644 --- a/include/linux/ceph/auth.h +++ b/include/linux/ceph/auth.h @@ -104,7 +104,7 @@ extern int ceph_auth_build_hello(struct ceph_auth_client *ac, extern int ceph_handle_auth_reply(struct ceph_auth_client *ac, void *buf, size_t len, void *reply_buf, size_t reply_len); -extern int ceph_entity_name_encode(const char *name, void **p, void *end); +int ceph_auth_entity_name_encode(const char *name, void **p, void *end); extern int ceph_build_auth(struct ceph_auth_client *ac, void *msg_buf, size_t msg_len); diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 7868d602c0a0..f96de8de4fa7 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -138,6 +138,9 @@ struct ceph_dir_layout { #define CEPH_MSG_POOLOP_REPLY 48 #define CEPH_MSG_POOLOP 49 +/* mon commands */ +#define CEPH_MSG_MON_COMMAND 50 +#define CEPH_MSG_MON_COMMAND_ACK 51 /* osd */ #define CEPH_MSG_OSD_MAP 41 @@ -176,6 +179,14 @@ struct ceph_mon_statfs_reply { struct ceph_statfs st; } __attribute__ ((packed)); +struct ceph_mon_command { + struct ceph_mon_request_header monhdr; + struct ceph_fsid fsid; + __le32 num_strs; /* always 1 */ + __le32 str_len; + char str[]; +} __attribute__ ((packed)); + struct ceph_osd_getmap { struct ceph_mon_request_header monhdr; struct ceph_fsid fsid; @@ -270,6 +281,7 @@ enum { CEPH_SESSION_FLUSHMSG, CEPH_SESSION_FLUSHMSG_ACK, CEPH_SESSION_FORCE_RO, + CEPH_SESSION_REJECT, }; extern const char *ceph_session_op_name(int op); diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h new file mode 100644 index 000000000000..84884d8d4710 --- /dev/null +++ b/include/linux/ceph/cls_lock_client.h @@ -0,0 +1,49 @@ +#ifndef _LINUX_CEPH_CLS_LOCK_CLIENT_H +#define _LINUX_CEPH_CLS_LOCK_CLIENT_H + +#include <linux/ceph/osd_client.h> + +enum ceph_cls_lock_type { + CEPH_CLS_LOCK_NONE = 0, + CEPH_CLS_LOCK_EXCLUSIVE = 1, + CEPH_CLS_LOCK_SHARED = 2, +}; + +struct ceph_locker_id { + struct ceph_entity_name name; /* locker's client name */ + char *cookie; /* locker's cookie */ +}; + +struct ceph_locker_info { + struct ceph_entity_addr addr; /* locker's address */ +}; + +struct ceph_locker { + struct ceph_locker_id id; + struct ceph_locker_info info; +}; + +int ceph_cls_lock(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, u8 type, char *cookie, + char *tag, char *desc, u8 flags); +int ceph_cls_unlock(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, char *cookie); +int ceph_cls_break_lock(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, char *cookie, + struct ceph_entity_name *locker); + +void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers); + +int ceph_cls_lock_info(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, u8 *type, char **tag, + struct ceph_locker **lockers, u32 *num_lockers); + +#endif diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 83fc1fff7061..1816c5e26581 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -264,7 +264,8 @@ extern struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, u64 supported_features, u64 required_features); -extern u64 ceph_client_id(struct ceph_client *client); +struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client); +u64 ceph_client_gid(struct ceph_client *client); extern void ceph_destroy_client(struct ceph_client *client); extern int __ceph_open_session(struct ceph_client *client, unsigned long started); diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h index 24d704d1ea5c..d5a3ecea578d 100644 --- a/include/linux/ceph/mon_client.h +++ b/include/linux/ceph/mon_client.h @@ -141,6 +141,9 @@ int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, ceph_monc_callback_t cb, u64 private_data); +int ceph_monc_blacklist_add(struct ceph_mon_client *monc, + struct ceph_entity_addr *client_addr); + extern int ceph_monc_open_session(struct ceph_mon_client *monc); extern int ceph_monc_validate_auth(struct ceph_mon_client *monc); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 858932304260..96337b15a60d 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -121,6 +121,9 @@ struct ceph_osd_req_op { struct ceph_osd_data response_data; } notify; struct { + struct ceph_osd_data response_data; + } list_watchers; + struct { u64 expected_object_size; u64 expected_write_size; } alloc_hint; @@ -249,6 +252,12 @@ struct ceph_osd_linger_request { size_t *preply_len; }; +struct ceph_watch_item { + struct ceph_entity_name name; + u64 cookie; + struct ceph_entity_addr addr; +}; + struct ceph_osd_client { struct ceph_client *client; @@ -346,7 +355,6 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages); - extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method); @@ -389,6 +397,14 @@ extern void ceph_osdc_sync(struct ceph_osd_client *osdc); extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc); void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc); +int ceph_osdc_call(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + const char *class, const char *method, + unsigned int flags, + struct page *req_page, size_t req_len, + struct page *resp_page, size_t *resp_len); + extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, @@ -434,5 +450,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, size_t *preply_len); int ceph_osdc_watch_check(struct ceph_osd_client *osdc, struct ceph_osd_linger_request *lreq); +int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct ceph_watch_item **watchers, + u32 *num_watchers); #endif diff --git a/net/ceph/Makefile b/net/ceph/Makefile index 84cbed630c4b..6a5180903e7b 100644 --- a/net/ceph/Makefile +++ b/net/ceph/Makefile @@ -5,6 +5,7 @@ obj-$(CONFIG_CEPH_LIB) += libceph.o libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \ mon_client.o \ + cls_lock_client.o \ osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \ debugfs.o \ auth.o auth_none.o \ diff --git a/net/ceph/auth.c b/net/ceph/auth.c index 2bc5965fdd1e..c822b3ae1bd3 100644 --- a/net/ceph/auth.c +++ b/net/ceph/auth.c @@ -82,7 +82,10 @@ void ceph_auth_reset(struct ceph_auth_client *ac) mutex_unlock(&ac->mutex); } -int ceph_entity_name_encode(const char *name, void **p, void *end) +/* + * EntityName, not to be confused with entity_name_t + */ +int ceph_auth_entity_name_encode(const char *name, void **p, void *end) { int len = strlen(name); @@ -111,7 +114,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len) monhdr->session_mon = cpu_to_le16(-1); monhdr->session_mon_tid = 0; - ceph_encode_32(&p, 0); /* no protocol, yet */ + ceph_encode_32(&p, CEPH_AUTH_UNKNOWN); /* no protocol, yet */ lenp = p; p += sizeof(u32); @@ -124,7 +127,7 @@ int ceph_auth_build_hello(struct ceph_auth_client *ac, void *buf, size_t len) for (i = 0; i < num; i++) ceph_encode_32(&p, supported_protocols[i]); - ret = ceph_entity_name_encode(ac->name, &p, end); + ret = ceph_auth_entity_name_encode(ac->name, &p, end); if (ret < 0) goto out; ceph_decode_need(&p, end, sizeof(u64), bad); @@ -259,9 +262,7 @@ int ceph_build_auth(struct ceph_auth_client *ac, int ret = 0; mutex_lock(&ac->mutex); - if (!ac->protocol) - ret = ceph_auth_build_hello(ac, msg_buf, msg_len); - else if (ac->ops->should_authenticate(ac)) + if (ac->ops->should_authenticate(ac)) ret = ceph_build_auth_request(ac, msg_buf, msg_len); mutex_unlock(&ac->mutex); return ret; diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c index 5f836f02ae36..df45e467c81f 100644 --- a/net/ceph/auth_none.c +++ b/net/ceph/auth_none.c @@ -46,7 +46,7 @@ static int ceph_auth_none_build_authorizer(struct ceph_auth_client *ac, int ret; ceph_encode_8_safe(&p, end, 1, e_range); - ret = ceph_entity_name_encode(ac->name, &p, end); + ret = ceph_auth_entity_name_encode(ac->name, &p, end); if (ret < 0) return ret; diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index bddfcf6f09c2..464e88599b9d 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -566,11 +566,17 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client) } EXPORT_SYMBOL(ceph_print_client_options); -u64 ceph_client_id(struct ceph_client *client) +struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client) +{ + return &client->msgr.inst.addr; +} +EXPORT_SYMBOL(ceph_client_addr); + +u64 ceph_client_gid(struct ceph_client *client) { return client->monc.auth->global_id; } -EXPORT_SYMBOL(ceph_client_id); +EXPORT_SYMBOL(ceph_client_gid); /* * create a fresh client instance @@ -685,7 +691,8 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started) return client->auth_err; } - pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid); + pr_info("client%llu fsid %pU\n", ceph_client_gid(client), + &client->fsid); ceph_debugfs_client_init(client); return 0; diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 3773a4fa11e3..19b7d8aa915c 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -15,6 +15,7 @@ const char *ceph_entity_type_name(int type) default: return "unknown"; } } +EXPORT_SYMBOL(ceph_entity_type_name); const char *ceph_osd_op_name(int op) { diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c new file mode 100644 index 000000000000..50f040fdb2a9 --- /dev/null +++ b/net/ceph/cls_lock_client.c @@ -0,0 +1,325 @@ +#include <linux/ceph/ceph_debug.h> + +#include <linux/types.h> +#include <linux/slab.h> + +#include <linux/ceph/cls_lock_client.h> +#include <linux/ceph/decode.h> + +/** + * ceph_cls_lock - grab rados lock for object + * @oid, @oloc: object to lock + * @lock_name: the name of the lock + * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED) + * @cookie: user-defined identifier for this instance of the lock + * @tag: user-defined tag + * @desc: user-defined lock description + * @flags: lock flags + * + * All operations on the same lock should use the same tag. + */ +int ceph_cls_lock(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, u8 type, char *cookie, + char *tag, char *desc, u8 flags) +{ + int lock_op_buf_size; + int name_len = strlen(lock_name); + int cookie_len = strlen(cookie); + int tag_len = strlen(tag); + int desc_len = strlen(desc); + void *p, *end; + struct page *lock_op_page; + struct timespec mtime; + int ret; + + lock_op_buf_size = name_len + sizeof(__le32) + + cookie_len + sizeof(__le32) + + tag_len + sizeof(__le32) + + desc_len + sizeof(__le32) + + sizeof(struct ceph_timespec) + + /* flag and type */ + sizeof(u8) + sizeof(u8) + + CEPH_ENCODING_START_BLK_LEN; + if (lock_op_buf_size > PAGE_SIZE) + return -E2BIG; + + lock_op_page = alloc_page(GFP_NOIO); + if (!lock_op_page) + return -ENOMEM; + + p = page_address(lock_op_page); + end = p + lock_op_buf_size; + + /* encode cls_lock_lock_op struct */ + ceph_start_encoding(&p, 1, 1, + lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_string(&p, end, lock_name, name_len); + ceph_encode_8(&p, type); + ceph_encode_string(&p, end, cookie, cookie_len); + ceph_encode_string(&p, end, tag, tag_len); + ceph_encode_string(&p, end, desc, desc_len); + /* only support infinite duration */ + memset(&mtime, 0, sizeof(mtime)); + ceph_encode_timespec(p, &mtime); + p += sizeof(struct ceph_timespec); + ceph_encode_8(&p, flags); + + dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n", + __func__, lock_name, type, cookie, tag, desc, flags); + ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock", + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + lock_op_page, lock_op_buf_size, NULL, NULL); + + dout("%s: status %d\n", __func__, ret); + __free_page(lock_op_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_lock); + +/** + * ceph_cls_unlock - release rados lock for object + * @oid, @oloc: object to lock + * @lock_name: the name of the lock + * @cookie: user-defined identifier for this instance of the lock + */ +int ceph_cls_unlock(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, char *cookie) +{ + int unlock_op_buf_size; + int name_len = strlen(lock_name); + int cookie_len = strlen(cookie); + void *p, *end; + struct page *unlock_op_page; + int ret; + + unlock_op_buf_size = name_len + sizeof(__le32) + + cookie_len + sizeof(__le32) + + CEPH_ENCODING_START_BLK_LEN; + if (unlock_op_buf_size > PAGE_SIZE) + return -E2BIG; + + unlock_op_page = alloc_page(GFP_NOIO); + if (!unlock_op_page) + return -ENOMEM; + + p = page_address(unlock_op_page); + end = p + unlock_op_buf_size; + + /* encode cls_lock_unlock_op struct */ + ceph_start_encoding(&p, 1, 1, + unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_string(&p, end, lock_name, name_len); + ceph_encode_string(&p, end, cookie, cookie_len); + + dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie); + ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock", + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + unlock_op_page, unlock_op_buf_size, NULL, NULL); + + dout("%s: status %d\n", __func__, ret); + __free_page(unlock_op_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_unlock); + +/** + * ceph_cls_break_lock - release rados lock for object for specified client + * @oid, @oloc: object to lock + * @lock_name: the name of the lock + * @cookie: user-defined identifier for this instance of the lock + * @locker: current lock owner + */ +int ceph_cls_break_lock(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, char *cookie, + struct ceph_entity_name *locker) +{ + int break_op_buf_size; + int name_len = strlen(lock_name); + int cookie_len = strlen(cookie); + struct page *break_op_page; + void *p, *end; + int ret; + + break_op_buf_size = name_len + sizeof(__le32) + + cookie_len + sizeof(__le32) + + sizeof(u8) + sizeof(__le64) + + CEPH_ENCODING_START_BLK_LEN; + if (break_op_buf_size > PAGE_SIZE) + return -E2BIG; + + break_op_page = alloc_page(GFP_NOIO); + if (!break_op_page) + return -ENOMEM; + + p = page_address(break_op_page); + end = p + break_op_buf_size; + + /* encode cls_lock_break_op struct */ + ceph_start_encoding(&p, 1, 1, + break_op_buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_string(&p, end, lock_name, name_len); + ceph_encode_copy(&p, locker, sizeof(*locker)); + ceph_encode_string(&p, end, cookie, cookie_len); + + dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name, + cookie, ENTITY_NAME(*locker)); + ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock", + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + break_op_page, break_op_buf_size, NULL, NULL); + + dout("%s: status %d\n", __func__, ret); + __free_page(break_op_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_break_lock); + +void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers) +{ + int i; + + for (i = 0; i < num_lockers; i++) + kfree(lockers[i].id.cookie); + kfree(lockers); +} +EXPORT_SYMBOL(ceph_free_lockers); + +static int decode_locker(void **p, void *end, struct ceph_locker *locker) +{ + u8 struct_v; + u32 len; + char *s; + int ret; + + ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len); + if (ret) + return ret; + + ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name)); + s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO); + if (IS_ERR(s)) + return PTR_ERR(s); + + locker->id.cookie = s; + + ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len); + if (ret) + return ret; + + *p += sizeof(struct ceph_timespec); /* skip expiration */ + ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr)); + ceph_decode_addr(&locker->info.addr); + len = ceph_decode_32(p); + *p += len; /* skip description */ + + dout("%s %s%llu cookie %s addr %s\n", __func__, + ENTITY_NAME(locker->id.name), locker->id.cookie, + ceph_pr_addr(&locker->info.addr.in_addr)); + return 0; +} + +static int decode_lockers(void **p, void *end, u8 *type, char **tag, + struct ceph_locker **lockers, u32 *num_lockers) +{ + u8 struct_v; + u32 struct_len; + char *s; + int i; + int ret; + + ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply", + &struct_v, &struct_len); + if (ret) + return ret; + + *num_lockers = ceph_decode_32(p); + *lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO); + if (!*lockers) + return -ENOMEM; + + for (i = 0; i < *num_lockers; i++) { + ret = decode_locker(p, end, *lockers + i); + if (ret) + goto err_free_lockers; + } + + *type = ceph_decode_8(p); + s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO); + if (IS_ERR(s)) { + ret = PTR_ERR(s); + goto err_free_lockers; + } + + *tag = s; + return 0; + +err_free_lockers: + ceph_free_lockers(*lockers, *num_lockers); + return ret; +} + +/* + * On success, the caller is responsible for: + * + * kfree(tag); + * ceph_free_lockers(lockers, num_lockers); + */ +int ceph_cls_lock_info(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + char *lock_name, u8 *type, char **tag, + struct ceph_locker **lockers, u32 *num_lockers) +{ + int get_info_op_buf_size; + int name_len = strlen(lock_name); + struct page *get_info_op_page, *reply_page; + size_t reply_len; + void *p, *end; + int ret; + + get_info_op_buf_size = name_len + sizeof(__le32) + + CEPH_ENCODING_START_BLK_LEN; + if (get_info_op_buf_size > PAGE_SIZE) + return -E2BIG; + + get_info_op_page = alloc_page(GFP_NOIO); + if (!get_info_op_page) + return -ENOMEM; + + reply_page = alloc_page(GFP_NOIO); + if (!reply_page) { + __free_page(get_info_op_page); + return -ENOMEM; + } + + p = page_address(get_info_op_page); + end = p + get_info_op_buf_size; + + /* encode cls_lock_get_info_op struct */ + ceph_start_encoding(&p, 1, 1, + get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN); + ceph_encode_string(&p, end, lock_name, name_len); + + dout("%s lock_name %s\n", __func__, lock_name); + ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info", + CEPH_OSD_FLAG_READ, get_info_op_page, + get_info_op_buf_size, reply_page, &reply_len); + + dout("%s: status %d\n", __func__, ret); + if (ret >= 0) { + p = page_address(reply_page); + end = p + reply_len; + + ret = decode_lockers(&p, end, type, tag, lockers, num_lockers); + } + + __free_page(get_info_op_page); + __free_page(reply_page); + return ret; +} +EXPORT_SYMBOL(ceph_cls_lock_info); diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 5fcfb98f309e..a421e905331a 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -245,7 +245,7 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket, /* compute 2^44*log2(input+1) */ static __u64 crush_ln(unsigned int xin) { - unsigned int x = xin, x1; + unsigned int x = xin; int iexpon, index1, index2; __u64 RH, LH, LL, xl64, result; @@ -253,9 +253,15 @@ static __u64 crush_ln(unsigned int xin) /* normalize input */ iexpon = 15; - while (!(x & 0x18000)) { - x <<= 1; - iexpon--; + + /* + * figure out number of bits we need to shift and + * do it in one step instead of iteratively + */ + if (!(x & 0x18000)) { + int bits = __builtin_clz(x & 0x1FFFF) - 16; + x <<= bits; + iexpon = 15 - bits; } index1 = (x >> 8) << 1; @@ -267,12 +273,11 @@ static __u64 crush_ln(unsigned int xin) /* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */ xl64 = (__s64)x * RH; xl64 >>= 48; - x1 = xl64; result = iexpon; result <<= (12 + 32); - index2 = x1 & 0xff; + index2 = xl64 & 0xff; /* LL ~ 2^48*log2(1.0+index2/2^15) */ LL = __LL_tbl[index2]; diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index ef34a02719d7..a8effc8b7280 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -835,6 +835,83 @@ int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, } EXPORT_SYMBOL(ceph_monc_get_version_async); +static void handle_command_ack(struct ceph_mon_client *monc, + struct ceph_msg *msg) +{ + struct ceph_mon_generic_request *req; + void *p = msg->front.iov_base; + void *const end = p + msg->front_alloc_len; + u64 tid = le64_to_cpu(msg->hdr.tid); + + dout("%s msg %p tid %llu\n", __func__, msg, tid); + + ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) + + sizeof(u32), bad); + p += sizeof(struct ceph_mon_request_header); + + mutex_lock(&monc->mutex); + req = lookup_generic_request(&monc->generic_request_tree, tid); + if (!req) { + mutex_unlock(&monc->mutex); + return; + } + + req->result = ceph_decode_32(&p); + __finish_generic_request(req); + mutex_unlock(&monc->mutex); + + complete_generic_request(req); + return; + +bad: + pr_err("corrupt mon_command ack, tid %llu\n", tid); + ceph_msg_dump(msg); +} + +int ceph_monc_blacklist_add(struct ceph_mon_client *monc, + struct ceph_entity_addr *client_addr) +{ + struct ceph_mon_generic_request *req; + struct ceph_mon_command *h; + int ret = -ENOMEM; + int len; + + req = alloc_generic_request(monc, GFP_NOIO); + if (!req) + goto out; + + req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true); + if (!req->request) + goto out; + + req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO, + true); + if (!req->reply) + goto out; + + mutex_lock(&monc->mutex); + register_generic_request(req); + h = req->request->front.iov_base; + h->monhdr.have_version = 0; + h->monhdr.session_mon = cpu_to_le16(-1); + h->monhdr.session_mon_tid = 0; + h->fsid = monc->monmap->fsid; + h->num_strs = cpu_to_le32(1); + len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \ + \"blacklistop\": \"add\", \ + \"addr\": \"%pISpc/%u\" }", + &client_addr->in_addr, le32_to_cpu(client_addr->nonce)); + h->str_len = cpu_to_le32(len); + send_generic_request(monc, req); + mutex_unlock(&monc->mutex); + + ret = wait_generic_request(req); +out: + put_generic_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_monc_blacklist_add); + /* * Resend pending generic requests. */ @@ -1139,6 +1216,10 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) handle_get_version_reply(monc, msg); break; + case CEPH_MSG_MON_COMMAND_ACK: + handle_command_ack(monc, msg); + break; + case CEPH_MSG_MON_MAP: ceph_monc_handle_map(monc, msg); break; @@ -1178,6 +1259,7 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, m = ceph_msg_get(monc->m_subscribe_ack); break; case CEPH_MSG_STATFS_REPLY: + case CEPH_MSG_MON_COMMAND_ACK: return get_generic_reply(con, hdr, skip); case CEPH_MSG_AUTH_REPLY: m = ceph_msg_get(monc->m_auth_reply); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a97e7b506612..d9bf7a1d0a58 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,6 +338,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, ceph_osd_data_release(&op->notify.request_data); ceph_osd_data_release(&op->notify.response_data); break; + case CEPH_OSD_OP_LIST_WATCHERS: + ceph_osd_data_release(&op->list_watchers.response_data); + break; default: break; } @@ -863,6 +866,8 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, case CEPH_OSD_OP_NOTIFY: dst->notify.cookie = cpu_to_le64(src->notify.cookie); break; + case CEPH_OSD_OP_LIST_WATCHERS: + break; case CEPH_OSD_OP_SETALLOCHINT: dst->alloc_hint.expected_object_size = cpu_to_le64(src->alloc_hint.expected_object_size); @@ -1445,6 +1450,10 @@ static void setup_request_data(struct ceph_osd_request *req, ceph_osdc_msg_data_add(req->r_reply, &op->extent.osd_data); break; + case CEPH_OSD_OP_LIST_WATCHERS: + ceph_osdc_msg_data_add(req->r_reply, + &op->list_watchers.response_data); + break; /* both */ case CEPH_OSD_OP_CALL: @@ -3891,12 +3900,121 @@ int ceph_osdc_watch_check(struct ceph_osd_client *osdc, return ret; } +static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) +{ + u8 struct_v; + u32 struct_len; + int ret; + + ret = ceph_start_decoding(p, end, 2, "watch_item_t", + &struct_v, &struct_len); + if (ret) + return ret; + + ceph_decode_copy(p, &item->name, sizeof(item->name)); + item->cookie = ceph_decode_64(p); + *p += 4; /* skip timeout_seconds */ + if (struct_v >= 2) { + ceph_decode_copy(p, &item->addr, sizeof(item->addr)); + ceph_decode_addr(&item->addr); + } + + dout("%s %s%llu cookie %llu addr %s\n", __func__, + ENTITY_NAME(item->name), item->cookie, + ceph_pr_addr(&item->addr.in_addr)); + return 0; +} + +static int decode_watchers(void **p, void *end, + struct ceph_watch_item **watchers, + u32 *num_watchers) +{ + u8 struct_v; + u32 struct_len; + int i; + int ret; + + ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", + &struct_v, &struct_len); + if (ret) + return ret; + + *num_watchers = ceph_decode_32(p); + *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); + if (!*watchers) + return -ENOMEM; + + for (i = 0; i < *num_watchers; i++) { + ret = decode_watcher(p, end, *watchers + i); + if (ret) { + kfree(*watchers); + return ret; + } + } + + return 0; +} + +/* + * On success, the caller is responsible for: + * + * kfree(watchers); + */ +int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + struct ceph_watch_item **watchers, + u32 *num_watchers) +{ + struct ceph_osd_request *req; + struct page **pages; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + pages = ceph_alloc_page_vector(1, GFP_NOIO); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto out_put_req; + } + + osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); + ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, + response_data), + pages, PAGE_SIZE, 0, false, true); + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + if (ret >= 0) { + void *p = page_address(pages[0]); + void *const end = p + req->r_ops[0].outdata_len; + + ret = decode_watchers(&p, end, watchers, num_watchers); + } + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_list_watchers); + /* * Call all pending notify callbacks - for use after a watch is * unregistered, to make sure no more callbacks for it will be invoked */ void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) { + dout("%s osdc %p\n", __func__, osdc); flush_workqueue(osdc->notify_wq); } EXPORT_SYMBOL(ceph_osdc_flush_notifies); @@ -3910,6 +4028,57 @@ void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) EXPORT_SYMBOL(ceph_osdc_maybe_request_map); /* + * Execute an OSD class method on an object. + * + * @flags: CEPH_OSD_FLAG_* + * @resp_len: out param for reply length + */ +int ceph_osdc_call(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + const char *class, const char *method, + unsigned int flags, + struct page *req_page, size_t req_len, + struct page *resp_page, size_t *resp_len) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = flags; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + if (req_page) + osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, + 0, false, false); + if (resp_page) + osd_req_op_cls_response_data_pages(req, 0, &resp_page, + PAGE_SIZE, 0, false, false); + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + if (ret >= 0) { + ret = req->r_ops[0].rval; + if (resp_page) + *resp_len = req->r_ops[0].outdata_len; + } + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_call); + +/* * init, shutdown */ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) |