mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
further steps to implement websocket client functionality
This commit is contained in:
parent
15e7e78e04
commit
fab53b0302
2 changed files with 173 additions and 125 deletions
|
@ -19,6 +19,8 @@
|
|||
#ifndef _WEBSOCKET_H_
|
||||
#define _WEBSOCKET_H_
|
||||
|
||||
#include <libwebsockets.h>
|
||||
|
||||
#include "node.h"
|
||||
#include "pool.h"
|
||||
#include "queue.h"
|
||||
|
@ -28,38 +30,15 @@ struct lws;
|
|||
|
||||
/** Internal data per websocket node */
|
||||
struct websocket {
|
||||
struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection) */
|
||||
|
||||
struct list destinations; /**< List of struct lws_client_connect_info to connect to in client mode. */
|
||||
struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection). */
|
||||
struct list destinations; /**< List of websocket servers connect to in client mode (struct websocket_destination). */
|
||||
|
||||
struct pool pool;
|
||||
|
||||
struct queue queue; /**< For samples which are received from WebSockets a */
|
||||
|
||||
int id; /**< The index of this node */
|
||||
};
|
||||
|
||||
struct websocket_connection {
|
||||
enum {
|
||||
WEBSOCKET_ESTABLISHED,
|
||||
WEBSOCKET_ACTIVE,
|
||||
WEBSOCKET_SHUTDOWN,
|
||||
WEBSOCKET_CLOSED
|
||||
} state;
|
||||
|
||||
struct node *node;
|
||||
struct path *path;
|
||||
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct lws *wsi;
|
||||
|
||||
struct {
|
||||
char name[64];
|
||||
char ip[64];
|
||||
} peer;
|
||||
};
|
||||
|
||||
/** @see node_vtable::init */
|
||||
int websocket_init(int argc, char * argv[], config_setting_t *cfg);
|
||||
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
#include <libwebsockets.h>
|
||||
#include <libconfig.h>
|
||||
|
||||
#include "nodes/websocket.h"
|
||||
|
@ -23,6 +22,35 @@
|
|||
#include "cfg.h"
|
||||
#include "config.h"
|
||||
|
||||
/* Internal datastructures */
|
||||
struct connection {
|
||||
enum {
|
||||
WEBSOCKET_ESTABLISHED,
|
||||
WEBSOCKET_ACTIVE,
|
||||
WEBSOCKET_SHUTDOWN,
|
||||
WEBSOCKET_CLOSED
|
||||
} state;
|
||||
|
||||
struct node *node;
|
||||
struct path *path;
|
||||
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
struct lws *wsi;
|
||||
|
||||
struct {
|
||||
char name[64];
|
||||
char ip[64];
|
||||
} peer;
|
||||
|
||||
char *_name;
|
||||
};
|
||||
|
||||
struct destination {
|
||||
char *uri;
|
||||
struct lws_client_connect_info info;
|
||||
};
|
||||
|
||||
/* Private static storage */
|
||||
static config_setting_t *cfg_root; /**< Root config */
|
||||
static pthread_t thread; /**< All nodes are served by a single websocket server. This server is running in a dedicated thread. */
|
||||
|
@ -45,10 +73,93 @@ static int protocol_cb_live(struct lws *, enum lws_callback_reasons, void *, voi
|
|||
|
||||
static struct lws_protocols protocols[] = {
|
||||
{ "http-only", protocol_cb_http, 0, 0 },
|
||||
{ "live", protocol_cb_live, sizeof(struct websocket_connection), 0 },
|
||||
{ "live", protocol_cb_live, sizeof(struct connection), 0 },
|
||||
{ NULL }
|
||||
};
|
||||
|
||||
__attribute__((unused)) static int connection_init(struct connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return -1;
|
||||
}
|
||||
|
||||
__attribute__((unused)) static void connection_destroy(struct connection *c)
|
||||
{
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
}
|
||||
|
||||
static char * connection_name(struct connection *c)
|
||||
{
|
||||
if (!c->_name) {
|
||||
if (c->node)
|
||||
asprintf(&c->_name, "%s (%s) for node %s", c->peer.name, c->peer.ip, node_name(c->node));
|
||||
else
|
||||
asprintf(&c->_name, "%s (%s) for all nodes", c->peer.name, c->peer.ip);
|
||||
}
|
||||
|
||||
return c->_name;
|
||||
}
|
||||
|
||||
static void destination_destroy(struct destination *d)
|
||||
{
|
||||
free(d->uri);
|
||||
}
|
||||
|
||||
static int connection_write(struct connection *c, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int blocks, enqueued;
|
||||
char *bufs[cnt];
|
||||
|
||||
struct websocket *w = c->node->_vd;
|
||||
|
||||
switch (c->state) {
|
||||
case WEBSOCKET_SHUTDOWN:
|
||||
return -1;
|
||||
case WEBSOCKET_CLOSED:
|
||||
if (c->node) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_remove(&w->connections, c);
|
||||
}
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
break;
|
||||
|
||||
case WEBSOCKET_ESTABLISHED:
|
||||
c->state = WEBSOCKET_ACTIVE;
|
||||
/* fall through */
|
||||
|
||||
case WEBSOCKET_ACTIVE:
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket connection: %s", connection_name(c));
|
||||
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE);
|
||||
|
||||
msg->version = WEBMSG_VERSION;
|
||||
msg->type = WEBMSG_TYPE_DATA;
|
||||
msg->endian = WEBMSG_ENDIAN_HOST;
|
||||
msg->length = smps[i]->length;
|
||||
msg->sequence = smps[i]->sequence;
|
||||
msg->id = w->id;
|
||||
msg->ts.sec = smps[i]->ts.origin.tv_sec;
|
||||
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
|
||||
|
||||
memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
|
||||
}
|
||||
|
||||
enqueued = queue_push_many(&c->queue, (void **) bufs, cnt);
|
||||
if (enqueued != blocks)
|
||||
warn("Queue overrun in websocket connection: %s", connection_name(c));
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void logger(int level, const char *msg) {
|
||||
int len = strlen(msg);
|
||||
if (strchr(msg, '\n'))
|
||||
|
@ -102,7 +213,7 @@ static char * get_mimetype(const char *resource_path)
|
|||
return "text/plain";
|
||||
}
|
||||
|
||||
int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
static int protocol_cb_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_HTTP:
|
||||
|
@ -220,10 +331,10 @@ try_to_reuse:
|
|||
return 0;
|
||||
}
|
||||
|
||||
int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
static int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
int ret;
|
||||
struct websocket_connection *c = user;
|
||||
struct connection *c = user;
|
||||
struct websocket *w;
|
||||
|
||||
switch (reason) {
|
||||
|
@ -268,10 +379,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
/* Lookup peer address for debug output */
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
if (c->node != NULL)
|
||||
info("LWS: New connection for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
else
|
||||
info("LWS: New connection from %s (%s)", c->peer.name, c->peer.ip);
|
||||
info("LWS: New connection %s", connection_name(c));
|
||||
|
||||
if (c->node != NULL) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
|
@ -285,10 +393,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
}
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
if (c->node != NULL)
|
||||
info("LWS: Connection closed for node %s from %s (%s)", node_name(c->node), c->peer.name, c->peer.ip);
|
||||
else
|
||||
info("LWS: Connection closed from %s (%s)", c->peer.name, c->peer.ip);
|
||||
info("LWS: Connection %s closed", connection_name(c));
|
||||
|
||||
c->state = WEBSOCKET_CLOSED;
|
||||
c->wsi = NULL;
|
||||
|
@ -304,7 +409,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
if (c->node && c->node->state != NODE_RUNNING)
|
||||
return -1;
|
||||
|
||||
if (w->state == WEBSOCKET_SHUTDOWN) {
|
||||
if (c->state == WEBSOCKET_SHUTDOWN) {
|
||||
lws_close_reason(wsi, LWS_CLOSE_STATUS_GOINGAWAY, (unsigned char *) "Node stopped", 4);
|
||||
return -1;
|
||||
}
|
||||
|
@ -338,14 +443,14 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
return -1;
|
||||
|
||||
if (!lws_frame_is_binary(wsi) || len < WEBMSG_LEN(0))
|
||||
warn("LWS: Received invalid packet for node: %s", node_name(c->node));
|
||||
warn("LWS: Received invalid packet for connection %s", connection_name(c));
|
||||
|
||||
struct webmsg *msg = (struct webmsg *) in;
|
||||
|
||||
while ((char *) msg + WEBMSG_LEN(msg->length) <= (char *) in + len) {
|
||||
struct webmsg *msg2 = pool_get(&w->pool);
|
||||
if (!msg2) {
|
||||
warn("Pool underrun for node: %s", node_name(c->node));
|
||||
warn("Pool underrun for connection %s", connection_name(c));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -353,7 +458,7 @@ int protocol_cb_live(struct lws *wsi, enum lws_callback_reasons reason, void *us
|
|||
|
||||
ret = queue_push(&w->queue, msg2);
|
||||
if (ret != 1) {
|
||||
warn("Queue overrun for node: %s", node_name(c->node));
|
||||
warn("Queue overrun for connection %s", connection_name(c));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -451,7 +556,7 @@ int websocket_close(struct node *n)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_foreach(struct websocket_connection *c, &w->connections) {
|
||||
list_foreach(struct connection *c, &w->connections) {
|
||||
c->state = WEBSOCKET_SHUTDOWN;
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
@ -466,7 +571,9 @@ int websocket_close(struct node *n)
|
|||
|
||||
int websocket_destroy(struct node *n)
|
||||
{
|
||||
// struct websocket *w = n->_vd;
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_destroy(&w->destinations, (dtor_cb_t) destination_destroy, true);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -497,85 +604,16 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
return got;
|
||||
}
|
||||
|
||||
static int websocket_connection_init(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void websocket_connection_destroy(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
}
|
||||
|
||||
static char * websocket_connection_name(struct websocket_connection *c)
|
||||
{
|
||||
/** @todo */
|
||||
return "(todo)";
|
||||
}
|
||||
|
||||
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int blocks, enqueued;
|
||||
char *bufs[cnt];
|
||||
|
||||
switch (c->state) {
|
||||
case WEBSOCKET_SHUTDOWN:
|
||||
return -1;
|
||||
case WEBSOCKET_CLOSED:
|
||||
if (c->node) {
|
||||
struct websocket *w = (struct websocket *) c->node->_vd;
|
||||
list_remove(&w->connections, c);
|
||||
}
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
break;
|
||||
|
||||
case WEBSOCKET_ESTABLISHED:
|
||||
c->state = WEBSOCKET_ACTIVE;
|
||||
/* fall through */
|
||||
|
||||
case WEBSOCKET_ACTIVE:
|
||||
blocks = pool_get_many(&w->pool, (void **) bufs, cnt);
|
||||
if (blocks != cnt)
|
||||
warn("Pool underrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
for (int i = 0; i < blocks; i++) {
|
||||
struct webmsg *msg = (struct webmsg *) (bufs[i] + LWS_PRE);
|
||||
|
||||
msg->version = WEBMSG_VERSION;
|
||||
msg->type = WEBMSG_TYPE_DATA;
|
||||
msg->endian = WEBMSG_ENDIAN_HOST;
|
||||
msg->length = smps[i]->length;
|
||||
msg->sequence = smps[i]->sequence;
|
||||
msg->id = w->id;
|
||||
msg->ts.sec = smps[i]->ts.origin.tv_sec;
|
||||
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
|
||||
|
||||
memcpy(&msg->data, &smps[i]->data, smps[i]->length * 4);
|
||||
}
|
||||
|
||||
enqueued = queue_push_many(&c->queue, (void **) bufs, cnt);
|
||||
if (enqueued != blocks)
|
||||
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_foreach(struct websocket_connection *c, &w->connections) {
|
||||
websocket_write_connection(c, smps, cnt);
|
||||
list_foreach(struct connection *c, &w->connections) {
|
||||
connection_write(c, smps, cnt);
|
||||
}
|
||||
|
||||
list_foreach(struct websocket_connection *c, &connections) {
|
||||
websocket_write_connection(c, smps, cnt);
|
||||
list_foreach(struct connection *c, &connections) {
|
||||
connection_write(c, smps, cnt);
|
||||
}
|
||||
|
||||
return cnt;
|
||||
|
@ -583,30 +621,61 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
|
||||
int websocket_parse(struct node *n, config_setting_t *cfg)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
config_setting_t *cfg_dests;
|
||||
int ret;
|
||||
|
||||
cfg_dests = config_setting_get_member(cfg, "destinations");
|
||||
|
||||
if (!config_setting_is_array(cfg_dests))
|
||||
cerror(dests, "The 'destinations' setting must be an array of URLs");
|
||||
cerror(cfg_dests, "The 'destinations' setting must be an array of URLs");
|
||||
|
||||
for (int i = 0; i < config_setting_length(cfg_dests); i++) {
|
||||
config_setting_t *cfg_dest;
|
||||
const char *url;
|
||||
struct lws_client_connect_info *i;
|
||||
struct destination *d;
|
||||
const char *uri, *prot, *ads, *path;
|
||||
|
||||
url = config_setting_get_string_elem(cfg_dests, i);
|
||||
if (!url)
|
||||
cerror(dests, "The 'destinations' setting must be an array of URLs");
|
||||
uri = config_setting_get_string_elem(cfg_dests, i);
|
||||
if (!uri)
|
||||
cerror(cfg_dests, "The 'destinations' setting must be an array of URLs");
|
||||
|
||||
i = alloc(sizeof())
|
||||
d = alloc(sizeof(struct destination));
|
||||
|
||||
d->uri = strdup(uri);
|
||||
if (!d->uri)
|
||||
serror("Failed to allocate memory");
|
||||
|
||||
ret = lws_parse_uri(d->uri, &prot, &ads, &d->info.port, &path);
|
||||
if (ret)
|
||||
cerror(cfg_dests, "Failed to parse websocket URI: '%s'", uri);
|
||||
|
||||
d->info.ssl_connection = !strcmp(prot, "https");
|
||||
d->info.address = ads;
|
||||
d->info.path = path;
|
||||
d->info.protocol = prot;
|
||||
d->info.ietf_version_or_minus_one = -1;
|
||||
|
||||
list_push(&w->destinations, d);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
char * websocket_print(struct node *n)
|
||||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
char *buf = NULL;
|
||||
|
||||
list_foreach(struct lws_client_connect_info *in, &w->destinations) {
|
||||
buf = strcatf(&buf, "%s://%s:%d/%s",
|
||||
in->ssl_connection ? "https" : "http",
|
||||
in->address,
|
||||
in->port,
|
||||
in->path
|
||||
);
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static struct node_type vt = {
|
||||
|
|
Loading…
Add table
Reference in a new issue