Support downloads from foreign DCs

This commit is contained in:
mjentsch 2014-10-02 12:29:02 +02:00
parent d6530b69bb
commit 5c488f9c8d
9 changed files with 228 additions and 57 deletions

View file

@ -1779,8 +1779,8 @@ int tc_becomes_ready (struct connection *c) {
send_req_pq_packet (c);
break;
case st_authorized:
auth_work_start (c);
telegram_change_state (c->instance, STATE_AUTHORIZED, NULL);
c->mtconnection->on_ready(c->mtconnection, c->mtconnection->on_ready_data);
//telegram_change_state (c->instance, STATE_AUTHORIZED, NULL);
break;
default:
logprintf ( "c_state = %d\n", c->mtconnection->c_state);
@ -1853,10 +1853,8 @@ struct mtproto_connection *mtproto_new(struct dc *DC, int fd, struct telegram *t
void mtproto_connect(struct mtproto_connection *c)
{
on_start(c);
c->connection->methods->ready(c->connection);
// Don't ping TODO: Really? Timeout callback functions of libpurple
start_ping_timer (c->connection);
c->connection->methods->ready(c->connection);
}
/**
@ -1872,11 +1870,10 @@ void mtproto_close(struct mtproto_connection *mtp) {
// in case the session is switched and this DC is not reachable anymore
if (mtp->connection) {
if (mtp->connection->session && mtp->connection->session->ack_tree) {
send_all_acks (mtp->connection->session);
struct session *S = mtp->connection->session;
send_all_acks (S);
mtp->instance->config->on_output(mtp->handle);
// connection no longer usable for session
mtp->connection->session->c = 0;
S->c = 0;
}
stop_ping_timer (mtp->connection);
}
@ -1893,6 +1890,20 @@ void mtproto_destroy (struct mtproto_connection *self) {
tfree(self, sizeof(struct mtproto_connection));
}
void mtproto_close_foreign (struct telegram *instance)
{
int i;
for (i = 0; i < 100; i++) {
struct mtproto_connection * c = instance->Cs[i];
if (c &&
!c->destroy &&
c->connection->session->dc->id != instance->auth.dc_working_num) {
logprintf ("closing connection for working_dc=%d, dc=%d\n",
instance->auth.dc_working_num, c->connection->session->dc->id);
mtproto_close (c);
}
}
}
/**
* Free all destroyed connections

View file

@ -506,6 +506,7 @@ static inline void hexdump_out (struct mtproto_connection *self) {
#endif
void my_clock_gettime (int clock_id, struct timespec *T);
void mtproto_close_foreign (struct telegram *instance);
void mtproto_free_closed (struct telegram *tg);
#endif

8
net.c
View file

@ -679,7 +679,8 @@ void dc_create_session (struct dc *DC) {
* Wrap an existing socket file descriptor and make it usable as a connection,
*/
struct connection *fd_create_connection (struct dc *DC, int fd,
struct telegram *instance, struct connection_methods *methods, struct mtproto_connection *mtp) {
struct telegram *instance, struct connection_methods *methods,
struct mtproto_connection *mtp) {
// create a connection
struct connection *c = talloc0 (sizeof (*c));
@ -693,8 +694,6 @@ struct connection *fd_create_connection (struct dc *DC, int fd,
c->last_receive_time = get_double_time ();
logprintf ( "connect to %s:%d successful\n", DC->ip, DC->port);
// TODO: Load existing session from state file
// create an empty session and attach it to the dc and the connection
if (!DC->sessions[0]) {
struct session *S = talloc0 (sizeof (*S));
assert (RAND_pseudo_bytes ((unsigned char *) &S->session_id, 8) >= 0);
@ -702,6 +701,9 @@ struct connection *fd_create_connection (struct dc *DC, int fd,
S->c = c;
DC->sessions[0] = S;
}
if (!DC->sessions[0]->c) {
DC->sessions[0]->c = c;
}
// add backreference to session
c->session = DC->sessions[0];

2
net.h
View file

@ -148,7 +148,7 @@ void dc_create_session (struct dc *DC);
void insert_msg_id (struct session *S, long long id);
struct dc *alloc_dc (struct dc* DC_list[], int id, char *ip, int port);
#define GET_DC(c) (telegram_get_working_dc(c->instance))
#define GET_DC(c) (c->session->dc)
// export read and write methods to redirect network control
void try_read (struct connection *c);

View file

@ -263,10 +263,12 @@ static void init_dc_settings(PurpleAccount *acc, struct dc *DC)
/**
* Telegram requests a new connectino to our configured proxy
*/
void telegram_on_proxy_request(struct telegram *tg, const char *ip, int port)
void telegram_on_proxy_request(struct telegram *tg, struct proxy_request *req)
{
telegram_conn *conn = tg->extra;
purple_proxy_connect (conn->gc, conn->pa, ip, port, tgprpl_login_on_connected, tg);
req->extra = tg;
purple_proxy_connect (conn->gc, conn->pa, req->DC->ip, req->DC->port,
tgprpl_login_on_connected, req);
}
/**
@ -391,7 +393,9 @@ void telegram_on_ready (struct telegram *instance)
void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_message)
{
purple_debug_info(PLUGIN_ID, "tgprpl_login_on_connected()\n");
struct telegram *tg = (struct telegram*)data;
struct proxy_request *req = (struct proxy_request*)data;
struct telegram *tg = req->tg;
if (fd == -1) {
logprintf("purple_proxy_connect failed: %s\n", error_message);
telegram_destroy(tg);
@ -402,7 +406,7 @@ void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_messa
conn->fd = fd;
conn->wh = purple_input_add(fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, conn);
conn->rh = purple_input_add(fd, PURPLE_INPUT_READ, tgprpl_input_cb, conn);
conn->mtp = telegram_add_proxy(tg, fd, conn);
conn->mtp = telegram_add_proxy(tg, req, fd, conn);
}
void telegram_on_disconnected (struct telegram *tg)

View file

@ -1781,9 +1781,8 @@ void print_user_info (struct user *U) {
int user_info_on_answer (struct query *q UU) {
struct mtproto_connection *mtp = query_get_mtproto(q);
// TODO: Use user info
struct user *U = fetch_alloc_user_full (mtp);
event_user_info_received_handler(mtp->instance, U, (int)q->extra);
event_user_info_received_handler (mtp->instance, U, (int)q->extra);
//print_user_info (U);
return 0;
}
@ -1854,11 +1853,17 @@ void end_load (struct telegram *instance, struct download *D) {
close (D->fd);
logprintf ("Done: %s\n", D->name);
event_download_finished_handler(instance, D);
instance->dl_curr = 0;
if (D->dc != telegram_get_working_dc(instance)->id) {
logprintf ("%d Not the working dc %d, closing...\n", D->dc,
telegram_get_working_dc(instance)->id);
}
if (D->iv) {
tfree_secure (D->iv, 32);
}
tfree_str (D->name);
tfree (D, sizeof (*D));
telegram_dl_next (instance);
}
struct download_extra {
@ -1946,11 +1951,6 @@ void load_next_part (struct telegram *instance, struct download *D) {
instance->cur_downloaded_bytes += D->offset;
//update_prompt ();
}
if(instance->auth.dc_working_num != D->dc)
{
logprintf ("Unsupported DC! Cancel query! \n");
return;
}
clear_packet (mtp);
out_int (mtp, CODE_upload_get_file);
if (!D->id) {
@ -1995,7 +1995,9 @@ void do_load_photo_size (struct telegram *instance, struct photo_size *P, void *
D->extra = extra;
D->name = 0;
D->fd = -1;
load_next_part (instance, D);
telegram_dl_add (instance, D);
telegram_dl_next (instance);
}
void do_load_photo (struct telegram *instance, struct photo *photo, int photoBig, void *extra) {
@ -2004,13 +2006,13 @@ void do_load_photo (struct telegram *instance, struct photo *photo, int photoBig
int sizei = 0;
int i;
for (i = 0; i < photo->sizes_num; i++) {
if(photoBig == 0)
if (photoBig == 0)
{
if (photo->sizes[i].w + photo->sizes[i].h < size) {
size = photo->sizes[i].w + photo->sizes[i].h;
sizei = i;
}
}else{
} else {
if (photo->sizes[i].w + photo->sizes[i].h > size) {
size = photo->sizes[i].w + photo->sizes[i].h;
sizei = i;
@ -2100,6 +2102,11 @@ void do_load_encr_video (struct telegram *instance, struct encr_video *V, void *
/* {{{ Export auth */
struct export_info {
void *extra;
void (*cb)(char *export_auth_str, int len, void *extra);
};
int export_auth_on_answer (struct query *q UU) {
struct mtproto_connection *mtp = query_get_mtproto(q);
struct telegram *instance = mtp->connection->instance;
@ -2116,6 +2123,10 @@ int export_auth_on_answer (struct query *q UU) {
memcpy (s, fetch_str (mtp, l), l);
instance->export_auth_str_len = l;
instance->export_auth_str = s;
struct export_info *info = q->extra;
info->cb(instance->export_auth_str, instance->export_auth_str_len, info->extra);
tfree(info, sizeof(struct export_info));
return 0;
}
@ -2124,27 +2135,39 @@ struct query_methods export_auth_methods = {
.on_error = fail_on_error
};
void do_export_auth (struct telegram *instance, int num) {
void do_export_auth (struct telegram *instance, int num, void (*cb)(char *export_auth_str, int len, void *extra), void *extra) {
struct dc *DC_working = telegram_get_working_dc(instance);
struct mtproto_connection *mtp = instance->connection;
instance->export_auth_str = 0;
clear_packet (mtp);
out_int (mtp, CODE_auth_export_authorization);
out_int (mtp, num);
send_query (instance, DC_working, mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &export_auth_methods, 0);
struct export_info *info = talloc0(sizeof(struct export_info));
info->cb = cb;
info->extra = extra;
send_query (instance, DC_working, mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &export_auth_methods, info);
}
/* }}} */
struct import_info {
void *extra;
void (*cb)(void* extra);
};
/* {{{ Import auth */
int import_auth_on_answer (struct query *q UU) {
struct mtproto_connection *mtp = query_get_mtproto(q);
struct telegram *instance = mtp->connection->instance;
struct import_info *info = q->extra;
assert (fetch_int (mtp) == (int)CODE_auth_authorization);
fetch_int (mtp); // expires
fetch_alloc_user (mtp);
tfree_str (instance->export_auth_str);
instance->export_auth_str = 0;
info->cb(info->extra);
tfree (info, sizeof(struct import_info));
return 0;
}
@ -2153,14 +2176,19 @@ struct query_methods import_auth_methods = {
.on_error = fail_on_error
};
void do_import_auth (struct telegram *instance, int num) {
void do_import_auth (struct telegram *instance, int num, void (*cb)(void *extra), void *extra) {
struct mtproto_connection *mtp = instance->connection;
struct import_info *info = talloc0(sizeof (struct import_info));
info->cb = cb;
info->extra = extra;
clear_packet (mtp);
out_int (mtp, CODE_auth_import_authorization);
out_int (mtp, instance->our_id);
out_cstring (mtp, instance->export_auth_str, instance->export_auth_str_len);
send_query (instance, instance->auth.DC_list[num], mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &import_auth_methods, 0);
send_query (instance, instance->auth.DC_list[num], mtp->packet_ptr - mtp->packet_buffer,
mtp->packet_buffer, &import_auth_methods, info);
}
/* }}} */

View file

@ -57,7 +57,9 @@ struct download {
unsigned char *iv;
unsigned char *key;
int type;
struct mtproto_connection *c;
};
void load_next_part (struct telegram *instance, struct download *D);
struct event_timer {
double timeout;
@ -128,8 +130,8 @@ void do_help_get_config (struct telegram *instance);
void do_auth_check_phone (struct telegram *instance, const char *user);
void do_get_nearest_dc (struct telegram*);
void do_send_code_result_auth (struct telegram *instance, const char *code, const char *first_name, const char *last_name);
void do_import_auth (struct telegram *instance, int num);
void do_export_auth (struct telegram *instance, int num);
void do_import_auth (struct telegram *instance, int num, void (*cb)(void *extra), void *extra);
void do_export_auth (struct telegram *instance, int num, void (*cb)(char *export_auth_str, int len, void *extra), void *extra);
void do_add_contact (struct telegram *instance, const char *phone, int phone_len, const char *first_name, int first_name_len, const char *last_name, int last_name_len, int force);
void do_msg_search (struct telegram *instance, peer_id_t id, int from, int to, int limit, const char *s);
void do_accept_encr_chat_request (struct telegram *instance, struct secret_chat *E);

View file

@ -116,8 +116,6 @@ void telegram_change_state (struct telegram *instance, int state, void *data)
err = "<no description>";
}
logprintf("telegram errored: %s\n", err);
// mark the connection for closing
mtproto_close (instance->connection);
}
break;
@ -147,7 +145,6 @@ void telegram_change_state (struct telegram *instance, int state, void *data)
logprintf("phone authenticion, user needs to enter code, first and last name.\n");
assert (instance->config->on_phone_registration_required);
instance->config->on_phone_registration_required (instance);
// wait for user input ...
break;
case STATE_CLIENT_NOT_REGISTERED:
@ -178,8 +175,6 @@ void telegram_change_state (struct telegram *instance, int state, void *data)
// close old connection and mark it for destruction
mtproto_close (instance->connection);
assert (instance->config->proxy_request_cb);
// tell the proxy to close all connections
//instance->config->proxy_close_cb (instance, instance->connection->connection->fd);
// remove all left over queries and timers
free_timers (instance);
@ -343,8 +338,17 @@ void telegram_store_session(struct telegram *instance)
void on_authorized(struct mtproto_connection *c, void* data);
void telegram_main_connected (struct proxy_request *req)
{
struct telegram *instance = req->data;
logprintf("Authorized... storing current session %d.\n",
instance->connection->connection->session[0]);
telegram_store_session(instance);
telegram_change_state(instance, STATE_AUTHORIZED, NULL);
}
/**
* Connect to the currently active data center
* Connect to the nearest data center
*/
void telegram_connect (struct telegram *instance)
{
@ -354,8 +358,101 @@ void telegram_connect (struct telegram *instance)
assert(0);
}
struct dc *DC_working = telegram_get_working_dc (instance);
struct proxy_request *req = talloc0(sizeof(struct proxy_request));
req->type = REQ_CONNECTION;
req->tg = instance;
req->data = instance;
req->DC = DC_working;
req->done = telegram_main_connected;
assert (instance->config->proxy_request_cb);
instance->config->proxy_request_cb (instance, DC_working->ip, DC_working->port);
instance->config->proxy_request_cb (instance, req);
}
void on_auth_imported (void *extra)
{
logprintf ("on_auth_imported()\n");
struct download *dl = extra;
struct mtproto_connection *c = dl->c;
struct telegram *tg = c->instance;
bl_do_dc_signed (tg->bl, c, dl->dc);
write_auth_file (&tg->auth, tg->auth_path);
load_next_part (tg, dl);
telegram_flush (tg);
}
void on_auth_exported (char *export_auth_str UU, int len UU, void *extra)
{
logprintf ("on_auth_exported()\n");
struct download *dl = extra;
do_import_auth (dl->c->instance, dl->dc, on_auth_imported, extra);
telegram_flush (dl->c->instance);
}
void telegram_dl_connected (struct proxy_request *req)
{
logprintf ("telegram_dl_connected(dc=%d)\n", req->DC->id);
struct telegram *tg = req->tg;
// TODO: error handling
struct download *dl = req->data;
dl->c = req->conn;
struct dc *DC = tg->auth.DC_list[dl->dc];
if (!DC->has_auth) {
do_export_auth (tg, dl->dc, on_auth_exported, dl);
telegram_flush (tg);
} else {
on_auth_imported (dl);
}
}
/**
* Create a connection for the given download
*/
void telegram_dl_add (struct telegram *instance, struct download *dl)
{
logprintf ("telegram_connect_dl(dc=%d)\n", instance->auth.DC_list[dl->dc]);
if (!instance->dl_queue) {
instance->dl_queue = g_queue_new ();
}
g_queue_push_tail(instance->dl_queue, dl);
}
void telegram_dl_next (struct telegram *instance)
{
assert (instance->dl_queue);
assert (instance->config->proxy_request_cb);
if (!instance->dl_curr) {
struct download *dl = g_queue_pop_head (instance->dl_queue);
if (dl) {
struct proxy_request *req = talloc0(sizeof(struct proxy_request));
req->type = REQ_DOWNLOAD;
req->DC = instance->auth.DC_list[dl->dc];
req->tg = instance;
req->done = telegram_dl_connected;
req->data = dl;
instance->dl_curr = dl;
logprintf ("telegrma_dl_start(): starting new download..\n");
if (dl->dc == instance->auth.dc_working_num) {
logprintf ("is working DC, start download...\n");
assert (telegram_get_working_dc(instance)->sessions[0]->c);
req->conn = instance->connection;
dl->c = req->conn;
telegram_dl_connected (req);
} else {
logprintf ("is remote DC, requesting connection...\n");
instance->config->proxy_request_cb (instance, req);
}
} else {
logprintf ("telegrma_dl_start(): no more downloads, DONE!\n");
mtproto_close_foreign (instance);
}
} else {
logprintf ("telegrma_dl_start(): download busy...\n");
}
}
/**
@ -372,23 +469,28 @@ int telegram_login(struct telegram *instance)
return 0;
}
void on_authorized(struct mtproto_connection *c, void *data)
void on_authorized(struct mtproto_connection *c UU, void *data)
{
struct telegram *instance = data;
logprintf("Authorized... storing current session %d.\n", c->connection->session[0]);
telegram_store_session(instance);
telegram_change_state(instance, STATE_AUTHORIZED, NULL);
logprintf ("on_authorized()...\n");
struct proxy_request *req = data;
assert (req->done);
req->done (req);
tfree (req, sizeof(struct proxy_request));
}
struct mtproto_connection *telegram_add_proxy(struct telegram *instance, int fd, void *handle)
struct mtproto_connection *telegram_add_proxy(struct telegram *instance, struct proxy_request *req,
int fd, void *handle)
{
struct dc *DC_working = telegram_get_working_dc (instance);
instance->connection = mtproto_new (DC_working, fd, instance);
instance->connection->handle = handle;
instance->connection->on_ready = on_authorized;
instance->connection->on_ready_data = instance;
mtproto_connect (instance->connection);
return instance->connection;
struct mtproto_connection *c = mtproto_new (req->DC, fd, instance);
c->handle = handle;
c->on_ready = on_authorized;
c->on_ready_data = req;
req->conn = c;
if (req->type == REQ_CONNECTION) {
req->tg->connection = c;
}
mtproto_connect (c);
return c;
}
void mtp_read_input (struct mtproto_connection *mtp)
@ -415,11 +517,11 @@ void telegram_flush (struct telegram *instance)
if (!c) continue;
if (!c->connection) continue;
if (c->connection->out_bytes) {
logprintf ("connection %d has %d bytes, triggering on_output.",
logprintf ("connection %d has %d bytes, triggering on_output.\n",
i, c->connection->out_bytes);
instance->config->on_output(c->handle);
} else {
logprintf ("connection %d has no bytes, skipping\n");
logprintf ("connection %d has no bytes, skipping\n", i);
}
}
}

View file

@ -95,6 +95,18 @@ struct binlog {
peer_t *Peers[MAX_PEER_NUM];
};
#define REQ_CONNECTION 1
#define REQ_DOWNLOAD 2
struct proxy_request {
struct telegram *tg;
struct dc *DC;
struct mtproto_connection *conn;
int type;
void *data;
void (*done) (struct proxy_request *req);
void *extra;
};
struct telegram;
struct download;
/**
@ -117,7 +129,7 @@ struct telegram_config {
* and port by calling telegram_set_proxy. This is useful for tunelling
* the connection through a proxy server.
*/
void (*proxy_request_cb) (struct telegram *instance, const char *ip, int port);
void (*proxy_request_cb) (struct telegram *instance, struct proxy_request *req);
/**
* A callback function that is called once the proxy connection is no longer
@ -267,6 +279,12 @@ struct telegram {
int cs;
struct mtproto_connection *Cs[100];
/*
* Downloads
*/
GQueue *dl_queue;
struct download *dl_curr;
/*
* additional user data
*/
@ -412,7 +430,7 @@ void set_net_write_cb(ssize_t (*cb)(int fd, const void *buff, size_t size));
* @param fd The file-descriptor of the acquired connection
* @param handle A handle that will be passed back on output and close callbacks
*/
struct mtproto_connection *telegram_add_proxy(struct telegram *tg, int fd, void *handle);
struct mtproto_connection *telegram_add_proxy(struct telegram *tg, struct proxy_request *req, int fd, void *handle);
/**
* Return wether telegram is authenticated with the currently active data center
@ -421,4 +439,7 @@ int telegram_authenticated (struct telegram *instance);
void telegram_flush (struct telegram *instance);
void telegram_dl_add (struct telegram *instance, struct download *dl);
void telegram_dl_next (struct telegram *instance);
#endif