diff --git a/mtproto-client.c b/mtproto-client.c index 4a45ea7..61d77b0 100644 --- a/mtproto-client.c +++ b/mtproto-client.c @@ -1779,8 +1779,8 @@ int tc_becomes_ready (struct connection *c) { send_req_pq_packet (c); break; case st_authorized: - auth_work_start (c); - telegram_change_state (c->instance, STATE_AUTHORIZED, NULL); + c->mtconnection->on_ready(c->mtconnection, c->mtconnection->on_ready_data); + //telegram_change_state (c->instance, STATE_AUTHORIZED, NULL); break; default: logprintf ( "c_state = %d\n", c->mtconnection->c_state); @@ -1853,10 +1853,8 @@ struct mtproto_connection *mtproto_new(struct dc *DC, int fd, struct telegram *t void mtproto_connect(struct mtproto_connection *c) { on_start(c); - c->connection->methods->ready(c->connection); - - // Don't ping TODO: Really? Timeout callback functions of libpurple start_ping_timer (c->connection); + c->connection->methods->ready(c->connection); } /** @@ -1872,11 +1870,10 @@ void mtproto_close(struct mtproto_connection *mtp) { // in case the session is switched and this DC is not reachable anymore if (mtp->connection) { if (mtp->connection->session && mtp->connection->session->ack_tree) { - send_all_acks (mtp->connection->session); + struct session *S = mtp->connection->session; + send_all_acks (S); mtp->instance->config->on_output(mtp->handle); - - // connection no longer usable for session - mtp->connection->session->c = 0; + S->c = 0; } stop_ping_timer (mtp->connection); } @@ -1893,6 +1890,20 @@ void mtproto_destroy (struct mtproto_connection *self) { tfree(self, sizeof(struct mtproto_connection)); } +void mtproto_close_foreign (struct telegram *instance) +{ + int i; + for (i = 0; i < 100; i++) { + struct mtproto_connection * c = instance->Cs[i]; + if (c && + !c->destroy && + c->connection->session->dc->id != instance->auth.dc_working_num) { + logprintf ("closing connection for working_dc=%d, dc=%d\n", + instance->auth.dc_working_num, c->connection->session->dc->id); + mtproto_close (c); + } + } +} /** * Free all destroyed connections diff --git a/mtproto-client.h b/mtproto-client.h index 3d93e0e..f6dfc35 100644 --- a/mtproto-client.h +++ b/mtproto-client.h @@ -506,6 +506,7 @@ static inline void hexdump_out (struct mtproto_connection *self) { #endif void my_clock_gettime (int clock_id, struct timespec *T); +void mtproto_close_foreign (struct telegram *instance); void mtproto_free_closed (struct telegram *tg); #endif diff --git a/net.c b/net.c index b8d5779..a63b754 100644 --- a/net.c +++ b/net.c @@ -679,7 +679,8 @@ void dc_create_session (struct dc *DC) { * Wrap an existing socket file descriptor and make it usable as a connection, */ struct connection *fd_create_connection (struct dc *DC, int fd, - struct telegram *instance, struct connection_methods *methods, struct mtproto_connection *mtp) { + struct telegram *instance, struct connection_methods *methods, + struct mtproto_connection *mtp) { // create a connection struct connection *c = talloc0 (sizeof (*c)); @@ -693,8 +694,6 @@ struct connection *fd_create_connection (struct dc *DC, int fd, c->last_receive_time = get_double_time (); logprintf ( "connect to %s:%d successful\n", DC->ip, DC->port); - // TODO: Load existing session from state file - // create an empty session and attach it to the dc and the connection if (!DC->sessions[0]) { struct session *S = talloc0 (sizeof (*S)); assert (RAND_pseudo_bytes ((unsigned char *) &S->session_id, 8) >= 0); @@ -702,6 +701,9 @@ struct connection *fd_create_connection (struct dc *DC, int fd, S->c = c; DC->sessions[0] = S; } + if (!DC->sessions[0]->c) { + DC->sessions[0]->c = c; + } // add backreference to session c->session = DC->sessions[0]; diff --git a/net.h b/net.h index b7a4763..ed02eba 100644 --- a/net.h +++ b/net.h @@ -148,7 +148,7 @@ void dc_create_session (struct dc *DC); void insert_msg_id (struct session *S, long long id); struct dc *alloc_dc (struct dc* DC_list[], int id, char *ip, int port); -#define GET_DC(c) (telegram_get_working_dc(c->instance)) +#define GET_DC(c) (c->session->dc) // export read and write methods to redirect network control void try_read (struct connection *c); diff --git a/purple-plugin/telegram-purple.c b/purple-plugin/telegram-purple.c index fcc4e7d..c3e22e0 100644 --- a/purple-plugin/telegram-purple.c +++ b/purple-plugin/telegram-purple.c @@ -263,10 +263,12 @@ static void init_dc_settings(PurpleAccount *acc, struct dc *DC) /** * Telegram requests a new connectino to our configured proxy */ -void telegram_on_proxy_request(struct telegram *tg, const char *ip, int port) +void telegram_on_proxy_request(struct telegram *tg, struct proxy_request *req) { telegram_conn *conn = tg->extra; - purple_proxy_connect (conn->gc, conn->pa, ip, port, tgprpl_login_on_connected, tg); + req->extra = tg; + purple_proxy_connect (conn->gc, conn->pa, req->DC->ip, req->DC->port, + tgprpl_login_on_connected, req); } /** @@ -391,7 +393,9 @@ void telegram_on_ready (struct telegram *instance) void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_message) { purple_debug_info(PLUGIN_ID, "tgprpl_login_on_connected()\n"); - struct telegram *tg = (struct telegram*)data; + struct proxy_request *req = (struct proxy_request*)data; + struct telegram *tg = req->tg; + if (fd == -1) { logprintf("purple_proxy_connect failed: %s\n", error_message); telegram_destroy(tg); @@ -402,7 +406,7 @@ void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_messa conn->fd = fd; conn->wh = purple_input_add(fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, conn); conn->rh = purple_input_add(fd, PURPLE_INPUT_READ, tgprpl_input_cb, conn); - conn->mtp = telegram_add_proxy(tg, fd, conn); + conn->mtp = telegram_add_proxy(tg, req, fd, conn); } void telegram_on_disconnected (struct telegram *tg) diff --git a/queries.c b/queries.c index 41b68d9..915796f 100644 --- a/queries.c +++ b/queries.c @@ -1781,9 +1781,8 @@ void print_user_info (struct user *U) { int user_info_on_answer (struct query *q UU) { struct mtproto_connection *mtp = query_get_mtproto(q); - // TODO: Use user info struct user *U = fetch_alloc_user_full (mtp); - event_user_info_received_handler(mtp->instance, U, (int)q->extra); + event_user_info_received_handler (mtp->instance, U, (int)q->extra); //print_user_info (U); return 0; } @@ -1854,11 +1853,17 @@ void end_load (struct telegram *instance, struct download *D) { close (D->fd); logprintf ("Done: %s\n", D->name); event_download_finished_handler(instance, D); + instance->dl_curr = 0; + if (D->dc != telegram_get_working_dc(instance)->id) { + logprintf ("%d Not the working dc %d, closing...\n", D->dc, + telegram_get_working_dc(instance)->id); + } if (D->iv) { tfree_secure (D->iv, 32); } tfree_str (D->name); tfree (D, sizeof (*D)); + telegram_dl_next (instance); } struct download_extra { @@ -1946,11 +1951,6 @@ void load_next_part (struct telegram *instance, struct download *D) { instance->cur_downloaded_bytes += D->offset; //update_prompt (); } - if(instance->auth.dc_working_num != D->dc) - { - logprintf ("Unsupported DC! Cancel query! \n"); - return; - } clear_packet (mtp); out_int (mtp, CODE_upload_get_file); if (!D->id) { @@ -1995,7 +1995,9 @@ void do_load_photo_size (struct telegram *instance, struct photo_size *P, void * D->extra = extra; D->name = 0; D->fd = -1; - load_next_part (instance, D); + + telegram_dl_add (instance, D); + telegram_dl_next (instance); } void do_load_photo (struct telegram *instance, struct photo *photo, int photoBig, void *extra) { @@ -2004,13 +2006,13 @@ void do_load_photo (struct telegram *instance, struct photo *photo, int photoBig int sizei = 0; int i; for (i = 0; i < photo->sizes_num; i++) { - if(photoBig == 0) + if (photoBig == 0) { if (photo->sizes[i].w + photo->sizes[i].h < size) { size = photo->sizes[i].w + photo->sizes[i].h; sizei = i; } - }else{ + } else { if (photo->sizes[i].w + photo->sizes[i].h > size) { size = photo->sizes[i].w + photo->sizes[i].h; sizei = i; @@ -2100,6 +2102,11 @@ void do_load_encr_video (struct telegram *instance, struct encr_video *V, void * /* {{{ Export auth */ +struct export_info { + void *extra; + void (*cb)(char *export_auth_str, int len, void *extra); +}; + int export_auth_on_answer (struct query *q UU) { struct mtproto_connection *mtp = query_get_mtproto(q); struct telegram *instance = mtp->connection->instance; @@ -2116,6 +2123,10 @@ int export_auth_on_answer (struct query *q UU) { memcpy (s, fetch_str (mtp, l), l); instance->export_auth_str_len = l; instance->export_auth_str = s; + + struct export_info *info = q->extra; + info->cb(instance->export_auth_str, instance->export_auth_str_len, info->extra); + tfree(info, sizeof(struct export_info)); return 0; } @@ -2124,27 +2135,39 @@ struct query_methods export_auth_methods = { .on_error = fail_on_error }; -void do_export_auth (struct telegram *instance, int num) { +void do_export_auth (struct telegram *instance, int num, void (*cb)(char *export_auth_str, int len, void *extra), void *extra) { struct dc *DC_working = telegram_get_working_dc(instance); struct mtproto_connection *mtp = instance->connection; instance->export_auth_str = 0; clear_packet (mtp); out_int (mtp, CODE_auth_export_authorization); out_int (mtp, num); - send_query (instance, DC_working, mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &export_auth_methods, 0); + + struct export_info *info = talloc0(sizeof(struct export_info)); + info->cb = cb; + info->extra = extra; + send_query (instance, DC_working, mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &export_auth_methods, info); } /* }}} */ +struct import_info { + void *extra; + void (*cb)(void* extra); +}; + /* {{{ Import auth */ int import_auth_on_answer (struct query *q UU) { struct mtproto_connection *mtp = query_get_mtproto(q); struct telegram *instance = mtp->connection->instance; + struct import_info *info = q->extra; assert (fetch_int (mtp) == (int)CODE_auth_authorization); fetch_int (mtp); // expires fetch_alloc_user (mtp); tfree_str (instance->export_auth_str); instance->export_auth_str = 0; + info->cb(info->extra); + tfree (info, sizeof(struct import_info)); return 0; } @@ -2153,14 +2176,19 @@ struct query_methods import_auth_methods = { .on_error = fail_on_error }; -void do_import_auth (struct telegram *instance, int num) { +void do_import_auth (struct telegram *instance, int num, void (*cb)(void *extra), void *extra) { struct mtproto_connection *mtp = instance->connection; + struct import_info *info = talloc0(sizeof (struct import_info)); + info->cb = cb; + info->extra = extra; clear_packet (mtp); out_int (mtp, CODE_auth_import_authorization); out_int (mtp, instance->our_id); out_cstring (mtp, instance->export_auth_str, instance->export_auth_str_len); - send_query (instance, instance->auth.DC_list[num], mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &import_auth_methods, 0); + + send_query (instance, instance->auth.DC_list[num], mtp->packet_ptr - mtp->packet_buffer, + mtp->packet_buffer, &import_auth_methods, info); } /* }}} */ diff --git a/queries.h b/queries.h index d6a9489..c8c93b7 100644 --- a/queries.h +++ b/queries.h @@ -57,7 +57,9 @@ struct download { unsigned char *iv; unsigned char *key; int type; + struct mtproto_connection *c; }; +void load_next_part (struct telegram *instance, struct download *D); struct event_timer { double timeout; @@ -128,8 +130,8 @@ void do_help_get_config (struct telegram *instance); void do_auth_check_phone (struct telegram *instance, const char *user); void do_get_nearest_dc (struct telegram*); void do_send_code_result_auth (struct telegram *instance, const char *code, const char *first_name, const char *last_name); -void do_import_auth (struct telegram *instance, int num); -void do_export_auth (struct telegram *instance, int num); +void do_import_auth (struct telegram *instance, int num, void (*cb)(void *extra), void *extra); +void do_export_auth (struct telegram *instance, int num, void (*cb)(char *export_auth_str, int len, void *extra), void *extra); void do_add_contact (struct telegram *instance, const char *phone, int phone_len, const char *first_name, int first_name_len, const char *last_name, int last_name_len, int force); void do_msg_search (struct telegram *instance, peer_id_t id, int from, int to, int limit, const char *s); void do_accept_encr_chat_request (struct telegram *instance, struct secret_chat *E); diff --git a/telegram.c b/telegram.c index d0304b7..60c8739 100755 --- a/telegram.c +++ b/telegram.c @@ -116,8 +116,6 @@ void telegram_change_state (struct telegram *instance, int state, void *data) err = ""; } logprintf("telegram errored: %s\n", err); - - // mark the connection for closing mtproto_close (instance->connection); } break; @@ -147,7 +145,6 @@ void telegram_change_state (struct telegram *instance, int state, void *data) logprintf("phone authenticion, user needs to enter code, first and last name.\n"); assert (instance->config->on_phone_registration_required); instance->config->on_phone_registration_required (instance); - // wait for user input ... break; case STATE_CLIENT_NOT_REGISTERED: @@ -178,8 +175,6 @@ void telegram_change_state (struct telegram *instance, int state, void *data) // close old connection and mark it for destruction mtproto_close (instance->connection); assert (instance->config->proxy_request_cb); - // tell the proxy to close all connections - //instance->config->proxy_close_cb (instance, instance->connection->connection->fd); // remove all left over queries and timers free_timers (instance); @@ -343,8 +338,17 @@ void telegram_store_session(struct telegram *instance) void on_authorized(struct mtproto_connection *c, void* data); +void telegram_main_connected (struct proxy_request *req) +{ + struct telegram *instance = req->data; + logprintf("Authorized... storing current session %d.\n", + instance->connection->connection->session[0]); + telegram_store_session(instance); + telegram_change_state(instance, STATE_AUTHORIZED, NULL); +} + /** - * Connect to the currently active data center + * Connect to the nearest data center */ void telegram_connect (struct telegram *instance) { @@ -354,8 +358,101 @@ void telegram_connect (struct telegram *instance) assert(0); } struct dc *DC_working = telegram_get_working_dc (instance); + + struct proxy_request *req = talloc0(sizeof(struct proxy_request)); + req->type = REQ_CONNECTION; + req->tg = instance; + req->data = instance; + req->DC = DC_working; + req->done = telegram_main_connected; + assert (instance->config->proxy_request_cb); - instance->config->proxy_request_cb (instance, DC_working->ip, DC_working->port); + instance->config->proxy_request_cb (instance, req); +} + +void on_auth_imported (void *extra) +{ + logprintf ("on_auth_imported()\n"); + struct download *dl = extra; + struct mtproto_connection *c = dl->c; + struct telegram *tg = c->instance; + bl_do_dc_signed (tg->bl, c, dl->dc); + write_auth_file (&tg->auth, tg->auth_path); + load_next_part (tg, dl); + telegram_flush (tg); +} + +void on_auth_exported (char *export_auth_str UU, int len UU, void *extra) +{ + logprintf ("on_auth_exported()\n"); + struct download *dl = extra; + do_import_auth (dl->c->instance, dl->dc, on_auth_imported, extra); + telegram_flush (dl->c->instance); +} + +void telegram_dl_connected (struct proxy_request *req) +{ + logprintf ("telegram_dl_connected(dc=%d)\n", req->DC->id); + struct telegram *tg = req->tg; + // TODO: error handling + + struct download *dl = req->data; + dl->c = req->conn; + struct dc *DC = tg->auth.DC_list[dl->dc]; + if (!DC->has_auth) { + do_export_auth (tg, dl->dc, on_auth_exported, dl); + telegram_flush (tg); + } else { + on_auth_imported (dl); + } +} + + +/** + * Create a connection for the given download + */ +void telegram_dl_add (struct telegram *instance, struct download *dl) +{ + logprintf ("telegram_connect_dl(dc=%d)\n", instance->auth.DC_list[dl->dc]); + if (!instance->dl_queue) { + instance->dl_queue = g_queue_new (); + } + g_queue_push_tail(instance->dl_queue, dl); +} + +void telegram_dl_next (struct telegram *instance) +{ + assert (instance->dl_queue); + assert (instance->config->proxy_request_cb); + if (!instance->dl_curr) { + struct download *dl = g_queue_pop_head (instance->dl_queue); + if (dl) { + struct proxy_request *req = talloc0(sizeof(struct proxy_request)); + req->type = REQ_DOWNLOAD; + req->DC = instance->auth.DC_list[dl->dc]; + req->tg = instance; + req->done = telegram_dl_connected; + req->data = dl; + instance->dl_curr = dl; + + logprintf ("telegrma_dl_start(): starting new download..\n"); + if (dl->dc == instance->auth.dc_working_num) { + logprintf ("is working DC, start download...\n"); + assert (telegram_get_working_dc(instance)->sessions[0]->c); + req->conn = instance->connection; + dl->c = req->conn; + telegram_dl_connected (req); + } else { + logprintf ("is remote DC, requesting connection...\n"); + instance->config->proxy_request_cb (instance, req); + } + } else { + logprintf ("telegrma_dl_start(): no more downloads, DONE!\n"); + mtproto_close_foreign (instance); + } + } else { + logprintf ("telegrma_dl_start(): download busy...\n"); + } } /** @@ -372,23 +469,28 @@ int telegram_login(struct telegram *instance) return 0; } -void on_authorized(struct mtproto_connection *c, void *data) +void on_authorized(struct mtproto_connection *c UU, void *data) { - struct telegram *instance = data; - logprintf("Authorized... storing current session %d.\n", c->connection->session[0]); - telegram_store_session(instance); - telegram_change_state(instance, STATE_AUTHORIZED, NULL); + logprintf ("on_authorized()...\n"); + struct proxy_request *req = data; + assert (req->done); + req->done (req); + tfree (req, sizeof(struct proxy_request)); } -struct mtproto_connection *telegram_add_proxy(struct telegram *instance, int fd, void *handle) +struct mtproto_connection *telegram_add_proxy(struct telegram *instance, struct proxy_request *req, + int fd, void *handle) { - struct dc *DC_working = telegram_get_working_dc (instance); - instance->connection = mtproto_new (DC_working, fd, instance); - instance->connection->handle = handle; - instance->connection->on_ready = on_authorized; - instance->connection->on_ready_data = instance; - mtproto_connect (instance->connection); - return instance->connection; + struct mtproto_connection *c = mtproto_new (req->DC, fd, instance); + c->handle = handle; + c->on_ready = on_authorized; + c->on_ready_data = req; + req->conn = c; + if (req->type == REQ_CONNECTION) { + req->tg->connection = c; + } + mtproto_connect (c); + return c; } void mtp_read_input (struct mtproto_connection *mtp) @@ -415,11 +517,11 @@ void telegram_flush (struct telegram *instance) if (!c) continue; if (!c->connection) continue; if (c->connection->out_bytes) { - logprintf ("connection %d has %d bytes, triggering on_output.", + logprintf ("connection %d has %d bytes, triggering on_output.\n", i, c->connection->out_bytes); instance->config->on_output(c->handle); } else { - logprintf ("connection %d has no bytes, skipping\n"); + logprintf ("connection %d has no bytes, skipping\n", i); } } } diff --git a/telegram.h b/telegram.h index 33b0351..d39862f 100644 --- a/telegram.h +++ b/telegram.h @@ -95,6 +95,18 @@ struct binlog { peer_t *Peers[MAX_PEER_NUM]; }; +#define REQ_CONNECTION 1 +#define REQ_DOWNLOAD 2 +struct proxy_request { + struct telegram *tg; + struct dc *DC; + struct mtproto_connection *conn; + int type; + void *data; + void (*done) (struct proxy_request *req); + void *extra; +}; + struct telegram; struct download; /** @@ -117,7 +129,7 @@ struct telegram_config { * and port by calling telegram_set_proxy. This is useful for tunelling * the connection through a proxy server. */ - void (*proxy_request_cb) (struct telegram *instance, const char *ip, int port); + void (*proxy_request_cb) (struct telegram *instance, struct proxy_request *req); /** * A callback function that is called once the proxy connection is no longer @@ -267,6 +279,12 @@ struct telegram { int cs; struct mtproto_connection *Cs[100]; + /* + * Downloads + */ + GQueue *dl_queue; + struct download *dl_curr; + /* * additional user data */ @@ -412,7 +430,7 @@ void set_net_write_cb(ssize_t (*cb)(int fd, const void *buff, size_t size)); * @param fd The file-descriptor of the acquired connection * @param handle A handle that will be passed back on output and close callbacks */ -struct mtproto_connection *telegram_add_proxy(struct telegram *tg, int fd, void *handle); +struct mtproto_connection *telegram_add_proxy(struct telegram *tg, struct proxy_request *req, int fd, void *handle); /** * Return wether telegram is authenticated with the currently active data center @@ -421,4 +439,7 @@ int telegram_authenticated (struct telegram *instance); void telegram_flush (struct telegram *instance); +void telegram_dl_add (struct telegram *instance, struct download *dl); +void telegram_dl_next (struct telegram *instance); + #endif