diff --git a/contrib/websocket/app.js b/contrib/websocket/app.js index 7293a76ec..ca554b461 100644 --- a/contrib/websocket/app.js +++ b/contrib/websocket/app.js @@ -227,30 +227,3 @@ function getParameterByName(name) { var match = RegExp('[?&]' + name + '=([^&]*)').exec(window.location.search); return match && decodeURIComponent(match[1].replace(/\+/g, ' ')); } - -/* Class for parsing and printing a message / sample */ -function Msg(ts, values, seq) { - this.ts = ts; - this.values = values; - this.seq = seq; - - if (this.seq == undefined) - this.seq = 0; - - this.parse = function(line) { - var res = line.split(/\s+/).map(Number); - - this.ts = res[0] * 1000; - this.values = res.slice(1) - }; - - this.print = function() { - return (this.ts / 1000).toFixed(9) + "(" + this.seq + ") " + this.values.join(" "); - } - - this.send = function(connection) { - var line = this.print(); - - connection.send(line); - } -} diff --git a/contrib/websocket/msg.js b/contrib/websocket/msg.js new file mode 100644 index 000000000..f83a13cf9 --- /dev/null +++ b/contrib/websocket/msg.js @@ -0,0 +1,108 @@ +/** Javascript class for parsing binary messages + * + * @file + * @author Steffen Vogel + * @copyright 2014-2015, Institute for Automation of Complex Power Systems, EONERC + * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + */ +/** + * @addtogroup websocket + * @ingroup node + * @{ + **********************************************************************************/ + +var S2SS = S2SS || {}; + +/* Some constants for the binary protocol */ +const Msg.VERSION = 1; + +const Msg.TYPE_DATA = 0; /**< Message contains float values */ +const Msg.TYPE_START = 1; /**< Message marks the beginning of a new simulation case */ +const Msg.TYPE_STOP = 2; /**< Message marks the end of a simulation case */ +const Msg.TYPE_EMPTY = 3; /**< Message does not carry useful data */ + +const Msg.ENDIAN_LITTLE = 0; /**< Message values are in little endian format (float too!) */ +const Msg.ENDIAN_BIG = 1; /**< Message values are in bit endian format */ + +/* Some offsets in the binary message */ +const Msg.OFFSET_ENDIAN = 1; +const Msg.OFFSET_TYPE = 2; +const Msg.OFFSET_VERSION = 4; + +/* Class for parsing and printing a message / sample */ +function Msg(members) { + for(var k in members) + this[k] = members[k]; +} + +Msg.prototype.length = function() { + return this.length * 4 + 16; +} + +Msg.prototype.toArrayBuffer = function() { + var blob = new ArrayBuffer(this.length()); + + return blob; +} + +Msg.prototype.fromArrayBuffer = function(blob) { + var hdr = new UInt32Array(blob, 0, 16); + var hdr16 = new UInt16Array(blob, 0, 16); + + var msg = new Msg({ + endian: (hdr[0] >> MSG_OFFSET_ENDIAN) & 0x1, + version: (hdr[0] >> MSG_OFFSET_VERSION) & 0xF, + type: (hdr[0] >> MSG_OFFSET_TYPE) & 0x3, + length: hdr16[1], + sequence: hdr[1], + timestamp: 1e3 * (hdr[2] + hdr[3]), // in milliseconds + blob : blob + }); + + if (msg.endian == MSG_ENDIAN_BIG) { + console.warn("Unsupported endianness. Skipping message!"); + continue; + + /* @todo not working yet + hdr = hdr.map(swap32); + values = values.map(swap32); + */ + } + + + msg.values = new Float32Array(msg, offset + 16, length * 4); // values reinterpreted as floats with 16byte offset in msg +} + +Msg.prototype.fromArrayBufferVector = function(blob) { + /* some local variables for parsing */ + var offset = 0; + var msgs = []; + + /* for every msg in vector */ + while (offset < msg.byteLength) { + var msg = Msg.fromArrayBuffer(ArrayBuffer(blob, offset)); + + if (msg != undefined) + msgs.push(msg); + + offset += msg.blobLength; + } + + return msgs; +} + +/** @todo parsing of big endian messages not yet supported */ +function swap16(val) { + return ((val & 0xFF) << 8) + | ((val >> 8) & 0xFF); +} + +function swap32(val) { + return ((val & 0xFF) << 24) + | ((val & 0xFF00) << 8) + | ((val >> 8) & 0xFF00) + | ((val >> 24) & 0xFF); +} + +/** @} */ \ No newline at end of file diff --git a/etc/ws.conf b/etc/ws.conf new file mode 100644 index 000000000..f44518786 --- /dev/null +++ b/etc/ws.conf @@ -0,0 +1,56 @@ + +############ Global Options ############ + +affinity = 0x02; # Mask of cores the server should run on + # This also maps the NIC interrupts to those cores! + +priority = 50; # Priority for the server tasks. + # Usually the server is using a real-time FIFO + # scheduling algorithm + +debug = 10; # The level of verbosity for debug messages + # Higher number => increased verbosity + +stats = 1; # The interval in seconds to print path statistics. + # A value of 0 disables the statistics. + + +############ nodes ############ + +nodes = { + file = { + type = "file", + in = { + path = "input.log", # These options specify the path prefix where the the files are stored + mode = "r", # The mode in which files should be opened (see open(2)) + } + }, + + ws = { + type = "websocket", + unit = "MVa", + units = [ "V", "A", "Var" ], + description = "Das ist ein Test", + source = { + simulator = "OP5600", + location = "ACS lab" + } + } + + ws2 = { + type = "websocket", + description = "Warum nicht noch einanderer?" + } +}; + + +############ List of paths ############ + +paths = ( + { in = "ws"; out = "ws"; hook = [ "ts", "fir:0" ]; rate = 20 } +); + +http = { + htdocs = "/s2ss/contrib/websocket", + port = 80 +} diff --git a/lib/websocket.c b/lib/websocket.c index d6f5f901b..18d557baa 100644 --- a/lib/websocket.c +++ b/lib/websocket.c @@ -258,12 +258,27 @@ found: * (void **) user = n; w = n->_vd; - if (!w->read.m) + if (!w->read.cnt) return 0; pthread_mutex_lock(&w->read.mutex); - - msg_scan(in, w->read.m, NULL, NULL); + + size_t offset = 0; + for (int i = 0; i < w->read.cnt; i++) { + struct msg *dst = pool_get_relative(w->read.pool, i); +#if 1 + struct msg *src = (char *) in + offset; + + memcpy(dst, src, MSG_LEN(src->values)); + + offset += MSG_LEN(src->values); + if (offset >= len) + break; +#else + /** @todo untested */ + msg_scan(in, dst, NULL, NULL); +#endif + } pthread_mutex_unlock(&w->read.mutex); pthread_cond_broadcast(&w->read.cond); @@ -379,31 +394,31 @@ int websocket_close(struct node *n) return 0; } -int websocket_read(struct node *n, struct msg *pool, int poolsize, int first, int cnt) +int websocket_read(struct node *n, struct pool *pool, int cnt) { struct websocket *w = n->_vd; - struct msg *m = pool + (first % poolsize); - w->read.m = m; + w->read.pool = pool; + w->read.cnt = cnt; pthread_cond_wait(&w->read.cond, &w->read.mutex); return 1; } -int websocket_write(struct node *n, struct msg *pool, int poolsize, int first, int cnt) +int websocket_write(struct node *n, struct pool *pool, int cnt) { struct websocket *w = n->_vd; struct msg *m = pool + (first % poolsize); pthread_mutex_lock(&w->write.mutex); - w->write.m = m; + w->write.pool = pool; + w->write.cnt = cnt; /* Notify all active websocket connections to send new data */ - list_foreach(struct lws *wsi, &w->connections) { + list_foreach(struct lws *wsi, &w->connections) lws_callback_on_writable(context, wsi); - } pthread_mutex_unlock(&w->write.mutex);