summaryrefslogtreecommitdiffstats
path: root/fs/dlm/lowcomms.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/dlm/lowcomms.c')
-rw-r--r-- fs/dlm/lowcomms.c 131
1 files changed, 106 insertions, 25 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 3543a8fec907..5050fe05769b 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -63,6 +63,7 @@
/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
+#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct cbuf {
unsigned int base;
@@ -110,10 +111,12 @@ struct connection {
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
+#define CF_SHUTDOWN 9
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */
+ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
struct page *rx_page;
struct cbuf cb;
int retries;
@@ -122,6 +125,7 @@ struct connection {
struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
+ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -218,6 +222,7 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
spin_lock_init(&con->writequeue_lock);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
+ init_waitqueue_head(&con->shutdown_wait);
/* Setup action pointers for child sockets */
if (con->nodeid) {
@@ -619,6 +624,54 @@ static void close_connection(struct connection *con, bool and_other,
clear_bit(CF_CLOSING, &con->flags);
}
+/*
+ * Try to shut down the socket of @con gracefully: stop sending, then wait
+ * until CF_SHUTDOWN is cleared again. If the shutdown call fails, or the
+ * wait runs out after DLM_SHUTDOWN_WAIT_TIMEOUT, the connection is force
+ * closed instead.
+ */
+static void shutdown_connection(struct connection *con)
+{
+	int rc;
+
+	/* pending send work was cancelled, so the write-pending flag is stale */
+	if (cancel_work_sync(&con->swork)) {
+		log_print("canceled swork for node %d", con->nodeid);
+		clear_bit(CF_WRITE_PENDING, &con->flags);
+	}
+
+	mutex_lock(&con->sock_mutex);
+	if (!con->sock) {
+		/* no socket, so there is nothing to shut down */
+		mutex_unlock(&con->sock_mutex);
+		return;
+	}
+
+	/* receive_from_sock() clears this flag and wakes us when it sees EOF */
+	set_bit(CF_SHUTDOWN, &con->flags);
+	rc = kernel_sock_shutdown(con->sock, SHUT_WR);
+	mutex_unlock(&con->sock_mutex);
+
+	if (rc == 0) {
+		rc = wait_event_timeout(con->shutdown_wait,
+					!test_bit(CF_SHUTDOWN, &con->flags),
+					DLM_SHUTDOWN_WAIT_TIMEOUT);
+		/* non-zero: the flag was cleared before the timeout expired */
+		if (rc != 0)
+			return;
+
+		log_print("Connection %p shutdown timed out, will force close",
+			  con);
+	} else {
+		log_print("Connection %p failed to shutdown: %d will force close",
+			  con, rc);
+	}
+
+	/* graceful path failed: drop the flag and force close the connection */
+	clear_bit(CF_SHUTDOWN, &con->flags);
+	close_connection(con, false, true, true);
+}
+
+/*
+ * shutdown_action installed by tcp_connect_to_sock(): shut down the
+ * othercon first when one exists, then @con itself.
+ */
+static void dlm_tcp_shutdown(struct connection *con)
+{
+	struct connection *other = con->othercon;
+
+	if (other)
+		shutdown_connection(other);
+
+	shutdown_connection(con);
+}
+
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
@@ -685,14 +738,14 @@ static int receive_from_sock(struct connection *con)
page_address(con->rx_page),
con->cb.base, con->cb.len,
PAGE_SIZE);
- if (ret == -EBADMSG) {
- log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
- page_address(con->rx_page), con->cb.base,
+ if (ret < 0) {
+ log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
+ ret, page_address(con->rx_page), con->cb.base,
con->cb.len, r);
+ cbuf_eat(&con->cb, r);
+ } else {
+ cbuf_eat(&con->cb, ret);
}
- if (ret < 0)
- goto out_close;
- cbuf_eat(&con->cb, ret);
if (cbuf_empty(&con->cb) && !call_again_soon) {
__free_page(con->rx_page);
@@ -713,13 +766,18 @@ out_resched:
out_close:
mutex_unlock(&con->sock_mutex);
if (ret != -EAGAIN) {
- close_connection(con, true, true, false);
/* Reconnect when there is something to send */
+ close_connection(con, false, true, false);
+ if (ret == 0) {
+ log_print("connection %p got EOF from %d",
+ con, con->nodeid);
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ /* signal to breaking receive worker */
+ ret = -1;
+ }
}
- /* Don't return success if we really got EOF */
- if (ret == 0)
- ret = -EAGAIN;
-
return ret;
}
@@ -803,22 +861,18 @@ static int accept_from_sock(struct connection *con)
spin_lock_init(&othercon->writequeue_lock);
INIT_WORK(&othercon->swork, process_send_sockets);
INIT_WORK(&othercon->rwork, process_recv_sockets);
+ init_waitqueue_head(&othercon->shutdown_wait);
set_bit(CF_IS_OTHERCON, &othercon->flags);
+ } else {
+ /* close other sock con if we have something new */
+ close_connection(othercon, false, true, false);
}
+
mutex_lock_nested(&othercon->sock_mutex, 2);
- if (!othercon->sock) {
- newcon->othercon = othercon;
- add_sock(newsock, othercon);
- addcon = othercon;
- mutex_unlock(&othercon->sock_mutex);
- }
- else {
- printk("Extra connection from node %d attempted\n", nodeid);
- result = -EAGAIN;
- mutex_unlock(&othercon->sock_mutex);
- mutex_unlock(&newcon->sock_mutex);
- goto accept_err;
- }
+ newcon->othercon = othercon;
+ add_sock(newsock, othercon);
+ addcon = othercon;
+ mutex_unlock(&othercon->sock_mutex);
}
else {
newcon->rx_action = receive_from_sock;
@@ -914,6 +968,7 @@ static void sctp_connect_to_sock(struct connection *con)
int result;
int addr_len;
struct socket *sock;
+ unsigned int mark;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
@@ -944,6 +999,13 @@ static void sctp_connect_to_sock(struct connection *con)
if (result < 0)
goto socket_err;
+ /* set skb mark */
+ result = dlm_comm_mark(con->nodeid, &mark);
+ if (result < 0)
+ goto bind_err;
+
+ sock_set_mark(sock->sk, mark);
+
con->rx_action = receive_from_sock;
con->connect_action = sctp_connect_to_sock;
add_sock(sock, con);
@@ -1006,6 +1068,7 @@ static void tcp_connect_to_sock(struct connection *con)
struct sockaddr_storage saddr, src_addr;
int addr_len;
struct socket *sock = NULL;
+ unsigned int mark;
int result;
if (con->nodeid == 0) {
@@ -1027,6 +1090,13 @@ static void tcp_connect_to_sock(struct connection *con)
if (result < 0)
goto out_err;
+ /* set skb mark */
+ result = dlm_comm_mark(con->nodeid, &mark);
+ if (result < 0)
+ goto out_err;
+
+ sock_set_mark(sock->sk, mark);
+
memset(&saddr, 0, sizeof(saddr));
result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
if (result < 0) {
@@ -1036,6 +1106,7 @@ static void tcp_connect_to_sock(struct connection *con)
con->rx_action = receive_from_sock;
con->connect_action = tcp_connect_to_sock;
+ con->shutdown_action = dlm_tcp_shutdown;
add_sock(sock, con);
/* Bind to our cluster-known address connecting to avoid
@@ -1111,6 +1182,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
goto create_out;
}
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
+
/* Turn off Nagle's algorithm */
tcp_sock_set_nodelay(sock->sk);
@@ -1185,6 +1258,7 @@ static int sctp_listen_for_all(void)
}
sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
sctp_sock_set_nodelay(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock);
@@ -1396,7 +1470,7 @@ out:
send_error:
mutex_unlock(&con->sock_mutex);
- close_connection(con, true, false, true);
+ close_connection(con, false, false, true);
/* Requeue the send work. When the work daemon runs again, it will try
a new connection, then call this function again. */
queue_work(send_workqueue, &con->swork);
@@ -1528,6 +1602,12 @@ static void stop_conn(struct connection *con)
_stop_conn(con, true);
}
+/* Invoke the shutdown_action of @con; connections without one are skipped. */
+static void shutdown_conn(struct connection *con)
+{
+	if (!con->shutdown_action)
+		return;
+
+	con->shutdown_action(con);
+}
+
static void free_conn(struct connection *con)
{
close_connection(con, true, true, true);
@@ -1579,6 +1659,7 @@ void dlm_lowcomms_stop(void)
mutex_lock(&connections_lock);
dlm_allow_conn = 0;
mutex_unlock(&connections_lock);
+ foreach_conn(shutdown_conn);
work_flush();
clean_writequeues();
foreach_conn(free_conn);