diff options
author | Tejun Heo <tj@kernel.org> | 2011-01-03 14:49:46 +0100 |
---|---|---|
committer | Sage Weil <sage@newdream.net> | 2011-01-12 15:15:14 -0800 |
commit | f363e45fd1184219b472ea549cb7e192e24ef4d2 (patch) | |
tree | 1332feb2f7a0a47ce482a0fd4ee9afb547a27090 | |
parent | 01e6acc4ea4c284c44bfb3d46c76f4ae580c6435 (diff) | |
download | linux-f363e45fd1184219b472ea549cb7e192e24ef4d2.tar.bz2 |
net/ceph: make ceph_msgr_wq non-reentrant
ceph messenger code does a rather complex dancing around multithread
workqueue to make sure the same work item isn't executed concurrently
on different CPUs. This restriction can be provided by workqueue with
WQ_NON_REENTRANT.
Make ceph_msgr_wq non-reentrant workqueue with the default concurrency
level and remove the QUEUED/BUSY logic.
* This removes backoff handling in con_work() but it couldn't reliably
block execution of con_work() to begin with - queue_con() can be
called after the work started but before BUSY is set. It seems that
it was an optimization for a rather cold path and can be safely
removed.
* The number of concurrent work items is bound by the number of
connections and connetions are independent from each other. With
the default concurrency level, different connections will be
executed independently.
Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: Sage Weil <sage@newdream.net>
Cc: ceph-devel@vger.kernel.org
Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r-- | include/linux/ceph/messenger.h | 5 | ||||
-rw-r--r-- | net/ceph/messenger.c | 46 |
2 files changed, 2 insertions, 49 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index a108b425fee2..c3011beac30d 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -110,17 +110,12 @@ struct ceph_msg_pos { /* * ceph_connection state bit flags - * - * QUEUED and BUSY are used together to ensure that only a single - * thread is currently opening, reading or writing data to the socket. */ #define LOSSYTX 0 /* we can close channel or drop messages on errors */ #define CONNECTING 1 #define NEGOTIATING 2 #define KEEPALIVE_PENDING 3 #define WRITE_PENDING 4 /* we have data ready to send */ -#define QUEUED 5 /* there is work queued on this connection */ -#define BUSY 6 /* work is being done */ #define STANDBY 8 /* no outgoing messages, socket closed. we keep * the ceph_connection around to maintain shared * state with the peer. */ diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index b6ff4a1519ab..dff633d62e5b 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -96,7 +96,7 @@ struct workqueue_struct *ceph_msgr_wq; int ceph_msgr_init(void) { - ceph_msgr_wq = create_workqueue("ceph-msgr"); + ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0); if (!ceph_msgr_wq) { pr_err("msgr_init failed to create workqueue\n"); return -ENOMEM; @@ -1920,20 +1920,6 @@ bad_tag: /* * Atomically queue work on a connection. Bump @con reference to * avoid races with connection teardown. - * - * There is some trickery going on with QUEUED and BUSY because we - * only want a _single_ thread operating on each connection at any - * point in time, but we want to use all available CPUs. - * - * The worker thread only proceeds if it can atomically set BUSY. It - * clears QUEUED and does it's thing. When it thinks it's done, it - * clears BUSY, then rechecks QUEUED.. if it's set again, it loops - * (tries again to set BUSY). - * - * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we - * try to queue work. If that fails (work is already queued, or BUSY) - * we give up (work also already being done or is queued) but leave QUEUED - * set so that the worker thread will loop if necessary. */ static void queue_con(struct ceph_connection *con) { @@ -1948,11 +1934,7 @@ static void queue_con(struct ceph_connection *con) return; } - set_bit(QUEUED, &con->state); - if (test_bit(BUSY, &con->state)) { - dout("queue_con %p - already BUSY\n", con); - con->ops->put(con); - } else if (!queue_work(ceph_msgr_wq, &con->work.work)) { + if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) { dout("queue_con %p - already queued\n", con); con->ops->put(con); } else { @@ -1967,15 +1949,6 @@ static void con_work(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); - int backoff = 0; - -more: - if (test_and_set_bit(BUSY, &con->state) != 0) { - dout("con_work %p BUSY already set\n", con); - goto out; - } - dout("con_work %p start, clearing QUEUED\n", con); - clear_bit(QUEUED, &con->state); mutex_lock(&con->mutex); @@ -1994,28 +1967,13 @@ more: try_read(con) < 0 || try_write(con) < 0) { mutex_unlock(&con->mutex); - backoff = 1; ceph_fault(con); /* error/fault path */ goto done_unlocked; } done: mutex_unlock(&con->mutex); - done_unlocked: - clear_bit(BUSY, &con->state); - dout("con->state=%lu\n", con->state); - if (test_bit(QUEUED, &con->state)) { - if (!backoff || test_bit(OPENING, &con->state)) { - dout("con_work %p QUEUED reset, looping\n", con); - goto more; - } - dout("con_work %p QUEUED reset, but just faulted\n", con); - clear_bit(QUEUED, &con->state); - } - dout("con_work %p done\n", con); - -out: con->ops->put(con); } |