diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index d8e24e5e1..6598dc1bb 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -50,6 +50,8 @@ struct file { int flush; /**< Flush / upload file contents after each write. */ struct task task; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ double rate; /**< The read rate. */ + size_t buffer_size_out; /**< Defines size of output stream buffer. No buffer is created if value is set to zero. */ + size_t buffer_size_in; /**< Defines size of input stream buffer. No buffer is created if value is set to zero. */ enum epoch_mode { FILE_EPOCH_DIRECT, diff --git a/include/villas/nodes/signal_generator.h b/include/villas/nodes/signal_generator.h index ae227fdc0..83e828dd5 100644 --- a/include/villas/nodes/signal_generator.h +++ b/include/villas/nodes/signal_generator.h @@ -65,6 +65,7 @@ struct signal_generator { double amplitude; /**< Amplitude of the generated signals. */ double stddev; /**< Standard deviation of random signals (normal distributed). */ double offset; /**< A constant bias. */ + int monitor_missed; /**< Boolean, if set, node counts missed steps and warns user. */ double *last; /**< The values from the previous period which are required for random walk. */ diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 96bf81fcf..d45e9106b 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -89,8 +89,10 @@ int file_parse(struct node *n, json_t *cfg) f->eof = FILE_EOF_EXIT; f->epoch_mode = FILE_EPOCH_DIRECT; f->flush = 0; + f->buffer_size_in = 0; + f->buffer_size_out = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F }, s?: { s?: b } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F, s?: i }, s?: { s?: b, s?: i } }", "uri", &uri_tmpl, "format", &format, "in", @@ -98,8 +100,10 @@ int file_parse(struct node *n, json_t *cfg) "rate", &f->rate, "epoch_mode", &epoch_mode, "epoch", &epoch_flt, + "buffer_size", &f->buffer_size_in, "out", - "flush", &f->flush + "flush", &f->flush, + "buffer_size", &f->buffer_size_out ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -219,6 +223,18 @@ int file_start(struct node *n) if (ret) return ret; + if (f->buffer_size_in) { + ret = setvbuf(f->io.input.stream.std, NULL, _IOFBF, f->buffer_size_in); + if(ret) + return ret; + } + + if (f->buffer_size_out) { + ret = setvbuf(f->io.output.stream.std, NULL, _IOFBF, f->buffer_size_out); + if(ret) + return ret; + } + /* Create timer */ ret = task_init(&f->task, f->rate, CLOCK_REALTIME); if (ret) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 64b6c8985..57fddd2bf 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -453,6 +453,7 @@ static void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event) "continue as listening node in such cases, set use_fallback = true in the configuration", node_name(n)); + n->state = STATE_STARTED; // Acknowledge event rdma_ack_cm_event(event); @@ -533,10 +534,10 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_CONNECT_REQUEST: ret = ib_connect_request(n, event->id); - //ToDo: Think about this. In this context, we say that the QP is initialized - //and at least one other node send data - if (ib->conn.port_space == RDMA_PS_UDP) - node->state = STATE_CONNECTED; + // Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs. + // This way, we can already fill the receive queue with WRs at the receive side + node->state = STATE_CONNECTED; + break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -827,8 +828,10 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // Doesn't start, if wcs == 0 for (int j = 0; j < wcs; j++) { - if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) - read_values--; + if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) { + // Drop all values, we don't know where the error occured + read_values = 0; + } if (wc[j].status == IBV_WC_WR_FLUSH_ERR) debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); @@ -842,6 +845,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea int correction = (ib->conn.port_space == RDMA_PS_UDP) ? META_GRH_SIZE : META_SIZE; smps[j] = (struct sample *) (wc[j].wr_id); + smps[j]->length = (wc[j].byte_len - correction) / sizeof(double); smps[j]->ts.received = ts_receive; diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index 8d733fdc8..0256fd8c7 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -67,8 +67,9 @@ int signal_generator_parse(struct node *n, json_t *cfg) s->amplitude = 1; s->stddev = 0.2; s->offset = 0; + s->monitor_missed = 1; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: i, s?: i, s?: F, s?: F, s?: F, s?: F, s?: F }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: i, s?: i, s?: F, s?: F, s?: F, s?: F, s?: F, s?: b}", "signal", &type, "realtime", &s->rt, "limit", &s->limit, @@ -77,7 +78,8 @@ int signal_generator_parse(struct node *n, json_t *cfg) "frequency", &s->frequency, "amplitude", &s->amplitude, "stddev", &s->stddev, - "offset", &s->offset + "offset", &s->offset, + "monitor_missed", &s->monitor_missed ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -114,10 +116,11 @@ int signal_generator_parse_cli(struct node *n, int argc, char *argv[]) s->values = 1; s->limit = -1; s->offset = 0; + s->monitor_missed = 1; /* Parse optional command line arguments */ char c, *endptr; - while ((c = getopt(argc, argv, "v:r:f:l:a:D:no:")) != -1) { + while ((c = getopt(argc, argv, "v:r:f:l:a:D:no:m")) != -1) { switch (c) { case 'n': s->rt = 0; @@ -143,6 +146,9 @@ int signal_generator_parse_cli(struct node *n, int argc, char *argv[]) case 'D': s->stddev = strtof(optarg, &endptr); goto check; + case 'm': + s->monitor_missed = 0; + goto check; case '?': break; } @@ -204,7 +210,7 @@ int signal_generator_stop(struct node *n) return ret; } - if (s->missed_steps > 0) + if (s->missed_steps > 0 && s->monitor_missed) warn("Node %s missed a total of %d steps.", node_name(n), s->missed_steps); free(s->last); @@ -226,7 +232,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u if (s->rt) { /* Block until 1/p->rate seconds elapsed */ steps = task_wait(&s->task); - if (steps > 1) { + if (steps > 1 && s->monitor_missed) { warn("Missed steps: %u", steps-1); s->missed_steps += steps-1; }