diff --git a/mtproto-client.c b/mtproto-client.c index c2f641f..d221f4c 100644 --- a/mtproto-client.c +++ b/mtproto-client.c @@ -1843,11 +1843,12 @@ void mtproto_connect(struct mtproto_connection *c) c->connection->methods->ready(c->connection); // Don't ping TODO: Really? Timeout callback functions of libpurple - //start_ping_timer (c->connection); + start_ping_timer (c->connection); } /** - * Free all used resources and close the connection + * Mark the connection for destruction, stop all timers and initiate + * cleanup tasks */ void mtproto_close(struct mtproto_connection *mtp) { logprintf ("closing mtproto_connection...\n"); @@ -1856,24 +1857,26 @@ void mtproto_close(struct mtproto_connection *mtp) { // send all pending acks on this connection so the server won't // resend messages. We might not be able to send the acknowledgements later // in case the session is switched and this DC is not reachable anymore - send_all_acks (mtp->connection->session); - - // remove all ping timer that point to this connection - //stop_ping_timer (mtp->connection); + if (mtp->connection) { + if (mtp->connection->session && mtp->connection->session->ack_tree) { + send_all_acks (mtp->connection->session); + mtp->instance->config->on_output(mtp->handle); - // - mtp->connection->session->c = 0; + // connection no longer usable for session + mtp->connection->session->c = 0; + } + stop_ping_timer (mtp->connection); + } + // remove all ping timer that point to this connection } /** - * Close the connection and + * Close the underlying file descriptor */ void mtproto_destroy (struct mtproto_connection *self) { logprintf("destroying mtproto_connection: %p\n", self); - - // TODO: Call destruction callback + self->instance->config->proxy_close_cb(self->handle); fd_close_connection(self->connection); - tfree(self->connection, sizeof(struct connection)); tfree(self, sizeof(struct mtproto_connection)); } @@ -1889,13 +1892,15 @@ void mtproto_free_closed (struct telegram *tg) { logprintf ("checking mtproto_connection %d: c_state:%d destroy:%d, quries_num:%d\n", i, c->c_state, c->destroy, c->queries_num); if (c->destroy == 0) continue; - if (c->queries_num > 0) { - logprintf ("still pending queries left, skipping connection...\n"); + if (c->connection->out_bytes > 0) { + logprintf ("still %d bytes ouput left, skipping connection...\n", c->connection->out_bytes); continue; } mtproto_destroy (c); + if (tg->connection == c) { + tg->connection = NULL; + } tg->Cs[i] = NULL; - tg->connection = NULL; } } diff --git a/mtproto-client.h b/mtproto-client.h index 5fdcebe..3d93e0e 100644 --- a/mtproto-client.h +++ b/mtproto-client.h @@ -250,6 +250,7 @@ struct mtproto_connection { // the corresponding telegram instance // struct telegram *instance; + void *handle; }; void mtproto_connection_init (struct mtproto_connection *c); diff --git a/net.c b/net.c index bbc54a2..459c7d4 100644 --- a/net.c +++ b/net.c @@ -730,5 +730,6 @@ void fd_close_connection(struct connection *c) { c->out_head = c->out_tail = c->in_head = c->in_tail = 0; c->state = conn_stopped; c->out_bytes = c->in_bytes = 0; + tfree(c, sizeof(struct connection)); } diff --git a/purple-plugin/telegram-purple.c b/purple-plugin/telegram-purple.c index 887a778..f6c2d51 100644 --- a/purple-plugin/telegram-purple.c +++ b/purple-plugin/telegram-purple.c @@ -79,7 +79,7 @@ void on_new_user_status(struct telegram *instance, void *user); void on_user_typing(struct telegram *instance, void *user); void on_chat_joined (struct telegram *instance, peer_id_t chatid); static PurpleChat *blist_find_chat_by_id(PurpleConnection *gc, const char *id); -static void tgprpl_has_output(struct telegram *tg); +static void tgprpl_has_output(void *handle); static const char *chat_id_get_comp_val (PurpleConnection *gc, int id, char *value) { @@ -107,7 +107,7 @@ static PurpleConversation *chat_show (PurpleConnection *gc, int id) logprintf ("joining chat first...\n"); telegram_conn *conn = purple_connection_get_protocol_data(gc); do_get_chat_info (conn->tg, MK_CHAT(id)); - tgprpl_has_output (conn->tg); + telegram_flush (conn->tg); } return convo; } @@ -175,52 +175,73 @@ static void login_verification_fail(PurpleAccount *acct) "Please make sure you entered the correct verification code.", NULL, NULL, NULL); } +/* OUTPUT */ + +/** + * Libpurple announced that new output should be written to the write-handle + */ static void tgprpl_output_cb(gpointer data, gint source, PurpleInputCondition cond) { - logprintf("tgprpl_output_cb()\n"); - struct telegram *tg = data; - telegram_conn *conn = tg->extra; - if (telegram_write_output(tg) == 0) { + mtproto_handle *conn = data; + logprintf("tgprpl_output_cb(%p)\n", data); + logprintf("mtp=%p, fd=%d, rh=%d, wh=%d\n", conn->mtp, conn->fd, conn->rh, conn->wh); + if (!conn->mtp) { + logprintf ("connection no loner existing, do nothing. \n"); + return; + } + if (mtp_write_output(conn->mtp) == 0) { logprintf("no output, removing output...\n"); purple_input_remove(conn->wh); conn->wh = 0; } } -static void tgprpl_has_output(struct telegram *tg) +/** + * Telegram announced new output in its buffers + */ +static void tgprpl_has_output(void *handle) { - logprintf("tgprpl_has_output()\n"); - telegram_conn *conn = tg->extra; + logprintf("tgprpl_has_output(%p)\n", handle); + mtproto_handle *conn = handle; if (! conn->wh) { - conn->wh = purple_input_add(telegram_get_connection(tg)->fd, PURPLE_INPUT_WRITE, - tgprpl_output_cb, tg); - logprintf("Attached write handle: %u ", conn->wh); + conn->wh = purple_input_add(conn->fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, handle); } } +/* + * Libpurple announced that new input should be read from the read-handle + */ static void tgprpl_input_cb(gpointer data, gint source, PurpleInputCondition cond) { - struct telegram *tg = data; - //telegram_conn *conn = tg->extra; + mtproto_handle *conn = data; logprintf("tgprpl_input_cb()\n"); + if (!conn->fd) { + logprintf("conn for handle no longer existing, not reading input\n"); + return; + } + mtp_read_input(conn->mtp); - // TODO: remove input handler when no more input - telegram_read_input(tg); - if (telegram_has_output(tg)) { - tgprpl_has_output(tg); + if (conn->mtp) { + // processing of the answer may have inserted new queries + telegram_flush (conn->mtp->instance); + + // free all mtproto_connections that may have errored + mtproto_free_closed(conn->mtp->instance); } } -static void tgprpl_has_input(struct telegram *tg) +/** + * Telegram announced that it awaits new input from the read-handle + * TODO: this is currently unused, evaluate wether its needed at all + */ +static void tgprpl_has_input(void *handle) { logprintf("tgprpl_has_input()\n"); - telegram_conn *conn = tg->extra; + mtproto_handle *conn = handle; if (! conn->rh) { - conn->rh = purple_input_add(telegram_get_connection(tg)->fd, PURPLE_INPUT_READ, - tgprpl_input_cb, tg); + conn->rh = purple_input_add(conn->fd, PURPLE_INPUT_READ, tgprpl_input_cb, handle); logprintf("Attached read handle: %u ", conn->rh); } - } static void init_dc_settings(PurpleAccount *acc, struct dc *DC) @@ -231,15 +252,12 @@ static void init_dc_settings(PurpleAccount *acc, struct dc *DC) } /** - * Handle a proxy-request of telegram - * - * Request a new proxy connection from purple, and execute tgprpl_login_on_connected - * as callback once the connection is ready + * Telegram requests a new connectino to our configured proxy */ -void telegram_on_proxy_request(struct telegram *instance, const char *ip, int port) +void telegram_on_proxy_request(struct telegram *tg, const char *ip, int port) { - telegram_conn *conn = instance->extra; - purple_proxy_connect (conn->gc, conn->pa, ip, port, tgprpl_login_on_connected, conn->tg); + telegram_conn *conn = tg->extra; + purple_proxy_connect (conn->gc, conn->pa, ip, port, tgprpl_login_on_connected, tg); } /** @@ -247,11 +265,20 @@ void telegram_on_proxy_request(struct telegram *instance, const char *ip, int po * * Remove all open inputs added to purple */ -void telegram_on_proxy_close(struct telegram *instance, int fd UU) +void telegram_on_proxy_close(void *handle) { - telegram_conn *conn = instance->extra; - purple_input_remove (conn->rh); - purple_input_remove (conn->wh); + mtproto_handle *conn = handle; + logprintf ("Closing proxy-handles - fd: %d, write-handle: %d, read-handle: %d\n", + conn->fd, conn->wh, conn->rh); + if (conn->rh) { + purple_input_remove (conn->rh); + } + if (conn->wh) { + purple_input_remove (conn->wh); + } + conn->rh = conn->wh = 0; + conn->mtp = 0; + tfree (conn, sizeof(mtproto_handle)); } void telegram_on_phone_registration (struct telegram *instance) @@ -278,7 +305,7 @@ void client_registration_entered (gpointer data, const gchar *code) { struct telegram *tg = data; do_send_code_result (tg, code); - tgprpl_has_output (tg); + telegram_flush (tg); } void client_registration_canceled (gpointer data) @@ -341,8 +368,31 @@ void telegram_on_ready (struct telegram *instance) do_update_contact_list(instance); do_get_dialog_list(instance); do_get_difference(instance); - tgprpl_has_output(instance); - conn->timer = purple_timeout_add (500, queries_timerfunc, conn); + telegram_flush (conn->tg); + conn->timer = purple_timeout_add (5000, queries_timerfunc, conn); +} + +/** + * A proxy connection was created by purple + * + * Use the connection to create a new mtproto-connection and create a handle, + * with additional info for libpurple associated with the new connection + */ +void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_message) +{ + purple_debug_info(PLUGIN_ID, "tgprpl_login_on_connected()\n"); + struct telegram *tg = (struct telegram*)data; + if (fd == -1) { + logprintf("purple_proxy_connect failed: %s\n", error_message); + telegram_destroy(tg); + return; + } + + mtproto_handle *conn = talloc(sizeof (mtproto_handle)); + conn->fd = fd; + conn->wh = purple_input_add(fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, conn); + conn->rh = purple_input_add(fd, PURPLE_INPUT_READ, tgprpl_input_cb, conn); + conn->mtp = telegram_add_proxy(tg, fd, conn); } void telegram_on_disconnected (struct telegram *tg) @@ -351,32 +401,9 @@ void telegram_on_disconnected (struct telegram *tg) assert (0); } -/** - * A proxy connection was created by purple - * - * Set the proxy to the current telegram-instance, and add callbacks to monitor - */ -void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_message) -{ - purple_debug_info(PLUGIN_ID, "tgprpl_login_on_connected()\n"); - struct telegram *tg = (struct telegram*) data; - telegram_conn *conn = tg->extra; - if (fd == -1) { - logprintf("purple_proxy_connect failed: %s\n", error_message); - telegram_destroy(tg); - return; - } - - purple_debug_info(PLUGIN_ID, "Connecting to the telegram network...\n"); - conn->wh = purple_input_add(fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, tg); - conn->rh = purple_input_add(fd, PURPLE_INPUT_READ, tgprpl_input_cb, tg); - - telegram_set_proxy(tg, fd); -} - struct telegram_config tgconf = { NULL, - NULL, // on output + tgprpl_has_output, // on output telegram_on_proxy_request, telegram_on_proxy_close, telegram_on_phone_registration, @@ -400,22 +427,21 @@ static void tgprpl_login(PurpleAccount * acct) purple_debug_info(PLUGIN_ID, "tgprpl_login()\n"); PurpleConnection *gc = purple_account_get_connection(acct); char const *username = purple_account_get_username(acct); - struct dc DC; init_dc_settings(acct, &DC); - - // TODO: fetch current home directory - // use this as root + + // create a new instance of telegram struct telegram *tg = telegram_new (&DC, username, &tgconf); telegram_restore_session(tg); - + + // create handle to store additional info for libpurple in + // the new telegram instance telegram_conn *conn = g_new0(telegram_conn, 1); conn->tg = tg; conn->gc = gc; conn->pa = acct; purple_connection_set_protocol_data(gc, conn); tg->extra = conn; - purple_connection_set_state (conn->gc, PURPLE_CONNECTING); telegram_connect (tg); } @@ -658,7 +684,7 @@ static int tgprpl_send_im(PurpleConnection * gc, const char *who, const char *me PurpleBuddy *b = purple_find_buddy (pa, who); peer_id_t *peer = purple_buddy_get_protocol_data (b); do_send_message (conn->tg, *peer, message, strlen(message)); - tgprpl_has_output (conn->tg); + telegram_flush (conn->tg); return 1; } @@ -913,7 +939,7 @@ static void tgprpl_chat_join(PurpleConnection * gc, GHashTable * data) logprintf ("chat now known\n"); char *subject, *owner, *part; do_get_chat_info (conn->tg, MK_CHAT(atoi(id))); - tgprpl_has_output (conn->tg); + telegram_flush (conn->tg); } else { logprintf ("chat already known\n"); serv_got_joined_chat(conn->gc, atoi(id), groupname); diff --git a/purple-plugin/telegram-purple.h b/purple-plugin/telegram-purple.h index b44fe81..4fde230 100644 --- a/purple-plugin/telegram-purple.h +++ b/purple-plugin/telegram-purple.h @@ -34,21 +34,12 @@ #include "version.h" #include "account.h" #include "connection.h" +#include "mtproto-client.h" typedef struct { struct telegram *tg; PurpleAccount *pa; PurpleConnection *gc; - - /** - * Write handler returned by purple_input_add - */ - guint wh; - - /** - * Read handler returned by purple_input_add - */ - guint rh; /** * Whether the state of the protocol has changed since the last save @@ -62,4 +53,28 @@ typedef struct { } telegram_conn; +typedef struct { + + /** + * The mtproto_connection associated with this handle + */ + struct mtproto_connection *mtp; + + /** + * Write handler returned by purple_input_add + */ + guint wh; + + /** + * Read handler returned by purple_input_add + */ + guint rh; + + /** + * The file descriptor of the used socket + */ + int fd; + +} mtproto_handle; + #endif diff --git a/queries.c b/queries.c index fc4c7e4..c095b54 100644 --- a/queries.c +++ b/queries.c @@ -65,10 +65,6 @@ int offline_mode = 0; extern int sync_from_start; int sync_from_start = 0; -void telegram_flush_queries (struct telegram *instance) { - instance->config->on_output(instance); -} - #define memcmp8(a,b) memcmp ((a), (b), 8) DEFINE_TREE (query, struct query *, memcmp8, 0) ; @@ -3071,14 +3067,3 @@ void do_update_typing (struct telegram *instance, peer_id_t id) { send_query (instance, DC_working, mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &update_typing_methods, 0); } -int telegram_has_output (struct telegram *instance) -{ - if (!instance->connection) { - return 0; - } - if (instance->session_state == STATE_READY) { - return tree_count_query (instance->queries_tree) > 0; - } - return instance->connection->queries_num > 0; -} - diff --git a/telegram.c b/telegram.c index 49bffd1..d402997 100755 --- a/telegram.c +++ b/telegram.c @@ -159,12 +159,12 @@ void telegram_change_state (struct telegram *instance, int state, void *data) mtproto_close (instance->connection); assert (instance->config->proxy_request_cb); // tell the proxy to close all connections - instance->config->proxy_close_cb (instance, instance->connection->connection->fd); + //instance->config->proxy_close_cb (instance, instance->connection->connection->fd); // remove all left over queries and timers free_timers (instance); free_queries (instance); - + // start a new connection to the demanded data center. The pointer to the // new dc should was already updated by the on_error function of the query telegram_connect (instance); @@ -359,26 +359,25 @@ void on_authorized(struct mtproto_connection *c, void *data) telegram_change_state(instance, STATE_AUTHORIZED, NULL); } -void telegram_read_input (struct telegram *instance) -{ - try_read (instance->connection->connection); - - // free all mtproto_connections that may have errored - mtproto_free_closed(instance); -} - -void telegram_set_proxy(struct telegram *instance, int fd) +struct mtproto_connection *telegram_add_proxy(struct telegram *instance, int fd, void *handle) { struct dc *DC_working = telegram_get_working_dc (instance); instance->connection = mtproto_new (DC_working, fd, instance); + instance->connection->handle = handle; instance->connection->on_ready = on_authorized; instance->connection->on_ready_data = instance; mtproto_connect (instance->connection); + return instance->connection; } -int telegram_write_output (struct telegram *instance) +void mtp_read_input (struct mtproto_connection *mtp) { - return try_write(instance->connection->connection); + try_read (mtp->connection); +} + +int mtp_write_output (struct mtproto_connection *mtp) +{ + return try_write(mtp->connection); } int telegram_authenticated (struct telegram *instance) @@ -386,4 +385,21 @@ int telegram_authenticated (struct telegram *instance) return telegram_get_working_dc (instance)->auth_key_id > 0; } +void telegram_flush (struct telegram *instance) +{ + logprintf ("telegram flush()\n"); + int i; + for (i = 0; i < 100; i++) { + struct mtproto_connection *c = instance->Cs[i]; + if (!c) continue; + if (!c->connection) continue; + if (c->connection->out_bytes) { + logprintf ("connection %d has %d bytes, triggering on_output.", + i, c->connection->out_bytes); + instance->config->on_output(c->handle); + } else { + logprintf ("connection %d has no bytes, skipping\n"); + } + } +} diff --git a/telegram.h b/telegram.h index 2b90f9b..3c19742 100644 --- a/telegram.h +++ b/telegram.h @@ -110,7 +110,7 @@ struct telegram_config { /** * Called when there is pending network output */ - void (*on_output)(struct telegram *instance); + void (*on_output)(void *handle); /** * A callback function that delivers a connections to the given hostname @@ -123,7 +123,7 @@ struct telegram_config { * A callback function that is called once the proxy connection is no longer * needed. This is useful for freeing all used resources. */ - void (*proxy_close_cb) (struct telegram *instance, int fd); + void (*proxy_close_cb) (void *handle); /** * A callback function that is called when a phone registration is required. @@ -249,6 +249,9 @@ struct telegram { int get_difference_active; struct message *ML[MSG_STORE_SIZE]; + /* + * All active MtProto connections + */ int cs; struct mtproto_connection *Cs[100]; @@ -304,27 +307,18 @@ int telegram_login (struct telegram *instance); /** * Read and process all available input from the network */ -void telegram_read_input (struct telegram *instance); +void mtp_read_input (struct mtproto_connection *mtp); /** * Write all available output to the network */ -int telegram_write_output (struct telegram *instance); - -/** - * Return whether there is pending output. - */ -int telegram_has_output (struct telegram *instance); +int mtp_write_output (struct mtproto_connection *mtp); /** * Try to interpret RPC calls and apply the changes to the current telegram state */ void try_rpc_interpret(struct telegram *instance, int op, int len); -/* - * TODO: Refactor all old calls to take a telegrma instance - */ - /** * Request a registration code */ @@ -391,15 +385,18 @@ void set_net_read_cb(ssize_t (*cb)(int fd, void *buff, size_t size)); void set_net_write_cb(ssize_t (*cb)(int fd, const void *buff, size_t size)); /** - * Set the proxy-connection to use + * Set the connection after a proxy_request_cb * - * NOTE: you may only call this function from the + * @param fd The file-descriptor of the acquired connection + * @param handle A handle that will be passed back on output and close callbacks */ -void telegram_set_proxy(struct telegram *instance, int fd); +struct mtproto_connection *telegram_add_proxy(struct telegram *tg, int fd, void *handle); /** * Return wether telegram is authenticated with the currently active data center */ int telegram_authenticated (struct telegram *instance); +void telegram_flush (struct telegram *instance); + #endif