almost finished & ready for #ec11

This commit is contained in:
Steffen Vogel 2011-05-26 23:33:47 +02:00
parent f3b85c1af8
commit 7dfe61d98b
13 changed files with 664 additions and 198 deletions

View file

@ -1,5 +1,5 @@
CC=cc
CFLAGS=-c -Wall -g -D_REENTRANT -std=c99
CFLAGS=-c -Wall -g -D_REENTRANT -std=gnu99
LDFLAGS=
TARGET=vzlogger
@ -8,8 +8,8 @@ all: $(TARGET)
clean:
rm -rf *.o
vzlogger: main.c obis.c api.c
$(CC) $(LDFLAGS) main.o obis.o api.o `curl-config --libs` -ljson -lpthread -o $(TARGET)
vzlogger: main.c api.c local.c queue.c obis.c
$(CC) $(LDFLAGS) main.o api.o local.o queue.o obis.o `curl-config --libs` -ljson -lpthread -o $(TARGET) -lmicrohttpd -lm
main.c:
$(CC) $(CFLAGS) src/main.c -o main.o
@ -17,5 +17,12 @@ main.c:
api.c:
$(CC) $(CFLAGS) src/api.c -o api.o `curl-config --cflags`
local.c:
$(CC) $(CFLAGS) src/local.c -o local.o
queue.c:
$(CC) $(CFLAGS) src/queue.c -o queue.o
obis.c:
$(CC) $(CFLAGS) src/protocols/obis.c -o obis.o

View file

