Experimental support for channel history

Fetch missing channel history, keep track of message IDs. Still broken in Adium.
This commit is contained in:
mjentsch 2016-03-23 12:46:04 +01:00
parent a3341cb5cf
commit 004f84b544
8 changed files with 207 additions and 90 deletions

View File

@ -201,7 +201,7 @@ static void update_user_status_handler (struct tgl_state *TLS, struct tgl_user *
static void update_message_handler (struct tgl_state *TLS, struct tgl_message *M) {
write_files_schedule (TLS);
tgp_msg_recv (TLS, M);
tgp_msg_recv (TLS, M, NULL);
}
static void update_secret_chat_typing (struct tgl_state *TLS, struct tgl_secret_chat *E) {

View File

@ -92,6 +92,8 @@
#define TGP_KEY_RESET_AUTH "reset-authorization"
#define TGP_CHANNEL_HISTORY_LIMIT 500
extern const char *pk_path;
extern const char *user_pk_filename;
extern const char *config_dir;

View File

@ -21,12 +21,13 @@
#include "tgp-chat.h"
GHashTable *tgp_chat_info_new (struct tgl_state *TLS, tgl_peer_t *P) {
GHashTable *ht = g_hash_table_new_full (g_str_hash, g_str_equal, NULL, g_free);
g_hash_table_insert (ht, "subject", g_strdup (P->print_name));
g_hash_table_insert (ht, "id", g_strdup_printf ("%d", tgl_get_peer_id (P->id)));
g_hash_table_insert (ht, "type", g_strdup_printf ("%d", tgl_get_peer_type (P->id)));
// libpurple chat components own the keys and the values when created from blist be consistent
GHashTable *ht = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
g_hash_table_insert (ht, g_strdup ("subject"), g_strdup (P->print_name));
g_hash_table_insert (ht, g_strdup ("id"), g_strdup_printf ("%d", tgl_get_peer_id (P->id)));
g_hash_table_insert (ht, g_strdup ("type"), g_strdup_printf ("%d", tgl_get_peer_type (P->id)));
if (tgl_get_peer_type (P->id) == TGL_PEER_CHANNEL) {
g_hash_table_insert (ht, "last_server_id", g_strdup_printf ("%d", 0));
g_hash_table_insert (ht, g_strdup ("last_server_id"), g_strdup_printf ("%d", 0));
}
return ht;
}
@ -46,8 +47,21 @@ tgl_peer_id_t tgp_chat_get_id (PurpleChat *C) {
return tgl_set_peer_id (type, (I && *I) ? atoi (I) : 0);
}
void tgp_chat_blist_store (struct tgl_state *TLS, tgl_peer_t *P, const char *group) {
g_return_if_fail(tgl_get_peer_type (P->id) == TGL_PEER_CHAT || tgl_get_peer_type (P->id) == TGL_PEER_CHANNEL);
void tgp_chat_set_last_server_id (PurpleChat *C, long long id) {
info ("setting channel message server_id=%lld", id);
g_hash_table_replace (purple_chat_get_components (C), g_strdup ("last_server_id"), g_strdup_printf ("%lld", id));
}
long long tgp_chat_get_last_server_id (PurpleChat *C) {
const char *last = g_hash_table_lookup (purple_chat_get_components (C), "last_server_id");
if (last && *last) {
return atoi (last);
}
return 0;
}
PurpleChat *tgp_chat_blist_store (struct tgl_state *TLS, tgl_peer_t *P, const char *group) {
g_return_val_if_fail(tgl_get_peer_type (P->id) == TGL_PEER_CHAT || tgl_get_peer_type (P->id) == TGL_PEER_CHANNEL, NULL);
PurpleChat *PC = tgp_blist_chat_find (TLS, P->id);
if (! (P->flags & TGLCF_LEFT)) {
@ -95,7 +109,7 @@ static void tgp_chat_on_loaded_chat_full_joining (struct tgl_state *TLS, void *_
}
}
static void tgp_chat_on_loaded_channel_full_joining (struct tgl_state *TLS, int success, tgl_peer_t *P, void *extra) {
static void tgp_chat_on_loaded_channel_full_joining (struct tgl_state *TLS, void *extra, int success, tgl_peer_t *P) {
debug ("tgp_chat_on_loaded_channel_full_joining()");
if (! success) {
tgp_notify_on_error_gw (TLS, NULL, success);
@ -149,7 +163,7 @@ static void tgp_chat_add_all_users (struct tgl_state *TLS, PurpleConversation *c
}
purple_conv_chat_add_users (PURPLE_CONV_CHAT(conv), users, NULL, flags, FALSE);
g_list_free_full (users, g_free);
tgp_g_list_free_full (users, g_free);
g_list_free (flags);
}
@ -259,7 +273,7 @@ void tgprpl_chat_join (PurpleConnection *gc, GHashTable *data) {
} else {
g_return_if_fail(tgl_get_peer_type (P->id) == TGL_PEER_CHANNEL);
debug ("joining channel by id %d ...", tgl_get_peer_id (P->id));
tgp_chat_load_channel_members (gc_get_tls (gc), P, tgp_chat_on_loaded_channel_full_joining, NULL);
tgp_channel_load (gc_get_tls (gc), P, tgp_chat_on_loaded_channel_full_joining, NULL);
}
} else {
warning ("Cannot join chat %d, peer not found...", tgl_get_peer_id (P->id));
@ -292,7 +306,7 @@ void tgprpl_chat_join (PurpleConnection *gc, GHashTable *data) {
return;
} else if (tgl_get_peer_type (P->id) == TGL_PEER_CHANNEL) {
debug ("joining channel by subject %s ...", subject);
tgp_chat_load_channel_members (gc_get_tls (gc), P, tgp_chat_on_loaded_channel_full_joining, NULL);
tgp_channel_load (gc_get_tls (gc), P, tgp_chat_on_loaded_channel_full_joining, NULL);
return;
} else {
warning ("Cannot join chat %s, wrong peer type", subject);
@ -377,15 +391,36 @@ void tgp_chat_join_all_pending (struct tgl_state *TLS) {
}
}
static void tgp_channel_load_finish (struct tgl_state *TLS, struct tgp_channel_loading *D, int success) {
GList *cb = D->callbacks;
GList *extra = D->extras;
while (cb) {
if (cb->data) {
((void (*) (struct tgl_state *, void *, int, tgl_peer_t *)) cb->data) (TLS, extra->data, success, D->P);
}
cb = g_list_next(cb);
extra = g_list_next(extra);
}
}
static void tgp_channel_loading_free (struct tgp_channel_loading *D) {
if (D->callbacks) {
g_list_free (D->callbacks);
}
if (D->extras) {
g_list_free (D->extras);
}
free (D);
}
static void tgp_channel_load_admins_done (struct tgl_state *TLS, void *extra, int success, int users_num,
struct tgl_user **users) {
debug ("tgp_channel_load_admins_done()");
struct tgp_channel_members_loading *D = extra;
struct tgp_channel_loading *D = extra;
if (success) {
GHashTable *HT = g_hash_table_new (g_direct_hash, g_direct_equal);
int i;
for (i = 0; i < users_num; i ++) {
g_hash_table_insert (HT, GINT_TO_POINTER(tgl_get_peer_id (users[i]->id)), GINT_TO_POINTER(1));
@ -403,19 +438,19 @@ static void tgp_channel_load_admins_done (struct tgl_state *TLS, void *extra, in
g_hash_table_destroy (HT);
}
D->callback (TLS, success, D->P, D->extra);
free (D);
tgp_channel_load_finish (TLS, D, success);
tgp_channel_loading_free (D);
}
static void tgp_channel_load_members_done (struct tgl_state *TLS, void *extra, int success, int users_num,
static void tgp_channel_get_members_done (struct tgl_state *TLS, void *extra, int success, int users_num,
struct tgl_user **users) {
debug ("tgp_channel_load_members_done()");
struct tgp_channel_members_loading *D = extra;
struct tgp_channel_loading *D = extra;
if (! success) {
D->callback (TLS, FALSE, D->P, NULL);
free (D);
tgp_channel_load_finish (TLS, D, FALSE);
tgp_channel_loading_free (D);
return;
}
@ -433,30 +468,85 @@ static void tgp_channel_load_members_done (struct tgl_state *TLS, void *extra, i
} else {
g_hash_table_insert (tls_get_data (TLS)->channel_members, GINT_TO_POINTER(tgl_get_peer_id (D->P->id)),
D->members);
D->callback (TLS, success, D->P, D->extra);
free (D);
tgp_channel_load_finish (TLS, D, success);
tgp_channel_loading_free (D);
}
}
void tgp_chat_load_channel_members (struct tgl_state *TLS, tgl_peer_t *P,
void (*callback) (struct tgl_state *TLS, int success, tgl_peer_t *P, void *extra), void *extra) {
static gint tgp_channel_find_higher_id (gconstpointer a, gconstpointer b) {
return ((struct tgp_msg_loading *)a)->msg->server_id < GPOINTER_TO_INT(b);
}
static void tgp_channel_get_history_done (struct tgl_state *TLS, void *extra, int success, int size,
struct tgl_message **list) {
struct tgp_channel_loading *D = extra;
if (success) {
if (size > 0 && tgp_chat_get_last_server_id (D->CH) < list[size - 1]->server_id) {
tgp_chat_set_last_server_id (D->CH, list[size - 1]->server_id);
}
GList *where = g_queue_find_custom (tls_get_data (TLS)->new_messages,
GINT_TO_POINTER(tgp_chat_get_last_server_id (D->CH)), tgp_channel_find_higher_id);
int i;
for (i = size - 1; i >= 0; -- i) {
if (list[i]->server_id > tgp_chat_get_last_server_id (D->CH)) {
tgp_msg_recv (TLS, list[i], where);
}
}
// tgp_msg_process_in_ready (TLS);
} else {
// gap in history
g_warn_if_reached();
}
tgl_do_channel_get_members (TLS, D->P->id, purple_account_get_int (tls_get_pa (TLS),
TGP_KEY_CHANNEL_MEMBERS, TGP_DEFAULT_CHANNEL_MEMBERS), 0, 0, tgp_channel_get_members_done, extra);
}
void tgp_channel_load (struct tgl_state *TLS, tgl_peer_t *P,
void (*callback) (struct tgl_state *, void *, int, tgl_peer_t *),
void *extra) {
g_return_if_fail(tgl_get_peer_type (P->id) == TGL_PEER_CHANNEL);
struct tgp_channel_members_loading *D = talloc0 (sizeof(struct tgp_channel_members_loading));
D->P = P;
D->callback = callback;
D->remaining = 2;
D->extra = extra;
tgl_do_channel_get_members (TLS, P->id,
purple_account_get_int (tls_get_pa (TLS), TGP_KEY_CHANNEL_MEMBERS, TGP_DEFAULT_CHANNEL_MEMBERS),
0, 0, tgp_channel_load_members_done, D);
gpointer ID = GINT_TO_POINTER(tgl_get_peer_id (P->id));
if (! g_hash_table_lookup (tls_get_data (TLS)->pending_channels, ID)) {
// FIXME: adium doesn't store chats
PurpleChat *CH = tgp_blist_chat_find (TLS, P->id);
g_return_if_fail(CH != NULL);
struct tgp_channel_loading *D = talloc0 (sizeof(struct tgp_channel_loading));
D->P = P;
D->callbacks = g_list_append (NULL, callback);
D->extras = g_list_append (NULL, extra);
D->remaining = 2;
D->CH = CH;
tgl_do_get_history_range (TLS, P->id, (int) tgp_chat_get_last_server_id (CH), 0,
TGP_CHANNEL_HISTORY_LIMIT, tgp_channel_get_history_done, D);
g_hash_table_replace (tls_get_data (TLS)->pending_channels, ID, D);
} else {
if (! tgp_channel_loaded (TLS, P->id)) {
struct tgp_channel_loading *D = g_hash_table_lookup (tls_get_data (TLS)->pending_channels, ID);
D->callbacks = g_list_append (D->callbacks, callback);
D->extras = g_list_append (D->extras, extra);
} else {
callback (TLS, extra, TRUE, P);
}
}
}
int tgp_channel_loaded (struct tgl_state *TLS, tgl_peer_id_t id) {
return NULL != g_hash_table_lookup (tls_get_data (TLS)->channel_members,
GINT_TO_POINTER(tgl_get_peer_id (id)));
}
static void update_chat (struct tgl_state *TLS, tgl_peer_t *C, unsigned flags, const char *group) {
if (flags & TGL_UPDATE_CREATED) {
tgp_blist_lookup_add (TLS, C->id, C->print_name);
tgp_chat_blist_store (TLS, tgl_peer_get (TLS, C->id), group);
} else {
PurpleChat *PC = tgp_blist_chat_find (TLS, C->id);
if (PC) {

View File

@ -28,18 +28,22 @@ struct tgp_channel_member {
int flags;
};
struct tgp_channel_members_loading {
struct tgp_channel_loading {
tgl_peer_t *P;
GList *members;
void (*callback) (struct tgl_state *TLS, int success, tgl_peer_t *P, void *extra);
void *extra;
GList *callbacks;
GList *extras;
int remaining;
PurpleChat *CH;
};
tgl_peer_id_t tgp_chat_get_id (PurpleChat *C);
int tgp_chat_has_id (PurpleChat *C);
void tgp_chat_blist_store (struct tgl_state *TLS, tgl_peer_t *P, const char *group);
void tgp_chat_set_last_server_id (PurpleChat *C, long long id);
long long tgp_chat_get_last_server_id (PurpleChat *C);
PurpleChat *tgp_chat_blist_store (struct tgl_state *TLS, tgl_peer_t *P, const char *group);
PurpleConversation *tgp_chat_show (struct tgl_state *TLS, tgl_peer_t *P);
int tgprpl_send_chat (PurpleConnection *gc, int id, const char *message, PurpleMessageFlags flags);
@ -51,8 +55,11 @@ void tgprpl_roomlist_cancel (PurpleRoomlist *list);
GHashTable *tgprpl_chat_info_defaults (PurpleConnection *gc, const char *chat_name);
void tgp_chat_join_all_pending (struct tgl_state *TLS);
void tgp_chat_load_channel_members (struct tgl_state *TLS, tgl_peer_t *P,
void (*callback) (struct tgl_state *TLS, int success, tgl_peer_t *P, void *extra), void *extra);
void tgp_channel_load (struct tgl_state *TLS, tgl_peer_t *P,
void (*callback) (struct tgl_state *, void *, int, tgl_peer_t *),
void *extra);
int tgp_channel_loaded (struct tgl_state *TLS, tgl_peer_id_t id);
void update_channel_handler (struct tgl_state *TLS, struct tgl_channel *C, unsigned flags);
void update_chat_handler (struct tgl_state *TLS, struct tgl_chat *C, unsigned flags);

113
tgp-msg.c
View File

@ -15,7 +15,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
Copyright Matthias Jentsch 2014-2015
Copyright Matthias Jentsch 2014-2016
*/
#include "telegram-purple.h"
@ -699,21 +699,14 @@ static void tgp_msg_on_loaded_chat_full (struct tgl_state *TLS, void *extra, int
tgp_msg_process_in_ready (TLS);
}
static void tgp_msg_on_loaded_channel_members (struct tgl_state *TLS, int success, tgl_peer_t *P, void *extra) {
debug ("tgp_msg_on_loaded_channel_members()");
if (! success) {
// user names won't be available in the channel
g_warn_if_reached();
}
static void tgp_msg_on_loaded_channel_history (struct tgl_state *TLS, void *extra, int success, tgl_peer_t *P) {
struct tgp_msg_loading *C = extra;
-- C->pending;
tgp_msg_process_in_ready (TLS);
}
/*
static void tgp_msg_on_loaded_user_full (struct tgl_state *TLS, void *extra, int success, struct tgl_user *U) {
debug ("tgp_msg_on_loaded_user_full()");
@ -724,14 +717,24 @@ static void tgp_msg_on_loaded_user_full (struct tgl_state *TLS, void *extra, int
}
*/
void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M) {
connection_data *conn = TLS->ev_base;
/*
Libpurple message history is immutable and cannot be changed after printing a message.
TGP currently keeps the first-in first-out queue *new_messages* to ensure that
the messages are being printed in the correct order. When its necessary to fetch
additional info (like attached pictures) before this can be done, the queue will hold
all newer messages until the old message was completely loaded.
*/
void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M, GList *before) {
debug ("tgp_msg_recv before=%p server_id=%lld", before, M->server_id);
if (M->flags & (TGLMF_EMPTY | TGLMF_DELETED)) {
return;
}
if (!(M->flags & TGLMF_CREATED)) {
return;
}
if (!(M->flags | TGLMF_UNREAD) && M->date != 0 && M->date < tgp_msg_oldest_relevant_ts (TLS)) {
debug ("Message from %d on %d too old, ignored.", tgl_get_peer_id (M->from_id), M->date);
return;
@ -739,6 +742,36 @@ void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M) {
struct tgp_msg_loading *C = tgp_msg_loading_init (M);
/*
For non-channels telegram ensures that tgp receives the messages in the correct order, but in channels
there may be holes that need to be filled before the message log can be printed. This means that the
queue may not be processed till all historic messages have been fetched and the messages have been
inserted into the correct position of the queue
*/
if (tgl_get_peer_type (C->msg->from_id) == TGL_PEER_CHANNEL
|| tgl_get_peer_type (C->msg->to_id) == TGL_PEER_CHANNEL) {
tgl_peer_id_t id = tgl_get_peer_type (C->msg->from_id) == TGL_PEER_CHANNEL ?
C->msg->from_id : C->msg->to_id;
if (! tgp_channel_loaded (TLS, id)) {
++ C->pending;
tgp_channel_load (TLS, tgl_peer_get (TLS, id), tgp_msg_on_loaded_channel_history, C);
}
PurpleChat *CH = tgp_blist_chat_find (TLS, id);
if (CH) {
if (tgp_chat_get_last_server_id (CH) >= C->msg->server_id) {
info ("dropping duplicate channel messages server_id=%lld", C->msg->server_id);
return;
}
if (tgp_chat_get_last_server_id (CH) == C->msg->server_id - 1) {
tgp_chat_set_last_server_id (CH, C->msg->server_id);
}
}
}
if (! (M->flags & TGLMF_SERVICE)) {
// handle all messages that need to load content before they can be displayed
@ -746,8 +779,8 @@ void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M) {
switch (M->media.type) {
case tgl_message_media_photo: {
// include the "bad photo" check from telegram-cli interface.c:3287 to avoid crashes when fetching history
// TODO: find out the reason for this behavior
// include the "bad photo" check from telegram-cli interface.c:3287 to avoid crashes
// when fetching history. TODO: find out the reason for this behavior
if (M->media.photo) {
++ C->pending;
tgl_do_load_photo (TLS, M->media.photo, tgp_msg_on_loaded_document, C);
@ -783,27 +816,15 @@ void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M) {
}
/*
// for forwarded messages assure that the forwarded user is always loaded
if (tgl_get_peer_id (M->fwd_from_id) != TGL_PEER_UNKNOWN) {
tgl_peer_t *FP = tgl_peer_get (TLS , M->fwd_from_id);
if (! FP) {
++ C->pending;
debug ("type=%d, id=%d, hash=%lld", M->fwd_from_id.peer_type, M->fwd_from_id.peer_id, M->fwd_from_id.access_hash);
// FIXME: fwd_from_id.access_hash is always 0, submit fix to libtgl
tgl_do_get_user_info (TLS, M->fwd_from_id, FALSE, tgp_msg_on_loaded_user_full, C);
}
}
To display a chat, the full name of every single user is needed, but the updates received from the server only
contain the names of users mentioned in the events. In order to display a messages we always need to fetch the
full chat info first. If the user list is empty, this means that we still haven't fetched the full chat
information. Assure that there is only one chat info request for every chat to avoid causing FLOOD_WAIT_X
errors that will lead to delays or dropped messages
*/
// To display a chat the full name of every single user is needed, but the updates received from the server only
// contain the names of users mentioned in the events. In order to display a messages we always need to fetch the
// full chat info first. If the user list is empty, this means that we still haven't fetched the full chat information.
// assure that there is only one chat info request for every
// chat to avoid causing FLOOD_WAIT_X errors that will lead to delays or dropped messages
gpointer to_ptr = GINT_TO_POINTER(tgl_get_peer_id (M->to_id));
if (! g_hash_table_lookup (conn->pending_chat_info, to_ptr)) {
if (! g_hash_table_lookup (tls_get_data (TLS)->pending_chat_info, to_ptr)) {
if (tgl_get_peer_type (M->to_id) == TGL_PEER_CHAT) {
tgl_peer_t *P = tgl_peer_get (TLS, M->to_id);
@ -813,25 +834,19 @@ void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M) {
++ C->pending;
tgl_do_get_chat_info (TLS, M->to_id, FALSE, tgp_msg_on_loaded_chat_full, C);
g_hash_table_replace (conn->pending_chat_info, to_ptr, to_ptr);
}
}
if (tgl_get_peer_type (M->to_id) == TGL_PEER_CHANNEL) {
tgl_peer_t *P = tgl_peer_get (TLS, M->to_id);
g_warn_if_fail(P);
// FIXME: check if the types are actually valid
if (P && ((P->channel.flags & (TGLCHF_ADMIN | TGLCHF_CREATOR)) || (P->channel.flags & TGLCHF_MEGAGROUP))) {
++ C->pending;
tgp_chat_load_channel_members (TLS, P, tgp_msg_on_loaded_channel_members, C);
g_hash_table_replace (conn->pending_chat_info, to_ptr, to_ptr);
g_hash_table_replace (tls_get_data (TLS)->pending_chat_info, to_ptr, to_ptr);
}
}
}
g_queue_push_tail (conn->new_messages, C);
GList *b = g_queue_find (tls_get_data (TLS)->new_messages, before);
if (b) {
struct tgp_msg_loading *M = before->data;
debug ("inserting before server_id=%lld", M->msg->server_id);
g_queue_insert_before (tls_get_data (TLS)->new_messages, b, C);
} else {
g_queue_push_tail (tls_get_data (TLS)->new_messages, C);
}
tgp_msg_process_in_ready (TLS);
}

View File

@ -27,7 +27,7 @@
* Loads embedded ressources like pictures or document thumbnails and ensures that
* that all messages are still displayed in the original incoming order.
*/
void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M);
void tgp_msg_recv (struct tgl_state *TLS, struct tgl_message *M, GList *before);
/**
* Process a message and send it the peer

View File

@ -102,6 +102,7 @@ connection_data *connection_data_init (struct tgl_state *TLS, PurpleConnection *
conn->out_messages = g_queue_new ();
conn->pending_reads = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, g_free);
conn->pending_chat_info = g_hash_table_new (g_direct_hash, g_direct_equal);
conn->pending_channels = g_hash_table_new (g_direct_hash, g_direct_equal);
conn->id_to_purple_name = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, g_free);
conn->purple_name_to_id = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
conn->channel_members = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, (void (*) (gpointer)) g_list_free);
@ -120,6 +121,7 @@ void *connection_data_free (connection_data *conn) {
tgp_g_list_free_full (conn->pending_joins, g_free);
g_hash_table_destroy (conn->pending_reads);
g_hash_table_destroy (conn->pending_chat_info);
g_hash_table_destroy (conn->pending_channels);
g_hash_table_destroy (conn->id_to_purple_name);
g_hash_table_destroy (conn->purple_name_to_id);
g_hash_table_destroy (conn->channel_members);

View File

@ -44,6 +44,7 @@ typedef struct {
int login_retries;
PurpleRoomlist *roomlist;
GHashTable *pending_chat_info;
GHashTable *pending_channels;
GHashTable *id_to_purple_name;
GHashTable *purple_name_to_id;
GHashTable *channel_members;