Compare commits

...

16 commits

Author SHA1 Message Date
Steffen Vogel
f5931dcc7e improved exception handling on failed HTTP requests 2012-03-12 16:12:24 +01:00
Steffen Vogel
319ee643ad added registry to manage memory for string identifiers 2012-03-12 01:32:54 +01:00
Steffen Vogel
26009e9cc8 removed unused variable 2012-02-21 19:22:55 +01:00
Steffen Vogel
cd330dd76a Merge branch 'master' of github.com:volkszaehler/vzlogger 2012-02-19 16:14:31 +01:00
Steffen Vogel
1f8e6ba783 release 0.3.3 2012-02-19 16:05:53 +01:00
Steffen Vogel
7a60100944 added serial support for D0 protocol (thanks to Kai Krüger, ITWM!) 2012-02-10 10:10:00 +01:00
Steffen Vogel
a0c17f2ead code cleanup 2012-02-09 10:03:05 +01:00
Steffen Vogel
f7651ca9dd some more memory optimizations 2012-02-03 15:28:09 +01:00
Steffen Vogel
5aca528242 removed old stuff 2012-02-03 15:27:49 +01:00
Steffen Vogel
17865a660c modularized api stuff (a step forward to support multiple protocols) 2012-02-03 15:26:07 +01:00
Steffen Vogel
a15aa43ca3 added routines to free allocated memory 2012-02-03 15:24:16 +01:00
Steffen Vogel
d67c965d46 fixed #131 (thanks to r00t!!) 2012-02-03 13:51:52 +01:00
Steffen Vogel
acae400cb0 fixed protocol descriptions 2012-01-28 17:55:30 +01:00
Steffen Vogel
3443e1d809 removed binary from repostory 2012-01-22 15:53:12 +01:00
Steffen Vogel
617c4e0282 fixed invalid header 2012-01-22 15:52:36 +01:00
Justin Otherguy
f950fe7e37 Merge pull request #2 from stv0g/master
new configuration scheme and protocol: "file"
2011-12-05 14:10:34 -08:00
30 changed files with 312 additions and 132 deletions

3
.gitignore vendored
View file

@ -1,8 +1,7 @@
*~ *~
# Binaries # Binaries
/bin/logger/vzlogger /src/vzlogger
/bin/reader/reader
# Compiled Object files # Compiled Object files
*.slo *.slo

2
debian/changelog vendored
View file

@ -1,4 +1,4 @@
vzlogger (0.3.3-rc1) stable; urgency=low vzlogger (0.3.3-1) stable; urgency=low
* added support for new fluksometer * added support for new fluksometer
* some code refactoring & cleanup * some code refactoring & cleanup

View file

@ -39,7 +39,13 @@ typedef struct {
size_t size; size_t size;
} CURLresponse; } CURLresponse;
CURL * api_curl_init(channel_t *ch); typedef struct {
CURL *curl;
struct curl_slist *headers;
} api_handle_t;
int api_init(channel_t *ch, api_handle_t *api);
void api_free(api_handle_t *api);
/** /**
* Reformat CURLs debugging output * Reformat CURLs debugging output
@ -61,6 +67,6 @@ json_object * api_json_tuples(buffer_t *buf, reading_t *first, reading_t *last);
/** /**
* Parses JSON encoded exception and stores describtion in err * Parses JSON encoded exception and stores describtion in err
*/ */
void api_parse_exception(CURLresponse response, char *err, size_t n); int api_parse_exception(CURLresponse response, char *err, size_t n);
#endif /* _API_H_ */ #endif /* _API_H_ */

View file

@ -50,7 +50,7 @@ reading_t * buffer_push(buffer_t *buf, reading_t *rd);
void buffer_free(buffer_t *buf); void buffer_free(buffer_t *buf);
void buffer_clean(buffer_t *buf); void buffer_clean(buffer_t *buf);
void buffer_clear(buffer_t *buf); void buffer_clear(buffer_t *buf);
char * buffer_dump(buffer_t *buf, char *dump, int len); char * buffer_dump(buffer_t *buf, char *dump, size_t len);
#endif /* _BUFFER_H_ */ #endif /* _BUFFER_H_ */

View file

