1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

nodes: use C++ compiler

This commit is contained in:
Steffen Vogel 2019-04-22 23:43:46 +02:00
parent dabb1c5454
commit 37c724f61b
26 changed files with 479 additions and 418 deletions

View file

@ -53,19 +53,19 @@ struct file {
size_t buffer_size_out; /**< Defines size of output stream buffer. No buffer is created if value is set to zero. */
size_t buffer_size_in; /**< Defines size of input stream buffer. No buffer is created if value is set to zero. */
enum epoch_mode {
FILE_EPOCH_DIRECT,
FILE_EPOCH_WAIT,
FILE_EPOCH_RELATIVE,
FILE_EPOCH_ABSOLUTE,
FILE_EPOCH_ORIGINAL
enum epoch {
DIRECT,
WAIT,
RELATIVE,
ABSOLUTE,
ORIGINAL
} epoch_mode; /**< Specifies how file::offset is calculated. */
enum {
FILE_EOF_STOP, /**< Terminate when EOF is reached. */
FILE_EOF_REWIND, /**< Rewind the file when EOF is reached. */
FILE_EOF_WAIT /**< Blocking wait when EOF is reached. */
} eof;
enum eof {
STOP, /**< Terminate when EOF is reached. */
REWIND, /**< Rewind the file when EOF is reached. */
SUSPEND /**< Blocking wait when EOF is reached. */
} eof_mode;
struct timespec first; /**< The first timestamp in the file file::{read,write}::uri */
struct timespec epoch; /**< The epoch timestamp from the configuration. */

View file

@ -51,28 +51,28 @@ extern "C" {
enum iec61850_type {
/* According to IEC 61850-7-2 */
IEC61850_TYPE_BOOLEAN,
IEC61850_TYPE_INT8,
IEC61850_TYPE_INT16,
IEC61850_TYPE_INT32,
IEC61850_TYPE_INT64,
IEC61850_TYPE_INT8U,
IEC61850_TYPE_INT16U,
IEC61850_TYPE_INT32U,
IEC61850_TYPE_INT64U,
IEC61850_TYPE_FLOAT32,
IEC61850_TYPE_FLOAT64,
IEC61850_TYPE_ENUMERATED,
IEC61850_TYPE_CODED_ENUM,
IEC61850_TYPE_OCTET_STRING,
IEC61850_TYPE_VISIBLE_STRING,
IEC61850_TYPE_OBJECTNAME,
IEC61850_TYPE_OBJECTREFERENCE,
IEC61850_TYPE_TIMESTAMP,
IEC61850_TYPE_ENTRYTIME,
BOOLEAN,
INT8,
INT16,
INT32,
INT64,
INT8U,
INT16U,
INT32U,
INT64U,
FLOAT32,
FLOAT64,
ENUMERATED,
CODED_ENUM,
OCTET_STRING,
VISIBLE_STRING,
OBJECTNAME,
OBJECTREFERENCE,
TIMESTAMP,
ENTRYTIME,
/* According to IEC 61850-8-1 */
IEC61850_TYPE_BITSTRING
BITSTRING
};
struct iec61850_type_descriptor {
@ -89,9 +89,9 @@ struct iec61850_receiver {
EthernetSocket socket;
enum iec61850_receiver_type {
IEC61850_RECEIVER_GOOSE,
IEC61850_RECEIVER_SV
enum type {
GOOSE,
SAMPLED_VALUES
} type;
union {
@ -110,9 +110,9 @@ const struct iec61850_type_descriptor * iec61850_lookup_type(const char *name);
int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct vlist *node_signals);
struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver_type t, const char *intf);
struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver::type t, const char *intf);
struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver_type t, const char *intf);
struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver::type t, const char *intf);
int iec61850_receiver_start(struct iec61850_receiver *r);

View file

@ -89,11 +89,11 @@ struct infiniband {
int use_fallback;
/* Counter to keep track of available recv. WRs */
int available_recv_wrs;
unsigned available_recv_wrs;
/* Fixed number to substract from min. number available
* WRs in receive queue */
int buffer_subtraction;
unsigned buffer_subtraction;
/* Unrealiable connectionless data */
struct ud_s {

View file

@ -40,17 +40,6 @@ extern "C" {
struct node;
struct sample;
enum signal_generator_type {
SIGNAL_GENERATOR_TYPE_RANDOM,
SIGNAL_GENERATOR_TYPE_SINE,
SIGNAL_GENERATOR_TYPE_SQUARE,
SIGNAL_GENERATOR_TYPE_TRIANGLE,
SIGNAL_GENERATOR_TYPE_RAMP,
SIGNAL_GENERATOR_TYPE_COUNTER,
SIGNAL_GENERATOR_TYPE_CONSTANT,
SIGNAL_GENERATOR_TYPE_MIXED
};
/** Node-type for signal generation.
* @see node_type
*/
@ -58,7 +47,17 @@ struct signal_generator {
struct task task; /**< Timer for periodic events. */
int rt; /**< Real-time mode? */
enum signal_generator_type type; /**< Signal type */
enum type {
RANDOM,
SINE,
SQUARE,
TRIANGLE,
RAMP,
COUNTER,
CONSTANT,
MIXED,
INVALID
} type; /**< Signal type */
double rate; /**< Sampling rate. */
double frequency; /**< Frequency of the generated signals. */
@ -69,12 +68,12 @@ struct signal_generator {
double *last; /**< The values from the previous period which are required for random walk. */
int values; /**< The number of values which will be emitted by this node. */
unsigned values; /**< The number of values which will be emitted by this node. */
int limit; /**< The number of values which should be generated by this node. <0 for infinitve. */
struct timespec started; /**< Point in time when this node was started. */
int counter; /**< The number of packets already emitted. */
int missed_steps; /**< Total number of missed steps. */
unsigned counter; /**< The number of packets already emitted. */
unsigned missed_steps; /**< Total number of missed steps. */
};
/** @see node_type::print */

View file

@ -57,19 +57,19 @@ struct websocket {
/* Internal datastructures */
struct websocket_connection {
enum websocket_connection_state {
WEBSOCKET_CONNECTION_STATE_DESTROYED,
WEBSOCKET_CONNECTION_STATE_INITIALIZED,
WEBSOCKET_CONNECTION_STATE_CONNECTING,
WEBSOCKET_CONNECTION_STATE_RECONNECTING,
WEBSOCKET_CONNECTION_STATE_ESTABLISHED,
WEBSOCKET_CONNECTION_STATE_SHUTDOWN,
WEBSOCKET_CONNECTION_STATE_ERROR
enum state {
DESTROYED,
INITIALIZED,
CONNECTING,
RECONNECTING,
ESTABLISHED,
SHUTDOWN,
ERROR
} state; /**< The current status of this connection. */
enum {
WEBSOCKET_MODE_CLIENT,
WEBSOCKET_MODE_SERVER,
enum mode {
CLIENT,
SERVER,
} mode;
struct lws *wsi;

View file

@ -63,10 +63,10 @@ struct zeromq {
} server, client;
} curve;
enum {
ZEROMQ_PATTERN_PUBSUB,
enum pattern {
PUBSUB,
#ifdef ZMQ_BUILD_DISH
ZEROMQ_PATTERN_RADIODISH
RADIODISH
#endif
} pattern;

View file

@ -29,17 +29,17 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <villas/pool.h>
#include <villas/queue.h>
#include <villas/queue_signalled.h>
#include <villas/sample.h>
#define DEFAULT_SHMEM_QUEUELEN 512
#define DEFAULT_SHMEM_SAMPLELEN 64
#define DEFAULT_SHMEM_QUEUELEN 512u
#define DEFAULT_SHMEM_SAMPLELEN 64u
#ifdef __cplusplus
extern "C" {
#endif
/** Struct containing all parameters that need to be known when creating a new
* shared memory object. */

View file

@ -28,7 +28,7 @@ if(LIBNL3_ROUTE_FOUND)
endif()
if(WITH_NODE_INFLUXDB)
list(APPEND NODE_SRC influxdb.c)
list(APPEND NODE_SRC influxdb.cpp)
endif()
if(WITH_NODE_STATS)
@ -36,11 +36,11 @@ if(WITH_NODE_STATS)
endif()
if(WITH_NODE_SIGNAL)
list(APPEND NODE_SRC signal_generator.c)
list(APPEND NODE_SRC signal_generator.cpp)
endif()
if(WITH_NODE_LOOPBACK)
list(APPEND NODE_SRC loopback.c)
list(APPEND NODE_SRC loopback.cpp)
endif()
if(WITH_NODE_TEST_RTT)
@ -48,23 +48,23 @@ if(WITH_NODE_TEST_RTT)
endif()
if(WITH_NODE_SOCKET)
list(APPEND NODE_SRC socket.c)
list(APPEND NODE_SRC socket.cpp)
endif()
if(WITH_NODE_FILE)
list(APPEND NODE_SRC file.c)
list(APPEND NODE_SRC file.cpp)
endif()
# Enable Universal Library for Linux DAQ devices (libuldaq)
if(WITH_NODE_ULDAQ)
list(APPEND NODE_SRC uldaq.c)
list(APPEND NODE_SRC uldaq.cpp)
list(APPEND INCLUDE_DIRS ${LIBULDAQ_INCLUDE_DIRS})
list(APPEND LIBRARIES PkgConfig::LIBULDAQ uldaq)
endif()
# Enable shared memory node-type
if(WITH_NODE_SHMEM)
list(APPEND NODE_SRC shmem.c)
list(APPEND NODE_SRC shmem.cpp)
if(CMAKE_SUSTEM_NAME STREQUAL Linux)
list(APPEND LIBRARIES rt)
@ -73,70 +73,70 @@ endif()
# Enable IEC61850 node-types when libiec61850 is available
if(WITH_NODE_IEC61850)
list(APPEND NODE_SRC iec61850_sv.c iec61850.c)
list(APPEND NODE_SRC iec61850_sv.cpp iec61850.cpp)
list(APPEND INCLUDE_DIRS ${LIBIEC61850_INCLUDE_DIRS})
list(APPEND LIBRARIES PkgConfig::LIBIEC61850 ${LIBIEC61850_LIBRARIES})
endif()
# Enable OPAL-RT Asynchronous Process support (will result in 32bit binary!!!)
if(WITH_NODE_OPAL)
list(APPEND NODE_SRC opal.c)
list(APPEND NODE_SRC opal.cpp)
list(APPEND INCLUDE_DIRS ${OPAL_INCLUDE_DIRS})
list(APPEND LIBRARIES ${OPAL_LIBRARIES})
endif()
# Enable nanomsg node type when libnanomsg is available
if(WITH_NODE_NANOMSG)
list(APPEND NODE_SRC nanomsg.c)
list(APPEND NODE_SRC nanomsg.cpp)
list(APPEND INCLUDE_DIRS ${NANOMSG_INCLUDE_DIRS})
list(APPEND LIBRARIES PkgConfig::NANOMSG)
endif()
# Enable ZeroMQ node type when libzmq is available
if(WITH_NODE_ZEROMQ)
list(APPEND NODE_SRC zeromq.c)
list(APPEND NODE_SRC zeromq.cpp)
list(APPEND INCLUDE_DIRS ${LIBZMQ_INCLUDE_DIRS})
list(APPEND LIBRARIES PkgConfig::LIBZMQ)
endif()
# Enable NGSI support
if(WITH_NODE_NGSI)
list(APPEND NODE_SRC ngsi.c)
list(APPEND NODE_SRC ngsi.cpp)
list(APPEND INCLUDE_DIRS ${CURL_INCLUDE_DIRS})
list(APPEND LIBRARIES ${CURL_LIBRARIES})
endif()
# Enable WebSocket support
if(WITH_NODE_WEBSOCKET)
list(APPEND NODE_SRC websocket.c)
list(APPEND NODE_SRC websocket.cpp)
list(APPEND INCLUDE_DIRS ${LIBWEBSOCKETS_INCLUDE_DIRS})
list(APPEND LIBRARIES ${LIBWEBSOCKETS_LDLIBS})
endif()
# Enable AMQP support
if(WITH_NODE_AMQP)
list(APPEND NODE_SRC amqp.c)
list(APPEND NODE_SRC amqp.cpp)
list(APPEND INCLUDE_DIRS ${RABBITMQ_C_INCLUDE_DIRS})
list(APPEND LIBRARIES PkgConfig::RABBITMQ_C)
endif()
# Enable MQTT support
if(WITH_NODE_MQTT)
list(APPEND NODE_SRC mqtt.c)
list(APPEND NODE_SRC mqtt.cpp)
list(APPEND INCLUDE_DIRS ${MOSQUITTO_INCLUDE_DIRS})
list(APPEND LIBRARIES ${MOSQUITTO_LIBRARIES})
endif()
# Enable Comedi support
if(WITH_NODE_COMEDI)
list(APPEND NODE_SRC comedi.c)
list(APPEND NODE_SRC comedi.cpp)
list(APPEND INCLUDE_DIRS ${COMEDILIB_INCLUDE_DIRS})
list(APPEND LIBRARIES PkgConfig::COMEDILIB)
endif()
# Enable Infiniband support
if(WITH_NODE_INFINIBAND)
list(APPEND NODE_SRC infiniband.c)
list(APPEND NODE_SRC infiniband.cpp)
list(APPEND INCLUDE_DIRS ${IBVERBS_INCLUDE_DIRS} ${RDMACM_INCLUDE_DIRS})
list(APPEND LIBRARIES ${IBVERBS_LIBRARIES} ${RDMACM_LIBRARIES})
endif()

View file

@ -114,7 +114,7 @@ static int amqp_close(amqp_connection_state_t conn)
int amqp_parse(struct node *n, json_t *json)
{
int ret;
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
int port = 5672;
const char *format = "json";
@ -195,7 +195,7 @@ int amqp_parse(struct node *n, json_t *json)
char * amqp_print(struct node *n)
{
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
char *buf = NULL;
@ -233,7 +233,7 @@ char * amqp_print(struct node *n)
int amqp_start(struct node *n)
{
int ret;
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
amqp_bytes_t queue;
amqp_rpc_reply_t rep;
@ -293,7 +293,7 @@ int amqp_start(struct node *n)
int amqp_stop(struct node *n)
{
int ret;
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
ret = amqp_close(a->consumer);
if (ret)
@ -313,7 +313,7 @@ int amqp_stop(struct node *n)
int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
amqp_envelope_t env;
amqp_rpc_reply_t rep;
@ -321,7 +321,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
if (rep.reply_type != AMQP_RESPONSE_NORMAL)
return -1;
ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt);
ret = io_sscan(&a->io, static_cast<char *>(env.message.body.bytes), env.message.body.len, NULL, smps, cnt);
amqp_destroy_envelope(&env);
@ -331,7 +331,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int ret;
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
char data[1500];
size_t wbytes;
@ -358,7 +358,7 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
int amqp_poll_fds(struct node *n, int fds[])
{
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
amqp_socket_t *sock = amqp_get_socket(a->consumer);
@ -369,7 +369,7 @@ int amqp_poll_fds(struct node *n, int fds[])
int amqp_destroy(struct node *n)
{
struct amqp *a = n->_vd;
struct amqp *a = (struct amqp *) n->_vd;
if (a->uri)
free(a->uri);
@ -399,13 +399,13 @@ static struct plugin p = {
.node = {
.vectorize = 0,
.size = sizeof(struct amqp),
.destroy = amqp_destroy,
.parse = amqp_parse,
.print = amqp_print,
.start = amqp_start,
.stop = amqp_stop,
.read = amqp_read,
.write = amqp_write,
.destroy = amqp_destroy,
.poll_fds = amqp_poll_fds
}
};

View file

@ -70,10 +70,10 @@ static int comedi_parse_direction(struct comedi *c, struct comedi_direction *d,
return 0;
}
d->chanlist = alloc(d->chanlist_len * sizeof(*d->chanlist));
d->chanlist = (unsigned int*) alloc(d->chanlist_len * sizeof(*d->chanlist));
assert(d->chanlist != NULL);
d->chanspecs = alloc(d->chanlist_len * sizeof(*d->chanspecs));
d->chanspecs = (comedi_chanspec *) alloc(d->chanlist_len * sizeof(*d->chanspecs));
assert(d->chanspecs != NULL);
json_array_foreach(json_chans, i, json_chan) {
@ -109,7 +109,7 @@ static int comedi_start_common(struct node *n)
continue;
/* Sanity-check channel config and populate chanspec for later */
for (int i = 0; i < d->chanlist_len; i++) {
for (unsigned i = 0; i < d->chanlist_len; i++) {
const unsigned int channel = CR_CHAN(d->chanlist[i]);
const int range = CR_RANGE(d->chanlist[i]);
@ -234,7 +234,7 @@ static int comedi_start_in(struct node *n)
#if COMEDI_USE_READ
/* Be prepared to consume one entire buffer */
c->buf = alloc(c->in.buffer_size);
c->buf = (char *) alloc(c->in.buffer_size);
c->bufptr = c->buf;
assert(c->bufptr != NULL);
@ -324,12 +324,12 @@ static int comedi_start_out(struct node *n)
/* Allocate buffer for one complete villas sample */
/** @todo: maybe increase buffer size according to c->vectorize */
const size_t local_buffer_size = d->sample_size * d->chanlist_len;
d->buffer = alloc(local_buffer_size);
d->buffer = (char *) alloc(local_buffer_size);
d->bufptr = d->buffer;
assert(d->buffer != NULL);
/* Initialize local buffer used for write() syscalls */
for (int channel = 0; channel < d->chanlist_len; channel++) {
for (unsigned channel = 0; channel < d->chanlist_len; channel++) {
const unsigned raw = comedi_from_phys(0.0f, d->chanspecs[channel].range, d->chanspecs[channel].maxdata);
if (d->sample_size == sizeof(sampl_t))
@ -341,9 +341,9 @@ static int comedi_start_out(struct node *n)
}
/* Preload comedi output buffer */
for (int i = 0; i < d->buffer_size / local_buffer_size; i++) {
ret = write(comedi_fileno(c->dev), d->buffer, local_buffer_size);
if (ret != local_buffer_size) {
for (unsigned i = 0; i < d->buffer_size / local_buffer_size; i++) {
size_t written = write(comedi_fileno(c->dev), d->buffer, local_buffer_size);
if (written != local_buffer_size) {
error("Cannot preload Comedi buffer");
}
}
@ -584,7 +584,7 @@ int comedi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
smps[i]->capacity, d->chanlist_len);
}
for (int si = 0; si < d->chanlist_len; si++) {
for (unsigned si = 0; si < d->chanlist_len; si++) {
unsigned int raw;
if (d->sample_size == sizeof(sampl_t))
@ -874,7 +874,7 @@ int comedi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *
d->bufptr = d->buffer;
/* Move samples from villas into local buffer for comedi */
for (int si = 0; si < sample->length; si++) {
for (unsigned si = 0; si < sample->length; si++) {
unsigned raw_value = 0;
switch (sample_format(sample, si)) {
@ -910,15 +910,15 @@ int comedi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *
}
/* Try to write one complete villas sample to comedi */
ret = write(comedi_fileno(c->dev), d->buffer, villas_sample_size);
if (ret < 0)
size_t written = write(comedi_fileno(c->dev), d->buffer, villas_sample_size);
if (written < 0)
error("write");
else if (ret == 0)
else if (written == 0)
break; /* Comedi doesn't accept any more samples at the moment */
else if (ret == villas_sample_size)
else if (written == villas_sample_size)
villas_samples_written++;
else
error("Only partial sample written (%d bytes), oops", ret);
error("Only partial sample written (%zu bytes), oops", written);
}
if (villas_samples_written == 0) {

View file

@ -37,7 +37,7 @@
static char * file_format_name(const char *format, struct timespec *ts)
{
struct tm tm;
char *buf = alloc(FILE_MAX_PATHLEN);
char *buf = (char *) alloc(FILE_MAX_PATHLEN);
/* Convert time */
gmtime_r(&ts->tv_sec, &tm);
@ -47,26 +47,26 @@ static char * file_format_name(const char *format, struct timespec *ts)
return buf;
}
static struct timespec file_calc_offset(const struct timespec *first, const struct timespec *epoch, enum epoch_mode mode)
static struct timespec file_calc_offset(const struct timespec *first, const struct timespec *epoch, enum file::epoch mode)
{
/* Get current time */
struct timespec now = time_now();
struct timespec offset;
/* Set offset depending on epoch_mode */
/* Set offset depending on epoch */
switch (mode) {
case FILE_EPOCH_DIRECT: /* read first value at now + epoch */
case file::epoch::DIRECT: /* read first value at now + epoch */
offset = time_diff(first, &now);
return time_add(&offset, epoch);
case FILE_EPOCH_WAIT: /* read first value at now + first + epoch */
case file::epoch::WAIT: /* read first value at now + first + epoch */
offset = now;
return time_add(&now, epoch);
case FILE_EPOCH_RELATIVE: /* read first value at first + epoch */
case file::epoch::RELATIVE: /* read first value at first + epoch */
return *epoch;
case FILE_EPOCH_ABSOLUTE: /* read first value at f->epoch */
case file::epoch::ABSOLUTE: /* read first value at f->epoch */
return time_diff(first, epoch);
default:
@ -76,7 +76,7 @@ static struct timespec file_calc_offset(const struct timespec *first, const stru
int file_parse(struct node *n, json_t *cfg)
{
struct file *f = n->_vd;
struct file *f = (struct file *) n->_vd;
int ret;
json_error_t err;
@ -84,13 +84,13 @@ int file_parse(struct node *n, json_t *cfg)
const char *uri_tmpl = NULL;
const char *format = "villas.human";
const char *eof = NULL;
const char *epoch_mode = NULL;
const char *epoch = NULL;
double epoch_flt = 0;
/* Default values */
f->rate = 0;
f->eof = FILE_EOF_STOP;
f->epoch_mode = FILE_EPOCH_DIRECT;
f->eof_mode = file::eof::STOP;
f->epoch_mode = file::epoch::DIRECT;
f->flush = 0;
f->buffer_size_in = 0;
f->buffer_size_out = 0;
@ -101,7 +101,7 @@ int file_parse(struct node *n, json_t *cfg)
"in",
"eof", &eof,
"rate", &f->rate,
"epoch_mode", &epoch_mode,
"epoch", &epoch,
"epoch", &epoch_flt,
"buffer_size", &f->buffer_size_in,
"out",
@ -120,28 +120,28 @@ int file_parse(struct node *n, json_t *cfg)
if (eof) {
if (!strcmp(eof, "exit") || !strcmp(eof, "stop"))
f->eof = FILE_EOF_STOP;
f->eof_mode = file::eof::STOP;
else if (!strcmp(eof, "rewind"))
f->eof = FILE_EOF_REWIND;
f->eof_mode = file::eof::REWIND;
else if (!strcmp(eof, "wait"))
f->eof = FILE_EOF_WAIT;
f->eof_mode = file::eof::SUSPEND;
else
error("Invalid mode '%s' for 'eof' setting of node %s", eof, node_name(n));
}
if (epoch_mode) {
if (!strcmp(epoch_mode, "direct"))
f->epoch_mode = FILE_EPOCH_DIRECT;
else if (!strcmp(epoch_mode, "wait"))
f->epoch_mode = FILE_EPOCH_WAIT;
else if (!strcmp(epoch_mode, "relative"))
f->epoch_mode = FILE_EPOCH_RELATIVE;
else if (!strcmp(epoch_mode, "absolute"))
f->epoch_mode = FILE_EPOCH_ABSOLUTE;
else if (!strcmp(epoch_mode, "original"))
f->epoch_mode = FILE_EPOCH_ORIGINAL;
if (epoch) {
if (!strcmp(epoch, "direct"))
f->epoch_mode = file::epoch::DIRECT;
else if (!strcmp(epoch, "wait"))
f->epoch_mode = file::epoch::WAIT;
else if (!strcmp(epoch, "relative"))
f->epoch_mode = file::epoch::RELATIVE;
else if (!strcmp(epoch, "absolute"))
f->epoch_mode = file::epoch::ABSOLUTE;
else if (!strcmp(epoch, "original"))
f->epoch_mode = file::epoch::ORIGINAL;
else
error("Invalid value '%s' for setting 'epoch_mode' of node %s", epoch_mode, node_name(n));
error("Invalid value '%s' for setting 'epoch' of node %s", epoch, node_name(n));
}
n->_vd = f;
@ -158,20 +158,50 @@ char * file_print(struct node *n)
const char *eof_str = NULL;
switch (f->epoch_mode) {
case FILE_EPOCH_DIRECT: epoch_str = "direct"; break;
case FILE_EPOCH_WAIT: epoch_str = "wait"; break;
case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break;
case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break;
case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break;
case file::epoch::DIRECT:
epoch_str = "direct";
break;
case file::epoch::WAIT:
epoch_str = "wait";
break;
case file::epoch::RELATIVE:
epoch_str = "relative";
break;
case file::epoch::ABSOLUTE:
epoch_str = "absolute";
break;
case file::epoch::ORIGINAL:
epoch_str = "original";
break;
default:
epoch_str = "";
break;
}
switch (f->eof) {
case FILE_EOF_STOP: eof_str = "stop"; break;
case FILE_EOF_WAIT: eof_str = "wait"; break;
case FILE_EOF_REWIND: eof_str = "rewind"; break;
switch (f->eof_mode) {
case file::eof::STOP:
eof_str = "stop";
break;
case file::eof::SUSPEND:
eof_str = "wait";
break;
case file::eof::REWIND:
eof_str = "rewind";
break;
default:
eof_str = "";
break;
}
strcatf(&buf, "uri=%s, format=%s, flush=%s, eof=%s, epoch_mode=%s, epoch=%.2f",
strcatf(&buf, "uri=%s, format=%s, flush=%s, eof=%s, epoch=%s, epoch=%.2f",
f->uri ? f->uri : f->uri_tmpl,
format_type_name(f->format),
f->flush ? "yes" : "no",
@ -273,7 +303,7 @@ int file_start(struct node *n)
serror("Failed to create timer");
/* Get timestamp of first line */
if (f->epoch_mode != FILE_EPOCH_ORIGINAL) {
if (f->epoch_mode != file::epoch::ORIGINAL) {
io_rewind(&f->io);
struct sample s = { .capacity = 0 };
@ -341,15 +371,15 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
retry: ret = io_scan(&f->io, smps, cnt);
if (ret <= 0) {
if (io_eof(&f->io)) {
switch (f->eof) {
case FILE_EOF_REWIND:
switch (f->eof_mode) {
case file::eof::REWIND:
info("Rewind input file of node %s", node_name(n));
f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode);
io_rewind(&f->io);
goto retry;
case FILE_EOF_WAIT:
case file::eof::SUSPEND:
/* We wait 10ms before fetching again. */
usleep(100000);
@ -369,12 +399,14 @@ retry: ret = io_scan(&f->io, smps, cnt);
goto retry;
case FILE_EOF_STOP:
case file::eof::STOP:
info("Reached end-of-file.");
n->state = STATE_STOPPING;
return -1;
default: { }
}
}
else
@ -384,7 +416,7 @@ retry: ret = io_scan(&f->io, smps, cnt);
}
/* We dont wait in FILE_EPOCH_ORIGINAL mode */
if (f->epoch_mode == FILE_EPOCH_ORIGINAL)
if (f->epoch_mode == file::epoch::ORIGINAL)
return cnt;
if (f->rate) {
@ -428,16 +460,16 @@ int file_poll_fds(struct node *n, int fds[])
if (f->rate) {
fds[0] = task_fd(&f->task);
return 1;
}
else {
if (f->epoch_mode == FILE_EPOCH_ORIGINAL) {
fds[0] = io_fd(&f->io);
return 1;
}
else
return -1; /** @todo not supported yet */
else if (f->epoch_mode == file::epoch::ORIGINAL) {
fds[0] = io_fd(&f->io);
return 1;
}
return -1; /** @todo not supported yet */
}
static struct plugin p = {
@ -450,8 +482,8 @@ static struct plugin p = {
.parse = file_parse,
.print = file_print,
.start = file_start,
.stop = file_stop,
.restart = file_restart,
.stop = file_stop,
.read = file_read,
.write = file_write,
.poll_fds = file_poll_fds

View file

@ -37,26 +37,26 @@
const struct iec61850_type_descriptor type_descriptors[] = {
/* name, iec_type, type, size, supported */
{ "boolean", IEC61850_TYPE_BOOLEAN, SIGNAL_TYPE_BOOLEAN, 1, false, false },
{ "int8", IEC61850_TYPE_INT8, SIGNAL_TYPE_INTEGER, 1, false, false },
{ "int16", IEC61850_TYPE_INT16, SIGNAL_TYPE_INTEGER, 2, false, false },
{ "int32", IEC61850_TYPE_INT32, SIGNAL_TYPE_INTEGER, 4, false, false },
{ "int64", IEC61850_TYPE_INT64, SIGNAL_TYPE_INTEGER, 8, false, false },
{ "int8u", IEC61850_TYPE_INT8U, SIGNAL_TYPE_INTEGER, 1, false, false },
{ "int16u", IEC61850_TYPE_INT16U, SIGNAL_TYPE_INTEGER, 2, false, false },
{ "int32u", IEC61850_TYPE_INT32U, SIGNAL_TYPE_INTEGER, 4, false, false },
{ "int64u", IEC61850_TYPE_INT64U, SIGNAL_TYPE_INTEGER, 8, false, false },
{ "float32", IEC61850_TYPE_FLOAT32, SIGNAL_TYPE_FLOAT, 4, false, false },
{ "float64", IEC61850_TYPE_FLOAT64, SIGNAL_TYPE_FLOAT, 8, false, false },
{ "enumerated", IEC61850_TYPE_ENUMERATED, SIGNAL_TYPE_INVALID, 4, false, false },
{ "coded_enum", IEC61850_TYPE_CODED_ENUM, SIGNAL_TYPE_INVALID, 4, false, false },
{ "octet_string", IEC61850_TYPE_OCTET_STRING, SIGNAL_TYPE_INVALID, 20, false, false },
{ "visible_string", IEC61850_TYPE_VISIBLE_STRING, SIGNAL_TYPE_INVALID, 35, false, false },
{ "objectname", IEC61850_TYPE_OBJECTNAME, SIGNAL_TYPE_INVALID, 20, false, false },
{ "objectreference", IEC61850_TYPE_OBJECTREFERENCE, SIGNAL_TYPE_INVALID, 20, false, false },
{ "timestamp", IEC61850_TYPE_TIMESTAMP, SIGNAL_TYPE_INVALID, 8, false, false },
{ "entrytime", IEC61850_TYPE_ENTRYTIME, SIGNAL_TYPE_INVALID, 6, false, false },
{ "bitstring", IEC61850_TYPE_BITSTRING, SIGNAL_TYPE_INVALID, 4, false, false }
{ "boolean", iec61850_type::BOOLEAN, SIGNAL_TYPE_BOOLEAN, 1, false, false },
{ "int8", iec61850_type::INT8, SIGNAL_TYPE_INTEGER, 1, false, false },
{ "int16", iec61850_type::INT16, SIGNAL_TYPE_INTEGER, 2, false, false },
{ "int32", iec61850_type::INT32, SIGNAL_TYPE_INTEGER, 4, false, false },
{ "int64", iec61850_type::INT64, SIGNAL_TYPE_INTEGER, 8, false, false },
{ "int8u", iec61850_type::INT8U, SIGNAL_TYPE_INTEGER, 1, false, false },
{ "int16u", iec61850_type::INT16U, SIGNAL_TYPE_INTEGER, 2, false, false },
{ "int32u", iec61850_type::INT32U, SIGNAL_TYPE_INTEGER, 4, false, false },
{ "int64u", iec61850_type::INT64U, SIGNAL_TYPE_INTEGER, 8, false, false },
{ "float32", iec61850_type::FLOAT32, SIGNAL_TYPE_FLOAT, 4, false, false },
{ "float64", iec61850_type::FLOAT64, SIGNAL_TYPE_FLOAT, 8, false, false },
{ "enumerated", iec61850_type::ENUMERATED, SIGNAL_TYPE_INVALID, 4, false, false },
{ "coded_enum", iec61850_type::CODED_ENUM, SIGNAL_TYPE_INVALID, 4, false, false },
{ "octet_string", iec61850_type::OCTET_STRING, SIGNAL_TYPE_INVALID, 20, false, false },
{ "visible_string", iec61850_type::VISIBLE_STRING, SIGNAL_TYPE_INVALID, 35, false, false },
{ "objectname", iec61850_type::OBJECTNAME, SIGNAL_TYPE_INVALID, 20, false, false },
{ "objectreference", iec61850_type::OBJECTREFERENCE, SIGNAL_TYPE_INVALID, 20, false, false },
{ "timestamp", iec61850_type::TIMESTAMP, SIGNAL_TYPE_INVALID, 8, false, false },
{ "entrytime", iec61850_type::ENTRYTIME, SIGNAL_TYPE_INVALID, 6, false, false },
{ "bitstring", iec61850_type::BITSTRING, SIGNAL_TYPE_INVALID, 4, false, false }
};
/** Each network interface needs a separate receiver */
@ -78,8 +78,8 @@ static void * iec61850_thread(void *ctx)
struct iec61850_receiver *r = (struct iec61850_receiver *) vlist_at(&receivers, i);
switch (r->type) {
case IEC61850_RECEIVER_GOOSE: GooseReceiver_tick(r->goose); break;
case IEC61850_RECEIVER_SV: SVReceiver_tick(r->sv); break;
case iec61850_receiver::type::GOOSE: GooseReceiver_tick(r->goose); break;
case iec61850_receiver::type::SAMPLED_VALUES: SVReceiver_tick(r->sv); break;
}
}
}
@ -122,7 +122,7 @@ int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct v
if (!node_signals)
return -1;
sig = vlist_at(node_signals, i);
sig = (struct signal *) vlist_at(node_signals, i);
if (!sig)
return -1;
@ -162,7 +162,7 @@ int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct v
if (!td)
return -1;
for (int i = 0; i < vlist_length(node_signals); i++) {
for (unsigned i = 0; i < vlist_length(node_signals); i++) {
vlist_push(signals, (void *) td);
total_size += td->size;
@ -224,11 +224,11 @@ int iec61850_type_stop()
int iec61850_receiver_start(struct iec61850_receiver *r)
{
switch (r->type) {
case IEC61850_RECEIVER_GOOSE:
case iec61850_receiver::type::GOOSE:
r->socket = GooseReceiver_startThreadless(r->goose);
break;
case IEC61850_RECEIVER_SV:
case iec61850_receiver::type::SAMPLED_VALUES:
r->socket = SVReceiver_startThreadless(r->sv);
break;
}
@ -243,11 +243,11 @@ int iec61850_receiver_stop(struct iec61850_receiver *r)
EthernetHandleSet_removeSocket(hset, r->socket);
switch (r->type) {
case IEC61850_RECEIVER_GOOSE:
case iec61850_receiver::type::GOOSE:
GooseReceiver_stopThreadless(r->goose);
break;
case IEC61850_RECEIVER_SV:
case iec61850_receiver::type::SAMPLED_VALUES:
SVReceiver_stopThreadless(r->sv);
break;
}
@ -258,11 +258,11 @@ int iec61850_receiver_stop(struct iec61850_receiver *r)
int iec61850_receiver_destroy(struct iec61850_receiver *r)
{
switch (r->type) {
case IEC61850_RECEIVER_GOOSE:
case iec61850_receiver::type::GOOSE:
GooseReceiver_destroy(r->goose);
break;
case IEC61850_RECEIVER_SV:
case iec61850_receiver::type::SAMPLED_VALUES:
SVReceiver_destroy(r->sv);
break;
}
@ -272,7 +272,7 @@ int iec61850_receiver_destroy(struct iec61850_receiver *r)
return 0;
}
struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver_type t, const char *intf)
struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver::type t, const char *intf)
{
for (unsigned i = 0; i < vlist_length(&receivers); i++) {
struct iec61850_receiver *r = (struct iec61850_receiver *) vlist_at(&receivers, i);
@ -284,14 +284,14 @@ struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver_type
return NULL;
}
struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver_type t, const char *intf)
struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver::type t, const char *intf)
{
struct iec61850_receiver *r;
/* Check if there is already a SVReceiver for this interface */
r = iec61850_receiver_lookup(t, intf);
if (!r) {
r = alloc(sizeof(struct iec61850_receiver));
r = (struct iec61850_receiver *) alloc(sizeof(struct iec61850_receiver));
if (!r)
return NULL;
@ -299,12 +299,12 @@ struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver_type
r->type = t;
switch (r->type) {
case IEC61850_RECEIVER_GOOSE:
case iec61850_receiver::type::GOOSE:
r->goose = GooseReceiver_create();
GooseReceiver_setInterfaceId(r->goose, r->interface);
break;
case IEC61850_RECEIVER_SV:
case iec61850_receiver::type::SAMPLED_VALUES:
r->sv = SVReceiver_create();
SVReceiver_setInterfaceId(r->sv, r->interface);
break;

View file

@ -92,35 +92,35 @@ static void iec61850_sv_listener(SVSubscriber subscriber, void *ctx, SVSubscribe
continue;
switch (td->iec_type) {
case IEC61850_TYPE_INT8:
case iec61850_type::INT8:
smp->data[j].i = SVSubscriber_ASDU_getINT8(asdu, offset);
break;
case IEC61850_TYPE_INT16:
case iec61850_type::INT16:
smp->data[j].i = SVSubscriber_ASDU_getINT16(asdu, offset);
break;
case IEC61850_TYPE_INT32:
case iec61850_type::INT32:
smp->data[j].i = SVSubscriber_ASDU_getINT32(asdu, offset);
break;
case IEC61850_TYPE_INT8U:
case iec61850_type::INT8U:
smp->data[j].i = SVSubscriber_ASDU_getINT8U(asdu, offset);
break;
case IEC61850_TYPE_INT16U:
case iec61850_type::INT16U:
smp->data[j].i = SVSubscriber_ASDU_getINT16U(asdu, offset);
break;
case IEC61850_TYPE_INT32U:
case iec61850_type::INT32U:
smp->data[j].i = SVSubscriber_ASDU_getINT32U(asdu, offset);
break;
case IEC61850_TYPE_FLOAT32:
case iec61850_type::FLOAT32:
smp->data[j].f = SVSubscriber_ASDU_getFLOAT32(asdu, offset);
break;
case IEC61850_TYPE_FLOAT64:
case iec61850_type::FLOAT64:
smp->data[j].f = SVSubscriber_ASDU_getFLOAT64(asdu, offset);
break;
@ -280,19 +280,19 @@ int iec61850_sv_start(struct node *n)
struct iec61850_type_descriptor *td = (struct iec61850_type_descriptor *) vlist_at(&i->out.signals, k);
switch (td->iec_type) {
case IEC61850_TYPE_INT8:
case iec61850_type::INT8:
SVPublisher_ASDU_addINT8(i->out.asdu);
break;
case IEC61850_TYPE_INT32:
case iec61850_type::INT32:
SVPublisher_ASDU_addINT32(i->out.asdu);
break;
case IEC61850_TYPE_FLOAT32:
case iec61850_type::FLOAT32:
SVPublisher_ASDU_addFLOAT(i->out.asdu);
break;
case IEC61850_TYPE_FLOAT64:
case iec61850_type::FLOAT64:
SVPublisher_ASDU_addFLOAT64(i->out.asdu);
break;
@ -314,7 +314,7 @@ int iec61850_sv_start(struct node *n)
/* Start subscriber */
if (i->in.enabled) {
struct iec61850_receiver *r = iec61850_receiver_create(IEC61850_RECEIVER_SV, i->interface);
struct iec61850_receiver *r = iec61850_receiver_create(iec61850_receiver::type::SAMPLED_VALUES, i->interface);
i->in.receiver = r->sv;
i->in.subscriber = SVSubscriber_create(i->dst_address.ether_addr_octet, i->app_id);
@ -414,13 +414,13 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig
double fval = 0;
switch (td->iec_type) {
case IEC61850_TYPE_INT8:
case IEC61850_TYPE_INT32:
case iec61850_type::INT8:
case iec61850_type::INT32:
ival = sample_format(smps[j], k) == SIGNAL_TYPE_FLOAT ? smps[j]->data[k].f : smps[j]->data[k].i;
break;
case IEC61850_TYPE_FLOAT32:
case IEC61850_TYPE_FLOAT64:
case iec61850_type::FLOAT32:
case iec61850_type::FLOAT64:
fval = sample_format(smps[j], k) == SIGNAL_TYPE_FLOAT ? smps[j]->data[k].f : smps[j]->data[k].i;
break;
@ -428,19 +428,19 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig
}
switch (td->iec_type) {
case IEC61850_TYPE_INT8:
case iec61850_type::INT8:
SVPublisher_ASDU_setINT8(i->out.asdu, offset, ival);
break;
case IEC61850_TYPE_INT32:
case iec61850_type::INT32:
SVPublisher_ASDU_setINT32(i->out.asdu, offset, ival);
break;
case IEC61850_TYPE_FLOAT32:
case iec61850_type::FLOAT32:
SVPublisher_ASDU_setFLOAT(i->out.asdu, offset, fval);
break;
case IEC61850_TYPE_FLOAT64:
case iec61850_type::FLOAT64:
SVPublisher_ASDU_setFLOAT64(i->out.asdu, offset, fval);
break;
@ -480,13 +480,15 @@ static struct plugin p = {
.node = {
.vectorize = 0,
.size = sizeof(struct iec61850_sv),
.type.start = iec61850_type_start,
.type.stop = iec61850_type_stop,
.type = {
.start = iec61850_type_start,
.stop = iec61850_type_stop
},
.destroy = iec61850_sv_destroy,
.parse = iec61850_sv_parse,
.print = iec61850_sv_print,
.start = iec61850_sv_start,
.stop = iec61850_sv_stop,
.destroy = iec61850_sv_destroy,
.read = iec61850_sv_read,
.write = iec61850_sv_write,
.poll_fds = iec61850_sv_poll_fds

View file

@ -351,8 +351,8 @@ int ib_check(struct node *n)
error("The buffer substraction value cannot be bigger than in.max_wrs - in.vectorize");
/* Check if the set value is a power of 2, and warn the user if this is not the case */
int max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr)));
int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));
unsigned max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr)));
unsigned max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));
if (ib->qp_init.cap.max_send_wr != max_send_pow) {
warning("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
@ -513,35 +513,35 @@ void * ib_rdma_cm_event_thread(void *n)
switch(event->event) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
ret = ib_addr_resolved(n);
ret = ib_addr_resolved(node);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
warning("Address resolution (rdma_resolve_addr) failed!");
ib_continue_as_listen(n, event);
ib_continue_as_listen(node, event);
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
ret = ib_route_resolved(n);
ret = ib_route_resolved(node);
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
warning("Route resolution (rdma_resovle_route) failed!");
ib_continue_as_listen(n, event);
ib_continue_as_listen(node, event);
break;
case RDMA_CM_EVENT_UNREACHABLE:
warning("Remote server unreachable!");
ib_continue_as_listen(n, event);
ib_continue_as_listen(node, event);
break;
case RDMA_CM_EVENT_CONNECT_REQUEST:
ret = ib_connect_request(n, event->id);
ret = ib_connect_request(node, event->id);
/* A target UDP node will never really connect. In order to receive data,
* we set it to connected after it answered the connection request
@ -557,14 +557,14 @@ void * ib_rdma_cm_event_thread(void *n)
case RDMA_CM_EVENT_CONNECT_ERROR:
warning("An error has occurred trying to establish a connection!");
ib_continue_as_listen(n, event);
ib_continue_as_listen(node, event);
break;
case RDMA_CM_EVENT_REJECTED:
warning("Connection request or response was rejected by the remote end point!");
ib_continue_as_listen(n, event);
ib_continue_as_listen(node, event);
break;
@ -577,14 +577,14 @@ void * ib_rdma_cm_event_thread(void *n)
node->state = STATE_CONNECTED;
info("Connection established in node %s", node_name(n));
info("Connection established in node %s", node_name(node));
break;
case RDMA_CM_EVENT_DISCONNECTED:
node->state = STATE_STARTED;
ret = ib_disconnect(n);
ret = ib_disconnect(node);
if (!ret)
info("Host disconnected. Ready to accept new connections.");
@ -874,7 +874,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
struct ibv_mr *mr;
int ret;
int sent = 0; /* Used for first loop: prepare work requests to post to send queue */
unsigned sent = 0; /* Used for first loop: prepare work requests to post to send queue */
debug(LOG_IB | 10, "ib_write is called");
@ -951,7 +951,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
/* Reorder list. Place inline and unposted samples to the top
* m will always be equal or smaller than *release
*/
for (int m = 0; m < cnt; m++) {
for (unsigned m = 0; m < cnt; m++) {
/* We can't use wr_id as identifier, since it is 0 for inline
* elements
*/
@ -1007,15 +1007,15 @@ static struct plugin p = {
.vectorize = 0,
.size = sizeof(struct infiniband),
.pool_size = 8192,
.reverse = ib_reverse,
.destroy = ib_destroy,
.parse = ib_parse,
.check = ib_check,
.print = ib_print,
.start = ib_start,
.destroy = ib_destroy,
.stop = ib_stop,
.read = ib_read,
.write = ib_write,
.reverse = ib_reverse,
.memory_type = memory_ib
}
};

View file

@ -124,14 +124,14 @@ int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned
char *buf = NULL;
ssize_t sentlen, buflen;
for (int k = 0; k < cnt; k++) {
for (unsigned k = 0; k < cnt; k++) {
struct sample *smp = smps[k];
/* Key */
strcatf(&buf, "%s", i->key);
/* Fields */
for (int j = 0; j < smp->length; j++) {
for (unsigned j = 0; j < smp->length; j++) {
struct signal *sig = (struct signal *) vlist_at(smp->signals, j);
union signal_data *data = &smp->data[k];

View file

@ -117,11 +117,11 @@ int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned
int loopback_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
{
int copied;
struct loopback *l = (struct loopback *) n->_vd;
struct sample *copies[cnt];
unsigned copied;
copied = sample_alloc_many(&l->pool, copies, cnt);
if (copied < cnt)
warning("Pool underrun for node %s", node_name(n));

View file

@ -92,7 +92,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct
return;
}
ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, smps, n->in.vectorize);
ret = io_sscan(&m->io, (char *) msg->payload, msg->payloadlen, NULL, smps, n->in.vectorize);
if (ret < 0) {
warning("MQTT: Node %s received an invalid message", node_name(n));
warning(" Payload: %s", (char *) msg->payload);
@ -104,7 +104,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct
return;
}
queue_signalled_push_many(&m->queue, (void *) smps, n->in.vectorize);
queue_signalled_push_many(&m->queue, (void **) smps, n->in.vectorize);
}
static void mqtt_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
@ -454,17 +454,19 @@ static struct plugin p = {
.node = {
.vectorize = 0,
.size = sizeof(struct mqtt),
.type.start = mqtt_type_start,
.type.stop = mqtt_type_stop,
.reverse = mqtt_reverse,
.type ={
.start = mqtt_type_start,
.stop = mqtt_type_stop
},
.destroy = mqtt_destroy,
.parse = mqtt_parse,
.check = mqtt_check,
.print = mqtt_print,
.start = mqtt_start,
.destroy = mqtt_destroy,
.stop = mqtt_stop,
.read = mqtt_read,
.write = mqtt_write,
.reverse = mqtt_reverse,
.poll_fds = mqtt_poll_fds
}
};

View file

@ -37,8 +37,8 @@ int nanomsg_reverse(struct node *n)
vlist_length(&m->in.endpoints) != 1)
return -1;
char *subscriber = vlist_first(&m->in.endpoints);
char *publisher = vlist_first(&m->out.endpoints);
char *subscriber = (char *) vlist_first(&m->in.endpoints);
char *publisher = (char *) vlist_first(&m->out.endpoints);
vlist_set(&m->in.endpoints, 0, publisher);
vlist_set(&m->out.endpoints, 0, subscriber);
@ -297,14 +297,16 @@ static struct plugin p = {
.node = {
.vectorize = 0,
.size = sizeof(struct nanomsg),
.type.stop = nanomsg_type_stop,
.reverse = nanomsg_reverse,
.type = {
.stop = nanomsg_type_stop
},
.parse = nanomsg_parse,
.print = nanomsg_print,
.start = nanomsg_start,
.stop = nanomsg_stop,
.read = nanomsg_read,
.write = nanomsg_write,
.reverse = nanomsg_reverse,
.poll_fds = nanomsg_poll_fds,
.netem_fds = nanomsg_netem_fds
}

View file

@ -83,7 +83,7 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct sample *smps[], unsigned
if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */
json_t *values = json_array();
for (int k = 0; k < cnt; k++) {
for (unsigned k = 0; k < cnt; k++) {
json_array_append_new(values, json_pack("[ f, f, i ]",
time_to_double(&smps[k]->ts.origin),
smps[k]->data[map->index].f,
@ -139,7 +139,7 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps
if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type))
return -2;
for (int k = 0; k < cnt; k++)
for (unsigned k = 0; k < cnt; k++)
smps[k]->length = json_array_size(attributes);
json_array_foreach(attributes, l, attribute) {
@ -158,7 +158,7 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps
return -3;
/* Check attribute name and type */
map = vlist_lookup(&i->mapping, name);
map = (struct ngsi_attribute *) vlist_lookup(&i->mapping, name);
if (!map || strcmp(map->type, type))
return -4;
@ -237,15 +237,15 @@ static int ngsi_parse_mapping(struct vlist *mapping, json_t *cfg)
/* Metadata: source(string)=name */
struct ngsi_metadata s = {
.name = "source",
.type = "string",
.name = strdup("source"),
.type = strdup("string"),
.value = name
};
/* Metadata: index(integer)=j */
struct ngsi_metadata i = {
.name = "index",
.type = "integer"
.name = strdup("index"),
.type = strdup("integer")
};
assert(asprintf(&i.value, "%zu", j));
@ -289,7 +289,7 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi
size_t realsize = size * nmemb;
struct ngsi_response *mem = (struct ngsi_response *) userp;
mem->data = realloc(mem->data, mem->len + realsize + 1);
mem->data = (char *) realloc(mem->data, mem->len + realsize + 1);
if (mem->data == NULL) /* out of memory! */
error("Not enough memory (realloc returned NULL)");
@ -592,8 +592,10 @@ static struct plugin p = {
.node = {
.vectorize = 0, /* unlimited */
.size = sizeof(struct ngsi),
.type.start = ngsi_type_start,
.type.stop = ngsi_type_stop,
.type = {
.start = ngsi_type_start,
.stop = ngsi_type_stop
},
.parse = ngsi_parse,
.print = ngsi_print,
.start = ngsi_start,

View file

@ -78,7 +78,7 @@ int shmem_parse(struct node *n, json_t *cfg)
if (!json_is_array(json_exec))
error("Setting 'exec' of node %s must be an array of strings", node_name(n));
shm->exec = alloc(sizeof(char *) * (json_array_size(json_exec) + 1));
shm->exec = (char **) alloc(sizeof(char *) * (json_array_size(json_exec) + 1));
size_t i;
json_t *json_val;
@ -161,7 +161,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
int avail, pushed, copied;
avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt);
if (avail != cnt)
if (avail != (int) cnt)
warning("Pool underrun for shmem node %s", shm->out_name);
copied = sample_copy_many(shared_smps, smps, avail);

View file

@ -28,53 +28,53 @@
#include <villas/plugin.h>
#include <villas/nodes/signal_generator.h>
static enum signal_generator_type signal_generator_lookup_type(const char *type)
static enum signal_generator::type signal_generator_lookup_type(const char *type)
{
if (!strcmp(type, "random"))
return SIGNAL_GENERATOR_TYPE_RANDOM;
return signal_generator::type::RANDOM;
else if (!strcmp(type, "sine"))
return SIGNAL_GENERATOR_TYPE_SINE;
return signal_generator::type::SINE;
else if (!strcmp(type, "square"))
return SIGNAL_GENERATOR_TYPE_SQUARE;
return signal_generator::type::SQUARE;
else if (!strcmp(type, "triangle"))
return SIGNAL_GENERATOR_TYPE_TRIANGLE;
return signal_generator::type::TRIANGLE;
else if (!strcmp(type, "ramp"))
return SIGNAL_GENERATOR_TYPE_RAMP;
return signal_generator::type::RAMP;
else if (!strcmp(type, "counter"))
return SIGNAL_GENERATOR_TYPE_COUNTER;
return signal_generator::type::COUNTER;
else if (!strcmp(type, "constant"))
return SIGNAL_GENERATOR_TYPE_CONSTANT;
return signal_generator::type::CONSTANT;
else if (!strcmp(type, "mixed"))
return SIGNAL_GENERATOR_TYPE_MIXED;
return signal_generator::type::MIXED;
else
return -1;
return signal_generator::type::INVALID;
}
static const char * signal_generator_type_str(enum signal_generator_type type)
static const char * signal_generator_type_str(enum signal_generator::type type)
{
switch (type) {
case SIGNAL_GENERATOR_TYPE_CONSTANT:
case signal_generator::type::CONSTANT:
return "constant";
case SIGNAL_GENERATOR_TYPE_SINE:
case signal_generator::type::SINE:
return "sine";
case SIGNAL_GENERATOR_TYPE_TRIANGLE:
case signal_generator::type::TRIANGLE:
return "triangle";
case SIGNAL_GENERATOR_TYPE_SQUARE:
case signal_generator::type::SQUARE:
return "square";
case SIGNAL_GENERATOR_TYPE_RAMP:
case signal_generator::type::RAMP:
return "ramp";
case SIGNAL_GENERATOR_TYPE_COUNTER:
case signal_generator::type::COUNTER:
return "counter";
case SIGNAL_GENERATOR_TYPE_RANDOM:
case signal_generator::type::RANDOM:
return "random";
case SIGNAL_GENERATOR_TYPE_MIXED:
case signal_generator::type::MIXED:
return "mixed";
default:
@ -88,12 +88,12 @@ int signal_generator_prepare(struct node *n)
assert(vlist_length(&n->in.signals) == 0);
for (int i = 0; i < s->values; i++) {
struct signal *sig = alloc(sizeof(struct signal));
for (unsigned i = 0; i < s->values; i++) {
struct signal *sig = (struct signal *) alloc(sizeof(struct signal));
int rtype = s->type == SIGNAL_GENERATOR_TYPE_MIXED ? i % 7 : s->type;
int rtype = s->type == signal_generator::type::MIXED ? i % 7 : s->type;
sig->name = strdup(signal_generator_type_str(rtype));
sig->name = strdup(signal_generator_type_str((enum signal_generator::type) rtype));
sig->type = SIGNAL_TYPE_FLOAT; /* All generated signals are of type float */
vlist_push(&n->in.signals, sig);
@ -141,10 +141,10 @@ int signal_generator_parse(struct node *n, json_t *cfg)
if (ret == -1)
error("Unknown signal type '%s' of node %s", type, node_name(n));
s->type = ret;
s->type = (enum signal_generator::type) ret;
}
else
s->type = SIGNAL_GENERATOR_TYPE_MIXED;
s->type = signal_generator::type::MIXED;
return 0;
}
@ -157,9 +157,9 @@ int signal_generator_start(struct node *n)
s->missed_steps = 0;
s->counter = 0;
s->started = time_now();
s->last = alloc(sizeof(double) * s->values);
s->last = (double *) alloc(sizeof(double) * s->values);
for (int i = 0; i < s->values; i++)
for (unsigned i = 0; i < s->values; i++)
s->last[i] = s->offset;
/* Setup task */
@ -228,42 +228,42 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u
t->length = MIN(s->values, t->capacity);
t->signals = &n->in.signals;
for (int i = 0; i < MIN(s->values, t->capacity); i++) {
int rtype = (s->type != SIGNAL_GENERATOR_TYPE_MIXED) ? s->type : i % 7;
for (unsigned i = 0; i < MIN(s->values, t->capacity); i++) {
int rtype = (s->type != signal_generator::type::MIXED) ? s->type : i % 7;
switch (rtype) {
case SIGNAL_GENERATOR_TYPE_CONSTANT:
case signal_generator::type::CONSTANT:
t->data[i].f = s->offset + s->amplitude;
break;
case SIGNAL_GENERATOR_TYPE_SINE:
case signal_generator::type::SINE:
t->data[i].f = s->offset + s->amplitude * sin(running * s->frequency * 2 * M_PI);
break;
case SIGNAL_GENERATOR_TYPE_TRIANGLE:
case signal_generator::type::TRIANGLE:
t->data[i].f = s->offset + s->amplitude * (fabs(fmod(running * s->frequency, 1) - .5) - 0.25) * 4;
break;
case SIGNAL_GENERATOR_TYPE_SQUARE:
case signal_generator::type::SQUARE:
t->data[i].f = s->offset + s->amplitude * ( (fmod(running * s->frequency, 1) < .5) ? -1 : 1);
break;
case SIGNAL_GENERATOR_TYPE_RAMP:
case signal_generator::type::RAMP:
t->data[i].f = s->offset + s->amplitude * fmod(running, s->frequency);
break;
case SIGNAL_GENERATOR_TYPE_COUNTER:
case signal_generator::type::COUNTER:
t->data[i].f = s->offset + s->amplitude * s->counter;
break;
case SIGNAL_GENERATOR_TYPE_RANDOM:
case signal_generator::type::RANDOM:
s->last[i] += box_muller(0, s->stddev);
t->data[i].f = s->last[i];
break;
}
}
if (s->limit > 0 && s->counter >= s->limit) {
if (s->limit > 0 && s->counter >= (unsigned) s->limit) {
info("Reached limit.");
n->state = STATE_STOPPING;

View file

@ -73,7 +73,8 @@ int socket_type_start(struct super_node *sn)
char * socket_print(struct node *n)
{
struct socket *s = (struct socket *) n->_vd;
char *layer = NULL, *buf;
const char *layer = NULL;
char *buf;
switch (s->layer) {
case SOCKET_LAYER_UDP:
@ -270,12 +271,12 @@ int socket_start(struct node *n)
}
s->out.buflen = SOCKET_INITIAL_BUFFER_LEN;
s->out.buf = alloc(s->out.buflen);
s->out.buf = (char *) alloc(s->out.buflen);
if (!s->out.buf)
return -1;
s->in.buflen = SOCKET_INITIAL_BUFFER_LEN;
s->in.buf = alloc(s->in.buflen);
s->in.buf = (char *) alloc(s->in.buflen);
if (!s->in.buf)
return -1;
@ -374,7 +375,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
}
ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt);
if (ret < 0 || bytes != rbytes)
if (ret < 0 || (size_t) bytes != rbytes)
warning("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes);
return ret;
@ -401,7 +402,7 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt);
if (wbytes > s->out.buflen) {
s->out.buflen = wbytes;
s->out.buf = realloc(s->out.buf, s->out.buflen);
s->out.buf = (char *) realloc(s->out.buf, s->out.buflen);
goto retry;
}
@ -441,8 +442,7 @@ retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->out
else
warning("Failed sendto() to node %s", node_name(n));
}
if (bytes != wbytes)
else if ((size_t) bytes < wbytes)
warning("Partial sendto() to node %s", node_name(n));
return cnt;
@ -557,30 +557,35 @@ int socket_fds(struct node *n, int fds[])
return 1;
}
static struct plugin p = {
.name = "socket",
__attribute__((constructor(110)))
static void register_plugin() {
p.name = "socket";
#ifdef WITH_NETEM
.description = "BSD network sockets for Ethernet / IP / UDP (libnl3, netem support)",
p.description = "BSD network sockets for Ethernet / IP / UDP (libnl3, netem support)";
#else
.description = "BSD network sockets for Ethernet / IP / UDP",
p.description = "BSD network sockets for Ethernet / IP / UDP";
#endif
.type = PLUGIN_TYPE_NODE,
.node = {
.vectorize = 0,
.size = sizeof(struct socket),
.type.start = socket_type_start,
.reverse = socket_reverse,
.parse = socket_parse,
.print = socket_print,
.check = socket_check,
.start = socket_start,
.stop = socket_stop,
.read = socket_read,
.write = socket_write,
.poll_fds = socket_fds,
.netem_fds = socket_fds
}
};
p.type = PLUGIN_TYPE_NODE;
p.node.vectorize = 0;
p.node.size = sizeof(struct socket);
p.node.type.start = socket_type_start;
p.node.reverse = socket_reverse;
p.node.parse = socket_parse;
p.node.print = socket_print;
p.node.check = socket_check;
p.node.start = socket_start;
p.node.stop = socket_stop;
p.node.read = socket_read;
p.node.write = socket_write;
p.node.poll_fds = socket_fds;
p.node.netem_fds = socket_fds;
REGISTER_PLUGIN(&p)
LIST_INIT_STATIC(&p.node.instances)
vlist_init(&p.node.instances);
vlist_push(&plugins, &p);
}
__attribute__((destructor(110)))
static void deregister_plugin() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p);
}

View file

@ -104,27 +104,27 @@ static const struct {
static AiInputMode uldaq_parse_input_mode(const char *str)
{
for (int i = 0; i < ARRAY_LEN(input_modes); i++) {
for (unsigned i = 0; i < ARRAY_LEN(input_modes); i++) {
if (!strcmp(input_modes[i].name, str))
return input_modes[i].mode;
}
return -1;
return (AiInputMode) -1;
}
static DaqDeviceInterface uldaq_parse_interface_type(const char *str)
{
for (int i = 0; i < ARRAY_LEN(interface_types); i++) {
for (unsigned i = 0; i < ARRAY_LEN(interface_types); i++) {
if (!strcmp(interface_types[i].name, str))
return interface_types[i].interface;
}
return -1;
return (DaqDeviceInterface) -1;
}
static const char * uldaq_print_interface_type(DaqDeviceInterface iftype)
{
for (int i = 0; i < ARRAY_LEN(interface_types); i++) {
for (unsigned i = 0; i < ARRAY_LEN(interface_types); i++) {
if (interface_types[i].interface == iftype)
return interface_types[i].name;
}
@ -134,12 +134,12 @@ static const char * uldaq_print_interface_type(DaqDeviceInterface iftype)
static Range uldaq_parse_range(const char *str)
{
for (int i = 0; i < ARRAY_LEN(ranges); i++) {
for (unsigned i = 0; i < ARRAY_LEN(ranges); i++) {
if (!strcmp(ranges[i].name, str))
return ranges[i].range;
}
return -1;
return (Range) -1;
}
static DaqDeviceDescriptor * uldaq_find_device(struct uldaq *u) {
@ -151,7 +151,7 @@ static DaqDeviceDescriptor * uldaq_find_device(struct uldaq *u) {
if (u->device_interface_type == ANY_IFC && u->device_id == NULL)
return &descriptors[0];
for (int i = 0; i < num_devs; i++) {
for (unsigned i = 0; i < num_devs; i++) {
d = &descriptors[i];
if (u->device_id) {
@ -223,7 +223,7 @@ int uldaq_type_start(struct super_node *sn)
}
info("Found %d DAQ devices", num_devs);
for (int i = 0; i < num_devs; i++) {
for (unsigned i = 0; i < num_devs; i++) {
DaqDeviceDescriptor *desc = &descriptors[i];
info(" %d: %s %s (%s)", i, desc->uniqueId, desc->devString, uldaq_print_interface_type(desc->devInterface));
@ -306,11 +306,11 @@ int uldaq_parse(struct node *n, json_t *cfg)
if (iftype < 0)
error("Invalid interface type: %s for node '%s'", interface_type, node_name(n));
u->device_interface_type = iftype;
u->device_interface_type = (DaqDeviceInterface) iftype;
}
u->in.channel_count = vlist_length(&n->in.signals);
u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count);
u->in.queues = (struct AiQueueElement *) realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count);
json_array_foreach(json_signals, i, json_signal) {
const char *range_str = NULL, *input_mode_str = NULL;
@ -347,8 +347,8 @@ int uldaq_parse(struct node *n, json_t *cfg)
if (input_mode < 0)
error("Invalid input mode specified for signal %zu of node %s.", i, node_name(n));
u->in.queues[i].range = range;
u->in.queues[i].inputMode = input_mode;
u->in.queues[i].range = (Range) range;
u->in.queues[i].inputMode = (AiInputMode) input_mode;
u->in.queues[i].channel = channel;
}
@ -539,7 +539,7 @@ int uldaq_start(struct node *n)
err = ulEnableEvent(u->device_handle, DE_ON_DATA_AVAILABLE, n->in.vectorize, uldaq_data_available, n);
/* Start the acquisition */
err = ulAInScan(u->device_handle, 0, 0, 0, 0, u->in.buffer_len / u->in.channel_count, &u->in.sample_rate, u->in.scan_options, u->in.flags, u->in.buffer);
err = ulAInScan(u->device_handle, 0, 0, (AiInputMode) 0, (Range) 0, u->in.buffer_len / u->in.channel_count, &u->in.sample_rate, u->in.scan_options, u->in.flags, u->in.buffer);
if (err != ERR_NO_ERROR) {
warning("Failed to start acquisition on DAQ device for node '%s'", node_name(n));
return -1;
@ -610,12 +610,12 @@ int uldaq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
if (start_index + n->in.vectorize * u->in.channel_count > u->in.transfer_status.currentScanCount)
pthread_cond_wait(&u->in.cv, &u->in.mutex);
for (int j = 0; j < cnt; j++) {
for (unsigned j = 0; j < cnt; j++) {
struct sample *smp = smps[j];
long long scan_index = start_index + j * u->in.channel_count;
for (int i = 0; i < u->in.channel_count; i++) {
for (unsigned i = 0; i < u->in.channel_count; i++) {
long long channel_index = (scan_index + i) % u->in.buffer_len;
smp->data[i].f = u->in.buffer[channel_index];
@ -642,10 +642,12 @@ static struct plugin p = {
.vectorize = 0,
.flags = 0,
.size = sizeof(struct uldaq),
.type.start = uldaq_type_start,
.parse = uldaq_parse,
.type = {
.start = uldaq_type_start
},
.init = uldaq_init,
.destroy = uldaq_destroy,
.parse = uldaq_parse,
.print = uldaq_print,
.start = uldaq_start,
.stop = uldaq_stop,

View file

@ -58,13 +58,13 @@ static char * websocket_connection_name(struct websocket_connection *c)
strcatf(&c->_name, "remote.ip=%s, remote.name=%s", ip, name);
}
else if (c->mode == WEBSOCKET_MODE_CLIENT && c->destination != NULL)
else if (c->mode == websocket_connection::mode::CLIENT && c->destination != NULL)
strcatf(&c->_name, "dest=%s:%d", c->destination->info.address, c->destination->info.port);
if (c->node)
strcatf(&c->_name, ", node=%s", node_name(c->node));
strcatf(&c->_name, ", mode=%s", c->mode == WEBSOCKET_MODE_CLIENT ? "client" : "server");
strcatf(&c->_name, ", mode=%s", c->mode == websocket_connection::mode::CLIENT ? "client" : "server");
}
return c->_name;
@ -104,7 +104,7 @@ static int websocket_connection_init(struct websocket_connection *c)
if (ret)
return ret;
c->state = WEBSOCKET_CONNECTION_STATE_INITIALIZED;
c->state = websocket_connection::state::INITIALIZED;
return 0;
}
@ -113,7 +113,7 @@ static int websocket_connection_destroy(struct websocket_connection *c)
{
int ret;
assert(c->state != WEBSOCKET_CONNECTION_STATE_DESTROYED);
assert(c->state != websocket_connection::state::DESTROYED);
if (c->_name)
free(c->_name);
@ -143,7 +143,7 @@ static int websocket_connection_destroy(struct websocket_connection *c)
c->wsi = NULL;
c->_name = NULL;
c->state = WEBSOCKET_CONNECTION_STATE_DESTROYED;
c->state = websocket_connection::state::DESTROYED;
return 0;
}
@ -152,11 +152,11 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam
{
int pushed;
if (c->state != WEBSOCKET_CONNECTION_STATE_INITIALIZED)
if (c->state != websocket_connection::state::INITIALIZED)
return -1;
pushed = queue_push_many(&c->queue, (void **) smps, cnt);
if (pushed < cnt)
if (pushed < (int) cnt)
warning("Queue overrun in WebSocket connection: %s", websocket_connection_name(c));
sample_incref_many(smps, pushed);
@ -180,20 +180,20 @@ static void websocket_connection_close(struct websocket_connection *c, struct lw
int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int ret, recvd, pulled, cnt = 128;
struct websocket_connection *c = user;
struct websocket_connection *c = (struct websocket_connection *) user;
switch (reason) {
case LWS_CALLBACK_CLIENT_ESTABLISHED:
case LWS_CALLBACK_ESTABLISHED:
c->wsi = wsi;
c->state = WEBSOCKET_CONNECTION_STATE_ESTABLISHED;
c->state = websocket_connection::state::ESTABLISHED;
info("Established WebSocket connection: %s", websocket_connection_name(c));
if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED)
c->mode = WEBSOCKET_MODE_CLIENT;
c->mode = websocket_connection::mode::CLIENT;
else {
c->mode = WEBSOCKET_MODE_SERVER;
c->mode = websocket_connection::mode::SERVER;
/* We use the URI to associate this connection to a node
* and choose a protocol.
*
@ -222,10 +222,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
format = strtok_r(NULL, "", &lasts);
if (!format)
format = "villas.web";
format = (char *) "villas.web";
/* Search for node whose name matches the URI. */
c->node = vlist_lookup(&p.node.instances, node);
c->node = (struct node *) vlist_lookup(&p.node.instances, node);
if (!c->node) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node");
warning("Failed to find node: node=%s", node);
@ -253,7 +253,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
c->state = WEBSOCKET_CONNECTION_STATE_ERROR;
c->state = websocket_connection::state::ERROR;
warning("Failed to establish WebSocket connection: %s, reason=%s", websocket_connection_name(c), in ? (char *) in : "unkown");
@ -262,24 +262,24 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
case LWS_CALLBACK_CLOSED:
debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c));
if (c->state != WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
if (c->state != websocket_connection::state::SHUTDOWN) {
/** @todo Attempt reconnect here */
}
if (connections.state == STATE_INITIALIZED)
vlist_remove_all(&connections, c);
if (c->state == WEBSOCKET_CONNECTION_STATE_INITIALIZED)
if (c->state == websocket_connection::state::INITIALIZED)
websocket_connection_destroy(c);
if (c->mode == WEBSOCKET_MODE_CLIENT)
if (c->mode == websocket_connection::mode::CLIENT)
free(c);
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
case LWS_CALLBACK_SERVER_WRITEABLE: {
struct sample **smps = alloca(cnt * sizeof(struct sample *));
struct sample **smps = (struct sample **) alloca(cnt * sizeof(struct sample *));
pulled = queue_pull_many(&c->queue, (void **) smps, cnt);
if (pulled > 0) {
@ -298,7 +298,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (queue_available(&c->queue) > 0)
lws_callback_on_writable(wsi);
else if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
else if (c->state == websocket_connection::state::SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
@ -311,7 +311,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
if (lws_is_first_fragment(wsi))
buffer_clear(&c->buffers.recv);
ret = buffer_append(&c->buffers.recv, in, len);
ret = buffer_append(&c->buffers.recv, (char *) in, len);
if (ret) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data");
return -1;
@ -324,7 +324,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
int avail, enqueued;
struct websocket *w = (struct websocket *) n->_vd;
struct sample **smps = alloca(cnt * sizeof(struct sample *));
struct sample **smps = (struct sample **) alloca(cnt * sizeof(struct sample *));
if (!smps) {
warning("Failed to allocate memory for connection: %s", websocket_connection_name(c));
break;
@ -358,7 +358,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
buffer_clear(&c->buffers.recv);
if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) {
if (c->state == websocket_connection::state::SHUTDOWN) {
websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped");
return -1;
}
@ -397,12 +397,12 @@ int websocket_start(struct node *n)
if (ret)
return ret;
for (int i = 0; i < vlist_length(&w->destinations); i++) {
for (size_t i = 0; i < vlist_length(&w->destinations); i++) {
const char *format;
struct websocket_destination *d = (struct websocket_destination *) vlist_at(&w->destinations, i);
struct websocket_connection *c = (struct websocket_connection *) alloc(sizeof(struct websocket_connection));
c->state = WEBSOCKET_CONNECTION_STATE_CONNECTING;
c->state = websocket_connection::state::CONNECTING;
format = strchr(d->info.path, '.');
if (format)
@ -438,13 +438,13 @@ int websocket_stop(struct node *n)
if (c->node != n)
continue;
c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN;
c->state = websocket_connection::state::SHUTDOWN;
lws_callback_on_writable(c->wsi);
}
/* Count open connections belonging to this node */
for (int i = 0; i < vlist_length(&connections); i++) {
for (size_t i = 0; i < vlist_length(&connections); i++) {
struct websocket_connection *c = (struct websocket_connection *) vlist_at(&connections, i);
if (c->node == n)
@ -509,7 +509,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigne
/* Make copies of all samples */
avail = sample_alloc_many(&w->pool, cpys, cnt);
if (avail < cnt)
if (avail < (int) cnt)
warning("Pool underrun for node %s: avail=%u", node_name(n), avail);
sample_copy_many(cpys, smps, avail);
@ -609,24 +609,32 @@ int websocket_poll_fds(struct node *n, int fds[])
return 1;
}
static struct plugin p = {
.name = "websocket",
.description = "Send and receive samples of a WebSocket connection (libwebsockets)",
.type = PLUGIN_TYPE_NODE,
.node = {
.vectorize = 0, /* unlimited */
.size = sizeof(struct websocket),
.type.start = websocket_type_start,
.start = websocket_start,
.stop = websocket_stop,
.destroy = websocket_destroy,
.read = websocket_read,
.write = websocket_write,
.print = websocket_print,
.parse = websocket_parse,
.poll_fds = websocket_poll_fds
}
};
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
if (plugins.state == STATE_DESTROYED)
vlist_init(&plugins);
REGISTER_PLUGIN(&p)
LIST_INIT_STATIC(&p.node.instances)
p.name = "websocket";
p.description = "Send and receive samples of a WebSocket connection (libwebsockets)";
p.type = PLUGIN_TYPE_NODE;
p.node.vectorize = 0; /* unlimited */
p.node.size = sizeof(struct websocket);
p.node.instances.state = STATE_DESTROYED;
p.node.type.start = websocket_type_start;
p.node.destroy = websocket_destroy;
p.node.parse = websocket_parse;
p.node.print = websocket_print;
p.node.start = websocket_start;
p.node.stop = websocket_stop;
p.node.read = websocket_read;
p.node.write = websocket_write;
p.node.poll_fds = websocket_poll_fds;
vlist_init(&p.node.instances);
vlist_push(&plugins, &p);
}
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
if (plugins.state != STATE_DESTROYED)
vlist_remove_all(&plugins, &p);
}

View file

@ -83,7 +83,7 @@ int zeromq_reverse(struct node *n)
return -1;
char *subscriber = z->in.endpoint;
char *publisher = vlist_first(&z->out.endpoints);
char *publisher = (char *) vlist_first(&z->out.endpoints);
z->in.endpoint = publisher;
vlist_set(&z->out.endpoints, 0, subscriber);
@ -189,10 +189,10 @@ int zeromq_parse(struct node *n, json_t *cfg)
if (type) {
if (!strcmp(type, "pubsub"))
z->pattern = ZEROMQ_PATTERN_PUBSUB;
z->pattern = zeromq::pattern::PUBSUB;
#ifdef ZMQ_BUILD_DISH
else if (!strcmp(type, "radiodish"))
z->pattern = ZEROMQ_PATTERN_RADIODISH;
z->pattern = zeromq::pattern::RADIODISH;
#endif
else
error("Invalid type for ZeroMQ node: %s", node_name_short(n));
@ -206,12 +206,17 @@ char * zeromq_print(struct node *n)
struct zeromq *z = (struct zeromq *) n->_vd;
char *buf = NULL;
char *pattern = NULL;
const char *pattern = NULL;
switch (z->pattern) {
case ZEROMQ_PATTERN_PUBSUB: pattern = "pubsub"; break;
case zeromq::pattern::PUBSUB:
pattern = "pubsub";
break;
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break;
case zeromq::pattern::RADIODISH:
pattern = "radiodish";
break;
#endif
}
@ -267,13 +272,13 @@ int zeromq_start(struct node *n)
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH:
case zeromq::pattern::RADIODISH:
z->in.socket = zmq_socket(context, ZMQ_DISH);
z->out.socket = zmq_socket(context, ZMQ_RADIO);
break;
#endif
case ZEROMQ_PATTERN_PUBSUB:
case zeromq::pattern::PUBSUB:
z->in.socket = zmq_socket(context, ZMQ_SUB);
z->out.socket = zmq_socket(context, ZMQ_PUB);
break;
@ -287,12 +292,12 @@ int zeromq_start(struct node *n)
/* Join group */
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH:
case zeromq::pattern::RADIODISH:
ret = zmq_join(z->in.socket, z->in.filter);
break;
#endif
case ZEROMQ_PATTERN_PUBSUB:
case zeromq::pattern::PUBSUB:
ret = zmq_setsockopt(z->in.socket, ZMQ_SUBSCRIBE, z->in.filter, z->in.filter ? strlen(z->in.filter) : 0);
break;
@ -452,7 +457,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
if (z->in.filter) {
switch (z->pattern) {
case ZEROMQ_PATTERN_PUBSUB:
case zeromq::pattern::PUBSUB:
/* Discard envelope */
zmq_recv(z->in.socket, NULL, 0, 0);
break;
@ -466,7 +471,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
if (ret < 0)
return ret;
recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt);
recv = io_sscan(&z->io, (const char *) zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt);
ret = zmq_msg_close(&m);
if (ret)
@ -494,14 +499,14 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *
if (z->out.filter) {
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH:
case zeromq::pattern::RADIODISH:
ret = zmq_msg_set_group(&m, z->out.filter);
if (ret < 0)
goto fail;
break;
#endif
case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */
case zeromq::pattern::PUBSUB: /* Send envelope */
zmq_send(z->out.socket, z->out.filter, strlen(z->out.filter), ZMQ_SNDMORE);
break;
}
@ -566,16 +571,18 @@ static struct plugin p = {
.node = {
.vectorize = 0,
.size = sizeof(struct zeromq),
.type.start = zeromq_type_start,
.type.stop = zeromq_type_stop,
.reverse = zeromq_reverse,
.type = {
.start = zeromq_type_start,
.stop = zeromq_type_stop,
},
.destroy = zeromq_destroy,
.parse = zeromq_parse,
.print = zeromq_print,
.start = zeromq_start,
.stop = zeromq_stop,
.destroy = zeromq_destroy,
.read = zeromq_read,
.write = zeromq_write,
.reverse = zeromq_reverse,
.poll_fds = zeromq_poll_fds,
.netem_fds = zeromq_netem_fds,
}