minimal-ws-broker

This commit is contained in:
Andy Green 2018-03-15 11:56:40 +08:00
parent 6232f5a2b7
commit db8cbb3b61
9 changed files with 552 additions and 1 deletions

View file

@ -3,3 +3,70 @@
server|Minimal examples providing a server
client|Minimal examples providing a client
client-server|Minimal examples providing client and server connections simultaneously
## FAQ
### What should I look at first
server/minimal-http-server
### Why are most of the sources split into a main C file file and a protocol file?
Lws supports three ways to implement the protocol callback code:
- you can just add it all in the same source file
- you can separate it as these examples do, and #include it
into the main sources
- you can build it as a standalone plugin that is discovered
and loaded at runtime.
The way these examples are structured, you can easily also build
the protocol callback as a plugin just with a different
CMakeLists.txt... see https://github.com/warmcat/libwebsockets/tree/master/plugin-standalone
for an example.
### Why would we want the protocol as a plugin?
You will notice a lot of the main C code is the same boilerplate
repeated for each example. The actual interesting part is in
the protocol callback only.
Lws provides a generic lightweight server app called 'lwsws' that
can be configured by JSON. Combined with your protocol as a plugin,
it means you don't actually have to make a special server "app"
part, you can just use lwsws and pass per-vhost configuration
from JSON into your protocol. (Of course in some cases you have
an existing app you are bolting lws on to, then you don't care
about this for that particular case).
Because lwsws has no dependency on whatever your plugin does, it
can mix and match different protocols without needing any code
changes. It reduces the size of the task to just writing the
code you care about in your protocol handler.
### I get why there is a pss, but why is there a vhd?
The pss is instantiated per-connection. But there are almost always
other variables that have a lifetime longer than a single connection.
You could make these variables "filescope" one-time globals, but that
means your protocol cannot instantiate multiple times.
Lws supports vhosts (virtual hosts), for example both https://warmcat.com
and https://libwebsockets are running on the same lwsws instance on the
same server and same IP... each of these is a separate vhost.
Your protocol may be enabled on multiple vhosts, each of these vhosts
provides a different vhd specific to the protocol instance on that
vhost. For example many of the samples keep a linked-list head to
a list of live pss in the vhd... that means it's cleanly a list of
pss opened **on that vhost**. If another vhost has the protocol
enabled, connections to that will point to a different vhd.
The example "server/minimal-ws-server-threads" demonstrates how to deliver
external configuration data to a specific vhost + protocol
combination using code. In lwsws, this is simply a matter of setting
the desired JSON config.

View file

@ -7,4 +7,4 @@ minimal-ws-server-pmd|Simple ws server with permessage-deflate support
minimal-ws-server-pmd-bulk|Simple ws server showing how to pass bulk data with permessage-deflate
minimal-ws-server-ring|Like minimal-ws-server but holds the chat in a multi-tail ringbuffer
minimal-ws-server-threads|Simple ws server where data is produced by different threads
minimal-ws-broker|Simple ws server with a publish / broker / subscribe architecture

View file

@ -0,0 +1,11 @@
cmake_minimum_required(VERSION 2.8)
set(SAMP lws-minimal-ws-broker)
set(SRCS minimal-ws-broker.c)
if (UNIX)
set(CMAKE_C_FLAGS "-Wall -Wsign-compare -Wignored-qualifiers -Wtype-limits -Wuninitialized -Werror -Wundef ${CMAKE_C_FLAGS}" )
endif()
add_executable(${SAMP} ${SRCS})
target_link_libraries(${SAMP} -lwebsockets)

View file

@ -0,0 +1,26 @@
# lws minimal ws broker
## build
```
$ cmake . && make
```
## usage
```
$ ./lws-minimal-ws-broker
[2018/03/15 12:23:12:1559] USER: LWS minimal ws broker | visit http://localhost:7681
[2018/03/15 12:23:12:1560] NOTICE: Creating Vhost 'default' port 7681, 2 protocols, IPv6 off
```
Visit http://localhost:7681 on multiple browser windows
The page opens a subscribe mode ws connection back to the broker,
and a publisher mode ws connection back to the broker.
The textarea shows the data from the subscription connection.
If you type text is in the text box and press send, the text
is passed to the broker on the publisher ws connection and
sent to all subscribers.

View file

