1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-30 00:00:11 +01:00

Merge branch 'advio-improvements' into 'develop'

Advio improvements

See merge request !25
This commit is contained in:
Steffen Vogel 2017-06-17 19:43:16 +02:00
commit a8827b6c44
10 changed files with 483 additions and 118 deletions

View file

@ -31,6 +31,11 @@
struct advio { struct advio {
CURL *curl; CURL *curl;
FILE *file; FILE *file;
long uploaded; /**< Upload progress. How much has already been uploaded to the remote file. */
long downloaded; /**< Download progress. How much has already been downloaded from the remote file. */
int completed:1; /**< Was the upload completd */
unsigned char hash[SHA_DIGEST_LENGTH]; unsigned char hash[SHA_DIGEST_LENGTH];
@ -43,10 +48,13 @@ typedef struct advio AFILE;
/* The remaining functions from stdio are just replaced macros */ /* The remaining functions from stdio are just replaced macros */
#define afeof(af) feof((af)->file) #define afeof(af) feof((af)->file)
#define aftell(af) ftell((af)->file) #define aftell(af) ftell((af)->file)
#define arewind(af) rewind((af)->file)
#define afileno(af) fileno((af)->file) #define afileno(af) fileno((af)->file)
#define afread(ptr, sz, nitems, af) fread(ptr, sz, nitems, (af)->file) #define afread(ptr, sz, nitems, af) fread(ptr, sz, nitems, (af)->file)
#define afwrite(ptr, sz, nitems, af) fwrite(ptr, sz, nitems, (af)->file) #define afwrite(ptr, sz, nitems, af) fwrite(ptr, sz, nitems, (af)->file)
#define afputs(ptr, af) fputs(ptr, (af)->file)
#define afprintf(af, fmt, ...) fprintf((af)->file, fmt, __VA_ARGS__)
#define afscanf(af, fmt, ...) fprintf((af)->file, fmt, __VA_ARGS__)
#define agetline(linep, linecapp, af) getline(linep, linecapp, (af)->file)
/* Extensions */ /* Extensions */
#define auri(af) ((af)->uri) #define auri(af) ((af)->uri)
@ -55,6 +63,18 @@ typedef struct advio AFILE;
AFILE *afopen(const char *url, const char *mode); AFILE *afopen(const char *url, const char *mode);
int afclose(AFILE *file); int afclose(AFILE *file);
int afflush(AFILE *file); int afflush(AFILE *file);
int adownload(AFILE *af);
int aupload(AFILE *af); int afseek(AFILE *file, long offset, int origin);
void arewind(AFILE *file);
/** Download contens from remote file
*
*
* @param resume Do a partial download and append to the local file
*/
int adownload(AFILE *af, int resume);
int aupload(AFILE *af, int resume);

View file

@ -66,14 +66,15 @@ enum log_facilities {
LOG_XIL = (1L << 20), LOG_XIL = (1L << 20),
LOG_TC = (1L << 21), LOG_TC = (1L << 21),
LOG_IF = (1L << 22), LOG_IF = (1L << 22),
LOG_ADVIO = (1L << 23),
/* Node-types */ /* Node-types */
LOG_SOCKET = (1L << 23), LOG_SOCKET = (1L << 24),
LOG_FILE = (1L << 24), LOG_FILE = (1L << 25),
LOG_FPGA = (1L << 25), LOG_FPGA = (1L << 26),
LOG_NGSI = (1L << 26), LOG_NGSI = (1L << 27),
LOG_WEBSOCKET = (1L << 27), LOG_WEBSOCKET = (1L << 28),
LOG_OPAL = (1L << 28), LOG_OPAL = (1L << 30),
/* Classes */ /* Classes */
LOG_NODES = LOG_NODE | LOG_SOCKET | LOG_FILE | LOG_FPGA | LOG_NGSI | LOG_WEBSOCKET | LOG_OPAL, LOG_NODES = LOG_NODE | LOG_SOCKET | LOG_FILE | LOG_FPGA | LOG_NGSI | LOG_WEBSOCKET | LOG_OPAL,

View file

@ -52,18 +52,22 @@ struct file {
} read, write; } read, write;
enum read_epoch_mode { enum read_epoch_mode {
EPOCH_DIRECT, FILE_EPOCH_DIRECT,
EPOCH_WAIT, FILE_EPOCH_WAIT,
EPOCH_RELATIVE, FILE_EPOCH_RELATIVE,
EPOCH_ABSOLUTE, FILE_EPOCH_ABSOLUTE,
EPOCH_ORIGINAL FILE_EPOCH_ORIGINAL
} read_epoch_mode; /**< Specifies how file::offset is calculated. */ } read_epoch_mode; /**< Specifies how file::offset is calculated. */
struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */ struct timespec read_first; /**< The first timestamp in the file file::{read,write}::uri */
struct timespec read_epoch; /**< The epoch timestamp from the configuration. */ struct timespec read_epoch; /**< The epoch timestamp from the configuration. */
struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */ struct timespec read_offset; /**< An offset between the timestamp in the input file and the current time */
int read_rewind; /**< Should we rewind the file when we reach EOF? */ enum {
FILE_EOF_EXIT, /**< Terminate when EOF is reached. */
FILE_EOF_REWIND, /**< Rewind the file when EOF is reached. */
FILE_EOF_WAIT /**< Blocking wait when EOF is reached. */
} read_eof; /**< Should we rewind the file when we reach EOF? */
int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */ int read_timer; /**< Timer file descriptor. Blocks until 1 / rate seconds are elapsed. */
double read_rate; /**< The read rate. */ double read_rate; /**< The read rate. */
}; };

