1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

updated WebSocket node to latest develop state

This commit is contained in:
Steffen Vogel 2016-01-14 23:17:39 +01:00
parent 9e63bf4070
commit fcf01e5410
4 changed files with 189 additions and 37 deletions

View file

@ -227,30 +227,3 @@ function getParameterByName(name) {
var match = RegExp('[?&]' + name + '=([^&]*)').exec(window.location.search);
return match && decodeURIComponent(match[1].replace(/\+/g, ' '));
}
/* Class for parsing and printing a message / sample */
function Msg(ts, values, seq) {
this.ts = ts;
this.values = values;
this.seq = seq;
if (this.seq == undefined)
this.seq = 0;
this.parse = function(line) {
var res = line.split(/\s+/).map(Number);
this.ts = res[0] * 1000;
this.values = res.slice(1)
};
this.print = function() {
return (this.ts / 1000).toFixed(9) + "(" + this.seq + ") " + this.values.join(" ");
}
this.send = function(connection) {
var line = this.print();
connection.send(line);
}
}

108
contrib/websocket/msg.js Normal file
View file

@ -0,0 +1,108 @@
/** Javascript class for parsing binary messages
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC
* This file is part of S2SS. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
/**
* @addtogroup websocket
* @ingroup node
* @{
**********************************************************************************/
var S2SS = S2SS || {};
/* Some constants for the binary protocol */
const Msg.VERSION = 1;
const Msg.TYPE_DATA = 0; /**< Message contains float values */
const Msg.TYPE_START = 1; /**< Message marks the beginning of a new simulation case */
const Msg.TYPE_STOP = 2; /**< Message marks the end of a simulation case */
const Msg.TYPE_EMPTY = 3; /**< Message does not carry useful data */
const Msg.ENDIAN_LITTLE = 0; /**< Message values are in little endian format (float too!) */
const Msg.ENDIAN_BIG = 1; /**< Message values are in bit endian format */
/* Some offsets in the binary message */
const Msg.OFFSET_ENDIAN = 1;
const Msg.OFFSET_TYPE = 2;
const Msg.OFFSET_VERSION = 4;
/* Class for parsing and printing a message / sample */
function Msg(members) {
for(var k in members)
this[k] = members[k];
}
Msg.prototype.length = function() {
return this.length * 4 + 16;
}
Msg.prototype.toArrayBuffer = function() {
var blob = new ArrayBuffer(this.length());
return blob;
}
Msg.prototype.fromArrayBuffer = function(blob) {
var hdr = new UInt32Array(blob, 0, 16);
var hdr16 = new UInt16Array(blob, 0, 16);
var msg = new Msg({
endian: (hdr[0] >> MSG_OFFSET_ENDIAN) & 0x1,
version: (hdr[0] >> MSG_OFFSET_VERSION) & 0xF,
type: (hdr[0] >> MSG_OFFSET_TYPE) & 0x3,
length: hdr16[1],
sequence: hdr[1],
timestamp: 1e3 * (hdr[2] + hdr[3]), // in milliseconds
blob : blob
});
if (msg.endian == MSG_ENDIAN_BIG) {
console.warn("Unsupported endianness. Skipping message!");
continue;
/* @todo not working yet
hdr = hdr.map(swap32);
values = values.map(swap32);
*/
}
msg.values = new Float32Array(msg, offset + 16, length * 4); // values reinterpreted as floats with 16byte offset in msg
}
Msg.prototype.fromArrayBufferVector = function(blob) {
/* some local variables for parsing */
var offset = 0;
var msgs = [];
/* for every msg in vector */
while (offset < msg.byteLength) {
var msg = Msg.fromArrayBuffer(ArrayBuffer(blob, offset));
if (msg != undefined)
msgs.push(msg);
offset += msg.blobLength;
}
return msgs;
}
/** @todo parsing of big endian messages not yet supported */
function swap16(val) {
return ((val & 0xFF) << 8)
| ((val >> 8) & 0xFF);
}
function swap32(val) {
return ((val & 0xFF) << 24)
| ((val & 0xFF00) << 8)
| ((val >> 8) & 0xFF00)
| ((val >> 24) & 0xFF);
}
/** @} */

56
etc/ws.conf Normal file
View file

@ -0,0 +1,56 @@
############ Global Options ############
affinity = 0x02; # Mask of cores the server should run on
# This also maps the NIC interrupts to those cores!
priority = 50; # Priority for the server tasks.
# Usually the server is using a real-time FIFO
# scheduling algorithm
debug = 10; # The level of verbosity for debug messages
# Higher number => increased verbosity
stats = 1; # The interval in seconds to print path statistics.
# A value of 0 disables the statistics.
############ nodes ############
nodes = {
file = {
type = "file",
in = {
path = "input.log", # These options specify the path prefix where the the files are stored
mode = "r", # The mode in which files should be opened (see open(2))
}
},
ws = {
type = "websocket",
unit = "MVa",
units = [ "V", "A", "Var" ],
description = "Das ist ein Test",
source = {
simulator = "OP5600",
location = "ACS lab"
}
}
ws2 = {
type = "websocket",
description = "Warum nicht noch einanderer?"
}
};
############ List of paths ############
paths = (
{ in = "ws"; out = "ws"; hook = [ "ts", "fir:0" ]; rate = 20 }
);
http = {
htdocs = "/s2ss/contrib/websocket",
port = 80
}

View file

@ -258,12 +258,27 @@ found: * (void **) user = n;
w = n->_vd;
if (!w->read.m)
if (!w->read.cnt)
return 0;
pthread_mutex_lock(&w->read.mutex);
msg_scan(in, w->read.m, NULL, NULL);
size_t offset = 0;
for (int i = 0; i < w->read.cnt; i++) {
struct msg *dst = pool_get_relative(w->read.pool, i);
#if 1
struct msg *src = (char *) in + offset;
memcpy(dst, src, MSG_LEN(src->values));
offset += MSG_LEN(src->values);
if (offset >= len)
break;
#else
/** @todo untested */
msg_scan(in, dst, NULL, NULL);
#endif
}
pthread_mutex_unlock(&w->read.mutex);
pthread_cond_broadcast(&w->read.cond);
@ -379,31 +394,31 @@ int websocket_close(struct node *n)
return 0;
}
int websocket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int websocket_read(struct node *n, struct pool *pool, int cnt)
{
struct websocket *w = n->_vd;
struct msg *m = pool + (first % poolsize);
w->read.m = m;
w->read.pool = pool;
w->read.cnt = cnt;
pthread_cond_wait(&w->read.cond, &w->read.mutex);
return 1;
}
int websocket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt)
int websocket_write(struct node *n, struct pool *pool, int cnt)
{
struct websocket *w = n->_vd;
struct msg *m = pool + (first % poolsize);
pthread_mutex_lock(&w->write.mutex);
w->write.m = m;
w->write.pool = pool;
w->write.cnt = cnt;
/* Notify all active websocket connections to send new data */
list_foreach(struct lws *wsi, &w->connections) {
list_foreach(struct lws *wsi, &w->connections)
lws_callback_on_writable(context, wsi);
}
pthread_mutex_unlock(&w->write.mutex);