diff options
author | Ilya Dryomov <idryomov@gmail.com> | 2020-11-12 15:48:06 +0100 |
---|---|---|
committer | Ilya Dryomov <idryomov@gmail.com> | 2020-12-14 23:21:50 +0100 |
commit | 2f713615ddd9d805b6c5e79c52e0e11af99d2bf1 (patch) | |
tree | 1a2940cfdb53882ace9f27167d594828cdbf6658 | |
parent | 566050e17e53db283d4e26b73b4b50556f97ce7b (diff) | |
download | linux-2f713615ddd9d805b6c5e79c52e0e11af99d2bf1.tar.bz2 |
libceph: move msgr1 protocol implementation to its own file
A pure move, no other changes.
Note that ceph_tcp_recv{msg,page}() and ceph_tcp_send{msg,page}()
helpers are also moved. msgr2 will bring its own, more efficient,
variants based on iov_iter. Switching msgr1 to them was considered
but decided against to avoid subtle regressions.
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
-rw-r--r-- | include/linux/ceph/messenger.h | 1 | ||||
-rw-r--r-- | net/ceph/Makefile | 3 | ||||
-rw-r--r-- | net/ceph/messenger.c | 1495 | ||||
-rw-r--r-- | net/ceph/messenger_v1.c | 1502 |
4 files changed, 1505 insertions, 1496 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 8cc8b08eb3dd..b5268127c55e 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -382,6 +382,7 @@ int ceph_con_in_msg_alloc(struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); void ceph_con_get_out_msg(struct ceph_connection *con); +/* messenger_v1.c */ int ceph_con_v1_try_read(struct ceph_connection *con); int ceph_con_v1_try_write(struct ceph_connection *con); void ceph_con_v1_revoke(struct ceph_connection *con); diff --git a/net/ceph/Makefile b/net/ceph/Makefile index ce09bb4fb249..df02bd8d6c7b 100644 --- a/net/ceph/Makefile +++ b/net/ceph/Makefile @@ -14,4 +14,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \ crypto.o armor.o \ auth_x.o \ ceph_strings.o ceph_hash.o \ - pagevec.o snapshot.o string_table.o + pagevec.o snapshot.o string_table.o \ + messenger_v1.o diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4ca7d9b594c7..544cfdbe52d6 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -137,12 +137,6 @@ bool ceph_con_flag_test_and_set(struct ceph_connection *con, static struct kmem_cache *ceph_msg_cache; -/* static tag bytes (protocol control messages) */ -static char tag_msg = CEPH_MSGR_TAG_MSG; -static char tag_ack = CEPH_MSGR_TAG_ACK; -static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; -static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; - #ifdef CONFIG_LOCKDEP static struct lock_class_key socket_class; #endif @@ -478,96 +472,6 @@ int ceph_tcp_connect(struct ceph_connection *con) } /* - * If @buf is NULL, discard up to @len bytes. - */ -static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) -{ - struct kvec iov = {buf, len}; - struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - int r; - - if (!buf) - msg.msg_flags |= MSG_TRUNC; - - iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len); - r = sock_recvmsg(sock, &msg, msg.msg_flags); - if (r == -EAGAIN) - r = 0; - return r; -} - -static int ceph_tcp_recvpage(struct socket *sock, struct page *page, - int page_offset, size_t length) -{ - struct bio_vec bvec = { - .bv_page = page, - .bv_offset = page_offset, - .bv_len = length - }; - struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - int r; - - BUG_ON(page_offset + length > PAGE_SIZE); - iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length); - r = sock_recvmsg(sock, &msg, msg.msg_flags); - if (r == -EAGAIN) - r = 0; - return r; -} - -/* - * write something. @more is true if caller will be sending more data - * shortly. - */ -static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, - size_t kvlen, size_t len, bool more) -{ - struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - int r; - - if (more) - msg.msg_flags |= MSG_MORE; - else - msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ - - r = kernel_sendmsg(sock, &msg, iov, kvlen, len); - if (r == -EAGAIN) - r = 0; - return r; -} - -/* - * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST - */ -static int ceph_tcp_sendpage(struct socket *sock, struct page *page, - int offset, size_t size, int more) -{ - ssize_t (*sendpage)(struct socket *sock, struct page *page, - int offset, size_t size, int flags); - int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more; - int ret; - - /* - * sendpage cannot properly handle pages with page_count == 0, - * we need to fall back to sendmsg if that's the case. - * - * Same goes for slab pages: skb_can_coalesce() allows - * coalescing neighboring slab objects into a single frag which - * triggers one of hardened usercopy checks. - */ - if (sendpage_ok(page)) - sendpage = sock->ops->sendpage; - else - sendpage = sock_no_sendpage; - - ret = sendpage(sock, page, offset, size, flags); - if (ret == -EAGAIN) - ret = 0; - - return ret; -} - -/* * Shutdown/close the socket for the given connection. */ int ceph_con_close_socket(struct ceph_connection *con) @@ -593,11 +497,6 @@ int ceph_con_close_socket(struct ceph_connection *con) return rc; } -void ceph_con_v1_reset_protocol(struct ceph_connection *con) -{ - con->out_skip = 0; -} - static void ceph_con_reset_protocol(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); @@ -636,12 +535,6 @@ static void ceph_msg_remove_list(struct list_head *head) } } -void ceph_con_v1_reset_session(struct ceph_connection *con) -{ - con->connect_seq = 0; - con->peer_global_seq = 0; -} - void ceph_con_reset_session(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); @@ -702,11 +595,6 @@ void ceph_con_open(struct ceph_connection *con, } EXPORT_SYMBOL(ceph_con_open); -bool ceph_con_v1_opened(struct ceph_connection *con) -{ - return con->connect_seq; -} - /* * return true if this connection ever successfully opened */ @@ -739,7 +627,6 @@ void ceph_con_init(struct ceph_connection *con, void *private, } EXPORT_SYMBOL(ceph_con_init); - /* * We maintain a global counter to order connection attempts. Get * a unique seq greater than @gt. @@ -805,50 +692,6 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) } } -static void con_out_kvec_reset(struct ceph_connection *con) -{ - BUG_ON(con->out_skip); - - con->out_kvec_left = 0; - con->out_kvec_bytes = 0; - con->out_kvec_cur = &con->out_kvec[0]; -} - -static void con_out_kvec_add(struct ceph_connection *con, - size_t size, void *data) -{ - int index = con->out_kvec_left; - - BUG_ON(con->out_skip); - BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); - - con->out_kvec[index].iov_len = size; - con->out_kvec[index].iov_base = data; - con->out_kvec_left++; - con->out_kvec_bytes += size; -} - -/* - * Chop off a kvec from the end. Return residual number of bytes for - * that kvec, i.e. how many bytes would have been written if the kvec - * hadn't been nuked. - */ -static int con_out_kvec_skip(struct ceph_connection *con) -{ - int off = con->out_kvec_cur - con->out_kvec; - int skip = 0; - - if (con->out_kvec_bytes > 0) { - skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; - BUG_ON(con->out_kvec_bytes < skip); - BUG_ON(!con->out_kvec_left); - con->out_kvec_bytes -= skip; - con->out_kvec_left--; - } - - return skip; -} - #ifdef CONFIG_BLOCK /* @@ -1260,307 +1103,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) cursor->need_crc = new_piece; } -static size_t sizeof_footer(struct ceph_connection *con) -{ - return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? - sizeof(struct ceph_msg_footer) : - sizeof(struct ceph_msg_footer_old); -} - -static void prepare_message_data(struct ceph_msg *msg, u32 data_len) -{ - /* Initialize data cursor */ - - ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); -} - -/* - * Prepare footer for currently outgoing message, and finish things - * off. Assumes out_kvec* are already valid.. we just add on to the end. - */ -static void prepare_write_message_footer(struct ceph_connection *con) -{ - struct ceph_msg *m = con->out_msg; - - m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; - - dout("prepare_write_message_footer %p\n", con); - con_out_kvec_add(con, sizeof_footer(con), &m->footer); - if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { - if (con->ops->sign_message) - con->ops->sign_message(m); - else - m->footer.sig = 0; - } else { - m->old_footer.flags = m->footer.flags; - } - con->out_more = m->more_to_follow; - con->out_msg_done = true; -} - -/* - * Prepare headers for the next outgoing message. - */ -static void prepare_write_message(struct ceph_connection *con) -{ - struct ceph_msg *m; - u32 crc; - - con_out_kvec_reset(con); - con->out_msg_done = false; - - /* Sneak an ack in there first? If we can get it into the same - * TCP packet that's a good thing. */ - if (con->in_seq > con->in_seq_acked) { - con->in_seq_acked = con->in_seq; - con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); - } - - ceph_con_get_out_msg(con); - m = con->out_msg; - - dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n", - m, con->out_seq, le16_to_cpu(m->hdr.type), - le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), - m->data_length); - WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len)); - WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len)); - - /* tag + hdr + front + middle */ - con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); - con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); - - if (m->middle) - con_out_kvec_add(con, m->middle->vec.iov_len, - m->middle->vec.iov_base); - - /* fill in hdr crc and finalize hdr */ - crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); - con->out_msg->hdr.crc = cpu_to_le32(crc); - memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); - - /* fill in front and middle crc, footer */ - crc = crc32c(0, m->front.iov_base, m->front.iov_len); - con->out_msg->footer.front_crc = cpu_to_le32(crc); - if (m->middle) { - crc = crc32c(0, m->middle->vec.iov_base, - m->middle->vec.iov_len); - con->out_msg->footer.middle_crc = cpu_to_le32(crc); - } else - con->out_msg->footer.middle_crc = 0; - dout("%s front_crc %u middle_crc %u\n", __func__, - le32_to_cpu(con->out_msg->footer.front_crc), - le32_to_cpu(con->out_msg->footer.middle_crc)); - con->out_msg->footer.flags = 0; - - /* is there a data payload? */ - con->out_msg->footer.data_crc = 0; - if (m->data_length) { - prepare_message_data(con->out_msg, m->data_length); - con->out_more = 1; /* data + footer will follow */ - } else { - /* no, queue up footer too and be done */ - prepare_write_message_footer(con); - } - - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Prepare an ack. - */ -static void prepare_write_ack(struct ceph_connection *con) -{ - dout("prepare_write_ack %p %llu -> %llu\n", con, - con->in_seq_acked, con->in_seq); - con->in_seq_acked = con->in_seq; - - con_out_kvec_reset(con); - - con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); - - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); - - con->out_more = 1; /* more will follow.. eventually.. */ - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Prepare to share the seq during handshake - */ -static void prepare_write_seq(struct ceph_connection *con) -{ - dout("prepare_write_seq %p %llu -> %llu\n", con, - con->in_seq_acked, con->in_seq); - con->in_seq_acked = con->in_seq; - - con_out_kvec_reset(con); - - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); - - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Prepare to write keepalive byte. - */ -static void prepare_write_keepalive(struct ceph_connection *con) -{ - dout("prepare_write_keepalive %p\n", con); - con_out_kvec_reset(con); - if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { - struct timespec64 now; - - ktime_get_real_ts64(&now); - con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); - ceph_encode_timespec64(&con->out_temp_keepalive2, &now); - con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), - &con->out_temp_keepalive2); - } else { - con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); - } - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Connection negotiation. - */ - -static int get_connect_authorizer(struct ceph_connection *con) -{ - struct ceph_auth_handshake *auth; - int auth_proto; - - if (!con->ops->get_authorizer) { - con->auth = NULL; - con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; - con->out_connect.authorizer_len = 0; - return 0; - } - - auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); - if (IS_ERR(auth)) - return PTR_ERR(auth); - - con->auth = auth; - con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); - con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); - return 0; -} - -/* - * We connected to a peer and are saying hello. - */ -static void prepare_write_banner(struct ceph_connection *con) -{ - con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); - con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), - &con->msgr->my_enc_addr); - - con->out_more = 0; - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -static void __prepare_write_connect(struct ceph_connection *con) -{ - con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); - if (con->auth) - con_out_kvec_add(con, con->auth->authorizer_buf_len, - con->auth->authorizer_buf); - - con->out_more = 0; - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -static int prepare_write_connect(struct ceph_connection *con) -{ - unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); - int proto; - int ret; - - switch (con->peer_name.type) { - case CEPH_ENTITY_TYPE_MON: - proto = CEPH_MONC_PROTOCOL; - break; - case CEPH_ENTITY_TYPE_OSD: - proto = CEPH_OSDC_PROTOCOL; - break; - case CEPH_ENTITY_TYPE_MDS: - proto = CEPH_MDSC_PROTOCOL; - break; - default: - BUG(); - } - - dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, - con->connect_seq, global_seq, proto); - - con->out_connect.features = - cpu_to_le64(from_msgr(con->msgr)->supported_features); - con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); - con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); - con->out_connect.global_seq = cpu_to_le32(global_seq); - con->out_connect.protocol_version = cpu_to_le32(proto); - con->out_connect.flags = 0; - - ret = get_connect_authorizer(con); - if (ret) - return ret; - - __prepare_write_connect(con); - return 0; -} - -/* - * write as much of pending kvecs to the socket as we can. - * 1 -> done - * 0 -> socket full, but more to do - * <0 -> error - */ -static int write_partial_kvec(struct ceph_connection *con) -{ - int ret; - - dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); - while (con->out_kvec_bytes > 0) { - ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, - con->out_kvec_left, con->out_kvec_bytes, - con->out_more); - if (ret <= 0) - goto out; - con->out_kvec_bytes -= ret; - if (con->out_kvec_bytes == 0) - break; /* done */ - - /* account for full iov entries consumed */ - while (ret >= con->out_kvec_cur->iov_len) { - BUG_ON(!con->out_kvec_left); - ret -= con->out_kvec_cur->iov_len; - con->out_kvec_cur++; - con->out_kvec_left--; - } - /* and for a partially-consumed entry */ - if (ret) { - con->out_kvec_cur->iov_len -= ret; - con->out_kvec_cur->iov_base += ret; - } - } - con->out_kvec_left = 0; - ret = 1; -out: - dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, - con->out_kvec_bytes, con->out_kvec_left, ret); - return ret; /* done! */ -} - u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, unsigned int length) { @@ -1573,256 +1115,6 @@ u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, return crc; } -/* - * Write as much message data payload as we can. If we finish, queue - * up the footer. - * 1 -> done, footer is now queued in out_kvec[]. - * 0 -> socket full, but more to do - * <0 -> error - */ -static int write_partial_message_data(struct ceph_connection *con) -{ - struct ceph_msg *msg = con->out_msg; - struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; - u32 crc; - - dout("%s %p msg %p\n", __func__, con, msg); - - if (!msg->num_data_items) - return -EINVAL; - - /* - * Iterate through each page that contains data to be - * written, and send as much as possible for each. - * - * If we are calculating the data crc (the default), we will - * need to map the page. If we have no pages, they have - * been revoked, so use the zero page. - */ - crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0; - while (cursor->total_resid) { - struct page *page; - size_t page_offset; - size_t length; - int ret; - - if (!cursor->resid) { - ceph_msg_data_advance(cursor, 0); - continue; - } - - page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); - if (length == cursor->total_resid) - more = MSG_MORE; - ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, - more); - if (ret <= 0) { - if (do_datacrc) - msg->footer.data_crc = cpu_to_le32(crc); - - return ret; - } - if (do_datacrc && cursor->need_crc) - crc = ceph_crc32c_page(crc, page, page_offset, length); - ceph_msg_data_advance(cursor, (size_t)ret); - } - - dout("%s %p msg %p done\n", __func__, con, msg); - - /* prepare and queue up footer, too */ - if (do_datacrc) - msg->footer.data_crc = cpu_to_le32(crc); - else - msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; - con_out_kvec_reset(con); - prepare_write_message_footer(con); - - return 1; /* must return > 0 to indicate success */ -} - -/* - * write some zeros - */ -static int write_partial_skip(struct ceph_connection *con) -{ - int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; - int ret; - - dout("%s %p %d left\n", __func__, con, con->out_skip); - while (con->out_skip > 0) { - size_t size = min(con->out_skip, (int) PAGE_SIZE); - - if (size == con->out_skip) - more = MSG_MORE; - ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, - more); - if (ret <= 0) - goto out; - con->out_skip -= ret; - } - ret = 1; -out: - return ret; -} - -/* - * Prepare to read connection handshake, or an ack. - */ -static void prepare_read_banner(struct ceph_connection *con) -{ - dout("prepare_read_banner %p\n", con); - con->in_base_pos = 0; -} - -static void prepare_read_connect(struct ceph_connection *con) -{ - dout("prepare_read_connect %p\n", con); - con->in_base_pos = 0; -} - -static void prepare_read_ack(struct ceph_connection *con) -{ - dout("prepare_read_ack %p\n", con); - con->in_base_pos = 0; -} - -static void prepare_read_seq(struct ceph_connection *con) -{ - dout("prepare_read_seq %p\n", con); - con->in_base_pos = 0; - con->in_tag = CEPH_MSGR_TAG_SEQ; -} - -static void prepare_read_tag(struct ceph_connection *con) -{ - dout("prepare_read_tag %p\n", con); - con->in_base_pos = 0; - con->in_tag = CEPH_MSGR_TAG_READY; -} - -static void prepare_read_keepalive_ack(struct ceph_connection *con) -{ - dout("prepare_read_keepalive_ack %p\n", con); - con->in_base_pos = 0; -} - -/* - * Prepare to read a message. - */ -static int prepare_read_message(struct ceph_connection *con) -{ - dout("prepare_read_message %p\n", con); - BUG_ON(con->in_msg != NULL); - con->in_base_pos = 0; - con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; - return 0; -} - - -static int read_partial(struct ceph_connection *con, - int end, int size, void *object) -{ - while (con->in_base_pos < end) { - int left = end - con->in_base_pos; - int have = size - left; - int ret = ceph_tcp_recvmsg(con->sock, object + have, left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } - return 1; -} - - -/* - * Read all or part of the connect-side handshake on a new connection - */ -static int read_partial_banner(struct ceph_connection *con) -{ - int size; - int end; - int ret; - - dout("read_partial_banner %p at %d\n", con, con->in_base_pos); - - /* peer's banner */ - size = strlen(CEPH_BANNER); - end = size; - ret = read_partial(con, end, size, con->in_banner); - if (ret <= 0) - goto out; - - size = sizeof (con->actual_peer_addr); - end += size; - ret = read_partial(con, end, size, &con->actual_peer_addr); - if (ret <= 0) - goto out; - ceph_decode_banner_addr(&con->actual_peer_addr); - - size = sizeof (con->peer_addr_for_me); - end += size; - ret = read_partial(con, end, size, &con->peer_addr_for_me); - if (ret <= 0) - goto out; - ceph_decode_banner_addr(&con->peer_addr_for_me); - -out: - return ret; -} - -static int read_partial_connect(struct ceph_connection *con) -{ - int size; - int end; - int ret; - - dout("read_partial_connect %p at %d\n", con, con->in_base_pos); - - size = sizeof (con->in_reply); - end = size; - ret = read_partial(con, end, size, &con->in_reply); - if (ret <= 0) - goto out; - - if (con->auth) { - size = le32_to_cpu(con->in_reply.authorizer_len); - if (size > con->auth->authorizer_reply_buf_len) { - pr_err("authorizer reply too big: %d > %zu\n", size, - con->auth->authorizer_reply_buf_len); - ret = -EINVAL; - goto out; - } - - end += size; - ret = read_partial(con, end, size, - con->auth->authorizer_reply_buf); - if (ret <= 0) - goto out; - } - - dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", - con, (int)con->in_reply.tag, - le32_to_cpu(con->in_reply.connect_seq), - le32_to_cpu(con->in_reply.global_seq)); -out: - return ret; -} - -/* - * Verify the hello banner looks okay. - */ -static int verify_hello(struct ceph_connection *con) -{ - if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - pr_err("connect to %s got bad banner\n", - ceph_pr_addr(&con->peer_addr)); - con->error_msg = "protocol error, bad banner"; - return -1; - } - return 0; -} bool ceph_addr_is_blank(const struct ceph_entity_addr *addr) { @@ -2032,492 +1324,6 @@ bad: return ret; } -static int process_banner(struct ceph_connection *con) -{ - struct ceph_entity_addr *my_addr = &con->msgr->inst.addr; - - dout("process_banner on %p\n", con); - - if (verify_hello(con) < 0) - return -1; - - /* - * Make sure the other end is who we wanted. note that the other - * end may not yet know their ip address, so if it's 0.0.0.0, give - * them the benefit of the doubt. - */ - if (memcmp(&con->peer_addr, &con->actual_peer_addr, - sizeof(con->peer_addr)) != 0 && - !(ceph_addr_is_blank(&con->actual_peer_addr) && - con->actual_peer_addr.nonce == con->peer_addr.nonce)) { - pr_warn("wrong peer, want %s/%u, got %s/%u\n", - ceph_pr_addr(&con->peer_addr), - le32_to_cpu(con->peer_addr.nonce), - ceph_pr_addr(&con->actual_peer_addr), - le32_to_cpu(con->actual_peer_addr.nonce)); - con->error_msg = "wrong peer at address"; - return -1; - } - - /* - * did we learn our address? - */ - if (ceph_addr_is_blank(my_addr)) { - memcpy(&my_addr->in_addr, - &con->peer_addr_for_me.in_addr, - sizeof(con->peer_addr_for_me.in_addr)); - ceph_addr_set_port(my_addr, 0); - ceph_encode_my_addr(con->msgr); - dout("process_banner learned my addr is %s\n", - ceph_pr_addr(my_addr)); - } - - return 0; -} - -static int process_connect(struct ceph_connection *con) -{ - u64 sup_feat = from_msgr(con->msgr)->supported_features; - u64 req_feat = from_msgr(con->msgr)->required_features; - u64 server_feat = le64_to_cpu(con->in_reply.features); - int ret; - - dout("process_connect on %p tag %d\n", con, (int)con->in_tag); - - if (con->auth) { - int len = le32_to_cpu(con->in_reply.authorizer_len); - - /* - * Any connection that defines ->get_authorizer() - * should also define ->add_authorizer_challenge() and - * ->verify_authorizer_reply(). - * - * See get_connect_authorizer(). - */ - if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { - ret = con->ops->add_authorizer_challenge( - con, con->auth->authorizer_reply_buf, len); - if (ret < 0) - return ret; - - con_out_kvec_reset(con); - __prepare_write_connect(con); - prepare_read_connect(con); - return 0; - } - - if (len) { - ret = con->ops->verify_authorizer_reply(con); - if (ret < 0) { - con->error_msg = "bad authorize reply"; - return ret; - } - } - } - - switch (con->in_reply.tag) { - case CEPH_MSGR_TAG_FEATURES: - pr_err("%s%lld %s feature set mismatch," - " my %llx < server's %llx, missing %llx\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - sup_feat, server_feat, server_feat & ~sup_feat); - con->error_msg = "missing required protocol features"; - return -1; - - case CEPH_MSGR_TAG_BADPROTOVER: - pr_err("%s%lld %s protocol version mismatch," - " my %d != server's %d\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - le32_to_cpu(con->out_connect.protocol_version), - le32_to_cpu(con->in_reply.protocol_version)); - con->error_msg = "protocol version mismatch"; - return -1; - - case CEPH_MSGR_TAG_BADAUTHORIZER: - con->auth_retry++; - dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, - con->auth_retry); - if (con->auth_retry == 2) { - con->error_msg = "connect authorization failure"; - return -1; - } - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - break; - - case CEPH_MSGR_TAG_RESETSESSION: - /* - * If we connected with a large connect_seq but the peer - * has no record of a session with us (no connection, or - * connect_seq == 0), they will send RESETSESION to indicate - * that they must have reset their session, and may have - * dropped messages. - */ - dout("process_connect got RESET peer seq %u\n", - le32_to_cpu(con->in_reply.connect_seq)); - pr_info("%s%lld %s session reset\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr)); - ceph_con_reset_session(con); - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - - /* Tell ceph about it. */ - mutex_unlock(&con->mutex); - if (con->ops->peer_reset) - con->ops->peer_reset(con); - mutex_lock(&con->mutex); - if (con->state != CEPH_CON_S_V1_CONNECT_MSG) - return -EAGAIN; - break; - - case CEPH_MSGR_TAG_RETRY_SESSION: - /* - * If we sent a smaller connect_seq than the peer has, try - * again with a larger value. - */ - dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", - le32_to_cpu(con->out_connect.connect_seq), - le32_to_cpu(con->in_reply.connect_seq)); - con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - break; - - case CEPH_MSGR_TAG_RETRY_GLOBAL: - /* - * If we sent a smaller global_seq than the peer has, try - * again with a larger value. - */ - dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", - con->peer_global_seq, - le32_to_cpu(con->in_reply.global_seq)); - ceph_get_global_seq(con->msgr, - le32_to_cpu(con->in_reply.global_seq)); - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - break; - - case CEPH_MSGR_TAG_SEQ: - case CEPH_MSGR_TAG_READY: - if (req_feat & ~server_feat) { - pr_err("%s%lld %s protocol feature mismatch," - " my required %llx > server's %llx, need %llx\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - req_feat, server_feat, req_feat & ~server_feat); - con->error_msg = "missing required protocol features"; - return -1; - } - - WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); - con->state = CEPH_CON_S_OPEN; - con->auth_retry = 0; /* we authenticated; clear flag */ - con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); - con->connect_seq++; - con->peer_features = server_feat; - dout("process_connect got READY gseq %d cseq %d (%d)\n", - con->peer_global_seq, - le32_to_cpu(con->in_reply.connect_seq), - con->connect_seq); - WARN_ON(con->connect_seq != - le32_to_cpu(con->in_reply.connect_seq)); - - if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); - - con->delay = 0; /* reset backoff memory */ - - if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { - prepare_write_seq(con); - prepare_read_seq(con); - } else { - prepare_read_tag(con); - } - break; - - case CEPH_MSGR_TAG_WAIT: - /* - * If there is a connection race (we are opening - * connections to each other), one of us may just have - * to WAIT. This shouldn't happen if we are the - * client. - */ - con->error_msg = "protocol error, got WAIT as client"; - return -1; - - default: - con->error_msg = "protocol error, garbage tag during connect"; - return -1; - } - return 0; -} - - -/* - * read (part of) an ack - */ -static int read_partial_ack(struct ceph_connection *con) -{ - int size = sizeof (con->in_temp_ack); - int end = size; - - return read_partial(con, end, size, &con->in_temp_ack); -} - -/* - * We can finally discard anything that's been acked. - */ -static void process_ack(struct ceph_connection *con) -{ - u64 ack = le64_to_cpu(con->in_temp_ack); - - if (con->in_tag == CEPH_MSGR_TAG_ACK) - ceph_con_discard_sent(con, ack); - else - ceph_con_discard_requeued(con, ack); - - prepare_read_tag(con); -} - - -static int read_partial_message_section(struct ceph_connection *con, - struct kvec *section, - unsigned int sec_len, u32 *crc) -{ - int ret, left; - - BUG_ON(!section); - - while (section->iov_len < sec_len) { - BUG_ON(section->iov_base == NULL); - left = sec_len - section->iov_len; - ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + - section->iov_len, left); - if (ret <= 0) - return ret; - section->iov_len += ret; - } - if (section->iov_len == sec_len) - *crc = crc32c(0, section->iov_base, section->iov_len); - - return 1; -} - -static int read_partial_msg_data(struct ceph_connection *con) -{ - struct ceph_msg *msg = con->in_msg; - struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - struct page *page; - size_t page_offset; - size_t length; - u32 crc = 0; - int ret; - - if (!msg->num_data_items) - return -EIO; - - if (do_datacrc) - crc = con->in_data_crc; - while (cursor->total_resid) { - if (!cursor->resid) { - ceph_msg_data_advance(cursor, 0); - continue; - } - - page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); - ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); - if (ret <= 0) { - if (do_datacrc) - con->in_data_crc = crc; - - return ret; - } - - if (do_datacrc) - crc = ceph_crc32c_page(crc, page, page_offset, ret); - ceph_msg_data_advance(cursor, (size_t)ret); - } - if (do_datacrc) - con->in_data_crc = crc; - - return 1; /* must return > 0 to indicate success */ -} - -/* - * read (part of) a message. - */ -static int read_partial_message(struct ceph_connection *con) -{ - struct ceph_msg *m = con->in_msg; - int size; - int end; - int ret; - unsigned int front_len, middle_len, data_len; - bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); - u64 seq; - u32 crc; - - dout("read_partial_message con %p msg %p\n", con, m); - - /* header */ - size = sizeof (con->in_hdr); - end = size; - ret = read_partial(con, end, size, &con->in_hdr); - if (ret <= 0) - return ret; - - crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); - if (cpu_to_le32(crc) != con->in_hdr.crc) { - pr_err("read_partial_message bad hdr crc %u != expected %u\n", - crc, con->in_hdr.crc); - return -EBADMSG; - } - - front_len = le32_to_cpu(con->in_hdr.front_len); - if (front_len > CEPH_MSG_MAX_FRONT_LEN) - return -EIO; - middle_len = le32_to_cpu(con->in_hdr.middle_len); - if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) - return -EIO; - data_len = le32_to_cpu(con->in_hdr.data_len); - if (data_len > CEPH_MSG_MAX_DATA_LEN) - return -EIO; - - /* verify seq# */ - seq = le64_to_cpu(con->in_hdr.seq); - if ((s64)seq - (s64)con->in_seq < 1) { - pr_info("skipping %s%lld %s seq %lld expected %lld\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - seq, con->in_seq + 1); - con->in_base_pos = -front_len - middle_len - data_len - - sizeof_footer(con); - con->in_tag = CEPH_MSGR_TAG_READY; - return 1; - } else if ((s64)seq - (s64)con->in_seq > 1) { - pr_err("read_partial_message bad seq %lld expected %lld\n", - seq, con->in_seq + 1); - con->error_msg = "bad message sequence # for incoming message"; - return -EBADE; - } - - /* allocate message? */ - if (!con->in_msg) { - int skip = 0; - - dout("got hdr type %d front %d data %d\n", con->in_hdr.type, - front_len, data_len); - ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip); - if (ret < 0) - return ret; - - BUG_ON(!con->in_msg ^ skip); - if (skip) { - /* skip this message */ - dout("alloc_msg said skip message\n"); - con->in_base_pos = -front_len - middle_len - data_len - - sizeof_footer(con); - con->in_tag = CEPH_MSGR_TAG_READY; - con->in_seq++; - return 1; - } - - BUG_ON(!con->in_msg); - BUG_ON(con->in_msg->con != con); - m = con->in_msg; - m->front.iov_len = 0; /* haven't read it yet */ - if (m->middle) - m->middle->vec.iov_len = 0; - - /* prepare for data payload, if any */ - - if (data_len) - prepare_message_data(con->in_msg, data_len); - } - - /* front */ - ret = read_partial_message_section(con, &m->front, front_len, - &con->in_front_crc); - if (ret <= 0) - return ret; - - /* middle */ - if (m->middle) { - ret = read_partial_message_section(con, &m->middle->vec, - middle_len, - &con->in_middle_crc); - if (ret <= 0) - return ret; - } - - /* (page) data */ - if (data_len) { - ret = read_partial_msg_data(con); - if (ret <= 0) - return ret; - } - - /* footer */ - size = sizeof_footer(con); - end += size; - ret = read_partial(con, end, size, &m->footer); - if (ret <= 0) - return ret; - - if (!need_sign) { - m->footer.flags = m->old_footer.flags; - m->footer.sig = 0; - } - - dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", - m, front_len, m->footer.front_crc, middle_len, - m->footer.middle_crc, data_len, m->footer.data_crc); - - /* crc ok? */ - if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { - pr_err("read_partial_message %p front crc %u != exp. %u\n", - m, con->in_front_crc, m->footer.front_crc); - return -EBADMSG; - } - if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { - pr_err("read_partial_message %p middle crc %u != exp %u\n", - m, con->in_middle_crc, m->footer.middle_crc); - return -EBADMSG; - } - if (do_datacrc && - (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && - con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { - pr_err("read_partial_message %p data crc %u != exp. %u\n", m, - con->in_data_crc, le32_to_cpu(m->footer.data_crc)); - return -EBADMSG; - } - - if (need_sign && con->ops->check_message_signature && - con->ops->check_message_signature(m)) { - pr_err("read_partial_message %p signature check failed\n", m); - return -EBADMSG; - } - - return 1; /* done! */ -} - /* * Process message. This happens in the worker thread. The callback should * be careful not to do anything that waits on other incoming messages or it @@ -2551,263 +1357,6 @@ void ceph_con_process_message(struct ceph_connection *con) mutex_lock(&con->mutex); } -static int read_keepalive_ack(struct ceph_connection *con) -{ - struct ceph_timespec ceph_ts; - size_t size = sizeof(ceph_ts); - int ret = read_partial(con, size, size, &ceph_ts); - if (ret <= 0) - return ret; - ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); - prepare_read_tag(con); - return 1; -} - -/* - * Write something to the socket. Called in a worker thread when the - * socket appears to be writeable and we have something ready to send. - */ -int ceph_con_v1_try_write(struct ceph_connection *con) -{ - int ret = 1; - - dout("try_write start %p state %d\n", con, con->state); - if (con->state != CEPH_CON_S_PREOPEN && - con->state != CEPH_CON_S_V1_BANNER && - con->state != CEPH_CON_S_V1_CONNECT_MSG && - con->state != CEPH_CON_S_OPEN) - return 0; - - /* open the socket first? */ - if (con->state == CEPH_CON_S_PREOPEN) { - BUG_ON(con->sock); - con->state = CEPH_CON_S_V1_BANNER; - - con_out_kvec_reset(con); - prepare_write_banner(con); - prepare_read_banner(con); - - BUG_ON(con->in_msg); - con->in_tag = CEPH_MSGR_TAG_READY; - dout("try_write initiating connect on %p new state %d\n", - con, con->state); - ret = ceph_tcp_connect(con); - if (ret < 0) { - con->error_msg = "connect error"; - goto out; - } - } - -more: - dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); - BUG_ON(!con->sock); - - /* kvec data queued? */ - if (con->out_kvec_left) { - ret = write_partial_kvec(con); - if (ret <= 0) - goto out; - } - if (con->out_skip) { - ret = write_partial_skip(con); - if (ret <= 0) - goto out; - } - - /* msg pages? */ - if (con->out_msg) { - if (con->out_msg_done) { - ceph_msg_put(con->out_msg); - con->out_msg = NULL; /* we're done with this one */ - goto do_next; - } - - ret = write_partial_message_data(con); - if (ret == 1) - goto more; /* we need to send the footer, too! */ - if (ret == 0) - goto out; - if (ret < 0) { - dout("try_write write_partial_message_data err %d\n", - ret); - goto out; - } - } - -do_next: - if (con->state == CEPH_CON_S_OPEN) { - if (ceph_con_flag_test_and_clear(con, - CEPH_CON_F_KEEPALIVE_PENDING)) { - prepare_write_keepalive(con); - goto more; - } - /* is anything else pending? */ - if (!list_empty(&con->out_queue)) { - prepare_write_message(con); - goto more; - } - if (con->in_seq > con->in_seq_acked) { - prepare_write_ack(con); - goto more; - } - } - - /* Nothing to do! */ - ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); - dout("try_write nothing else to write.\n"); - ret = 0; -out: - dout("try_write done on %p ret %d\n", con, ret); - return ret; -} - -/* - * Read what we can from the socket. - */ -int ceph_con_v1_try_read(struct ceph_connection *con) -{ - int ret = -1; - -more: - dout("try_read start %p state %d\n", con, con->state); - if (con->state != CEPH_CON_S_V1_BANNER && - con->state != CEPH_CON_S_V1_CONNECT_MSG && - con->state != CEPH_CON_S_OPEN) - return 0; - - BUG_ON(!con->sock); - - dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, - con->in_base_pos); - - if (con->state == CEPH_CON_S_V1_BANNER) { - ret = read_partial_banner(con); - if (ret <= 0) - goto out; - ret = process_banner(con); - if (ret < 0) - goto out; - - con->state = CEPH_CON_S_V1_CONNECT_MSG; - - /* - * Received banner is good, exchange connection info. - * Do not reset out_kvec, as sending our banner raced - * with receiving peer banner after connect completed. - */ - ret = prepare_write_connect(con); - if (ret < 0) - goto out; - prepare_read_connect(con); - - /* Send connection info before awaiting response */ - goto out; - } - - if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { - ret = read_partial_connect(con); - if (ret <= 0) - goto out; - ret = process_connect(con); - if (ret < 0) - goto out; - goto more; - } - - WARN_ON(con->state != CEPH_CON_S_OPEN); - - if (con->in_base_pos < 0) { - /* - * skipping + discarding content. - */ - ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); - if (ret <= 0) - goto out; - dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); - con->in_base_pos += ret; - if (con->in_base_pos) - goto more; - } - if (con->in_tag == CEPH_MSGR_TAG_READY) { - /* - * what's next? - */ - ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); - if (ret <= 0) - goto out; - dout("try_read got tag %d\n", (int)con->in_tag); - switch (con->in_tag) { - case CEPH_MSGR_TAG_MSG: - prepare_read_message(con); - break; - case CEPH_MSGR_TAG_ACK: - prepare_read_ack(con); - break; - case CEPH_MSGR_TAG_KEEPALIVE2_ACK: - prepare_read_keepalive_ack(con); - break; - case CEPH_MSGR_TAG_CLOSE: - ceph_con_close_socket(con); - con->state = CEPH_CON_S_CLOSED; - goto out; - default: - goto bad_tag; - } - } - if (con->in_tag == CEPH_MSGR_TAG_MSG) { - ret = read_partial_message(con); - if (ret <= 0) { - switch (ret) { - case -EBADMSG: - con->error_msg = "bad crc/signature"; - fallthrough; - case -EBADE: - ret = -EIO; - break; - case -EIO: - con->error_msg = "io error"; - break; - } - goto out; - } - if (con->in_tag == CEPH_MSGR_TAG_READY) - goto more; - ceph_con_process_message(con); - if (con->state == CEPH_CON_S_OPEN) - prepare_read_tag(con); - goto more; - } - if (con->in_tag == CEPH_MSGR_TAG_ACK || - con->in_tag == CEPH_MSGR_TAG_SEQ) { - /* - * the final handshake seq exchange is semantically - * equivalent to an ACK - */ - ret = read_partial_ack(con); - if (ret <= 0) - goto out; - process_ack(con); - goto more; - } - if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { - ret = read_keepalive_ack(con); - if (ret <= 0) - goto out; - goto more; - } - -out: - dout("try_read done on %p ret %d\n", con, ret); - return ret; - -bad_tag: - pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); - con->error_msg = "protocol error, garbage tag"; - ret = -1; - goto out; -} - - /* * Atomically queue work on a connection after the specified delay. * Bump @con reference to avoid races with connection teardown. @@ -3026,7 +1575,6 @@ static void con_fault(struct ceph_connection *con) } } - void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) { u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; @@ -3131,29 +1679,6 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) } EXPORT_SYMBOL(ceph_con_send); -void ceph_con_v1_revoke(struct ceph_connection *con) -{ - struct ceph_msg *msg = con->out_msg; - - WARN_ON(con->out_skip); - /* footer */ - if (con->out_msg_done) { - con->out_skip += con_out_kvec_skip(con); - } else { - WARN_ON(!msg->data_length); - con->out_skip += sizeof_footer(con); - } - /* data, middle, front */ - if (msg->data_length) - con->out_skip += msg->cursor.total_resid; - if (msg->middle) - con->out_skip += con_out_kvec_skip(con); - con->out_skip += con_out_kvec_skip(con); - - dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, - con->out_kvec_bytes, con->out_skip); -} - /* * Revoke a message that was previously queued for send */ @@ -3191,26 +1716,6 @@ void ceph_msg_revoke(struct ceph_msg *msg) mutex_unlock(&con->mutex); } -void ceph_con_v1_revoke_incoming(struct ceph_connection *con) -{ - unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); - unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); - unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); - - /* skip rest of message */ - con->in_base_pos = con->in_base_pos - - sizeof(struct ceph_msg_header) - - front_len - - middle_len - - data_len - - sizeof(struct ceph_msg_footer); - - con->in_tag = CEPH_MSGR_TAG_READY; - con->in_seq++; - - dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); -} - /* * Revoke a message that we may be reading data into */ diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c new file mode 100644 index 000000000000..899038a9678e --- /dev/null +++ b/net/ceph/messenger_v1.c @@ -0,0 +1,1502 @@ +// SPDX-License-Identifier: GPL-2.0 +#include <linux/ceph/ceph_debug.h> + +#include <linux/bvec.h> +#include <linux/crc32c.h> +#include <linux/net.h> +#include <linux/socket.h> +#include <net/sock.h> + +#include <linux/ceph/ceph_features.h> +#include <linux/ceph/decode.h> +#include <linux/ceph/libceph.h> +#include <linux/ceph/messenger.h> + +/* static tag bytes (protocol control messages) */ +static char tag_msg = CEPH_MSGR_TAG_MSG; +static char tag_ack = CEPH_MSGR_TAG_ACK; +static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; +static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; + +/* + * If @buf is NULL, discard up to @len bytes. + */ +static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) +{ + struct kvec iov = {buf, len}; + struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; + int r; + + if (!buf) + msg.msg_flags |= MSG_TRUNC; + + iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len); + r = sock_recvmsg(sock, &msg, msg.msg_flags); + if (r == -EAGAIN) + r = 0; + return r; +} + +static int ceph_tcp_recvpage(struct socket *sock, struct page *page, + int page_offset, size_t length) +{ + struct bio_vec bvec = { + .bv_page = page, + .bv_offset = page_offset, + .bv_len = length + }; + struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; + int r; + + BUG_ON(page_offset + length > PAGE_SIZE); + iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length); + r = sock_recvmsg(sock, &msg, msg.msg_flags); + if (r == -EAGAIN) + r = 0; + return r; +} + +/* + * write something. @more is true if caller will be sending more data + * shortly. + */ +static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, + size_t kvlen, size_t len, bool more) +{ + struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; + int r; + + if (more) + msg.msg_flags |= MSG_MORE; + else + msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ + + r = kernel_sendmsg(sock, &msg, iov, kvlen, len); + if (r == -EAGAIN) + r = 0; + return r; +} + +/* + * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST + */ +static int ceph_tcp_sendpage(struct socket *sock, struct page *page, + int offset, size_t size, int more) +{ + ssize_t (*sendpage)(struct socket *sock, struct page *page, + int offset, size_t size, int flags); + int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more; + int ret; + + /* + * sendpage cannot properly handle pages with page_count == 0, + * we need to fall back to sendmsg if that's the case. + * + * Same goes for slab pages: skb_can_coalesce() allows + * coalescing neighboring slab objects into a single frag which + * triggers one of hardened usercopy checks. + */ + if (sendpage_ok(page)) + sendpage = sock->ops->sendpage; + else + sendpage = sock_no_sendpage; + + ret = sendpage(sock, page, offset, size, flags); + if (ret == -EAGAIN) + ret = 0; + + return ret; +} + +static void con_out_kvec_reset(struct ceph_connection *con) +{ + BUG_ON(con->out_skip); + + con->out_kvec_left = 0; + con->out_kvec_bytes = 0; + con->out_kvec_cur = &con->out_kvec[0]; +} + +static void con_out_kvec_add(struct ceph_connection *con, + size_t size, void *data) +{ + int index = con->out_kvec_left; + + BUG_ON(con->out_skip); + BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); + + con->out_kvec[index].iov_len = size; + con->out_kvec[index].iov_base = data; + con->out_kvec_left++; + con->out_kvec_bytes += size; +} + +/* + * Chop off a kvec from the end. Return residual number of bytes for + * that kvec, i.e. how many bytes would have been written if the kvec + * hadn't been nuked. + */ +static int con_out_kvec_skip(struct ceph_connection *con) +{ + int off = con->out_kvec_cur - con->out_kvec; + int skip = 0; + + if (con->out_kvec_bytes > 0) { + skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; + BUG_ON(con->out_kvec_bytes < skip); + BUG_ON(!con->out_kvec_left); + con->out_kvec_bytes -= skip; + con->out_kvec_left--; + } + + return skip; +} + +static size_t sizeof_footer(struct ceph_connection *con) +{ + return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? + sizeof(struct ceph_msg_footer) : + sizeof(struct ceph_msg_footer_old); +} + +static void prepare_message_data(struct ceph_msg *msg, u32 data_len) +{ + /* Initialize data cursor */ + + ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); +} + +/* + * Prepare footer for currently outgoing message, and finish things + * off. Assumes out_kvec* are already valid.. we just add on to the end. + */ +static void prepare_write_message_footer(struct ceph_connection *con) +{ + struct ceph_msg *m = con->out_msg; + + m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; + + dout("prepare_write_message_footer %p\n", con); + con_out_kvec_add(con, sizeof_footer(con), &m->footer); + if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { + if (con->ops->sign_message) + con->ops->sign_message(m); + else + m->footer.sig = 0; + } else { + m->old_footer.flags = m->footer.flags; + } + con->out_more = m->more_to_follow; + con->out_msg_done = true; +} + +/* + * Prepare headers for the next outgoing message. + */ +static void prepare_write_message(struct ceph_connection *con) +{ + struct ceph_msg *m; + u32 crc; + + con_out_kvec_reset(con); + con->out_msg_done = false; + + /* Sneak an ack in there first? If we can get it into the same + * TCP packet that's a good thing. */ + if (con->in_seq > con->in_seq_acked) { + con->in_seq_acked = con->in_seq; + con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); + con->out_temp_ack = cpu_to_le64(con->in_seq_acked); + con_out_kvec_add(con, sizeof (con->out_temp_ack), + &con->out_temp_ack); + } + + ceph_con_get_out_msg(con); + m = con->out_msg; + + dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n", + m, con->out_seq, le16_to_cpu(m->hdr.type), + le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), + m->data_length); + WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len)); + WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len)); + + /* tag + hdr + front + middle */ + con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); + con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); + con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); + + if (m->middle) + con_out_kvec_add(con, m->middle->vec.iov_len, + m->middle->vec.iov_base); + + /* fill in hdr crc and finalize hdr */ + crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); + con->out_msg->hdr.crc = cpu_to_le32(crc); + memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); + + /* fill in front and middle crc, footer */ + crc = crc32c(0, m->front.iov_base, m->front.iov_len); + con->out_msg->footer.front_crc = cpu_to_le32(crc); + if (m->middle) { + crc = crc32c(0, m->middle->vec.iov_base, + m->middle->vec.iov_len); + con->out_msg->footer.middle_crc = cpu_to_le32(crc); + } else + con->out_msg->footer.middle_crc = 0; + dout("%s front_crc %u middle_crc %u\n", __func__, + le32_to_cpu(con->out_msg->footer.front_crc), + le32_to_cpu(con->out_msg->footer.middle_crc)); + con->out_msg->footer.flags = 0; + + /* is there a data payload? */ + con->out_msg->footer.data_crc = 0; + if (m->data_length) { + prepare_message_data(con->out_msg, m->data_length); + con->out_more = 1; /* data + footer will follow */ + } else { + /* no, queue up footer too and be done */ + prepare_write_message_footer(con); + } + + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); +} + +/* + * Prepare an ack. + */ +static void prepare_write_ack(struct ceph_connection *con) +{ + dout("prepare_write_ack %p %llu -> %llu\n", con, + con->in_seq_acked, con->in_seq); + con->in_seq_acked = con->in_seq; + + con_out_kvec_reset(con); + + con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); + + con->out_temp_ack = cpu_to_le64(con->in_seq_acked); + con_out_kvec_add(con, sizeof (con->out_temp_ack), + &con->out_temp_ack); + + con->out_more = 1; /* more will follow.. eventually.. */ + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); +} + +/* + * Prepare to share the seq during handshake + */ +static void prepare_write_seq(struct ceph_connection *con) +{ + dout("prepare_write_seq %p %llu -> %llu\n", con, + con->in_seq_acked, con->in_seq); + con->in_seq_acked = con->in_seq; + + con_out_kvec_reset(con); + + con->out_temp_ack = cpu_to_le64(con->in_seq_acked); + con_out_kvec_add(con, sizeof (con->out_temp_ack), + &con->out_temp_ack); + + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); +} + +/* + * Prepare to write keepalive byte. + */ +static void prepare_write_keepalive(struct ceph_connection *con) +{ + dout("prepare_write_keepalive %p\n", con); + con_out_kvec_reset(con); + if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { + struct timespec64 now; + + ktime_get_real_ts64(&now); + con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); + ceph_encode_timespec64(&con->out_temp_keepalive2, &now); + con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), + &con->out_temp_keepalive2); + } else { + con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); + } + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); +} + +/* + * Connection negotiation. + */ + +static int get_connect_authorizer(struct ceph_connection *con) +{ + struct ceph_auth_handshake *auth; + int auth_proto; + + if (!con->ops->get_authorizer) { + con->auth = NULL; + con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; + con->out_connect.authorizer_len = 0; + return 0; + } + + auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); + if (IS_ERR(auth)) + return PTR_ERR(auth); + + con->auth = auth; + con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); + con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); + return 0; +} + +/* + * We connected to a peer and are saying hello. + */ +static void prepare_write_banner(struct ceph_connection *con) +{ + con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); + con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), + &con->msgr->my_enc_addr); + + con->out_more = 0; + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); +} + +static void __prepare_write_connect(struct ceph_connection *con) +{ + con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); + if (con->auth) + con_out_kvec_add(con, con->auth->authorizer_buf_len, + con->auth->authorizer_buf); + + con->out_more = 0; + ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); +} + +static int prepare_write_connect(struct ceph_connection *con) +{ + unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); + int proto; + int ret; + + switch (con->peer_name.type) { + case CEPH_ENTITY_TYPE_MON: + proto = CEPH_MONC_PROTOCOL; + break; + case CEPH_ENTITY_TYPE_OSD: + proto = CEPH_OSDC_PROTOCOL; + break; + case CEPH_ENTITY_TYPE_MDS: + proto = CEPH_MDSC_PROTOCOL; + break; + default: + BUG(); + } + + dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, + con->connect_seq, global_seq, proto); + + con->out_connect.features = + cpu_to_le64(from_msgr(con->msgr)->supported_features); + con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); + con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); + con->out_connect.global_seq = cpu_to_le32(global_seq); + con->out_connect.protocol_version = cpu_to_le32(proto); + con->out_connect.flags = 0; + + ret = get_connect_authorizer(con); + if (ret) + return ret; + + __prepare_write_connect(con); + return 0; +} + +/* + * write as much of pending kvecs to the socket as we can. + * 1 -> done + * 0 -> socket full, but more to do + * <0 -> error + */ +static int write_partial_kvec(struct ceph_connection *con) +{ + int ret; + + dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); + while (con->out_kvec_bytes > 0) { + ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, + con->out_kvec_left, con->out_kvec_bytes, + con->out_more); + if (ret <= 0) + goto out; + con->out_kvec_bytes -= ret; + if (con->out_kvec_bytes == 0) + break; /* done */ + + /* account for full iov entries consumed */ + while (ret >= con->out_kvec_cur->iov_len) { + BUG_ON(!con->out_kvec_left); + ret -= con->out_kvec_cur->iov_len; + con->out_kvec_cur++; + con->out_kvec_left--; + } + /* and for a partially-consumed entry */ + if (ret) { + con->out_kvec_cur->iov_len -= ret; + con->out_kvec_cur->iov_base += ret; + } + } + con->out_kvec_left = 0; + ret = 1; +out: + dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, + con->out_kvec_bytes, con->out_kvec_left, ret); + return ret; /* done! */ +} + +/* + * Write as much message data payload as we can. If we finish, queue + * up the footer. + * 1 -> done, footer is now queued in out_kvec[]. + * 0 -> socket full, but more to do + * <0 -> error + */ +static int write_partial_message_data(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->out_msg; + struct ceph_msg_data_cursor *cursor = &msg->cursor; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); + int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; + u32 crc; + + dout("%s %p msg %p\n", __func__, con, msg); + + if (!msg->num_data_items) + return -EINVAL; + + /* + * Iterate through each page that contains data to be + * written, and send as much as possible for each. + * + * If we are calculating the data crc (the default), we will + * need to map the page. If we have no pages, they have + * been revoked, so use the zero page. + */ + crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0; + while (cursor->total_resid) { + struct page *page; + size_t page_offset; + size_t length; + int ret; + + if (!cursor->resid) { + ceph_msg_data_advance(cursor, 0); + continue; + } + + page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); + if (length == cursor->total_resid) + more = MSG_MORE; + ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, + more); + if (ret <= 0) { + if (do_datacrc) + msg->footer.data_crc = cpu_to_le32(crc); + + return ret; + } + if (do_datacrc && cursor->need_crc) + crc = ceph_crc32c_page(crc, page, page_offset, length); + ceph_msg_data_advance(cursor, (size_t)ret); + } + + dout("%s %p msg %p done\n", __func__, con, msg); + + /* prepare and queue up footer, too */ + if (do_datacrc) + msg->footer.data_crc = cpu_to_le32(crc); + else + msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; + con_out_kvec_reset(con); + prepare_write_message_footer(con); + + return 1; /* must return > 0 to indicate success */ +} + +/* + * write some zeros + */ +static int write_partial_skip(struct ceph_connection *con) +{ + int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; + int ret; + + dout("%s %p %d left\n", __func__, con, con->out_skip); + while (con->out_skip > 0) { + size_t size = min(con->out_skip, (int) PAGE_SIZE); + + if (size == con->out_skip) + more = MSG_MORE; + ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, + more); + if (ret <= 0) + goto out; + con->out_skip -= ret; + } + ret = 1; +out: + return ret; +} + +/* + * Prepare to read connection handshake, or an ack. + */ +static void prepare_read_banner(struct ceph_connection *con) +{ + dout("prepare_read_banner %p\n", con); + con->in_base_pos = 0; +} + +static void prepare_read_connect(struct ceph_connection *con) +{ + dout("prepare_read_connect %p\n", con); + con->in_base_pos = 0; +} + +static void prepare_read_ack(struct ceph_connection *con) +{ + dout("prepare_read_ack %p\n", con); + con->in_base_pos = 0; +} + +static void prepare_read_seq(struct ceph_connection *con) +{ + dout("prepare_read_seq %p\n", con); + con->in_base_pos = 0; + con->in_tag = CEPH_MSGR_TAG_SEQ; +} + +static void prepare_read_tag(struct ceph_connection *con) +{ + dout("prepare_read_tag %p\n", con); + con->in_base_pos = 0; + con->in_tag = CEPH_MSGR_TAG_READY; +} + +static void prepare_read_keepalive_ack(struct ceph_connection *con) +{ + dout("prepare_read_keepalive_ack %p\n", con); + con->in_base_pos = 0; +} + +/* + * Prepare to read a message. + */ +static int prepare_read_message(struct ceph_connection *con) +{ + dout("prepare_read_message %p\n", con); + BUG_ON(con->in_msg != NULL); + con->in_base_pos = 0; + con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; + return 0; +} + +static int read_partial(struct ceph_connection *con, + int end, int size, void *object) +{ + while (con->in_base_pos < end) { + int left = end - con->in_base_pos; + int have = size - left; + int ret = ceph_tcp_recvmsg(con->sock, object + have, left); + if (ret <= 0) + return ret; + con->in_base_pos += ret; + } + return 1; +} + +/* + * Read all or part of the connect-side handshake on a new connection + */ +static int read_partial_banner(struct ceph_connection *con) +{ + int size; + int end; + int ret; + + dout("read_partial_banner %p at %d\n", con, con->in_base_pos); + + /* peer's banner */ + size = strlen(CEPH_BANNER); + end = size; + ret = read_partial(con, end, size, con->in_banner); + if (ret <= 0) + goto out; + + size = sizeof (con->actual_peer_addr); + end += size; + ret = read_partial(con, end, size, &con->actual_peer_addr); + if (ret <= 0) + goto out; + ceph_decode_banner_addr(&con->actual_peer_addr); + + size = sizeof (con->peer_addr_for_me); + end += size; + ret = read_partial(con, end, size, &con->peer_addr_for_me); + if (ret <= 0) + goto out; + ceph_decode_banner_addr(&con->peer_addr_for_me); + +out: + return ret; +} + +static int read_partial_connect(struct ceph_connection *con) +{ + int size; + int end; + int ret; + + dout("read_partial_connect %p at %d\n", con, con->in_base_pos); + + size = sizeof (con->in_reply); + end = size; + ret = read_partial(con, end, size, &con->in_reply); + if (ret <= 0) + goto out; + + if (con->auth) { + size = le32_to_cpu(con->in_reply.authorizer_len); + if (size > con->auth->authorizer_reply_buf_len) { + pr_err("authorizer reply too big: %d > %zu\n", size, + con->auth->authorizer_reply_buf_len); + ret = -EINVAL; + goto out; + } + + end += size; + ret = read_partial(con, end, size, + con->auth->authorizer_reply_buf); + if (ret <= 0) + goto out; + } + + dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", + con, (int)con->in_reply.tag, + le32_to_cpu(con->in_reply.connect_seq), + le32_to_cpu(con->in_reply.global_seq)); +out: + return ret; +} + +/* + * Verify the hello banner looks okay. + */ +static int verify_hello(struct ceph_connection *con) +{ + if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + pr_err("connect to %s got bad banner\n", + ceph_pr_addr(&con->peer_addr)); + con->error_msg = "protocol error, bad banner"; + return -1; + } + return 0; +} + +static int process_banner(struct ceph_connection *con) +{ + struct ceph_entity_addr *my_addr = &con->msgr->inst.addr; + + dout("process_banner on %p\n", con); + + if (verify_hello(con) < 0) + return -1; + + /* + * Make sure the other end is who we wanted. note that the other + * end may not yet know their ip address, so if it's 0.0.0.0, give + * them the benefit of the doubt. + */ + if (memcmp(&con->peer_addr, &con->actual_peer_addr, + sizeof(con->peer_addr)) != 0 && + !(ceph_addr_is_blank(&con->actual_peer_addr) && + con->actual_peer_addr.nonce == con->peer_addr.nonce)) { + pr_warn("wrong peer, want %s/%u, got %s/%u\n", + ceph_pr_addr(&con->peer_addr), + le32_to_cpu(con->peer_addr.nonce), + ceph_pr_addr(&con->actual_peer_addr), + le32_to_cpu(con->actual_peer_addr.nonce)); + con->error_msg = "wrong peer at address"; + return -1; + } + + /* + * did we learn our address? + */ + if (ceph_addr_is_blank(my_addr)) { + memcpy(&my_addr->in_addr, + &con->peer_addr_for_me.in_addr, + sizeof(con->peer_addr_for_me.in_addr)); + ceph_addr_set_port(my_addr, 0); + ceph_encode_my_addr(con->msgr); + dout("process_banner learned my addr is %s\n", + ceph_pr_addr(my_addr)); + } + + return 0; +} + +static int process_connect(struct ceph_connection *con) +{ + u64 sup_feat = from_msgr(con->msgr)->supported_features; + u64 req_feat = from_msgr(con->msgr)->required_features; + u64 server_feat = le64_to_cpu(con->in_reply.features); + int ret; + + dout("process_connect on %p tag %d\n", con, (int)con->in_tag); + + if (con->auth) { + int len = le32_to_cpu(con->in_reply.authorizer_len); + + /* + * Any connection that defines ->get_authorizer() + * should also define ->add_authorizer_challenge() and + * ->verify_authorizer_reply(). + * + * See get_connect_authorizer(). + */ + if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { + ret = con->ops->add_authorizer_challenge( + con, con->auth->authorizer_reply_buf, len); + if (ret < 0) + return ret; + + con_out_kvec_reset(con); + __prepare_write_connect(con); + prepare_read_connect(con); + return 0; + } + + if (len) { + ret = con->ops->verify_authorizer_reply(con); + if (ret < 0) { + con->error_msg = "bad authorize reply"; + return ret; + } + } + } + + switch (con->in_reply.tag) { + case CEPH_MSGR_TAG_FEATURES: + pr_err("%s%lld %s feature set mismatch," + " my %llx < server's %llx, missing %llx\n", + ENTITY_NAME(con->peer_name), + ceph_pr_addr(&con->peer_addr), + sup_feat, server_feat, server_feat & ~sup_feat); + con->error_msg = "missing required protocol features"; + return -1; + + case CEPH_MSGR_TAG_BADPROTOVER: + pr_err("%s%lld %s protocol version mismatch," + " my %d != server's %d\n", + ENTITY_NAME(con->peer_name), + ceph_pr_addr(&con->peer_addr), + le32_to_cpu(con->out_connect.protocol_version), + le32_to_cpu(con->in_reply.protocol_version)); + con->error_msg = "protocol version mismatch"; + return -1; + + case CEPH_MSGR_TAG_BADAUTHORIZER: + con->auth_retry++; + dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, + con->auth_retry); + if (con->auth_retry == 2) { + con->error_msg = "connect authorization failure"; + return -1; + } + con_out_kvec_reset(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; + prepare_read_connect(con); + break; + + case CEPH_MSGR_TAG_RESETSESSION: + /* + * If we connected with a large connect_seq but the peer + * has no record of a session with us (no connection, or + * connect_seq == 0), they will send RESETSESION to indicate + * that they must have reset their session, and may have + * dropped messages. + */ + dout("process_connect got RESET peer seq %u\n", + le32_to_cpu(con->in_reply.connect_seq)); + pr_info("%s%lld %s session reset\n", + ENTITY_NAME(con->peer_name), + ceph_pr_addr(&con->peer_addr)); + ceph_con_reset_session(con); + con_out_kvec_reset(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; + prepare_read_connect(con); + + /* Tell ceph about it. */ + mutex_unlock(&con->mutex); + if (con->ops->peer_reset) + con->ops->peer_reset(con); + mutex_lock(&con->mutex); + if (con->state != CEPH_CON_S_V1_CONNECT_MSG) + return -EAGAIN; + break; + + case CEPH_MSGR_TAG_RETRY_SESSION: + /* + * If we sent a smaller connect_seq than the peer has, try + * again with a larger value. + */ + dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", + le32_to_cpu(con->out_connect.connect_seq), + le32_to_cpu(con->in_reply.connect_seq)); + con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); + con_out_kvec_reset(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; + prepare_read_connect(con); + break; + + case CEPH_MSGR_TAG_RETRY_GLOBAL: + /* + * If we sent a smaller global_seq than the peer has, try + * again with a larger value. + */ + dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", + con->peer_global_seq, + le32_to_cpu(con->in_reply.global_seq)); + ceph_get_global_seq(con->msgr, + le32_to_cpu(con->in_reply.global_seq)); + con_out_kvec_reset(con); + ret = prepare_write_connect(con); + if (ret < 0) + return ret; + prepare_read_connect(con); + break; + + case CEPH_MSGR_TAG_SEQ: + case CEPH_MSGR_TAG_READY: + if (req_feat & ~server_feat) { + pr_err("%s%lld %s protocol feature mismatch," + " my required %llx > server's %llx, need %llx\n", + ENTITY_NAME(con->peer_name), + ceph_pr_addr(&con->peer_addr), + req_feat, server_feat, req_feat & ~server_feat); + con->error_msg = "missing required protocol features"; + return -1; + } + + WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); + con->state = CEPH_CON_S_OPEN; + con->auth_retry = 0; /* we authenticated; clear flag */ + con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); + con->connect_seq++; + con->peer_features = server_feat; + dout("process_connect got READY gseq %d cseq %d (%d)\n", + con->peer_global_seq, + le32_to_cpu(con->in_reply.connect_seq), + con->connect_seq); + WARN_ON(con->connect_seq != + le32_to_cpu(con->in_reply.connect_seq)); + + if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) + ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); + + con->delay = 0; /* reset backoff memory */ + + if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { + prepare_write_seq(con); + prepare_read_seq(con); + } else { + prepare_read_tag(con); + } + break; + + case CEPH_MSGR_TAG_WAIT: + /* + * If there is a connection race (we are opening + * connections to each other), one of us may just have + * to WAIT. This shouldn't happen if we are the + * client. + */ + con->error_msg = "protocol error, got WAIT as client"; + return -1; + + default: + con->error_msg = "protocol error, garbage tag during connect"; + return -1; + } + return 0; +} + +/* + * read (part of) an ack + */ +static int read_partial_ack(struct ceph_connection *con) +{ + int size = sizeof (con->in_temp_ack); + int end = size; + + return read_partial(con, end, size, &con->in_temp_ack); +} + +/* + * We can finally discard anything that's been acked. + */ +static void process_ack(struct ceph_connection *con) +{ + u64 ack = le64_to_cpu(con->in_temp_ack); + + if (con->in_tag == CEPH_MSGR_TAG_ACK) + ceph_con_discard_sent(con, ack); + else + ceph_con_discard_requeued(con, ack); + + prepare_read_tag(con); +} + +static int read_partial_message_section(struct ceph_connection *con, + struct kvec *section, + unsigned int sec_len, u32 *crc) +{ + int ret, left; + + BUG_ON(!section); + + while (section->iov_len < sec_len) { + BUG_ON(section->iov_base == NULL); + left = sec_len - section->iov_len; + ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + + section->iov_len, left); + if (ret <= 0) + return ret; + section->iov_len += ret; + } + if (section->iov_len == sec_len) + *crc = crc32c(0, section->iov_base, section->iov_len); + + return 1; +} + +static int read_partial_msg_data(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->in_msg; + struct ceph_msg_data_cursor *cursor = &msg->cursor; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); + struct page *page; + size_t page_offset; + size_t length; + u32 crc = 0; + int ret; + + if (!msg->num_data_items) + return -EIO; + + if (do_datacrc) + crc = con->in_data_crc; + while (cursor->total_resid) { + if (!cursor->resid) { + ceph_msg_data_advance(cursor, 0); + continue; + } + + page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); + ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); + if (ret <= 0) { + if (do_datacrc) + con->in_data_crc = crc; + + return ret; + } + + if (do_datacrc) + crc = ceph_crc32c_page(crc, page, page_offset, ret); + ceph_msg_data_advance(cursor, (size_t)ret); + } + if (do_datacrc) + con->in_data_crc = crc; + + return 1; /* must return > 0 to indicate success */ +} + +/* + * read (part of) a message. + */ +static int read_partial_message(struct ceph_connection *con) +{ + struct ceph_msg *m = con->in_msg; + int size; + int end; + int ret; + unsigned int front_len, middle_len, data_len; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); + bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); + u64 seq; + u32 crc; + + dout("read_partial_message con %p msg %p\n", con, m); + + /* header */ + size = sizeof (con->in_hdr); + end = size; + ret = read_partial(con, end, size, &con->in_hdr); + if (ret <= 0) + return ret; + + crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); + if (cpu_to_le32(crc) != con->in_hdr.crc) { + pr_err("read_partial_message bad hdr crc %u != expected %u\n", + crc, con->in_hdr.crc); + return -EBADMSG; + } + + front_len = le32_to_cpu(con->in_hdr.front_len); + if (front_len > CEPH_MSG_MAX_FRONT_LEN) + return -EIO; + middle_len = le32_to_cpu(con->in_hdr.middle_len); + if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) + return -EIO; + data_len = le32_to_cpu(con->in_hdr.data_len); + if (data_len > CEPH_MSG_MAX_DATA_LEN) + return -EIO; + + /* verify seq# */ + seq = le64_to_cpu(con->in_hdr.seq); + if ((s64)seq - (s64)con->in_seq < 1) { + pr_info("skipping %s%lld %s seq %lld expected %lld\n", + ENTITY_NAME(con->peer_name), + ceph_pr_addr(&con->peer_addr), + seq, con->in_seq + 1); + con->in_base_pos = -front_len - middle_len - data_len - + sizeof_footer(con); + con->in_tag = CEPH_MSGR_TAG_READY; + return 1; + } else if ((s64)seq - (s64)con->in_seq > 1) { + pr_err("read_partial_message bad seq %lld expected %lld\n", + seq, con->in_seq + 1); + con->error_msg = "bad message sequence # for incoming message"; + return -EBADE; + } + + /* allocate message? */ + if (!con->in_msg) { + int skip = 0; + + dout("got hdr type %d front %d data %d\n", con->in_hdr.type, + front_len, data_len); + ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip); + if (ret < 0) + return ret; + + BUG_ON(!con->in_msg ^ skip); + if (skip) { + /* skip this message */ + dout("alloc_msg said skip message\n"); + con->in_base_pos = -front_len - middle_len - data_len - + sizeof_footer(con); + con->in_tag = CEPH_MSGR_TAG_READY; + con->in_seq++; + return 1; + } + + BUG_ON(!con->in_msg); + BUG_ON(con->in_msg->con != con); + m = con->in_msg; + m->front.iov_len = 0; /* haven't read it yet */ + if (m->middle) + m->middle->vec.iov_len = 0; + + /* prepare for data payload, if any */ + + if (data_len) + prepare_message_data(con->in_msg, data_len); + } + + /* front */ + ret = read_partial_message_section(con, &m->front, front_len, + &con->in_front_crc); + if (ret <= 0) + return ret; + + /* middle */ + if (m->middle) { + ret = read_partial_message_section(con, &m->middle->vec, + middle_len, + &con->in_middle_crc); + if (ret <= 0) + return ret; + } + + /* (page) data */ + if (data_len) { + ret = read_partial_msg_data(con); + if (ret <= 0) + return ret; + } + + /* footer */ + size = sizeof_footer(con); + end += size; + ret = read_partial(con, end, size, &m->footer); + if (ret <= 0) + return ret; + + if (!need_sign) { + m->footer.flags = m->old_footer.flags; + m->footer.sig = 0; + } + + dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", + m, front_len, m->footer.front_crc, middle_len, + m->footer.middle_crc, data_len, m->footer.data_crc); + + /* crc ok? */ + if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { + pr_err("read_partial_message %p front crc %u != exp. %u\n", + m, con->in_front_crc, m->footer.front_crc); + return -EBADMSG; + } + if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { + pr_err("read_partial_message %p middle crc %u != exp %u\n", + m, con->in_middle_crc, m->footer.middle_crc); + return -EBADMSG; + } + if (do_datacrc && + (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && + con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { + pr_err("read_partial_message %p data crc %u != exp. %u\n", m, + con->in_data_crc, le32_to_cpu(m->footer.data_crc)); + return -EBADMSG; + } + + if (need_sign && con->ops->check_message_signature && + con->ops->check_message_signature(m)) { + pr_err("read_partial_message %p signature check failed\n", m); + return -EBADMSG; + } + + return 1; /* done! */ +} + +static int read_keepalive_ack(struct ceph_connection *con) +{ + struct ceph_timespec ceph_ts; + size_t size = sizeof(ceph_ts); + int ret = read_partial(con, size, size, &ceph_ts); + if (ret <= 0) + return ret; + ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); + prepare_read_tag(con); + return 1; +} + +/* + * Read what we can from the socket. + */ +int ceph_con_v1_try_read(struct ceph_connection *con) +{ + int ret = -1; + +more: + dout("try_read start %p state %d\n", con, con->state); + if (con->state != CEPH_CON_S_V1_BANNER && + con->state != CEPH_CON_S_V1_CONNECT_MSG && + con->state != CEPH_CON_S_OPEN) + return 0; + + BUG_ON(!con->sock); + + dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, + con->in_base_pos); + + if (con->state == CEPH_CON_S_V1_BANNER) { + ret = read_partial_banner(con); + if (ret <= 0) + goto out; + ret = process_banner(con); + if (ret < 0) + goto out; + + con->state = CEPH_CON_S_V1_CONNECT_MSG; + + /* + * Received banner is good, exchange connection info. + * Do not reset out_kvec, as sending our banner raced + * with receiving peer banner after connect completed. + */ + ret = prepare_write_connect(con); + if (ret < 0) + goto out; + prepare_read_connect(con); + + /* Send connection info before awaiting response */ + goto out; + } + + if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { + ret = read_partial_connect(con); + if (ret <= 0) + goto out; + ret = process_connect(con); + if (ret < 0) + goto out; + goto more; + } + + WARN_ON(con->state != CEPH_CON_S_OPEN); + + if (con->in_base_pos < 0) { + /* + * skipping + discarding content. + */ + ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); + if (ret <= 0) + goto out; + dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); + con->in_base_pos += ret; + if (con->in_base_pos) + goto more; + } + if (con->in_tag == CEPH_MSGR_TAG_READY) { + /* + * what's next? + */ + ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); + if (ret <= 0) + goto out; + dout("try_read got tag %d\n", (int)con->in_tag); + switch (con->in_tag) { + case CEPH_MSGR_TAG_MSG: + prepare_read_message(con); + break; + case CEPH_MSGR_TAG_ACK: + prepare_read_ack(con); + break; + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + prepare_read_keepalive_ack(con); + break; + case CEPH_MSGR_TAG_CLOSE: + ceph_con_close_socket(con); + con->state = CEPH_CON_S_CLOSED; + goto out; + default: + goto bad_tag; + } + } + if (con->in_tag == CEPH_MSGR_TAG_MSG) { + ret = read_partial_message(con); + if (ret <= 0) { + switch (ret) { + case -EBADMSG: + con->error_msg = "bad crc/signature"; + fallthrough; + case -EBADE: + ret = -EIO; + break; + case -EIO: + con->error_msg = "io error"; + break; + } + goto out; + } + if (con->in_tag == CEPH_MSGR_TAG_READY) + goto more; + ceph_con_process_message(con); + if (con->state == CEPH_CON_S_OPEN) + prepare_read_tag(con); + goto more; + } + if (con->in_tag == CEPH_MSGR_TAG_ACK || + con->in_tag == CEPH_MSGR_TAG_SEQ) { + /* + * the final handshake seq exchange is semantically + * equivalent to an ACK + */ + ret = read_partial_ack(con); + if (ret <= 0) + goto out; + process_ack(con); + goto more; + } + if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ret = read_keepalive_ack(con); + if (ret <= 0) + goto out; + goto more; + } + +out: + dout("try_read done on %p ret %d\n", con, ret); + return ret; + +bad_tag: + pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); + con->error_msg = "protocol error, garbage tag"; + ret = -1; + goto out; +} + +/* + * Write something to the socket. Called in a worker thread when the + * socket appears to be writeable and we have something ready to send. + */ +int ceph_con_v1_try_write(struct ceph_connection *con) +{ + int ret = 1; + + dout("try_write start %p state %d\n", con, con->state); + if (con->state != CEPH_CON_S_PREOPEN && + con->state != CEPH_CON_S_V1_BANNER && + con->state != CEPH_CON_S_V1_CONNECT_MSG && + con->state != CEPH_CON_S_OPEN) + return 0; + + /* open the socket first? */ + if (con->state == CEPH_CON_S_PREOPEN) { + BUG_ON(con->sock); + con->state = CEPH_CON_S_V1_BANNER; + + con_out_kvec_reset(con); + prepare_write_banner(con); + prepare_read_banner(con); + + BUG_ON(con->in_msg); + con->in_tag = CEPH_MSGR_TAG_READY; + dout("try_write initiating connect on %p new state %d\n", + con, con->state); + ret = ceph_tcp_connect(con); + if (ret < 0) { + con->error_msg = "connect error"; + goto out; + } + } + +more: + dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); + BUG_ON(!con->sock); + + /* kvec data queued? */ + if (con->out_kvec_left) { + ret = write_partial_kvec(con); + if (ret <= 0) + goto out; + } + if (con->out_skip) { + ret = write_partial_skip(con); + if (ret <= 0) + goto out; + } + + /* msg pages? */ + if (con->out_msg) { + if (con->out_msg_done) { + ceph_msg_put(con->out_msg); + con->out_msg = NULL; /* we're done with this one */ + goto do_next; + } + + ret = write_partial_message_data(con); + if (ret == 1) + goto more; /* we need to send the footer, too! */ + if (ret == 0) + goto out; + if (ret < 0) { + dout("try_write write_partial_message_data err %d\n", + ret); + goto out; + } + } + +do_next: + if (con->state == CEPH_CON_S_OPEN) { + if (ceph_con_flag_test_and_clear(con, + CEPH_CON_F_KEEPALIVE_PENDING)) { + prepare_write_keepalive(con); + goto more; + } + /* is anything else pending? */ + if (!list_empty(&con->out_queue)) { + prepare_write_message(con); + goto more; + } + if (con->in_seq > con->in_seq_acked) { + prepare_write_ack(con); + goto more; + } + } + + /* Nothing to do! */ + ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); + dout("try_write nothing else to write.\n"); + ret = 0; +out: + dout("try_write done on %p ret %d\n", con, ret); + return ret; +} + +void ceph_con_v1_revoke(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->out_msg; + + WARN_ON(con->out_skip); + /* footer */ + if (con->out_msg_done) { + con->out_skip += con_out_kvec_skip(con); + } else { + WARN_ON(!msg->data_length); + con->out_skip += sizeof_footer(con); + } + /* data, middle, front */ + if (msg->data_length) + con->out_skip += msg->cursor.total_resid; + if (msg->middle) + con->out_skip += con_out_kvec_skip(con); + con->out_skip += con_out_kvec_skip(con); + + dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, + con->out_kvec_bytes, con->out_skip); +} + +void ceph_con_v1_revoke_incoming(struct ceph_connection *con) +{ + unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); + unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); + unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); + + /* skip rest of message */ + con->in_base_pos = con->in_base_pos - + sizeof(struct ceph_msg_header) - + front_len - + middle_len - + data_len - + sizeof(struct ceph_msg_footer); + + con->in_tag = CEPH_MSGR_TAG_READY; + con->in_seq++; + + dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); +} + +bool ceph_con_v1_opened(struct ceph_connection *con) +{ + return con->connect_seq; +} + +void ceph_con_v1_reset_session(struct ceph_connection *con) +{ + con->connect_seq = 0; + con->peer_global_seq = 0; +} + +void ceph_con_v1_reset_protocol(struct ceph_connection *con) +{ + con->out_skip = 0; +} |