added threading and configfile parsing for multiple channels

This commit is contained in:
Steffen Vogel 2011-05-25 00:40:20 +02:00
parent 282c2c309a
commit 30afeb4fd0
8 changed files with 391 additions and 257 deletions

View file

@ -1,5 +1,5 @@
CC=cc
CFLAGS=-c -Wall -g
CFLAGS=-c -Wall -g -D_REENTRANT -std=c99
LDFLAGS=
TARGET=vzlogger
@ -8,11 +8,14 @@ all: $(TARGET)
clean:
rm -rf *.o
vzlogger: main.c obis.c
$(CC) $(LDFLAGS) main.o obis.o `curl-config --libs` -l json -o $(TARGET)
vzlogger: main.c obis.c api.c
$(CC) $(LDFLAGS) main.o obis.o api.o `curl-config --libs` -ljson -lpthread -o $(TARGET)
main.c:
$(CC) $(CFLAGS) src/main.c -o main.o `curl-config --cflags`
$(CC) $(CFLAGS) src/main.c -o main.o
api.c:
$(CC) $(CFLAGS) src/api.c -o api.o `curl-config --cflags`
obis.c:
$(CC) $(CFLAGS) src/protocols/obis.c -o obis.o

View file

@ -0,0 +1,161 @@
/**
* Implementation of volkszaehler.org API calls
*
* @author Steffen Vogel <info@steffenvogel.de>
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @package vzlogger
* @license http://opensource.org/licenses/gpl-license.php GNU Public License
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include <json/json.h>
#include "main.h"
#include "api.h"
extern struct options opts;
/**
* Reformat CURLs debugging output
*/
int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom) {
switch (type) {
case CURLINFO_TEXT:
case CURLINFO_END:
printf("%.*s", (int) size, data);
break;
case CURLINFO_HEADER_IN:
case CURLINFO_HEADER_OUT:
//printf("header: %.*s", size, data);
break;
case CURLINFO_SSL_DATA_IN:
case CURLINFO_DATA_IN:
printf("Received %lu bytes\n", (unsigned long) size);
break;
case CURLINFO_SSL_DATA_OUT:
case CURLINFO_DATA_OUT:
printf("Sent %lu bytes.. ", (unsigned long) size);
break;
}
return 0;
}
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data) {
size_t realsize = size * nmemb;
struct curl_response *response = (struct curl_response *) data;
response->data = realloc(response->data, response->size + realsize + 1);
if (response->data == NULL) { /* out of memory! */
fprintf(stderr, "Not enough memory (realloc returned NULL)\n");
exit(EXIT_FAILURE);
}
memcpy(&(response->data[response->size]), ptr, realsize);
response->size += realsize;
response->data[response->size] = 0;
return realsize;
}
/**
* Log against the vz.org middleware with simple HTTP requests via CURL
*/
CURLcode api_log(char * middleware, char * uuid, struct reading read) {
CURL *curl;
CURLcode rc = -1;
int curl_code;
struct curl_response chunk = {NULL, 0};
char url[255], useragent[255], post[255];
/* build request strings */
sprintf(url, "%s/data/%s.json", middleware, uuid); /* build url */
sprintf(useragent, "vzlogger/%s (%s)", VZ_VERSION, curl_version());
sprintf(post, "?timestamp=%lu%lu&value=%f", read.tv.tv_sec, read.tv.tv_usec, read.value);
curl_global_init(CURL_GLOBAL_ALL);
curl = curl_easy_init();
if (curl) {
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post);
curl_easy_setopt(curl, CURLOPT_USERAGENT, useragent);
curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) opts.verbose);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &chunk);
if (opts.verbose) printf("Sending request: %s%s\n", url, post);
rc = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &curl_code);
if (opts.verbose) printf("Request %s with code: %i\n", (curl_code == 200) ? "succeded" : "failed", curl_code);
if (rc != CURLE_OK) {
fprintf(stderr, "CURL error: %s\n", curl_easy_strerror(rc));
}
else if (chunk.size == 0 || chunk.data == NULL) {
fprintf(stderr, "No data received!\n");
rc = -1;
}
else if (curl_code != 200) { /* parse exception */
struct json_tokener * json_tok;
struct json_object * json_obj;
json_tok = json_tokener_new();
json_obj = json_tokener_parse_ex(json_tok, chunk.data, chunk.size);
if (json_tok->err == json_tokener_success) {
json_obj = json_object_object_get(json_obj, "exception");
if (json_obj) {
fprintf(stderr, "%s [%i]: %s\n",
json_object_get_string(json_object_object_get(json_obj, "type")),
json_object_get_int(json_object_object_get(json_obj, "code")),
json_object_get_string(json_object_object_get(json_obj, "message"))
);
}
else {
fprintf(stderr, "Malformed middleware response: missing exception\n");
}
}
else {
fprintf(stderr, "Malformed middleware response: %s\n", json_tokener_errors[json_tok->err]);
}
rc = -1;
}
curl_easy_cleanup(curl); /* always cleanup */
free(chunk.data); /* free response */
}
else {
fprintf(stderr, "Failed to create CURL handle\n");
}
return rc;
}

