Use a separate handles for each mtproto_connection

This commit is contained in:
mjentsch 2014-09-17 22:51:46 +02:00
parent dcf9b4f427
commit 4a062b764f
8 changed files with 184 additions and 138 deletions

View file

@ -1843,11 +1843,12 @@ void mtproto_connect(struct mtproto_connection *c)
c->connection->methods->ready(c->connection);
// Don't ping TODO: Really? Timeout callback functions of libpurple
//start_ping_timer (c->connection);
start_ping_timer (c->connection);
}
/**
* Free all used resources and close the connection
* Mark the connection for destruction, stop all timers and initiate
* cleanup tasks
*/
void mtproto_close(struct mtproto_connection *mtp) {
logprintf ("closing mtproto_connection...\n");
@ -1856,24 +1857,26 @@ void mtproto_close(struct mtproto_connection *mtp) {
// send all pending acks on this connection so the server won't
// resend messages. We might not be able to send the acknowledgements later
// in case the session is switched and this DC is not reachable anymore
send_all_acks (mtp->connection->session);
// remove all ping timer that point to this connection
//stop_ping_timer (mtp->connection);
if (mtp->connection) {
if (mtp->connection->session && mtp->connection->session->ack_tree) {
send_all_acks (mtp->connection->session);
mtp->instance->config->on_output(mtp->handle);
//
mtp->connection->session->c = 0;
// connection no longer usable for session
mtp->connection->session->c = 0;
}
stop_ping_timer (mtp->connection);
}
// remove all ping timer that point to this connection
}
/**
* Close the connection and
* Close the underlying file descriptor
*/
void mtproto_destroy (struct mtproto_connection *self) {
logprintf("destroying mtproto_connection: %p\n", self);
// TODO: Call destruction callback
self->instance->config->proxy_close_cb(self->handle);
fd_close_connection(self->connection);
tfree(self->connection, sizeof(struct connection));
tfree(self, sizeof(struct mtproto_connection));
}
@ -1889,13 +1892,15 @@ void mtproto_free_closed (struct telegram *tg) {
logprintf ("checking mtproto_connection %d: c_state:%d destroy:%d, quries_num:%d\n",
i, c->c_state, c->destroy, c->queries_num);
if (c->destroy == 0) continue;
if (c->queries_num > 0) {
logprintf ("still pending queries left, skipping connection...\n");
if (c->connection->out_bytes > 0) {
logprintf ("still %d bytes ouput left, skipping connection...\n", c->connection->out_bytes);
continue;
}
mtproto_destroy (c);
if (tg->connection == c) {
tg->connection = NULL;
}
tg->Cs[i] = NULL;
tg->connection = NULL;
}
}

View file

@ -250,6 +250,7 @@ struct mtproto_connection {
// the corresponding telegram instance
//
struct telegram *instance;
void *handle;
};
void mtproto_connection_init (struct mtproto_connection *c);

1
net.c
View file

@ -730,5 +730,6 @@ void fd_close_connection(struct connection *c) {
c->out_head = c->out_tail = c->in_head = c->in_tail = 0;
c->state = conn_stopped;
c->out_bytes = c->in_bytes = 0;
tfree(c, sizeof(struct connection));
}

View file

@ -79,7 +79,7 @@ void on_new_user_status(struct telegram *instance, void *user);
void on_user_typing(struct telegram *instance, void *user);
void on_chat_joined (struct telegram *instance, peer_id_t chatid);
static PurpleChat *blist_find_chat_by_id(PurpleConnection *gc, const char *id);
static void tgprpl_has_output(struct telegram *tg);
static void tgprpl_has_output(void *handle);
static const char *chat_id_get_comp_val (PurpleConnection *gc, int id, char *value)
{
@ -107,7 +107,7 @@ static PurpleConversation *chat_show (PurpleConnection *gc, int id)
logprintf ("joining chat first...\n");
telegram_conn *conn = purple_connection_get_protocol_data(gc);
do_get_chat_info (conn->tg, MK_CHAT(id));
tgprpl_has_output (conn->tg);
telegram_flush (conn->tg);
}
return convo;
}
@ -175,52 +175,73 @@ static void login_verification_fail(PurpleAccount *acct)
"Please make sure you entered the correct verification code.", NULL, NULL, NULL);
}
/* OUTPUT */
/**
* Libpurple announced that new output should be written to the write-handle
*/
static void tgprpl_output_cb(gpointer data, gint source, PurpleInputCondition cond)
{
logprintf("tgprpl_output_cb()\n");
struct telegram *tg = data;
telegram_conn *conn = tg->extra;
if (telegram_write_output(tg) == 0) {
mtproto_handle *conn = data;
logprintf("tgprpl_output_cb(%p)\n", data);
logprintf("mtp=%p, fd=%d, rh=%d, wh=%d\n", conn->mtp, conn->fd, conn->rh, conn->wh);
if (!conn->mtp) {
logprintf ("connection no loner existing, do nothing. \n");
return;
}
if (mtp_write_output(conn->mtp) == 0) {
logprintf("no output, removing output...\n");
purple_input_remove(conn->wh);
conn->wh = 0;
}
}
static void tgprpl_has_output(struct telegram *tg)
/**
* Telegram announced new output in its buffers
*/
static void tgprpl_has_output(void *handle)
{
logprintf("tgprpl_has_output()\n");
telegram_conn *conn = tg->extra;
logprintf("tgprpl_has_output(%p)\n", handle);
mtproto_handle *conn = handle;
if (! conn->wh) {
conn->wh = purple_input_add(telegram_get_connection(tg)->fd, PURPLE_INPUT_WRITE,
tgprpl_output_cb, tg);
logprintf("Attached write handle: %u ", conn->wh);
conn->wh = purple_input_add(conn->fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, handle);
}
}
/*
* Libpurple announced that new input should be read from the read-handle
*/
static void tgprpl_input_cb(gpointer data, gint source, PurpleInputCondition cond)
{
struct telegram *tg = data;
//telegram_conn *conn = tg->extra;
mtproto_handle *conn = data;
logprintf("tgprpl_input_cb()\n");
if (!conn->fd) {
logprintf("conn for handle no longer existing, not reading input\n");
return;
}
mtp_read_input(conn->mtp);
// TODO: remove input handler when no more input
telegram_read_input(tg);
if (telegram_has_output(tg)) {
tgprpl_has_output(tg);
if (conn->mtp) {
// processing of the answer may have inserted new queries
telegram_flush (conn->mtp->instance);
// free all mtproto_connections that may have errored
mtproto_free_closed(conn->mtp->instance);
}
}
static void tgprpl_has_input(struct telegram *tg)
/**
* Telegram announced that it awaits new input from the read-handle
* TODO: this is currently unused, evaluate wether its needed at all
*/
static void tgprpl_has_input(void *handle)
{
logprintf("tgprpl_has_input()\n");
telegram_conn *conn = tg->extra;
mtproto_handle *conn = handle;
if (! conn->rh) {
conn->rh = purple_input_add(telegram_get_connection(tg)->fd, PURPLE_INPUT_READ,
tgprpl_input_cb, tg);
conn->rh = purple_input_add(conn->fd, PURPLE_INPUT_READ, tgprpl_input_cb, handle);
logprintf("Attached read handle: %u ", conn->rh);
}
}
static void init_dc_settings(PurpleAccount *acc, struct dc *DC)
@ -231,15 +252,12 @@ static void init_dc_settings(PurpleAccount *acc, struct dc *DC)
}
/**
* Handle a proxy-request of telegram
*
* Request a new proxy connection from purple, and execute tgprpl_login_on_connected
* as callback once the connection is ready
* Telegram requests a new connectino to our configured proxy
*/
void telegram_on_proxy_request(struct telegram *instance, const char *ip, int port)
void telegram_on_proxy_request(struct telegram *tg, const char *ip, int port)
{
telegram_conn *conn = instance->extra;
purple_proxy_connect (conn->gc, conn->pa, ip, port, tgprpl_login_on_connected, conn->tg);
telegram_conn *conn = tg->extra;
purple_proxy_connect (conn->gc, conn->pa, ip, port, tgprpl_login_on_connected, tg);
}
/**
@ -247,11 +265,20 @@ void telegram_on_proxy_request(struct telegram *instance, const char *ip, int po
*
* Remove all open inputs added to purple
*/
void telegram_on_proxy_close(struct telegram *instance, int fd UU)
void telegram_on_proxy_close(void *handle)
{
telegram_conn *conn = instance->extra;
purple_input_remove (conn->rh);
purple_input_remove (conn->wh);
mtproto_handle *conn = handle;
logprintf ("Closing proxy-handles - fd: %d, write-handle: %d, read-handle: %d\n",
conn->fd, conn->wh, conn->rh);
if (conn->rh) {
purple_input_remove (conn->rh);
}
if (conn->wh) {
purple_input_remove (conn->wh);
}
conn->rh = conn->wh = 0;
conn->mtp = 0;
tfree (conn, sizeof(mtproto_handle));
}
void telegram_on_phone_registration (struct telegram *instance)
@ -278,7 +305,7 @@ void client_registration_entered (gpointer data, const gchar *code)
{
struct telegram *tg = data;
do_send_code_result (tg, code);
tgprpl_has_output (tg);
telegram_flush (tg);
}
void client_registration_canceled (gpointer data)
@ -341,8 +368,31 @@ void telegram_on_ready (struct telegram *instance)
do_update_contact_list(instance);
do_get_dialog_list(instance);
do_get_difference(instance);
tgprpl_has_output(instance);
conn->timer = purple_timeout_add (500, queries_timerfunc, conn);
telegram_flush (conn->tg);
conn->timer = purple_timeout_add (5000, queries_timerfunc, conn);
}
/**
* A proxy connection was created by purple
*
* Use the connection to create a new mtproto-connection and create a handle,
* with additional info for libpurple associated with the new connection
*/
void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_message)
{
purple_debug_info(PLUGIN_ID, "tgprpl_login_on_connected()\n");
struct telegram *tg = (struct telegram*)data;
if (fd == -1) {
logprintf("purple_proxy_connect failed: %s\n", error_message);
telegram_destroy(tg);
return;
}
mtproto_handle *conn = talloc(sizeof (mtproto_handle));
conn->fd = fd;
conn->wh = purple_input_add(fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, conn);
conn->rh = purple_input_add(fd, PURPLE_INPUT_READ, tgprpl_input_cb, conn);
conn->mtp = telegram_add_proxy(tg, fd, conn);
}
void telegram_on_disconnected (struct telegram *tg)
@ -351,32 +401,9 @@ void telegram_on_disconnected (struct telegram *tg)
assert (0);
}
/**
* A proxy connection was created by purple
*
* Set the proxy to the current telegram-instance, and add callbacks to monitor
*/
void tgprpl_login_on_connected(gpointer *data, gint fd, const gchar *error_message)
{
purple_debug_info(PLUGIN_ID, "tgprpl_login_on_connected()\n");
struct telegram *tg = (struct telegram*) data;
telegram_conn *conn = tg->extra;
if (fd == -1) {
logprintf("purple_proxy_connect failed: %s\n", error_message);
telegram_destroy(tg);
return;
}
purple_debug_info(PLUGIN_ID, "Connecting to the telegram network...\n");
conn->wh = purple_input_add(fd, PURPLE_INPUT_WRITE, tgprpl_output_cb, tg);
conn->rh = purple_input_add(fd, PURPLE_INPUT_READ, tgprpl_input_cb, tg);
telegram_set_proxy(tg, fd);
}
struct telegram_config tgconf = {
NULL,
NULL, // on output
tgprpl_has_output, // on output
telegram_on_proxy_request,
telegram_on_proxy_close,
telegram_on_phone_registration,
@ -400,22 +427,21 @@ static void tgprpl_login(PurpleAccount * acct)
purple_debug_info(PLUGIN_ID, "tgprpl_login()\n");
PurpleConnection *gc = purple_account_get_connection(acct);
char const *username = purple_account_get_username(acct);
struct dc DC;
init_dc_settings(acct, &DC);
// TODO: fetch current home directory
// use this as root
// create a new instance of telegram
struct telegram *tg = telegram_new (&DC, username, &tgconf);
telegram_restore_session(tg);
// create handle to store additional info for libpurple in
// the new telegram instance
telegram_conn *conn = g_new0(telegram_conn, 1);
conn->tg = tg;
conn->gc = gc;
conn->pa = acct;
purple_connection_set_protocol_data(gc, conn);
tg->extra = conn;
purple_connection_set_state (conn->gc, PURPLE_CONNECTING);
telegram_connect (tg);
}
@ -658,7 +684,7 @@ static int tgprpl_send_im(PurpleConnection * gc, const char *who, const char *me
PurpleBuddy *b = purple_find_buddy (pa, who);
peer_id_t *peer = purple_buddy_get_protocol_data (b);
do_send_message (conn->tg, *peer, message, strlen(message));
tgprpl_has_output (conn->tg);
telegram_flush (conn->tg);
return 1;
}
@ -913,7 +939,7 @@ static void tgprpl_chat_join(PurpleConnection * gc, GHashTable * data)
logprintf ("chat now known\n");
char *subject, *owner, *part;
do_get_chat_info (conn->tg, MK_CHAT(atoi(id)));
tgprpl_has_output (conn->tg);
telegram_flush (conn->tg);
} else {
logprintf ("chat already known\n");
serv_got_joined_chat(conn->gc, atoi(id), groupname);

View file

@ -34,21 +34,12 @@
#include "version.h"
#include "account.h"
#include "connection.h"
#include "mtproto-client.h"
typedef struct {
struct telegram *tg;
PurpleAccount *pa;
PurpleConnection *gc;
/**
* Write handler returned by purple_input_add
*/
guint wh;
/**
* Read handler returned by purple_input_add
*/
guint rh;
/**
* Whether the state of the protocol has changed since the last save
@ -62,4 +53,28 @@ typedef struct {
} telegram_conn;
typedef struct {
/**
* The mtproto_connection associated with this handle
*/
struct mtproto_connection *mtp;
/**
* Write handler returned by purple_input_add
*/
guint wh;
/**
* Read handler returned by purple_input_add
*/
guint rh;
/**
* The file descriptor of the used socket
*/
int fd;
} mtproto_handle;
#endif

View file

@ -65,10 +65,6 @@ int offline_mode = 0;
extern int sync_from_start;
int sync_from_start = 0;
void telegram_flush_queries (struct telegram *instance) {
instance->config->on_output(instance);
}
#define memcmp8(a,b) memcmp ((a), (b), 8)
DEFINE_TREE (query, struct query *, memcmp8, 0) ;
@ -3071,14 +3067,3 @@ void do_update_typing (struct telegram *instance, peer_id_t id) {
send_query (instance, DC_working, mtp->packet_ptr - mtp->packet_buffer, mtp->packet_buffer, &update_typing_methods, 0);
}
int telegram_has_output (struct telegram *instance)
{
if (!instance->connection) {
return 0;
}
if (instance->session_state == STATE_READY) {
return tree_count_query (instance->queries_tree) > 0;
}
return instance->connection->queries_num > 0;
}

View file

@ -159,12 +159,12 @@ void telegram_change_state (struct telegram *instance, int state, void *data)
mtproto_close (instance->connection);
assert (instance->config->proxy_request_cb);
// tell the proxy to close all connections
instance->config->proxy_close_cb (instance, instance->connection->connection->fd);
//instance->config->proxy_close_cb (instance, instance->connection->connection->fd);
// remove all left over queries and timers
free_timers (instance);
free_queries (instance);
// start a new connection to the demanded data center. The pointer to the
// new dc should was already updated by the on_error function of the query
telegram_connect (instance);
@ -359,26 +359,25 @@ void on_authorized(struct mtproto_connection *c, void *data)
telegram_change_state(instance, STATE_AUTHORIZED, NULL);
}
void telegram_read_input (struct telegram *instance)
{
try_read (instance->connection->connection);
// free all mtproto_connections that may have errored
mtproto_free_closed(instance);
}
void telegram_set_proxy(struct telegram *instance, int fd)
struct mtproto_connection *telegram_add_proxy(struct telegram *instance, int fd, void *handle)
{
struct dc *DC_working = telegram_get_working_dc (instance);
instance->connection = mtproto_new (DC_working, fd, instance);
instance->connection->handle = handle;
instance->connection->on_ready = on_authorized;
instance->connection->on_ready_data = instance;
mtproto_connect (instance->connection);
return instance->connection;
}
int telegram_write_output (struct telegram *instance)
void mtp_read_input (struct mtproto_connection *mtp)
{
return try_write(instance->connection->connection);
try_read (mtp->connection);
}
int mtp_write_output (struct mtproto_connection *mtp)
{
return try_write(mtp->connection);
}
int telegram_authenticated (struct telegram *instance)
@ -386,4 +385,21 @@ int telegram_authenticated (struct telegram *instance)
return telegram_get_working_dc (instance)->auth_key_id > 0;
}
void telegram_flush (struct telegram *instance)
{
logprintf ("telegram flush()\n");
int i;
for (i = 0; i < 100; i++) {
struct mtproto_connection *c = instance->Cs[i];
if (!c) continue;
if (!c->connection) continue;
if (c->connection->out_bytes) {
logprintf ("connection %d has %d bytes, triggering on_output.",
i, c->connection->out_bytes);
instance->config->on_output(c->handle);
} else {
logprintf ("connection %d has no bytes, skipping\n");
}
}
}

