From 44c288732fdbd7e38460d156a40d29590bf93bce Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 3 Jan 2006 09:55:06 +0100 Subject: NFSv4: stateful NFSv4 RPC call interface The NFSv4 model requires us to complete all RPC calls that might establish state on the server whether or not the user wants to interrupt it. We may also need to schedule new work (including new RPC calls) in order to cancel the new state. The asynchronous RPC model will allow us to ensure that RPC calls always complete, but in order to allow for "synchronous" RPC, we want to add the ability to wait for completion. The waits are, of course, interruptible. Signed-off-by: Trond Myklebust --- net/sunrpc/sched.c | 78 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 19 deletions(-) (limited to 'net') diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index 2d74a1672028..82d158dad16d 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -264,6 +264,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname) } EXPORT_SYMBOL(rpc_init_wait_queue); +static int rpc_wait_bit_interruptible(void *word) +{ + if (signal_pending(current)) + return -ERESTARTSYS; + schedule(); + return 0; +} + +/* + * Mark an RPC call as having completed by clearing the 'active' bit + */ +static inline void rpc_mark_complete_task(struct rpc_task *task) +{ + rpc_clear_active(task); + wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE); +} + +/* + * Allow callers to wait for completion of an RPC call + */ +int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *)) +{ + if (action == NULL) + action = rpc_wait_bit_interruptible; + return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE, + action, TASK_INTERRUPTIBLE); +} +EXPORT_SYMBOL(__rpc_wait_for_completion_task); + /* * Make an RPC task runnable. * @@ -299,10 +328,7 @@ static void rpc_make_runnable(struct rpc_task *task) static inline void rpc_schedule_run(struct rpc_task *task) { - /* Don't run a child twice! */ - if (RPC_IS_ACTIVATED(task)) - return; - task->tk_active = 1; + rpc_set_active(task); rpc_make_runnable(task); } @@ -324,8 +350,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, } /* Mark the task as being activated if so needed */ - if (!RPC_IS_ACTIVATED(task)) - task->tk_active = 1; + rpc_set_active(task); __rpc_add_wait_queue(q, task); @@ -580,14 +605,6 @@ void rpc_exit_task(struct rpc_task *task) } EXPORT_SYMBOL(rpc_exit_task); -static int rpc_wait_bit_interruptible(void *word) -{ - if (signal_pending(current)) - return -ERESTARTSYS; - schedule(); - return 0; -} - /* * This is the RPC `scheduler' (or rather, the finite state machine). */ @@ -680,6 +697,8 @@ static int __rpc_execute(struct rpc_task *task) dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status); status = task->tk_status; + /* Wake up anyone who is waiting for task completion */ + rpc_mark_complete_task(task); /* Release all resources associated with the task */ rpc_release_task(task); return status; @@ -697,9 +716,7 @@ static int __rpc_execute(struct rpc_task *task) int rpc_execute(struct rpc_task *task) { - BUG_ON(task->tk_active); - - task->tk_active = 1; + rpc_set_active(task); rpc_set_running(task); return __rpc_execute(task); } @@ -761,6 +778,7 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons init_timer(&task->tk_timer); task->tk_timer.data = (unsigned long) task; task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer; + atomic_set(&task->tk_count, 1); task->tk_client = clnt; task->tk_flags = flags; task->tk_ops = tk_ops; @@ -848,11 +866,13 @@ void rpc_release_task(struct rpc_task *task) { const struct rpc_call_ops *tk_ops = task->tk_ops; void *calldata = task->tk_calldata; - dprintk("RPC: %4d release task\n", task->tk_pid); #ifdef RPC_DEBUG BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID); #endif + if (!atomic_dec_and_test(&task->tk_count)) + return; + dprintk("RPC: %4d release task\n", task->tk_pid); /* Remove from global task list */ spin_lock(&rpc_sched_lock); @@ -860,7 +880,6 @@ void rpc_release_task(struct rpc_task *task) spin_unlock(&rpc_sched_lock); BUG_ON (RPC_IS_QUEUED(task)); - task->tk_active = 0; /* Synchronously delete any running timer */ rpc_delete_timer(task); @@ -885,6 +904,27 @@ void rpc_release_task(struct rpc_task *task) tk_ops->rpc_release(calldata); } +/** + * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it + * @clnt - pointer to RPC client + * @flags - RPC flags + * @ops - RPC call ops + * @data - user call data + */ +struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, + const struct rpc_call_ops *ops, + void *data) +{ + struct rpc_task *task; + task = rpc_new_task(clnt, flags, ops, data); + if (task == NULL) + return ERR_PTR(-ENOMEM); + atomic_inc(&task->tk_count); + rpc_execute(task); + return task; +} +EXPORT_SYMBOL(rpc_run_task); + /** * rpc_find_parent - find the parent of a child task. * @child: child task -- cgit v1.2.3