summaryrefslogtreecommitdiffstats
path: root/fs/xfs/xfs_log_cil.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/xfs/xfs_log_cil.c')
-rw-r--r--fs/xfs/xfs_log_cil.c472
1 files changed, 360 insertions, 112 deletions
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
index db6cb7800251..eccbfb99e894 100644
--- a/fs/xfs/xfs_log_cil.c
+++ b/fs/xfs/xfs_log_cil.c
@@ -44,9 +44,20 @@ xlog_cil_ticket_alloc(
* transaction overhead reservation from the first transaction commit.
*/
tic->t_curr_res = 0;
+ tic->t_iclog_hdrs = 0;
return tic;
}
+static inline void
+xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil)
+{
+ struct xlog *log = cil->xc_log;
+
+ atomic_set(&cil->xc_iclog_hdrs,
+ (XLOG_CIL_BLOCKING_SPACE_LIMIT(log) /
+ (log->l_iclog_size - log->l_iclog_hsize)));
+}
+
/*
* Check if the current log item was first committed in this sequence.
* We can't rely on just the log item being in the CIL, we have to check
@@ -61,7 +72,7 @@ xlog_item_in_current_chkpt(
struct xfs_cil *cil,
struct xfs_log_item *lip)
{
- if (list_empty(&lip->li_cil))
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
return false;
/*
@@ -93,15 +104,88 @@ xlog_cil_ctx_alloc(void)
ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
INIT_LIST_HEAD(&ctx->committing);
INIT_LIST_HEAD(&ctx->busy_extents);
+ INIT_LIST_HEAD(&ctx->log_items);
+ INIT_LIST_HEAD(&ctx->lv_chain);
INIT_WORK(&ctx->push_work, xlog_cil_push_work);
return ctx;
}
+/*
+ * Aggregate the CIL per cpu structures into global counts, lists, etc and
+ * clear the percpu state ready for the next context to use. This is called
+ * from the push code with the context lock held exclusively, hence nothing else
+ * will be accessing or modifying the per-cpu counters.
+ */
+static void
+xlog_cil_push_pcp_aggregate(
+ struct xfs_cil *cil,
+ struct xfs_cil_ctx *ctx)
+{
+ struct xlog_cil_pcp *cilpcp;
+ int cpu;
+
+ for_each_online_cpu(cpu) {
+ cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+
+ ctx->ticket->t_curr_res += cilpcp->space_reserved;
+ cilpcp->space_reserved = 0;
+
+ if (!list_empty(&cilpcp->busy_extents)) {
+ list_splice_init(&cilpcp->busy_extents,
+ &ctx->busy_extents);
+ }
+ if (!list_empty(&cilpcp->log_items))
+ list_splice_init(&cilpcp->log_items, &ctx->log_items);
+
+ /*
+ * We're in the middle of switching cil contexts. Reset the
+ * counter we use to detect when the current context is nearing
+ * full.
+ */
+ cilpcp->space_used = 0;
+ }
+}
+
+/*
+ * Aggregate the CIL per-cpu space used counters into the global atomic value.
+ * This is called when the per-cpu counter aggregation will first pass the soft
+ * limit threshold so we can switch to atomic counter aggregation for accurate
+ * detection of hard limit traversal.
+ */
+static void
+xlog_cil_insert_pcp_aggregate(
+ struct xfs_cil *cil,
+ struct xfs_cil_ctx *ctx)
+{
+ struct xlog_cil_pcp *cilpcp;
+ int cpu;
+ int count = 0;
+
+ /* Trigger atomic updates then aggregate only for the first caller */
+ if (!test_and_clear_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags))
+ return;
+
+ for_each_online_cpu(cpu) {
+ int old, prev;
+
+ cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+ do {
+ old = cilpcp->space_used;
+ prev = cmpxchg(&cilpcp->space_used, old, 0);
+ } while (old != prev);
+ count += old;
+ }
+ atomic_add(count, &ctx->space_used);
+}
+
static void
xlog_cil_ctx_switch(
struct xfs_cil *cil,
struct xfs_cil_ctx *ctx)
{
+ xlog_cil_set_iclog_hdr_count(cil);
+ set_bit(XLOG_CIL_EMPTY, &cil->xc_flags);
+ set_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags);
ctx->sequence = ++cil->xc_current_sequence;
ctx->cil = cil;
cil->xc_ctx = ctx;
@@ -123,6 +207,7 @@ xlog_cil_init_post_recovery(
{
log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
log->l_cilp->xc_ctx->sequence = 1;
+ xlog_cil_set_iclog_hdr_count(log->l_cilp);
}
static inline int
@@ -254,6 +339,7 @@ xlog_cil_alloc_shadow_bufs(
memset(lv, 0, xlog_cil_iovec_space(niovecs));
+ INIT_LIST_HEAD(&lv->lv_list);
lv->lv_item = lip;
lv->lv_size = buf_size;
if (ordered)
@@ -269,7 +355,6 @@ xlog_cil_alloc_shadow_bufs(
else
lv->lv_buf_len = 0;
lv->lv_bytes = 0;
- lv->lv_next = NULL;
}
/* Ensure the lv is set up according to ->iop_size */
@@ -396,7 +481,6 @@ xlog_cil_insert_format_items(
if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
/* same or smaller, optimise common overwrite case */
lv = lip->li_lv;
- lv->lv_next = NULL;
if (ordered)
goto insert;
@@ -434,6 +518,23 @@ insert:
}
/*
+ * The use of lockless waitqueue_active() requires that the caller has
+ * serialised itself against the wakeup call in xlog_cil_push_work(). That
+ * can be done by either holding the push lock or the context lock.
+ */
+static inline bool
+xlog_cil_over_hard_limit(
+ struct xlog *log,
+ int32_t space_used)
+{
+ if (waitqueue_active(&log->l_cilp->xc_push_wait))
+ return true;
+ if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log))
+ return true;
+ return false;
+}
+
+/*
* Insert the log items into the CIL and calculate the difference in space
* consumed by the item. Add the space to the checkpoint ticket and calculate
* if the change requires additional log metadata. If it does, take that space
@@ -450,8 +551,10 @@ xlog_cil_insert_items(
struct xfs_cil_ctx *ctx = cil->xc_ctx;
struct xfs_log_item *lip;
int len = 0;
- int iclog_space;
int iovhdr_res = 0, split_res = 0, ctx_res = 0;
+ int space_used;
+ int order;
+ struct xlog_cil_pcp *cilpcp;
ASSERT(tp);
@@ -461,93 +564,135 @@ xlog_cil_insert_items(
*/
xlog_cil_insert_format_items(log, tp, &len);
- spin_lock(&cil->xc_cil_lock);
+ /*
+ * Subtract the space released by intent cancelation from the space we
+ * consumed so that we remove it from the CIL space and add it back to
+ * the current transaction reservation context.
+ */
+ len -= released_space;
- /* attach the transaction to the CIL if it has any busy extents */
- if (!list_empty(&tp->t_busy))
- list_splice_init(&tp->t_busy, &ctx->busy_extents);
+ /*
+ * Grab the per-cpu pointer for the CIL before we start any accounting.
+ * That ensures that we are running with pre-emption disabled and so we
+ * can't be scheduled away between split sample/update operations that
+ * are done without outside locking to serialise them.
+ */
+ cilpcp = get_cpu_ptr(cil->xc_pcp);
/*
- * Now transfer enough transaction reservation to the context ticket
- * for the checkpoint. The context ticket is special - the unit
- * reservation has to grow as well as the current reservation as we
- * steal from tickets so we can correctly determine the space used
- * during the transaction commit.
+ * We need to take the CIL checkpoint unit reservation on the first
+ * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't
+ * unnecessarily do an atomic op in the fast path here. We can clear the
+ * XLOG_CIL_EMPTY bit as we are under the xc_ctx_lock here and that
+ * needs to be held exclusively to reset the XLOG_CIL_EMPTY bit.
*/
- if (ctx->ticket->t_curr_res == 0) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) &&
+ test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
ctx_res = ctx->ticket->t_unit_res;
- ctx->ticket->t_curr_res = ctx_res;
- tp->t_ticket->t_curr_res -= ctx_res;
- }
- /* do we need space for more log record headers? */
- iclog_space = log->l_iclog_size - log->l_iclog_hsize;
- if (len > 0 && (ctx->space_used / iclog_space !=
- (ctx->space_used + len) / iclog_space)) {
- split_res = (len + iclog_space - 1) / iclog_space;
- /* need to take into account split region headers, too */
- split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
- ctx->ticket->t_unit_res += split_res;
- ctx->ticket->t_curr_res += split_res;
- tp->t_ticket->t_curr_res -= split_res;
- ASSERT(tp->t_ticket->t_curr_res >= len);
+ /*
+ * Check if we need to steal iclog headers. atomic_read() is not a
+ * locked atomic operation, so we can check the value before we do any
+ * real atomic ops in the fast path. If we've already taken the CIL unit
+ * reservation from this commit, we've already got one iclog header
+ * space reserved so we have to account for that otherwise we risk
+ * overrunning the reservation on this ticket.
+ *
+ * If the CIL is already at the hard limit, we might need more header
+ * space that originally reserved. So steal more header space from every
+ * commit that occurs once we are over the hard limit to ensure the CIL
+ * push won't run out of reservation space.
+ *
+ * This can steal more than we need, but that's OK.
+ *
+ * The cil->xc_ctx_lock provides the serialisation necessary for safely
+ * calling xlog_cil_over_hard_limit() in this context.
+ */
+ space_used = atomic_read(&ctx->space_used) + cilpcp->space_used + len;
+ if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
+ xlog_cil_over_hard_limit(log, space_used)) {
+ split_res = log->l_iclog_hsize +
+ sizeof(struct xlog_op_header);
+ if (ctx_res)
+ ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1);
+ else
+ ctx_res = split_res * tp->t_ticket->t_iclog_hdrs;
+ atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
}
- tp->t_ticket->t_curr_res -= len;
- tp->t_ticket->t_curr_res += released_space;
- ctx->space_used += len;
- ctx->space_used -= released_space;
+ cilpcp->space_reserved += ctx_res;
/*
- * If we've overrun the reservation, dump the tx details before we move
- * the log items. Shutdown is imminent...
+ * Accurately account when over the soft limit, otherwise fold the
+ * percpu count into the global count if over the per-cpu threshold.
*/
- if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
- xfs_warn(log->l_mp, "Transaction log reservation overrun:");
- xfs_warn(log->l_mp,
- " log items: %d bytes (iov hdrs: %d bytes)",
- len, iovhdr_res);
- xfs_warn(log->l_mp, " split region headers: %d bytes",
- split_res);
- xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
- xlog_print_trans(tp);
+ if (!test_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags)) {
+ atomic_add(len, &ctx->space_used);
+ } else if (cilpcp->space_used + len >
+ (XLOG_CIL_SPACE_LIMIT(log) / num_online_cpus())) {
+ space_used = atomic_add_return(cilpcp->space_used + len,
+ &ctx->space_used);
+ cilpcp->space_used = 0;
+
+ /*
+ * If we just transitioned over the soft limit, we need to
+ * transition to the global atomic counter.
+ */
+ if (space_used >= XLOG_CIL_SPACE_LIMIT(log))
+ xlog_cil_insert_pcp_aggregate(cil, ctx);
+ } else {
+ cilpcp->space_used += len;
}
+ /* attach the transaction to the CIL if it has any busy extents */
+ if (!list_empty(&tp->t_busy))
+ list_splice_init(&tp->t_busy, &cilpcp->busy_extents);
/*
- * Now (re-)position everything modified at the tail of the CIL.
+ * Now update the order of everything modified in the transaction
+ * and insert items into the CIL if they aren't already there.
* We do this here so we only need to take the CIL lock once during
* the transaction commit.
*/
+ order = atomic_inc_return(&ctx->order_id);
list_for_each_entry(lip, &tp->t_items, li_trans) {
-
/* Skip items which aren't dirty in this transaction. */
if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
continue;
- /*
- * Only move the item if it isn't already at the tail. This is
- * to prevent a transient list_empty() state when reinserting
- * an item that is already the only item in the CIL.
- */
- if (!list_is_last(&lip->li_cil, &cil->xc_cil))
- list_move_tail(&lip->li_cil, &cil->xc_cil);
+ lip->li_order_id = order;
+ if (!list_empty(&lip->li_cil))
+ continue;
+ list_add_tail(&lip->li_cil, &cilpcp->log_items);
}
+ put_cpu_ptr(cilpcp);
- spin_unlock(&cil->xc_cil_lock);
-
- if (tp->t_ticket->t_curr_res < 0)
+ /*
+ * If we've overrun the reservation, dump the tx details before we move
+ * the log items. Shutdown is imminent...
+ */
+ tp->t_ticket->t_curr_res -= ctx_res + len;
+ if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
+ xfs_warn(log->l_mp, "Transaction log reservation overrun:");
+ xfs_warn(log->l_mp,
+ " log items: %d bytes (iov hdrs: %d bytes)",
+ len, iovhdr_res);
+ xfs_warn(log->l_mp, " split region headers: %d bytes",
+ split_res);
+ xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
+ xlog_print_trans(tp);
xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
+ }
}
static void
xlog_cil_free_logvec(
- struct xfs_log_vec *log_vector)
+ struct list_head *lv_chain)
{
struct xfs_log_vec *lv;
- for (lv = log_vector; lv; ) {
- struct xfs_log_vec *next = lv->lv_next;
+ while (!list_empty(lv_chain)) {
+ lv = list_first_entry(lv_chain, struct xfs_log_vec, lv_list);
+ list_del_init(&lv->lv_list);
kmem_free(lv);
- lv = next;
}
}
@@ -647,7 +792,7 @@ xlog_cil_committed(
spin_unlock(&ctx->cil->xc_push_lock);
}
- xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
+ xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, &ctx->lv_chain,
ctx->start_lsn, abort);
xfs_extent_busy_sort(&ctx->busy_extents);
@@ -658,7 +803,7 @@ xlog_cil_committed(
list_del(&ctx->committing);
spin_unlock(&ctx->cil->xc_push_lock);
- xlog_cil_free_logvec(ctx->lv_chain);
+ xlog_cil_free_logvec(&ctx->lv_chain);
if (!list_empty(&ctx->busy_extents))
xlog_discard_busy_extents(mp, ctx);
@@ -817,7 +962,6 @@ restart:
static int
xlog_cil_write_chain(
struct xfs_cil_ctx *ctx,
- struct xfs_log_vec *chain,
uint32_t chain_len)
{
struct xlog *log = ctx->cil->xc_log;
@@ -826,7 +970,7 @@ xlog_cil_write_chain(
error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
if (error)
return error;
- return xlog_write(log, ctx, chain, ctx->ticket, chain_len);
+ return xlog_write(log, ctx, &ctx->lv_chain, ctx->ticket, chain_len);
}
/*
@@ -855,6 +999,8 @@ xlog_cil_write_commit_record(
.lv_iovecp = &reg,
};
int error;
+ LIST_HEAD(lv_chain);
+ list_add(&vec.lv_list, &lv_chain);
if (xlog_is_shutdown(log))
return -EIO;
@@ -865,7 +1011,7 @@ xlog_cil_write_commit_record(
/* account for space used by record data */
ctx->ticket->t_curr_res -= reg.i_len;
- error = xlog_write(log, ctx, &vec, ctx->ticket, reg.i_len);
+ error = xlog_write(log, ctx, &lv_chain, ctx->ticket, reg.i_len);
if (error)
xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
return error;
@@ -931,12 +1077,31 @@ xlog_cil_build_trans_hdr(
lvhdr->lv_niovecs = 2;
lvhdr->lv_iovecp = &hdr->lhdr[0];
lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
- lvhdr->lv_next = ctx->lv_chain;
tic->t_curr_res -= lvhdr->lv_bytes;
}
/*
+ * CIL item reordering compare function. We want to order in ascending ID order,
+ * but we want to leave items with the same ID in the order they were added to
+ * the list. This is important for operations like reflink where we log 4 order
+ * dependent intents in a single transaction when we overwrite an existing
+ * shared extent with a new shared extent. i.e. BUI(unmap), CUI(drop),
+ * CUI (inc), BUI(remap)...
+ */
+static int
+xlog_cil_order_cmp(
+ void *priv,
+ const struct list_head *a,
+ const struct list_head *b)
+{
+ struct xfs_log_vec *l1 = container_of(a, struct xfs_log_vec, lv_list);
+ struct xfs_log_vec *l2 = container_of(b, struct xfs_log_vec, lv_list);
+
+ return l1->lv_order_id > l2->lv_order_id;
+}
+
+/*
* Pull all the log vectors off the items in the CIL, and remove the items from
* the CIL. We don't need the CIL lock here because it's only needed on the
* transaction commit side which is currently locked out by the flush lock.
@@ -947,18 +1112,16 @@ xlog_cil_build_trans_hdr(
*/
static void
xlog_cil_build_lv_chain(
- struct xfs_cil *cil,
struct xfs_cil_ctx *ctx,
struct list_head *whiteouts,
uint32_t *num_iovecs,
uint32_t *num_bytes)
{
- struct xfs_log_vec *lv = NULL;
-
- while (!list_empty(&cil->xc_cil)) {
+ while (!list_empty(&ctx->log_items)) {
struct xfs_log_item *item;
+ struct xfs_log_vec *lv;
- item = list_first_entry(&cil->xc_cil,
+ item = list_first_entry(&ctx->log_items,
struct xfs_log_item, li_cil);
if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) {
@@ -967,18 +1130,18 @@ xlog_cil_build_lv_chain(
continue;
}
- list_del_init(&item->li_cil);
- if (!ctx->lv_chain)
- ctx->lv_chain = item->li_lv;
- else
- lv->lv_next = item->li_lv;
lv = item->li_lv;
- item->li_lv = NULL;
- *num_iovecs += lv->lv_niovecs;
+ lv->lv_order_id = item->li_order_id;
/* we don't write ordered log vectors */
if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
*num_bytes += lv->lv_bytes;
+ *num_iovecs += lv->lv_niovecs;
+ list_add_tail(&lv->lv_list, &ctx->lv_chain);
+
+ list_del_init(&item->li_cil);
+ item->li_order_id = 0;
+ item->li_lv = NULL;
}
}
@@ -1022,10 +1185,11 @@ xlog_cil_push_work(
int num_bytes = 0;
int error = 0;
struct xlog_cil_trans_hdr thdr;
- struct xfs_log_vec lvhdr = { NULL };
+ struct xfs_log_vec lvhdr = {};
xfs_csn_t push_seq;
bool push_commit_stable;
LIST_HEAD (whiteouts);
+ struct xlog_ticket *ticket;
new_ctx = xlog_cil_ctx_alloc();
new_ctx->ticket = xlog_cil_ticket_alloc(log);
@@ -1049,12 +1213,14 @@ xlog_cil_push_work(
if (waitqueue_active(&cil->xc_push_wait))
wake_up_all(&cil->xc_push_wait);
+ xlog_cil_push_pcp_aggregate(cil, ctx);
+
/*
* Check if we've anything to push. If there is nothing, then we don't
* move on to a new sequence number and so we have to be able to push
* this sequence again later.
*/
- if (list_empty(&cil->xc_cil)) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
cil->xc_push_seq = 0;
spin_unlock(&cil->xc_push_lock);
goto out_skip;
@@ -1094,7 +1260,7 @@ xlog_cil_push_work(
list_add(&ctx->committing, &cil->xc_committing);
spin_unlock(&cil->xc_push_lock);
- xlog_cil_build_lv_chain(cil, ctx, &whiteouts, &num_iovecs, &num_bytes);
+ xlog_cil_build_lv_chain(ctx, &whiteouts, &num_iovecs, &num_bytes);
/*
* Switch the contexts so we can drop the context lock and move out
@@ -1127,14 +1293,30 @@ xlog_cil_push_work(
up_write(&cil->xc_ctx_lock);
/*
+ * Sort the log vector chain before we add the transaction headers.
+ * This ensures we always have the transaction headers at the start
+ * of the chain.
+ */
+ list_sort(NULL, &ctx->lv_chain, xlog_cil_order_cmp);
+
+ /*
* Build a checkpoint transaction header and write it to the log to
* begin the transaction. We need to account for the space used by the
* transaction header here as it is not accounted for in xlog_write().
+ * Add the lvhdr to the head of the lv chain we pass to xlog_write() so
+ * it gets written into the iclog first.
*/
xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
num_bytes += lvhdr.lv_bytes;
+ list_add(&lvhdr.lv_list, &ctx->lv_chain);
- error = xlog_cil_write_chain(ctx, &lvhdr, num_bytes);
+ /*
+ * Take the lvhdr back off the lv_chain immediately after calling
+ * xlog_cil_write_chain() as it should not be passed to log IO
+ * completion.
+ */
+ error = xlog_cil_write_chain(ctx, num_bytes);
+ list_del(&lvhdr.lv_list);
if (error)
goto out_abort_free_ticket;
@@ -1142,7 +1324,14 @@ xlog_cil_push_work(
if (error)
goto out_abort_free_ticket;
- xfs_log_ticket_ungrant(log, ctx->ticket);
+ /*
+ * Grab the ticket from the ctx so we can ungrant it after releasing the
+ * commit_iclog. The ctx may be freed by the time we return from
+ * releasing the commit_iclog (i.e. checkpoint has been completed and
+ * callback run) so we can't reference the ctx after the call to
+ * xlog_state_release_iclog().
+ */
+ ticket = ctx->ticket;
/*
* If the checkpoint spans multiple iclogs, wait for all previous iclogs
@@ -1192,12 +1381,14 @@ xlog_cil_push_work(
if (push_commit_stable &&
ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
- xlog_state_release_iclog(log, ctx->commit_iclog);
+ ticket = ctx->ticket;
+ xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
/* Not safe to reference ctx now! */
spin_unlock(&log->l_icloglock);
xlog_cil_cleanup_whiteouts(&whiteouts);
+ xfs_log_ticket_ungrant(log, ticket);
return;
out_skip:
@@ -1207,17 +1398,19 @@ out_skip:
return;
out_abort_free_ticket:
- xfs_log_ticket_ungrant(log, ctx->ticket);
ASSERT(xlog_is_shutdown(log));
xlog_cil_cleanup_whiteouts(&whiteouts);
if (!ctx->commit_iclog) {
+ xfs_log_ticket_ungrant(log, ctx->ticket);
xlog_cil_committed(ctx);
return;
}
spin_lock(&log->l_icloglock);
- xlog_state_release_iclog(log, ctx->commit_iclog);
+ ticket = ctx->ticket;
+ xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
/* Not safe to reference ctx now! */
spin_unlock(&log->l_icloglock);
+ xfs_log_ticket_ungrant(log, ticket);
}
/*
@@ -1232,18 +1425,27 @@ xlog_cil_push_background(
struct xlog *log) __releases(cil->xc_ctx_lock)
{
struct xfs_cil *cil = log->l_cilp;
+ int space_used = atomic_read(&cil->xc_ctx->space_used);
/*
* The cil won't be empty because we are called while holding the
- * context lock so whatever we added to the CIL will still be there
+ * context lock so whatever we added to the CIL will still be there.
*/
- ASSERT(!list_empty(&cil->xc_cil));
+ ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
/*
- * Don't do a background push if we haven't used up all the
- * space available yet.
+ * We are done if:
+ * - we haven't used up all the space available yet; or
+ * - we've already queued up a push; and
+ * - we're not over the hard limit; and
+ * - nothing has been over the hard limit.
+ *
+ * If so, we don't need to take the push lock as there's nothing to do.
*/
- if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
+ if (space_used < XLOG_CIL_SPACE_LIMIT(log) ||
+ (cil->xc_push_seq == cil->xc_current_sequence &&
+ space_used < XLOG_CIL_BLOCKING_SPACE_LIMIT(log) &&
+ !waitqueue_active(&cil->xc_push_wait))) {
up_read(&cil->xc_ctx_lock);
return;
}
@@ -1270,12 +1472,11 @@ xlog_cil_push_background(
* dipping back down under the hard limit.
*
* The ctx->xc_push_lock provides the serialisation necessary for safely
- * using the lockless waitqueue_active() check in this context.
+ * calling xlog_cil_over_hard_limit() in this context.
*/
- if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
- waitqueue_active(&cil->xc_push_wait)) {
+ if (xlog_cil_over_hard_limit(log, space_used)) {
trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
- ASSERT(cil->xc_ctx->space_used < log->l_logsize);
+ ASSERT(space_used < log->l_logsize);
xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
return;
}
@@ -1334,7 +1535,8 @@ xlog_cil_push_now(
* If the CIL is empty or we've already pushed the sequence then
* there's no more work that we need to do.
*/
- if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) ||
+ push_seq <= cil->xc_push_seq) {
spin_unlock(&cil->xc_push_lock);
return;
}
@@ -1352,7 +1554,7 @@ xlog_cil_empty(
bool empty = false;
spin_lock(&cil->xc_push_lock);
- if (list_empty(&cil->xc_cil))
+ if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
empty = true;
spin_unlock(&cil->xc_push_lock);
return empty;
@@ -1483,7 +1685,7 @@ xlog_cil_flush(
* If the CIL is empty, make sure that any previous checkpoint that may
* still be in an active iclog is pushed to stable storage.
*/
- if (list_empty(&log->l_cilp->xc_cil))
+ if (test_bit(XLOG_CIL_EMPTY, &log->l_cilp->xc_flags))
xfs_log_force(log->l_mp, 0);
}
@@ -1568,7 +1770,7 @@ restart:
* we would have found the context on the committing list.
*/
if (sequence == cil->xc_current_sequence &&
- !list_empty(&cil->xc_cil)) {
+ !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
spin_unlock(&cil->xc_push_lock);
goto restart;
}
@@ -1589,14 +1791,48 @@ out_shutdown:
}
/*
+ * Move dead percpu state to the relevant CIL context structures.
+ *
+ * We have to lock the CIL context here to ensure that nothing is modifying
+ * the percpu state, either addition or removal. Both of these are done under
+ * the CIL context lock, so grabbing that exclusively here will ensure we can
+ * safely drain the cilpcp for the CPU that is dying.
+ */
+void
+xlog_cil_pcp_dead(
+ struct xlog *log,
+ unsigned int cpu)
+{
+ struct xfs_cil *cil = log->l_cilp;
+ struct xlog_cil_pcp *cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+ struct xfs_cil_ctx *ctx;
+
+ down_write(&cil->xc_ctx_lock);
+ ctx = cil->xc_ctx;
+ if (ctx->ticket)
+ ctx->ticket->t_curr_res += cilpcp->space_reserved;
+ cilpcp->space_reserved = 0;
+
+ if (!list_empty(&cilpcp->log_items))
+ list_splice_init(&cilpcp->log_items, &ctx->log_items);
+ if (!list_empty(&cilpcp->busy_extents))
+ list_splice_init(&cilpcp->busy_extents, &ctx->busy_extents);
+ atomic_add(cilpcp->space_used, &ctx->space_used);
+ cilpcp->space_used = 0;
+ up_write(&cil->xc_ctx_lock);
+}
+
+/*
* Perform initial CIL structure initialisation.
*/
int
xlog_cil_init(
- struct xlog *log)
+ struct xlog *log)
{
- struct xfs_cil *cil;
- struct xfs_cil_ctx *ctx;
+ struct xfs_cil *cil;
+ struct xfs_cil_ctx *ctx;
+ struct xlog_cil_pcp *cilpcp;
+ int cpu;
cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
if (!cil)
@@ -1611,22 +1847,31 @@ xlog_cil_init(
if (!cil->xc_push_wq)
goto out_destroy_cil;
- INIT_LIST_HEAD(&cil->xc_cil);
+ cil->xc_log = log;
+ cil->xc_pcp = alloc_percpu(struct xlog_cil_pcp);
+ if (!cil->xc_pcp)
+ goto out_destroy_wq;
+
+ for_each_possible_cpu(cpu) {
+ cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+ INIT_LIST_HEAD(&cilpcp->busy_extents);
+ INIT_LIST_HEAD(&cilpcp->log_items);
+ }
+
INIT_LIST_HEAD(&cil->xc_committing);
- spin_lock_init(&cil->xc_cil_lock);
spin_lock_init(&cil->xc_push_lock);
init_waitqueue_head(&cil->xc_push_wait);
init_rwsem(&cil->xc_ctx_lock);
init_waitqueue_head(&cil->xc_start_wait);
init_waitqueue_head(&cil->xc_commit_wait);
- cil->xc_log = log;
log->l_cilp = cil;
ctx = xlog_cil_ctx_alloc();
xlog_cil_ctx_switch(cil, ctx);
-
return 0;
+out_destroy_wq:
+ destroy_workqueue(cil->xc_push_wq);
out_destroy_cil:
kmem_free(cil);
return -ENOMEM;
@@ -1636,14 +1881,17 @@ void
xlog_cil_destroy(
struct xlog *log)
{
- if (log->l_cilp->xc_ctx) {
- if (log->l_cilp->xc_ctx->ticket)
- xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
- kmem_free(log->l_cilp->xc_ctx);
+ struct xfs_cil *cil = log->l_cilp;
+
+ if (cil->xc_ctx) {
+ if (cil->xc_ctx->ticket)
+ xfs_log_ticket_put(cil->xc_ctx->ticket);
+ kmem_free(cil->xc_ctx);
}
- ASSERT(list_empty(&log->l_cilp->xc_cil));
- destroy_workqueue(log->l_cilp->xc_push_wq);
- kmem_free(log->l_cilp);
+ ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
+ free_percpu(cil->xc_pcp);
+ destroy_workqueue(cil->xc_push_wq);
+ kmem_free(cil);
}