added some comments and one wire support to vzlogger

This commit is contained in:
Steffen Vogel 2011-05-27 22:26:50 +02:00
parent d31fc12e5b
commit 850784b71b
9 changed files with 224 additions and 84 deletions

View file

@ -8,8 +8,8 @@ all: $(TARGET)
clean:
rm -rf *.o
vzlogger: main.c api.c local.c queue.c obis.c
$(CC) $(LDFLAGS) main.o api.o local.o queue.o obis.o `curl-config --libs` -ljson -lpthread -o $(TARGET) -lmicrohttpd -lm
vzlogger: main.c api.c local.c queue.c 1wire.c obis.c
$(CC) $(LDFLAGS) main.o api.o local.o queue.o 1wire.o obis.o `curl-config --libs` -ljson -lpthread -o $(TARGET) -lmicrohttpd -lm
main.c:
$(CC) $(CFLAGS) src/main.c -o main.o
@ -20,9 +20,11 @@ api.c:
local.c:
$(CC) $(CFLAGS) src/local.c -o local.o
queue.c:
$(CC) $(CFLAGS) src/queue.c -o queue.o
1wire.c:
$(CC) $(CFLAGS) src/protocols/1wire.c -o 1wire.o
obis.c:
$(CC) $(CFLAGS) src/protocols/obis.c -o obis.o

View file

