mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
websocket: major improvements to websocket node type
This commit is contained in:
parent
68300bced9
commit
53fae53aeb
2 changed files with 183 additions and 218 deletions
|
@ -48,7 +48,6 @@ struct lws;
|
|||
|
||||
/** Internal data per websocket node */
|
||||
struct websocket {
|
||||
struct list connections; /**< List of active libwebsocket connections in server mode (struct websocket_connection). */
|
||||
struct list destinations; /**< List of websocket servers connect to in client mode (struct websocket_destination). */
|
||||
|
||||
struct pool pool;
|
||||
|
@ -57,7 +56,14 @@ struct websocket {
|
|||
|
||||
/* Internal datastructures */
|
||||
struct websocket_connection {
|
||||
enum state state; /**< The current status of this connection. */
|
||||
enum websocket_connection_state {
|
||||
STATE_DISCONNECTED,
|
||||
STATE_CONNECTING,
|
||||
STATE_RECONNECTING,
|
||||
STATE_ESTABLISHED,
|
||||
STATE_SHUTDOWN,
|
||||
STATE_ERROR
|
||||
} state; /**< The current status of this connection. */
|
||||
|
||||
enum {
|
||||
WEBSOCKET_MODE_CLIENT,
|
||||
|
@ -67,7 +73,7 @@ struct websocket_connection {
|
|||
struct lws *wsi;
|
||||
struct node *node;
|
||||
struct io_format *format; /**< The IO format used for this connection. */
|
||||
struct queue_signalled queue; /**< For samples which are sent to the WebSocket */
|
||||
struct queue queue; /**< For samples which are sent to the WebSocket */
|
||||
|
||||
union {
|
||||
/**< Only used in case websocket_connection::mode == WEBSOCKET_MODE_CLIENT */
|
||||
|
@ -80,8 +86,10 @@ struct websocket_connection {
|
|||
} peer;
|
||||
};
|
||||
|
||||
char *buf; /**< A buffer which is used to construct the messages. */
|
||||
size_t buflen; /**< Length of websocket_connection::buf. */
|
||||
struct {
|
||||
struct buffer recv; /**< A buffer for reconstructing fragmented messags. */
|
||||
struct buffer send; /**< A buffer for contsructing messages before calling lws_write() */
|
||||
} buffers;
|
||||
|
||||
char *_name;
|
||||
};
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "super_node.h"
|
||||
#include "timing.h"
|
||||
#include "utils.h"
|
||||
#include "buffer.h"
|
||||
#include "plugin.h"
|
||||
#include "io_format.h"
|
||||
#include "nodes/websocket.h"
|
||||
|
@ -43,7 +44,13 @@ static struct plugin p;
|
|||
static char * websocket_connection_name(struct websocket_connection *c)
|
||||
{
|
||||
if (!c->_name) {
|
||||
strcatf(&c->_name, "%s (%s)", c->peer.name, c->peer.ip);
|
||||
if (c->wsi) {
|
||||
lws_get_peer_addresses(c->wsi, lws_get_socket_fd(c->wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
strcatf(&c->_name, "%s", c->peer.name);
|
||||
}
|
||||
else if (c->destination)
|
||||
strcatf(&c->_name, "%s:%d", c->destination->info.address, c->destination->info.port);
|
||||
|
||||
if (c->node)
|
||||
strcatf(&c->_name, " for node %s", node_name(c->node));
|
||||
|
@ -54,64 +61,22 @@ static char * websocket_connection_name(struct websocket_connection *c)
|
|||
return c->_name;
|
||||
}
|
||||
|
||||
static int websocket_connection_init(struct websocket_connection *c, struct lws *wsi)
|
||||
{
|
||||
int ret;
|
||||
|
||||
lws_get_peer_addresses(wsi, lws_get_socket_fd(wsi), c->peer.name, sizeof(c->peer.name), c->peer.ip, sizeof(c->peer.ip));
|
||||
|
||||
info("LWS: New connection %s", websocket_connection_name(c));
|
||||
|
||||
c->state = STATE_INITIALIZED;
|
||||
c->wsi = wsi;
|
||||
|
||||
/** @todo: We must find a better way to determine the buffer size */
|
||||
c->buflen = 1 << 12;
|
||||
c->buf = alloc(c->buflen);
|
||||
|
||||
|
||||
if (c->node) {
|
||||
struct websocket *w = c->node->_vd;
|
||||
|
||||
list_push(&w->connections, c);
|
||||
}
|
||||
else
|
||||
list_push(&connections, c);
|
||||
|
||||
ret = queue_signalled_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int websocket_connection_destroy(struct websocket_connection *c)
|
||||
{
|
||||
int ret;
|
||||
|
||||
if (c->state == STATE_DESTROYED)
|
||||
return 0;
|
||||
|
||||
info("LWS: Connection %s closed", websocket_connection_name(c));
|
||||
|
||||
if (c->node) {
|
||||
struct websocket *w = c->node->_vd;
|
||||
list_remove(&w->connections, c);
|
||||
}
|
||||
else
|
||||
list_remove(&connections, c);
|
||||
list_remove(&connections, c);
|
||||
|
||||
if (c->_name)
|
||||
free(c->_name);
|
||||
|
||||
ret = queue_signalled_destroy(&c->queue);
|
||||
ret = queue_destroy(&c->queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
if (c->buf)
|
||||
free(c->buf);
|
||||
|
||||
c->state = STATE_DESTROYED;
|
||||
buffer_destroy(&c->buffers.recv);
|
||||
buffer_destroy(&c->buffers.send);
|
||||
|
||||
c->wsi = NULL;
|
||||
|
||||
return ret;
|
||||
|
@ -127,27 +92,19 @@ static void websocket_destination_destroy(struct websocket_destination *d)
|
|||
|
||||
static int websocket_connection_write(struct websocket_connection *c, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int ret;
|
||||
|
||||
switch (c->state) {
|
||||
case STATE_INITIALIZED:
|
||||
c->state = STATE_STARTED;
|
||||
/* fall through */
|
||||
|
||||
case STATE_STARTED:
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
sample_get(smps[i]); /* increase reference count */
|
||||
|
||||
ret = queue_signalled_push(&c->queue, (void **) smps[i]);
|
||||
if (ret != 1)
|
||||
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
|
||||
}
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
break;
|
||||
|
||||
default: { }
|
||||
}
|
||||
int pushed;
|
||||
|
||||
pushed = queue_push_many(&c->queue, (void **) smps, cnt);
|
||||
if (pushed < cnt)
|
||||
warn("Queue overrun in websocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
sample_get_many(smps, cnt);
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Enqueued %u samples to %s", pushed, websocket_connection_name(c));
|
||||
|
||||
/* Client connections which are currently conecting don't have an associate c->wsi yet */
|
||||
if (c->wsi)
|
||||
lws_callback_on_writable(c->wsi);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -156,40 +113,33 @@ static void websocket_connection_close(struct websocket_connection *c, struct lw
|
|||
{
|
||||
lws_close_reason(wsi, status, (unsigned char *) reason, strlen(reason));
|
||||
|
||||
char *msg = strf("LWS: Closing connection");
|
||||
|
||||
if (c)
|
||||
msg = strcatf(&msg, " with %s", websocket_connection_name(c));
|
||||
|
||||
msg = strcatf(&msg, ": status=%u, reason=%s", status, reason);
|
||||
|
||||
warn("%s", msg);
|
||||
|
||||
free(msg);
|
||||
debug(LOG_WEBSOCKET | 10, "Closing WebSocket connection with %s: status=%u, reason=%s", websocket_connection_name(c), status, reason);
|
||||
}
|
||||
|
||||
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
|
||||
{
|
||||
int ret;
|
||||
int ret, recvd, pulled, cnt = 128;
|
||||
struct websocket_connection *c = user;
|
||||
|
||||
|
||||
switch (reason) {
|
||||
case LWS_CALLBACK_CLIENT_ESTABLISHED:
|
||||
ret = websocket_connection_init(c, wsi);
|
||||
if (ret)
|
||||
return -1;
|
||||
c->wsi = wsi;
|
||||
c->state = STATE_ESTABLISHED;
|
||||
|
||||
c->format = io_format_lookup("villas");
|
||||
buffer_init(&c->buffers.recv, 1 << 12);
|
||||
buffer_init(&c->buffers.send, 1 << 12);
|
||||
|
||||
break;
|
||||
debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
||||
warn("Failed to establish connection: %s", websocket_connection_name(c));
|
||||
|
||||
/* Schedule writable callback in case we have something to send */
|
||||
if (queue_available(&c->queue) > 0)
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_ESTABLISHED:
|
||||
c->state = STATE_DESTROYED;
|
||||
c->wsi = wsi;
|
||||
c->state = STATE_ESTABLISHED;
|
||||
c->mode = WEBSOCKET_MODE_SERVER;
|
||||
|
||||
/* We use the URI to associate this connection to a node
|
||||
|
@ -229,134 +179,129 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
|
|||
}
|
||||
|
||||
if (!format)
|
||||
format = "villas";
|
||||
format = "webmsg";
|
||||
|
||||
c->format = io_format_lookup(format);
|
||||
if (!c->format) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid format");
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = websocket_connection_init(c, wsi);
|
||||
|
||||
buffer_init(&c->buffers.recv, 1 << 12);
|
||||
buffer_init(&c->buffers.send, 1 << 12);
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
list_push(&connections, c);
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Established WebSocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
|
||||
c->state = STATE_ERROR;
|
||||
|
||||
warn("Failed to establish WebSocket connection: %s (%s)", websocket_connection_name(c), in ? (char *) in : "Unkown reason");
|
||||
|
||||
return -1;
|
||||
|
||||
case LWS_CALLBACK_CLOSED:
|
||||
debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c));
|
||||
|
||||
if (c->state != STATE_SHUTDOWN) {
|
||||
/** @todo Attempt reconnect here */
|
||||
}
|
||||
|
||||
websocket_connection_destroy(c);
|
||||
|
||||
if (c->mode == WEBSOCKET_MODE_CLIENT)
|
||||
free(c);
|
||||
|
||||
return 0;
|
||||
break;
|
||||
|
||||
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
||||
case LWS_CALLBACK_SERVER_WRITEABLE: {
|
||||
if (c->node && c->node->state != STATE_STARTED) {
|
||||
size_t wbytes;
|
||||
|
||||
struct sample **smps = alloca(cnt * sizeof(struct sample *));
|
||||
|
||||
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
|
||||
if (pulled > 0) {
|
||||
io_format_sprint(c->format, c->buffers.send.buf, c->buffers.send.size, &wbytes, smps, pulled, IO_FORMAT_ALL);
|
||||
|
||||
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
|
||||
|
||||
sample_put_many(smps, pulled);
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Send %d samples on %s: bytes=%d", pulled, websocket_connection_name(c), ret);
|
||||
}
|
||||
|
||||
if (c->state == STATE_SHUTDOWN) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
|
||||
return -1;
|
||||
}
|
||||
|
||||
size_t wbytes;
|
||||
int cnt = 256; //c->node ? c->node->vectorize : 1;
|
||||
int pulled;
|
||||
|
||||
struct sample **smps = alloca(cnt * sizeof(struct sample *));
|
||||
|
||||
pulled = queue_signalled_pull_many(&c->queue, (void **) smps, cnt);
|
||||
|
||||
io_format_sprint(c->format, c->buf + LWS_PRE, c->buflen - LWS_PRE, &wbytes, smps, pulled, IO_FORMAT_ALL);
|
||||
|
||||
sample_put_many(smps, pulled);
|
||||
|
||||
ret = lws_write(wsi, (unsigned char *) c->buf + LWS_PRE, wbytes, c->format->flags & IO_FORMAT_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
|
||||
if (ret < 0) {
|
||||
warn("Failed lws_write() for connection %s", websocket_connection_name(c));
|
||||
return -1;
|
||||
else {
|
||||
if (queue_available(&c->queue) > 0)
|
||||
lws_callback_on_writable(wsi);
|
||||
}
|
||||
|
||||
if (c->state == STATE_STOPPED) {
|
||||
info("Closing connection %s", websocket_connection_name(c));
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_NORMAL, "Goodbye");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (queue_signalled_available(&c->queue) > 0)
|
||||
lws_callback_on_writable(wsi);
|
||||
|
||||
return 0;
|
||||
break;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_CLIENT_RECEIVE:
|
||||
case LWS_CALLBACK_RECEIVE: {
|
||||
if (c->format->flags & IO_FORMAT_BINARY && !lws_frame_is_binary(wsi)) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Binary data expected");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (len <= 0) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Invalid packet");
|
||||
return -1;
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_RECEIVE:
|
||||
if (!c->node) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, "Catch-all connection can not receive.");
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct timespec ts_recv = time_now();
|
||||
int recvd;
|
||||
int cnt = 256; //c->node->vectorize;
|
||||
struct websocket *w = c->node->_vd;
|
||||
struct sample **smps = alloca(cnt * sizeof(struct sample *));
|
||||
|
||||
ret = sample_alloc(&w->pool, smps, cnt);
|
||||
if (ret != 1) {
|
||||
warn("Pool underrun for connection: %s", websocket_connection_name(c));
|
||||
break;
|
||||
|
||||
if (lws_is_first_fragment(wsi))
|
||||
buffer_clear(&c->buffers.recv);
|
||||
|
||||
ret = buffer_append(&c->buffers.recv, in, len);
|
||||
if (ret) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* We dont try to parse the frame yet, as we have to wait for the remaining fragments */
|
||||
if (lws_is_final_fragment(wsi)) {
|
||||
struct timespec ts_recv = time_now();
|
||||
|
||||
recvd = io_format_sscan(c->format, in, len, NULL, smps, cnt, NULL);
|
||||
if (recvd < 0) {
|
||||
warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c));
|
||||
break;
|
||||
}
|
||||
struct websocket *w = c->node->_vd;
|
||||
struct sample **smps = alloca(cnt * sizeof(struct sample *));
|
||||
|
||||
ret = sample_alloc(&w->pool, smps, cnt);
|
||||
if (ret != cnt) {
|
||||
warn("Pool underrun for connection: %s", websocket_connection_name(c));
|
||||
break;
|
||||
}
|
||||
|
||||
struct node *dest;
|
||||
recvd = io_format_sscan(c->format, c->buffers.recv.buf, c->buffers.recv.len, NULL, smps, cnt, NULL);
|
||||
if (recvd < 0) {
|
||||
warn("Failed to parse sample data received on connection: %s", websocket_connection_name(c));
|
||||
break;
|
||||
}
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Received %d samples on %s", recvd, websocket_connection_name(c));
|
||||
|
||||
for (int i = 0; i < recvd; i++) {
|
||||
/* Set receive timestamp */
|
||||
smps[i]->ts.received = ts_recv;
|
||||
for (int i = 0; i < recvd; i++)
|
||||
smps[i]->ts.received = ts_recv;
|
||||
|
||||
/* Find destination node of this message */
|
||||
if (c->node)
|
||||
dest = c->node;
|
||||
else {
|
||||
dest = NULL;
|
||||
|
||||
for (int i = 0; i < list_length(&p.node.instances); i++) {
|
||||
struct node *n = list_at(&p.node.instances, i);
|
||||
|
||||
if (n->id == smps[i]->id) {
|
||||
dest = n;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!dest)
|
||||
warn("Ignoring message due to invalid node id");
|
||||
ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
|
||||
if (ret != recvd)
|
||||
warn("Queue overrun for connection %s", websocket_connection_name(c));
|
||||
|
||||
if (c->state == STATE_SHUTDOWN) {
|
||||
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
ret = queue_signalled_push_many(&w->queue, (void **) smps, recvd);
|
||||
if (ret != 1) {
|
||||
warn("Queue overrun for connection %s", websocket_connection_name(c));
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
|
@ -382,19 +327,17 @@ int websocket_deinit()
|
|||
for (size_t i = 0; i < list_length(&connections); i++) {
|
||||
struct websocket_connection *c = list_at(&connections, i);
|
||||
|
||||
c->state = STATE_STOPPED;
|
||||
c->state = STATE_SHUTDOWN;
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
||||
/* Wait for all connections to be closed */
|
||||
while (list_length(&connections) > 0) {
|
||||
info("LWS: Waiting for connection shutdown");
|
||||
sched_yield();
|
||||
usleep(0.1 * 1e6);
|
||||
info("Waiting for WebSocket connection shutdown");
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
|
||||
list_destroy(&connections, (dtor_cb_t) websocket_destination_destroy, true);
|
||||
|
||||
return 0;
|
||||
|
@ -415,17 +358,24 @@ int websocket_start(struct node *n)
|
|||
|
||||
for (int i = 0; i < list_length(&w->destinations); i++) {
|
||||
struct websocket_destination *d = list_at(&w->destinations, i);
|
||||
|
||||
struct websocket_connection *c = alloc(sizeof(struct websocket_connection));
|
||||
|
||||
c->state = STATE_DESTROYED;
|
||||
c->state = STATE_CONNECTING;
|
||||
c->mode = WEBSOCKET_MODE_CLIENT;
|
||||
c->node = n;
|
||||
c->destination = d;
|
||||
|
||||
c->format = io_format_lookup("webmsg"); /** @todo We could parse the format from the URI */
|
||||
|
||||
d->info.context = web->context;
|
||||
d->info.vhost = web->vhost;
|
||||
d->info.userdata = c;
|
||||
|
||||
ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
list_push(&connections, c);
|
||||
|
||||
lws_client_connect_via_info(&d->info);
|
||||
}
|
||||
|
@ -437,22 +387,38 @@ int websocket_stop(struct node *n)
|
|||
{
|
||||
int ret;
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
/* Wait for all connections to be closed */
|
||||
for (;;) {
|
||||
int connecting = 0;
|
||||
|
||||
for (int i = 0; i < list_length(&w->destinations); i++) {
|
||||
struct websocket_destination *d = list_at(&w->destinations, i);
|
||||
struct websocket_connection *c = d->info.userdata;
|
||||
|
||||
if (c->state == STATE_CONNECTING)
|
||||
connecting++;
|
||||
}
|
||||
|
||||
if (connecting == 0)
|
||||
break;
|
||||
|
||||
debug(LOG_WEBSOCKET | 10, "Waiting for %d client connections to be established", connecting);
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < list_length(&connections); i++) {
|
||||
struct websocket_connection *c = list_at(&connections, i);
|
||||
|
||||
if (c->node != n)
|
||||
continue;
|
||||
|
||||
for (size_t i = 0; i < list_length(&w->connections); i++) {
|
||||
struct websocket_connection *c = list_at(&w->connections, i);
|
||||
|
||||
c->state = STATE_STOPPED;
|
||||
if (c->state != STATE_CONNECTING)
|
||||
c->state = STATE_SHUTDOWN;
|
||||
|
||||
lws_callback_on_writable(c->wsi);
|
||||
}
|
||||
|
||||
/* Wait for all connections to be closed */
|
||||
while (list_length(&w->connections) > 0) {
|
||||
info("LWS: Waiting for connection shutdown");
|
||||
sched_yield();
|
||||
usleep(0.1 * 1e6);
|
||||
}
|
||||
|
||||
ret = queue_signalled_destroy(&w->queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
@ -468,7 +434,6 @@ int websocket_destroy(struct node *n)
|
|||
{
|
||||
struct websocket *w = n->_vd;
|
||||
|
||||
list_destroy(&w->connections, NULL, false);
|
||||
list_destroy(&w->destinations, (dtor_cb_t) websocket_destination_destroy, true);
|
||||
|
||||
return 0;
|
||||
|
@ -481,11 +446,9 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
struct websocket *w = n->_vd;
|
||||
struct sample *cpys[cnt];
|
||||
|
||||
do {
|
||||
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt);
|
||||
if (avail < 0)
|
||||
return avail;
|
||||
} while (avail == 0);
|
||||
avail = queue_signalled_pull_many(&w->queue, (void **) cpys, cnt);
|
||||
if (avail < 0)
|
||||
return avail;
|
||||
|
||||
for (int i = 0; i < avail; i++) {
|
||||
sample_copy(smps[i], cpys[i]);
|
||||
|
@ -511,18 +474,14 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
sample_copy(cpys[i], smps[i]);
|
||||
|
||||
cpys[i]->source = n;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < list_length(&w->connections); i++) {
|
||||
struct websocket_connection *c = list_at(&w->connections, i);
|
||||
|
||||
websocket_connection_write(c, cpys, cnt);
|
||||
cpys[i]->id = n->id;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < list_length(&connections); i++) {
|
||||
struct websocket_connection *c = list_at(&connections, i);
|
||||
|
||||
websocket_connection_write(c, cpys, cnt);
|
||||
|
||||
if (c->node == n || c->node == NULL)
|
||||
websocket_connection_write(c, cpys, cnt);
|
||||
}
|
||||
|
||||
sample_put_many(cpys, avail);
|
||||
|
@ -540,7 +499,6 @@ int websocket_parse(struct node *n, json_t *cfg)
|
|||
json_t *cfg_dest;
|
||||
json_error_t err;
|
||||
|
||||
list_init(&w->connections);
|
||||
list_init(&w->destinations);
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o }", "destinations", &cfg_dests);
|
||||
|
@ -568,13 +526,12 @@ int websocket_parse(struct node *n, json_t *cfg)
|
|||
|
||||
d->info.ssl_connection = !strcmp(prot, "https");
|
||||
d->info.address = strdup(ads);
|
||||
d->info.path = strdup(path);
|
||||
d->info.host = d->info.address;
|
||||
d->info.origin = d->info.address;
|
||||
d->info.ietf_version_or_minus_one = -1;
|
||||
d->info.protocol = "live";
|
||||
|
||||
ret = asprintf((char **) &d->info.path, "/%s", path);
|
||||
|
||||
list_push(&w->destinations, d);
|
||||
}
|
||||
}
|
||||
|
@ -593,7 +550,7 @@ char * websocket_print(struct node *n)
|
|||
for (size_t i = 0; i < list_length(&w->destinations); i++) {
|
||||
struct websocket_destination *d = list_at(&w->destinations, i);
|
||||
|
||||
buf = strcatf(&buf, "%s://%s:%d%s ",
|
||||
buf = strcatf(&buf, "%s://%s:%d/%s ",
|
||||
d->info.ssl_connection ? "wss" : "ws",
|
||||
d->info.address,
|
||||
d->info.port,
|
||||
|
|
Loading…
Add table
Reference in a new issue