diff --git a/include/villas/advio.h b/include/villas/advio.h index 0e9bb5fe5..eea399087 100644 --- a/include/villas/advio.h +++ b/include/villas/advio.h @@ -31,6 +31,11 @@ struct advio { CURL *curl; FILE *file; + + long uploaded; /**< Upload progress. How much has already been uploaded to the remote file. */ + long downloaded; /**< Download progress. How much has already been downloaded from the remote file. */ + + int completed:1; /**< Was the upload completd */ unsigned char hash[SHA_DIGEST_LENGTH]; @@ -43,10 +48,13 @@ typedef struct advio AFILE; /* The remaining functions from stdio are just replaced macros */ #define afeof(af) feof((af)->file) #define aftell(af) ftell((af)->file) -#define arewind(af) rewind((af)->file) #define afileno(af) fileno((af)->file) #define afread(ptr, sz, nitems, af) fread(ptr, sz, nitems, (af)->file) #define afwrite(ptr, sz, nitems, af) fwrite(ptr, sz, nitems, (af)->file) +#define afputs(ptr, af) fputs(ptr, (af)->file) +#define afprintf(af, fmt, ...) fprintf((af)->file, fmt, __VA_ARGS__) +#define afscanf(af, fmt, ...) fprintf((af)->file, fmt, __VA_ARGS__) +#define agetline(linep, linecapp, af) getline(linep, linecapp, (af)->file) /* Extensions */ #define auri(af) ((af)->uri) @@ -55,6 +63,18 @@ typedef struct advio AFILE; AFILE *afopen(const char *url, const char *mode); int afclose(AFILE *file); + int afflush(AFILE *file); -int adownload(AFILE *af); -int aupload(AFILE *af); + +int afseek(AFILE *file, long offset, int origin); + +void arewind(AFILE *file); + +/** Download contens from remote file + * + * + * @param resume Do a partial download and append to the local file + */ +int adownload(AFILE *af, int resume); + +int aupload(AFILE *af, int resume); diff --git a/include/villas/log.h b/include/villas/log.h index 58e8b0509..4166c1500 100644 --- a/include/villas/log.h +++ b/include/villas/log.h @@ -66,14 +66,15 @@ enum log_facilities { LOG_XIL = (1L << 20), LOG_TC = (1L << 21), LOG_IF = (1L << 22), + LOG_ADVIO = (1L << 23), /* Node-types */ - LOG_SOCKET = (1L << 23), - LOG_FILE = (1L << 24), - LOG_FPGA = (1L << 25), - LOG_NGSI = (1L << 26), - LOG_WEBSOCKET = (1L << 27), - LOG_OPAL = (1L << 28), + LOG_SOCKET = (1L << 24), + LOG_FILE = (1L << 25), + LOG_FPGA = (1L << 26), + LOG_NGSI = (1L << 27), + LOG_WEBSOCKET = (1L << 28), + LOG_OPAL = (1L << 30), /* Classes */ LOG_NODES = LOG_NODE | LOG_SOCKET | LOG_FILE | LOG_FPGA | LOG_NGSI | LOG_WEBSOCKET | LOG_OPAL, diff --git a/include/villas/nodes/file.h b/include/villas/nodes/file.h index 26dceca0a..f07e1d905 100644 --- a/include/villas/nodes/file.h +++ b/include/villas/nodes/file.h @@ -52,18 +52,22 @@ struct file { } read, write; enum read_epoch_mode { - EPOCH_DIRECT, - EPOCH_WAIT, - EPOCH_RELATIVE, - EPOCH_ABSOLUTE, - EPOCH_ORIGINAL + FILE_EPOCH_DIRECT, + FILE_EPOCH_WAIT, + FILE_EPOCH_RELATIVE, + FILE_EPOCH_ABSOLUTE, + FILE_EPOCH_ORIGINAL } read_epoch_mode; /**< Specifies how file::offset is calculated. */ struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */ struct timespec read_epoch; /**< The epoch timestamp from the configuration. */ struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */ - int read_rewind; /**< Should we rewind the file when we reach EOF? */ + enum { + FILE_EOF_EXIT, /**< Terminate when EOF is reached. */ + FILE_EOF_REWIND, /**< Rewind the file when EOF is reached. */ + FILE_EOF_WAIT /**< Blocking wait when EOF is reached. */ + } read_eof; /**< Should we rewind the file when we reach EOF? */ int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ double read_rate; /**< The read rate. */ }; diff --git a/lib/advio.c b/lib/advio.c index a9a7255a1..df81d3ab9 100644 --- a/lib/advio.c +++ b/lib/advio.c @@ -39,39 +39,116 @@ #define BAR_WIDTH 60 /**< How wide you want the progress meter to be. */ -static int advio_xferinfo(void *p, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) +static int advio_trace(CURL *handle, curl_infotype type, char *data, size_t size, void *userp) +{ + char *nl; + + switch (type) { + case CURLINFO_TEXT: + nl = strchr(data, '\n'); + if (nl) + *nl = 0; + + debug(LOG_ADVIO | 10, "%s", data); + default: /* in case a new one is introduced to shock us */ + return 0; + } + + return 0; +} + +static char * advio_human_time(double t, char *buf, size_t len) +{ + int i = 0; + const char *units[] = { "secs", "mins", "hrs", "days", "weeks", "months", "years" }; + int divs[] = { 60, 60, 24, 7, 4, 12 }; + + while (t > divs[i] && i < ARRAY_LEN(divs)) { + t /= divs[i]; + i++; + } + + snprintf(buf, len, "%.2f %s", t, units[i]); + + return buf; +} + +static char * advio_human_size(double s, char *buf, size_t len) +{ + int i = 0; + const char *units[] = { "B", "kiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB" }; + + while (s > 1024 && i < ARRAY_LEN(units)) { + s /= 1024; + i++; + } + + snprintf(buf, len, "%.*f %s", i ? 2 : 0, s, units[i]); + + return buf; +} + +static int advio_xferinfo(void *p, curl_off_t dl_total_bytes, curl_off_t dl_bytes, curl_off_t ul_total_bytes, curl_off_t ul_bytes) { struct advio *af = (struct advio *) p; - double curtime = 0; + double cur_time, eta_time, estimated_time, frac; - curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &curtime); - - // ensure that the file to be downloaded is not empty - // because that would cause a division by zero error later on - if (dltotal <= 0.0) + curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &cur_time); + + /* Is this transaction an upload? */ + int upload = ul_total_bytes > 0; + + curl_off_t total_bytes = upload ? ul_total_bytes : dl_total_bytes; + curl_off_t bytes = upload ? ul_bytes : dl_bytes; + + /* Are we finished? */ + if (bytes == 0) + af->completed = 0; + + if (af->completed) return 0; + + /* Ensure that the file to be downloaded is not empty + * because that would cause a division by zero error later on */ + if (total_bytes <= 0) + return 0; + + frac = (double) bytes / total_bytes; + + estimated_time = cur_time * (1.0 / frac); + eta_time = estimated_time - cur_time; - double frac = dlnow / dltotal; + /* Print file sizes in human readable format */ + char buf[4][32]; + + char *bytes_human = advio_human_size(bytes, buf[0], sizeof(buf[0])); + char *total_bytes_human = advio_human_size(total_bytes, buf[1], sizeof(buf[1])); + char *eta_time_human = advio_human_time(eta_time, buf[2], sizeof(buf[2])); - // part of the progressmeter that's already "full" + /* Part of the progressmeter that's already "full" */ int dotz = round(frac * BAR_WIDTH); - // create the "meter" - fprintf(stderr, "%3.0f%% in %f s (%" CURL_FORMAT_CURL_OFF_T " / %" CURL_FORMAT_CURL_OFF_T ") [", frac * 100, curtime, dlnow, dltotal); + /* Progress bar */ + fprintf(stderr, "\r["); + + for (int i = 0 ; i < BAR_WIDTH; i++) { + if (upload) + fputc(BAR_WIDTH - i > dotz ? ' ' : '<', stderr); + else + fputc(i > dotz ? ' ' : '>', stderr); + } - // part that's full already - int i = 0; - for ( ; i < dotz; i++) - fprintf(stderr, "="); - - // remaining part (spaces) - for ( ; i < BAR_WIDTH; i++) - fprintf(stderr, " "); - - // and back to line begin - do not forget the fflush to avoid output buffering problems! - fprintf(stderr, "]\r"); + fprintf(stderr, "] "); + + /* Details */ + fprintf(stderr, "eta %-12s %12s of %-12s", eta_time_human, bytes_human, total_bytes_human); fflush(stderr); - + + if (bytes == total_bytes) { + af->completed = 1; + fprintf(stderr, "\33[2K\r"); + } + return 0; } @@ -118,13 +195,20 @@ AFILE * afopen(const char *uri, const char *mode) curl_easy_setopt(af->curl, CURLOPT_USERAGENT, USER_AGENT); curl_easy_setopt(af->curl, CURLOPT_URL, af->uri); curl_easy_setopt(af->curl, CURLOPT_WRITEDATA, af->file); + curl_easy_setopt(af->curl, CURLOPT_READDATA, af->file); + + curl_easy_setopt(af->curl, CURLOPT_DEBUGFUNCTION, advio_trace); + curl_easy_setopt(af->curl, CURLOPT_VERBOSE, 1); + curl_easy_setopt(af->curl, CURLOPT_XFERINFOFUNCTION, advio_xferinfo); curl_easy_setopt(af->curl, CURLOPT_XFERINFODATA, af); - curl_easy_setopt(af->curl, CURLOPT_NOPROGRESS, 0L); - ret = adownload(af); + ret = adownload(af, 0); if (ret) goto out0; + + af->uploaded = 0; + af->downloaded = 0; return af; @@ -151,6 +235,42 @@ int afclose(AFILE *af) return ret; } +int afseek(AFILE *af, long offset, int origin) +{ + long new, cur = aftell(af); + + switch (origin) { + case SEEK_SET: + new = offset; + break; + + case SEEK_END: + fseek(af->file, 0, SEEK_END); + new = aftell(af); + fseek(af->file, cur, SEEK_SET); + break; + + case SEEK_CUR: + new = cur + offset; + break; + + default: + return -1; + } + + if (new < af->uploaded) + af->uploaded = new; + + return fseek(af->file, offset, origin); +} + +void arewind(AFILE *af) +{ + af->uploaded = 0; + + return rewind(af->file); +} + int afflush(AFILE *af) { bool dirty; @@ -161,58 +281,112 @@ int afflush(AFILE *af) dirty = memcmp(hash, af->hash, sizeof(hash)); if (dirty) - return aupload(af); + return aupload(af, 1); return 0; } - -int aupload(AFILE *af) + +int aupload(AFILE *af, int resume) { CURLcode res; - long pos; - int ret; - - ret = fflush(af->file); - if (ret) - return ret; + + long pos, end; + + double total_bytes = 0, total_time = 0; + char buf[2][32]; + + pos = aftell(af); + fseek(af->file, 0, SEEK_END); + end = aftell(af); + fseek(af->file, 0, SEEK_SET); + + if (resume) { + if (end == af->uploaded) + return 0; + + char *size_human = advio_human_size(end - af->uploaded, buf[0], sizeof(buf[0])); + + info("Resume upload of %s of %s from offset %lu", af->uri, size_human, af->uploaded); + curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, af->uploaded); + } + else { + char *size_human = advio_human_size(end, buf[0], sizeof(buf[0])); + + info("Start upload of %s of %s", af->uri, size_human); + curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, 0); + } curl_easy_setopt(af->curl, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(af->curl, CURLOPT_READDATA, af->file); - - pos = ftell(af->file); /* Remember old stream pointer */ - fseek(af->file, 0, SEEK_SET); + curl_easy_setopt(af->curl, CURLOPT_INFILESIZE, end - af->uploaded); + curl_easy_setopt(af->curl, CURLOPT_NOPROGRESS, !isatty(fileno(stderr))); res = curl_easy_perform(af->curl); - fprintf(stderr, "\e[2K"); - fflush(stderr); /* do not continue in the same line as the progress bar */ - fseek(af->file, pos, SEEK_SET); /* Restore old stream pointer */ if (res != CURLE_OK) return -1; sha1sum(af->file, af->hash); + + curl_easy_getinfo(af->curl, CURLINFO_SIZE_UPLOAD, &total_bytes); + curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &total_time); + + char *total_bytes_human = advio_human_size(total_bytes, buf[0], sizeof(buf[0])); + char *total_time_human = advio_human_time(total_time, buf[1], sizeof(buf[1])); + + info("Finished uploaded of %s in %s", total_bytes_human, total_time_human); + + af->uploaded += total_bytes; return 0; } -int adownload(AFILE *af) +int adownload(AFILE *af, int resume) { CURLcode res; - long code; + long code, pos; int ret; + + double total_bytes = 0, total_time = 0; + char buf[2][32]; + + pos = aftell(af); - fseek(af->file, 0, SEEK_SET); + if (resume) { + info("Resume download of %s from offset %lu", af->uri, af->downloaded); + + curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, af->downloaded); + } + else { + info("Start download of %s", af->uri); + + rewind(af->file); + curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, 0); + } + + curl_easy_setopt(af->curl, CURLOPT_UPLOAD, 0L); + curl_easy_setopt(af->curl, CURLOPT_NOPROGRESS, !isatty(fileno(stderr))); res = curl_easy_perform(af->curl); - fprintf(stderr, "\e[2K"); - fflush(stderr); /* do not continue in the same line as the progress bar */ - switch (res) { case CURLE_OK: - curl_easy_getinfo(af->curl, CURLINFO_RESPONSE_CODE, &code); + curl_easy_getinfo(af->curl, CURLINFO_SIZE_DOWNLOAD, &total_bytes); + curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &total_time); + + char *total_bytes_human = advio_human_size(total_bytes, buf[0], sizeof(buf[0])); + char *total_time_human = advio_human_time(total_time, buf[1], sizeof(buf[1])); + + info("Finished download of %s in %s", total_bytes_human, total_time_human); + + af->downloaded += total_bytes; + af->uploaded = af->downloaded; + + res = curl_easy_getinfo(af->curl, CURLINFO_RESPONSE_CODE, &code); + if (res) + return -1; + switch (code) { case 0: case 200: goto exist; @@ -234,7 +408,7 @@ int adownload(AFILE *af) return -1; default: - error("Failed to fetch file: %s: %s\n", af->uri, curl_easy_strerror(res)); + error("ADVIO: Failed to fetch file: %s: %s", af->uri, curl_easy_strerror(res)); return -1; } @@ -255,11 +429,13 @@ notexist: /* File does not exist */ return ret; exist: /* File exists */ - if (af->mode[0] == 'a') - fseek(af->file, 0, SEEK_END); + if (resume) + afseek(af, pos, SEEK_SET); + else if (af->mode[0] == 'a') + afseek(af, 0, SEEK_END); else if (af->mode[0] == 'r' || af->mode[0] == 'w') - fseek(af->file, 0, SEEK_SET); - + afseek(af, 0, SEEK_SET); + sha1sum(af->file, af->hash); return 0; diff --git a/lib/log.c b/lib/log.c index 70347f88b..33b90954d 100644 --- a/lib/log.c +++ b/lib/log.c @@ -65,6 +65,7 @@ static const char *facilities_strs[] = { "xil", /* LOG_XIL */ "tc", /* LOG_TC */ "if", /* LOG_IF */ + "advio", /* LOG_ADVIO */ /* Node-types */ "socket", /* LOG_SOCKET */ @@ -232,7 +233,7 @@ void log_vprint(struct log *l, const char *lvl, const char *fmt, va_list ap) #ifdef ENABLE_OPAL_ASYNC OpalPrint("VILLASnode: %s\n", buf); #endif - fprintf(l->file ? l->file : stderr, "%s\n", buf); + fprintf(l->file ? l->file : stderr, "\33[2K\r%s\n", buf); free(buf); } diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 0d4d53e4b..72ea1703d 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -85,21 +85,21 @@ static struct timespec file_calc_read_offset(const struct timespec *first, const /* Set read_offset depending on epoch_mode */ switch (mode) { - case EPOCH_DIRECT: /* read first value at now + epoch */ + case FILE_EPOCH_DIRECT: /* read first value at now + epoch */ offset = time_diff(first, &now); offset = time_add(&offset, epoch); break; - case EPOCH_WAIT: /* read first value at now + first + epoch */ + case FILE_EPOCH_WAIT: /* read first value at now + first + epoch */ offset = now; return time_add(&now, epoch); break; - case EPOCH_RELATIVE: /* read first value at first + epoch */ + case FILE_EPOCH_RELATIVE: /* read first value at first + epoch */ return *epoch; break; - case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ + case FILE_EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ return time_diff(first, epoch); break; @@ -123,12 +123,24 @@ int file_parse(struct node *n, config_setting_t *cfg) cfg_in = config_setting_get_member(cfg, "in"); if (cfg_in) { + const char *eof; + if (file_parse_direction(cfg_in, f, FILE_READ)) cerror(cfg_in, "Failed to parse input file for node %s", node_name(n)); /* More read specific settings */ - if (!config_setting_lookup_bool(cfg_in, "rewind", &f->read_rewind)) - f->read_rewind = 1; + if (config_setting_lookup_string(cfg_in, "eof", &eof)) { + if (!strcmp(eof, "exit")) + f->read_eof = FILE_EOF_EXIT; + else if (!strcmp(eof, "rewind")) + f->read_eof = FILE_EOF_REWIND; + else if (!strcmp(eof, "wait")) + f->read_eof = FILE_EOF_WAIT; + else + cerror(cfg_in, "Invalid mode '%s' for 'eof' setting", eof); + } + else + f->read_eof = FILE_EOF_EXIT; if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate)) f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ @@ -141,20 +153,20 @@ int file_parse(struct node *n, config_setting_t *cfg) const char *epoch_mode; if (config_setting_lookup_string(cfg_in, "epoch_mode", &epoch_mode)) { if (!strcmp(epoch_mode, "direct")) - f->read_epoch_mode = EPOCH_DIRECT; + f->read_epoch_mode = FILE_EPOCH_DIRECT; else if (!strcmp(epoch_mode, "wait")) - f->read_epoch_mode = EPOCH_WAIT; + f->read_epoch_mode = FILE_EPOCH_WAIT; else if (!strcmp(epoch_mode, "relative")) - f->read_epoch_mode = EPOCH_RELATIVE; + f->read_epoch_mode = FILE_EPOCH_RELATIVE; else if (!strcmp(epoch_mode, "absolute")) - f->read_epoch_mode = EPOCH_ABSOLUTE; + f->read_epoch_mode = FILE_EPOCH_ABSOLUTE; else if (!strcmp(epoch_mode, "original")) - f->read_epoch_mode = EPOCH_ORIGINAL; + f->read_epoch_mode = FILE_EPOCH_ORIGINAL; else cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); } else - f->read_epoch_mode = EPOCH_DIRECT; + f->read_epoch_mode = FILE_EPOCH_DIRECT; } n->_vd = f; @@ -170,37 +182,44 @@ char * file_print(struct node *n) if (f->read.fmt) { const char *epoch_str = NULL; switch (f->read_epoch_mode) { - case EPOCH_DIRECT: epoch_str = "direct"; break; - case EPOCH_WAIT: epoch_str = "wait"; break; - case EPOCH_RELATIVE: epoch_str = "relative"; break; - case EPOCH_ABSOLUTE: epoch_str = "absolute"; break; - case EPOCH_ORIGINAL: epoch_str = "original"; break; + case FILE_EPOCH_DIRECT: epoch_str = "direct"; break; + case FILE_EPOCH_WAIT: epoch_str = "wait"; break; + case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break; + case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break; + case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break; + } + + const char *eof_str = NULL; + switch (f->read_eof) { + case FILE_EOF_EXIT: eof_str = "exit"; break; + case FILE_EOF_WAIT: eof_str = "wait"; break; + case FILE_EOF_REWIND: eof_str = "rewind"; break; } - strcatf(&buf, "in=%s, mode=%s, rewind=%u, epoch_mode=%s, epoch=%.2f", + strcatf(&buf, "in=%s, mode=%s, eof=%s, epoch_mode=%s, epoch=%.2f", f->read.uri ? f->read.uri : f->read.fmt, f->read.mode, - f->read_rewind, + eof_str, epoch_str, time_to_double(&f->read_epoch) ); if (f->read_rate) - strcatf(&buf, "rate=%.1f, ", f->read_rate); + strcatf(&buf, ", rate=%.1f", f->read_rate); } if (f->write.fmt) { - strcatf(&buf, "out=%s, mode=%s, ", + strcatf(&buf, ", out=%s, mode=%s", f->write.uri ? f->write.uri : f->write.fmt, f->write.mode ); } if (f->read_first.tv_sec || f->read_first.tv_nsec) - strcatf(&buf, "first=%.2f, ", time_to_double(&f->read_first)); + strcatf(&buf, ", first=%.2f", time_to_double(&f->read_first)); if (f->read_offset.tv_sec || f->read_offset.tv_nsec) - strcatf(&buf, "offset=%.2f, ", time_to_double(&f->read_offset)); + strcatf(&buf, ", offset=%.2f", time_to_double(&f->read_offset)); if ((f->read_first.tv_sec || f->read_first.tv_nsec) && (f->read_offset.tv_sec || f->read_offset.tv_nsec)) { @@ -210,12 +229,9 @@ char * file_print(struct node *n) eta = time_diff(&now, &eta); if (eta.tv_sec || eta.tv_nsec) - strcatf(&buf, "eta=%.2f sec, ", time_to_double(&eta)); + strcatf(&buf, ", eta=%.2f sec", time_to_double(&eta)); } - if (strlen(buf) > 2) - buf[strlen(buf) - 2] = 0; - return buf; } @@ -246,13 +262,16 @@ int file_start(struct node *n) struct sample s; arewind(f->read.handle); - ret = sample_io_villas_fscan(f->read.handle->file, &s, NULL); - if (ret < 0) - error("Failed to read first timestamp of node %s", node_name(n)); + + if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) { + ret = sample_io_villas_fscan(f->read.handle->file, &s, NULL); + if (ret < 0) + error("Failed to read first timestamp of node %s", node_name(n)); - f->read_first = s.ts.origin; - f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); - arewind(f->read.handle); + f->read_first = s.ts.origin; + f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); + arewind(f->read.handle); + } } if (f->write.fmt) { @@ -298,17 +317,23 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt) retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get message and timestamp */ if (values < 0) { if (afeof(f->read.handle)) { - if (f->read_rewind) { - info("Rewind input file of node %s", node_name(n)); + switch (f->read_eof) { + case FILE_EOF_REWIND: + info("Rewind input file of node %s", node_name(n)); - f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); - arewind(f->read.handle); - - goto retry; - } - else { - info("Reached end-of-file"); - exit(EXIT_SUCCESS); + f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); + arewind(f->read.handle); + goto retry; + + case FILE_EOF_WAIT: + usleep(10000); /* We wait 10ms before fetching again. */ + adownload(f->read.handle, 1); + goto retry; + + case FILE_EOF_EXIT: + info("Reached end-of-file"); + exit(EXIT_SUCCESS); + } } else @@ -317,10 +342,7 @@ retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get return 0; } - if (f->read_epoch_mode == EPOCH_ORIGINAL) { - return 1; - } - else { + if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) { if (!f->read_rate || aftell(f->read.handle) == 0) { s->ts.origin = time_add(&s->ts.origin, &f->read_offset); diff --git a/tests/integration/pipe-loopback-file.sh b/tests/integration/pipe-loopback-file.sh new file mode 100755 index 000000000..f2a1922e9 --- /dev/null +++ b/tests/integration/pipe-loopback-file.sh @@ -0,0 +1,62 @@ +#!/bin/bash +# +# Integration loopback test for villas-pipe. +# +# @author Steffen Vogel +# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +CONFIG_FILE=$(mktemp) +INPUT_FILE=$(mktemp) +OUTPUT_FILE=$(mktemp) +NODE_FILE=$(mktemp) + +cat > ${CONFIG_FILE} << EOF +nodes = { + node1 = { + type = "file"; + + in = { + uri = "${NODE_FILE}", + mode = "w+", + + epoch_mode = "original", + eof = "wait" + }, + out = { + uri = "${NODE_FILE}" + mode = "w+" + } + } +} +EOF + +# Generate test data +villas-signal random -l 10 -n > ${INPUT_FILE} + +# We delay EOF of the INPUT_FILE by 1 second in order to wait for incoming data to be received +villas-pipe ${CONFIG_FILE} node1 > ${OUTPUT_FILE} < <(cat ${INPUT_FILE}; sleep 0.5; echo -n) + +# Comapre data +villas-test-cmp ${INPUT_FILE} ${OUTPUT_FILE} +RC=$? + +rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} ${NODE_FILE} + +exit $RC \ No newline at end of file diff --git a/tests/integration/pipe-loopback-nanomsg.sh b/tests/integration/pipe-loopback-nanomsg.sh index 78efe0e94..555ce2eb8 100755 --- a/tests/integration/pipe-loopback-nanomsg.sh +++ b/tests/integration/pipe-loopback-nanomsg.sh @@ -26,7 +26,7 @@ CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) OUTPUT_FILE=$(mktemp) -cat > ${CONFIG_FILE} <<- EOF +cat > ${CONFIG_FILE} << EOF nodes = { node1 = { type = "nanomsg"; diff --git a/tests/integration/pipe-loopback-zeromq.sh b/tests/integration/pipe-loopback-zeromq.sh index d2301b18c..594b060bf 100755 --- a/tests/integration/pipe-loopback-zeromq.sh +++ b/tests/integration/pipe-loopback-zeromq.sh @@ -26,7 +26,7 @@ CONFIG_FILE=$(mktemp) INPUT_FILE=$(mktemp) OUTPUT_FILE=$(mktemp) -cat > ${CONFIG_FILE} <<- EOF +cat > ${CONFIG_FILE} << EOF nodes = { node1 = { type = "zeromq"; diff --git a/tests/unit/advio.c b/tests/unit/advio.c index 451b30d61..96bbbd608 100644 --- a/tests/unit/advio.c +++ b/tests/unit/advio.c @@ -27,6 +27,8 @@ #include #include +#include +#include /** This URI points to a Sciebo share which contains some test files. * The Sciebo share is read/write accessible via WebDAV. */ @@ -68,6 +70,83 @@ Test(advio, download) cr_assert_eq(ret, 0, "Failed to close file"); } +Test(advio, download_large) +{ + AFILE *af; + int ret, len = 16; + + struct sample *smp = alloc(SAMPLE_LEN(len)); + smp->capacity = len; + + af = afopen(BASE_URI "/download-large" , "r"); + cr_assert(af, "Failed to download file"); + + ret = sample_io_villas_fscan(af->file, smp, NULL); + cr_assert_eq(ret, 0); + + cr_assert_eq(smp->sequence, 0); + cr_assert_eq(smp->length, 4); + cr_assert_eq(smp->ts.origin.tv_sec, 1497710378); + cr_assert_eq(smp->ts.origin.tv_nsec, 863332240); + + float data[] = { 0.022245, 0.000000, -1.000000, 1.000000 }; + + for (int i = 0; i < smp->length; i++) + cr_assert_float_eq(smp->data[i].f, data[i], 1e-5); + + ret = afclose(af); + cr_assert_eq(ret, 0, "Failed to close file"); +} + +Test(advio, resume) +{ + int ret; + AFILE *af1, *af2; + char *fn, dir[] = "/tmp/temp.XXXXXX"; + char line1[32]; + char *line2 = NULL; + size_t linelen = 0; + + mkdtemp(dir); + ret = asprintf(&fn, "%s/file", dir); + cr_assert_gt(ret, 0); + + af1 = afopen(fn, "w+"); + cr_assert_not_null(af1); + + /* We flush once the empty file in order to upload an empty file. */ + aupload(af1, 0); + + af2 = afopen(fn, "r"); + cr_assert_not_null(af2); + + for (int i = 0; i < 100; i++) { + snprintf(line1, sizeof(line1), "This is line %d\n", i); + + afputs(line1, af1); + aupload(af1, 1); + + adownload(af2, 1); + agetline(&line2, &linelen, af2); + + cr_assert_str_eq(line1, line2); + } + + ret = afclose(af1); + cr_assert_eq(ret, 0); + + ret = afclose(af2); + cr_assert_eq(ret, 0); + + ret = unlink(fn); + cr_assert_eq(ret, 0); + + ret = rmdir(dir); + cr_assert_eq(ret, 0); + + free(line2); +} + Test(advio, upload) { AFILE *af;