expose-event-loop.patch

Signed-off-by: Andy Green <andy@warmcat.com>
This commit is contained in:
Andy Green 2011-01-19 13:11:55 +00:00
parent 38e57bbd71
commit e92cd1764e
5 changed files with 294 additions and 231 deletions

View file

@ -193,6 +193,189 @@ nuke_this:
}
int
libwebsocket_service(struct libwebsocket_context *this, int timeout_ms)
{
int n;
int client;
unsigned int clilen;
struct sockaddr_in cli_addr;
int fd;
/* stay dead once we are dead */
if (this == NULL)
return 1;
n = poll(this->fds, this->fds_count, timeout_ms);
if (n < 0 || this->fds[0].revents & (POLLERR | POLLHUP)) {
fprintf(stderr, "Listen Socket dead\n");
goto fatal;
}
if (n == 0) /* poll timeout */
return 0;
/* handle accept on listening socket? */
for (client = 0; client < this->count_protocols + 1; client++) {
if (!this->fds[client].revents & POLLIN)
continue;
/* listen socket got an unencrypted connection... */
clilen = sizeof(cli_addr);
fd = accept(this->fds[client].fd,
(struct sockaddr *)&cli_addr, &clilen);
if (fd < 0) {
fprintf(stderr, "ERROR on accept");
continue;
}
if (this->fds_count >= MAX_CLIENTS) {
fprintf(stderr, "too busy");
close(fd);
continue;
}
if (client) {
/*
* accepting a connection to broadcast socket
* set wsi to be protocol index not pointer
*/
this->wsi[this->fds_count] =
(struct libwebsocket *)(long)(client - 1);
goto fill_in_fds;
}
/* accepting connection to main listener */
this->wsi[this->fds_count] =
malloc(sizeof(struct libwebsocket));
if (!this->wsi[this->fds_count]) {
fprintf(stderr, "Out of memory for new connection\n");
continue;
}
#ifdef LWS_OPENSSL_SUPPORT
if (this->use_ssl) {
this->wsi[this->fds_count]->ssl = SSL_new(ssl_ctx);
if (this->wsi[this->fds_count]->ssl == NULL) {
fprintf(stderr, "SSL_new failed: %s\n",
ERR_error_string(SSL_get_error(
this->wsi[this->fds_count]->ssl, 0),
NULL));
free(this->wsi[this->fds_count]);
continue;
}
SSL_set_fd(this->wsi[this->fds_count]->ssl, fd);
n = SSL_accept(this->wsi[this->fds_count]->ssl);
if (n != 1) {
/*
* browsers seem to probe with various
* ssl params which fail then retry
* and succeed
*/
debug("SSL_accept failed skt %u: %s\n",
fd,
ERR_error_string(SSL_get_error(
this->wsi[this->fds_count]->ssl,
n), NULL));
SSL_free(
this->wsi[this->fds_count]->ssl);
free(this->wsi[this->fds_count]);
continue;
}
debug("accepted new SSL conn "
"port %u on fd=%d SSL ver %s\n",
ntohs(cli_addr.sin_port), fd,
SSL_get_version(this->wsi[
this->fds_count]->ssl));
} else
#endif
debug("accepted new conn port %u on fd=%d\n",
ntohs(cli_addr.sin_port), fd);
/* intialize the instance struct */
this->wsi[this->fds_count]->sock = fd;
this->wsi[this->fds_count]->state = WSI_STATE_HTTP;
this->wsi[this->fds_count]->name_buffer_pos = 0;
for (n = 0; n < WSI_TOKEN_COUNT; n++) {
this->wsi[this->fds_count]->
utf8_token[n].token = NULL;
this->wsi[this->fds_count]->
utf8_token[n].token_len = 0;
}
/*
* these can only be set once the protocol is known
* we set an unestablished connection's protocol pointer
* to the start of the supported list, so it can look
* for matching ones during the handshake
*/
this->wsi[this->fds_count]->protocol = this->protocols;
this->wsi[this->fds_count]->user_space = NULL;
/*
* Default protocol is 76 / 00
* After 76, there's a header specified to inform which
* draft the client wants, when that's seen we modify
* the individual connection's spec revision accordingly
*/
this->wsi[this->fds_count]->ietf_spec_revision = 0;
fill_in_fds:
/*
* make sure NO events are seen yet on this new socket
* (otherwise we inherit old fds[client].revents from
* previous socket there and die mysteriously! )
*/
this->fds[this->fds_count].revents = 0;
this->fds[this->fds_count].events = POLLIN;
this->fds[this->fds_count++].fd = fd;
}
/* service anything incoming on websocket connection */
libwebsocket_poll_connections(this);
/* this round is done */
return 0;
fatal:
/* close listening skt and per-protocol broadcast sockets */
for (client = 0; client < this->fds_count; client++)
close(this->fds[0].fd);
#ifdef LWS_OPENSSL_SUPPORT
SSL_CTX_free(ssl_ctx);
#endif
kill(0, SIGTERM);
if (this)
free(this);
this = NULL;
/* inform caller we are dead */
return 1;
}
/**
* libwebsocket_create_server() - Create the listening websockets server
@ -212,10 +395,11 @@ nuke_this:
* This function creates the listening socket and takes care
* of all initialization in one step.
*
* After initialization, it forks a thread that will sits in a service loop
* and returns to the caller. The actual service actions are performed by
* user code in a per-protocol callback from the appropriate one selected
* by the client from the list in @protocols.
* After initialization, it returns a struct libwebsocket_context * that
* represents this server. After calling, user code needs to take care
* of calling libwebsocket_service() with the context pointer to get the
* server's sockets serviced. This can be done in the same process context
* or a forked process, or another thread,
*
* The protocol callback functions are called for a handful of events
* including http requests coming in, websocket connections becoming
@ -234,17 +418,16 @@ nuke_this:
* one place; they're all handled in the user callback.
*/
int libwebsocket_create_server(int port,
struct libwebsocket_context *
libwebsocket_create_server(int port,
struct libwebsocket_protocols *protocols,
const char *ssl_cert_filepath,
const char *ssl_private_key_filepath,
int gid, int uid)
{
int n;
int client;
int sockfd;
int fd;
unsigned int clilen;
struct sockaddr_in serv_addr, cli_addr;
int opt = 1;
struct libwebsocket_context *this = NULL;
@ -263,7 +446,7 @@ int libwebsocket_create_server(int port,
#else
if (ssl_cert_filepath != NULL && ssl_private_key_filepath != NULL) {
fprintf(stderr, " Not compiled for OpenSSl support!\n");
return -1;
return NULL;
}
fprintf(stderr, " Compiled without SSL support, serving unencrypted\n");
#endif
@ -284,13 +467,13 @@ int libwebsocket_create_server(int port,
if (!method) {
fprintf(stderr, "problem creating ssl method: %s\n",
ERR_error_string(ERR_get_error(), ssl_err_buf));
return -1;
return NULL;
}
ssl_ctx = SSL_CTX_new(method); /* create context */
if (!ssl_ctx) {
printf("problem creating ssl context: %s\n",
ERR_error_string(ERR_get_error(), ssl_err_buf));
return -1;
return NULL;
}
/* set the local certificate from CertFile */
n = SSL_CTX_use_certificate_file(ssl_ctx,
@ -299,7 +482,7 @@ int libwebsocket_create_server(int port,
fprintf(stderr, "problem getting cert '%s': %s\n",
ssl_cert_filepath,
ERR_error_string(ERR_get_error(), ssl_err_buf));
return -1;
return NULL;
}
/* set the private key from KeyFile */
if (SSL_CTX_use_PrivateKey_file(ssl_ctx,
@ -308,12 +491,12 @@ int libwebsocket_create_server(int port,
fprintf(stderr, "ssl problem getting key '%s': %s\n",
ssl_private_key_filepath,
ERR_error_string(ERR_get_error(), ssl_err_buf));
return -1;
return NULL;
}
/* verify private key */
if (!SSL_CTX_check_private_key(ssl_ctx)) {
fprintf(stderr, "Private SSL key doesn't match cert\n");
return -1;
return NULL;
}
/* SSL is happy and has a cert it's content with */
@ -323,17 +506,19 @@ int libwebsocket_create_server(int port,
/* selftest */
if (lws_b64_selftest())
return -1;
return NULL;
this = malloc(sizeof(struct libwebsocket_context));
this->protocols = protocols;
/* set up our external listening socket we serve on */
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
fprintf(stderr, "ERROR opening socket");
return -1;
return NULL;
}
/* allow us to restart even if old sockets in TIME_WAIT */
@ -348,7 +533,7 @@ int libwebsocket_create_server(int port,
if (n < 0) {
fprintf(stderr, "ERROR on binding to port %d (%d %d)\n",
port, n, errno);
return -1;
return NULL;
}
/* drop any root privs for this process */
@ -390,7 +575,7 @@ int libwebsocket_create_server(int port,
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
fprintf(stderr, "ERROR opening socket");
return -1;
return NULL;
}
/* allow us to restart even if old sockets in TIME_WAIT */
@ -405,14 +590,14 @@ int libwebsocket_create_server(int port,
if (n < 0) {
fprintf(stderr, "ERROR on binding to port %d (%d %d)\n",
port, n, errno);
return -1;
return NULL;
}
slen = sizeof cli_addr;
n = getsockname(fd, (struct sockaddr *)&cli_addr, &slen);
if (n < 0) {
fprintf(stderr, "getsockname failed\n");
return -1;
return NULL;
}
protocols[this->count_protocols].broadcast_socket_port =
ntohs(cli_addr.sin_port);
@ -429,53 +614,50 @@ int libwebsocket_create_server(int port,
this->fds_count++;
}
return this;
}
/*
* We will enter out poll and service loop now, just before that
* fork and return to caller for the main thread of execution
*/
/**
* libwebsockets_fork_service_loop() - Optional helper function forks off
* a process for the websocket server loop.
* You don't have to use this but if not, you
* have to make sure you are calling
* libwebsocket_service periodically to service
* the websocket traffic
* @this: server context returned by creation function
*/
n = fork();
if (n < 0) {
fprintf(stderr, "Failed to fork websocket poll loop\n");
return -1;
}
if (n) {
/* original process context */
int
libwebsockets_fork_service_loop(struct libwebsocket_context *this)
{
int client;
int fd;
struct sockaddr_in cli_addr;
int n;
/*
* before we return to caller, we set up per-protocol
* broadcast sockets connected to the server ready to use
*/
if (fork())
return 0;
/* give server fork a chance to start up */
usleep(500000);
for (client = 1; client < this->count_protocols + 1; client++) {
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
fprintf(stderr, "Unable to create socket\n");
return -1;
}
cli_addr.sin_family = AF_INET;
cli_addr.sin_port = htons(
protocols[client - 1].broadcast_socket_port);
cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
n = connect(fd, (struct sockaddr *)&cli_addr,
sizeof cli_addr);
if (n < 0) {
fprintf(stderr, "Unable to connect to "
"broadcast socket %d, %s\n",
client, strerror(errno));
return -1;
}
protocols[client - 1].broadcast_socket_user_fd = fd;
for (client = 1; client < this->count_protocols + 1; client++) {
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
fprintf(stderr, "Unable to create socket\n");
return -1;
}
cli_addr.sin_family = AF_INET;
cli_addr.sin_port = htons(
this->protocols[client - 1].broadcast_socket_port);
cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
n = connect(fd, (struct sockaddr *)&cli_addr,
sizeof cli_addr);
if (n < 0) {
fprintf(stderr, "Unable to connect to "
"broadcast socket %d, %s\n",
client, strerror(errno));
return -1;
}
fprintf(stderr, "libwebsocket poll process forked\n");
return 0;
this->protocols[client - 1].broadcast_socket_user_fd = fd;
}
/* we want a SIGHUP when our parent goes down */
@ -483,166 +665,9 @@ int libwebsocket_create_server(int port,
/* in this forked process, sit and service websocket connections */
while (1) {
n = poll(this->fds, this->fds_count, 1000);
if (n < 0 || this->fds[0].revents & (POLLERR | POLLHUP)) {
fprintf(stderr, "Listen Socket dead\n");
goto fatal;
}
if (n == 0) /* poll timeout */
continue;
/* handle accept on listening socket? */
for (client = 0; client < this->count_protocols + 1; client++) {
if (!this->fds[client].revents & POLLIN)
continue;
/* listen socket got an unencrypted connection... */
clilen = sizeof(cli_addr);
fd = accept(this->fds[client].fd,
(struct sockaddr *)&cli_addr, &clilen);
if (fd < 0) {
fprintf(stderr, "ERROR on accept");
continue;
}
if (this->fds_count >= MAX_CLIENTS) {
fprintf(stderr, "too busy");
close(fd);
continue;
}
if (client) {
/*
* accepting a connection to broadcast socket
* set wsi to be protocol index not pointer
*/
this->wsi[this->fds_count] =
(struct libwebsocket *)(long)(client - 1);
goto fill_in_fds;
}
/* accepting connection to main listener */
this->wsi[this->fds_count] =
malloc(sizeof(struct libwebsocket));
if (!this->wsi[this->fds_count])
return -1;
#ifdef LWS_OPENSSL_SUPPORT
if (this->use_ssl) {
this->wsi[this->fds_count]->ssl =
SSL_new(ssl_ctx);
if (this->wsi[this->fds_count]->ssl == NULL) {
fprintf(stderr, "SSL_new failed: %s\n",
ERR_error_string(SSL_get_error(
this->wsi[this->fds_count]->ssl, 0),
NULL));
free(this->wsi[this->fds_count]);
continue;
}
SSL_set_fd(this->wsi[this->fds_count]->ssl, fd);
n = SSL_accept(this->wsi[this->fds_count]->ssl);
if (n != 1) {
/*
* browsers seem to probe with various
* ssl params which fail then retry
* and succeed
*/
debug("SSL_accept failed skt %u: %s\n",
fd,
ERR_error_string(SSL_get_error(
this->wsi[this->fds_count]->ssl,
n), NULL));
SSL_free(
this->wsi[this->fds_count]->ssl);
free(this->wsi[this->fds_count]);
continue;
}
debug("accepted new SSL conn "
"port %u on fd=%d SSL ver %s\n",
ntohs(cli_addr.sin_port), fd,
SSL_get_version(this->wsi[
this->fds_count]->ssl));
} else
#endif
debug("accepted new conn port %u on fd=%d\n",
ntohs(cli_addr.sin_port), fd);
/* intialize the instance struct */
this->wsi[this->fds_count]->sock = fd;
this->wsi[this->fds_count]->state = WSI_STATE_HTTP;
this->wsi[this->fds_count]->name_buffer_pos = 0;
for (n = 0; n < WSI_TOKEN_COUNT; n++) {
this->wsi[this->fds_count]->
utf8_token[n].token = NULL;
this->wsi[this->fds_count]->
utf8_token[n].token_len = 0;
}
/*
* these can only be set once the protocol is known
* we set an unestablished connection's protocol pointer
* to the start of the supported list, so it can look
* for matching ones during the handshake
*/
this->wsi[this->fds_count]->protocol = protocols;
this->wsi[this->fds_count]->user_space = NULL;
/*
* Default protocol is 76 / 00
* After 76, there's a header specified to inform which
* draft the client wants, when that's seen we modify
* the individual connection's spec revision accordingly
*/
this->wsi[this->fds_count]->ietf_spec_revision = 0;
fill_in_fds:
/*
* make sure NO events are seen yet on this new socket
* (otherwise we inherit old fds[client].revents from
* previous socket there and die mysteriously! )
*/
this->fds[this->fds_count].revents = 0;
this->fds[this->fds_count].events = POLLIN;
this->fds[this->fds_count++].fd = fd;
}
/* service anything incoming on websocket connection */
libwebsocket_poll_connections(this);
}
fatal:
/* close listening skt and per-protocol broadcast sockets */
for (client = 0; client < this->fds_count; client++)
close(this->fds[0].fd);
#ifdef LWS_OPENSSL_SUPPORT
SSL_CTX_free(ssl_ctx);
#endif
kill(0, SIGTERM);
if (this)
free(this);
while (1)
if (libwebsocket_service(this, 1000))
return -1;
return 0;
}
@ -664,7 +689,7 @@ libwebsockets_get_protocol(struct libwebsocket *wsi)
}
/**
* libwebsockets_broadcast() - Sends a buffer to rthe callback for all active
* libwebsockets_broadcast() - Sends a buffer to the callback for all active
* connections of the given protocol.
* @protocol: pointer to the protocol you will broadcast to all members of
* @buf: buffer containing the data to be broadcase. NOTE: this has to be
@ -694,7 +719,8 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
if (!protocol->broadcast_socket_user_fd) {
/*
* we are being called from poll thread context
* We are either running unforked / flat, or we are being
* called from poll thread context
* eg, from a callback. In that case don't use sockets for
* broadcast IPC (since we can't open a socket connection to
* a socket listening on our own thread) but directly do the

View file

@ -134,11 +134,18 @@ struct libwebsocket_protocols {
int protocol_index;
};
extern int libwebsocket_create_server(int port,
extern struct libwebsocket_context *
libwebsocket_create_server(int port,
struct libwebsocket_protocols *protocols,
const char *ssl_cert_filepath,
const char *ssl_private_key_filepath, int gid, int uid);
extern int
libwebsockets_fork_service_loop(struct libwebsocket_context *this);
extern int
libwebsocket_service(struct libwebsocket_context *this, int timeout_ms);
/*
* IMPORTANT NOTICE!
*

View file

@ -151,6 +151,8 @@ struct lws_tokens {
int token_len;
};
struct libwebsocket_protocols;
struct libwebsocket_context {
struct libwebsocket *wsi[MAX_CLIENTS + 1];
struct pollfd fds[MAX_CLIENTS + 1];
@ -158,6 +160,7 @@ struct libwebsocket_context {
#ifdef LWS_OPENSSL_SUPPORT
int use_ssl;
#endif
struct libwebsocket_protocols *protocols;
int count_protocols;
};

View file

@ -1,5 +1,5 @@
<h2>libwebsocket_create_server - Create the listening websockets server</h2>
<i>int</i>
<i>struct libwebsocket_context *</i>
<b>libwebsocket_create_server</b>
(<i>int</i> <b>port</b>,
<i>struct libwebsocket_protocols *</i> <b>protocols</b>,
@ -33,10 +33,11 @@ else ignored
This function creates the listening socket and takes care
of all initialization in one step.
<p>
After initialization, it forks a thread that will sits in a service loop
and returns to the caller. The actual service actions are performed by
user code in a per-protocol callback from the appropriate one selected
by the client from the list in <tt><b>protocols</b></tt>.
After initialization, it returns a struct libwebsocket_context * that
represents this server. After calling, user code needs to take care
of calling <b>libwebsocket_service</b> with the context pointer to get the
server's sockets serviced. This can be done in the same process context
or a forked process, or another thread,
<p>
The protocol callback functions are called for a handful of events
including http requests coming in, websocket connections becoming
@ -55,6 +56,16 @@ images or whatever over http and dynamic data over websockets all in
one place; they're all handled in the user callback.
</blockquote>
<hr>
<h2>libwebsockets_fork_service_loop - Optional helper function forks off a process for the websocket server loop. You don't have to use this but if not, you have to make sure you are calling libwebsocket_service periodically to service the websocket traffic</h2>
<i>int</i>
<b>libwebsockets_fork_service_loop</b>
(<i>struct libwebsocket_context *</i> <b>this</b>)
<h3>Arguments</h3>
<dl>
<dt><b>this</b>
<dd>server context returned by creation function
</dl>
<hr>
<h2>libwebsockets_get_protocol - Returns a protocol pointer from a websocket connection.</h2>
<i>const struct libwebsocket_protocols *</i>
<b>libwebsockets_get_protocol</b>
@ -71,7 +82,7 @@ This is useful to get the protocol to broadcast back to from inside
the callback.
</blockquote>
<hr>
<h2>libwebsockets_broadcast - Sends a buffer to rthe callback for all active connections of the given protocol.</h2>
<h2>libwebsockets_broadcast - Sends a buffer to the callback for all active connections of the given protocol.</h2>
<i>int</i>
<b>libwebsockets_broadcast</b>
(<i>const struct libwebsocket_protocols *</i> <b>protocol</b>,

View file

@ -230,6 +230,7 @@ int main(int argc, char **argv)
LWS_SEND_BUFFER_POST_PADDING];
int port = 7681;
int use_ssl = 0;
struct libwebsocket_context *server;
fprintf(stderr, "libwebsockets test server\n"
"(C) Copyright 2010 Andy Green <andy@warmcat.com> "
@ -256,8 +257,9 @@ int main(int argc, char **argv)
if (!use_ssl)
cert_path = key_path = NULL;
if (libwebsocket_create_server(port, protocols, cert_path, key_path,
-1, -1) < 0) {
server = libwebsocket_create_server(port, protocols, cert_path,
key_path, -1, -1);
if (server == NULL) {
fprintf(stderr, "libwebsocket init failed\n");
return -1;
}
@ -288,6 +290,20 @@ int main(int argc, char **argv)
libwebsockets_broadcast(&protocols[PROTOCOL_DUMB_INCREMENT],
&buf[LWS_SEND_BUFFER_PRE_PADDING], 1);
/*
* This example server does not fork or create a thread for
* websocket service, it all runs in this single loop. So,
* we have to give the websockets an opportunity to service
* "manually".
*
* There's an optional call libwebsockets_fork_service_loop()
* we could have used before this while loop, then the
* websockets would have been serviced in a forked process
* and we would not have to do the call below inside our loop.
*/
libwebsocket_service(server, 0);
}
return 0;