1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

autocreate foreign broadcast sockets on broadcast

Also introduce libwebsockets_broadcast_foreign() separate from libwebsockets_broadcast()

Signed-off-by: Andy Green <andy.green@linaro.org>
This commit is contained in:
Andy Green 2013-01-25 17:34:15 +08:00
parent fefc6585db
commit 52f28ce67a
5 changed files with 167 additions and 82 deletions

View file

@ -25,6 +25,26 @@ similar to change the avaiable number of file descriptors, and when restarted
libwebsockets will adapt accordingly.
Procedure for sending data from other threads or process contexts
-----------------------------------------------------------------
Libwebsockets is carefully designed to work with no blocking in a single thread.
In some cases where you will add libwebsockets to something else that uses the
same single thread approach, you can so a safe implementation by combining the
poll() loops as described in "External Polling loop support" below.
In other cases, you find you have asynchronous events coming from other thread
or process contexts and there's not much you can do about it. If you just try
to randomly send, or broadcast using libwebsockets_broadcast() from these other
places things will blow up either quickly or when the events on the two threads
interefere with each other. It's not legal to do this.
For those situations, you can use libwebsockets_broadcast_foreign(). This
serializes the data you're sending using a private, per-protocol socket, so the
service thread picks it up when it's ready, and it is serviced from the service
thread context only.
Fragmented messages
-------------------
@ -55,6 +75,7 @@ to gather the whole contents of a message, eg:
The test app llibwebsockets-test-fraggle sources also show how to
deal with fragmented messages.
Debug Logging
-------------

View file

@ -1974,10 +1974,7 @@ libwebsocket_create_context(int port, const char *interf,
int
libwebsockets_fork_service_loop(struct libwebsocket_context *context)
{
int fd;
struct sockaddr_in cli_addr;
int n;
int p;
n = fork();
if (n < 0)
@ -1987,33 +1984,6 @@ libwebsockets_fork_service_loop(struct libwebsocket_context *context)
/* main process context */
/*
* set up the proxy sockets to allow broadcast from
* service process context
*/
for (p = 0; p < context->count_protocols; p++) {
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
lwsl_err("Unable to create socket\n");
return -1;
}
cli_addr.sin_family = AF_INET;
cli_addr.sin_port = htons(
context->protocols[p].broadcast_socket_port);
cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
n = connect(fd, (struct sockaddr *)&cli_addr,
sizeof cli_addr);
if (n < 0) {
lwsl_err("Unable to connect to "
"broadcast socket %d, %s\n",
n, strerror(errno));
return -1;
}
context->protocols[p].broadcast_socket_user_fd = fd;
}
return 0;
}
@ -2067,8 +2037,8 @@ libwebsockets_get_protocol(struct libwebsocket *wsi)
}
/**
* libwebsockets_broadcast() - Sends a buffer to the callback for all active
* connections of the given protocol.
* libwebsockets_broadcast() - Sends a buffer to tx callback for all connections of given protocol from single thread
*
* @protocol: pointer to the protocol you will broadcast to all members of
* @buf: buffer containing the data to be broadcase. NOTE: this has to be
* allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
@ -2082,9 +2052,9 @@ libwebsockets_get_protocol(struct libwebsocket *wsi)
* wants to actually send the data for that connection, the callback itself
* should call libwebsocket_write().
*
* libwebsockets_broadcast() can be called from another fork context without
* having to take any care about data visibility between the processes, it'll
* "just work".
* This version only works from the same thread / process context as the service
* loop. Use libwesockets_broadcast_foreign(...) to do the same job from a different
* thread in a safe way.
*/
@ -2099,53 +2069,84 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
if (!context)
return 1;
#ifndef LWS_NO_FORK
if (!protocol->broadcast_socket_user_fd) {
#endif
/*
* We are either running unforked / flat, or we are being
* called from poll thread context
* eg, from a callback. In that case don't use sockets for
* broadcast IPC (since we can't open a socket connection to
* a socket listening on our own thread) but directly do the
* send action.
*
* Locking is not needed because we are by definition being
* called in the poll thread context and are serialized.
*/
for (n = 0; n < context->fds_count; n++) {
wsi = context->lws_lookup[context->fds[n].fd];
if (!wsi)
continue;
if (wsi->mode != LWS_CONNMODE_WS_SERVING)
continue;
/*
* We are either running unforked / flat, or we are being
* called from poll thread context
* eg, from a callback. In that case don't use sockets for
* broadcast IPC (since we can't open a socket connection to
* a socket listening on our own thread) but directly do the
* send action.
*
* Locking is not needed because we are by definition being
* called in the poll thread context and are serialized.
* never broadcast to non-established connections
*/
if (wsi->state != WSI_STATE_ESTABLISHED)
continue;
for (n = 0; n < context->fds_count; n++) {
/* only broadcast to guys using
* requested protocol
*/
if (wsi->protocol != protocol)
continue;
wsi = context->lws_lookup[context->fds[n].fd];
if (!wsi)
continue;
if (wsi->mode != LWS_CONNMODE_WS_SERVING)
continue;
/*
* never broadcast to non-established connections
*/
if (wsi->state != WSI_STATE_ESTABLISHED)
continue;
/* only broadcast to guys using
* requested protocol
*/
if (wsi->protocol != protocol)
continue;
user_callback_handle_rxflow(wsi->protocol->callback,
context, wsi,
LWS_CALLBACK_BROADCAST,
wsi->user_space,
buf, len);
}
return 0;
#ifndef LWS_NO_FORK
user_callback_handle_rxflow(wsi->protocol->callback,
context, wsi,
LWS_CALLBACK_BROADCAST,
wsi->user_space,
buf, len);
}
return 0;
}
#ifndef LWS_NO_FORK
/**
* libwebsockets_broadcast_foreign() - Sends a buffer to the callback for all active
* connections of the given protocol.
* @protocol: pointer to the protocol you will broadcast to all members of
* @buf: buffer containing the data to be broadcase. NOTE: this has to be
* allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
* the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the
* case you are calling this function from callback context.
* @len: length of payload data in buf, starting from buf.
*
* This function allows bulk sending of a packet to every connection using
* the given protocol. It does not send the data directly; instead it calls
* the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback
* wants to actually send the data for that connection, the callback itself
* should call libwebsocket_write().
*
* This ..._foreign() version is designed to be randomly called from other thread or
* process contexts than the main libwebsocket service one. A private socket is used
* to serialize accesses here with the main service loop.
*/
int
libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol,
unsigned char *buf, size_t len)
{
struct libwebsocket_context *context = protocol->owning_server;
int n;
int fd;
struct sockaddr_in cli_addr;
if (!context)
return 1;
/*
* We're being called from a different process context than the server
* loop. Instead of broadcasting directly, we send our
@ -2156,11 +2157,37 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
* set up when the websocket server initializes
*/
/*
* autoconnect to this protocol's broadcast proxy socket for this
* thread if needed
*/
if (protocol->broadcast_socket_user_fd <= 0) {
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
lwsl_err("Unable to create socket\n");
return -1;
}
cli_addr.sin_family = AF_INET;
cli_addr.sin_port = htons(protocol->broadcast_socket_port);
cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
n = connect(fd, (struct sockaddr *)&cli_addr, sizeof cli_addr);
if (n < 0) {
lwsl_err("Unable to connect to "
"broadcast socket %d, %s\n",
n, strerror(errno));
return -1;
}
protocol->broadcast_socket_user_fd = fd;
}
n = send(protocol->broadcast_socket_user_fd, buf, len, MSG_NOSIGNAL);
return n;
#endif
}
#endif
int
libwebsocket_is_final_fragment(struct libwebsocket *wsi)