@ -23,14 +23,15 @@
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <json/json.h>
#include "main.h"
#include "api.h"
#include "main.h"
extern struct options opts;
extern options_t opts;
/**
* Reformat CURLs debugging output
@ -39,22 +40,22 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_
switch (type) {
case CURLINFO_TEXT:
case CURLINFO_END:
printf("%.*s", (int) size, data);
print(4, "%.*s", NULL, (int) size, data);
break;
case CURLINFO_HEADER_IN:
case CURLINFO_HEADER_OUT:
//printf("header: %.*s", size, data);
//print(4, "Header: %.*s", NULL, size, data);
break;
case CURLINFO_SSL_DATA_IN:
case CURLINFO_DATA_IN:
printf("Received %lu bytes\n", (unsigned long) size);
print(4, "Received %lu bytes\n", NULL, (unsigned long) size);
break;
case CURLINFO_SSL_DATA_OUT:
case CURLINFO_DATA_OUT:
printf("Sent %lu bytes.. ", (unsigned long) size);
print(4, "Sent %lu bytes.. ", NULL, (unsigned long) size);
break;
}
@ -63,11 +64,11 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data) {
size_t realsize = size * nmemb;
struct curl_response *response = (struct curl_response *) data;
curl_response_t *response = (curl_response_t *) data;
response->data = realloc(response->data, response->size + realsize + 1);
if (response->data == NULL) { /* out of memory! */
fprintf(stderr, "Not enough memory (realloc returned NULL)\n");
print(-1, "Not enough memory (realloc returned NULL)\n", NULL);
exit(EXIT_FAILURE);
}
@ -81,20 +82,18 @@ size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *da
/**
* Log against the vz.org middleware with simple HTTP requests via CURL
*/
CURLcode api_log(char * middleware, char * uuid, struct reading read) {
CURLcode api_log(channel_t *ch, reading_t rd) {
CURL *curl;
CURLcode rc = -1;
int curl_code;
struct curl_response chunk = {NULL, 0};
curl_response_t chunk = {NULL, 0};
char url[255], useragent[255], post[255];
/* build request strings */
sprintf(url, "%s/data/%s.json", middleware, uuid); /* build url */
sprintf(url, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */
sprintf(useragent, "vzlogger/%s (%s)", VZ_VERSION, curl_version());
sprintf(post, "?timestamp=%lu%lu&value=%f", read.tv.tv_sec, read.tv.tv_usec, read.value);
curl_global_init(CURL_GLOBAL_ALL);
sprintf(post, "?timestamp=%lu%lu&value=%f", rd.tv.tv_sec, rd.tv.tv_usec / 1000, rd.value);
curl = curl_easy_init();
@ -107,19 +106,19 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &chunk);
if (opts.verbose) printf("Sending request: %s%s\n", url, post);
print(1, "Sending request: %s%s", ch, url, post);
rc = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &curl_code);
if (opts.verbose) printf("Request %s with code: %i\n", (curl_code == 200) ? "succeded" : "failed", curl_code);
print((curl_code == 200) ? 1 : -1, "Request %s with code: %i", ch, (curl_code == 200) ? "succeded" : "failed", curl_code);
if (rc != CURLE_OK) {
fprintf(stderr, "CURL error: %s\n", curl_easy_strerror(rc));
print(-1, "CURL error: %s", ch, curl_easy_strerror(rc));
}
else if (chunk.size == 0 || chunk.data == NULL) {
fprintf(stderr, "No data received!\n");
print(-1, "No data received!", ch);
rc = -1;
}
else if (curl_code != 200) { /* parse exception */
@ -133,18 +132,17 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) {
json_obj = json_object_object_get(json_obj, "exception");
if (json_obj) {
fprintf(stderr, "%s [%i]: %s\n",
print(-1, "%s : %s", ch,
json_object_get_string(json_object_object_get(json_obj, "type")),
json_object_get_int(json_object_object_get(json_obj, "code")),
json_object_get_string(json_object_object_get(json_obj, "message"))
);
}
else {
fprintf(stderr, "Malformed middleware response: missing exception\n");
print(-1, "Malformed middleware response: missing exception", ch);
}
}
else {
fprintf(stderr, "Malformed middleware response: %s\n", json_tokener_errors[json_tok->err]);
print(-1, "Malformed middleware response: %s", ch, json_tokener_errors[json_tok->err]);
}
rc = -1;
@ -154,7 +152,7 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) {
free(chunk.data); /* free response */
}
else {
fprintf(stderr, "Failed to create CURL handle\n");
print(-1, "Failed to create CURL handle", ch);
}
return rc;

View file

@ -26,64 +26,19 @@
#ifndef _API_H_
#define _API_H_
#include <stddef.h>
#include <curl/curl.h>
#include <curl/types.h>
#include <curl/easy.h>
#define BUFFER_LENGTH 64
#include "main.h"
#include "protocol.h"
typedef struct reading (*rfp)();
typedef void (*ifp)(char *options);
struct curl_response {
typedef struct {
char *data;
size_t size;
};
struct reading {
float value;
struct timeval tv;
};
struct protocol {
char * name; /* short identifier for protocol */
char * desc; /* more detailed description */
rfp read_func; /* function pointer to read data */
ifp init_func; /* function to init a channel */
};
/**
* Datatype for every channel
*/
struct channel {
char * uuid;
char * middleware;
int interval;
char * options;
struct protocol *prot;
struct reading buffer[BUFFER_LENGTH]; /* ring buffer */
};
/**
* Options from command line
*/
struct options {
char * config; /* path to config file */
unsigned int interval; /* seconds */
/* boolean bitfields, at the end of struct */
unsigned int verbose:1;
unsigned int daemon:1;
// unsigned local:1; /* enable local interface */
};
/* Prototypes */
void usage(char ** argv);
struct options parse_options(int argc, char * argv[]);
//struct channels parse_channels(char * filename);
} curl_response_t;
int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom);
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data);
CURLcode api_log(char * middleware, char * uuid, struct reading read);
CURLcode api_log(channel_t *ch, reading_t read);
#endif /* _API_H_ */

98
src/local.c Normal file
View file