@ -0,0 +1,87 @@
/*
* lws-minimal-ws-broker
*
* Copyright (C) 2018 Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This demonstrates the most minimal http server you can make with lws,
* with an added publish / broker / subscribe ws server.
*
* To keep it simple, it serves stuff in the subdirectory "./mount-origin" of
* the directory it was started in.
* You can change that by changing mount.origin.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#define LWS_PLUGIN_STATIC
#include "protocol_lws_minimal.c"
static struct lws_protocols protocols[] = {
{ "http", lws_callback_http_dummy, 0, 0 },
LWS_PLUGIN_PROTOCOL_MINIMAL,
{ NULL, NULL, 0, 0 } /* terminator */
};
static int interrupted;
static const struct lws_http_mount mount = {
/* .mount_next */ NULL, /* linked-list "next" */
/* .mountpoint */ "/", /* mountpoint URL */
/* .origin */ "./mount-origin", /* serve from dir */
/* .def */ "index.html", /* default filename */
/* .protocol */ NULL,
/* .cgienv */ NULL,
/* .extra_mimetypes */ NULL,
/* .interpret */ NULL,
/* .cgi_timeout */ 0,
/* .cache_max_age */ 0,
/* .auth_mask */ 0,
/* .cache_reusable */ 0,
/* .cache_revalidate */ 0,
/* .cache_intermediaries */ 0,
/* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */
/* .mountpoint_len */ 1, /* char count */
/* .basic_auth_login_file */ NULL,
};
void sigint_handler(int sig)
{
interrupted = 1;
}
int main(int argc, char **argv)
{
struct lws_context_creation_info info;
struct lws_context *context;
int n = 0;
signal(SIGINT, sigint_handler);
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = 7681;
info.mounts = &mount;
info.protocols = protocols;
lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_USER
/* | LLL_INFO */ /* | LLL_DEBUG */, NULL);
lwsl_user("LWS minimal ws broker | visit http://localhost:7681\n");
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
while (n >= 0 && !interrupted)
n = lws_service(context, 1000);
lws_context_destroy(context);
return 0;
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

View file

@ -0,0 +1,102 @@
<meta charset="UTF-8">
<html>
<body>
<img src="libwebsockets.org-logo.png"><br>
LWS chat <b>minimal ws broker example</b>.<br>
This page opens two separate ws connections...<br>
A subscriber ws connection fills this textarea<br>
with data it receives from the broker...
<br>
<br>
<textarea id=r readonly cols=40 rows=10></textarea><br>
<br>
... and a publisher ws connection sends the string<br>
in the box below to the broker when you press Send.<br>
<input type="text" id=m cols=40 rows=1>
<button id=b onclick="sendmsg();">Send</button>
</body>
<script>
function get_appropriate_ws_url(extra_url)
{
var pcol;
var u = document.URL;
/*
* We open the websocket encrypted if this page came on an
* https:// url itself, otherwise unencrypted
*/
if (u.substring(0, 5) == "https") {
pcol = "wss://";
u = u.substr(8);
} else {
pcol = "ws://";
if (u.substring(0, 4) == "http")
u = u.substr(7);
}
u = u.split('/');
/* + "/xxx" bit is for IE10 workaround */
return pcol + u[0] + "/" + extra_url;
}
function new_ws(urlpath, protocol)
{
if (typeof MozWebSocket != "undefined")
return new MozWebSocket(urlpath, protocol);
return new WebSocket(urlpath, protocol);
}
subscriber_ws = new_ws(get_appropriate_ws_url(""), "lws-minimal-broker");
try {
subscriber_ws.onopen = function() {
document.getElementById("b").disabled = 0;
}
subscriber_ws.onmessage =function got_packet(msg) {
document.getElementById("r").value =
document.getElementById("r").value + msg.data + "\n";
document.getElementById("r").scrollTop =
document.getElementById("r").scrollHeight;
}
subscriber_ws.onclose = function(){
document.getElementById("b").disabled = 1;
}
} catch(exception) {
alert('<p>Error' + exception);
}
publisher_ws = new_ws(get_appropriate_ws_url("/publisher"), "lws-minimal-broker");
try {
publisher_ws.onopen = function() {
document.getElementById("m").disabled = 0;
}
publisher_ws.onmessage =function got_packet(msg) {
}
publisher_ws.onclose = function(){
document.getElementById("m").disabled = 1;
}
} catch(exception) {
alert('<p>Error' + exception);
}
function sendmsg()
{
publisher_ws.send(document.getElementById("m").value);
document.getElementById("m").value = "";
}
</script>
</html>

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.9 KiB

View file

