diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2012-07-31 14:35:28 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2012-07-31 14:35:28 -0700 |
commit | cc8362b1f6d724e46f515121d442779924b19fec (patch) | |
tree | 86fb5c3767e538ec9ded57dd7b3ce5d69dcde691 /net/ceph/osd_client.c | |
parent | 2e3ee613480563a6d5c01b57d342e65cc58c06df (diff) | |
parent | 1fe5e9932156f6122c3b1ff6ba7541c27c86718c (diff) | |
download | linux-cc8362b1f6d724e46f515121d442779924b19fec.tar.bz2 |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph changes from Sage Weil:
"Lots of stuff this time around:
- lots of cleanup and refactoring in the libceph messenger code, and
many hard to hit races and bugs closed as a result.
- lots of cleanup and refactoring in the rbd code from Alex Elder,
mostly in preparation for the layering functionality that will be
coming in 3.7.
- some misc rbd cleanups from Josh Durgin that are finally going
upstream
- support for CRUSH tunables (used by newer clusters to improve the
data placement)
- some cleanup in our use of d_parent that Al brought up a while back
- a random collection of fixes across the tree
There is another patch coming that fixes up our ->atomic_open()
behavior, but I'm going to hammer on it a bit more before sending it."
Fix up conflicts due to commits that were already committed earlier in
drivers/block/rbd.c, net/ceph/{messenger.c, osd_client.c}
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits)
rbd: create rbd_refresh_helper()
rbd: return obj version in __rbd_refresh_header()
rbd: fixes in rbd_header_from_disk()
rbd: always pass ops array to rbd_req_sync_op()
rbd: pass null version pointer in add_snap()
rbd: make rbd_create_rw_ops() return a pointer
rbd: have __rbd_add_snap_dev() return a pointer
libceph: recheck con state after allocating incoming message
libceph: change ceph_con_in_msg_alloc convention to be less weird
libceph: avoid dropping con mutex before fault
libceph: verify state after retaking con lock after dispatch
libceph: revoke mon_client messages on session restart
libceph: fix handling of immediate socket connect failure
ceph: update MAINTAINERS file
libceph: be less chatty about stray replies
libceph: clear all flags on con_close
libceph: clean up con flags
libceph: replace connection state bits with states
libceph: drop unnecessary CLOSED check in socket state change callback
libceph: close socket directly from ceph_con_close()
...
Diffstat (limited to 'net/ceph/osd_client.c')
-rw-r--r-- | net/ceph/osd_client.c | 77 |
1 files changed, 48 insertions, 29 deletions
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ca59e66c9787..42119c05e82c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -140,10 +140,9 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_request) ceph_msg_put(req->r_request); if (req->r_con_filling_msg) { - dout("release_request revoking pages %p from con %p\n", + dout("%s revoking pages %p from con %p\n", __func__, req->r_pages, req->r_con_filling_msg); - ceph_con_revoke_message(req->r_con_filling_msg, - req->r_reply); + ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); } if (req->r_reply) @@ -214,10 +213,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, kref_init(&req->r_kref); init_completion(&req->r_completion); init_completion(&req->r_safe_completion); + rb_init_node(&req->r_node); INIT_LIST_HEAD(&req->r_unsafe_item); INIT_LIST_HEAD(&req->r_linger_item); INIT_LIST_HEAD(&req->r_linger_osd); INIT_LIST_HEAD(&req->r_req_lru_item); + INIT_LIST_HEAD(&req->r_osd_item); + req->r_flags = flags; WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); @@ -243,6 +245,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } ceph_pagelist_init(req->r_trail); } + /* create request message; allow space for oid */ msg_size += MAX_OBJ_NAME_SIZE; if (snapc) @@ -256,7 +259,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, return NULL; } - msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); memset(msg->front.iov_base, 0, msg->front.iov_len); req->r_request = msg; @@ -624,7 +626,7 @@ static void osd_reset(struct ceph_connection *con) /* * Track open sessions with osds. */ -static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) +static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) { struct ceph_osd *osd; @@ -634,15 +636,13 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc) atomic_set(&osd->o_ref, 1); osd->o_osdc = osdc; + osd->o_osd = onum; INIT_LIST_HEAD(&osd->o_requests); INIT_LIST_HEAD(&osd->o_linger_requests); INIT_LIST_HEAD(&osd->o_osd_lru); osd->o_incarnation = 1; - ceph_con_init(osdc->client->msgr, &osd->o_con); - osd->o_con.private = osd; - osd->o_con.ops = &osd_con_ops; - osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD; + ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); INIT_LIST_HEAD(&osd->o_keepalive_item); return osd; @@ -688,7 +688,7 @@ static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) static void remove_all_osds(struct ceph_osd_client *osdc) { - dout("__remove_old_osds %p\n", osdc); + dout("%s %p\n", __func__, osdc); mutex_lock(&osdc->request_mutex); while (!RB_EMPTY_ROOT(&osdc->osds)) { struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), @@ -752,7 +752,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) ret = -EAGAIN; } else { ceph_con_close(&osd->o_con); - ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]); + ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, + &osdc->osdmap->osd_addr[osd->o_osd]); osd->o_incarnation++; } return ret; @@ -853,7 +854,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, if (req->r_osd) { /* make sure the original request isn't in flight. */ - ceph_con_revoke(&req->r_osd->o_con, req->r_request); + ceph_msg_revoke(req->r_request); list_del_init(&req->r_osd_item); if (list_empty(&req->r_osd->o_requests) && @@ -880,7 +881,7 @@ static void __unregister_request(struct ceph_osd_client *osdc, static void __cancel_request(struct ceph_osd_request *req) { if (req->r_sent && req->r_osd) { - ceph_con_revoke(&req->r_osd->o_con, req->r_request); + ceph_msg_revoke(req->r_request); req->r_sent = 0; } } @@ -890,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc, { dout("__register_linger_request %p\n", req); list_add_tail(&req->r_linger_item, &osdc->req_linger); - list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); + if (req->r_osd) + list_add_tail(&req->r_linger_osd, + &req->r_osd->o_linger_requests); } static void __unregister_linger_request(struct ceph_osd_client *osdc, @@ -998,18 +1001,18 @@ static int __map_request(struct ceph_osd_client *osdc, req->r_osd = __lookup_osd(osdc, o); if (!req->r_osd && o >= 0) { err = -ENOMEM; - req->r_osd = create_osd(osdc); + req->r_osd = create_osd(osdc, o); if (!req->r_osd) { list_move(&req->r_req_lru_item, &osdc->req_notarget); goto out; } dout("map_request osd %p is osd%d\n", req->r_osd, o); - req->r_osd->o_osd = o; - req->r_osd->o_con.peer_name.num = cpu_to_le64(o); __insert_osd(osdc, req->r_osd); - ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]); + ceph_con_open(&req->r_osd->o_con, + CEPH_ENTITY_TYPE_OSD, o, + &osdc->osdmap->osd_addr[o]); } if (req->r_osd) { @@ -1304,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); mutex_lock(&osdc->request_mutex); - for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { + for (p = rb_first(&osdc->requests); p; ) { req = rb_entry(p, struct ceph_osd_request, r_node); + p = rb_next(p); err = __map_request(osdc, req, force_resend); if (err < 0) continue; /* error */ @@ -1313,10 +1317,23 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) dout("%p tid %llu maps to no osd\n", req, req->r_tid); needmap++; /* request a newer map */ } else if (err > 0) { - dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, - req->r_osd ? req->r_osd->o_osd : -1); - if (!req->r_linger) + if (!req->r_linger) { + dout("%p tid %llu requeued on osd%d\n", req, + req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); req->r_flags |= CEPH_OSD_FLAG_RETRY; + } + } + if (req->r_linger && list_empty(&req->r_linger_item)) { + /* + * register as a linger so that we will + * re-submit below and get a new tid + */ + dout("%p tid %llu restart on osd%d\n", + req, req->r_tid, + req->r_osd ? req->r_osd->o_osd : -1); + __register_linger_request(osdc, req); + __unregister_request(osdc, req); } } @@ -1391,7 +1408,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) epoch, maplen); newmap = osdmap_apply_incremental(&p, next, osdc->osdmap, - osdc->client->msgr); + &osdc->client->msgr); if (IS_ERR(newmap)) { err = PTR_ERR(newmap); goto bad; @@ -1839,11 +1856,12 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->req_mempool) goto out; - err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true, + err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, + OSD_OP_FRONT_LEN, 10, true, "osd_op"); if (err < 0) goto out_mempool; - err = ceph_msgpool_init(&osdc->msgpool_op_reply, + err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, OSD_OPREPLY_FRONT_LEN, 10, true, "osd_op_reply"); if (err < 0) @@ -2019,15 +2037,15 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (!req) { *skip = 1; m = NULL; - pr_info("get_reply unknown tid %llu from osd%d\n", tid, - osd->o_osd); + dout("get_reply unknown tid %llu from osd%d\n", tid, + osd->o_osd); goto out; } if (req->r_con_filling_msg) { - dout("get_reply revoking msg %p from old con %p\n", + dout("%s revoking msg %p from old con %p\n", __func__, req->r_reply, req->r_con_filling_msg); - ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply); + ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); req->r_con_filling_msg = NULL; } @@ -2080,6 +2098,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con, int type = le16_to_cpu(hdr->type); int front = le32_to_cpu(hdr->front_len); + *skip = 0; switch (type) { case CEPH_MSG_OSD_MAP: case CEPH_MSG_WATCH_NOTIFY: |