View file

@ -110,7 +110,7 @@ struct telegram_config {
/**
* Called when there is pending network output
*/
void (*on_output)(struct telegram *instance);
void (*on_output)(void *handle);
/**
* A callback function that delivers a connections to the given hostname
@ -123,7 +123,7 @@ struct telegram_config {
* A callback function that is called once the proxy connection is no longer
* needed. This is useful for freeing all used resources.
*/
void (*proxy_close_cb) (struct telegram *instance, int fd);
void (*proxy_close_cb) (void *handle);
/**
* A callback function that is called when a phone registration is required.
@ -249,6 +249,9 @@ struct telegram {
int get_difference_active;
struct message *ML[MSG_STORE_SIZE];
/*
* All active MtProto connections
*/
int cs;
struct mtproto_connection *Cs[100];
@ -304,27 +307,18 @@ int telegram_login (struct telegram *instance);
/**
* Read and process all available input from the network
*/
void telegram_read_input (struct telegram *instance);
void mtp_read_input (struct mtproto_connection *mtp);
/**
* Write all available output to the network
*/
int telegram_write_output (struct telegram *instance);
/**
* Return whether there is pending output.
*/
int telegram_has_output (struct telegram *instance);
int mtp_write_output (struct mtproto_connection *mtp);
/**
* Try to interpret RPC calls and apply the changes to the current telegram state
*/
void try_rpc_interpret(struct telegram *instance, int op, int len);
/*
* TODO: Refactor all old calls to take a telegrma instance
*/
/**
* Request a registration code
*/
@ -391,15 +385,18 @@ void set_net_read_cb(ssize_t (*cb)(int fd, void *buff, size_t size));
void set_net_write_cb(ssize_t (*cb)(int fd, const void *buff, size_t size));
/**
* Set the proxy-connection to use
* Set the connection after a proxy_request_cb
*
* NOTE: you may only call this function from the
* @param fd The file-descriptor of the acquired connection
* @param handle A handle that will be passed back on output and close callbacks
*/
void telegram_set_proxy(struct telegram *instance, int fd);
struct mtproto_connection *telegram_add_proxy(struct telegram *tg, int fd, void *handle);
/**
* Return wether telegram is authenticated with the currently active data center
*/
int telegram_authenticated (struct telegram *instance);
void telegram_flush (struct telegram *instance);
#endif