diff options
author | Ilya Dryomov <idryomov@gmail.com> | 2016-05-26 01:15:02 +0200 |
---|---|---|
committer | Ilya Dryomov <idryomov@gmail.com> | 2016-05-26 01:15:02 +0200 |
commit | 922dab6134178cae317ae00de86376cba59f3147 (patch) | |
tree | a7047a5950b6a8505cc1e6852e4532656064fede /net/ceph/osd_client.c | |
parent | c525f03601f52c83ded046624138f2a45e0ba56c (diff) | |
download | linux-922dab6134178cae317ae00de86376cba59f3147.tar.bz2 |
libceph, rbd: ceph_osd_linger_request, watch/notify v2
This adds support and switches rbd to a new, more reliable version of
watch/notify protocol. As with the OSD client update, this is mostly
about getting the right structures linked into the right places so that
reconnects are properly sent when needed. watch/notify v2 also
requires sending regular pings to the OSDs - send_linger_ping().
A major change from the old watch/notify implementation is the
introduction of ceph_osd_linger_request - linger requests no longer
piggy back on ceph_osd_request. ceph_osd_event has been merged into
ceph_osd_linger_request.
All the details are now hidden within libceph, the interface consists
of a simple pair of watch/unwatch functions and ceph_osdc_notify_ack().
ceph_osdc_watch() does return ceph_osd_linger_request, but only to keep
the lifetime management simple.
ceph_osdc_notify_ack() accepts an optional data payload, which is
relayed back to the notifier.
Portions of this patch are loosely based on work by Douglas Fuller
<dfuller@redhat.com> and Mike Christie <michaelc@cs.wisc.edu>.
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 1148 |
1 files changed, 899 insertions, 249 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ef1bcbe9af2d..ca0a7b58ba4f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -45,6 +45,10 @@ static const struct ceph_connection_operations osd_con_ops; static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req); static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req); +static void link_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq); +static void unlink_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq); #if 1 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) @@ -74,10 +78,15 @@ static inline void verify_osd_locked(struct ceph_osd *osd) rwsem_is_locked(&osdc->lock)) && !rwsem_is_wrlocked(&osdc->lock)); } +static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) +{ + WARN_ON(!mutex_is_locked(&lreq->lock)); +} #else static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { } static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { } static inline void verify_osd_locked(struct ceph_osd *osd) { } +static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { } #endif /* @@ -322,6 +331,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_STAT: ceph_osd_data_release(&op->raw_data_in); break; + case CEPH_OSD_OP_NOTIFY_ACK: + ceph_osd_data_release(&op->notify_ack.request_data); + break; default: break; } @@ -345,6 +357,29 @@ static void target_init(struct ceph_osd_request_target *t) t->osd = CEPH_HOMELESS_OSD; } +static void target_copy(struct ceph_osd_request_target *dest, + const struct ceph_osd_request_target *src) +{ + ceph_oid_copy(&dest->base_oid, &src->base_oid); + ceph_oloc_copy(&dest->base_oloc, &src->base_oloc); + ceph_oid_copy(&dest->target_oid, &src->target_oid); + ceph_oloc_copy(&dest->target_oloc, &src->target_oloc); + + dest->pgid = src->pgid; /* struct */ + dest->pg_num = src->pg_num; + dest->pg_num_mask = src->pg_num_mask; + ceph_osds_copy(&dest->acting, &src->acting); + ceph_osds_copy(&dest->up, &src->up); + dest->size = src->size; + dest->min_size = src->min_size; + dest->sort_bitwise = src->sort_bitwise; + + dest->flags = src->flags; + dest->paused = src->paused; + + dest->osd = src->osd; +} + static void target_destroy(struct ceph_osd_request_target *t) { ceph_oid_destroy(&t->base_oid); @@ -357,8 +392,6 @@ static void target_destroy(struct ceph_osd_request_target *t) static void request_release_checks(struct ceph_osd_request *req) { WARN_ON(!RB_EMPTY_NODE(&req->r_node)); - WARN_ON(!list_empty(&req->r_linger_item)); - WARN_ON(!list_empty(&req->r_linger_osd_item)); WARN_ON(!list_empty(&req->r_unsafe_item)); WARN_ON(req->r_osd); } @@ -419,13 +452,48 @@ static void request_init(struct ceph_osd_request *req) init_completion(&req->r_completion); init_completion(&req->r_safe_completion); RB_CLEAR_NODE(&req->r_node); - INIT_LIST_HEAD(&req->r_linger_item); - INIT_LIST_HEAD(&req->r_linger_osd_item); INIT_LIST_HEAD(&req->r_unsafe_item); target_init(&req->r_t); } +/* + * This is ugly, but it allows us to reuse linger registration and ping + * requests, keeping the structure of the code around send_linger{_ping}() + * reasonable. Setting up a min_nr=2 mempool for each linger request + * and dealing with copying ops (this blasts req only, watch op remains + * intact) isn't any better. + */ +static void request_reinit(struct ceph_osd_request *req) +{ + struct ceph_osd_client *osdc = req->r_osdc; + bool mempool = req->r_mempool; + unsigned int num_ops = req->r_num_ops; + u64 snapid = req->r_snapid; + struct ceph_snap_context *snapc = req->r_snapc; + bool linger = req->r_linger; + struct ceph_msg *request_msg = req->r_request; + struct ceph_msg *reply_msg = req->r_reply; + + dout("%s req %p\n", __func__, req); + WARN_ON(atomic_read(&req->r_kref.refcount) != 1); + request_release_checks(req); + + WARN_ON(atomic_read(&request_msg->kref.refcount) != 1); + WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1); + target_destroy(&req->r_t); + + request_init(req); + req->r_osdc = osdc; + req->r_mempool = mempool; + req->r_num_ops = num_ops; + req->r_snapid = snapid; + req->r_snapc = snapc; + req->r_linger = linger; + req->r_request = request_msg; + req->r_reply = reply_msg; +} + struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, unsigned int num_ops, @@ -681,21 +749,19 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, } EXPORT_SYMBOL(osd_req_op_xattr_init); -void osd_req_op_watch_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode, - u64 cookie, u64 version, int flag) +/* + * @watch_opcode: CEPH_OSD_WATCH_OP_* + */ +static void osd_req_op_watch_init(struct ceph_osd_request *req, int which, + u64 cookie, u8 watch_opcode) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - opcode, 0); - - BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); + struct ceph_osd_req_op *op; + op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0); op->watch.cookie = cookie; - op->watch.ver = version; - if (opcode == CEPH_OSD_OP_WATCH && flag) - op->watch.flag = (u8)1; + op->watch.op = watch_opcode; + op->watch.gen = 0; } -EXPORT_SYMBOL(osd_req_op_watch_init); void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, unsigned int which, @@ -771,11 +837,13 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst, break; case CEPH_OSD_OP_STARTSYNC: break; - case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_WATCH: dst->watch.cookie = cpu_to_le64(src->watch.cookie); - dst->watch.ver = cpu_to_le64(src->watch.ver); - dst->watch.flag = src->watch.flag; + dst->watch.ver = cpu_to_le64(0); + dst->watch.op = src->watch.op; + dst->watch.gen = cpu_to_le32(src->watch.gen); + break; + case CEPH_OSD_OP_NOTIFY_ACK: break; case CEPH_OSD_OP_SETALLOCHINT: dst->alloc_hint.expected_object_size = @@ -915,7 +983,7 @@ static void osd_init(struct ceph_osd *osd) atomic_set(&osd->o_ref, 1); RB_CLEAR_NODE(&osd->o_node); osd->o_requests = RB_ROOT; - INIT_LIST_HEAD(&osd->o_linger_requests); + osd->o_linger_requests = RB_ROOT; INIT_LIST_HEAD(&osd->o_osd_lru); INIT_LIST_HEAD(&osd->o_keepalive_item); osd->o_incarnation = 1; @@ -926,7 +994,7 @@ static void osd_cleanup(struct ceph_osd *osd) { WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); - WARN_ON(!list_empty(&osd->o_linger_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); WARN_ON(!list_empty(&osd->o_osd_lru)); WARN_ON(!list_empty(&osd->o_keepalive_item)); @@ -996,7 +1064,7 @@ static void __move_osd_to_lru(struct ceph_osd *osd) static void maybe_move_osd_to_lru(struct ceph_osd *osd) { if (RB_EMPTY_ROOT(&osd->o_requests) && - list_empty(&osd->o_linger_requests)) + RB_EMPTY_ROOT(&osd->o_linger_requests)) __move_osd_to_lru(osd); } @@ -1036,6 +1104,17 @@ static void close_osd(struct ceph_osd *osd) unlink_request(osd, req); link_request(&osdc->homeless_osd, req); } + for (n = rb_first(&osd->o_linger_requests); n; ) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + n = rb_next(n); /* unlink_linger() */ + + dout(" reassigning lreq %p linger_id %llu\n", lreq, + lreq->linger_id); + unlink_linger(osd, lreq); + link_linger(&osdc->homeless_osd, lreq); + } __remove_osd_from_lru(osd); erase_osd(&osdc->osds, osd); @@ -1052,7 +1131,7 @@ static int reopen_osd(struct ceph_osd *osd) dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); if (RB_EMPTY_ROOT(&osd->o_requests) && - list_empty(&osd->o_linger_requests)) { + RB_EMPTY_ROOT(&osd->o_linger_requests)) { close_osd(osd); return -ENODEV; } @@ -1148,52 +1227,6 @@ static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) atomic_dec(&osd->o_osdc->num_homeless); } -static void __register_linger_request(struct ceph_osd *osd, - struct ceph_osd_request *req) -{ - dout("%s %p tid %llu\n", __func__, req, req->r_tid); - WARN_ON(!req->r_linger); - - ceph_osdc_get_request(req); - list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger); - list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests); - __remove_osd_from_lru(osd); - req->r_osd = osd; -} - -static void __unregister_linger_request(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - WARN_ON(!req->r_linger); - - if (list_empty(&req->r_linger_item)) { - dout("%s %p tid %llu not registered\n", __func__, req, - req->r_tid); - return; - } - - dout("%s %p tid %llu\n", __func__, req, req->r_tid); - list_del_init(&req->r_linger_item); - - if (req->r_osd) { - list_del_init(&req->r_linger_osd_item); - maybe_move_osd_to_lru(req->r_osd); - if (RB_EMPTY_ROOT(&req->r_osd->o_requests)) - req->r_osd = NULL; - } - ceph_osdc_put_request(req); -} - -void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, - struct ceph_osd_request *req) -{ - if (!req->r_linger) { - dout("set_request_linger %p\n", req); - req->r_linger = 1; - } -} -EXPORT_SYMBOL(ceph_osdc_set_request_linger); - static bool __pool_full(struct ceph_pg_pool_info *pi) { return pi->flags & CEPH_POOL_FLAG_FULL; @@ -1379,6 +1412,10 @@ static void setup_request_data(struct ceph_osd_request *req, op->xattr.value_len); ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); break; + case CEPH_OSD_OP_NOTIFY_ACK: + ceph_osdc_msg_data_add(msg, + &op->notify_ack.request_data); + break; /* reply */ case CEPH_OSD_OP_STAT: @@ -1684,6 +1721,460 @@ static void cancel_request(struct ceph_osd_request *req) } /* + * lingering requests, watch/notify v2 infrastructure + */ +static void linger_release(struct kref *kref) +{ + struct ceph_osd_linger_request *lreq = + container_of(kref, struct ceph_osd_linger_request, kref); + + dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq, + lreq->reg_req, lreq->ping_req); + WARN_ON(!RB_EMPTY_NODE(&lreq->node)); + WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); + WARN_ON(!list_empty(&lreq->scan_item)); + WARN_ON(lreq->osd); + + if (lreq->reg_req) + ceph_osdc_put_request(lreq->reg_req); + if (lreq->ping_req) + ceph_osdc_put_request(lreq->ping_req); + target_destroy(&lreq->t); + kfree(lreq); +} + +static void linger_put(struct ceph_osd_linger_request *lreq) +{ + if (lreq) + kref_put(&lreq->kref, linger_release); +} + +static struct ceph_osd_linger_request * +linger_get(struct ceph_osd_linger_request *lreq) +{ + kref_get(&lreq->kref); + return lreq; +} + +static struct ceph_osd_linger_request * +linger_alloc(struct ceph_osd_client *osdc) +{ + struct ceph_osd_linger_request *lreq; + + lreq = kzalloc(sizeof(*lreq), GFP_NOIO); + if (!lreq) + return NULL; + + kref_init(&lreq->kref); + mutex_init(&lreq->lock); + RB_CLEAR_NODE(&lreq->node); + RB_CLEAR_NODE(&lreq->osdc_node); + INIT_LIST_HEAD(&lreq->scan_item); + init_completion(&lreq->reg_commit_wait); + + lreq->osdc = osdc; + target_init(&lreq->t); + + dout("%s lreq %p\n", __func__, lreq); + return lreq; +} + +DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) +DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) + +/* + * Create linger request <-> OSD session relation. + * + * @lreq has to be registered, @osd may be homeless. + */ +static void link_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq) +{ + verify_osd_locked(osd); + WARN_ON(!lreq->linger_id || lreq->osd); + dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, + osd->o_osd, lreq, lreq->linger_id); + + if (!osd_homeless(osd)) + __remove_osd_from_lru(osd); + else + atomic_inc(&osd->o_osdc->num_homeless); + + get_osd(osd); + insert_linger(&osd->o_linger_requests, lreq); + lreq->osd = osd; +} + +static void unlink_linger(struct ceph_osd *osd, + struct ceph_osd_linger_request *lreq) +{ + verify_osd_locked(osd); + WARN_ON(lreq->osd != osd); + dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, + osd->o_osd, lreq, lreq->linger_id); + + lreq->osd = NULL; + erase_linger(&osd->o_linger_requests, lreq); + put_osd(osd); + + if (!osd_homeless(osd)) + maybe_move_osd_to_lru(osd); + else + atomic_dec(&osd->o_osdc->num_homeless); +} + +static bool __linger_registered(struct ceph_osd_linger_request *lreq) +{ + verify_osdc_locked(lreq->osdc); + + return !RB_EMPTY_NODE(&lreq->osdc_node); +} + +static bool linger_registered(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + bool registered; + + down_read(&osdc->lock); + registered = __linger_registered(lreq); + up_read(&osdc->lock); + + return registered; +} + +static void linger_register(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + verify_osdc_wrlocked(osdc); + WARN_ON(lreq->linger_id); + + linger_get(lreq); + lreq->linger_id = ++osdc->last_linger_id; + insert_linger_osdc(&osdc->linger_requests, lreq); +} + +static void linger_unregister(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + verify_osdc_wrlocked(osdc); + + erase_linger_osdc(&osdc->linger_requests, lreq); + linger_put(lreq); +} + +static void cancel_linger_request(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + WARN_ON(!req->r_linger); + cancel_request(req); + linger_put(lreq); +} + +struct linger_work { + struct work_struct work; + struct ceph_osd_linger_request *lreq; + + union { + struct { + u64 notify_id; + u64 notifier_id; + void *payload; /* points into @msg front */ + size_t payload_len; + + struct ceph_msg *msg; /* for ceph_msg_put() */ + } notify; + struct { + int err; + } error; + }; +}; + +static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq, + work_func_t workfn) +{ + struct linger_work *lwork; + + lwork = kzalloc(sizeof(*lwork), GFP_NOIO); + if (!lwork) + return NULL; + + INIT_WORK(&lwork->work, workfn); + lwork->lreq = linger_get(lreq); + + return lwork; +} + +static void lwork_free(struct linger_work *lwork) +{ + struct ceph_osd_linger_request *lreq = lwork->lreq; + + linger_put(lreq); + kfree(lwork); +} + +static void lwork_queue(struct linger_work *lwork) +{ + struct ceph_osd_linger_request *lreq = lwork->lreq; + struct ceph_osd_client *osdc = lreq->osdc; + + verify_lreq_locked(lreq); + queue_work(osdc->notify_wq, &lwork->work); +} + +static void do_watch_notify(struct work_struct *w) +{ + struct linger_work *lwork = container_of(w, struct linger_work, work); + struct ceph_osd_linger_request *lreq = lwork->lreq; + + if (!linger_registered(lreq)) { + dout("%s lreq %p not registered\n", __func__, lreq); + goto out; + } + + dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n", + __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id, + lwork->notify.payload_len); + lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id, + lwork->notify.notifier_id, lwork->notify.payload, + lwork->notify.payload_len); + +out: + ceph_msg_put(lwork->notify.msg); + lwork_free(lwork); +} + +static void do_watch_error(struct work_struct *w) +{ + struct linger_work *lwork = container_of(w, struct linger_work, work); + struct ceph_osd_linger_request *lreq = lwork->lreq; + + if (!linger_registered(lreq)) { + dout("%s lreq %p not registered\n", __func__, lreq); + goto out; + } + + dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err); + lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err); + +out: + lwork_free(lwork); +} + +static void queue_watch_error(struct ceph_osd_linger_request *lreq) +{ + struct linger_work *lwork; + + lwork = lwork_alloc(lreq, do_watch_error); + if (!lwork) { + pr_err("failed to allocate error-lwork\n"); + return; + } + + lwork->error.err = lreq->last_error; + lwork_queue(lwork); +} + +static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq, + int result) +{ + if (!completion_done(&lreq->reg_commit_wait)) { + lreq->reg_commit_error = (result <= 0 ? result : 0); + complete_all(&lreq->reg_commit_wait); + } +} + +static void linger_commit_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, + lreq->linger_id, req->r_result); + WARN_ON(!__linger_registered(lreq)); + linger_reg_commit_complete(lreq, req->r_result); + lreq->committed = true; + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static int normalize_watch_error(int err) +{ + /* + * Translate ENOENT -> ENOTCONN so that a delete->disconnection + * notification and a failure to reconnect because we raced with + * the delete appear the same to the user. + */ + if (err == -ENOENT) + err = -ENOTCONN; + + return err; +} + +static void linger_reconnect_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__, + lreq, lreq->linger_id, req->r_result, lreq->last_error); + if (req->r_result < 0) { + if (!lreq->last_error) { + lreq->last_error = normalize_watch_error(req->r_result); + queue_watch_error(lreq); + } + } + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static void send_linger(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_request *req = lreq->reg_req; + struct ceph_osd_req_op *op = &req->r_ops[0]; + + verify_osdc_wrlocked(req->r_osdc); + dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); + + if (req->r_osd) + cancel_linger_request(req); + + request_reinit(req); + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + req->r_flags = lreq->t.flags; + req->r_mtime = lreq->mtime; + + mutex_lock(&lreq->lock); + if (lreq->committed) { + WARN_ON(op->op != CEPH_OSD_OP_WATCH || + op->watch.cookie != lreq->linger_id); + op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT; + op->watch.gen = ++lreq->register_gen; + dout("lreq %p reconnect register_gen %u\n", lreq, + op->watch.gen); + req->r_callback = linger_reconnect_cb; + } else { + WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH); + dout("lreq %p register\n", lreq); + req->r_callback = linger_commit_cb; + } + mutex_unlock(&lreq->lock); + + req->r_priv = linger_get(lreq); + req->r_linger = true; + + submit_request(req, true); +} + +static void linger_ping_cb(struct ceph_osd_request *req) +{ + struct ceph_osd_linger_request *lreq = req->r_priv; + + mutex_lock(&lreq->lock); + dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n", + __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent, + lreq->last_error); + if (lreq->register_gen == req->r_ops[0].watch.gen) { + if (req->r_result && !lreq->last_error) { + lreq->last_error = normalize_watch_error(req->r_result); + queue_watch_error(lreq); + } + } else { + dout("lreq %p register_gen %u ignoring old pong %u\n", lreq, + lreq->register_gen, req->r_ops[0].watch.gen); + } + + mutex_unlock(&lreq->lock); + linger_put(lreq); +} + +static void send_linger_ping(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd_request *req = lreq->ping_req; + struct ceph_osd_req_op *op = &req->r_ops[0]; + + if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) { + dout("%s PAUSERD\n", __func__); + return; + } + + lreq->ping_sent = jiffies; + dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n", + __func__, lreq, lreq->linger_id, lreq->ping_sent, + lreq->register_gen); + + if (req->r_osd) + cancel_linger_request(req); + + request_reinit(req); + target_copy(&req->r_t, &lreq->t); + + WARN_ON(op->op != CEPH_OSD_OP_WATCH || + op->watch.cookie != lreq->linger_id || + op->watch.op != CEPH_OSD_WATCH_OP_PING); + op->watch.gen = lreq->register_gen; + req->r_callback = linger_ping_cb; + req->r_priv = linger_get(lreq); + req->r_linger = true; + + ceph_osdc_get_request(req); + account_request(req); + req->r_tid = atomic64_inc_return(&osdc->last_tid); + link_request(lreq->osd, req); + send_request(req); +} + +static void linger_submit(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + struct ceph_osd *osd; + + calc_target(osdc, &lreq->t, &lreq->last_force_resend, false); + osd = lookup_create_osd(osdc, lreq->t.osd, true); + link_linger(osd, lreq); + + send_linger(lreq); +} + +/* + * @lreq has to be both registered and linked. + */ +static void __linger_cancel(struct ceph_osd_linger_request *lreq) +{ + if (lreq->ping_req->r_osd) + cancel_linger_request(lreq->ping_req); + if (lreq->reg_req->r_osd) + cancel_linger_request(lreq->reg_req); + unlink_linger(lreq->osd, lreq); + linger_unregister(lreq); +} + +static void linger_cancel(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + + down_write(&osdc->lock); + if (__linger_registered(lreq)) + __linger_cancel(lreq); + up_write(&osdc->lock); +} + +static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) +{ + int ret; + + dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); + ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); + return ret ?: lreq->reg_commit_error; +} + +/* * Timeout callback, called every N seconds. When 1 or more OSD * requests has been active for more than N seconds, we send a keepalive * (tag + timestamp) to its OSD to ensure any communications channel @@ -1720,6 +2211,19 @@ static void handle_timeout(struct work_struct *work) found = true; } } + for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { + struct ceph_osd_linger_request *lreq = + rb_entry(p, struct ceph_osd_linger_request, node); + + dout(" lreq %p linger_id %llu is served by osd%d\n", + lreq, lreq->linger_id, osd->o_osd); + found = true; + + mutex_lock(&lreq->lock); + if (lreq->committed && !lreq->last_error) + send_linger_ping(lreq); + mutex_unlock(&lreq->lock); + } if (found) list_move_tail(&osd->o_keepalive_item, &slow_osds); @@ -1756,7 +2260,7 @@ static void handle_osds_timeout(struct work_struct *work) break; WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); - WARN_ON(!list_empty(&osd->o_linger_requests)); + WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); close_osd(osd); } @@ -2082,7 +2586,8 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) __finish_request(req); if (req->r_linger) { WARN_ON(req->r_unsafe_callback); - __register_linger_request(osd, req); + dout("req %p tid %llu cb (locked)\n", req, req->r_tid); + __complete_request(req); } } @@ -2093,7 +2598,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) if (already_acked && req->r_unsafe_callback) { dout("req %p tid %llu safe-cb\n", req, req->r_tid); req->r_unsafe_callback(req, false); - } else { + } else if (!req->r_linger) { dout("req %p tid %llu cb\n", req, req->r_tid); __complete_request(req); } @@ -2145,6 +2650,26 @@ static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) return pi->was_full && !__pool_full(pi); } +static enum calc_target_result +recalc_linger_target(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_client *osdc = lreq->osdc; + enum calc_target_result ct_res; + + ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true); + if (ct_res == CALC_TARGET_NEED_RESEND) { + struct ceph_osd *osd; + + osd = lookup_create_osd(osdc, lreq->t.osd, true); + if (osd != lreq->osd) { + unlink_linger(lreq->osd, lreq); + link_linger(osd, lreq); + } + } + + return ct_res; +} + /* * Requeue requests whose mapping to an OSD has changed. */ @@ -2159,6 +2684,39 @@ static void scan_requests(struct ceph_osd *osd, struct rb_node *n; bool force_resend_writes; + for (n = rb_first(&osd->o_linger_requests); n; ) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + enum calc_target_result ct_res; + + n = rb_next(n); /* recalc_linger_target() */ + + dout("%s lreq %p linger_id %llu\n", __func__, lreq, + lreq->linger_id); + ct_res = recalc_linger_target(lreq); + switch (ct_res) { + case CALC_TARGET_NO_ACTION: + force_resend_writes = cleared_full || + (check_pool_cleared_full && + pool_cleared_full(osdc, lreq->t.base_oloc.pool)); + if (!force_resend && !force_resend_writes) + break; + + /* fall through */ + case CALC_TARGET_NEED_RESEND: + /* + * scan_requests() for the previous epoch(s) + * may have already added it to the list, since + * it's not unlinked here. + */ + if (list_empty(&lreq->scan_item)) + list_add_tail(&lreq->scan_item, need_resend_linger); + break; + case CALC_TARGET_POOL_DNE: + break; + } + } + for (n = rb_first(&osd->o_requests); n; ) { struct ceph_osd_request *req = rb_entry(n, struct ceph_osd_request, r_node); @@ -2263,6 +2821,7 @@ static void kick_requests(struct ceph_osd_client *osdc, struct rb_root *need_resend, struct list_head *need_resend_linger) { + struct ceph_osd_linger_request *lreq, *nlreq; struct rb_node *n; for (n = rb_first(need_resend); n; ) { @@ -2280,8 +2839,17 @@ static void kick_requests(struct ceph_osd_client *osdc, if (!req->r_linger) { if (!osd_homeless(osd) && !req->r_t.paused) send_request(req); + } else { + cancel_linger_request(req); } } + + list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) { + if (!osd_homeless(lreq->osd)) + send_linger(lreq); + + list_del_init(&lreq->scan_item); + } } /* @@ -2406,15 +2974,25 @@ static void kick_osd_requests(struct ceph_osd *osd) { struct rb_node *n; - for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { + for (n = rb_first(&osd->o_requests); n; ) { struct ceph_osd_request *req = rb_entry(n, struct ceph_osd_request, r_node); + n = rb_next(n); /* cancel_linger_request() */ + if (!req->r_linger) { if (!req->r_t.paused) send_request(req); + } else { + cancel_linger_request(req); } } + for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { + struct ceph_osd_linger_request *lreq = + rb_entry(n, struct ceph_osd_linger_request, node); + + send_linger(lreq); + } } /* @@ -2442,192 +3020,76 @@ out_unlock: } /* - * watch/notify callback event infrastructure - * - * These callbacks are used both for watch and notify operations. - */ -static void __release_event(struct kref *kref) -{ - struct ceph_osd_event *event = - container_of(kref, struct ceph_osd_event, kref); - - dout("__release_event %p\n", event); - kfree(event); -} - -static void get_event(struct ceph_osd_event *event) -{ - kref_get(&event->kref); -} - -void ceph_osdc_put_event(struct ceph_osd_event *event) -{ - kref_put(&event->kref, __release_event); -} -EXPORT_SYMBOL(ceph_osdc_put_event); - -static void __insert_event(struct ceph_osd_client *osdc, - struct ceph_osd_event *new) -{ - struct rb_node **p = &osdc->event_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_osd_event *event = NULL; - - while (*p) { - parent = *p; - event = rb_entry(parent, struct ceph_osd_event, node); - if (new->cookie < event->cookie) - p = &(*p)->rb_left; - else if (new->cookie > event->cookie) - p = &(*p)->rb_right; - else - BUG(); - } - - rb_link_node(&new->node, parent, p); - rb_insert_color(&new->node, &osdc->event_tree); -} - -static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, - u64 cookie) -{ - struct rb_node **p = &osdc->event_tree.rb_node; - struct rb_node *parent = NULL; - struct ceph_osd_event *event = NULL; - - while (*p) { - parent = *p; - event = rb_entry(parent, struct ceph_osd_event, node); - if (cookie < event->cookie) - p = &(*p)->rb_left; - else if (cookie > event->cookie) - p = &(*p)->rb_right; - else - return event; - } - return NULL; -} - -static void __remove_event(struct ceph_osd_event *event) -{ - struct ceph_osd_client *osdc = event->osdc; - - if (!RB_EMPTY_NODE(&event->node)) { - dout("__remove_event removed %p\n", event); - rb_erase(&event->node, &osdc->event_tree); - ceph_osdc_put_event(event); - } else { - dout("__remove_event didn't remove %p\n", event); - } -} - -int ceph_osdc_create_event(struct ceph_osd_client *osdc, - void (*event_cb)(u64, u64, u8, void *), - void *data, struct ceph_osd_event **pevent) -{ - struct ceph_osd_event *event; - - event = kmalloc(sizeof(*event), GFP_NOIO); - if (!event) - return -ENOMEM; - - dout("create_event %p\n", event); - event->cb = event_cb; - event->one_shot = 0; - event->data = data; - event->osdc = osdc; - INIT_LIST_HEAD(&event->osd_node); - RB_CLEAR_NODE(&event->node); - kref_init(&event->kref); /* one ref for us */ - kref_get(&event->kref); /* one ref for the caller */ - - spin_lock(&osdc->event_lock); - event->cookie = ++osdc->event_count; - __insert_event(osdc, event); - spin_unlock(&osdc->event_lock); - - *pevent = event; - return 0; -} -EXPORT_SYMBOL(ceph_osdc_create_event); - -void ceph_osdc_cancel_event(struct ceph_osd_event *event) -{ - struct ceph_osd_client *osdc = event->osdc; - - dout("cancel_event %p\n", event); - spin_lock(&osdc->event_lock); - __remove_event(event); - spin_unlock(&osdc->event_lock); - ceph_osdc_put_event(event); /* caller's */ -} -EXPORT_SYMBOL(ceph_osdc_cancel_event); - - -static void do_event_work(struct work_struct *work) -{ - struct ceph_osd_event_work *event_work = - container_of(work, struct ceph_osd_event_work, work); - struct ceph_osd_event *event = event_work->event; - u64 ver = event_work->ver; - u64 notify_id = event_work->notify_id; - u8 opcode = event_work->opcode; - - dout("do_event_work completing %p\n", event); - event->cb(ver, notify_id, opcode, event->data); - dout("do_event_work completed %p\n", event); - ceph_osdc_put_event(event); - kfree(event_work); -} - - -/* * Process osd watch notifications */ static void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) { - void *p, *end; - u8 proto_ver; - u64 cookie, ver, notify_id; - u8 opcode; - struct ceph_osd_event *event; - struct ceph_osd_event_work *event_work; - - p = msg->front.iov_base; - end = p + msg->front.iov_len; + void *p = msg->front.iov_base; + void *const end = p + msg->front.iov_len; + struct ceph_osd_linger_request *lreq; + struct linger_work *lwork; + u8 proto_ver, opcode; + u64 cookie, notify_id; + u64 notifier_id = 0; + void *payload = NULL; + u32 payload_len = 0; ceph_decode_8_safe(&p, end, proto_ver, bad); ceph_decode_8_safe(&p, end, opcode, bad); ceph_decode_64_safe(&p, end, cookie, bad); - ceph_decode_64_safe(&p, end, ver, bad); + p += 8; /* skip ver */ ceph_decode_64_safe(&p, end, notify_id, bad); - spin_lock(&osdc->event_lock); - event = __find_event(osdc, cookie); - if (event) { - BUG_ON(event->one_shot); - get_event(event); - } - spin_unlock(&osdc->event_lock); - dout("handle_watch_notify cookie %lld ver %lld event %p\n", - cookie, ver, event); - if (event) { - event_work = kmalloc(sizeof(*event_work), GFP_NOIO); - if (!event_work) { - pr_err("couldn't allocate event_work\n"); - ceph_osdc_put_event(event); - return; + if (proto_ver >= 1) { + ceph_decode_32_safe(&p, end, payload_len, bad); + ceph_decode_need(&p, end, payload_len, bad); + payload = p; + p += payload_len; + } + + if (le16_to_cpu(msg->hdr.version) >= 2) + p += 4; /* skip return_code */ + + if (le16_to_cpu(msg->hdr.version) >= 3) + ceph_decode_64_safe(&p, end, notifier_id, bad); + + down_read(&osdc->lock); + lreq = lookup_linger_osdc(&osdc->linger_requests, cookie); + if (!lreq) { + dout("%s opcode %d cookie %llu dne\n", __func__, opcode, + cookie); + goto out_unlock_osdc; + } + + mutex_lock(&lreq->lock); + dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie, + lreq); + if (opcode == CEPH_WATCH_EVENT_DISCONNECT) { + if (!lreq->last_error) { + lreq->last_error = -ENOTCONN; + queue_watch_error(lreq); + } + } else { + /* CEPH_WATCH_EVENT_NOTIFY */ + lwork = lwork_alloc(lreq, do_watch_notify); + if (!lwork) { + pr_err("failed to allocate notify-lwork\n"); + goto out_unlock_lreq; } - INIT_WORK(&event_work->work, do_event_work); - event_work->event = event; - event_work->ver = ver; - event_work->notify_id = notify_id; - event_work->opcode = opcode; - queue_work(osdc->notify_wq, &event_work->work); + lwork->notify.notify_id = notify_id; + lwork->notify.notifier_id = notifier_id; + lwork->notify.payload = payload; + lwork->notify.payload_len = payload_len; + lwork->notify.msg = ceph_msg_get(msg); + lwork_queue(lwork); } +out_unlock_lreq: + mutex_unlock(&lreq->lock); +out_unlock_osdc: + up_read(&osdc->lock); return; bad: @@ -2659,8 +3121,6 @@ void ceph_osdc_cancel_request(struct ceph_osd_request *req) struct ceph_osd_client *osdc = req->r_osdc; down_write(&osdc->lock); - if (req->r_linger) - __unregister_linger_request(osdc, req); if (req->r_osd) cancel_request(req); up_write(&osdc->lock); @@ -2743,6 +3203,198 @@ again: } EXPORT_SYMBOL(ceph_osdc_sync); +static struct ceph_osd_request * +alloc_linger_request(struct ceph_osd_linger_request *lreq) +{ + struct ceph_osd_request *req; + + req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return NULL; + + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + + if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { + ceph_osdc_put_request(req); + return NULL; + } + + return req; +} + +/* + * Returns a handle, caller owns a ref. + */ +struct ceph_osd_linger_request * +ceph_osdc_watch(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + rados_watchcb2_t wcb, + rados_watcherrcb_t errcb, + void *data) +{ + struct ceph_osd_linger_request *lreq; + int ret; + + lreq = linger_alloc(osdc); + if (!lreq) + return ERR_PTR(-ENOMEM); + + lreq->wcb = wcb; + lreq->errcb = errcb; + lreq->data = data; + + ceph_oid_copy(&lreq->t.base_oid, oid); + ceph_oloc_copy(&lreq->t.base_oloc, oloc); + lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + lreq->mtime = CURRENT_TIME; + + lreq->reg_req = alloc_linger_request(lreq); + if (!lreq->reg_req) { + ret = -ENOMEM; + goto err_put_lreq; + } + + lreq->ping_req = alloc_linger_request(lreq); + if (!lreq->ping_req) { + ret = -ENOMEM; + goto err_put_lreq; + } + + down_write(&osdc->lock); + linger_register(lreq); /* before osd_req_op_* */ + osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_WATCH); + osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_PING); + linger_submit(lreq); + up_write(&osdc->lock); + + ret = linger_reg_commit_wait(lreq); + if (ret) { + linger_cancel(lreq); + goto err_put_lreq; + } + + return lreq; + +err_put_lreq: + linger_put(lreq); + return ERR_PTR(ret); +} +EXPORT_SYMBOL(ceph_osdc_watch); + +/* + * Releases a ref. + * + * Times out after mount_timeout to preserve rbd unmap behaviour + * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap + * with mount_timeout"). + */ +int ceph_osdc_unwatch(struct ceph_osd_client *osdc, + struct ceph_osd_linger_request *lreq) +{ + struct ceph_options *opts = osdc->client->options; + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); + ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); + req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + req->r_mtime = CURRENT_TIME; + osd_req_op_watch_init(req, 0, lreq->linger_id, + CEPH_OSD_WATCH_OP_UNWATCH); + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + ceph_osdc_start_request(osdc, req, false); + linger_cancel(lreq); + linger_put(lreq); + ret = wait_request_timeout(req, opts->mount_timeout); + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_unwatch); + +static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, + u64 notify_id, u64 cookie, void *payload, + size_t payload_len) +{ + struct ceph_osd_req_op *op; + struct ceph_pagelist *pl; + int ret; + + op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); + + pl = kmalloc(sizeof(*pl), GFP_NOIO); + if (!pl) + return -ENOMEM; + + ceph_pagelist_init(pl); + ret = ceph_pagelist_encode_64(pl, notify_id); + ret |= ceph_pagelist_encode_64(pl, cookie); + if (payload) { + ret |= ceph_pagelist_encode_32(pl, payload_len); + ret |= ceph_pagelist_append(pl, payload, payload_len); + } else { + ret |= ceph_pagelist_encode_32(pl, 0); + } + if (ret) { + ceph_pagelist_release(pl); + return -ENOMEM; + } + + ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl); + op->indata_len = pl->length; + return 0; +} + +int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + u64 notify_id, + u64 cookie, + void *payload, + size_t payload_len) +{ + struct ceph_osd_request *req; + int ret; + + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); + if (!req) + return -ENOMEM; + + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; + + ret = ceph_osdc_alloc_messages(req, GFP_NOIO); + if (ret) + goto out_put_req; + + ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, + payload_len); + if (ret) + goto out_put_req; + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + +out_put_req: + ceph_osdc_put_request(req); + return ret; +} +EXPORT_SYMBOL(ceph_osdc_notify_ack); + /* * Call all pending notify callbacks - for use after a watch is * unregistered, to make sure no more callbacks for it will be invoked @@ -2767,15 +3419,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->osds = RB_ROOT; INIT_LIST_HEAD(&osdc->osd_lru); spin_lock_init(&osdc->osd_lru_lock); - INIT_LIST_HEAD(&osdc->req_linger); osd_init(&osdc->homeless_osd); osdc->homeless_osd.o_osdc = osdc; osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; + osdc->linger_requests = RB_ROOT; INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); - spin_lock_init(&osdc->event_lock); - osdc->event_tree = RB_ROOT; - osdc->event_count = 0; err = -ENOMEM; osdc->osdmap = ceph_osdmap_alloc(); @@ -2838,6 +3487,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) osd_cleanup(&osdc->homeless_osd); WARN_ON(!list_empty(&osdc->osd_lru)); + WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); WARN_ON(atomic_read(&osdc->num_requests)); WARN_ON(atomic_read(&osdc->num_homeless)); |