/** Message paths. * * @author Steffen Vogel * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. * Unauthorized copying of this file, via any medium is strictly prohibited. *********************************************************************************/ #include #include #include #include #include #include "config.h" #include "utils.h" #include "path.h" #include "timing.h" #include "pool.h" #include "queue.h" /** Main thread function per path: receive -> sent messages */ static void * path_run(void *arg) { struct path *p = arg; unsigned cnt = p->in->vectorize; int recv, enqueue, enqueued; int ready = 0; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ struct sample *smps[cnt]; /* Main thread loop */ for (;;) { /* Fill smps[] free sample blocks from the pool */ ready += sample_get_many(&p->pool, smps, cnt - ready); if (ready != cnt) warn("Pool underrun for path %s", path_name(p)); /* Read ready samples and store them to blocks pointed by smps[] */ recv = p->in->_vt->read(p->in, smps, ready); if (recv < 0) error("Failed to receive message from node %s", node_name(p->in)); else if (recv < ready) warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready); debug(DBG_PATH | 15, "Received %u messages from node %s", recv, node_name(p->in)); /* Run preprocessing hooks for vector of samples */ enqueue = hook_run(p, smps, recv, HOOK_READ); if (enqueue != recv) { info("Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); p->skipped += recv - enqueue; } enqueued = queue_push_many(&p->queue, (void **) smps, enqueue); if (enqueue != enqueued) warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p)); ready -= enqueued; debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p)); list_foreach(struct node *n, &p->destinations) { int cnt = n->vectorize; int sent, tosend, available, released; struct sample *smps[n->vectorize]; available = queue_pull_many(&p->queue, (void **) smps, cnt); if (available < cnt) warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); if (available == 0) continue; tosend = hook_run(p, smps, available, HOOK_WRITE); if (tosend == 0) continue; sent = node_write(n, smps, tosend); if (sent < 0) error("Failed to sent %u samples to node %s", cnt, node_name(n)); else if (sent < tosend) warn("Partial write to node %s", node_name(n)); debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n)); released = pool_put_many(&p->pool, (void **) smps, sent); if (sent != released) warn("Failed to release %u samples to pool for path %s", sent - released, path_name(p)); } } return NULL; } int path_start(struct path *p) { int ret; info("Starting path: %s (#hooks=%zu)", path_name(p), list_length(&p->hooks)); ret = hook_run(p, NULL, 0, HOOK_PATH_START); if (ret) return -1; p->state = PATH_RUNNING; return pthread_create(&p->tid, NULL, &path_run, p); } int path_stop(struct path *p) { info("Stopping path: %s", path_name(p)); pthread_cancel(p->recv_tid); pthread_join(p->recv_tid, NULL); if (hook_run(p, NULL, 0, HOOK_PATH_STOP)) return -1; p->state = PATH_STOPPED; return 0; } const char * path_name(struct path *p) { if (!p->_name) { strcatf(&p->_name, "%s " MAG("=>"), node_name_short(p->in)); list_foreach(struct node *n, &p->destinations) strcatf(&p->_name, " %s", node_name_short(n)); } return p->_name; } int path_init(struct path *p) { int ret; /* Add internal hooks if they are not already in the list*/ list_foreach(struct hook *h, &hooks) { if ((h->type & HOOK_INTERNAL) && (list_lookup(&p->hooks, h->name) == NULL)) list_push(&p->hooks, memdup(h, sizeof(struct hook))); } /* We sort the hooks according to their priority before starting the path */ list_sort(&p->hooks, hooks_sort_priority); /* Allocate hook private memory for storing state / parameters */ ret = hook_run(p, NULL, 0, HOOK_INIT); if (ret) error("Failed to initialize hooks of path: %s", path_name(p)); /* Parse hook arguments */ ret = hook_run(p, NULL, 0, HOOK_PARSE); if (ret) error("Failed to parse arguments for hooks of path: %s", path_name(p)); /* Initialize queue */ ret = pool_init(&p->pool, SAMPLE_LEN(p->samplelen), p->queuelen, &memtype_hugepage); if (ret) error("Failed to allocate memory pool for path"); ret = queue_init(&p->queue, p->queuelen, &memtype_hugepage); if (ret) error("Failed to initialize queue for path"); p->state = PATH_INITIALIZED; return 0; } void path_destroy(struct path *p) { hook_run(p, NULL, 0, HOOK_DEINIT); /* Release memory */ list_destroy(&p->destinations, NULL, false); list_destroy(&p->hooks, NULL, true); queue_destroy(&p->queue); pool_destroy(&p->pool); p->state = PATH_DESTROYED; free(p->_name); } int path_uses_node(struct path *p, struct node *n) { return (p->in == n) || list_contains(&p->destinations, n) ? 0 : 1; }