diff --git a/lib/client-handshake.c b/lib/client-handshake.c index 70b6c816..e3a6efc1 100644 --- a/lib/client-handshake.c +++ b/lib/client-handshake.c @@ -50,11 +50,10 @@ libwebsocket_client_close(struct libwebsocket *wsi) clients = wsi->protocol->owning_server; if (clients) for (n = 0; n < clients->fds_count; n++) { - if (clients->wsi[n] != wsi) + if (clients->fds[n].fd != wsi->sock) continue; while (n < clients->fds_count - 1) { clients->fds[n] = clients->fds[n + 1]; - clients->wsi[n] = clients->wsi[n + 1]; n++; } /* we only have to deal with one */ @@ -150,8 +149,6 @@ libwebsocket_client_connect(struct libwebsocket_context *this, return NULL; } - this->wsi[this->fds_count] = wsi; - /* -1 means just use latest supported */ if (ietf_version_or_minus_one == -1) @@ -225,6 +222,7 @@ libwebsocket_client_connect(struct libwebsocket_context *this, goto bail1; } + insert_wsi(this, wsi); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index d4bfe75d..586d603a 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -21,6 +21,60 @@ #include "private-libwebsockets.h" +/* file descriptor hash management */ + +struct libwebsocket * +wsi_from_fd(struct libwebsocket_context *this, int fd) +{ + int h = LWS_FD_HASH(fd); + int n = 0; + + for (n = 0; n < this->fd_hashtable[h].length; n++) + if (this->fd_hashtable[h].wsi[n]->sock == fd) + return this->fd_hashtable[h].wsi[n]; + + return NULL; +} + +int +insert_wsi(struct libwebsocket_context *this, struct libwebsocket *wsi) +{ + int h = LWS_FD_HASH(wsi->sock); + + if (this->fd_hashtable[h].length == MAX_CLIENTS - 1) { + fprintf(stderr, "hash table overflow\n"); + return 1; + } + + this->fd_hashtable[h].wsi[this->fd_hashtable[h].length++] = wsi; + + return 0; +} + +int +delete_from_fd(struct libwebsocket_context *this, int fd) +{ + int h = LWS_FD_HASH(fd); + int n = 0; + + for (n = 0; n < this->fd_hashtable[h].length; n++) + if (this->fd_hashtable[h].wsi[n]->sock == fd) { + while (n < this->fd_hashtable[h].length) { + this->fd_hashtable[h].wsi[n] = + this->fd_hashtable[h].wsi[n + 1]; + n++; + } + this->fd_hashtable[h].length--; + + return 0; + } + + fprintf(stderr, "Failed to find fd %d requested for " + "delete in hashtable\n", fd); + return 1; +} + + void libwebsocket_close_and_free_session(struct libwebsocket *wsi) { @@ -80,73 +134,239 @@ libwebsocket_close_and_free_session(struct libwebsocket *wsi) } static int -libwebsocket_poll_connections(struct libwebsocket_context *this) +libwebsocket_service_fd(struct libwebsocket_context *this, + struct pollfd *pollfd) { unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MAX_BROADCAST_PAYLOAD + LWS_SEND_BUFFER_POST_PADDING]; - int client = this->count_protocols + 1; - struct libwebsocket *wsi; + struct libwebsocket *wsi = wsi_from_fd(this, pollfd->fd); + struct libwebsocket *new_wsi; int n; + int m; size_t len; + int accept_fd; + unsigned int clilen; + struct sockaddr_in cli_addr; - /* check for activity on client sockets */ + if (wsi == NULL) + return 1; - for (; client < this->fds_count; client++) { + switch (wsi->mode) { + case LWS_CONNMODE_SERVER_LISTENER: + + /* pollin means a client has connected to us then */ + + if (!pollfd->revents & POLLIN) + break; + + /* listen socket got an unencrypted connection... */ + + clilen = sizeof(cli_addr); + accept_fd = accept(pollfd->fd, (struct sockaddr *)&cli_addr, + &clilen); + if (accept_fd < 0) { + fprintf(stderr, "ERROR on accept"); + break; + } + + if (this->fds_count >= MAX_CLIENTS) { + fprintf(stderr, "too busy"); + close(accept_fd); + break; + } + + /* accepting connection to main listener */ + + new_wsi = malloc(sizeof(struct libwebsocket)); + if (new_wsi == NULL) { + fprintf(stderr, "Out of memory for new connection\n"); + break; + } + + memset(new_wsi, 0, sizeof (struct libwebsocket)); + new_wsi->sock = accept_fd; + +#ifdef LWS_OPENSSL_SUPPORT + new_wsi->ssl = NULL; + this->ssl_ctx = NULL; + + if (this->use_ssl) { + + new_wsi->ssl = SSL_new(this->ssl_ctx); + if (new_wsi->ssl == NULL) { + fprintf(stderr, "SSL_new failed: %s\n", + ERR_error_string(SSL_get_error( + new_wsi->ssl, 0), NULL)); + free(new_wsi); + break; + } + + SSL_set_fd(new_wsi->ssl, accept_fd); + + n = SSL_accept(new_wsi->ssl); + if (n != 1) { + /* + * browsers seem to probe with various + * ssl params which fail then retry + * and succeed + */ + debug("SSL_accept failed skt %u: %s\n", + pollfd->fd, + ERR_error_string(SSL_get_error( + new_wsi->ssl, n), NULL)); + SSL_free( + new_wsi->ssl); + free(new_wsi); + break; + } + debug("accepted new SSL conn " + "port %u on fd=%d SSL ver %s\n", + ntohs(cli_addr.sin_port), accept_fd, + SSL_get_version(new_wsi->ssl)); + + } else +#endif + debug("accepted new conn port %u on fd=%d\n", + ntohs(cli_addr.sin_port), accept_fd); + + /* intialize the instance struct */ + + new_wsi->state = WSI_STATE_HTTP; + new_wsi->name_buffer_pos = 0; + new_wsi->mode = LWS_CONNMODE_WS_SERVING; + + for (n = 0; n < WSI_TOKEN_COUNT; n++) { + new_wsi->utf8_token[n].token = NULL; + new_wsi->utf8_token[n].token_len = 0; + } + + /* + * these can only be set once the protocol is known + * we set an unestablished connection's protocol pointer + * to the start of the supported list, so it can look + * for matching ones during the handshake + */ + new_wsi->protocol = this->protocols; + new_wsi->user_space = NULL; + + /* + * Default protocol is 76 / 00 + * After 76, there's a header specified to inform which + * draft the client wants, when that's seen we modify + * the individual connection's spec revision accordingly + */ + new_wsi->ietf_spec_revision = 0; + + insert_wsi(this, new_wsi); + + + /* + * make sure NO events are seen yet on this new socket + * (otherwise we inherit old fds[client].revents from + * previous socket there and die mysteriously! ) + */ + this->fds[this->fds_count].revents = 0; + + this->fds[this->fds_count].events = POLLIN; + this->fds[this->fds_count++].fd = accept_fd; + + break; + + case LWS_CONNMODE_BROADCAST_PROXY_LISTENER: + + /* as we are listening, POLLIN means accept() is needed */ + + if (!pollfd->revents & POLLIN) + break; + + /* listen socket got an unencrypted connection... */ + + clilen = sizeof(cli_addr); + accept_fd = accept(pollfd->fd, (struct sockaddr *)&cli_addr, + &clilen); + if (accept_fd < 0) { + fprintf(stderr, "ERROR on accept"); + break; + } + + if (this->fds_count >= MAX_CLIENTS) { + fprintf(stderr, "too busy"); + close(accept_fd); + break; + } + + /* create a dummy wsi for the connection and add it */ + + new_wsi = malloc(sizeof(struct libwebsocket)); + memset(new_wsi, 0, sizeof (struct libwebsocket)); + new_wsi->sock = accept_fd; + new_wsi->mode = LWS_CONNMODE_BROADCAST_PROXY; + new_wsi->state = WSI_STATE_ESTABLISHED; + /* note which protocol we are proxying */ + new_wsi->protocol_index_for_broadcast_proxy = + wsi->protocol_index_for_broadcast_proxy; + insert_wsi(this, new_wsi); + + /* add connected socket to internal poll array */ + + this->fds[this->fds_count].revents = 0; + this->fds[this->fds_count].events = POLLIN; + this->fds[this->fds_count++].fd = accept_fd; + + break; + + case LWS_CONNMODE_BROADCAST_PROXY: /* handle session socket closed */ - if (this->fds[client].revents & (POLLERR | POLLHUP)) { + if (pollfd->revents & (POLLERR | POLLHUP)) { - debug("Session Socket %d %p (fd=%d) dead\n", - client, (void *)this->wsi[client], - this->fds[client].fd); + debug("Session Socket %p (fd=%d) dead\n", + (void *)wsi, accept_fd); - libwebsocket_close_and_free_session(this->wsi[client]); + libwebsocket_close_and_free_session(wsi); goto nuke_this; } /* the guy requested a callback when it was OK to write */ - if ((unsigned long)this->wsi[client] > LWS_MAX_PROTOCOLS && - this->fds[client].revents & POLLOUT) { + if (pollfd->revents & POLLOUT) { - this->fds[client].events &= ~POLLOUT; + /* one shot */ - this->wsi[client]->protocol->callback(this->wsi[client], + pollfd->events &= ~POLLOUT; + + wsi->protocol->callback(wsi, LWS_CALLBACK_CLIENT_WRITEABLE, - this->wsi[client]->user_space, + wsi->user_space, NULL, 0); } /* any incoming data ready? */ - if (!(this->fds[client].revents & POLLIN)) - continue; + if (!(pollfd->revents & POLLIN)) + break; - /* broadcast? */ + /* get the issued broadcast payload from the socket */ - if ((unsigned long)this->wsi[client] < LWS_MAX_PROTOCOLS) { + len = read(pollfd->fd, buf + LWS_SEND_BUFFER_PRE_PADDING, + MAX_BROADCAST_PAYLOAD); + if (len < 0) { + fprintf(stderr, "Error reading broadcast payload\n"); + break;; + } - /* get the issued broadcast payload from the socket */ + /* broadcast it to all guys with this protocol index */ - len = read(this->fds[client].fd, - buf + LWS_SEND_BUFFER_PRE_PADDING, - MAX_BROADCAST_PAYLOAD); + for (n = 0; n < FD_HASHTABLE_MODULUS; n++) { - if (len < 0) { - fprintf(stderr, - "Error reading broadcast payload\n"); - continue; - } + for (m = 0; m < this->fd_hashtable[n].length; m++) { - /* broadcast it to all guys with this protocol index */ + new_wsi = this->fd_hashtable[n].wsi[m]; - for (n = this->count_protocols + 1; - n < this->fds_count; n++) { + /* only to clients we are serving to */ - wsi = this->wsi[n]; - - if ((unsigned long)wsi < LWS_MAX_PROTOCOLS) + if (new_wsi->mode != LWS_CONNMODE_WS_SERVING) continue; /* @@ -154,12 +374,7 @@ libwebsocket_poll_connections(struct libwebsocket_context *this) * connection */ - if (wsi->state != WSI_STATE_ESTABLISHED) - continue; - - /* only to clients connected to us */ - - if (wsi->mode != LWS_CONNMODE_WS_SERVING) + if (new_wsi->state != WSI_STATE_ESTABLISHED) continue; /* @@ -167,42 +382,72 @@ libwebsocket_poll_connections(struct libwebsocket_context *this) * the requested protocol */ - if (wsi->protocol->protocol_index != - (int)(unsigned long)this->wsi[client]) + if (new_wsi->protocol->protocol_index != + wsi->protocol_index_for_broadcast_proxy) continue; /* broadcast it to this connection */ - wsi->protocol->callback(wsi, + new_wsi->protocol->callback(new_wsi, LWS_CALLBACK_BROADCAST, - wsi->user_space, + new_wsi->user_space, buf + LWS_SEND_BUFFER_PRE_PADDING, len); } + } + break; - continue; + case LWS_CONNMODE_WS_SERVING: + case LWS_CONNMODE_WS_CLIENT: + + /* handle session socket closed */ + + if (pollfd->revents & (POLLERR | POLLHUP)) { + + debug("Session Socket %p (fd=%d) dead\n", + (void *)wsi, pollfd->fd); + + libwebsocket_close_and_free_session(wsi); + goto nuke_this; } + /* the guy requested a callback when it was OK to write */ + + if (pollfd->revents & POLLOUT) { + + pollfd->events &= ~POLLOUT; + + wsi->protocol->callback(wsi, + LWS_CALLBACK_CLIENT_WRITEABLE, + wsi->user_space, + NULL, 0); + } + + /* any incoming data ready? */ + + if (!(pollfd->revents & POLLIN)) + break; + #ifdef LWS_OPENSSL_SUPPORT - if (this->wsi[client]->ssl) - n = SSL_read(this->wsi[client]->ssl, buf, sizeof buf); + if (wsi->ssl) + n = SSL_read(wsi->ssl, buf, sizeof buf); else #endif - n = recv(this->fds[client].fd, buf, sizeof buf, 0); + n = recv(pollfd->fd, buf, sizeof buf, 0); if (n < 0) { fprintf(stderr, "Socket read returned %d\n", n); - continue; + break;; } if (!n) { - libwebsocket_close_and_free_session(this->wsi[client]); + libwebsocket_close_and_free_session(wsi); goto nuke_this; } /* service incoming data */ - n = libwebsocket_read(this->wsi[client], buf, n); + n = libwebsocket_read(wsi, buf, n); if (n >= 0) - continue; + break;; /* * it closed and nuked wsi[client], so remove the * socket handle and wsi from our service list @@ -210,21 +455,27 @@ libwebsocket_poll_connections(struct libwebsocket_context *this) nuke_this: debug("nuking wsi %p, fsd_count = %d\n", - (void *)this->wsi[client], this->fds_count - 1); + (void *)wsi, this->fds_count - 1); + + delete_from_fd(this, pollfd->fd); this->fds_count--; - for (n = client; n < this->fds_count; n++) { - this->fds[n] = this->fds[n + 1]; - this->wsi[n] = this->wsi[n + 1]; - } - - return 0; + for (n = 0; n < this->fds_count; n++) + if (this->fds[n].fd == pollfd->fd) { + while (n < this->fds_count) { + this->fds[n] = this->fds[n + 1]; + n++; + } + n = this->fds_count; + } + break; } return 0; } + /** * libwebsocket_context_destroy() - Destroy the websocket context * @this: Websocket context @@ -236,18 +487,28 @@ nuke_this: void libwebsocket_context_destroy(struct libwebsocket_context *this) { - int client; + int n; + int m; + struct libwebsocket *wsi; - /* close listening skt and per-protocol broadcast sockets */ - for (client = this->count_protocols + 1; client < this->fds_count; client++) - switch (this->wsi[client]->mode) { - case LWS_CONNMODE_WS_SERVING: - libwebsocket_close_and_free_session(this->wsi[client]); - break; - case LWS_CONNMODE_WS_CLIENT: - libwebsocket_client_close(this->wsi[client]); - break; + for (n = 0; n < FD_HASHTABLE_MODULUS; n++) { + + for (m = 0; m < this->fd_hashtable[n].length; m++) { + + wsi = this->fd_hashtable[n].wsi[m]; + + switch (wsi->mode) { + case LWS_CONNMODE_WS_SERVING: + libwebsocket_close_and_free_session(wsi); + break; + case LWS_CONNMODE_WS_CLIENT: + libwebsocket_client_close(wsi); + break; + default: + break; + } } + } close(this->fd_random); @@ -301,182 +562,32 @@ int libwebsocket_service(struct libwebsocket_context *this, int timeout_ms) { int n; - int client; - unsigned int clilen; - struct sockaddr_in cli_addr; - int fd; /* stay dead once we are dead */ if (this == NULL) return 1; - /* don't check listen socket if we are not listening */ - - if (this->listen_port) - n = poll(this->fds, this->fds_count, timeout_ms); - else - n = poll(&this->fds[1], this->fds_count - 1, timeout_ms); + /* wait for something to need service */ + n = poll(this->fds, this->fds_count, timeout_ms); if (n < 0 || this->fds[0].revents & (POLLERR | POLLHUP)) { /* fprintf(stderr, "Listen Socket dead\n"); */ - goto fatal; + return 1; } if (n == 0) /* poll timeout */ return 0; /* handle accept on listening socket? */ - for (client = 0; client < this->count_protocols + 1; client++) { - - if (!this->fds[client].revents & POLLIN) - continue; - - /* listen socket got an unencrypted connection... */ - - clilen = sizeof(cli_addr); - fd = accept(this->fds[client].fd, - (struct sockaddr *)&cli_addr, &clilen); - if (fd < 0) { - fprintf(stderr, "ERROR on accept"); - continue; - } - - if (this->fds_count >= MAX_CLIENTS) { - fprintf(stderr, "too busy"); - close(fd); - continue; - } - - if (client) { - /* - * accepting a connection to broadcast socket - * set wsi to be protocol index not pointer - */ - - this->wsi[this->fds_count] = - (struct libwebsocket *)(long)(client - 1); - - goto fill_in_fds; - } - - /* accepting connection to main listener */ - - this->wsi[this->fds_count] = - malloc(sizeof(struct libwebsocket)); - if (!this->wsi[this->fds_count]) { - fprintf(stderr, "Out of memory for new connection\n"); - continue; - } - -#ifdef LWS_OPENSSL_SUPPORT - this->wsi[this->fds_count]->ssl = NULL; - this->ssl_ctx = NULL; - - if (this->use_ssl) { - - this->wsi[this->fds_count]->ssl = - SSL_new(this->ssl_ctx); - if (this->wsi[this->fds_count]->ssl == NULL) { - fprintf(stderr, "SSL_new failed: %s\n", - ERR_error_string(SSL_get_error( - this->wsi[this->fds_count]->ssl, 0), - NULL)); - free(this->wsi[this->fds_count]); - continue; - } - - SSL_set_fd(this->wsi[this->fds_count]->ssl, fd); - - n = SSL_accept(this->wsi[this->fds_count]->ssl); - if (n != 1) { - /* - * browsers seem to probe with various - * ssl params which fail then retry - * and succeed - */ - debug("SSL_accept failed skt %u: %s\n", - fd, - ERR_error_string(SSL_get_error( - this->wsi[this->fds_count]->ssl, - n), NULL)); - SSL_free( - this->wsi[this->fds_count]->ssl); - free(this->wsi[this->fds_count]); - continue; - } - debug("accepted new SSL conn " - "port %u on fd=%d SSL ver %s\n", - ntohs(cli_addr.sin_port), fd, - SSL_get_version(this->wsi[ - this->fds_count]->ssl)); - - } else -#endif - debug("accepted new conn port %u on fd=%d\n", - ntohs(cli_addr.sin_port), fd); - - /* intialize the instance struct */ - - this->wsi[this->fds_count]->sock = fd; - this->wsi[this->fds_count]->state = WSI_STATE_HTTP; - this->wsi[this->fds_count]->name_buffer_pos = 0; - this->wsi[this->fds_count]->mode = LWS_CONNMODE_WS_SERVING; - - for (n = 0; n < WSI_TOKEN_COUNT; n++) { - this->wsi[this->fds_count]-> - utf8_token[n].token = NULL; - this->wsi[this->fds_count]-> - utf8_token[n].token_len = 0; - } - - /* - * these can only be set once the protocol is known - * we set an unestablished connection's protocol pointer - * to the start of the supported list, so it can look - * for matching ones during the handshake - */ - this->wsi[this->fds_count]->protocol = this->protocols; - this->wsi[this->fds_count]->user_space = NULL; - - /* - * Default protocol is 76 / 00 - * After 76, there's a header specified to inform which - * draft the client wants, when that's seen we modify - * the individual connection's spec revision accordingly - */ - this->wsi[this->fds_count]->ietf_spec_revision = 0; - -fill_in_fds: - - /* - * make sure NO events are seen yet on this new socket - * (otherwise we inherit old fds[client].revents from - * previous socket there and die mysteriously! ) - */ - this->fds[this->fds_count].revents = 0; - - this->fds[this->fds_count].events = POLLIN; - this->fds[this->fds_count++].fd = fd; - - } - - /* service anything incoming on websocket connection */ - - libwebsocket_poll_connections(this); - - /* this round is done */ + for (n = 0; n < this->fds_count; n++) + if (this->fds[n].revents) + libwebsocket_service_fd(this, &this->fds[n]); return 0; - -fatal: - - /* inform caller we are dead */ - - return 1; } /** @@ -484,6 +595,10 @@ fatal: * becomes able to be written to without * blocking * + * This only works for internal poll() management, (ie, calling the libwebsocket + * service loop, you will have to make your own arrangements if your poll() + * loop is managed externally. + * * @wsi: Websocket connection instance to get callback for */ @@ -493,8 +608,8 @@ libwebsocket_callback_on_writable(struct libwebsocket *wsi) struct libwebsocket_context *this = wsi->protocol->owning_server; int n; - for (n = this->count_protocols + 1; n < this->fds_count; n++) - if (this->wsi[n] == wsi) { + for (n = 0; n < this->fds_count; n++) + if (this->fds[n].fd == wsi->sock) { this->fds[n].events |= POLLOUT; return 0; } @@ -510,6 +625,10 @@ libwebsocket_callback_on_writable(struct libwebsocket *wsi) * becomes possible to write to each socket without * blocking in turn. * + * This only works for internal poll() management, (ie, calling the libwebsocket + * service loop, you will have to make your own arrangements if your poll() + * loop is managed externally. + * * @protocol: Protocol whose connections will get callbacks */ @@ -519,11 +638,19 @@ libwebsocket_callback_on_writable_all_protocol( { struct libwebsocket_context *this = protocol->owning_server; int n; + int m; + struct libwebsocket *wsi; - for (n = this->count_protocols + 1; n < this->fds_count; n++) - if ((unsigned long)this->wsi[n] > LWS_MAX_PROTOCOLS) - if (this->wsi[n]->protocol == protocol) - this->fds[n].events |= POLLOUT; + for (n = 0; n < FD_HASHTABLE_MODULUS; n++) { + + for (m = 0; m < this->fd_hashtable[n].length; m++) { + + wsi = this->fd_hashtable[n].wsi[m]; + + if (wsi->protocol == protocol) + libwebsocket_callback_on_writable(wsi); + } + } return 0; } @@ -550,6 +677,10 @@ libwebsocket_get_socket_fd(struct libwebsocket *wsi) * If the output side of a server process becomes choked, this allows flow * control for the input side. * + * This only works for internal poll() management, (ie, calling the libwebsocket + * service loop, you will have to make your own arrangements if your poll() + * loop is managed externally. + * * @wsi: Websocket connection instance to get callback for * @enable: 0 = disable read servicing for this connection, 1 = enable */ @@ -560,8 +691,8 @@ libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable) struct libwebsocket_context *this = wsi->protocol->owning_server; int n; - for (n = this->count_protocols + 1; n < this->fds_count; n++) - if (this->wsi[n] == wsi) { + for (n = 0; n < this->fds_count; n++) + if (this->fds[n].fd == wsi->sock) { if (enable) this->fds[n].events |= POLLIN; else @@ -598,6 +729,7 @@ static void sigpipe_handler(int x) } + /** * libwebsocket_create_context() - Create the websocket handler * @port: Port to listen on... you can use 0 to suppress listening on @@ -659,6 +791,7 @@ libwebsocket_create_context(int port, char *p; char hostname[1024]; struct hostent *he; + struct libwebsocket *wsi; #ifdef LWS_OPENSSL_SUPPORT SSL_METHOD *method; @@ -675,6 +808,7 @@ libwebsocket_create_context(int port, this->http_proxy_port = 0; this->http_proxy_address[0] = '\0'; this->options = options; + this->fds_count = 0; this->fd_random = open(SYSTEM_RANDOM_FILEPATH, O_RDONLY); if (this->fd_random < 0) { @@ -834,6 +968,11 @@ libwebsocket_create_context(int port, if (lws_b64_selftest()) return NULL; + /* fd hashtable init */ + + for (n = 0; n < FD_HASHTABLE_MODULUS; n++) + this->fd_hashtable[n].length = 0; + /* set up our external listening socket we serve on */ if (port) { @@ -859,6 +998,20 @@ libwebsocket_create_context(int port, port, n, errno); return NULL; } + + wsi = malloc(sizeof(struct libwebsocket)); + memset(wsi, 0, sizeof (struct libwebsocket)); + wsi->sock = sockfd; + wsi->mode = LWS_CONNMODE_SERVER_LISTENER; + insert_wsi(this, wsi); + + listen(sockfd, 5); + fprintf(stderr, " Listening on port %d\n", port); + + /* list in the internal poll array */ + + this->fds[this->fds_count].fd = sockfd; + this->fds[this->fds_count++].events = POLLIN; } /* drop any root privs for this process */ @@ -870,27 +1023,11 @@ libwebsocket_create_context(int port, if (setuid(uid)) fprintf(stderr, "setuid: %s\n", strerror(errno)); - /* - * prepare the poll() fd array... it's like this - * - * [0] = external listening socket - * [1 .. this->count_protocols] = per-protocol broadcast sockets - * [this->count_protocols + 1 ... this->fds_count-1] = connection skts - */ - - this->fds_count = 1; - this->fds[0].fd = sockfd; - this->fds[0].events = POLLIN; - this->count_protocols = 0; - - if (port) { - listen(sockfd, 5); - fprintf(stderr, " Listening on port %d\n", port); - } /* set up our internal broadcast trigger sockets per-protocol */ - for (; protocols[this->count_protocols].callback; + for (this->count_protocols = 0; + protocols[this->count_protocols].callback; this->count_protocols++) { protocols[this->count_protocols].owning_server = this; protocols[this->count_protocols].protocol_index = @@ -931,10 +1068,20 @@ libwebsocket_create_context(int port, protocols[this->count_protocols].name, ntohs(cli_addr.sin_port)); + /* dummy wsi per broadcast proxy socket */ + + wsi = malloc(sizeof(struct libwebsocket)); + memset(wsi, 0, sizeof (struct libwebsocket)); + wsi->sock = fd; + wsi->mode = LWS_CONNMODE_BROADCAST_PROXY_LISTENER; + /* note which protocol we are proxying */ + wsi->protocol_index_for_broadcast_proxy = this->count_protocols; + insert_wsi(this, wsi); + + /* list in internal poll array */ + this->fds[this->fds_count].fd = fd; this->fds[this->fds_count].events = POLLIN; - /* wsi only exists for connections, not broadcast listener */ - this->wsi[this->fds_count] = NULL; this->fds_count++; } @@ -1055,6 +1202,8 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol, { struct libwebsocket_context *this = protocol->owning_server; int n; + int m; + struct libwebsocket * wsi; if (!protocol->broadcast_socket_user_fd) { /* @@ -1069,23 +1218,33 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol, * called in the poll thread context and are serialized. */ - for (n = this->count_protocols + 1; n < this->fds_count; n++) { + for (n = 0; n < FD_HASHTABLE_MODULUS; n++) { - if ((unsigned long)this->wsi[n] < LWS_MAX_PROTOCOLS) - continue; + for (m = 0; m < this->fd_hashtable[n].length; m++) { - /* never broadcast to non-established connection */ - if (this->wsi[n]->state != WSI_STATE_ESTABLISHED) - continue; + wsi = this->fd_hashtable[n].wsi[m]; - /* only broadcast to guys using requested protocol */ - if (this->wsi[n]->protocol != protocol) - continue; + if (wsi->mode != LWS_CONNMODE_WS_SERVING) + continue; - this->wsi[n]->protocol->callback(this->wsi[n], + /* + * never broadcast to + * non-established connections + */ + if (wsi->state != WSI_STATE_ESTABLISHED) + continue; + + /* only broadcast to guys using + * requested protocol + */ + if (wsi->protocol != protocol) + continue; + + wsi->protocol->callback(wsi, LWS_CALLBACK_BROADCAST, - this->wsi[n]->user_space, + wsi->user_space, buf, len); + } } return 0; diff --git a/lib/parsers.c b/lib/parsers.c index 6f06bcf9..a2287941 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -1143,6 +1143,8 @@ int libwebsocket_write(struct libwebsocket *wsi, unsigned char *buf, */ buf[0] = 'C'; break; + default: + break; } } break; diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 300cfeac..bf8d5286 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -68,7 +68,7 @@ static inline void debug(const char *format, ...) } #endif - +#define FD_HASHTABLE_MODULUS 32 #define MAX_CLIENTS 100 #define LWS_MAX_HEADER_NAME_LENGTH 64 #define LWS_MAX_HEADER_LEN 4096 @@ -166,11 +166,29 @@ struct lws_tokens { int token_len; }; +enum connection_mode { + LWS_CONNMODE_WS_SERVING, + LWS_CONNMODE_WS_CLIENT, + + /* special internal types */ + LWS_CONNMODE_SERVER_LISTENER, + LWS_CONNMODE_BROADCAST_PROXY_LISTENER, + LWS_CONNMODE_BROADCAST_PROXY +}; + + +#define LWS_FD_HASH(fd) ((fd ^ (fd >> 8) ^ (fd >> 16)) % FD_HASHTABLE_MODULUS) + +struct libwebsocket_fd_hashtable { + struct libwebsocket *wsi[MAX_CLIENTS + 1]; + int length; +}; + struct libwebsocket_protocols; struct libwebsocket_context { - struct libwebsocket *wsi[MAX_CLIENTS + 1]; - struct pollfd fds[MAX_CLIENTS + 1]; + struct libwebsocket_fd_hashtable fd_hashtable[FD_HASHTABLE_MODULUS]; + struct pollfd fds[MAX_CLIENTS * FD_HASHTABLE_MODULUS + 1]; int fds_count; int listen_port; char http_proxy_address[256]; @@ -189,10 +207,6 @@ struct libwebsocket_context { int count_protocols; }; -enum connection_mode { - LWS_CONNMODE_WS_SERVING, - LWS_CONNMODE_WS_CLIENT, -}; /* @@ -215,6 +229,7 @@ struct libwebsocket { char rx_user_buffer[LWS_SEND_BUFFER_PRE_PADDING + MAX_USER_RX_BUFFER + LWS_SEND_BUFFER_POST_PADDING]; int rx_user_buffer_head; + int protocol_index_for_broadcast_proxy; int sock; @@ -279,3 +294,12 @@ xor_mask_04(struct libwebsocket *wsi, unsigned char c); extern unsigned char xor_mask_05(struct libwebsocket *wsi, unsigned char c); + +extern struct libwebsocket * +wsi_from_fd(struct libwebsocket_context *this, int fd); + +extern int +insert_wsi(struct libwebsocket_context *this, struct libwebsocket *wsi); + +extern int +delete_from_fd(struct libwebsocket_context *this, int fd); diff --git a/libwebsockets-api-doc.html b/libwebsockets-api-doc.html index 422ed7c5..2184f162 100644 --- a/libwebsockets-api-doc.html +++ b/libwebsockets-api-doc.html @@ -67,6 +67,13 @@ nothing is pending, or as soon as it services whatever was pending.
wsi
Websocket connection instance to get callback for +

Description

+
+

+This only works for internal poll management, (ie, calling the libwebsocket +service loop, you will have to make your own arrangements if your poll +loop is managed externally. +


libwebsocket_callback_on_writable_all_protocol - Request a callback for all connections using the given protocol when it becomes possible to write to each socket without blocking in turn.

int @@ -77,6 +84,13 @@ nothing is pending, or as soon as it services whatever was pending.
protocol
Protocol whose connections will get callbacks +

Description

+
+

+This only works for internal poll management, (ie, calling the libwebsocket +service loop, you will have to make your own arrangements if your poll +loop is managed externally. +


libwebsocket_get_socket_fd - returns the socket file descriptor

int @@ -110,6 +124,10 @@ You will not need this unless you are doing something special

If the output side of a server process becomes choked, this allows flow control for the input side. +

+This only works for internal poll management, (ie, calling the libwebsocket +service loop, you will have to make your own arrangements if your poll +loop is managed externally.


libwebsocket_canonical_hostname - returns this host's hostname