diff --git a/etc/infiniband.conf b/etc/infiniband.conf index cc861e4e6..1ed4830e4 100644 --- a/etc/infiniband.conf +++ b/etc/infiniband.conf @@ -33,7 +33,6 @@ nodes = { vectorize = 1, - poll_mode = "BUSY", buffer_subtraction = 128, }, @@ -66,7 +65,6 @@ nodes = { vectorize = 1, - poll_mode = "BUSY", buffer_subtraction = 128, hooks = ( diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index cb7c92555..969382a70 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -36,17 +36,11 @@ #include /* Constants */ -#define META_SIZE 32 +#define META_SIZE 24 #define GRH_SIZE 40 #define META_GRH_SIZE META_SIZE + GRH_SIZE #define CHK_PER_ITER 2048 -/* Enums */ -enum poll_mode_e { - EVENT, - BUSY -}; - struct infiniband { /* IBV/RDMA CM structs */ struct context_s { @@ -111,8 +105,6 @@ struct infiniband { } conn; /* Misc settings */ - enum poll_mode_e poll_mode; - int is_source; }; diff --git a/include/villas/path.h b/include/villas/path.h index 39a6d8227..f07fba1b9 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -99,6 +99,7 @@ struct path { int reverse; /**< This path as a matching reverse path. */ int builtin; /**< This path should use built-in hooks by default. */ int queuelen; /**< The queue length for each path_destination::queue */ + int original_sequence_no; /**< Use original source sequence number when multiplexing */ char *_name; /**< Singleton: A string which is used to print this path to screen. */ diff --git a/include/villas/sample.h b/include/villas/sample.h index 0fc8f9b38..0bba02137 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -44,6 +44,9 @@ struct pool; /** The length of a sample data portion of a sample datastructure with \p values values in bytes. */ #define SAMPLE_DATA_LENGTH(len) ((len) * sizeof(double)) +/** The number of values in a sample datastructure. */ +#define SAMPLE_NUMBER_OF_VALUES(len) ((len) / sizeof(double)) + /** The offset to the beginning of the data section. */ #define SAMPLE_DATA_OFFSET(smp) ((char *) (smp) + offsetof(struct sample, data)) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 62b2d7d68..b97d411fd 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -185,7 +185,6 @@ int ib_parse(struct node *n, json_t *cfg) char *local = NULL; char *remote = NULL; const char *transport_mode = "RC"; - const char *poll_mode = "BUSY"; int timeout = 1000; int recv_cq_size = 128; int send_cq_size = 128; @@ -213,9 +212,8 @@ int ib_parse(struct node *n, json_t *cfg) if (json_in) { - ret = json_unpack_ex(json_in, &err, 0, "{ s?: s, s?: s, s?: i, s?: i, s?: i, s?: i}", + ret = json_unpack_ex(json_in, &err, 0, "{ s?: s, s?: i, s?: i, s?: i, s?: i}", "address", &local, - "poll_mode", &poll_mode, "cq_size", &recv_cq_size, "max_wrs", &max_recv_wr, "vectorize", &vectorize_in, @@ -303,17 +301,6 @@ int ib_parse(struct node *n, json_t *cfg) debug(LOG_IB | 4, "Set timeout to %i in node %s", timeout, node_name(n)); - // Translate poll mode - if (strcmp(poll_mode, "EVENT") == 0) - ib->poll_mode = EVENT; - else if (strcmp(poll_mode, "BUSY") == 0) - ib->poll_mode = BUSY; - else - error("Failed to translate poll_mode in node %s. %s is not a valid \ - poll mode!", node_name(n), poll_mode); - - debug(LOG_IB | 4, "Set poll mode to %s in node %s", poll_mode, node_name(n)); - // Set completion queue size ib->recv_cq_size = recv_cq_size; ib->send_cq_size = send_cq_size; @@ -852,7 +839,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea debug(LOG_IB | 10, "Succesfully posted receive Work Requests"); - // Doesn't start, if wcs == 0 + // Doesn't start if wcs == 0 for (int j = 0; j < wcs; j++) { if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) { // Drop all values, we don't know where the error occured @@ -872,11 +859,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea smps[j] = (struct sample *) (wc[j].wr_id); - smps[j]->length = (wc[j].byte_len - correction) / sizeof(double); - + smps[j]->length = SAMPLE_NUMBER_OF_VALUES(wc[j].byte_len - correction); smps[j]->ts.received = ts_receive; smps[j]->flags = (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED | SAMPLE_HAS_SEQUENCE); } + } return read_values; } @@ -922,7 +909,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele // Actual Payload sge[sent][j].addr = (uint64_t) &smps[sent]->data; - sge[sent][j].length = smps[sent]->length*sizeof(double); + sge[sent][j].length = SAMPLE_DATA_LENGTH(smps[sent]->length); sge[sent][j].lkey = mr->lkey; j++; diff --git a/lib/path.c b/lib/path.c index b76fefeff..e083db342 100644 --- a/lib/path.c +++ b/lib/path.c @@ -116,7 +116,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) ? sample_clone(p->last_sample) : sample_clone(muxed_smps[i-1]); - muxed_smps[i]->sequence = p->last_sequence++; + muxed_smps[i]->sequence = p->original_sequence_no ? tomux_smps[i]->sequence : p->last_sequence++; muxed_smps[i]->ts = tomux_smps[i]->ts; mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); @@ -334,6 +334,7 @@ int path_init(struct path *p) p->enabled = 1; p->poll = -1; p->queuelen = DEFAULT_QUEUE_LENGTH; + p->original_sequence_no = 0; p->state = STATE_INITIALIZED; @@ -492,7 +493,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) list_init(&sources); list_init(&destinations); - ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: b, s?: F, s?: o }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: b, s?: F, s?: o, s?: b}", "in", &json_in, "out", &json_out, "hooks", &json_hooks, @@ -503,7 +504,9 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) "mode", &mode, "poll", &p->poll, "rate", &p->rate, - "mask", &json_mask + "mask", &json_mask, + "original_sequence_no", &p->original_sequence_no + ); if (ret) jerror(&err, "Failed to parse path configuration"); @@ -695,7 +698,7 @@ int path_start(struct path *p) mask = bitset_dump(&p->mask); - info("Starting path %s: #signals=%zu, mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", + info("Starting path %s: #signals=%zu, mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu, #original_sequence_no=%s", path_name(p), list_length(&p->signals), mode, @@ -707,7 +710,8 @@ int path_start(struct path *p) p->queuelen, list_length(&p->hooks), list_length(&p->sources), - list_length(&p->destinations) + list_length(&p->destinations), + p->original_sequence_no ? "yes" : "no" ); free(mask);