@ -0,0 +1,258 @@
/*
* ws protocol handler plugin for "lws-minimal-broker"
*
* Copyright (C) 2010-2018 Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This implements a minimal "broker", for systems that look like this
*
* [ publisher ws client ] <-> [ ws server broker ws server ] <-> [ ws client subscriber ]
*
* The "publisher" role is to add data to the broker.
*
* The "subscriber" role is to hear about all data added to the system.
*
* The "broker" role is to manage incoming data from publishers and pass it out
* to subscribers.
*
* Any number of publishers and subscribers are supported.
*
* This example implements a single ws server, using one ws protocol, that treats ws
* connections as being in publisher or subscriber mode according to the URL the ws
* connection was made to. ws connections to "/publisher" URL are understood to be
* publishing data and to any other URL, subscribing.
*/
#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif
#include <string.h>
/* one of these created for each message */
struct msg {
void *payload; /* is malloc'd */
size_t len;
};
/* one of these is created for each client connecting to us */
struct per_session_data__minimal {
struct per_session_data__minimal *pss_list;
struct lws *wsi;
uint32_t tail;
char publishing; /* nonzero: peer is publishing to us */
};
/* one of these is created for each vhost our protocol is used with */
struct per_vhost_data__minimal {
struct lws_context *context;
struct lws_vhost *vhost;
const struct lws_protocols *protocol;
struct per_session_data__minimal *pss_list; /* linked-list of live pss*/
struct lws_ring *ring; /* ringbuffer holding unsent messages */
};
/* destroys the message when everyone has had a copy of it */
static void
__minimal_destroy_message(void *_msg)
{
struct msg *msg = _msg;
free(msg->payload);
msg->payload = NULL;
msg->len = 0;
}
static int
callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
struct per_session_data__minimal *pss =
(struct per_session_data__minimal *)user;
struct per_vhost_data__minimal *vhd =
(struct per_vhost_data__minimal *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi));
const struct msg *pmsg;
struct msg amsg;
uint32_t oldest;
char buf[32];
int n, m;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(struct per_vhost_data__minimal));
vhd->context = lws_get_context(wsi);
vhd->protocol = lws_get_protocol(wsi);
vhd->vhost = lws_get_vhost(wsi);
vhd->ring = lws_ring_create(sizeof(struct msg), 8,
__minimal_destroy_message);
break;
case LWS_CALLBACK_PROTOCOL_DESTROY:
lws_ring_destroy(vhd->ring);
break;
case LWS_CALLBACK_ESTABLISHED:
/* add ourselves to the list of live pss held in the vhd */
pss->pss_list = vhd->pss_list;
vhd->pss_list = pss;
pss->tail = lws_ring_get_oldest_tail(vhd->ring);
pss->wsi = wsi;
if (lws_hdr_copy(wsi, buf, sizeof(buf), WSI_TOKEN_GET_URI) > 0)
pss->publishing = !strcmp(buf, "/publisher");
break;
case LWS_CALLBACK_CLOSED:
/* remove our closing pss from the list of live pss */
lws_start_foreach_llp(struct per_session_data__minimal **,
ppss, vhd->pss_list) {
if (*ppss == pss) {
*ppss = pss->pss_list;
break;
}
} lws_end_foreach_llp(ppss, pss_list);
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
if (pss->publishing)
break;
pmsg = lws_ring_get_element(vhd->ring, &pss->tail);
if (!pmsg)
break;
/* notice we allowed for LWS_PRE in the payload already */
m = lws_write(wsi, pmsg->payload + LWS_PRE, pmsg->len,
LWS_WRITE_TEXT);
if (m < (int)pmsg->len) {
lwsl_err("ERROR %d writing to di socket\n", n);
return -1;
}
n = lws_ring_get_oldest_tail(vhd->ring) == pss->tail;
lws_ring_consume(vhd->ring, &pss->tail, NULL, 1);
if (n) { /* we may have been the oldest tail */
n = 0;
oldest = pss->tail;
lws_start_foreach_llp(
struct per_session_data__minimal **,
ppss, vhd->pss_list) {
m = lws_ring_get_count_waiting_elements(
vhd->ring, &(*ppss)->tail);
if (m > n) {
n = m;
oldest = (*ppss)->tail;
}
} lws_end_foreach_llp(ppss, pss_list);
/* this will delete any entries behind the new oldest */
lws_ring_update_oldest_tail(vhd->ring, oldest);
}
/* more to do? */
if (lws_ring_get_element(vhd->ring, &pss->tail))
/* come back as soon as we can write more */
lws_callback_on_writable(pss->wsi);
break;
case LWS_CALLBACK_RECEIVE:
if (!pss->publishing)
break;
n = (int)lws_ring_get_count_free_elements(vhd->ring);
if (!n) {
lwsl_user("dropping!\n");
break;
}
amsg.len = len;
/* notice we over-allocate by LWS_PRE */
amsg.payload = malloc(LWS_PRE + len);
if (!amsg.payload) {
lwsl_user("OOM: dropping\n");
break;
}
memcpy((char *)amsg.payload + LWS_PRE, in, len);
if (!lws_ring_insert(vhd->ring, &amsg, 1)) {
__minimal_destroy_message(&amsg);
lwsl_user("dropping!\n");
break;
}
/*
* let every subscriber know we want to write something
* on them as soon as they are ready
*/
lws_start_foreach_llp(struct per_session_data__minimal **,
ppss, vhd->pss_list) {
if (!(*ppss)->publishing)
lws_callback_on_writable((*ppss)->wsi);
} lws_end_foreach_llp(ppss, pss_list);
break;
default:
break;
}
return 0;
}
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
{ \
"lws-minimal-broker", \
callback_minimal, \
sizeof(struct per_session_data__minimal), \
128, \
0, NULL, 0 \
}
#if !defined (LWS_PLUGIN_STATIC)
/* boilerplate needed if we are built as a dynamic plugin */
static const struct lws_protocols protocols[] = {
LWS_PLUGIN_PROTOCOL_MINIMAL
};
LWS_EXTERN LWS_VISIBLE int
init_protocol_minimal(struct lws_context *context,
struct lws_plugin_capability *c)
{
if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
c->api_magic);
return 1;
}
c->protocols = protocols;
c->count_protocols = ARRAY_SIZE(protocols);
c->extensions = NULL;
c->count_extensions = 0;
return 0;
}
LWS_EXTERN LWS_VISIBLE int
destroy_protocol_minimal(struct lws_context *context)
{
return 0;
}
#endif