View file

@ -0,0 +1,89 @@
/**
* Header file for volkszaehler.org API calls
*
* @author Steffen Vogel <info@steffenvogel.de>
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @package vzlogger
* @license http://opensource.org/licenses/gpl-license.php GNU Public License
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _API_H_
#define _API_H_
#include <curl/curl.h>
#include <curl/types.h>
#include <curl/easy.h>
#define BUFFER_LENGTH 64
typedef struct reading (*rfp)();
typedef void (*ifp)(char *options);
struct curl_response {
char *data;
size_t size;
};
struct reading {
float value;
struct timeval tv;
};
struct protocol {
char * name; /* short identifier for protocol */
char * desc; /* more detailed description */
rfp read_func; /* function pointer to read data */
ifp init_func; /* function to init a channel */
};
/**
* Datatype for every channel
*/
struct channel {
char * uuid;
char * middleware;
int interval;
char * options;
struct protocol *prot;
struct reading buffer[BUFFER_LENGTH]; /* ring buffer */
};
/**
* Options from command line
*/
struct options {
char * config; /* path to config file */
unsigned int interval; /* seconds */
/* boolean bitfields, at the end of struct */
unsigned int verbose:1;
unsigned int daemon:1;
// unsigned local:1; /* enable local interface */
};
/* Prototypes */
void usage(char ** argv);
struct options parse_options(int argc, char * argv[]);
//struct channels parse_channels(char * filename);
int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom);
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data);
CURLcode api_log(char * middleware, char * uuid, struct reading read);
#endif /* _API_H_ */

View file

