diff --git a/misc/controller/vzlogger/Makefile b/misc/controller/vzlogger/Makefile
index b0ae209..c1c5edf 100644
--- a/misc/controller/vzlogger/Makefile
+++ b/misc/controller/vzlogger/Makefile
@@ -1,5 +1,5 @@
CC=cc
-CFLAGS=-c -Wall -g -D_REENTRANT -std=c99
+CFLAGS=-c -Wall -g -D_REENTRANT -std=gnu99
LDFLAGS=
TARGET=vzlogger
@@ -8,8 +8,8 @@ all: $(TARGET)
clean:
rm -rf *.o
-vzlogger: main.c obis.c api.c
- $(CC) $(LDFLAGS) main.o obis.o api.o `curl-config --libs` -ljson -lpthread -o $(TARGET)
+vzlogger: main.c api.c local.c queue.c obis.c
+ $(CC) $(LDFLAGS) main.o api.o local.o queue.o obis.o `curl-config --libs` -ljson -lpthread -o $(TARGET) -lmicrohttpd -lm
main.c:
$(CC) $(CFLAGS) src/main.c -o main.o
@@ -17,5 +17,12 @@ main.c:
api.c:
$(CC) $(CFLAGS) src/api.c -o api.o `curl-config --cflags`
+local.c:
+ $(CC) $(CFLAGS) src/local.c -o local.o
+
+
+queue.c:
+ $(CC) $(CFLAGS) src/queue.c -o queue.o
+
obis.c:
$(CC) $(CFLAGS) src/protocols/obis.c -o obis.o
diff --git a/misc/controller/vzlogger/src/api.c b/misc/controller/vzlogger/src/api.c
index 7a813a3..9472e1c 100644
--- a/misc/controller/vzlogger/src/api.c
+++ b/misc/controller/vzlogger/src/api.c
@@ -23,14 +23,15 @@
* along with volkszaehler.org. If not, see .
*/
+#include
#include
#include
#include
-#include "main.h"
#include "api.h"
+#include "main.h"
-extern struct options opts;
+extern options_t opts;
/**
* Reformat CURLs debugging output
@@ -39,22 +40,22 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_
switch (type) {
case CURLINFO_TEXT:
case CURLINFO_END:
- printf("%.*s", (int) size, data);
+ print(4, "%.*s", NULL, (int) size, data);
break;
case CURLINFO_HEADER_IN:
case CURLINFO_HEADER_OUT:
- //printf("header: %.*s", size, data);
+ //print(4, "Header: %.*s", NULL, size, data);
break;
case CURLINFO_SSL_DATA_IN:
case CURLINFO_DATA_IN:
- printf("Received %lu bytes\n", (unsigned long) size);
+ print(4, "Received %lu bytes\n", NULL, (unsigned long) size);
break;
case CURLINFO_SSL_DATA_OUT:
case CURLINFO_DATA_OUT:
- printf("Sent %lu bytes.. ", (unsigned long) size);
+ print(4, "Sent %lu bytes.. ", NULL, (unsigned long) size);
break;
}
@@ -63,11 +64,11 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data) {
size_t realsize = size * nmemb;
- struct curl_response *response = (struct curl_response *) data;
+ curl_response_t *response = (curl_response_t *) data;
response->data = realloc(response->data, response->size + realsize + 1);
if (response->data == NULL) { /* out of memory! */
- fprintf(stderr, "Not enough memory (realloc returned NULL)\n");
+ print(-1, "Not enough memory (realloc returned NULL)\n", NULL);
exit(EXIT_FAILURE);
}
@@ -81,20 +82,18 @@ size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *da
/**
* Log against the vz.org middleware with simple HTTP requests via CURL
*/
-CURLcode api_log(char * middleware, char * uuid, struct reading read) {
+CURLcode api_log(channel_t *ch, reading_t rd) {
CURL *curl;
CURLcode rc = -1;
int curl_code;
- struct curl_response chunk = {NULL, 0};
+ curl_response_t chunk = {NULL, 0};
char url[255], useragent[255], post[255];
/* build request strings */
- sprintf(url, "%s/data/%s.json", middleware, uuid); /* build url */
+ sprintf(url, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */
sprintf(useragent, "vzlogger/%s (%s)", VZ_VERSION, curl_version());
- sprintf(post, "?timestamp=%lu%lu&value=%f", read.tv.tv_sec, read.tv.tv_usec, read.value);
-
- curl_global_init(CURL_GLOBAL_ALL);
+ sprintf(post, "?timestamp=%lu%lu&value=%f", rd.tv.tv_sec, rd.tv.tv_usec / 1000, rd.value);
curl = curl_easy_init();
@@ -107,19 +106,19 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &chunk);
- if (opts.verbose) printf("Sending request: %s%s\n", url, post);
+ print(1, "Sending request: %s%s", ch, url, post);
rc = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &curl_code);
- if (opts.verbose) printf("Request %s with code: %i\n", (curl_code == 200) ? "succeded" : "failed", curl_code);
+ print((curl_code == 200) ? 1 : -1, "Request %s with code: %i", ch, (curl_code == 200) ? "succeded" : "failed", curl_code);
if (rc != CURLE_OK) {
- fprintf(stderr, "CURL error: %s\n", curl_easy_strerror(rc));
+ print(-1, "CURL error: %s", ch, curl_easy_strerror(rc));
}
else if (chunk.size == 0 || chunk.data == NULL) {
- fprintf(stderr, "No data received!\n");
+ print(-1, "No data received!", ch);
rc = -1;
}
else if (curl_code != 200) { /* parse exception */
@@ -133,18 +132,17 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) {
json_obj = json_object_object_get(json_obj, "exception");
if (json_obj) {
- fprintf(stderr, "%s [%i]: %s\n",
+ print(-1, "%s : %s", ch,
json_object_get_string(json_object_object_get(json_obj, "type")),
- json_object_get_int(json_object_object_get(json_obj, "code")),
json_object_get_string(json_object_object_get(json_obj, "message"))
);
}
else {
- fprintf(stderr, "Malformed middleware response: missing exception\n");
+ print(-1, "Malformed middleware response: missing exception", ch);
}
}
else {
- fprintf(stderr, "Malformed middleware response: %s\n", json_tokener_errors[json_tok->err]);
+ print(-1, "Malformed middleware response: %s", ch, json_tokener_errors[json_tok->err]);
}
rc = -1;
@@ -154,7 +152,7 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) {
free(chunk.data); /* free response */
}
else {
- fprintf(stderr, "Failed to create CURL handle\n");
+ print(-1, "Failed to create CURL handle", ch);
}
return rc;
diff --git a/misc/controller/vzlogger/src/api.h b/misc/controller/vzlogger/src/api.h
index 7c180a3..1fe251f 100644
--- a/misc/controller/vzlogger/src/api.h
+++ b/misc/controller/vzlogger/src/api.h
@@ -26,64 +26,19 @@
#ifndef _API_H_
#define _API_H_
+#include
#include
-#include
-#include
-#define BUFFER_LENGTH 64
+#include "main.h"
+#include "protocol.h"
-typedef struct reading (*rfp)();
-typedef void (*ifp)(char *options);
-
-struct curl_response {
+typedef struct {
char *data;
size_t size;
-};
-
-struct reading {
- float value;
- struct timeval tv;
-};
-
-struct protocol {
- char * name; /* short identifier for protocol */
- char * desc; /* more detailed description */
- rfp read_func; /* function pointer to read data */
- ifp init_func; /* function to init a channel */
-};
-
-/**
- * Datatype for every channel
- */
-struct channel {
- char * uuid;
- char * middleware;
- int interval;
- char * options;
- struct protocol *prot;
- struct reading buffer[BUFFER_LENGTH]; /* ring buffer */
-};
-
-/**
- * Options from command line
- */
-struct options {
- char * config; /* path to config file */
- unsigned int interval; /* seconds */
-
- /* boolean bitfields, at the end of struct */
- unsigned int verbose:1;
- unsigned int daemon:1;
-// unsigned local:1; /* enable local interface */
-};
-
-/* Prototypes */
-void usage(char ** argv);
-struct options parse_options(int argc, char * argv[]);
-//struct channels parse_channels(char * filename);
+} curl_response_t;
int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom);
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data);
-CURLcode api_log(char * middleware, char * uuid, struct reading read);
+CURLcode api_log(channel_t *ch, reading_t read);
#endif /* _API_H_ */
diff --git a/misc/controller/vzlogger/src/local.c b/misc/controller/vzlogger/src/local.c
new file mode 100644
index 0000000..9b1b7e3
--- /dev/null
+++ b/misc/controller/vzlogger/src/local.c
@@ -0,0 +1,98 @@
+/**
+ * Implementation of local interface via libmicrohttpd
+ *
+ * @package vzlogger
+ * @copyright Copyright (c) 2011, The volkszaehler.org project
+ * @license http://www.gnu.org/licenses/gpl.txt GNU Public License
+ * @author Steffen Vogel
+ */
+/*
+ * This file is part of volkzaehler.org
+ *
+ * volkzaehler.org is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * volkzaehler.org is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with volkszaehler.org. If not, see .
+ */
+
+#include
+#include
+
+#include "main.h"
+#include "local.h"
+
+extern channel_t *chans;
+extern options_t opts;
+
+int handle_request(void *cls, struct MHD_Connection *connection, const char *url, const char *method,
+ const char *version, const char *upload_data, size_t *upload_data_size, void **con_cls) {
+ const char * json_str;
+ int ret;
+ int num_chans = *(int *) cls;
+
+ print(2, "Local request reveived: %s %s %s", NULL, version, method, url);
+
+ struct MHD_Response *response;
+
+ struct json_object * json_obj = json_object_new_object();
+ struct json_object * json_data = json_object_new_array();
+
+ for (int i = 0; i < num_chans; i++) {
+ channel_t *ch = &chans[i];
+ reading_t rd;
+
+ struct json_object * json_channel = json_object_new_object();
+ struct json_object * json_tuples = json_object_new_array();
+
+ int j = ch->queue.write_p;
+ do {
+ pthread_mutex_lock(&ch->mutex);
+ rd = ch->queue.buf[j];
+ pthread_mutex_unlock(&ch->mutex);
+
+ if (rd.value != 0) {
+ struct json_object * json_tuple = json_object_new_array();
+
+ int timestamp = rd.tv.tv_sec * 1000 + rd.tv.tv_usec / 1000;
+
+ json_object_array_add(json_tuple, json_object_new_int(timestamp));
+ json_object_array_add(json_tuple, json_object_new_double(rd.value));
+ json_object_array_add(json_tuple, json_object_new_int(1)); /* just for middleware compability */
+
+ json_object_array_add(json_tuples, json_tuple);
+ }
+
+ j++;
+ j %= ch->queue.size;
+ } while (j != ch->queue.read_p);
+
+ json_object_object_add(json_channel, "uuid", json_object_new_string(ch->uuid));
+ json_object_object_add(json_channel, "interval", json_object_new_int(ch->interval));
+ json_object_object_add(json_channel, "tuples", json_tuples);
+
+
+ json_object_array_add(json_data, json_channel);
+ }
+
+ json_object_object_add(json_obj, "version", json_object_new_string(VZ_VERSION));
+ json_object_object_add(json_obj, "data", json_data);
+ json_str = json_object_to_json_string(json_obj);
+
+ response = MHD_create_response_from_data(strlen(json_str), (void *) json_str, FALSE, TRUE);
+
+ MHD_add_response_header(response, "Content-type", "application/json");
+
+ ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
+
+ MHD_destroy_response (response);
+
+ return ret;
+}
diff --git a/misc/controller/vzlogger/src/local.h b/misc/controller/vzlogger/src/local.h
new file mode 100644
index 0000000..49a8af7
--- /dev/null
+++ b/misc/controller/vzlogger/src/local.h
@@ -0,0 +1,44 @@
+/**
+ * Header file for local interface
+ *
+ * @package vzlogger
+ * @copyright Copyright (c) 2011, The volkszaehler.org project
+ * @license http://www.gnu.org/licenses/gpl.txt GNU Public License
+ * @author Steffen Vogel
+ */
+/*
+ * This file is part of volkzaehler.org
+ *
+ * volkzaehler.org is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * volkzaehler.org is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with volkszaehler.org. If not, see .
+ */
+
+#ifndef _LOCAL_H_
+#define _LOCAL_H_
+
+#include
+
+int handle_request(
+ void *cls,
+ struct MHD_Connection *connection,
+ const char *url,
+ const char *method,
+ const char *version,
+ const char *upload_data,
+ size_t *upload_data_size,
+ void **con_cls
+);
+
+#endif /* _LOCAL_H_ */
+
+
diff --git a/misc/controller/vzlogger/src/main.c b/misc/controller/vzlogger/src/main.c
index 4c94bd8..3aab1e3 100644
--- a/misc/controller/vzlogger/src/main.c
+++ b/misc/controller/vzlogger/src/main.c
@@ -24,24 +24,29 @@
*/
#define VZ_VERSION "0.2"
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
#include "main.h"
+#include "queue.h"
#include "api.h"
+#include "local.h"
#include "protocols/obis.h"
-
-static struct protocol protocols[] = {
- {"obis", "Plaintext OBIS", obis_get, obis_init},
+
+static protocol_t protocols[] = {
+ {"obis", "Plaintext OBIS", obis_get, obis_init, obis_close},
+// {"fluksousb", "FluksoUSB board", flukso_get, flukso_init, flukso_close},
+// {"onewire", "Dallas 1-Wire sensors",onewire_get, onewire_init, onewire_close},
{NULL} /* stop condition for iterator */
};
@@ -49,10 +54,10 @@ static struct option long_options[] = {
{"config", required_argument, 0, 'c'},
{"daemon", required_argument, 0, 'd'},
{"interval", required_argument, 0, 'i'},
-// {"local", no_argument, 0, 'l'},
-// {"local-port", required_argument, 0, 'p'},
+ {"local", no_argument, 0, 'l'},
+ {"local-port", required_argument, 0, 'p'},
{"help", no_argument, 0, 'h'},
- {"verbose", no_argument, 0, 'v'},
+ {"verbose", optional_argument, 0, 'v'},
{NULL} /* stop condition for iterator */
};
@@ -60,15 +65,16 @@ static char * long_options_descs[] = {
"config file with channel -> uuid mapping",
"run as daemon",
"interval in seconds to read meters",
-// "activate local interface (tiny webserver)",
-// "TCP port for local interface"
+ "activate local interface (tiny webserver)",
+ "TCP port for local interface"
"show this help",
"enable verbose output",
NULL /* stop condition for iterator */
};
/* globals */
-struct options opts;
+options_t opts;
+channel_t * chans; /* mem gets allocated in parse_channels(), and freed in main() */
/**
* Print availble options and some other usefull information
@@ -76,7 +82,7 @@ struct options opts;
void usage(char * argv[]) {
char ** desc = long_options_descs;
struct option * op = long_options;
- struct protocol * prot = protocols;
+ protocol_t * prot = protocols;
printf("Usage: %s [options]\n\n", argv[0]);
printf(" following options are available:\n");
@@ -95,28 +101,63 @@ void usage(char * argv[]) {
prot++;
}
- printf("\nvzlogger - volkszaehler.org logging utility VERSION\n");
+ printf("\nvzlogger - volkszaehler.org logging utility %s\n", VZ_VERSION);
printf("by Steffen Vogel \n");
}
+/**
+ * Wrapper to log notices and errors
+ */
+void print(int level, char * format, channel_t *ch, ... ) {
+ static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+ va_list args;
+
+ struct timeval now;
+ struct tm * timeinfo;
+ char buffer[16];
+
+ if (level <= (signed int) opts.verbose) {
+ gettimeofday(&now, NULL);
+ timeinfo = localtime(&now.tv_sec);
+
+ strftime(buffer, 16, "%b %d %H:%M:%S", timeinfo);
+
+ pthread_mutex_lock(&mutex);
+ fprintf((level > 0) ? stdout : stderr, "[%s.%3lu]", buffer, now.tv_usec / 1000);
+
+ if (ch != NULL) {
+ fprintf((level > 0) ? stdout : stderr, "[ch#%i]\t", ch->id);
+ }
+ else {
+ fprintf((level > 0) ? stdout : stderr, "\t\t");
+ }
+
+ va_start(args, ch);
+ vfprintf((level > 0) ? stdout : stderr, format, args);
+ va_end(args);
+ fprintf((level > 0) ? stdout : stderr, "\n");
+ pthread_mutex_unlock(&mutex);
+ }
+}
+
/**
* Parse options from command line
*/
-struct options parse_options(int argc, char * argv[]) {
- struct options opts;
+options_t parse_options(int argc, char * argv[]) {
+ options_t opts;
/* setting default options */
- opts.interval = 300; /* seconds */
- opts.verbose = FALSE;
opts.daemon = FALSE;
- //opts.local = FALSE;
- opts.config = NULL;
+ opts.local = FALSE;
+ opts.verbose = 0;
+ opts.port = 8080;
+ opts.config = "vzlogger.conf";
while (TRUE) {
/* getopt_long stores the option index here. */
int option_index = 0;
- int c = getopt_long(argc, argv, "i:c:hdv", long_options, &option_index);
+ int c = getopt_long(argc, argv, "i:c:p:lhdv:", long_options, &option_index);
/* detect the end of the options. */
if (c == -1)
@@ -124,11 +165,19 @@ struct options parse_options(int argc, char * argv[]) {
switch (c) {
case 'v':
- opts.verbose = 1;
+ opts.verbose = (optarg == NULL) ? 1 : atoi(optarg);
break;
-
+
+ case 'l':
+ opts.local = TRUE;
+ break;
+
case 'd':
- opts.daemon = 1;
+ opts.daemon = TRUE;
+ break;
+
+ case 'p': /* port for local interface */
+ opts.port = atoi(optarg);
break;
case 'c': /* read config file */
@@ -146,79 +195,145 @@ struct options parse_options(int argc, char * argv[]) {
return opts;
}
-struct channel parse_channel(char * line) {
- struct channel ch;
- struct protocol * prot;
- char *tok = strtok(line, ";");
-
- for (int i = 0; i < 7 && tok != NULL; i++) {
- switch(i) {
- case 0: /* middleware */
- ch.middleware = (char *) malloc(strlen(tok)+1);
- strcpy(ch.middleware, tok);
- break;
-
- case 1: /* uuid */
- ch.uuid = (char *) malloc(strlen(tok)+1);
- strcpy(ch.uuid, tok);
- break;
-
- case 2: /* protocol */
- prot = protocols; /* reset pointer */
- while (prot->name && strcmp(prot->name, tok) != 0) prot++; /* linear search */
- ch.prot = prot;
- break;
-
- case 3: /* interval */
- ch.interval = atoi(tok);
- break;
-
- case 4: /* options */
- ch.options = (char *) malloc(strlen(tok)+1);
- strcpy(ch.options, tok);
- break;
- }
-
- tok = strtok(NULL, ";");
+channel_t * parse_channels(char * filename, int * num_chans) {
+ FILE *file = fopen(filename, "r"); /* open configuration */
+
+ if (file == NULL) {
+ perror(filename); /* why didn't the file open? */
+ exit(EXIT_FAILURE);
}
- if (opts.verbose) printf("Channel parsed: %s\n", line);
+ char line[256];
+ int j = 0;
- return ch;
+ channel_t *chans = malloc(sizeof(channel_t) * MAX_CHANNELS);
+
+ while (fgets(line, sizeof line, file) != NULL) { /* read a line */
+ if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */
+
+ channel_t ch;
+ protocol_t *prot;
+ char *tok = strtok(line, ";");
+
+ for (int i = 0; i < 7 && tok != NULL; i++) {
+ switch(i) {
+ case 0: /* middleware */
+ ch.middleware = (char *) malloc(strlen(tok)+1);
+ strcpy(ch.middleware, tok);
+ break;
+
+ case 1: /* uuid */
+ ch.uuid = (char *) malloc(strlen(tok)+1);
+ strcpy(ch.uuid, tok);
+ break;
+
+ case 2: /* protocol */
+ prot = protocols; /* reset pointer */
+ while (prot->name && strcmp(prot->name, tok) != 0) prot++; /* linear search */
+ ch.prot = prot;
+ break;
+
+ case 3: /* interval */
+ ch.interval = atoi(tok);
+ break;
+
+ case 4: /* options */
+ ch.options = (char *) malloc(strlen(tok)+1);
+ strcpy(ch.options, tok);
+ break;
+ }
+
+ tok = strtok(NULL, ";");
+ }
+
+ ch.id = j;
+
+ print(1, "Parsed (on %s)", &ch, ch.middleware);
+ chans[j++] = ch;
+ }
+
+ fclose(file);
+ *num_chans = j;
+
+ return chans;
}
/**
* Logging thread
+ *
+ * Logs buffered readings against middleware
*/
void *log_thread(void *arg) {
- static int threads; /* number of threads already started */
- int thread_id = threads++; /* current thread identifier */
- struct channel ch;
- struct reading rd;
+ channel_t *ch = (channel_t *) arg; /* casting argument */
+ reading_t rd;
+
CURLcode rc;
- if (opts.verbose) printf("Thread #%i started\n", thread_id);
+ print(1, "Started logging thread", ch);
- ch = *(struct channel *) arg; /* copy channel struct */
+ while (TRUE) {
+ pthread_mutex_lock(&ch->mutex);
+ while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */
+ pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data */
+ }
+ pthread_mutex_unlock(&ch->mutex);
+
+ while (!queue_is_empty(&ch->queue)) {
+ pthread_mutex_lock(&ch->mutex);
+ queue_first(&ch->queue, &rd);
+ pthread_mutex_unlock(&ch->mutex);
+
+ rc = api_log(ch, rd); /* log reading */
- ch.prot->init_func(ch.options); /* init sensor/meter */
-
- log:
-
- rd = ch.prot->read_func(); /* aquire reading */
- rc = api_log(ch.middleware, ch.uuid, rd); /* log reading */
-
- if (rc != CURLE_OK) {
- if (opts.verbose) printf("Delaying next transmission for 15 minutes due to pervious error\n");
- sleep(15);
+ if (rc == CURLE_OK) {
+ pthread_mutex_lock(&ch->mutex);
+ queue_deque(&ch->queue, &rd); /* remove reading from buffer */
+ pthread_mutex_unlock(&ch->mutex);
+ }
+ else {
+ print(1, "Delaying next transmission for 15 minutes due to pervious error", ch);
+
+ sleep(RETRY_PAUSE);
+ }
+ }
+ pthread_testcancel(); /* test for cancelation request */
}
- if (opts.daemon) {
- if (opts.verbose) printf("Sleeping %i seconds for next transmission\n", ch.interval);
- sleep(ch.interval);
- goto log;
+ return NULL;
+}
+
+/**
+ * Read thread
+ *
+ * Aquires reading from meters/sensors
+ */
+void *read_thread(void *arg) {
+ channel_t *ch = (channel_t *) arg; /* casting argument */
+
+ print(1, "Started reading thread", ch);
+
+ /* initalize channel */
+ ch->handle = ch->prot->init_func(ch->options); /* init sensor/meter */
+
+ while (TRUE) {
+ reading_t rd = ch->prot->read_func(ch->handle); /* aquire reading */
+
+ pthread_mutex_lock(&ch->mutex);
+ if (opts.verbose > 4) queue_print(&ch->queue); /* Debugging */
+
+ queue_enque(&ch->queue, rd);
+ pthread_cond_broadcast(&ch->condition);
+ pthread_mutex_unlock(&ch->mutex);
+
+ print(1, "Value read: %.3f (next reading in %i secs)", ch, rd.value, ch->interval);
+
+ pthread_testcancel(); /* test for cancelation request */
+ sleep(ch->interval); /* else sleep and restart aquisition */
}
+ /* close channel */
+ ch->prot->close_func(ch->handle);
+
return NULL;
}
@@ -226,34 +341,56 @@ void *log_thread(void *arg) {
* The main loop
*/
int main(int argc, char * argv[]) {
+ int num_chans = 0;
+ struct MHD_Daemon * d;
+
opts = parse_options(argc, argv); /* parse command line arguments */
+ chans = parse_channels(opts.config, &num_chans); /* parse channels from configuration */
- FILE *file = fopen(opts.config, "r"); /* open configuration */
+ print(1, "Started %s with verbosity level %i", NULL, argv[0], opts.verbose);
+
+ curl_global_init(CURL_GLOBAL_ALL); /* global intialization for all threads */
+
+ /* start threads */
+ for (int i = 0; i < num_chans; i++) {
+ channel_t * ch = &chans[i];
+
+ queue_init(&ch->queue, (BUFFER_LENGTH / ch->interval) + 1); /* initialize queue to buffer 10 minutes of data */
+
+ pthread_mutex_init(&ch->mutex, NULL);
+ pthread_cond_init(&ch->condition, NULL);
- if (file == NULL) {
- perror(opts.config); /* why didn't the file open? */
- exit(EXIT_FAILURE);
+ pthread_create(&ch->reading_thread, NULL, read_thread, (void *) ch);
+ pthread_create(&ch->logging_thread, NULL, log_thread, (void *) ch);
}
- int i = 0;
- char line[256];
- pthread_t pthreads[16];
- while (fgets(line, sizeof line, file) != NULL) { /* read a line */
- if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */
-
- struct channel ch = parse_channel(line);
-
- /* start logging threads */
- pthread_create(&pthreads[i++], NULL, log_thread, (void *) &ch);
+ if (opts.local) { /* start webserver */
+ d = MHD_start_daemon(
+ MHD_USE_THREAD_PER_CONNECTION,
+ opts.port,
+ NULL, NULL,
+ handle_request,
+ &num_chans,
+ MHD_OPTION_END
+ );
}
- fclose(file);
-
- for (int n = 0; n < i; n++) { /* wait for all threads to terminate */
- pthread_join(pthreads[n], NULL);
+ /* wait for all threads to terminate */
+ for (int i = 0; i < num_chans; i++) {
+ channel_t * ch = &chans[i];
+
+ pthread_join(ch->reading_thread, NULL);
+ pthread_join(ch->logging_thread, NULL);
+
+ pthread_cond_destroy(&ch->condition);
+ pthread_mutex_destroy(&ch->mutex);
}
+
+ if (opts.local) { /* stop webserver */
+ MHD_stop_daemon(d);
+ }
+
+ free(chans);
return 0;
}
-
-
diff --git a/misc/controller/vzlogger/src/main.h b/misc/controller/vzlogger/src/main.h
index 00f8e8f..136c261 100644
--- a/misc/controller/vzlogger/src/main.h
+++ b/misc/controller/vzlogger/src/main.h
@@ -1,14 +1,89 @@
+/**
+ * Main header file
+ *
+ * @package vzlogger
+ * @copyright Copyright (c) 2011, The volkszaehler.org project
+ * @license http://www.gnu.org/licenses/gpl.txt GNU Public License
+ * @author Steffen Vogel
+ */
+/*
+ * This file is part of volkzaehler.org
+ *
+ * volkzaehler.org is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * volkzaehler.org is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with volkszaehler.org. If not, see .
+ */
+
#ifndef _MAIN_H_
#define _MAIN_H_
+#include
+#include
+
+#include "protocol.h"
+#include "queue.h"
+
#define VZ_VERSION "0.2"
+#define MAX_CHANNELS 16
+
+#define RETRY_PAUSE 16 /* seconds to wait after failed request */
+#define BUFFER_LENGTH 25 /* in seconds */
#ifndef TRUE
- #define TRUE 1
+#define TRUE 1
#endif
#ifndef FALSE
- #define FALSE 0
+#define FALSE 0
#endif
+/**
+ * Datatype for every channel
+ */
+typedef struct {
+ char * middleware;
+ char * uuid;
+ unsigned int interval;
+ char * options;
+
+ int id; /* only for internal usage & debugging */
+
+ queue_t queue; /* circular queue to buffer readings */
+ protocol_t *prot; /* pointer to protocol */
+ void * handle; /* handle to store connection status */
+
+ pthread_t reading_thread; /* pthread for asynchronus reading */
+ pthread_t logging_thread; /* pthread for asynchronus logging */
+ pthread_mutex_t mutex;
+ pthread_cond_t condition;
+} channel_t;
+
+/**
+ * Options from command line
+ */
+typedef struct {
+ char * config; /* path to config file */
+ unsigned int port; /* tcp port for local interface */
+ unsigned int verbose; /* verbosity level */
+
+ /* boolean bitfields, at the end of struct */
+ int daemon:1;
+ int local:1; /* enable local interface */
+} options_t;
+
+/* Prototypes */
+options_t parse_options(int argc, char * argv[]);
+channel_t * parse_channels(char * filename, int * num_chans);
+void print(int level, char * format, channel_t *ch, ... );
+void usage(char ** argv);
+
#endif /* _MAIN_H_ */
diff --git a/misc/controller/vzlogger/src/protocol.h b/misc/controller/vzlogger/src/protocol.h
new file mode 100644
index 0000000..a51bfdc
--- /dev/null
+++ b/misc/controller/vzlogger/src/protocol.h
@@ -0,0 +1,23 @@
+#ifndef _PROTOCOL_H_
+#define _PROTOCOL_H_
+
+#include
+
+typedef struct {
+ float value;
+ struct timeval tv;
+} reading_t;
+
+typedef void *(*ifp_t)(char *options);
+typedef void (*cfp_t)(void *handle);
+typedef reading_t (*rfp_t)(void *handle);
+
+typedef struct {
+ char * name; /* short identifier for protocol */
+ char * desc; /* more detailed description */
+ rfp_t read_func; /* function pointer to read data */
+ ifp_t init_func; /* function pointer to init a channel */
+ cfp_t close_func; /* function pointer to close a channel */
+} protocol_t;
+
+#endif /* _PROTOCOL_H_ */
diff --git a/misc/controller/vzlogger/src/protocols/obis.c b/misc/controller/vzlogger/src/protocols/obis.c
index 5fee272..4166811 100644
--- a/misc/controller/vzlogger/src/protocols/obis.c
+++ b/misc/controller/vzlogger/src/protocols/obis.c
@@ -28,16 +28,16 @@
* along with volkszaehler.org. If not, see .
*/
-#include
-#include
+#include
#include
-#include
+#include
+#include
#include "obis.h"
-void obis_init(char * port) {
+void * obis_init(char * port) {
struct termios tio;
- int fd;
+ int *fd = malloc(sizeof(int));
memset(&tio, 0, sizeof(tio));
@@ -48,13 +48,21 @@ void obis_init(char * port) {
tio.c_cc[VMIN] = 1;
tio.c_cc[VTIME] = 5;
- fd = open(port, O_RDWR); // | O_NONBLOCK);
+ *fd = open(port, O_RDWR); // | O_NONBLOCK);
cfsetospeed(&tio, B9600); // 9600 baud
cfsetispeed(&tio, B9600); // 9600 baud
+
+ return (void *) fd;
}
-struct reading obis_get() {
- struct reading rd;
+void obis_close(void *handle) {
+ // TODO close serial port
+
+ free(handle);
+}
+
+reading_t obis_get(void *handle) {
+ reading_t rd;
rd.value = 33.3334;
gettimeofday(&rd.tv, NULL);
diff --git a/misc/controller/vzlogger/src/protocols/obis.h b/misc/controller/vzlogger/src/protocols/obis.h
index 517d426..b77c523 100644
--- a/misc/controller/vzlogger/src/protocols/obis.h
+++ b/misc/controller/vzlogger/src/protocols/obis.h
@@ -29,9 +29,10 @@
#ifndef _OBIS_H_
#define _OBIS_H_
-#include "../api.h"
+#include "../protocol.h"
-void obis_init(char * port);
-struct reading obis_get();
+void * obis_init(char * port);
+void obis_close(void *handle);
+reading_t obis_get(void *handle);
#endif /* _OBIS_H_ */
diff --git a/misc/controller/vzlogger/src/queue.c b/misc/controller/vzlogger/src/queue.c
new file mode 100644
index 0000000..4d9a586
--- /dev/null
+++ b/misc/controller/vzlogger/src/queue.c
@@ -0,0 +1,60 @@
+#include
+#include
+
+#include "queue.h"
+
+bool_t queue_init(queue_t *q, size_t size) {
+ q->buf = malloc(sizeof(reading_t) * size); /* keep one slot open */
+
+ if (q->buf) {
+ q->size = size;
+ q->read_p = q->write_p = 0; /* queue is empty */
+
+ return TRUE;
+ }
+ else {
+ return FALSE; /* cannot allocate memory */
+ }
+}
+
+bool_t queue_is_full(queue_t *q) {
+ return (((q->write_p + 1) % q->size) == q->read_p);
+}
+
+bool_t queue_is_empty(queue_t *q) {
+ return (q->read_p == q->write_p);
+}
+
+bool_t queue_enque(queue_t *q, reading_t rd) {
+ q->buf[q->write_p] = rd;
+ q->write_p++;
+ q->write_p %= q->size;
+
+ return !queue_is_full(q);
+}
+
+bool_t queue_deque(queue_t *q, reading_t *rd) {
+ *rd = q->buf[q->read_p];
+ q->read_p++;
+ q->read_p %= q->size;
+
+ return !queue_is_empty(q);
+}
+
+size_t queue_size(queue_t *q) {
+ return q->write_p - q->read_p + (q->read_p > q->write_p) * q->size;
+}
+
+bool_t queue_first(queue_t *q, reading_t *rd) {
+ *rd = q->buf[q->read_p];
+
+ return !queue_is_empty(q);
+}
+
+void queue_print(queue_t *q) {
+ printf("Queue dump: [%.1f", q->buf[0].value);
+ for (int i = 1; i < q->size; i++) {
+ printf("|%.1f", q->buf[i].value);
+ }
+ printf("]\n");
+}
diff --git a/misc/controller/vzlogger/src/queue.h b/misc/controller/vzlogger/src/queue.h
new file mode 100644
index 0000000..0dfa5da
--- /dev/null
+++ b/misc/controller/vzlogger/src/queue.h
@@ -0,0 +1,60 @@
+/**
+ * Circular queue to buffer readings
+ *
+ * @package vzlogger
+ * @copyright Copyright (c) 2011, The volkszaehler.org project
+ * @license http://www.gnu.org/licenses/gpl.txt GNU Public License
+ * @author Steffen Vogel
+ */
+/*
+ * This file is part of volkzaehler.org
+ *
+ * volkzaehler.org is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * volkzaehler.org is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with volkszaehler.org. If not, see .
+ */
+
+#ifndef _QUEUE_H_
+#define _QUEUE_H_
+
+#include "protocol.h"
+
+#ifndef TRUE
+#define TRUE 1
+#endif
+
+#ifndef FALSE
+#define FALSE 0
+#endif
+
+typedef char bool_t;
+
+typedef struct {
+ size_t size;
+
+ int read_p;
+ int write_p;
+
+ reading_t *buf;
+} queue_t;
+
+bool_t queue_init(queue_t *q, size_t size);
+bool_t queue_is_full(queue_t *q);
+bool_t queue_is_empty(queue_t *q);
+bool_t queue_enque(queue_t *q, reading_t rd);
+bool_t queue_deque(queue_t *q, reading_t *rd);
+bool_t queue_first(queue_t *q, reading_t *rd);
+size_t queue_size(queue_t *q);
+void queue_print(queue_t *q);
+
+#endif /* _QUEUE_H_ */
+
diff --git a/misc/controller/vzlogger/vzlogger.conf b/misc/controller/vzlogger/vzlogger.conf
index 52a6f8f..435feee 100644
--- a/misc/controller/vzlogger/vzlogger.conf
+++ b/misc/controller/vzlogger/vzlogger.conf
@@ -1,3 +1,3 @@
#middleware;uuid;type;interval;options
-http://localhost/workspace/volkszaehler.org/htdocs/middleware;6836dd20-00d5-11e0-bab1-856ed5f959ae;obis;5;/dev/ttyUSB0
-http://volkszaehler.org/demo/middleware;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;6;/dev/ttyS0
+http://localhost/workspace/volkszaehler.org/htdocs/middleware;a301d8d0-903b-1234-94bb-d943d061b6a8;obis;5;/dev/ttyUSB0
+http://volkszaehler.org/demo/middleware.php;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;10;/dev/ttyS0