From 047ea0cebbf2196d64ba81ab90efe5f8aeaf87fa Mon Sep 17 00:00:00 2001 From: Zhenhua Zhang Date: Sat, 12 Jun 2010 13:50:40 +0800 Subject: gatserver: Suspend/resume GAtServer with GAtIO Support g_at_server_suspend and g_at_server_resume operation by using GAtIO to handle IO related function. --- gatchat/gatserver.c | 259 ++++++++++++++++++++++------------------------------ 1 file changed, 109 insertions(+), 150 deletions(-) (limited to 'gatchat/gatserver.c') diff --git a/gatchat/gatserver.c b/gatchat/gatserver.c index 148754a0..6bb92443 100644 --- a/gatchat/gatserver.c +++ b/gatchat/gatserver.c @@ -31,6 +31,7 @@ #include "ringbuffer.h" #include "gatserver.h" +#include "gatio.h" #define BUF_SIZE 4096 /* + the max length of information text + */ @@ -100,16 +101,13 @@ struct at_command { struct _GAtServer { gint ref_count; /* Ref count */ struct v250_settings v250; /* V.250 command setting */ - GIOChannel *channel; /* Server IO */ - guint read_watch; /* GSource read id, 0 if none */ - guint write_watch; /* GSource write id, 0 if none */ + GAtIO *io; /* Server IO */ guint read_so_far; /* Number of bytes processed */ GAtDisconnectFunc user_disconnect; /* User disconnect func */ gpointer user_disconnect_data; /* User disconnect data */ GAtDebugFunc debugf; /* Debugging output function */ gpointer debug_data; /* Data to pass to debug func */ GHashTable *command_list; /* List of AT commands */ - struct ring_buffer *read_buf; /* Current read buffer */ GQueue *write_queue; /* Write buffer queue */ guint max_read_attempts; /* Max reads per select */ enum ParserState parser_state; @@ -117,12 +115,13 @@ struct _GAtServer { char *last_line; /* Last read line */ unsigned int cur_pos; /* Where we are on the line */ GAtServerResult last_result; - gboolean processing_cmdline; + gboolean suspended; gboolean final_sent; gboolean final_async; + gboolean in_read_handler; }; -static void g_at_server_wakeup_writer(GAtServer *server); +static void server_wakeup_writer(GAtServer *server); static void server_parse_line(GAtServer *server); static struct ring_buffer *allocate_next(GAtServer *server) @@ -162,7 +161,7 @@ static void send_common(GAtServer *server, const char *buf, unsigned int len) write_buf = allocate_next(server); } - g_at_server_wakeup_writer(server); + server_wakeup_writer(server); } static void send_result_common(GAtServer *server, const char *result) @@ -198,14 +197,14 @@ void g_at_server_send_final(GAtServer *server, GAtServerResult result) server->final_sent = TRUE; server->last_result = result; - if (result == G_AT_SERVER_RESULT_OK && server->processing_cmdline) { + if (result == G_AT_SERVER_RESULT_OK && server->suspended) { if (server->final_async) server_parse_line(server); return; } - server->processing_cmdline = FALSE; + g_at_server_resume(server); if (server->v250.is_v1) sprintf(buf, "%s", server_result_to_string(result)); @@ -219,7 +218,7 @@ void g_at_server_send_ext_final(GAtServer *server, const char *result) { server->final_sent = TRUE; server->last_result = G_AT_SERVER_RESULT_EXT_ERROR; - server->processing_cmdline = FALSE; + g_at_server_resume(server); send_result_common(server, result); } @@ -668,7 +667,7 @@ static void server_parse_line(GAtServer *server) server->final_async = FALSE; if (pos == 0) - server->processing_cmdline = TRUE; + g_at_server_suspend(server); while (pos < len) { unsigned int consumed; @@ -702,7 +701,7 @@ static void server_parse_line(GAtServer *server) return; } - server->processing_cmdline = FALSE; + g_at_server_resume(server); g_at_server_send_final(server, G_AT_SERVER_RESULT_OK); } @@ -780,11 +779,11 @@ out: return res; } -static char *extract_line(GAtServer *p) +static char *extract_line(GAtServer *p, struct ring_buffer *rbuf) { - unsigned int wrap = ring_buffer_len_no_wrap(p->read_buf); + unsigned int wrap = ring_buffer_len_no_wrap(rbuf); unsigned int pos = 0; - unsigned char *buf = ring_buffer_read_ptr(p->read_buf, pos); + unsigned char *buf = ring_buffer_read_ptr(rbuf, pos); int strip_front = 0; int line_length = 0; gboolean in_string = FALSE; @@ -806,7 +805,7 @@ static char *extract_line(GAtServer *p) pos += 1; if (pos == wrap) - buf = ring_buffer_read_ptr(p->read_buf, pos); + buf = ring_buffer_read_ptr(rbuf, pos); } /* We will strip AT and S3 */ @@ -814,17 +813,17 @@ static char *extract_line(GAtServer *p) line = g_try_new(char, line_length + 1); if (!line) { - ring_buffer_drain(p->read_buf, p->read_so_far); + ring_buffer_drain(rbuf, p->read_so_far); return NULL; } /* Strip leading whitespace + AT */ - ring_buffer_drain(p->read_buf, strip_front + 2); + ring_buffer_drain(rbuf, strip_front + 2); pos = 0; i = 0; - wrap = ring_buffer_len_no_wrap(p->read_buf); - buf = ring_buffer_read_ptr(p->read_buf, pos); + wrap = ring_buffer_len_no_wrap(rbuf); + buf = ring_buffer_read_ptr(rbuf, pos); while (pos < (p->read_so_far - strip_front - 2)) { if (*buf == '"') @@ -839,33 +838,39 @@ static char *extract_line(GAtServer *p) pos += 1; if (pos == wrap) - buf = ring_buffer_read_ptr(p->read_buf, pos); + buf = ring_buffer_read_ptr(rbuf, pos); } /* Strip S3 */ - ring_buffer_drain(p->read_buf, p->read_so_far - strip_front - 2); + ring_buffer_drain(rbuf, p->read_so_far - strip_front - 2); line[i] = '\0'; return line; } -static void new_bytes(GAtServer *p) +static void new_bytes(struct ring_buffer *rbuf, gpointer user_data) { - unsigned int len = ring_buffer_len(p->read_buf); - unsigned int wrap = ring_buffer_len_no_wrap(p->read_buf); - unsigned char *buf = ring_buffer_read_ptr(p->read_buf, p->read_so_far); + GAtServer *p = user_data; + unsigned int len = ring_buffer_len(rbuf); + unsigned int wrap = ring_buffer_len_no_wrap(rbuf); + unsigned char *buf = ring_buffer_read_ptr(rbuf, p->read_so_far); enum ParserResult result; - while (p->channel && (p->read_so_far < len)) { + p->in_read_handler = TRUE; + + while (p->io && (p->read_so_far < len)) { gsize rbytes = MIN(len - p->read_so_far, wrap - p->read_so_far); result = server_feed(p, (char *)buf, &rbytes); + if (p->v250.echo) + send_common(p, (char *)buf, rbytes); + buf += rbytes; p->read_so_far += rbytes; if (p->read_so_far == wrap) { - buf = ring_buffer_read_ptr(p->read_buf, p->read_so_far); + buf = ring_buffer_read_ptr(rbuf, p->read_so_far); wrap = len; } @@ -879,14 +884,14 @@ static void new_bytes(GAtServer *p) * Empty commands must be OK by the DCE */ g_at_server_send_final(p, G_AT_SERVER_RESULT_OK); - ring_buffer_drain(p->read_buf, p->read_so_far); + ring_buffer_drain(rbuf, p->read_so_far); break; case PARSER_RESULT_COMMAND: { g_free(p->last_line); - p->last_line = extract_line(p); + p->last_line = extract_line(p, rbuf); p->cur_pos = 0; if (p->last_line) @@ -905,11 +910,11 @@ static void new_bytes(GAtServer *p) else g_at_server_send_final(p, G_AT_SERVER_RESULT_OK); - ring_buffer_drain(p->read_buf, p->read_so_far); + ring_buffer_drain(rbuf, p->read_so_far); break; default: - ring_buffer_drain(p->read_buf, p->read_so_far); + ring_buffer_drain(rbuf, p->read_so_far); break; } @@ -918,72 +923,15 @@ static void new_bytes(GAtServer *p) p->read_so_far = 0; } - /* We're overflowing the buffer, shutdown the socket */ - if (p->read_buf && ring_buffer_avail(p->read_buf) == 0) - g_source_remove(p->read_watch); -} - -static gboolean received_data(GIOChannel *channel, GIOCondition cond, - gpointer data) -{ - unsigned char *buf; - GAtServer *server = data; - GIOError err; - gsize rbytes; - gsize toread; - guint total_read = 0; - guint read_count = 0; - - if (cond & G_IO_NVAL) - return FALSE; - - do { - toread = ring_buffer_avail_no_wrap(server->read_buf); - - if (toread == 0) - break; - - rbytes = 0; - buf = ring_buffer_write_ptr(server->read_buf, 0); - - err = g_io_channel_read(channel, (char *) buf, toread, &rbytes); - g_at_util_debug_chat(TRUE, (char *)buf, rbytes, - server->debugf, server->debug_data); - - read_count++; - - if (rbytes == 0) - break; - - if (server->v250.echo) - send_common(server, (char *)buf, rbytes); - - /* Ignore incoming bytes when processing a command line */ - if (server->processing_cmdline) - continue; + p->in_read_handler = FALSE; - total_read += rbytes; - ring_buffer_write_advance(server->read_buf, rbytes); - } while (err == G_IO_ERROR_NONE && - read_count < server->max_read_attempts); - - if (total_read > 0) - new_bytes(server); - - if (cond & (G_IO_HUP | G_IO_ERR)) - return FALSE; - - if (read_count > 0 && rbytes == 0 && err != G_IO_ERROR_AGAIN) - return FALSE; - - return TRUE; + if (p->destroyed) + g_free(p); } -static gboolean can_write_data(GIOChannel *channel, GIOCondition cond, - gpointer data) +static gboolean can_write_data(gpointer data) { GAtServer *server = data; - GIOError err; gsize bytes_written; gsize towrite; struct ring_buffer *write_buf; @@ -992,9 +940,6 @@ static gboolean can_write_data(GIOChannel *channel, GIOCondition cond, int limiter; #endif - if (cond & (G_IO_NVAL | G_IO_HUP | G_IO_ERR)) - return FALSE; - if (!server->write_queue) return FALSE; @@ -1012,22 +957,17 @@ static gboolean can_write_data(GIOChannel *channel, GIOCondition cond, limiter = 5; #endif - err = g_io_channel_write(server->channel, + bytes_written = g_at_io_write(server->io, (char *)buf, #ifdef WRITE_SCHEDULER_DEBUG - limiter, + limiter #else - towrite, + towrite #endif - &bytes_written); + ); - if (err != G_IO_ERROR_NONE) { - g_source_remove(server->read_watch); + if (bytes_written == 0) return FALSE; - } - - g_at_util_debug_chat(FALSE, (char *)buf, bytes_written, server->debugf, - server->debug_data); ring_buffer_drain(write_buf, bytes_written); @@ -1059,10 +999,6 @@ static void write_queue_free(GQueue *write_queue) static void g_at_server_cleanup(GAtServer *server) { - /* Cleanup all received data */ - ring_buffer_free(server->read_buf); - server->read_buf = NULL; - /* Cleanup pending data to write */ write_queue_free(server->write_queue); @@ -1071,15 +1007,15 @@ static void g_at_server_cleanup(GAtServer *server) g_free(server->last_line); - server->channel = NULL; + g_at_io_unref(server->io); + server->io = NULL; } -static void read_watcher_destroy_notify(gpointer user_data) +static void io_disconnect(gpointer user_data) { GAtServer *server = user_data; g_at_server_cleanup(server); - server->read_watch = 0; if (server->user_disconnect) server->user_disconnect(server->user_disconnect_data); @@ -1088,23 +1024,9 @@ static void read_watcher_destroy_notify(gpointer user_data) g_free(server); } -static void write_watcher_destroy_notify(gpointer user_data) +static void server_wakeup_writer(GAtServer *server) { - GAtServer *server = user_data; - - server->write_watch = 0; -} - -static void g_at_server_wakeup_writer(GAtServer *server) -{ - if (server->write_watch != 0) - return; - - server->write_watch = g_io_add_watch_full(server->channel, - G_PRIORITY_DEFAULT, - G_IO_OUT | G_IO_HUP | G_IO_ERR | G_IO_NVAL, - can_write_data, server, - write_watcher_destroy_notify); + g_at_io_set_write_handler(server->io, can_write_data, server); } static void v250_settings_create(struct v250_settings *v250) @@ -1156,13 +1078,15 @@ GAtServer *g_at_server_new(GIOChannel *io) server->ref_count = 1; v250_settings_create(&server->v250); - server->channel = io; + server->io = g_at_io_new(io); + if (!server->io) + goto error; + + g_at_io_set_disconnect_function(server->io, io_disconnect, server); + server->command_list = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, at_notify_node_destroy); - server->read_buf = ring_buffer_new(BUF_SIZE); - if (!server->read_buf) - goto error; server->write_queue = g_queue_new(); if (!server->write_queue) @@ -1173,25 +1097,18 @@ GAtServer *g_at_server_new(GIOChannel *io) server->max_read_attempts = 3; - if (!g_at_util_setup_io(server->channel, G_IO_FLAG_NONBLOCK)) - goto error; - - server->read_watch = g_io_add_watch_full(io, G_PRIORITY_DEFAULT, - G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL, - received_data, server, - read_watcher_destroy_notify); + g_at_io_set_read_handler(server->io, new_bytes, server); basic_command_register(server); return server; error: + g_at_io_unref(server->io); + if (server->command_list) g_hash_table_destroy(server->command_list); - if (server->read_buf) - ring_buffer_free(server->read_buf); - if (server->write_queue) write_queue_free(server->write_queue); @@ -1201,6 +1118,22 @@ error: return NULL; } +GIOChannel *g_at_server_get_channel(GAtServer *server) +{ + if (server == NULL || server->io == NULL) + return NULL; + + return g_at_io_get_channel(server->io); +} + +GAtIO *g_at_server_get_io(GAtServer *server) +{ + if (server == NULL) + return NULL; + + return server->io; +} + GAtServer *g_at_server_ref(GAtServer *server) { if (server == NULL) @@ -1211,6 +1144,33 @@ GAtServer *g_at_server_ref(GAtServer *server) return server; } +void g_at_server_suspend(GAtServer *server) +{ + if (server == NULL) + return; + + server->suspended = TRUE; + + g_at_io_set_write_handler(server->io, NULL, NULL); + g_at_io_set_read_handler(server->io, NULL, NULL); + + g_at_io_set_debug(server->io, NULL, NULL); +} + +void g_at_server_resume(GAtServer *server) +{ + if (server == NULL) + return; + + server->suspended = FALSE; + + g_at_io_set_debug(server->io, server->debugf, server->debug_data); + g_at_io_set_read_handler(server->io, new_bytes, server); + + if (g_queue_get_length(server->write_queue) > 0) + server_wakeup_writer(server); +} + void g_at_server_unref(GAtServer *server) { gboolean is_zero; @@ -1223,6 +1183,11 @@ void g_at_server_unref(GAtServer *server) if (is_zero == FALSE) return; + if (server->io) { + g_at_server_suspend(server); + g_at_server_cleanup(server); + } + g_at_server_shutdown(server); /* glib delays the destruction of the watcher until it exits, this @@ -1230,7 +1195,7 @@ void g_at_server_unref(GAtServer *server) * destroyed already. We have to wait until the read_watcher * destroy function gets called */ - if (server->read_watch != 0) + if (server->in_read_handler) server->destroyed = TRUE; else g_free(server); @@ -1245,12 +1210,6 @@ gboolean g_at_server_shutdown(GAtServer *server) server->user_disconnect = NULL; server->user_disconnect_data = NULL; - if (server->write_watch) - g_source_remove(server->write_watch); - - if (server->read_watch) - g_source_remove(server->read_watch); - return TRUE; } -- cgit v1.2.3