View file

@ -790,6 +790,12 @@ LWS_EXTERN int
libwebsockets_broadcast(const struct libwebsocket_protocols *protocol,
unsigned char *buf, size_t len);
/* notice - you need the pre- and post- padding allocation for buf below */
LWS_EXTERN int
libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol,
unsigned char *buf, size_t len);
LWS_EXTERN const struct libwebsocket_protocols *
libwebsockets_get_protocol(struct libwebsocket *wsi);

View file

@ -426,7 +426,7 @@ This is useful to get the protocol to broadcast back to from inside
the callback.
</blockquote>
<hr>
<h2>libwebsockets_broadcast - Sends a buffer to the callback for all active connections of the given protocol.</h2>
<h2>libwebsockets_broadcast - Sends a buffer to tx callback for all connections of given protocol from single thread</h2>
<i>int</i>
<b>libwebsockets_broadcast</b>
(<i>const struct libwebsocket_protocols *</i> <b>protocol</b>,
@ -452,9 +452,40 @@ the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback
wants to actually send the data for that connection, the callback itself
should call <b>libwebsocket_write</b>.
<p>
<b>libwebsockets_broadcast</b> can be called from another fork context without
having to take any care about data visibility between the processes, it'll
"just work".
This version only works from the same thread / process context as the service
loop. Use libwesockets_broadcast_foreign(...) to do the same job from a different
thread in a safe way.
</blockquote>
<hr>
<h2>libwebsockets_broadcast_foreign - Sends a buffer to the callback for all active connections of the given protocol.</h2>
<i>int</i>
<b>libwebsockets_broadcast_foreign</b>
(<i>struct libwebsocket_protocols *</i> <b>protocol</b>,
<i>unsigned char *</i> <b>buf</b>,
<i>size_t</i> <b>len</b>)
<h3>Arguments</h3>
<dl>
<dt><b>protocol</b>
<dd>pointer to the protocol you will broadcast to all members of
<dt><b>buf</b>
<dd>buffer containing the data to be broadcase. NOTE: this has to be
allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before
the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the
case you are calling this function from callback context.
<dt><b>len</b>
<dd>length of payload data in buf, starting from buf.
</dl>
<h3>Description</h3>
<blockquote>
This function allows bulk sending of a packet to every connection using
the given protocol. It does not send the data directly; instead it calls
the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback
wants to actually send the data for that connection, the callback itself
should call <b>libwebsocket_write</b>.
<p>
This ...<b>_foreign</b> version is designed to be randomly called from other thread or
process contexts than the main libwebsocket service one. A private socket is used
to serialize accesses here with the main service loop.
</blockquote>
<hr>
<h2>lws_confirm_legit_wsi - </h2>

View file

@ -717,7 +717,7 @@ int main(int argc, char **argv)
* We take care of pre-and-post padding allocation.
*/
libwebsockets_broadcast(&protocols[PROTOCOL_DUMB_INCREMENT],
libwebsockets_broadcast_foreign(&protocols[PROTOCOL_DUMB_INCREMENT],
&buf[LWS_SEND_BUFFER_PRE_PADDING], 1);
}