From 8fd9081c9a40fb0f5b50ed9b96dd9bdadd3cdbd9 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 26 May 2011 23:33:47 +0200 Subject: [PATCH] almost finished & ready for #ec11 --- misc/controller/vzlogger/Makefile | 13 +- misc/controller/vzlogger/src/api.c | 44 ++- misc/controller/vzlogger/src/api.h | 57 +-- misc/controller/vzlogger/src/local.c | 98 +++++ misc/controller/vzlogger/src/local.h | 44 +++ misc/controller/vzlogger/src/main.c | 349 ++++++++++++------ misc/controller/vzlogger/src/main.h | 79 +++- misc/controller/vzlogger/src/protocol.h | 23 ++ misc/controller/vzlogger/src/protocols/obis.c | 24 +- misc/controller/vzlogger/src/protocols/obis.h | 7 +- misc/controller/vzlogger/src/queue.c | 60 +++ misc/controller/vzlogger/src/queue.h | 60 +++ misc/controller/vzlogger/vzlogger.conf | 4 +- 13 files changed, 664 insertions(+), 198 deletions(-) create mode 100644 misc/controller/vzlogger/src/local.c create mode 100644 misc/controller/vzlogger/src/local.h create mode 100644 misc/controller/vzlogger/src/protocol.h create mode 100644 misc/controller/vzlogger/src/queue.c create mode 100644 misc/controller/vzlogger/src/queue.h diff --git a/misc/controller/vzlogger/Makefile b/misc/controller/vzlogger/Makefile index b0ae209..c1c5edf 100644 --- a/misc/controller/vzlogger/Makefile +++ b/misc/controller/vzlogger/Makefile @@ -1,5 +1,5 @@ CC=cc -CFLAGS=-c -Wall -g -D_REENTRANT -std=c99 +CFLAGS=-c -Wall -g -D_REENTRANT -std=gnu99 LDFLAGS= TARGET=vzlogger @@ -8,8 +8,8 @@ all: $(TARGET) clean: rm -rf *.o -vzlogger: main.c obis.c api.c - $(CC) $(LDFLAGS) main.o obis.o api.o `curl-config --libs` -ljson -lpthread -o $(TARGET) +vzlogger: main.c api.c local.c queue.c obis.c + $(CC) $(LDFLAGS) main.o api.o local.o queue.o obis.o `curl-config --libs` -ljson -lpthread -o $(TARGET) -lmicrohttpd -lm main.c: $(CC) $(CFLAGS) src/main.c -o main.o @@ -17,5 +17,12 @@ main.c: api.c: $(CC) $(CFLAGS) src/api.c -o api.o `curl-config --cflags` +local.c: + $(CC) $(CFLAGS) src/local.c -o local.o + + +queue.c: + $(CC) $(CFLAGS) src/queue.c -o queue.o + obis.c: $(CC) $(CFLAGS) src/protocols/obis.c -o obis.o diff --git a/misc/controller/vzlogger/src/api.c b/misc/controller/vzlogger/src/api.c index 7a813a3..9472e1c 100644 --- a/misc/controller/vzlogger/src/api.c +++ b/misc/controller/vzlogger/src/api.c @@ -23,14 +23,15 @@ * along with volkszaehler.org. If not, see . */ +#include #include #include #include -#include "main.h" #include "api.h" +#include "main.h" -extern struct options opts; +extern options_t opts; /** * Reformat CURLs debugging output @@ -39,22 +40,22 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_ switch (type) { case CURLINFO_TEXT: case CURLINFO_END: - printf("%.*s", (int) size, data); + print(4, "%.*s", NULL, (int) size, data); break; case CURLINFO_HEADER_IN: case CURLINFO_HEADER_OUT: - //printf("header: %.*s", size, data); + //print(4, "Header: %.*s", NULL, size, data); break; case CURLINFO_SSL_DATA_IN: case CURLINFO_DATA_IN: - printf("Received %lu bytes\n", (unsigned long) size); + print(4, "Received %lu bytes\n", NULL, (unsigned long) size); break; case CURLINFO_SSL_DATA_OUT: case CURLINFO_DATA_OUT: - printf("Sent %lu bytes.. ", (unsigned long) size); + print(4, "Sent %lu bytes.. ", NULL, (unsigned long) size); break; } @@ -63,11 +64,11 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_ size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data) { size_t realsize = size * nmemb; - struct curl_response *response = (struct curl_response *) data; + curl_response_t *response = (curl_response_t *) data; response->data = realloc(response->data, response->size + realsize + 1); if (response->data == NULL) { /* out of memory! */ - fprintf(stderr, "Not enough memory (realloc returned NULL)\n"); + print(-1, "Not enough memory (realloc returned NULL)\n", NULL); exit(EXIT_FAILURE); } @@ -81,20 +82,18 @@ size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *da /** * Log against the vz.org middleware with simple HTTP requests via CURL */ -CURLcode api_log(char * middleware, char * uuid, struct reading read) { +CURLcode api_log(channel_t *ch, reading_t rd) { CURL *curl; CURLcode rc = -1; int curl_code; - struct curl_response chunk = {NULL, 0}; + curl_response_t chunk = {NULL, 0}; char url[255], useragent[255], post[255]; /* build request strings */ - sprintf(url, "%s/data/%s.json", middleware, uuid); /* build url */ + sprintf(url, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */ sprintf(useragent, "vzlogger/%s (%s)", VZ_VERSION, curl_version()); - sprintf(post, "?timestamp=%lu%lu&value=%f", read.tv.tv_sec, read.tv.tv_usec, read.value); - - curl_global_init(CURL_GLOBAL_ALL); + sprintf(post, "?timestamp=%lu%lu&value=%f", rd.tv.tv_sec, rd.tv.tv_usec / 1000, rd.value); curl = curl_easy_init(); @@ -107,19 +106,19 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) { curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback); curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &chunk); - if (opts.verbose) printf("Sending request: %s%s\n", url, post); + print(1, "Sending request: %s%s", ch, url, post); rc = curl_easy_perform(curl); curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &curl_code); - if (opts.verbose) printf("Request %s with code: %i\n", (curl_code == 200) ? "succeded" : "failed", curl_code); + print((curl_code == 200) ? 1 : -1, "Request %s with code: %i", ch, (curl_code == 200) ? "succeded" : "failed", curl_code); if (rc != CURLE_OK) { - fprintf(stderr, "CURL error: %s\n", curl_easy_strerror(rc)); + print(-1, "CURL error: %s", ch, curl_easy_strerror(rc)); } else if (chunk.size == 0 || chunk.data == NULL) { - fprintf(stderr, "No data received!\n"); + print(-1, "No data received!", ch); rc = -1; } else if (curl_code != 200) { /* parse exception */ @@ -133,18 +132,17 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) { json_obj = json_object_object_get(json_obj, "exception"); if (json_obj) { - fprintf(stderr, "%s [%i]: %s\n", + print(-1, "%s : %s", ch, json_object_get_string(json_object_object_get(json_obj, "type")), - json_object_get_int(json_object_object_get(json_obj, "code")), json_object_get_string(json_object_object_get(json_obj, "message")) ); } else { - fprintf(stderr, "Malformed middleware response: missing exception\n"); + print(-1, "Malformed middleware response: missing exception", ch); } } else { - fprintf(stderr, "Malformed middleware response: %s\n", json_tokener_errors[json_tok->err]); + print(-1, "Malformed middleware response: %s", ch, json_tokener_errors[json_tok->err]); } rc = -1; @@ -154,7 +152,7 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) { free(chunk.data); /* free response */ } else { - fprintf(stderr, "Failed to create CURL handle\n"); + print(-1, "Failed to create CURL handle", ch); } return rc; diff --git a/misc/controller/vzlogger/src/api.h b/misc/controller/vzlogger/src/api.h index 7c180a3..1fe251f 100644 --- a/misc/controller/vzlogger/src/api.h +++ b/misc/controller/vzlogger/src/api.h @@ -26,64 +26,19 @@ #ifndef _API_H_ #define _API_H_ +#include #include -#include -#include -#define BUFFER_LENGTH 64 +#include "main.h" +#include "protocol.h" -typedef struct reading (*rfp)(); -typedef void (*ifp)(char *options); - -struct curl_response { +typedef struct { char *data; size_t size; -}; - -struct reading { - float value; - struct timeval tv; -}; - -struct protocol { - char * name; /* short identifier for protocol */ - char * desc; /* more detailed description */ - rfp read_func; /* function pointer to read data */ - ifp init_func; /* function to init a channel */ -}; - -/** - * Datatype for every channel - */ -struct channel { - char * uuid; - char * middleware; - int interval; - char * options; - struct protocol *prot; - struct reading buffer[BUFFER_LENGTH]; /* ring buffer */ -}; - -/** - * Options from command line - */ -struct options { - char * config; /* path to config file */ - unsigned int interval; /* seconds */ - - /* boolean bitfields, at the end of struct */ - unsigned int verbose:1; - unsigned int daemon:1; -// unsigned local:1; /* enable local interface */ -}; - -/* Prototypes */ -void usage(char ** argv); -struct options parse_options(int argc, char * argv[]); -//struct channels parse_channels(char * filename); +} curl_response_t; int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom); size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data); -CURLcode api_log(char * middleware, char * uuid, struct reading read); +CURLcode api_log(channel_t *ch, reading_t read); #endif /* _API_H_ */ diff --git a/misc/controller/vzlogger/src/local.c b/misc/controller/vzlogger/src/local.c new file mode 100644 index 0000000..9b1b7e3 --- /dev/null +++ b/misc/controller/vzlogger/src/local.c @@ -0,0 +1,98 @@ +/** + * Implementation of local interface via libmicrohttpd + * + * @package vzlogger + * @copyright Copyright (c) 2011, The volkszaehler.org project + * @license http://www.gnu.org/licenses/gpl.txt GNU Public License + * @author Steffen Vogel + */ +/* + * This file is part of volkzaehler.org + * + * volkzaehler.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * volkzaehler.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with volkszaehler.org. If not, see . + */ + +#include +#include + +#include "main.h" +#include "local.h" + +extern channel_t *chans; +extern options_t opts; + +int handle_request(void *cls, struct MHD_Connection *connection, const char *url, const char *method, + const char *version, const char *upload_data, size_t *upload_data_size, void **con_cls) { + const char * json_str; + int ret; + int num_chans = *(int *) cls; + + print(2, "Local request reveived: %s %s %s", NULL, version, method, url); + + struct MHD_Response *response; + + struct json_object * json_obj = json_object_new_object(); + struct json_object * json_data = json_object_new_array(); + + for (int i = 0; i < num_chans; i++) { + channel_t *ch = &chans[i]; + reading_t rd; + + struct json_object * json_channel = json_object_new_object(); + struct json_object * json_tuples = json_object_new_array(); + + int j = ch->queue.write_p; + do { + pthread_mutex_lock(&ch->mutex); + rd = ch->queue.buf[j]; + pthread_mutex_unlock(&ch->mutex); + + if (rd.value != 0) { + struct json_object * json_tuple = json_object_new_array(); + + int timestamp = rd.tv.tv_sec * 1000 + rd.tv.tv_usec / 1000; + + json_object_array_add(json_tuple, json_object_new_int(timestamp)); + json_object_array_add(json_tuple, json_object_new_double(rd.value)); + json_object_array_add(json_tuple, json_object_new_int(1)); /* just for middleware compability */ + + json_object_array_add(json_tuples, json_tuple); + } + + j++; + j %= ch->queue.size; + } while (j != ch->queue.read_p); + + json_object_object_add(json_channel, "uuid", json_object_new_string(ch->uuid)); + json_object_object_add(json_channel, "interval", json_object_new_int(ch->interval)); + json_object_object_add(json_channel, "tuples", json_tuples); + + + json_object_array_add(json_data, json_channel); + } + + json_object_object_add(json_obj, "version", json_object_new_string(VZ_VERSION)); + json_object_object_add(json_obj, "data", json_data); + json_str = json_object_to_json_string(json_obj); + + response = MHD_create_response_from_data(strlen(json_str), (void *) json_str, FALSE, TRUE); + + MHD_add_response_header(response, "Content-type", "application/json"); + + ret = MHD_queue_response(connection, MHD_HTTP_OK, response); + + MHD_destroy_response (response); + + return ret; +} diff --git a/misc/controller/vzlogger/src/local.h b/misc/controller/vzlogger/src/local.h new file mode 100644 index 0000000..49a8af7 --- /dev/null +++ b/misc/controller/vzlogger/src/local.h @@ -0,0 +1,44 @@ +/** + * Header file for local interface + * + * @package vzlogger + * @copyright Copyright (c) 2011, The volkszaehler.org project + * @license http://www.gnu.org/licenses/gpl.txt GNU Public License + * @author Steffen Vogel + */ +/* + * This file is part of volkzaehler.org + * + * volkzaehler.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * volkzaehler.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with volkszaehler.org. If not, see . + */ + +#ifndef _LOCAL_H_ +#define _LOCAL_H_ + +#include + +int handle_request( + void *cls, + struct MHD_Connection *connection, + const char *url, + const char *method, + const char *version, + const char *upload_data, + size_t *upload_data_size, + void **con_cls +); + +#endif /* _LOCAL_H_ */ + + diff --git a/misc/controller/vzlogger/src/main.c b/misc/controller/vzlogger/src/main.c index 4c94bd8..3aab1e3 100644 --- a/misc/controller/vzlogger/src/main.c +++ b/misc/controller/vzlogger/src/main.c @@ -24,24 +24,29 @@ */ #define VZ_VERSION "0.2" - -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include "main.h" +#include "queue.h" #include "api.h" +#include "local.h" #include "protocols/obis.h" - -static struct protocol protocols[] = { - {"obis", "Plaintext OBIS", obis_get, obis_init}, + +static protocol_t protocols[] = { + {"obis", "Plaintext OBIS", obis_get, obis_init, obis_close}, +// {"fluksousb", "FluksoUSB board", flukso_get, flukso_init, flukso_close}, +// {"onewire", "Dallas 1-Wire sensors",onewire_get, onewire_init, onewire_close}, {NULL} /* stop condition for iterator */ }; @@ -49,10 +54,10 @@ static struct option long_options[] = { {"config", required_argument, 0, 'c'}, {"daemon", required_argument, 0, 'd'}, {"interval", required_argument, 0, 'i'}, -// {"local", no_argument, 0, 'l'}, -// {"local-port", required_argument, 0, 'p'}, + {"local", no_argument, 0, 'l'}, + {"local-port", required_argument, 0, 'p'}, {"help", no_argument, 0, 'h'}, - {"verbose", no_argument, 0, 'v'}, + {"verbose", optional_argument, 0, 'v'}, {NULL} /* stop condition for iterator */ }; @@ -60,15 +65,16 @@ static char * long_options_descs[] = { "config file with channel -> uuid mapping", "run as daemon", "interval in seconds to read meters", -// "activate local interface (tiny webserver)", -// "TCP port for local interface" + "activate local interface (tiny webserver)", + "TCP port for local interface" "show this help", "enable verbose output", NULL /* stop condition for iterator */ }; /* globals */ -struct options opts; +options_t opts; +channel_t * chans; /* mem gets allocated in parse_channels(), and freed in main() */ /** * Print availble options and some other usefull information @@ -76,7 +82,7 @@ struct options opts; void usage(char * argv[]) { char ** desc = long_options_descs; struct option * op = long_options; - struct protocol * prot = protocols; + protocol_t * prot = protocols; printf("Usage: %s [options]\n\n", argv[0]); printf(" following options are available:\n"); @@ -95,28 +101,63 @@ void usage(char * argv[]) { prot++; } - printf("\nvzlogger - volkszaehler.org logging utility VERSION\n"); + printf("\nvzlogger - volkszaehler.org logging utility %s\n", VZ_VERSION); printf("by Steffen Vogel \n"); } +/** + * Wrapper to log notices and errors + */ +void print(int level, char * format, channel_t *ch, ... ) { + static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + va_list args; + + struct timeval now; + struct tm * timeinfo; + char buffer[16]; + + if (level <= (signed int) opts.verbose) { + gettimeofday(&now, NULL); + timeinfo = localtime(&now.tv_sec); + + strftime(buffer, 16, "%b %d %H:%M:%S", timeinfo); + + pthread_mutex_lock(&mutex); + fprintf((level > 0) ? stdout : stderr, "[%s.%3lu]", buffer, now.tv_usec / 1000); + + if (ch != NULL) { + fprintf((level > 0) ? stdout : stderr, "[ch#%i]\t", ch->id); + } + else { + fprintf((level > 0) ? stdout : stderr, "\t\t"); + } + + va_start(args, ch); + vfprintf((level > 0) ? stdout : stderr, format, args); + va_end(args); + fprintf((level > 0) ? stdout : stderr, "\n"); + pthread_mutex_unlock(&mutex); + } +} + /** * Parse options from command line */ -struct options parse_options(int argc, char * argv[]) { - struct options opts; +options_t parse_options(int argc, char * argv[]) { + options_t opts; /* setting default options */ - opts.interval = 300; /* seconds */ - opts.verbose = FALSE; opts.daemon = FALSE; - //opts.local = FALSE; - opts.config = NULL; + opts.local = FALSE; + opts.verbose = 0; + opts.port = 8080; + opts.config = "vzlogger.conf"; while (TRUE) { /* getopt_long stores the option index here. */ int option_index = 0; - int c = getopt_long(argc, argv, "i:c:hdv", long_options, &option_index); + int c = getopt_long(argc, argv, "i:c:p:lhdv:", long_options, &option_index); /* detect the end of the options. */ if (c == -1) @@ -124,11 +165,19 @@ struct options parse_options(int argc, char * argv[]) { switch (c) { case 'v': - opts.verbose = 1; + opts.verbose = (optarg == NULL) ? 1 : atoi(optarg); break; - + + case 'l': + opts.local = TRUE; + break; + case 'd': - opts.daemon = 1; + opts.daemon = TRUE; + break; + + case 'p': /* port for local interface */ + opts.port = atoi(optarg); break; case 'c': /* read config file */ @@ -146,79 +195,145 @@ struct options parse_options(int argc, char * argv[]) { return opts; } -struct channel parse_channel(char * line) { - struct channel ch; - struct protocol * prot; - char *tok = strtok(line, ";"); - - for (int i = 0; i < 7 && tok != NULL; i++) { - switch(i) { - case 0: /* middleware */ - ch.middleware = (char *) malloc(strlen(tok)+1); - strcpy(ch.middleware, tok); - break; - - case 1: /* uuid */ - ch.uuid = (char *) malloc(strlen(tok)+1); - strcpy(ch.uuid, tok); - break; - - case 2: /* protocol */ - prot = protocols; /* reset pointer */ - while (prot->name && strcmp(prot->name, tok) != 0) prot++; /* linear search */ - ch.prot = prot; - break; - - case 3: /* interval */ - ch.interval = atoi(tok); - break; - - case 4: /* options */ - ch.options = (char *) malloc(strlen(tok)+1); - strcpy(ch.options, tok); - break; - } - - tok = strtok(NULL, ";"); +channel_t * parse_channels(char * filename, int * num_chans) { + FILE *file = fopen(filename, "r"); /* open configuration */ + + if (file == NULL) { + perror(filename); /* why didn't the file open? */ + exit(EXIT_FAILURE); } - if (opts.verbose) printf("Channel parsed: %s\n", line); + char line[256]; + int j = 0; - return ch; + channel_t *chans = malloc(sizeof(channel_t) * MAX_CHANNELS); + + while (fgets(line, sizeof line, file) != NULL) { /* read a line */ + if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */ + + channel_t ch; + protocol_t *prot; + char *tok = strtok(line, ";"); + + for (int i = 0; i < 7 && tok != NULL; i++) { + switch(i) { + case 0: /* middleware */ + ch.middleware = (char *) malloc(strlen(tok)+1); + strcpy(ch.middleware, tok); + break; + + case 1: /* uuid */ + ch.uuid = (char *) malloc(strlen(tok)+1); + strcpy(ch.uuid, tok); + break; + + case 2: /* protocol */ + prot = protocols; /* reset pointer */ + while (prot->name && strcmp(prot->name, tok) != 0) prot++; /* linear search */ + ch.prot = prot; + break; + + case 3: /* interval */ + ch.interval = atoi(tok); + break; + + case 4: /* options */ + ch.options = (char *) malloc(strlen(tok)+1); + strcpy(ch.options, tok); + break; + } + + tok = strtok(NULL, ";"); + } + + ch.id = j; + + print(1, "Parsed (on %s)", &ch, ch.middleware); + chans[j++] = ch; + } + + fclose(file); + *num_chans = j; + + return chans; } /** * Logging thread + * + * Logs buffered readings against middleware */ void *log_thread(void *arg) { - static int threads; /* number of threads already started */ - int thread_id = threads++; /* current thread identifier */ - struct channel ch; - struct reading rd; + channel_t *ch = (channel_t *) arg; /* casting argument */ + reading_t rd; + CURLcode rc; - if (opts.verbose) printf("Thread #%i started\n", thread_id); + print(1, "Started logging thread", ch); - ch = *(struct channel *) arg; /* copy channel struct */ + while (TRUE) { + pthread_mutex_lock(&ch->mutex); + while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */ + pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data */ + } + pthread_mutex_unlock(&ch->mutex); + + while (!queue_is_empty(&ch->queue)) { + pthread_mutex_lock(&ch->mutex); + queue_first(&ch->queue, &rd); + pthread_mutex_unlock(&ch->mutex); + + rc = api_log(ch, rd); /* log reading */ - ch.prot->init_func(ch.options); /* init sensor/meter */ - - log: - - rd = ch.prot->read_func(); /* aquire reading */ - rc = api_log(ch.middleware, ch.uuid, rd); /* log reading */ - - if (rc != CURLE_OK) { - if (opts.verbose) printf("Delaying next transmission for 15 minutes due to pervious error\n"); - sleep(15); + if (rc == CURLE_OK) { + pthread_mutex_lock(&ch->mutex); + queue_deque(&ch->queue, &rd); /* remove reading from buffer */ + pthread_mutex_unlock(&ch->mutex); + } + else { + print(1, "Delaying next transmission for 15 minutes due to pervious error", ch); + + sleep(RETRY_PAUSE); + } + } + pthread_testcancel(); /* test for cancelation request */ } - if (opts.daemon) { - if (opts.verbose) printf("Sleeping %i seconds for next transmission\n", ch.interval); - sleep(ch.interval); - goto log; + return NULL; +} + +/** + * Read thread + * + * Aquires reading from meters/sensors + */ +void *read_thread(void *arg) { + channel_t *ch = (channel_t *) arg; /* casting argument */ + + print(1, "Started reading thread", ch); + + /* initalize channel */ + ch->handle = ch->prot->init_func(ch->options); /* init sensor/meter */ + + while (TRUE) { + reading_t rd = ch->prot->read_func(ch->handle); /* aquire reading */ + + pthread_mutex_lock(&ch->mutex); + if (opts.verbose > 4) queue_print(&ch->queue); /* Debugging */ + + queue_enque(&ch->queue, rd); + pthread_cond_broadcast(&ch->condition); + pthread_mutex_unlock(&ch->mutex); + + print(1, "Value read: %.3f (next reading in %i secs)", ch, rd.value, ch->interval); + + pthread_testcancel(); /* test for cancelation request */ + sleep(ch->interval); /* else sleep and restart aquisition */ } + /* close channel */ + ch->prot->close_func(ch->handle); + return NULL; } @@ -226,34 +341,56 @@ void *log_thread(void *arg) { * The main loop */ int main(int argc, char * argv[]) { + int num_chans = 0; + struct MHD_Daemon * d; + opts = parse_options(argc, argv); /* parse command line arguments */ + chans = parse_channels(opts.config, &num_chans); /* parse channels from configuration */ - FILE *file = fopen(opts.config, "r"); /* open configuration */ + print(1, "Started %s with verbosity level %i", NULL, argv[0], opts.verbose); + + curl_global_init(CURL_GLOBAL_ALL); /* global intialization for all threads */ + + /* start threads */ + for (int i = 0; i < num_chans; i++) { + channel_t * ch = &chans[i]; + + queue_init(&ch->queue, (BUFFER_LENGTH / ch->interval) + 1); /* initialize queue to buffer 10 minutes of data */ + + pthread_mutex_init(&ch->mutex, NULL); + pthread_cond_init(&ch->condition, NULL); - if (file == NULL) { - perror(opts.config); /* why didn't the file open? */ - exit(EXIT_FAILURE); + pthread_create(&ch->reading_thread, NULL, read_thread, (void *) ch); + pthread_create(&ch->logging_thread, NULL, log_thread, (void *) ch); } - int i = 0; - char line[256]; - pthread_t pthreads[16]; - while (fgets(line, sizeof line, file) != NULL) { /* read a line */ - if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */ - - struct channel ch = parse_channel(line); - - /* start logging threads */ - pthread_create(&pthreads[i++], NULL, log_thread, (void *) &ch); + if (opts.local) { /* start webserver */ + d = MHD_start_daemon( + MHD_USE_THREAD_PER_CONNECTION, + opts.port, + NULL, NULL, + handle_request, + &num_chans, + MHD_OPTION_END + ); } - fclose(file); - - for (int n = 0; n < i; n++) { /* wait for all threads to terminate */ - pthread_join(pthreads[n], NULL); + /* wait for all threads to terminate */ + for (int i = 0; i < num_chans; i++) { + channel_t * ch = &chans[i]; + + pthread_join(ch->reading_thread, NULL); + pthread_join(ch->logging_thread, NULL); + + pthread_cond_destroy(&ch->condition); + pthread_mutex_destroy(&ch->mutex); } + + if (opts.local) { /* stop webserver */ + MHD_stop_daemon(d); + } + + free(chans); return 0; } - - diff --git a/misc/controller/vzlogger/src/main.h b/misc/controller/vzlogger/src/main.h index 00f8e8f..136c261 100644 --- a/misc/controller/vzlogger/src/main.h +++ b/misc/controller/vzlogger/src/main.h @@ -1,14 +1,89 @@ +/** + * Main header file + * + * @package vzlogger + * @copyright Copyright (c) 2011, The volkszaehler.org project + * @license http://www.gnu.org/licenses/gpl.txt GNU Public License + * @author Steffen Vogel + */ +/* + * This file is part of volkzaehler.org + * + * volkzaehler.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * volkzaehler.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with volkszaehler.org. If not, see . + */ + #ifndef _MAIN_H_ #define _MAIN_H_ +#include +#include + +#include "protocol.h" +#include "queue.h" + #define VZ_VERSION "0.2" +#define MAX_CHANNELS 16 + +#define RETRY_PAUSE 16 /* seconds to wait after failed request */ +#define BUFFER_LENGTH 25 /* in seconds */ #ifndef TRUE - #define TRUE 1 +#define TRUE 1 #endif #ifndef FALSE - #define FALSE 0 +#define FALSE 0 #endif +/** + * Datatype for every channel + */ +typedef struct { + char * middleware; + char * uuid; + unsigned int interval; + char * options; + + int id; /* only for internal usage & debugging */ + + queue_t queue; /* circular queue to buffer readings */ + protocol_t *prot; /* pointer to protocol */ + void * handle; /* handle to store connection status */ + + pthread_t reading_thread; /* pthread for asynchronus reading */ + pthread_t logging_thread; /* pthread for asynchronus logging */ + pthread_mutex_t mutex; + pthread_cond_t condition; +} channel_t; + +/** + * Options from command line + */ +typedef struct { + char * config; /* path to config file */ + unsigned int port; /* tcp port for local interface */ + unsigned int verbose; /* verbosity level */ + + /* boolean bitfields, at the end of struct */ + int daemon:1; + int local:1; /* enable local interface */ +} options_t; + +/* Prototypes */ +options_t parse_options(int argc, char * argv[]); +channel_t * parse_channels(char * filename, int * num_chans); +void print(int level, char * format, channel_t *ch, ... ); +void usage(char ** argv); + #endif /* _MAIN_H_ */ diff --git a/misc/controller/vzlogger/src/protocol.h b/misc/controller/vzlogger/src/protocol.h new file mode 100644 index 0000000..a51bfdc --- /dev/null +++ b/misc/controller/vzlogger/src/protocol.h @@ -0,0 +1,23 @@ +#ifndef _PROTOCOL_H_ +#define _PROTOCOL_H_ + +#include + +typedef struct { + float value; + struct timeval tv; +} reading_t; + +typedef void *(*ifp_t)(char *options); +typedef void (*cfp_t)(void *handle); +typedef reading_t (*rfp_t)(void *handle); + +typedef struct { + char * name; /* short identifier for protocol */ + char * desc; /* more detailed description */ + rfp_t read_func; /* function pointer to read data */ + ifp_t init_func; /* function pointer to init a channel */ + cfp_t close_func; /* function pointer to close a channel */ +} protocol_t; + +#endif /* _PROTOCOL_H_ */ diff --git a/misc/controller/vzlogger/src/protocols/obis.c b/misc/controller/vzlogger/src/protocols/obis.c index 5fee272..4166811 100644 --- a/misc/controller/vzlogger/src/protocols/obis.c +++ b/misc/controller/vzlogger/src/protocols/obis.c @@ -28,16 +28,16 @@ * along with volkszaehler.org. If not, see . */ -#include -#include +#include #include -#include +#include +#include #include "obis.h" -void obis_init(char * port) { +void * obis_init(char * port) { struct termios tio; - int fd; + int *fd = malloc(sizeof(int)); memset(&tio, 0, sizeof(tio)); @@ -48,13 +48,21 @@ void obis_init(char * port) { tio.c_cc[VMIN] = 1; tio.c_cc[VTIME] = 5; - fd = open(port, O_RDWR); // | O_NONBLOCK); + *fd = open(port, O_RDWR); // | O_NONBLOCK); cfsetospeed(&tio, B9600); // 9600 baud cfsetispeed(&tio, B9600); // 9600 baud + + return (void *) fd; } -struct reading obis_get() { - struct reading rd; +void obis_close(void *handle) { + // TODO close serial port + + free(handle); +} + +reading_t obis_get(void *handle) { + reading_t rd; rd.value = 33.3334; gettimeofday(&rd.tv, NULL); diff --git a/misc/controller/vzlogger/src/protocols/obis.h b/misc/controller/vzlogger/src/protocols/obis.h index 517d426..b77c523 100644 --- a/misc/controller/vzlogger/src/protocols/obis.h +++ b/misc/controller/vzlogger/src/protocols/obis.h @@ -29,9 +29,10 @@ #ifndef _OBIS_H_ #define _OBIS_H_ -#include "../api.h" +#include "../protocol.h" -void obis_init(char * port); -struct reading obis_get(); +void * obis_init(char * port); +void obis_close(void *handle); +reading_t obis_get(void *handle); #endif /* _OBIS_H_ */ diff --git a/misc/controller/vzlogger/src/queue.c b/misc/controller/vzlogger/src/queue.c new file mode 100644 index 0000000..4d9a586 --- /dev/null +++ b/misc/controller/vzlogger/src/queue.c @@ -0,0 +1,60 @@ +#include +#include + +#include "queue.h" + +bool_t queue_init(queue_t *q, size_t size) { + q->buf = malloc(sizeof(reading_t) * size); /* keep one slot open */ + + if (q->buf) { + q->size = size; + q->read_p = q->write_p = 0; /* queue is empty */ + + return TRUE; + } + else { + return FALSE; /* cannot allocate memory */ + } +} + +bool_t queue_is_full(queue_t *q) { + return (((q->write_p + 1) % q->size) == q->read_p); +} + +bool_t queue_is_empty(queue_t *q) { + return (q->read_p == q->write_p); +} + +bool_t queue_enque(queue_t *q, reading_t rd) { + q->buf[q->write_p] = rd; + q->write_p++; + q->write_p %= q->size; + + return !queue_is_full(q); +} + +bool_t queue_deque(queue_t *q, reading_t *rd) { + *rd = q->buf[q->read_p]; + q->read_p++; + q->read_p %= q->size; + + return !queue_is_empty(q); +} + +size_t queue_size(queue_t *q) { + return q->write_p - q->read_p + (q->read_p > q->write_p) * q->size; +} + +bool_t queue_first(queue_t *q, reading_t *rd) { + *rd = q->buf[q->read_p]; + + return !queue_is_empty(q); +} + +void queue_print(queue_t *q) { + printf("Queue dump: [%.1f", q->buf[0].value); + for (int i = 1; i < q->size; i++) { + printf("|%.1f", q->buf[i].value); + } + printf("]\n"); +} diff --git a/misc/controller/vzlogger/src/queue.h b/misc/controller/vzlogger/src/queue.h new file mode 100644 index 0000000..0dfa5da --- /dev/null +++ b/misc/controller/vzlogger/src/queue.h @@ -0,0 +1,60 @@ +/** + * Circular queue to buffer readings + * + * @package vzlogger + * @copyright Copyright (c) 2011, The volkszaehler.org project + * @license http://www.gnu.org/licenses/gpl.txt GNU Public License + * @author Steffen Vogel + */ +/* + * This file is part of volkzaehler.org + * + * volkzaehler.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * volkzaehler.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with volkszaehler.org. If not, see . + */ + +#ifndef _QUEUE_H_ +#define _QUEUE_H_ + +#include "protocol.h" + +#ifndef TRUE +#define TRUE 1 +#endif + +#ifndef FALSE +#define FALSE 0 +#endif + +typedef char bool_t; + +typedef struct { + size_t size; + + int read_p; + int write_p; + + reading_t *buf; +} queue_t; + +bool_t queue_init(queue_t *q, size_t size); +bool_t queue_is_full(queue_t *q); +bool_t queue_is_empty(queue_t *q); +bool_t queue_enque(queue_t *q, reading_t rd); +bool_t queue_deque(queue_t *q, reading_t *rd); +bool_t queue_first(queue_t *q, reading_t *rd); +size_t queue_size(queue_t *q); +void queue_print(queue_t *q); + +#endif /* _QUEUE_H_ */ + diff --git a/misc/controller/vzlogger/vzlogger.conf b/misc/controller/vzlogger/vzlogger.conf index 52a6f8f..435feee 100644 --- a/misc/controller/vzlogger/vzlogger.conf +++ b/misc/controller/vzlogger/vzlogger.conf @@ -1,3 +1,3 @@ #middleware;uuid;type;interval;options -http://localhost/workspace/volkszaehler.org/htdocs/middleware;6836dd20-00d5-11e0-bab1-856ed5f959ae;obis;5;/dev/ttyUSB0 -http://volkszaehler.org/demo/middleware;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;6;/dev/ttyS0 +http://localhost/workspace/volkszaehler.org/htdocs/middleware;a301d8d0-903b-1234-94bb-d943d061b6a8;obis;5;/dev/ttyUSB0 +http://volkszaehler.org/demo/middleware.php;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;10;/dev/ttyS0