diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2013-02-28 17:43:09 -0800 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2013-02-28 17:43:09 -0800 |
commit | 1cf0209c431fa7790253c532039d53b0773193aa (patch) | |
tree | 24310eaaf4c9583988d9098f6c85a4a34970b5b9 /net | |
parent | de1a2262b006220dae2561a299a6ea128c46f4fe (diff) | |
parent | 83ca14fdd35821554058e5fd4fa7b118ee504a33 (diff) | |
download | linux-1cf0209c431fa7790253c532039d53b0773193aa.tar.bz2 |
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
"A few groups of patches here. Alex has been hard at work improving
the RBD code, layout groundwork for understanding the new formats and
doing layering. Most of the infrastructure is now in place for the
final bits that will come with the next window.
There are a few changes to the data layout. Jim Schutt's patch fixes
some non-ideal CRUSH behavior, and a set of patches from me updates
the client to speak a newer version of the protocol and implement an
improved hashing strategy across storage nodes (when the server side
supports it too).
A pair of patches from Sam Lang fix the atomicity of open+create
operations. Several patches from Yan, Zheng fix various mds/client
issues that turned up during multi-mds torture tests.
A final set of patches expose file layouts via virtual xattrs, and
allow the policies to be set on directories via xattrs as well
(avoiding the awkward ioctl interface and providing a consistent
interface for both kernel mount and ceph-fuse users)."
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (143 commits)
libceph: add support for HASHPSPOOL pool flag
libceph: update osd request/reply encoding
libceph: calculate placement based on the internal data types
ceph: update support for PGID64, PGPOOL3, OSDENC protocol features
ceph: update "ceph_features.h"
libceph: decode into cpu-native ceph_pg type
libceph: rename ceph_pg -> ceph_pg_v1
rbd: pass length, not op for osd completions
rbd: move rbd_osd_trivial_callback()
libceph: use a do..while loop in con_work()
libceph: use a flag to indicate a fault has occurred
libceph: separate non-locked fault handling
libceph: encapsulate connection backoff
libceph: eliminate sparse warnings
ceph: eliminate sparse warnings in fs code
rbd: eliminate sparse warnings
libceph: define connection flag helpers
rbd: normalize dout() calls
rbd: barriers are hard
rbd: ignore zero-length requests
...
Diffstat (limited to 'net')
-rw-r--r-- | net/ceph/ceph_common.c | 22 | ||||
-rw-r--r-- | net/ceph/ceph_strings.c | 39 | ||||
-rw-r--r-- | net/ceph/crush/mapper.c | 15 | ||||
-rw-r--r-- | net/ceph/crypto.c | 7 | ||||
-rw-r--r-- | net/ceph/debugfs.c | 29 | ||||
-rw-r--r-- | net/ceph/messenger.c | 260 | ||||
-rw-r--r-- | net/ceph/mon_client.c | 2 | ||||
-rw-r--r-- | net/ceph/osd_client.c | 635 | ||||
-rw-r--r-- | net/ceph/osdmap.c | 290 | ||||
-rw-r--r-- | net/ceph/pagevec.c | 24 |
10 files changed, 749 insertions, 574 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 1deb29af82fd..e65e6e4be38b 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -28,6 +28,22 @@ #include "crypto.h" +/* + * Module compatibility interface. For now it doesn't do anything, + * but its existence signals a certain level of functionality. + * + * The data buffer is used to pass information both to and from + * libceph. The return value indicates whether libceph determines + * it is compatible with the caller (from another kernel module), + * given the provided data. + * + * The data pointer can be null. + */ +bool libceph_compatible(void *data) +{ + return true; +} +EXPORT_SYMBOL(libceph_compatible); /* * find filename portion of a path (/foo/bar/baz -> baz) @@ -590,10 +606,8 @@ static int __init init_ceph_lib(void) if (ret < 0) goto out_crypto; - pr_info("loaded (mon/osd proto %d/%d, osdmap %d/%d %d/%d)\n", - CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL, - CEPH_OSDMAP_VERSION, CEPH_OSDMAP_VERSION_EXT, - CEPH_OSDMAP_INC_VERSION, CEPH_OSDMAP_INC_VERSION_EXT); + pr_info("loaded (mon/osd proto %d/%d)\n", + CEPH_MONC_PROTOCOL, CEPH_OSDC_PROTOCOL); return 0; diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c index 3fbda04de29c..1348df96fe15 100644 --- a/net/ceph/ceph_strings.c +++ b/net/ceph/ceph_strings.c @@ -21,9 +21,15 @@ const char *ceph_osd_op_name(int op) switch (op) { case CEPH_OSD_OP_READ: return "read"; case CEPH_OSD_OP_STAT: return "stat"; + case CEPH_OSD_OP_MAPEXT: return "mapext"; + case CEPH_OSD_OP_SPARSE_READ: return "sparse-read"; + case CEPH_OSD_OP_NOTIFY: return "notify"; + case CEPH_OSD_OP_NOTIFY_ACK: return "notify-ack"; + case CEPH_OSD_OP_ASSERT_VER: return "assert-version"; case CEPH_OSD_OP_MASKTRUNC: return "masktrunc"; + case CEPH_OSD_OP_CREATE: return "create"; case CEPH_OSD_OP_WRITE: return "write"; case CEPH_OSD_OP_DELETE: return "delete"; case CEPH_OSD_OP_TRUNCATE: return "truncate"; @@ -39,6 +45,11 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_TMAPUP: return "tmapup"; case CEPH_OSD_OP_TMAPGET: return "tmapget"; case CEPH_OSD_OP_TMAPPUT: return "tmapput"; + case CEPH_OSD_OP_WATCH: return "watch"; + + case CEPH_OSD_OP_CLONERANGE: return "clonerange"; + case CEPH_OSD_OP_ASSERT_SRC_VERSION: return "assert-src-version"; + case CEPH_OSD_OP_SRC_CMPXATTR: return "src-cmpxattr"; case CEPH_OSD_OP_GETXATTR: return "getxattr"; case CEPH_OSD_OP_GETXATTRS: return "getxattrs"; @@ -53,6 +64,10 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_BALANCEREADS: return "balance-reads"; case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads"; case CEPH_OSD_OP_SCRUB: return "scrub"; + case CEPH_OSD_OP_SCRUB_RESERVE: return "scrub-reserve"; + case CEPH_OSD_OP_SCRUB_UNRESERVE: return "scrub-unreserve"; + case CEPH_OSD_OP_SCRUB_STOP: return "scrub-stop"; + case CEPH_OSD_OP_SCRUB_MAP: return "scrub-map"; case CEPH_OSD_OP_WRLOCK: return "wrlock"; case CEPH_OSD_OP_WRUNLOCK: return "wrunlock"; @@ -64,10 +79,34 @@ const char *ceph_osd_op_name(int op) case CEPH_OSD_OP_CALL: return "call"; case CEPH_OSD_OP_PGLS: return "pgls"; + case CEPH_OSD_OP_PGLS_FILTER: return "pgls-filter"; + case CEPH_OSD_OP_OMAPGETKEYS: return "omap-get-keys"; + case CEPH_OSD_OP_OMAPGETVALS: return "omap-get-vals"; + case CEPH_OSD_OP_OMAPGETHEADER: return "omap-get-header"; + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: return "omap-get-vals-by-keys"; + case CEPH_OSD_OP_OMAPSETVALS: return "omap-set-vals"; + case CEPH_OSD_OP_OMAPSETHEADER: return "omap-set-header"; + case CEPH_OSD_OP_OMAPCLEAR: return "omap-clear"; + case CEPH_OSD_OP_OMAPRMKEYS: return "omap-rm-keys"; } return "???"; } +const char *ceph_osd_state_name(int s) +{ + switch (s) { + case CEPH_OSD_EXISTS: + return "exists"; + case CEPH_OSD_UP: + return "up"; + case CEPH_OSD_AUTOOUT: + return "autoout"; + case CEPH_OSD_NEW: + return "new"; + default: + return "???"; + } +} const char *ceph_pool_op_name(int op) { diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 35fce755ce10..cbd06a91941c 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -287,6 +287,7 @@ static int is_out(const struct crush_map *map, const __u32 *weight, int item, in * @outpos: our position in that vector * @firstn: true if choosing "first n" items, false if choosing "indep" * @recurse_to_leaf: true if we want one device under each item of given type + * @descend_once: true if we should only try one descent before giving up * @out2: second output vector for leaf items (if @recurse_to_leaf) */ static int crush_choose(const struct crush_map *map, @@ -295,7 +296,7 @@ static int crush_choose(const struct crush_map *map, int x, int numrep, int type, int *out, int outpos, int firstn, int recurse_to_leaf, - int *out2) + int descend_once, int *out2) { int rep; unsigned int ftotal, flocal; @@ -391,7 +392,7 @@ static int crush_choose(const struct crush_map *map, } reject = 0; - if (recurse_to_leaf) { + if (!collide && recurse_to_leaf) { if (item < 0) { if (crush_choose(map, map->buckets[-1-item], @@ -399,6 +400,7 @@ static int crush_choose(const struct crush_map *map, x, outpos+1, 0, out2, outpos, firstn, 0, + map->chooseleaf_descend_once, NULL) <= outpos) /* didn't get leaf */ reject = 1; @@ -422,7 +424,10 @@ reject: ftotal++; flocal++; - if (collide && flocal <= map->choose_local_tries) + if (reject && descend_once) + /* let outer call try again */ + skip_rep = 1; + else if (collide && flocal <= map->choose_local_tries) /* retry locally a few times */ retry_bucket = 1; else if (map->choose_local_fallback_tries > 0 && @@ -485,6 +490,7 @@ int crush_do_rule(const struct crush_map *map, int i, j; int numrep; int firstn; + const int descend_once = 0; if ((__u32)ruleno >= map->max_rules) { dprintk(" bad ruleno %d\n", ruleno); @@ -544,7 +550,8 @@ int crush_do_rule(const struct crush_map *map, curstep->arg2, o+osize, j, firstn, - recurse_to_leaf, c+osize); + recurse_to_leaf, + descend_once, c+osize); } if (recurse_to_leaf) diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index af14cb425164..6e7a236525b6 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -423,7 +423,8 @@ int ceph_encrypt2(struct ceph_crypto_key *secret, void *dst, size_t *dst_len, } } -int ceph_key_instantiate(struct key *key, struct key_preparsed_payload *prep) +static int ceph_key_instantiate(struct key *key, + struct key_preparsed_payload *prep) { struct ceph_crypto_key *ckey; size_t datalen = prep->datalen; @@ -458,12 +459,12 @@ err: return ret; } -int ceph_key_match(const struct key *key, const void *description) +static int ceph_key_match(const struct key *key, const void *description) { return strcmp(key->description, description) == 0; } -void ceph_key_destroy(struct key *key) { +static void ceph_key_destroy(struct key *key) { struct ceph_crypto_key *ckey = key->payload.data; ceph_crypto_key_destroy(ckey); diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 38b5dc1823d4..00d051f4894e 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -66,9 +66,9 @@ static int osdmap_show(struct seq_file *s, void *p) for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) { struct ceph_pg_pool_info *pool = rb_entry(n, struct ceph_pg_pool_info, node); - seq_printf(s, "pg_pool %d pg_num %d / %d, lpg_num %d / %d\n", - pool->id, pool->v.pg_num, pool->pg_num_mask, - pool->v.lpg_num, pool->lpg_num_mask); + seq_printf(s, "pg_pool %llu pg_num %d / %d\n", + (unsigned long long)pool->id, pool->pg_num, + pool->pg_num_mask); } for (i = 0; i < client->osdc.osdmap->max_osd; i++) { struct ceph_entity_addr *addr = @@ -123,26 +123,16 @@ static int osdc_show(struct seq_file *s, void *pp) mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { struct ceph_osd_request *req; - struct ceph_osd_request_head *head; - struct ceph_osd_op *op; - int num_ops; - int opcode, olen; + int opcode; int i; req = rb_entry(p, struct ceph_osd_request, r_node); - seq_printf(s, "%lld\tosd%d\t%d.%x\t", req->r_tid, + seq_printf(s, "%lld\tosd%d\t%lld.%x\t", req->r_tid, req->r_osd ? req->r_osd->o_osd : -1, - le32_to_cpu(req->r_pgid.pool), - le16_to_cpu(req->r_pgid.ps)); + req->r_pgid.pool, req->r_pgid.seed); - head = req->r_request->front.iov_base; - op = (void *)(head + 1); - - num_ops = le16_to_cpu(head->num_ops); - olen = le32_to_cpu(head->object_len); - seq_printf(s, "%.*s", olen, - (const char *)(head->ops + num_ops)); + seq_printf(s, "%.*s", req->r_oid_len, req->r_oid); if (req->r_reassert_version.epoch) seq_printf(s, "\t%u'%llu", @@ -151,10 +141,9 @@ static int osdc_show(struct seq_file *s, void *pp) else seq_printf(s, "\t"); - for (i = 0; i < num_ops; i++) { - opcode = le16_to_cpu(op->op); + for (i = 0; i < req->r_num_ops; i++) { + opcode = le16_to_cpu(req->r_request_ops[i].op); seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); - op++; } seq_printf(s, "\n"); diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 5ccf87ed8d68..2c0669fb54e3 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -9,8 +9,9 @@ #include <linux/slab.h> #include <linux/socket.h> #include <linux/string.h> +#ifdef CONFIG_BLOCK #include <linux/bio.h> -#include <linux/blkdev.h> +#endif /* CONFIG_BLOCK */ #include <linux/dns_resolver.h> #include <net/tcp.h> @@ -97,6 +98,57 @@ #define CON_FLAG_SOCK_CLOSED 3 /* socket state changed to closed */ #define CON_FLAG_BACKOFF 4 /* need to retry queuing delayed work */ +static bool con_flag_valid(unsigned long con_flag) +{ + switch (con_flag) { + case CON_FLAG_LOSSYTX: + case CON_FLAG_KEEPALIVE_PENDING: + case CON_FLAG_WRITE_PENDING: + case CON_FLAG_SOCK_CLOSED: + case CON_FLAG_BACKOFF: + return true; + default: + return false; + } +} + +static void con_flag_clear(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + clear_bit(con_flag, &con->flags); +} + +static void con_flag_set(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + set_bit(con_flag, &con->flags); +} + +static bool con_flag_test(struct ceph_connection *con, unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_bit(con_flag, &con->flags); +} + +static bool con_flag_test_and_clear(struct ceph_connection *con, + unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_and_clear_bit(con_flag, &con->flags); +} + +static bool con_flag_test_and_set(struct ceph_connection *con, + unsigned long con_flag) +{ + BUG_ON(!con_flag_valid(con_flag)); + + return test_and_set_bit(con_flag, &con->flags); +} + /* static tag bytes (protocol control messages) */ static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; @@ -114,7 +166,7 @@ static struct lock_class_key socket_class; static void queue_con(struct ceph_connection *con); static void con_work(struct work_struct *); -static void ceph_fault(struct ceph_connection *con); +static void con_fault(struct ceph_connection *con); /* * Nicely render a sockaddr as a string. An array of formatted @@ -171,7 +223,7 @@ static void encode_my_addr(struct ceph_messenger *msgr) */ static struct workqueue_struct *ceph_msgr_wq; -void _ceph_msgr_exit(void) +static void _ceph_msgr_exit(void) { if (ceph_msgr_wq) { destroy_workqueue(ceph_msgr_wq); @@ -308,7 +360,7 @@ static void ceph_sock_write_space(struct sock *sk) * buffer. See net/ipv4/tcp_input.c:tcp_check_space() * and net/core/stream.c:sk_stream_write_space(). */ - if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) { + if (con_flag_test(con, CON_FLAG_WRITE_PENDING)) { if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { dout("%s %p queueing write work\n", __func__, con); clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); @@ -333,7 +385,7 @@ static void ceph_sock_state_change(struct sock *sk) case TCP_CLOSE_WAIT: dout("%s TCP_CLOSE_WAIT\n", __func__); con_sock_state_closing(con); - set_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_flag_set(con, CON_FLAG_SOCK_CLOSED); queue_con(con); break; case TCP_ESTABLISHED: @@ -474,7 +526,7 @@ static int con_close_socket(struct ceph_connection *con) * received a socket close event before we had the chance to * shut the socket down. */ - clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags); + con_flag_clear(con, CON_FLAG_SOCK_CLOSED); con_sock_state_closed(con); return rc; @@ -538,11 +590,10 @@ void ceph_con_close(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr)); con->state = CON_STATE_CLOSED; - clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */ - clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); - clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags); - clear_bit(CON_FLAG_BACKOFF, &con->flags); + con_flag_clear(con, CON_FLAG_LOSSYTX); /* so we retry next connect */ + con_flag_clear(con, CON_FLAG_KEEPALIVE_PENDING); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); + con_flag_clear(con, CON_FLAG_BACKOFF); reset_connection(con); con->peer_global_seq = 0; @@ -798,7 +849,7 @@ static void prepare_write_message(struct ceph_connection *con) /* no, queue up footer too and be done */ prepare_write_message_footer(con); - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -819,7 +870,7 @@ static void prepare_write_ack(struct ceph_connection *con) &con->out_temp_ack); con->out_more = 1; /* more will follow.. eventually.. */ - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -830,7 +881,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } /* @@ -873,7 +924,7 @@ static void prepare_write_banner(struct ceph_connection *con) &con->msgr->my_enc_addr); con->out_more = 0; - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); } static int prepare_write_connect(struct ceph_connection *con) @@ -923,7 +974,7 @@ static int prepare_write_connect(struct ceph_connection *con) auth->authorizer_buf); con->out_more = 0; - set_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_set(con, CON_FLAG_WRITE_PENDING); return 0; } @@ -1643,7 +1694,7 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->in_reply.connect_seq)); if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - set_bit(CON_FLAG_LOSSYTX, &con->flags); + con_flag_set(con, CON_FLAG_LOSSYTX); con->delay = 0; /* reset backoff memory */ @@ -2080,15 +2131,14 @@ do_next: prepare_write_ack(con); goto more; } - if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING, - &con->flags)) { + if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { prepare_write_keepalive(con); goto more; } } /* Nothing to do! */ - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); dout("try_write nothing else to write.\n"); ret = 0; out: @@ -2268,7 +2318,7 @@ static void queue_con(struct ceph_connection *con) static bool con_sock_closed(struct ceph_connection *con) { - if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) + if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED)) return false; #define CASE(x) \ @@ -2295,6 +2345,41 @@ static bool con_sock_closed(struct ceph_connection *con) return true; } +static bool con_backoff(struct ceph_connection *con) +{ + int ret; + + if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF)) + return false; + + ret = queue_con_delay(con, round_jiffies_relative(con->delay)); + if (ret) { + dout("%s: con %p FAILED to back off %lu\n", __func__, + con, con->delay); + BUG_ON(ret == -ENOENT); + con_flag_set(con, CON_FLAG_BACKOFF); + } + + return true; +} + +/* Finish fault handling; con->mutex must *not* be held here */ + +static void con_fault_finish(struct ceph_connection *con) +{ + /* + * in case we faulted due to authentication, invalidate our + * current tickets so that we can get new ones. + */ + if (con->auth_retry && con->ops->invalidate_authorizer) { + dout("calling invalidate_authorizer()\n"); + con->ops->invalidate_authorizer(con); + } + + if (con->ops->fault) + con->ops->fault(con); +} + /* * Do some work on a connection. Drop a connection ref when we're done. */ @@ -2302,73 +2387,68 @@ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); - int ret; + bool fault; mutex_lock(&con->mutex); -restart: - if (con_sock_closed(con)) - goto fault; + while (true) { + int ret; - if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) { - dout("con_work %p backing off\n", con); - ret = queue_con_delay(con, round_jiffies_relative(con->delay)); - if (ret) { - dout("con_work %p FAILED to back off %lu\n", con, - con->delay); - BUG_ON(ret == -ENOENT); - set_bit(CON_FLAG_BACKOFF, &con->flags); + if ((fault = con_sock_closed(con))) { + dout("%s: con %p SOCK_CLOSED\n", __func__, con); + break; + } + if (con_backoff(con)) { + dout("%s: con %p BACKOFF\n", __func__, con); + break; + } + if (con->state == CON_STATE_STANDBY) { + dout("%s: con %p STANDBY\n", __func__, con); + break; + } + if (con->state == CON_STATE_CLOSED) { + dout("%s: con %p CLOSED\n", __func__, con); + BUG_ON(con->sock); + break; + } + if (con->state == CON_STATE_PREOPEN) { + dout("%s: con %p PREOPEN\n", __func__, con); + BUG_ON(con->sock); } - goto done; - } - if (con->state == CON_STATE_STANDBY) { - dout("con_work %p STANDBY\n", con); - goto done; - } - if (con->state == CON_STATE_CLOSED) { - dout("con_work %p CLOSED\n", con); - BUG_ON(con->sock); - goto done; - } - if (con->state == CON_STATE_PREOPEN) { - dout("con_work OPENING\n"); - BUG_ON(con->sock); - } + ret = try_read(con); + if (ret < 0) { + if (ret == -EAGAIN) + continue; + con->error_msg = "socket error on read"; + fault = true; + break; + } - ret = try_read(con); - if (ret == -EAGAIN) - goto restart; - if (ret < 0) { - con->error_msg = "socket error on read"; - goto fault; - } + ret = try_write(con); + if (ret < 0) { + if (ret == -EAGAIN) + continue; + con->error_msg = "socket error on write"; + fault = true; + } - ret = try_write(con); - if (ret == -EAGAIN) - goto restart; - if (ret < 0) { - con->error_msg = "socket error on write"; - goto fault; + break; /* If we make it to here, we're done */ } - -done: + if (fault) + con_fault(con); mutex_unlock(&con->mutex); -done_unlocked: - con->ops->put(con); - return; -fault: - ceph_fault(con); /* error/fault path */ - goto done_unlocked; -} + if (fault) + con_fault_finish(con); + con->ops->put(con); +} /* * Generic error/fault handler. A retry mechanism is used with * exponential backoff */ -static void ceph_fault(struct ceph_connection *con) - __releases(con->mutex) +static void con_fault(struct ceph_connection *con) { pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg); @@ -2381,10 +2461,10 @@ static void ceph_fault(struct ceph_connection *con) con_close_socket(con); - if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) { + if (con_flag_test(con, CON_FLAG_LOSSYTX)) { dout("fault on LOSSYTX channel, marking CLOSED\n"); con->state = CON_STATE_CLOSED; - goto out_unlock; + return; } if (con->in_msg) { @@ -2401,9 +2481,9 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages queued or keepalive pending, place * the connection in a STANDBY state */ if (list_empty(&con->out_queue) && - !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) { + !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) { dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); - clear_bit(CON_FLAG_WRITE_PENDING, &con->flags); + con_flag_clear(con, CON_FLAG_WRITE_PENDING); con->state = CON_STATE_STANDBY; } else { /* retry after a delay. */ @@ -2412,23 +2492,9 @@ static void ceph_fault(struct ceph_connection *con) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) con->delay *= 2; - set_bit(CON_FLAG_BACKOFF, &con->flags); + con_flag_set(con, CON_FLAG_BACKOFF); queue_con(con); } - -out_unlock: - mutex_unlock(&con->mutex); - /* - * in case we faulted due to authentication, invalidate our - * current tickets so that we can get new ones. - */ - if (con->auth_retry && con->ops->invalidate_authorizer) { - dout("calling invalidate_authorizer()\n"); - con->ops->invalidate_authorizer(con); - } - - if (con->ops->fault) - con->ops->fault(con); } @@ -2469,8 +2535,8 @@ static void clear_standby(struct ceph_connection *con) dout("clear_standby %p and ++connect_seq\n", con); con->state = CON_STATE_PREOPEN; con->connect_seq++; - WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags)); - WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)); + WARN_ON(con_flag_test(con, CON_FLAG_WRITE_PENDING)); + WARN_ON(con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)); } } @@ -2511,7 +2577,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) /* if there wasn't anything waiting to send before, queue * new work */ - if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) + if (con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_send); @@ -2600,8 +2666,8 @@ void ceph_con_keepalive(struct ceph_connection *con) mutex_lock(&con->mutex); clear_standby(con); mutex_unlock(&con->mutex); - if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 && - test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0) + if (con_flag_test_and_set(con, CON_FLAG_KEEPALIVE_PENDING) == 0 && + con_flag_test_and_set(con, CON_FLAG_WRITE_PENDING) == 0) queue_con(con); } EXPORT_SYMBOL(ceph_con_keepalive); @@ -2651,9 +2717,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, m->page_alignment = 0; m->pages = NULL; m->pagelist = NULL; +#ifdef CONFIG_BLOCK m->bio = NULL; m->bio_iter = NULL; m->bio_seg = 0; +#endif /* CONFIG_BLOCK */ m->trail = NULL; /* front */ diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 812eb3b46c1f..aef5b1062bee 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -697,7 +697,7 @@ int ceph_monc_delete_snapid(struct ceph_mon_client *monc, u32 pool, u64 snapid) { return do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, - pool, snapid, 0, 0); + pool, snapid, NULL, 0); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index eb9a44478764..d730dd4d8eb2 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -23,7 +23,7 @@ static const struct ceph_connection_operations osd_con_ops; -static void send_queued(struct ceph_osd_client *osdc); +static void __send_queued(struct ceph_osd_client *osdc); static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); static void __register_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); @@ -32,64 +32,12 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc, static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req); -static int op_needs_trail(int op) -{ - switch (op) { - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - case CEPH_OSD_OP_CALL: - case CEPH_OSD_OP_NOTIFY: - return 1; - default: - return 0; - } -} - static int op_has_extent(int op) { return (op == CEPH_OSD_OP_READ || op == CEPH_OSD_OP_WRITE); } -int ceph_calc_raw_layout(struct ceph_osd_client *osdc, - struct ceph_file_layout *layout, - u64 snapid, - u64 off, u64 *plen, u64 *bno, - struct ceph_osd_request *req, - struct ceph_osd_req_op *op) -{ - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; - u64 orig_len = *plen; - u64 objoff, objlen; /* extent in object */ - int r; - - reqhead->snapid = cpu_to_le64(snapid); - - /* object extent? */ - r = ceph_calc_file_object_mapping(layout, off, plen, bno, - &objoff, &objlen); - if (r < 0) - return r; - if (*plen < orig_len) - dout(" skipping last %llu, final file extent %llu~%llu\n", - orig_len - *plen, off, *plen); - - if (op_has_extent(op->op)) { - op->extent.offset = objoff; - op->extent.length = objlen; - } - req->r_num_pages = calc_pages_for(off, *plen); - req->r_page_alignment = off & ~PAGE_MASK; - if (op->op == CEPH_OSD_OP_WRITE) - op->payload_len = *plen; - - dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", - *bno, objoff, objlen, req->r_num_pages); - return 0; -} -EXPORT_SYMBOL(ceph_calc_raw_layout); - /* * Implement client access to distributed object storage cluster. * @@ -115,20 +63,48 @@ EXPORT_SYMBOL(ceph_calc_raw_layout); * * fill osd op in request message. */ -static int calc_layout(struct ceph_osd_client *osdc, - struct ceph_vino vino, +static int calc_layout(struct ceph_vino vino, struct ceph_file_layout *layout, u64 off, u64 *plen, struct ceph_osd_request *req, struct ceph_osd_req_op *op) { - u64 bno; + u64 orig_len = *plen; + u64 bno = 0; + u64 objoff = 0; + u64 objlen = 0; int r; - r = ceph_calc_raw_layout(osdc, layout, vino.snap, off, - plen, &bno, req, op); + /* object extent? */ + r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno, + &objoff, &objlen); if (r < 0) return r; + if (objlen < orig_len) { + *plen = objlen; + dout(" skipping last %llu, final file extent %llu~%llu\n", + orig_len - *plen, off, *plen); + } + + if (op_has_extent(op->op)) { + u32 osize = le32_to_cpu(layout->fl_object_size); + op->extent.offset = objoff; + op->extent.length = objlen; + if (op->extent.truncate_size <= off - objoff) { + op->extent.truncate_size = 0; + } else { + op->extent.truncate_size -= off - objoff; + if (op->extent.truncate_size > osize) + op->extent.truncate_size = osize; + } + } + req->r_num_pages = calc_pages_for(off, *plen); + req->r_page_alignment = off & ~PAGE_MASK; + if (op->op == CEPH_OSD_OP_WRITE) + op->payload_len = *plen; + + dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", + bno, objoff, objlen, req->r_num_pages); snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); req->r_oid_len = strlen(req->r_oid); @@ -148,25 +124,19 @@ void ceph_osdc_release_request(struct kref *kref) if (req->r_request) ceph_msg_put(req->r_request); if (req->r_con_filling_msg) { - dout("%s revoking pages %p from con %p\n", __func__, - req->r_pages, req->r_con_filling_msg); + dout("%s revoking msg %p from con %p\n", __func__, + req->r_reply, req->r_con_filling_msg); ceph_msg_revoke_incoming(req->r_reply); req->r_con_filling_msg->ops->put(req->r_con_filling_msg); + req->r_con_filling_msg = NULL; } if (req->r_reply) ceph_msg_put(req->r_reply); if (req->r_own_pages) ceph_release_page_vector(req->r_pages, req->r_num_pages); -#ifdef CONFIG_BLOCK - if (req->r_bio) - bio_put(req->r_bio); -#endif ceph_put_snap_context(req->r_snapc); - if (req->r_trail) { - ceph_pagelist_release(req->r_trail); - kfree(req->r_trail); - } + ceph_pagelist_release(&req->r_trail); if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); else @@ -174,37 +144,25 @@ void ceph_osdc_release_request(struct kref *kref) } EXPORT_SYMBOL(ceph_osdc_release_request); -static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail) -{ - int i = 0; - - if (needs_trail) - *needs_trail = 0; - while (ops[i].op) { - if (needs_trail && op_needs_trail(ops[i].op)) - *needs_trail = 1; - i++; - } - - return i; -} - struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, - int flags, struct ceph_snap_context *snapc, - struct ceph_osd_req_op *ops, + unsigned int num_ops, bool use_mempool, - gfp_t gfp_flags, - struct page **pages, - struct bio *bio) + gfp_t gfp_flags) { struct ceph_osd_request *req; struct ceph_msg *msg; - int needs_trail; - int num_op = get_num_ops(ops, &needs_trail); - size_t msg_size = sizeof(struct ceph_osd_request_head); - - msg_size += num_op*sizeof(struct ceph_osd_op); + size_t msg_size; + + msg_size = 4 + 4 + 8 + 8 + 4+8; + msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ + msg_size += 1 + 8 + 4 + 4; /* pg_t */ + msg_size += 4 + MAX_OBJ_NAME_SIZE; + msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); + msg_size += 8; /* snapid */ + msg_size += 8; /* snap_seq */ + msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ + msg_size += 4; if (use_mempool) { req = mempool_alloc(osdc->req_mempool, gfp_flags); @@ -228,10 +186,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, INIT_LIST_HEAD(&req->r_req_lru_item); INIT_LIST_HEAD(&req->r_osd_item); - req->r_flags = flags; - - WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); - /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); @@ -244,20 +198,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } req->r_reply = msg; - /* allocate space for the trailing data */ - if (needs_trail) { - req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags); - if (!req->r_trail) { - ceph_osdc_put_request(req); - return NULL; - } - ceph_pagelist_init(req->r_trail); - } + ceph_pagelist_init(&req->r_trail); /* create request message; allow space for oid */ - msg_size += MAX_OBJ_NAME_SIZE; - if (snapc) - msg_size += sizeof(u64) * snapc->num_snaps; if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op, 0); else @@ -270,13 +213,6 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, memset(msg->front.iov_base, 0, msg->front.iov_len); req->r_request = msg; - req->r_pages = pages; -#ifdef CONFIG_BLOCK - if (bio) { - req->r_bio = bio; - bio_get(req->r_bio); - } -#endif return req; } @@ -289,6 +225,8 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->op = cpu_to_le16(src->op); switch (src->op) { + case CEPH_OSD_OP_STAT: + break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: dst->extent.offset = @@ -300,52 +238,20 @@ static void osd_req_encode_op(struct ceph_osd_request *req, dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); break; - - case CEPH_OSD_OP_GETXATTR: - case CEPH_OSD_OP_SETXATTR: - case CEPH_OSD_OP_CMPXATTR: - BUG_ON(!req->r_trail); - - dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); - dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); - dst->xattr.cmp_op = src->xattr.cmp_op; - dst->xattr.cmp_mode = src->xattr.cmp_mode; - ceph_pagelist_append(req->r_trail, src->xattr.name, - src->xattr.name_len); - ceph_pagelist_append(req->r_trail, src->xattr.val, - src->xattr.value_len); - break; case CEPH_OSD_OP_CALL: - BUG_ON(!req->r_trail); - dst->cls.class_len = src->cls.class_len; dst->cls.method_len = src->cls.method_len; dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); - ceph_pagelist_append(req->r_trail, src->cls.class_name, + ceph_pagelist_append(&req->r_trail, src->cls.class_name, src->cls.class_len); - ceph_pagelist_append(req->r_trail, src->cls.method_name, + ceph_pagelist_append(&req->r_trail, src->cls.method_name, src->cls.method_len); - ceph_pagelist_append(req->r_trail, src->cls.indata, + ceph_pagelist_append(&req->r_trail, src->cls.indata, src->cls.indata_len); break; - case CEPH_OSD_OP_ROLLBACK: - dst->snap.snapid = cpu_to_le64(src->snap.snapid); - break; case CEPH_OSD_OP_STARTSYNC: break; - case CEPH_OSD_OP_NOTIFY: - { - __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); - __le32 timeout = cpu_to_le32(src->watch.timeout); - - BUG_ON(!req->r_trail); - - ceph_pagelist_append(req->r_trail, - &prot_ver, sizeof(prot_ver)); - ceph_pagelist_append(req->r_trail, - &timeout, sizeof(timeout)); - } case CEPH_OSD_OP_NOTIFY_ACK: case CEPH_OSD_OP_WATCH: dst->watch.cookie = cpu_to_le64(src->watch.cookie); @@ -356,6 +262,64 @@ static void osd_req_encode_op(struct ceph_osd_request *req, pr_err("unrecognized osd opcode %d\n", dst->op); WARN_ON(1); break; + case CEPH_OSD_OP_MAPEXT: + case CEPH_OSD_OP_MASKTRUNC: + case CEPH_OSD_OP_SPARSE_READ: + case CEPH_OSD_OP_NOTIFY: + case CEPH_OSD_OP_ASSERT_VER: + case CEPH_OSD_OP_WRITEFULL: + case CEPH_OSD_OP_TRUNCATE: + case CEPH_OSD_OP_ZERO: + case CEPH_OSD_OP_DELETE: + case CEPH_OSD_OP_APPEND: + case CEPH_OSD_OP_SETTRUNC: + case CEPH_OSD_OP_TRIMTRUNC: + case CEPH_OSD_OP_TMAPUP: + case CEPH_OSD_OP_TMAPPUT: + case CEPH_OSD_OP_TMAPGET: + case CEPH_OSD_OP_CREATE: + case CEPH_OSD_OP_ROLLBACK: + case CEPH_OSD_OP_OMAPGETKEYS: + case CEPH_OSD_OP_OMAPGETVALS: + case CEPH_OSD_OP_OMAPGETHEADER: + case CEPH_OSD_OP_OMAPGETVALSBYKEYS: + case CEPH_OSD_OP_MODE_RD: + case CEPH_OSD_OP_OMAPSETVALS: + case CEPH_OSD_OP_OMAPSETHEADER: + case CEPH_OSD_OP_OMAPCLEAR: + case CEPH_OSD_OP_OMAPRMKEYS: + case CEPH_OSD_OP_OMAP_CMP: + case CEPH_OSD_OP_CLONERANGE: + case CEPH_OSD_OP_ASSERT_SRC_VERSION: + case CEPH_OSD_OP_SRC_CMPXATTR: + case CEPH_OSD_OP_GETXATTR: + case CEPH_OSD_OP_GETXATTRS: + case CEPH_OSD_OP_CMPXATTR: + case CEPH_OSD_OP_SETXATTR: + case CEPH_OSD_OP_SETXATTRS: + case CEPH_OSD_OP_RESETXATTRS: + case CEPH_OSD_OP_RMXATTR: + case CEPH_OSD_OP_PULL: + case CEPH_OSD_OP_PUSH: + case CEPH_OSD_OP_BALANCEREADS: + case CEPH_OSD_OP_UNBALANCEREADS: + case CEPH_OSD_OP_SCRUB: + case CEPH_OSD_OP_SCRUB_RESERVE: + case CEPH_OSD_OP_SCRUB_UNRESERVE: + case CEPH_OSD_OP_SCRUB_STOP: + case CEPH_OSD_OP_SCRUB_MAP: + case CEPH_OSD_OP_WRLOCK: + case CEPH_OSD_OP_WRUNLOCK: + case CEPH_OSD_OP_RDLOCK: + case CEPH_OSD_OP_RDUNLOCK: + case CEPH_OSD_OP_UPLOCK: + case CEPH_OSD_OP_DNLOCK: + case CEPH_OSD_OP_PGLS: + case CEPH_OSD_OP_PGLS_FILTER: + pr_err("unsupported osd opcode %s\n", + ceph_osd_op_name(dst->op)); + WARN_ON(1); + break; } dst->payload_len = cpu_to_le32(src->payload_len); } @@ -365,75 +329,95 @@ static void osd_req_encode_op(struct ceph_osd_request *req, * */ void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, u64 *plen, + u64 off, u64 len, unsigned int num_ops, struct ceph_osd_req_op *src_ops, - struct ceph_snap_context *snapc, - struct timespec *mtime, - const char *oid, - int oid_len) + struct ceph_snap_context *snapc, u64 snap_id, + struct timespec *mtime) { struct ceph_msg *msg = req->r_request; - struct ceph_osd_request_head *head; struct ceph_osd_req_op *src_op; - struct ceph_osd_op *op; void *p; - int num_op = get_num_ops(src_ops, NULL); - size_t msg_size = sizeof(*head) + num_op*sizeof(*op); + size_t msg_size; int flags = req->r_flags; - u64 data_len = 0; + u64 data_len; int i; - head = msg->front.iov_base; - op = (void *)(head + 1); - p = (void *)(op + num_op); - + req->r_num_ops = num_ops; + req->r_snapid = snap_id; req->r_snapc = ceph_get_snap_context(snapc); - head->client_inc = cpu_to_le32(1); /* always, for now. */ - head->flags = cpu_to_le32(flags); - if (flags & CEPH_OSD_FLAG_WRITE) - ceph_encode_timespec(&head->mtime, mtime); - head->num_ops = cpu_to_le16(num_op); - - - /* fill in oid */ - head->object_len = cpu_to_le32(oid_len); - memcpy(p, oid, oid_len); - p += oid_len; + /* encode request */ + msg->hdr.version = cpu_to_le16(4); + p = msg->front.iov_base; + ceph_encode_32(&p, 1); /* client_inc is always 1 */ + req->r_request_osdmap_epoch = p; + p += 4; + req->r_request_flags = p; + p += 4; + if (req->r_flags & CEPH_OSD_FLAG_WRITE) + ceph_encode_timespec(p, mtime); + p += sizeof(struct ceph_timespec); + req->r_request_reassert_version = p; + p += sizeof(struct ceph_eversion); /* will get filled in */ + + /* oloc */ + ceph_encode_8(&p, 4); + ceph_encode_8(&p, 4); + ceph_encode_32(&p, 8 + 4 + 4); + req->r_request_pool = p; + p += 8; + ceph_encode_32(&p, -1); /* preferred */ + ceph_encode_32(&p, 0); /* key len */ + + ceph_encode_8(&p, 1); + req->r_request_pgid = p; + p += 8 + 4; + ceph_encode_32(&p, -1); /* preferred */ + + /* oid */ + ceph_encode_32(&p, req->r_oid_len); + memcpy(p, req->r_oid, req->r_oid_len); + dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); + p += req->r_oid_len; + + /* ops */ + ceph_encode_16(&p, num_ops); src_op = src_ops; - while (src_op->op) { - osd_req_encode_op(req, op, src_op); - src_op++; - op++; + req->r_request_ops = p; + for (i = 0; i < num_ops; i++, src_op++) { + osd_req_encode_op(req, p, src_op); + p += sizeof(struct ceph_osd_op); } - if (req->r_trail) - data_len += req->r_trail->length; - - if (snapc) { - head->snap_seq = cpu_to_le64(snapc->seq); - head->num_snaps = cpu_to_le32(snapc->num_snaps); + /* snaps */ + ceph_encode_64(&p, req->r_snapid); + ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); + ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); + if (req->r_snapc) { for (i = 0; i < snapc->num_snaps; i++) { - put_unaligned_le64(snapc->snaps[i], p); - p += sizeof(u64); + ceph_encode_64(&p, req->r_snapc->snaps[i]); } } + req->r_request_attempts = p; + p += 4; + + data_len = req->r_trail.length; if (flags & CEPH_OSD_FLAG_WRITE) { req->r_request->hdr.data_off = cpu_to_le16(off); - req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); - } else if (data_len) { - req->r_request->hdr.data_off = 0; - req->r_request->hdr.data_len = cpu_to_le32(data_len); + data_len += len; } - + req->r_request->hdr.data_len = cpu_to_le32(data_len); req->r_request->page_alignment = req->r_page_alignment; BUG_ON(p > msg->front.iov_base + msg->front.iov_len); msg_size = p - msg->front.iov_base; msg->front.iov_len = msg_size; msg->hdr.front_len = cpu_to_le32(msg_size); + + dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, + num_ops); return; } EXPORT_SYMBOL(ceph_osdc_build_request); @@ -459,34 +443,33 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - bool use_mempool, int num_reply, + bool use_mempool, int page_align) { - struct ceph_osd_req_op ops[3]; + struct ceph_osd_req_op ops[2]; struct ceph_osd_request *req; + unsigned int num_op = 1; int r; + memset(&ops, 0, sizeof ops); + ops[0].op = opcode; ops[0].extent.truncate_seq = truncate_seq; ops[0].extent.truncate_size = truncate_size; - ops[0].payload_len = 0; if (do_sync) { ops[1].op = CEPH_OSD_OP_STARTSYNC; - ops[1].payload_len = 0; - ops[2].op = 0; - } else - ops[1].op = 0; - - req = ceph_osdc_alloc_request(osdc, flags, - snapc, ops, - use_mempool, - GFP_NOFS, NULL, NULL); + num_op++; + } + + req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, + GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); + req->r_flags = flags; /* calculate max write size */ - r = calc_layout(osdc, vino, layout, off, plen, req, ops); + r = calc_layout(vino, layout, off, plen, req, ops); if (r < 0) return ERR_PTR(r); req->r_file_layout = *layout; /* keep a copy */ @@ -496,10 +479,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, req->r_num_pages = calc_pages_for(page_align, *plen); req->r_page_alignment = page_align; - ceph_osdc_build_request(req, off, plen, ops, - snapc, - mtime, - req->r_oid, req->r_oid_len); + ceph_osdc_build_request(req, off, *plen, num_op, ops, + snapc, vino.snap, mtime); return req; } @@ -623,8 +604,8 @@ static void osd_reset(struct ceph_connection *con) down_read(&osdc->map_sem); mutex_lock(&osdc->request_mutex); __kick_osd_requests(osdc, osd); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -739,31 +720,35 @@ static void remove_old_osds(struct ceph_osd_client *osdc) */ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) { - struct ceph_osd_request *req; - int ret = 0; + struct ceph_entity_addr *peer_addr; dout("__reset_osd %p osd%d\n", osd, osd->o_osd); if (list_empty(&osd->o_requests) && list_empty(&osd->o_linger_requests)) { __remove_osd(osdc, osd); - ret = -ENODEV; - } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], - &osd->o_con.peer_addr, - sizeof(osd->o_con.peer_addr)) == 0 && - !ceph_con_opened(&osd->o_con)) { + + return -ENODEV; + } + + peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; + if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && + !ceph_con_opened(&osd->o_con)) { + struct ceph_osd_request *req; + dout(" osd addr hasn't changed and connection never opened," " letting msgr retry"); /* touch each r_stamp for handle_timeout()'s benfit */ list_for_each_entry(req, &osd->o_requests, r_osd_item) req->r_stamp = jiffies; - ret = -EAGAIN; - } else { - ceph_con_close(&osd->o_con); - ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, - &osdc->osdmap->osd_addr[osd->o_osd]); - osd->o_incarnation++; + + return -EAGAIN; } - return ret; + + ceph_con_close(&osd->o_con); + ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); + osd->o_incarnation++; + + return 0; } static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) @@ -961,20 +946,18 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger); static int __map_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, int force_resend) { - struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; struct ceph_pg pgid; int acting[CEPH_PG_MAX_SIZE]; int o = -1, num = 0; int err; dout("map_request %p tid %lld\n", req, req->r_tid); - err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, + err = ceph_calc_object_layout(&pgid, req->r_oid, &req->r_file_layout, osdc->osdmap); if (err) { list_move(&req->r_req_lru_item, &osdc->req_notarget); return err; } - pgid = reqhead->layout.ol_pgid; req->r_pgid = pgid; err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); @@ -991,8 +974,8 @@ static int __map_request(struct ceph_osd_client *osdc, (req->r_osd == NULL && o == -1)) return 0; /* no change */ - dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", - req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, + dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", + req->r_tid, pgid.pool, pgid.seed, o, req->r_osd ? req->r_osd->o_osd : -1); /* record full pg acting set */ @@ -1041,15 +1024,22 @@ out: static void __send_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req) { - struct ceph_osd_request_head *reqhead; - - dout("send_request %p tid %llu to osd%d flags %d\n", - req, req->r_tid, req->r_osd->o_osd, req->r_flags); + void *p; - reqhead = req->r_request->front.iov_base; - reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); - reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ - reqhead->reassert_version = req->r_reassert_version; + dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", + req, req->r_tid, req->r_osd->o_osd, req->r_flags, + (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); + + /* fill in message content that changes each time we send it */ + put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); + put_unaligned_le32(req->r_flags, req->r_request_flags); + put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); + p = req->r_request_pgid; + ceph_encode_64(&p, req->r_pgid.pool); + ceph_encode_32(&p, req->r_pgid.seed); + put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ + memcpy(req->r_request_reassert_version, &req->r_reassert_version, + sizeof(req->r_reassert_version)); req->r_stamp = jiffies; list_move_tail(&req->r_req_lru_item, &osdc->req_lru); @@ -1062,16 +1052,13 @@ static void __send_request(struct ceph_osd_client *osdc, /* * Send any requests in the queue (req_unsent). */ -static void send_queued(struct ceph_osd_client *osdc) +static void __send_queued(struct ceph_osd_client *osdc) { struct ceph_osd_request *req, *tmp; - dout("send_queued\n"); - mutex_lock(&osdc->request_mutex); - list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { + dout("__send_queued\n"); + list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) __send_request(osdc, req); - } - mutex_unlock(&osdc->request_mutex); } /* @@ -1123,8 +1110,8 @@ static void handle_timeout(struct work_struct *work) } __schedule_osd_timeout(osdc); + __send_queued(osdc); mutex_unlock(&osdc->request_mutex); - send_queued(osdc); up_read(&osdc->map_sem); } @@ -1152,6 +1139,26 @@ static void complete_request(struct ceph_osd_request *req) complete_all(&req->r_safe_completion); /* fsync waiter */ } +static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid) +{ + __u8 v; + + ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad); + v = ceph_decode_8(p); + if (v > 1) { + pr_warning("do not understand pg encoding %d > 1", v); + return -EINVAL; + } + pgid->pool = ceph_decode_64(p); + pgid->seed = ceph_decode_32(p); + *p += 4; + return 0; + +bad: + pr_warning("incomplete pg encoding"); + return -EINVAL; +} + /* * handle osd op reply. either call the callback if it is specified, * or do the completion to wake up the waiting thread. @@ -1159,22 +1166,42 @@ static void complete_request(struct ceph_osd_request *req) static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_connection *con) { - struct ceph_osd_reply_head *rhead = msg->front.iov_base; + void *p, *end; struct ceph_osd_request *req; u64 tid; - int numops, object_len, flags; + int object_len; + int numops, payload_len, flags; s32 result; + s32 retry_attempt; + struct ceph_pg pg; + int err; + u32 reassert_epoch; + u64 reassert_version; + u32 osdmap_epoch; + int i; tid = le64_to_cpu(msg->hdr.tid); - if (msg->front.iov_len < sizeof(*rhead)) - goto bad; - numops = le32_to_cpu(rhead->num_ops); - object_len = le32_to_cpu(rhead->object_len); - result = le32_to_cpu(rhead->result); - if (msg->front.iov_len != sizeof(*rhead) + object_len + - numops * sizeof(struct ceph_osd_op)) + dout("handle_reply %p tid %llu\n", msg, tid); + + p = msg->front.iov_base; + end = p + msg->front.iov_len; + + ceph_decode_need(&p, end, 4, bad); + object_len = ceph_decode_32(&p); + ceph_decode_need(&p, end, object_len, bad); + p += object_len; + + err = __decode_pgid(&p, end, &pg); + if (err) goto bad; - dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); + + ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); + flags = ceph_decode_64(&p); + result = ceph_decode_32(&p); + reassert_epoch = ceph_decode_32(&p); + reassert_version = ceph_decode_64(&p); + osdmap_epoch = ceph_decode_32(&p); + /* lookup */ mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); @@ -1184,7 +1211,38 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, return; } ceph_osdc_get_request(req); - flags = le32_to_cpu(rhead->flags); + + dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, + req, result); + + ceph_decode_need(&p, end, 4, bad); + numops = ceph_decode_32(&p); + if (numops > CEPH_OSD_MAX_OP) + goto bad_put; + if (numops != req->r_num_ops) + goto bad_put; + payload_len = 0; + ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); + for (i = 0; i < numops; i++) { + struct ceph_osd_op *op = p; + int len; + + len = le32_to_cpu(op->payload_len); + req->r_reply_op_len[i] = len; + dout(" op %d has %d bytes\n", i, len); + payload_len += len; + p += sizeof(*op); + } + if (payload_len != le32_to_cpu(msg->hdr.data_len)) { + pr_warning("sum of op payload lens %d != data_len %d", + payload_len, le32_to_cpu(msg->hdr.data_len)); + goto bad_put; + } + + ceph_decode_need(&p, end, 4 + numops * 4, bad); + retry_attempt = ceph_decode_32(&p); + for (i = 0; i < numops; i++) + req->r_reply_op_result[i] = ceph_decode_32(&p); /* * if this connection filled our message, drop our reference now, to @@ -1199,7 +1257,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, if (!req->r_got_reply) { unsigned int bytes; - req->r_result = le32_to_cpu(rhead->result); + req->r_result = result; bytes = le32_to_cpu(msg->hdr.data_len); dout("handle_reply result %d bytes %d\n", req->r_result, bytes); @@ -1207,7 +1265,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, req->r_result = bytes; /* in case this is a write and we need to replay, */ - req->r_reassert_version = rhead->reassert_version; + req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); + req->r_reassert_version.version = cpu_to_le64(reassert_version); req->r_got_reply = 1; } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { @@ -1242,10 +1301,11 @@ done: ceph_osdc_put_request(req); return; +bad_put: + ceph_osdc_put_request(req); bad: - pr_err("corrupt osd_op_reply got %d %d expected %d\n", - (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), - (int)sizeof(*rhead)); + pr_err("corrupt osd_op_reply got %d %d\n", + (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); ceph_msg_dump(msg); } @@ -1462,7 +1522,9 @@ done: if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) ceph_monc_request_next_osdmap(&osdc->client->monc); - send_queued(osdc); + mutex_lock(&osdc->request_mutex); + __send_queued(osdc); + mutex_unlock(&osdc->request_mutex); up_read(&osdc->map_sem); wake_up_all(&osdc->client->auth_wq); return; @@ -1556,8 +1618,7 @@ static void __remove_event(struct ceph_osd_event *event) int ceph_osdc_create_event(struct ceph_osd_client *osdc, void (*event_cb)(u64, u64, u8, void *), - int one_shot, void *data, - struct ceph_osd_event **pevent) + void *data, struct ceph_osd_event **pevent) { struct ceph_osd_event *event; @@ -1567,14 +1628,13 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc, dout("create_event %p\n", event); event->cb = event_cb; - event->one_shot = one_shot; + event->one_shot = 0; event->data = data; event->osdc = osdc; INIT_LIST_HEAD(&event->osd_node); RB_CLEAR_NODE(&event->node); kref_init(&event->kref); /* one ref for us */ kref_get(&event->kref); /* one ref for the caller */ - init_completion(&event->completion); spin_lock(&osdc->event_lock); event->cookie = ++osdc->event_count; @@ -1610,7 +1670,6 @@ static void do_event_work(struct work_struct *work) dout("do_event_work completing %p\n", event); event->cb(ver, notify_id, opcode, event->data); - complete(&event->completion); dout("do_event_work completed %p\n", event); ceph_osdc_put_event(event); kfree(event_work); @@ -1620,7 +1679,8 @@ static void do_event_work(struct work_struct *work) /* * Process osd watch notifications */ -void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) +static void handle_watch_notify(struct ceph_osd_client *osdc, + struct ceph_msg *msg) { void *p, *end; u8 proto_ver; @@ -1641,9 +1701,8 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) spin_lock(&osdc->event_lock); event = __find_event(osdc, cookie); if (event) { + BUG_ON(event->one_shot); get_event(event); - if (event->one_shot) - __remove_event(event); } spin_unlock(&osdc->event_lock); dout("handle_watch_notify cookie %lld ver %lld event %p\n", @@ -1668,7 +1727,6 @@ void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) return; done_err: - complete(&event->completion); ceph_osdc_put_event(event); return; @@ -1677,21 +1735,6 @@ bad: return; } -int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) -{ - int err; - - dout("wait_event %p\n", event); - err = wait_for_completion_interruptible_timeout(&event->completion, - timeout * HZ); - ceph_osdc_put_event(event); - if (err > 0) - err = 0; - dout("wait_event %p returns %d\n", event, err); - return err; -} -EXPORT_SYMBOL(ceph_osdc_wait_event); - /* * Register request, send initial attempt. */ @@ -1706,7 +1749,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, #ifdef CONFIG_BLOCK req->r_request->bio = req->r_bio; #endif - req->r_request->trail = req->r_trail; + req->r_request->trail = &req->r_trail; register_request(osdc, req); @@ -1865,7 +1908,6 @@ out_mempool: out: return err; } -EXPORT_SYMBOL(ceph_osdc_init); void ceph_osdc_stop(struct ceph_osd_client *osdc) { @@ -1882,7 +1924,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc) ceph_msgpool_destroy(&osdc->msgpool_op); ceph_msgpool_destroy(&osdc->msgpool_op_reply); } -EXPORT_SYMBOL(ceph_osdc_stop); /* * Read some contiguous pages. If we cross a stripe boundary, shorten @@ -1902,7 +1943,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, req = ceph_osdc_new_request(osdc, layout, vino, off, plen, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, truncate_seq, truncate_size, NULL, - false, 1, page_align); + false, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1931,8 +1972,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - struct page **pages, int num_pages, - int flags, int do_sync, bool nofail) + struct page **pages, int num_pages) { struct ceph_osd_request *req; int rc = 0; @@ -1941,11 +1981,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, BUG_ON(vino.snap != CEPH_NOSNAP); req = ceph_osdc_new_request(osdc, layout, vino, off, &len, CEPH_OSD_OP_WRITE, - flags | CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE, - snapc, do_sync, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + snapc, 0, truncate_seq, truncate_size, mtime, - nofail, 1, page_align); + true, page_align); if (IS_ERR(req)) return PTR_ERR(req); @@ -1954,7 +1993,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); - rc = ceph_osdc_start_request(osdc, req, nofail); + rc = ceph_osdc_start_request(osdc, req, true); if (!rc) rc = ceph_osdc_wait_request(osdc, req); @@ -2047,7 +2086,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, if (data_len > 0) { int want = calc_pages_for(req->r_page_alignment, data_len); - if (unlikely(req->r_num_pages < want)) { + if (req->r_pages && unlikely(req->r_num_pages < want)) { pr_warning("tid %lld reply has %d bytes %d pages, we" " had only %d pages ready\n", tid, data_len, want, req->r_num_pages); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index de73214b5d26..69bc4bf89e3e 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -13,26 +13,18 @@ char *ceph_osdmap_state_str(char *str, int len, int state) { - int flag = 0; - if (!len) - goto done; - - *str = '\0'; - if (state) { - if (state & CEPH_OSD_EXISTS) { - snprintf(str, len, "exists"); - flag = 1; - } - if (state & CEPH_OSD_UP) { - snprintf(str, len, "%s%s%s", str, (flag ? ", " : ""), - "up"); - flag = 1; - } - } else { + return str; + + if ((state & CEPH_OSD_EXISTS) && (state & CEPH_OSD_UP)) + snprintf(str, len, "exists, up"); + else if (state & CEPH_OSD_EXISTS) + snprintf(str, len, "exists"); + else if (state & CEPH_OSD_UP) + snprintf(str, len, "up"); + else snprintf(str, len, "doesn't exist"); - } -done: + return str; } @@ -53,13 +45,8 @@ static int calc_bits_of(unsigned int t) */ static void calc_pg_masks(struct ceph_pg_pool_info *pi) { - pi->pg_num_mask = (1 << calc_bits_of(le32_to_cpu(pi->v.pg_num)-1)) - 1; - pi->pgp_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.pgp_num)-1)) - 1; - pi->lpg_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.lpg_num)-1)) - 1; - pi->lpgp_num_mask = - (1 << calc_bits_of(le32_to_cpu(pi->v.lpgp_num)-1)) - 1; + pi->pg_num_mask = (1 << calc_bits_of(pi->pg_num-1)) - 1; + pi->pgp_num_mask = (1 << calc_bits_of(pi->pgp_num-1)) - 1; } /* @@ -170,6 +157,7 @@ static struct crush_map *crush_decode(void *pbyval, void *end) c->choose_local_tries = 2; c->choose_local_fallback_tries = 5; c->choose_total_tries = 19; + c->chooseleaf_descend_once = 0; ceph_decode_need(p, end, 4*sizeof(u32), bad); magic = ceph_decode_32(p); @@ -336,6 +324,11 @@ static struct crush_map *crush_decode(void *pbyval, void *end) dout("crush decode tunable choose_total_tries = %d", c->choose_total_tries); + ceph_decode_need(p, end, sizeof(u32), done); + c->chooseleaf_descend_once = ceph_decode_32(p); + dout("crush decode tunable chooseleaf_descend_once = %d", + c->chooseleaf_descend_once); + done: dout("crush_decode success\n"); return c; @@ -354,12 +347,13 @@ bad: */ static int pgid_cmp(struct ceph_pg l, struct ceph_pg r) { - u64 a = *(u64 *)&l; - u64 b = *(u64 *)&r; - - if (a < b) + if (l.pool < r.pool) + return -1; + if (l.pool > r.pool) + return 1; + if (l.seed < r.seed) return -1; - if (a > b) + if (l.seed > r.seed) return 1; return 0; } @@ -405,8 +399,8 @@ static struct ceph_pg_mapping *__lookup_pg_mapping(struct rb_root *root, } else if (c > 0) { n = n->rb_right; } else { - dout("__lookup_pg_mapping %llx got %p\n", - *(u64 *)&pgid, pg); + dout("__lookup_pg_mapping %lld.%x got %p\n", + pgid.pool, pgid.seed, pg); return pg; } } @@ -418,12 +412,13 @@ static int __remove_pg_mapping(struct rb_root *root, struct ceph_pg pgid) struct ceph_pg_mapping *pg = __lookup_pg_mapping(root, pgid); if (pg) { - dout("__remove_pg_mapping %llx %p\n", *(u64 *)&pgid, pg); + dout("__remove_pg_mapping %lld.%x %p\n", pgid.pool, pgid.seed, + pg); rb_erase(&pg->node, root); kfree(pg); return 0; } - dout("__remove_pg_mapping %llx dne\n", *(u64 *)&pgid); + dout("__remove_pg_mapping %lld.%x dne\n", pgid.pool, pgid.seed); return -ENOENT; } @@ -452,7 +447,7 @@ static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new) return 0; } -static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id) +static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, u64 id) { struct ceph_pg_pool_info *pi; struct rb_node *n = root->rb_node; @@ -508,24 +503,57 @@ static void __remove_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *pi) static int __decode_pool(void **p, void *end, struct ceph_pg_pool_info *pi) { - unsigned int n, m; + u8 ev, cv; + unsigned len, num; + void *pool_end; + + ceph_decode_need(p, end, 2 + 4, bad); + ev = ceph_decode_8(p); /* encoding version */ + cv = ceph_decode_8(p); /* compat version */ + if (ev < 5) { + pr_warning("got v %d < 5 cv %d of ceph_pg_pool\n", ev, cv); + return -EINVAL; + } + if (cv > 7) { + pr_warning("got v %d cv %d > 7 of ceph_pg_pool\n", ev, cv); + return -EINVAL; + } + len = ceph_decode_32(p); + ceph_decode_need(p, end, len, bad); + pool_end = *p + len; - ceph_decode_copy(p, &pi->v, sizeof(pi->v)); - calc_pg_masks(pi); + pi->type = ceph_decode_8(p); + pi->size = ceph_decode_8(p); + pi->crush_ruleset = ceph_decode_8(p); + pi->object_hash = ceph_decode_8(p); + + pi->pg_num = ceph_decode_32(p); + pi->pgp_num = ceph_decode_32(p); + + *p += 4 + 4; /* skip lpg* */ + *p += 4; /* skip last_change */ + *p += 8 + 4; /* skip snap_seq, snap_epoch */ - /* num_snaps * snap_info_t */ - n = le32_to_cpu(pi->v.num_snaps); - while (n--) { - ceph_decode_need(p, end, sizeof(u64) + 1 + sizeof(u64) + - sizeof(struct ceph_timespec), bad); - *p += sizeof(u64) + /* key */ - 1 + sizeof(u64) + /* u8, snapid */ - sizeof(struct ceph_timespec); - m = ceph_decode_32(p); /* snap name */ - *p += m; + /* skip snaps */ + num = ceph_decode_32(p); + while (num--) { + *p += 8; /* snapid key */ + *p += 1 + 1; /* versions */ + len = ceph_decode_32(p); + *p += len; } - *p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2; + /* skip removed snaps */ + num = ceph_decode_32(p); + *p += num * (8 + 8); + + *p += 8; /* skip auid */ + pi->flags = ceph_decode_64(p); + + /* ignore the rest */ + + *p = pool_end; + calc_pg_masks(pi); return 0; bad: @@ -535,14 +563,15 @@ bad: static int __decode_pool_names(void **p, void *end, struct ceph_osdmap *map) { struct ceph_pg_pool_info *pi; - u32 num, len, pool; + u32 num, len; + u64 pool; ceph_decode_32_safe(p, end, num, bad); dout(" %d pool names\n", num); while (num--) { - ceph_decode_32_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, bad); ceph_decode_32_safe(p, end, len, bad); - dout(" pool %d len %d\n", pool, len); + dout(" pool %llu len %d\n", pool, len); ceph_decode_need(p, end, len, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) { @@ -633,7 +662,6 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) struct ceph_osdmap *map; u16 version; u32 len, max, i; - u8 ev; int err = -EINVAL; void *start = *p; struct ceph_pg_pool_info *pi; @@ -646,9 +674,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) map->pg_temp = RB_ROOT; ceph_decode_16_safe(p, end, version, bad); - if (version > CEPH_OSDMAP_VERSION) { - pr_warning("got unknown v %d > %d of osdmap\n", version, - CEPH_OSDMAP_VERSION); + if (version > 6) { + pr_warning("got unknown v %d > 6 of osdmap\n", version); + goto bad; + } + if (version < 6) { + pr_warning("got old v %d < 6 of osdmap\n", version); goto bad; } @@ -660,20 +691,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) ceph_decode_32_safe(p, end, max, bad); while (max--) { - ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad); + ceph_decode_need(p, end, 8 + 2, bad); err = -ENOMEM; pi = kzalloc(sizeof(*pi), GFP_NOFS); if (!pi) goto bad; - pi->id = ceph_decode_32(p); - err = -EINVAL; - ev = ceph_decode_8(p); /* encoding version */ - if (ev > CEPH_PG_POOL_VERSION) { - pr_warning("got unknown v %d > %d of ceph_pg_pool\n", - ev, CEPH_PG_POOL_VERSION); - kfree(pi); - goto bad; - } + pi->id = ceph_decode_64(p); err = __decode_pool(p, end, pi); if (err < 0) { kfree(pi); @@ -682,12 +705,10 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) __insert_pg_pool(&map->pg_pools, pi); } - if (version >= 5) { - err = __decode_pool_names(p, end, map); - if (err < 0) { - dout("fail to decode pool names"); - goto bad; - } + err = __decode_pool_names(p, end, map); + if (err < 0) { + dout("fail to decode pool names"); + goto bad; } ceph_decode_32_safe(p, end, map->pool_max, bad); @@ -724,10 +745,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) for (i = 0; i < len; i++) { int n, j; struct ceph_pg pgid; + struct ceph_pg_v1 pgid_v1; struct ceph_pg_mapping *pg; ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad); - ceph_decode_copy(p, &pgid, sizeof(pgid)); + ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1)); + pgid.pool = le32_to_cpu(pgid_v1.pool); + pgid.seed = le16_to_cpu(pgid_v1.ps); n = ceph_decode_32(p); err = -EINVAL; if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) @@ -745,7 +769,8 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end) err = __insert_pg_mapping(pg, &map->pg_temp); if (err) goto bad; - dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, len); + dout(" added pg_temp %lld.%x len %d\n", pgid.pool, pgid.seed, + len); } /* crush */ @@ -784,16 +809,17 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, struct ceph_fsid fsid; u32 epoch = 0; struct ceph_timespec modified; - u32 len, pool; - __s32 new_pool_max, new_flags, max; + s32 len; + u64 pool; + __s64 new_pool_max; + __s32 new_flags, max; void *start = *p; int err = -EINVAL; u16 version; ceph_decode_16_safe(p, end, version, bad); - if (version > CEPH_OSDMAP_INC_VERSION) { - pr_warning("got unknown v %d > %d of inc osdmap\n", version, - CEPH_OSDMAP_INC_VERSION); + if (version > 6) { + pr_warning("got unknown v %d > %d of inc osdmap\n", version, 6); goto bad; } @@ -803,7 +829,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, epoch = ceph_decode_32(p); BUG_ON(epoch != map->epoch+1); ceph_decode_copy(p, &modified, sizeof(modified)); - new_pool_max = ceph_decode_32(p); + new_pool_max = ceph_decode_64(p); new_flags = ceph_decode_32(p); /* full map? */ @@ -853,18 +879,9 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, /* new_pool */ ceph_decode_32_safe(p, end, len, bad); while (len--) { - __u8 ev; struct ceph_pg_pool_info *pi; - ceph_decode_32_safe(p, end, pool, bad); - ceph_decode_need(p, end, 1 + sizeof(pi->v), bad); - ev = ceph_decode_8(p); /* encoding version */ - if (ev > CEPH_PG_POOL_VERSION) { - pr_warning("got unknown v %d > %d of ceph_pg_pool\n", - ev, CEPH_PG_POOL_VERSION); - err = -EINVAL; - goto bad; - } + ceph_decode_64_safe(p, end, pool, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (!pi) { pi = kzalloc(sizeof(*pi), GFP_NOFS); @@ -890,7 +907,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, while (len--) { struct ceph_pg_pool_info *pi; - ceph_decode_32_safe(p, end, pool, bad); + ceph_decode_64_safe(p, end, pool, bad); pi = __lookup_pg_pool(&map->pg_pools, pool); if (pi) __remove_pg_pool(&map->pg_pools, pi); @@ -946,10 +963,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, while (len--) { struct ceph_pg_mapping *pg; int j; + struct ceph_pg_v1 pgid_v1; struct ceph_pg pgid; u32 pglen; ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad); - ceph_decode_copy(p, &pgid, sizeof(pgid)); + ceph_decode_copy(p, &pgid_v1, sizeof(pgid_v1)); + pgid.pool = le32_to_cpu(pgid_v1.pool); + pgid.seed = le16_to_cpu(pgid_v1.ps); pglen = ceph_decode_32(p); if (pglen) { @@ -975,8 +995,8 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, kfree(pg); goto bad; } - dout(" added pg_temp %llx len %d\n", *(u64 *)&pgid, - pglen); + dout(" added pg_temp %lld.%x len %d\n", pgid.pool, + pgid.seed, pglen); } else { /* remove */ __remove_pg_mapping(&map->pg_temp, pgid); @@ -1010,7 +1030,7 @@ bad: * pass a stride back to the caller. */ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, - u64 off, u64 *plen, + u64 off, u64 len, u64 *ono, u64 *oxoff, u64 *oxlen) { @@ -1021,7 +1041,7 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, u32 su_per_object; u64 t, su_offset; - dout("mapping %llu~%llu osize %u fl_su %u\n", off, *plen, + dout("mapping %llu~%llu osize %u fl_su %u\n", off, len, osize, su); if (su == 0 || sc == 0) goto invalid; @@ -1054,11 +1074,10 @@ int ceph_calc_file_object_mapping(struct ceph_file_layout *layout, /* * Calculate the length of the extent being written to the selected - * object. This is the minimum of the full length requested (plen) or + * object. This is the minimum of the full length requested (len) or * the remainder of the current stripe being written to. */ - *oxlen = min_t(u64, *plen, su - su_offset); - *plen = *oxlen; + *oxlen = min_t(u64, len, su - su_offset); dout(" obj extent %llu~%llu\n", *oxoff, *oxlen); return 0; @@ -1076,33 +1095,24 @@ EXPORT_SYMBOL(ceph_calc_file_object_mapping); * calculate an object layout (i.e. pgid) from an oid, * file_layout, and osdmap */ -int ceph_calc_object_layout(struct ceph_object_layout *ol, +int ceph_calc_object_layout(struct ceph_pg *pg, const char *oid, struct ceph_file_layout *fl, struct ceph_osdmap *osdmap) { unsigned int num, num_mask; - struct ceph_pg pgid; - int poolid = le32_to_cpu(fl->fl_pg_pool); struct ceph_pg_pool_info *pool; - unsigned int ps; BUG_ON(!osdmap); - - pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); + pg->pool = le32_to_cpu(fl->fl_pg_pool); + pool = __lookup_pg_pool(&osdmap->pg_pools, pg->pool); if (!pool) return -EIO; - ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid)); - num = le32_to_cpu(pool->v.pg_num); + pg->seed = ceph_str_hash(pool->object_hash, oid, strlen(oid)); + num = pool->pg_num; num_mask = pool->pg_num_mask; - pgid.ps = cpu_to_le16(ps); - pgid.preferred = cpu_to_le16(-1); - pgid.pool = fl->fl_pg_pool; - dout("calc_object_layout '%s' pgid %d.%x\n", oid, poolid, ps); - - ol->ol_pgid = pgid; - ol->ol_stripe_unit = fl->fl_object_stripe_unit; + dout("calc_object_layout '%s' pgid %lld.%x\n", oid, pg->pool, pg->seed); return 0; } EXPORT_SYMBOL(ceph_calc_object_layout); @@ -1117,19 +1127,16 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, struct ceph_pg_mapping *pg; struct ceph_pg_pool_info *pool; int ruleno; - unsigned int poolid, ps, pps, t, r; - - poolid = le32_to_cpu(pgid.pool); - ps = le16_to_cpu(pgid.ps); + int r; + u32 pps; - pool = __lookup_pg_pool(&osdmap->pg_pools, poolid); + pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool); if (!pool) return NULL; /* pg_temp? */ - t = ceph_stable_mod(ps, le32_to_cpu(pool->v.pg_num), - pool->pgp_num_mask); - pgid.ps = cpu_to_le16(t); + pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num, + pool->pgp_num_mask); pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid); if (pg) { *num = pg->len; @@ -1137,26 +1144,39 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid, } /* crush */ - ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset, - pool->v.type, pool->v.size); + ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset, + pool->type, pool->size); if (ruleno < 0) { - pr_err("no crush rule pool %d ruleset %d type %d size %d\n", - poolid, pool->v.crush_ruleset, pool->v.type, - pool->v.size); + pr_err("no crush rule pool %lld ruleset %d type %d size %d\n", + pgid.pool, pool->crush_ruleset, pool->type, + pool->size); return NULL; } - pps = ceph_stable_mod(ps, - le32_to_cpu(pool->v.pgp_num), - pool->pgp_num_mask); - pps += poolid; + if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) { + /* hash pool id and seed sothat pool PGs do not overlap */ + pps = crush_hash32_2(CRUSH_HASH_RJENKINS1, + ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask), + pgid.pool); + } else { + /* + * legacy ehavior: add ps and pool together. this is + * not a great approach because the PGs from each pool + * will overlap on top of each other: 0.5 == 1.4 == + * 2.3 == ... + */ + pps = ceph_stable_mod(pgid.seed, pool->pgp_num, + pool->pgp_num_mask) + + (unsigned)pgid.pool; + } r = crush_do_rule(osdmap->crush, ruleno, pps, osds, - min_t(int, pool->v.size, *num), + min_t(int, pool->size, *num), osdmap->osd_weight); if (r < 0) { - pr_err("error %d from crush rule: pool %d ruleset %d type %d" - " size %d\n", r, poolid, pool->v.crush_ruleset, - pool->v.type, pool->v.size); + pr_err("error %d from crush rule: pool %lld ruleset %d type %d" + " size %d\n", r, pgid.pool, pool->crush_ruleset, + pool->type, pool->size); return NULL; } *num = r; diff --git a/net/ceph/pagevec.c b/net/ceph/pagevec.c index cd9c21df87d1..815a2249cfa9 100644 --- a/net/ceph/pagevec.c +++ b/net/ceph/pagevec.c @@ -12,7 +12,7 @@ /* * build a vector of user pages */ -struct page **ceph_get_direct_page_vector(const char __user *data, +struct page **ceph_get_direct_page_vector(const void __user *data, int num_pages, bool write_page) { struct page **pages; @@ -93,7 +93,7 @@ EXPORT_SYMBOL(ceph_alloc_page_vector); * copy user data into a page vector */ int ceph_copy_user_to_page_vector(struct page **pages, - const char __user *data, + const void __user *data, loff_t off, size_t len) { int i = 0; @@ -118,17 +118,17 @@ int ceph_copy_user_to_page_vector(struct page **pages, } EXPORT_SYMBOL(ceph_copy_user_to_page_vector); -int ceph_copy_to_page_vector(struct page **pages, - const char *data, +void ceph_copy_to_page_vector(struct page **pages, + const void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(page_address(pages[i]) + po, data, l); data += l; left -= l; @@ -138,21 +138,20 @@ int ceph_copy_to_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_to_page_vector); -int ceph_copy_from_page_vector(struct page **pages, - char *data, +void ceph_copy_from_page_vector(struct page **pages, + void *data, loff_t off, size_t len) { int i = 0; size_t po = off & ~PAGE_CACHE_MASK; size_t left = len; - size_t l; while (left > 0) { - l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + size_t l = min_t(size_t, PAGE_CACHE_SIZE-po, left); + memcpy(data, page_address(pages[i]) + po, l); data += l; left -= l; @@ -162,7 +161,6 @@ int ceph_copy_from_page_vector(struct page **pages, i++; } } - return len; } EXPORT_SYMBOL(ceph_copy_from_page_vector); @@ -170,7 +168,7 @@ EXPORT_SYMBOL(ceph_copy_from_page_vector); * copy user data from a page vector into a user pointer */ int ceph_copy_page_vector_to_user(struct page **pages, - char __user *data, + void __user *data, loff_t off, size_t len) { int i = 0; |