diff --git a/README.coding b/README.coding index 0cfb07a8..05bc09fc 100644 --- a/README.coding +++ b/README.coding @@ -25,6 +25,26 @@ similar to change the avaiable number of file descriptors, and when restarted libwebsockets will adapt accordingly. +Procedure for sending data from other threads or process contexts +----------------------------------------------------------------- + +Libwebsockets is carefully designed to work with no blocking in a single thread. +In some cases where you will add libwebsockets to something else that uses the +same single thread approach, you can so a safe implementation by combining the +poll() loops as described in "External Polling loop support" below. + +In other cases, you find you have asynchronous events coming from other thread +or process contexts and there's not much you can do about it. If you just try +to randomly send, or broadcast using libwebsockets_broadcast() from these other +places things will blow up either quickly or when the events on the two threads +interefere with each other. It's not legal to do this. + +For those situations, you can use libwebsockets_broadcast_foreign(). This +serializes the data you're sending using a private, per-protocol socket, so the +service thread picks it up when it's ready, and it is serviced from the service +thread context only. + + Fragmented messages ------------------- @@ -55,6 +75,7 @@ to gather the whole contents of a message, eg: The test app llibwebsockets-test-fraggle sources also show how to deal with fragmented messages. + Debug Logging ------------- diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index 188bb271..b2f4e91e 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -1974,10 +1974,7 @@ libwebsocket_create_context(int port, const char *interf, int libwebsockets_fork_service_loop(struct libwebsocket_context *context) { - int fd; - struct sockaddr_in cli_addr; int n; - int p; n = fork(); if (n < 0) @@ -1987,33 +1984,6 @@ libwebsockets_fork_service_loop(struct libwebsocket_context *context) /* main process context */ - /* - * set up the proxy sockets to allow broadcast from - * service process context - */ - - for (p = 0; p < context->count_protocols; p++) { - fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - lwsl_err("Unable to create socket\n"); - return -1; - } - cli_addr.sin_family = AF_INET; - cli_addr.sin_port = htons( - context->protocols[p].broadcast_socket_port); - cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); - n = connect(fd, (struct sockaddr *)&cli_addr, - sizeof cli_addr); - if (n < 0) { - lwsl_err("Unable to connect to " - "broadcast socket %d, %s\n", - n, strerror(errno)); - return -1; - } - - context->protocols[p].broadcast_socket_user_fd = fd; - } - return 0; } @@ -2067,8 +2037,8 @@ libwebsockets_get_protocol(struct libwebsocket *wsi) } /** - * libwebsockets_broadcast() - Sends a buffer to the callback for all active - * connections of the given protocol. + * libwebsockets_broadcast() - Sends a buffer to tx callback for all connections of given protocol from single thread + * * @protocol: pointer to the protocol you will broadcast to all members of * @buf: buffer containing the data to be broadcase. NOTE: this has to be * allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before @@ -2082,9 +2052,9 @@ libwebsockets_get_protocol(struct libwebsocket *wsi) * wants to actually send the data for that connection, the callback itself * should call libwebsocket_write(). * - * libwebsockets_broadcast() can be called from another fork context without - * having to take any care about data visibility between the processes, it'll - * "just work". + * This version only works from the same thread / process context as the service + * loop. Use libwesockets_broadcast_foreign(...) to do the same job from a different + * thread in a safe way. */ @@ -2099,53 +2069,84 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol, if (!context) return 1; -#ifndef LWS_NO_FORK - if (!protocol->broadcast_socket_user_fd) { -#endif + /* + * We are either running unforked / flat, or we are being + * called from poll thread context + * eg, from a callback. In that case don't use sockets for + * broadcast IPC (since we can't open a socket connection to + * a socket listening on our own thread) but directly do the + * send action. + * + * Locking is not needed because we are by definition being + * called in the poll thread context and are serialized. + */ + + for (n = 0; n < context->fds_count; n++) { + + wsi = context->lws_lookup[context->fds[n].fd]; + if (!wsi) + continue; + + if (wsi->mode != LWS_CONNMODE_WS_SERVING) + continue; + /* - * We are either running unforked / flat, or we are being - * called from poll thread context - * eg, from a callback. In that case don't use sockets for - * broadcast IPC (since we can't open a socket connection to - * a socket listening on our own thread) but directly do the - * send action. - * - * Locking is not needed because we are by definition being - * called in the poll thread context and are serialized. + * never broadcast to non-established connections */ + if (wsi->state != WSI_STATE_ESTABLISHED) + continue; - for (n = 0; n < context->fds_count; n++) { + /* only broadcast to guys using + * requested protocol + */ + if (wsi->protocol != protocol) + continue; - wsi = context->lws_lookup[context->fds[n].fd]; - if (!wsi) - continue; - - if (wsi->mode != LWS_CONNMODE_WS_SERVING) - continue; - - /* - * never broadcast to non-established connections - */ - if (wsi->state != WSI_STATE_ESTABLISHED) - continue; - - /* only broadcast to guys using - * requested protocol - */ - if (wsi->protocol != protocol) - continue; - - user_callback_handle_rxflow(wsi->protocol->callback, - context, wsi, - LWS_CALLBACK_BROADCAST, - wsi->user_space, - buf, len); - } - - return 0; -#ifndef LWS_NO_FORK + user_callback_handle_rxflow(wsi->protocol->callback, + context, wsi, + LWS_CALLBACK_BROADCAST, + wsi->user_space, + buf, len); } + return 0; +} + + +#ifndef LWS_NO_FORK +/** + * libwebsockets_broadcast_foreign() - Sends a buffer to the callback for all active + * connections of the given protocol. + * @protocol: pointer to the protocol you will broadcast to all members of + * @buf: buffer containing the data to be broadcase. NOTE: this has to be + * allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before + * the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the + * case you are calling this function from callback context. + * @len: length of payload data in buf, starting from buf. + * + * This function allows bulk sending of a packet to every connection using + * the given protocol. It does not send the data directly; instead it calls + * the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback + * wants to actually send the data for that connection, the callback itself + * should call libwebsocket_write(). + * + * This ..._foreign() version is designed to be randomly called from other thread or + * process contexts than the main libwebsocket service one. A private socket is used + * to serialize accesses here with the main service loop. + */ + +int +libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol, + unsigned char *buf, size_t len) +{ + struct libwebsocket_context *context = protocol->owning_server; + int n; + int fd; + struct sockaddr_in cli_addr; + + if (!context) + return 1; + /* * We're being called from a different process context than the server * loop. Instead of broadcasting directly, we send our @@ -2156,11 +2157,37 @@ libwebsockets_broadcast(const struct libwebsocket_protocols *protocol, * set up when the websocket server initializes */ + + /* + * autoconnect to this protocol's broadcast proxy socket for this + * thread if needed + */ + + if (protocol->broadcast_socket_user_fd <= 0) { + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + lwsl_err("Unable to create socket\n"); + return -1; + } + cli_addr.sin_family = AF_INET; + cli_addr.sin_port = htons(protocol->broadcast_socket_port); + cli_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + n = connect(fd, (struct sockaddr *)&cli_addr, sizeof cli_addr); + if (n < 0) { + lwsl_err("Unable to connect to " + "broadcast socket %d, %s\n", + n, strerror(errno)); + return -1; + } + + protocol->broadcast_socket_user_fd = fd; + } + n = send(protocol->broadcast_socket_user_fd, buf, len, MSG_NOSIGNAL); return n; -#endif } +#endif int libwebsocket_is_final_fragment(struct libwebsocket *wsi) diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index f1395ef1..fadbdd19 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -790,6 +790,12 @@ LWS_EXTERN int libwebsockets_broadcast(const struct libwebsocket_protocols *protocol, unsigned char *buf, size_t len); +/* notice - you need the pre- and post- padding allocation for buf below */ + +LWS_EXTERN int +libwebsockets_broadcast_foreign(struct libwebsocket_protocols *protocol, + unsigned char *buf, size_t len); + LWS_EXTERN const struct libwebsocket_protocols * libwebsockets_get_protocol(struct libwebsocket *wsi); diff --git a/libwebsockets-api-doc.html b/libwebsockets-api-doc.html index e240eed6..156afe79 100644 --- a/libwebsockets-api-doc.html +++ b/libwebsockets-api-doc.html @@ -426,7 +426,7 @@ This is useful to get the protocol to broadcast back to from inside the callback.
-

