From 7971bd92baf729fcebe04d7330ac22dc668d0261 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 1 May 2013 21:15:58 -0700 Subject: ceph: revert commit 22cddde104 commit 22cddde104 breaks the atomicity of write operations; it also introduces a deadlock between write and truncate. Signed-off-by: Yan, Zheng Reviewed-by: Greg Farnum Conflicts: fs/ceph/addr.c --- fs/ceph/addr.c | 51 ++++----------------------------------------------- 1 file changed, 4 insertions(+), 47 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index a60ea977af6f..2a571fb4803b 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1067,51 +1067,23 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, struct page **pagep, void **fsdata) { struct inode *inode = file_inode(file); - struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_file_info *fi = file->private_data; struct page *page; pgoff_t index = pos >> PAGE_CACHE_SHIFT; - int r, want, got = 0; - - if (fi->fmode & CEPH_FILE_MODE_LAZY) - want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; - else - want = CEPH_CAP_FILE_BUFFER; - - dout("write_begin %p %llx.%llx %llu~%u getting caps. 
i_size %llu\n", - inode, ceph_vinop(inode), pos, len, inode->i_size); - r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len); - if (r < 0) - return r; - dout("write_begin %p %llx.%llx %llu~%u got cap refs on %s\n", - inode, ceph_vinop(inode), pos, len, ceph_cap_string(got)); - if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) { - ceph_put_cap_refs(ci, got); - return -EAGAIN; - } + int r; do { /* get a page */ page = grab_cache_page_write_begin(mapping, index, 0); - if (!page) { - r = -ENOMEM; - break; - } + if (!page) + return -ENOMEM; + *pagep = page; dout("write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len); r = ceph_update_writeable_page(file, pos, len, page); - if (r) - page_cache_release(page); } while (r == -EAGAIN); - if (r) { - ceph_put_cap_refs(ci, got); - } else { - *pagep = page; - *(int *)fsdata = got; - } return r; } @@ -1125,12 +1097,10 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, struct page *page, void *fsdata) { struct inode *inode = file_inode(file); - struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_mds_client *mdsc = fsc->mdsc; unsigned from = pos & (PAGE_CACHE_SIZE - 1); int check_cap = 0; - int got = (unsigned long)fsdata; dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, inode, page, (int)pos, (int)copied, (int)len); @@ -1153,19 +1123,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, up_read(&mdsc->snap_rwsem); page_cache_release(page); - if (copied > 0) { - int dirty; - spin_lock(&ci->i_ceph_lock); - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); - spin_unlock(&ci->i_ceph_lock); - if (dirty) - __mark_inode_dirty(inode, dirty); - } - - dout("write_end %p %llx.%llx %llu~%u dropping cap refs on %s\n", - inode, ceph_vinop(inode), pos, len, ceph_cap_string(got)); - ceph_put_cap_refs(ci, got); - if (check_cap) ceph_check_caps(ceph_inode(inode), 
CHECK_CAPS_AUTHONLY, NULL); -- cgit v1.2.3 From cf7b7e1492e97dd0c44479239742eb4cb752eeed Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Mar 2013 18:00:15 -0600 Subject: ceph: use calc_pages_for() in start_read() There's a spot that computes the number of pages to allocate for a page-aligned length by just shifting it. Use calc_pages_for() instead, to be consistent with usage everywhere else. The result is the same. The reason for this is to make it clearer in an upcoming patch that this calculation is duplicated. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 2a571fb4803b..e53f24b15b12 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -314,7 +314,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) return PTR_ERR(req); /* build page vector */ - nr_pages = len >> PAGE_CACHE_SHIFT; + nr_pages = calc_pages_for(0, len); pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); ret = -ENOMEM; if (!pages) -- cgit v1.2.3 From 153e5167e0e237faaefb7adf82db5748c1452d73 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 1 Mar 2013 18:00:15 -0600 Subject: libceph: don't assign page info in ceph_osdc_new_request() Currently ceph_osdc_new_request() assigns an osd request's r_num_pages and r_alignment fields. The only thing it does after that is call ceph_osdc_build_request(), and that doesn't need those fields to be assigned. Move the assignment of those fields out of ceph_osdc_new_request() and into its caller. As a result, the page_align parameter is no longer used, so get rid of it. Note that in ceph_sync_write(), the value for req->r_num_pages had already been calculated earlier (as num_pages, and fortunately it was computed the same way). So don't bother recomputing it, but because it's not needed earlier, move that calculation after the call to ceph_osdc_new_request(). 
Hold off making the assignment to r_alignment, doing it instead when r_pages and r_num_pages are getting set. Similarly, in start_read(), nr_pages already holds the number of pages in the array (and is calculated the same way), so there's no need to recompute it. Move the assignment of the page alignment down with the others there as well. This and the next few patches are preparation work for: http://tracker.ceph.com/issues/4127 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 7 +++++-- fs/ceph/file.c | 9 +++++---- include/linux/ceph/osd_client.h | 2 +- net/ceph/osd_client.c | 19 ++++++++----------- 4 files changed, 19 insertions(+), 18 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index e53f24b15b12..e324222acc82 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -309,7 +309,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, ci->i_truncate_seq, ci->i_truncate_size, - NULL, false, 0); + NULL, false); if (IS_ERR(req)) return PTR_ERR(req); @@ -338,6 +338,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } req->r_pages = pages; req->r_num_pages = nr_pages; + req->r_page_alignment = 0; req->r_callback = finish_read; req->r_inode = inode; @@ -820,7 +821,7 @@ get_more_pages: snapc, do_sync, ci->i_truncate_seq, ci->i_truncate_size, - &inode->i_mtime, true, 0); + &inode->i_mtime, true); if (IS_ERR(req)) { rc = PTR_ERR(req); @@ -828,6 +829,8 @@ get_more_pages: break; } + req->r_num_pages = calc_pages_for(0, len); + req->r_page_alignment = 0; max_pages = req->r_num_pages; alloc_page_vec(fsc, req); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 146ac9040141..f2754cdb5a03 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -527,19 +527,19 @@ more: buf_align = (unsigned long)data & ~PAGE_MASK; len = left; - /* write from beginning of first page, regardless of io alignment */ - page_align = 
file->f_flags & O_DIRECT ? buf_align : io_align; - num_pages = calc_pages_for(page_align, len); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, ceph_vino(inode), pos, &len, CEPH_OSD_OP_WRITE, flags, ci->i_snap_realm->cached_context, do_sync, ci->i_truncate_seq, ci->i_truncate_size, - &mtime, false, page_align); + &mtime, false); if (IS_ERR(req)) return PTR_ERR(req); + /* write from beginning of first page, regardless of io alignment */ + page_align = file->f_flags & O_DIRECT ? buf_align : io_align; + num_pages = calc_pages_for(page_align, len); if (file->f_flags & O_DIRECT) { pages = ceph_get_direct_page_vector(data, num_pages, false); if (IS_ERR(pages)) { @@ -573,6 +573,7 @@ more: } req->r_pages = pages; req->r_num_pages = num_pages; + req->r_page_alignment = page_align; req->r_inode = inode; ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index ec33588194ef..803a9db0b475 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -247,7 +247,7 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, int do_sync, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - bool use_mempool, int page_align); + bool use_mempool); extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, struct ceph_osd_request *req); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 38d09d13bb15..de427cc7f6d0 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -432,8 +432,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u32 truncate_seq, u64 truncate_size, struct timespec *mtime, - bool use_mempool, - int page_align) + bool use_mempool) { struct ceph_osd_req_op ops[2]; struct ceph_osd_request *req; @@ -470,11 +469,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 
vino.ino, bno); req->r_oid_len = strlen(req->r_oid); - /* The alignment may differ from the natural (file) alignment */ - - req->r_num_pages = calc_pages_for(page_align, *plen); - req->r_page_alignment = page_align; - ceph_osdc_build_request(req, off, *plen, num_op, ops, snapc, vino.snap, mtime); @@ -1945,12 +1939,14 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, req = ceph_osdc_new_request(osdc, layout, vino, off, plen, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, 0, truncate_seq, truncate_size, NULL, - false, page_align); + false); if (IS_ERR(req)) return PTR_ERR(req); /* it may be a short read due to an object boundary */ req->r_pages = pages; + req->r_num_pages = calc_pages_for(page_align, *plen); + req->r_page_alignment = page_align; dout("readpages final extent is %llu~%llu (%d pages align %d)\n", off, *plen, req->r_num_pages, page_align); @@ -1986,14 +1982,15 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, snapc, 0, truncate_seq, truncate_size, mtime, - true, page_align); + true); if (IS_ERR(req)) return PTR_ERR(req); /* it may be a short write due to an object boundary */ req->r_pages = pages; - dout("writepages %llu~%llu (%d pages)\n", off, len, - req->r_num_pages); + req->r_num_pages = calc_pages_for(page_align, len); + req->r_page_alignment = page_align; + dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); rc = ceph_osdc_start_request(osdc, req, true); if (!rc) -- cgit v1.2.3 From 2794a82a11cfeae0890741b18b0049ddb55ce646 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Feb 2013 12:16:43 -0600 Subject: libceph: separate osd request data info Pull the fields in an osd request structure that define the data for the request out into a separate structure. 
Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 8 +++--- fs/ceph/addr.c | 55 +++++++++++++++++++++-------------------- fs/ceph/file.c | 8 +++--- include/linux/ceph/osd_client.h | 24 ++++++++++++------ net/ceph/osd_client.c | 44 ++++++++++++++++----------------- 5 files changed, 74 insertions(+), 65 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b7b7a88d9f68..0e814dfda48e 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1425,12 +1425,12 @@ static struct ceph_osd_request *rbd_osd_req_create( break; /* Nothing to do */ case OBJ_REQUEST_BIO: rbd_assert(obj_request->bio_list != NULL); - osd_req->r_bio = obj_request->bio_list; + osd_req->r_data.bio = obj_request->bio_list; break; case OBJ_REQUEST_PAGES: - osd_req->r_pages = obj_request->pages; - osd_req->r_num_pages = obj_request->page_count; - osd_req->r_page_alignment = offset & ~PAGE_MASK; + osd_req->r_data.pages = obj_request->pages; + osd_req->r_data.num_pages = obj_request->page_count; + osd_req->r_data.alignment = offset & ~PAGE_MASK; break; } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index e324222acc82..3a1a77b0ae9f 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -243,8 +243,8 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ - for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) { - struct page *page = req->r_pages[i]; + for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) { + struct page *page = req->r_data.pages[i]; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ @@ -258,7 +258,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) unlock_page(page); page_cache_release(page); } - kfree(req->r_pages); + kfree(req->r_data.pages); } static void ceph_unlock_page_vector(struct page 
**pages, int num_pages) @@ -336,9 +336,9 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - req->r_pages = pages; - req->r_num_pages = nr_pages; - req->r_page_alignment = 0; + req->r_data.pages = pages; + req->r_data.num_pages = nr_pages; + req->r_data.alignment = 0; req->r_callback = finish_read; req->r_inode = inode; @@ -374,7 +374,8 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1) >> PAGE_SHIFT; - dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages, + dout("readpages %p file %p nr_pages %d max %d\n", inode, + file, nr_pages, max); while (!list_empty(page_list)) { rc = start_read(inode, page_list, max); @@ -567,7 +568,7 @@ static void writepages_finish(struct ceph_osd_request *req, * raced with a truncation and was adjusted at the osd, * so don't believe the reply. */ - wrote = req->r_num_pages; + wrote = req->r_data.num_pages; } else { wrote = 0; mapping_set_error(mapping, rc); @@ -576,8 +577,8 @@ static void writepages_finish(struct ceph_osd_request *req, inode, rc, bytes, wrote); /* clean all pages */ - for (i = 0; i < req->r_num_pages; i++) { - page = req->r_pages[i]; + for (i = 0; i < req->r_data.num_pages; i++) { + page = req->r_data.pages[i]; BUG_ON(!page); WARN_ON(!PageUptodate(page)); @@ -606,31 +607,31 @@ static void writepages_finish(struct ceph_osd_request *req, unlock_page(page); } dout("%p wrote+cleaned %d pages\n", inode, wrote); - ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc); + ceph_put_wrbuffer_cap_refs(ci, req->r_data.num_pages, snapc); - ceph_release_pages(req->r_pages, req->r_num_pages); - if (req->r_pages_from_pool) - mempool_free(req->r_pages, + ceph_release_pages(req->r_data.pages, req->r_data.num_pages); + if (req->r_data.pages_from_pool) + mempool_free(req->r_data.pages, ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); else - kfree(req->r_pages); + 
kfree(req->r_data.pages); ceph_osdc_put_request(req); } /* * allocate a page vec, either directly, or if necessary, via a the - * mempool. we avoid the mempool if we can because req->r_num_pages + * mempool. we avoid the mempool if we can because req->r_data.num_pages * may be less than the maximum write size. */ static void alloc_page_vec(struct ceph_fs_client *fsc, struct ceph_osd_request *req) { - req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages, + req->r_data.pages = kmalloc(sizeof(struct page *) * req->r_data.num_pages, GFP_NOFS); - if (!req->r_pages) { - req->r_pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS); - req->r_pages_from_pool = 1; - WARN_ON(!req->r_pages); + if (!req->r_data.pages) { + req->r_data.pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS); + req->r_data.pages_from_pool = 1; + WARN_ON(!req->r_data.pages); } } @@ -829,9 +830,9 @@ get_more_pages: break; } - req->r_num_pages = calc_pages_for(0, len); - req->r_page_alignment = 0; - max_pages = req->r_num_pages; + req->r_data.num_pages = calc_pages_for(0, len); + req->r_data.alignment = 0; + max_pages = req->r_data.num_pages; alloc_page_vec(fsc, req); req->r_callback = writepages_finish; @@ -853,7 +854,7 @@ get_more_pages: } set_page_writeback(page); - req->r_pages[locked_pages] = page; + req->r_data.pages[locked_pages] = page; locked_pages++; next = page->index + 1; } @@ -883,14 +884,14 @@ get_more_pages: } /* submit the write */ - offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT; + offset = req->r_data.pages[0]->index << PAGE_CACHE_SHIFT; len = min((snap_size ? 
snap_size : i_size_read(inode)) - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); /* revise final length, page count */ - req->r_num_pages = locked_pages; + req->r_data.num_pages = locked_pages; req->r_request_ops[0].extent.length = cpu_to_le64(len); req->r_request_ops[0].payload_len = cpu_to_le32(len); req->r_request->hdr.data_len = cpu_to_le32(len); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index f2754cdb5a03..d35fc05af06f 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -568,12 +568,12 @@ more: if ((file->f_flags & O_SYNC) == 0) { /* get a second commit callback */ req->r_safe_callback = sync_write_commit; - req->r_own_pages = 1; + req->r_data.own_pages = 1; } } - req->r_pages = pages; - req->r_num_pages = num_pages; - req->r_page_alignment = page_align; + req->r_data.pages = pages; + req->r_data.num_pages = num_pages; + req->r_data.alignment = page_align; req->r_inode = inode; ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 803a9db0b475..600b8278d11e 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -50,6 +50,21 @@ struct ceph_osd { #define CEPH_OSD_MAX_OP 10 +struct ceph_osd_data { + struct { + struct { + struct page **pages; + u32 num_pages; + u32 alignment; + bool pages_from_pool; + bool own_pages; + }; +#ifdef CONFIG_BLOCK + struct bio *bio; +#endif /* CONFIG_BLOCK */ + }; +}; + /* an in-flight request */ struct ceph_osd_request { u64 r_tid; /* unique for this client */ @@ -105,15 +120,8 @@ struct ceph_osd_request { struct ceph_file_layout r_file_layout; struct ceph_snap_context *r_snapc; /* snap context for writes */ - unsigned r_num_pages; /* size of page array (follows) */ - unsigned r_page_alignment; /* io offset in first page */ - struct page **r_pages; /* pages for data payload */ - int r_pages_from_pool; - int r_own_pages; /* if true, i 
own page list */ -#ifdef CONFIG_BLOCK - struct bio *r_bio; /* instead of pages */ -#endif + struct ceph_osd_data r_data; struct ceph_pagelist r_trail; /* trailing part of the data */ }; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index de427cc7f6d0..1f8c7a7c203b 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -122,9 +122,9 @@ void ceph_osdc_release_request(struct kref *kref) } if (req->r_reply) ceph_msg_put(req->r_reply); - if (req->r_own_pages) - ceph_release_page_vector(req->r_pages, - req->r_num_pages); + if (req->r_data.own_pages) + ceph_release_page_vector(req->r_data.pages, + req->r_data.num_pages); ceph_put_snap_context(req->r_snapc); ceph_pagelist_release(&req->r_trail); if (req->r_mempool) @@ -1739,11 +1739,11 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, { int rc = 0; - req->r_request->pages = req->r_pages; - req->r_request->page_count = req->r_num_pages; - req->r_request->page_alignment = req->r_page_alignment; + req->r_request->pages = req->r_data.pages; + req->r_request->page_count = req->r_data.num_pages; + req->r_request->page_alignment = req->r_data.alignment; #ifdef CONFIG_BLOCK - req->r_request->bio = req->r_bio; + req->r_request->bio = req->r_data.bio; #endif req->r_request->trail = &req->r_trail; @@ -1944,12 +1944,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, return PTR_ERR(req); /* it may be a short read due to an object boundary */ - req->r_pages = pages; - req->r_num_pages = calc_pages_for(page_align, *plen); - req->r_page_alignment = page_align; + req->r_data.pages = pages; + req->r_data.num_pages = calc_pages_for(page_align, *plen); + req->r_data.alignment = page_align; dout("readpages final extent is %llu~%llu (%d pages align %d)\n", - off, *plen, req->r_num_pages, page_align); + off, *plen, req->r_data.num_pages, page_align); rc = ceph_osdc_start_request(osdc, req, false); if (!rc) @@ -1987,10 +1987,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino 
vino, return PTR_ERR(req); /* it may be a short write due to an object boundary */ - req->r_pages = pages; - req->r_num_pages = calc_pages_for(page_align, len); - req->r_page_alignment = page_align; - dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_num_pages); + req->r_data.pages = pages; + req->r_data.num_pages = calc_pages_for(page_align, len); + req->r_data.alignment = page_align; + dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_data.num_pages); rc = ceph_osdc_start_request(osdc, req, true); if (!rc) @@ -2083,22 +2083,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, m = ceph_msg_get(req->r_reply); if (data_len > 0) { - int want = calc_pages_for(req->r_page_alignment, data_len); + int want = calc_pages_for(req->r_data.alignment, data_len); - if (req->r_pages && unlikely(req->r_num_pages < want)) { + if (req->r_data.pages && unlikely(req->r_data.num_pages < want)) { pr_warning("tid %lld reply has %d bytes %d pages, we" " had only %d pages ready\n", tid, data_len, - want, req->r_num_pages); + want, req->r_data.num_pages); *skip = 1; ceph_msg_put(m); m = NULL; goto out; } - m->pages = req->r_pages; - m->page_count = req->r_num_pages; - m->page_alignment = req->r_page_alignment; + m->pages = req->r_data.pages; + m->page_count = req->r_data.num_pages; + m->page_alignment = req->r_data.alignment; #ifdef CONFIG_BLOCK - m->bio = req->r_bio; + m->bio = req->r_data.bio; #endif } *skip = 0; -- cgit v1.2.3 From 2ac2b7a6d4976bd6b5dc0751aa77d12d48d3ac4c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Feb 2013 12:16:43 -0600 Subject: libceph: distinguish page and bio requests An osd request uses either pages or a bio list for its data. Use a union to record information about the two, and add a data type tag to select between them. 
Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 4 +++ fs/ceph/addr.c | 4 +++ fs/ceph/file.c | 1 + include/linux/ceph/osd_client.h | 11 +++++++- net/ceph/osd_client.c | 56 ++++++++++++++++++++++++++--------------- 5 files changed, 55 insertions(+), 21 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 0e814dfda48e..f189bc2909b0 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1425,12 +1425,16 @@ static struct ceph_osd_request *rbd_osd_req_create( break; /* Nothing to do */ case OBJ_REQUEST_BIO: rbd_assert(obj_request->bio_list != NULL); + osd_req->r_data.type = CEPH_OSD_DATA_TYPE_BIO; osd_req->r_data.bio = obj_request->bio_list; break; case OBJ_REQUEST_PAGES: + osd_req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; osd_req->r_data.pages = obj_request->pages; osd_req->r_data.num_pages = obj_request->page_count; osd_req->r_data.alignment = offset & ~PAGE_MASK; + osd_req->r_data.pages_from_pool = false; + osd_req->r_data.own_pages = false; break; } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 3a1a77b0ae9f..276fe96f12e3 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -243,6 +243,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ + BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES); for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) { struct page *page = req->r_data.pages[i]; @@ -336,6 +337,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } + req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data.pages = pages; req->r_data.num_pages = nr_pages; req->r_data.alignment = 0; @@ -561,6 +563,7 @@ static void writepages_finish(struct ceph_osd_request *req, long writeback_stat; unsigned issued = ceph_caps_issued(ci); + BUG_ON(req->r_data.type != 
CEPH_OSD_DATA_TYPE_PAGES); if (rc >= 0) { /* * Assume we wrote the pages we originally sent. The @@ -830,6 +833,7 @@ get_more_pages: break; } + req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data.num_pages = calc_pages_for(0, len); req->r_data.alignment = 0; max_pages = req->r_data.num_pages; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index d35fc05af06f..3643a386ab23 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -571,6 +571,7 @@ more: req->r_data.own_pages = 1; } } + req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data.pages = pages; req->r_data.num_pages = num_pages; req->r_data.alignment = page_align; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 600b8278d11e..56604b33dc3c 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -50,8 +50,17 @@ struct ceph_osd { #define CEPH_OSD_MAX_OP 10 +enum ceph_osd_data_type { + CEPH_OSD_DATA_TYPE_NONE, + CEPH_OSD_DATA_TYPE_PAGES, +#ifdef CONFIG_BLOCK + CEPH_OSD_DATA_TYPE_BIO, +#endif /* CONFIG_BLOCK */ +}; + struct ceph_osd_data { - struct { + enum ceph_osd_data_type type; + union { struct { struct page **pages; u32 num_pages; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 1f8c7a7c203b..591e1b0cccbe 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -122,7 +122,8 @@ void ceph_osdc_release_request(struct kref *kref) } if (req->r_reply) ceph_msg_put(req->r_reply); - if (req->r_data.own_pages) + if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES && + req->r_data.own_pages) ceph_release_page_vector(req->r_data.pages, req->r_data.num_pages); ceph_put_snap_context(req->r_snapc); @@ -188,6 +189,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } req->r_reply = msg; + req->r_data.type = CEPH_OSD_DATA_TYPE_NONE; ceph_pagelist_init(&req->r_trail); /* create request message; allow space for oid */ @@ -1739,12 +1741,17 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, { int rc 
= 0; - req->r_request->pages = req->r_data.pages; - req->r_request->page_count = req->r_data.num_pages; - req->r_request->page_alignment = req->r_data.alignment; + if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) { + req->r_request->pages = req->r_data.pages; + req->r_request->page_count = req->r_data.num_pages; + req->r_request->page_alignment = req->r_data.alignment; #ifdef CONFIG_BLOCK - req->r_request->bio = req->r_data.bio; + } else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) { + req->r_request->bio = req->r_data.bio; #endif + } else { + pr_err("unknown request data type %d\n", req->r_data.type); + } req->r_request->trail = &req->r_trail; register_request(osdc, req); @@ -1944,6 +1951,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, return PTR_ERR(req); /* it may be a short read due to an object boundary */ + req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data.pages = pages; req->r_data.num_pages = calc_pages_for(page_align, *plen); req->r_data.alignment = page_align; @@ -1987,6 +1995,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, return PTR_ERR(req); /* it may be a short write due to an object boundary */ + req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data.pages = pages; req->r_data.num_pages = calc_pages_for(page_align, len); req->r_data.alignment = page_align; @@ -2083,23 +2092,30 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, m = ceph_msg_get(req->r_reply); if (data_len > 0) { - int want = calc_pages_for(req->r_data.alignment, data_len); - - if (req->r_data.pages && unlikely(req->r_data.num_pages < want)) { - pr_warning("tid %lld reply has %d bytes %d pages, we" - " had only %d pages ready\n", tid, data_len, - want, req->r_data.num_pages); - *skip = 1; - ceph_msg_put(m); - m = NULL; - goto out; - } - m->pages = req->r_data.pages; - m->page_count = req->r_data.num_pages; - m->page_alignment = req->r_data.alignment; + if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) { + int 
want; + + want = calc_pages_for(req->r_data.alignment, data_len); + if (req->r_data.pages && + unlikely(req->r_data.num_pages < want)) { + + pr_warning("tid %lld reply has %d bytes %d " + "pages, we had only %d pages ready\n", + tid, data_len, want, + req->r_data.num_pages); + *skip = 1; + ceph_msg_put(m); + m = NULL; + goto out; + } + m->pages = req->r_data.pages; + m->page_count = req->r_data.num_pages; + m->page_alignment = req->r_data.alignment; #ifdef CONFIG_BLOCK - m->bio = req->r_data.bio; + } else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) { + m->bio = req->r_data.bio; #endif + } } *skip = 0; req->r_con_filling_msg = con->ops->get(con); -- cgit v1.2.3 From 0fff87ec798abdb4a99f01cbb0197266bb68c5dc Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Feb 2013 12:16:43 -0600 Subject: libceph: separate read and write data An osd request defines information about where data to be read should be placed as well as where data to write comes from. Currently these are represented by common fields. Keep information about data for writing separate from data to be read by splitting these into data_in and data_out fields. This is the key patch in this whole series, in that it actually identifies which osd requests generate outgoing data and which generate incoming data. It's less obvious (currently) that an osd CALL op generates both outgoing and incoming data; that's the focus of some upcoming work. 
This resolves: http://tracker.ceph.com/issues/4127 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 18 +++++---- fs/ceph/addr.c | 67 ++++++++++++++++++--------------- fs/ceph/file.c | 10 ++--- include/linux/ceph/osd_client.h | 5 ++- net/ceph/osd_client.c | 83 +++++++++++++++++++++++++---------------- 5 files changed, 105 insertions(+), 78 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index f189bc2909b0..3f69eb1bc656 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1398,6 +1398,7 @@ static struct ceph_osd_request *rbd_osd_req_create( struct ceph_snap_context *snapc = NULL; struct ceph_osd_client *osdc; struct ceph_osd_request *osd_req; + struct ceph_osd_data *osd_data; struct timespec now; struct timespec *mtime; u64 snap_id = CEPH_NOSNAP; @@ -1418,6 +1419,7 @@ static struct ceph_osd_request *rbd_osd_req_create( osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); if (!osd_req) return NULL; /* ENOMEM */ + osd_data = write_request ? 
&osd_req->r_data_out : &osd_req->r_data_in; rbd_assert(obj_request_type_valid(obj_request->type)); switch (obj_request->type) { @@ -1425,16 +1427,16 @@ static struct ceph_osd_request *rbd_osd_req_create( break; /* Nothing to do */ case OBJ_REQUEST_BIO: rbd_assert(obj_request->bio_list != NULL); - osd_req->r_data.type = CEPH_OSD_DATA_TYPE_BIO; - osd_req->r_data.bio = obj_request->bio_list; + osd_data->type = CEPH_OSD_DATA_TYPE_BIO; + osd_data->bio = obj_request->bio_list; break; case OBJ_REQUEST_PAGES: - osd_req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - osd_req->r_data.pages = obj_request->pages; - osd_req->r_data.num_pages = obj_request->page_count; - osd_req->r_data.alignment = offset & ~PAGE_MASK; - osd_req->r_data.pages_from_pool = false; - osd_req->r_data.own_pages = false; + osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; + osd_data->pages = obj_request->pages; + osd_data->num_pages = obj_request->page_count; + osd_data->alignment = offset & ~PAGE_MASK; + osd_data->pages_from_pool = false; + osd_data->own_pages = false; break; } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 276fe96f12e3..c117c51741d5 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -243,9 +243,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ - BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES); - for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) { - struct page *page = req->r_data.pages[i]; + BUG_ON(req->r_data_in.type != CEPH_OSD_DATA_TYPE_PAGES); + for (i = 0; i < req->r_data_in.num_pages; i++) { + struct page *page = req->r_data_in.pages[i]; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ @@ -258,8 +258,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) SetPageUptodate(page); unlock_page(page); page_cache_release(page); + bytes -= PAGE_CACHE_SIZE; } - 
kfree(req->r_data.pages); + kfree(req->r_data_in.pages); } static void ceph_unlock_page_vector(struct page **pages, int num_pages) @@ -337,10 +338,10 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.pages = pages; - req->r_data.num_pages = nr_pages; - req->r_data.alignment = 0; + req->r_data_in.type = CEPH_OSD_DATA_TYPE_PAGES; + req->r_data_in.pages = pages; + req->r_data_in.num_pages = nr_pages; + req->r_data_in.alignment = 0; req->r_callback = finish_read; req->r_inode = inode; @@ -563,7 +564,7 @@ static void writepages_finish(struct ceph_osd_request *req, long writeback_stat; unsigned issued = ceph_caps_issued(ci); - BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES); + BUG_ON(req->r_data_out.type != CEPH_OSD_DATA_TYPE_PAGES); if (rc >= 0) { /* * Assume we wrote the pages we originally sent. The @@ -571,7 +572,7 @@ static void writepages_finish(struct ceph_osd_request *req, * raced with a truncation and was adjusted at the osd, * so don't believe the reply. 
*/ - wrote = req->r_data.num_pages; + wrote = req->r_data_out.num_pages; } else { wrote = 0; mapping_set_error(mapping, rc); @@ -580,8 +581,8 @@ static void writepages_finish(struct ceph_osd_request *req, inode, rc, bytes, wrote); /* clean all pages */ - for (i = 0; i < req->r_data.num_pages; i++) { - page = req->r_data.pages[i]; + for (i = 0; i < req->r_data_out.num_pages; i++) { + page = req->r_data_out.pages[i]; BUG_ON(!page); WARN_ON(!PageUptodate(page)); @@ -610,31 +611,34 @@ static void writepages_finish(struct ceph_osd_request *req, unlock_page(page); } dout("%p wrote+cleaned %d pages\n", inode, wrote); - ceph_put_wrbuffer_cap_refs(ci, req->r_data.num_pages, snapc); + ceph_put_wrbuffer_cap_refs(ci, req->r_data_out.num_pages, snapc); - ceph_release_pages(req->r_data.pages, req->r_data.num_pages); - if (req->r_data.pages_from_pool) - mempool_free(req->r_data.pages, + ceph_release_pages(req->r_data_out.pages, req->r_data_out.num_pages); + if (req->r_data_out.pages_from_pool) + mempool_free(req->r_data_out.pages, ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); else - kfree(req->r_data.pages); + kfree(req->r_data_out.pages); ceph_osdc_put_request(req); } /* * allocate a page vec, either directly, or if necessary, via a the - * mempool. we avoid the mempool if we can because req->r_data.num_pages + * mempool. we avoid the mempool if we can because req->r_data_out.num_pages * may be less than the maximum write size. 
*/ static void alloc_page_vec(struct ceph_fs_client *fsc, struct ceph_osd_request *req) { - req->r_data.pages = kmalloc(sizeof(struct page *) * req->r_data.num_pages, - GFP_NOFS); - if (!req->r_data.pages) { - req->r_data.pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS); - req->r_data.pages_from_pool = 1; - WARN_ON(!req->r_data.pages); + size_t size; + + size = sizeof (struct page *) * req->r_data_out.num_pages; + req->r_data_out.pages = kmalloc(size, GFP_NOFS); + if (!req->r_data_out.pages) { + req->r_data_out.pages = mempool_alloc(fsc->wb_pagevec_pool, + GFP_NOFS); + req->r_data_out.pages_from_pool = 1; + WARN_ON(!req->r_data_out.pages); } } @@ -833,10 +837,11 @@ get_more_pages: break; } - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.num_pages = calc_pages_for(0, len); - req->r_data.alignment = 0; - max_pages = req->r_data.num_pages; + req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; + req->r_data_out.num_pages = + calc_pages_for(0, len); + req->r_data_out.alignment = 0; + max_pages = req->r_data_out.num_pages; alloc_page_vec(fsc, req); req->r_callback = writepages_finish; @@ -858,7 +863,7 @@ get_more_pages: } set_page_writeback(page); - req->r_data.pages[locked_pages] = page; + req->r_data_out.pages[locked_pages] = page; locked_pages++; next = page->index + 1; } @@ -888,14 +893,14 @@ get_more_pages: } /* submit the write */ - offset = req->r_data.pages[0]->index << PAGE_CACHE_SHIFT; + offset = req->r_data_out.pages[0]->index << PAGE_CACHE_SHIFT; len = min((snap_size ? 
snap_size : i_size_read(inode)) - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); /* revise final length, page count */ - req->r_data.num_pages = locked_pages; + req->r_data_out.num_pages = locked_pages; req->r_request_ops[0].extent.length = cpu_to_le64(len); req->r_request_ops[0].payload_len = cpu_to_le32(len); req->r_request->hdr.data_len = cpu_to_le32(len); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3643a386ab23..501fb37b81a2 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -568,13 +568,13 @@ more: if ((file->f_flags & O_SYNC) == 0) { /* get a second commit callback */ req->r_safe_callback = sync_write_commit; - req->r_data.own_pages = 1; + req->r_data_out.own_pages = 1; } } - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.pages = pages; - req->r_data.num_pages = num_pages; - req->r_data.alignment = page_align; + req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; + req->r_data_out.pages = pages; + req->r_data_out.num_pages = num_pages; + req->r_data_out.alignment = page_align; req->r_inode = inode; ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 56604b33dc3c..40e02603723d 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -130,8 +130,9 @@ struct ceph_osd_request { struct ceph_file_layout r_file_layout; struct ceph_snap_context *r_snapc; /* snap context for writes */ - struct ceph_osd_data r_data; - struct ceph_pagelist r_trail; /* trailing part of the data */ + struct ceph_osd_data r_data_in; + struct ceph_osd_data r_data_out; + struct ceph_pagelist r_trail; /* trailing part of data out */ }; struct ceph_osd_event { diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 591e1b0cccbe..f9cf44504484 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -122,10 +122,16 @@ void ceph_osdc_release_request(struct kref 
*kref) } if (req->r_reply) ceph_msg_put(req->r_reply); - if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES && - req->r_data.own_pages) - ceph_release_page_vector(req->r_data.pages, - req->r_data.num_pages); + + if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES && + req->r_data_in.own_pages) + ceph_release_page_vector(req->r_data_in.pages, + req->r_data_in.num_pages); + if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES && + req->r_data_out.own_pages) + ceph_release_page_vector(req->r_data_out.pages, + req->r_data_out.num_pages); + ceph_put_snap_context(req->r_snapc); ceph_pagelist_release(&req->r_trail); if (req->r_mempool) @@ -189,7 +195,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, } req->r_reply = msg; - req->r_data.type = CEPH_OSD_DATA_TYPE_NONE; + req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE; + req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE; ceph_pagelist_init(&req->r_trail); /* create request message; allow space for oid */ @@ -1740,17 +1747,21 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, bool nofail) { int rc = 0; + struct ceph_osd_data *osd_data; + + /* Set up outgoing data */ - if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) { - req->r_request->pages = req->r_data.pages; - req->r_request->page_count = req->r_data.num_pages; - req->r_request->page_alignment = req->r_data.alignment; + osd_data = &req->r_data_out; + if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { + req->r_request->pages = osd_data->pages; + req->r_request->page_count = osd_data->num_pages; + req->r_request->page_alignment = osd_data->alignment; #ifdef CONFIG_BLOCK - } else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) { - req->r_request->bio = req->r_data.bio; + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { + req->r_request->bio = osd_data->bio; #endif } else { - pr_err("unknown request data type %d\n", req->r_data.type); + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); } req->r_request->trail = &req->r_trail; @@ 
-1939,6 +1950,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct page **pages, int num_pages, int page_align) { struct ceph_osd_request *req; + struct ceph_osd_data *osd_data; int rc = 0; dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, @@ -1951,13 +1963,15 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, return PTR_ERR(req); /* it may be a short read due to an object boundary */ - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.pages = pages; - req->r_data.num_pages = calc_pages_for(page_align, *plen); - req->r_data.alignment = page_align; + + osd_data = &req->r_data_in; + osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; + osd_data->pages = pages; + osd_data->num_pages = calc_pages_for(page_align, *plen); + osd_data->alignment = page_align; dout("readpages final extent is %llu~%llu (%d pages align %d)\n", - off, *plen, req->r_data.num_pages, page_align); + off, *plen, osd_data->num_pages, page_align); rc = ceph_osdc_start_request(osdc, req, false); if (!rc) @@ -1981,6 +1995,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct page **pages, int num_pages) { struct ceph_osd_request *req; + struct ceph_osd_data *osd_data; int rc = 0; int page_align = off & ~PAGE_MASK; @@ -1995,11 +2010,13 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, return PTR_ERR(req); /* it may be a short write due to an object boundary */ - req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data.pages = pages; - req->r_data.num_pages = calc_pages_for(page_align, len); - req->r_data.alignment = page_align; - dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_data.num_pages); + osd_data = &req->r_data_out; + osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; + osd_data->pages = pages; + osd_data->num_pages = calc_pages_for(page_align, len); + osd_data->alignment = page_align; + dout("writepages %llu~%llu (%d pages)\n", off, len, + osd_data->num_pages); rc = ceph_osdc_start_request(osdc, 
req, true); if (!rc) @@ -2092,28 +2109,30 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, m = ceph_msg_get(req->r_reply); if (data_len > 0) { - if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) { + struct ceph_osd_data *osd_data = &req->r_data_in; + + if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { int want; - want = calc_pages_for(req->r_data.alignment, data_len); - if (req->r_data.pages && - unlikely(req->r_data.num_pages < want)) { + want = calc_pages_for(osd_data->alignment, data_len); + if (osd_data->pages && + unlikely(osd_data->num_pages < want)) { pr_warning("tid %lld reply has %d bytes %d " "pages, we had only %d pages ready\n", tid, data_len, want, - req->r_data.num_pages); + osd_data->num_pages); *skip = 1; ceph_msg_put(m); m = NULL; goto out; } - m->pages = req->r_data.pages; - m->page_count = req->r_data.num_pages; - m->page_alignment = req->r_data.alignment; + m->pages = osd_data->pages; + m->page_count = osd_data->num_pages; + m->page_alignment = osd_data->alignment; #ifdef CONFIG_BLOCK - } else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) { - m->bio = req->r_data.bio; + } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { + m->bio = osd_data->bio; #endif } } -- cgit v1.2.3 From e0c594878e3211b09208c779df5f996f0b831d9e Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 7 Mar 2013 15:38:25 -0600 Subject: libceph: record byte count not page count Record the byte count for an osd request rather than the page count. The number of pages can always be derived from the byte count (and alignment/offset) but the reverse is not true. 
Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 2 +- fs/ceph/addr.c | 33 ++++++++++++++++----------- fs/ceph/file.c | 2 +- include/linux/ceph/osd_client.h | 2 +- net/ceph/osd_client.c | 50 ++++++++++++++++++++++++----------------- 5 files changed, 52 insertions(+), 37 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 3f69eb1bc656..04cd5fdfc8f3 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1433,7 +1433,7 @@ static struct ceph_osd_request *rbd_osd_req_create( case OBJ_REQUEST_PAGES: osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; osd_data->pages = obj_request->pages; - osd_data->num_pages = obj_request->page_count; + osd_data->length = obj_request->length; osd_data->alignment = offset & ~PAGE_MASK; osd_data->pages_from_pool = false; osd_data->own_pages = false; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index c117c51741d5..45745aae4786 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -238,13 +238,16 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) struct inode *inode = req->r_inode; int rc = req->r_result; int bytes = le32_to_cpu(msg->hdr.data_len); + int num_pages; int i; dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ BUG_ON(req->r_data_in.type != CEPH_OSD_DATA_TYPE_PAGES); - for (i = 0; i < req->r_data_in.num_pages; i++) { + num_pages = calc_pages_for((u64)req->r_data_in.alignment, + (u64)req->r_data_in.length); + for (i = 0; i < num_pages; i++) { struct page *page = req->r_data_in.pages[i]; if (bytes < (int)PAGE_CACHE_SIZE) { @@ -340,7 +343,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } req->r_data_in.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data_in.pages = pages; - req->r_data_in.num_pages = nr_pages; + req->r_data_in.length = len; req->r_data_in.alignment = 0; req->r_callback = finish_read; req->r_inode = inode; @@ 
-555,6 +558,7 @@ static void writepages_finish(struct ceph_osd_request *req, struct ceph_inode_info *ci = ceph_inode(inode); unsigned wrote; struct page *page; + int num_pages; int i; struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; @@ -565,6 +569,8 @@ static void writepages_finish(struct ceph_osd_request *req, unsigned issued = ceph_caps_issued(ci); BUG_ON(req->r_data_out.type != CEPH_OSD_DATA_TYPE_PAGES); + num_pages = calc_pages_for((u64)req->r_data_out.alignment, + (u64)req->r_data_out.length); if (rc >= 0) { /* * Assume we wrote the pages we originally sent. The @@ -572,7 +578,7 @@ static void writepages_finish(struct ceph_osd_request *req, * raced with a truncation and was adjusted at the osd, * so don't believe the reply. */ - wrote = req->r_data_out.num_pages; + wrote = num_pages; } else { wrote = 0; mapping_set_error(mapping, rc); @@ -581,7 +587,7 @@ static void writepages_finish(struct ceph_osd_request *req, inode, rc, bytes, wrote); /* clean all pages */ - for (i = 0; i < req->r_data_out.num_pages; i++) { + for (i = 0; i < num_pages; i++) { page = req->r_data_out.pages[i]; BUG_ON(!page); WARN_ON(!PageUptodate(page)); @@ -611,9 +617,9 @@ static void writepages_finish(struct ceph_osd_request *req, unlock_page(page); } dout("%p wrote+cleaned %d pages\n", inode, wrote); - ceph_put_wrbuffer_cap_refs(ci, req->r_data_out.num_pages, snapc); + ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc); - ceph_release_pages(req->r_data_out.pages, req->r_data_out.num_pages); + ceph_release_pages(req->r_data_out.pages, num_pages); if (req->r_data_out.pages_from_pool) mempool_free(req->r_data_out.pages, ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); @@ -624,15 +630,18 @@ static void writepages_finish(struct ceph_osd_request *req, /* * allocate a page vec, either directly, or if necessary, via a the - * mempool. we avoid the mempool if we can because req->r_data_out.num_pages + * mempool. 
we avoid the mempool if we can because req->r_data_out.length * may be less than the maximum write size. */ static void alloc_page_vec(struct ceph_fs_client *fsc, struct ceph_osd_request *req) { size_t size; + int num_pages; - size = sizeof (struct page *) * req->r_data_out.num_pages; + num_pages = calc_pages_for((u64)req->r_data_out.alignment, + (u64)req->r_data_out.length); + size = sizeof (struct page *) * num_pages; req->r_data_out.pages = kmalloc(size, GFP_NOFS); if (!req->r_data_out.pages) { req->r_data_out.pages = mempool_alloc(fsc->wb_pagevec_pool, @@ -838,11 +847,9 @@ get_more_pages: } req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data_out.num_pages = - calc_pages_for(0, len); + req->r_data_out.length = len; req->r_data_out.alignment = 0; - max_pages = req->r_data_out.num_pages; - + max_pages = calc_pages_for(0, (u64)len); alloc_page_vec(fsc, req); req->r_callback = writepages_finish; req->r_inode = inode; @@ -900,7 +907,7 @@ get_more_pages: locked_pages, offset, len); /* revise final length, page count */ - req->r_data_out.num_pages = locked_pages; + req->r_data_out.length = len; req->r_request_ops[0].extent.length = cpu_to_le64(len); req->r_request_ops[0].payload_len = cpu_to_le32(len); req->r_request->hdr.data_len = cpu_to_le32(len); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 501fb37b81a2..0ac6e159bdc6 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -573,7 +573,7 @@ more: } req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data_out.pages = pages; - req->r_data_out.num_pages = num_pages; + req->r_data_out.length = len; req->r_data_out.alignment = page_align; req->r_inode = inode; diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 40e02603723d..a8016dfbfdba 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -63,7 +63,7 @@ struct ceph_osd_data { union { struct { struct page **pages; - u32 num_pages; + u64 length; u32 alignment; bool pages_from_pool; bool 
own_pages; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f9cf44504484..202af14dc6dc 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -107,6 +107,7 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, */ void ceph_osdc_release_request(struct kref *kref) { + int num_pages; struct ceph_osd_request *req = container_of(kref, struct ceph_osd_request, r_kref); @@ -124,13 +125,17 @@ void ceph_osdc_release_request(struct kref *kref) ceph_msg_put(req->r_reply); if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES && - req->r_data_in.own_pages) - ceph_release_page_vector(req->r_data_in.pages, - req->r_data_in.num_pages); + req->r_data_in.own_pages) { + num_pages = calc_pages_for((u64)req->r_data_in.alignment, + (u64)req->r_data_in.length); + ceph_release_page_vector(req->r_data_in.pages, num_pages); + } if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES && - req->r_data_out.own_pages) - ceph_release_page_vector(req->r_data_out.pages, - req->r_data_out.num_pages); + req->r_data_out.own_pages) { + num_pages = calc_pages_for((u64)req->r_data_out.alignment, + (u64)req->r_data_out.length); + ceph_release_page_vector(req->r_data_out.pages, num_pages); + } ceph_put_snap_context(req->r_snapc); ceph_pagelist_release(&req->r_trail); @@ -1753,8 +1758,12 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc, osd_data = &req->r_data_out; if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { + unsigned int page_count; + req->r_request->pages = osd_data->pages; - req->r_request->page_count = osd_data->num_pages; + page_count = calc_pages_for((u64)osd_data->alignment, + (u64)osd_data->length); + req->r_request->page_count = page_count; req->r_request->page_alignment = osd_data->alignment; #ifdef CONFIG_BLOCK } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { @@ -1967,11 +1976,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, osd_data = &req->r_data_in; osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; osd_data->pages = 
pages; - osd_data->num_pages = calc_pages_for(page_align, *plen); + osd_data->length = *plen; osd_data->alignment = page_align; - dout("readpages final extent is %llu~%llu (%d pages align %d)\n", - off, *plen, osd_data->num_pages, page_align); + dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", + off, *plen, osd_data->length, page_align); rc = ceph_osdc_start_request(osdc, req, false); if (!rc) @@ -2013,10 +2022,9 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, osd_data = &req->r_data_out; osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; osd_data->pages = pages; - osd_data->num_pages = calc_pages_for(page_align, len); + osd_data->length = len; osd_data->alignment = page_align; - dout("writepages %llu~%llu (%d pages)\n", off, len, - osd_data->num_pages); + dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); rc = ceph_osdc_start_request(osdc, req, true); if (!rc) @@ -2112,23 +2120,23 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_osd_data *osd_data = &req->r_data_in; if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { - int want; + unsigned int page_count; - want = calc_pages_for(osd_data->alignment, data_len); if (osd_data->pages && - unlikely(osd_data->num_pages < want)) { + unlikely(osd_data->length < data_len)) { - pr_warning("tid %lld reply has %d bytes %d " - "pages, we had only %d pages ready\n", - tid, data_len, want, - osd_data->num_pages); + pr_warning("tid %lld reply has %d bytes " + "we had only %llu bytes ready\n", + tid, data_len, osd_data->length); *skip = 1; ceph_msg_put(m); m = NULL; goto out; } + page_count = calc_pages_for((u64)osd_data->alignment, + (u64)osd_data->length); m->pages = osd_data->pages; - m->page_count = osd_data->num_pages; + m->page_count = page_count; m->page_alignment = osd_data->alignment; #ifdef CONFIG_BLOCK } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { -- cgit v1.2.3 From 25d71cb92d8eb48df9cbd8cc4bb28e88ee8e88d9 Mon Sep 17 
00:00:00 2001 From: Alex Elder Date: Wed, 3 Apr 2013 15:03:53 -0500 Subject: ceph: use page_offset() in ceph_writepages_start() There's one spot in ceph_writepages_start() that open-codes what page_offset() does safely. Use the macro so we don't have to worry about wrapping. This resolves: http://tracker.ceph.com/issues/4648 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 45745aae4786..ae438d02a422 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -900,7 +900,7 @@ get_more_pages: } /* submit the write */ - offset = req->r_data_out.pages[0]->index << PAGE_CACHE_SHIFT; + offset = page_offset(req->r_data_out.pages[0]); len = min((snap_size ? snap_size : i_size_read(inode)) - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout("writepages got %d pages at %llu~%llu\n", -- cgit v1.2.3 From acead002b200569273bed331c93c4a91d25e10b8 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Mar 2013 14:09:05 -0500 Subject: libceph: don't build request in ceph_osdc_new_request() This patch moves the call to ceph_osdc_build_request() out of ceph_osdc_new_request() and into its caller. This is in order to defer formatting osd operation information into the request message until just before the request is started. The only unusual (ab)user of ceph_osdc_build_request() is ceph_writepages_start(), where the final length of the write request may change (downward) based on the current inode size or the oldest snapshot context with dirty data for the inode. The remaining callers don't change anything in the request after it has been built. This means the ops array is now supplied by the caller. It also means there is no need to pass the mtime to ceph_osdc_new_request() (it gets provided to ceph_osdc_build_request()).
And rather than passing a do_sync flag, have the number of ops in the ops array supplied imply adding a second STARTSYNC operation after the READ or WRITE requested. This and some of the patches that follow are related to having the messenger (only) be responsible for filling the content of the message header, as described here: http://tracker.ceph.com/issues/4589 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 36 +++++++++++++++++++++++------------- fs/ceph/file.c | 20 +++++++++++++------- include/linux/ceph/osd_client.h | 12 ++++++------ net/ceph/osd_client.c | 40 +++++++++++++++++++++------------------- 4 files changed, 63 insertions(+), 45 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ae438d02a422..681463d5459b 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -284,7 +284,9 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) &ceph_inode_to_client(inode)->client->osdc; struct ceph_inode_info *ci = ceph_inode(inode); struct page *page = list_entry(page_list->prev, struct page, lru); + struct ceph_vino vino; struct ceph_osd_request *req; + struct ceph_osd_req_op op; u64 off; u64 len; int i; @@ -308,16 +310,17 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) len = nr_pages << PAGE_CACHE_SHIFT; dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages, off, len); - - req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), - off, &len, - CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, - NULL, 0, + vino = ceph_vino(inode); + req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, + 1, &op, CEPH_OSD_OP_READ, + CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, ci->i_truncate_size, - NULL, false); + false); if (IS_ERR(req)) return PTR_ERR(req); + ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); + /* build page vector */ nr_pages = calc_pages_for(0, len); pages = kmalloc(sizeof(*pages) * nr_pages, 
GFP_NOFS); @@ -736,6 +739,7 @@ retry: last_snapc = snapc; while (!done && index <= end) { + struct ceph_osd_req_op ops[2]; unsigned i; int first; pgoff_t next; @@ -825,20 +829,22 @@ get_more_pages: /* ok */ if (locked_pages == 0) { + struct ceph_vino vino; + int num_ops = do_sync ? 2 : 1; + /* prepare async write request */ offset = (u64) page_offset(page); len = wsize; + vino = ceph_vino(inode); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ req = ceph_osdc_new_request(&fsc->client->osdc, - &ci->i_layout, - ceph_vino(inode), - offset, &len, + &ci->i_layout, vino, offset, &len, + num_ops, ops, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - snapc, do_sync, - ci->i_truncate_seq, - ci->i_truncate_size, - &inode->i_mtime, true); + snapc, ci->i_truncate_seq, + ci->i_truncate_size, true); if (IS_ERR(req)) { rc = PTR_ERR(req); @@ -846,6 +852,10 @@ get_more_pages: break; } + ceph_osdc_build_request(req, offset, + num_ops, ops, snapc, vino.snap, + &inode->i_mtime); + req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data_out.length = len; req->r_data_out.alignment = 0; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index aeafa67bfe99..3d6dcf23b4ad 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -475,14 +475,17 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_snap_context *snapc; + struct ceph_vino vino; struct ceph_osd_request *req; + struct ceph_osd_req_op ops[2]; + int num_ops = 1; struct page **pages; int num_pages; long long unsigned pos; u64 len; int written = 0; int flags; - int do_sync = 0; int check_caps = 0; int page_align, io_align; unsigned long buf_align; @@ -516,7 +519,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) flags |= CEPH_OSD_FLAG_ACK; else - do_sync = 1; + num_ops++; /* 
Also include a 'startsync' command. */ /* * we may need to do multiple writes here if we span an object @@ -527,16 +530,19 @@ more: buf_align = (unsigned long)data & ~PAGE_MASK; len = left; + snapc = ci->i_snap_realm->cached_context; + vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - ceph_vino(inode), pos, &len, - CEPH_OSD_OP_WRITE, flags, - ci->i_snap_realm->cached_context, - do_sync, + vino, pos, &len, num_ops, ops, + CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, - &mtime, false); + false); if (IS_ERR(req)) return PTR_ERR(req); + ceph_osdc_build_request(req, pos, num_ops, ops, + snapc, vino.snap, &mtime); + /* write from beginning of first page, regardless of io alignment */ page_align = file->f_flags & O_DIRECT ? buf_align : io_align; num_pages = calc_pages_for(page_align, len); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index fdda93ebbb4c..ffaf9076fdc4 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -243,12 +243,12 @@ extern void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_snap_context *snapc, - unsigned int num_op, + unsigned int num_ops, bool use_mempool, gfp_t gfp_flags); extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, - unsigned int num_op, + unsigned int num_ops, struct ceph_osd_req_op *src_ops, struct ceph_snap_context *snapc, u64 snap_id, @@ -257,11 +257,11 @@ extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_file_layout *layout, struct ceph_vino vino, - u64 offset, u64 *len, int op, int flags, + u64 offset, u64 *len, + int num_ops, struct ceph_osd_req_op *ops, + int opcode, int flags, struct ceph_snap_context *snapc, - int do_sync, u32 truncate_seq, - u64 
truncate_size, - struct timespec *mtime, + u32 truncate_seq, u64 truncate_size, bool use_mempool); extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 0b4951e27532..115790aac30a 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -512,9 +512,7 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, msg->front.iov_len = msg_size; msg->hdr.front_len = cpu_to_le32(msg_size); - dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, - num_ops); - return; + dout("build_request msg_size was %d\n", (int)msg_size); } EXPORT_SYMBOL(ceph_osdc_build_request); @@ -532,18 +530,15 @@ EXPORT_SYMBOL(ceph_osdc_build_request); struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, struct ceph_vino vino, - u64 off, u64 *plen, + u64 off, u64 *plen, int num_ops, + struct ceph_osd_req_op *ops, int opcode, int flags, struct ceph_snap_context *snapc, - int do_sync, u32 truncate_seq, u64 truncate_size, - struct timespec *mtime, bool use_mempool) { - struct ceph_osd_req_op ops[2]; struct ceph_osd_request *req; - unsigned int num_op = do_sync ? 2 : 1; u64 objnum = 0; u64 objoff = 0; u64 objlen = 0; @@ -553,7 +548,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); - req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, + req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); @@ -578,7 +573,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, osd_req_op_extent_init(&ops[0], opcode, objoff, objlen, truncate_size, truncate_seq); - if (do_sync) + /* + * A second op in the ops array means the caller wants to + * also issue a include a 'startsync' command so that the + * osd will flush data quickly. 
+ */ + if (num_ops > 1) osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC); req->r_file_layout = *layout; /* keep a copy */ @@ -587,9 +587,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, vino.ino, objnum); req->r_oid_len = strlen(req->r_oid); - ceph_osdc_build_request(req, off, num_op, ops, - snapc, vino.snap, mtime); - return req; } EXPORT_SYMBOL(ceph_osdc_new_request); @@ -2047,17 +2044,20 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, { struct ceph_osd_request *req; struct ceph_osd_data *osd_data; + struct ceph_osd_req_op op; int rc = 0; dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, vino.snap, off, *plen); - req = ceph_osdc_new_request(osdc, layout, vino, off, plen, + req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, &op, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, - NULL, 0, truncate_seq, truncate_size, NULL, + NULL, truncate_seq, truncate_size, false); if (IS_ERR(req)) return PTR_ERR(req); + ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); + /* it may be a short read due to an object boundary */ osd_data = &req->r_data_in; @@ -2092,19 +2092,21 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, { struct ceph_osd_request *req; struct ceph_osd_data *osd_data; + struct ceph_osd_req_op op; int rc = 0; int page_align = off & ~PAGE_MASK; - BUG_ON(vino.snap != CEPH_NOSNAP); - req = ceph_osdc_new_request(osdc, layout, vino, off, &len, + BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ + req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, &op, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - snapc, 0, - truncate_seq, truncate_size, mtime, + snapc, truncate_seq, truncate_size, true); if (IS_ERR(req)) return PTR_ERR(req); + ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime); + /* it may be a short write due to an object boundary */ osd_data = &req->r_data_out; osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; -- 
cgit v1.2.3 From 94fe8420bf519acd641ecbd442a0a79c1a024212 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Mar 2013 14:09:05 -0500 Subject: ceph: define ceph_writepages_osd_request() Mostly for readability, define ceph_writepages_osd_request() and use it to allocate the osd request for ceph_writepages_start(). Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 681463d5459b..f2de9ec27db3 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -654,6 +654,26 @@ static void alloc_page_vec(struct ceph_fs_client *fsc, } } +static struct ceph_osd_request * +ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len, + struct ceph_snap_context *snapc, + int num_ops, struct ceph_osd_req_op *ops) +{ + struct ceph_fs_client *fsc; + struct ceph_inode_info *ci; + struct ceph_vino vino; + + fsc = ceph_inode_to_client(inode); + ci = ceph_inode(inode); + vino = ceph_vino(inode); + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + + return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + vino, offset, len, num_ops, ops, CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK, + snapc, ci->i_truncate_seq, ci->i_truncate_size, true); +} + /* * initiate async writeback */ @@ -835,16 +855,9 @@ get_more_pages: /* prepare async write request */ offset = (u64) page_offset(page); len = wsize; - vino = ceph_vino(inode); - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - req = ceph_osdc_new_request(&fsc->client->osdc, - &ci->i_layout, vino, offset, &len, - num_ops, ops, - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ONDISK, - snapc, ci->i_truncate_seq, - ci->i_truncate_size, true); + req = ceph_writepages_osd_request(inode, + offset, &len, snapc, + num_ops, ops); if (IS_ERR(req)) { rc = PTR_ERR(req); @@ -852,6 +865,7 @@ get_more_pages: break; } + vino = ceph_vino(inode); 
ceph_osdc_build_request(req, offset, num_ops, ops, snapc, vino.snap, &inode->i_mtime); -- cgit v1.2.3 From 88486957f9fbf52ff4313ff52d583110a6503c28 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Mar 2013 14:09:05 -0500 Subject: ceph: kill ceph alloc_page_vec() There is a helper function alloc_page_vec() that, despite its generic sounding name depends heavily on an osd request structure being populated with certain information. There is only one place this function is used, and it ends up being a bit simpler to just open code what it does, so get rid of the helper. The real motivation for this is deferring the building of the osd request message, and this is a step in that direction. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 45 ++++++++++++++++++--------------------------- 1 file changed, 18 insertions(+), 27 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index f2de9ec27db3..7b6d9b22e254 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -631,29 +631,6 @@ static void writepages_finish(struct ceph_osd_request *req, ceph_osdc_put_request(req); } -/* - * allocate a page vec, either directly, or if necessary, via a the - * mempool. we avoid the mempool if we can because req->r_data_out.length - * may be less than the maximum write size. 
- */ -static void alloc_page_vec(struct ceph_fs_client *fsc, - struct ceph_osd_request *req) -{ - size_t size; - int num_pages; - - num_pages = calc_pages_for((u64)req->r_data_out.alignment, - (u64)req->r_data_out.length); - size = sizeof (struct page *) * num_pages; - req->r_data_out.pages = kmalloc(size, GFP_NOFS); - if (!req->r_data_out.pages) { - req->r_data_out.pages = mempool_alloc(fsc->wb_pagevec_pool, - GFP_NOFS); - req->r_data_out.pages_from_pool = 1; - WARN_ON(!req->r_data_out.pages); - } -} - static struct ceph_osd_request * ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len, struct ceph_snap_context *snapc, @@ -851,6 +828,9 @@ get_more_pages: if (locked_pages == 0) { struct ceph_vino vino; int num_ops = do_sync ? 2 : 1; + size_t size; + struct page **pages; + mempool_t *pool = NULL; /* prepare async write request */ offset = (u64) page_offset(page); @@ -870,13 +850,24 @@ get_more_pages: num_ops, ops, snapc, vino.snap, &inode->i_mtime); + req->r_callback = writepages_finish; + req->r_inode = inode; + + max_pages = calc_pages_for(0, (u64)len); + size = max_pages * sizeof (*pages); + pages = kmalloc(size, GFP_NOFS); + if (!pages) { + pool = fsc->wb_pagevec_pool; + + pages = mempool_alloc(pool, GFP_NOFS); + WARN_ON(!pages); + } + + req->r_data_out.pages = pages; + req->r_data_out.pages_from_pool = !!pool; req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; req->r_data_out.length = len; req->r_data_out.alignment = 0; - max_pages = calc_pages_for(0, (u64)len); - alloc_page_vec(fsc, req); - req->r_callback = writepages_finish; - req->r_inode = inode; } /* note position of first page in pvec */ -- cgit v1.2.3 From 02ee07d3002e6c0b0c4ea1982cd7e6aeca203ed6 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Mar 2013 14:09:06 -0500 Subject: libceph: hold off building osd request Defer building the osd request until just before submitting it in all callers except ceph_writepages_start(). (That caller will be handled in the next patch.) 
Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 4 ++-- fs/ceph/file.c | 7 ++++--- net/ceph/osd_client.c | 8 ++++---- 3 files changed, 10 insertions(+), 9 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 7b6d9b22e254..0a3d2ce89660 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -319,8 +319,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) if (IS_ERR(req)) return PTR_ERR(req); - ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); - /* build page vector */ nr_pages = calc_pages_for(0, len); pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); @@ -351,6 +349,8 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) req->r_callback = finish_read; req->r_inode = inode; + ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); + dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); ret = ceph_osdc_start_request(osdc, req, false); if (ret < 0) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3d6dcf23b4ad..47826c2ef511 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -540,9 +540,6 @@ more: if (IS_ERR(req)) return PTR_ERR(req); - ceph_osdc_build_request(req, pos, num_ops, ops, - snapc, vino.snap, &mtime); - /* write from beginning of first page, regardless of io alignment */ page_align = file->f_flags & O_DIRECT ? 
buf_align : io_align; num_pages = calc_pages_for(page_align, len); @@ -583,6 +580,10 @@ more: req->r_data_out.alignment = page_align; req->r_inode = inode; + /* BUG_ON(vino.snap != CEPH_NOSNAP); */ + ceph_osdc_build_request(req, pos, num_ops, ops, + snapc, vino.snap, &mtime); + ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (!ret) { if (req->r_safe_callback) { diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 115790aac30a..9ca693d0df19 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2056,8 +2056,6 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, if (IS_ERR(req)) return PTR_ERR(req); - ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); - /* it may be a short read due to an object boundary */ osd_data = &req->r_data_in; @@ -2069,6 +2067,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", off, *plen, osd_data->length, page_align); + ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); + rc = ceph_osdc_start_request(osdc, req, false); if (!rc) rc = ceph_osdc_wait_request(osdc, req); @@ -2105,8 +2105,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, if (IS_ERR(req)) return PTR_ERR(req); - ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime); - /* it may be a short write due to an object boundary */ osd_data = &req->r_data_out; osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; @@ -2115,6 +2113,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, osd_data->alignment = page_align; dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); + ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime); + rc = ceph_osdc_start_request(osdc, req, true); if (!rc) rc = ceph_osdc_wait_request(osdc, req); -- cgit v1.2.3 From e5975c7c8eb6aeab8d2f76a98c368081082795e0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Thu, 14 Mar 
2013 14:09:05 -0500 Subject: ceph: build osd request message later for writepages Hold off building the osd request message in ceph_writepages_start() until just before it will be submitted to the osd client for execution. We'll still create the request and allocate the page pointer array after we learn we have at least one page to write. A local variable will be used to keep track of the allocated array of pages. Wait until just before submitting the request for assigning that page array pointer to the request message. Create and use a new function osd_req_op_extent_update() whose purpose is to serve this one spot where the length value supplied when an osd request's op was initially formatted might need to get changed (reduced, never increased) before submitting the request. Previously, ceph_writepages_start() assigned the message header's data length because of this update. That's no longer necessary, because ceph_osdc_build_request() will recalculate the right value to use based on the content of the ops in the request. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 59 +++++++++++++++++++++++------------------ include/linux/ceph/osd_client.h | 1 + net/ceph/osd_client.c | 13 +++++++++ 3 files changed, 47 insertions(+), 26 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 0a3d2ce89660..5d8ce79385ed 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -737,10 +737,14 @@ retry: while (!done && index <= end) { struct ceph_osd_req_op ops[2]; + int num_ops = do_sync ? 2 : 1; + struct ceph_vino vino; unsigned i; int first; pgoff_t next; int pvec_pages, locked_pages; + struct page **pages = NULL; + mempool_t *pool = NULL; /* Becomes non-null if mempool used */ struct page *page; int want; u64 offset, len; @@ -824,16 +828,19 @@ get_more_pages: break; } - /* ok */ + /* + * We have something to write. 
If this is + * the first locked page this time through, + * allocate an osd request and a page array + * that it will use. + */ if (locked_pages == 0) { - struct ceph_vino vino; - int num_ops = do_sync ? 2 : 1; size_t size; - struct page **pages; - mempool_t *pool = NULL; + + BUG_ON(pages); /* prepare async write request */ - offset = (u64) page_offset(page); + offset = (u64)page_offset(page); len = wsize; req = ceph_writepages_osd_request(inode, offset, &len, snapc, @@ -845,11 +852,6 @@ get_more_pages: break; } - vino = ceph_vino(inode); - ceph_osdc_build_request(req, offset, - num_ops, ops, snapc, vino.snap, - &inode->i_mtime); - req->r_callback = writepages_finish; req->r_inode = inode; @@ -858,16 +860,9 @@ get_more_pages: pages = kmalloc(size, GFP_NOFS); if (!pages) { pool = fsc->wb_pagevec_pool; - pages = mempool_alloc(pool, GFP_NOFS); - WARN_ON(!pages); + BUG_ON(!pages); } - - req->r_data_out.pages = pages; - req->r_data_out.pages_from_pool = !!pool; - req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data_out.length = len; - req->r_data_out.alignment = 0; } /* note position of first page in pvec */ @@ -885,7 +880,7 @@ get_more_pages: } set_page_writeback(page); - req->r_data_out.pages[locked_pages] = page; + pages[locked_pages] = page; locked_pages++; next = page->index + 1; } @@ -914,18 +909,30 @@ get_more_pages: pvec.nr -= i-first; } - /* submit the write */ - offset = page_offset(req->r_data_out.pages[0]); + /* Format the osd request message and submit the write */ + + offset = page_offset(pages[0]); len = min((snap_size ? 
snap_size : i_size_read(inode)) - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); - /* revise final length, page count */ + req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; + req->r_data_out.pages = pages; req->r_data_out.length = len; - req->r_request_ops[0].extent.length = cpu_to_le64(len); - req->r_request_ops[0].payload_len = cpu_to_le32(len); - req->r_request->hdr.data_len = cpu_to_le32(len); + req->r_data_out.alignment = 0; + req->r_data_out.pages_from_pool = !!pool; + + pages = NULL; /* request message now owns the pages array */ + pool = NULL; + + /* Update the write op length in case we changed it */ + + osd_req_op_extent_update(&ops[0], len); + + vino = ceph_vino(inode); + ceph_osdc_build_request(req, offset, num_ops, ops, + snapc, vino.snap, &inode->i_mtime); rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); BUG_ON(rc); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index ffaf9076fdc4..5ee1a3776b4b 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -234,6 +234,7 @@ extern void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode); extern void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, u64 offset, u64 length, u64 truncate_size, u32 truncate_seq); +extern void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length); extern void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, const char *class, const char *method, const void *request_data, diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 9ca693d0df19..426ca1f2a721 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -296,6 +296,19 @@ void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, } EXPORT_SYMBOL(osd_req_op_extent_init); +void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length) +{ + u64 previous = op->extent.length; + + if (length == previous) + return; 
/* Nothing to do */ + BUG_ON(length > previous); + + op->extent.length = length; + op->payload_len -= previous - length; +} +EXPORT_SYMBOL(osd_req_op_extent_update); + void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, const char *class, const char *method, const void *request_data, size_t request_data_size) -- cgit v1.2.3 From 43bfe5de9fa78e07248b70992ce50321efec622c Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 3 Apr 2013 01:28:57 -0500 Subject: libceph: define osd data initialization helpers Define and use functions that encapsulate the initialization of a ceph_osd_data structure. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 14 ++++------- fs/ceph/addr.c | 13 +++------- fs/ceph/file.c | 10 +++----- include/linux/ceph/osd_client.h | 11 +++++++++ net/ceph/osd_client.c | 55 +++++++++++++++++++++++++++++------------ 5 files changed, 63 insertions(+), 40 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index afbc9f6f8ff1..ab21b5218ae3 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1350,17 +1350,13 @@ static struct ceph_osd_request *rbd_osd_req_create( break; /* Nothing to do */ case OBJ_REQUEST_BIO: rbd_assert(obj_request->bio_list != NULL); - osd_data->type = CEPH_OSD_DATA_TYPE_BIO; - osd_data->bio = obj_request->bio_list; - osd_data->bio_length = obj_request->length; + ceph_osd_data_bio_init(osd_data, obj_request->bio_list, + obj_request->length); break; case OBJ_REQUEST_PAGES: - osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; - osd_data->pages = obj_request->pages; - osd_data->length = obj_request->length; - osd_data->alignment = offset & ~PAGE_MASK; - osd_data->pages_from_pool = false; - osd_data->own_pages = false; + ceph_osd_data_pages_init(osd_data, obj_request->pages, + obj_request->length, offset & ~PAGE_MASK, + false, false); break; } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5d8ce79385ed..cf9032abc8f5 100644 --- a/fs/ceph/addr.c +++ 
b/fs/ceph/addr.c @@ -342,10 +342,8 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - req->r_data_in.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data_in.pages = pages; - req->r_data_in.length = len; - req->r_data_in.alignment = 0; + ceph_osd_data_pages_init(&req->r_data_in, pages, len, 0, + false, false); req->r_callback = finish_read; req->r_inode = inode; @@ -917,11 +915,8 @@ get_more_pages: dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); - req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data_out.pages = pages; - req->r_data_out.length = len; - req->r_data_out.alignment = 0; - req->r_data_out.pages_from_pool = !!pool; + ceph_osd_data_pages_init(&req->r_data_out, pages, len, 0, + !!pool, false); pages = NULL; /* request message now owns the pages array */ pool = NULL; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 47826c2ef511..da642af14a28 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -491,6 +491,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, unsigned long buf_align; int ret; struct timespec mtime = CURRENT_TIME; + bool own_pages = false; if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; @@ -571,14 +572,11 @@ more: if ((file->f_flags & O_SYNC) == 0) { /* get a second commit callback */ req->r_safe_callback = sync_write_commit; - req->r_data_out.own_pages = 1; + own_pages = true; } } - req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES; - req->r_data_out.pages = pages; - req->r_data_out.length = len; - req->r_data_out.alignment = page_align; - req->r_inode = inode; + ceph_osd_data_pages_init(&req->r_data_out, pages, len, page_align, + false, own_pages); /* BUG_ON(vino.snap != CEPH_NOSNAP); */ ceph_osdc_build_request(req, pos, num_ops, ops, diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 5ee1a3776b4b..af60dac1f9c0 100644 --- a/include/linux/ceph/osd_client.h +++ 
b/include/linux/ceph/osd_client.h @@ -280,6 +280,17 @@ static inline void ceph_osdc_put_request(struct ceph_osd_request *req) kref_put(&req->r_kref, ceph_osdc_release_request); } +extern void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, + struct page **pages, u64 length, + u32 alignment, bool pages_from_pool, + bool own_pages); +extern void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, + struct ceph_pagelist *pagelist); +#ifdef CONFIG_BLOCK +extern void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, + struct bio *bio, size_t bio_length); +#endif /* CONFIG_BLOCK */ + extern int ceph_osdc_start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, bool nofail); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 1379b3313348..f8f8561b602e 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -79,6 +79,38 @@ static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, return 0; } +void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, + struct page **pages, u64 length, u32 alignment, + bool pages_from_pool, bool own_pages) +{ + osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; + osd_data->pages = pages; + osd_data->length = length; + osd_data->alignment = alignment; + osd_data->pages_from_pool = pages_from_pool; + osd_data->own_pages = own_pages; +} +EXPORT_SYMBOL(ceph_osd_data_pages_init); + +void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, + struct ceph_pagelist *pagelist) +{ + osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; + osd_data->pagelist = pagelist; +} +EXPORT_SYMBOL(ceph_osd_data_pagelist_init); + +#ifdef CONFIG_BLOCK +void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, + struct bio *bio, size_t bio_length) +{ + osd_data->type = CEPH_OSD_DATA_TYPE_BIO; + osd_data->bio = bio; + osd_data->bio_length = bio_length; +} +EXPORT_SYMBOL(ceph_osd_data_bio_init); +#endif /* CONFIG_BLOCK */ + /* * requests */ @@ -400,8 +432,7 @@ static u64 
osd_req_encode_op(struct ceph_osd_request *req, ceph_pagelist_append(pagelist, src->cls.indata, src->cls.indata_len); - req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST; - req->r_data_out.pagelist = pagelist; + ceph_osd_data_pagelist_init(&req->r_data_out, pagelist); out_data_len = pagelist->length; break; case CEPH_OSD_OP_STARTSYNC: @@ -2056,7 +2087,6 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct page **pages, int num_pages, int page_align) { struct ceph_osd_request *req; - struct ceph_osd_data *osd_data; struct ceph_osd_req_op op; int rc = 0; @@ -2071,14 +2101,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, /* it may be a short read due to an object boundary */ - osd_data = &req->r_data_in; - osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; - osd_data->pages = pages; - osd_data->length = *plen; - osd_data->alignment = page_align; + ceph_osd_data_pages_init(&req->r_data_in, pages, *plen, page_align, + false, false); dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", - off, *plen, osd_data->length, page_align); + off, *plen, *plen, page_align); ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); @@ -2104,7 +2131,6 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct page **pages, int num_pages) { struct ceph_osd_request *req; - struct ceph_osd_data *osd_data; struct ceph_osd_req_op op; int rc = 0; int page_align = off & ~PAGE_MASK; @@ -2119,12 +2145,9 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, return PTR_ERR(req); /* it may be a short write due to an object boundary */ - osd_data = &req->r_data_out; - osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; - osd_data->pages = pages; - osd_data->length = len; - osd_data->alignment = page_align; - dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); + ceph_osd_data_pages_init(&req->r_data_out, pages, len, page_align, + false, false); + dout("writepages %llu~%llu (%llu bytes)\n", 
off, len, len); ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime); -- cgit v1.2.3 From 87060c1089a94f89590fc0606b5178f5556833f0 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 3 Apr 2013 01:28:58 -0500 Subject: libceph: a few more osd data cleanups These are very small changes that make use of osd_data local pointers as shorthands for structures being operated on. Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- fs/ceph/addr.c | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index cf9032abc8f5..127be29a6c22 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -236,6 +236,7 @@ static int ceph_readpage(struct file *filp, struct page *page) static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) { struct inode *inode = req->r_inode; + struct ceph_osd_data *osd_data; int rc = req->r_result; int bytes = le32_to_cpu(msg->hdr.data_len); int num_pages; @@ -244,11 +245,12 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ - BUG_ON(req->r_data_in.type != CEPH_OSD_DATA_TYPE_PAGES); - num_pages = calc_pages_for((u64)req->r_data_in.alignment, - (u64)req->r_data_in.length); + osd_data = &req->r_data_in; + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); + num_pages = calc_pages_for((u64)osd_data->alignment, + (u64)osd_data->length); for (i = 0; i < num_pages; i++) { - struct page *page = req->r_data_in.pages[i]; + struct page *page = osd_data->pages[i]; if (bytes < (int)PAGE_CACHE_SIZE) { /* zero (remainder of) page */ @@ -263,7 +265,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) page_cache_release(page); bytes -= PAGE_CACHE_SIZE; } - kfree(req->r_data_in.pages); + kfree(osd_data->pages); } static void ceph_unlock_page_vector(struct 
page **pages, int num_pages) @@ -557,6 +559,7 @@ static void writepages_finish(struct ceph_osd_request *req, { struct inode *inode = req->r_inode; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_osd_data *osd_data; unsigned wrote; struct page *page; int num_pages; @@ -569,9 +572,10 @@ static void writepages_finish(struct ceph_osd_request *req, long writeback_stat; unsigned issued = ceph_caps_issued(ci); - BUG_ON(req->r_data_out.type != CEPH_OSD_DATA_TYPE_PAGES); - num_pages = calc_pages_for((u64)req->r_data_out.alignment, - (u64)req->r_data_out.length); + osd_data = &req->r_data_out; + BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); + num_pages = calc_pages_for((u64)osd_data->alignment, + (u64)osd_data->length); if (rc >= 0) { /* * Assume we wrote the pages we originally sent. The @@ -589,7 +593,7 @@ static void writepages_finish(struct ceph_osd_request *req, /* clean all pages */ for (i = 0; i < num_pages; i++) { - page = req->r_data_out.pages[i]; + page = osd_data->pages[i]; BUG_ON(!page); WARN_ON(!PageUptodate(page)); @@ -620,12 +624,12 @@ static void writepages_finish(struct ceph_osd_request *req, dout("%p wrote+cleaned %d pages\n", inode, wrote); ceph_put_wrbuffer_cap_refs(ci, num_pages, snapc); - ceph_release_pages(req->r_data_out.pages, num_pages); - if (req->r_data_out.pages_from_pool) - mempool_free(req->r_data_out.pages, + ceph_release_pages(osd_data->pages, num_pages); + if (osd_data->pages_from_pool) + mempool_free(osd_data->pages, ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool); else - kfree(req->r_data_out.pages); + kfree(osd_data->pages); ceph_osdc_put_request(req); } -- cgit v1.2.3 From 79528734f3ae4699a2886f62f55e18fb34fb3651 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 3 Apr 2013 21:32:51 -0500 Subject: libceph: keep source rather than message osd op array An osd request keeps a pointer to the osd operations (ops) array that it builds in its request message. 
In order to allow each op in the array to have its own distinct data, we will need to keep track of each op's data, and that information does not go over the wire. As long as we're tracking the data we might as well just track the entire (source) op definition for each of the ops. And if we're doing that, we'll have no more need to keep a pointer to the wire-encoded version. This patch makes the array of source ops be kept with the osd request structure, and uses that instead of the version encoded in the message in places where that was previously used. The array will be embedded in the request structure, and the maximum number of ops we ever actually use is currently 2. So reduce CEPH_OSD_MAX_OP to 2 to reduce the size of the structure. The result of doing this sort of ripples back up, and as a result various function parameters and local variables become unnecessary. Make r_num_ops be unsigned, and move the definition of struct ceph_osd_req_op earlier to ensure it's defined where needed. It does not yet add per-op data, that's coming soon. 
This resolves: http://tracker.ceph.com/issues/4656 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 42 ++++++++++++++----------- fs/ceph/addr.c | 21 ++++++------- fs/ceph/file.c | 6 ++-- include/linux/ceph/osd_client.h | 70 ++++++++++++++++++++--------------------- net/ceph/debugfs.c | 4 +-- net/ceph/osd_client.c | 53 ++++++++++++++++--------------- 6 files changed, 97 insertions(+), 99 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 4a4be14a9189..c12b55559f16 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1285,7 +1285,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, */ obj_request->xferred = osd_req->r_reply_op_len[0]; rbd_assert(obj_request->xferred < (u64) UINT_MAX); - opcode = osd_req->r_request_ops[0].op; + opcode = osd_req->r_ops[0].op; switch (opcode) { case CEPH_OSD_OP_READ: rbd_osd_read_callback(obj_request); @@ -1312,8 +1312,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, } static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request, - bool write_request, - struct ceph_osd_req_op *op) + bool write_request) { struct rbd_img_request *img_request = obj_request->img_request; struct ceph_snap_context *snapc = NULL; @@ -1333,7 +1332,7 @@ static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request, } ceph_osdc_build_request(obj_request->osd_req, obj_request->offset, - 1, op, snapc, snap_id, mtime); + snapc, snap_id, mtime); } static struct ceph_osd_request *rbd_osd_req_create( @@ -1562,7 +1561,7 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, while (resid) { const char *object_name; unsigned int clone_size; - struct ceph_osd_req_op op; + struct ceph_osd_req_op *op; u64 offset; u64 length; @@ -1591,8 +1590,9 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, if (!obj_request->osd_req) goto out_partial; - osd_req_op_extent_init(&op, opcode, offset, 
length, 0, 0); - rbd_osd_req_format_op(obj_request, write_request, &op); + op = &obj_request->osd_req->r_ops[0]; + osd_req_op_extent_init(op, opcode, offset, length, 0, 0); + rbd_osd_req_format_op(obj_request, write_request); /* status and version are initially zero-filled */ @@ -1694,7 +1694,7 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 ver, u64 notify_id) { struct rbd_obj_request *obj_request; - struct ceph_osd_req_op op; + struct ceph_osd_req_op *op; struct ceph_osd_client *osdc; int ret; @@ -1708,8 +1708,9 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - osd_req_op_watch_init(&op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0); - rbd_osd_req_format_op(obj_request, false, &op); + op = &obj_request->osd_req->r_ops[0]; + osd_req_op_watch_init(op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0); + rbd_osd_req_format_op(obj_request, false); osdc = &rbd_dev->rbd_client->client->osdc; obj_request->callback = rbd_obj_request_put; @@ -1749,7 +1750,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_req_op op; + struct ceph_osd_req_op *op; int ret; rbd_assert(start ^ !!rbd_dev->watch_event); @@ -1773,10 +1774,11 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) if (!obj_request->osd_req) goto out_cancel; - osd_req_op_watch_init(&op, CEPH_OSD_OP_WATCH, + op = &obj_request->osd_req->r_ops[0]; + osd_req_op_watch_init(op, CEPH_OSD_OP_WATCH, rbd_dev->watch_event->cookie, rbd_dev->header.obj_version, start); - rbd_osd_req_format_op(obj_request, true, &op); + rbd_osd_req_format_op(obj_request, true); if (start) ceph_osdc_set_request_linger(osdc, obj_request->osd_req); @@ -1836,7 +1838,7 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, { struct rbd_obj_request *obj_request; struct ceph_osd_client *osdc; - struct 
ceph_osd_req_op op; + struct ceph_osd_req_op *op; struct page **pages; u32 page_count; int ret; @@ -1866,9 +1868,10 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - osd_req_op_cls_init(&op, CEPH_OSD_OP_CALL, class_name, method_name, + op = &obj_request->osd_req->r_ops[0]; + osd_req_op_cls_init(op, CEPH_OSD_OP_CALL, class_name, method_name, outbound, outbound_size); - rbd_osd_req_format_op(obj_request, false, &op); + rbd_osd_req_format_op(obj_request, false); osdc = &rbd_dev->rbd_client->client->osdc; ret = rbd_obj_request_submit(osdc, obj_request); @@ -2046,8 +2049,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, char *buf, u64 *version) { - struct ceph_osd_req_op op; struct rbd_obj_request *obj_request; + struct ceph_osd_req_op *op; struct ceph_osd_client *osdc; struct page **pages = NULL; u32 page_count; @@ -2072,8 +2075,9 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - osd_req_op_extent_init(&op, CEPH_OSD_OP_READ, offset, length, 0, 0); - rbd_osd_req_format_op(obj_request, false, &op); + op = &obj_request->osd_req->r_ops[0]; + osd_req_op_extent_init(op, CEPH_OSD_OP_READ, offset, length, 0, 0); + rbd_osd_req_format_op(obj_request, false); osdc = &rbd_dev->rbd_client->client->osdc; ret = rbd_obj_request_submit(osdc, obj_request); diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 127be29a6c22..c9da074f0fe6 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -288,7 +288,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) struct page *page = list_entry(page_list->prev, struct page, lru); struct ceph_vino vino; struct ceph_osd_request *req; - struct ceph_osd_req_op op; u64 off; u64 len; int i; @@ -314,7 +313,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) off, len); vino = ceph_vino(inode); req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, - 1, &op, CEPH_OSD_OP_READ, 
+ 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, ci->i_truncate_size, false); @@ -349,7 +348,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) req->r_callback = finish_read; req->r_inode = inode; - ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); + ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len); ret = ceph_osdc_start_request(osdc, req, false); @@ -567,7 +566,7 @@ static void writepages_finish(struct ceph_osd_request *req, struct ceph_snap_context *snapc = req->r_snapc; struct address_space *mapping = inode->i_mapping; int rc = req->r_result; - u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length); + u64 bytes = req->r_ops[0].extent.length; struct ceph_fs_client *fsc = ceph_inode_to_client(inode); long writeback_stat; unsigned issued = ceph_caps_issued(ci); @@ -635,8 +634,7 @@ static void writepages_finish(struct ceph_osd_request *req, static struct ceph_osd_request * ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len, - struct ceph_snap_context *snapc, - int num_ops, struct ceph_osd_req_op *ops) + struct ceph_snap_context *snapc, int num_ops) { struct ceph_fs_client *fsc; struct ceph_inode_info *ci; @@ -648,7 +646,7 @@ ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len, /* BUG_ON(vino.snap != CEPH_NOSNAP); */ return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, offset, len, num_ops, ops, CEPH_OSD_OP_WRITE, + vino, offset, len, num_ops, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK, snapc, ci->i_truncate_seq, ci->i_truncate_size, true); } @@ -738,7 +736,6 @@ retry: last_snapc = snapc; while (!done && index <= end) { - struct ceph_osd_req_op ops[2]; int num_ops = do_sync ? 
2 : 1; struct ceph_vino vino; unsigned i; @@ -846,7 +843,7 @@ get_more_pages: len = wsize; req = ceph_writepages_osd_request(inode, offset, &len, snapc, - num_ops, ops); + num_ops); if (IS_ERR(req)) { rc = PTR_ERR(req); @@ -927,11 +924,11 @@ get_more_pages: /* Update the write op length in case we changed it */ - osd_req_op_extent_update(&ops[0], len); + osd_req_op_extent_update(&req->r_ops[0], len); vino = ceph_vino(inode); - ceph_osdc_build_request(req, offset, num_ops, ops, - snapc, vino.snap, &inode->i_mtime); + ceph_osdc_build_request(req, offset, snapc, vino.snap, + &inode->i_mtime); rc = ceph_osdc_start_request(&fsc->client->osdc, req, true); BUG_ON(rc); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index da642af14a28..a12f47642c40 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -478,7 +478,6 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; - struct ceph_osd_req_op ops[2]; int num_ops = 1; struct page **pages; int num_pages; @@ -534,7 +533,7 @@ more: snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, num_ops, ops, + vino, pos, &len, num_ops, CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, false); @@ -579,8 +578,7 @@ more: false, own_pages); /* BUG_ON(vino.snap != CEPH_NOSNAP); */ - ceph_osdc_build_request(req, pos, num_ops, ops, - snapc, vino.snap, &mtime); + ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); if (!ret) { diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index af60dac1f9c0..f4c1a2a22a14 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -48,7 +48,7 @@ struct ceph_osd { }; -#define CEPH_OSD_MAX_OP 10 +#define CEPH_OSD_MAX_OP 2 enum ceph_osd_data_type { 
CEPH_OSD_DATA_TYPE_NONE, @@ -79,6 +79,34 @@ struct ceph_osd_data { }; }; +struct ceph_osd_req_op { + u16 op; /* CEPH_OSD_OP_* */ + u32 payload_len; + union { + struct { + u64 offset, length; + u64 truncate_size; + u32 truncate_seq; + } extent; + struct { + const char *class_name; + const char *method_name; + const void *indata; + u32 indata_len; + __u8 class_len; + __u8 method_len; + __u8 argc; + } cls; + struct { + u64 cookie; + u64 ver; + u32 prot_ver; + u32 timeout; + __u8 flag; + } watch; + }; +}; + /* an in-flight request */ struct ceph_osd_request { u64 r_tid; /* unique for this client */ @@ -95,10 +123,11 @@ struct ceph_osd_request { struct ceph_msg *r_request, *r_reply; int r_flags; /* any additional flags for the osd */ u32 r_sent; /* >0 if r_request is sending/sent */ - int r_num_ops; - /* encoded message content */ - struct ceph_osd_op *r_request_ops; + /* request osd ops array */ + unsigned int r_num_ops; + struct ceph_osd_req_op r_ops[CEPH_OSD_MAX_OP]; + /* these are updated on each send */ __le32 *r_request_osdmap_epoch; __le32 *r_request_flags; @@ -193,34 +222,6 @@ struct ceph_osd_client { struct workqueue_struct *notify_wq; }; -struct ceph_osd_req_op { - u16 op; /* CEPH_OSD_OP_* */ - u32 payload_len; - union { - struct { - u64 offset, length; - u64 truncate_size; - u32 truncate_seq; - } extent; - struct { - const char *class_name; - const char *method_name; - const void *indata; - u32 indata_len; - __u8 class_len; - __u8 method_len; - __u8 argc; - } cls; - struct { - u64 cookie; - u64 ver; - u32 prot_ver; - u32 timeout; - __u8 flag; - } watch; - }; -}; - extern int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client); extern void ceph_osdc_stop(struct ceph_osd_client *osdc); @@ -249,8 +250,6 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client * gfp_t gfp_flags); extern void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, - unsigned int num_ops, - struct ceph_osd_req_op *src_ops, struct 
ceph_snap_context *snapc, u64 snap_id, struct timespec *mtime); @@ -259,8 +258,7 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_file_layout *layout, struct ceph_vino vino, u64 offset, u64 *len, - int num_ops, struct ceph_osd_req_op *ops, - int opcode, int flags, + int num_ops, int opcode, int flags, struct ceph_snap_context *snapc, u32 truncate_seq, u64 truncate_size, bool use_mempool); diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index 00d051f4894e..83661cdc0766 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -123,8 +123,8 @@ static int osdc_show(struct seq_file *s, void *pp) mutex_lock(&osdc->request_mutex); for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { struct ceph_osd_request *req; + unsigned int i; int opcode; - int i; req = rb_entry(p, struct ceph_osd_request, r_node); @@ -142,7 +142,7 @@ static int osdc_show(struct seq_file *s, void *pp) seq_printf(s, "\t"); for (i = 0; i < req->r_num_ops; i++) { - opcode = le16_to_cpu(req->r_request_ops[i].op); + opcode = req->r_ops[i].op; seq_printf(s, "\t%s", ceph_osd_op_name(opcode)); } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e197c5c0b3a2..a498d2de17a4 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -186,6 +186,9 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_msg *msg; size_t msg_size; + BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX); + BUG_ON(num_ops > CEPH_OSD_MAX_OP); + msg_size = 4 + 4 + 8 + 8 + 4+8; msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ msg_size += 1 + 8 + 4 + 4; /* pg_t */ @@ -207,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, req->r_osdc = osdc; req->r_mempool = use_mempool; + req->r_num_ops = num_ops; kref_init(&req->r_kref); init_completion(&req->r_completion); @@ -418,12 +422,14 @@ void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, EXPORT_SYMBOL(osd_req_op_watch_init); static u64 
osd_req_encode_op(struct ceph_osd_request *req, - struct ceph_osd_op *dst, - struct ceph_osd_req_op *src) + struct ceph_osd_op *dst, unsigned int which) { + struct ceph_osd_req_op *src; u64 out_data_len = 0; struct ceph_pagelist *pagelist; + BUG_ON(which >= req->r_num_ops); + src = &req->r_ops[which]; if (WARN_ON(!osd_req_opcode_valid(src->op))) { pr_err("unrecognized osd opcode %d\n", src->op); @@ -487,21 +493,17 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, * build new request AND message * */ -void ceph_osdc_build_request(struct ceph_osd_request *req, - u64 off, unsigned int num_ops, - struct ceph_osd_req_op *src_ops, - struct ceph_snap_context *snapc, u64 snap_id, - struct timespec *mtime) +void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, + struct ceph_snap_context *snapc, u64 snap_id, + struct timespec *mtime) { struct ceph_msg *msg = req->r_request; - struct ceph_osd_req_op *src_op; void *p; size_t msg_size; int flags = req->r_flags; u64 data_len; - int i; + unsigned int i; - req->r_num_ops = num_ops; req->r_snapid = snap_id; req->r_snapc = ceph_get_snap_context(snapc); @@ -541,12 +543,10 @@ void ceph_osdc_build_request(struct ceph_osd_request *req, p += req->r_oid_len; /* ops--can imply data */ - ceph_encode_16(&p, num_ops); - src_op = src_ops; - req->r_request_ops = p; + ceph_encode_16(&p, (u16)req->r_num_ops); data_len = 0; - for (i = 0; i < num_ops; i++, src_op++) { - data_len += osd_req_encode_op(req, p, src_op); + for (i = 0; i < req->r_num_ops; i++) { + data_len += osd_req_encode_op(req, p, i); p += sizeof(struct ceph_osd_op); } @@ -602,7 +602,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, struct ceph_vino vino, u64 off, u64 *plen, int num_ops, - struct ceph_osd_req_op *ops, int opcode, int flags, struct ceph_snap_context *snapc, u32 truncate_seq, @@ -610,6 +609,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, bool 
use_mempool) { struct ceph_osd_request *req; + struct ceph_osd_req_op *op; u64 objnum = 0; u64 objoff = 0; u64 objlen = 0; @@ -623,6 +623,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); + req->r_flags = flags; /* calculate max write size */ @@ -642,7 +643,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size = object_size; } - osd_req_op_extent_init(&ops[0], opcode, objoff, objlen, + op = &req->r_ops[0]; + osd_req_op_extent_init(op, opcode, objoff, objlen, truncate_size, truncate_seq); /* * A second op in the ops array means the caller wants to @@ -650,7 +652,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, * osd will flush data quickly. */ if (num_ops > 1) - osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(++op, CEPH_OSD_OP_STARTSYNC); req->r_file_layout = *layout; /* keep a copy */ @@ -1342,7 +1344,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, struct ceph_osd_request *req; u64 tid; int object_len; - int numops, payload_len, flags; + unsigned int numops; + int payload_len, flags; s32 result; s32 retry_attempt; struct ceph_pg pg; @@ -1352,7 +1355,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, u32 osdmap_epoch; int already_completed; u32 bytes; - int i; + unsigned int i; tid = le64_to_cpu(msg->hdr.tid); dout("handle_reply %p tid %llu\n", msg, tid); @@ -2116,12 +2119,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct page **pages, int num_pages, int page_align) { struct ceph_osd_request *req; - struct ceph_osd_req_op op; int rc = 0; dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, vino.snap, off, *plen); - req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, &op, + req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, truncate_seq, 
truncate_size, false); @@ -2136,7 +2138,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", off, *plen, *plen, page_align); - ceph_osdc_build_request(req, off, 1, &op, NULL, vino.snap, NULL); + ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); rc = ceph_osdc_start_request(osdc, req, false); if (!rc) @@ -2160,12 +2162,11 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct page **pages, int num_pages) { struct ceph_osd_request *req; - struct ceph_osd_req_op op; int rc = 0; int page_align = off & ~PAGE_MASK; BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ - req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, &op, + req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, snapc, truncate_seq, truncate_size, @@ -2178,7 +2179,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, false, false); dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); - ceph_osdc_build_request(req, off, 1, &op, snapc, CEPH_NOSNAP, mtime); + ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); rc = ceph_osdc_start_request(osdc, req, true); if (!rc) -- cgit v1.2.3 From 8c042b0df99cd06ef8473ef6e204b87b3dc80158 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Wed, 3 Apr 2013 01:28:58 -0500 Subject: libceph: add data pointers in osd op structures An extent type osd operation currently implies that there will be corresponding data supplied in the data portion of the request (for write) or response (for read) message. Similarly, an osd class method operation implies a data item will be supplied to receive the response data from the operation. Add a ceph_osd_data pointer to each of those structures, and assign it to point to either the incoming or the outgoing data structure in the osd message. 
The data is not always available when an op is initially set up, so add two new functions to allow setting them after the op has been initialized. Begin to make use of the data item pointer available in the osd operation rather than the request data in or out structure in places where it's convenient. Add some assertions to verify pointers are always set the way they're expected to be. This is a sort of stepping stone toward really moving the data into the osd request ops, to allow for some validation before making that jump. This is the first in a series of patches that resolve: http://tracker.ceph.com/issues/4657 Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 24 ++++++++++++++++++++---- fs/ceph/addr.c | 8 +++++--- fs/ceph/file.c | 5 +++-- include/linux/ceph/osd_client.h | 6 ++++++ net/ceph/osd_client.c | 26 +++++++++++++++++++++++++- 5 files changed, 59 insertions(+), 10 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c12b55559f16..eb64ed0f228f 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1315,23 +1315,39 @@ static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request, bool write_request) { struct rbd_img_request *img_request = obj_request->img_request; + struct ceph_osd_request *osd_req = obj_request->osd_req; + struct ceph_osd_data *osd_data = NULL; struct ceph_snap_context *snapc = NULL; u64 snap_id = CEPH_NOSNAP; struct timespec *mtime = NULL; struct timespec now; - rbd_assert(obj_request->osd_req != NULL); + rbd_assert(osd_req != NULL); if (write_request) { + osd_data = &osd_req->r_data_out; now = CURRENT_TIME; mtime = &now; if (img_request) snapc = img_request->snapc; - } else if (img_request) { - snap_id = img_request->snap_id; + } else { + osd_data = &osd_req->r_data_in; + if (img_request) + snap_id = img_request->snap_id; } + if (obj_request->type != OBJ_REQUEST_NODATA) { + struct ceph_osd_req_op *op = &obj_request->osd_req->r_ops[0]; - 
ceph_osdc_build_request(obj_request->osd_req, obj_request->offset, + /* + * If it has data, it's either a object class method + * call (cls) or it's an extent operation. + */ + if (op->op == CEPH_OSD_OP_CALL) + osd_req_op_cls_response_data(op, osd_data); + else + osd_req_op_extent_osd_data(op, osd_data); + } + ceph_osdc_build_request(osd_req, obj_request->offset, snapc, snap_id, mtime); } diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index c9da074f0fe6..0ac3a37753cb 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -343,7 +343,8 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - ceph_osd_data_pages_init(&req->r_data_in, pages, len, 0, + BUG_ON(req->r_ops[0].extent.osd_data != &req->r_data_in); + ceph_osd_data_pages_init(req->r_ops[0].extent.osd_data, pages, len, 0, false, false); req->r_callback = finish_read; req->r_inode = inode; @@ -916,8 +917,9 @@ get_more_pages: dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); - ceph_osd_data_pages_init(&req->r_data_out, pages, len, 0, - !!pool, false); + BUG_ON(req->r_ops[0].extent.osd_data != &req->r_data_out); + ceph_osd_data_pages_init(req->r_ops[0].extent.osd_data, pages, + len, 0, !!pool, false); pages = NULL; /* request message now owns the pages array */ pool = NULL; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index a12f47642c40..cddc10fd7cf9 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -574,8 +574,9 @@ more: own_pages = true; } } - ceph_osd_data_pages_init(&req->r_data_out, pages, len, page_align, - false, own_pages); + BUG_ON(req->r_ops[0].extent.osd_data != &req->r_data_out); + ceph_osd_data_pages_init(req->r_ops[0].extent.osd_data, pages, len, + page_align, false, own_pages); /* BUG_ON(vino.snap != CEPH_NOSNAP); */ ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index a9c4089894c8..ae5193550fbf 100644 --- 
a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -87,12 +87,14 @@ struct ceph_osd_req_op { u64 offset, length; u64 truncate_size; u32 truncate_seq; + struct ceph_osd_data *osd_data; } extent; struct { const char *class_name; const char *method_name; const void *request_data; u32 request_data_len; + struct ceph_osd_data *response_data; __u8 class_len; __u8 method_len; __u8 argc; @@ -236,10 +238,14 @@ extern void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, u64 offset, u64 length, u64 truncate_size, u32 truncate_seq); extern void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length); +extern void osd_req_op_extent_osd_data(struct ceph_osd_req_op *op, + struct ceph_osd_data *osd_data); extern void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, const char *class, const char *method, const void *request_data, size_t request_data_size); +extern void osd_req_op_cls_response_data(struct ceph_osd_req_op *op, + struct ceph_osd_data *response_data); extern void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, u64 cookie, u64 version, int flag); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 87fcf0b795c0..23491e92b229 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -372,6 +372,13 @@ void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length) } EXPORT_SYMBOL(osd_req_op_extent_update); +void osd_req_op_extent_osd_data(struct ceph_osd_req_op *op, + struct ceph_osd_data *osd_data) +{ + op->extent.osd_data = osd_data; +} +EXPORT_SYMBOL(osd_req_op_extent_osd_data); + void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, const char *class, const char *method, const void *request_data, size_t request_data_size) @@ -406,6 +413,13 @@ void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, } EXPORT_SYMBOL(osd_req_op_cls_init); +void osd_req_op_cls_response_data(struct ceph_osd_req_op *op, + struct ceph_osd_data *response_data) +{ + 
op->cls.response_data = response_data; +} +EXPORT_SYMBOL(osd_req_op_cls_response_data); + void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, u64 cookie, u64 version, int flag) { @@ -449,6 +463,10 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, cpu_to_le64(src->extent.truncate_size); dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); + if (src->op == CEPH_OSD_OP_WRITE) + WARN_ON(src->extent.osd_data != &req->r_data_out); + else + WARN_ON(src->extent.osd_data != &req->r_data_in); break; case CEPH_OSD_OP_CALL: pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); @@ -464,8 +482,9 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, src->cls.method_len); ceph_pagelist_append(pagelist, src->cls.request_data, src->cls.request_data_len); - ceph_osd_data_pagelist_init(&req->r_data_out, pagelist); + + WARN_ON(src->cls.response_data != &req->r_data_in); request_data_len = pagelist->length; break; case CEPH_OSD_OP_STARTSYNC: @@ -609,6 +628,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, bool use_mempool) { struct ceph_osd_request *req; + struct ceph_osd_data *osd_data; struct ceph_osd_req_op *op; u64 objnum = 0; u64 objoff = 0; @@ -623,6 +643,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); + osd_data = opcode == CEPH_OSD_OP_WRITE ? 
&req->r_data_out + : &req->r_data_in; req->r_flags = flags; @@ -646,6 +668,8 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, op = &req->r_ops[0]; osd_req_op_extent_init(op, opcode, objoff, objlen, truncate_size, truncate_seq); + osd_req_op_extent_osd_data(op, osd_data); + /* * A second op in the ops array means the caller wants to * also issue a include a 'startsync' command so that the -- cgit v1.2.3 From c99d2d4abb6c405ef52e9bc1da87b382b8f41739 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 5 Apr 2013 01:27:11 -0500 Subject: libceph: specify osd op by index in request An osd request now holds all of its source op structures, and every place that initializes one of these is in fact initializing one of the entries in the osd request's array. So rather than supplying the address of the op to initialize, have the caller specify the osd request and an indication of which op it would like to initialize. This better hides the details of the op structure (and facilitates moving the data pointers they use). Since osd_req_op_init() is a common routine, and it's not used outside the osd client code, give it static scope. Also make it return the address of the specified op (so all the other init routines don't have to repeat that code). 
Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 35 ++++++++++------------ fs/ceph/addr.c | 2 +- include/linux/ceph/osd_client.h | 19 +++++++----- net/ceph/osd_client.c | 64 +++++++++++++++++++++++++---------------- 4 files changed, 67 insertions(+), 53 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index eb64ed0f228f..80ac772587c8 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1336,16 +1336,17 @@ static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request, snap_id = img_request->snap_id; } if (obj_request->type != OBJ_REQUEST_NODATA) { - struct ceph_osd_req_op *op = &obj_request->osd_req->r_ops[0]; - /* * If it has data, it's either a object class method * call (cls) or it's an extent operation. */ - if (op->op == CEPH_OSD_OP_CALL) - osd_req_op_cls_response_data(op, osd_data); + /* XXX This use of the ops array goes away in the next patch */ + if (obj_request->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL) + osd_req_op_cls_response_data(obj_request->osd_req, 0, + osd_data); else - osd_req_op_extent_osd_data(op, osd_data); + osd_req_op_extent_osd_data(obj_request->osd_req, 0, + osd_data); } ceph_osdc_build_request(osd_req, obj_request->offset, snapc, snap_id, mtime); @@ -1577,7 +1578,6 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, while (resid) { const char *object_name; unsigned int clone_size; - struct ceph_osd_req_op *op; u64 offset; u64 length; @@ -1606,8 +1606,8 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, if (!obj_request->osd_req) goto out_partial; - op = &obj_request->osd_req->r_ops[0]; - osd_req_op_extent_init(op, opcode, offset, length, 0, 0); + osd_req_op_extent_init(obj_request->osd_req, 0, + opcode, offset, length, 0, 0); rbd_osd_req_format_op(obj_request, write_request); /* status and version are initially zero-filled */ @@ -1710,7 +1710,6 @@ static int rbd_obj_notify_ack(struct rbd_device 
*rbd_dev, u64 ver, u64 notify_id) { struct rbd_obj_request *obj_request; - struct ceph_osd_req_op *op; struct ceph_osd_client *osdc; int ret; @@ -1724,8 +1723,8 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - op = &obj_request->osd_req->r_ops[0]; - osd_req_op_watch_init(op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0); + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, + notify_id, ver, 0); rbd_osd_req_format_op(obj_request, false); osdc = &rbd_dev->rbd_client->client->osdc; @@ -1766,7 +1765,6 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_req_op *op; int ret; rbd_assert(start ^ !!rbd_dev->watch_event); @@ -1790,8 +1788,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) if (!obj_request->osd_req) goto out_cancel; - op = &obj_request->osd_req->r_ops[0]; - osd_req_op_watch_init(op, CEPH_OSD_OP_WATCH, + osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, rbd_dev->watch_event->cookie, rbd_dev->header.obj_version, start); rbd_osd_req_format_op(obj_request, true); @@ -1854,7 +1851,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, { struct rbd_obj_request *obj_request; struct ceph_osd_client *osdc; - struct ceph_osd_req_op *op; struct page **pages; u32 page_count; int ret; @@ -1884,8 +1880,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - op = &obj_request->osd_req->r_ops[0]; - osd_req_op_cls_init(op, CEPH_OSD_OP_CALL, class_name, method_name, + osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, + class_name, method_name, outbound, outbound_size); rbd_osd_req_format_op(obj_request, false); @@ -2066,7 +2062,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, { struct rbd_obj_request *obj_request; - struct 
ceph_osd_req_op *op; struct ceph_osd_client *osdc; struct page **pages = NULL; u32 page_count; @@ -2091,8 +2086,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - op = &obj_request->osd_req->r_ops[0]; - osd_req_op_extent_init(op, CEPH_OSD_OP_READ, offset, length, 0, 0); + osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, + offset, length, 0, 0); rbd_osd_req_format_op(obj_request, false); osdc = &rbd_dev->rbd_client->client->osdc; diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 0ac3a37753cb..cc57104a7266 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -926,7 +926,7 @@ get_more_pages: /* Update the write op length in case we changed it */ - osd_req_op_extent_update(&req->r_ops[0], len); + osd_req_op_extent_update(req, 0, len); vino = ceph_vino(inode); ceph_osdc_build_request(req, offset, snapc, vino.snap, diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index ae5193550fbf..144d57cbef9e 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -233,20 +233,25 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); -extern void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode); -extern void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, +extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req, + unsigned int which, u16 opcode, u64 offset, u64 length, u64 truncate_size, u32 truncate_seq); -extern void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length); -extern void osd_req_op_extent_osd_data(struct ceph_osd_req_op *op, +extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req, + unsigned int which, u64 length); +extern void osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, + unsigned int which, struct ceph_osd_data *osd_data); -extern void osd_req_op_cls_init(struct 
ceph_osd_req_op *op, u16 opcode, +extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, + unsigned int which, u16 opcode, const char *class, const char *method, const void *request_data, size_t request_data_size); -extern void osd_req_op_cls_response_data(struct ceph_osd_req_op *op, +extern void osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, + unsigned int which, struct ceph_osd_data *response_data); -extern void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, +extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req, + unsigned int which, u16 opcode, u64 cookie, u64 version, int flag); extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 23491e92b229..ad24f210bf0c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -329,25 +329,32 @@ static bool osd_req_opcode_valid(u16 opcode) * other information associated with them. It also serves as a * common init routine for all the other init functions, below. 
*/ -void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode) +static struct ceph_osd_req_op * +osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, + u16 opcode) { + struct ceph_osd_req_op *op; + + BUG_ON(which >= osd_req->r_num_ops); BUG_ON(!osd_req_opcode_valid(opcode)); + op = &osd_req->r_ops[which]; memset(op, 0, sizeof (*op)); - op->op = opcode; + + return op; } -void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, +void osd_req_op_extent_init(struct ceph_osd_request *osd_req, + unsigned int which, u16 opcode, u64 offset, u64 length, u64 truncate_size, u32 truncate_seq) { + struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode); size_t payload_len = 0; BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); - osd_req_op_init(op, opcode); - op->extent.offset = offset; op->extent.length = length; op->extent.truncate_size = truncate_size; @@ -359,9 +366,15 @@ void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, } EXPORT_SYMBOL(osd_req_op_extent_init); -void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length) +void osd_req_op_extent_update(struct ceph_osd_request *osd_req, + unsigned int which, u64 length) { - u64 previous = op->extent.length; + struct ceph_osd_req_op *op; + u64 previous; + + BUG_ON(which >= osd_req->r_num_ops); + op = &osd_req->r_ops[which]; + previous = op->extent.length; if (length == previous) return; /* Nothing to do */ @@ -372,24 +385,25 @@ void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length) } EXPORT_SYMBOL(osd_req_op_extent_update); -void osd_req_op_extent_osd_data(struct ceph_osd_req_op *op, +void osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, + unsigned int which, struct ceph_osd_data *osd_data) { - op->extent.osd_data = osd_data; + BUG_ON(which >= osd_req->r_num_ops); + osd_req->r_ops[which].extent.osd_data = osd_data; } EXPORT_SYMBOL(osd_req_op_extent_osd_data); -void osd_req_op_cls_init(struct ceph_osd_req_op *op, 
u16 opcode, - const char *class, const char *method, +void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, + u16 opcode, const char *class, const char *method, const void *request_data, size_t request_data_size) { + struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode); size_t payload_len = 0; size_t size; BUG_ON(opcode != CEPH_OSD_OP_CALL); - osd_req_op_init(op, opcode); - op->cls.class_name = class; size = strlen(class); BUG_ON(size > (size_t) U8_MAX); @@ -412,26 +426,28 @@ void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, op->payload_len = payload_len; } EXPORT_SYMBOL(osd_req_op_cls_init); - -void osd_req_op_cls_response_data(struct ceph_osd_req_op *op, +void osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, + unsigned int which, struct ceph_osd_data *response_data) { - op->cls.response_data = response_data; + BUG_ON(which >= osd_req->r_num_ops); + osd_req->r_ops[which].cls.response_data = response_data; } EXPORT_SYMBOL(osd_req_op_cls_response_data); -void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, +void osd_req_op_watch_init(struct ceph_osd_request *osd_req, + unsigned int which, u16 opcode, u64 cookie, u64 version, int flag) { - BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); + struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode); - osd_req_op_init(op, opcode); + BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); op->watch.cookie = cookie; /* op->watch.ver = version; */ /* XXX 3847 */ op->watch.ver = cpu_to_le64(version); if (opcode == CEPH_OSD_OP_WATCH && flag) - op->watch.flag = (u8) 1; + op->watch.flag = (u8)1; } EXPORT_SYMBOL(osd_req_op_watch_init); @@ -629,7 +645,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, { struct ceph_osd_request *req; struct ceph_osd_data *osd_data; - struct ceph_osd_req_op *op; u64 objnum = 0; u64 objoff = 0; u64 objlen = 0; @@ -665,10 +680,9 @@ 
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size = object_size; } - op = &req->r_ops[0]; - osd_req_op_extent_init(op, opcode, objoff, objlen, + osd_req_op_extent_init(req, 0, opcode, objoff, objlen, truncate_size, truncate_seq); - osd_req_op_extent_osd_data(op, osd_data); + osd_req_op_extent_osd_data(req, 0, osd_data); /* * A second op in the ops array means the caller wants to @@ -676,7 +690,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, * osd will flush data quickly. */ if (num_ops > 1) - osd_req_op_init(++op, CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); req->r_file_layout = *layout; /* keep a copy */ -- cgit v1.2.3 From a4ce40a9a7c1053ac2a41cf64255e44e356e5522 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Fri, 5 Apr 2013 01:27:12 -0500 Subject: libceph: combine initializing and setting osd data This ends up being a rather large patch but what it's doing is somewhat straightforward. Basically, this is replacing two calls with one. The first of the two calls is initializing a struct ceph_osd_data with data (either a page array, a page list, or a bio list); the second is setting an osd request op so it associates that data with one of the op's parameters. In place of those two will be a single function that initializes the op directly. That means we sort of fan out a set of the needed functions: - extent ops with pages data - extent ops with pagelist data - extent ops with bio list data and - class ops with page data for receiving a response We also have to define another one, but it's only used internally: - class ops with pagelist data for request parameters Note that we *still* haven't gotten rid of the osd request's r_data_in and r_data_out fields. All the osd ops refer to them for their data. For now, these data fields are pointers assigned to the appropriate r_data_* field when these new functions are called.
Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 20 ++---- fs/ceph/addr.c | 12 ++-- fs/ceph/file.c | 3 +- include/linux/ceph/osd_client.h | 43 ++++++----- net/ceph/osd_client.c | 155 +++++++++++++++++++++++++++++++--------- 5 files changed, 161 insertions(+), 72 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index db29783436c8..6f7a52cf75c7 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1592,7 +1592,6 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, rbd_assert(resid > 0); while (resid) { struct ceph_osd_request *osd_req; - struct ceph_osd_data *osd_data; const char *object_name; unsigned int clone_size; u64 offset; @@ -1625,13 +1624,10 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, obj_request->osd_req = osd_req; obj_request->callback = rbd_img_obj_callback; - osd_data = write_request ? &osd_req->r_data_out - : &osd_req->r_data_in; osd_req_op_extent_init(osd_req, 0, opcode, offset, length, 0, 0); - ceph_osd_data_bio_init(osd_data, obj_request->bio_list, - obj_request->length); - osd_req_op_extent_osd_data(osd_req, 0, osd_data); + osd_req_op_extent_osd_data_bio(osd_req, 0, write_request, + obj_request->bio_list, obj_request->length); rbd_osd_req_format(obj_request, write_request); rbd_img_obj_request_add(img_request, obj_request); @@ -1821,7 +1817,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_data *osd_data; struct page **pages; u32 page_count; int ret; @@ -1851,13 +1846,12 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - osd_data = &obj_request->osd_req->r_data_in; osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, class_name, method_name, outbound, outbound_size); - ceph_osd_data_pages_init(osd_data, 
obj_request->pages, inbound_size, + osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, + obj_request->pages, inbound_size, 0, false, false); - osd_req_op_cls_response_data(obj_request->osd_req, 0, osd_data); rbd_osd_req_format(obj_request, false); ret = rbd_obj_request_submit(osdc, obj_request); @@ -2037,7 +2031,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct rbd_obj_request *obj_request; - struct ceph_osd_data *osd_data; struct page **pages = NULL; u32 page_count; size_t size; @@ -2061,14 +2054,13 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, if (!obj_request->osd_req) goto out; - osd_data = &obj_request->osd_req->r_data_in; osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, offset, length, 0, 0); - ceph_osd_data_pages_init(osd_data, obj_request->pages, + osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false, + obj_request->pages, obj_request->length, obj_request->offset & ~PAGE_MASK, false, false); - osd_req_op_extent_osd_data(obj_request->osd_req, 0, osd_data); rbd_osd_req_format(obj_request, false); ret = rbd_obj_request_submit(osdc, obj_request); diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index cc57104a7266..27d62070a8e9 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -245,7 +245,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ - osd_data = &req->r_data_in; + osd_data = osd_req_op_extent_osd_data(req, 0, false); BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); @@ -343,8 +343,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - BUG_ON(req->r_ops[0].extent.osd_data != &req->r_data_in); - 
ceph_osd_data_pages_init(req->r_ops[0].extent.osd_data, pages, len, 0, + osd_req_op_extent_osd_data_pages(req, 0, false, pages, len, 0, false, false); req->r_callback = finish_read; req->r_inode = inode; @@ -572,7 +571,7 @@ static void writepages_finish(struct ceph_osd_request *req, long writeback_stat; unsigned issued = ceph_caps_issued(ci); - osd_data = &req->r_data_out; + osd_data = osd_req_op_extent_osd_data(req, 0, true); BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); @@ -917,9 +916,8 @@ get_more_pages: dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); - BUG_ON(req->r_ops[0].extent.osd_data != &req->r_data_out); - ceph_osd_data_pages_init(req->r_ops[0].extent.osd_data, pages, - len, 0, !!pool, false); + osd_req_op_extent_osd_data_pages(req, 0, true, pages, len, 0, + !!pool, false); pages = NULL; /* request message now owns the pages array */ pool = NULL; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index cddc10fd7cf9..0f9c4095614b 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -574,8 +574,7 @@ more: own_pages = true; } } - BUG_ON(req->r_ops[0].extent.osd_data != &req->r_data_out); - ceph_osd_data_pages_init(req->r_ops[0].extent.osd_data, pages, len, + osd_req_op_extent_osd_data_pages(req, 0, true, pages, len, page_align, false, own_pages); /* BUG_ON(vino.snap != CEPH_NOSNAP); */ diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 71c41575646d..f8a00b48e550 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -240,17 +240,39 @@ extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req, u64 truncate_size, u32 truncate_seq); extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req, unsigned int which, u64 length); -extern void osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, + +extern struct ceph_osd_data *osd_req_op_extent_osd_data( + struct 
ceph_osd_request *osd_req, + unsigned int which, bool write_request); +extern struct ceph_osd_data *osd_req_op_cls_response_data( + struct ceph_osd_request *osd_req, + unsigned int which); + +extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *, + unsigned int which, bool write_request, + struct page **pages, u64 length, + u32 alignment, bool pages_from_pool, + bool own_pages); +extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *, + unsigned int which, bool write_request, + struct ceph_pagelist *pagelist); +#ifdef CONFIG_BLOCK +extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *, + unsigned int which, bool write_request, + struct bio *bio, size_t bio_length); +#endif /* CONFIG_BLOCK */ + +extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *, unsigned int which, - struct ceph_osd_data *osd_data); + struct page **pages, u64 length, + u32 alignment, bool pages_from_pool, + bool own_pages); + extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method, const void *request_data, size_t request_data_size); -extern void osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, - unsigned int which, - struct ceph_osd_data *response_data); extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u64 cookie, u64 version, int flag); @@ -290,17 +312,6 @@ static inline void ceph_osdc_put_request(struct ceph_osd_request *req) kref_put(&req->r_kref, ceph_osdc_release_request); } -extern void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, - struct page **pages, u64 length, - u32 alignment, bool pages_from_pool, - bool own_pages); -extern void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, - struct ceph_pagelist *pagelist); -#ifdef CONFIG_BLOCK -extern void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, - struct bio *bio, size_t bio_length); -#endif /* 
CONFIG_BLOCK */ - extern int ceph_osdc_start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req, bool nofail); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 932b8af8b8ee..86cb52404f17 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1,3 +1,4 @@ + #include #include @@ -85,7 +86,7 @@ static void ceph_osd_data_init(struct ceph_osd_data *osd_data) osd_data->type = CEPH_OSD_DATA_TYPE_NONE; } -void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, +static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages) { @@ -96,27 +97,131 @@ void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, osd_data->pages_from_pool = pages_from_pool; osd_data->own_pages = own_pages; } -EXPORT_SYMBOL(ceph_osd_data_pages_init); -void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, +static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, struct ceph_pagelist *pagelist) { osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; osd_data->pagelist = pagelist; } -EXPORT_SYMBOL(ceph_osd_data_pagelist_init); #ifdef CONFIG_BLOCK -void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, +static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, struct bio *bio, size_t bio_length) { osd_data->type = CEPH_OSD_DATA_TYPE_BIO; osd_data->bio = bio; osd_data->bio_length = bio_length; } -EXPORT_SYMBOL(ceph_osd_data_bio_init); #endif /* CONFIG_BLOCK */ +struct ceph_osd_data * +osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, + unsigned int which, bool write_request) +{ + BUG_ON(which >= osd_req->r_num_ops); + + /* return &osd_req->r_ops[which].extent.osd_data; */ + return write_request ? 
&osd_req->r_data_out : &osd_req->r_data_in; +} +EXPORT_SYMBOL(osd_req_op_extent_osd_data); + +struct ceph_osd_data * +osd_req_op_cls_request_info(struct ceph_osd_request *osd_req, + unsigned int which) +{ + BUG_ON(which >= osd_req->r_num_ops); + + /* return &osd_req->r_ops[which].cls.request_info; */ + return &osd_req->r_data_out; /* Request data is outgoing */ +} +EXPORT_SYMBOL(osd_req_op_cls_request_info); /* ??? */ + +struct ceph_osd_data * +osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, + unsigned int which) +{ + BUG_ON(which >= osd_req->r_num_ops); + + /* return &osd_req->r_ops[which].cls.response_data; */ + return &osd_req->r_data_in; /* Response data is incoming */ +} +EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */ + +void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, + unsigned int which, bool write_request, + struct page **pages, u64 length, u32 alignment, + bool pages_from_pool, bool own_pages) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request); + ceph_osd_data_pages_init(osd_data, pages, length, alignment, + pages_from_pool, own_pages); + + osd_req->r_ops[which].extent.osd_data = + osd_req_op_extent_osd_data(osd_req, which, write_request); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages); + +void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req, + unsigned int which, bool write_request, + struct ceph_pagelist *pagelist) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request); + ceph_osd_data_pagelist_init(osd_data, pagelist); + + osd_req->r_ops[which].extent.osd_data = + osd_req_op_extent_osd_data(osd_req, which, write_request); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); + +#ifdef CONFIG_BLOCK +void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, + unsigned int which, bool write_request, + struct bio *bio, size_t bio_length) +{ + struct 
ceph_osd_data *osd_data; + + osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request); + ceph_osd_data_bio_init(osd_data, bio, bio_length); + + osd_req->r_ops[which].extent.osd_data = + osd_req_op_extent_osd_data(osd_req, which, write_request); +} +EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); +#endif /* CONFIG_BLOCK */ + +static void osd_req_op_cls_request_info_pagelist( + struct ceph_osd_request *osd_req, + unsigned int which, struct ceph_pagelist *pagelist) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_cls_request_info(osd_req, which); + ceph_osd_data_pagelist_init(osd_data, pagelist); + + osd_req->r_ops[which].cls.request_info = + osd_req_op_cls_request_info(osd_req, which); +} + +void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, + unsigned int which, struct page **pages, u64 length, + u32 alignment, bool pages_from_pool, bool own_pages) +{ + struct ceph_osd_data *osd_data; + + osd_data = osd_req_op_cls_response_data(osd_req, which); + ceph_osd_data_pages_init(osd_data, pages, length, alignment, + pages_from_pool, own_pages); + + osd_req->r_ops[which].cls.response_data = + osd_req_op_cls_response_data(osd_req, which); +} +EXPORT_SYMBOL(osd_req_op_cls_response_data_pages); + static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) { switch (osd_data->type) { @@ -385,15 +490,6 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_extent_update); -void osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, - unsigned int which, - struct ceph_osd_data *osd_data) -{ - BUG_ON(which >= osd_req->r_num_ops); - osd_req->r_ops[which].extent.osd_data = osd_data; -} -EXPORT_SYMBOL(osd_req_op_extent_osd_data); - void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method, const void *request_data, size_t request_data_size) @@ -429,22 +525,13 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, 
unsigned int which, ceph_pagelist_append(pagelist, request_data, request_data_size); payload_len += request_data_size; - op->cls.request_info = &osd_req->r_data_out; - ceph_osd_data_pagelist_init(op->cls.request_info, pagelist); + osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); op->cls.argc = 0; /* currently unused */ op->payload_len = payload_len; } EXPORT_SYMBOL(osd_req_op_cls_init); -void osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, - unsigned int which, - struct ceph_osd_data *response_data) -{ - BUG_ON(which >= osd_req->r_num_ops); - osd_req->r_ops[which].cls.response_data = response_data; -} -EXPORT_SYMBOL(osd_req_op_cls_response_data); void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, @@ -547,7 +634,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, bool use_mempool) { struct ceph_osd_request *req; - struct ceph_osd_data *osd_data; u64 objnum = 0; u64 objoff = 0; u64 objlen = 0; @@ -561,8 +647,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, GFP_NOFS); if (!req) return ERR_PTR(-ENOMEM); - osd_data = opcode == CEPH_OSD_OP_WRITE ? 
&req->r_data_out - : &req->r_data_in; req->r_flags = flags; @@ -585,7 +669,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, osd_req_op_extent_init(req, 0, opcode, objoff, objlen, truncate_size, truncate_seq); - osd_req_op_extent_osd_data(req, 0, osd_data); /* * A second op in the ops array means the caller wants to @@ -2171,8 +2254,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, /* it may be a short read due to an object boundary */ - ceph_osd_data_pages_init(&req->r_data_in, pages, *plen, page_align, - false, false); + osd_req_op_extent_osd_data_pages(req, 0, false, + pages, *plen, page_align, false, false); dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", off, *plen, *plen, page_align); @@ -2214,7 +2297,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, return PTR_ERR(req); /* it may be a short write due to an object boundary */ - ceph_osd_data_pages_init(&req->r_data_out, pages, len, page_align, + osd_req_op_extent_osd_data_pages(req, 0, true, pages, len, page_align, false, false); dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); @@ -2308,8 +2391,14 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, m = ceph_msg_get(req->r_reply); if (data_len > 0) { - struct ceph_osd_data *osd_data = &req->r_data_in; + struct ceph_osd_data *osd_data; + /* + * XXX This is assuming there is only one op containing + * XXX page data. Probably OK for reads, but this + * XXX ought to be done more generally. + */ + osd_data = osd_req_op_extent_osd_data(req, 0, false); if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { if (osd_data->pages && unlikely(osd_data->length < data_len)) { -- cgit v1.2.3 From 1ac0fc8adfc725660ee53a953b06855f64f8e792 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 12 Apr 2013 21:45:42 +0800 Subject: ceph: fix race between writepages and truncate ceph_writepages_start() reads inode->i_size in two places. 
It can get different values between successive reads, because truncate can change inode->i_size at any time. The race can lead to a mismatch between the data length of the osd request and the pages marked as writeback. When the osd request finishes, it clears writeback pages according to its data length, so some pages can be left in writeback state forever. The fix is to read inode->i_size only once, save its value to a local variable, and use the local variable wherever i_size is needed. Signed-off-by: Yan, Zheng Reviewed-by: Alex Elder --- fs/ceph/addr.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 27d62070a8e9..2d6466b5fe82 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -671,7 +671,7 @@ static int ceph_writepages_start(struct address_space *mapping, unsigned wsize = 1 << inode->i_blkbits; struct ceph_osd_request *req = NULL; int do_sync; - u64 snap_size = 0; + u64 snap_size; /* * Include a 'sync' in the OSD request if this is a data @@ -717,6 +717,7 @@ static int ceph_writepages_start(struct address_space *mapping, retry: /* find oldest snap context with dirty data */ ceph_put_snap_context(snapc); + snap_size = 0; snapc = get_oldest_context(inode, &snap_size); if (!snapc) { /* hmm, why does writepages get called when there @@ -724,6 +725,8 @@ retry: dout(" no snap context with dirty data?\n"); goto out; } + if (snap_size == 0) + snap_size = i_size_read(inode); dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); if (last_snapc && snapc != last_snapc) { @@ -795,11 +798,8 @@ get_more_pages: dout("waiting on writeback %p\n", page); wait_on_page_writeback(page); } - if ((snap_size && page_offset(page) > snap_size) || - (!snap_size && - page_offset(page) > i_size_read(inode))) { - dout("%p page eof %llu\n", page, snap_size ?
- snap_size : i_size_read(inode)); + if (page_offset(page) >= snap_size) { + dout("%p page eof %llu\n", page, snap_size); done = 1; unlock_page(page); break; @@ -911,7 +911,7 @@ get_more_pages: /* Format the osd request message and submit the write */ offset = page_offset(pages[0]); - len = min((snap_size ? snap_size : i_size_read(inode)) - offset, + len = min(snap_size - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); -- cgit v1.2.3 From 406e2c9f9286fc93ae2191a7abf477dea05aadc9 Mon Sep 17 00:00:00 2001 From: Alex Elder Date: Mon, 15 Apr 2013 14:50:36 -0500 Subject: libceph: kill off osd data write_request parameters In the incremental move toward supporting distinct data items in an osd request some of the functions had "write_request" parameters to indicate, basically, whether the data belonged to in_data or the out_data. Now that we maintain the data fields in the op structure there is no need to indicate the direction, so get rid of the "write_request" parameters. 
Signed-off-by: Alex Elder Reviewed-by: Josh Durgin --- drivers/block/rbd.c | 4 ++-- fs/ceph/addr.c | 9 ++++----- fs/ceph/file.c | 4 ++-- include/linux/ceph/osd_client.h | 8 ++++---- net/ceph/osd_client.c | 25 +++++++++++-------------- 5 files changed, 23 insertions(+), 27 deletions(-) (limited to 'fs/ceph/addr.c') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 13a381b2a779..8e8b876e83c3 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1779,7 +1779,7 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request, osd_req_op_extent_init(osd_req, 0, opcode, offset, length, 0, 0); - osd_req_op_extent_osd_data_bio(osd_req, 0, write_request, + osd_req_op_extent_osd_data_bio(osd_req, 0, obj_request->bio_list, obj_request->length); rbd_osd_req_format(obj_request, write_request); @@ -2281,7 +2281,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, offset, length, 0, 0); - osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, false, + osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, obj_request->pages, obj_request->length, obj_request->offset & ~PAGE_MASK, diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 2d6466b5fe82..3e68ac101040 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -245,7 +245,7 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes); /* unlock all pages, zeroing any data we didn't read */ - osd_data = osd_req_op_extent_osd_data(req, 0, false); + osd_data = osd_req_op_extent_osd_data(req, 0); BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); @@ -343,8 +343,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) } pages[i] = page; } - osd_req_op_extent_osd_data_pages(req, 0, false, pages, len, 0, - false, false); + 
osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false); req->r_callback = finish_read; req->r_inode = inode; @@ -571,7 +570,7 @@ static void writepages_finish(struct ceph_osd_request *req, long writeback_stat; unsigned issued = ceph_caps_issued(ci); - osd_data = osd_req_op_extent_osd_data(req, 0, true); + osd_data = osd_req_op_extent_osd_data(req, 0); BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); num_pages = calc_pages_for((u64)osd_data->alignment, (u64)osd_data->length); @@ -916,7 +915,7 @@ get_more_pages: dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); - osd_req_op_extent_osd_data_pages(req, 0, true, pages, len, 0, + osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, !!pool, false); pages = NULL; /* request message now owns the pages array */ diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 7e94dcb66d92..d70830c66833 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -585,8 +585,8 @@ more: own_pages = true; } } - osd_req_op_extent_osd_data_pages(req, 0, true, pages, len, - page_align, false, own_pages); + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, + false, own_pages); /* BUG_ON(vino.snap != CEPH_NOSNAP); */ ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 0d3358ef5285..0e406934a551 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -241,22 +241,22 @@ extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req, extern struct ceph_osd_data *osd_req_op_extent_osd_data( struct ceph_osd_request *osd_req, - unsigned int which, bool write_request); + unsigned int which); extern struct ceph_osd_data *osd_req_op_cls_response_data( struct ceph_osd_request *osd_req, unsigned int which); extern void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *, - unsigned int which, bool write_request, + unsigned int which, struct page **pages, u64 length, u32 
alignment, bool pages_from_pool, bool own_pages); extern void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *, - unsigned int which, bool write_request, + unsigned int which, struct ceph_pagelist *pagelist); #ifdef CONFIG_BLOCK extern void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *, - unsigned int which, bool write_request, + unsigned int which, struct bio *bio, size_t bio_length); #endif /* CONFIG_BLOCK */ diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 0c5bf2fb5075..409c443c8d1f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -117,7 +117,7 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, struct ceph_osd_data * osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, - unsigned int which, bool write_request) + unsigned int which) { BUG_ON(which >= osd_req->r_num_ops); @@ -156,37 +156,34 @@ osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? 
*/ void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, - unsigned int which, bool write_request, - struct page **pages, u64 length, u32 alignment, + unsigned int which, struct page **pages, + u64 length, u32 alignment, bool pages_from_pool, bool own_pages) { struct ceph_osd_data *osd_data; - osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request); + osd_data = osd_req_op_extent_osd_data(osd_req, which); ceph_osd_data_pages_init(osd_data, pages, length, alignment, pages_from_pool, own_pages); } EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages); void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req, - unsigned int which, bool write_request, - struct ceph_pagelist *pagelist) + unsigned int which, struct ceph_pagelist *pagelist) { struct ceph_osd_data *osd_data; - osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request); + osd_data = osd_req_op_extent_osd_data(osd_req, which); ceph_osd_data_pagelist_init(osd_data, pagelist); } EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); #ifdef CONFIG_BLOCK void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, - unsigned int which, bool write_request, - struct bio *bio, size_t bio_length) + unsigned int which, struct bio *bio, size_t bio_length) { struct ceph_osd_data *osd_data; - - osd_data = osd_req_op_extent_osd_data(osd_req, which, write_request); + osd_data = osd_req_op_extent_osd_data(osd_req, which); ceph_osd_data_bio_init(osd_data, bio, bio_length); } EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); @@ -2284,7 +2281,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, /* it may be a short read due to an object boundary */ - osd_req_op_extent_osd_data_pages(req, 0, false, + osd_req_op_extent_osd_data_pages(req, 0, pages, *plen, page_align, false, false); dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", @@ -2327,7 +2324,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, return PTR_ERR(req); 
/* it may be a short write due to an object boundary */ - osd_req_op_extent_osd_data_pages(req, 0, true, pages, len, page_align, + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, false, false); dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); @@ -2428,7 +2425,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, * XXX page data. Probably OK for reads, but this * XXX ought to be done more generally. */ - osd_data = osd_req_op_extent_osd_data(req, 0, false); + osd_data = osd_req_op_extent_osd_data(req, 0); if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { if (osd_data->pages && unlikely(osd_data->length < data_len)) { -- cgit v1.2.3