summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/dlm/config.c44
-rw-r--r--fs/dlm/config.h2
-rw-r--r--fs/dlm/lockspace.c6
-rw-r--r--fs/dlm/lowcomms.c131
-rw-r--r--include/net/sock.h1
-rw-r--r--net/core/sock.c8
6 files changed, 164 insertions, 28 deletions
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 3b21082e1b55..47f0b98b707f 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -73,6 +73,7 @@ struct dlm_cluster {
unsigned int cl_log_debug;
unsigned int cl_log_info;
unsigned int cl_protocol;
+ unsigned int cl_mark;
unsigned int cl_timewarn_cs;
unsigned int cl_waitwarn_us;
unsigned int cl_new_rsb_count;
@@ -96,6 +97,7 @@ enum {
CLUSTER_ATTR_LOG_DEBUG,
CLUSTER_ATTR_LOG_INFO,
CLUSTER_ATTR_PROTOCOL,
+ CLUSTER_ATTR_MARK,
CLUSTER_ATTR_TIMEWARN_CS,
CLUSTER_ATTR_WAITWARN_US,
CLUSTER_ATTR_NEW_RSB_COUNT,
@@ -168,6 +170,7 @@ CLUSTER_ATTR(scan_secs, 1);
CLUSTER_ATTR(log_debug, 0);
CLUSTER_ATTR(log_info, 0);
CLUSTER_ATTR(protocol, 0);
+CLUSTER_ATTR(mark, 0);
CLUSTER_ATTR(timewarn_cs, 1);
CLUSTER_ATTR(waitwarn_us, 0);
CLUSTER_ATTR(new_rsb_count, 0);
@@ -183,6 +186,7 @@ static struct configfs_attribute *cluster_attrs[] = {
[CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug,
[CLUSTER_ATTR_LOG_INFO] = &cluster_attr_log_info,
[CLUSTER_ATTR_PROTOCOL] = &cluster_attr_protocol,
+ [CLUSTER_ATTR_MARK] = &cluster_attr_mark,
[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs,
[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us,
[CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count,
@@ -196,6 +200,7 @@ enum {
COMM_ATTR_LOCAL,
COMM_ATTR_ADDR,
COMM_ATTR_ADDR_LIST,
+ COMM_ATTR_MARK,
};
enum {
@@ -228,6 +233,7 @@ struct dlm_comm {
int nodeid;
int local;
int addr_count;
+ unsigned int mark;
struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
@@ -465,6 +471,7 @@ static struct config_item *make_comm(struct config_group *g, const char *name)
cm->nodeid = -1;
cm->local = 0;
cm->addr_count = 0;
+ cm->mark = 0;
return &cm->item;
}
@@ -660,8 +667,28 @@ static ssize_t comm_addr_list_show(struct config_item *item, char *buf)
return 4096 - allowance;
}
+static ssize_t comm_mark_show(struct config_item *item, char *buf)
+{
+ return sprintf(buf, "%u\n", config_item_to_comm(item)->mark);
+}
+
+static ssize_t comm_mark_store(struct config_item *item, const char *buf,
+ size_t len)
+{
+ unsigned int mark;
+ int rc;
+
+ rc = kstrtouint(buf, 0, &mark);
+ if (rc)
+ return rc;
+
+ config_item_to_comm(item)->mark = mark;
+ return len;
+}
+
CONFIGFS_ATTR(comm_, nodeid);
CONFIGFS_ATTR(comm_, local);
+CONFIGFS_ATTR(comm_, mark);
CONFIGFS_ATTR_WO(comm_, addr);
CONFIGFS_ATTR_RO(comm_, addr_list);
@@ -670,6 +697,7 @@ static struct configfs_attribute *comm_attrs[] = {
[COMM_ATTR_LOCAL] = &comm_attr_local,
[COMM_ATTR_ADDR] = &comm_attr_addr,
[COMM_ATTR_ADDR_LIST] = &comm_attr_addr_list,
+ [COMM_ATTR_MARK] = &comm_attr_mark,
NULL,
};
@@ -829,6 +857,20 @@ int dlm_comm_seq(int nodeid, uint32_t *seq)
return 0;
}
+int dlm_comm_mark(int nodeid, unsigned int *mark)
+{
+ struct dlm_comm *cm;
+
+ cm = get_comm(nodeid);
+ if (!cm)
+ return -ENOENT;
+
+ *mark = cm->mark;
+ put_comm(cm);
+
+ return 0;
+}
+
int dlm_our_nodeid(void)
{
return local_comm ? local_comm->nodeid : 0;
@@ -855,6 +897,7 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
#define DEFAULT_LOG_DEBUG 0
#define DEFAULT_LOG_INFO 1
#define DEFAULT_PROTOCOL 0
+#define DEFAULT_MARK 0
#define DEFAULT_TIMEWARN_CS 500 /* 5 sec = 500 centiseconds */
#define DEFAULT_WAITWARN_US 0
#define DEFAULT_NEW_RSB_COUNT 128
@@ -871,6 +914,7 @@ struct dlm_config_info dlm_config = {
.ci_log_debug = DEFAULT_LOG_DEBUG,
.ci_log_info = DEFAULT_LOG_INFO,
.ci_protocol = DEFAULT_PROTOCOL,
+ .ci_mark = DEFAULT_MARK,
.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
.ci_waitwarn_us = DEFAULT_WAITWARN_US,
.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT,
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 2b471aae4e61..f62996cad561 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -31,6 +31,7 @@ struct dlm_config_info {
int ci_log_debug;
int ci_log_info;
int ci_protocol;
+ int ci_mark;
int ci_timewarn_cs;
int ci_waitwarn_us;
int ci_new_rsb_count;
@@ -45,6 +46,7 @@ void dlm_config_exit(void);
int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
int *count_out);
int dlm_comm_seq(int nodeid, uint32_t *seq);
+int dlm_comm_mark(int nodeid, unsigned int *mark);
int dlm_our_nodeid(void);
int dlm_our_addr(struct sockaddr_storage *addr, int num);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index e93670ecfae5..624617c12250 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -622,6 +622,9 @@ static int new_lockspace(const char *name, const char *cluster,
wait_event(ls->ls_recover_lock_wait,
test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
+ /* let kobject handle freeing of ls if there's an error */
+ do_unreg = 1;
+
ls->ls_kobj.kset = dlm_kset;
error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
"%s", ls->ls_name);
@@ -629,9 +632,6 @@ static int new_lockspace(const char *name, const char *cluster,
goto out_recoverd;
kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
- /* let kobject handle freeing of ls if there's an error */
- do_unreg = 1;
-
/* This uevent triggers dlm_controld in userspace to add us to the
group of nodes that are members of this lockspace (managed by the
cluster infrastructure.) Once it's done that, it tells us who the
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 3543a8fec907..5050fe05769b 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -63,6 +63,7 @@
/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
+#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
struct cbuf {
unsigned int base;
@@ -110,10 +111,12 @@ struct connection {
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
+#define CF_SHUTDOWN 9
struct list_head writequeue; /* List of outgoing writequeue_entries */
spinlock_t writequeue_lock;
int (*rx_action) (struct connection *); /* What to do when active */
void (*connect_action) (struct connection *); /* What to do to connect */
+ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
struct page *rx_page;
struct cbuf cb;
int retries;
@@ -122,6 +125,7 @@ struct connection {
struct connection *othercon;
struct work_struct rwork; /* Receive workqueue */
struct work_struct swork; /* Send workqueue */
+ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
@@ -218,6 +222,7 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
spin_lock_init(&con->writequeue_lock);
INIT_WORK(&con->swork, process_send_sockets);
INIT_WORK(&con->rwork, process_recv_sockets);
+ init_waitqueue_head(&con->shutdown_wait);
/* Setup action pointers for child sockets */
if (con->nodeid) {
@@ -619,6 +624,54 @@ static void close_connection(struct connection *con, bool and_other,
clear_bit(CF_CLOSING, &con->flags);
}
+static void shutdown_connection(struct connection *con)
+{
+ int ret;
+
+ if (cancel_work_sync(&con->swork)) {
+ log_print("canceled swork for node %d", con->nodeid);
+ clear_bit(CF_WRITE_PENDING, &con->flags);
+ }
+
+ mutex_lock(&con->sock_mutex);
+ /* nothing to shutdown */
+ if (!con->sock) {
+ mutex_unlock(&con->sock_mutex);
+ return;
+ }
+
+ set_bit(CF_SHUTDOWN, &con->flags);
+ ret = kernel_sock_shutdown(con->sock, SHUT_WR);
+ mutex_unlock(&con->sock_mutex);
+ if (ret) {
+ log_print("Connection %p failed to shutdown: %d will force close",
+ con, ret);
+ goto force_close;
+ } else {
+ ret = wait_event_timeout(con->shutdown_wait,
+ !test_bit(CF_SHUTDOWN, &con->flags),
+ DLM_SHUTDOWN_WAIT_TIMEOUT);
+ if (ret == 0) {
+ log_print("Connection %p shutdown timed out, will force close",
+ con);
+ goto force_close;
+ }
+ }
+
+ return;
+
+force_close:
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ close_connection(con, false, true, true);
+}
+
+static void dlm_tcp_shutdown(struct connection *con)
+{
+ if (con->othercon)
+ shutdown_connection(con->othercon);
+ shutdown_connection(con);
+}
+
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
@@ -685,14 +738,14 @@ static int receive_from_sock(struct connection *con)
page_address(con->rx_page),
con->cb.base, con->cb.len,
PAGE_SIZE);
- if (ret == -EBADMSG) {
- log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
- page_address(con->rx_page), con->cb.base,
+ if (ret < 0) {
+ log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d",
+ ret, page_address(con->rx_page), con->cb.base,
con->cb.len, r);
+ cbuf_eat(&con->cb, r);
+ } else {
+ cbuf_eat(&con->cb, ret);
}
- if (ret < 0)
- goto out_close;
- cbuf_eat(&con->cb, ret);
if (cbuf_empty(&con->cb) && !call_again_soon) {
__free_page(con->rx_page);
@@ -713,13 +766,18 @@ out_resched:
out_close:
mutex_unlock(&con->sock_mutex);
if (ret != -EAGAIN) {
- close_connection(con, true, true, false);
/* Reconnect when there is something to send */
+ close_connection(con, false, true, false);
+ if (ret == 0) {
+ log_print("connection %p got EOF from %d",
+ con, con->nodeid);
+ /* handling for tcp shutdown */
+ clear_bit(CF_SHUTDOWN, &con->flags);
+ wake_up(&con->shutdown_wait);
+ /* signal to breaking receive worker */
+ ret = -1;
+ }
}
- /* Don't return success if we really got EOF */
- if (ret == 0)
- ret = -EAGAIN;
-
return ret;
}
@@ -803,22 +861,18 @@ static int accept_from_sock(struct connection *con)
spin_lock_init(&othercon->writequeue_lock);
INIT_WORK(&othercon->swork, process_send_sockets);
INIT_WORK(&othercon->rwork, process_recv_sockets);
+ init_waitqueue_head(&othercon->shutdown_wait);
set_bit(CF_IS_OTHERCON, &othercon->flags);
+ } else {
+ /* close other sock con if we have something new */
+ close_connection(othercon, false, true, false);
}
+
mutex_lock_nested(&othercon->sock_mutex, 2);
- if (!othercon->sock) {
- newcon->othercon = othercon;
- add_sock(newsock, othercon);
- addcon = othercon;
- mutex_unlock(&othercon->sock_mutex);
- }
- else {
- printk("Extra connection from node %d attempted\n", nodeid);
- result = -EAGAIN;
- mutex_unlock(&othercon->sock_mutex);
- mutex_unlock(&newcon->sock_mutex);
- goto accept_err;
- }
+ newcon->othercon = othercon;
+ add_sock(newsock, othercon);
+ addcon = othercon;
+ mutex_unlock(&othercon->sock_mutex);
}
else {
newcon->rx_action = receive_from_sock;
@@ -914,6 +968,7 @@ static void sctp_connect_to_sock(struct connection *con)
int result;
int addr_len;
struct socket *sock;
+ unsigned int mark;
if (con->nodeid == 0) {
log_print("attempt to connect sock 0 foiled");
@@ -944,6 +999,13 @@ static void sctp_connect_to_sock(struct connection *con)
if (result < 0)
goto socket_err;
+ /* set skb mark */
+ result = dlm_comm_mark(con->nodeid, &mark);
+ if (result < 0)
+ goto bind_err;
+
+ sock_set_mark(sock->sk, mark);
+
con->rx_action = receive_from_sock;
con->connect_action = sctp_connect_to_sock;
add_sock(sock, con);
@@ -1006,6 +1068,7 @@ static void tcp_connect_to_sock(struct connection *con)
struct sockaddr_storage saddr, src_addr;
int addr_len;
struct socket *sock = NULL;
+ unsigned int mark;
int result;
if (con->nodeid == 0) {
@@ -1027,6 +1090,13 @@ static void tcp_connect_to_sock(struct connection *con)
if (result < 0)
goto out_err;
+ /* set skb mark */
+ result = dlm_comm_mark(con->nodeid, &mark);
+ if (result < 0)
+ goto out_err;
+
+ sock_set_mark(sock->sk, mark);
+
memset(&saddr, 0, sizeof(saddr));
result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
if (result < 0) {
@@ -1036,6 +1106,7 @@ static void tcp_connect_to_sock(struct connection *con)
con->rx_action = receive_from_sock;
con->connect_action = tcp_connect_to_sock;
+ con->shutdown_action = dlm_tcp_shutdown;
add_sock(sock, con);
/* Bind to our cluster-known address connecting to avoid
@@ -1111,6 +1182,8 @@ static struct socket *tcp_create_listen_sock(struct connection *con,
goto create_out;
}
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
+
/* Turn off Nagle's algorithm */
tcp_sock_set_nodelay(sock->sk);
@@ -1185,6 +1258,7 @@ static int sctp_listen_for_all(void)
}
sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
+ sock_set_mark(sock->sk, dlm_config.ci_mark);
sctp_sock_set_nodelay(sock->sk);
write_lock_bh(&sock->sk->sk_callback_lock);
@@ -1396,7 +1470,7 @@ out:
send_error:
mutex_unlock(&con->sock_mutex);
- close_connection(con, true, false, true);
+ close_connection(con, false, false, true);
/* Requeue the send work. When the work daemon runs again, it will try
a new connection, then call this function again. */
queue_work(send_workqueue, &con->swork);
@@ -1528,6 +1602,12 @@ static void stop_conn(struct connection *con)
_stop_conn(con, true);
}
+static void shutdown_conn(struct connection *con)
+{
+ if (con->shutdown_action)
+ con->shutdown_action(con);
+}
+
static void free_conn(struct connection *con)
{
close_connection(con, true, true, true);
@@ -1579,6 +1659,7 @@ void dlm_lowcomms_stop(void)
mutex_lock(&connections_lock);
dlm_allow_conn = 0;
mutex_unlock(&connections_lock);
+ foreach_conn(shutdown_conn);
work_flush();
clean_writequeues();
foreach_conn(free_conn);
diff --git a/include/net/sock.h b/include/net/sock.h
index 497c64ff8bda..064637d1ddf6 100644
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -2696,6 +2696,7 @@ void sock_no_linger(struct sock *sk);
void sock_set_keepalive(struct sock *sk);
void sock_set_priority(struct sock *sk, u32 priority);
void sock_set_rcvbuf(struct sock *sk, int val);
+void sock_set_mark(struct sock *sk, u32 val);
void sock_set_reuseaddr(struct sock *sk);
void sock_set_reuseport(struct sock *sk);
void sock_set_sndtimeo(struct sock *sk, s64 secs);
diff --git a/net/core/sock.c b/net/core/sock.c
index 49cd5ffe673e..d29709e0790d 100644
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -820,6 +820,14 @@ void sock_set_rcvbuf(struct sock *sk, int val)
}
EXPORT_SYMBOL(sock_set_rcvbuf);
+void sock_set_mark(struct sock *sk, u32 val)
+{
+ lock_sock(sk);
+ sk->sk_mark = val;
+ release_sock(sk);
+}
+EXPORT_SYMBOL(sock_set_mark);
+
/*
* This is meant for all protocols to use and covers goings on
* at the socket level. Everything here is generic.