 drivers/block/drbd/drbd_int.h      |  45
 drivers/block/drbd/drbd_main.c     | 316
 drivers/block/drbd/drbd_nl.c       |   8
 drivers/block/drbd/drbd_receiver.c |   1
 drivers/block/drbd/drbd_req.c      | 157
 drivers/block/drbd/drbd_worker.c   | 194
 6 files changed, 291 insertions(+), 430 deletions(-)
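In short, the patch below drops the chain of per-epoch struct drbd_tl_epoch objects and keeps every request on a single flat transfer_log list. Each request is stamped with the number of the transfer log epoch it belongs to (req->epoch), and the sender emits a P_BARRIER packet whenever it sees that number change between consecutive requests it puts on the wire. The stand-alone sketch below models that idea in user space; it is illustrative only, not DRBD code, and all names in it are invented.

/* tl_model.c - user-space model of the flat transfer log scheme.
 * Requests carry an epoch number; the "sender" emits a barrier
 * whenever that number changes, mirroring w_send_dblock() below. */
#include <stdbool.h>
#include <stdio.h>

struct request { int epoch; };

int main(void)
{
        /* epochs 1 and 2 were closed while these requests were queued */
        struct request log[] = { {1}, {1}, {2}, {2}, {3} };
        bool seen_any_write_yet = false;
        int current_epoch_nr = 0;
        unsigned current_epoch_writes = 0;
        unsigned i;

        for (i = 0; i < sizeof(log) / sizeof(log[0]); i++) {
                struct request *req = &log[i];

                if (!seen_any_write_yet) {
                        seen_any_write_yet = true;
                        current_epoch_nr = req->epoch;
                }
                if (current_epoch_nr != req->epoch) {
                        if (current_epoch_writes)       /* skip empty epochs */
                                printf("P_BARRIER #%d (%u writes)\n",
                                       current_epoch_nr, current_epoch_writes);
                        current_epoch_nr = req->epoch;
                        current_epoch_writes = 0;
                }
                current_epoch_writes++;
                printf("P_DATA (epoch %d)\n", req->epoch);
        }
        return 0;
}

Because no epoch object is ever allocated on the write path in this scheme, the GFP_NOIO allocation and its retry loop in __drbd_make_request() (removed further down) disappear entirely.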
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index c0d0de54ae57..309c121557ae 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -562,12 +562,16 @@ struct drbd_request {
         struct bio *private_bio;

         struct drbd_interval i;
-        unsigned int epoch; /* barrier_nr */

-        /* barrier_nr: used to check on "completion" whether this req was in
+        /* epoch: used to check on "completion" whether this req was in
         * the current epoch, and we therefore have to close it,
-        * starting a new epoch...
+        * causing a p_barrier packet to be sent, starting a new epoch.
+        *
+        * This corresponds to "barrier" in struct p_barrier[_ack],
+        * and to "barrier_nr" in struct drbd_epoch (and various
+        * comments/function parameters/local variable names).
         */
+        unsigned int epoch;

         struct list_head tl_requests; /* ring list in the transfer log */
         struct bio *master_bio;       /* master bio pointer */
@@ -575,14 +579,6 @@ struct drbd_request {
         unsigned long start_time;
 };

-struct drbd_tl_epoch {
-        struct drbd_work w;
-        struct list_head requests; /* requests before */
-        struct drbd_tl_epoch *next; /* pointer to the next barrier */
-        unsigned int br_number;  /* the barriers identifier. */
-        int n_writes;        /* number of requests attached before this barrier */
-};
-
 struct drbd_epoch {
         struct drbd_tconn *tconn;
         struct list_head list;
@@ -845,11 +841,8 @@ struct drbd_tconn {                        /* is a resource from the config file */
         unsigned int ko_count;

         spinlock_t req_lock;
-        struct drbd_tl_epoch *unused_spare_tle; /* for pre-allocation */
-        struct drbd_tl_epoch *newest_tle;
-        struct drbd_tl_epoch *oldest_tle;
-        struct list_head out_of_sequence_requests;
-        struct list_head barrier_acked_requests;
+
+        struct list_head transfer_log;        /* all requests not yet fully processed */

         struct crypto_hash *cram_hmac_tfm;
         struct crypto_hash *integrity_tfm;  /* checksums we compute, updates protected by tconn->data->mutex */
@@ -859,18 +852,36 @@ struct drbd_tconn {                        /* is a resource from the config file */
         void *int_dig_in;
         void *int_dig_vv;

+        /* receiver side */
         struct drbd_epoch *current_epoch;
         spinlock_t epoch_lock;
         unsigned int epochs;
         enum write_ordering_e write_ordering;
         atomic_t current_tle_nr;        /* transfer log epoch number */
+        unsigned current_tle_writes;        /* writes seen within this tl epoch */

         unsigned long last_reconnect_jif;
         struct drbd_thread receiver;
         struct drbd_thread worker;
         struct drbd_thread asender;
         cpumask_var_t cpu_mask;
+
+        /* sender side */
         struct drbd_work_queue sender_work;
+
+        struct {
+                /* whether this sender thread
+                 * has processed a single write yet. */
+                bool seen_any_write_yet;
+
+                /* Which barrier number to send with the next P_BARRIER */
+                int current_epoch_nr;
+
+                /* how many write requests have been sent
+                 * with req->epoch == current_epoch_nr.
+                 * If none, no P_BARRIER will be sent. */
+                unsigned current_epoch_writes;
+        } send;
 };

 struct drbd_conf {
@@ -1054,7 +1065,6 @@ extern void drbd_calc_cpu_mask(struct drbd_tconn *tconn);
 extern void tl_release(struct drbd_tconn *, unsigned int barrier_nr,
                        unsigned int set_size);
 extern void tl_clear(struct drbd_tconn *);
-extern void _tl_add_barrier(struct drbd_tconn *, struct drbd_tl_epoch *);
 extern void drbd_free_sock(struct drbd_tconn *tconn);
 extern int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
                      void *buf, size_t size, unsigned msg_flags);
@@ -1460,7 +1470,6 @@ extern int w_resync_timer(struct drbd_work *, int);
 extern int w_send_write_hint(struct drbd_work *, int);
 extern int w_make_resync_request(struct drbd_work *, int);
 extern int w_send_dblock(struct drbd_work *, int);
-extern int w_send_barrier(struct drbd_work *, int);
 extern int w_send_read_req(struct drbd_work *, int);
 extern int w_prev_work_done(struct drbd_work *, int);
 extern int w_e_reissue(struct drbd_work *, int);
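The header changes above boil down to: three epoch-object pointers and two side lists are replaced by one list head (transfer_log) plus two counters, current_tle_nr (the epoch currently being filled) and current_tle_writes. A rough user-space model of how a write request is stamped and queued, with stripped-down stand-ins for the kernel's intrusive list primitives (again illustrative, not driver code):

#include <stdio.h>
#include <stddef.h>

/* minimal stand-ins for the kernel's intrusive list */
struct list_head { struct list_head *next, *prev; };

static void list_init(struct list_head *h) { h->next = h->prev = h; }

static void list_add_tail(struct list_head *n, struct list_head *h)
{
        n->prev = h->prev; n->next = h;
        h->prev->next = n; h->prev = n;
}

#define container_of(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))

struct request {
        unsigned int epoch;             /* like drbd_request::epoch */
        struct list_head tl_requests;   /* link in the transfer log */
};

struct connection {
        struct list_head transfer_log;  /* all requests, oldest first */
        int current_tle_nr;             /* epoch currently being filled */
        unsigned current_tle_writes;
};

/* like the tail of __drbd_make_request(): stamp, count, queue */
static void queue_write(struct connection *c, struct request *req)
{
        req->epoch = c->current_tle_nr;
        c->current_tle_writes++;
        list_add_tail(&req->tl_requests, &c->transfer_log);
}

int main(void)
{
        struct connection c = { .current_tle_nr = 7 };
        struct request r1, r2;
        struct list_head *pos;

        list_init(&c.transfer_log);
        queue_write(&c, &r1);
        c.current_tle_nr++;             /* epoch closed elsewhere */
        c.current_tle_writes = 0;
        queue_write(&c, &r2);

        for (pos = c.transfer_log.next; pos != &c.transfer_log; pos = pos->next) {
                struct request *r = container_of(pos, struct request, tl_requests);
                printf("request in epoch %u\n", r->epoch);
        }
        return 0;
}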
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 7e37149684e4..8c6c48e363cd 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -188,147 +188,75 @@ int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
 #endif

 /**
- * DOC: The transfer log
- *
- * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
- * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
- * of the list. There is always at least one &struct drbd_tl_epoch object.
- *
- * Each &struct drbd_tl_epoch has a circular double linked list of requests
- * attached.
- */
-static int tl_init(struct drbd_tconn *tconn)
-{
-        struct drbd_tl_epoch *b;
-
-        /* during device minor initialization, we may well use GFP_KERNEL */
-        b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
-        if (!b)
-                return 0;
-        INIT_LIST_HEAD(&b->requests);
-        INIT_LIST_HEAD(&b->w.list);
-        b->next = NULL;
-        b->br_number = atomic_inc_return(&tconn->current_tle_nr);
-        b->n_writes = 0;
-        b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-
-        tconn->oldest_tle = b;
-        tconn->newest_tle = b;
-        INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
-        INIT_LIST_HEAD(&tconn->barrier_acked_requests);
-
-        return 1;
-}
-
-static void tl_cleanup(struct drbd_tconn *tconn)
-{
-        if (tconn->oldest_tle != tconn->newest_tle)
-                conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
-        if (!list_empty(&tconn->out_of_sequence_requests))
-                conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
-        kfree(tconn->oldest_tle);
-        tconn->oldest_tle = NULL;
-        kfree(tconn->unused_spare_tle);
-        tconn->unused_spare_tle = NULL;
-}
-
-/**
- * _tl_add_barrier() - Adds a barrier to the transfer log
- * @mdev:        DRBD device.
- * @new:        Barrier to be added before the current head of the TL.
- *
- * The caller must hold the req_lock.
- */
-void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
-{
-        INIT_LIST_HEAD(&new->requests);
-        INIT_LIST_HEAD(&new->w.list);
-        new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
-        new->next = NULL;
-        new->n_writes = 0;
-
-        new->br_number = atomic_inc_return(&tconn->current_tle_nr);
-        if (tconn->newest_tle != new) {
-                tconn->newest_tle->next = new;
-                tconn->newest_tle = new;
-        }
-}
-
-/**
- * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
- * @mdev:        DRBD device.
+ * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
+ * @tconn:        DRBD connection.
  * @barrier_nr:        Expected identifier of the DRBD write barrier packet.
  * @set_size:        Expected number of requests before that barrier.
  *
  * In case the passed barrier_nr or set_size does not match the oldest
- * &struct drbd_tl_epoch objects this function will cause a termination
- * of the connection.
+ * epoch of not yet barrier-acked requests, this function will cause a
+ * termination of the connection.
  */
 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
                 unsigned int set_size)
 {
-        struct drbd_conf *mdev;
-        struct drbd_tl_epoch *b, *nob; /* next old barrier */
-        struct list_head *le, *tle;
         struct drbd_request *r;
+        struct drbd_request *req = NULL;
+        int expect_epoch = 0;
+        int expect_size = 0;

         spin_lock_irq(&tconn->req_lock);

-        b = tconn->oldest_tle;
+        /* find oldest not yet barrier-acked write request,
+         * count writes in its epoch. */
+        list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+                const unsigned long s = r->rq_state;
+                if (!req) {
+                        if (!(s & RQ_WRITE))
+                                continue;
+                        if (!(s & RQ_NET_MASK))
+                                continue;
+                        if (s & RQ_NET_DONE)
+                                continue;
+                        req = r;
+                        expect_epoch = req->epoch;
+                        expect_size++;
+                } else {
+                        if (r->epoch != expect_epoch)
+                                break;
+                        if (!(s & RQ_WRITE))
+                                continue;
+                        /* if (s & RQ_DONE): not expected */
+                        /* if (!(s & RQ_NET_MASK)): not expected */
+                        expect_size++;
+                }
+        }

         /* first some paranoia code */
-        if (b == NULL) {
+        if (req == NULL) {
                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
                          barrier_nr);
                 goto bail;
         }
-        if (b->br_number != barrier_nr) {
+        if (expect_epoch != barrier_nr) {
                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
-                         barrier_nr, b->br_number);
+                         barrier_nr, expect_epoch);
                 goto bail;
         }
-        if (b->n_writes != set_size) {
+
+        if (expect_size != set_size) {
                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
-                         barrier_nr, set_size, b->n_writes);
+                         barrier_nr, set_size, expect_size);
                 goto bail;
         }

         /* Clean up list of requests processed during current epoch */
-        list_for_each_safe(le, tle, &b->requests) {
-                r = list_entry(le, struct drbd_request, tl_requests);
-                _req_mod(r, BARRIER_ACKED);
-        }
-        /* There could be requests on the list waiting for completion
-           of the write to the local disk. To avoid corruptions of
-           slab's data structures we have to remove the lists head.
-
-           Also there could have been a barrier ack out of sequence, overtaking
-           the write acks - which would be a bug and violating write ordering.
-           To not deadlock in case we lose connection while such requests are
-           still pending, we need some way to find them for the
-           _req_mode(CONNECTION_LOST_WHILE_PENDING).
-
-           These have been list_move'd to the out_of_sequence_requests list in
-           _req_mod(, BARRIER_ACKED) above.
-           */
-        list_splice_init(&b->requests, &tconn->barrier_acked_requests);
-        mdev = b->w.mdev;
-
-        nob = b->next;
-        if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
-                _tl_add_barrier(tconn, b);
-                if (nob)
-                        tconn->oldest_tle = nob;
-                /* if nob == NULL b was the only barrier, and becomes the new
-                   barrier. Therefore tconn->oldest_tle points already to b */
-        } else {
-                D_ASSERT(nob != NULL);
-                tconn->oldest_tle = nob;
-                kfree(b);
+        list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
+                if (req->epoch != expect_epoch)
+                        break;
+                _req_mod(req, BARRIER_ACKED);
         }
-
         spin_unlock_irq(&tconn->req_lock);
-        dec_ap_pending(mdev);

         return;

@@ -346,91 +274,20 @@ bail:
  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
  * RESTART_FROZEN_DISK_IO.
  */
+/* must hold resource->req_lock */
 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
 {
-        struct drbd_tl_epoch *b, *tmp, **pn;
-        struct list_head *le, *tle, carry_reads;
-        struct drbd_request *req;
-        int rv, n_writes, n_reads;
-
-        b = tconn->oldest_tle;
-        pn = &tconn->oldest_tle;
-        while (b) {
-                n_writes = 0;
-                n_reads = 0;
-                INIT_LIST_HEAD(&carry_reads);
-                list_for_each_safe(le, tle, &b->requests) {
-                        req = list_entry(le, struct drbd_request, tl_requests);
-                        rv = _req_mod(req, what);
-
-                        if (rv & MR_WRITE)
-                                n_writes++;
-                        if (rv & MR_READ)
-                                n_reads++;
-                }
-                tmp = b->next;
-
-                if (n_writes) {
-                        if (what == RESEND) {
-                                b->n_writes = n_writes;
-                                if (b->w.cb == NULL) {
-                                        b->w.cb = w_send_barrier;
-                                        inc_ap_pending(b->w.mdev);
-                                        set_bit(CREATE_BARRIER, &tconn->flags);
-                                }
-
-                                drbd_queue_work(&tconn->sender_work, &b->w);
-                        }
-                        pn = &b->next;
-                } else {
-                        if (n_reads)
-                                list_add(&carry_reads, &b->requests);
-                        /* there could still be requests on that ring list,
-                         * in case local io is still pending */
-                        list_del(&b->requests);
-
-                        /* dec_ap_pending corresponding to queue_barrier.
-                         * the newest barrier may not have been queued yet,
-                         * in which case w.cb is still NULL. */
-                        if (b->w.cb != NULL)
-                                dec_ap_pending(b->w.mdev);
-
-                        if (b == tconn->newest_tle) {
-                                /* recycle, but reinit! */
-                                if (tmp != NULL)
-                                        conn_err(tconn, "ASSERT FAILED tmp == NULL");
-                                INIT_LIST_HEAD(&b->requests);
-                                list_splice(&carry_reads, &b->requests);
-                                INIT_LIST_HEAD(&b->w.list);
-                                b->w.cb = NULL;
-                                b->br_number = atomic_inc_return(&tconn->current_tle_nr);
-                                b->n_writes = 0;
-
-                                *pn = b;
-                                break;
-                        }
-                        *pn = tmp;
-                        kfree(b);
-                }
-                b = tmp;
-                list_splice(&carry_reads, &b->requests);
-        }
-
-        /* Actions operating on the disk state, also want to work on
-           requests that got barrier acked. */
-        switch (what) {
-        case FAIL_FROZEN_DISK_IO:
-        case RESTART_FROZEN_DISK_IO:
-                list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
-                        req = list_entry(le, struct drbd_request, tl_requests);
-                        _req_mod(req, what);
-                }
-        case CONNECTION_LOST_WHILE_PENDING:
-        case RESEND:
-                break;
-        default:
-                conn_err(tconn, "what = %d in _tl_restart()\n", what);
-        }
+        struct drbd_request *req, *r;
+
+        list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests)
+                _req_mod(req, what);
+}
+
+void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
+{
+        spin_lock_irq(&tconn->req_lock);
+        _tl_restart(tconn, what);
+        spin_unlock_irq(&tconn->req_lock);
 }

 /**
@@ -443,36 +300,7 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
  */
 void tl_clear(struct drbd_tconn *tconn)
 {
-        struct list_head *le, *tle;
-        struct drbd_request *r;
-
-        spin_lock_irq(&tconn->req_lock);
-
-        _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
-
-        /* we expect this list to be empty. */
-        if (!list_empty(&tconn->out_of_sequence_requests))
-                conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
-
-        /* but just in case, clean it up anyways! */
-        list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
-                r = list_entry(le, struct drbd_request, tl_requests);
-                /* It would be nice to complete outside of spinlock.
-                 * But this is easier for now. */
-                _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
-        }
-
-        /* ensure bit indicating barrier is required is clear */
-        clear_bit(CREATE_BARRIER, &tconn->flags);
-
-        spin_unlock_irq(&tconn->req_lock);
-}
-
-void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
-{
-        spin_lock_irq(&tconn->req_lock);
-        _tl_restart(tconn, what);
-        spin_unlock_irq(&tconn->req_lock);
+        tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
 }

 /**
@@ -482,31 +310,16 @@ void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
 void tl_abort_disk_io(struct drbd_conf *mdev)
 {
         struct drbd_tconn *tconn = mdev->tconn;
-        struct drbd_tl_epoch *b;
-        struct list_head *le, *tle;
-        struct drbd_request *req;
+        struct drbd_request *req, *r;

         spin_lock_irq(&tconn->req_lock);
-        b = tconn->oldest_tle;
-        while (b) {
-                list_for_each_safe(le, tle, &b->requests) {
-                        req = list_entry(le, struct drbd_request, tl_requests);
-                        if (!(req->rq_state & RQ_LOCAL_PENDING))
-                                continue;
-                        if (req->w.mdev == mdev)
-                                _req_mod(req, ABORT_DISK_IO);
-                }
-                b = b->next;
-        }
-
-        list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
-                req = list_entry(le, struct drbd_request, tl_requests);
+        list_for_each_entry_safe(req, r, &tconn->transfer_log, tl_requests) {
                 if (!(req->rq_state & RQ_LOCAL_PENDING))
                         continue;
-                if (req->w.mdev == mdev)
-                        _req_mod(req, ABORT_DISK_IO);
+                if (req->w.mdev != mdev)
+                        continue;
+                _req_mod(req, ABORT_DISK_IO);
         }
-
         spin_unlock_irq(&tconn->req_lock);
 }

@@ -2680,17 +2493,21 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
         if (set_resource_options(tconn, res_opts))
                 goto fail;

-        if (!tl_init(tconn))
-                goto fail;
-
         tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
         if (!tconn->current_epoch)
                 goto fail;
+
+        INIT_LIST_HEAD(&tconn->transfer_log);
+
         INIT_LIST_HEAD(&tconn->current_epoch->list);
         tconn->epochs = 1;
         spin_lock_init(&tconn->epoch_lock);
         tconn->write_ordering = WO_bdev_flush;

+        tconn->send.seen_any_write_yet = false;
+        tconn->send.current_epoch_nr = 0;
+        tconn->send.current_epoch_writes = 0;
+
         tconn->cstate = C_STANDALONE;
         mutex_init(&tconn->cstate_mutex);
         spin_lock_init(&tconn->req_lock);
@@ -2713,7 +2530,6 @@ struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)

 fail:
         kfree(tconn->current_epoch);
-        tl_cleanup(tconn);
         free_cpumask_var(tconn->cpu_mask);
         drbd_free_socket(&tconn->meta);
         drbd_free_socket(&tconn->data);
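Note how tl_release() above no longer pops an epoch object: it recomputes the expected barrier number and write count by walking the flat log from its oldest end. A self-contained sketch of that validation walk, using invented types rather than the driver's (the RQ_NET_* state bits are collapsed into a single net_done flag here):

#include <stdbool.h>
#include <stdio.h>

struct req { unsigned epoch; bool is_write; bool net_done; };

static bool check_barrier_ack(const struct req *log, int n,
                              unsigned barrier_nr, unsigned set_size)
{
        unsigned expect_epoch = 0, expect_size = 0;
        bool found = false;

        for (int i = 0; i < n; i++) {
                const struct req *r = &log[i];
                if (!found) {
                        /* oldest write not yet barrier-acked */
                        if (!r->is_write || r->net_done)
                                continue;
                        found = true;
                        expect_epoch = r->epoch;
                        expect_size++;
                } else {
                        if (r->epoch != expect_epoch)
                                break;
                        if (r->is_write)
                                expect_size++;
                }
        }
        if (!found)
                return false;   /* no epoch in the log at all */
        return expect_epoch == barrier_nr && expect_size == set_size;
}

int main(void)
{
        struct req log[] = {
                { 4, true, true },      /* already barrier-acked */
                { 5, true, false }, { 5, false, false }, { 5, true, false },
                { 6, true, false },
        };
        printf("ack(5, 2): %s\n",
               check_barrier_ack(log, 5, 5, 2) ? "ok" : "BAD");
        return 0;
}

If either the epoch number or the write count disagrees with the peer's barrier ack, the real function drops the connection.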
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index c5d4fac1a111..bbc5c2f4a9b4 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -622,6 +622,8 @@ drbd_set_role(struct drbd_conf *mdev, enum drbd_role new_role, int force)
         /* Wait until nothing is on the fly :) */
         wait_event(mdev->misc_wait, atomic_read(&mdev->ap_pending_cnt) == 0);

+        /* FIXME also wait for all pending P_BARRIER_ACK? */
+
         if (new_role == R_SECONDARY) {
                 set_disk_ro(mdev->vdisk, true);
                 if (get_ldev(mdev)) {
@@ -1436,6 +1438,12 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)

         drbd_suspend_io(mdev);
         /* also wait for the last barrier ack. */
+        /* FIXME see also https://daiquiri.linbit/cgi-bin/bugzilla/show_bug.cgi?id=171
+         * We need a way to either ignore barrier acks for barriers sent before a device
+         * was attached, or a way to wait for all pending barrier acks to come in.
+         * As barriers are counted per resource,
+         * we'd need to suspend io on all devices of a resource.
+         */
         wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_pending_cnt) || drbd_suspended(mdev));
         /* and for any other previously queued work */
         drbd_flush_workqueue(mdev);
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 34fc33b5eb45..7fe6b01618d4 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -4451,6 +4451,7 @@ static void conn_disconnect(struct drbd_tconn *tconn)
                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
         atomic_set(&tconn->current_epoch->epoch_size, 0);
+        tconn->send.seen_any_write_yet = false;

         conn_info(tconn, "Connection closed\n");

diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index e609557a9425..ca28b56b7a2f 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -149,46 +149,16 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
                 drbd_req_free(req);
 }

-static void queue_barrier(struct drbd_conf *mdev)
-{
-        struct drbd_tl_epoch *b;
-        struct drbd_tconn *tconn = mdev->tconn;
-
-        /* We are within the req_lock. Once we queued the barrier for sending,
-         * we set the CREATE_BARRIER bit. It is cleared as soon as a new
-         * barrier/epoch object is added. This is the only place this bit is
-         * set. It indicates that the barrier for this epoch is already queued,
-         * and no new epoch has been created yet. */
-        if (test_bit(CREATE_BARRIER, &tconn->flags))
-                return;
-
-        b = tconn->newest_tle;
-        b->w.cb = w_send_barrier;
-        b->w.mdev = mdev;
-        /* inc_ap_pending done here, so we won't
-         * get imbalanced on connection loss.
-         * dec_ap_pending will be done in got_BarrierAck
-         * or (on connection loss) in tl_clear. */
-        inc_ap_pending(mdev);
-        drbd_queue_work(&tconn->sender_work, &b->w);
-        set_bit(CREATE_BARRIER, &tconn->flags);
+static void wake_all_senders(struct drbd_tconn *tconn) {
+        wake_up(&tconn->sender_work.q_wait);
 }

-static void _about_to_complete_local_write(struct drbd_conf *mdev,
-        struct drbd_request *req)
+/* must hold resource->req_lock */
+static void start_new_tl_epoch(struct drbd_tconn *tconn)
 {
-        const unsigned long s = req->rq_state;
-
-        /* Before we can signal completion to the upper layers,
-         * we may need to close the current epoch.
-         * We can skip this, if this request has not even been sent, because we
-         * did not have a fully established connection yet/anymore, during
-         * bitmap exchange, or while we are C_AHEAD due to congestion policy.
-         */
-        if (mdev->state.conn >= C_CONNECTED &&
-            (s & RQ_NET_SENT) != 0 &&
-            req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
-                queue_barrier(mdev);
+        tconn->current_tle_writes = 0;
+        atomic_inc(&tconn->current_tle_nr);
+        wake_all_senders(tconn);
 }

 void complete_master_bio(struct drbd_conf *mdev,
@@ -320,9 +290,16 @@ void req_may_be_completed(struct drbd_request *req, struct bio_and_error *m)
                 } else if (!(s & RQ_POSTPONED))
                         D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);

-                /* for writes we need to do some extra housekeeping */
-                if (rw == WRITE)
-                        _about_to_complete_local_write(mdev, req);
+                /* Before we can signal completion to the upper layers,
+                 * we may need to close the current transfer log epoch.
+                 * We are within the request lock, so we can simply compare
+                 * the request epoch number with the current transfer log
+                 * epoch number.  If they match, increase the current_tle_nr,
+                 * and reset the transfer log epoch write_cnt.
+                 */
+                if (rw == WRITE &&
+                    req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
+                        start_new_tl_epoch(mdev->tconn);

                 /* Update disk stats */
                 _drbd_end_io_acct(mdev, req);
@@ -514,15 +491,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                 * hurting performance. */
                set_bit(UNPLUG_REMOTE, &mdev->flags);

-                /* see __drbd_make_request,
-                 * just after it grabs the req_lock */
-                D_ASSERT(test_bit(CREATE_BARRIER, &mdev->tconn->flags) == 0);
-
-                req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
-
-                /* increment size of current epoch */
-                mdev->tconn->newest_tle->n_writes++;
-
                 /* queue work item to send data */
                 D_ASSERT(req->rq_state & RQ_NET_PENDING);
                 req->rq_state |= RQ_NET_QUEUED;
@@ -534,8 +502,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                 nc = rcu_dereference(mdev->tconn->net_conf);
                 p = nc->max_epoch_size;
                 rcu_read_unlock();
-                if (mdev->tconn->newest_tle->n_writes >= p)
-                        queue_barrier(mdev);
+                if (mdev->tconn->current_tle_writes >= p)
+                        start_new_tl_epoch(mdev->tconn);

                 break;

@@ -692,6 +660,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
            During connection handshake, we ensure that the peer was not rebooted. */
                 if (!(req->rq_state & RQ_NET_OK)) {
                         if (req->w.cb) {
+                                /* w.cb expected to be w_send_dblock, or w_send_read_req */
                                 drbd_queue_work(&mdev->tconn->sender_work, &req->w);
                                 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
                         }
@@ -708,7 +677,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                         * this is bad, because if the connection is lost now,
                         * we won't be able to clean them up... */
                        dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
-                        list_move(&req->tl_requests, &mdev->tconn->out_of_sequence_requests);
                 }
                 if ((req->rq_state & RQ_NET_MASK) != 0) {
                         req->rq_state |= RQ_NET_DONE;
@@ -835,7 +803,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
         const int rw = bio_rw(bio);
         const int size = bio->bi_size;
         const sector_t sector = bio->bi_sector;
-        struct drbd_tl_epoch *b = NULL;
         struct drbd_request *req;
         struct net_conf *nc;
         int local, remote, send_oos = 0;
@@ -916,24 +883,6 @@ int __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long s
                 goto fail_free_complete;
         }

-        /* For WRITE request, we have to make sure that we have an
-         * unused_spare_tle, in case we need to start a new epoch.
-         * I try to be smart and avoid to pre-allocate always "just in case",
-         * but there is a race between testing the bit and pointer outside the
-         * spinlock, and grabbing the spinlock.
-         * if we lost that race, we retry. */
-        if (rw == WRITE && (remote || send_oos) &&
-            mdev->tconn->unused_spare_tle == NULL &&
-            test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-allocate_barrier:
-                b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
-                if (!b) {
-                        dev_err(DEV, "Failed to alloc barrier.\n");
-                        err = -ENOMEM;
-                        goto fail_free_complete;
-                }
-        }
-
         /* GOOD, everything prepared, grab the spin_lock */
         spin_lock_irq(&mdev->tconn->req_lock);

@@ -969,42 +918,9 @@ allocate_barrier:
                 }
         }

-        if (b && mdev->tconn->unused_spare_tle == NULL) {
-                mdev->tconn->unused_spare_tle = b;
-                b = NULL;
-        }
-        if (rw == WRITE && (remote || send_oos) &&
-            mdev->tconn->unused_spare_tle == NULL &&
-            test_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-                /* someone closed the current epoch
-                 * while we were grabbing the spinlock */
-                spin_unlock_irq(&mdev->tconn->req_lock);
-                goto allocate_barrier;
-        }
-
-
         /* Update disk stats */
         _drbd_start_io_acct(mdev, req, bio);

-        /* _maybe_start_new_epoch(mdev);
-         * If we need to generate a write barrier packet, we have to add the
-         * new epoch (barrier) object, and queue the barrier packet for sending,
-         * and queue the req's data after it _within the same lock_, otherwise
-         * we have race conditions were the reorder domains could be mixed up.
-         *
-         * Even read requests may start a new epoch and queue the corresponding
-         * barrier packet. To get the write ordering right, we only have to
-         * make sure that, if this is a write request and it triggered a
-         * barrier packet, this request is queued within the same spinlock. */
-        if ((remote || send_oos) && mdev->tconn->unused_spare_tle &&
-            test_and_clear_bit(CREATE_BARRIER, &mdev->tconn->flags)) {
-                _tl_add_barrier(mdev->tconn, mdev->tconn->unused_spare_tle);
-                mdev->tconn->unused_spare_tle = NULL;
-        } else {
-                D_ASSERT(!(remote && rw == WRITE &&
-                           test_bit(CREATE_BARRIER, &mdev->tconn->flags)));
-        }
-
         /* NOTE
         * Actually, 'local' may be wrong here already, since we may have failed
         * to write to the meta data, and may become wrong anytime because of
@@ -1025,7 +941,12 @@ allocate_barrier:
         if (local)
                 _req_mod(req, TO_BE_SUBMITTED);

-        list_add_tail(&req->tl_requests, &mdev->tconn->newest_tle->requests);
+        /* which transfer log epoch does this belong to? */
+        req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
+        if (rw == WRITE)
+                mdev->tconn->current_tle_writes++;
+
+        list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);

         /* NOTE remote first: to get the concurrent write detection right,
         * we must register the request before start of local IO. */
@@ -1059,7 +980,9 @@ allocate_barrier:
         }

         if (congested) {
-                queue_barrier(mdev); /* last barrier, after mirrored writes */
+                if (mdev->tconn->current_tle_writes)
+                        /* start a new epoch for non-mirrored writes */
+                        start_new_tl_epoch(mdev->tconn);

                 if (nc->on_congestion == OC_PULL_AHEAD)
                         _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
@@ -1070,7 +993,6 @@ allocate_barrier:
         rcu_read_unlock();

         spin_unlock_irq(&mdev->tconn->req_lock);
-        kfree(b); /* if someone else has beaten us to it... */

         if (local) {
                 req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
@@ -1108,7 +1030,6 @@ fail_and_free_req:
                 drbd_req_free(req);

         dec_ap_bio(mdev);
-        kfree(b);

         return ret;
 }
@@ -1164,12 +1085,23 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
         return limit;
 }

+struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
+{
+        /* Walk the transfer log,
+         * and find the oldest not yet completed request */
+        struct drbd_request *r;
+        list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
+                if (r->rq_state & (RQ_NET_PENDING|RQ_LOCAL_PENDING))
+                        return r;
+        }
+        return NULL;
+}
+
 void request_timer_fn(unsigned long data)
 {
         struct drbd_conf *mdev = (struct drbd_conf *) data;
         struct drbd_tconn *tconn = mdev->tconn;
         struct drbd_request *req; /* oldest request */
-        struct list_head *le;
         struct net_conf *nc;
         unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
         unsigned long now;
@@ -1193,16 +1125,13 @@ void request_timer_fn(unsigned long data)
         now = jiffies;

         spin_lock_irq(&tconn->req_lock);
-        le = &tconn->oldest_tle->requests;
-        if (list_empty(le)) {
+        req = find_oldest_request(tconn);
+        if (!req) {
                 spin_unlock_irq(&tconn->req_lock);
                 mod_timer(&mdev->request_timer, now + et);
                 return;
         }

-        le = le->prev;
-        req = list_entry(le, struct drbd_request, tl_requests);
-
         /* The request is considered timed out, if
         * - we have some effective timeout from the configuration,
         *   with above state restrictions applied,
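With the epoch objects gone, closing an epoch in drbd_req.c reduces to resetting current_tle_writes, bumping current_tle_nr, and waking the senders; it is triggered when the current epoch reaches net_conf->max_epoch_size, when a write belonging to the current epoch completes, or on congestion before switching to ahead mode. A condensed model of the first two triggers (hypothetical names, not the driver's API; the real driver does all of this under req_lock):

#include <stdatomic.h>
#include <stdio.h>

struct conn {
        atomic_int current_tle_nr;      /* transfer log epoch number */
        unsigned current_tle_writes;    /* writes in the current epoch */
};

/* must be called under the request lock in the real driver */
static void start_new_tl_epoch(struct conn *c)
{
        c->current_tle_writes = 0;
        atomic_fetch_add(&c->current_tle_nr, 1);
        /* wake_all_senders(c) would go here */
}

static void on_write_queued(struct conn *c, unsigned max_epoch_size)
{
        c->current_tle_writes++;
        /* like __req_mod(QUEUE_FOR_NET_WRITE): cap the epoch size */
        if (c->current_tle_writes >= max_epoch_size)
                start_new_tl_epoch(c);
}

static void on_write_completed(struct conn *c, int req_epoch)
{
        /* like req_may_be_completed(): close the epoch the completed
         * write belongs to, if it is still the current one */
        if (req_epoch == atomic_load(&c->current_tle_nr))
                start_new_tl_epoch(c);
}

int main(void)
{
        struct conn c;

        atomic_init(&c.current_tle_nr, 1);
        c.current_tle_writes = 0;

        on_write_queued(&c, 2);
        on_write_queued(&c, 2);         /* reaches max_epoch_size: closes epoch 1 */
        on_write_queued(&c, 2);         /* first write of epoch 2 */
        on_write_completed(&c, 2);      /* its local completion closes epoch 2 */
        printf("current epoch: %d\n", atomic_load(&c.current_tle_nr)); /* 3 */
        return 0;
}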
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 39ece3a2f53a..66be3910e8d2 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1210,34 +1210,25 @@ int w_prev_work_done(struct drbd_work *w, int cancel)
         return 0;
 }

-int w_send_barrier(struct drbd_work *w, int cancel)
+/* FIXME
+ * We need to track the number of pending barrier acks,
+ * and to be able to wait for them.
+ * See also comment in drbd_adm_attach before drbd_suspend_io.
+ */
+int drbd_send_barrier(struct drbd_tconn *tconn)
 {
-        struct drbd_socket *sock;
-        struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
-        struct drbd_conf *mdev = w->mdev;
         struct p_barrier *p;
+        struct drbd_socket *sock;

-        /* really avoid racing with tl_clear. w.cb may have been referenced
-         * just before it was reassigned and re-queued, so double check that.
-         * actually, this race was harmless, since we only try to send the
-         * barrier packet here, and otherwise do nothing with the object.
-         * but compare with the head of w_clear_epoch */
-        spin_lock_irq(&mdev->tconn->req_lock);
-        if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
-                cancel = 1;
-        spin_unlock_irq(&mdev->tconn->req_lock);
-        if (cancel)
-                return 0;
-
-        sock = &mdev->tconn->data;
-        p = drbd_prepare_command(mdev, sock);
+        sock = &tconn->data;
+        p = conn_prepare_command(tconn, sock);
         if (!p)
                 return -EIO;
-        p->barrier = b->br_number;
-        /* inc_ap_pending was done where this was queued.
-         * dec_ap_pending will be done in got_BarrierAck
-         * or (on connection loss) in w_clear_epoch. */
-        return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
+        p->barrier = tconn->send.current_epoch_nr;
+        p->pad = 0;
+        tconn->send.current_epoch_writes = 0;
+
+        return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
 }

 int w_send_write_hint(struct drbd_work *w, int cancel)
@@ -1257,6 +1248,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
 {
         struct drbd_request *req = container_of(w, struct drbd_request, w);
         struct drbd_conf *mdev = w->mdev;
+        struct drbd_tconn *tconn = mdev->tconn;
         int err;

         if (unlikely(cancel)) {
@@ -1264,6 +1256,20 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
                 return 0;
         }

+        if (!tconn->send.seen_any_write_yet) {
+                tconn->send.seen_any_write_yet = true;
+                tconn->send.current_epoch_nr = req->epoch;
+        }
+        if (tconn->send.current_epoch_nr != req->epoch) {
+                if (tconn->send.current_epoch_writes)
+                        drbd_send_barrier(tconn);
+                tconn->send.current_epoch_nr = req->epoch;
+        }
+        /* this time, no tconn->send.current_epoch_writes++;
+         * If it was sent, it was the closing barrier for the last
+         * replicated epoch, before we went into AHEAD mode.
+         * No more barriers will be sent, until we leave AHEAD mode again. */
+
         err = drbd_send_out_of_sync(mdev, req);
         req_mod(req, OOS_HANDED_TO_NETWORK);

@@ -1280,6 +1286,7 @@ int w_send_dblock(struct drbd_work *w, int cancel)
 {
         struct drbd_request *req = container_of(w, struct drbd_request, w);
         struct drbd_conf *mdev = w->mdev;
+        struct drbd_tconn *tconn = mdev->tconn;
         int err;

         if (unlikely(cancel)) {
@@ -1287,6 +1294,17 @@ int w_send_dblock(struct drbd_work *w, int cancel)
                 return 0;
         }

+        if (!tconn->send.seen_any_write_yet) {
+                tconn->send.seen_any_write_yet = true;
+                tconn->send.current_epoch_nr = req->epoch;
+        }
+        if (tconn->send.current_epoch_nr != req->epoch) {
+                if (tconn->send.current_epoch_writes)
+                        drbd_send_barrier(tconn);
+                tconn->send.current_epoch_nr = req->epoch;
+        }
+        tconn->send.current_epoch_writes++;
+
         err = drbd_send_dblock(mdev, req);
         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

@@ -1303,6 +1321,7 @@ int w_send_read_req(struct drbd_work *w, int cancel)
 {
         struct drbd_request *req = container_of(w, struct drbd_request, w);
         struct drbd_conf *mdev = w->mdev;
+        struct drbd_tconn *tconn = mdev->tconn;
         int err;

         if (unlikely(cancel)) {
@@ -1310,6 +1329,15 @@ int w_send_read_req(struct drbd_work *w, int cancel)
                 return 0;
         }

+        /* Even read requests may close a write epoch,
+         * if there was any yet. */
+        if (tconn->send.seen_any_write_yet &&
+            tconn->send.current_epoch_nr != req->epoch) {
+                if (tconn->send.current_epoch_writes)
+                        drbd_send_barrier(tconn);
+                tconn->send.current_epoch_nr = req->epoch;
+        }
+
         err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
                                  (unsigned long)req);

@@ -1673,6 +1701,34 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
         mutex_unlock(mdev->state_mutex);
 }

+/* If the resource already closed the current epoch, but we did not
+ * (because we have not yet seen new requests), we should send the
+ * corresponding barrier now.  Must be checked within the same spinlock
+ * that is used to check for new requests. */
+bool need_to_send_barrier(struct drbd_tconn *connection)
+{
+        if (!connection->send.seen_any_write_yet)
+                return false;
+
+        /* Skip barriers that do not contain any writes.
+         * This may happen during AHEAD mode. */
+        if (!connection->send.current_epoch_writes)
+                return false;
+
+        /* ->req_lock is held when requests are queued on
+         * connection->sender_work, and put into ->transfer_log.
+         * It is also held when ->current_tle_nr is increased.
+         * So either there are already new requests queued,
+         * and corresponding barriers will be sent there.
+         * Or nothing new is queued yet, so the difference will be 1.
+         */
+        if (atomic_read(&connection->current_tle_nr) !=
+            connection->send.current_epoch_nr + 1)
+                return false;
+
+        return true;
+}
+
 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
 {
         spin_lock_irq(&queue->q_lock);
@@ -1690,15 +1746,79 @@ bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_lis
         return !list_empty(work_list);
 }

+void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
+{
+        DEFINE_WAIT(wait);
+        struct net_conf *nc;
+        int uncork, cork;
+
+        dequeue_work_item(&connection->sender_work, work_list);
+        if (!list_empty(work_list))
+                return;
+
+        /* Still nothing to do?
+         * Maybe we still need to close the current epoch,
+         * even if no new requests are queued yet.
+         *
+         * Also, poke TCP, just in case.
+         * Then wait for new work (or signal). */
+        rcu_read_lock();
+        nc = rcu_dereference(connection->net_conf);
+        uncork = nc ? nc->tcp_cork : 0;
+        rcu_read_unlock();
+        if (uncork) {
+                mutex_lock(&connection->data.mutex);
+                if (connection->data.socket)
+                        drbd_tcp_uncork(connection->data.socket);
+                mutex_unlock(&connection->data.mutex);
+        }
+
+        for (;;) {
+                int send_barrier;
+                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
+                spin_lock_irq(&connection->req_lock);
+                spin_lock(&connection->sender_work.q_lock);        /* FIXME get rid of this one? */
+                list_splice_init(&connection->sender_work.q, work_list);
+                spin_unlock(&connection->sender_work.q_lock);        /* FIXME get rid of this one? */
+                if (!list_empty(work_list) || signal_pending(current)) {
+                        spin_unlock_irq(&connection->req_lock);
+                        break;
+                }
+                send_barrier = need_to_send_barrier(connection);
+                spin_unlock_irq(&connection->req_lock);
+                if (send_barrier) {
+                        drbd_send_barrier(connection);
+                        connection->send.current_epoch_nr++;
+                }
+                schedule();
+                /* may be woken up for other things than new work, too,
+                 * e.g. if the current epoch got closed.
+                 * In which case we send the barrier above. */
+        }
+        finish_wait(&connection->sender_work.q_wait, &wait);
+
+        /* someone may have changed the config while we have been waiting above. */
+        rcu_read_lock();
+        nc = rcu_dereference(connection->net_conf);
+        cork = nc ? nc->tcp_cork : 0;
+        rcu_read_unlock();
+        mutex_lock(&connection->data.mutex);
+        if (connection->data.socket) {
+                if (cork)
+                        drbd_tcp_cork(connection->data.socket);
+                else if (!uncork)
+                        drbd_tcp_uncork(connection->data.socket);
+        }
+        mutex_unlock(&connection->data.mutex);
+}
+
 int drbd_worker(struct drbd_thread *thi)
 {
         struct drbd_tconn *tconn = thi->tconn;
         struct drbd_work *w = NULL;
         struct drbd_conf *mdev;
-        struct net_conf *nc;
         LIST_HEAD(work_list);
         int vnr;
-        int cork;

         while (get_t_state(thi) == RUNNING) {
                 drbd_thread_current_set_cpu(thi);
@@ -1706,29 +1826,7 @@ int drbd_worker(struct drbd_thread *thi)
                 /* as long as we use drbd_queue_work_front(),
                 * we may only dequeue single work items here, not batches. */
                 if (list_empty(&work_list))
-                        dequeue_work_item(&tconn->sender_work, &work_list);
-
-                /* Still nothing to do? Poke TCP, just in case,
-                 * then wait for new work (or signal). */
-                if (list_empty(&work_list)) {
-                        mutex_lock(&tconn->data.mutex);
-                        rcu_read_lock();
-                        nc = rcu_dereference(tconn->net_conf);
-                        cork = nc ? nc->tcp_cork : 0;
-                        rcu_read_unlock();
-
-                        if (tconn->data.socket && cork)
-                                drbd_tcp_uncork(tconn->data.socket);
-                        mutex_unlock(&tconn->data.mutex);
-
-                        wait_event_interruptible(tconn->sender_work.q_wait,
-                                dequeue_work_item(&tconn->sender_work, &work_list));
-
-                        mutex_lock(&tconn->data.mutex);
-                        if (tconn->data.socket && cork)
-                                drbd_tcp_cork(tconn->data.socket);
-                        mutex_unlock(&tconn->data.mutex);
-                }
+                        wait_for_work(tconn, &work_list);

                 if (signal_pending(current)) {
                         flush_signals(current);
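One subtlety worth spelling out is the exact difference-of-one test in need_to_send_barrier(): if current_tle_nr has advanced past send.current_epoch_nr by exactly one and the closed epoch contained writes, the idle sender owes the peer a single P_BARRIER; had new requests already been queued, they would carry the new epoch number and close the old epoch from w_send_dblock() instead. A minimal model of that check (invented names, not the driver's API):

#include <stdbool.h>
#include <stdio.h>

struct sender {
        bool seen_any_write_yet;
        int current_epoch_nr;           /* last epoch seen by the sender */
        unsigned current_epoch_writes;
        int current_tle_nr;             /* resource-wide epoch counter */
};

static bool need_to_send_barrier(const struct sender *s)
{
        if (!s->seen_any_write_yet)
                return false;
        if (!s->current_epoch_writes)   /* empty epoch: nothing to close */
                return false;
        /* new requests already queued would carry the new number and
         * close the epoch themselves; only a difference of exactly one
         * means the sender has to act now */
        return s->current_tle_nr == s->current_epoch_nr + 1;
}

int main(void)
{
        struct sender s = {
                .seen_any_write_yet = true,
                .current_epoch_nr = 8,
                .current_epoch_writes = 3,
                .current_tle_nr = 9,    /* epoch 8 was closed, 9 is empty */
        };
        if (need_to_send_barrier(&s)) {
                printf("send P_BARRIER #%d\n", s.current_epoch_nr);
                s.current_epoch_nr++;
                s.current_epoch_writes = 0;
        }
        return 0;
}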