mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
get first working version of web socket node-type with MPMC queues
This commit is contained in:
parent
735f08f551
commit
9e13b06a59
5 changed files with 214 additions and 153 deletions
|
@ -33,7 +33,8 @@ nodes = {
|
|||
series = (
|
||||
{ label = "Random walk" },
|
||||
{ label = "Sine" },
|
||||
{ label = "Rect" }
|
||||
{ label = "Rect" },
|
||||
{ label = "Ramp" }
|
||||
)
|
||||
}
|
||||
};
|
||||
|
|
|
@ -29,38 +29,35 @@ struct lws;
|
|||
/** Internal data per websocket node */
|
||||
struct websocket {
|
||||
struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection) */
|
||||
|
||||
struct list destinations; /**< List of struct lws_client_connect_info to connect to in client mode. */
|
||||
|
||||
struct pool pool;
|
||||
|
||||
struct queue queue_tx; /**< For samples which are sent to WebSockets */
|
||||
struct queue queue_rx; /**< For samples which are received from WebSockets */
|
||||
|
||||
qptr_t sent;
|
||||
qptr_t received;
|
||||
|
||||
int shutdown;
|
||||
struct queue queue; /**< For samples which are received from WebSockets a */
|
||||
|
||||
int id; /**< The index of this node */
|
||||
};
|
||||
|
||||
struct websocket_connection {
|
||||
enum {
|
||||
WEBSOCKET_ESTABLISHED,
|
||||
WEBSOCKET_ACTIVE,
|
||||
WEBSOCKET_SHUTDOWN,
|
||||
WEBSOCKET_CLOSED
|
||||
} state;
|
||||
|
||||
struct node *node;
|
||||
struct path *path;
|
||||
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct lws *wsi;
|
||||
|
||||
struct {
|
||||
char name[64];
|
||||
char ip[64];
|
||||
} peer;
|
||||
|
||||
qptr_t sent;
|
||||
qptr_t received;
|
||||
};
|
||||
|
||||
/** @see node_vtable::init */
|
||||
|
|
|
@ -45,8 +45,6 @@ struct sample {
|
|||
|
||||
atomic_int refcnt; /**< Reference counter. */
|
||||
struct pool *pool; /**< This sample is belong to this memory pool. */
|
||||
|
||||
int endian; /**< Endianess of data in the sample. */
|
||||
|
||||
/** All timestamps are seconds / nano seconds after 1.1.1970 UTC */
|
||||
struct {
|
||||
|
@ -57,9 +55,9 @@ struct sample {
|
|||
|
||||
/** The values. */
|
||||
union {
|
||||
float f; /**< Floating point values (note msg::endian) */
|
||||
uint32_t i; /**< Integer values (note msg::endian) */
|
||||
} data[];
|
||||
float f; /**< Floating point values. */
|
||||
uint32_t i; /**< Integer values. */
|
||||
} data[]; /**< Data is in host endianess! */
|
||||
};
|
||||
|
||||
/** Request \p cnt samples from memory pool \p p and initialize them.
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <libconfig.h>
|
||||
|
||||
#include "nodes/websocket.h"
|
||||
#include "webmsg_format.h"
|
||||
#include "timing.h"
|
||||
#include "utils.h"
|
||||
#include "msg.h"
|
||||
|
@ -33,43 +34,21 @@ static const char *ssl_cert; /**< Path to the SSL certitifcate for HTTPS / WSS
|
|||
static const char *ssl_private_key; /**< Path to the SSL private key for HTTPS / WSS */
|
||||
static const char *htdocs; /**< Path to the directory which should be served by build in HTTP server */
|
||||
|
||||
static int id = 0;
|
||||
|
||||
struct list connections; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
|
||||
|
||||
/* Forward declarations */
|
||||
static struct node_type vt;
|
||||
static int protocol_cb_http(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, void *, size_t);
|
||||
|
||||
static struct lws_protocols protocols[] = {
|
||||
{
|
||||
"http-only",
|
||||
protocol_cb_http,
|
||||
0,
|
||||
0
|
||||
},
|
||||
{
|
||||
"live",
|
||||
protocol_cb_live,
|
||||
sizeof(struct websocket_connection),
|
||||
0
|
||||
},
|
||||
{ 0 /* terminator */ }
|
||||
{ "http-only", protocol_cb_http, 0, 0 },
|
||||
{ "live", protocol_cb_live, sizeof(struct websocket_connection), 0 },
|
||||
{ NULL }
|
||||
};
|
||||
|
||||
#if 0
|
||||
static const struct lws_extension exts[] = {
|
||||
{
|
||||
"permessage-deflate",
|
||||
lws_extension_callback_pm_deflate,
|
||||
"permessage-deflate"
|
||||
},
|
||||
{
|
||||
"deflate-frame",
|
||||
lws_extension_callback_pm_deflate,
|
||||
"deflate_frame"
|
||||
},
|
||||
{ NULL, NULL, NULL /* terminator */ }
|
||||
};
|
||||
#endif
|
||||
|
||||
static void logger(int level, const char *msg) {
|
||||
int len = strlen(msg);
|
||||
if (strchr(msg, '\n'))
|
||||
|
@ -80,7 +59,7 @@ static void logger(int level, const char *msg) {
|
|||
level = LLL_WARN;
|
||||
|
||||
switch (level) {
|
||||
case LLL_ERR: error("LWS: %.*s", len, msg); break;
|
||||
case LLL_ERR: warn("LWS: %.*s", len, msg); break;
|
||||
case LLL_WARN: warn("LWS: %.*s", len, msg); break;
|
||||
case LLL_INFO: info("LWS: %.*s", len, msg); break;
|
||||
default: debug(DBG_WEBSOCKET | 1, "LWS: %.*s", len, msg); break;
|
||||
|
@ -160,8 +139,9 @@ int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
list_foreach(struct node *n, &vt.instances) {
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }",
|
||||
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i, s: i }",
|
||||
"name", node_name_short(n),
|
||||
"id", w->id,
|
||||
"connections", list_length(&w->connections),
|
||||
"state", n->state,
|
||||
"vectorize", n->vectorize,
|
||||
|
@ -228,7 +208,7 @@ int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -242,6 +222,7 @@ try_to_reuse:
|
|||
|
||||
int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
int ret;
|
||||
struct websocket_connection *c = user;
|
||||
struct websocket *w;
|
||||
|
||||
|
@ -256,36 +237,63 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
return -1;
|
||||
}
|
||||
|
||||
/* Search for node whose name matches the URI. */
|
||||
c->node = list_lookup(&vt.instances, uri + 1);
|
||||
if (c->node == NULL) {
|
||||
warn("LWS: Closing Connection for non-existent node: %s", uri + 1);
|
||||
return -1;
|
||||
if ((uri[0] == '/' && uri[1] == 0) || uri[0] == 0){
|
||||
/* Catch all connection */
|
||||
c->node = NULL;
|
||||
}
|
||||
else {
|
||||
char *node = uri + 1;
|
||||
|
||||
/* Check if node is running */
|
||||
if (c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
/* Search for node whose name matches the URI. */
|
||||
c->node = list_lookup(&vt.instances, node);
|
||||
if (c->node == NULL) {
|
||||
warn("LWS: Closing Connection for non-existent node: %s", uri + 1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Check if node is running */
|
||||
if (c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
}
|
||||
|
||||
c->state = WEBSOCKET_ESTABLISHED;
|
||||
c->wsi = wsi;
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret) {
|
||||
warn("Failed to create queue for incoming websocket connection. Closing..");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Lookup peer address for debug output */
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New Connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
if (c->node != NULL)
|
||||
info("LWS: New connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
else
|
||||
info("LWS: New connection from %s (%s)", c->peer.name, c->peer.ip);
|
||||
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_push(&w->connections, c);
|
||||
if (c->node != NULL) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_push(&w->connections, c);
|
||||
}
|
||||
else {
|
||||
list_push(&connections, c);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
if (c->node != NULL)
|
||||
info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
else
|
||||
info("LWS: Connection closed from %s (%s)", c->peer.name, c->peer.ip);
|
||||
|
||||
c->state = WEBSOCKET_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
||||
queue_destroy(&c->queue);
|
||||
|
||||
return 0;
|
||||
|
||||
|
@ -293,36 +301,30 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
||||
w = (struct websocket *) c->node->_vd;
|
||||
|
||||
if (c->node->state != NODE_RUNNING)
|
||||
if (c->node && c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
if (w->shutdown) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Bye", 4);
|
||||
if (w->state == WEBSOCKET_SHUTDOWN) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int cnt, sent, ret;
|
||||
unsigned char *bufs[DEFAULT_QUEUELEN];
|
||||
|
||||
cnt = queue_get_many(&w->queue_tx, (void **) bufs, DEFAULT_QUEUELEN, c->sent);
|
||||
|
||||
for (sent = 0; sent < cnt; sent++) {
|
||||
struct msg *msg = (struct msg *) (bufs[sent] + LWS_PRE);
|
||||
char *buf;
|
||||
int cnt;
|
||||
while ((cnt = queue_pull(&c->queue, (void **) &buf))) {
|
||||
struct webmsg *msg = (struct webmsg *) (buf + LWS_PRE);
|
||||
|
||||
ret = lws_write(wsi, (unsigned char *) msg, MSG_LEN(msg->length), LWS_WRITE_BINARY);
|
||||
if (ret < MSG_LEN(msg->length))
|
||||
pool_put(&w->pool, (void *) buf);
|
||||
|
||||
ret = lws_write(wsi, (unsigned char *) msg, WEBMSG_LEN(msg->length), LWS_WRITE_BINARY);
|
||||
if (ret < WEBMSG_LEN(msg->length))
|
||||
error("Failed lws_write()");
|
||||
|
||||
if (lws_send_pipe_choked(wsi))
|
||||
break;
|
||||
break;
|
||||
}
|
||||
|
||||
queue_pull_many(&w->queue_tx, (void **) bufs, sent, &c->sent);
|
||||
|
||||
pool_put_many(&w->pool, (void **) bufs, sent);
|
||||
|
||||
if (sent < cnt)
|
||||
if (queue_available(&c->queue) > 0)
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
return 0;
|
||||
|
@ -335,27 +337,30 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
if (c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
if (!lws_frame_is_binary(wsi) || len < MSG_LEN(0))
|
||||
if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0))
|
||||
warn("LWS: Received invalid packet for node: %s", node_name(c->node));
|
||||
|
||||
struct msg *msg = (struct msg *) in;
|
||||
struct webmsg *msg = (struct webmsg *) in;
|
||||
|
||||
while ((char *) msg + MSG_LEN(msg->length) <= (char *) in + len) {
|
||||
struct msg *msg2 = pool_get(&w->pool);
|
||||
while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) {
|
||||
struct webmsg *msg2 = pool_get(&w->pool);
|
||||
if (!msg2) {
|
||||
warn("Pool underrun for node: %s", node_name(c->node));
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
|
||||
memcpy(msg2, msg, MSG_LEN(msg->length));
|
||||
memcpy(msg2, msg, WEBMSG_LEN(msg->length));
|
||||
|
||||
queue_push(&w->queue_rx, msg2, &c->received);
|
||||
ret = queue_push(&w->queue, msg2);
|
||||
if (ret != 1) {
|
||||
warn("Queue overrun for node: %s", node_name(c->node));
|
||||
break;
|
||||
}
|
||||
|
||||
/* Next message */
|
||||
msg = (struct msg *) ((char *) msg + MSG_LEN(msg->length));
|
||||
msg = (struct webmsg *) ((char *) msg + WEBMSG_LEN(msg->length));
|
||||
}
|
||||
|
||||
/** @todo Implement */
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -421,29 +426,24 @@ int websocket_deinit()
|
|||
|
||||
int websocket_open(struct node *n)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
int ret;
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
w->id = id++;
|
||||
|
||||
list_init(&w->connections);
|
||||
list_init(&w->destinations);
|
||||
|
||||
size_t blocklen = LWS_PRE + MSG_LEN(DEFAULT_VALUES);
|
||||
size_t blocklen = LWS_PRE + WEBMSG_LEN(DEFAULT_VALUES);
|
||||
|
||||
ret = pool_init_mmap(&w->pool, blocklen, 2 * DEFAULT_QUEUELEN);
|
||||
ret = pool_init(&w->pool, 64 * DEFAULT_QUEUELEN, blocklen, &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = queue_init(&w->queue_tx, DEFAULT_QUEUELEN);
|
||||
ret = queue_init(&w->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = queue_init(&w->queue_rx, DEFAULT_QUEUELEN);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
queue_reader_add(&w->queue_rx, 0, 0);
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -451,13 +451,13 @@ int websocket_close(struct node *n)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
w->shutdown = 1;
|
||||
|
||||
list_foreach(struct lws *wsi, &w->connections)
|
||||
lws_callback_on_writable(wsi);
|
||||
list_foreach(struct websocket_connection *c, &w->connections) {
|
||||
c->state = WEBSOCKET_SHUTDOWN;
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
||||
pool_destroy(&w->pool);
|
||||
queue_destroy(&w->queue_tx);
|
||||
queue_destroy(&w->queue);
|
||||
|
||||
list_destroy(&w->connections, NULL, false);
|
||||
|
||||
|
@ -475,17 +475,21 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
struct msg *msgs[cnt];
|
||||
struct webmsg *msgs[cnt];
|
||||
|
||||
int got;
|
||||
|
||||
got = queue_pull_many(&w->queue_rx, (void **) msgs, cnt, &w->received);
|
||||
do {
|
||||
got = queue_pull_many(&w->queue, (void **) msgs, cnt);
|
||||
pthread_yield();
|
||||
} while (got == 0);
|
||||
|
||||
for (int i = 0; i < got; i++) {
|
||||
smps[i]->sequence = msgs[i]->sequence;
|
||||
smps[i]->length = msgs[i]->length;
|
||||
smps[i]->ts.origin = MSG_TS(msgs[i]);
|
||||
smps[i]->ts.origin = WEBMSG_TS(msgs[i]);
|
||||
|
||||
memcpy(&smps[i]->data, &msgs[i]->data, MSG_DATA_LEN(msgs[i]->length));
|
||||
memcpy(&smps[i]->data, &msgs[i]->data, WEBMSG_DATA_LEN(msgs[i]->length));
|
||||
}
|
||||
|
||||
pool_put_many(&w->pool, (void **) msgs, got);
|
||||
|
@ -493,59 +497,118 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
return got;
|
||||
}
|
||||
|
||||
static int websocket_connection_init(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void websocket_connection_destroy(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
}
|
||||
|
||||
static char * websocket_connection_name(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return "(todo)";
|
||||
}
|
||||
|
||||
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int blocks, enqueued;
|
||||
char *bufs[cnt];
|
||||
|
||||
switch (c->state) {
|
||||
case WEBSOCKET_SHUTDOWN:
|
||||
return -1;
|
||||
case WEBSOCKET_CLOSED:
|
||||
if (c->node) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_remove(&w->connections, c);
|
||||
}
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
break;
|
||||
|
||||
case WEBSOCKET_ESTABLISHED:
|
||||
c->state = WEBSOCKET_ACTIVE;
|
||||
/* fall through */
|
||||
|
||||
case WEBSOCKET_ACTIVE:
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE);
|
||||
|
||||
msg->version = WEBMSG_VERSION;
|
||||
msg->type = WEBMSG_TYPE_DATA;
|
||||
msg->endian = WEBMSG_ENDIAN_HOST;
|
||||
msg->length = smps[i]->length;
|
||||
msg->sequence = smps[i]->sequence;
|
||||
msg->id = w->id;
|
||||
msg->ts.sec = smps[i]->ts.origin.tv_sec;
|
||||
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
|
||||
|
||||
memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
|
||||
}
|
||||
|
||||
enqueued = queue_push_many(&c->queue, (void **) bufs, cnt);
|
||||
if (enqueued != blocks)
|
||||
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
int blocks, enqueued;
|
||||
char *bufs[cnt];
|
||||
|
||||
/* Copy samples to websocket queue */
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket node: %s", node_name(n));
|
||||
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
struct msg *msg = (struct msg *) (bufs[i] + LWS_PRE);
|
||||
|
||||
msg->version = MSG_VERSION;
|
||||
msg->type = MSG_TYPE_DATA;
|
||||
msg->endian = MSG_ENDIAN_HOST;
|
||||
msg->length = smps[i]->length;
|
||||
msg->sequence = smps[i]->sequence;
|
||||
msg->ts.sec = smps[i]->ts.origin.tv_sec;
|
||||
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
|
||||
|
||||
memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
|
||||
list_foreach(struct websocket_connection *c, &w->connections) {
|
||||
websocket_write_connection(c, smps, cnt);
|
||||
}
|
||||
|
||||
enqueued = queue_push_many(&w->queue_tx, (void **) bufs, cnt, &w->sent);
|
||||
if (enqueued != blocks)
|
||||
warn("Queue overrun in websocket node: %s", node_name(n));
|
||||
|
||||
/* Notify all active websocket connections to send new data */
|
||||
list_foreach(struct websocket_connection *c, &w->connections) {
|
||||
switch (c->state) {
|
||||
case WEBSOCKET_CLOSED:
|
||||
queue_reader_remove(&w->queue_tx, c->sent, w->sent);
|
||||
list_remove(&w->connections, c);
|
||||
break;
|
||||
|
||||
case WEBSOCKET_ESTABLISHED:
|
||||
c->sent = w->sent;
|
||||
c->state = WEBSOCKET_ACTIVE;
|
||||
|
||||
queue_reader_add(&w->queue_tx, c->sent, w->sent);
|
||||
|
||||
case WEBSOCKET_ACTIVE:
|
||||
lws_callback_on_writable(c->wsi);
|
||||
break;
|
||||
}
|
||||
list_foreach(struct websocket_connection *c, &connections) {
|
||||
websocket_write_connection(c, smps, cnt);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int websocket_parse(struct node *n, config_setting_t *cfg)
|
||||
{
|
||||
config_setting_t *cfg_dests;
|
||||
|
||||
cfg_dests = config_setting_get_member(cfg, "destinations");
|
||||
|
||||
if (!config_setting_is_array(cfg_dests))
|
||||
cerror(dests, "The 'destinations' setting must be an array of URLs");
|
||||
|
||||
for (int i = 0; i < config_setting_length(cfg_dests); i++) {
|
||||
config_setting_t *cfg_dest;
|
||||
const char *url;
|
||||
struct lws_client_connect_info *i;
|
||||
|
||||
url = config_setting_get_string_elem(cfg_dests, i);
|
||||
if (!url)
|
||||
cerror(dests, "The 'destinations' setting must be an array of URLs");
|
||||
|
||||
i = alloc(sizeof())
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
char * websocket_print(struct node *n)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
static struct node_type vt = {
|
||||
.name = "websocket",
|
||||
.description = "Send and receive samples of a WebSocket connection (libwebsockets)",
|
||||
|
@ -557,7 +620,9 @@ static struct node_type vt = {
|
|||
.read = websocket_read,
|
||||
.write = websocket_write,
|
||||
.init = websocket_init,
|
||||
.deinit = websocket_deinit
|
||||
.deinit = websocket_deinit,
|
||||
.print = websocket_print,
|
||||
.parse = websocket_parse
|
||||
};
|
||||
|
||||
REGISTER_NODE_TYPE(&vt)
|
2
thirdparty/libwebsockets
vendored
2
thirdparty/libwebsockets
vendored
|
@ -1 +1 @@
|
|||
Subproject commit 0c984014f0a82e184af2ff18f97b45e2cbccd0bd
|
||||
Subproject commit 5fb327754ab4d202fca903dd5bd6b546b340eecb
|
Loading…
Add table
Reference in a new issue