diff --git a/lib/context.c b/lib/context.c index d309906a..c78684a8 100644 --- a/lib/context.c +++ b/lib/context.c @@ -909,7 +909,7 @@ lws_cancel_service(struct lws_context *context) struct lws_context_per_thread *pt = &context->pt[0]; short m = context->count_threads; - lwsl_notice("%s\n", __func__); + lwsl_info("%s\n", __func__); while (m--) { if (pt->pipe_wsi) diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index ea483f23..a5833475 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -2354,7 +2354,7 @@ lws_finalize_startup(struct lws_context *context); * * Returns NULL, or a pointer to the name pvo in the linked-list */ -const struct lws_protocol_vhost_options * +LWS_VISIBLE LWS_EXTERN const struct lws_protocol_vhost_options * lws_pvo_search(const struct lws_protocol_vhost_options *pvo, const char *name); LWS_VISIBLE LWS_EXTERN int @@ -6554,6 +6554,7 @@ struct lejp_ctx; #ifndef ARRAY_SIZE #define ARRAY_SIZE(_x) (sizeof(_x) / sizeof(_x[0])) #endif +#define LWS_ARRAY_SIZE(_x) (sizeof(_x) / sizeof(_x[0])) #define LEJP_FLAG_WS_KEEP 64 #define LEJP_FLAG_WS_COMMENTLINE 32 diff --git a/minimal-examples/minimal-ws-server-threads/CMakeLists.txt b/minimal-examples/minimal-ws-server-threads/CMakeLists.txt new file mode 100644 index 00000000..0849c579 --- /dev/null +++ b/minimal-examples/minimal-ws-server-threads/CMakeLists.txt @@ -0,0 +1,17 @@ +cmake_minimum_required(VERSION 2.8) +include(CheckIncludeFile) + +set(SAMP lws-minimal-ws-server-threads) +set(SRCS minimal-ws-server.c) + +if (UNIX) + set(CMAKE_C_FLAGS "-Wall -Wsign-compare -Wignored-qualifiers -Wtype-limits -Wuninitialized -Werror -Wundef ${CMAKE_C_FLAGS}" ) +endif() + +CHECK_INCLUDE_FILE(pthread.h LWS_HAVE_PTHREAD_H) +if (NOT LWS_HAVE_PTHREAD_H) + message(FATAL_ERROR "threading support requires pthreads") +endif() + +add_executable(${SAMP} ${SRCS}) +target_link_libraries(${SAMP} -lwebsockets -pthread) diff --git a/minimal-examples/minimal-ws-server-threads/README.md b/minimal-examples/minimal-ws-server-threads/README.md new file mode 100644 index 00000000..123b7bbf --- /dev/null +++ b/minimal-examples/minimal-ws-server-threads/README.md @@ -0,0 +1,25 @@ +# lws minimal ws server (threads) + +## build + +``` + $ cmake . && make +``` + +Pthreads is required on your system. + +## usage + +``` + $ ./lws-minimal-ws-server-threads +[2018/03/13 13:09:52:2208] USER: LWS minimal ws server + threads | visit http://localhost:7681 +[2018/03/13 13:09:52:2365] NOTICE: Creating Vhost 'default' port 7681, 2 protocols, IPv6 off +``` + +Visit http://localhost:7681 on multiple browser windows + +Two asynchronous threads generate strings and add them to a ringbuffer, +signalling lws to send new entries to all the browser windows. + +This demonstrates how to safely manage asynchronously generated content +and hook it up to the lws service thread. diff --git a/minimal-examples/minimal-ws-server-threads/minimal-ws-server.c b/minimal-examples/minimal-ws-server-threads/minimal-ws-server.c new file mode 100644 index 00000000..90773454 --- /dev/null +++ b/minimal-examples/minimal-ws-server-threads/minimal-ws-server.c @@ -0,0 +1,117 @@ +/* + * lws-minimal-ws-server + * + * Copyright (C) 2018 Andy Green + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + * + * This demonstrates a minimal ws server that can cooperate with + * other threads cleanly. Two other threads are started, which fill + * a ringbuffer with strings at 10Hz. + * + * The actual work and thread spawning etc are done in the protocol + * implementation in protocol_lws_minimal.c. + * + * To keep it simple, it serves stuff in the subdirectory "./mount-origin" of + * the directory it was started in. + * You can change that by changing mount.origin. + */ + +#include +#include +#include + +#define LWS_PLUGIN_STATIC +#include "protocol_lws_minimal.c" + +static struct lws_protocols protocols[] = { + { "http", lws_callback_http_dummy, 0, 0 }, + LWS_PLUGIN_PROTOCOL_MINIMAL, + { NULL, NULL, 0, 0 } /* terminator */ +}; + +static int interrupted; + +static const struct lws_http_mount mount = { + /* .mount_next */ NULL, /* linked-list "next" */ + /* .mountpoint */ "/", /* mountpoint URL */ + /* .origin */ "./mount-origin", /* serve from dir */ + /* .def */ "index.html", /* default filename */ + /* .protocol */ NULL, + /* .cgienv */ NULL, + /* .extra_mimetypes */ NULL, + /* .interpret */ NULL, + /* .cgi_timeout */ 0, + /* .cache_max_age */ 0, + /* .auth_mask */ 0, + /* .cache_reusable */ 0, + /* .cache_revalidate */ 0, + /* .cache_intermediaries */ 0, + /* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */ + /* .mountpoint_len */ 1, /* char count */ + /* .basic_auth_login_file */ NULL, +}; + +/* + * This demonstrates how to pass a pointer into a specific protocol handler + * running on a specific vhost. In this case, it's our default vhost and + * we pass the pvo named "config" with the value a const char * "myconfig". + * + * This is the preferred way to pass configuration into a specific vhost + + * protocol instance. + */ + +static const struct lws_protocol_vhost_options pvo_ops = { + NULL, + NULL, + "config", /* pvo name */ + (void *)"myconfig" /* pvo value */ +}; + +static const struct lws_protocol_vhost_options pvo = { + NULL, /* "next" pvo linked-list */ + &pvo_ops, /* "child" pvo linked-list */ + "lws-minimal", /* protocol name we belong to on this vhost */ + "" /* ignored */ +}; + +void sigint_handler(int sig) +{ + interrupted = 1; +} + +int main(int argc, char **argv) +{ + struct lws_context_creation_info info; + struct lws_context *context; + + signal(SIGINT, sigint_handler); + + memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ + info.port = 7681; + info.mounts = &mount; + info.protocols = protocols; + info.pvo = &pvo; /* per-vhost options */ + + lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_USER + /* | LLL_INFO */ /* | LLL_DEBUG */, NULL); + + lwsl_user("LWS minimal ws server + threads | visit http://localhost:7681\n"); + + context = lws_create_context(&info); + if (!context) { + lwsl_err("lws init failed\n"); + return 1; + } + + /* start the threads that create content */ + + while (!interrupted) + if (lws_service(context, 1000)) + interrupted = 1; + + lws_context_destroy(context); + + return 0; +} diff --git a/minimal-examples/minimal-ws-server-threads/mount-origin/favicon.ico b/minimal-examples/minimal-ws-server-threads/mount-origin/favicon.ico new file mode 100644 index 00000000..c0cc2e3d Binary files /dev/null and b/minimal-examples/minimal-ws-server-threads/mount-origin/favicon.ico differ diff --git a/minimal-examples/minimal-ws-server-threads/mount-origin/index.html b/minimal-examples/minimal-ws-server-threads/mount-origin/index.html new file mode 100644 index 00000000..5f6b28e4 --- /dev/null +++ b/minimal-examples/minimal-ws-server-threads/mount-origin/index.html @@ -0,0 +1,90 @@ + + + +
+ + Minimal ws server threads example.
+ Strings generated by server threads are sent to + all browsers open on this page.
+ The textarea show the last 50 lines received. +
+
+
+ + + + + + diff --git a/minimal-examples/minimal-ws-server-threads/mount-origin/libwebsockets.org-logo.png b/minimal-examples/minimal-ws-server-threads/mount-origin/libwebsockets.org-logo.png new file mode 100644 index 00000000..2060a10c Binary files /dev/null and b/minimal-examples/minimal-ws-server-threads/mount-origin/libwebsockets.org-logo.png differ diff --git a/minimal-examples/minimal-ws-server-threads/protocol_lws_minimal.c b/minimal-examples/minimal-ws-server-threads/protocol_lws_minimal.c new file mode 100644 index 00000000..f648c888 --- /dev/null +++ b/minimal-examples/minimal-ws-server-threads/protocol_lws_minimal.c @@ -0,0 +1,341 @@ +/* + * ws protocol handler plugin for "lws-minimal" demonstrating multithread + * + * Copyright (C) 2010-2018 Andy Green + * + * This file is made available under the Creative Commons CC0 1.0 + * Universal Public Domain Dedication. + */ + +#if !defined (LWS_PLUGIN_STATIC) +#define LWS_DLL +#define LWS_INTERNAL +#include +#endif + +#include + +/* one of these created for each message in the ringbuffer */ + +struct msg { + void *payload; /* is malloc'd */ + size_t len; +}; + +/* + * One of these is created for each client connecting to us. + * + * It is ONLY read or written from the lws service thread context. + */ + +struct per_session_data__minimal { + struct per_session_data__minimal *pss_list; + struct lws *wsi; + uint32_t tail; +}; + +/* one of these is created for each vhost our protocol is used with */ + +struct per_vhost_data__minimal { + struct lws_context *context; + struct lws_vhost *vhost; + const struct lws_protocols *protocol; + + struct per_session_data__minimal *pss_list; /* linked-list of live pss*/ + pthread_t pthread_spam[2]; + + pthread_mutex_t lock_ring; /* serialize access to the ring buffer */ + struct lws_ring *ring; /* {lock_ring} ringbuffer holding unsent content */ + + const char *config; + char finished; +}; + +/* + * This runs under both lws service and "spam threads" contexts. + * Access is serialized by vhd->lock_ring. + */ + +static void +__minimal_destroy_message(void *_msg) +{ + struct msg *msg = _msg; + + free(msg->payload); + msg->payload = NULL; + msg->len = 0; +} + +/* + * This runs under the "spam thread" thread context only. + * + * We spawn two threads that generate messages with this. + * + */ + +static void * +thread_spam(void *d) +{ + struct per_vhost_data__minimal *vhd = + (struct per_vhost_data__minimal *)d; + struct msg amsg; + int len = 128, index = 1, n; + + do { + /* don't generate output if nobody connected */ + if (!vhd->pss_list) + goto wait; + + pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ + + /* only create if space in ringbuffer */ + n = (int)lws_ring_get_count_free_elements(vhd->ring); + if (!n) { + lwsl_user("dropping!\n"); + goto wait_unlock; + } + + amsg.payload = malloc(LWS_PRE + len); + if (!amsg.payload) { + lwsl_user("OOM: dropping\n"); + goto wait_unlock; + } + n = lws_snprintf((char *)amsg.payload + LWS_PRE, len, + "%s: tid: %p, msg: %d", vhd->config, + (void *)pthread_self(), index++); + amsg.len = n; + n = lws_ring_insert(vhd->ring, &amsg, 1); + if (n != 1) { + __minimal_destroy_message(&amsg); + lwsl_user("dropping!\n"); + } else + /* + * This will cause a LWS_CALLBACK_EVENT_WAIT_CANCELLED + * in the lws service thread context. + */ + lws_cancel_service(vhd->context); + +wait_unlock: + pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ + +wait: + usleep(100000); + + } while (!vhd->finished); + + lwsl_notice("thread_spam %p exiting\n", (void *)pthread_self()); + + pthread_exit(NULL); +} + +/* this runs under the lws service thread context only */ + +static int +callback_minimal(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + struct per_session_data__minimal *pss = + (struct per_session_data__minimal *)user; + struct per_vhost_data__minimal *vhd = + (struct per_vhost_data__minimal *) + lws_protocol_vh_priv_get(lws_get_vhost(wsi), + lws_get_protocol(wsi)); + const struct lws_protocol_vhost_options *pvo; + const struct msg *pmsg; + uint32_t oldest; + void *retval; + int n, m, r = 0; + + switch (reason) { + case LWS_CALLBACK_PROTOCOL_INIT: + /* create our per-vhost struct */ + vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), + lws_get_protocol(wsi), + sizeof(struct per_vhost_data__minimal)); + if (!vhd) + return 1; + + pthread_mutex_init(&vhd->lock_ring, NULL); + + /* recover the pointer to the globals struct */ + pvo = lws_pvo_search( + (const struct lws_protocol_vhost_options *)in, + "config"); + if (!pvo || !pvo->value) { + lwsl_err("%s: Can't find \"config\" pvo\n", __func__); + return 1; + } + vhd->config = pvo->value; + + vhd->context = lws_get_context(wsi); + vhd->protocol = lws_get_protocol(wsi); + vhd->vhost = lws_get_vhost(wsi); + + vhd->ring = lws_ring_create(sizeof(struct msg), 8, + __minimal_destroy_message); + if (!vhd->ring) { + lwsl_err("%s: failed to create ring\n", __func__); + return 1; + } + + /* start the content-creating threads */ + + for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) + if (pthread_create(&vhd->pthread_spam[n], NULL, + thread_spam, vhd)) { + lwsl_err("thread creation failed\n"); + r = 1; + goto init_fail; + } + break; + + case LWS_CALLBACK_PROTOCOL_DESTROY: +init_fail: + vhd->finished = 1; + for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++) + if (vhd->pthread_spam[n]) + pthread_join(vhd->pthread_spam[n], &retval); + + if (vhd->ring) + lws_ring_destroy(vhd->ring); + + pthread_mutex_destroy(&vhd->lock_ring); + break; + + case LWS_CALLBACK_ESTABLISHED: + /* add ourselves to the list of live pss held in the vhd */ + pss->pss_list = vhd->pss_list; + vhd->pss_list = pss; + pss->tail = lws_ring_get_oldest_tail(vhd->ring); + pss->wsi = wsi; + break; + + case LWS_CALLBACK_CLOSED: + /* remove our closing pss from the list of live pss */ + lws_start_foreach_llp(struct per_session_data__minimal **, + ppss, vhd->pss_list) { + if (*ppss == pss) { + *ppss = pss->pss_list; + break; + } + } lws_end_foreach_llp(ppss, pss_list); + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */ + + pmsg = lws_ring_get_element(vhd->ring, &pss->tail); + if (!pmsg) { + pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ + break; + } + + /* notice we allowed for LWS_PRE in the payload already */ + m = lws_write(wsi, pmsg->payload + LWS_PRE, pmsg->len, + LWS_WRITE_TEXT); + if (m < (int)pmsg->len) { + pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ + lwsl_err("ERROR %d writing to di socket\n", n); + return -1; + } + + n = lws_ring_get_oldest_tail(vhd->ring) == pss->tail; + lws_ring_consume(vhd->ring, &pss->tail, NULL, 1); + + if (n) { /* we may have been the oldest tail */ + n = 0; + oldest = pss->tail; + lws_start_foreach_llp( + struct per_session_data__minimal **, + ppss, vhd->pss_list) { + m = lws_ring_get_count_waiting_elements( + vhd->ring, &(*ppss)->tail); + if (m > n) { + n = m; + oldest = (*ppss)->tail; + } + } lws_end_foreach_llp(ppss, pss_list); + + /* this will delete any entries behind the new oldest */ + lws_ring_update_oldest_tail(vhd->ring, oldest); + } + + /* more to do? */ + if (lws_ring_get_element(vhd->ring, &pss->tail)) + /* come back as soon as we can write more */ + lws_callback_on_writable(pss->wsi); + + pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */ + break; + + case LWS_CALLBACK_RECEIVE: + break; + + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + /* + * When the "spam" threads add a message to the ringbuffer, + * they create this event in the lws service thread context + * using lws_cancel_service(). + * + * We respond by scheduling a writable callback for all + * connected clients. + */ + lws_start_foreach_llp(struct per_session_data__minimal **, + ppss, vhd->pss_list) { + lws_callback_on_writable((*ppss)->wsi); + } lws_end_foreach_llp(ppss, pss_list); + break; + + case LWS_CALLBACK_TIMER: + lwsl_notice("%s: LWS_CALLBACK_TIMER\n", __func__); + lws_set_timer(wsi, 3); + break; + + default: + break; + } + + return r; +} + +#define LWS_PLUGIN_PROTOCOL_MINIMAL \ + { \ + "lws-minimal", \ + callback_minimal, \ + sizeof(struct per_session_data__minimal), \ + 128, \ + 0, NULL, 0 \ + } + +#if !defined (LWS_PLUGIN_STATIC) + +/* boilerplate needed if we are built as a dynamic plugin */ + +static const struct lws_protocols protocols[] = { + LWS_PLUGIN_PROTOCOL_MINIMAL +}; + +LWS_EXTERN LWS_VISIBLE int +init_protocol_minimal(struct lws_context *context, + struct lws_plugin_capability *c) +{ + if (c->api_magic != LWS_PLUGIN_API_MAGIC) { + lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC, + c->api_magic); + return 1; + } + + c->protocols = protocols; + c->count_protocols = ARRAY_SIZE(protocols); + c->extensions = NULL; + c->count_extensions = 0; + + return 0; +} + +LWS_EXTERN LWS_VISIBLE int +destroy_protocol_minimal(struct lws_context *context) +{ + return 0; +} +#endif