@ -0,0 +1,98 @@
/**
* Implementation of local interface via libmicrohttpd
*
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#include <json/json.h>
#include <string.h>
#include "main.h"
#include "local.h"
extern channel_t *chans;
extern options_t opts;
int handle_request(void *cls, struct MHD_Connection *connection, const char *url, const char *method,
const char *version, const char *upload_data, size_t *upload_data_size, void **con_cls) {
const char * json_str;
int ret;
int num_chans = *(int *) cls;
print(2, "Local request reveived: %s %s %s", NULL, version, method, url);
struct MHD_Response *response;
struct json_object * json_obj = json_object_new_object();
struct json_object * json_data = json_object_new_array();
for (int i = 0; i < num_chans; i++) {
channel_t *ch = &chans[i];
reading_t rd;
struct json_object * json_channel = json_object_new_object();
struct json_object * json_tuples = json_object_new_array();
int j = ch->queue.write_p;
do {
pthread_mutex_lock(&ch->mutex);
rd = ch->queue.buf[j];
pthread_mutex_unlock(&ch->mutex);
if (rd.value != 0) {
struct json_object * json_tuple = json_object_new_array();
int timestamp = rd.tv.tv_sec * 1000 + rd.tv.tv_usec / 1000;
json_object_array_add(json_tuple, json_object_new_int(timestamp));
json_object_array_add(json_tuple, json_object_new_double(rd.value));
json_object_array_add(json_tuple, json_object_new_int(1)); /* just for middleware compability */
json_object_array_add(json_tuples, json_tuple);
}
j++;
j %= ch->queue.size;
} while (j != ch->queue.read_p);
json_object_object_add(json_channel, "uuid", json_object_new_string(ch->uuid));
json_object_object_add(json_channel, "interval", json_object_new_int(ch->interval));
json_object_object_add(json_channel, "tuples", json_tuples);
json_object_array_add(json_data, json_channel);
}
json_object_object_add(json_obj, "version", json_object_new_string(VZ_VERSION));
json_object_object_add(json_obj, "data", json_data);
json_str = json_object_to_json_string(json_obj);
response = MHD_create_response_from_data(strlen(json_str), (void *) json_str, FALSE, TRUE);
MHD_add_response_header(response, "Content-type", "application/json");
ret = MHD_queue_response(connection, MHD_HTTP_OK, response);
MHD_destroy_response (response);
return ret;
}

44
src/local.h Normal file
View file

@ -0,0 +1,44 @@
/**
* Header file for local interface
*
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _LOCAL_H_
#define _LOCAL_H_
#include <microhttpd.h>
int handle_request(
void *cls,
struct MHD_Connection *connection,
const char *url,
const char *method,
const char *version,
const char *upload_data,
size_t *upload_data_size,
void **con_cls
);
#endif /* _LOCAL_H_ */

View file

