diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d0ef8281f..449be5384 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -101,7 +101,7 @@ integration: website: stage: deploy script: - - rsync web/ $DEPLOY_PATH/ + - rsync -r web/ $DEPLOY_PATH/ only: - develop tags: diff --git a/Dockerfile b/Dockerfile index 408766244..54ec74dc8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -82,7 +82,7 @@ COPY thirdparty/libwebsockets /tmp/libwebsockets RUN mkdir -p /tmp/libwebsockets/build && cd /tmp/libwebsockets/build && cmake .. && make install # Cleanup intermediate files from builds -RUN rm -rf /tmp +RUN rm -rf /tmp/* WORKDIR /villas diff --git a/config.h b/config.h index 2ca343bbe..087f7b3f9 100644 --- a/config.h +++ b/config.h @@ -24,9 +24,6 @@ #define DEFAULT_VALUES 64 #define DEFAULT_QUEUELEN 1024 -/** Whether or not to send / receive timestamp & sequence number as first values of payload */ -#define GTNET_SKT_HEADER 1 - /** Width of log output in characters */ #define LOG_WIDTH 132 diff --git a/etc/example.conf b/etc/example.conf index 90ce35282..7f5ce6753 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -26,16 +26,24 @@ nodes = { ### The following settings are specific to the socket node-type!! ### - layer = "udp" # Layer can be one of: - # udp Send / recv UDP packets - # ip Send / recv IP packets - # eth Send / recv raw Ethernet frames (IEEE802.3) + layer = "udp", # Layer can be one of: + # udp Send / receive UDP packets + # ip Send / receive IP packets + # eth Send / receive raw Ethernet frames (IEEE802.3) + + + header = "gtnet-skt:fake", # Header can be one of: + # default | villas Use VILLASnode protocol (see struct msg) (default) + # none | gtnet-skt Use no header, send raw data as used by RTDS GTNETv2-SKT + # fake | gtnet-skt:fake Same as 'none', but use first three data values as + # sequence, seconds & nanoseconds timestamp + # In this mode values are uint32_t not floats! local = "127.0.0.1:12001", # This node only received messages on this IP:Port pair remote = "127.0.0.1:12000" # This node sents outgoing messages to this IP:Port pair - combine = 30 # Receive and sent 30 samples per message (multiplexing). + vectorize = 30 # Receive and sent 30 samples per message (combining). }, ethernet_node = { type = "socket", # See above. @@ -141,7 +149,7 @@ paths = ( hook = "print", # Register custom hook funktion (see src/hooks.c) poolsize = 30 # The amount of samples which are kept in a circular buffer. - # This number must be larger than the 'combine' settings of all + # This number must be larger than the 'vectorize' settings of all # associated input and output nodes! }, { diff --git a/etc/gtnet_test1.conf b/etc/gtnet-skt/test1.conf similarity index 100% rename from etc/gtnet_test1.conf rename to etc/gtnet-skt/test1.conf diff --git a/etc/gtnet_test2.conf b/etc/gtnet-skt/test2.conf similarity index 100% rename from etc/gtnet_test2.conf rename to etc/gtnet-skt/test2.conf diff --git a/etc/gtnet_test3.conf b/etc/gtnet-skt/test3.conf similarity index 100% rename from etc/gtnet_test3.conf rename to etc/gtnet-skt/test3.conf diff --git a/etc/gtnet_test4.conf b/etc/gtnet-skt/test4.conf similarity index 94% rename from etc/gtnet_test4.conf rename to etc/gtnet-skt/test4.conf index b80cbd41d..4a56ee34d 100644 --- a/etc/gtnet_test4.conf +++ b/etc/gtnet-skt/test4.conf @@ -32,8 +32,8 @@ nodes = { node1 = { type = "socket", layer = "udp", - local = "192.168.88.128:12002", # Local ip:port, use '*' for random port - remote = "192.168.88.129:12001", + local = "134.130.169.31:12002", # Local ip:port, use '*' for random port + remote = "134.130.169.98:12001", header = "gtnet-skt", # 'gtnet-skt' or 'villas'. If not provided, 'villas' header will be used vectorize = 1, # Number of samples to fetch per iteration from the socket netem = { diff --git a/include/villas/hooks.h b/include/villas/hooks.h index 9eae7bb6b..6f9b76427 100644 --- a/include/villas/hooks.h +++ b/include/villas/hooks.h @@ -22,6 +22,7 @@ #define _HOOKS_H_ #include +#include #include "queue.h" #include "list.h" diff --git a/include/villas/log.h b/include/villas/log.h index b94ccb891..c57d686bf 100644 --- a/include/villas/log.h +++ b/include/villas/log.h @@ -38,6 +38,7 @@ enum debug_facilities { DBG_CONFIG = (1 << 10), DBG_HOOK = (1 << 11), DBG_PATH = (1 << 12), + DBG_MEM = (1 << 13), /* Node-types */ DBG_SOCKET = (1 << 16), diff --git a/include/villas/node.h b/include/villas/node.h index e01863444..c24841832 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -47,6 +47,8 @@ struct node int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ int affinity; /**< CPU Affinity of this node */ + + unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ enum node_state { NODE_INVALID, /**< This node object is not in a valid state. */ diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 64c1dd82e..4a0a6c337 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -29,8 +29,9 @@ enum socket_layer { }; enum socket_header { - SOCKET_HEADER_DEFAULT, /**> Default header in the payload, (see msg_format.h) */ - SOCKET_HEADER_GTNET_SKT /**> No header in the payload, same as HDR_NONE*/ + SOCKET_HEADER_DEFAULT, /**> Default header in the payload, (see msg_format.h) */ + SOCKET_HEADER_NONE, /**> No header in the payload, same as HDR_NONE*/ + SOCKET_HEADER_FAKE /**> Same as SOCKET_HEADER_NONE but using the first three data values as: sequence, seconds & nano-seconds. */ }; union sockaddr_union { @@ -41,29 +42,19 @@ union sockaddr_union { }; struct socket { - /** The socket descriptor */ - int sd; - /** Socket mark for netem, routing and filtering */ - int mark; + int sd; /**> The socket descriptor */ + int mark; /**> Socket mark for netem, routing and filtering */ - /** The OSI / IP layer which should be used for this socket */ - enum socket_layer layer; + enum socket_layer layer; /**> The OSI / IP layer which should be used for this socket */ + enum socket_header header; /**> Payload header type */ - /** Payload header type */ - enum socket_header header; + union sockaddr_union local; /**> Local address of the socket */ + union sockaddr_union remote; /**> Remote address of the socket */ - /** Local address of the socket */ - union sockaddr_union local; - /** Remote address of the socket */ - union sockaddr_union remote; + struct rtnl_qdisc *tc_qdisc; /**> libnl3: Network emulator queuing discipline */ + struct rtnl_cls *tc_classifier; /**> libnl3: Firewall mark classifier */ - /** libnl3: Network emulator queuing discipline */ - struct rtnl_qdisc *tc_qdisc; - /** libnl3: Firewall mark classifier */ - struct rtnl_cls *tc_classifier; - - /* Linked list _per_interface_ */ - struct socket *next; + struct socket *next; /* Linked list _per_interface_ */ }; diff --git a/include/villas/path.h b/include/villas/path.h index 801acf51d..fb01e5d23 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -42,7 +42,7 @@ struct path struct node *in; /**< Pointer to the incoming node */ - struct mpmc_queue queue; /**< A ring buffer for all received messages (unmodified) */ + struct queue queue; /**< A ring buffer for all received messages (unmodified) */ struct pool pool; /**< Memory pool for messages / samples. */ struct list destinations; /**< List of all outgoing nodes */ diff --git a/include/villas/pool.h b/include/villas/pool.h index c6ffcc466..49f2f1c9a 100644 --- a/include/villas/pool.h +++ b/include/villas/pool.h @@ -15,8 +15,7 @@ #include #include "queue.h" - -struct memtype; +#include "memory.h" /** A thread-safe memory pool */ struct pool { @@ -28,13 +27,13 @@ struct pool { size_t blocksz; /**< Length of a block in bytes */ size_t alignment; /**< Alignment of a block in bytes */ - struct mpmc_queue queue; /**< The queue which is used to keep track of free blocks */ + struct queue queue; /**< The queue which is used to keep track of free blocks */ }; #define INLINE static inline __attribute__((unused)) /** Initiazlize a pool */ -int pool_init(struct pool *p, size_t blocksz, size_t alignment, const struct memtype *mem); +int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype *mem); /** Destroy and release memory used by pool. */ int pool_destroy(struct pool *p); @@ -42,26 +41,26 @@ int pool_destroy(struct pool *p); /** Pop cnt values from the stack an place them in the array blocks */ INLINE ssize_t pool_get_many(struct pool *p, void *blocks[], size_t cnt) { - return mpmc_queue_pull_many(&p->queue, blocks, cnt); + return queue_pull_many(&p->queue, blocks, cnt); } /** Push cnt values which are giving by the array values to the stack. */ INLINE ssize_t pool_put_many(struct pool *p, void *blocks[], size_t cnt) { - return mpmc_queue_push_many(&p->queue, blocks, cnt); + return queue_push_many(&p->queue, blocks, cnt); } /** Get a free memory block from pool. */ INLINE void * pool_get(struct pool *p) { void *ptr; - return mpmc_queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL; + return queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL; } /** Release a memory block back to the pool. */ INLINE int pool_put(struct pool *p, void *buf) { - return mpmc_queue_push(&p->queue, buf); + return queue_push(&p->queue, buf); } #endif /* _POOL_H_ */ \ No newline at end of file diff --git a/include/villas/queue.h b/include/villas/queue.h index bf2a6cac0..40baf1f17 100644 --- a/include/villas/queue.h +++ b/include/villas/queue.h @@ -42,12 +42,12 @@ #define CACHELINE_SIZE 64 typedef char cacheline_pad_t[CACHELINE_SIZE]; -struct mpmc_queue { +struct queue { cacheline_pad_t _pad0; /**< Shared area: all threads read */ struct memtype const * mem; size_t buffer_mask; - struct mpmc_queue_cell { + struct queue_cell { atomic_size_t sequence; void *data; } *buffer; @@ -64,24 +64,24 @@ struct mpmc_queue { }; /** Initialize MPMC queue */ -int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem); +int queue_init(struct queue *q, size_t size, const struct memtype *mem); /** Desroy MPMC queue and release memory */ -int mpmc_queue_destroy(struct mpmc_queue *q); +int queue_destroy(struct queue *q); /** Return estimation of current queue usage. * * Note: This is only an estimation and not accurate as long other * threads are performing operations. */ -size_t mpmc_queue_available(struct mpmc_queue *q); +size_t queue_available(struct queue *q); -int mpmc_queue_push(struct mpmc_queue *q, void *ptr); +int queue_push(struct queue *q, void *ptr); -int mpmc_queue_pull(struct mpmc_queue *q, void **ptr); +int queue_pull(struct queue *q, void **ptr); -int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt); +int queue_push_many(struct queue *q, void *ptr[], size_t cnt); -int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt); +int queue_pull_many(struct queue *q, void *ptr[], size_t cnt); #endif /* _MPMC_QUEUE_H_ */ \ No newline at end of file diff --git a/include/villas/utils.h b/include/villas/utils.h index 6664b2232..82b7cabb7 100644 --- a/include/villas/utils.h +++ b/include/villas/utils.h @@ -204,11 +204,11 @@ void printb(void *mem, size_t len); void printdw(void *mem, size_t len); /** Get CPU timestep counter */ -__attribute__((always_inline)) static inline uint64_t rdtscp() +__attribute__((always_inline)) static inline uint64_t rdtsc() { uint64_t tsc; - __asm__ ("rdtscp;" + __asm__ ("rdtsc;" "shl $32, %%rdx;" "or %%rdx,%%rax" : "=a" (tsc) diff --git a/lib/memory.c b/lib/memory.c index e6501bf01..df4eccd7e 100644 --- a/lib/memory.c +++ b/lib/memory.c @@ -19,23 +19,20 @@ void * memory_alloc(const struct memtype *m, size_t len) { + debug(DBG_MEM | 2, "Allocating %zu byte of %s memory", len, m->name); return m->alloc(len); } void * memory_alloc_aligned(const struct memtype *m, size_t len, size_t alignment) { - warn("memory_alloc_aligned: not implemented yet!"); - return memory_alloc(m, len); -} - -void * memory_aligned_alloc(const struct memtype *m, size_t len, size_t align) -{ - warn("memory_aligned_alloc: not implemented yet. Falling back to unaligned version."); + debug(DBG_MEM | 2, "Allocating %zu byte of %zu-byte-aligned %s memory", len, alignment, m->name); + warn("%s: not implemented yet!", __FUNCTION__); return memory_alloc(m, len); } int memory_free(const struct memtype *m, void *ptr, size_t len) { + debug(DBG_MEM | 2, "Releasing %zu bytes of %s memory", len, m->name); return m->free(ptr, len); } @@ -85,7 +82,7 @@ const struct memtype memtype_hugepage = { .flags = MEMORY_MMAP | MEMORY_HUGEPAGE, .alloc = memory_hugepage_alloc, .free = memory_hugepage_free, - .alignment = 1 << 21 /* 2 MiB hugepage */ + .alignment = 21 /* 2 MiB hugepage */ }; /** @todo */ @@ -93,5 +90,5 @@ const struct memtype memtype_dma = { .name = "dma", .flags = MEMORY_DMA | MEMORY_MMAP, .alloc = NULL, .free = NULL, - .alignment = 1 << 12 + .alignment = 12 }; diff --git a/lib/node.c b/lib/node.c index 7baa1a407..b4aa9dc02 100644 --- a/lib/node.c +++ b/lib/node.c @@ -113,6 +113,8 @@ int node_start(struct node *n) if (ret == 0) n->state = NODE_RUNNING; + n->sequence = 0; + return ret; } diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index f6c3e3f12..d28644836 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -103,8 +103,9 @@ char * socket_print(struct node *n) } switch (s->header) { - case SOCKET_HEADER_GTNET_SKT: header = "gtnet-skt"; break; - case SOCKET_HEADER_DEFAULT: header = "villas"; break; + case SOCKET_HEADER_NONE: header = "none"; break; + case SOCKET_HEADER_FAKE: header = "fake"; break; + case SOCKET_HEADER_DEFAULT: header = "default"; break; } char *local = socket_print_addr((struct sockaddr *) &s->local); @@ -214,34 +215,29 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) int samples, ret, received, length; ssize_t bytes; - if (s->header == SOCKET_HEADER_GTNET_SKT) { + if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) { if (cnt < 1) return 0; /* The GTNETv2-SKT protocol send every sample in a single packet. * socket_read() receives a single packet. */ + int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1; + struct iovec iov[iov_len]; struct sample *smp = smps[0]; - -#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER + uint32_t header[3]; - - struct iovec iov[] = { - { /* First three values are sequence, seconds and nano-seconds */ - .iov_base = header, - .iov_len = sizeof(header) - }, -#else - struct iovec iov[] = { -#endif - { /* Remaining values are payload */ - .iov_base = &smp->data, - .iov_len = SAMPLE_DATA_LEN(smp->capacity) - } - }; + if (s->header == SOCKET_HEADER_FAKE) { + iov[0].iov_base = header; + iov[0].iov_len = sizeof(header); + } + + /* Remaining values are payload */ + iov[iov_len-1].iov_base = &smp->data; + iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity); struct msghdr mhdr = { .msg_iov = iov, - .msg_iovlen = ARRAY_LEN(iov), + .msg_iovlen = iov_len, .msg_name = (struct sockaddr *) &s->remote, .msg_namelen = sizeof(s->remote) }; @@ -258,28 +254,23 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) return -1; } -#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER - length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1); -#else - length = bytes / SAMPLE_DATA_LEN(1); -#endif + length = (s->header == SOCKET_HEADER_FAKE ? bytes - sizeof(header) : bytes) / SAMPLE_DATA_LEN(1); if (length > smp->capacity) { warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); length = smp->capacity; } - /** @todo Should we generate sequence no here manually? - * Or maybe optinally use the first data value as a sequence? - * However this would require the RTDS model to be changed. */ -#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER - smp->sequence = header[0]; - smp->ts.origin.tv_sec = header[1]; - smp->ts.origin.tv_nsec = header[2]; -#else - smp->sequence = -1; - smp->ts.origin.tv_sec = -1; - smp->ts.origin.tv_nsec = -1; -#endif + if (s->header == SOCKET_HEADER_FAKE) { + smp->sequence = header[0]; + smp->ts.origin.tv_sec = header[1]; + smp->ts.origin.tv_nsec = header[2]; + } + else { + smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */ + smp->ts.origin.tv_sec = -1; + smp->ts.origin.tv_nsec = -1; + } + smp->ts.received.tv_sec = -1; smp->ts.received.tv_nsec = -1; @@ -376,35 +367,32 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int sent = 0; /* Construct iovecs */ - if (s->header == SOCKET_HEADER_GTNET_SKT) { + if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) { if (cnt < 1) return 0; - for (int i = 0; i < cnt; i++) { -#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER - uint32_t header[] = { - smps[i]->sequence, - smps[i]->ts.origin.tv_sec, - smps[i]->ts.origin.tv_nsec - }; + for (int i = 0; i < cnt; i++) { + int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1; + struct iovec iov[iov_len]; - struct iovec iov[] = { - { /* First three values are sequence, seconds and nano-seconds */ - .iov_base = header, - .iov_len = sizeof(header) - }, -#else - struct iovec iov[] = { -#endif - { /* Remaining values are payload */ - .iov_base = &smps[i]->data, - .iov_len = SAMPLE_DATA_LEN(smps[i]->length) - } - }; + /* First three values are sequence, seconds and nano-seconds timestamps */ + uint32_t header[3]; + if (s->header == SOCKET_HEADER_FAKE) { + header[0] = smps[i]->sequence; + header[1] = smps[i]->ts.origin.tv_sec; + header[2] = smps[i]->ts.origin.tv_nsec; + + iov[0].iov_base = header; + iov[0].iov_len = sizeof(header); + } + + /* Remaining values are payload */ + iov[iov_len-1].iov_base = &smps[i]->data; + iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); struct msghdr mhdr = { .msg_iov = iov, - .msg_iovlen = ARRAY_LEN(iov), + .msg_iovlen = iov_len, .msg_name = (struct sockaddr *) &s->remote, .msg_namelen = sizeof(s->remote) }; @@ -480,9 +468,11 @@ int socket_parse(struct node *n, config_setting_t *cfg) if (!config_setting_lookup_string(cfg, "header", &hdr)) s->header = SOCKET_HEADER_DEFAULT; else { - if (!strcmp(hdr, "gtnet-skt") || (!strcmp(hdr, "none"))) - s->header = SOCKET_HEADER_GTNET_SKT; - else if (!strcmp(hdr, "default") || !strcmp(hdr, "villas")) + if (!strcmp(hdr, "gtnet-skt") || (!strcmp(hdr, "none"))) + s->header = SOCKET_HEADER_NONE; + else if (!strcmp(hdr, "gtnet-skt:fake") || (!strcmp(hdr, "fake"))) + s->header = SOCKET_HEADER_FAKE; + else if (!strcmp(hdr, "villas") || !strcmp(hdr, "default")) s->header = SOCKET_HEADER_DEFAULT; else cerror(cfg, "Invalid application header type '%s' for node %s", hdr, node_name(n)); diff --git a/lib/path.c b/lib/path.c index b783c816f..69667cbcc 100644 --- a/lib/path.c +++ b/lib/path.c @@ -26,7 +26,7 @@ static void path_write(struct path *p, bool resend) int sent, tosend, available, released; struct sample *smps[n->vectorize]; - available = mpmc_queue_pull_many(&p->queue, (void **) smps, cnt); + available = queue_pull_many(&p->queue, (void **) smps, cnt); if (available < cnt) warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); @@ -108,7 +108,7 @@ static void * path_run(void *arg) p->skipped += recv - enqueue; } - enqueued = mpmc_queue_push_many(&p->queue, (void **) smps, enqueue); + enqueued = queue_push_many(&p->queue, (void **) smps, enqueue); if (enqueue != enqueued) warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p)); @@ -219,7 +219,7 @@ int path_prepare(struct path *p) if (ret) error("Failed to allocate memory pool for path"); - ret = mpmc_queue_init(&p->queue, p->queuelen, &memtype_hugepage); + ret = queue_init(&p->queue, p->queuelen, &memtype_hugepage); if (ret) error("Failed to initialize queue for path"); @@ -233,7 +233,7 @@ void path_destroy(struct path *p) list_destroy(&p->destinations, NULL, false); list_destroy(&p->hooks, NULL, true); - mpmc_queue_destroy(&p->queue); + queue_destroy(&p->queue); pool_destroy(&p->pool); free(p->_name); diff --git a/lib/pool.c b/lib/pool.c index 90317bd9e..955b8b77a 100644 --- a/lib/pool.c +++ b/lib/pool.c @@ -12,8 +12,10 @@ #include "memory.h" #include "kernel/kernel.h" -int pool_init(struct pool *p, size_t blocksz, size_t cnt, const struct memtype *m) +int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype *m) { + int ret; + /* Make sure that we use a block size that is aligned to the size of a cache line */ p->alignment = kernel_get_cacheline_size(); p->blocksz = blocksz * CEIL(blocksz, p->alignment); @@ -25,18 +27,20 @@ int pool_init(struct pool *p, size_t blocksz, size_t cnt, const struct memtype * serror("Failed to allocate memory for memory pool"); else debug(DBG_POOL | 4, "Allocated %#zx bytes for memory pool", p->len); - - mpmc_queue_init(&p->queue, cnt, m); + + ret = queue_init(&p->queue, cnt, m); + if (ret) + return ret; for (int i = 0; i < cnt; i++) - mpmc_queue_push(&p->queue, (char *) p->buffer + i * p->blocksz); - + queue_push(&p->queue, (char *) p->buffer + i * p->blocksz); + return 0; } int pool_destroy(struct pool *p) { - mpmc_queue_destroy(&p->queue); + queue_destroy(&p->queue); return memory_free(p->mem, p->buffer, p->len); } \ No newline at end of file diff --git a/lib/queue.c b/lib/queue.c index 136a7d263..1d1b797bd 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -34,7 +34,7 @@ #include "queue.h" /** Initialize MPMC queue */ -int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem) +int queue_init(struct queue *q, size_t size, const struct memtype *mem) { /* Queue size must be 2 exponent */ if ((size < 2) || ((size & (size - 1)) != 0)) @@ -55,7 +55,7 @@ int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem return 0; } -int mpmc_queue_destroy(struct mpmc_queue *q) +int queue_destroy(struct queue *q) { return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0]))); } @@ -65,15 +65,15 @@ int mpmc_queue_destroy(struct mpmc_queue *q) * Note: This is only an estimation and not accurate as long other * threads are performing operations. */ -size_t mpmc_queue_available(struct mpmc_queue *q) +size_t queue_available(struct queue *q) { return atomic_load_explicit(&q->tail, memory_order_relaxed) - atomic_load_explicit(&q->head, memory_order_relaxed); } -int mpmc_queue_push(struct mpmc_queue *q, void *ptr) +int queue_push(struct queue *q, void *ptr) { - struct mpmc_queue_cell *cell; + struct queue_cell *cell; size_t pos, seq; intptr_t diff; @@ -99,9 +99,9 @@ int mpmc_queue_push(struct mpmc_queue *q, void *ptr) return 1; } -int mpmc_queue_pull(struct mpmc_queue *q, void **ptr) +int queue_pull(struct queue *q, void **ptr) { - struct mpmc_queue_cell *cell; + struct queue_cell *cell; size_t pos, seq; intptr_t diff; @@ -128,13 +128,13 @@ int mpmc_queue_pull(struct mpmc_queue *q, void **ptr) return 1; } -int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt) +int queue_push_many(struct queue *q, void *ptr[], size_t cnt) { int ret; size_t i; for (i = 0; i < cnt; i++) { - ret = mpmc_queue_push(q, ptr[i]); + ret = queue_push(q, ptr[i]); if (!ret) break; } @@ -142,13 +142,13 @@ int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt) return i; } -int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt) +int queue_pull_many(struct queue *q, void *ptr[], size_t cnt) { int ret; size_t i; for (i = 0; i < cnt; i++) { - ret = mpmc_queue_pull(q, &ptr[i]); + ret = queue_pull(q, &ptr[i]); if (!ret) break; } diff --git a/lib/utils.c b/lib/utils.c index 66512c3bd..eff72a954 100644 --- a/lib/utils.c +++ b/lib/utils.c @@ -370,12 +370,13 @@ void rdtsc_sleep(uint64_t nanosecs, uint64_t start) { uint64_t cycles; + /** @todo Replace the hard coded CPU clock frequency */ cycles = (double) nanosecs / (1e9 / 3392389000); if (start == 0) - start = rdtscp(); + start = rdtsc(); do { __asm__("nop"); - } while (rdtscp() - start < cycles); + } while (rdtsc() - start < cycles); } \ No newline at end of file diff --git a/src/fpga-bench-overruns.c b/src/fpga-bench-overruns.c index f161c7696..73346b5e9 100644 --- a/src/fpga-bench-overruns.c +++ b/src/fpga-bench-overruns.c @@ -115,9 +115,9 @@ int fpga_benchmark_overruns(struct fpga *f) for (int i = 0; i < runs + BENCH_WARMUP; i++) { dma_read(dm, mem.base_phys, 0x200); - start = rdtscp(); + start = rdtsc(); lapack_workload(p, A); - stop = rdtscp(); + stop = rdtsc(); dma_read_complete(dm, NULL, NULL); diff --git a/src/fpga-bench.c b/src/fpga-bench.c index 2ab2523a0..1de87f1f3 100644 --- a/src/fpga-bench.c +++ b/src/fpga-bench.c @@ -108,7 +108,7 @@ int fpga_benchmark_jitter(struct fpga *f) XTmrCtr_SetResetValue(xtmr, 0, period * FPGA_AXI_HZ); XTmrCtr_Start(xtmr, 0); - uint64_t end, start = rdtscp(); + uint64_t end, start = rdtsc(); for (int i = 0; i < runs; i++) { uint64_t cnt = intc_wait(f->intc, tmr->irq); if (cnt != 1) @@ -117,7 +117,7 @@ int fpga_benchmark_jitter(struct fpga *f) /* Ackowledge IRQ */ XTmrCtr_WriteReg((uintptr_t) f->map + tmr->baseaddr, 0, XTC_TCSR_OFFSET, XTmrCtr_ReadReg((uintptr_t) f->map + tmr->baseaddr, 0, XTC_TCSR_OFFSET)); - end = rdtscp(); + end = rdtsc(); hist[i] = end - start; start = end; } @@ -157,11 +157,11 @@ int fpga_benchmark_latency(struct fpga *f) error("Failed to enable interrupts"); for (int i = 0; i < runs; i++) { - start = rdtscp(); + start = rdtsc(); XIntc_Out32((uintptr_t) f->map + f->intc->baseaddr + XIN_ISR_OFFSET, 0x100); intc_wait(f->intc, 8); - end = rdtscp(); + end = rdtsc(); hist[i] = end - start; } @@ -239,7 +239,7 @@ int fpga_benchmark_datamover(struct fpga *f) uint64_t runs = BENCH_RUNS >> exp; for (int i = 0; i < runs + BENCH_WARMUP; i++) { - start = rdtscp(); + start = rdtsc(); #if BENCH_DM == 1 ssize_t ret; @@ -255,7 +255,7 @@ int fpga_benchmark_datamover(struct fpga *f) if (ret) error("DMA ping pong failed"); #endif - stop = rdtscp(); + stop = rdtsc(); if (memcmp(src.base_virt, dst.base_virt, len)) warn("Compare failed"); @@ -304,13 +304,13 @@ int fpga_benchmark_memcpy(struct fpga *f) uint64_t runs = (BENCH_RUNS << 2) >> exp; for (int i = 0; i < runs + BENCH_WARMUP; i++) { - start = rdtscp(); + start = rdtsc(); for (int j = 0; j < len / 4; j++) // mapi[j] = j; // write dummy += mapi[j]; // read - end = rdtscp(); + end = rdtsc(); if (i > BENCH_WARMUP) total += end - start; diff --git a/tests/Makefile.inc b/tests/Makefile.inc index b701f5772..facbef13f 100644 --- a/tests/Makefile.inc +++ b/tests/Makefile.inc @@ -3,7 +3,7 @@ TEST_OBJS = $(patsubst %.c,$(BUILDDIR)/%.o,$(TEST_SRCS)) TEST_CFLAGS = $(CFLAGS) TEST_LDFLAGS = $(LDFLAGS) -Wl,-rpath,'$$ORIGIN' -TEST_LDLIBS = $(LDLIBS) -lcriterion -lvillas +TEST_LDLIBS = $(LDLIBS) -lcriterion -lvillas -pthread tests: $(BUILDDIR)/testsuite diff --git a/tests/queue.c b/tests/queue.c index 1c02306ba..9d30a1b4f 100644 --- a/tests/queue.c +++ b/tests/queue.c @@ -1,25 +1,318 @@ -/** Unit tests for MPMC queue - * - * @author Steffen Vogel - * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC - * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. - *********************************************************************************/ +#include +#include +#include +#include +#include +#include #include +#include +#include +#include "utils.h" #include "queue.h" +#include "memory.h" -Test(queue, singleThreaded) +#define SIZE (1 << 10) + +static struct queue q; + +struct param { + int volatile start; + int thread_count; + int queue_size; + int iter_count; + int batch_size; + void * (*thread_func)(void *); + struct queue queue; + const struct memtype *memtype; +}; + +/** Get thread id as integer + * In contrast to pthread_t which is an opaque type */ +#ifdef __linux__ + #include +#endif + +uint64_t thread_get_id() { -/* struct queue q; +#ifdef __MACH__ + uint64_t id; + pthread_threadid_np(pthread_self(), &id); + return id; +#elif defined(SYS_gettid) + return (int) syscall(SYS_gettid); +#endif + return -1; +} + +/** Sleep, do nothing */ +__attribute__((always_inline)) static inline void nop() +{ + __asm__("rep nop;"); +} + +static void * producer(void *ctx) +{ + int ret; + struct param *p = (struct param *) ctx; - queue_init(&q, 100, &memtype_heap); + srand((unsigned) time(0) + thread_get_id()); + size_t nops = rand() % 1000; - srand(1337); + /** @todo Criterion cr_log() is broken for multi-threaded programs */ + //cr_log_info("producer: tid = %lu", thread_get_id()); + + /* Wait for global start signal */ + while (p->start == 0) + pthread_yield(); - for (int i = 0; i < 100; i++) - queue_push(&q, &) + //cr_log_info("producer: wait for %zd nops", nops); - queue_destroy(&q);*/ + /* Wait for a random time */ + for (size_t i = 0; i != nops; i += 1) + nop(); + + //cr_log_info("producer: start pushing"); + + /* Enqueue */ + for (unsigned long count = 0; count < p->iter_count; count++) { + do { + ret = queue_push(&p->queue, (void *) count); + pthread_yield(); + } while (ret != 1); + } + + //cr_log_info("producer: finished"); + + return NULL; +} + +static void * consumer(void *ctx) +{ + int ret; + struct param *p = (struct param *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t nops = rand() % 1000; + + //cr_log_info("consumer: tid = %lu", thread_get_id()); + + /* Wait for global start signal */ + while (p->start == 0) + pthread_yield(); + + //cr_log_info("consumer: wait for %zd nops", nops); + + /* Wait for a random time */ + for (size_t i = 0; i != nops; i += 1) + nop(); + + //cr_log_info("consumer: start pulling"); + + /* Dequeue */ + for (unsigned long count = 0; count < p->iter_count; count++) { + void *ptr; + + do { + ret = queue_pull(&p->queue, &ptr); + } while (ret != 1); + + //cr_log_info("consumer: %lu\n", count); + + //cr_assert_eq((intptr_t) ptr, count); + } + + //cr_log_info("consumer: finished"); + + return NULL; +} + +void * producer_consumer(void *ctx) +{ + struct param *p = (struct param *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t nops = rand() % 1000; + + /* Wait for global start signal */ + while (p->start == 0) + pthread_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != nops; i += 1) + nop(); + + for (int iter = 0; iter < p->iter_count; ++iter) { + for (size_t i = 0; i < p->batch_size; i++) { + void *ptr = (void *) (iter * p->batch_size + i); + while (!queue_push(&p->queue, ptr)) + pthread_yield(); /* queue full, let other threads proceed */ + } + + for (size_t i = 0; i < p->batch_size; i++) { + void *ptr; + while (!queue_pull(&p->queue, &ptr)) + pthread_yield(); /* queue empty, let other threads proceed */ + } + } + + return 0; +} + +void * producer_consumer_many(void *ctx) +{ + struct param *p = (struct param *) ctx; + + srand((unsigned) time(0) + thread_get_id()); + size_t nops = rand() % 1000; + + /* Wait for global start signal */ + while (p->start == 0) + pthread_yield(); + + /* Wait for a random time */ + for (size_t i = 0; i != nops; i += 1) + nop(); + + void *ptrs[p->batch_size]; + + for (int iter = 0; iter < p->iter_count; ++iter) { + for (size_t i = 0; i < p->batch_size; i++) + ptrs[i] = (void *) (iter * p->batch_size + i); + + int pushed = 0; + do { + pushed += queue_push_many(&p->queue, &ptrs[pushed], p->batch_size - pushed); + if (pushed != p->batch_size) + pthread_yield(); /* queue full, let other threads proceed */ + } while (pushed < p->batch_size); + + int pulled = 0; + do { + pulled += queue_pull_many(&p->queue, &ptrs[pulled], p->batch_size - pulled); + if (pulled != p->batch_size) + pthread_yield(); /* queue empty, let other threads proceed */ + } while (pulled < p->batch_size); + } + + return 0; +} + +Test(queue, single_threaded) +{ + int ret; + struct param p = { + .iter_count = 1 << 8, + .queue_size = 1 << 10, + .start = 1 /* we start immeadiatly */ + }; + + ret = queue_init(&p.queue, p.queue_size, &memtype_heap); + cr_assert_eq(ret, 0, "Failed to create queue"); + + producer(&p); + consumer(&p); + + cr_assert_eq(queue_available(&q), 0); + + ret = queue_destroy(&p.queue); + cr_assert_eq(ret, 0, "Failed to create queue"); +} + +ParameterizedTestParameters(queue, multi_threaded) +{ + static struct param params[] = { + { + .iter_count = 1 << 12, + .queue_size = 1 << 9, + .thread_count = 32, + .thread_func = producer_consumer_many, + .batch_size = 10, + .memtype = &memtype_heap + }, { + .iter_count = 1 << 8, + .queue_size = 1 << 9, + .thread_count = 4, + .thread_func = producer_consumer_many, + .batch_size = 100, + .memtype = &memtype_heap + }, { + .iter_count = 1 << 16, + .queue_size = 1 << 9, + .thread_count = 16, + .thread_func = producer_consumer_many, + .batch_size = 100, + .memtype = &memtype_heap + }, { + .iter_count = 1 << 8, + .queue_size = 1 << 9, + .thread_count = 4, + .thread_func = producer_consumer_many, + .batch_size = 10, + .memtype = &memtype_heap + }, { + .iter_count = 1 << 16, + .queue_size = 1 << 9, + .thread_count = 16, + .thread_func = producer_consumer, + .batch_size = 10, + .memtype = &memtype_hugepage + } + }; + + return cr_make_param_array(struct param, params, ARRAY_LEN(params)); +} + +ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 10) +{ + int ret, cycpop; + + pthread_t threads[p->thread_count]; + + p->start = 0; + + ret = queue_init(&p->queue, p->queue_size, &memtype_heap); + cr_assert_eq(ret, 0, "Failed to create queue"); + + uint64_t start_tsc_time, end_tsc_time; + + for (int i = 0; i < p->thread_count; ++i) + pthread_create(&threads[i], NULL, p->thread_func, &p); + + sleep(1); + + start_tsc_time = rdtsc(); + p->start = 1; + + for (int i = 0; i < p->thread_count; ++i) + pthread_join(threads[i], NULL); + + end_tsc_time = rdtsc(); + cycpop = (end_tsc_time - start_tsc_time) / p->iter_count; + + if (cycpop < 400) + cr_log_info("cycles/op: %u\n", cycpop); + else + cr_log_warn("cycles/op are very high (%u). Are you running on a a hypervisor?\n", cycpop); + + cr_assert_eq(queue_available(&q), 0); + + ret = queue_destroy(&p->queue); + cr_assert_eq(ret, 0, "Failed to create queue"); +} + +Test(queue, init_destroy) +{ + int ret; + struct queue q; + + ret = queue_init(&q, 100, &memtype_heap); + cr_assert_eq(ret, -1); /* Should fail as size is not 2^x */ + + ret = queue_init(&q, 1024, &memtype_heap); + cr_assert_eq(ret, 0); /* Should succeed */ + + ret = queue_destroy(&q); + cr_assert_eq(ret, 0); /* Should succeed */ } \ No newline at end of file diff --git a/tests/timing.c b/tests/timing.c index b4ef56813..82fb8bf6f 100644 --- a/tests/timing.c +++ b/tests/timing.c @@ -76,7 +76,7 @@ Test(timing, time_to_from_double) cr_assert_float_eq(dbl, ref, 1e-9); } -Test(timing, timerfd_create_rate) +Test(timing, timerfd_create_rate, .timeout = 20) { struct timespec start, end; @@ -100,7 +100,7 @@ Test(timing, timerfd_create_rate) close(tfd); } -Test(timing, timerfd_wait_until) +Test(timing, timerfd_wait_until, .timeout = 1) { int tfd = timerfd_create(CLOCK_REALTIME, 0); diff --git a/thirdparty/criterion b/thirdparty/criterion index 5b0f2b129..1b687a9f6 160000 --- a/thirdparty/criterion +++ b/thirdparty/criterion @@ -1 +1 @@ -Subproject commit 5b0f2b129046955c004ff56ab2caada5b55ba8e3 +Subproject commit 1b687a9f6a0e51c9e9b0a047e1fcd6c94de7a080