summaryrefslogtreecommitdiffstats
path: root/drivers/block/drbd/drbd_worker.c
diff options
context:
space:
mode:
authorLars Ellenberg <lars.ellenberg@linbit.com>2011-11-28 15:04:49 +0100
committerPhilipp Reisner <philipp.reisner@linbit.com>2012-11-08 16:58:35 +0100
commitb6dd1a89767bc33e9c98b3195f8925b46c5c95f3 (patch)
treee82371062171f5cade79cb0c4a6cd22486b5f082 /drivers/block/drbd/drbd_worker.c
parentd5b27b01f17ef1f0badc45f9eea521be3457c9cb (diff)
downloadlinux-b6dd1a89767bc33e9c98b3195f8925b46c5c95f3.tar.bz2
drbd: remove struct drbd_tl_epoch objects (barrier works)
cherry-picked and adapted from drbd 9 devel branch DRBD requests (struct drbd_request) are already on the per resource transfer log list, and carry their epoch number. We do not need to additionally link them on other ring lists in other structs. The drbd sender thread can recognize itself when to send a P_BARRIER, by tracking the currently processed epoch, and how many writes have been processed for that epoch. If the epoch of the request to be processed does not match the currently processed epoch, any writes have been processed in it, a P_BARRIER for this last processed epoch is send out first. The new epoch then becomes the currently processed epoch. To not get stuck in drbd_al_begin_io() waiting for P_BARRIER_ACK, the sender thread also needs to handle the case when the current epoch was closed already, but no new requests are queued yet, and send out P_BARRIER as soon as possible. This is done by comparing the per resource "current transfer log epoch" (tconn->current_tle_nr) with the per connection "currently processed epoch number" (tconn->send.current_epoch_nr), while waiting for new requests to be processed in wait_for_work(). Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com> Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Diffstat (limited to 'drivers/block/drbd/drbd_worker.c')
-rw-r--r--drivers/block/drbd/drbd_worker.c194
1 files changed, 146 insertions, 48 deletions
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 39ece3a2f53a..66be3910e8d2 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1210,34 +1210,25 @@ int w_prev_work_done(struct drbd_work *w, int cancel)
return 0;
}
-int w_send_barrier(struct drbd_work *w, int cancel)
+/* FIXME
+ * We need to track the number of pending barrier acks,
+ * and to be able to wait for them.
+ * See also comment in drbd_adm_attach before drbd_suspend_io.
+ */
+int drbd_send_barrier(struct drbd_tconn *tconn)
{
- struct drbd_socket *sock;
- struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
- struct drbd_conf *mdev = w->mdev;
struct p_barrier *p;
+ struct drbd_socket *sock;
- /* really avoid racing with tl_clear. w.cb may have been referenced
- * just before it was reassigned and re-queued, so double check that.
- * actually, this race was harmless, since we only try to send the
- * barrier packet here, and otherwise do nothing with the object.
- * but compare with the head of w_clear_epoch */
- spin_lock_irq(&mdev->tconn->req_lock);
- if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
- cancel = 1;
- spin_unlock_irq(&mdev->tconn->req_lock);
- if (cancel)
- return 0;
-
- sock = &mdev->tconn->data;
- p = drbd_prepare_command(mdev, sock);
+ sock = &tconn->data;
+ p = conn_prepare_command(tconn, sock);
if (!p)
return -EIO;
- p->barrier = b->br_number;
- /* inc_ap_pending was done where this was queued.
- * dec_ap_pending will be done in got_BarrierAck
- * or (on connection loss) in w_clear_epoch. */
- return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
+ p->barrier = tconn->send.current_epoch_nr;
+ p->pad = 0;
+ tconn->send.current_epoch_writes = 0;
+
+ return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
}
int w_send_write_hint(struct drbd_work *w, int cancel)
@@ -1257,6 +1248,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1264,6 +1256,20 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ /* this time, no tconn->send.current_epoch_writes++;
+ * If it was sent, it was the closing barrier for the last
+ * replicated epoch, before we went into AHEAD mode.
+ * No more barriers will be sent, until we leave AHEAD mode again. */
+
err = drbd_send_out_of_sync(mdev, req);
req_mod(req, OOS_HANDED_TO_NETWORK);
@@ -1280,6 +1286,7 @@ int w_send_dblock(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1287,6 +1294,17 @@ int w_send_dblock(struct drbd_work *w, int cancel)
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ tconn->send.current_epoch_writes++;
+
err = drbd_send_dblock(mdev, req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
@@ -1303,6 +1321,7 @@ int w_send_read_req(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1310,6 +1329,15 @@ int w_send_read_req(struct drbd_work *w, int cancel)
return 0;
}
+ /* Even read requests may close a write epoch,
+ * if there was any yet. */
+ if (tconn->send.seen_any_write_yet &&
+ tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+
err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
(unsigned long)req);
@@ -1673,6 +1701,34 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
mutex_unlock(mdev->state_mutex);
}
+/* If the resource already closed the current epoch, but we did not
+ * (because we have not yet seen new requests), we should send the
+ * corresponding barrier now. Must be checked within the same spinlock
+ * that is used to check for new requests. */
+bool need_to_send_barrier(struct drbd_tconn *connection)
+{
+ if (!connection->send.seen_any_write_yet)
+ return false;
+
+ /* Skip barriers that do not contain any writes.
+ * This may happen during AHEAD mode. */
+ if (!connection->send.current_epoch_writes)
+ return false;
+
+ /* ->req_lock is held when requests are queued on
+ * connection->sender_work, and put into ->transfer_log.
+ * It is also held when ->current_tle_nr is increased.
+ * So either there are already new requests queued,
+ * and corresponding barriers will be send there.
+ * Or nothing new is queued yet, so the difference will be 1.
+ */
+ if (atomic_read(&connection->current_tle_nr) !=
+ connection->send.current_epoch_nr + 1)
+ return false;
+
+ return true;
+}
+
bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
@@ -1690,15 +1746,79 @@ bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_lis
return !list_empty(work_list);
}
+void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
+{
+ DEFINE_WAIT(wait);
+ struct net_conf *nc;
+ int uncork, cork;
+
+ dequeue_work_item(&connection->sender_work, work_list);
+ if (!list_empty(work_list))
+ return;
+
+ /* Still nothing to do?
+ * Maybe we still need to close the current epoch,
+ * even if no new requests are queued yet.
+ *
+ * Also, poke TCP, just in case.
+ * Then wait for new work (or signal). */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ uncork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ if (uncork) {
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket)
+ drbd_tcp_uncork(connection->data.socket);
+ mutex_unlock(&connection->data.mutex);
+ }
+
+ for (;;) {
+ int send_barrier;
+ prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
+ spin_lock_irq(&connection->req_lock);
+ spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ list_splice_init(&connection->sender_work.q, work_list);
+ spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ if (!list_empty(work_list) || signal_pending(current)) {
+ spin_unlock_irq(&connection->req_lock);
+ break;
+ }
+ send_barrier = need_to_send_barrier(connection);
+ spin_unlock_irq(&connection->req_lock);
+ if (send_barrier) {
+ drbd_send_barrier(connection);
+ connection->send.current_epoch_nr++;
+ }
+ schedule();
+ /* may be woken up for other things but new work, too,
+ * e.g. if the current epoch got closed.
+ * In which case we send the barrier above. */
+ }
+ finish_wait(&connection->sender_work.q_wait, &wait);
+
+ /* someone may have changed the config while we have been waiting above. */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ cork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket) {
+ if (cork)
+ drbd_tcp_cork(connection->data.socket);
+ else if (!uncork)
+ drbd_tcp_uncork(connection->data.socket);
+ }
+ mutex_unlock(&connection->data.mutex);
+}
+
int drbd_worker(struct drbd_thread *thi)
{
struct drbd_tconn *tconn = thi->tconn;
struct drbd_work *w = NULL;
struct drbd_conf *mdev;
- struct net_conf *nc;
LIST_HEAD(work_list);
int vnr;
- int cork;
while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi);
@@ -1706,29 +1826,7 @@ int drbd_worker(struct drbd_thread *thi)
/* as long as we use drbd_queue_work_front(),
* we may only dequeue single work items here, not batches. */
if (list_empty(&work_list))
- dequeue_work_item(&tconn->sender_work, &work_list);
-
- /* Still nothing to do? Poke TCP, just in case,
- * then wait for new work (or signal). */
- if (list_empty(&work_list)) {
- mutex_lock(&tconn->data.mutex);
- rcu_read_lock();
- nc = rcu_dereference(tconn->net_conf);
- cork = nc ? nc->tcp_cork : 0;
- rcu_read_unlock();
-
- if (tconn->data.socket && cork)
- drbd_tcp_uncork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
-
- wait_event_interruptible(tconn->sender_work.q_wait,
- dequeue_work_item(&tconn->sender_work, &work_list));
-
- mutex_lock(&tconn->data.mutex);
- if (tconn->data.socket && cork)
- drbd_tcp_cork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
- }
+ wait_for_work(tconn, &work_list);
if (signal_pending(current)) {
flush_signals(current);