From 117913287d54f53dfa87b621fa2a2aa6fbc5c617 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 4 Aug 2018 15:20:21 +0200 Subject: [PATCH 1/5] Added configurable setvbuf to output of file node --- include/villas/nodes/file.h | 1 + lib/nodes/file.c | 13 +++++++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index d8e24e5e1..3c7fa4684 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -50,6 +50,7 @@ struct file { int flush; /**< Flush / upload file contents after each write. */ struct task task; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ double rate; /**< The read rate. */ + size_t buffer_size; /**< Defines size of stream buffer. No buffer is created if value is set to zero. */ enum epoch_mode { FILE_EPOCH_DIRECT, diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 96bf81fcf..8bc926fcf 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -89,8 +89,9 @@ int file_parse(struct node *n, json_t *cfg) f->eof = FILE_EOF_EXIT; f->epoch_mode = FILE_EPOCH_DIRECT; f->flush = 0; + f->buffer_size = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F }, s?: { s?: b } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F }, s?: { s?: b, s?: i } }", "uri", &uri_tmpl, "format", &format, "in", @@ -99,7 +100,8 @@ int file_parse(struct node *n, json_t *cfg) "epoch_mode", &epoch_mode, "epoch", &epoch_flt, "out", - "flush", &f->flush + "flush", &f->flush, + "buffer_size", &f->buffer_size ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -219,6 +221,13 @@ int file_start(struct node *n) if (ret) return ret; + if (f->buffer_size) { + ret = setvbuf(f->io.output.stream.std, NULL, _IOFBF, f->buffer_size); + if(ret) + return ret; + } + + /* Create timer */ ret = task_init(&f->task, f->rate, CLOCK_REALTIME); if (ret) From 
69c8f0adaab75e715510bffb51b58e7ec498c1e1 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 4 Aug 2018 17:27:06 +0200 Subject: [PATCH 2/5] Added flag to enable/disable warning about missed steps --- include/villas/nodes/signal_generator.h | 1 + lib/nodes/signal_generator.c | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/include/villas/nodes/signal_generator.h b/include/villas/nodes/signal_generator.h index ae227fdc0..83e828dd5 100644 --- a/include/villas/nodes/signal_generator.h +++ b/include/villas/nodes/signal_generator.h @@ -65,6 +65,7 @@ struct signal_generator { double amplitude; /**< Amplitude of the generated signals. */ double stddev; /**< Standard deviation of random signals (normal distributed). */ double offset; /**< A constant bias. */ + int monitor_missed; /**< Boolean, if set, node counts missed steps and warns user. */ double *last; /**< The values from the previous period which are required for random walk. */ diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index 8d733fdc8..0256fd8c7 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -67,8 +67,9 @@ int signal_generator_parse(struct node *n, json_t *cfg) s->amplitude = 1; s->stddev = 0.2; s->offset = 0; + s->monitor_missed = 1; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: i, s?: i, s?: F, s?: F, s?: F, s?: F, s?: F }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: i, s?: i, s?: F, s?: F, s?: F, s?: F, s?: F, s?: b}", "signal", &type, "realtime", &s->rt, "limit", &s->limit, @@ -77,7 +78,8 @@ int signal_generator_parse(struct node *n, json_t *cfg) "frequency", &s->frequency, "amplitude", &s->amplitude, "stddev", &s->stddev, - "offset", &s->offset + "offset", &s->offset, + "monitor_missed", &s->monitor_missed ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -114,10 +116,11 @@ int signal_generator_parse_cli(struct node *n, int argc, char *argv[]) 
s->values = 1; s->limit = -1; s->offset = 0; + s->monitor_missed = 1; /* Parse optional command line arguments */ char c, *endptr; - while ((c = getopt(argc, argv, "v:r:f:l:a:D:no:")) != -1) { + while ((c = getopt(argc, argv, "v:r:f:l:a:D:no:m")) != -1) { switch (c) { case 'n': s->rt = 0; @@ -143,6 +146,9 @@ int signal_generator_parse_cli(struct node *n, int argc, char *argv[]) case 'D': s->stddev = strtof(optarg, &endptr); goto check; + case 'm': + s->monitor_missed = 0; + goto check; case '?': break; } @@ -204,7 +210,7 @@ int signal_generator_stop(struct node *n) return ret; } - if (s->missed_steps > 0) + if (s->missed_steps > 0 && s->monitor_missed) warn("Node %s missed a total of %d steps.", node_name(n), s->missed_steps); free(s->last); @@ -226,7 +232,7 @@ int signal_generator_read(struct node *n, struct sample *smps[], unsigned cnt, u if (s->rt) { /* Block until 1/p->rate seconds elapsed */ steps = task_wait(&s->task); - if (steps > 1) { + if (steps > 1 && s->monitor_missed) { warn("Missed steps: %u", steps-1); s->missed_steps += steps-1; } From 71134a4c81ab16fccf669f73b69766fd18331393 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 4 Aug 2018 17:34:52 +0200 Subject: [PATCH 3/5] Node now already posts Work Receives as soon as it accepts a connection. Before, it waited until it was actually connected. That caused problems, because the send side starts sending immediately once it is connected. This was especially a problem at high rates (>100k). 
--- lib/nodes/infiniband.c | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 64b6c8985..57fddd2bf 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -453,6 +453,7 @@ static void ib_continue_as_listen(struct node *n, struct rdma_cm_event *event) "continue as listening node in such cases, set use_fallback = true in the configuration", node_name(n)); + n->state = STATE_STARTED; // Acknowledge event rdma_ack_cm_event(event); @@ -533,10 +534,10 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_CONNECT_REQUEST: ret = ib_connect_request(n, event->id); - //ToDo: Think about this. In this context, we say that the QP is initialized - //and at least one other node send data - if (ib->conn.port_space == RDMA_PS_UDP) - node->state = STATE_CONNECTED; + // Set state to connected before RDMA_CM_EVENT_ESTABLISHED actually occurs. + // This way, we can already fill the receive queue with WRs at the receive side + node->state = STATE_CONNECTED; + break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -827,8 +828,10 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // Doesn't start, if wcs == 0 for (int j = 0; j < wcs; j++) { - if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) - read_values--; + if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) { + // Drop all values, we don't know where the error occured + read_values = 0; + } if (wc[j].status == IBV_WC_WR_FLUSH_ERR) debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); @@ -842,6 +845,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea int correction = (ib->conn.port_space == RDMA_PS_UDP) ? 
META_GRH_SIZE : META_SIZE; smps[j] = (struct sample *) (wc[j].wr_id); + smps[j]->length = (wc[j].byte_len - correction) / sizeof(double); smps[j]->ts.received = ts_receive; From 2a1dc60d16dc6556266b0885b968e3b9a26d7294 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sun, 5 Aug 2018 11:07:45 +0200 Subject: [PATCH 4/5] Changed configuration section of buffer_size in file node --- lib/nodes/file.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 8bc926fcf..313b07e0d 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -91,17 +91,17 @@ int file_parse(struct node *n, json_t *cfg) f->flush = 0; f->buffer_size = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F }, s?: { s?: b, s?: i } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: i, s?: { s?: s, s?: F, s?: s, s?: F }, s?: { s?: b } }", "uri", &uri_tmpl, "format", &format, + "buffer_size", &f->buffer_size, "in", "eof", &eof, "rate", &f->rate, "epoch_mode", &epoch_mode, "epoch", &epoch_flt, "out", - "flush", &f->flush, - "buffer_size", &f->buffer_size + "flush", &f->flush ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); From 0c1e92bced582df7dd885ff0a437f5b21eb6d5b2 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Mon, 6 Aug 2018 23:48:13 +0200 Subject: [PATCH 5/5] Added separate buffer for input and output section --- include/villas/nodes/file.h | 3 ++- lib/nodes/file.c | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 3c7fa4684..6598dc1bb 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -50,7 +50,8 @@ struct file { int flush; /**< Flush / upload file contents after each write. */ struct task task; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ double rate; /**< The read rate. 
*/ - size_t buffer_size; /**< Defines size of stream buffer. No buffer is created if value is set to zero. */ + size_t buffer_size_out; /**< Defines size of output stream buffer. No buffer is created if value is set to zero. */ + size_t buffer_size_in; /**< Defines size of input stream buffer. No buffer is created if value is set to zero. */ enum epoch_mode { FILE_EPOCH_DIRECT, diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 313b07e0d..d45e9106b 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -89,19 +89,21 @@ int file_parse(struct node *n, json_t *cfg) f->eof = FILE_EOF_EXIT; f->epoch_mode = FILE_EPOCH_DIRECT; f->flush = 0; - f->buffer_size = 0; + f->buffer_size_in = 0; + f->buffer_size_out = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: i, s?: { s?: s, s?: F, s?: s, s?: F }, s?: { s?: b } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: { s?: s, s?: F, s?: s, s?: F, s?: i }, s?: { s?: b, s?: i } }", "uri", &uri_tmpl, "format", &format, - "buffer_size", &f->buffer_size, "in", "eof", &eof, "rate", &f->rate, "epoch_mode", &epoch_mode, "epoch", &epoch_flt, + "buffer_size", &f->buffer_size_in, "out", - "flush", &f->flush + "flush", &f->flush, + "buffer_size", &f->buffer_size_out ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); @@ -221,12 +223,17 @@ int file_start(struct node *n) if (ret) return ret; - if (f->buffer_size) { - ret = setvbuf(f->io.output.stream.std, NULL, _IOFBF, f->buffer_size); + if (f->buffer_size_in) { + ret = setvbuf(f->io.input.stream.std, NULL, _IOFBF, f->buffer_size_in); if(ret) return ret; } + if (f->buffer_size_out) { + ret = setvbuf(f->io.output.stream.std, NULL, _IOFBF, f->buffer_size_out); + if(ret) + return ret; + } /* Create timer */ ret = task_init(&f->task, f->rate, CLOCK_REALTIME);