@ -24,24 +24,29 @@
*/
#define VZ_VERSION "0.2"
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
#include <curl/types.h>
#include <stdio.h>
#include <getopt.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
#include <unistd.h>
#include <math.h>
#include <stdint.h>
#include <microhttpd.h>
#include "main.h"
#include "queue.h"
#include "api.h"
#include "local.h"
#include "protocols/obis.h"
static struct protocol protocols[] = {
{"obis", "Plaintext OBIS", obis_get, obis_init},
static protocol_t protocols[] = {
{"obis", "Plaintext OBIS", obis_get, obis_init, obis_close},
// {"fluksousb", "FluksoUSB board", flukso_get, flukso_init, flukso_close},
// {"onewire", "Dallas 1-Wire sensors",onewire_get, onewire_init, onewire_close},
{NULL} /* stop condition for iterator */
};
@ -49,10 +54,10 @@ static struct option long_options[] = {
{"config", required_argument, 0, 'c'},
{"daemon", required_argument, 0, 'd'},
{"interval", required_argument, 0, 'i'},
// {"local", no_argument, 0, 'l'},
// {"local-port", required_argument, 0, 'p'},
{"local", no_argument, 0, 'l'},
{"local-port", required_argument, 0, 'p'},
{"help", no_argument, 0, 'h'},
{"verbose", no_argument, 0, 'v'},
{"verbose", optional_argument, 0, 'v'},
{NULL} /* stop condition for iterator */
};
@ -60,15 +65,16 @@ static char * long_options_descs[] = {
"config file with channel -> uuid mapping",
"run as daemon",
"interval in seconds to read meters",
// "activate local interface (tiny webserver)",
// "TCP port for local interface"
"activate local interface (tiny webserver)",
"TCP port for local interface"
"show this help",
"enable verbose output",
NULL /* stop condition for iterator */
};
/* globals */
struct options opts;
options_t opts;
channel_t * chans; /* mem gets allocated in parse_channels(), and freed in main() */
/**
* Print availble options and some other usefull information
@ -76,7 +82,7 @@ struct options opts;
void usage(char * argv[]) {
char ** desc = long_options_descs;
struct option * op = long_options;
struct protocol * prot = protocols;
protocol_t * prot = protocols;
printf("Usage: %s [options]\n\n", argv[0]);
printf(" following options are available:\n");
@ -95,28 +101,63 @@ void usage(char * argv[]) {
prot++;
}
printf("\nvzlogger - volkszaehler.org logging utility VERSION\n");
printf("\nvzlogger - volkszaehler.org logging utility %s\n", VZ_VERSION);
printf("by Steffen Vogel <stv0g@0l.de>\n");
}
/**
* Wrapper to log notices and errors
*/
void print(int level, char * format, channel_t *ch, ... ) {
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
va_list args;
struct timeval now;
struct tm * timeinfo;
char buffer[16];
if (level <= (signed int) opts.verbose) {
gettimeofday(&now, NULL);
timeinfo = localtime(&now.tv_sec);
strftime(buffer, 16, "%b %d %H:%M:%S", timeinfo);
pthread_mutex_lock(&mutex);
fprintf((level > 0) ? stdout : stderr, "[%s.%3lu]", buffer, now.tv_usec / 1000);
if (ch != NULL) {
fprintf((level > 0) ? stdout : stderr, "[ch#%i]\t", ch->id);
}
else {
fprintf((level > 0) ? stdout : stderr, "\t\t");
}
va_start(args, ch);
vfprintf((level > 0) ? stdout : stderr, format, args);
va_end(args);
fprintf((level > 0) ? stdout : stderr, "\n");
pthread_mutex_unlock(&mutex);
}
}
/**
* Parse options from command line
*/
struct options parse_options(int argc, char * argv[]) {
struct options opts;
options_t parse_options(int argc, char * argv[]) {
options_t opts;
/* setting default options */
opts.interval = 300; /* seconds */
opts.verbose = FALSE;
opts.daemon = FALSE;
//opts.local = FALSE;
opts.config = NULL;
opts.local = FALSE;
opts.verbose = 0;
opts.port = 8080;
opts.config = "vzlogger.conf";
while (TRUE) {
/* getopt_long stores the option index here. */
int option_index = 0;
int c = getopt_long(argc, argv, "i:c:hdv", long_options, &option_index);
int c = getopt_long(argc, argv, "i:c:p:lhdv:", long_options, &option_index);
/* detect the end of the options. */
if (c == -1)
@ -124,11 +165,19 @@ struct options parse_options(int argc, char * argv[]) {
switch (c) {
case 'v':
opts.verbose = 1;
opts.verbose = (optarg == NULL) ? 1 : atoi(optarg);
break;
case 'l':
opts.local = TRUE;
break;
case 'd':
opts.daemon = 1;
opts.daemon = TRUE;
break;
case 'p': /* port for local interface */
opts.port = atoi(optarg);
break;
case 'c': /* read config file */
@ -146,79 +195,145 @@ struct options parse_options(int argc, char * argv[]) {
return opts;
}
struct channel parse_channel(char * line) {
struct channel ch;
struct protocol * prot;
char *tok = strtok(line, ";");
for (int i = 0; i < 7 && tok != NULL; i++) {
switch(i) {
case 0: /* middleware */
ch.middleware = (char *) malloc(strlen(tok)+1);
strcpy(ch.middleware, tok);
break;
case 1: /* uuid */
ch.uuid = (char *) malloc(strlen(tok)+1);
strcpy(ch.uuid, tok);
break;
case 2: /* protocol */
prot = protocols; /* reset pointer */
while (prot->name && strcmp(prot->name, tok) != 0) prot++; /* linear search */
ch.prot = prot;
break;
case 3: /* interval */
ch.interval = atoi(tok);
break;
case 4: /* options */
ch.options = (char *) malloc(strlen(tok)+1);
strcpy(ch.options, tok);
break;
}
tok = strtok(NULL, ";");
channel_t * parse_channels(char * filename, int * num_chans) {
FILE *file = fopen(filename, "r"); /* open configuration */
if (file == NULL) {
perror(filename); /* why didn't the file open? */
exit(EXIT_FAILURE);
}
if (opts.verbose) printf("Channel parsed: %s\n", line);
char line[256];
int j = 0;
return ch;
channel_t *chans = malloc(sizeof(channel_t) * MAX_CHANNELS);
while (fgets(line, sizeof line, file) != NULL) { /* read a line */
if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */
channel_t ch;
protocol_t *prot;
char *tok = strtok(line, ";");
for (int i = 0; i < 7 && tok != NULL; i++) {
switch(i) {
case 0: /* middleware */
ch.middleware = (char *) malloc(strlen(tok)+1);
strcpy(ch.middleware, tok);
break;
case 1: /* uuid */
ch.uuid = (char *) malloc(strlen(tok)+1);
strcpy(ch.uuid, tok);
break;
case 2: /* protocol */
prot = protocols; /* reset pointer */
while (prot->name && strcmp(prot->name, tok) != 0) prot++; /* linear search */
ch.prot = prot;
break;
case 3: /* interval */
ch.interval = atoi(tok);
break;
case 4: /* options */
ch.options = (char *) malloc(strlen(tok)+1);
strcpy(ch.options, tok);
break;
}
tok = strtok(NULL, ";");
}
ch.id = j;
print(1, "Parsed (on %s)", &ch, ch.middleware);
chans[j++] = ch;
}
fclose(file);
*num_chans = j;
return chans;
}
/**
* Logging thread
*
* Logs buffered readings against middleware
*/
void *log_thread(void *arg) {
static int threads; /* number of threads already started */
int thread_id = threads++; /* current thread identifier */
struct channel ch;
struct reading rd;
channel_t *ch = (channel_t *) arg; /* casting argument */
reading_t rd;
CURLcode rc;
if (opts.verbose) printf("Thread #%i started\n", thread_id);
print(1, "Started logging thread", ch);
ch = *(struct channel *) arg; /* copy channel struct */
while (TRUE) {
pthread_mutex_lock(&ch->mutex);
while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */
pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data */
}
pthread_mutex_unlock(&ch->mutex);
while (!queue_is_empty(&ch->queue)) {
pthread_mutex_lock(&ch->mutex);
queue_first(&ch->queue, &rd);
pthread_mutex_unlock(&ch->mutex);
rc = api_log(ch, rd); /* log reading */
ch.prot->init_func(ch.options); /* init sensor/meter */
log:
rd = ch.prot->read_func(); /* aquire reading */
rc = api_log(ch.middleware, ch.uuid, rd); /* log reading */
if (rc != CURLE_OK) {
if (opts.verbose) printf("Delaying next transmission for 15 minutes due to pervious error\n");
sleep(15);
if (rc == CURLE_OK) {
pthread_mutex_lock(&ch->mutex);
queue_deque(&ch->queue, &rd); /* remove reading from buffer */
pthread_mutex_unlock(&ch->mutex);
}
else {
print(1, "Delaying next transmission for 15 minutes due to pervious error", ch);
sleep(RETRY_PAUSE);
}
}
pthread_testcancel(); /* test for cancelation request */
}
if (opts.daemon) {
if (opts.verbose) printf("Sleeping %i seconds for next transmission\n", ch.interval);
sleep(ch.interval);
goto log;
return NULL;
}
/**
* Read thread
*
* Aquires reading from meters/sensors
*/
void *read_thread(void *arg) {
channel_t *ch = (channel_t *) arg; /* casting argument */
print(1, "Started reading thread", ch);
/* initalize channel */
ch->handle = ch->prot->init_func(ch->options); /* init sensor/meter */
while (TRUE) {
reading_t rd = ch->prot->read_func(ch->handle); /* aquire reading */
pthread_mutex_lock(&ch->mutex);
if (opts.verbose > 4) queue_print(&ch->queue); /* Debugging */
queue_enque(&ch->queue, rd);
pthread_cond_broadcast(&ch->condition);
pthread_mutex_unlock(&ch->mutex);
print(1, "Value read: %.3f (next reading in %i secs)", ch, rd.value, ch->interval);
pthread_testcancel(); /* test for cancelation request */
sleep(ch->interval); /* else sleep and restart aquisition */
}
/* close channel */
ch->prot->close_func(ch->handle);
return NULL;
}
@ -226,34 +341,56 @@ void *log_thread(void *arg) {
* The main loop
*/
int main(int argc, char * argv[]) {
int num_chans = 0;
struct MHD_Daemon * d;
opts = parse_options(argc, argv); /* parse command line arguments */
chans = parse_channels(opts.config, &num_chans); /* parse channels from configuration */
FILE *file = fopen(opts.config, "r"); /* open configuration */
print(1, "Started %s with verbosity level %i", NULL, argv[0], opts.verbose);
curl_global_init(CURL_GLOBAL_ALL); /* global intialization for all threads */
/* start threads */
for (int i = 0; i < num_chans; i++) {
channel_t * ch = &chans[i];
queue_init(&ch->queue, (BUFFER_LENGTH / ch->interval) + 1); /* initialize queue to buffer 10 minutes of data */
pthread_mutex_init(&ch->mutex, NULL);
pthread_cond_init(&ch->condition, NULL);
if (file == NULL) {
perror(opts.config); /* why didn't the file open? */
exit(EXIT_FAILURE);
pthread_create(&ch->reading_thread, NULL, read_thread, (void *) ch);
pthread_create(&ch->logging_thread, NULL, log_thread, (void *) ch);
}
int i = 0;
char line[256];
pthread_t pthreads[16];
while (fgets(line, sizeof line, file) != NULL) { /* read a line */
if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */
struct channel ch = parse_channel(line);
/* start logging threads */
pthread_create(&pthreads[i++], NULL, log_thread, (void *) &ch);
if (opts.local) { /* start webserver */
d = MHD_start_daemon(
MHD_USE_THREAD_PER_CONNECTION,
opts.port,
NULL, NULL,
handle_request,
&num_chans,
MHD_OPTION_END
);
}
fclose(file);
for (int n = 0; n < i; n++) { /* wait for all threads to terminate */
pthread_join(pthreads[n], NULL);
/* wait for all threads to terminate */
for (int i = 0; i < num_chans; i++) {
channel_t * ch = &chans[i];
pthread_join(ch->reading_thread, NULL);
pthread_join(ch->logging_thread, NULL);
pthread_cond_destroy(&ch->condition);
pthread_mutex_destroy(&ch->mutex);
}
if (opts.local) { /* stop webserver */
MHD_stop_daemon(d);
}
free(chans);
return 0;
}

View file

@ -1,14 +1,89 @@
/**
* Main header file
*
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _MAIN_H_
#define _MAIN_H_
#include <sys/time.h>
#include <pthread.h>
#include "protocol.h"
#include "queue.h"
#define VZ_VERSION "0.2"
#define MAX_CHANNELS 16
#define RETRY_PAUSE 16 /* seconds to wait after failed request */
#define BUFFER_LENGTH 25 /* in seconds */
#ifndef TRUE
#define TRUE 1
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#define FALSE 0
#endif
/**
* Datatype for every channel
*/
typedef struct {
char * middleware;
char * uuid;
unsigned int interval;
char * options;
int id; /* only for internal usage & debugging */
queue_t queue; /* circular queue to buffer readings */
protocol_t *prot; /* pointer to protocol */
void * handle; /* handle to store connection status */
pthread_t reading_thread; /* pthread for asynchronus reading */
pthread_t logging_thread; /* pthread for asynchronus logging */
pthread_mutex_t mutex;
pthread_cond_t condition;
} channel_t;
/**
* Options from command line
*/
typedef struct {
char * config; /* path to config file */
unsigned int port; /* tcp port for local interface */
unsigned int verbose; /* verbosity level */
/* boolean bitfields, at the end of struct */
int daemon:1;
int local:1; /* enable local interface */
} options_t;
/* Prototypes */
options_t parse_options(int argc, char * argv[]);
channel_t * parse_channels(char * filename, int * num_chans);
void print(int level, char * format, channel_t *ch, ... );
void usage(char ** argv);
#endif /* _MAIN_H_ */

23
src/protocol.h Normal file
View file

@ -0,0 +1,23 @@
#ifndef _PROTOCOL_H_
#define _PROTOCOL_H_
#include <sys/time.h>
typedef struct {
float value;
struct timeval tv;
} reading_t;
typedef void *(*ifp_t)(char *options);
typedef void (*cfp_t)(void *handle);
typedef reading_t (*rfp_t)(void *handle);
typedef struct {
char * name; /* short identifier for protocol */
char * desc; /* more detailed description */
rfp_t read_func; /* function pointer to read data */
ifp_t init_func; /* function pointer to init a channel */
cfp_t close_func; /* function pointer to close a channel */
} protocol_t;
#endif /* _PROTOCOL_H_ */

View file

@ -28,16 +28,16 @@
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#include <fcntl.h>
#include <termios.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <fcntl.h>
#include <termios.h>
#include "obis.h"
void obis_init(char * port) {
void * obis_init(char * port) {
struct termios tio;
int fd;
int *fd = malloc(sizeof(int));
memset(&tio, 0, sizeof(tio));
@ -48,13 +48,21 @@ void obis_init(char * port) {
tio.c_cc[VMIN] = 1;
tio.c_cc[VTIME] = 5;
fd = open(port, O_RDWR); // | O_NONBLOCK);
*fd = open(port, O_RDWR); // | O_NONBLOCK);
cfsetospeed(&tio, B9600); // 9600 baud
cfsetispeed(&tio, B9600); // 9600 baud
return (void *) fd;
}
struct reading obis_get() {
struct reading rd;
void obis_close(void *handle) {
// TODO close serial port
free(handle);
}
reading_t obis_get(void *handle) {
reading_t rd;
rd.value = 33.3334;
gettimeofday(&rd.tv, NULL);

View file

@ -29,9 +29,10 @@
#ifndef _OBIS_H_
#define _OBIS_H_
#include "../api.h"
#include "../protocol.h"
void obis_init(char * port);
struct reading obis_get();
void * obis_init(char * port);
void obis_close(void *handle);
reading_t obis_get(void *handle);
#endif /* _OBIS_H_ */

60
src/queue.c Normal file
View file

@ -0,0 +1,60 @@
#include <stdlib.h>
#include <stdio.h>
#include "queue.h"
bool_t queue_init(queue_t *q, size_t size) {
q->buf = malloc(sizeof(reading_t) * size); /* keep one slot open */
if (q->buf) {
q->size = size;
q->read_p = q->write_p = 0; /* queue is empty */
return TRUE;
}
else {
return FALSE; /* cannot allocate memory */
}
}
bool_t queue_is_full(queue_t *q) {
return (((q->write_p + 1) % q->size) == q->read_p);
}
bool_t queue_is_empty(queue_t *q) {
return (q->read_p == q->write_p);
}
bool_t queue_enque(queue_t *q, reading_t rd) {
q->buf[q->write_p] = rd;
q->write_p++;
q->write_p %= q->size;
return !queue_is_full(q);
}
bool_t queue_deque(queue_t *q, reading_t *rd) {
*rd = q->buf[q->read_p];
q->read_p++;
q->read_p %= q->size;
return !queue_is_empty(q);
}
size_t queue_size(queue_t *q) {
return q->write_p - q->read_p + (q->read_p > q->write_p) * q->size;
}
bool_t queue_first(queue_t *q, reading_t *rd) {
*rd = q->buf[q->read_p];
return !queue_is_empty(q);
}
void queue_print(queue_t *q) {
printf("Queue dump: [%.1f", q->buf[0].value);
for (int i = 1; i < q->size; i++) {
printf("|%.1f", q->buf[i].value);
}
printf("]\n");
}

60
src/queue.h Normal file
View file

@ -0,0 +1,60 @@
/**
* Circular queue to buffer readings
*
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _QUEUE_H_
#define _QUEUE_H_
#include "protocol.h"
#ifndef TRUE
#define TRUE 1
#endif
#ifndef FALSE
#define FALSE 0
#endif
typedef char bool_t;
typedef struct {
size_t size;
int read_p;
int write_p;
reading_t *buf;
} queue_t;
bool_t queue_init(queue_t *q, size_t size);
bool_t queue_is_full(queue_t *q);
bool_t queue_is_empty(queue_t *q);
bool_t queue_enque(queue_t *q, reading_t rd);
bool_t queue_deque(queue_t *q, reading_t *rd);
bool_t queue_first(queue_t *q, reading_t *rd);
size_t queue_size(queue_t *q);
void queue_print(queue_t *q);
#endif /* _QUEUE_H_ */

View file

@ -1,3 +1,3 @@
#middleware;uuid;type;interval;options
http://localhost/workspace/volkszaehler.org/htdocs/middleware;6836dd20-00d5-11e0-bab1-856ed5f959ae;obis;5;/dev/ttyUSB0
http://volkszaehler.org/demo/middleware;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;6;/dev/ttyS0
http://localhost/workspace/volkszaehler.org/htdocs/middleware;a301d8d0-903b-1234-94bb-d943d061b6a8;obis;5;/dev/ttyUSB0
http://volkszaehler.org/demo/middleware.php;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;10;/dev/ttyS0