1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'setvbuf-file-node' into develop. Closes #190

This commit is contained in:
Dennis Potter 2018-08-07 10:12:55 +02:00
commit 5c5adb93f3
5 changed files with 42 additions and 13 deletions

View file

@ -50,6 +50,8 @@ struct file {
int flush; /**< Flush / upload file contents after each write. */
struct task task; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
double rate; /**< The read rate. */
size_t buffer_size_out; /**< Defines size of output stream buffer. No buffer is created if value is set to zero. */
size_t buffer_size_in; /**< Defines size of input stream buffer. No buffer is created if value is set to zero. */
enum epoch_mode {
FILE_EPOCH_DIRECT,

View file

@ -65,6 +65,7 @@ struct signal_generator {
double amplitude; /**< Amplitude of the generated signals. */
double stddev; /**< Standard deviation of random signals (normal distributed). */
double offset; /**< A constant bias. */
int monitor_missed; /**< Boolean, if set, node counts missed steps and warns user. */
double *last; /**< The values from the previous period which are required for random walk. */

View file

@ -89,8 +89,10 @@ int file_parse(struct node *n, json_t *cfg)
f->eof = FILE_EOF_EXIT;
f->epoch_mode = FILE_EPOCH_DIRECT;
f->flush = 0;
f->buffer_size_in = 0;
f->buffer_size_out = 0;
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F }, s?: { s?: b } }",
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F, s?: i }, s?: { s?: b, s?: i } }",
"uri", &uri_tmpl,
"format", &format,
"in",
@ -98,8 +100,10 @@ int file_parse(struct node *n, json_t *cfg)
"rate", &f->rate,
"epoch_mode", &epoch_mode,
"epoch", &epoch_flt,
"buffer_size", &f->buffer_size_in,
"out",
"flush", &f->flush
"flush", &f->flush,
"buffer_size", &f->buffer_size_out
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
@ -219,6 +223,18 @@ int file_start(struct node *n)
if (ret)
return ret;
if (f->buffer_size_in) {
ret = setvbuf(f->io.input.stream.std, NULL, _IOFBF, f->buffer_size_in);
if(ret)
return ret;
}
if (f->buffer_size_out) {
ret = setvbuf(f->io.output.stream.std, NULL, _IOFBF, f->buffer_size_out);
if(ret)
return ret;
}
/* Create timer */
ret = task_init(&f->task, f->rate, CLOCK_REALTIME);
if (ret)

View file

@ -453,6 +453,7 @@ static void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event)
"continue as listening node in such cases, set use_fallback = true in the configuration",
node_name(n));
n->state = STATE_STARTED;
// Acknowledge event
rdma_ack_cm_event(event);
@ -533,10 +534,10 @@ void * ib_rdma_cm_event_thread(void *n)
case RDMA_CM_EVENT_CONNECT_REQUEST:
ret = ib_connect_request(n, event->id);
//ToDo: Think about this. In this context, we say that the QP is initialized
//and at least one other node send data
if (ib->conn.port_space == RDMA_PS_UDP)
node->state = STATE_CONNECTED;
// Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs.
// This way, we can already fill the receive queue with WRs at the receive side
node->state = STATE_CONNECTED;
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
@ -827,8 +828,10 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
// Doesn't start, if wcs == 0
for (int j = 0; j < wcs; j++) {
if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) )
read_values--;
if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) {
// Drop all values, we don't know where the error occured
read_values = 0;
}
if (wc[j].status == IBV_WC_WR_FLUSH_ERR)
debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");
@ -842,6 +845,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
int correction = (ib->conn.port_space == RDMA_PS_UDP) ? META_GRH_SIZE : META_SIZE;
smps[j] = (struct sample *) (wc[j].wr_id);
smps[j]->length = (wc[j].byte_len - correction) / sizeof(double);
smps[j]->ts.received = ts_receive;

View file

@ -67,8 +67,9 @@ int signal_generator_parse(struct node *n, json_t *cfg)
s->amplitude = 1;
s->stddev = 0.2;
s->offset = 0;
s->monitor_missed = 1;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: i, s?: i, s?: F, s?: F, s?: F, s?: F, s?: F }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: i, s?: i, s?: F, s?: F, s?: F, s?: F, s?: F, s?: b}",
"signal", &type,
"realtime", &s->rt,
"limit", &s->limit,
@ -77,7 +78,8 @@ int signal_generator_parse(struct node *n, json_t *cfg)
"frequency", &s->frequency,
"amplitude", &s->amplitude,
"stddev", &s->stddev,
"offset", &s->offset
"offset", &s->offset,
"monitor_missed", &s->monitor_missed
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
@ -114,10 +116,11 @@ int signal_generator_parse_cli(struct node *n, int argc, char *argv[])
s->values = 1;
s->limit = -1;
s->offset = 0;
s->monitor_missed = 1;
/* Parse optional command line arguments */
char c, *endptr;
while ((c = getopt(argc, argv, "v:r:f:l:a:D:no:")) != -1) {
while ((c = getopt(argc, argv, "v:r:f:l:a:D:no:m")) != -1) {
switch (c) {
case 'n':
s->rt = 0;
@ -143,6 +146,9 @@ int signal_generator_parse_cli(struct node *n, int argc, char *argv[])
case 'D':
s->stddev = strtof(optarg, &endptr);
goto check;
case 'm':
s->monitor_missed = 0;
goto check;
case '?':
break;
}
@ -204,7 +210,7 @@ int signal_generator_stop(struct node *n)
return ret;
}
if (s->missed_steps > 0)
if (s->missed_steps > 0 && s->monitor_missed)
warn("Node %s missed a total of %d steps.", node_name(n), s->missed_steps);
free(s->last);
@ -226,7 +232,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u
if (s->rt) {
/* Block until 1/p->rate seconds elapsed */
steps = task_wait(&s->task);
if (steps > 1) {
if (steps > 1 && s->monitor_missed) {
warn("Missed steps: %u", steps-1);
s->missed_steps += steps-1;
}