View file

@ -39,39 +39,116 @@
#define BAR_WIDTH 60 /**< How wide you want the progress meter to be. */ #define BAR_WIDTH 60 /**< How wide you want the progress meter to be. */
static int advio_xferinfo(void *p, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) static int advio_trace(CURL *handle, curl_infotype type, char *data, size_t size, void *userp)
{
char *nl;
switch (type) {
case CURLINFO_TEXT:
nl = strchr(data, '\n');
if (nl)
*nl = 0;
debug(LOG_ADVIO | 10, "%s", data);
default: /* in case a new one is introduced to shock us */
return 0;
}
return 0;
}
static char * advio_human_time(double t, char *buf, size_t len)
{
int i = 0;
const char *units[] = { "secs", "mins", "hrs", "days", "weeks", "months", "years" };
int divs[] = { 60, 60, 24, 7, 4, 12 };
while (t > divs[i] && i < ARRAY_LEN(divs)) {
t /= divs[i];
i++;
}
snprintf(buf, len, "%.2f %s", t, units[i]);
return buf;
}
static char * advio_human_size(double s, char *buf, size_t len)
{
int i = 0;
const char *units[] = { "B", "kiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB" };
while (s > 1024 && i < ARRAY_LEN(units)) {
s /= 1024;
i++;
}
snprintf(buf, len, "%.*f %s", i ? 2 : 0, s, units[i]);
return buf;
}
static int advio_xferinfo(void *p, curl_off_t dl_total_bytes, curl_off_t dl_bytes, curl_off_t ul_total_bytes, curl_off_t ul_bytes)
{ {
struct advio *af = (struct advio *) p; struct advio *af = (struct advio *) p;
double curtime = 0; double cur_time, eta_time, estimated_time, frac;
curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &curtime); curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &cur_time);
// ensure that the file to be downloaded is not empty /* Is this transaction an upload? */
// because that would cause a division by zero error later on int upload = ul_total_bytes > 0;
if (dltotal <= 0.0)
curl_off_t total_bytes = upload ? ul_total_bytes : dl_total_bytes;
curl_off_t bytes = upload ? ul_bytes : dl_bytes;
/* Are we finished? */
if (bytes == 0)
af->completed = 0;
if (af->completed)
return 0; return 0;
/* Ensure that the file to be downloaded is not empty
* because that would cause a division by zero error later on */
if (total_bytes <= 0)
return 0;
frac = (double) bytes / total_bytes;
estimated_time = cur_time * (1.0 / frac);
eta_time = estimated_time - cur_time;
double frac = dlnow / dltotal; /* Print file sizes in human readable format */
char buf[4][32];
char *bytes_human = advio_human_size(bytes, buf[0], sizeof(buf[0]));
char *total_bytes_human = advio_human_size(total_bytes, buf[1], sizeof(buf[1]));
char *eta_time_human = advio_human_time(eta_time, buf[2], sizeof(buf[2]));
// part of the progressmeter that's already "full" /* Part of the progressmeter that's already "full" */
int dotz = round(frac * BAR_WIDTH); int dotz = round(frac * BAR_WIDTH);
// create the "meter" /* Progress bar */
fprintf(stderr, "%3.0f%% in %f s (%" CURL_FORMAT_CURL_OFF_T " / %" CURL_FORMAT_CURL_OFF_T ") [", frac * 100, curtime, dlnow, dltotal); fprintf(stderr, "\r[");
for (int i = 0 ; i < BAR_WIDTH; i++) {
if (upload)
fputc(BAR_WIDTH - i > dotz ? ' ' : '<', stderr);
else
fputc(i > dotz ? ' ' : '>', stderr);
}
// part that's full already fprintf(stderr, "] ");
int i = 0;
for ( ; i < dotz; i++) /* Details */
fprintf(stderr, "="); fprintf(stderr, "eta %-12s %12s of %-12s", eta_time_human, bytes_human, total_bytes_human);
// remaining part (spaces)
for ( ; i < BAR_WIDTH; i++)
fprintf(stderr, " ");
// and back to line begin - do not forget the fflush to avoid output buffering problems!
fprintf(stderr, "]\r");
fflush(stderr); fflush(stderr);
if (bytes == total_bytes) {
af->completed = 1;
fprintf(stderr, "\33[2K\r");
}
return 0; return 0;
} }
@ -118,13 +195,20 @@ AFILE * afopen(const char *uri, const char *mode)
curl_easy_setopt(af->curl, CURLOPT_USERAGENT, USER_AGENT); curl_easy_setopt(af->curl, CURLOPT_USERAGENT, USER_AGENT);
curl_easy_setopt(af->curl, CURLOPT_URL, af->uri); curl_easy_setopt(af->curl, CURLOPT_URL, af->uri);
curl_easy_setopt(af->curl, CURLOPT_WRITEDATA, af->file); curl_easy_setopt(af->curl, CURLOPT_WRITEDATA, af->file);
curl_easy_setopt(af->curl, CURLOPT_READDATA, af->file);
curl_easy_setopt(af->curl, CURLOPT_DEBUGFUNCTION, advio_trace);
curl_easy_setopt(af->curl, CURLOPT_VERBOSE, 1);
curl_easy_setopt(af->curl, CURLOPT_XFERINFOFUNCTION, advio_xferinfo); curl_easy_setopt(af->curl, CURLOPT_XFERINFOFUNCTION, advio_xferinfo);
curl_easy_setopt(af->curl, CURLOPT_XFERINFODATA, af); curl_easy_setopt(af->curl, CURLOPT_XFERINFODATA, af);
curl_easy_setopt(af->curl, CURLOPT_NOPROGRESS, 0L);
ret = adownload(af); ret = adownload(af, 0);
if (ret) if (ret)
goto out0; goto out0;
af->uploaded = 0;
af->downloaded = 0;
return af; return af;
@ -151,6 +235,42 @@ int afclose(AFILE *af)
return ret; return ret;
} }
int afseek(AFILE *af, long offset, int origin)
{
long new, cur = aftell(af);
switch (origin) {
case SEEK_SET:
new = offset;
break;
case SEEK_END:
fseek(af->file, 0, SEEK_END);
new = aftell(af);
fseek(af->file, cur, SEEK_SET);
break;
case SEEK_CUR:
new = cur + offset;
break;
default:
return -1;
}
if (new < af->uploaded)
af->uploaded = new;
return fseek(af->file, offset, origin);
}
void arewind(AFILE *af)
{
af->uploaded = 0;
return rewind(af->file);
}
int afflush(AFILE *af) int afflush(AFILE *af)
{ {
bool dirty; bool dirty;
@ -161,58 +281,112 @@ int afflush(AFILE *af)
dirty = memcmp(hash, af->hash, sizeof(hash)); dirty = memcmp(hash, af->hash, sizeof(hash));
if (dirty) if (dirty)
return aupload(af); return aupload(af, 1);
return 0; return 0;
} }
int aupload(AFILE *af) int aupload(AFILE *af, int resume)
{ {
CURLcode res; CURLcode res;
long pos;
int ret; long pos, end;
ret = fflush(af->file); double total_bytes = 0, total_time = 0;
if (ret) char buf[2][32];
return ret;
pos = aftell(af);
fseek(af->file, 0, SEEK_END);
end = aftell(af);
fseek(af->file, 0, SEEK_SET);
if (resume) {
if (end == af->uploaded)
return 0;
char *size_human = advio_human_size(end - af->uploaded, buf[0], sizeof(buf[0]));
info("Resume upload of %s of %s from offset %lu", af->uri, size_human, af->uploaded);
curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, af->uploaded);
}
else {
char *size_human = advio_human_size(end, buf[0], sizeof(buf[0]));
info("Start upload of %s of %s", af->uri, size_human);
curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, 0);
}
curl_easy_setopt(af->curl, CURLOPT_UPLOAD, 1L); curl_easy_setopt(af->curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(af->curl, CURLOPT_READDATA, af->file); curl_easy_setopt(af->curl, CURLOPT_INFILESIZE, end - af->uploaded);
curl_easy_setopt(af->curl, CURLOPT_NOPROGRESS, !isatty(fileno(stderr)));
pos = ftell(af->file); /* Remember old stream pointer */
fseek(af->file, 0, SEEK_SET);
res = curl_easy_perform(af->curl); res = curl_easy_perform(af->curl);
fprintf(stderr, "\e[2K");
fflush(stderr); /* do not continue in the same line as the progress bar */
fseek(af->file, pos, SEEK_SET); /* Restore old stream pointer */ fseek(af->file, pos, SEEK_SET); /* Restore old stream pointer */
if (res != CURLE_OK) if (res != CURLE_OK)
return -1; return -1;
sha1sum(af->file, af->hash); sha1sum(af->file, af->hash);
curl_easy_getinfo(af->curl, CURLINFO_SIZE_UPLOAD, &total_bytes);
curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &total_time);
char *total_bytes_human = advio_human_size(total_bytes, buf[0], sizeof(buf[0]));
char *total_time_human = advio_human_time(total_time, buf[1], sizeof(buf[1]));
info("Finished uploaded of %s in %s", total_bytes_human, total_time_human);
af->uploaded += total_bytes;
return 0; return 0;
} }
int adownload(AFILE *af) int adownload(AFILE *af, int resume)
{ {
CURLcode res; CURLcode res;
long code; long code, pos;
int ret; int ret;
double total_bytes = 0, total_time = 0;
char buf[2][32];
pos = aftell(af);
fseek(af->file, 0, SEEK_SET); if (resume) {
info("Resume download of %s from offset %lu", af->uri, af->downloaded);
curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, af->downloaded);
}
else {
info("Start download of %s", af->uri);
rewind(af->file);
curl_easy_setopt(af->curl, CURLOPT_RESUME_FROM, 0);
}
curl_easy_setopt(af->curl, CURLOPT_UPLOAD, 0L);
curl_easy_setopt(af->curl, CURLOPT_NOPROGRESS, !isatty(fileno(stderr)));
res = curl_easy_perform(af->curl); res = curl_easy_perform(af->curl);
fprintf(stderr, "\e[2K");
fflush(stderr); /* do not continue in the same line as the progress bar */
switch (res) { switch (res) {
case CURLE_OK: case CURLE_OK:
curl_easy_getinfo(af->curl, CURLINFO_RESPONSE_CODE, &code); curl_easy_getinfo(af->curl, CURLINFO_SIZE_DOWNLOAD, &total_bytes);
curl_easy_getinfo(af->curl, CURLINFO_TOTAL_TIME, &total_time);
char *total_bytes_human = advio_human_size(total_bytes, buf[0], sizeof(buf[0]));
char *total_time_human = advio_human_time(total_time, buf[1], sizeof(buf[1]));
info("Finished download of %s in %s", total_bytes_human, total_time_human);
af->downloaded += total_bytes;
af->uploaded = af->downloaded;
res = curl_easy_getinfo(af->curl, CURLINFO_RESPONSE_CODE, &code);
if (res)
return -1;
switch (code) { switch (code) {
case 0: case 0:
case 200: goto exist; case 200: goto exist;
@ -234,7 +408,7 @@ int adownload(AFILE *af)
return -1; return -1;
default: default:
error("Failed to fetch file: %s: %s\n", af->uri, curl_easy_strerror(res)); error("ADVIO: Failed to fetch file: %s: %s", af->uri, curl_easy_strerror(res));
return -1; return -1;
} }
@ -255,11 +429,13 @@ notexist: /* File does not exist */
return ret; return ret;
exist: /* File exists */ exist: /* File exists */
if (af->mode[0] == 'a') if (resume)
fseek(af->file, 0, SEEK_END); afseek(af, pos, SEEK_SET);
else if (af->mode[0] == 'a')
afseek(af, 0, SEEK_END);
else if (af->mode[0] == 'r' || af->mode[0] == 'w') else if (af->mode[0] == 'r' || af->mode[0] == 'w')
fseek(af->file, 0, SEEK_SET); afseek(af, 0, SEEK_SET);
sha1sum(af->file, af->hash); sha1sum(af->file, af->hash);
return 0; return 0;

View file

@ -65,6 +65,7 @@ static const char *facilities_strs[] = {
"xil", /* LOG_XIL */ "xil", /* LOG_XIL */
"tc", /* LOG_TC */ "tc", /* LOG_TC */
"if", /* LOG_IF */ "if", /* LOG_IF */
"advio", /* LOG_ADVIO */
/* Node-types */ /* Node-types */
"socket", /* LOG_SOCKET */ "socket", /* LOG_SOCKET */
@ -232,7 +233,7 @@ void log_vprint(struct log *l, const char *lvl, const char *fmt, va_list ap)
#ifdef ENABLE_OPAL_ASYNC #ifdef ENABLE_OPAL_ASYNC
OpalPrint("VILLASnode: %s\n", buf); OpalPrint("VILLASnode: %s\n", buf);
#endif #endif
fprintf(l->file ? l->file : stderr, "%s\n", buf); fprintf(l->file ? l->file : stderr, "\33[2K\r%s\n", buf);
free(buf); free(buf);
} }

View file

@ -85,21 +85,21 @@ static struct timespec file_calc_read_offset(const struct timespec *first, const
/* Set read_offset depending on epoch_mode */ /* Set read_offset depending on epoch_mode */
switch (mode) { switch (mode) {
case EPOCH_DIRECT: /* read first value at now + epoch */ case FILE_EPOCH_DIRECT: /* read first value at now + epoch */
offset = time_diff(first, &now); offset = time_diff(first, &now);
offset = time_add(&offset, epoch); offset = time_add(&offset, epoch);
break; break;
case EPOCH_WAIT: /* read first value at now + first + epoch */ case FILE_EPOCH_WAIT: /* read first value at now + first + epoch */
offset = now; offset = now;
return time_add(&now, epoch); return time_add(&now, epoch);
break; break;
case EPOCH_RELATIVE: /* read first value at first + epoch */ case FILE_EPOCH_RELATIVE: /* read first value at first + epoch */
return *epoch; return *epoch;
break; break;
case EPOCH_ABSOLUTE: /* read first value at f->read_epoch */ case FILE_EPOCH_ABSOLUTE: /* read first value at f->read_epoch */
return time_diff(first, epoch); return time_diff(first, epoch);
break; break;
@ -123,12 +123,24 @@ int file_parse(struct node *n, config_setting_t *cfg)
cfg_in = config_setting_get_member(cfg, "in"); cfg_in = config_setting_get_member(cfg, "in");
if (cfg_in) { if (cfg_in) {
const char *eof;
if (file_parse_direction(cfg_in, f, FILE_READ)) if (file_parse_direction(cfg_in, f, FILE_READ))
cerror(cfg_in, "Failed to parse input file for node %s", node_name(n)); cerror(cfg_in, "Failed to parse input file for node %s", node_name(n));
/* More read specific settings */ /* More read specific settings */
if (!config_setting_lookup_bool(cfg_in, "rewind", &f->read_rewind)) if (config_setting_lookup_string(cfg_in, "eof", &eof)) {
f->read_rewind = 1; if (!strcmp(eof, "exit"))
f->read_eof = FILE_EOF_EXIT;
else if (!strcmp(eof, "rewind"))
f->read_eof = FILE_EOF_REWIND;
else if (!strcmp(eof, "wait"))
f->read_eof = FILE_EOF_WAIT;
else
cerror(cfg_in, "Invalid mode '%s' for 'eof' setting", eof);
}
else
f->read_eof = FILE_EOF_EXIT;
if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate)) if (!config_setting_lookup_float(cfg_in, "rate", &f->read_rate))
f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */ f->read_rate = 0; /* Disable fixed rate sending. Using timestamps of file instead */
@ -141,20 +153,20 @@ int file_parse(struct node *n, config_setting_t *cfg)
const char *epoch_mode; const char *epoch_mode;
if (config_setting_lookup_string(cfg_in, "epoch_mode", &epoch_mode)) { if (config_setting_lookup_string(cfg_in, "epoch_mode", &epoch_mode)) {
if (!strcmp(epoch_mode, "direct")) if (!strcmp(epoch_mode, "direct"))
f->read_epoch_mode = EPOCH_DIRECT; f->read_epoch_mode = FILE_EPOCH_DIRECT;
else if (!strcmp(epoch_mode, "wait")) else if (!strcmp(epoch_mode, "wait"))
f->read_epoch_mode = EPOCH_WAIT; f->read_epoch_mode = FILE_EPOCH_WAIT;
else if (!strcmp(epoch_mode, "relative")) else if (!strcmp(epoch_mode, "relative"))
f->read_epoch_mode = EPOCH_RELATIVE; f->read_epoch_mode = FILE_EPOCH_RELATIVE;
else if (!strcmp(epoch_mode, "absolute")) else if (!strcmp(epoch_mode, "absolute"))
f->read_epoch_mode = EPOCH_ABSOLUTE; f->read_epoch_mode = FILE_EPOCH_ABSOLUTE;
else if (!strcmp(epoch_mode, "original")) else if (!strcmp(epoch_mode, "original"))
f->read_epoch_mode = EPOCH_ORIGINAL; f->read_epoch_mode = FILE_EPOCH_ORIGINAL;
else else
cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode); cerror(cfg_in, "Invalid value '%s' for setting 'epoch_mode'", epoch_mode);
} }
else else
f->read_epoch_mode = EPOCH_DIRECT; f->read_epoch_mode = FILE_EPOCH_DIRECT;
} }
n->_vd = f; n->_vd = f;
@ -170,37 +182,44 @@ char * file_print(struct node *n)
if (f->read.fmt) { if (f->read.fmt) {
const char *epoch_str = NULL; const char *epoch_str = NULL;
switch (f->read_epoch_mode) { switch (f->read_epoch_mode) {
case EPOCH_DIRECT: epoch_str = "direct"; break; case FILE_EPOCH_DIRECT: epoch_str = "direct"; break;
case EPOCH_WAIT: epoch_str = "wait"; break; case FILE_EPOCH_WAIT: epoch_str = "wait"; break;
case EPOCH_RELATIVE: epoch_str = "relative"; break; case FILE_EPOCH_RELATIVE: epoch_str = "relative"; break;
case EPOCH_ABSOLUTE: epoch_str = "absolute"; break; case FILE_EPOCH_ABSOLUTE: epoch_str = "absolute"; break;
case EPOCH_ORIGINAL: epoch_str = "original"; break; case FILE_EPOCH_ORIGINAL: epoch_str = "original"; break;
}
const char *eof_str = NULL;
switch (f->read_eof) {
case FILE_EOF_EXIT: eof_str = "exit"; break;
case FILE_EOF_WAIT: eof_str = "wait"; break;
case FILE_EOF_REWIND: eof_str = "rewind"; break;
} }
strcatf(&buf, "in=%s, mode=%s, rewind=%u, epoch_mode=%s, epoch=%.2f", strcatf(&buf, "in=%s, mode=%s, eof=%s, epoch_mode=%s, epoch=%.2f",
f->read.uri ? f->read.uri : f->read.fmt, f->read.uri ? f->read.uri : f->read.fmt,
f->read.mode, f->read.mode,
f->read_rewind, eof_str,
epoch_str, epoch_str,
time_to_double(&f->read_epoch) time_to_double(&f->read_epoch)
); );
if (f->read_rate) if (f->read_rate)
strcatf(&buf, "rate=%.1f, ", f->read_rate); strcatf(&buf, ", rate=%.1f", f->read_rate);
} }
if (f->write.fmt) { if (f->write.fmt) {
strcatf(&buf, "out=%s, mode=%s, ", strcatf(&buf, ", out=%s, mode=%s",
f->write.uri ? f->write.uri : f->write.fmt, f->write.uri ? f->write.uri : f->write.fmt,
f->write.mode f->write.mode
); );
} }
if (f->read_first.tv_sec || f->read_first.tv_nsec) if (f->read_first.tv_sec || f->read_first.tv_nsec)
strcatf(&buf, "first=%.2f, ", time_to_double(&f->read_first)); strcatf(&buf, ", first=%.2f", time_to_double(&f->read_first));
if (f->read_offset.tv_sec || f->read_offset.tv_nsec) if (f->read_offset.tv_sec || f->read_offset.tv_nsec)
strcatf(&buf, "offset=%.2f, ", time_to_double(&f->read_offset)); strcatf(&buf, ", offset=%.2f", time_to_double(&f->read_offset));
if ((f->read_first.tv_sec || f->read_first.tv_nsec) && if ((f->read_first.tv_sec || f->read_first.tv_nsec) &&
(f->read_offset.tv_sec || f->read_offset.tv_nsec)) { (f->read_offset.tv_sec || f->read_offset.tv_nsec)) {
@ -210,12 +229,9 @@ char * file_print(struct node *n)
eta = time_diff(&now, &eta); eta = time_diff(&now, &eta);
if (eta.tv_sec || eta.tv_nsec) if (eta.tv_sec || eta.tv_nsec)
strcatf(&buf, "eta=%.2f sec, ", time_to_double(&eta)); strcatf(&buf, ", eta=%.2f sec", time_to_double(&eta));
} }
if (strlen(buf) > 2)
buf[strlen(buf) - 2] = 0;
return buf; return buf;
} }
@ -246,13 +262,16 @@ int file_start(struct node *n)
struct sample s; struct sample s;
arewind(f->read.handle); arewind(f->read.handle);
ret = sample_io_villas_fscan(f->read.handle->file, &s, NULL);
if (ret < 0) if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) {
error("Failed to read first timestamp of node %s", node_name(n)); ret = sample_io_villas_fscan(f->read.handle->file, &s, NULL);
if (ret < 0)
error("Failed to read first timestamp of node %s", node_name(n));
f->read_first = s.ts.origin; f->read_first = s.ts.origin;
f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode);
arewind(f->read.handle); arewind(f->read.handle);
}
} }
if (f->write.fmt) { if (f->write.fmt) {
@ -298,17 +317,23 @@ int file_read(struct node *n, struct sample *smps[], unsigned cnt)
retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get message and timestamp */ retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get message and timestamp */
if (values < 0) { if (values < 0) {
if (afeof(f->read.handle)) { if (afeof(f->read.handle)) {
if (f->read_rewind) { switch (f->read_eof) {
info("Rewind input file of node %s", node_name(n)); case FILE_EOF_REWIND:
info("Rewind input file of node %s", node_name(n));
f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode); f->read_offset = file_calc_read_offset(&f->read_first, &f->read_epoch, f->read_epoch_mode);
arewind(f->read.handle); arewind(f->read.handle);
goto retry;
goto retry;
} case FILE_EOF_WAIT:
else { usleep(10000); /* We wait 10ms before fetching again. */
info("Reached end-of-file"); adownload(f->read.handle, 1);
exit(EXIT_SUCCESS); goto retry;
case FILE_EOF_EXIT:
info("Reached end-of-file");
exit(EXIT_SUCCESS);
} }
} }
else else
@ -317,10 +342,7 @@ retry: values = sample_io_villas_fscan(f->read.handle->file, s, &flags); /* Get
return 0; return 0;
} }
if (f->read_epoch_mode == EPOCH_ORIGINAL) { if (f->read_epoch_mode != FILE_EPOCH_ORIGINAL) {
return 1;
}
else {
if (!f->read_rate || aftell(f->read.handle) == 0) { if (!f->read_rate || aftell(f->read.handle) == 0) {
s->ts.origin = time_add(&s->ts.origin, &f->read_offset); s->ts.origin = time_add(&s->ts.origin, &f->read_offset);

View file

@ -0,0 +1,62 @@
#!/bin/bash
#
# Integration loopback test for villas-pipe.
#
# @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
# @license GNU General Public License (version 3)
#
# VILLASnode
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
##################################################################################
CONFIG_FILE=$(mktemp)
INPUT_FILE=$(mktemp)
OUTPUT_FILE=$(mktemp)
NODE_FILE=$(mktemp)
cat > ${CONFIG_FILE} << EOF
nodes = {
node1 = {
type = "file";
in = {
uri = "${NODE_FILE}",
mode = "w+",
epoch_mode = "original",
eof = "wait"
},
out = {
uri = "${NODE_FILE}"
mode = "w+"
}
}
}
EOF
# Generate test data
villas-signal random -l 10 -n > ${INPUT_FILE}
# We delay EOF of the INPUT_FILE by 1 second in order to wait for incoming data to be received
villas-pipe ${CONFIG_FILE} node1 > ${OUTPUT_FILE} < <(cat ${INPUT_FILE}; sleep 0.5; echo -n)
# Comapre data
villas-test-cmp ${INPUT_FILE} ${OUTPUT_FILE}
RC=$?
rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} ${NODE_FILE}
exit $RC

View file

@ -26,7 +26,7 @@ CONFIG_FILE=$(mktemp)
INPUT_FILE=$(mktemp) INPUT_FILE=$(mktemp)
OUTPUT_FILE=$(mktemp) OUTPUT_FILE=$(mktemp)
cat > ${CONFIG_FILE} <<- EOF cat > ${CONFIG_FILE} << EOF
nodes = { nodes = {
node1 = { node1 = {
type = "nanomsg"; type = "nanomsg";

View file

@ -26,7 +26,7 @@ CONFIG_FILE=$(mktemp)
INPUT_FILE=$(mktemp) INPUT_FILE=$(mktemp)
OUTPUT_FILE=$(mktemp) OUTPUT_FILE=$(mktemp)
cat > ${CONFIG_FILE} <<- EOF cat > ${CONFIG_FILE} << EOF
nodes = { nodes = {
node1 = { node1 = {
type = "zeromq"; type = "zeromq";

View file

@ -27,6 +27,8 @@
#include <villas/utils.h> #include <villas/utils.h>
#include <villas/advio.h> #include <villas/advio.h>
#include <villas/sample.h>
#include <villas/sample_io.h>
/** This URI points to a Sciebo share which contains some test files. /** This URI points to a Sciebo share which contains some test files.
* The Sciebo share is read/write accessible via WebDAV. */ * The Sciebo share is read/write accessible via WebDAV. */
@ -68,6 +70,83 @@ Test(advio, download)
cr_assert_eq(ret, 0, "Failed to close file"); cr_assert_eq(ret, 0, "Failed to close file");
} }
Test(advio, download_large)
{
AFILE *af;
int ret, len = 16;
struct sample *smp = alloc(SAMPLE_LEN(len));
smp->capacity = len;
af = afopen(BASE_URI "/download-large" , "r");
cr_assert(af, "Failed to download file");
ret = sample_io_villas_fscan(af->file, smp, NULL);
cr_assert_eq(ret, 0);
cr_assert_eq(smp->sequence, 0);
cr_assert_eq(smp->length, 4);
cr_assert_eq(smp->ts.origin.tv_sec, 1497710378);
cr_assert_eq(smp->ts.origin.tv_nsec, 863332240);
float data[] = { 0.022245, 0.000000, -1.000000, 1.000000 };
for (int i = 0; i < smp->length; i++)
cr_assert_float_eq(smp->data[i].f, data[i], 1e-5);
ret = afclose(af);
cr_assert_eq(ret, 0, "Failed to close file");
}
Test(advio, resume)
{
int ret;
AFILE *af1, *af2;
char *fn, dir[] = "/tmp/temp.XXXXXX";
char line1[32];
char *line2 = NULL;
size_t linelen = 0;
mkdtemp(dir);
ret = asprintf(&fn, "%s/file", dir);
cr_assert_gt(ret, 0);
af1 = afopen(fn, "w+");
cr_assert_not_null(af1);
/* We flush once the empty file in order to upload an empty file. */
aupload(af1, 0);
af2 = afopen(fn, "r");
cr_assert_not_null(af2);
for (int i = 0; i < 100; i++) {
snprintf(line1, sizeof(line1), "This is line %d\n", i);
afputs(line1, af1);
aupload(af1, 1);
adownload(af2, 1);
agetline(&line2, &linelen, af2);
cr_assert_str_eq(line1, line2);
}
ret = afclose(af1);
cr_assert_eq(ret, 0);
ret = afclose(af2);
cr_assert_eq(ret, 0);
ret = unlink(fn);
cr_assert_eq(ret, 0);
ret = rmdir(dir);
cr_assert_eq(ret, 0);
free(line2);
}
Test(advio, upload) Test(advio, upload)
{ {
AFILE *af; AFILE *af;