diff options
author | Al Viro <viro@zeniv.linux.org.uk> | 2016-02-13 21:01:21 -0500 |
---|---|---|
committer | Mike Marshall <hubcap@omnibond.com> | 2016-02-19 13:45:54 -0500 |
commit | ea2c9c9f6574e835cbc903c94b82b5a34a334866 (patch) | |
tree | 47637785f81f2e7edb746de19a7b1a3c6ccd9627 /fs/orangefs/orangefs-bufmap.c | |
parent | 178041848a6e7072cc6ebc1c6c7763e33f564722 (diff) | |
download | linux-ea2c9c9f6574e835cbc903c94b82b5a34a334866.tar.bz2 |
orangefs: bufmap rewrite
new waiting-for-slot logics:
* make request for slot wait for bufmap to be set up if it
comes before it's installed *OR* while it's running down
* make closing control device wait for all slots to be freed
* waiting itself rewritten to (open-coded) analogues of wait_event_...
primitives - we would need wait_event_locked() and, pardon an obscenely
long name, wait_event_interruptible_exclusive_timeout_locked().
* we never wait for more than slot_timeout_secs in total and,
if during the wait the daemon goes away, we only allow
ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS for it to come back.
* (cosmetical) bitmap is used instead of an array of zeroes and ones
* old (and only reached if we are about to corrupt memory) waiting
for daemon restart in service_operation() removed.
[Martin's fixes folded]
Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
Diffstat (limited to 'fs/orangefs/orangefs-bufmap.c')
-rw-r--r-- | fs/orangefs/orangefs-bufmap.c | 341 |
1 files changed, 166 insertions, 175 deletions
diff --git a/fs/orangefs/orangefs-bufmap.c b/fs/orangefs/orangefs-bufmap.c index cd484665bf72..96faf4ee6529 100644 --- a/fs/orangefs/orangefs-bufmap.c +++ b/fs/orangefs/orangefs-bufmap.c @@ -7,7 +7,133 @@ #include "orangefs-kernel.h" #include "orangefs-bufmap.h" -DECLARE_WAIT_QUEUE_HEAD(orangefs_bufmap_init_waitq); +struct slot_map { + int c; + wait_queue_head_t q; + int count; + unsigned long *map; +}; + +static struct slot_map rw_map = { + .c = -1, + .q = __WAIT_QUEUE_HEAD_INITIALIZER(rw_map.q) +}; +static struct slot_map readdir_map = { + .c = -1, + .q = __WAIT_QUEUE_HEAD_INITIALIZER(readdir_map.q) +}; + + +static void install(struct slot_map *m, int count, unsigned long *map) +{ + spin_lock(&m->q.lock); + m->c = m->count = count; + m->map = map; + wake_up_all_locked(&m->q); + spin_unlock(&m->q.lock); +} + +static void mark_killed(struct slot_map *m) +{ + spin_lock(&m->q.lock); + m->c -= m->count + 1; + spin_unlock(&m->q.lock); +} + +static void run_down(struct slot_map *m) +{ + DEFINE_WAIT(wait); + spin_lock(&m->q.lock); + if (m->c != -1) { + for (;;) { + if (likely(list_empty(&wait.task_list))) + __add_wait_queue_tail(&m->q, &wait); + set_current_state(TASK_UNINTERRUPTIBLE); + + if (m->c == -1) + break; + + spin_unlock(&m->q.lock); + schedule(); + spin_lock(&m->q.lock); + } + __remove_wait_queue(&m->q, &wait); + __set_current_state(TASK_RUNNING); + } + m->map = NULL; + spin_unlock(&m->q.lock); +} + +static void put(struct slot_map *m, int slot) +{ + int v; + spin_lock(&m->q.lock); + __clear_bit(slot, m->map); + v = ++m->c; + if (unlikely(v == 1)) /* no free slots -> one free slot */ + wake_up_locked(&m->q); + else if (unlikely(v == -1)) /* finished dying */ + wake_up_all_locked(&m->q); + spin_unlock(&m->q.lock); +} + +static int wait_for_free(struct slot_map *m) +{ + long left = slot_timeout_secs * HZ; + DEFINE_WAIT(wait); + + do { + long n = left, t; + if (likely(list_empty(&wait.task_list))) + __add_wait_queue_tail_exclusive(&m->q, &wait); + set_current_state(TASK_INTERRUPTIBLE); + + if (m->c > 0) + break; + + if (m->c < 0) { + /* we are waiting for map to be installed */ + /* it would better be there soon, or we go away */ + if (n > ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ) + n = ORANGEFS_BUFMAP_WAIT_TIMEOUT_SECS * HZ; + } + spin_unlock(&m->q.lock); + t = schedule_timeout(n); + spin_lock(&m->q.lock); + if (unlikely(!t) && n != left && m->c < 0) + left = t; + else + left = t + (left - n); + if (unlikely(signal_pending(current))) + left = -EINTR; + } while (left > 0); + + if (!list_empty(&wait.task_list)) + list_del(&wait.task_list); + else if (left <= 0 && waitqueue_active(&m->q)) + __wake_up_locked_key(&m->q, TASK_INTERRUPTIBLE, NULL); + __set_current_state(TASK_RUNNING); + + if (likely(left > 0)) + return 0; + + return left < 0 ? -EINTR : -ETIMEDOUT; +} + +static int get(struct slot_map *m) +{ + int res = 0; + spin_lock(&m->q.lock); + if (unlikely(m->c <= 0)) + res = wait_for_free(m); + if (likely(!res)) { + m->c--; + res = find_first_zero_bit(m->map, m->count); + __set_bit(res, m->map); + } + spin_unlock(&m->q.lock); + return res; +} /* used to describe mapped buffers */ struct orangefs_bufmap_desc { @@ -18,8 +144,6 @@ struct orangefs_bufmap_desc { }; static struct orangefs_bufmap { - atomic_t refcnt; - int desc_size; int desc_shift; int desc_count; @@ -30,12 +154,12 @@ static struct orangefs_bufmap { struct orangefs_bufmap_desc *desc_array; /* array to track usage of buffer descriptors */ - int *buffer_index_array; - spinlock_t buffer_index_lock; + unsigned long *buffer_index_array; /* array to track usage of buffer descriptors for readdir */ - int readdir_index_array[ORANGEFS_READDIR_DEFAULT_DESC_COUNT]; - spinlock_t readdir_index_lock; +#define N DIV_ROUND_UP(ORANGEFS_READDIR_DEFAULT_DESC_COUNT, BITS_PER_LONG) + unsigned long readdir_index_array[N]; +#undef N } *__orangefs_bufmap; static DEFINE_SPINLOCK(orangefs_bufmap_lock); @@ -58,30 +182,6 @@ orangefs_bufmap_free(struct orangefs_bufmap *bufmap) kfree(bufmap); } -static struct orangefs_bufmap *orangefs_bufmap_ref(void) -{ - struct orangefs_bufmap *bufmap = NULL; - - spin_lock(&orangefs_bufmap_lock); - if (__orangefs_bufmap) { - bufmap = __orangefs_bufmap; - atomic_inc(&bufmap->refcnt); - } - spin_unlock(&orangefs_bufmap_lock); - return bufmap; -} - -static void orangefs_bufmap_unref(struct orangefs_bufmap *bufmap) -{ - if (atomic_dec_and_lock(&bufmap->refcnt, &orangefs_bufmap_lock)) { - __orangefs_bufmap = NULL; - spin_unlock(&orangefs_bufmap_lock); - - orangefs_bufmap_unmap(bufmap); - orangefs_bufmap_free(bufmap); - } -} - /* * XXX: Can the size and shift change while the caller gives up the * XXX: lock between calling this and doing something useful? @@ -137,21 +237,18 @@ orangefs_bufmap_alloc(struct ORANGEFS_dev_map_desc *user_desc) if (!bufmap) goto out; - atomic_set(&bufmap->refcnt, 1); bufmap->total_size = user_desc->total_size; bufmap->desc_count = user_desc->count; bufmap->desc_size = user_desc->size; bufmap->desc_shift = ilog2(bufmap->desc_size); - spin_lock_init(&bufmap->buffer_index_lock); bufmap->buffer_index_array = - kcalloc(bufmap->desc_count, sizeof(int), GFP_KERNEL); + kzalloc(DIV_ROUND_UP(bufmap->desc_count, BITS_PER_LONG), GFP_KERNEL); if (!bufmap->buffer_index_array) { gossip_err("orangefs: could not allocate %d buffer indices\n", bufmap->desc_count); goto out_free_bufmap; } - spin_lock_init(&bufmap->readdir_index_lock); bufmap->desc_array = kcalloc(bufmap->desc_count, sizeof(struct orangefs_bufmap_desc), @@ -294,24 +391,18 @@ int orangefs_bufmap_initialize(struct ORANGEFS_dev_map_desc *user_desc) if (__orangefs_bufmap) { spin_unlock(&orangefs_bufmap_lock); gossip_err("orangefs: error: bufmap already initialized.\n"); - ret = -EALREADY; + ret = -EINVAL; goto out_unmap_bufmap; } __orangefs_bufmap = bufmap; + install(&rw_map, + bufmap->desc_count, + bufmap->buffer_index_array); + install(&readdir_map, + ORANGEFS_READDIR_DEFAULT_DESC_COUNT, + bufmap->readdir_index_array); spin_unlock(&orangefs_bufmap_lock); - /* - * If there are operations in orangefs_bufmap_init_waitq, wake them up. - * This scenario occurs when the client-core is restarted and I/O - * requests in the in-progress or waiting tables are restarted. I/O - * requests cannot be restarted until the shared memory system is - * completely re-initialized, so we put the I/O requests in this - * waitq until initialization has completed. NOTE: the I/O requests - * are also on a timer, so they don't wait forever just in case the - * client-core doesn't come back up. - */ - wake_up_interruptible(&orangefs_bufmap_init_waitq); - gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_initialize: exiting normally\n"); return 0; @@ -334,91 +425,28 @@ out: */ void orangefs_bufmap_finalize(void) { + struct orangefs_bufmap *bufmap = __orangefs_bufmap; + if (!bufmap) + return; gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: called\n"); - BUG_ON(!__orangefs_bufmap); - orangefs_bufmap_unref(__orangefs_bufmap); + mark_killed(&rw_map); + mark_killed(&readdir_map); gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs_bufmap_finalize: exiting normally\n"); } -struct slot_args { - int slot_count; - int *slot_array; - spinlock_t *slot_lock; - wait_queue_head_t *slot_wq; -}; - -static int wait_for_a_slot(struct slot_args *slargs, int *buffer_index) +void orangefs_bufmap_run_down(void) { - int ret = -1; - int i = 0; - DEFINE_WAIT(wait_entry); - - while (1) { - /* - * check for available desc, slot_lock is the appropriate - * index_lock - */ - spin_lock(slargs->slot_lock); - prepare_to_wait_exclusive(slargs->slot_wq, - &wait_entry, - TASK_INTERRUPTIBLE); - for (i = 0; i < slargs->slot_count; i++) - if (slargs->slot_array[i] == 0) { - slargs->slot_array[i] = 1; - *buffer_index = i; - ret = 0; - break; - } - spin_unlock(slargs->slot_lock); - - /* if we acquired a buffer, then break out of while */ - if (ret == 0) - break; - - if (!signal_pending(current)) { - gossip_debug(GOSSIP_BUFMAP_DEBUG, - "[BUFMAP]: waiting %d " - "seconds for a slot\n", - slot_timeout_secs); - if (!schedule_timeout(slot_timeout_secs * HZ)) { - gossip_debug(GOSSIP_BUFMAP_DEBUG, - "*** wait_for_a_slot timed out\n"); - ret = -ETIMEDOUT; - break; - } - gossip_debug(GOSSIP_BUFMAP_DEBUG, - "[BUFMAP]: woken up by a slot becoming available.\n"); - continue; - } - - gossip_debug(GOSSIP_BUFMAP_DEBUG, "orangefs: %s interrupted.\n", - __func__); - ret = -EINTR; - break; - } - - spin_lock(slargs->slot_lock); - finish_wait(slargs->slot_wq, &wait_entry); - spin_unlock(slargs->slot_lock); - return ret; -} - -static void put_back_slot(struct slot_args *slargs, int buffer_index) -{ - /* slot_lock is the appropriate index_lock */ - spin_lock(slargs->slot_lock); - if (buffer_index < 0 || buffer_index >= slargs->slot_count) { - spin_unlock(slargs->slot_lock); + struct orangefs_bufmap *bufmap = __orangefs_bufmap; + if (!bufmap) return; - } - - /* put the desc back on the queue */ - slargs->slot_array[buffer_index] = 0; - spin_unlock(slargs->slot_lock); - - /* wake up anyone who may be sleeping on the queue */ - wake_up_interruptible(slargs->slot_wq); + run_down(&rw_map); + run_down(&readdir_map); + spin_lock(&orangefs_bufmap_lock); + __orangefs_bufmap = NULL; + spin_unlock(&orangefs_bufmap_lock); + orangefs_bufmap_unmap(bufmap); + orangefs_bufmap_free(bufmap); } /* @@ -431,23 +459,12 @@ static void put_back_slot(struct slot_args *slargs, int buffer_index) */ int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index) { - struct orangefs_bufmap *bufmap = orangefs_bufmap_ref(); - struct slot_args slargs; - int ret; - - if (!bufmap) { - gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n"); - return -EIO; + int ret = get(&rw_map); + if (ret >= 0) { + *mapp = __orangefs_bufmap; + *buffer_index = ret; + ret = 0; } - - slargs.slot_count = bufmap->desc_count; - slargs.slot_array = bufmap->buffer_index_array; - slargs.slot_lock = &bufmap->buffer_index_lock; - slargs.slot_wq = &bufmap_waitq; - ret = wait_for_a_slot(&slargs, buffer_index); - if (ret) - orangefs_bufmap_unref(bufmap); - *mapp = bufmap; return ret; } @@ -460,15 +477,7 @@ int orangefs_bufmap_get(struct orangefs_bufmap **mapp, int *buffer_index) */ void orangefs_bufmap_put(int buffer_index) { - struct slot_args slargs; - struct orangefs_bufmap *bufmap = __orangefs_bufmap; - - slargs.slot_count = bufmap->desc_count; - slargs.slot_array = bufmap->buffer_index_array; - slargs.slot_lock = &bufmap->buffer_index_lock; - slargs.slot_wq = &bufmap_waitq; - put_back_slot(&slargs, buffer_index); - orangefs_bufmap_unref(bufmap); + put(&rw_map, buffer_index); } /* @@ -484,36 +493,18 @@ void orangefs_bufmap_put(int buffer_index) */ int orangefs_readdir_index_get(struct orangefs_bufmap **mapp, int *buffer_index) { - struct orangefs_bufmap *bufmap = orangefs_bufmap_ref(); - struct slot_args slargs; - int ret; - - if (!bufmap) { - gossip_err("orangefs: please confirm that pvfs2-client daemon is running.\n"); - return -EIO; + int ret = get(&readdir_map); + if (ret >= 0) { + *mapp = __orangefs_bufmap; + *buffer_index = ret; + ret = 0; } - - slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT; - slargs.slot_array = bufmap->readdir_index_array; - slargs.slot_lock = &bufmap->readdir_index_lock; - slargs.slot_wq = &readdir_waitq; - ret = wait_for_a_slot(&slargs, buffer_index); - if (ret) - orangefs_bufmap_unref(bufmap); - *mapp = bufmap; return ret; } void orangefs_readdir_index_put(struct orangefs_bufmap *bufmap, int buffer_index) { - struct slot_args slargs; - - slargs.slot_count = ORANGEFS_READDIR_DEFAULT_DESC_COUNT; - slargs.slot_array = bufmap->readdir_index_array; - slargs.slot_lock = &bufmap->readdir_index_lock; - slargs.slot_wq = &readdir_waitq; - put_back_slot(&slargs, buffer_index); - orangefs_bufmap_unref(bufmap); + put(&readdir_map, buffer_index); } /* |