@ -1,7 +1,7 @@
/**
* main source
* Main source file
*
* @package controller
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
@ -26,40 +26,28 @@
#define VZ_VERSION "0.2"
#include <stdio.h>
#include <getopt.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
#include <sys/time.h>
#include <json/json.h>
#include <curl/curl.h>
#include <curl/types.h>
#include <curl/easy.h>
#include "main.h"
#include "api.h"
#include "protocols/obis.h"
static struct device devices[] = {
// {"1wire", "Dallas 1-Wire Sensors", 1wire_get},
// {"ccost", "CurrentCost", ccost_get},
// {"fusb", "FluksoUSB prototype board", fusb_get},
{"obis", "Plaintext OBIS", obis_get},
static struct protocol protocols[] = {
{"obis", "Plaintext OBIS", obis_get, obis_init},
{NULL} /* stop condition for iterator */
};
static struct option long_options[] = {
{"middleware", required_argument, 0, 'm'},
{"uuid", required_argument, 0, 'u'},
{"value", required_argument, 0, 'V'},
{"device", required_argument, 0, 'd'},
{"port", required_argument, 0, 'p'},
// {"config", required_argument, 0, 'c'},
{"daemon", required_argument, 0, 'D'},
{"config", required_argument, 0, 'c'},
{"daemon", required_argument, 0, 'd'},
{"interval", required_argument, 0, 'i'},
// {"local", no_argument, 0, 'l'},
// {"local-port", required_argument, 0, 'p'},
@ -69,14 +57,9 @@ static struct option long_options[] = {
};
static char * long_options_descs[] = {
"url to middleware",
"channel uuid",
"sensor value or meter consumption to log",
"device type",
"port the device is connected to",
// "config file with channel -> uuid mapping",
"config file with channel -> uuid mapping",
"run as daemon",
"interval in seconds to log data",
"interval in seconds to read meters",
// "activate local interface (tiny webserver)",
// "TCP port for local interface"
"show this help",
@ -93,7 +76,7 @@ struct options opts;
void usage(char * argv[]) {
char ** desc = long_options_descs;
struct option * op = long_options;
struct device * dev = devices;
struct protocol * prot = protocols;
printf("Usage: %s [options]\n\n", argv[0]);
printf(" following options are available:\n");
@ -105,11 +88,11 @@ void usage(char * argv[]) {
}
printf("\n");
printf(" following device types are available:\n");
printf(" following protocol types are supported:\n");
while (dev->name) {
printf("\t%-12s\t%s\n", dev->name, dev->desc);
dev++;
while (prot->name) {
printf("\t%-12s\t%s\n", prot->name, prot->desc);
prot++;
}
printf("\nvzlogger - volkszaehler.org logging utility VERSION\n");
@ -123,15 +106,17 @@ struct options parse_options(int argc, char * argv[]) {
struct options opts;
/* setting default options */
opts.interval = 300;
opts.verbose = 0;
opts.daemon = 0;
opts.interval = 300; /* seconds */
opts.verbose = FALSE;
opts.daemon = FALSE;
//opts.local = FALSE;
opts.config = NULL;
while (1) {
while (TRUE) {
/* getopt_long stores the option index here. */
int option_index = 0;
int c = getopt_long(argc, argv, "i:m:u:V:t:p:c:hdv", long_options, &option_index);
int c = getopt_long(argc, argv, "i:c:hdv", long_options, &option_index);
/* detect the end of the options. */
if (c == -1)
@ -146,28 +131,11 @@ struct options parse_options(int argc, char * argv[]) {
opts.daemon = 1;
break;
case 'i':
opts.interval = atoi(optarg);
case 'c': /* read config file */
opts.config = (char *) malloc(strlen(optarg)+1);
strcpy(opts.config, optarg);
break;
case 'u':
opts.uuid = (char *) malloc(strlen(optarg)+1);
strcpy(opts.uuid, optarg);
break;
case 'm':
opts.middleware = (char *) malloc(strlen(optarg)+1);
strcpy(opts.middleware, optarg);
break;
case 'p':
opts.port = (char *) malloc(strlen(optarg)+1);
strcpy(opts.port, optarg);
break;
//case 'c': /* read config file */
// break;
case 'h':
case '?':
usage(argv);
@ -178,131 +146,80 @@ struct options parse_options(int argc, char * argv[]) {
return opts;
}
/**
* Reformat CURLs debugging output
*/
int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom) {
switch (type) {
case CURLINFO_TEXT:
case CURLINFO_END:
printf("%.*s", (int) size, data);
break;
struct channel parse_channel(char * line) {
struct channel ch;
struct protocol * prot;
char *tok = strtok(line, ";");
case CURLINFO_HEADER_IN:
case CURLINFO_HEADER_OUT:
//printf("header in: %.*s", size, data);
break;
case CURLINFO_SSL_DATA_IN:
case CURLINFO_DATA_IN:
printf("Received %lu bytes\n", (unsigned long) size);
break;
case CURLINFO_SSL_DATA_OUT:
case CURLINFO_DATA_OUT:
printf("Sent %lu bytes.. ", (unsigned long) size);
break;
for (int i = 0; i < 7 && tok != NULL; i++) {
switch(i) {
case 0: /* middleware */
ch.middleware = (char *) malloc(strlen(tok)+1);
strcpy(ch.middleware, tok);
break;
case 1: /* uuid */
ch.uuid = (char *) malloc(strlen(tok)+1);
strcpy(ch.uuid, tok);
break;
case 2: /* protocol */
prot = protocols; /* reset pointer */
while (prot->name && strcmp(prot->name, tok) != 0) prot++; /* linear search */
ch.prot = prot;
break;
case 3: /* interval */
ch.interval = atoi(tok);
break;
case 4: /* options */
ch.options = (char *) malloc(strlen(tok)+1);
strcpy(ch.options, tok);
break;
}
tok = strtok(NULL, ";");
}
return 0;
}
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data) {
size_t realsize = size * nmemb;
struct curl_response *response = (struct curl_response *) data;
response->data = realloc(response->data, response->size + realsize + 1);
if (response->data == NULL) { /* out of memory! */
fprintf(stderr, "Not enough memory (realloc returned NULL)\n");
exit(EXIT_FAILURE);
}
memcpy(&(response->data[response->size]), ptr, realsize);
response->size += realsize;
response->data[response->size] = 0;
return realsize;
if (opts.verbose) printf("Channel parsed: %s\n", line);
return ch;
}
/**
* Log against the vz.org middleware with simple HTTP requests via CURL
* Logging thread
*/
CURLcode api_log(char * middleware, char * uuid, struct reading read) {
CURL *curl;
CURLcode rc = -1;
char url[255], useragent[255], post[255];
int curl_code;
struct curl_response chunk = {NULL, 0};
void *log_thread(void *arg) {
static int threads; /* number of threads already started */
int thread_id = threads++; /* current thread identifier */
struct channel ch;
struct reading rd;
CURLcode rc;
sprintf(url, "%s/data/%s.json", middleware, uuid); /* build url */
sprintf(useragent, "vzlogger/%s (%s)", VZ_VERSION, curl_version());
sprintf(post, "?timestamp=%lu%lu&value=%f", read.tv.tv_sec, read.tv.tv_usec, read.value);
curl_global_init(CURL_GLOBAL_ALL);
curl = curl_easy_init();
if (opts.verbose) printf("Thread #%i started\n", thread_id);
if (curl) {
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post);
curl_easy_setopt(curl, CURLOPT_USERAGENT, useragent);
curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) opts.verbose);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &chunk);
if (opts.verbose) printf("Sending request: %s%s\n", url, post);
rc = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &curl_code);
if (opts.verbose) printf("Request %s with code: %i\n", (curl_code == 200) ? "succeded" : "failed", curl_code);
if (rc != CURLE_OK) {
fprintf(stderr, "CURL error: %s\n", curl_easy_strerror(rc));
}
else if (chunk.size == 0 || chunk.data == NULL) {
fprintf(stderr, "No data received!\n");
rc = -1;
}
else if (curl_code != 200) { /* parse exception */
struct json_tokener * json_tok;
struct json_object * json_obj;
ch = *(struct channel *) arg; /* copy channel struct */
json_tok = json_tokener_new();
json_obj = json_tokener_parse_ex(json_tok, chunk.data, chunk.size);
if (json_tok->err == json_tokener_success) {
json_obj = json_object_object_get(json_obj, "exception");
if (json_obj) {
fprintf(stderr, "%s [%i]: %s\n",
json_object_get_string(json_object_object_get(json_obj, "type")),
json_object_get_int(json_object_object_get(json_obj, "code")),
json_object_get_string(json_object_object_get(json_obj, "message"))
);
}
else {
fprintf(stderr, "Malformed middleware response: missing exception\n");
}
}
else {
fprintf(stderr, "Malformed middleware response: %s\n", json_tokener_errors[json_tok->err]);
}
rc = -1;
}
curl_easy_cleanup(curl); /* always cleanup */
free(chunk.data); /* free response */
}
else {
fprintf(stderr, "Failed to create CURL handle\n");
ch.prot->init_func(ch.options); /* init sensor/meter */
log:
rd = ch.prot->read_func(); /* aquire reading */
rc = api_log(ch.middleware, ch.uuid, rd); /* log reading */
if (rc != CURLE_OK) {
if (opts.verbose) printf("Delaying next transmission for 15 minutes due to pervious error\n");
sleep(15);
}
return rc;
if (opts.daemon) {
if (opts.verbose) printf("Sleeping %i seconds for next transmission\n", ch.interval);
sleep(ch.interval);
goto log;
}
return NULL;
}
/**
@ -311,24 +228,29 @@ CURLcode api_log(char * middleware, char * uuid, struct reading read) {
int main(int argc, char * argv[]) {
opts = parse_options(argc, argv); /* parse command line arguments */
struct reading rd;
log: /* start logging */
rd.value = 33.333;
gettimeofday(&rd.tv, NULL);
FILE *file = fopen(opts.config, "r"); /* open configuration */
CURLcode rc = api_log(opts.middleware, opts.uuid, rd);
if (rc != CURLE_OK) {
if (opts.verbose) printf("Delaying next transmission for 15 minutes due to pervious error\n");
sleep(15*60);
if (file == NULL) {
perror(opts.config); /* why didn't the file open? */
exit(EXIT_FAILURE);
}
if (opts.daemon) {
if (opts.verbose) printf("Sleeping %i seconds for next transmission\n", opts.interval);
sleep(opts.interval);
goto log;
int i = 0;
char line[256];
pthread_t pthreads[16];
while (fgets(line, sizeof line, file) != NULL) { /* read a line */
if (line[0] == '#' || line[0] == '\n') continue; /* skip comments */
struct channel ch = parse_channel(line);
/* start logging threads */
pthread_create(&pthreads[i++], NULL, log_thread, (void *) &ch);
}
fclose(file);
for (int n = 0; n < i; n++) { /* wait for all threads to terminate */
pthread_join(pthreads[n], NULL);
}
return 0;

View file

@ -1,64 +1,14 @@
/**
* main header
*
* @package controller
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _MAIN_H_
#define _MAIN_H_
typedef struct reading (*rfp)();
#define VZ_VERSION "0.2"
struct curl_response {
char *data;
size_t size;
};
#ifndef TRUE
#define TRUE 1
#endif
struct reading {
float value;
struct timeval tv;
};
struct device {
char * name;
char * desc;
rfp read_fnct;
};
struct options {
unsigned interval; /* interval in seconds, the daemon send data */
char * middleware; /* url to middleware server */
/* following options should be replace by a list of connected/configured sensors/meters */
char * uuid; /* universal unique channel identifier */
char * port; /* port your sensor is connected to */
unsigned verbose:1; /* boolean bitfield, at the end of struct */
unsigned daemon:1; /* boolean bitfield */
};
/* Prototypes */
void usage(char ** argv);
struct options parse_options(int argc, char * argv[]);
CURLcode api_log(char * middleware, char * uuid, struct reading read);
#ifndef FALSE
#define FALSE 0
#endif
#endif /* _MAIN_H_ */

View file

@ -4,7 +4,7 @@
* This is our example protocol. Use this skeleton to add your own
* protocols and meters.
*
* @package controller
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
@ -31,8 +31,11 @@
#include <fcntl.h>
#include <termios.h>
#include <string.h>
#include <sys/time.h>
int obis_init(char * port) {
#include "obis.h"
void obis_init(char * port) {
struct termios tio;
int fd;
@ -48,11 +51,14 @@ int obis_init(char * port) {
fd = open(port, O_RDWR); // | O_NONBLOCK);
cfsetospeed(&tio, B9600); // 9600 baud
cfsetispeed(&tio, B9600); // 9600 baud
}
struct reading obis_get() {
struct reading rd;
return fd;
}
float obis_get() {
return 0;
rd.value = 33.3334;
gettimeofday(&rd.tv, NULL);
return rd;
}

View file

@ -4,7 +4,7 @@
* This is our example protocol. Use this skeleton to add your own
* protocols and meters.
*
* @package controller
* @package vzlogger
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
* @author Steffen Vogel <info@steffenvogel.de>
@ -29,9 +29,9 @@
#ifndef _OBIS_H_
#define _OBIS_H_
#include "../main.h"
#include "../api.h"
int obis_init(char * port);
void obis_init(char * port);
struct reading obis_get();
#endif /* _OBIS_H_ */

View file

@ -0,0 +1,3 @@
#middleware;uuid;type;interval;options
http://localhost/workspace/volkszaehler.org/htdocs/middleware;6836dd20-00d5-11e0-bab1-856ed5f959ae;obis;5;/dev/ttyUSB0
http://volkszaehler.org/demo/middleware;ef0e9adf-cd9e-4d9a-92c5-b4fb4c89ff98;obis;6;/dev/ttyS0