@ -35,6 +35,7 @@ extern options_t opts;
int handle_request(void *cls, struct MHD_Connection *connection, const char *url, const char *method,
const char *version, const char *upload_data, size_t *upload_data_size, void **con_cls) {
const char * json_str;
const char * uuid = url + 1;
int ret;
int num_chans = *(int *) cls;
@ -43,43 +44,45 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url
struct MHD_Response *response;
struct json_object * json_obj = json_object_new_object();
struct json_object * json_data = json_object_new_array();
struct json_object * json_data = json_object_new_object();
for (int i = 0; i < num_chans; i++) {
channel_t *ch = &chans[i];
reading_t rd;
struct json_object * json_channel = json_object_new_object();
struct json_object * json_tuples = json_object_new_array();
int j = ch->queue.write_p;
do {
if (strcmp(ch->uuid, uuid) == 0) {
pthread_mutex_lock(&ch->mutex);
rd = ch->queue.buf[j];
pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data comet-like blocking of HTTP response */
pthread_mutex_unlock(&ch->mutex);
if (rd.value != 0) {
struct json_object * json_tuple = json_object_new_array();
int timestamp = rd.tv.tv_sec * 1000 + rd.tv.tv_usec / 1000;
struct json_object * json_tuples = json_object_new_array();
json_object_array_add(json_tuple, json_object_new_int(timestamp));
json_object_array_add(json_tuple, json_object_new_double(rd.value));
json_object_array_add(json_tuple, json_object_new_int(1)); /* just for middleware compability */
int j = ch->queue.write_p;
do {
pthread_mutex_lock(&ch->mutex);
rd = ch->queue.buf[j];
pthread_mutex_unlock(&ch->mutex);
json_object_array_add(json_tuples, json_tuple);
}
if (rd.value != 0) { /* skip invalid / empty readings */
struct json_object * json_tuple = json_object_new_array();
j++;
j %= ch->queue.size;
} while (j != ch->queue.read_p);
int timestamp = rd.tv.tv_sec * 1000 + rd.tv.tv_usec / 1000;
json_object_array_add(json_tuple, json_object_new_int(timestamp));
json_object_array_add(json_tuple, json_object_new_double(rd.value));
json_object_array_add(json_tuple, json_object_new_int(1)); /* just for middleware compability */
json_object_array_add(json_tuples, json_tuple);
}
j++;
j %= ch->queue.size;
} while (j != ch->queue.read_p);
json_object_object_add(json_channel, "uuid", json_object_new_string(ch->uuid));
json_object_object_add(json_channel, "interval", json_object_new_int(ch->interval));
json_object_object_add(json_channel, "tuples", json_tuples);
json_object_array_add(json_data, json_channel);
json_object_object_add(json_data, "uuid", json_object_new_string(ch->uuid));
json_object_object_add(json_data, "interval", json_object_new_int(ch->interval));
json_object_object_add(json_data, "tuples", json_tuples);
}
}
json_object_object_add(json_obj, "version", json_object_new_string(VZ_VERSION));

View file

@ -42,14 +42,23 @@
#include "local.h"
#include "protocols/obis.h"
#include "protocols/1wire.h"
/**
* List of available protocols
* incl. function pointers
*/
static protocol_t protocols[] = {
{"obis", "Plaintext OBIS", obis_get, obis_init, obis_close},
// {"fluksousb", "FluksoUSB board", flukso_get, flukso_init, flukso_close},
// {"onewire", "Dallas 1-Wire sensors",onewire_get, onewire_init, onewire_close},
{"obis", "Plaintext OBIS", obis_get, obis_init, obis_close},
// {"fluksousb", "FluksoUSB board", flukso_get, flukso_init, flukso_close},
{"1wire", "Dallas 1-Wire sensors (via OWFS)", onewire_get, onewire_init, onewire_close},
{NULL} /* stop condition for iterator */
};
/**
* Command line options
*/
static struct option long_options[] = {
{"config", required_argument, 0, 'c'},
{"daemon", required_argument, 0, 'd'},
@ -61,6 +70,9 @@ static struct option long_options[] = {
{NULL} /* stop condition for iterator */
};
/**
* Descriptions vor command line options
*/
static char * long_options_descs[] = {
"config file with channel -> uuid mapping",
"run as daemon",
@ -72,9 +84,15 @@ static char * long_options_descs[] = {
NULL /* stop condition for iterator */
};
/* globals */
options_t opts;
channel_t * chans; /* mem gets allocated in parse_channels(), and freed in main() */
/* Global variables */
channel_t chans[MAX_CHANNELS]; // TODO use dynamic allocation
options_t opts = { /* setting default options */
"vzlogger.conf", /* config file */
8080, /* port for local interface */
0, /* debug level / verbosity */
FALSE, /* daemon mode */
FALSE /* local interface */
};
/**
* Print availble options and some other usefull information
@ -107,6 +125,9 @@ void usage(char * argv[]) {
/**
* Wrapper to log notices and errors
*
* @param ch could be NULL for general messages
* @todo integrate into syslog
*/
void print(int level, char * format, channel_t *ch, ... ) {
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
@ -143,21 +164,12 @@ void print(int level, char * format, channel_t *ch, ... ) {
/**
* Parse options from command line
*/
options_t parse_options(int argc, char * argv[]) {
options_t opts;
/* setting default options */
opts.daemon = FALSE;
opts.local = FALSE;
opts.verbose = 0;
opts.port = 8080;
opts.config = "vzlogger.conf";
void parse_options(int argc, char * argv[], options_t * opts) {
while (TRUE) {
/* getopt_long stores the option index here. */
int option_index = 0;
int c = getopt_long(argc, argv, "i:c:p:lhdv:", long_options, &option_index);
int c = getopt_long(argc, argv, "i:c:p:lhdv", long_options, &option_index);
/* detect the end of the options. */
if (c == -1)
@ -165,24 +177,25 @@ options_t parse_options(int argc, char * argv[]) {
switch (c) {
case 'v':
opts.verbose = (optarg == NULL) ? 1 : atoi(optarg);
opts->verbose = (optarg == NULL) ? 1 : atoi(optarg);
break;
case 'l':
opts.local = TRUE;
opts->local = TRUE;
opts->daemon = TRUE; /* implicates daemon mode */
break;
case 'd':
opts.daemon = TRUE;
opts->daemon = TRUE;
break;
case 'p': /* port for local interface */
opts.port = atoi(optarg);
opts->port = atoi(optarg);
break;
case 'c': /* read config file */
opts.config = (char *) malloc(strlen(optarg)+1);
strcpy(opts.config, optarg);
opts->config = (char *) malloc(strlen(optarg)+1);
strcpy(opts->config, optarg);
break;
case 'h':
@ -191,11 +204,9 @@ options_t parse_options(int argc, char * argv[]) {
exit((c == '?') ? EXIT_FAILURE : EXIT_SUCCESS);
}
}
return opts;
}
channel_t * parse_channels(char * filename, int * num_chans) {
int parse_channels(char * filename, channel_t * chans) {
FILE *file = fopen(filename, "r"); /* open configuration */
if (file == NULL) {
@ -206,9 +217,7 @@ channel_t * parse_channels(char * filename, int * num_chans) {
char line[256];
int j = 0;
channel_t *chans = malloc(sizeof(channel_t) * MAX_CHANNELS);
while (fgets(line, sizeof line, file) != NULL) { /* read a line */
while (j < MAX_CHANNELS && fgets(line, sizeof line, file) != NULL) { /* read a line */
if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */
channel_t ch;
@ -216,14 +225,16 @@ channel_t * parse_channels(char * filename, int * num_chans) {
char *tok = strtok(line, ";");
for (int i = 0; i < 7 && tok != NULL; i++) {
size_t len = strlen(tok);
switch(i) {
case 0: /* middleware */
ch.middleware = (char *) malloc(strlen(tok)+1);
ch.middleware = (char *) malloc(len+1); /* including string termination */
strcpy(ch.middleware, tok);
break;
case 1: /* uuid */
ch.uuid = (char *) malloc(strlen(tok)+1);
ch.uuid = (char *) malloc(len+1); /* including string termination */
strcpy(ch.uuid, tok);
break;
@ -238,8 +249,9 @@ channel_t * parse_channels(char * filename, int * num_chans) {
break;
case 4: /* options */
ch.options = (char *) malloc(strlen(tok)+1);
strcpy(ch.options, tok);
ch.options = (char *) malloc(len);
strncpy(ch.options, tok, len-1);
ch.options[len] = '\0'; /* replace \n by \0 */
break;
}
@ -248,14 +260,13 @@ channel_t * parse_channels(char * filename, int * num_chans) {
ch.id = j;
print(1, "Parsed (on %s)", &ch, ch.middleware);
print(1, "Parsed %s (on %s)", &ch, ch.uuid, ch.middleware);
chans[j++] = ch;
}
fclose(file);
*num_chans = j;
return chans;
return j;
}
/**
@ -271,7 +282,7 @@ void *log_thread(void *arg) {
print(1, "Started logging thread", ch);
while (TRUE) {
do {
pthread_mutex_lock(&ch->mutex);
while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */
pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data */
@ -291,13 +302,12 @@ void *log_thread(void *arg) {
pthread_mutex_unlock(&ch->mutex);
}
else {
print(1, "Delaying next transmission for 15 minutes due to pervious error", ch);
print(1, "Delaying next transmission for %i seconds due to pervious error", ch, RETRY_PAUSE);
sleep(RETRY_PAUSE);
}
}
pthread_testcancel(); /* test for cancelation request */
}
} while (opts.daemon);
return NULL;
}
@ -315,7 +325,7 @@ void *read_thread(void *arg) {
/* initalize channel */
ch->handle = ch->prot->init_func(ch->options); /* init sensor/meter */
while (TRUE) {
do {
reading_t rd = ch->prot->read_func(ch->handle); /* aquire reading */
pthread_mutex_lock(&ch->mutex);
@ -329,7 +339,7 @@ void *read_thread(void *arg) {
pthread_testcancel(); /* test for cancelation request */
sleep(ch->interval); /* else sleep and restart aquisition */
}
} while (opts.daemon);
/* close channel */
ch->prot->close_func(ch->handle);
@ -341,30 +351,32 @@ void *read_thread(void *arg) {
* The main loop
*/
int main(int argc, char * argv[]) {
int num_chans = 0;
int num_chans;
struct MHD_Daemon * d;
opts = parse_options(argc, argv); /* parse command line arguments */
chans = parse_channels(opts.config, &num_chans); /* parse channels from configuration */
parse_options(argc, argv, &opts); /* parse command line arguments */
num_chans = parse_channels(opts.config, chans); /* parse channels from configuration */
print(1, "Started %s with verbosity level %i", NULL, argv[0], opts.verbose);
curl_global_init(CURL_GLOBAL_ALL); /* global intialization for all threads */
/* start threads */
for (int i = 0; i < num_chans; i++) {
channel_t * ch = &chans[i];
queue_init(&ch->queue, (BUFFER_LENGTH / ch->interval) + 1); /* initialize queue to buffer 10 minutes of data */
/* initialize thread syncronization helpers */
pthread_mutex_init(&ch->mutex, NULL);
pthread_cond_init(&ch->condition, NULL);
/* start threads */
pthread_create(&ch->reading_thread, NULL, read_thread, (void *) ch);
pthread_create(&ch->logging_thread, NULL, log_thread, (void *) ch);
}
if (opts.local) { /* start webserver */
/* start webserver for local interface */
if (opts.local) {
d = MHD_start_daemon(
MHD_USE_THREAD_PER_CONNECTION,
opts.port,
@ -382,6 +394,12 @@ int main(int argc, char * argv[]) {
pthread_join(ch->reading_thread, NULL);
pthread_join(ch->logging_thread, NULL);
free(ch->middleware);
free(ch->uuid);
free(ch->options);
queue_free(&ch->queue);
pthread_cond_destroy(&ch->condition);
pthread_mutex_destroy(&ch->mutex);
}
@ -390,7 +408,5 @@ int main(int argc, char * argv[]) {
MHD_stop_daemon(d);
}
free(chans);
return 0;
}

View file

@ -35,8 +35,8 @@
#define VZ_VERSION "0.2"
#define MAX_CHANNELS 16
#define RETRY_PAUSE 16 /* seconds to wait after failed request */
#define BUFFER_LENGTH 25 /* in seconds */
#define RETRY_PAUSE 600 /* seconds to wait after failed request */
#define BUFFER_LENGTH 600 /* in seconds */
#ifndef TRUE
#define TRUE 1
@ -81,8 +81,8 @@ typedef struct {
} options_t;
/* Prototypes */
options_t parse_options(int argc, char * argv[]);
channel_t * parse_channels(char * filename, int * num_chans);
void parse_options(int argc, char * argv[], options_t *opts);
int parse_channels(char * filename, channel_t *chans);
void print(int level, char * format, channel_t *ch, ... );
void usage(char ** argv);

View file

@ -0,0 +1,76 @@
/**
* Wrapper to read Dallas 1-wire Sensors via the 1-wire Filesystem (owfs)
*
* This is our example protocol. Use this skeleton to add your own
* protocols and meters.
*
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
* @author Mathias Dalheimer <md@gonium.net>
* based heavily on libehz (https://github.com/gonium/libehz.git)
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include "../main.h"
#include "../protocol.h"
#include "1wire.h"
/**
* Initialize sensor
*
* @param address path to the sensor in the owfs
* @return pointer to file descriptor
*/
void * onewire_init(char * address) {
FILE * fd = fopen(address, "r");
if (fd == NULL) {
perror(address);
print(-1, "Failed to open sensor: %s", NULL, address);
exit(EXIT_FAILURE);
}
return (void *) fd;
}
void onewire_close(void *handle) {
fclose((FILE *) handle);
}
reading_t onewire_get(void *handle) {
reading_t rd;
char buffer[16];
int bytes;
rewind((FILE *) handle);
bytes = fread(buffer, 1, 16, (FILE *) handle);
if (bytes) {
print(4, "Read from sensor file: %s", NULL, buffer);
rd.value = strtof(buffer, NULL);
gettimeofday(&rd.tv, NULL);
}
return rd;
}

View file

@ -0,0 +1,38 @@
/**
* Wrapper to read Dallas 1-wire Sensors via the 1-wire Filesystem (owfs)
*
* This is our example protocol. Use this skeleton to add your own
* protocols and meters.
*
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _1WIRE_H_
#define _1WIRE_H_
#include "../protocol.h"
void * onewire_init(char * port);
void onewire_close(void *handle);
reading_t onewire_get(void *handle);
#endif /* _1WIRE_H_ */

View file

@ -58,3 +58,7 @@ void queue_print(queue_t *q) {
}
printf("]\n");
}
void queue_free(queue_t *q) {
free(q->buf);
}

View file

@ -55,6 +55,7 @@ bool_t queue_deque(queue_t *q, reading_t *rd);
bool_t queue_first(queue_t *q, reading_t *rd);
size_t queue_size(queue_t *q);
void queue_print(queue_t *q);
void queue_free(queue_t *q);
#endif /* _QUEUE_H_ */

View file

@ -1,3 +1,3 @@
#middleware;uuid;type;interval;options
http://localhost/workspace/volkszaehler.org/htdocs/middleware;a301d8d0-903b-1234-94bb-d943d061b6a8;obis;5;/dev/ttyUSB0
http://volkszaehler.org/demo/middleware.php;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;10;/dev/ttyS0
http://localhost/workspace/volkszaehler.org/htdocs/middleware;52960fe0-8882-11e0-b356-85eba28c1922;obis;10;/mnt/1wire/10.12E6D3000800/temperature
#http://volkszaehler.org/demo/middleware.php;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;10;/dev/ttyS0