diff options
Diffstat (limited to 'fs/dlm/member.c')
-rw-r--r-- | fs/dlm/member.c | 486 |
1 files changed, 408 insertions, 78 deletions
diff --git a/fs/dlm/member.c b/fs/dlm/member.c index b12532e553f8..862640a36d5c 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -1,7 +1,7 @@ /****************************************************************************** ******************************************************************************* ** -** Copyright (C) 2005-2009 Red Hat, Inc. All rights reserved. +** Copyright (C) 2005-2011 Red Hat, Inc. All rights reserved. ** ** This copyrighted material is made available to anyone wishing to use, ** modify, copy, or redistribute it subject to the terms and conditions @@ -19,6 +19,280 @@ #include "config.h" #include "lowcomms.h" +int dlm_slots_version(struct dlm_header *h) +{ + if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS) + return 0; + return 1; +} + +void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc, + struct dlm_member *memb) +{ + struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; + + if (!dlm_slots_version(&rc->rc_header)) + return; + + memb->slot = le16_to_cpu(rf->rf_our_slot); + memb->generation = le32_to_cpu(rf->rf_generation); +} + +void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc) +{ + struct dlm_slot *slot; + struct rcom_slot *ro; + int i; + + ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config)); + + /* ls_slots array is sparse, but not rcom_slots */ + + for (i = 0; i < ls->ls_slots_size; i++) { + slot = &ls->ls_slots[i]; + if (!slot->nodeid) + continue; + ro->ro_nodeid = cpu_to_le32(slot->nodeid); + ro->ro_slot = cpu_to_le16(slot->slot); + ro++; + } +} + +#define SLOT_DEBUG_LINE 128 + +static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots, + struct rcom_slot *ro0, struct dlm_slot *array, + int array_size) +{ + char line[SLOT_DEBUG_LINE]; + int len = SLOT_DEBUG_LINE - 1; + int pos = 0; + int ret, i; + + if (!dlm_config.ci_log_debug) + return; + + memset(line, 0, sizeof(line)); + + if (array) { + for (i = 0; i < array_size; i++) { + if (!array[i].nodeid) + continue; + + ret = snprintf(line + pos, len - pos, " %d:%d", + array[i].slot, array[i].nodeid); + if (ret >= len - pos) + break; + pos += ret; + } + } else if (ro0) { + for (i = 0; i < num_slots; i++) { + ret = snprintf(line + pos, len - pos, " %d:%d", + ro0[i].ro_slot, ro0[i].ro_nodeid); + if (ret >= len - pos) + break; + pos += ret; + } + } + + log_debug(ls, "generation %u slots %d%s", gen, num_slots, line); +} + +int dlm_slots_copy_in(struct dlm_ls *ls) +{ + struct dlm_member *memb; + struct dlm_rcom *rc = ls->ls_recover_buf; + struct rcom_config *rf = (struct rcom_config *)rc->rc_buf; + struct rcom_slot *ro0, *ro; + int our_nodeid = dlm_our_nodeid(); + int i, num_slots; + uint32_t gen; + + if (!dlm_slots_version(&rc->rc_header)) + return -1; + + gen = le32_to_cpu(rf->rf_generation); + if (gen <= ls->ls_generation) { + log_error(ls, "dlm_slots_copy_in gen %u old %u", + gen, ls->ls_generation); + } + ls->ls_generation = gen; + + num_slots = le16_to_cpu(rf->rf_num_slots); + if (!num_slots) + return -1; + + ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config)); + + for (i = 0, ro = ro0; i < num_slots; i++, ro++) { + ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid); + ro->ro_slot = le16_to_cpu(ro->ro_slot); + } + + log_debug_slots(ls, gen, num_slots, ro0, NULL, 0); + + list_for_each_entry(memb, &ls->ls_nodes, list) { + for (i = 0, ro = ro0; i < num_slots; i++, ro++) { + if (ro->ro_nodeid != memb->nodeid) + continue; + memb->slot = ro->ro_slot; + memb->slot_prev = memb->slot; + break; + } + + if (memb->nodeid == our_nodeid) { + if (ls->ls_slot && ls->ls_slot != memb->slot) { + log_error(ls, "dlm_slots_copy_in our slot " + "changed %d %d", ls->ls_slot, + memb->slot); + return -1; + } + + if (!ls->ls_slot) + ls->ls_slot = memb->slot; + } + + if (!memb->slot) { + log_error(ls, "dlm_slots_copy_in nodeid %d no slot", + memb->nodeid); + return -1; + } + } + + return 0; +} + +/* for any nodes that do not support slots, we will not have set memb->slot + in wait_status_all(), so memb->slot will remain -1, and we will not + assign slots or set ls_num_slots here */ + +int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size, + struct dlm_slot **slots_out, uint32_t *gen_out) +{ + struct dlm_member *memb; + struct dlm_slot *array; + int our_nodeid = dlm_our_nodeid(); + int array_size, max_slots, i; + int need = 0; + int max = 0; + int num = 0; + uint32_t gen = 0; + + /* our own memb struct will have slot -1 gen 0 */ + + list_for_each_entry(memb, &ls->ls_nodes, list) { + if (memb->nodeid == our_nodeid) { + memb->slot = ls->ls_slot; + memb->generation = ls->ls_generation; + break; + } + } + + list_for_each_entry(memb, &ls->ls_nodes, list) { + if (memb->generation > gen) + gen = memb->generation; + + /* node doesn't support slots */ + + if (memb->slot == -1) + return -1; + + /* node needs a slot assigned */ + + if (!memb->slot) + need++; + + /* node has a slot assigned */ + + num++; + + if (!max || max < memb->slot) + max = memb->slot; + + /* sanity check, once slot is assigned it shouldn't change */ + + if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) { + log_error(ls, "nodeid %d slot changed %d %d", + memb->nodeid, memb->slot_prev, memb->slot); + return -1; + } + memb->slot_prev = memb->slot; + } + + array_size = max + need; + + array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS); + if (!array) + return -ENOMEM; + + num = 0; + + /* fill in slots (offsets) that are used */ + + list_for_each_entry(memb, &ls->ls_nodes, list) { + if (!memb->slot) + continue; + + if (memb->slot > array_size) { + log_error(ls, "invalid slot number %d", memb->slot); + kfree(array); + return -1; + } + + array[memb->slot - 1].nodeid = memb->nodeid; + array[memb->slot - 1].slot = memb->slot; + num++; + } + + /* assign new slots from unused offsets */ + + list_for_each_entry(memb, &ls->ls_nodes, list) { + if (memb->slot) + continue; + + for (i = 0; i < array_size; i++) { + if (array[i].nodeid) + continue; + + memb->slot = i + 1; + memb->slot_prev = memb->slot; + array[i].nodeid = memb->nodeid; + array[i].slot = memb->slot; + num++; + + if (!ls->ls_slot && memb->nodeid == our_nodeid) + ls->ls_slot = memb->slot; + break; + } + + if (!memb->slot) { + log_error(ls, "no free slot found"); + kfree(array); + return -1; + } + } + + gen++; + + log_debug_slots(ls, gen, num, NULL, array, array_size); + + max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) - + sizeof(struct rcom_config)) / sizeof(struct rcom_slot); + + if (num > max_slots) { + log_error(ls, "num_slots %d exceeds max_slots %d", + num, max_slots); + kfree(array); + return -1; + } + + *gen_out = gen; + *slots_out = array; + *slots_size = array_size; + *num_slots = num; + return 0; +} + static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) { struct dlm_member *memb = NULL; @@ -43,59 +317,51 @@ static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) } } -static int dlm_add_member(struct dlm_ls *ls, int nodeid) +static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node) { struct dlm_member *memb; - int w, error; + int error; memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS); if (!memb) return -ENOMEM; - w = dlm_node_weight(ls->ls_name, nodeid); - if (w < 0) { - kfree(memb); - return w; - } - - error = dlm_lowcomms_connect_node(nodeid); + error = dlm_lowcomms_connect_node(node->nodeid); if (error < 0) { kfree(memb); return error; } - memb->nodeid = nodeid; - memb->weight = w; + memb->nodeid = node->nodeid; + memb->weight = node->weight; + memb->comm_seq = node->comm_seq; add_ordered_member(ls, memb); ls->ls_num_nodes++; return 0; } -static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) -{ - list_move(&memb->list, &ls->ls_nodes_gone); - ls->ls_num_nodes--; -} - -int dlm_is_member(struct dlm_ls *ls, int nodeid) +static struct dlm_member *find_memb(struct list_head *head, int nodeid) { struct dlm_member *memb; - list_for_each_entry(memb, &ls->ls_nodes, list) { + list_for_each_entry(memb, head, list) { if (memb->nodeid == nodeid) - return 1; + return memb; } + return NULL; +} + +int dlm_is_member(struct dlm_ls *ls, int nodeid) +{ + if (find_memb(&ls->ls_nodes, nodeid)) + return 1; return 0; } int dlm_is_removed(struct dlm_ls *ls, int nodeid) { - struct dlm_member *memb; - - list_for_each_entry(memb, &ls->ls_nodes_gone, list) { - if (memb->nodeid == nodeid) - return 1; - } + if (find_memb(&ls->ls_nodes_gone, nodeid)) + return 1; return 0; } @@ -176,7 +442,7 @@ static int ping_members(struct dlm_ls *ls) error = dlm_recovery_stopped(ls); if (error) break; - error = dlm_rcom_status(ls, memb->nodeid); + error = dlm_rcom_status(ls, memb->nodeid, 0); if (error) break; } @@ -186,10 +452,88 @@ static int ping_members(struct dlm_ls *ls) return error; } +static void dlm_lsop_recover_prep(struct dlm_ls *ls) +{ + if (!ls->ls_ops || !ls->ls_ops->recover_prep) + return; + ls->ls_ops->recover_prep(ls->ls_ops_arg); +} + +static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb) +{ + struct dlm_slot slot; + uint32_t seq; + int error; + + if (!ls->ls_ops || !ls->ls_ops->recover_slot) + return; + + /* if there is no comms connection with this node + or the present comms connection is newer + than the one when this member was added, then + we consider the node to have failed (versus + being removed due to dlm_release_lockspace) */ + + error = dlm_comm_seq(memb->nodeid, &seq); + + if (!error && seq == memb->comm_seq) + return; + + slot.nodeid = memb->nodeid; + slot.slot = memb->slot; + + ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot); +} + +void dlm_lsop_recover_done(struct dlm_ls *ls) +{ + struct dlm_member *memb; + struct dlm_slot *slots; + int i, num; + + if (!ls->ls_ops || !ls->ls_ops->recover_done) + return; + + num = ls->ls_num_nodes; + + slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL); + if (!slots) + return; + + i = 0; + list_for_each_entry(memb, &ls->ls_nodes, list) { + if (i == num) { + log_error(ls, "dlm_lsop_recover_done bad num %d", num); + goto out; + } + slots[i].nodeid = memb->nodeid; + slots[i].slot = memb->slot; + i++; + } + + ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num, + ls->ls_slot, ls->ls_generation); + out: + kfree(slots); +} + +static struct dlm_config_node *find_config_node(struct dlm_recover *rv, + int nodeid) +{ + int i; + + for (i = 0; i < rv->nodes_count; i++) { + if (rv->nodes[i].nodeid == nodeid) + return &rv->nodes[i]; + } + return NULL; +} + int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) { struct dlm_member *memb, *safe; - int i, error, found, pos = 0, neg = 0, low = -1; + struct dlm_config_node *node; + int i, error, neg = 0, low = -1; /* previously removed members that we've not finished removing need to count as a negative change so the "neg" recovery steps will happen */ @@ -202,46 +546,32 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) /* move departed members from ls_nodes to ls_nodes_gone */ list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { - found = 0; - for (i = 0; i < rv->node_count; i++) { - if (memb->nodeid == rv->nodeids[i]) { - found = 1; - break; - } - } + node = find_config_node(rv, memb->nodeid); + if (node && !node->new) + continue; - if (!found) { - neg++; - dlm_remove_member(ls, memb); + if (!node) { log_debug(ls, "remove member %d", memb->nodeid); + } else { + /* removed and re-added */ + log_debug(ls, "remove member %d comm_seq %u %u", + memb->nodeid, memb->comm_seq, node->comm_seq); } - } - - /* Add an entry to ls_nodes_gone for members that were removed and - then added again, so that previous state for these nodes will be - cleared during recovery. */ - - for (i = 0; i < rv->new_count; i++) { - if (!dlm_is_member(ls, rv->new[i])) - continue; - log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]); - memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS); - if (!memb) - return -ENOMEM; - memb->nodeid = rv->new[i]; - list_add_tail(&memb->list, &ls->ls_nodes_gone); neg++; + list_move(&memb->list, &ls->ls_nodes_gone); + ls->ls_num_nodes--; + dlm_lsop_recover_slot(ls, memb); } /* add new members to ls_nodes */ - for (i = 0; i < rv->node_count; i++) { - if (dlm_is_member(ls, rv->nodeids[i])) + for (i = 0; i < rv->nodes_count; i++) { + node = &rv->nodes[i]; + if (dlm_is_member(ls, node->nodeid)) continue; - dlm_add_member(ls, rv->nodeids[i]); - pos++; - log_debug(ls, "add member %d", rv->nodeids[i]); + dlm_add_member(ls, node); + log_debug(ls, "add member %d", node->nodeid); } list_for_each_entry(memb, &ls->ls_nodes, list) { @@ -251,7 +581,6 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) ls->ls_low_nodeid = low; make_member_array(ls); - dlm_set_recover_status(ls, DLM_RS_NODES); *neg_out = neg; error = ping_members(ls); @@ -261,12 +590,8 @@ int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) ls->ls_members_result = error; complete(&ls->ls_members_done); } - if (error) - goto out; - error = dlm_recover_members_wait(ls); - out: - log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error); + log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes); return error; } @@ -327,26 +652,35 @@ int dlm_ls_stop(struct dlm_ls *ls) */ dlm_recoverd_suspend(ls); + + spin_lock(&ls->ls_recover_lock); + kfree(ls->ls_slots); + ls->ls_slots = NULL; + ls->ls_num_slots = 0; + ls->ls_slots_size = 0; ls->ls_recover_status = 0; + spin_unlock(&ls->ls_recover_lock); + dlm_recoverd_resume(ls); if (!ls->ls_recover_begin) ls->ls_recover_begin = jiffies; + + dlm_lsop_recover_prep(ls); return 0; } int dlm_ls_start(struct dlm_ls *ls) { struct dlm_recover *rv = NULL, *rv_old; - int *ids = NULL, *new = NULL; - int error, ids_count = 0, new_count = 0; + struct dlm_config_node *nodes; + int error, count; rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS); if (!rv) return -ENOMEM; - error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count, - &new, &new_count); + error = dlm_config_nodes(ls->ls_name, &nodes, &count); if (error < 0) goto fail; @@ -361,10 +695,8 @@ int dlm_ls_start(struct dlm_ls *ls) goto fail; } - rv->nodeids = ids; - rv->node_count = ids_count; - rv->new = new; - rv->new_count = new_count; + rv->nodes = nodes; + rv->nodes_count = count; rv->seq = ++ls->ls_recover_seq; rv_old = ls->ls_recover_args; ls->ls_recover_args = rv; @@ -372,9 +704,8 @@ int dlm_ls_start(struct dlm_ls *ls) if (rv_old) { log_error(ls, "unused recovery %llx %d", - (unsigned long long)rv_old->seq, rv_old->node_count); - kfree(rv_old->nodeids); - kfree(rv_old->new); + (unsigned long long)rv_old->seq, rv_old->nodes_count); + kfree(rv_old->nodes); kfree(rv_old); } @@ -383,8 +714,7 @@ int dlm_ls_start(struct dlm_ls *ls) fail: kfree(rv); - kfree(ids); - kfree(new); + kfree(nodes); return error; } |