diff --git a/mtproto-client.c b/mtproto-client.c index c2a66d8..fd6bf21 100644 --- a/mtproto-client.c +++ b/mtproto-client.c @@ -1062,8 +1062,23 @@ void work_bad_server_salt (struct connection *c UU, long long msg_id UU) { GET_DC(c)->server_salt = new_server_salt; } +void work_pong (struct connection *c UU, long long msg_id UU) { + assert (fetch_int () == CODE_pong); + fetch_long (); // msg_id + fetch_long (); // ping_id +} + +void work_detained_info (struct connection *c UU, long long msg_id UU) { + assert (fetch_int () == CODE_msg_detained_info); + fetch_long (); // msg_id + fetch_long (); // answer_msg_id + fetch_int (); // bytes + fetch_int (); // status +} + void rpc_execute_answer (struct connection *c, long long msg_id UU) { if (verbosity >= 5) { + logprintf ("rpc_execute_answer: fd=%d\n", c->fd); hexdump_in (); } int op = prefetch_int (); @@ -1098,6 +1113,12 @@ void rpc_execute_answer (struct connection *c, long long msg_id UU) { case CODE_bad_server_salt: work_bad_server_salt (c, msg_id); return; + case CODE_pong: + work_pong (c, msg_id); + return; + case CODE_msg_detained_info: + work_detained_info (c, msg_id); + return; } logprintf ( "Unknown message: \n"); hexdump_in (); @@ -1158,6 +1179,7 @@ int process_rpc_message (struct connection *c UU, struct encrypted_message *enc, } assert (c->session->session_id == enc->session_id); rpc_execute_answer (c, enc->msg_id); + assert (in_ptr == in_end); return 0; } diff --git a/mtproto-common.h b/mtproto-common.h index ecb082a..ff4f20a 100644 --- a/mtproto-common.h +++ b/mtproto-common.h @@ -76,6 +76,8 @@ #define CODE_input_peer_notify_settings_old 0x3cf4b1be #define CODE_peer_notify_settings_old 0xddbcd4a5 + +#define CODE_msg_detained_info 0x276d3ec6 /* not really a limit, for struct encrypted_message only */ // #define MAX_MESSAGE_INTS 16384 #define MAX_MESSAGE_INTS 1048576 diff --git a/net.c b/net.c index 6fba56e..7d74d79 100644 --- a/net.c +++ b/net.c @@ -42,6 +42,51 @@ DEFINE_TREE(int,int,int_cmp,0) int verbosity; extern struct connection_methods auth_methods; +void fail_connection (struct connection *c); + +void start_ping_timer (struct connection *c); +int ping_alarm (struct connection *c) { + if (verbosity > 2) { + logprintf ("ping alarm\n"); + } + if (get_double_time () - c->last_receive_time > 20) { + c->state = conn_failed; + fail_connection (c); + } else if (get_double_time () - c->last_receive_time > 5 && c->state == conn_ready) { + int x[3]; + x[0] = CODE_ping; + *(long long *)(x + 1) = lrand48 () * (1ll << 32) + lrand48 (); + encrypt_send_message (c, x, 3, 0); + start_ping_timer (c); + } else { + start_ping_timer (c); + } + return 0; +} + +void stop_ping_timer (struct connection *c) { + remove_event_timer (&c->ev); +} + +void start_ping_timer (struct connection *c) { + c->ev.timeout = get_double_time () + 1; + c->ev.alarm = (void *)ping_alarm; + c->ev.self = c; + insert_event_timer (&c->ev); +} + +void restart_connection (struct connection *c); +int fail_alarm (void *ev) { + restart_connection (ev); + return 0; +} +void start_fail_timer (struct connection *c) { + c->ev.timeout = get_double_time () + 10; + c->ev.alarm = (void *)fail_alarm; + c->ev.self = c; + insert_event_timer (&c->ev); +} + struct connection_buffer *new_connection_buffer (int size) { struct connection_buffer *b = malloc (sizeof (*b)); memset (b, 0, sizeof (*b)); @@ -195,10 +240,11 @@ struct connection *create_connection (const char *host, int port, struct session c->session = session; c->fd = fd; - c->ip = htonl (*(int *)h->h_addr); + c->ip = strdup (host); c->flags = 0; c->state = conn_ready; c->methods = methods; + c->port = port; assert (!Connections[fd]); Connections[fd] = c; if (verbosity) { @@ -207,10 +253,60 @@ struct connection *create_connection (const char *host, int port, struct session if (c->methods->ready) { c->methods->ready (c); } + c->last_receive_time = get_double_time (); + start_ping_timer (c); return c; } +void restart_connection (struct connection *c) { + if (c->last_connect_time == time (0)) { + return; + } + + c->last_connect_time = time (0); + int fd; + assert ((fd = socket (AF_INET, SOCK_STREAM, 0)) != -1); + assert (fd >= 0 && fd < MAX_CONNECTIONS); + if (fd > max_connection_fd) { + max_connection_fd = fd; + } + int flags = -1; + setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof (flags)); + setsockopt (fd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof (flags)); + setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof (flags)); + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons (c->port); + addr.sin_addr.s_addr = inet_addr (c->ip); + + + fcntl (fd, F_SETFL, O_NONBLOCK); + + if (connect (fd, (struct sockaddr *) &addr, sizeof (addr)) == -1) { + if (errno != EINPROGRESS) { + logprintf ( "Can not connect to %s:%d %m\n", c->ip, c->port); + start_fail_timer (c); + close (fd); + return; + } + } + + c->fd = fd; + c->state = conn_connecting; + c->last_receive_time = get_double_time (); + start_ping_timer (c); + Connections[fd] = c; + + char byte = 0xef; + assert (write_out (c, &byte, 1) == 1); + flush_out (c); +} + void fail_connection (struct connection *c) { + if (c->state == conn_ready || c->state == conn_connecting) { + stop_ping_timer (c); + } struct connection_buffer *b = c->out_head; while (b) { struct connection_buffer *d = b; @@ -226,6 +322,10 @@ void fail_connection (struct connection *c) { c->out_head = c->out_tail = c->in_head = c->in_tail = 0; c->state = conn_failed; c->out_bytes = c->in_bytes = 0; + close (c->fd); + Connections[c->fd] = 0; + logprintf ("Lost connection to server... \n"); + restart_connection (c); } void try_write (struct connection *c) { @@ -334,6 +434,11 @@ void try_read (struct connection *c) { int x = 0; while (1) { int r = read (c->fd, c->in_tail->wptr, c->in_tail->end - c->in_tail->wptr); + if (r > 0) { + c->last_receive_time = get_double_time (); + stop_ping_timer (c); + start_ping_timer (c); + } if (r >= 0) { c->in_tail->wptr += r; x += r; @@ -364,16 +469,21 @@ void try_read (struct connection *c) { int connections_make_poll_array (struct pollfd *fds, int max) { int _max = max; int i; - for (i = 0; i <= max_connection_fd; i++) if (Connections[i] && Connections[i]->state != conn_failed) { - assert (max > 0); - struct connection *c = Connections[i]; - fds[0].fd = c->fd; - fds[0].events = POLLERR | POLLHUP | POLLRDHUP | POLLIN; - if (c->out_bytes || c->state == conn_connecting) { - fds[0].events |= POLLOUT; + for (i = 0; i <= max_connection_fd; i++) { + if (Connections[i] && Connections[i]->state == conn_failed) { + restart_connection (Connections[i]); + } + if (Connections[i] && Connections[i]->state != conn_failed) { + assert (max > 0); + struct connection *c = Connections[i]; + fds[0].fd = c->fd; + fds[0].events = POLLERR | POLLHUP | POLLRDHUP | POLLIN; + if (c->out_bytes || c->state == conn_connecting) { + fds[0].events |= POLLOUT; + } + fds ++; + max --; } - fds ++; - max --; } if (verbosity >= 10) { logprintf ( "%d connections in poll\n", _max - max); @@ -398,7 +508,9 @@ void connections_poll_result (struct pollfd *fds, int max) { fail_connection (c); } else if (fds[i].revents & POLLOUT) { if (c->state == conn_connecting) { + logprintf ("connection ready\n"); c->state = conn_ready; + c->last_receive_time = get_double_time (); } if (c->out_bytes) { try_write (c); diff --git a/net.h b/net.h index 4e8487a..674ead9 100644 --- a/net.h +++ b/net.h @@ -32,7 +32,7 @@ struct dc; #define ACK_TIMEOUT 60 #define MAX_DC_ID 10 -enum dc_state{ +enum dc_state { st_init, st_reqpq_sent, st_reqdh_sent, @@ -104,7 +104,7 @@ enum conn_state { struct connection { int fd; - int ip; + char *ip; int port; int flags; enum conn_state state; @@ -117,9 +117,12 @@ struct connection { int out_bytes; int packet_num; int out_packet_num; + int last_connect_time; struct connection_methods *methods; struct session *session; void *extra; + struct event_timer ev; + double last_receive_time; }; extern struct connection *Connections[]; diff --git a/queries.c b/queries.c index 719cbd3..872b9d8 100644 --- a/queries.c +++ b/queries.c @@ -1000,8 +1000,15 @@ int user_info_on_answer (struct query *q UU) { printf ("User "); print_user_name (U->id, C); printf (":\n"); - printf ("\t real name: %s %s\n", U->real_first_name, U->real_last_name); - printf ("\t phone: %s\n", U->phone); + printf ("\treal name: %s %s\n", U->real_first_name, U->real_last_name); + printf ("\tphone: %s\n", U->phone); + if (U->status.online > 0) { + printf ("\tonline\n"); + } else { + printf ("\toffline (was online "); + print_date_full (U->status.when); + printf (")\n"); + } pop_color (); print_end (); return 0;