diff options
Diffstat (limited to 'fs/dlm/lowcomms.c')
-rw-r--r-- | fs/dlm/lowcomms.c | 1540 |
1 files changed, 742 insertions, 798 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 59f64c596233..4450721ec83c 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -63,41 +63,49 @@ #define NEEDED_RMEM (4*1024*1024) -/* Number of messages to send before rescheduling */ -#define MAX_SEND_MSG_COUNT 25 -#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) - struct connection { struct socket *sock; /* NULL if not connected */ uint32_t nodeid; /* So we know who we are in the list */ - struct mutex sock_mutex; + /* this semaphore is used to allow parallel recv/send in read + * lock mode. When we release a sock we need to held the write lock. + * + * However this is locking code and not nice. When we remove the + * othercon handling we can look into other mechanism to synchronize + * io handling to call sock_release() at the right time. + */ + struct rw_semaphore sock_lock; unsigned long flags; -#define CF_READ_PENDING 1 -#define CF_WRITE_PENDING 2 -#define CF_INIT_PENDING 4 +#define CF_APP_LIMITED 0 +#define CF_RECV_PENDING 1 +#define CF_SEND_PENDING 2 +#define CF_RECV_INTR 3 +#define CF_IO_STOP 4 #define CF_IS_OTHERCON 5 -#define CF_CLOSE 6 -#define CF_APP_LIMITED 7 -#define CF_CLOSING 8 -#define CF_SHUTDOWN 9 -#define CF_CONNECTED 10 -#define CF_RECONNECT 11 -#define CF_DELAY_CONNECT 12 -#define CF_EOF 13 struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; - atomic_t writequeue_cnt; int retries; -#define MAX_CONNECT_RETRIES 3 struct hlist_node list; + /* due some connect()/accept() races we currently have this cross over + * connection attempt second connection for one node. + * + * There is a solution to avoid the race by introducing a connect + * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to + * connect. Otherside can connect but will only be considered that + * the other side wants to have a reconnect. + * + * However changing to this behaviour will break backwards compatible. + * In a DLM protocol major version upgrade we should remove this! + */ struct connection *othercon; - struct connection *sendcon; - struct work_struct rwork; /* Receive workqueue */ - struct work_struct swork; /* Send workqueue */ - wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ - unsigned char *rx_buf; - int rx_buflen; + struct work_struct rwork; /* receive worker */ + struct work_struct swork; /* send worker */ + unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE]; int rx_leftover; + int mark; + int addr_count; + int curr_addr_index; + struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT]; + spinlock_t addrs_lock; struct rcu_head rcu; }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -136,13 +144,12 @@ struct dlm_msg { struct kref ref; }; -struct dlm_node_addr { - struct list_head list; +struct processqueue_entry { + unsigned char *buf; int nodeid; - int mark; - int addr_count; - int curr_addr_index; - struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; + int buflen; + + struct list_head list; }; struct dlm_proto_ops { @@ -157,10 +164,6 @@ struct dlm_proto_ops { int (*listen_validate)(void); void (*listen_sockopts)(struct socket *sock); int (*listen_bind)(struct socket *sock); - /* What to do to shutdown */ - void (*shutdown_action)(struct connection *con); - /* What to do to eof check */ - bool (*eof_condition)(struct connection *con); }; static struct listen_sock_callbacks { @@ -170,17 +173,13 @@ static struct listen_sock_callbacks { void (*sk_write_space)(struct sock *); } listen_sock; -static LIST_HEAD(dlm_node_addrs); -static DEFINE_SPINLOCK(dlm_node_addrs_spin); - static struct listen_connection listen_con; -static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; +static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT]; static int dlm_local_count; -int dlm_allow_conn; /* Work queues */ -static struct workqueue_struct *recv_workqueue; -static struct workqueue_struct *send_workqueue; +static struct workqueue_struct *io_workqueue; +static struct workqueue_struct *process_workqueue; static struct hlist_head connection_hash[CONN_HASH_SIZE]; static DEFINE_SPINLOCK(connections_lock); @@ -188,8 +187,45 @@ DEFINE_STATIC_SRCU(connections_srcu); static const struct dlm_proto_ops *dlm_proto_ops; +#define DLM_IO_SUCCESS 0 +#define DLM_IO_END 1 +#define DLM_IO_EOF 2 +#define DLM_IO_RESCHED 3 + static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); +static void process_dlm_messages(struct work_struct *work); + +static DECLARE_WORK(process_work, process_dlm_messages); +static DEFINE_SPINLOCK(processqueue_lock); +static bool process_dlm_messages_pending; +static LIST_HEAD(processqueue); + +bool dlm_lowcomms_is_running(void) +{ + return !!listen_con.sock; +} + +static void lowcomms_queue_swork(struct connection *con) +{ + assert_spin_locked(&con->writequeue_lock); + + if (!test_bit(CF_IO_STOP, &con->flags) && + !test_bit(CF_APP_LIMITED, &con->flags) && + !test_and_set_bit(CF_SEND_PENDING, &con->flags)) + queue_work(io_workqueue, &con->swork); +} + +static void lowcomms_queue_rwork(struct connection *con) +{ +#ifdef CONFIG_LOCKDEP + WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); +#endif + + if (!test_bit(CF_IO_STOP, &con->flags) && + !test_and_set_bit(CF_RECV_PENDING, &con->flags)) + queue_work(io_workqueue, &con->rwork); +} static void writequeue_entry_ctor(void *data) { @@ -214,15 +250,12 @@ static struct writequeue_entry *con_next_wq(struct connection *con) { struct writequeue_entry *e; - if (list_empty(&con->writequeue)) - return NULL; - - e = list_first_entry(&con->writequeue, struct writequeue_entry, - list); + e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, + list); /* if len is zero nothing is to send, if there are users filling * buffers we wait until the users are done so we can send more. */ - if (e->users || e->len == 0) + if (!e || e->users || e->len == 0) return NULL; return e; @@ -240,28 +273,15 @@ static struct connection *__find_con(int nodeid, int r) return NULL; } -static bool tcp_eof_condition(struct connection *con) -{ - return atomic_read(&con->writequeue_cnt); -} - -static int dlm_con_init(struct connection *con, int nodeid) +static void dlm_con_init(struct connection *con, int nodeid) { - con->rx_buflen = dlm_config.ci_buffer_size; - con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); - if (!con->rx_buf) - return -ENOMEM; - con->nodeid = nodeid; - mutex_init(&con->sock_mutex); + init_rwsem(&con->sock_lock); INIT_LIST_HEAD(&con->writequeue); spin_lock_init(&con->writequeue_lock); - atomic_set(&con->writequeue_cnt, 0); INIT_WORK(&con->swork, process_send_sockets); INIT_WORK(&con->rwork, process_recv_sockets); - init_waitqueue_head(&con->shutdown_wait); - - return 0; + spin_lock_init(&con->addrs_lock); } /* @@ -271,7 +291,7 @@ static int dlm_con_init(struct connection *con, int nodeid) static struct connection *nodeid2con(int nodeid, gfp_t alloc) { struct connection *con, *tmp; - int r, ret; + int r; r = nodeid_hash(nodeid); con = __find_con(nodeid, r); @@ -282,11 +302,7 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) if (!con) return NULL; - ret = dlm_con_init(con, nodeid); - if (ret) { - kfree(con); - return NULL; - } + dlm_con_init(con, nodeid); spin_lock(&connections_lock); /* Because multiple workqueues/threads calls this function it can @@ -298,7 +314,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) tmp = __find_con(nodeid, r); if (tmp) { spin_unlock(&connections_lock); - kfree(con->rx_buf); kfree(con); return tmp; } @@ -309,29 +324,6 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) return con; } -/* Loop round all connections */ -static void foreach_conn(void (*conn_func)(struct connection *c)) -{ - int i; - struct connection *con; - - for (i = 0; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry_rcu(con, &connection_hash[i], list) - conn_func(con); - } -} - -static struct dlm_node_addr *find_node_addr(int nodeid) -{ - struct dlm_node_addr *na; - - list_for_each_entry(na, &dlm_node_addrs, list) { - if (na->nodeid == nodeid) - return na; - } - return NULL; -} - static int addr_compare(const struct sockaddr_storage *x, const struct sockaddr_storage *y) { @@ -365,40 +357,47 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, unsigned int *mark) { struct sockaddr_storage sas; - struct dlm_node_addr *na; + struct connection *con; + int idx; if (!dlm_local_count) return -1; - spin_lock(&dlm_node_addrs_spin); - na = find_node_addr(nodeid); - if (na && na->addr_count) { - memcpy(&sas, na->addr[na->curr_addr_index], - sizeof(struct sockaddr_storage)); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, 0); + if (!con) { + srcu_read_unlock(&connections_srcu, idx); + return -ENOENT; + } - if (try_new_addr) { - na->curr_addr_index++; - if (na->curr_addr_index == na->addr_count) - na->curr_addr_index = 0; - } + spin_lock(&con->addrs_lock); + if (!con->addr_count) { + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); + return -ENOENT; } - spin_unlock(&dlm_node_addrs_spin); - if (!na) - return -EEXIST; + memcpy(&sas, &con->addr[con->curr_addr_index], + sizeof(struct sockaddr_storage)); - if (!na->addr_count) - return -ENOENT; + if (try_new_addr) { + con->curr_addr_index++; + if (con->curr_addr_index == con->addr_count) + con->curr_addr_index = 0; + } - *mark = na->mark; + *mark = con->mark; + spin_unlock(&con->addrs_lock); if (sas_out) memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); - if (!sa_out) + if (!sa_out) { + srcu_read_unlock(&connections_srcu, idx); return 0; + } - if (dlm_local_addr[0]->ss_family == AF_INET) { + if (dlm_local_addr[0].ss_family == AF_INET) { struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; ret4->sin_addr.s_addr = in4->sin_addr.s_addr; @@ -408,43 +407,46 @@ static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, ret6->sin6_addr = in6->sin6_addr; } + srcu_read_unlock(&connections_srcu, idx); return 0; } static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, unsigned int *mark) { - struct dlm_node_addr *na; - int rv = -EEXIST; - int addr_i; - - spin_lock(&dlm_node_addrs_spin); - list_for_each_entry(na, &dlm_node_addrs, list) { - if (!na->addr_count) - continue; - - for (addr_i = 0; addr_i < na->addr_count; addr_i++) { - if (addr_compare(na->addr[addr_i], addr)) { - *nodeid = na->nodeid; - *mark = na->mark; - rv = 0; - goto unlock; + struct connection *con; + int i, idx, addr_i; + + idx = srcu_read_lock(&connections_srcu); + for (i = 0; i < CONN_HASH_SIZE; i++) { + hlist_for_each_entry_rcu(con, &connection_hash[i], list) { + WARN_ON_ONCE(!con->addr_count); + + spin_lock(&con->addrs_lock); + for (addr_i = 0; addr_i < con->addr_count; addr_i++) { + if (addr_compare(&con->addr[addr_i], addr)) { + *nodeid = con->nodeid; + *mark = con->mark; + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); + return 0; + } } + spin_unlock(&con->addrs_lock); } } -unlock: - spin_unlock(&dlm_node_addrs_spin); - return rv; + srcu_read_unlock(&connections_srcu, idx); + + return -ENOENT; } -/* caller need to held dlm_node_addrs_spin lock */ -static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na, - const struct sockaddr_storage *addr) +static bool dlm_lowcomms_con_has_addr(const struct connection *con, + const struct sockaddr_storage *addr) { int i; - for (i = 0; i < na->addr_count; i++) { - if (addr_compare(na->addr[i], addr)) + for (i = 0; i < con->addr_count; i++) { + if (addr_compare(&con->addr[i], addr)) return true; } @@ -453,118 +455,82 @@ static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na, int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) { - struct sockaddr_storage *new_addr; - struct dlm_node_addr *new_node, *na; - bool ret; - - new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); - if (!new_node) - return -ENOMEM; + struct connection *con; + bool ret, idx; - new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); - if (!new_addr) { - kfree(new_node); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, GFP_NOFS); + if (!con) { + srcu_read_unlock(&connections_srcu, idx); return -ENOMEM; } - memcpy(new_addr, addr, len); - - spin_lock(&dlm_node_addrs_spin); - na = find_node_addr(nodeid); - if (!na) { - new_node->nodeid = nodeid; - new_node->addr[0] = new_addr; - new_node->addr_count = 1; - new_node->mark = dlm_config.ci_mark; - list_add(&new_node->list, &dlm_node_addrs); - spin_unlock(&dlm_node_addrs_spin); + spin_lock(&con->addrs_lock); + if (!con->addr_count) { + memcpy(&con->addr[0], addr, sizeof(*addr)); + con->addr_count = 1; + con->mark = dlm_config.ci_mark; + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return 0; } - ret = dlm_lowcomms_na_has_addr(na, addr); + ret = dlm_lowcomms_con_has_addr(con, addr); if (ret) { - spin_unlock(&dlm_node_addrs_spin); - kfree(new_addr); - kfree(new_node); + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return -EEXIST; } - if (na->addr_count >= DLM_MAX_ADDR_COUNT) { - spin_unlock(&dlm_node_addrs_spin); - kfree(new_addr); - kfree(new_node); + if (con->addr_count >= DLM_MAX_ADDR_COUNT) { + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return -ENOSPC; } - na->addr[na->addr_count++] = new_addr; - spin_unlock(&dlm_node_addrs_spin); - kfree(new_node); + memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); + srcu_read_unlock(&connections_srcu, idx); + spin_unlock(&con->addrs_lock); return 0; } /* Data available on socket or listen socket received a connect */ static void lowcomms_data_ready(struct sock *sk) { - struct connection *con; - - con = sock2con(sk); - if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) - queue_work(recv_workqueue, &con->rwork); -} - -static void lowcomms_listen_data_ready(struct sock *sk) -{ - if (!dlm_allow_conn) - return; + struct connection *con = sock2con(sk); - queue_work(recv_workqueue, &listen_con.rwork); + set_bit(CF_RECV_INTR, &con->flags); + lowcomms_queue_rwork(con); } static void lowcomms_write_space(struct sock *sk) { - struct connection *con; - - con = sock2con(sk); - if (!con) - return; - - if (!test_and_set_bit(CF_CONNECTED, &con->flags)) { - log_print("connected to node %d", con->nodeid); - queue_work(send_workqueue, &con->swork); - return; - } + struct connection *con = sock2con(sk); clear_bit(SOCK_NOSPACE, &con->sock->flags); + spin_lock_bh(&con->writequeue_lock); if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { con->sock->sk->sk_write_pending--; clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); } - queue_work(send_workqueue, &con->swork); -} - -static inline void lowcomms_connect_sock(struct connection *con) -{ - if (test_bit(CF_CLOSE, &con->flags)) - return; - queue_work(send_workqueue, &con->swork); - cond_resched(); + lowcomms_queue_swork(con); + spin_unlock_bh(&con->writequeue_lock); } static void lowcomms_state_change(struct sock *sk) { /* SCTP layer is not calling sk_data_ready when the connection - * is done, so we catch the signal through here. Also, it - * doesn't switch socket state when entering shutdown, so we - * skip the write in that case. + * is done, so we catch the signal through here. */ - if (sk->sk_shutdown) { - if (sk->sk_shutdown == RCV_SHUTDOWN) - lowcomms_data_ready(sk); - } else if (sk->sk_state == TCP_ESTABLISHED) { - lowcomms_write_space(sk); - } + if (sk->sk_shutdown == RCV_SHUTDOWN) + lowcomms_data_ready(sk); +} + +static void lowcomms_listen_data_ready(struct sock *sk) +{ + queue_work(io_workqueue, &listen_con.rwork); } int dlm_lowcomms_connect_node(int nodeid) @@ -576,47 +542,49 @@ int dlm_lowcomms_connect_node(int nodeid) return 0; idx = srcu_read_lock(&connections_srcu); - con = nodeid2con(nodeid, GFP_NOFS); - if (!con) { + con = nodeid2con(nodeid, 0); + if (WARN_ON_ONCE(!con)) { srcu_read_unlock(&connections_srcu, idx); - return -ENOMEM; + return -ENOENT; } - lowcomms_connect_sock(con); + down_read(&con->sock_lock); + if (!con->sock) { + spin_lock_bh(&con->writequeue_lock); + lowcomms_queue_swork(con); + spin_unlock_bh(&con->writequeue_lock); + } + up_read(&con->sock_lock); srcu_read_unlock(&connections_srcu, idx); + cond_resched(); return 0; } int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) { - struct dlm_node_addr *na; + struct connection *con; + int idx; - spin_lock(&dlm_node_addrs_spin); - na = find_node_addr(nodeid); - if (!na) { - spin_unlock(&dlm_node_addrs_spin); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, 0); + if (!con) { + srcu_read_unlock(&connections_srcu, idx); return -ENOENT; } - na->mark = mark; - spin_unlock(&dlm_node_addrs_spin); - + spin_lock(&con->addrs_lock); + con->mark = mark; + spin_unlock(&con->addrs_lock); + srcu_read_unlock(&connections_srcu, idx); return 0; } static void lowcomms_error_report(struct sock *sk) { - struct connection *con; - void (*orig_report)(struct sock *) = NULL; + struct connection *con = sock2con(sk); struct inet_sock *inet; - con = sock2con(sk); - if (con == NULL) - goto out; - - orig_report = listen_sock.sk_error_report; - inet = inet_sk(sk); switch (sk->sk_family) { case AF_INET: @@ -642,66 +610,25 @@ static void lowcomms_error_report(struct sock *sk) "invalid socket family %d set, " "sk_err=%d/%d\n", dlm_our_nodeid(), sk->sk_family, sk->sk_err, sk->sk_err_soft); - goto out; - } - - /* below sendcon only handling */ - if (test_bit(CF_IS_OTHERCON, &con->flags)) - con = con->sendcon; - - switch (sk->sk_err) { - case ECONNREFUSED: - set_bit(CF_DELAY_CONNECT, &con->flags); - break; - default: break; } - if (!test_and_set_bit(CF_RECONNECT, &con->flags)) - queue_work(send_workqueue, &con->swork); + dlm_midcomms_unack_msg_resend(con->nodeid); -out: - if (orig_report) - orig_report(sk); + listen_sock.sk_error_report(sk); } -/* Note: sk_callback_lock must be locked before calling this function. */ -static void save_listen_callbacks(struct socket *sock) +static void restore_callbacks(struct sock *sk) { - struct sock *sk = sock->sk; - - listen_sock.sk_data_ready = sk->sk_data_ready; - listen_sock.sk_state_change = sk->sk_state_change; - listen_sock.sk_write_space = sk->sk_write_space; - listen_sock.sk_error_report = sk->sk_error_report; -} - -static void restore_callbacks(struct socket *sock) -{ - struct sock *sk = sock->sk; +#ifdef CONFIG_LOCKDEP + WARN_ON_ONCE(!lockdep_sock_is_held(sk)); +#endif - lock_sock(sk); sk->sk_user_data = NULL; sk->sk_data_ready = listen_sock.sk_data_ready; sk->sk_state_change = listen_sock.sk_state_change; sk->sk_write_space = listen_sock.sk_write_space; sk->sk_error_report = listen_sock.sk_error_report; - release_sock(sk); -} - -static void add_listen_sock(struct socket *sock, struct listen_connection *con) -{ - struct sock *sk = sock->sk; - - lock_sock(sk); - save_listen_callbacks(sock); - con->sock = sock; - - sk->sk_user_data = con; - sk->sk_allocation = GFP_NOFS; - /* Install a data_ready callback */ - sk->sk_data_ready = lowcomms_listen_data_ready; - release_sock(sk); } /* Make a socket active */ @@ -713,11 +640,12 @@ static void add_sock(struct socket *sock, struct connection *con) con->sock = sock; sk->sk_user_data = con; - /* Install a data_ready callback */ sk->sk_data_ready = lowcomms_data_ready; sk->sk_write_space = lowcomms_write_space; - sk->sk_state_change = lowcomms_state_change; + if (dlm_config.ci_protocol == DLM_PROTO_SCTP) + sk->sk_state_change = lowcomms_state_change; sk->sk_allocation = GFP_NOFS; + sk->sk_use_task_frag = false; sk->sk_error_report = lowcomms_error_report; release_sock(sk); } @@ -727,7 +655,7 @@ static void add_sock(struct socket *sock, struct connection *con) static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, int *addr_len) { - saddr->ss_family = dlm_local_addr[0]->ss_family; + saddr->ss_family = dlm_local_addr[0].ss_family; if (saddr->ss_family == AF_INET) { struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; in4_addr->sin_port = cpu_to_be16(port); @@ -773,43 +701,67 @@ static void free_entry(struct writequeue_entry *e) } list_del(&e->list); - atomic_dec(&e->con->writequeue_cnt); kref_put(&e->ref, dlm_page_release); } static void dlm_close_sock(struct socket **sock) { - if (*sock) { - restore_callbacks(*sock); - sock_release(*sock); - *sock = NULL; + lock_sock((*sock)->sk); + restore_callbacks((*sock)->sk); + release_sock((*sock)->sk); + + sock_release(*sock); + *sock = NULL; +} + +static void allow_connection_io(struct connection *con) +{ + if (con->othercon) + clear_bit(CF_IO_STOP, &con->othercon->flags); + clear_bit(CF_IO_STOP, &con->flags); +} + +static void stop_connection_io(struct connection *con) +{ + if (con->othercon) + stop_connection_io(con->othercon); + + down_write(&con->sock_lock); + if (con->sock) { + lock_sock(con->sock->sk); + restore_callbacks(con->sock->sk); + + spin_lock_bh(&con->writequeue_lock); + set_bit(CF_IO_STOP, &con->flags); + spin_unlock_bh(&con->writequeue_lock); + release_sock(con->sock->sk); + } else { + spin_lock_bh(&con->writequeue_lock); + set_bit(CF_IO_STOP, &con->flags); + spin_unlock_bh(&con->writequeue_lock); } + up_write(&con->sock_lock); + + cancel_work_sync(&con->swork); + cancel_work_sync(&con->rwork); } /* Close a remote connection and tidy up */ -static void close_connection(struct connection *con, bool and_other, - bool tx, bool rx) +static void close_connection(struct connection *con, bool and_other) { - bool closing = test_and_set_bit(CF_CLOSING, &con->flags); struct writequeue_entry *e; - if (tx && !closing && cancel_work_sync(&con->swork)) { - log_print("canceled swork for node %d", con->nodeid); - clear_bit(CF_WRITE_PENDING, &con->flags); - } - if (rx && !closing && cancel_work_sync(&con->rwork)) { - log_print("canceled rwork for node %d", con->nodeid); - clear_bit(CF_READ_PENDING, &con->flags); + if (con->othercon && and_other) + close_connection(con->othercon, false); + + down_write(&con->sock_lock); + if (!con->sock) { + up_write(&con->sock_lock); + return; } - mutex_lock(&con->sock_mutex); dlm_close_sock(&con->sock); - if (con->othercon && and_other) { - /* Will only re-enter once. */ - close_connection(con->othercon, false, tx, rx); - } - /* if we send a writequeue entry only a half way, we drop the * whole entry because reconnection and that we not start of the * middle of a msg which will confuse the other end. @@ -821,200 +773,209 @@ static void close_connection(struct connection *con, bool and_other, * our policy is to start on a clean state when disconnects, we don't * know what's send/received on transport layer in this case. */ - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); if (!list_empty(&con->writequeue)) { e = list_first_entry(&con->writequeue, struct writequeue_entry, list); if (e->dirty) free_entry(e); } - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); con->rx_leftover = 0; con->retries = 0; clear_bit(CF_APP_LIMITED, &con->flags); - clear_bit(CF_CONNECTED, &con->flags); - clear_bit(CF_DELAY_CONNECT, &con->flags); - clear_bit(CF_RECONNECT, &con->flags); - clear_bit(CF_EOF, &con->flags); - mutex_unlock(&con->sock_mutex); - clear_bit(CF_CLOSING, &con->flags); + clear_bit(CF_RECV_PENDING, &con->flags); + clear_bit(CF_SEND_PENDING, &con->flags); + up_write(&con->sock_lock); } -static void shutdown_connection(struct connection *con) +static struct processqueue_entry *new_processqueue_entry(int nodeid, + int buflen) { - int ret; - - flush_work(&con->swork); + struct processqueue_entry *pentry; - mutex_lock(&con->sock_mutex); - /* nothing to shutdown */ - if (!con->sock) { - mutex_unlock(&con->sock_mutex); - return; - } + pentry = kmalloc(sizeof(*pentry), GFP_NOFS); + if (!pentry) + return NULL; - set_bit(CF_SHUTDOWN, &con->flags); - ret = kernel_sock_shutdown(con->sock, SHUT_WR); - mutex_unlock(&con->sock_mutex); - if (ret) { - log_print("Connection %p failed to shutdown: %d will force close", - con, ret); - goto force_close; - } else { - ret = wait_event_timeout(con->shutdown_wait, - !test_bit(CF_SHUTDOWN, &con->flags), - DLM_SHUTDOWN_WAIT_TIMEOUT); - if (ret == 0) { - log_print("Connection %p shutdown timed out, will force close", - con); - goto force_close; - } + pentry->buf = kmalloc(buflen, GFP_NOFS); + if (!pentry->buf) { + kfree(pentry); + return NULL; } - return; + pentry->nodeid = nodeid; + return pentry; +} -force_close: - clear_bit(CF_SHUTDOWN, &con->flags); - close_connection(con, false, true, true); +static void free_processqueue_entry(struct processqueue_entry *pentry) +{ + kfree(pentry->buf); + kfree(pentry); } -static void dlm_tcp_shutdown(struct connection *con) +struct dlm_processed_nodes { + int nodeid; + + struct list_head list; +}; + +static void add_processed_node(int nodeid, struct list_head *processed_nodes) { - if (con->othercon) - shutdown_connection(con->othercon); - shutdown_connection(con); + struct dlm_processed_nodes *n; + + list_for_each_entry(n, processed_nodes, list) { + /* we already remembered this node */ + if (n->nodeid == nodeid) + return; + } + + /* if it's fails in worst case we simple don't send an ack back. + * We try it next time. + */ + n = kmalloc(sizeof(*n), GFP_NOFS); + if (!n) + return; + + n->nodeid = nodeid; + list_add(&n->list, processed_nodes); } -static int con_realloc_receive_buf(struct connection *con, int newlen) +static void process_dlm_messages(struct work_struct *work) { - unsigned char *newbuf; + struct dlm_processed_nodes *n, *n_tmp; + struct processqueue_entry *pentry; + LIST_HEAD(processed_nodes); - newbuf = kmalloc(newlen, GFP_NOFS); - if (!newbuf) - return -ENOMEM; + spin_lock(&processqueue_lock); + pentry = list_first_entry_or_null(&processqueue, + struct processqueue_entry, list); + if (WARN_ON_ONCE(!pentry)) { + spin_unlock(&processqueue_lock); + return; + } - /* copy any leftover from last receive */ - if (con->rx_leftover) - memmove(newbuf, con->rx_buf, con->rx_leftover); + list_del(&pentry->list); + spin_unlock(&processqueue_lock); - /* swap to new buffer space */ - kfree(con->rx_buf); - con->rx_buflen = newlen; - con->rx_buf = newbuf; + for (;;) { + dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, + pentry->buflen); + add_processed_node(pentry->nodeid, &processed_nodes); + free_processqueue_entry(pentry); + + spin_lock(&processqueue_lock); + pentry = list_first_entry_or_null(&processqueue, + struct processqueue_entry, list); + if (!pentry) { + process_dlm_messages_pending = false; + spin_unlock(&processqueue_lock); + break; + } - return 0; + list_del(&pentry->list); + spin_unlock(&processqueue_lock); + } + + /* send ack back after we processed couple of messages */ + list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) { + list_del(&n->list); + dlm_midcomms_receive_done(n->nodeid); + kfree(n); + } } /* Data received from remote end */ -static int receive_from_sock(struct connection *con) +static int receive_from_sock(struct connection *con, int buflen) { + struct processqueue_entry *pentry; + int ret, buflen_real; struct msghdr msg; struct kvec iov; - int ret, buflen; - mutex_lock(&con->sock_mutex); + pentry = new_processqueue_entry(con->nodeid, buflen); + if (!pentry) + return DLM_IO_RESCHED; - if (con->sock == NULL) { - ret = -EAGAIN; - goto out_close; - } - - /* realloc if we get new buffer size to read out */ - buflen = dlm_config.ci_buffer_size; - if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { - ret = con_realloc_receive_buf(con, buflen); - if (ret < 0) - goto out_resched; - } + memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); - for (;;) { - /* calculate new buffer parameter regarding last receive and - * possible leftover bytes - */ - iov.iov_base = con->rx_buf + con->rx_leftover; - iov.iov_len = con->rx_buflen - con->rx_leftover; - - memset(&msg, 0, sizeof(msg)); - msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; - ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, - msg.msg_flags); - trace_dlm_recv(con->nodeid, ret); - if (ret == -EAGAIN) - break; - else if (ret <= 0) - goto out_close; - - /* new buflen according readed bytes and leftover from last receive */ - buflen = ret + con->rx_leftover; - ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); - if (ret < 0) - goto out_close; - - /* calculate leftover bytes from process and put it into begin of - * the receive buffer, so next receive we have the full message - * at the start address of the receive buffer. - */ - con->rx_leftover = buflen - ret; - if (con->rx_leftover) { - memmove(con->rx_buf, con->rx_buf + ret, - con->rx_leftover); + /* calculate new buffer parameter regarding last receive and + * possible leftover bytes + */ + iov.iov_base = pentry->buf + con->rx_leftover; + iov.iov_len = buflen - con->rx_leftover; + + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + clear_bit(CF_RECV_INTR, &con->flags); +again: + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); + trace_dlm_recv(con->nodeid, ret); + if (ret == -EAGAIN) { + lock_sock(con->sock->sk); + if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { + release_sock(con->sock->sk); + goto again; } + + clear_bit(CF_RECV_PENDING, &con->flags); + release_sock(con->sock->sk); + free_processqueue_entry(pentry); + return DLM_IO_END; + } else if (ret == 0) { + /* close will clear CF_RECV_PENDING */ + free_processqueue_entry(pentry); + return DLM_IO_EOF; + } else if (ret < 0) { + free_processqueue_entry(pentry); + return ret; } - dlm_midcomms_receive_done(con->nodeid); - mutex_unlock(&con->sock_mutex); - return 0; + /* new buflen according readed bytes and leftover from last receive */ + buflen_real = ret + con->rx_leftover; + ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, + buflen_real); + if (ret < 0) { + free_processqueue_entry(pentry); + return ret; + } -out_resched: - if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) - queue_work(recv_workqueue, &con->rwork); - mutex_unlock(&con->sock_mutex); - return -EAGAIN; - -out_close: - if (ret == 0) { - log_print("connection %p got EOF from %d", - con, con->nodeid); - - if (dlm_proto_ops->eof_condition && - dlm_proto_ops->eof_condition(con)) { - set_bit(CF_EOF, &con->flags); - mutex_unlock(&con->sock_mutex); - } else { - mutex_unlock(&con->sock_mutex); - close_connection(con, false, true, false); + pentry->buflen = ret; - /* handling for tcp shutdown */ - clear_bit(CF_SHUTDOWN, &con->flags); - wake_up(&con->shutdown_wait); - } + /* calculate leftover bytes from process and put it into begin of + * the receive buffer, so next receive we have the full message + * at the start address of the receive buffer. + */ + con->rx_leftover = buflen_real - ret; + memmove(con->rx_leftover_buf, pentry->buf + ret, + con->rx_leftover); - /* signal to breaking receive worker */ - ret = -1; - } else { - mutex_unlock(&con->sock_mutex); + spin_lock(&processqueue_lock); + list_add_tail(&pentry->list, &processqueue); + if (!process_dlm_messages_pending) { + process_dlm_messages_pending = true; + queue_work(process_workqueue, &process_work); } - return ret; + spin_unlock(&processqueue_lock); + + return DLM_IO_SUCCESS; } /* Listening socket is busy, accept a connection */ -static int accept_from_sock(struct listen_connection *con) +static int accept_from_sock(void) { - int result; struct sockaddr_storage peeraddr; - struct socket *newsock; - int len, idx; - int nodeid; + int len, idx, result, nodeid; struct connection *newcon; - struct connection *addcon; + struct socket *newsock; unsigned int mark; - if (!con->sock) - return -ENOTCONN; - - result = kernel_accept(con->sock, &newsock, O_NONBLOCK); - if (result < 0) + result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK); + if (result == -EAGAIN) + return DLM_IO_END; + else if (result < 0) goto accept_err; /* Get the connected socket's peer */ @@ -1062,16 +1023,16 @@ static int accept_from_sock(struct listen_connection *con) * In this case we store the incoming one in "othercon" */ idx = srcu_read_lock(&connections_srcu); - newcon = nodeid2con(nodeid, GFP_NOFS); - if (!newcon) { + newcon = nodeid2con(nodeid, 0); + if (WARN_ON_ONCE(!newcon)) { srcu_read_unlock(&connections_srcu, idx); - result = -ENOMEM; + result = -ENOENT; goto accept_err; } sock_set_mark(newsock->sk, mark); - mutex_lock(&newcon->sock_mutex); + down_write(&newcon->sock_lock); if (newcon->sock) { struct connection *othercon = newcon->othercon; @@ -1079,63 +1040,50 @@ static int accept_from_sock(struct listen_connection *con) othercon = kzalloc(sizeof(*othercon), GFP_NOFS); if (!othercon) { log_print("failed to allocate incoming socket"); - mutex_unlock(&newcon->sock_mutex); + up_write(&newcon->sock_lock); srcu_read_unlock(&connections_srcu, idx); result = -ENOMEM; goto accept_err; } - result = dlm_con_init(othercon, nodeid); - if (result < 0) { - kfree(othercon); - mutex_unlock(&newcon->sock_mutex); - srcu_read_unlock(&connections_srcu, idx); - goto accept_err; - } - - lockdep_set_subclass(&othercon->sock_mutex, 1); - set_bit(CF_IS_OTHERCON, &othercon->flags); + dlm_con_init(othercon, nodeid); + lockdep_set_subclass(&othercon->sock_lock, 1); newcon->othercon = othercon; - othercon->sendcon = newcon; + set_bit(CF_IS_OTHERCON, &othercon->flags); } else { /* close other sock con if we have something new */ - close_connection(othercon, false, true, false); + close_connection(othercon, false); } - mutex_lock(&othercon->sock_mutex); + down_write(&othercon->sock_lock); add_sock(newsock, othercon); - addcon = othercon; - mutex_unlock(&othercon->sock_mutex); + + /* check if we receved something while adding */ + lock_sock(othercon->sock->sk); + lowcomms_queue_rwork(othercon); + release_sock(othercon->sock->sk); + up_write(&othercon->sock_lock); } else { /* accept copies the sk after we've saved the callbacks, so we don't want to save them a second time or comm errors will result in calling sk_error_report recursively. */ add_sock(newsock, newcon); - addcon = newcon; - } - - set_bit(CF_CONNECTED, &addcon->flags); - mutex_unlock(&newcon->sock_mutex); - - /* - * Add it to the active queue in case we got data - * between processing the accept adding the socket - * to the read_sockets list - */ - if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) - queue_work(recv_workqueue, &addcon->rwork); + /* check if we receved something while adding */ + lock_sock(newcon->sock->sk); + lowcomms_queue_rwork(newcon); + release_sock(newcon->sock->sk); + } + up_write(&newcon->sock_lock); srcu_read_unlock(&connections_srcu, idx); - return 0; + return DLM_IO_SUCCESS; accept_err: if (newsock) sock_release(newsock); - if (result != -EAGAIN) - log_print("error accepting connection from node: %d", result); return result; } @@ -1167,7 +1115,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port) int i, addr_len, result = 0; for (i = 0; i < dlm_local_count; i++) { - memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); + memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr)); make_sockaddr(&localaddr, port, &addr_len); if (!i) @@ -1187,7 +1135,7 @@ static int sctp_bind_addrs(struct socket *sock, uint16_t port) /* Get local addresses */ static void init_local(void) { - struct sockaddr_storage sas, *addr; + struct sockaddr_storage sas; int i; dlm_local_count = 0; @@ -1195,21 +1143,10 @@ static void init_local(void) if (dlm_our_addr(&sas, i)) break; - addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS); - if (!addr) - break; - dlm_local_addr[dlm_local_count++] = addr; + memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas)); } } -static void deinit_local(void) -{ - int i; - - for (i = 0; i < dlm_local_count; i++) - kfree(dlm_local_addr[i]); -} - static struct writequeue_entry *new_writequeue_entry(struct connection *con) { struct writequeue_entry *entry; @@ -1240,7 +1177,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, { struct writequeue_entry *e; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); if (!list_empty(&con->writequeue)) { e = list_last_entry(&con->writequeue, struct writequeue_entry, list); if (DLM_WQ_REMAIN_BYTES(e) >= len) { @@ -1263,14 +1200,13 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len, kref_get(&e->ref); *ppc = page_address(e->page); e->end += len; - atomic_inc(&con->writequeue_cnt); if (cb) cb(data); list_add_tail(&e->list, &con->writequeue); out: - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); return e; }; @@ -1319,13 +1255,13 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation, len < sizeof(struct dlm_header)) { BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE); log_print("failed to allocate a buffer of size %d", len); - WARN_ON(1); + WARN_ON_ONCE(1); return NULL; } idx = srcu_read_lock(&connections_srcu); - con = nodeid2con(nodeid, allocation); - if (!con) { + con = nodeid2con(nodeid, 0); + if (WARN_ON_ONCE(!con)) { srcu_read_unlock(&connections_srcu, idx); return NULL; } @@ -1350,7 +1286,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) struct connection *con = e->con; int users; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); kref_get(&msg->ref); list_add(&msg->list, &e->msgs); @@ -1359,13 +1295,11 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) goto out; e->len = DLM_WQ_LENGTH_BYTES(e); - spin_unlock(&con->writequeue_lock); - queue_work(send_workqueue, &con->swork); - return; + lowcomms_queue_swork(con); out: - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); return; } @@ -1387,7 +1321,7 @@ void dlm_lowcomms_put_msg(struct dlm_msg *msg) kref_put(&msg->ref, dlm_msg_release); } -/* does not held connections_srcu, usage workqueue only */ +/* does not held connections_srcu, usage lowcomms_error_report only */ int dlm_lowcomms_resend_msg(struct dlm_msg *msg) { struct dlm_msg *msg_resend; @@ -1413,90 +1347,79 @@ int dlm_lowcomms_resend_msg(struct dlm_msg *msg) } /* Send a message */ -static void send_to_sock(struct connection *con) +static int send_to_sock(struct connection *con) { const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; struct writequeue_entry *e; int len, offset, ret; - int count = 0; - mutex_lock(&con->sock_mutex); - if (con->sock == NULL) - goto out_connect; - - spin_lock(&con->writequeue_lock); - for (;;) { - e = con_next_wq(con); - if (!e) - break; + spin_lock_bh(&con->writequeue_lock); + e = con_next_wq(con); + if (!e) { + clear_bit(CF_SEND_PENDING, &con->flags); + spin_unlock_bh(&con->writequeue_lock); + return DLM_IO_END; + } - len = e->len; - offset = e->offset; - BUG_ON(len == 0 && e->users == 0); - spin_unlock(&con->writequeue_lock); - - ret = kernel_sendpage(con->sock, e->page, offset, len, - msg_flags); - trace_dlm_send(con->nodeid, ret); - if (ret == -EAGAIN || ret == 0) { - if (ret == -EAGAIN && - test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && - !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { - /* Notify TCP that we're limited by the - * application window size. - */ - set_bit(SOCK_NOSPACE, &con->sock->flags); - con->sock->sk->sk_write_pending++; - } - cond_resched(); - goto out; - } else if (ret < 0) - goto out; + len = e->len; + offset = e->offset; + WARN_ON_ONCE(len == 0 && e->users == 0); + spin_unlock_bh(&con->writequeue_lock); - /* Don't starve people filling buffers */ - if (++count >= MAX_SEND_MSG_COUNT) { - cond_resched(); - count = 0; + ret = kernel_sendpage(con->sock, e->page, offset, len, + msg_flags); + trace_dlm_send(con->nodeid, ret); + if (ret == -EAGAIN || ret == 0) { + lock_sock(con->sock->sk); + spin_lock_bh(&con->writequeue_lock); + if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && + !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { + /* Notify TCP that we're limited by the + * application window size. + */ + set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); + con->sock->sk->sk_write_pending++; + + clear_bit(CF_SEND_PENDING, &con->flags); + spin_unlock_bh(&con->writequeue_lock); + release_sock(con->sock->sk); + + /* wait for write_space() event */ + return DLM_IO_END; } + spin_unlock_bh(&con->writequeue_lock); + release_sock(con->sock->sk); - spin_lock(&con->writequeue_lock); - writequeue_entry_complete(e, ret); - } - spin_unlock(&con->writequeue_lock); - - /* close if we got EOF */ - if (test_and_clear_bit(CF_EOF, &con->flags)) { - mutex_unlock(&con->sock_mutex); - close_connection(con, false, false, true); - - /* handling for tcp shutdown */ - clear_bit(CF_SHUTDOWN, &con->flags); - wake_up(&con->shutdown_wait); - } else { - mutex_unlock(&con->sock_mutex); + return DLM_IO_RESCHED; + } else if (ret < 0) { + return ret; } - return; - -out: - mutex_unlock(&con->sock_mutex); - return; + spin_lock_bh(&con->writequeue_lock); + writequeue_entry_complete(e, ret); + spin_unlock_bh(&con->writequeue_lock); -out_connect: - mutex_unlock(&con->sock_mutex); - queue_work(send_workqueue, &con->swork); - cond_resched(); + return DLM_IO_SUCCESS; } static void clean_one_writequeue(struct connection *con) { struct writequeue_entry *e, *safe; - spin_lock(&con->writequeue_lock); + spin_lock_bh(&con->writequeue_lock); list_for_each_entry_safe(e, safe, &con->writequeue, list) { free_entry(e); } - spin_unlock(&con->writequeue_lock); + spin_unlock_bh(&con->writequeue_lock); +} + +static void connection_release(struct rcu_head *rcu) +{ + struct connection *con = container_of(rcu, struct connection, rcu); + + WARN_ON_ONCE(!list_empty(&con->writequeue)); + WARN_ON_ONCE(con->sock); + kfree(con); } /* Called from recovery when it knows that a node has @@ -1504,286 +1427,311 @@ static void clean_one_writequeue(struct connection *con) int dlm_lowcomms_close(int nodeid) { struct connection *con; - struct dlm_node_addr *na; int idx; log_print("closing connection to node %d", nodeid); + idx = srcu_read_lock(&connections_srcu); con = nodeid2con(nodeid, 0); - if (con) { - set_bit(CF_CLOSE, &con->flags); - close_connection(con, true, true, true); - clean_one_writequeue(con); + if (WARN_ON_ONCE(!con)) { + srcu_read_unlock(&connections_srcu, idx); + return -ENOENT; + } + + stop_connection_io(con); + log_print("io handling for node: %d stopped", nodeid); + close_connection(con, true); + + spin_lock(&connections_lock); + hlist_del_rcu(&con->list); + spin_unlock(&connections_lock); + + clean_one_writequeue(con); + call_srcu(&connections_srcu, &con->rcu, connection_release); + if (con->othercon) { + clean_one_writequeue(con->othercon); if (con->othercon) - clean_one_writequeue(con->othercon); + call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); } srcu_read_unlock(&connections_srcu, idx); - spin_lock(&dlm_node_addrs_spin); - na = find_node_addr(nodeid); - if (na) { - list_del(&na->list); - while (na->addr_count--) - kfree(na->addr[na->addr_count]); - kfree(na); - } - spin_unlock(&dlm_node_addrs_spin); + /* for debugging we print when we are done to compare with other + * messages in between. This function need to be correctly synchronized + * with io handling + */ + log_print("closing connection to node %d done", nodeid); return 0; } -/* Receive workqueue function */ +/* Receive worker function */ static void process_recv_sockets(struct work_struct *work) { struct connection *con = container_of(work, struct connection, rwork); + int ret, buflen; + + down_read(&con->sock_lock); + if (!con->sock) { + up_read(&con->sock_lock); + return; + } + + buflen = READ_ONCE(dlm_config.ci_buffer_size); + do { + ret = receive_from_sock(con, buflen); + } while (ret == DLM_IO_SUCCESS); + up_read(&con->sock_lock); - clear_bit(CF_READ_PENDING, &con->flags); - receive_from_sock(con); + switch (ret) { + case DLM_IO_END: + /* CF_RECV_PENDING cleared */ + break; + case DLM_IO_EOF: + close_connection(con, false); + /* CF_RECV_PENDING cleared */ + break; + case DLM_IO_RESCHED: + cond_resched(); + queue_work(io_workqueue, &con->rwork); + /* CF_RECV_PENDING not cleared */ + break; + default: + if (ret < 0) { + if (test_bit(CF_IS_OTHERCON, &con->flags)) { + close_connection(con, false); + } else { + spin_lock_bh(&con->writequeue_lock); + lowcomms_queue_swork(con); + spin_unlock_bh(&con->writequeue_lock); + } + + /* CF_RECV_PENDING cleared for othercon + * we trigger send queue if not already done + * and process_send_sockets will handle it + */ + break; + } + + WARN_ON_ONCE(1); + break; + } } static void process_listen_recv_socket(struct work_struct *work) { - accept_from_sock(&listen_con); + int ret; + + if (WARN_ON_ONCE(!listen_con.sock)) + return; + + do { + ret = accept_from_sock(); + } while (ret == DLM_IO_SUCCESS); + + if (ret < 0) + log_print("critical error accepting connection: %d", ret); } -static void dlm_connect(struct connection *con) +static int dlm_connect(struct connection *con) { struct sockaddr_storage addr; int result, addr_len; struct socket *sock; unsigned int mark; - /* Some odd races can cause double-connects, ignore them */ - if (con->retries++ > MAX_CONNECT_RETRIES) - return; - - if (con->sock) { - log_print("node %d already connected.", con->nodeid); - return; - } - memset(&addr, 0, sizeof(addr)); result = nodeid_to_addr(con->nodeid, &addr, NULL, dlm_proto_ops->try_new_addr, &mark); if (result < 0) { log_print("no address for nodeid %d", con->nodeid); - return; + return result; } /* Create a socket to communicate with */ - result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, + result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, SOCK_STREAM, dlm_proto_ops->proto, &sock); if (result < 0) - goto socket_err; + return result; sock_set_mark(sock->sk, mark); dlm_proto_ops->sockopts(sock); - add_sock(sock, con); - result = dlm_proto_ops->bind(sock); - if (result < 0) - goto add_sock_err; + if (result < 0) { + sock_release(sock); + return result; + } + + add_sock(sock, con); log_print_ratelimited("connecting to %d", con->nodeid); make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, addr_len); - if (result < 0) - goto add_sock_err; - - return; - -add_sock_err: - dlm_close_sock(&con->sock); + switch (result) { + case -EINPROGRESS: + /* not an error */ + fallthrough; + case 0: + break; + default: + if (result < 0) + dlm_close_sock(&con->sock); -socket_err: - /* - * Some errors are fatal and this list might need adjusting. For other - * errors we try again until the max number of retries is reached. - */ - if (result != -EHOSTUNREACH && - result != -ENETUNREACH && - result != -ENETDOWN && - result != -EINVAL && - result != -EPROTONOSUPPORT) { - log_print("connect %d try %d error %d", con->nodeid, - con->retries, result); - msleep(1000); - lowcomms_connect_sock(con); + break; } + + return result; } -/* Send workqueue function */ +/* Send worker function */ static void process_send_sockets(struct work_struct *work) { struct connection *con = container_of(work, struct connection, swork); + int ret; - WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags)); - - clear_bit(CF_WRITE_PENDING, &con->flags); + WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); - if (test_and_clear_bit(CF_RECONNECT, &con->flags)) { - close_connection(con, false, false, true); - dlm_midcomms_unack_msg_resend(con->nodeid); + down_read(&con->sock_lock); + if (!con->sock) { + up_read(&con->sock_lock); + down_write(&con->sock_lock); + if (!con->sock) { + ret = dlm_connect(con); + switch (ret) { + case 0: + break; + case -EINPROGRESS: + /* avoid spamming resched on connection + * we might can switch to a state_change + * event based mechanism if established + */ + msleep(100); + break; + default: + /* CF_SEND_PENDING not cleared */ + up_write(&con->sock_lock); + log_print("connect to node %d try %d error %d", + con->nodeid, con->retries++, ret); + msleep(1000); + /* For now we try forever to reconnect. In + * future we should send a event to cluster + * manager to fence itself after certain amount + * of retries. + */ + queue_work(io_workqueue, &con->swork); + return; + } + } + downgrade_write(&con->sock_lock); } - if (con->sock == NULL) { - if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags)) - msleep(1000); + do { + ret = send_to_sock(con); + } while (ret == DLM_IO_SUCCESS); + up_read(&con->sock_lock); - mutex_lock(&con->sock_mutex); - dlm_connect(con); - mutex_unlock(&con->sock_mutex); - } + switch (ret) { + case DLM_IO_END: + /* CF_SEND_PENDING cleared */ + break; + case DLM_IO_RESCHED: + /* CF_SEND_PENDING not cleared */ + cond_resched(); + queue_work(io_workqueue, &con->swork); + break; + default: + if (ret < 0) { + close_connection(con, false); + + /* CF_SEND_PENDING cleared */ + spin_lock_bh(&con->writequeue_lock); + lowcomms_queue_swork(con); + spin_unlock_bh(&con->writequeue_lock); + break; + } - if (!list_empty(&con->writequeue)) - send_to_sock(con); + WARN_ON_ONCE(1); + break; + } } static void work_stop(void) { - if (recv_workqueue) { - destroy_workqueue(recv_workqueue); - recv_workqueue = NULL; + if (io_workqueue) { + destroy_workqueue(io_workqueue); + io_workqueue = NULL; } - if (send_workqueue) { - destroy_workqueue(send_workqueue); - send_workqueue = NULL; + if (process_workqueue) { + destroy_workqueue(process_workqueue); + process_workqueue = NULL; } } static int work_start(void) { - recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM); - if (!recv_workqueue) { - log_print("can't start dlm_recv"); + io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM, + 0); + if (!io_workqueue) { + log_print("can't start dlm_io"); return -ENOMEM; } - send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM); - if (!send_workqueue) { - log_print("can't start dlm_send"); - destroy_workqueue(recv_workqueue); - recv_workqueue = NULL; + /* ordered dlm message process queue, + * should be converted to a tasklet + */ + process_workqueue = alloc_ordered_workqueue("dlm_process", + WQ_HIGHPRI | WQ_MEM_RECLAIM); + if (!process_workqueue) { + log_print("can't start dlm_process"); + destroy_workqueue(io_workqueue); + io_workqueue = NULL; return -ENOMEM; } return 0; } -static void shutdown_conn(struct connection *con) -{ - if (dlm_proto_ops->shutdown_action) - dlm_proto_ops->shutdown_action(con); -} - void dlm_lowcomms_shutdown(void) { - int idx; - - /* Set all the flags to prevent any - * socket activity. - */ - dlm_allow_conn = 0; - - if (recv_workqueue) - flush_workqueue(recv_workqueue); - if (send_workqueue) - flush_workqueue(send_workqueue); + /* stop lowcomms_listen_data_ready calls */ + lock_sock(listen_con.sock->sk); + listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; + release_sock(listen_con.sock->sk); + cancel_work_sync(&listen_con.rwork); dlm_close_sock(&listen_con.sock); - idx = srcu_read_lock(&connections_srcu); - foreach_conn(shutdown_conn); - srcu_read_unlock(&connections_srcu, idx); -} - -static void _stop_conn(struct connection *con, bool and_other) -{ - mutex_lock(&con->sock_mutex); - set_bit(CF_CLOSE, &con->flags); - set_bit(CF_READ_PENDING, &con->flags); - set_bit(CF_WRITE_PENDING, &con->flags); - if (con->sock && con->sock->sk) { - lock_sock(con->sock->sk); - con->sock->sk->sk_user_data = NULL; - release_sock(con->sock->sk); - } - if (con->othercon && and_other) - _stop_conn(con->othercon, false); - mutex_unlock(&con->sock_mutex); -} - -static void stop_conn(struct connection *con) -{ - _stop_conn(con, true); + flush_workqueue(process_workqueue); } -static void connection_release(struct rcu_head *rcu) +void dlm_lowcomms_shutdown_node(int nodeid, bool force) { - struct connection *con = container_of(rcu, struct connection, rcu); - - kfree(con->rx_buf); - kfree(con); -} + struct connection *con; + int idx; -static void free_conn(struct connection *con) -{ - close_connection(con, true, true, true); - spin_lock(&connections_lock); - hlist_del_rcu(&con->list); - spin_unlock(&connections_lock); - if (con->othercon) { - clean_one_writequeue(con->othercon); - call_srcu(&connections_srcu, &con->othercon->rcu, - connection_release); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, 0); + if (WARN_ON_ONCE(!con)) { + srcu_read_unlock(&connections_srcu, idx); + return; } - clean_one_writequeue(con); - call_srcu(&connections_srcu, &con->rcu, connection_release); -} -static void work_flush(void) -{ - int ok; - int i; - struct connection *con; - - do { - ok = 1; - foreach_conn(stop_conn); - if (recv_workqueue) - flush_workqueue(recv_workqueue); - if (send_workqueue) - flush_workqueue(send_workqueue); - for (i = 0; i < CONN_HASH_SIZE && ok; i++) { - hlist_for_each_entry_rcu(con, &connection_hash[i], - list) { - ok &= test_bit(CF_READ_PENDING, &con->flags); - ok &= test_bit(CF_WRITE_PENDING, &con->flags); - if (con->othercon) { - ok &= test_bit(CF_READ_PENDING, - &con->othercon->flags); - ok &= test_bit(CF_WRITE_PENDING, - &con->othercon->flags); - } - } - } - } while (!ok); + flush_work(&con->swork); + stop_connection_io(con); + WARN_ON_ONCE(!force && !list_empty(&con->writequeue)); + close_connection(con, true); + clean_one_writequeue(con); + if (con->othercon) + clean_one_writequeue(con->othercon); + allow_connection_io(con); + srcu_read_unlock(&connections_srcu, idx); } void dlm_lowcomms_stop(void) { - int idx; - - idx = srcu_read_lock(&connections_srcu); - work_flush(); - foreach_conn(free_conn); - srcu_read_unlock(&connections_srcu, idx); work_stop(); - deinit_local(); - dlm_proto_ops = NULL; } @@ -1799,7 +1747,7 @@ static int dlm_listen_for_all(void) if (result < 0) return result; - result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, + result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, SOCK_STREAM, dlm_proto_ops->proto, &sock); if (result < 0) { log_print("Can't create comms socket: %d", result); @@ -1813,14 +1761,23 @@ static int dlm_listen_for_all(void) if (result < 0) goto out; - save_listen_callbacks(sock); - add_listen_sock(sock, &listen_con); + lock_sock(sock->sk); + listen_sock.sk_data_ready = sock->sk->sk_data_ready; + listen_sock.sk_write_space = sock->sk->sk_write_space; + listen_sock.sk_error_report = sock->sk->sk_error_report; + listen_sock.sk_state_change = sock->sk->sk_state_change; + + listen_con.sock = sock; + + sock->sk->sk_allocation = GFP_NOFS; + sock->sk->sk_use_task_frag = false; + sock->sk->sk_data_ready = lowcomms_listen_data_ready; + release_sock(sock->sk); - INIT_WORK(&listen_con.rwork, process_listen_recv_socket); result = sock->ops->listen(sock, 5); if (result < 0) { dlm_close_sock(&listen_con.sock); - goto out; + return result; } return 0; @@ -1838,7 +1795,7 @@ static int dlm_tcp_bind(struct socket *sock) /* Bind to our cluster-known address connecting to avoid * routing problems. */ - memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); + memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr)); make_sockaddr(&src_addr, 0, &addr_len); result = sock->ops->bind(sock, (struct sockaddr *)&src_addr, @@ -1854,17 +1811,7 @@ static int dlm_tcp_bind(struct socket *sock) static int dlm_tcp_connect(struct connection *con, struct socket *sock, struct sockaddr *addr, int addr_len) { - int ret; - - ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK); - switch (ret) { - case -EINPROGRESS: - fallthrough; - case 0: - return 0; - } - - return ret; + return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK); } static int dlm_tcp_listen_validate(void) @@ -1895,8 +1842,8 @@ static int dlm_tcp_listen_bind(struct socket *sock) int addr_len; /* Bind to our port */ - make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len); - return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0], + make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len); + return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0], addr_len); } @@ -1909,8 +1856,6 @@ static const struct dlm_proto_ops dlm_tcp_ops = { .listen_validate = dlm_tcp_listen_validate, .listen_sockopts = dlm_tcp_listen_sockopts, .listen_bind = dlm_tcp_listen_bind, - .shutdown_action = dlm_tcp_shutdown, - .eof_condition = tcp_eof_condition, }; static int dlm_sctp_bind(struct socket *sock) @@ -1931,13 +1876,7 @@ static int dlm_sctp_connect(struct connection *con, struct socket *sock, sock_set_sndtimeo(sock->sk, 5); ret = sock->ops->connect(sock, addr, addr_len, 0); sock_set_sndtimeo(sock->sk, 0); - if (ret < 0) - return ret; - - if (!test_and_set_bit(CF_CONNECTED, &con->flags)) - log_print("connected to node %d", con->nodeid); - - return 0; + return ret; } static int dlm_sctp_listen_validate(void) @@ -1977,11 +1916,7 @@ static const struct dlm_proto_ops dlm_sctp_ops = { int dlm_lowcomms_start(void) { - int error = -EINVAL; - int i; - - for (i = 0; i < CONN_HASH_SIZE; i++) - INIT_HLIST_HEAD(&connection_hash[i]); + int error; init_local(); if (!dlm_local_count) { @@ -1990,13 +1925,9 @@ int dlm_lowcomms_start(void) goto fail; } - INIT_WORK(&listen_con.rwork, process_listen_recv_socket); - error = work_start(); if (error) - goto fail_local; - - dlm_allow_conn = 1; + goto fail; /* Start listening */ switch (dlm_config.ci_protocol) { @@ -2022,25 +1953,38 @@ int dlm_lowcomms_start(void) fail_listen: dlm_proto_ops = NULL; fail_proto_ops: - dlm_allow_conn = 0; - dlm_close_sock(&listen_con.sock); work_stop(); -fail_local: - deinit_local(); fail: return error; } +void dlm_lowcomms_init(void) +{ + int i; + + for (i = 0; i < CONN_HASH_SIZE; i++) + INIT_HLIST_HEAD(&connection_hash[i]); + + INIT_WORK(&listen_con.rwork, process_listen_recv_socket); +} + void dlm_lowcomms_exit(void) { - struct dlm_node_addr *na, *safe; + struct connection *con; + int i, idx; - spin_lock(&dlm_node_addrs_spin); - list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { - list_del(&na->list); - while (na->addr_count--) - kfree(na->addr[na->addr_count]); - kfree(na); + idx = srcu_read_lock(&connections_srcu); + for (i = 0; i < CONN_HASH_SIZE; i++) { + hlist_for_each_entry_rcu(con, &connection_hash[i], list) { + spin_lock(&connections_lock); + hlist_del_rcu(&con->list); + spin_unlock(&connections_lock); + + if (con->othercon) + call_srcu(&connections_srcu, &con->othercon->rcu, + connection_release); + call_srcu(&connections_srcu, &con->rcu, connection_release); + } } - spin_unlock(&dlm_node_addrs_spin); + srcu_read_unlock(&connections_srcu, idx); } |