diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 6b5567c5c..cc753661e 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -53,19 +53,19 @@ struct file { size_t buffer_size_out; /**< Defines size of output stream buffer. No buffer is created if value is set to zero. */ size_t buffer_size_in; /**< Defines size of input stream buffer. No buffer is created if value is set to zero. */ - enum epoch_mode { - FILE_EPOCH_DIRECT, - FILE_EPOCH_WAIT, - FILE_EPOCH_RELATIVE, - FILE_EPOCH_ABSOLUTE, - FILE_EPOCH_ORIGINAL + enum epoch { + DIRECT, + WAIT, + RELATIVE, + ABSOLUTE, + ORIGINAL } epoch_mode; /**< Specifies how file::offset is calculated. */ - enum { - FILE_EOF_STOP, /**< Terminate when EOF is reached. */ - FILE_EOF_REWIND, /**< Rewind the file when EOF is reached. */ - FILE_EOF_WAIT /**< Blocking wait when EOF is reached. */ - } eof; + enum eof { + STOP, /**< Terminate when EOF is reached. */ + REWIND, /**< Rewind the file when EOF is reached. */ + SUSPEND /**< Blocking wait when EOF is reached. */ + } eof_mode; struct timespec first; /**< The first timestamp in the file file::{read,write}::uri */ struct timespec epoch; /**< The epoch timestamp from the configuration. */ diff --git a/include/villas/nodes/iec61850.h b/include/villas/nodes/iec61850.h index 6ef689dec..a6ff7b14e 100644 --- a/include/villas/nodes/iec61850.h +++ b/include/villas/nodes/iec61850.h @@ -51,28 +51,28 @@ extern "C" { enum iec61850_type { /* According to IEC 61850-7-2 */ - IEC61850_TYPE_BOOLEAN, - IEC61850_TYPE_INT8, - IEC61850_TYPE_INT16, - IEC61850_TYPE_INT32, - IEC61850_TYPE_INT64, - IEC61850_TYPE_INT8U, - IEC61850_TYPE_INT16U, - IEC61850_TYPE_INT32U, - IEC61850_TYPE_INT64U, - IEC61850_TYPE_FLOAT32, - IEC61850_TYPE_FLOAT64, - IEC61850_TYPE_ENUMERATED, - IEC61850_TYPE_CODED_ENUM, - IEC61850_TYPE_OCTET_STRING, - IEC61850_TYPE_VISIBLE_STRING, - IEC61850_TYPE_OBJECTNAME, - IEC61850_TYPE_OBJECTREFERENCE, - IEC61850_TYPE_TIMESTAMP, - IEC61850_TYPE_ENTRYTIME, + BOOLEAN, + INT8, + INT16, + INT32, + INT64, + INT8U, + INT16U, + INT32U, + INT64U, + FLOAT32, + FLOAT64, + ENUMERATED, + CODED_ENUM, + OCTET_STRING, + VISIBLE_STRING, + OBJECTNAME, + OBJECTREFERENCE, + TIMESTAMP, + ENTRYTIME, /* According to IEC 61850-8-1 */ - IEC61850_TYPE_BITSTRING + BITSTRING }; struct iec61850_type_descriptor { @@ -89,9 +89,9 @@ struct iec61850_receiver { EthernetSocket socket; - enum iec61850_receiver_type { - IEC61850_RECEIVER_GOOSE, - IEC61850_RECEIVER_SV + enum type { + GOOSE, + SAMPLED_VALUES } type; union { @@ -110,9 +110,9 @@ const struct iec61850_type_descriptor * iec61850_lookup_type(const char *name); int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct vlist *node_signals); -struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver_type t, const char *intf); +struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver::type t, const char *intf); -struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver_type t, const char *intf); +struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver::type t, const char *intf); int iec61850_receiver_start(struct iec61850_receiver *r); diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index d8ab15638..e4961c2a5 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -89,11 +89,11 @@ struct infiniband { int use_fallback; /* Counter to keep track of available recv. WRs */ - int available_recv_wrs; + unsigned available_recv_wrs; /* Fixed number to substract from min. number available * WRs in receive queue */ - int buffer_subtraction; + unsigned buffer_subtraction; /* Unrealiable connectionless data */ struct ud_s { diff --git a/include/villas/nodes/signal_generator.h b/include/villas/nodes/signal_generator.h index cc7a9e7cc..f924f2035 100644 --- a/include/villas/nodes/signal_generator.h +++ b/include/villas/nodes/signal_generator.h @@ -40,17 +40,6 @@ extern "C" { struct node; struct sample; -enum signal_generator_type { - SIGNAL_GENERATOR_TYPE_RANDOM, - SIGNAL_GENERATOR_TYPE_SINE, - SIGNAL_GENERATOR_TYPE_SQUARE, - SIGNAL_GENERATOR_TYPE_TRIANGLE, - SIGNAL_GENERATOR_TYPE_RAMP, - SIGNAL_GENERATOR_TYPE_COUNTER, - SIGNAL_GENERATOR_TYPE_CONSTANT, - SIGNAL_GENERATOR_TYPE_MIXED -}; - /** Node-type for signal generation. * @see node_type */ @@ -58,7 +47,17 @@ struct signal_generator { struct task task; /**< Timer for periodic events. */ int rt; /**< Real-time mode? */ - enum signal_generator_type type; /**< Signal type */ + enum type { + RANDOM, + SINE, + SQUARE, + TRIANGLE, + RAMP, + COUNTER, + CONSTANT, + MIXED, + INVALID + } type; /**< Signal type */ double rate; /**< Sampling rate. */ double frequency; /**< Frequency of the generated signals. */ @@ -69,12 +68,12 @@ struct signal_generator { double *last; /**< The values from the previous period which are required for random walk. */ - int values; /**< The number of values which will be emitted by this node. */ + unsigned values; /**< The number of values which will be emitted by this node. */ int limit; /**< The number of values which should be generated by this node. <0 for infinitve. */ struct timespec started; /**< Point in time when this node was started. */ - int counter; /**< The number of packets already emitted. */ - int missed_steps; /**< Total number of missed steps. */ + unsigned counter; /**< The number of packets already emitted. */ + unsigned missed_steps; /**< Total number of missed steps. */ }; /** @see node_type::print */ diff --git a/include/villas/nodes/websocket.h b/include/villas/nodes/websocket.h index d11b9b60b..c83de246e 100644 --- a/include/villas/nodes/websocket.h +++ b/include/villas/nodes/websocket.h @@ -57,19 +57,19 @@ struct websocket { /* Internal datastructures */ struct websocket_connection { - enum websocket_connection_state { - WEBSOCKET_CONNECTION_STATE_DESTROYED, - WEBSOCKET_CONNECTION_STATE_INITIALIZED, - WEBSOCKET_CONNECTION_STATE_CONNECTING, - WEBSOCKET_CONNECTION_STATE_RECONNECTING, - WEBSOCKET_CONNECTION_STATE_ESTABLISHED, - WEBSOCKET_CONNECTION_STATE_SHUTDOWN, - WEBSOCKET_CONNECTION_STATE_ERROR + enum state { + DESTROYED, + INITIALIZED, + CONNECTING, + RECONNECTING, + ESTABLISHED, + SHUTDOWN, + ERROR } state; /**< The current status of this connection. */ - enum { - WEBSOCKET_MODE_CLIENT, - WEBSOCKET_MODE_SERVER, + enum mode { + CLIENT, + SERVER, } mode; struct lws *wsi; diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 1e1823c32..b9b586f93 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -63,10 +63,10 @@ struct zeromq { } server, client; } curve; - enum { - ZEROMQ_PATTERN_PUBSUB, + enum pattern { + PUBSUB, #ifdef ZMQ_BUILD_DISH - ZEROMQ_PATTERN_RADIODISH + RADIODISH #endif } pattern; diff --git a/include/villas/shmem.h b/include/villas/shmem.h index b7b37942d..4c696ddfd 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -29,17 +29,17 @@ #pragma once -#ifdef __cplusplus -extern "C" { -#endif - #include #include #include #include -#define DEFAULT_SHMEM_QUEUELEN 512 -#define DEFAULT_SHMEM_SAMPLELEN 64 +#define DEFAULT_SHMEM_QUEUELEN 512u +#define DEFAULT_SHMEM_SAMPLELEN 64u + +#ifdef __cplusplus +extern "C" { +#endif /** Struct containing all parameters that need to be known when creating a new * shared memory object. */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 70556553c..48c0d2d5a 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -28,7 +28,7 @@ if(LIBNL3_ROUTE_FOUND) endif() if(WITH_NODE_INFLUXDB) - list(APPEND NODE_SRC influxdb.c) + list(APPEND NODE_SRC influxdb.cpp) endif() if(WITH_NODE_STATS) @@ -36,11 +36,11 @@ if(WITH_NODE_STATS) endif() if(WITH_NODE_SIGNAL) - list(APPEND NODE_SRC signal_generator.c) + list(APPEND NODE_SRC signal_generator.cpp) endif() if(WITH_NODE_LOOPBACK) - list(APPEND NODE_SRC loopback.c) + list(APPEND NODE_SRC loopback.cpp) endif() if(WITH_NODE_TEST_RTT) @@ -48,23 +48,23 @@ if(WITH_NODE_TEST_RTT) endif() if(WITH_NODE_SOCKET) - list(APPEND NODE_SRC socket.c) + list(APPEND NODE_SRC socket.cpp) endif() if(WITH_NODE_FILE) - list(APPEND NODE_SRC file.c) + list(APPEND NODE_SRC file.cpp) endif() # Enable Universal Library for Linux DAQ devices (libuldaq) if(WITH_NODE_ULDAQ) - list(APPEND NODE_SRC uldaq.c) + list(APPEND NODE_SRC uldaq.cpp) list(APPEND INCLUDE_DIRS ${LIBULDAQ_INCLUDE_DIRS}) list(APPEND LIBRARIES PkgConfig::LIBULDAQ uldaq) endif() # Enable shared memory node-type if(WITH_NODE_SHMEM) - list(APPEND NODE_SRC shmem.c) + list(APPEND NODE_SRC shmem.cpp) if(CMAKE_SUSTEM_NAME STREQUAL Linux) list(APPEND LIBRARIES rt) @@ -73,70 +73,70 @@ endif() # Enable IEC61850 node-types when libiec61850 is available if(WITH_NODE_IEC61850) - list(APPEND NODE_SRC iec61850_sv.c iec61850.c) + list(APPEND NODE_SRC iec61850_sv.cpp iec61850.cpp) list(APPEND INCLUDE_DIRS ${LIBIEC61850_INCLUDE_DIRS}) list(APPEND LIBRARIES PkgConfig::LIBIEC61850 ${LIBIEC61850_LIBRARIES}) endif() # Enable OPAL-RT Asynchronous Process support (will result in 32bit binary!!!) if(WITH_NODE_OPAL) - list(APPEND NODE_SRC opal.c) + list(APPEND NODE_SRC opal.cpp) list(APPEND INCLUDE_DIRS ${OPAL_INCLUDE_DIRS}) list(APPEND LIBRARIES ${OPAL_LIBRARIES}) endif() # Enable nanomsg node type when libnanomsg is available if(WITH_NODE_NANOMSG) - list(APPEND NODE_SRC nanomsg.c) + list(APPEND NODE_SRC nanomsg.cpp) list(APPEND INCLUDE_DIRS ${NANOMSG_INCLUDE_DIRS}) list(APPEND LIBRARIES PkgConfig::NANOMSG) endif() # Enable ZeroMQ node type when libzmq is available if(WITH_NODE_ZEROMQ) - list(APPEND NODE_SRC zeromq.c) + list(APPEND NODE_SRC zeromq.cpp) list(APPEND INCLUDE_DIRS ${LIBZMQ_INCLUDE_DIRS}) list(APPEND LIBRARIES PkgConfig::LIBZMQ) endif() # Enable NGSI support if(WITH_NODE_NGSI) - list(APPEND NODE_SRC ngsi.c) + list(APPEND NODE_SRC ngsi.cpp) list(APPEND INCLUDE_DIRS ${CURL_INCLUDE_DIRS}) list(APPEND LIBRARIES ${CURL_LIBRARIES}) endif() # Enable WebSocket support if(WITH_NODE_WEBSOCKET) - list(APPEND NODE_SRC websocket.c) + list(APPEND NODE_SRC websocket.cpp) list(APPEND INCLUDE_DIRS ${LIBWEBSOCKETS_INCLUDE_DIRS}) list(APPEND LIBRARIES ${LIBWEBSOCKETS_LDLIBS}) endif() # Enable AMQP support if(WITH_NODE_AMQP) - list(APPEND NODE_SRC amqp.c) + list(APPEND NODE_SRC amqp.cpp) list(APPEND INCLUDE_DIRS ${RABBITMQ_C_INCLUDE_DIRS}) list(APPEND LIBRARIES PkgConfig::RABBITMQ_C) endif() # Enable MQTT support if(WITH_NODE_MQTT) - list(APPEND NODE_SRC mqtt.c) + list(APPEND NODE_SRC mqtt.cpp) list(APPEND INCLUDE_DIRS ${MOSQUITTO_INCLUDE_DIRS}) list(APPEND LIBRARIES ${MOSQUITTO_LIBRARIES}) endif() # Enable Comedi support if(WITH_NODE_COMEDI) - list(APPEND NODE_SRC comedi.c) + list(APPEND NODE_SRC comedi.cpp) list(APPEND INCLUDE_DIRS ${COMEDILIB_INCLUDE_DIRS}) list(APPEND LIBRARIES PkgConfig::COMEDILIB) endif() # Enable Infiniband support if(WITH_NODE_INFINIBAND) - list(APPEND NODE_SRC infiniband.c) + list(APPEND NODE_SRC infiniband.cpp) list(APPEND INCLUDE_DIRS ${IBVERBS_INCLUDE_DIRS} ${RDMACM_INCLUDE_DIRS}) list(APPEND LIBRARIES ${IBVERBS_LIBRARIES} ${RDMACM_LIBRARIES}) endif() diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.cpp similarity index 95% rename from lib/nodes/amqp.c rename to lib/nodes/amqp.cpp index e644d679e..58578d014 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.cpp @@ -114,7 +114,7 @@ static int amqp_close(amqp_connection_state_t conn) int amqp_parse(struct node *n, json_t *json) { int ret; - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; int port = 5672; const char *format = "json"; @@ -195,7 +195,7 @@ int amqp_parse(struct node *n, json_t *json) char * amqp_print(struct node *n) { - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; char *buf = NULL; @@ -233,7 +233,7 @@ char * amqp_print(struct node *n) int amqp_start(struct node *n) { int ret; - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; amqp_bytes_t queue; amqp_rpc_reply_t rep; @@ -293,7 +293,7 @@ int amqp_start(struct node *n) int amqp_stop(struct node *n) { int ret; - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; ret = amqp_close(a->consumer); if (ret) @@ -313,7 +313,7 @@ int amqp_stop(struct node *n) int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; amqp_envelope_t env; amqp_rpc_reply_t rep; @@ -321,7 +321,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel if (rep.reply_type != AMQP_RESPONSE_NORMAL) return -1; - ret = io_sscan(&a->io, env.message.body.bytes, env.message.body.len, NULL, smps, cnt); + ret = io_sscan(&a->io, static_cast(env.message.body.bytes), env.message.body.len, NULL, smps, cnt); amqp_destroy_envelope(&env); @@ -331,7 +331,7 @@ int amqp_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { int ret; - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; char data[1500]; size_t wbytes; @@ -358,7 +358,7 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re int amqp_poll_fds(struct node *n, int fds[]) { - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; amqp_socket_t *sock = amqp_get_socket(a->consumer); @@ -369,7 +369,7 @@ int amqp_poll_fds(struct node *n, int fds[]) int amqp_destroy(struct node *n) { - struct amqp *a = n->_vd; + struct amqp *a = (struct amqp *) n->_vd; if (a->uri) free(a->uri); @@ -399,13 +399,13 @@ static struct plugin p = { .node = { .vectorize = 0, .size = sizeof(struct amqp), + .destroy = amqp_destroy, .parse = amqp_parse, .print = amqp_print, .start = amqp_start, .stop = amqp_stop, .read = amqp_read, .write = amqp_write, - .destroy = amqp_destroy, .poll_fds = amqp_poll_fds } }; diff --git a/lib/nodes/comedi.c b/lib/nodes/comedi.cpp similarity index 96% rename from lib/nodes/comedi.c rename to lib/nodes/comedi.cpp index c6ed42da5..5788e5265 100644 --- a/lib/nodes/comedi.c +++ b/lib/nodes/comedi.cpp @@ -70,10 +70,10 @@ static int comedi_parse_direction(struct comedi *c, struct comedi_direction *d, return 0; } - d->chanlist = alloc(d->chanlist_len * sizeof(*d->chanlist)); + d->chanlist = (unsigned int*) alloc(d->chanlist_len * sizeof(*d->chanlist)); assert(d->chanlist != NULL); - d->chanspecs = alloc(d->chanlist_len * sizeof(*d->chanspecs)); + d->chanspecs = (comedi_chanspec *) alloc(d->chanlist_len * sizeof(*d->chanspecs)); assert(d->chanspecs != NULL); json_array_foreach(json_chans, i, json_chan) { @@ -109,7 +109,7 @@ static int comedi_start_common(struct node *n) continue; /* Sanity-check channel config and populate chanspec for later */ - for (int i = 0; i < d->chanlist_len; i++) { + for (unsigned i = 0; i < d->chanlist_len; i++) { const unsigned int channel = CR_CHAN(d->chanlist[i]); const int range = CR_RANGE(d->chanlist[i]); @@ -234,7 +234,7 @@ static int comedi_start_in(struct node *n) #if COMEDI_USE_READ /* Be prepared to consume one entire buffer */ - c->buf = alloc(c->in.buffer_size); + c->buf = (char *) alloc(c->in.buffer_size); c->bufptr = c->buf; assert(c->bufptr != NULL); @@ -324,12 +324,12 @@ static int comedi_start_out(struct node *n) /* Allocate buffer for one complete villas sample */ /** @todo: maybe increase buffer size according to c->vectorize */ const size_t local_buffer_size = d->sample_size * d->chanlist_len; - d->buffer = alloc(local_buffer_size); + d->buffer = (char *) alloc(local_buffer_size); d->bufptr = d->buffer; assert(d->buffer != NULL); /* Initialize local buffer used for write() syscalls */ - for (int channel = 0; channel < d->chanlist_len; channel++) { + for (unsigned channel = 0; channel < d->chanlist_len; channel++) { const unsigned raw = comedi_from_phys(0.0f, d->chanspecs[channel].range, d->chanspecs[channel].maxdata); if (d->sample_size == sizeof(sampl_t)) @@ -341,9 +341,9 @@ static int comedi_start_out(struct node *n) } /* Preload comedi output buffer */ - for (int i = 0; i < d->buffer_size / local_buffer_size; i++) { - ret = write(comedi_fileno(c->dev), d->buffer, local_buffer_size); - if (ret != local_buffer_size) { + for (unsigned i = 0; i < d->buffer_size / local_buffer_size; i++) { + size_t written = write(comedi_fileno(c->dev), d->buffer, local_buffer_size); + if (written != local_buffer_size) { error("Cannot preload Comedi buffer"); } } @@ -584,7 +584,7 @@ int comedi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r smps[i]->capacity, d->chanlist_len); } - for (int si = 0; si < d->chanlist_len; si++) { + for (unsigned si = 0; si < d->chanlist_len; si++) { unsigned int raw; if (d->sample_size == sizeof(sampl_t)) @@ -874,7 +874,7 @@ int comedi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned * d->bufptr = d->buffer; /* Move samples from villas into local buffer for comedi */ - for (int si = 0; si < sample->length; si++) { + for (unsigned si = 0; si < sample->length; si++) { unsigned raw_value = 0; switch (sample_format(sample, si)) { @@ -910,15 +910,15 @@ int comedi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned * } /* Try to write one complete villas sample to comedi */ - ret = write(comedi_fileno(c->dev), d->buffer, villas_sample_size); - if (ret < 0) + size_t written = write(comedi_fileno(c->dev), d->buffer, villas_sample_size); + if (written < 0) error("write"); - else if (ret == 0) + else if (written == 0) break; /* Comedi doesn't accept any more samples at the moment */ - else if (ret == villas_sample_size) + else if (written == villas_sample_size) villas_samples_written++; else - error("Only partial sample written (%d bytes), oops", ret); + error("Only partial sample written (%zu bytes), oops", written); } if (villas_samples_written == 0) { diff --git a/lib/nodes/file.c b/lib/nodes/file.cpp similarity index 79% rename from lib/nodes/file.c rename to lib/nodes/file.cpp index 0a0f80303..746669e77 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.cpp @@ -37,7 +37,7 @@ static char * file_format_name(const char *format, struct timespec *ts) { struct tm tm; - char *buf = alloc(FILE_MAX_PATHLEN); + char *buf = (char *) alloc(FILE_MAX_PATHLEN); /* Convert time */ gmtime_r(&ts->tv_sec, &tm); @@ -47,26 +47,26 @@ static char * file_format_name(const char *format, struct timespec *ts) return buf; } -static struct timespec file_calc_offset(const struct timespec *first, const struct timespec *epoch, enum epoch_mode mode) +static struct timespec file_calc_offset(const struct timespec *first, const struct timespec *epoch, enum file::epoch mode) { /* Get current time */ struct timespec now = time_now(); struct timespec offset; - /* Set offset depending on epoch_mode */ + /* Set offset depending on epoch */ switch (mode) { - case FILE_EPOCH_DIRECT: /* read first value at now + epoch */ + case file::epoch::DIRECT: /* read first value at now + epoch */ offset = time_diff(first, &now); return time_add(&offset, epoch); - case FILE_EPOCH_WAIT: /* read first value at now + first + epoch */ + case file::epoch::WAIT: /* read first value at now + first + epoch */ offset = now; return time_add(&now, epoch); - case FILE_EPOCH_RELATIVE: /* read first value at first + epoch */ + case file::epoch::RELATIVE: /* read first value at first + epoch */ return *epoch; - case FILE_EPOCH_ABSOLUTE: /* read first value at f->epoch */ + case file::epoch::ABSOLUTE: /* read first value at f->epoch */ return time_diff(first, epoch); default: @@ -76,7 +76,7 @@ static struct timespec file_calc_offset(const struct timespec *first, const stru int file_parse(struct node *n, json_t *cfg) { - struct file *f = n->_vd; + struct file *f = (struct file *) n->_vd; int ret; json_error_t err; @@ -84,13 +84,13 @@ int file_parse(struct node *n, json_t *cfg) const char *uri_tmpl = NULL; const char *format = "villas.human"; const char *eof = NULL; - const char *epoch_mode = NULL; + const char *epoch = NULL; double epoch_flt = 0; /* Default values */ f->rate = 0; - f->eof = FILE_EOF_STOP; - f->epoch_mode = FILE_EPOCH_DIRECT; + f->eof_mode = file::eof::STOP; + f->epoch_mode = file::epoch::DIRECT; f->flush = 0; f->buffer_size_in = 0; f->buffer_size_out = 0; @@ -101,7 +101,7 @@ int file_parse(struct node *n, json_t *cfg) "in", "eof", &eof, "rate", &f->rate, - "epoch_mode", &epoch_mode, + "epoch", &epoch, "epoch", &epoch_flt, "buffer_size", &f->buffer_size_in, "out", @@ -120,28 +120,28 @@ int file_parse(struct node *n, json_t *cfg) if (eof) { if (!strcmp(eof, "exit") || !strcmp(eof, "stop")) - f->eof = FILE_EOF_STOP; + f->eof_mode = file::eof::STOP; else if (!strcmp(eof, "rewind")) - f->eof = FILE_EOF_REWIND; + f->eof_mode = file::eof::REWIND; else if (!strcmp(eof, "wait")) - f->eof = FILE_EOF_WAIT; + f->eof_mode = file::eof::SUSPEND; else error("Invalid mode '%s' for 'eof' setting of node %s", eof, node_name(n)); } - if (epoch_mode) { - if (!strcmp(epoch_mode, "direct")) - f->epoch_mode = FILE_EPOCH_DIRECT; - else if (!strcmp(epoch_mode, "wait")) - f->epoch_mode = FILE_EPOCH_WAIT; - else if (!strcmp(epoch_mode, "relative")) - f->epoch_mode = FILE_EPOCH_RELATIVE; - else if (!strcmp(epoch_mode, "absolute")) - f->epoch_mode = FILE_EPOCH_ABSOLUTE; - else if (!strcmp(epoch_mode, "original")) - f->epoch_mode = FILE_EPOCH_ORIGINAL; + if (epoch) { + if (!strcmp(epoch, "direct")) + f->epoch_mode = file::epoch::DIRECT; + else if (!strcmp(epoch, "wait")) + f->epoch_mode = file::epoch::WAIT; + else if (!strcmp(epoch, "relative")) + f->epoch_mode = file::epoch::RELATIVE; + else if (!strcmp(epoch, "absolute")) + f->epoch_mode = file::epoch::ABSOLUTE; + else if (!strcmp(epoch, "original")) + f->epoch_mode = file::epoch::ORIGINAL; else - error("Invalid value '%s' for setting 'epoch_mode' of node %s", epoch_mode, node_name(n)); + error("Invalid value '%s' for setting 'epoch' of node %s", epoch, node_name(n)); } n->_vd = f; @@ -158,20 +158,50 @@ char * file_print(struct node *n) const char *eof_str = NULL; switch (f->epoch_mode) { - case FILE_EPOCH_DIRECT: epoch_str = "direct"; break; - case FILE_EPOCH_WAIT: epoch_str = "wait"; break; - case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break; - case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break; - case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break; + case file::epoch::DIRECT: + epoch_str = "direct"; + break; + + case file::epoch::WAIT: + epoch_str = "wait"; + break; + + case file::epoch::RELATIVE: + epoch_str = "relative"; + break; + + case file::epoch::ABSOLUTE: + epoch_str = "absolute"; + break; + + case file::epoch::ORIGINAL: + epoch_str = "original"; + break; + + default: + epoch_str = ""; + break; } - switch (f->eof) { - case FILE_EOF_STOP: eof_str = "stop"; break; - case FILE_EOF_WAIT: eof_str = "wait"; break; - case FILE_EOF_REWIND: eof_str = "rewind"; break; + switch (f->eof_mode) { + case file::eof::STOP: + eof_str = "stop"; + break; + + case file::eof::SUSPEND: + eof_str = "wait"; + break; + + case file::eof::REWIND: + eof_str = "rewind"; + break; + + default: + eof_str = ""; + break; } - strcatf(&buf, "uri=%s, format=%s, flush=%s, eof=%s, epoch_mode=%s, epoch=%.2f", + strcatf(&buf, "uri=%s, format=%s, flush=%s, eof=%s, epoch=%s, epoch=%.2f", f->uri ? f->uri : f->uri_tmpl, format_type_name(f->format), f->flush ? "yes" : "no", @@ -273,7 +303,7 @@ int file_start(struct node *n) serror("Failed to create timer"); /* Get timestamp of first line */ - if (f->epoch_mode != FILE_EPOCH_ORIGINAL) { + if (f->epoch_mode != file::epoch::ORIGINAL) { io_rewind(&f->io); struct sample s = { .capacity = 0 }; @@ -341,15 +371,15 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel retry: ret = io_scan(&f->io, smps, cnt); if (ret <= 0) { if (io_eof(&f->io)) { - switch (f->eof) { - case FILE_EOF_REWIND: + switch (f->eof_mode) { + case file::eof::REWIND: info("Rewind input file of node %s", node_name(n)); f->offset = file_calc_offset(&f->first, &f->epoch, f->epoch_mode); io_rewind(&f->io); goto retry; - case FILE_EOF_WAIT: + case file::eof::SUSPEND: /* We wait 10ms before fetching again. */ usleep(100000); @@ -369,12 +399,14 @@ retry: ret = io_scan(&f->io, smps, cnt); goto retry; - case FILE_EOF_STOP: + case file::eof::STOP: info("Reached end-of-file."); n->state = STATE_STOPPING; return -1; + + default: { } } } else @@ -384,7 +416,7 @@ retry: ret = io_scan(&f->io, smps, cnt); } /* We dont wait in FILE_EPOCH_ORIGINAL mode */ - if (f->epoch_mode == FILE_EPOCH_ORIGINAL) + if (f->epoch_mode == file::epoch::ORIGINAL) return cnt; if (f->rate) { @@ -428,16 +460,16 @@ int file_poll_fds(struct node *n, int fds[]) if (f->rate) { fds[0] = task_fd(&f->task); + return 1; } - else { - if (f->epoch_mode == FILE_EPOCH_ORIGINAL) { - fds[0] = io_fd(&f->io); - return 1; - } - else - return -1; /** @todo not supported yet */ + else if (f->epoch_mode == file::epoch::ORIGINAL) { + fds[0] = io_fd(&f->io); + + return 1; } + + return -1; /** @todo not supported yet */ } static struct plugin p = { @@ -450,8 +482,8 @@ static struct plugin p = { .parse = file_parse, .print = file_print, .start = file_start, - .stop = file_stop, .restart = file_restart, + .stop = file_stop, .read = file_read, .write = file_write, .poll_fds = file_poll_fds diff --git a/lib/nodes/iec61850.c b/lib/nodes/iec61850.cpp similarity index 71% rename from lib/nodes/iec61850.c rename to lib/nodes/iec61850.cpp index 50633e7f9..9e8538a66 100644 --- a/lib/nodes/iec61850.c +++ b/lib/nodes/iec61850.cpp @@ -37,26 +37,26 @@ const struct iec61850_type_descriptor type_descriptors[] = { /* name, iec_type, type, size, supported */ - { "boolean", IEC61850_TYPE_BOOLEAN, SIGNAL_TYPE_BOOLEAN, 1, false, false }, - { "int8", IEC61850_TYPE_INT8, SIGNAL_TYPE_INTEGER, 1, false, false }, - { "int16", IEC61850_TYPE_INT16, SIGNAL_TYPE_INTEGER, 2, false, false }, - { "int32", IEC61850_TYPE_INT32, SIGNAL_TYPE_INTEGER, 4, false, false }, - { "int64", IEC61850_TYPE_INT64, SIGNAL_TYPE_INTEGER, 8, false, false }, - { "int8u", IEC61850_TYPE_INT8U, SIGNAL_TYPE_INTEGER, 1, false, false }, - { "int16u", IEC61850_TYPE_INT16U, SIGNAL_TYPE_INTEGER, 2, false, false }, - { "int32u", IEC61850_TYPE_INT32U, SIGNAL_TYPE_INTEGER, 4, false, false }, - { "int64u", IEC61850_TYPE_INT64U, SIGNAL_TYPE_INTEGER, 8, false, false }, - { "float32", IEC61850_TYPE_FLOAT32, SIGNAL_TYPE_FLOAT, 4, false, false }, - { "float64", IEC61850_TYPE_FLOAT64, SIGNAL_TYPE_FLOAT, 8, false, false }, - { "enumerated", IEC61850_TYPE_ENUMERATED, SIGNAL_TYPE_INVALID, 4, false, false }, - { "coded_enum", IEC61850_TYPE_CODED_ENUM, SIGNAL_TYPE_INVALID, 4, false, false }, - { "octet_string", IEC61850_TYPE_OCTET_STRING, SIGNAL_TYPE_INVALID, 20, false, false }, - { "visible_string", IEC61850_TYPE_VISIBLE_STRING, SIGNAL_TYPE_INVALID, 35, false, false }, - { "objectname", IEC61850_TYPE_OBJECTNAME, SIGNAL_TYPE_INVALID, 20, false, false }, - { "objectreference", IEC61850_TYPE_OBJECTREFERENCE, SIGNAL_TYPE_INVALID, 20, false, false }, - { "timestamp", IEC61850_TYPE_TIMESTAMP, SIGNAL_TYPE_INVALID, 8, false, false }, - { "entrytime", IEC61850_TYPE_ENTRYTIME, SIGNAL_TYPE_INVALID, 6, false, false }, - { "bitstring", IEC61850_TYPE_BITSTRING, SIGNAL_TYPE_INVALID, 4, false, false } + { "boolean", iec61850_type::BOOLEAN, SIGNAL_TYPE_BOOLEAN, 1, false, false }, + { "int8", iec61850_type::INT8, SIGNAL_TYPE_INTEGER, 1, false, false }, + { "int16", iec61850_type::INT16, SIGNAL_TYPE_INTEGER, 2, false, false }, + { "int32", iec61850_type::INT32, SIGNAL_TYPE_INTEGER, 4, false, false }, + { "int64", iec61850_type::INT64, SIGNAL_TYPE_INTEGER, 8, false, false }, + { "int8u", iec61850_type::INT8U, SIGNAL_TYPE_INTEGER, 1, false, false }, + { "int16u", iec61850_type::INT16U, SIGNAL_TYPE_INTEGER, 2, false, false }, + { "int32u", iec61850_type::INT32U, SIGNAL_TYPE_INTEGER, 4, false, false }, + { "int64u", iec61850_type::INT64U, SIGNAL_TYPE_INTEGER, 8, false, false }, + { "float32", iec61850_type::FLOAT32, SIGNAL_TYPE_FLOAT, 4, false, false }, + { "float64", iec61850_type::FLOAT64, SIGNAL_TYPE_FLOAT, 8, false, false }, + { "enumerated", iec61850_type::ENUMERATED, SIGNAL_TYPE_INVALID, 4, false, false }, + { "coded_enum", iec61850_type::CODED_ENUM, SIGNAL_TYPE_INVALID, 4, false, false }, + { "octet_string", iec61850_type::OCTET_STRING, SIGNAL_TYPE_INVALID, 20, false, false }, + { "visible_string", iec61850_type::VISIBLE_STRING, SIGNAL_TYPE_INVALID, 35, false, false }, + { "objectname", iec61850_type::OBJECTNAME, SIGNAL_TYPE_INVALID, 20, false, false }, + { "objectreference", iec61850_type::OBJECTREFERENCE, SIGNAL_TYPE_INVALID, 20, false, false }, + { "timestamp", iec61850_type::TIMESTAMP, SIGNAL_TYPE_INVALID, 8, false, false }, + { "entrytime", iec61850_type::ENTRYTIME, SIGNAL_TYPE_INVALID, 6, false, false }, + { "bitstring", iec61850_type::BITSTRING, SIGNAL_TYPE_INVALID, 4, false, false } }; /** Each network interface needs a separate receiver */ @@ -78,8 +78,8 @@ static void * iec61850_thread(void *ctx) struct iec61850_receiver *r = (struct iec61850_receiver *) vlist_at(&receivers, i); switch (r->type) { - case IEC61850_RECEIVER_GOOSE: GooseReceiver_tick(r->goose); break; - case IEC61850_RECEIVER_SV: SVReceiver_tick(r->sv); break; + case iec61850_receiver::type::GOOSE: GooseReceiver_tick(r->goose); break; + case iec61850_receiver::type::SAMPLED_VALUES: SVReceiver_tick(r->sv); break; } } } @@ -122,7 +122,7 @@ int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct v if (!node_signals) return -1; - sig = vlist_at(node_signals, i); + sig = (struct signal *) vlist_at(node_signals, i); if (!sig) return -1; @@ -162,7 +162,7 @@ int iec61850_parse_signals(json_t *json_signals, struct vlist *signals, struct v if (!td) return -1; - for (int i = 0; i < vlist_length(node_signals); i++) { + for (unsigned i = 0; i < vlist_length(node_signals); i++) { vlist_push(signals, (void *) td); total_size += td->size; @@ -224,11 +224,11 @@ int iec61850_type_stop() int iec61850_receiver_start(struct iec61850_receiver *r) { switch (r->type) { - case IEC61850_RECEIVER_GOOSE: + case iec61850_receiver::type::GOOSE: r->socket = GooseReceiver_startThreadless(r->goose); break; - case IEC61850_RECEIVER_SV: + case iec61850_receiver::type::SAMPLED_VALUES: r->socket = SVReceiver_startThreadless(r->sv); break; } @@ -243,11 +243,11 @@ int iec61850_receiver_stop(struct iec61850_receiver *r) EthernetHandleSet_removeSocket(hset, r->socket); switch (r->type) { - case IEC61850_RECEIVER_GOOSE: + case iec61850_receiver::type::GOOSE: GooseReceiver_stopThreadless(r->goose); break; - case IEC61850_RECEIVER_SV: + case iec61850_receiver::type::SAMPLED_VALUES: SVReceiver_stopThreadless(r->sv); break; } @@ -258,11 +258,11 @@ int iec61850_receiver_stop(struct iec61850_receiver *r) int iec61850_receiver_destroy(struct iec61850_receiver *r) { switch (r->type) { - case IEC61850_RECEIVER_GOOSE: + case iec61850_receiver::type::GOOSE: GooseReceiver_destroy(r->goose); break; - case IEC61850_RECEIVER_SV: + case iec61850_receiver::type::SAMPLED_VALUES: SVReceiver_destroy(r->sv); break; } @@ -272,7 +272,7 @@ int iec61850_receiver_destroy(struct iec61850_receiver *r) return 0; } -struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver_type t, const char *intf) +struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver::type t, const char *intf) { for (unsigned i = 0; i < vlist_length(&receivers); i++) { struct iec61850_receiver *r = (struct iec61850_receiver *) vlist_at(&receivers, i); @@ -284,14 +284,14 @@ struct iec61850_receiver * iec61850_receiver_lookup(enum iec61850_receiver_type return NULL; } -struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver_type t, const char *intf) +struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver::type t, const char *intf) { struct iec61850_receiver *r; /* Check if there is already a SVReceiver for this interface */ r = iec61850_receiver_lookup(t, intf); if (!r) { - r = alloc(sizeof(struct iec61850_receiver)); + r = (struct iec61850_receiver *) alloc(sizeof(struct iec61850_receiver)); if (!r) return NULL; @@ -299,12 +299,12 @@ struct iec61850_receiver * iec61850_receiver_create(enum iec61850_receiver_type r->type = t; switch (r->type) { - case IEC61850_RECEIVER_GOOSE: + case iec61850_receiver::type::GOOSE: r->goose = GooseReceiver_create(); GooseReceiver_setInterfaceId(r->goose, r->interface); break; - case IEC61850_RECEIVER_SV: + case iec61850_receiver::type::SAMPLED_VALUES: r->sv = SVReceiver_create(); SVReceiver_setInterfaceId(r->sv, r->interface); break; diff --git a/lib/nodes/iec61850_sv.c b/lib/nodes/iec61850_sv.cpp similarity index 94% rename from lib/nodes/iec61850_sv.c rename to lib/nodes/iec61850_sv.cpp index e6f1338b1..277fca587 100644 --- a/lib/nodes/iec61850_sv.c +++ b/lib/nodes/iec61850_sv.cpp @@ -92,35 +92,35 @@ static void iec61850_sv_listener(SVSubscriber subscriber, void *ctx, SVSubscribe continue; switch (td->iec_type) { - case IEC61850_TYPE_INT8: + case iec61850_type::INT8: smp->data[j].i = SVSubscriber_ASDU_getINT8(asdu, offset); break; - case IEC61850_TYPE_INT16: + case iec61850_type::INT16: smp->data[j].i = SVSubscriber_ASDU_getINT16(asdu, offset); break; - case IEC61850_TYPE_INT32: + case iec61850_type::INT32: smp->data[j].i = SVSubscriber_ASDU_getINT32(asdu, offset); break; - case IEC61850_TYPE_INT8U: + case iec61850_type::INT8U: smp->data[j].i = SVSubscriber_ASDU_getINT8U(asdu, offset); break; - case IEC61850_TYPE_INT16U: + case iec61850_type::INT16U: smp->data[j].i = SVSubscriber_ASDU_getINT16U(asdu, offset); break; - case IEC61850_TYPE_INT32U: + case iec61850_type::INT32U: smp->data[j].i = SVSubscriber_ASDU_getINT32U(asdu, offset); break; - case IEC61850_TYPE_FLOAT32: + case iec61850_type::FLOAT32: smp->data[j].f = SVSubscriber_ASDU_getFLOAT32(asdu, offset); break; - case IEC61850_TYPE_FLOAT64: + case iec61850_type::FLOAT64: smp->data[j].f = SVSubscriber_ASDU_getFLOAT64(asdu, offset); break; @@ -280,19 +280,19 @@ int iec61850_sv_start(struct node *n) struct iec61850_type_descriptor *td = (struct iec61850_type_descriptor *) vlist_at(&i->out.signals, k); switch (td->iec_type) { - case IEC61850_TYPE_INT8: + case iec61850_type::INT8: SVPublisher_ASDU_addINT8(i->out.asdu); break; - case IEC61850_TYPE_INT32: + case iec61850_type::INT32: SVPublisher_ASDU_addINT32(i->out.asdu); break; - case IEC61850_TYPE_FLOAT32: + case iec61850_type::FLOAT32: SVPublisher_ASDU_addFLOAT(i->out.asdu); break; - case IEC61850_TYPE_FLOAT64: + case iec61850_type::FLOAT64: SVPublisher_ASDU_addFLOAT64(i->out.asdu); break; @@ -314,7 +314,7 @@ int iec61850_sv_start(struct node *n) /* Start subscriber */ if (i->in.enabled) { - struct iec61850_receiver *r = iec61850_receiver_create(IEC61850_RECEIVER_SV, i->interface); + struct iec61850_receiver *r = iec61850_receiver_create(iec61850_receiver::type::SAMPLED_VALUES, i->interface); i->in.receiver = r->sv; i->in.subscriber = SVSubscriber_create(i->dst_address.ether_addr_octet, i->app_id); @@ -414,13 +414,13 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig double fval = 0; switch (td->iec_type) { - case IEC61850_TYPE_INT8: - case IEC61850_TYPE_INT32: + case iec61850_type::INT8: + case iec61850_type::INT32: ival = sample_format(smps[j], k) == SIGNAL_TYPE_FLOAT ? smps[j]->data[k].f : smps[j]->data[k].i; break; - case IEC61850_TYPE_FLOAT32: - case IEC61850_TYPE_FLOAT64: + case iec61850_type::FLOAT32: + case iec61850_type::FLOAT64: fval = sample_format(smps[j], k) == SIGNAL_TYPE_FLOAT ? smps[j]->data[k].f : smps[j]->data[k].i; break; @@ -428,19 +428,19 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig } switch (td->iec_type) { - case IEC61850_TYPE_INT8: + case iec61850_type::INT8: SVPublisher_ASDU_setINT8(i->out.asdu, offset, ival); break; - case IEC61850_TYPE_INT32: + case iec61850_type::INT32: SVPublisher_ASDU_setINT32(i->out.asdu, offset, ival); break; - case IEC61850_TYPE_FLOAT32: + case iec61850_type::FLOAT32: SVPublisher_ASDU_setFLOAT(i->out.asdu, offset, fval); break; - case IEC61850_TYPE_FLOAT64: + case iec61850_type::FLOAT64: SVPublisher_ASDU_setFLOAT64(i->out.asdu, offset, fval); break; @@ -480,13 +480,15 @@ static struct plugin p = { .node = { .vectorize = 0, .size = sizeof(struct iec61850_sv), - .type.start = iec61850_type_start, - .type.stop = iec61850_type_stop, + .type = { + .start = iec61850_type_start, + .stop = iec61850_type_stop + }, + .destroy = iec61850_sv_destroy, .parse = iec61850_sv_parse, .print = iec61850_sv_print, .start = iec61850_sv_start, .stop = iec61850_sv_stop, - .destroy = iec61850_sv_destroy, .read = iec61850_sv_read, .write = iec61850_sv_write, .poll_fds = iec61850_sv_poll_fds diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.cpp similarity index 97% rename from lib/nodes/infiniband.c rename to lib/nodes/infiniband.cpp index e3d2b96fc..7288b8d94 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.cpp @@ -351,8 +351,8 @@ int ib_check(struct node *n) error("The buffer substraction value cannot be bigger than in.max_wrs - in.vectorize"); /* Check if the set value is a power of 2, and warn the user if this is not the case */ - int max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr))); - int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr))); + unsigned max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr))); + unsigned max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr))); if (ib->qp_init.cap.max_send_wr != max_send_pow) { warning("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i", @@ -513,35 +513,35 @@ void * ib_rdma_cm_event_thread(void *n) switch(event->event) { case RDMA_CM_EVENT_ADDR_RESOLVED: - ret = ib_addr_resolved(n); + ret = ib_addr_resolved(node); break; case RDMA_CM_EVENT_ADDR_ERROR: warning("Address resolution (rdma_resolve_addr) failed!"); - ib_continue_as_listen(n, event); + ib_continue_as_listen(node, event); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: - ret = ib_route_resolved(n); + ret = ib_route_resolved(node); break; case RDMA_CM_EVENT_ROUTE_ERROR: warning("Route resolution (rdma_resovle_route) failed!"); - ib_continue_as_listen(n, event); + ib_continue_as_listen(node, event); break; case RDMA_CM_EVENT_UNREACHABLE: warning("Remote server unreachable!"); - ib_continue_as_listen(n, event); + ib_continue_as_listen(node, event); break; case RDMA_CM_EVENT_CONNECT_REQUEST: - ret = ib_connect_request(n, event->id); + ret = ib_connect_request(node, event->id); /* A target UDP node will never really connect. In order to receive data, * we set it to connected after it answered the connection request @@ -557,14 +557,14 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_CONNECT_ERROR: warning("An error has occurred trying to establish a connection!"); - ib_continue_as_listen(n, event); + ib_continue_as_listen(node, event); break; case RDMA_CM_EVENT_REJECTED: warning("Connection request or response was rejected by the remote end point!"); - ib_continue_as_listen(n, event); + ib_continue_as_listen(node, event); break; @@ -577,14 +577,14 @@ void * ib_rdma_cm_event_thread(void *n) node->state = STATE_CONNECTED; - info("Connection established in node %s", node_name(n)); + info("Connection established in node %s", node_name(node)); break; case RDMA_CM_EVENT_DISCONNECTED: node->state = STATE_STARTED; - ret = ib_disconnect(n); + ret = ib_disconnect(node); if (!ret) info("Host disconnected. Ready to accept new connections."); @@ -874,7 +874,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele struct ibv_mr *mr; int ret; - int sent = 0; /* Used for first loop: prepare work requests to post to send queue */ + unsigned sent = 0; /* Used for first loop: prepare work requests to post to send queue */ debug(LOG_IB | 10, "ib_write is called"); @@ -951,7 +951,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele /* Reorder list. Place inline and unposted samples to the top * m will always be equal or smaller than *release */ - for (int m = 0; m < cnt; m++) { + for (unsigned m = 0; m < cnt; m++) { /* We can't use wr_id as identifier, since it is 0 for inline * elements */ @@ -1007,15 +1007,15 @@ static struct plugin p = { .vectorize = 0, .size = sizeof(struct infiniband), .pool_size = 8192, - .reverse = ib_reverse, + .destroy = ib_destroy, .parse = ib_parse, .check = ib_check, .print = ib_print, .start = ib_start, - .destroy = ib_destroy, .stop = ib_stop, .read = ib_read, .write = ib_write, + .reverse = ib_reverse, .memory_type = memory_ib } }; diff --git a/lib/nodes/influxdb.c b/lib/nodes/influxdb.cpp similarity index 98% rename from lib/nodes/influxdb.c rename to lib/nodes/influxdb.cpp index bc98c2dfb..134223222 100644 --- a/lib/nodes/influxdb.c +++ b/lib/nodes/influxdb.cpp @@ -124,14 +124,14 @@ int influxdb_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned char *buf = NULL; ssize_t sentlen, buflen; - for (int k = 0; k < cnt; k++) { + for (unsigned k = 0; k < cnt; k++) { struct sample *smp = smps[k]; /* Key */ strcatf(&buf, "%s", i->key); /* Fields */ - for (int j = 0; j < smp->length; j++) { + for (unsigned j = 0; j < smp->length; j++) { struct signal *sig = (struct signal *) vlist_at(smp->signals, j); union signal_data *data = &smp->data[k]; diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.cpp similarity index 99% rename from lib/nodes/loopback.c rename to lib/nodes/loopback.cpp index fcc151e85..0e2c92260 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.cpp @@ -117,11 +117,11 @@ int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned int loopback_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { - int copied; - struct loopback *l = (struct loopback *) n->_vd; struct sample *copies[cnt]; + unsigned copied; + copied = sample_alloc_many(&l->pool, copies, cnt); if (copied < cnt) warning("Pool underrun for node %s", node_name(n)); diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.cpp similarity index 97% rename from lib/nodes/mqtt.c rename to lib/nodes/mqtt.cpp index 4a2c67d15..5d561bf1d 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.cpp @@ -92,7 +92,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct return; } - ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, smps, n->in.vectorize); + ret = io_sscan(&m->io, (char *) msg->payload, msg->payloadlen, NULL, smps, n->in.vectorize); if (ret < 0) { warning("MQTT: Node %s received an invalid message", node_name(n)); warning(" Payload: %s", (char *) msg->payload); @@ -104,7 +104,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct return; } - queue_signalled_push_many(&m->queue, (void *) smps, n->in.vectorize); + queue_signalled_push_many(&m->queue, (void **) smps, n->in.vectorize); } static void mqtt_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) @@ -454,17 +454,19 @@ static struct plugin p = { .node = { .vectorize = 0, .size = sizeof(struct mqtt), - .type.start = mqtt_type_start, - .type.stop = mqtt_type_stop, - .reverse = mqtt_reverse, + .type ={ + .start = mqtt_type_start, + .stop = mqtt_type_stop + }, + .destroy = mqtt_destroy, .parse = mqtt_parse, .check = mqtt_check, .print = mqtt_print, .start = mqtt_start, - .destroy = mqtt_destroy, .stop = mqtt_stop, .read = mqtt_read, .write = mqtt_write, + .reverse = mqtt_reverse, .poll_fds = mqtt_poll_fds } }; diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.cpp similarity index 97% rename from lib/nodes/nanomsg.c rename to lib/nodes/nanomsg.cpp index 121d4ca45..2e815554b 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.cpp @@ -37,8 +37,8 @@ int nanomsg_reverse(struct node *n) vlist_length(&m->in.endpoints) != 1) return -1; - char *subscriber = vlist_first(&m->in.endpoints); - char *publisher = vlist_first(&m->out.endpoints); + char *subscriber = (char *) vlist_first(&m->in.endpoints); + char *publisher = (char *) vlist_first(&m->out.endpoints); vlist_set(&m->in.endpoints, 0, publisher); vlist_set(&m->out.endpoints, 0, subscriber); @@ -297,14 +297,16 @@ static struct plugin p = { .node = { .vectorize = 0, .size = sizeof(struct nanomsg), - .type.stop = nanomsg_type_stop, - .reverse = nanomsg_reverse, + .type = { + .stop = nanomsg_type_stop + }, .parse = nanomsg_parse, .print = nanomsg_print, .start = nanomsg_start, .stop = nanomsg_stop, .read = nanomsg_read, .write = nanomsg_write, + .reverse = nanomsg_reverse, .poll_fds = nanomsg_poll_fds, .netem_fds = nanomsg_netem_fds } diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.cpp similarity index 97% rename from lib/nodes/ngsi.c rename to lib/nodes/ngsi.cpp index 9cd5cf2ff..259b7f78a 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.cpp @@ -83,7 +83,7 @@ static json_t* ngsi_build_entity(struct ngsi *i, struct sample *smps[], unsigned if (flags & NGSI_ENTITY_VALUES) { /* Build value vector */ json_t *values = json_array(); - for (int k = 0; k < cnt; k++) { + for (unsigned k = 0; k < cnt; k++) { json_array_append_new(values, json_pack("[ f, f, i ]", time_to_double(&smps[k]->ts.origin), smps[k]->data[map->index].f, @@ -139,7 +139,7 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps if (strcmp(id, i->entity_id) || strcmp(type, i->entity_type)) return -2; - for (int k = 0; k < cnt; k++) + for (unsigned k = 0; k < cnt; k++) smps[k]->length = json_array_size(attributes); json_array_foreach(attributes, l, attribute) { @@ -158,7 +158,7 @@ static int ngsi_parse_entity(json_t *entity, struct ngsi *i, struct sample *smps return -3; /* Check attribute name and type */ - map = vlist_lookup(&i->mapping, name); + map = (struct ngsi_attribute *) vlist_lookup(&i->mapping, name); if (!map || strcmp(map->type, type)) return -4; @@ -237,15 +237,15 @@ static int ngsi_parse_mapping(struct vlist *mapping, json_t *cfg) /* Metadata: source(string)=name */ struct ngsi_metadata s = { - .name = "source", - .type = "string", + .name = strdup("source"), + .type = strdup("string"), .value = name }; /* Metadata: index(integer)=j */ struct ngsi_metadata i = { - .name = "index", - .type = "integer" + .name = strdup("index"), + .type = strdup("integer") }; assert(asprintf(&i.value, "%zu", j)); @@ -289,7 +289,7 @@ static size_t ngsi_request_writer(void *contents, size_t size, size_t nmemb, voi size_t realsize = size * nmemb; struct ngsi_response *mem = (struct ngsi_response *) userp; - mem->data = realloc(mem->data, mem->len + realsize + 1); + mem->data = (char *) realloc(mem->data, mem->len + realsize + 1); if (mem->data == NULL) /* out of memory! */ error("Not enough memory (realloc returned NULL)"); @@ -592,8 +592,10 @@ static struct plugin p = { .node = { .vectorize = 0, /* unlimited */ .size = sizeof(struct ngsi), - .type.start = ngsi_type_start, - .type.stop = ngsi_type_stop, + .type = { + .start = ngsi_type_start, + .stop = ngsi_type_stop + }, .parse = ngsi_parse, .print = ngsi_print, .start = ngsi_start, diff --git a/lib/nodes/opal.c b/lib/nodes/opal.cpp similarity index 100% rename from lib/nodes/opal.c rename to lib/nodes/opal.cpp diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.cpp similarity index 98% rename from lib/nodes/shmem.c rename to lib/nodes/shmem.cpp index 260a46ab0..b0809027c 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.cpp @@ -78,7 +78,7 @@ int shmem_parse(struct node *n, json_t *cfg) if (!json_is_array(json_exec)) error("Setting 'exec' of node %s must be an array of strings", node_name(n)); - shm->exec = alloc(sizeof(char *) * (json_array_size(json_exec) + 1)); + shm->exec = (char **) alloc(sizeof(char *) * (json_array_size(json_exec) + 1)); size_t i; json_t *json_val; @@ -161,7 +161,7 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r int avail, pushed, copied; avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt); - if (avail != cnt) + if (avail != (int) cnt) warning("Pool underrun for shmem node %s", shm->out_name); copied = sample_copy_many(shared_smps, smps, avail); diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.cpp similarity index 79% rename from lib/nodes/signal_generator.c rename to lib/nodes/signal_generator.cpp index 7428c7985..720dbe867 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.cpp @@ -28,53 +28,53 @@ #include #include -static enum signal_generator_type signal_generator_lookup_type(const char *type) +static enum signal_generator::type signal_generator_lookup_type(const char *type) { if (!strcmp(type, "random")) - return SIGNAL_GENERATOR_TYPE_RANDOM; + return signal_generator::type::RANDOM; else if (!strcmp(type, "sine")) - return SIGNAL_GENERATOR_TYPE_SINE; + return signal_generator::type::SINE; else if (!strcmp(type, "square")) - return SIGNAL_GENERATOR_TYPE_SQUARE; + return signal_generator::type::SQUARE; else if (!strcmp(type, "triangle")) - return SIGNAL_GENERATOR_TYPE_TRIANGLE; + return signal_generator::type::TRIANGLE; else if (!strcmp(type, "ramp")) - return SIGNAL_GENERATOR_TYPE_RAMP; + return signal_generator::type::RAMP; else if (!strcmp(type, "counter")) - return SIGNAL_GENERATOR_TYPE_COUNTER; + return signal_generator::type::COUNTER; else if (!strcmp(type, "constant")) - return SIGNAL_GENERATOR_TYPE_CONSTANT; + return signal_generator::type::CONSTANT; else if (!strcmp(type, "mixed")) - return SIGNAL_GENERATOR_TYPE_MIXED; + return signal_generator::type::MIXED; else - return -1; + return signal_generator::type::INVALID; } -static const char * signal_generator_type_str(enum signal_generator_type type) +static const char * signal_generator_type_str(enum signal_generator::type type) { switch (type) { - case SIGNAL_GENERATOR_TYPE_CONSTANT: + case signal_generator::type::CONSTANT: return "constant"; - case SIGNAL_GENERATOR_TYPE_SINE: + case signal_generator::type::SINE: return "sine"; - case SIGNAL_GENERATOR_TYPE_TRIANGLE: + case signal_generator::type::TRIANGLE: return "triangle"; - case SIGNAL_GENERATOR_TYPE_SQUARE: + case signal_generator::type::SQUARE: return "square"; - case SIGNAL_GENERATOR_TYPE_RAMP: + case signal_generator::type::RAMP: return "ramp"; - case SIGNAL_GENERATOR_TYPE_COUNTER: + case signal_generator::type::COUNTER: return "counter"; - case SIGNAL_GENERATOR_TYPE_RANDOM: + case signal_generator::type::RANDOM: return "random"; - case SIGNAL_GENERATOR_TYPE_MIXED: + case signal_generator::type::MIXED: return "mixed"; default: @@ -88,12 +88,12 @@ int signal_generator_prepare(struct node *n) assert(vlist_length(&n->in.signals) == 0); - for (int i = 0; i < s->values; i++) { - struct signal *sig = alloc(sizeof(struct signal)); + for (unsigned i = 0; i < s->values; i++) { + struct signal *sig = (struct signal *) alloc(sizeof(struct signal)); - int rtype = s->type == SIGNAL_GENERATOR_TYPE_MIXED ? i % 7 : s->type; + int rtype = s->type == signal_generator::type::MIXED ? i % 7 : s->type; - sig->name = strdup(signal_generator_type_str(rtype)); + sig->name = strdup(signal_generator_type_str((enum signal_generator::type) rtype)); sig->type = SIGNAL_TYPE_FLOAT; /* All generated signals are of type float */ vlist_push(&n->in.signals, sig); @@ -141,10 +141,10 @@ int signal_generator_parse(struct node *n, json_t *cfg) if (ret == -1) error("Unknown signal type '%s' of node %s", type, node_name(n)); - s->type = ret; + s->type = (enum signal_generator::type) ret; } else - s->type = SIGNAL_GENERATOR_TYPE_MIXED; + s->type = signal_generator::type::MIXED; return 0; } @@ -157,9 +157,9 @@ int signal_generator_start(struct node *n) s->missed_steps = 0; s->counter = 0; s->started = time_now(); - s->last = alloc(sizeof(double) * s->values); + s->last = (double *) alloc(sizeof(double) * s->values); - for (int i = 0; i < s->values; i++) + for (unsigned i = 0; i < s->values; i++) s->last[i] = s->offset; /* Setup task */ @@ -228,42 +228,42 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u t->length = MIN(s->values, t->capacity); t->signals = &n->in.signals; - for (int i = 0; i < MIN(s->values, t->capacity); i++) { - int rtype = (s->type != SIGNAL_GENERATOR_TYPE_MIXED) ? s->type : i % 7; + for (unsigned i = 0; i < MIN(s->values, t->capacity); i++) { + int rtype = (s->type != signal_generator::type::MIXED) ? s->type : i % 7; switch (rtype) { - case SIGNAL_GENERATOR_TYPE_CONSTANT: + case signal_generator::type::CONSTANT: t->data[i].f = s->offset + s->amplitude; break; - case SIGNAL_GENERATOR_TYPE_SINE: + case signal_generator::type::SINE: t->data[i].f = s->offset + s->amplitude * sin(running * s->frequency * 2 * M_PI); break; - case SIGNAL_GENERATOR_TYPE_TRIANGLE: + case signal_generator::type::TRIANGLE: t->data[i].f = s->offset + s->amplitude * (fabs(fmod(running * s->frequency, 1) - .5) - 0.25) * 4; break; - case SIGNAL_GENERATOR_TYPE_SQUARE: + case signal_generator::type::SQUARE: t->data[i].f = s->offset + s->amplitude * ( (fmod(running * s->frequency, 1) < .5) ? -1 : 1); break; - case SIGNAL_GENERATOR_TYPE_RAMP: + case signal_generator::type::RAMP: t->data[i].f = s->offset + s->amplitude * fmod(running, s->frequency); break; - case SIGNAL_GENERATOR_TYPE_COUNTER: + case signal_generator::type::COUNTER: t->data[i].f = s->offset + s->amplitude * s->counter; break; - case SIGNAL_GENERATOR_TYPE_RANDOM: + case signal_generator::type::RANDOM: s->last[i] += box_muller(0, s->stddev); t->data[i].f = s->last[i]; break; } } - if (s->limit > 0 && s->counter >= s->limit) { + if (s->limit > 0 && s->counter >= (unsigned) s->limit) { info("Reached limit."); n->state = STATE_STOPPING; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.cpp similarity index 92% rename from lib/nodes/socket.c rename to lib/nodes/socket.cpp index cfc3a37cd..8ef3817e1 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.cpp @@ -73,7 +73,8 @@ int socket_type_start(struct super_node *sn) char * socket_print(struct node *n) { struct socket *s = (struct socket *) n->_vd; - char *layer = NULL, *buf; + const char *layer = NULL; + char *buf; switch (s->layer) { case SOCKET_LAYER_UDP: @@ -270,12 +271,12 @@ int socket_start(struct node *n) } s->out.buflen = SOCKET_INITIAL_BUFFER_LEN; - s->out.buf = alloc(s->out.buflen); + s->out.buf = (char *) alloc(s->out.buflen); if (!s->out.buf) return -1; s->in.buflen = SOCKET_INITIAL_BUFFER_LEN; - s->in.buf = alloc(s->in.buflen); + s->in.buf = (char *) alloc(s->in.buflen); if (!s->in.buf) return -1; @@ -374,7 +375,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r } ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt); - if (ret < 0 || bytes != rbytes) + if (ret < 0 || (size_t) bytes != rbytes) warning("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes); return ret; @@ -401,7 +402,7 @@ retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt); if (wbytes > s->out.buflen) { s->out.buflen = wbytes; - s->out.buf = realloc(s->out.buf, s->out.buflen); + s->out.buf = (char *) realloc(s->out.buf, s->out.buflen); goto retry; } @@ -441,8 +442,7 @@ retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->out else warning("Failed sendto() to node %s", node_name(n)); } - - if (bytes != wbytes) + else if ((size_t) bytes < wbytes) warning("Partial sendto() to node %s", node_name(n)); return cnt; @@ -557,30 +557,35 @@ int socket_fds(struct node *n, int fds[]) return 1; } -static struct plugin p = { - .name = "socket", +__attribute__((constructor(110))) +static void register_plugin() { + p.name = "socket"; #ifdef WITH_NETEM - .description = "BSD network sockets for Ethernet / IP / UDP (libnl3, netem support)", + p.description = "BSD network sockets for Ethernet / IP / UDP (libnl3, netem support)"; #else - .description = "BSD network sockets for Ethernet / IP / UDP", + p.description = "BSD network sockets for Ethernet / IP / UDP"; #endif - .type = PLUGIN_TYPE_NODE, - .node = { - .vectorize = 0, - .size = sizeof(struct socket), - .type.start = socket_type_start, - .reverse = socket_reverse, - .parse = socket_parse, - .print = socket_print, - .check = socket_check, - .start = socket_start, - .stop = socket_stop, - .read = socket_read, - .write = socket_write, - .poll_fds = socket_fds, - .netem_fds = socket_fds - } -}; + p.type = PLUGIN_TYPE_NODE; + p.node.vectorize = 0; + p.node.size = sizeof(struct socket); + p.node.type.start = socket_type_start; + p.node.reverse = socket_reverse; + p.node.parse = socket_parse; + p.node.print = socket_print; + p.node.check = socket_check; + p.node.start = socket_start; + p.node.stop = socket_stop; + p.node.read = socket_read; + p.node.write = socket_write; + p.node.poll_fds = socket_fds; + p.node.netem_fds = socket_fds; -REGISTER_PLUGIN(&p) -LIST_INIT_STATIC(&p.node.instances) + vlist_init(&p.node.instances); + vlist_push(&plugins, &p); +} + +__attribute__((destructor(110))) +static void deregister_plugin() { + if (plugins.state != STATE_DESTROYED) + vlist_remove_all(&plugins, &p); +} diff --git a/lib/nodes/uldaq.c b/lib/nodes/uldaq.cpp similarity index 94% rename from lib/nodes/uldaq.c rename to lib/nodes/uldaq.cpp index 761a304a6..73707e8e8 100644 --- a/lib/nodes/uldaq.c +++ b/lib/nodes/uldaq.cpp @@ -104,27 +104,27 @@ static const struct { static AiInputMode uldaq_parse_input_mode(const char *str) { - for (int i = 0; i < ARRAY_LEN(input_modes); i++) { + for (unsigned i = 0; i < ARRAY_LEN(input_modes); i++) { if (!strcmp(input_modes[i].name, str)) return input_modes[i].mode; } - return -1; + return (AiInputMode) -1; } static DaqDeviceInterface uldaq_parse_interface_type(const char *str) { - for (int i = 0; i < ARRAY_LEN(interface_types); i++) { + for (unsigned i = 0; i < ARRAY_LEN(interface_types); i++) { if (!strcmp(interface_types[i].name, str)) return interface_types[i].interface; } - return -1; + return (DaqDeviceInterface) -1; } static const char * uldaq_print_interface_type(DaqDeviceInterface iftype) { - for (int i = 0; i < ARRAY_LEN(interface_types); i++) { + for (unsigned i = 0; i < ARRAY_LEN(interface_types); i++) { if (interface_types[i].interface == iftype) return interface_types[i].name; } @@ -134,12 +134,12 @@ static const char * uldaq_print_interface_type(DaqDeviceInterface iftype) static Range uldaq_parse_range(const char *str) { - for (int i = 0; i < ARRAY_LEN(ranges); i++) { + for (unsigned i = 0; i < ARRAY_LEN(ranges); i++) { if (!strcmp(ranges[i].name, str)) return ranges[i].range; } - return -1; + return (Range) -1; } static DaqDeviceDescriptor * uldaq_find_device(struct uldaq *u) { @@ -151,7 +151,7 @@ static DaqDeviceDescriptor * uldaq_find_device(struct uldaq *u) { if (u->device_interface_type == ANY_IFC && u->device_id == NULL) return &descriptors[0]; - for (int i = 0; i < num_devs; i++) { + for (unsigned i = 0; i < num_devs; i++) { d = &descriptors[i]; if (u->device_id) { @@ -223,7 +223,7 @@ int uldaq_type_start(struct super_node *sn) } info("Found %d DAQ devices", num_devs); - for (int i = 0; i < num_devs; i++) { + for (unsigned i = 0; i < num_devs; i++) { DaqDeviceDescriptor *desc = &descriptors[i]; info(" %d: %s %s (%s)", i, desc->uniqueId, desc->devString, uldaq_print_interface_type(desc->devInterface)); @@ -306,11 +306,11 @@ int uldaq_parse(struct node *n, json_t *cfg) if (iftype < 0) error("Invalid interface type: %s for node '%s'", interface_type, node_name(n)); - u->device_interface_type = iftype; + u->device_interface_type = (DaqDeviceInterface) iftype; } u->in.channel_count = vlist_length(&n->in.signals); - u->in.queues = realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count); + u->in.queues = (struct AiQueueElement *) realloc(u->in.queues, sizeof(struct AiQueueElement) * u->in.channel_count); json_array_foreach(json_signals, i, json_signal) { const char *range_str = NULL, *input_mode_str = NULL; @@ -347,8 +347,8 @@ int uldaq_parse(struct node *n, json_t *cfg) if (input_mode < 0) error("Invalid input mode specified for signal %zu of node %s.", i, node_name(n)); - u->in.queues[i].range = range; - u->in.queues[i].inputMode = input_mode; + u->in.queues[i].range = (Range) range; + u->in.queues[i].inputMode = (AiInputMode) input_mode; u->in.queues[i].channel = channel; } @@ -539,7 +539,7 @@ int uldaq_start(struct node *n) err = ulEnableEvent(u->device_handle, DE_ON_DATA_AVAILABLE, n->in.vectorize, uldaq_data_available, n); /* Start the acquisition */ - err = ulAInScan(u->device_handle, 0, 0, 0, 0, u->in.buffer_len / u->in.channel_count, &u->in.sample_rate, u->in.scan_options, u->in.flags, u->in.buffer); + err = ulAInScan(u->device_handle, 0, 0, (AiInputMode) 0, (Range) 0, u->in.buffer_len / u->in.channel_count, &u->in.sample_rate, u->in.scan_options, u->in.flags, u->in.buffer); if (err != ERR_NO_ERROR) { warning("Failed to start acquisition on DAQ device for node '%s'", node_name(n)); return -1; @@ -610,12 +610,12 @@ int uldaq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re if (start_index + n->in.vectorize * u->in.channel_count > u->in.transfer_status.currentScanCount) pthread_cond_wait(&u->in.cv, &u->in.mutex); - for (int j = 0; j < cnt; j++) { + for (unsigned j = 0; j < cnt; j++) { struct sample *smp = smps[j]; long long scan_index = start_index + j * u->in.channel_count; - for (int i = 0; i < u->in.channel_count; i++) { + for (unsigned i = 0; i < u->in.channel_count; i++) { long long channel_index = (scan_index + i) % u->in.buffer_len; smp->data[i].f = u->in.buffer[channel_index]; @@ -642,10 +642,12 @@ static struct plugin p = { .vectorize = 0, .flags = 0, .size = sizeof(struct uldaq), - .type.start = uldaq_type_start, - .parse = uldaq_parse, + .type = { + .start = uldaq_type_start + }, .init = uldaq_init, .destroy = uldaq_destroy, + .parse = uldaq_parse, .print = uldaq_print, .start = uldaq_start, .stop = uldaq_stop, diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.cpp similarity index 85% rename from lib/nodes/websocket.c rename to lib/nodes/websocket.cpp index dccef4fd5..176e19047 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.cpp @@ -58,13 +58,13 @@ static char * websocket_connection_name(struct websocket_connection *c) strcatf(&c->_name, "remote.ip=%s, remote.name=%s", ip, name); } - else if (c->mode == WEBSOCKET_MODE_CLIENT && c->destination != NULL) + else if (c->mode == websocket_connection::mode::CLIENT && c->destination != NULL) strcatf(&c->_name, "dest=%s:%d", c->destination->info.address, c->destination->info.port); if (c->node) strcatf(&c->_name, ", node=%s", node_name(c->node)); - strcatf(&c->_name, ", mode=%s", c->mode == WEBSOCKET_MODE_CLIENT ? "client" : "server"); + strcatf(&c->_name, ", mode=%s", c->mode == websocket_connection::mode::CLIENT ? "client" : "server"); } return c->_name; @@ -104,7 +104,7 @@ static int websocket_connection_init(struct websocket_connection *c) if (ret) return ret; - c->state = WEBSOCKET_CONNECTION_STATE_INITIALIZED; + c->state = websocket_connection::state::INITIALIZED; return 0; } @@ -113,7 +113,7 @@ static int websocket_connection_destroy(struct websocket_connection *c) { int ret; - assert(c->state != WEBSOCKET_CONNECTION_STATE_DESTROYED); + assert(c->state != websocket_connection::state::DESTROYED); if (c->_name) free(c->_name); @@ -143,7 +143,7 @@ static int websocket_connection_destroy(struct websocket_connection *c) c->wsi = NULL; c->_name = NULL; - c->state = WEBSOCKET_CONNECTION_STATE_DESTROYED; + c->state = websocket_connection::state::DESTROYED; return 0; } @@ -152,11 +152,11 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam { int pushed; - if (c->state != WEBSOCKET_CONNECTION_STATE_INITIALIZED) + if (c->state != websocket_connection::state::INITIALIZED) return -1; pushed = queue_push_many(&c->queue, (void **) smps, cnt); - if (pushed < cnt) + if (pushed < (int) cnt) warning("Queue overrun in WebSocket connection: %s", websocket_connection_name(c)); sample_incref_many(smps, pushed); @@ -180,20 +180,20 @@ static void websocket_connection_close(struct websocket_connection *c, struct lw int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { int ret, recvd, pulled, cnt = 128; - struct websocket_connection *c = user; + struct websocket_connection *c = (struct websocket_connection *) user; switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: case LWS_CALLBACK_ESTABLISHED: c->wsi = wsi; - c->state = WEBSOCKET_CONNECTION_STATE_ESTABLISHED; + c->state = websocket_connection::state::ESTABLISHED; info("Established WebSocket connection: %s", websocket_connection_name(c)); if (reason == LWS_CALLBACK_CLIENT_ESTABLISHED) - c->mode = WEBSOCKET_MODE_CLIENT; + c->mode = websocket_connection::mode::CLIENT; else { - c->mode = WEBSOCKET_MODE_SERVER; + c->mode = websocket_connection::mode::SERVER; /* We use the URI to associate this connection to a node * and choose a protocol. * @@ -222,10 +222,10 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi format = strtok_r(NULL, "", &lasts); if (!format) - format = "villas.web"; + format = (char *) "villas.web"; /* Search for node whose name matches the URI. */ - c->node = vlist_lookup(&p.node.instances, node); + c->node = (struct node *) vlist_lookup(&p.node.instances, node); if (!c->node) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION, "Unknown node"); warning("Failed to find node: node=%s", node); @@ -253,7 +253,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - c->state = WEBSOCKET_CONNECTION_STATE_ERROR; + c->state = websocket_connection::state::ERROR; warning("Failed to establish WebSocket connection: %s, reason=%s", websocket_connection_name(c), in ? (char *) in : "unkown"); @@ -262,24 +262,24 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi case LWS_CALLBACK_CLOSED: debug(LOG_WEBSOCKET | 10, "Closed WebSocket connection: %s", websocket_connection_name(c)); - if (c->state != WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { + if (c->state != websocket_connection::state::SHUTDOWN) { /** @todo Attempt reconnect here */ } if (connections.state == STATE_INITIALIZED) vlist_remove_all(&connections, c); - if (c->state == WEBSOCKET_CONNECTION_STATE_INITIALIZED) + if (c->state == websocket_connection::state::INITIALIZED) websocket_connection_destroy(c); - if (c->mode == WEBSOCKET_MODE_CLIENT) + if (c->mode == websocket_connection::mode::CLIENT) free(c); break; case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_SERVER_WRITEABLE: { - struct sample **smps = alloca(cnt * sizeof(struct sample *)); + struct sample **smps = (struct sample **) alloca(cnt * sizeof(struct sample *)); pulled = queue_pull_many(&c->queue, (void **) smps, cnt); if (pulled > 0) { @@ -298,7 +298,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (queue_available(&c->queue) > 0) lws_callback_on_writable(wsi); - else if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { + else if (c->state == websocket_connection::state::SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } @@ -311,7 +311,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi if (lws_is_first_fragment(wsi)) buffer_clear(&c->buffers.recv); - ret = buffer_append(&c->buffers.recv, in, len); + ret = buffer_append(&c->buffers.recv, (char *) in, len); if (ret) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE, "Failed to process data"); return -1; @@ -324,7 +324,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi int avail, enqueued; struct websocket *w = (struct websocket *) n->_vd; - struct sample **smps = alloca(cnt * sizeof(struct sample *)); + struct sample **smps = (struct sample **) alloca(cnt * sizeof(struct sample *)); if (!smps) { warning("Failed to allocate memory for connection: %s", websocket_connection_name(c)); break; @@ -358,7 +358,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi buffer_clear(&c->buffers.recv); - if (c->state == WEBSOCKET_CONNECTION_STATE_SHUTDOWN) { + if (c->state == websocket_connection::state::SHUTDOWN) { websocket_connection_close(c, wsi, LWS_CLOSE_STATUS_GOINGAWAY, "Node stopped"); return -1; } @@ -397,12 +397,12 @@ int websocket_start(struct node *n) if (ret) return ret; - for (int i = 0; i < vlist_length(&w->destinations); i++) { + for (size_t i = 0; i < vlist_length(&w->destinations); i++) { const char *format; struct websocket_destination *d = (struct websocket_destination *) vlist_at(&w->destinations, i); struct websocket_connection *c = (struct websocket_connection *) alloc(sizeof(struct websocket_connection)); - c->state = WEBSOCKET_CONNECTION_STATE_CONNECTING; + c->state = websocket_connection::state::CONNECTING; format = strchr(d->info.path, '.'); if (format) @@ -438,13 +438,13 @@ int websocket_stop(struct node *n) if (c->node != n) continue; - c->state = WEBSOCKET_CONNECTION_STATE_SHUTDOWN; + c->state = websocket_connection::state::SHUTDOWN; lws_callback_on_writable(c->wsi); } /* Count open connections belonging to this node */ - for (int i = 0; i < vlist_length(&connections); i++) { + for (size_t i = 0; i < vlist_length(&connections); i++) { struct websocket_connection *c = (struct websocket_connection *) vlist_at(&connections, i); if (c->node == n) @@ -509,7 +509,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigne /* Make copies of all samples */ avail = sample_alloc_many(&w->pool, cpys, cnt); - if (avail < cnt) + if (avail < (int) cnt) warning("Pool underrun for node %s: avail=%u", node_name(n), avail); sample_copy_many(cpys, smps, avail); @@ -609,24 +609,32 @@ int websocket_poll_fds(struct node *n, int fds[]) return 1; } -static struct plugin p = { - .name = "websocket", - .description = "Send and receive samples of a WebSocket connection (libwebsockets)", - .type = PLUGIN_TYPE_NODE, - .node = { - .vectorize = 0, /* unlimited */ - .size = sizeof(struct websocket), - .type.start = websocket_type_start, - .start = websocket_start, - .stop = websocket_stop, - .destroy = websocket_destroy, - .read = websocket_read, - .write = websocket_write, - .print = websocket_print, - .parse = websocket_parse, - .poll_fds = websocket_poll_fds - } -}; +__attribute__((constructor(110))) static void UNIQUE(__ctor)() { + if (plugins.state == STATE_DESTROYED) + vlist_init(&plugins); -REGISTER_PLUGIN(&p) -LIST_INIT_STATIC(&p.node.instances) + p.name = "websocket"; + p.description = "Send and receive samples of a WebSocket connection (libwebsockets)"; + p.type = PLUGIN_TYPE_NODE; + p.node.vectorize = 0; /* unlimited */ + p.node.size = sizeof(struct websocket); + p.node.instances.state = STATE_DESTROYED; + p.node.type.start = websocket_type_start; + p.node.destroy = websocket_destroy; + p.node.parse = websocket_parse; + p.node.print = websocket_print; + p.node.start = websocket_start; + p.node.stop = websocket_stop; + p.node.read = websocket_read; + p.node.write = websocket_write; + p.node.poll_fds = websocket_poll_fds; + + vlist_init(&p.node.instances); + + vlist_push(&plugins, &p); +} + +__attribute__((destructor(110))) static void UNIQUE(__dtor)() { + if (plugins.state != STATE_DESTROYED) + vlist_remove_all(&plugins, &p); +} diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.cpp similarity index 94% rename from lib/nodes/zeromq.c rename to lib/nodes/zeromq.cpp index 9fecd2186..6c5b0ed6a 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.cpp @@ -83,7 +83,7 @@ int zeromq_reverse(struct node *n) return -1; char *subscriber = z->in.endpoint; - char *publisher = vlist_first(&z->out.endpoints); + char *publisher = (char *) vlist_first(&z->out.endpoints); z->in.endpoint = publisher; vlist_set(&z->out.endpoints, 0, subscriber); @@ -189,10 +189,10 @@ int zeromq_parse(struct node *n, json_t *cfg) if (type) { if (!strcmp(type, "pubsub")) - z->pattern = ZEROMQ_PATTERN_PUBSUB; + z->pattern = zeromq::pattern::PUBSUB; #ifdef ZMQ_BUILD_DISH else if (!strcmp(type, "radiodish")) - z->pattern = ZEROMQ_PATTERN_RADIODISH; + z->pattern = zeromq::pattern::RADIODISH; #endif else error("Invalid type for ZeroMQ node: %s", node_name_short(n)); @@ -206,12 +206,17 @@ char * zeromq_print(struct node *n) struct zeromq *z = (struct zeromq *) n->_vd; char *buf = NULL; - char *pattern = NULL; + const char *pattern = NULL; switch (z->pattern) { - case ZEROMQ_PATTERN_PUBSUB: pattern = "pubsub"; break; + case zeromq::pattern::PUBSUB: + pattern = "pubsub"; + break; + #ifdef ZMQ_BUILD_DISH - case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break; + case zeromq::pattern::RADIODISH: + pattern = "radiodish"; + break; #endif } @@ -267,13 +272,13 @@ int zeromq_start(struct node *n) switch (z->pattern) { #ifdef ZMQ_BUILD_DISH - case ZEROMQ_PATTERN_RADIODISH: + case zeromq::pattern::RADIODISH: z->in.socket = zmq_socket(context, ZMQ_DISH); z->out.socket = zmq_socket(context, ZMQ_RADIO); break; #endif - case ZEROMQ_PATTERN_PUBSUB: + case zeromq::pattern::PUBSUB: z->in.socket = zmq_socket(context, ZMQ_SUB); z->out.socket = zmq_socket(context, ZMQ_PUB); break; @@ -287,12 +292,12 @@ int zeromq_start(struct node *n) /* Join group */ switch (z->pattern) { #ifdef ZMQ_BUILD_DISH - case ZEROMQ_PATTERN_RADIODISH: + case zeromq::pattern::RADIODISH: ret = zmq_join(z->in.socket, z->in.filter); break; #endif - case ZEROMQ_PATTERN_PUBSUB: + case zeromq::pattern::PUBSUB: ret = zmq_setsockopt(z->in.socket, ZMQ_SUBSCRIBE, z->in.filter, z->in.filter ? strlen(z->in.filter) : 0); break; @@ -452,7 +457,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r if (z->in.filter) { switch (z->pattern) { - case ZEROMQ_PATTERN_PUBSUB: + case zeromq::pattern::PUBSUB: /* Discard envelope */ zmq_recv(z->in.socket, NULL, 0, 0); break; @@ -466,7 +471,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r if (ret < 0) return ret; - recv = io_sscan(&z->io, zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt); + recv = io_sscan(&z->io, (const char *) zmq_msg_data(&m), zmq_msg_size(&m), NULL, smps, cnt); ret = zmq_msg_close(&m); if (ret) @@ -494,14 +499,14 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned * if (z->out.filter) { switch (z->pattern) { #ifdef ZMQ_BUILD_DISH - case ZEROMQ_PATTERN_RADIODISH: + case zeromq::pattern::RADIODISH: ret = zmq_msg_set_group(&m, z->out.filter); if (ret < 0) goto fail; break; #endif - case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */ + case zeromq::pattern::PUBSUB: /* Send envelope */ zmq_send(z->out.socket, z->out.filter, strlen(z->out.filter), ZMQ_SNDMORE); break; } @@ -566,16 +571,18 @@ static struct plugin p = { .node = { .vectorize = 0, .size = sizeof(struct zeromq), - .type.start = zeromq_type_start, - .type.stop = zeromq_type_stop, - .reverse = zeromq_reverse, + .type = { + .start = zeromq_type_start, + .stop = zeromq_type_stop, + }, + .destroy = zeromq_destroy, .parse = zeromq_parse, .print = zeromq_print, .start = zeromq_start, .stop = zeromq_stop, - .destroy = zeromq_destroy, .read = zeromq_read, .write = zeromq_write, + .reverse = zeromq_reverse, .poll_fds = zeromq_poll_fds, .netem_fds = zeromq_netem_fds, }