libwebsockets_broadcast - Sends a buffer to the callback for all active connections of the given protocol.

+

libwebsockets_broadcast - Sends a buffer to tx callback for all connections of given protocol from single thread

int libwebsockets_broadcast (const struct libwebsocket_protocols * protocol, @@ -452,9 +452,40 @@ the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback wants to actually send the data for that connection, the callback itself should call libwebsocket_write.

-libwebsockets_broadcast can be called from another fork context without -having to take any care about data visibility between the processes, it'll -"just work". +This version only works from the same thread / process context as the service +loop. Use libwesockets_broadcast_foreign(...) to do the same job from a different +thread in a safe way. + +


+

libwebsockets_broadcast_foreign - Sends a buffer to the callback for all active connections of the given protocol.

+int +libwebsockets_broadcast_foreign +(struct libwebsocket_protocols * protocol, +unsigned char * buf, +size_t len) +

Arguments

+
+
protocol +
pointer to the protocol you will broadcast to all members of +
buf +
buffer containing the data to be broadcase. NOTE: this has to be +allocated with LWS_SEND_BUFFER_PRE_PADDING valid bytes before +the pointer and LWS_SEND_BUFFER_POST_PADDING afterwards in the +case you are calling this function from callback context. +
len +
length of payload data in buf, starting from buf. +
+

Description

+
+This function allows bulk sending of a packet to every connection using +the given protocol. It does not send the data directly; instead it calls +the callback with a reason type of LWS_CALLBACK_BROADCAST. If the callback +wants to actually send the data for that connection, the callback itself +should call libwebsocket_write. +

+This ..._foreign version is designed to be randomly called from other thread or +process contexts than the main libwebsocket service one. A private socket is used +to serialize accesses here with the main service loop.


lws_confirm_legit_wsi -

diff --git a/test-server/test-server.c b/test-server/test-server.c index 4e90b34e..2fa0eae9 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -717,7 +717,7 @@ int main(int argc, char **argv) * We take care of pre-and-post padding allocation. */ - libwebsockets_broadcast(&protocols[PROTOCOL_DUMB_INCREMENT], + libwebsockets_broadcast_foreign(&protocols[PROTOCOL_DUMB_INCREMENT], &buf[LWS_SEND_BUFFER_PRE_PADDING], 1); }