@ -41,7 +41,6 @@ typedef struct channel {
pthread_cond_t condition; /* pthread syncronization to notify logging thread and local webserver */ pthread_cond_t condition; /* pthread syncronization to notify logging thread and local webserver */
pthread_t thread; /* pthread for asynchronus logging */ pthread_t thread; /* pthread for asynchronus logging */
pthread_status_t status; /* status of thread */
char *middleware; /* url to middleware */ char *middleware; /* url to middleware */
char *uuid; /* unique identifier for middleware */ char *uuid; /* unique identifier for middleware */

View file

@ -91,6 +91,7 @@ typedef struct {
/* function pointers */ /* function pointers */
int (*init_func)(meter_t *mtr, list_t options); int (*init_func)(meter_t *mtr, list_t options);
void (*free_func)(meter_t *mtr);
int (*open_func)(meter_t *mtr); int (*open_func)(meter_t *mtr);
int (*close_func)(meter_t *mtr); int (*close_func)(meter_t *mtr);
size_t (*read_func)(meter_t *mtr, reading_t *rds, size_t n); size_t (*read_func)(meter_t *mtr, reading_t *rds, size_t n);

View file

@ -47,6 +47,7 @@ struct meter;
struct reading; struct reading;
int meter_init_d0(struct meter *mtr, list_t options); int meter_init_d0(struct meter *mtr, list_t options);
void meter_free_d0(struct meter *mtr);
int meter_open_d0(struct meter *mtr); int meter_open_d0(struct meter *mtr);
int meter_close_d0(struct meter *mtr); int meter_close_d0(struct meter *mtr);
size_t meter_read_d0(struct meter *mtr, struct reading *rds, size_t n); size_t meter_read_d0(struct meter *mtr, struct reading *rds, size_t n);
@ -60,4 +61,6 @@ size_t meter_read_d0(struct meter *mtr, struct reading *rds, size_t n);
*/ */
int meter_d0_open_socket(const char *node, const char *service); int meter_d0_open_socket(const char *node, const char *service);
int meter_d0_open_device(const char *device, struct termios *old_tio, speed_t baudrate);
#endif /* _D0_H_ */ #endif /* _D0_H_ */

View file

@ -38,6 +38,7 @@ struct meter;
struct reading; struct reading;
int meter_init_exec(struct meter *mtr, list_t options); int meter_init_exec(struct meter *mtr, list_t options);
void meter_free_exec(struct meter *mtr);
int meter_open_exec(struct meter *mtr); int meter_open_exec(struct meter *mtr);
int meter_close_exec(struct meter *mtr); int meter_close_exec(struct meter *mtr);
size_t meter_read_exec(struct meter *mtr, struct reading *rds, size_t n); size_t meter_read_exec(struct meter *mtr, struct reading *rds, size_t n);

View file

@ -41,6 +41,7 @@ struct meter;
struct reading; struct reading;
int meter_init_file(struct meter *mtr, list_t options); int meter_init_file(struct meter *mtr, list_t options);
void meter_free_file(struct meter *mtr);
int meter_open_file(struct meter *mtr); int meter_open_file(struct meter *mtr);
int meter_close_file(struct meter *mtr); int meter_close_file(struct meter *mtr);
size_t meter_read_file(struct meter *mtr, struct reading *rds, size_t n); size_t meter_read_file(struct meter *mtr, struct reading *rds, size_t n);

View file

@ -26,6 +26,8 @@
#ifndef _FLUKSOV2_H_ #ifndef _FLUKSOV2_H_
#define _FLUKSOV2_H_ #define _FLUKSOV2_H_
#define FLUKSOV2_DEFAULT_FIFO "/var/run/spid/delta/out"
typedef struct { typedef struct {
char *fifo; char *fifo;
@ -37,6 +39,7 @@ struct meter;
struct reading; struct reading;
int meter_init_fluksov2(struct meter *mtr, list_t options); int meter_init_fluksov2(struct meter *mtr, list_t options);
void meter_free_fluksov2(struct meter *mtr);
int meter_open_fluksov2(struct meter *mtr); int meter_open_fluksov2(struct meter *mtr);
int meter_close_fluksov2(struct meter *mtr); int meter_close_fluksov2(struct meter *mtr);
size_t meter_read_fluksov2(struct meter *mtr, struct reading *rds, size_t n); size_t meter_read_fluksov2(struct meter *mtr, struct reading *rds, size_t n);

View file

@ -38,6 +38,7 @@ struct reading;
double ltqnorm(double p); double ltqnorm(double p);
int meter_init_random(struct meter *mtr, list_t options); int meter_init_random(struct meter *mtr, list_t options);
void meter_free_random(struct meter *mtr);
int meter_open_random(struct meter *mtr); int meter_open_random(struct meter *mtr);
int meter_close_random(struct meter *mtr); int meter_close_random(struct meter *mtr);
size_t meter_read_random(struct meter *mtr, struct reading *rds, size_t n); size_t meter_read_random(struct meter *mtr, struct reading *rds, size_t n);

View file

@ -41,6 +41,7 @@ struct meter;
struct reading; struct reading;
int meter_init_s0(struct meter *mtr, list_t options); int meter_init_s0(struct meter *mtr, list_t options);
void meter_free_s0(struct meter *mtr);
int meter_open_s0(struct meter *mtr); int meter_open_s0(struct meter *mtr);
int meter_close_s0(struct meter *mtr); int meter_close_s0(struct meter *mtr);
size_t meter_read_s0(struct meter *mtr, struct reading *rds, size_t n); size_t meter_read_s0(struct meter *mtr, struct reading *rds, size_t n);

View file

@ -61,6 +61,13 @@ struct reading;
*/ */
int meter_init_sml(struct meter *mtr, list_t options); int meter_init_sml(struct meter *mtr, list_t options);
/**
* Freeing allocated resources during initialization
*
* @param mtr the meter structure
*/
void meter_free_sml(struct meter *mtr);
/** /**
* Open connection via serial port or socket to meter * Open connection via serial port or socket to meter
* *

View file

@ -51,12 +51,20 @@ typedef struct reading {
enum meter_procotol; /* forward declaration */ enum meter_procotol; /* forward declaration */
/**
* Manages memory for string identifiers
*
* @param str string to lookup in the registry
* @return pointer to string in the regstry
*/
char * reading_id_registry(const char *str);
/** /**
* Parse two reading identifiers in a given protocol context * Parse two reading identifiers in a given protocol context
* *
* @return result like in strcmp() * @return result like in strcmp()
*/ */
int meter_id_compare(enum meter_procotol, reading_id_t a, reading_id_t b); int reading_id_compare(enum meter_procotol prot, reading_id_t a, reading_id_t b);
/** /**
* Parse identifier by a given string and protocol * Parse identifier by a given string and protocol

View file

@ -35,14 +35,6 @@
#include "common.h" #include "common.h"
#include "list.h" #include "list.h"
/* enumerations */
typedef enum {
status_unknown,
status_running,
status_terminated,
status__cancelled
} pthread_status_t;
/** /**
* Type for mapping channels to meters * Type for mapping channels to meters
*/ */
@ -51,7 +43,6 @@ typedef struct map {
list_t channels; list_t channels;
pthread_t thread; pthread_t thread;
pthread_status_t status;
} map_t; } map_t;
/* prototypes */ /* prototypes */

View file

@ -107,59 +107,65 @@ json_object * api_json_tuples(buffer_t *buf, reading_t *first, reading_t *last)
return json_tuples; return json_tuples;
} }
CURL * api_curl_init(channel_t *ch) { int api_init(channel_t *ch, api_handle_t *api) {
CURL *curl;
struct curl_slist *header = NULL;
char url[255], agent[255]; char url[255], agent[255];
/* prepare header, uuid & url */ /* prepare header, uuid & url */
sprintf(agent, "User-Agent: %s/%s (%s)", PACKAGE, VERSION, curl_version()); /* build user agent */ sprintf(agent, "User-Agent: %s/%s (%s)", PACKAGE, VERSION, curl_version()); /* build user agent */
sprintf(url, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */ sprintf(url, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */
header = curl_slist_append(header, "Content-type: application/json"); api->headers = NULL;
header = curl_slist_append(header, "Accept: application/json"); api->headers = curl_slist_append(api->headers, "Content-type: application/json");
header = curl_slist_append(header, agent); api->headers = curl_slist_append(api->headers, "Accept: application/json");
api->headers = curl_slist_append(api->headers, agent);
curl = curl_easy_init(); api->curl = curl_easy_init();
if (!curl) { if (!api->curl) {
print(log_error, "CURL: cannot create handle", ch); return EXIT_FAILURE;
exit(EXIT_FAILURE);
} }
curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(api->curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, header); curl_easy_setopt(api->curl, CURLOPT_HTTPHEADER, api->headers);
curl_easy_setopt(curl, CURLOPT_VERBOSE, options.verbosity); curl_easy_setopt(api->curl, CURLOPT_VERBOSE, options.verbosity);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback); curl_easy_setopt(api->curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(curl, CURLOPT_DEBUGDATA, (void *) ch); curl_easy_setopt(api->curl, CURLOPT_DEBUGDATA, (void *) ch);
return curl; return EXIT_SUCCESS;
} }
void api_parse_exception(CURLresponse response, char *err, size_t n) { void api_free(api_handle_t *api) {
curl_easy_cleanup(api->curl);
curl_slist_free_all(api->headers);
}
int api_parse_exception(CURLresponse response, char *err, size_t n) {
struct json_tokener *json_tok; struct json_tokener *json_tok;
struct json_object *json_obj; struct json_object *json_obj;
json_tok = json_tokener_new(); json_tok = json_tokener_new();
json_obj = json_tokener_parse_ex(json_tok, response.data, response.size); json_obj = json_tokener_parse_ex(json_tok, response.data, response.size);
if (json_tok->err == json_tokener_success) { if (json_tok->err != json_tokener_success) {
json_obj = json_object_object_get(json_obj, "exception"); json_tokener_free(json_tok);
return ERR;
}
if (json_obj) { json_obj = json_object_object_get(json_obj, "exception");
snprintf(err, n, "%s: %s",
json_object_get_string(json_object_object_get(json_obj, "type")), if (json_obj) {
json_object_get_string(json_object_object_get(json_obj, "message")) snprintf(err, n, "%s: %s",
); json_object_get_string(json_object_object_get(json_obj, "type")),
} json_object_get_string(json_object_object_get(json_obj, "message"))
else { );
strncpy(err, "missing exception", n);
}
} }
else { else {
strncpy(err, json_tokener_errors[json_tok->err], n); json_object_put(json_obj);
json_tokener_free(json_tok);
return ERR;
} }
json_object_put(json_obj); json_object_put(json_obj);
json_tokener_free(json_tok); json_tokener_free(json_tok);
return SUCCESS;
} }

View file

@ -97,26 +97,32 @@ void buffer_clean(buffer_t *buf) {
pthread_mutex_unlock(&buf->mutex); pthread_mutex_unlock(&buf->mutex);
} }
char * buffer_dump(buffer_t *buf, char *dump, int len) { char * buffer_dump(buffer_t *buf, char *dump, size_t len) {
strcpy(dump, "|"); size_t pos = 0;
dump[pos++] = '{';
for (reading_t *rd = buf->head; rd != NULL; rd = rd->next) { for (reading_t *rd = buf->head; rd != NULL; rd = rd->next) {
char tmp[16]; if (pos < len) {
sprintf(tmp, "%.2f|", rd->value); pos += snprintf(dump+pos, len-pos, "%.2f", rd->value);
if (strlen(dump)+strlen(tmp) < len) {
if (buf->sent == rd) { /* indicate last sent reading */
strcat(dump, "!");
}
strcat(dump, tmp);
} }
else {
return NULL; /* dump buffer is full! */ /* indicate last sent reading */
if (pos < len && buf->sent == rd) {
dump[pos++] = '!';
}
/* add seperator between values */
if (pos < len && rd->next != NULL) {
dump[pos++] = ',';
} }
} }
return dump; if (pos+1 < len) {
dump[pos++] = '}';
dump[pos] = '\0'; /* zero terminated string */
}
return (pos < len) ? dump : NULL; /* buffer full? */
} }
void buffer_free(buffer_t *buf) { void buffer_free(buffer_t *buf) {

View file

@ -35,7 +35,6 @@ void channel_init(channel_t *ch, const char *uuid, const char *middleware, readi
snprintf(ch->id, 5, "ch%i", instances++); snprintf(ch->id, 5, "ch%i", instances++);
ch->identifier = identifier; ch->identifier = identifier;
ch->status = status_unknown;
ch->uuid = strdup(uuid); ch->uuid = strdup(uuid);
ch->middleware = strdup(middleware); ch->middleware = strdup(middleware);

View file

@ -29,19 +29,19 @@
#include "meter.h" #include "meter.h"
#include "options.h" #include "options.h"
#define METER_DETAIL(NAME, DESC, MAX_RDS, PERIODIC) { meter_protocol_##NAME, #NAME, DESC, MAX_RDS, PERIODIC, meter_init_##NAME, meter_open_##NAME, meter_close_##NAME, meter_read_##NAME } #define METER_DETAIL(NAME, DESC, MAX_RDS, PERIODIC) { meter_protocol_##NAME, #NAME, DESC, MAX_RDS, PERIODIC, meter_init_##NAME, meter_free_##NAME, meter_open_##NAME, meter_close_##NAME, meter_read_##NAME }
static const meter_details_t protocols[] = { static const meter_details_t protocols[] = {
/* alias description max_rds periodic /* alias description max_rds periodic
===============================================================================================*/ ===============================================================================================*/
METER_DETAIL(file, "Read from file (ex. 1-Wire sensors via OWFS)", 32, TRUE), METER_DETAIL(file, "Read from file or fifo", 32, TRUE),
//METER_DETAIL(exec, "Read from program (ex. 1-Wire sensors via digitemp)", 32, TRUE), //METER_DETAIL(exec, "Parse program output", 32, TRUE),
METER_DETAIL(random, "Random walk", 1, TRUE), METER_DETAIL(random, "Generate random values with a random walk", 1, TRUE),
METER_DETAIL(fluksov2, "Read from onboard SPI of a Flukso v2", 16, FALSE), METER_DETAIL(fluksov2, "Read from Flukso's onboard SPI fifo", 16, FALSE),
METER_DETAIL(s0, "S0 on RS232", 1, TRUE), METER_DETAIL(s0, "S0-meter directly connected to RS232", 1, TRUE),
METER_DETAIL(d0, "Plaintext protocol (DIN EN 62056-21)", 32, FALSE), METER_DETAIL(d0, "DLMS/IEC 62056-21 plaintext protocol", 32, FALSE),
#ifdef SML_SUPPORT #ifdef SML_SUPPORT
METER_DETAIL(sml, "Smart Meter Language", 32, FALSE), METER_DETAIL(sml, "Smart Message Language as used by EDL-21, eHz and SyM²", 32, FALSE),
#endif /* SML_SUPPORT */ #endif /* SML_SUPPORT */
{} /* stop condition for iterator */ {} /* stop condition for iterator */
}; };
@ -77,14 +77,9 @@ int meter_init(meter_t *mtr, list_t options) {
return details->init_func(mtr, options); return details->init_func(mtr, options);
} }
int meter_lookup_protocol(const char* name, meter_protocol_t *protocol) { void meter_free(meter_t *mtr) {
for (const meter_details_t *it = meter_get_protocols(); it != NULL; it++) { const meter_details_t *details = meter_get_details(mtr->protocol);
if (strcmp(it->name, name) == 0) { return details->free_func(mtr);
*protocol = it->id;
return SUCCESS;
}
}
return ERR_NOT_FOUND;
} }
int meter_open(meter_t *mtr) { int meter_open(meter_t *mtr) {
@ -102,6 +97,16 @@ size_t meter_read(meter_t *mtr, reading_t rds[], size_t n) {
return details->read_func(mtr, rds, n); return details->read_func(mtr, rds, n);
} }
int meter_lookup_protocol(const char* name, meter_protocol_t *protocol) {
for (const meter_details_t *it = meter_get_protocols(); it != NULL; it++) {
if (strcmp(it->name, name) == 0) {
*protocol = it->id;
return SUCCESS;
}
}
return ERR_NOT_FOUND;
}
const meter_details_t * meter_get_protocols() { const meter_details_t * meter_get_protocols() {
return protocols; return protocols;
} }

View file

@ -36,6 +36,7 @@
#include <errno.h> #include <errno.h>
#include <ctype.h> #include <ctype.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/ioctl.h>
/* socket */ /* socket */
#include <netdb.h> #include <netdb.h>
@ -98,12 +99,23 @@ int meter_init_d0(meter_t *mtr, list_t options) {
return SUCCESS; return SUCCESS;
} }
void meter_free_d0(meter_t *mtr) {
meter_handle_d0_t *handle = &mtr->handle.d0;
if (handle->device != NULL) {
free(handle->device);
}
if (handle->host != NULL) {
free(handle->host);
}
}
int meter_open_d0(meter_t *mtr) { int meter_open_d0(meter_t *mtr) {
meter_handle_d0_t *handle = &mtr->handle.d0; meter_handle_d0_t *handle = &mtr->handle.d0;
if (handle->device != NULL) { if (handle->device != NULL) {
print(log_error, "TODO: implement serial interface", mtr); handle->fd = meter_d0_open_device(handle->device, &handle->oldtio, handle->baudrate);
return ERR;
} }
else if (handle->host != NULL) { else if (handle->host != NULL) {
char *addr = strdup(handle->host); char *addr = strdup(handle->host);
@ -292,3 +304,35 @@ int meter_d0_open_socket(const char *node, const char *service) {
return fd; return fd;
} }
int meter_d0_open_device(const char *device, struct termios *old_tio, speed_t baudrate) {
struct termios tio;
memset(&tio, 0, sizeof(struct termios));
int fd = open(device, O_RDWR);
if (fd < 0) {
print(log_error, "open(%s): %s", NULL, device, strerror(errno));
return ERR;
}
/* get old configuration */
tcgetattr(fd, &tio) ;
/* backup old configuration to restore it when closing the meter connection */
memcpy(old_tio, &tio, sizeof(struct termios));
/* set 7-N-1 */
tio.c_iflag &= ~(BRKINT | INLCR | IMAXBEL);
tio.c_oflag &= ~(OPOST | ONLCR);
tio.c_lflag &= ~(ISIG | ICANON | IEXTEN | ECHO);
tio.c_cflag |= (CS7 | PARENB);
/* set baudrate */
cfsetispeed(&tio, baudrate);
cfsetospeed(&tio, baudrate);
/* apply new configuration */
tcsetattr(fd, TCSANOW, &tio);
return fd;
}

View file

@ -62,6 +62,16 @@ int meter_init_exec(meter_t *mtr, list_t options) {
return SUCCESS; return SUCCESS;
} }
void meter_free_exec(meter_t *mtr) {
meter_handle_exec_t *handle = &mtr->handle.exec;
free(handle->command);
if (handle->format != NULL) {
free(handle->format);
}
}
int meter_open_exec(meter_t *mtr) { int meter_open_exec(meter_t *mtr) {
//meter_handle_exec_t *handle = &mtr->handle.exec; //meter_handle_exec_t *handle = &mtr->handle.exec;

View file

@ -126,6 +126,16 @@ int meter_init_file(meter_t *mtr, list_t options) {
return SUCCESS; return SUCCESS;
} }
void meter_free_file(meter_t *mtr) {
meter_handle_file_t *handle = &mtr->handle.file;
free(handle->path);
if (handle->format != NULL) {
free(handle->format);
}
}
int meter_open_file(meter_t *mtr) { int meter_open_file(meter_t *mtr) {
meter_handle_file_t *handle = &mtr->handle.file; meter_handle_file_t *handle = &mtr->handle.file;

View file

@ -41,12 +41,18 @@ int meter_init_fluksov2(meter_t *mtr, list_t options) {
handle->fifo = strdup(fifo); handle->fifo = strdup(fifo);
} }
else { else {
handle->fifo = "/var/run/spid/delta/out"; /* use default path */ handle->fifo = strdup(FLUKSOV2_DEFAULT_FIFO); /* use default path */
} }
return SUCCESS; return SUCCESS;
} }
void meter_free_fluksov2(meter_t *mtr) {
meter_handle_fluksov2_t *handle = &mtr->handle.fluksov2;
free(handle->fifo);
}
int meter_open_fluksov2(meter_t *mtr) { int meter_open_fluksov2(meter_t *mtr) {
meter_handle_fluksov2_t *handle = &mtr->handle.fluksov2; meter_handle_fluksov2_t *handle = &mtr->handle.fluksov2;

View file

@ -55,6 +55,10 @@ int meter_init_random(meter_t *mtr, list_t options) {
return SUCCESS; return SUCCESS;
} }
void meter_free_random(meter_t *mtr) {
//meter_handle_random_t *handle = &mtr->handle.random;
}
int meter_open_random(meter_t *mtr) { int meter_open_random(meter_t *mtr) {
//meter_handle_random_t *handle = &mtr->handle.random; //meter_handle_random_t *handle = &mtr->handle.random;

View file

@ -59,6 +59,12 @@ int meter_init_s0(meter_t *mtr, list_t options) {
return SUCCESS; return SUCCESS;
} }
void meter_free_s0(meter_t *mtr) {
meter_handle_s0_t *handle = &mtr->handle.s0;
free(handle->device);
}
int meter_open_s0(meter_t *mtr) { int meter_open_s0(meter_t *mtr) {
meter_handle_s0_t *handle = &mtr->handle.s0; meter_handle_s0_t *handle = &mtr->handle.s0;

View file

@ -104,6 +104,18 @@ int meter_init_sml(meter_t *mtr, list_t options) {
return SUCCESS; return SUCCESS;
} }
void meter_free_sml(meter_t *mtr) {
meter_handle_sml_t *handle = &mtr->handle.sml;
if (handle->device != NULL) {
free(handle->device);
}
if (handle->host != NULL) {
free(handle->host);
}
}
int meter_open_sml(meter_t *mtr) { int meter_open_sml(meter_t *mtr) {
meter_handle_sml_t *handle = &mtr->handle.sml; meter_handle_sml_t *handle = &mtr->handle.sml;

View file

@ -25,10 +25,32 @@
#include <stdlib.h> #include <stdlib.h>
#include <math.h> #include <math.h>
#include <string.h>
#include "reading.h" #include "reading.h"
#include "meter.h" #include "meter.h"
char * reading_id_registry(const char *str) {
static list_t strings;
char *found = NULL;
/* linear search in string list */
foreach(strings, it, const char *) {
if (strcmp(*it, str) == 0) {
found = *it;
break;
}
}
if (!found) {
found = strdup(str);
list_push(&strings, found);
}
return found;
}
int reading_id_compare(meter_protocol_t protocol, reading_id_t a, reading_id_t b) { int reading_id_compare(meter_protocol_t protocol, reading_id_t a, reading_id_t b) {
switch (protocol) { switch (protocol) {
case meter_protocol_d0: case meter_protocol_d0:
@ -40,7 +62,10 @@ int reading_id_compare(meter_protocol_t protocol, reading_id_t a, reading_id_t b
case meter_protocol_file: case meter_protocol_file:
case meter_protocol_exec: case meter_protocol_exec:
return strcmp(a.string, b.string); /* we only need to compare the base pointers here,
because each identifer exists only once in the registry
and has therefore a unique pointer */
return !(a.string == b.string);
default: default:
/* no channel id, adding all readings to buffer */ /* no channel id, adding all readings to buffer */
@ -81,7 +106,7 @@ int reading_id_parse(meter_protocol_t protocol, reading_id_t *id, const char *st
case meter_protocol_file: case meter_protocol_file:
case meter_protocol_exec: case meter_protocol_exec:
id->string = strdup(string); // TODO free() elsewhere id->string = reading_id_registry(string);
break; break;
default: /* ignore other protocols which do not provide id's */ default: /* ignore other protocols which do not provide id's */

View file

@ -120,9 +120,22 @@ void * reading_thread(void *arg) {
/* debugging */ /* debugging */
if (options.verbosity >= log_debug) { if (options.verbosity >= log_debug) {
char dump[1024]; size_t dump_len = 24;
buffer_dump(buf, dump, 1024); char *dump = malloc(dump_len);
print(log_debug, "Buffer dump: %s (size=%i, keep=%i)", ch, dump, buf->size, buf->keep);
if (dump == NULL) {
print(log_error, "cannot allocate buffer", ch);
}
while (dump == NULL || buffer_dump(buf, dump, dump_len) == NULL) {
dump_len *= 1.5;
free(dump);
dump = malloc(dump_len);
}
print(log_debug, "Buffer dump (size=%i keep=%i): %s", ch, buf->size, buf->keep, dump);
free(dump);
} }
} }
@ -138,14 +151,21 @@ void * reading_thread(void *arg) {
} }
void logging_thread_cleanup(void *arg) { void logging_thread_cleanup(void *arg) {
curl_easy_cleanup((CURL *) arg); /* always cleanup */ api_handle_t *api = (api_handle_t *) arg;
api_free(api);
} }
void * logging_thread(void *arg) { void * logging_thread(void *arg) {
channel_t *ch = (channel_t *) arg; /* casting argument */ channel_t *ch = (channel_t *) arg; /* casting argument */
CURL *curl = api_curl_init(ch); api_handle_t api;
pthread_cleanup_push(&logging_thread_cleanup, curl); if (api_init(ch, &api) != SUCCESS) {
print(log_error, "CURL: cannot create handle", ch);
exit(EXIT_FAILURE);
}
pthread_cleanup_push(&logging_thread_cleanup, &api);
do { /* start thread mainloop */ do { /* start thread mainloop */
CURLresponse response; CURLresponse response;
@ -171,28 +191,30 @@ void * logging_thread(void *arg) {
print(log_debug, "JSON request body: %s", ch, json_str); print(log_debug, "JSON request body: %s", ch, json_str);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_str); curl_easy_setopt(api.curl, CURLOPT_POSTFIELDS, json_str);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback); curl_easy_setopt(api.curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &response); curl_easy_setopt(api.curl, CURLOPT_WRITEDATA, (void *) &response);
curl_code = curl_easy_perform(curl); curl_code = curl_easy_perform(api.curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code); curl_easy_getinfo(api.curl, CURLINFO_RESPONSE_CODE, &http_code);
/* check response */ /* check response */
if (curl_code == CURLE_OK && http_code == 200) { /* everything is ok */ if (curl_code != CURLE_OK) {
print(log_debug, "Request succeeded with code: %i", ch, http_code); print(log_error, "CURL: %s", ch, curl_easy_strerror(curl_code));
ch->buffer.sent = last->next;
} }
else { /* error */ else if (http_code != 200) {
if (curl_code != CURLE_OK) { char exception[255];
print(log_error, "CURL: %s", ch, curl_easy_strerror(curl_code)); if (api_parse_exception(response, exception, 255) == SUCCESS) {
print(log_error, "Request failed: [%i] %s", ch, http_code, exception);
} }
else if (http_code != 200) { else {
char err[255]; print(log_error, "Request failed: %i", ch, http_code);
api_parse_exception(response, err, 255);
print(log_error, "Error from middleware: %s", ch, err);
} }
} }
else {
print(log_debug, "Request succeeded: %i", ch, http_code);
ch->buffer.sent = last->next;
}
/* householding */ /* householding */
free(response.data); free(response.data);

Binary file not shown.

View file

@ -94,48 +94,48 @@ const char *long_options_descs[] = {
* @param id could be NULL for general messages * @param id could be NULL for general messages
* @todo integrate into syslog * @todo integrate into syslog
*/ */
void print(int level, const char *format, void *id, ... ) { void print(log_level_t level, const char *format, void *id, ... ) {
va_list args;
if (level > options.verbosity) { if (level > options.verbosity) {
return; /* skip message if its under the verbosity level */ return; /* skip message if its under the verbosity level */
} }
struct timeval now; struct timeval now;
struct tm * timeinfo; struct tm * timeinfo;
char buffer[1024]; char prefix[24];
char *pos = buffer; size_t pos = 0;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
timeinfo = localtime(&now.tv_sec); timeinfo = localtime(&now.tv_sec);
/* print timestamp to buffer */ /* format timestamp */
pos += sprintf(pos, "["); pos += strftime(prefix+pos, 18, "[%b %d %H:%M:%S]", timeinfo);
pos += strftime(pos, 16, "%b %d %H:%M:%S", timeinfo);
/* print logging 'section' */ /* format section */
pos += (id != NULL) ? sprintf(pos, "][%s]", (char *) id) : sprintf(pos, "]"); if (id) {
snprintf(prefix+pos, 8, "[%s]", (char *) id);
/* fill with whitespaces */
while(pos - buffer < 24) {
pos += sprintf(pos, " ");
} }
/* print formatstring */ va_list args;
va_start(args, id); va_start(args, id);
pos += vsprintf(pos, format, args); /* print to stdout/stderr */
if (getppid() != 1) { /* running as fork in background? */
FILE *stream = (level > 0) ? stdout : stderr;
fprintf(stream, "%-24s", prefix);
vfprintf(stream, format, args);
fprintf(stream, "\n");
}
va_end(args); va_end(args);
/* print to stdout/stderr */ va_start(args, id);
if (getppid() != 1) {
fprintf((level > 0) ? stdout : stderr, "%s\n", buffer);
}
/* append to logfile */ /* append to logfile */
if (options.logfd) { if (options.logfd) {
fprintf(options.logfd, "%s\n", buffer); fprintf(options.logfd, "%-24s", prefix);
fflush(options.logfd); vfprintf(options.logfd, format, args);
fprintf(options.logfd, "\n");
fflush(options.logfd);
} }
va_end(args);
} }
/** /**
@ -273,6 +273,7 @@ int config_parse_cli(int argc, char * argv[], config_options_t * options) {
break; break;
case 'c': /* config file */ case 'c': /* config file */
free(options->config);
options->config = strdup(optarg); options->config = strdup(optarg);
break; break;
@ -306,8 +307,8 @@ int config_parse_cli(int argc, char * argv[], config_options_t * options) {
*/ */
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
/* default options */ /* default options */
options.config = "/etc/vzlogger.conf"; options.config = strdup("/etc/vzlogger.conf");
options.log = "/var/log/vzlogger.log"; options.log = NULL;
options.logfd = NULL; options.logfd = NULL;
options.port = 8080; options.port = 8080;
options.verbosity = 0; options.verbosity = 0;
@ -388,7 +389,7 @@ int main(int argc, char *argv[]) {
ch->buffer.keep = ceil(options.buffer_length / (double) mapping->meter.interval); ch->buffer.keep = ceil(options.buffer_length / (double) mapping->meter.interval);
} }
if (ch->status != status_running && options.logging) { if (options.logging) {
pthread_create(&ch->thread, NULL, &logging_thread, (void *) ch); pthread_create(&ch->thread, NULL, &logging_thread, (void *) ch);
print(log_debug, "Logging thread started", ch); print(log_debug, "Logging thread started", ch);
} }
@ -422,9 +423,10 @@ int main(int argc, char *argv[]) {
channel_free(ch); channel_free(ch);
} }
list_free(&mapping->channels);
meter_close(mtr); /* closing connection */ meter_close(mtr); /* closing connection */
list_free(&mapping->channels);
meter_free(mtr);
} }
#ifdef LOCAL_SUPPORT #ifdef LOCAL_SUPPORT
@ -435,11 +437,13 @@ int main(int argc, char *argv[]) {
#endif /* LOCAL_SUPPORT */ #endif /* LOCAL_SUPPORT */
/* householding */ /* householding */
free(options.config);
list_free(&mappings); list_free(&mappings);
curl_global_cleanup(); curl_global_cleanup();
/* close logfile */ /* close logfile */
if (options.logfd) { if (options.logfd) {
free(options.log);
fclose(options.logfd); fclose(options.logfd);
} }