diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index b11f4f8d..065cb018 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -30,8 +31,10 @@ #endif void md5(const unsigned char *input, int ilen, unsigned char output[16]); -static void libwebsocket_service(struct libwebsocket *wsi, int sock); +static int +libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len); +#define MAX_CLIENTS 100 #define LWS_MAX_HEADER_NAME_LENGTH 64 #define LWS_MAX_HEADER_LEN 4096 #define LWS_INITIAL_HDR_ALLOC 256 @@ -89,7 +92,7 @@ struct lws_tokens { struct libwebsocket { int (*callback)(struct libwebsocket *, - enum libwebsocket_callback_reasons reason, void *, size_t); + enum libwebsocket_callback_reasons reason, void *, void *, size_t); enum lws_connection_states state; @@ -104,6 +107,9 @@ struct libwebsocket { enum lws_rx_parse_state lws_rx_parse_state; size_t rx_packet_length; + + /* last */ + char user_space[0]; }; @@ -119,11 +125,37 @@ const struct lws_tokens lws_tokens[WSI_TOKEN_COUNT] = { { "\x0d\x0a", 2 }, }; +static void +libwebsocket_close_and_free_session(struct libwebsocket *wsi) +{ + int n = wsi->state; + + wsi->state = WSI_STATE_DEAD_SOCKET; + + if (wsi->callback && n == WSI_STATE_ESTABLISHED) + wsi->callback(wsi, LWS_CALLBACK_CLOSED, &wsi->user_space[0], + NULL, 0); + + for (n = 0; n < WSI_TOKEN_COUNT; n++) + if (wsi->utf8_token[n].token) + free(wsi->utf8_token[n].token); + +// fprintf(stderr, "closing fd=%d\n", wsi->sock); + + shutdown(wsi->sock, SHUT_RDWR); + close(wsi->sock); + free(wsi); +} + /** * libwebsocket_create_server() - Create the listening websockets server * @port: Port to listen on * @callback: The callback in user code to perform actual serving * @protocol: Which version of the websockets protocol (currently 76) + * @user_area_size: How much memory to allocate per connection session + * which will be used by the user application to store + * per-session data. A pointer to this space is given + * when the user callback is called. * * This function forks to create the listening socket and takes care * of all initialization in one step. @@ -143,50 +175,53 @@ const struct lws_tokens lws_tokens[WSI_TOKEN_COUNT] = { int libwebsocket_create_server(int port, int (*callback)(struct libwebsocket *, - enum libwebsocket_callback_reasons, void *, size_t), - int protocol) + enum libwebsocket_callback_reasons, + void *, void *, size_t), + int protocol, size_t user_area_size) { int n; + int client; int sockfd; - int sessfd; + int fd; unsigned int clilen; struct sockaddr_in serv_addr, cli_addr; - int pid; - struct libwebsocket *wsi = malloc(sizeof(struct libwebsocket)); - - if (!wsi) - return -1; - - wsi->state = WSI_STATE_HTTP; - wsi->name_buffer_pos = 0; + struct libwebsocket *wsi[MAX_CLIENTS + 1]; + struct pollfd fds[MAX_CLIENTS + 1]; + int fds_count = 0; + unsigned char buf[256]; + int opt = 1; + + /* sanity check */ - for (n = 0; n < WSI_TOKEN_COUNT; n++) { - wsi->utf8_token[n].token = NULL; - wsi->utf8_token[n].token_len = 0; - } - - wsi->callback = callback; switch (protocol) { case 0: case 2: case 76: fprintf(stderr, " Using protocol v%d\n", protocol); - wsi->ietf_spec_revision = protocol; break; default: fprintf(stderr, "protocol %d not supported (try 0 2 or 76)\n", protocol); return -1; } + + if (!callback) { + fprintf(stderr, "callback is not optional!\n"); + return -1; + } /* sit there listening for connects, accept and spawn session servers */ sockfd = socket(AF_INET, SOCK_STREAM, 0); if (sockfd < 0) { fprintf(stderr, "ERROR opening socket"); + return -1; } - bzero((char *) &serv_addr, sizeof(serv_addr)); + + /* allow us to restart even if old sockets in TIME_WAIT */ + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + bzero((char *) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_port = htons(port); @@ -202,66 +237,148 @@ int libwebsocket_create_server(int port, n = fork(); if (n < 0) { fprintf(stderr, "Failed on forking server thread: %d\n", n); - exit(1); + return -1; } /* we are done as far as the caller is concerned */ if (n) - return 0; + return sockfd; - fprintf(stderr, " Listening on port %d\n", port); + /* we are running in a forked subprocess now */ listen(sockfd, 5); + fprintf(stderr, " Listening on port %d\n", port); + + fds[0].fd = sockfd; + fds_count = 1; + fds[0].events = POLLIN; while (1) { - clilen = sizeof(cli_addr); - sessfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen); - if (sessfd < 0) { - fprintf(stderr, "ERROR on accept"); - continue; + n = poll(fds, fds_count, 50); + if (n < 0 || fds[0].revents & (POLLERR | POLLHUP)) { +// fprintf(stderr, "Listen Socket dead\n"); + goto fatal; } + if (n == 0) /* poll timeout */ + goto poll_out; + + if (fds[0].revents & POLLIN) { + + /* listen socket got a new connection... */ + + clilen = sizeof(cli_addr); + fd = accept(sockfd, (struct sockaddr *)&cli_addr, + &clilen); + if (fd < 0) { + fprintf(stderr, "ERROR on accept"); + continue; + } - /* fork off a new server instance */ + if (fds_count >= MAX_CLIENTS) { + fprintf(stderr, "too busy"); + close(fd); + continue; + } - pid = fork(); - if (pid < 0) { - fprintf(stderr, "ERROR on fork"); - continue; +// fprintf(stderr, "accepted new conn port %u on fd=%d\n", +// ntohs(cli_addr.sin_port), fd); + + /* intialize the instance struct */ + + wsi[fds_count] = malloc(sizeof(struct libwebsocket) + + user_area_size); + if (!wsi[fds_count]) + return -1; + + wsi[fds_count]->sock = fd; + wsi[fds_count]->state = WSI_STATE_HTTP; + wsi[fds_count]->name_buffer_pos = 0; + + for (n = 0; n < WSI_TOKEN_COUNT; n++) { + wsi[fds_count]->utf8_token[n].token = NULL; + wsi[fds_count]->utf8_token[n].token_len = 0; + } + + wsi[fds_count]->callback = callback; + wsi[fds_count]->ietf_spec_revision = protocol; + + fds[fds_count].events = POLLIN; + fds[fds_count++].fd = fd; } - if (pid) { - close(sessfd); - continue; + /* check for activity on client sockets */ + + for (client = 1; client < fds_count; client++) { + + /* handle session socket closed */ + + if (fds[client].revents & (POLLERR | POLLHUP)) { + + fprintf(stderr, "Session Socket dead\n"); + + libwebsocket_close_and_free_session(wsi[client]); + goto nuke_this; + } + + /* any incoming data ready? */ + + if (!(fds[client].revents & POLLIN)) + continue; + +// fprintf(stderr, "POLLIN\n"); + + n = recv(fds[client].fd, buf, sizeof(buf), 0); + if (n < 0) { + fprintf(stderr, "Socket read returned %d\n", n); + continue; + } + if (!n) { +// fprintf(stderr, "POLLIN with 0 len waiting\n"); + libwebsocket_close_and_free_session(wsi[client]); + goto nuke_this; + } + + /* service incoming data */ + + if (libwebsocket_read(wsi[client], buf, n) >= 0) + continue; + + /* it closed and nuked wsi[client] */ +nuke_this: + for (n = client; n < fds_count - 1; n++) { + fds[n] = fds[n + 1]; + wsi[n] = wsi[n + 1]; + } + fds_count--; + client--; } - /* we are the session process */ +poll_out: + for (client = 1; client < fds_count; client++) { - close(sockfd); - - /* sit in libwebsocket_service() until session socket closed */ - - libwebsocket_service(wsi, sessfd); + if (wsi[client]->state != WSI_STATE_ESTABLISHED) + continue; + + if (!wsi[client]->callback) + continue; - exit(0); + wsi[client]->callback(wsi[client], LWS_CALLBACK_SEND, + &wsi[client]->user_space[0], NULL, 0); + } + + continue; } -} + +fatal: + close(fds[0].fd); + for (client = 1; client < fds_count; client++) + libwebsocket_close_and_free_session(wsi[client]); -static void libwebsocket_close(struct libwebsocket *wsi) -{ - int n; - - wsi->state = WSI_STATE_DEAD_SOCKET; - - if (wsi->callback) - wsi->callback(wsi, LWS_CALLBACK_CLOSED, NULL, 0); - - for (n = 0; n < WSI_TOKEN_COUNT; n++) - if (wsi->utf8_token[n].token) - free(wsi->utf8_token[n].token); - - close(wsi->sock); + kill(0, SIGTERM); + + return 0; } /** @@ -518,7 +635,8 @@ static int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi, return -1; if (n != len && wsi->callback) - wsi->callback(wsi, LWS_CALLBACK_RECEIVE, &buf[n], len - n); + wsi->callback(wsi, LWS_CALLBACK_RECEIVE, &wsi->user_space[0], + &buf[n], len - n); return -0; } @@ -563,6 +681,7 @@ libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len) !wsi->utf8_token[WSI_TOKEN_CONNECTION].token_len) { if (wsi->callback) (wsi->callback)(wsi, LWS_CALLBACK_HTTP, + &wsi->user_space[0], NULL, 0); wsi->state = WSI_STATE_HTTP; return 0; @@ -674,7 +793,8 @@ libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len) /* notify user code that we're ready to roll */ if (wsi->callback) - wsi->callback(wsi, LWS_CALLBACK_ESTABLISHED, NULL, 0); + wsi->callback(wsi, LWS_CALLBACK_ESTABLISHED, + &wsi->user_space[0], NULL, 0); break; case WSI_STATE_ESTABLISHED: @@ -688,7 +808,7 @@ libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len) return 0; bail: - libwebsocket_close(wsi); + libwebsocket_close_and_free_session(wsi); return -1; } @@ -826,11 +946,13 @@ int libwebsocket_write(struct libwebsocket * wsi, unsigned char *buf, } break; } - -// for (n = 0; n < (len + pre + post); n++) -// fprintf(stderr, "%02X ", buf[n - pre]); -// -// fprintf(stderr, "\n"); + +#if 0 + for (n = 0; n < (len + pre + post); n++) + fprintf(stderr, "%02X ", buf[n - pre]); + + fprintf(stderr, "\n"); +#endif send_raw: @@ -845,63 +967,6 @@ send_raw: return 0; } -static void libwebsocket_service(struct libwebsocket *wsi, int sock) -{ - int n; - unsigned char buf[256]; - struct pollfd fds; - - wsi->sock = sock; - - while (1) { - fds.fd = sock; - fds.events = POLLIN; - fds.revents = 0; - n = poll(&fds, 1, 50); - - if (n < 0) { - fprintf(stderr, "Socket dead (poll = %d)\n", n); - return; - } - if (n == 0) - goto pout; - - if (fds.revents & (POLLERR | POLLHUP)) { - fprintf(stderr, "Socket dead\n"); - return; - } - - if (wsi->state == WSI_STATE_DEAD_SOCKET) { - fprintf(stderr, "Seen socket dead, returning\n"); - return; - } - - if (fds.revents & POLLIN) { - -// fprintf(stderr, "POLLIN\n"); - - n = recv(sock, buf, sizeof(buf), 0); - if (n < 0) { - fprintf(stderr, "Socket read returned %d\n", n); - continue; - } - if (n) - libwebsocket_read(wsi, buf, n); - else { - fprintf(stderr, "POLLIN with 0 len waiting\n"); - usleep(50000); - } - } -pout: - if (wsi->state != WSI_STATE_ESTABLISHED) - continue; - -// fprintf(stderr, "POLLOUT\n"); - - if (wsi->callback) - wsi->callback(wsi, LWS_CALLBACK_SEND, NULL, 0); - } -} /** * libwebsockets_serve_http_file() - Send a file back to the client using http diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 03974370..df351acc 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -21,7 +21,8 @@ struct libwebsocket; extern int libwebsocket_create_server(int port, int (*callback)(struct libwebsocket *wsi, enum libwebsocket_callback_reasons reason, - void *in, size_t len), int protocol); + void *user, void *in, size_t len), + int protocol, size_t user_space); /* * IMPORTANT NOTICE! diff --git a/libwebsockets-api-doc.txt b/libwebsockets-api-doc.txt index d431e63c..00ebce13 100644 --- a/libwebsockets-api-doc.txt +++ b/libwebsockets-api-doc.txt @@ -5,8 +5,9 @@ libwebsocket_create_server - Create the listening websockets server Synopsis: int libwebsocket_create_server (int port, - int (*callback) (struct libwebsocket *, enum libwebsocket_callback_reasons, void *, size_t, - int protocol); + int (*callback) (struct libwebsocket *, enum libwebsocket_callback_reasons, void *, void *, size_t, + int protocol, + size_t user_area_size); Arguments: @@ -19,6 +20,12 @@ callback protocol Which version of the websockets protocol (currently 76) +user_area_size + How much memory to allocate per connection session +which will be used by the user application to store +per-session data. A pointer to this space is given +when the user callback is called. + Description: diff --git a/test-server/test-server.c b/test-server/test-server.c index b4d0ef79..ae8deed8 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -18,16 +18,28 @@ static int port = 7681; static int ws_protocol = 76; +struct per_session_data { + int number; +}; + /** * libwebsocket_callback() - User server actions * @wsi: Opaque websocket instance pointer * @reason: The reason for the call + * @user: Pointer to per-session user data allocated by library * @in: Pointer used for some callback reasons * @len: Length set for some callback reasons * * This callback is the way the user controls what is served. All the * protocol detail is hidden and handled by the library. * + * For each connection / session there is user data allocated that is + * pointed to by "user". You set the size of this user data area when + * the library is initialized with libwebsocket_create_server. + * + * You get an opportunity to initialize user data when called back with + * LWS_CALLBACK_ESTABLISHED reason. + * * LWS_CALLBACK_ESTABLISHED: after successful websocket handshake * LWS_CALLBACK_CLOSED: when the websocket session ends * LWS_CALLBACK_SEND: opportunity to send to client (you would use @@ -47,14 +59,15 @@ static int ws_protocol = 76; */ static int websocket_callback(struct libwebsocket * wsi, - enum libwebsocket_callback_reasons reason, void *in, size_t len) + enum libwebsocket_callback_reasons reason, void * user, + void *in, size_t len) { int n; char buf[LWS_SEND_BUFFER_PRE_PADDING + 512 + LWS_SEND_BUFFER_POST_PADDING]; - static int bump; char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING]; const char *uri; + struct per_session_data * pss = user; switch (reason) { /* @@ -62,6 +75,7 @@ static int websocket_callback(struct libwebsocket * wsi, */ case LWS_CALLBACK_ESTABLISHED: fprintf(stderr, "Websocket connection established\n"); + pss->number = 0; break; /* @@ -75,7 +89,7 @@ static int websocket_callback(struct libwebsocket * wsi, * Opportunity for us to send something on the connection */ case LWS_CALLBACK_SEND: - n = sprintf(p, "%d", bump++); + n = sprintf(p, "%d", pss->number++); n = libwebsocket_write(wsi, (unsigned char *)p, n, 0); if (n < 0) { fprintf(stderr, "ERROR writing to socket"); @@ -154,8 +168,8 @@ int main(int argc, char **argv) } } - if (libwebsocket_create_server(port, websocket_callback, ws_protocol) < - 0) { + if (libwebsocket_create_server(port, websocket_callback, ws_protocol, + sizeof(struct per_session_data)) < 0) { fprintf(stderr, "libwebsocket init failed\n"); return -1; }