add-per-session-user-data.patch

Signed-off-by: Andy Green <andy@warmcat.com>
This commit is contained in:
Andy Green 2010-11-03 11:13:06 +00:00
parent 69fa072c3b
commit 251f6faf7b
4 changed files with 219 additions and 132 deletions

View file

@ -12,6 +12,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -30,8 +31,10 @@
#endif
void md5(const unsigned char *input, int ilen, unsigned char output[16]);
static void libwebsocket_service(struct libwebsocket *wsi, int sock);
static int
libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len);
#define MAX_CLIENTS 100
#define LWS_MAX_HEADER_NAME_LENGTH 64
#define LWS_MAX_HEADER_LEN 4096
#define LWS_INITIAL_HDR_ALLOC 256
@ -89,7 +92,7 @@ struct lws_tokens {
struct libwebsocket {
int (*callback)(struct libwebsocket *,
enum libwebsocket_callback_reasons reason, void *, size_t);
enum libwebsocket_callback_reasons reason, void *, void *, size_t);
enum lws_connection_states state;
@ -104,6 +107,9 @@ struct libwebsocket {
enum lws_rx_parse_state lws_rx_parse_state;
size_t rx_packet_length;
/* last */
char user_space[0];
};
@ -119,11 +125,37 @@ const struct lws_tokens lws_tokens[WSI_TOKEN_COUNT] = {
{ "\x0d\x0a", 2 },
};
static void
libwebsocket_close_and_free_session(struct libwebsocket *wsi)
{
int n = wsi->state;
wsi->state = WSI_STATE_DEAD_SOCKET;
if (wsi->callback && n == WSI_STATE_ESTABLISHED)
wsi->callback(wsi, LWS_CALLBACK_CLOSED, &wsi->user_space[0],
NULL, 0);
for (n = 0; n < WSI_TOKEN_COUNT; n++)
if (wsi->utf8_token[n].token)
free(wsi->utf8_token[n].token);
// fprintf(stderr, "closing fd=%d\n", wsi->sock);
shutdown(wsi->sock, SHUT_RDWR);
close(wsi->sock);
free(wsi);
}
/**
* libwebsocket_create_server() - Create the listening websockets server
* @port: Port to listen on
* @callback: The callback in user code to perform actual serving
* @protocol: Which version of the websockets protocol (currently 76)
* @user_area_size: How much memory to allocate per connection session
* which will be used by the user application to store
* per-session data. A pointer to this space is given
* when the user callback is called.
*
* This function forks to create the listening socket and takes care
* of all initialization in one step.
@ -143,50 +175,53 @@ const struct lws_tokens lws_tokens[WSI_TOKEN_COUNT] = {
int libwebsocket_create_server(int port,
int (*callback)(struct libwebsocket *,
enum libwebsocket_callback_reasons, void *, size_t),
int protocol)
enum libwebsocket_callback_reasons,
void *, void *, size_t),
int protocol, size_t user_area_size)
{
int n;
int client;
int sockfd;
int sessfd;
int fd;
unsigned int clilen;
struct sockaddr_in serv_addr, cli_addr;
int pid;
struct libwebsocket *wsi = malloc(sizeof(struct libwebsocket));
if (!wsi)
return -1;
wsi->state = WSI_STATE_HTTP;
wsi->name_buffer_pos = 0;
struct libwebsocket *wsi[MAX_CLIENTS + 1];
struct pollfd fds[MAX_CLIENTS + 1];
int fds_count = 0;
unsigned char buf[256];
int opt = 1;
/* sanity check */
for (n = 0; n < WSI_TOKEN_COUNT; n++) {
wsi->utf8_token[n].token = NULL;
wsi->utf8_token[n].token_len = 0;
}
wsi->callback = callback;
switch (protocol) {
case 0:
case 2:
case 76:
fprintf(stderr, " Using protocol v%d\n", protocol);
wsi->ietf_spec_revision = protocol;
break;
default:
fprintf(stderr, "protocol %d not supported (try 0 2 or 76)\n",
protocol);
return -1;
}
if (!callback) {
fprintf(stderr, "callback is not optional!\n");
return -1;
}
/* sit there listening for connects, accept and spawn session servers */
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
fprintf(stderr, "ERROR opening socket");
return -1;
}
bzero((char *) &serv_addr, sizeof(serv_addr));
/* allow us to restart even if old sockets in TIME_WAIT */
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(port);
@ -202,66 +237,148 @@ int libwebsocket_create_server(int port,
n = fork();
if (n < 0) {
fprintf(stderr, "Failed on forking server thread: %d\n", n);
exit(1);
return -1;
}
/* we are done as far as the caller is concerned */
if (n)
return 0;
return sockfd;
fprintf(stderr, " Listening on port %d\n", port);
/* we are running in a forked subprocess now */
listen(sockfd, 5);
fprintf(stderr, " Listening on port %d\n", port);
fds[0].fd = sockfd;
fds_count = 1;
fds[0].events = POLLIN;
while (1) {
clilen = sizeof(cli_addr);
sessfd = accept(sockfd, (struct sockaddr *)&cli_addr, &clilen);
if (sessfd < 0) {
fprintf(stderr, "ERROR on accept");
continue;
n = poll(fds, fds_count, 50);
if (n < 0 || fds[0].revents & (POLLERR | POLLHUP)) {
// fprintf(stderr, "Listen Socket dead\n");
goto fatal;
}
if (n == 0) /* poll timeout */
goto poll_out;
if (fds[0].revents & POLLIN) {
/* listen socket got a new connection... */
clilen = sizeof(cli_addr);
fd = accept(sockfd, (struct sockaddr *)&cli_addr,
&clilen);
if (fd < 0) {
fprintf(stderr, "ERROR on accept");
continue;
}
/* fork off a new server instance */
if (fds_count >= MAX_CLIENTS) {
fprintf(stderr, "too busy");
close(fd);
continue;
}
pid = fork();
if (pid < 0) {
fprintf(stderr, "ERROR on fork");
continue;
// fprintf(stderr, "accepted new conn port %u on fd=%d\n",
// ntohs(cli_addr.sin_port), fd);
/* intialize the instance struct */
wsi[fds_count] = malloc(sizeof(struct libwebsocket) +
user_area_size);
if (!wsi[fds_count])
return -1;
wsi[fds_count]->sock = fd;
wsi[fds_count]->state = WSI_STATE_HTTP;
wsi[fds_count]->name_buffer_pos = 0;
for (n = 0; n < WSI_TOKEN_COUNT; n++) {
wsi[fds_count]->utf8_token[n].token = NULL;
wsi[fds_count]->utf8_token[n].token_len = 0;
}
wsi[fds_count]->callback = callback;
wsi[fds_count]->ietf_spec_revision = protocol;
fds[fds_count].events = POLLIN;
fds[fds_count++].fd = fd;
}
if (pid) {
close(sessfd);
continue;
/* check for activity on client sockets */
for (client = 1; client < fds_count; client++) {
/* handle session socket closed */
if (fds[client].revents & (POLLERR | POLLHUP)) {
fprintf(stderr, "Session Socket dead\n");
libwebsocket_close_and_free_session(wsi[client]);
goto nuke_this;
}
/* any incoming data ready? */
if (!(fds[client].revents & POLLIN))
continue;
// fprintf(stderr, "POLLIN\n");
n = recv(fds[client].fd, buf, sizeof(buf), 0);
if (n < 0) {
fprintf(stderr, "Socket read returned %d\n", n);
continue;
}
if (!n) {
// fprintf(stderr, "POLLIN with 0 len waiting\n");
libwebsocket_close_and_free_session(wsi[client]);
goto nuke_this;
}
/* service incoming data */
if (libwebsocket_read(wsi[client], buf, n) >= 0)
continue;
/* it closed and nuked wsi[client] */
nuke_this:
for (n = client; n < fds_count - 1; n++) {
fds[n] = fds[n + 1];
wsi[n] = wsi[n + 1];
}
fds_count--;
client--;
}
/* we are the session process */
poll_out:
for (client = 1; client < fds_count; client++) {
close(sockfd);
/* sit in libwebsocket_service() until session socket closed */
libwebsocket_service(wsi, sessfd);
if (wsi[client]->state != WSI_STATE_ESTABLISHED)
continue;
if (!wsi[client]->callback)
continue;
exit(0);
wsi[client]->callback(wsi[client], LWS_CALLBACK_SEND,
&wsi[client]->user_space[0], NULL, 0);
}
continue;
}
}
fatal:
close(fds[0].fd);
for (client = 1; client < fds_count; client++)
libwebsocket_close_and_free_session(wsi[client]);
static void libwebsocket_close(struct libwebsocket *wsi)
{
int n;
wsi->state = WSI_STATE_DEAD_SOCKET;
if (wsi->callback)
wsi->callback(wsi, LWS_CALLBACK_CLOSED, NULL, 0);
for (n = 0; n < WSI_TOKEN_COUNT; n++)
if (wsi->utf8_token[n].token)
free(wsi->utf8_token[n].token);
close(wsi->sock);
kill(0, SIGTERM);
return 0;
}
/**
@ -518,7 +635,8 @@ static int libwebsocket_interpret_incoming_packet(struct libwebsocket *wsi,
return -1;
if (n != len && wsi->callback)
wsi->callback(wsi, LWS_CALLBACK_RECEIVE, &buf[n], len - n);
wsi->callback(wsi, LWS_CALLBACK_RECEIVE, &wsi->user_space[0],
&buf[n], len - n);
return -0;
}
@ -563,6 +681,7 @@ libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len)
!wsi->utf8_token[WSI_TOKEN_CONNECTION].token_len) {
if (wsi->callback)
(wsi->callback)(wsi, LWS_CALLBACK_HTTP,
&wsi->user_space[0],
NULL, 0);
wsi->state = WSI_STATE_HTTP;
return 0;
@ -674,7 +793,8 @@ libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len)
/* notify user code that we're ready to roll */
if (wsi->callback)
wsi->callback(wsi, LWS_CALLBACK_ESTABLISHED, NULL, 0);
wsi->callback(wsi, LWS_CALLBACK_ESTABLISHED,
&wsi->user_space[0], NULL, 0);
break;
case WSI_STATE_ESTABLISHED:
@ -688,7 +808,7 @@ libwebsocket_read(struct libwebsocket *wsi, unsigned char * buf, size_t len)
return 0;
bail:
libwebsocket_close(wsi);
libwebsocket_close_and_free_session(wsi);
return -1;
}
@ -826,11 +946,13 @@ int libwebsocket_write(struct libwebsocket * wsi, unsigned char *buf,
}
break;
}
// for (n = 0; n < (len + pre + post); n++)
// fprintf(stderr, "%02X ", buf[n - pre]);
//
// fprintf(stderr, "\n");
#if 0
for (n = 0; n < (len + pre + post); n++)
fprintf(stderr, "%02X ", buf[n - pre]);
fprintf(stderr, "\n");
#endif
send_raw:
@ -845,63 +967,6 @@ send_raw:
return 0;
}
static void libwebsocket_service(struct libwebsocket *wsi, int sock)
{
int n;
unsigned char buf[256];
struct pollfd fds;
wsi->sock = sock;
while (1) {
fds.fd = sock;
fds.events = POLLIN;
fds.revents = 0;
n = poll(&fds, 1, 50);
if (n < 0) {
fprintf(stderr, "Socket dead (poll = %d)\n", n);
return;
}
if (n == 0)
goto pout;
if (fds.revents & (POLLERR | POLLHUP)) {
fprintf(stderr, "Socket dead\n");
return;
}
if (wsi->state == WSI_STATE_DEAD_SOCKET) {
fprintf(stderr, "Seen socket dead, returning\n");
return;
}
if (fds.revents & POLLIN) {
// fprintf(stderr, "POLLIN\n");
n = recv(sock, buf, sizeof(buf), 0);
if (n < 0) {
fprintf(stderr, "Socket read returned %d\n", n);
continue;
}
if (n)
libwebsocket_read(wsi, buf, n);
else {
fprintf(stderr, "POLLIN with 0 len waiting\n");
usleep(50000);
}
}
pout:
if (wsi->state != WSI_STATE_ESTABLISHED)
continue;
// fprintf(stderr, "POLLOUT\n");
if (wsi->callback)
wsi->callback(wsi, LWS_CALLBACK_SEND, NULL, 0);
}
}
/**
* libwebsockets_serve_http_file() - Send a file back to the client using http

View file

@ -21,7 +21,8 @@ struct libwebsocket;
extern int libwebsocket_create_server(int port,
int (*callback)(struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason,
void *in, size_t len), int protocol);
void *user, void *in, size_t len),
int protocol, size_t user_space);
/*
* IMPORTANT NOTICE!

View file

@ -5,8 +5,9 @@ libwebsocket_create_server - Create the listening websockets server
Synopsis:
int libwebsocket_create_server (int port,
int (*callback) (struct libwebsocket *, enum libwebsocket_callback_reasons, void *, size_t,
int protocol);
int (*callback) (struct libwebsocket *, enum libwebsocket_callback_reasons, void *, void *, size_t,
int protocol,
size_t user_area_size);
Arguments:
@ -19,6 +20,12 @@ callback
protocol
Which version of the websockets protocol (currently 76)
user_area_size
How much memory to allocate per connection session
which will be used by the user application to store
per-session data. A pointer to this space is given
when the user callback is called.
Description:

View file

@ -18,16 +18,28 @@
static int port = 7681;
static int ws_protocol = 76;
struct per_session_data {
int number;
};
/**
* libwebsocket_callback() - User server actions
* @wsi: Opaque websocket instance pointer
* @reason: The reason for the call
* @user: Pointer to per-session user data allocated by library
* @in: Pointer used for some callback reasons
* @len: Length set for some callback reasons
*
* This callback is the way the user controls what is served. All the
* protocol detail is hidden and handled by the library.
*
* For each connection / session there is user data allocated that is
* pointed to by "user". You set the size of this user data area when
* the library is initialized with libwebsocket_create_server.
*
* You get an opportunity to initialize user data when called back with
* LWS_CALLBACK_ESTABLISHED reason.
*
* LWS_CALLBACK_ESTABLISHED: after successful websocket handshake
* LWS_CALLBACK_CLOSED: when the websocket session ends
* LWS_CALLBACK_SEND: opportunity to send to client (you would use
@ -47,14 +59,15 @@ static int ws_protocol = 76;
*/
static int websocket_callback(struct libwebsocket * wsi,
enum libwebsocket_callback_reasons reason, void *in, size_t len)
enum libwebsocket_callback_reasons reason, void * user,
void *in, size_t len)
{
int n;
char buf[LWS_SEND_BUFFER_PRE_PADDING + 512 +
LWS_SEND_BUFFER_POST_PADDING];
static int bump;
char *p = &buf[LWS_SEND_BUFFER_PRE_PADDING];
const char *uri;
struct per_session_data * pss = user;
switch (reason) {
/*
@ -62,6 +75,7 @@ static int websocket_callback(struct libwebsocket * wsi,
*/
case LWS_CALLBACK_ESTABLISHED:
fprintf(stderr, "Websocket connection established\n");
pss->number = 0;
break;
/*
@ -75,7 +89,7 @@ static int websocket_callback(struct libwebsocket * wsi,
* Opportunity for us to send something on the connection
*/
case LWS_CALLBACK_SEND:
n = sprintf(p, "%d", bump++);
n = sprintf(p, "%d", pss->number++);
n = libwebsocket_write(wsi, (unsigned char *)p, n, 0);
if (n < 0) {
fprintf(stderr, "ERROR writing to socket");
@ -154,8 +168,8 @@ int main(int argc, char **argv)
}
}
if (libwebsocket_create_server(port, websocket_callback, ws_protocol) <
0) {
if (libwebsocket_create_server(port, websocket_callback, ws_protocol,
sizeof(struct per_session_data)) < 0) {
fprintf(stderr, "libwebsocket init failed\n");
return -1;
}