introduced new JSON configuration format and some structural changes (meter => channel mapping)

This commit is contained in:
Steffen Vogel 2011-10-15 23:58:50 +02:00
parent a0c3044515
commit 668812aff9
20 changed files with 711 additions and 396 deletions

View file

@ -2,7 +2,7 @@ AM_CFLAGS = -Wall -D_REENTRANT -std=gnu99 $(DEPS_VZ_CFLAGS)
AM_CPPFLAGS = -I$(top_srcdir)/include -Iinclude
bin_PROGRAMS = vzlogger
vzlogger_SOURCES = src/vzlogger.c src/channel.c src/api.c src/options.c src/threads.c src/buffer.c
vzlogger_SOURCES = src/vzlogger.c src/channel.c src/api.c src/configuration.c src/threads.c src/buffer.c
vzlogger_LDADD = $(top_srcdir)/src/libmeter.a
vzlogger_LDFLAGS = -lpthread -lm $(DEPS_VZ_LIBS)

View file

@ -57,10 +57,10 @@ CONFIG_CLEAN_VPATH_FILES =
am__installdirs = "$(DESTDIR)$(bindir)"
PROGRAMS = $(bin_PROGRAMS)
am__vzlogger_SOURCES_DIST = src/vzlogger.c src/channel.c src/api.c \
src/options.c src/threads.c src/buffer.c src/local.c
src/configuration.c src/threads.c src/buffer.c src/local.c
@LOCAL_SUPPORT_TRUE@am__objects_1 = local.$(OBJEXT)
am_vzlogger_OBJECTS = vzlogger.$(OBJEXT) channel.$(OBJEXT) \
api.$(OBJEXT) options.$(OBJEXT) threads.$(OBJEXT) \
api.$(OBJEXT) configuration.$(OBJEXT) threads.$(OBJEXT) \
buffer.$(OBJEXT) $(am__objects_1)
vzlogger_OBJECTS = $(am_vzlogger_OBJECTS)
am__DEPENDENCIES_1 =
@ -183,7 +183,7 @@ AM_CFLAGS = -Wall -D_REENTRANT -std=gnu99 $(DEPS_VZ_CFLAGS) \
$(am__append_3) $(am__append_5)
AM_CPPFLAGS = -I$(top_srcdir)/include -Iinclude
vzlogger_SOURCES = src/vzlogger.c src/channel.c src/api.c \
src/options.c src/threads.c src/buffer.c $(am__append_1)
src/configuration.c src/threads.c src/buffer.c $(am__append_1)
vzlogger_LDADD = $(top_srcdir)/src/libmeter.a $(am__append_2) \
$(am__append_4)
vzlogger_LDFLAGS = -lpthread -lm $(DEPS_VZ_LIBS)
@ -271,8 +271,8 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/api.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/buffer.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/channel.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/configuration.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/local.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/options.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/threads.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/vzlogger.Po@am__quote@
@ -332,19 +332,19 @@ api.obj: src/api.c
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o api.obj `if test -f 'src/api.c'; then $(CYGPATH_W) 'src/api.c'; else $(CYGPATH_W) '$(srcdir)/src/api.c'; fi`
options.o: src/options.c
@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT options.o -MD -MP -MF $(DEPDIR)/options.Tpo -c -o options.o `test -f 'src/options.c' || echo '$(srcdir)/'`src/options.c
@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/options.Tpo $(DEPDIR)/options.Po
@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/options.c' object='options.o' libtool=no @AMDEPBACKSLASH@
configuration.o: src/configuration.c
@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT configuration.o -MD -MP -MF $(DEPDIR)/configuration.Tpo -c -o configuration.o `test -f 'src/configuration.c' || echo '$(srcdir)/'`src/configuration.c
@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/configuration.Tpo $(DEPDIR)/configuration.Po
@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/configuration.c' object='configuration.o' libtool=no @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o options.o `test -f 'src/options.c' || echo '$(srcdir)/'`src/options.c
@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o configuration.o `test -f 'src/configuration.c' || echo '$(srcdir)/'`src/configuration.c
options.obj: src/options.c
@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT options.obj -MD -MP -MF $(DEPDIR)/options.Tpo -c -o options.obj `if test -f 'src/options.c'; then $(CYGPATH_W) 'src/options.c'; else $(CYGPATH_W) '$(srcdir)/src/options.c'; fi`
@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/options.Tpo $(DEPDIR)/options.Po
@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/options.c' object='options.obj' libtool=no @AMDEPBACKSLASH@
configuration.obj: src/configuration.c
@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT configuration.obj -MD -MP -MF $(DEPDIR)/configuration.Tpo -c -o configuration.obj `if test -f 'src/configuration.c'; then $(CYGPATH_W) 'src/configuration.c'; else $(CYGPATH_W) '$(srcdir)/src/configuration.c'; fi`
@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/configuration.Tpo $(DEPDIR)/configuration.Po
@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/configuration.c' object='configuration.obj' libtool=no @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o options.obj `if test -f 'src/options.c'; then $(CYGPATH_W) 'src/options.c'; else $(CYGPATH_W) '$(srcdir)/src/options.c'; fi`
@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o configuration.obj `if test -f 'src/configuration.c'; then $(CYGPATH_W) 'src/configuration.c'; else $(CYGPATH_W) '$(srcdir)/src/configuration.c'; fi`
threads.o: src/threads.c
@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT threads.o -MD -MP -MF $(DEPDIR)/threads.Tpo -c -o threads.o `test -f 'src/threads.c' || echo '$(srcdir)/'`src/threads.c

View file

@ -36,7 +36,6 @@ typedef struct {
char id[5]; /* only for internal usage & debugging */
char *middleware; /* url to middleware */
char *uuid; /* unique identifier for middleware */
unsigned long interval; /* polling interval (== 0 for meters) */
reading_id_t identifier; /* channel identifier (OBIS, string) */
buffer_t buffer; /* circular queue to buffer readings */
@ -47,7 +46,7 @@ typedef struct {
} channel_t;
/* Prototypes */
void channel_init(channel_t *ch, char *uuid, char *middleware, unsigned long interval, reading_id_t identifier);
void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier);
void channel_free(channel_t *ch);
#endif /* _CHANNEL_H_ */

View file

@ -23,28 +23,19 @@
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _OPTIONS_H_
#define _OPTIONS_H_
#ifndef _CONFIGURATION_H_
#define _CONFIGURATION_H_
#include <json/json.h>
#include "list.h"
#include "channel.h"
#include "list.h"
#include "vzlogger.h"
/**
* Options from command line
*/
typedef struct {
char *config; /* path to config file */
unsigned int port; /* tcp port for local interface */
int verbose; /* verbosity level */
void parse_configuration(char *filename, list_t *assocs, options_t *opts);
channel_t * parse_channel(struct json_object *jso);
assoc_t * parse_meter(struct json_object *jso);
/* boolean bitfields, padding at the end of struct */
int daemon:1; /* run in background */
int local:1; /* enable local interface */
int logging:1; /* start logging threads */
} options_t;
int check_type(char *key, struct json_object *jso, enum json_type type);
/* Prototypes */
void parse_options(int argc, char *argv[], options_t *options);
void parse_channels(const char *filename, list_t *meters);
#endif /* _OPTIONS_H_ */
#endif /* _CONFIGURATION_H_ */

View file

@ -26,6 +26,7 @@
#ifndef _LOCAL_H_
#define _LOCAL_H_
#include <stdarg.h> /* required for MHD */
#include <microhttpd.h>
int handle_request(

View file

@ -30,13 +30,15 @@
#include <meter.h>
#include "config.h"
#include "list.h"
/* some hard coded configuration */
#define RETRY_PAUSE 30 /* seconds to wait after failed request */
#define BUFFER_KEEP 600 /* for the local interface; in seconds */
#define COMET_TIMEOUT 30 /* in seconds */
enum {
LOG_ERROR = -1,
LOG_STD = 0,
LOG_INFO = 5,
LOG_DEBUG = 10,
LOG_FINEST = 15
};
typedef enum {
UNKNOWN,
@ -51,12 +53,38 @@ typedef enum {
typedef struct {
meter_t meter;
list_t channels;
int interval;
pthread_t thread;
pthread_status_t status;
} assoc_t;
/**
* Options from command line
*/
typedef struct {
char *config; /* filename of configuration */
char *log; /* filename for logging */
FILE *logfd;
int port; /* TCP port for local interface */
int verbosity; /* verbosity level */
int comet_timeout; /* in seconds; */
int buffer_length; /* in seconds; how long to buffer readings for local interfalce */
int retry_pause; /* in seconds; how long to pause after an unsuccessful HTTP request */
/* boolean bitfields, padding at the end of struct */
int channel_index:1; /* give a index of all available channels via local interface */
int daemon:1; /* run in background */
int foreground:1; /* dont fork in background */
int local:1; /* enable local interface */
int logging:1; /* start logging threads, depends on local & daemon */
} options_t;
/* Prototypes */
void print(int level, const char *format, void *id, ... );
void usage(char ** argv);
void quit(int sig);
void parse_options(int argc, char *argv[], options_t *options);
#endif /* _VZLOGGER_H_ */

View file

@ -31,7 +31,7 @@
#include <meter.h>
#include "api.h"
#include "options.h"
#include "vzlogger.h"
extern options_t options;
@ -87,11 +87,11 @@ json_object * api_json_tuples(buffer_t *buf, reading_t *first, reading_t *last)
json_object *json_tuples = json_object_new_array();
reading_t *it;
for (it = first; it != last->next; it = it->next) {
for (it = first; it != NULL && it != last->next; it = it->next) {
struct json_object *json_tuple = json_object_new_array();
pthread_mutex_lock(&buf->mutex);
// TODO use long int of new json-c version
// API requires milliseconds => * 1000
double timestamp = tvtod(it->time) * 1000;
@ -128,7 +128,7 @@ CURL * api_curl_init(channel_t *ch) {
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, header);
curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) options.verbose);
curl_easy_setopt(curl, CURLOPT_VERBOSE, options.verbosity);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(curl, CURLOPT_DEBUGDATA, (void *) ch);

View file

@ -45,7 +45,7 @@ void buffer_init(buffer_t *buf) {
reading_t * buffer_push(buffer_t *buf, reading_t *rd) {
reading_t *new;
/* allocate memory for new reading */
new = malloc(sizeof(reading_t));

View file

@ -30,11 +30,10 @@
#include "channel.h"
void channel_init(channel_t *ch, char *uuid, char *middleware, unsigned long interval, reading_id_t identifier) {
void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier) {
static int instances; /* static to generate channel ids */
snprintf(ch->id, 5, "ch%i", instances++);
ch->interval = interval;
ch->identifier = identifier;
ch->status = UNKNOWN;

View file

@ -0,0 +1,263 @@
/**
* Parsing Apache HTTPd-like configuration
*
* @author Steffen Vogel <info@steffenvogel.de>
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @package vzlogger
* @license http://opensource.org/licenses/gpl-license.php GNU Public License
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <errno.h>
#include "configuration.h"
#include "channel.h"
extern const meter_type_t meter_types[];
void parse_configuration(char *filename, list_t *assocs, options_t *options) {
struct json_object *json_config;
struct json_tokener *json_tok = json_tokener_new();
char buf[JSON_FILE_BUF_SIZE];
int line = 0;
/* open configuration file */
FILE *file = fopen(filename, "r");
if (file == NULL) {
print(LOG_ERROR, "Cannot open configfile %s: %s", NULL, filename, strerror(errno)); /* why didn't the file open? */
exit(EXIT_FAILURE);
}
else {
print(2, "Start parsing configuration from %s", NULL, filename);
}
/* parse JSON */
while(fgets(buf, JSON_FILE_BUF_SIZE, file)) {
line++;
json_config = json_tokener_parse_ex(json_tok, buf, strlen(buf));
if (json_tok->err > 1) {
print(-1, "Error in %s:%d %s at offset %d", NULL, filename, line, json_tokener_errors[json_tok->err], json_tok->char_offset);
exit(EXIT_FAILURE);
}
}
fclose(file);
json_tokener_free(json_tok);
/* parse configuration */
json_object_object_foreach(json_config, key, value) {
if (strcmp(key, "daemon") == 0 && check_type(key, value, json_type_boolean)) {
options->daemon = json_object_get_boolean(value);
}
else if (strcmp(key, "foreground") == 0 && check_type(key, value, json_type_boolean)) {
options->foreground = json_object_get_boolean(value);
}
else if (strcmp(key, "log") == 0 && check_type(key, value, json_type_string)) {
options->log = strdup(json_object_get_string(value));
}
else if (strcmp(key, "retry") == 0 && check_type(key, value, json_type_int)) {
options->retry_pause = json_object_get_int(value);
}
else if (strcmp(key, "verbosity") == 0 && check_type(key, value, json_type_int)) {
options->verbosity = json_object_get_int(value);
}
else if (strcmp(key, "local") == 0) {
json_object_object_foreach(value, key, local_value) {
if (strcmp(key, "enabled") == 0 && check_type(key, local_value, json_type_boolean)) {
options->local = json_object_get_boolean(local_value);
}
else if (strcmp(key, "port") == 0 && check_type(key, local_value, json_type_int)) {
options->port = json_object_get_int(local_value);
}
else if (strcmp(key, "timeout") == 0 && check_type(key, local_value, json_type_int)) {
options->comet_timeout = json_object_get_int(local_value);
}
else if (strcmp(key, "buffer") == 0 && check_type(key, local_value, json_type_int)) {
options->buffer_length = json_object_get_int(local_value);
}
else if (strcmp(key, "index") == 0 && check_type(key, local_value, json_type_boolean)) {
options->channel_index = json_object_get_boolean(local_value);
}
else {
print(-1, "Invalid field: %s", NULL, key);
exit(EXIT_FAILURE);
}
}
}
else if ((strcmp(key, "sensors") == 0 || strcmp(key, "meters") == 0) && check_type(key, value, json_type_array)) {
int len = json_object_array_length(value);
for (int i = 0; i < len; i++) {
assoc_t *as = parse_meter(json_object_array_get_idx(value, i));
if (as != NULL) {
list_push(assocs, as);
}
}
}
else {
print(-1, "Invalid field: %s", NULL, key);
exit(EXIT_FAILURE);
}
}
json_object_put(json_config);
}
assoc_t * parse_meter(struct json_object *jso) {
assoc_t *assoc = malloc(sizeof(assoc_t));
list_t json_channels;
const meter_type_t *type = NULL;
const char *connection = NULL;
const char *protocol = NULL;
int enabled = TRUE;
list_init(&assoc->channels);
list_init(&json_channels);
assoc->interval = 0;
json_object_object_foreach(jso, key, value) {
if (strcmp(key, "enabled") == 0 && check_type(key, value, json_type_boolean)) {
enabled = json_object_get_boolean(value);
}
else if (strcmp(key, "protocol") == 0 && check_type(key, value, json_type_string)) {
protocol = json_object_get_string(value);
for (type = meter_types; type->name != NULL; type++) { /* linear search */
if (strcmp(type->name, protocol) == 0) break;
}
if (type == NULL) {
print(-1, "Invalid protocol: %s", NULL, protocol);
}
}
else if (strcmp(key, "connection") == 0 && check_type(key, value, json_type_string)) {
connection = json_object_get_string(value);
}
else if (strcmp(key, "interval") == 0 && check_type(key, value, json_type_int)) {
assoc->interval = json_object_get_int(value);
}
else if (strcmp(key, "channels") == 0 && check_type(key, value, json_type_array)) {
int len = json_object_array_length(value);
for (int i = 0; i < len; i++) {
list_push(&json_channels, value);
}
}
else if (strcmp(key, "channel") == 0 && check_type(key, value, json_type_object)) {
list_push(&json_channels, value);
}
else {
print(-1, "Invalid field: %s", NULL, key);
exit(EXIT_FAILURE);
}
}
if (type == NULL) {
print(-1, "Missing protocol", NULL);
exit(EXIT_FAILURE);
}
else if (connection == NULL) {
print(-1, "Missing connection", NULL);
exit(EXIT_FAILURE);
}
else if (enabled == TRUE) {
/* init meter */
meter_init(&assoc->meter, type, connection);
print(5, "New meter initialized (protocol=%s, connection=%s, interval=%d)", assoc, protocol, connection, assoc->interval);
/* init channels */
foreach(json_channels, it) {
struct json_object *jso = (struct json_object *) it->data;
channel_t *ch = parse_channel(jso);
if (ch != NULL) {
list_push(&assoc->channels, ch);
}
}
return assoc;
}
else {
return NULL;
}
}
channel_t * parse_channel(struct json_object *jso) {
const char *uuid = NULL;
const char *middleware = NULL;
const char *identifier = NULL;
int enabled = TRUE;
json_object_object_foreach(jso, key, value) {
if (strcmp(key, "uuid") == 0 && check_type(key, value, json_type_string)) {
uuid = json_object_get_string(value);
}
else if (strcmp(key, "middleware") == 0 && check_type(key, value, json_type_string)) {
middleware = json_object_get_string(value);
}
else if (strcmp(key, "identifier") == 0 && check_type(key, value, json_type_string)) {
identifier = json_object_get_string(value);
}
else if (strcmp(key, "enabled") == 0 && check_type(key, value, json_type_boolean)) {
enabled = json_object_get_boolean(value);
}
else {
print(-1, "Invalid field: %s", NULL, key);
exit(EXIT_FAILURE);
}
}
if (uuid == NULL) {
print(-1, "Missing UUID", NULL);
exit(EXIT_FAILURE);
}
else if (middleware == NULL) {
print(-1, "Missing middleware", NULL);
exit(EXIT_FAILURE);
}
else if (enabled == TRUE) {
reading_id_t id = { (identifier == NULL) ? obis_init(NULL) : obis_parse(identifier) };
channel_t *ch = malloc(sizeof(channel_t));
channel_init(ch, uuid, middleware, id);
print(5, "New channel initialized (uuid=...%s middleware=%s identifier=%s)", ch, uuid+30, middleware, identifier);
return ch;
}
else {
return NULL;
}
}
int check_type(char *key, struct json_object *jso, enum json_type type) {
char *json_types[] = { "null", "boolean", "double", "int", "object", "array", "string" };
if (json_object_get_type(jso) != type) {
print(-1, "Invalid variable type for field %s: %s (%s)", NULL, key, json_object_get_string(jso), json_types[json_object_get_type(jso)]);
exit(EXIT_FAILURE);
}
else {
return 1;
}
}

View file

@ -33,6 +33,8 @@
#include "local.h"
#include "api.h"
extern options_t options;
int handle_request(void *cls, struct MHD_Connection *connection, const char *url, const char *method,
const char *version, const char *upload_data, size_t *upload_data_size, void **con_cls) {
@ -44,48 +46,63 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url
struct MHD_Response *response;
const char *mode = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "mode");
print(2, "Local request received: method=%s url=%s mode=%s", NULL, method, url, mode);
print(2, "Local request received: method=%s url=%s mode=%s", "http", method, url, mode);
if (strcmp(method, "GET") == 0) {
struct timespec ts;
struct timeval tp;
struct json_object *json_obj = json_object_new_object();
struct json_object *json_data = json_object_new_array();
const char *uuid = url+1;
struct json_object *json_exception = NULL;
const char *uuid = url + 1; /* strip leading slash */
const char *json_str;
int show_all = 0;
if (strcmp(url, "/") == 0) {
if (options.channel_index) {
show_all = TRUE;
}
else {
json_exception = json_object_new_object();
json_object_object_add(json_exception, "message", json_object_new_string("channel indexing is disabled"));
json_object_object_add(json_exception, "code", json_object_new_int((int) 0));
}
}
foreach(*assocs, it) {
assoc_t *assoc = (assoc_t *) it->data;
foreach(assoc->channels, it) {
channel_t *ch = (channel_t *) it->data;
if (strcmp(url, "/") == 0 || strcmp(ch->uuid, uuid) == 0) {
if (strcmp(ch->uuid, uuid) == 0 || show_all) {
response_code = MHD_HTTP_OK;
/* blocking until new data arrives (comet-like blocking of HTTP response) */
if (strcmp(url, "/") != 0 && mode && strcmp(mode, "comet") == 0) {
/* blocking until new data arrives (comet-like blocking of HTTP response) */
if (mode && strcmp(mode, "comet") == 0) {
/* convert from timeval to timespec */
gettimeofday(&tp, NULL);
ts.tv_sec = tp.tv_sec + COMET_TIMEOUT;
ts.tv_sec = tp.tv_sec + options.comet_timeout;
ts.tv_nsec = tp.tv_usec * 1000;
pthread_mutex_lock(&ch->buffer.mutex);
pthread_cond_timedwait(&ch->condition, &ch->buffer.mutex, &ts);
pthread_mutex_unlock(&ch->buffer.mutex);
}
struct json_object *json_ch = json_object_new_object();
json_object_object_add(json_ch, "uuid", json_object_new_string(ch->uuid));
json_object_object_add(json_ch, "middleware", json_object_new_string(ch->middleware));
json_object_object_add(json_ch, "interval", json_object_new_int(ch->interval));
json_object_object_add(json_ch, "interval", json_object_new_int(assoc->interval));
json_object_object_add(json_ch, "protocol", json_object_new_int(assoc->meter.type->name));
struct json_object *json_tuples = api_json_tuples(&ch->buffer, ch->buffer.head, ch->buffer.tail);
json_object_object_add(json_ch, "tuples", json_tuples);
json_object_array_add(json_data, json_ch);
}
}
@ -95,6 +112,10 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url
json_object_object_add(json_obj, "generator", json_object_new_string(PACKAGE));
json_object_object_add(json_obj, "data", json_data);
if (json_exception) {
json_object_object_add(json_obj, "exception", json_exception);
}
json_str = json_object_to_json_string(json_obj);
response = MHD_create_response_from_data(strlen(json_str), (void *) json_str, FALSE, TRUE);
json_object_put(json_obj);
@ -109,10 +130,9 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url
MHD_add_response_header(response, "Content-type", "text/text");
}
status = MHD_queue_response(connection, response_code, response);
MHD_destroy_response(response);
return status;

View file

@ -1,224 +0,0 @@
/**
* Parsing commandline options and channel list
*
* @author Steffen Vogel <info@steffenvogel.de>
* @copyright Copyright (c) 2011, The volkszaehler.org project
* @package vzlogger
* @license http://opensource.org/licenses/gpl-license.php GNU Public License
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <getopt.h>
#include <regex.h>
#include <meter.h>
#include "options.h"
#include "channel.h"
#include "vzlogger.h"
extern meter_type_t meter_types[];
options_t options = { /* setting default options */
"/etc/vzlogger.conf", /* config file */
8080, /* port for local interface */
0, /* verbosity level */
0, /* daemon mode */
0 /* local interface */
};
/**
* Command line options
*/
const struct option long_options[] = {
{"config", required_argument, 0, 'c'},
{"daemon", required_argument, 0, 'd'},
#ifdef LOCAL_SUPPORT
{"local", no_argument, 0, 'l'},
{"local-port", required_argument, 0, 'p'},
#endif /* LOCAL_SUPPORT */
{"verbose", optional_argument, 0, 'v'},
{"help", no_argument, 0, 'h'},
{"version", no_argument, 0, 'V'},
{} /* stop condition for iterator */
};
/**
* Options for config file
*/
struct {
char *key;
int level;
}
/**
* Descriptions vor command line options
*/
char *long_options_descs[] = {
"config file with channel -> uuid mapping",
"run as daemon",
#ifdef LOCAL_SUPPORT
"activate local interface (tiny webserver)",
"TCP port for local interface",
#endif /* LOCAL_SUPPORT */
"enable verbose output",
"show this help",
"show version of vzlogger",
"" /* stop condition for iterator */
};
/**
* Parse options from command line
*/
void parse_options(int argc, char * argv[], options_t * opts) {
while (1) {
int c = getopt_long(argc, argv, "i:c:p:lhVdv::", long_options, NULL);
/* detect the end of the options. */
if (c == -1) break;
switch (c) {
case 'v':
opts->verbose = (optarg == NULL) ? 1 : atoi(optarg);
break;
#ifdef LOCAL_SUPPORT
case 'l':
opts->local = 1;
break;
case 'p': /* port for local interface */
opts->port = atoi(optarg);
break;
#endif /* LOCAL_SUPPORT */
case 'd':
opts->daemon = 1;
break;
case 'c': /* read config file */
opts->config = (char *) malloc(strlen(optarg)+1);
strcpy(opts->config, optarg);
break;
case 'V':
printf("%s\n", VERSION);
exit(EXIT_SUCCESS);
break;
case '?':
case 'h':
default:
usage(argv);
exit((c == '?') ? EXIT_FAILURE : EXIT_SUCCESS);
}
}
opts->logging = (!opts->local || opts->daemon);
}
void parse_channels(const char *filename, list_t *assocs) {
FILE *file = fopen(filename, "r"); /* open configuration */
if (!filename) { /* nothing found */
print(-1, "No config file found! Please specify with --config!\n", NULL);
exit(EXIT_FAILURE);
}
if (file == NULL) {
perror(filename); /* why didn't the file open? */
exit(EXIT_FAILURE);
}
else {
print(2, "Start parsing configuration from %s", NULL, filename);
}
/* compile regular expressions */
regex_t re_uuid, re_middleware, re_section, re_value;
regcomp(&re_uuid, "^[:xdigit:]{8}-([:xdigit:]{4}-){3}[:xdigit:]{12}$", REG_EXTENDED | REG_NOSUB);
regcomp(&re_middleware, "^https?://[[:alnum:].-]+(\\.[:alpha:]{2,6})?(:[:digit:]{1,5})?(/\\S*)?$", REG_EXTENDED | REG_NOSUB);
regcomp(&re_section, "^[:blank:]*<(/?)([:alpha]+)>[:blank:]*$", REG_EXTENDED);
regcomp(&re_value, "^[:blank:]*([:alpha]+)[:blank:]+(.+)[:blank:]*$", REG_EXTENDED);
struct options {
char *key;
enum context;
//char regex;
}
/* read a line */
while ((line = fgets(buffer, 256, file)) != NULL) {
if (regex(&re_section) == 0) {
}
else if (regex(&re_value) == 0) {
switch (context) {
case GLOBAL:
/* no values allowed here */
break;
case METER:
if (strcasecmp(key, "protocol") == 0) {
}
else if (strcasecmp(key, "interval") == 0) {
}
else if (strcasecmp(key, "host") == 0) {
}
else if (strcasecmp(key, "port") == 0) {
}
break;
case CHANNEL:
if (strcasecmp(key, "uuid") == 0) {
}
else if (strcasecmp(key, "middlware") == 0) {
}
else if (strcasecmp(key, "identifier") == 0) {
}
break;
char *key, *value;
}
}
}
fclose(file);
free(buffer);
regfree(&re_middleware);
regfree(&re_uuid);
}

View file

@ -28,7 +28,7 @@
#include "threads.h"
#include "api.h"
#include "options.h"
#include "vzlogger.h"
extern options_t options;
@ -42,21 +42,26 @@ void * reading_thread(void *arg) {
reading_t *rds = malloc(sizeof(reading_t) * mtr->type->max_readings);
time_t last, delta;
size_t n = 0;
pthread_cleanup_push(&reading_thread_cleanup, rds);
do { /* start thread main loop */
/* fetch readings from meter */
last = time(NULL);
n = meter_read(mtr, rds, mtr->type->max_readings);
delta = time(NULL) - last;
/* update buffer length with current interval */
if (!mtr->type->periodic && delta != assoc->interval) {
print(15, "Updating interval to %i", mtr, delta);
assoc->interval = delta;
}
foreach(assoc->channels, it) {
channel_t *ch = (channel_t *) it->data;
buffer_t *buf = &ch->buffer;
reading_t *added;
for (int i = 0; i < n; i++) {
switch (mtr->type->id) {
case SML:
@ -66,51 +71,49 @@ void * reading_thread(void *arg) {
added = buffer_push(buf, &rds[i]);
}
break;
default:
/* no channel identifier, adding all readings to buffer */
print(5, "New reading (value=%.2f ts=%f)", ch, ch->id, rds[i].value, tvtod(rds[i].time));
print(LOG_INFO, "New reading (value=%.2f ts=%f)", ch, ch->id, rds[i].value, tvtod(rds[i].time));
added = buffer_push(buf, &rds[i]);
}
}
/* update buffer length with current interval */
if (!mtr->type->periodic && options.local && delta) {
print(15, "Updating interval to %i", ch, delta);
ch->interval = delta;
ch->buffer.keep = ceil(BUFFER_KEEP / (double) delta);
/* update buffer length to interval */
if (options.local) {
ch->buffer.keep = ceil(options.buffer_length / assoc->interval);
}
/* queue reading into sending buffer logging thread if
logging is enabled & sent queue is empty */
if (options.logging && !buf->sent) {
buf->sent = added;
}
/* shrink buffer */
buffer_clean(buf);
/* notify webserver and logging thread */
pthread_mutex_lock(&buf->mutex);
pthread_cond_broadcast(&ch->condition);
pthread_mutex_unlock(&buf->mutex);
/* debugging */
if (options.verbose >= 10) {
if (options.verbosity >= 10) {
char dump[1024];
buffer_dump(buf, dump, 1024);
print(10, "Buffer dump: %s (size=%i, memory=%i)", ch, dump, buf->size, buf->keep);
print(LOG_DEBUG, "Buffer dump: %s (size=%i, keep=%i)", ch, dump, buf->size, buf->keep);
}
}
if ((options.daemon || options.local) && mtr->type->periodic) {
print(8, "Next reading in %i seconds", NULL, 5);
print(LOG_INFO, "Next reading in %i seconds", mtr, 5);
sleep(5); // TODO handle parsing
}
} while (options.daemon || options.local);
pthread_cleanup_pop(1);
return NULL;
}
@ -141,11 +144,11 @@ void * logging_thread(void *arg) {
pthread_cond_wait(&ch->condition, &ch->buffer.mutex); /* sleep until new data has been read */
}
pthread_mutex_unlock(&ch->buffer.mutex);
last = ch->buffer.tail;
json_obj = api_json_tuples(&ch->buffer, ch->buffer.sent, last);
json_str = json_object_to_json_string(json_obj);
print(10, "JSON request body: %s", ch, json_str);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_str);
@ -170,17 +173,17 @@ void * logging_thread(void *arg) {
print(-1, "Error from middleware: %s", ch, err);
}
}
/* householding */
free(response.data);
json_object_put(json_obj);
if (options.daemon && (curl_code != CURLE_OK || http_code != 200)) {
print(2, "Sleeping %i seconds due to previous failure", ch, RETRY_PAUSE);
sleep(RETRY_PAUSE);
print(1, "Waiting %i secs for next request due to previous failure", ch, options.retry_pause);
sleep(options.retry_pause);
}
} while (options.daemon);
pthread_cleanup_pop(1);
return NULL;

View file

@ -25,18 +25,23 @@
#include <stdio.h> /* for print() */
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <unistd.h>
#include <math.h>
#include <errno.h>
#include <stdlib.h>
#include <signal.h>
#include <getopt.h>
#include <curl/curl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include "vzlogger.h"
#include "list.h"
#include "channel.h"
#include "options.h"
#include "configuration.h"
#include "threads.h"
#ifdef LOCAL_SUPPORT
@ -44,13 +49,47 @@
#include "local.h"
#endif /* LOCAL_SUPPORT */
list_t assocs; /* mapping between meters and channels */
extern options_t options;
extern const char *long_options_descs[];
extern const struct option long_options[];
extern const meter_type_t meter_types[];
list_t assocs; /* mapping between meters and channels */
options_t options; /* global application options */
/**
* Command line options
*/
const struct option long_options[] = {
{"config", required_argument, 0, 'c'},
{"log", required_argument, 0, 'o'},
{"daemon", required_argument, 0, 'd'},
{"foreground", required_argument, 0, 'f'},
#ifdef LOCAL_SUPPORT
{"httpd", no_argument, 0, 'l'},
{"httpd-port", required_argument, 0, 'p'},
#endif /* LOCAL_SUPPORT */
{"verbose", required_argument, 0, 'v'},
{"help", no_argument, 0, 'h'},
{"version", no_argument, 0, 'V'},
{} /* stop condition for iterator */
};
/**
* Descriptions vor command line options
*/
const char *long_options_descs[] = {
"configuration file",
"log file",
"run as periodically",
"do not run in background",
#ifdef LOCAL_SUPPORT
"activate local interface (tiny HTTPd which serves live readings)",
"TCP port for HTTPd",
#endif /* LOCAL_SUPPORT */
"enable verbose output",
"show this help",
"show version of vzlogger",
NULL /* stop condition for iterator */
};
/**
* Print available options and some other usefull information
*/
@ -62,7 +101,7 @@ void usage(char *argv[]) {
printf("Usage: %s [options]\n\n", argv[0]);
printf(" following options are available:\n");
while (op->name && *desc) {
while (op->name && desc) {
printf("\t-%c, --%-12s\t%s\n", op->val, op->name, *desc);
op++; desc++;
}
@ -93,18 +132,22 @@ void print(int level, const char *format, void *id, ... ) {
struct tm * timeinfo;
char buffer[1024], *pos = buffer;
if (level <= options.verbose) {
if (level <= options.verbosity) {
gettimeofday(&now, NULL);
timeinfo = localtime(&now.tv_sec);
pos += sprintf(pos, "[");
pos += strftime(pos, 16, "%b %d %H:%M:%S", timeinfo);
if (id != NULL) {
pos += sprintf(pos, "][%s]\t", (char *) id);
pos += sprintf(pos, "][%s]", (char *) id);
}
else {
pos += sprintf(pos, "]\t");
pos += sprintf(pos, "]");
}
while(pos - buffer < 24) {
pos += sprintf(pos, " ");
}
va_start(args, id);
@ -112,9 +155,56 @@ void print(int level, const char *format, void *id, ... ) {
va_end(args);
fprintf((level > 0) ? stdout : stderr, "%s\n", buffer);
if (options.logfd) {
fprintf(options.logfd, "%s\n", buffer);
fflush(options.logfd);
}
}
}
/* http://www.enderunix.org/docs/eng/daemon.php */
void daemonize() {
if(getppid() == 1) {
return; /* already a daemon */
}
int i = fork();
if (i < 0) {
exit(EXIT_FAILURE); /* fork error */
}
else if (i > 0) {
exit(EXIT_SUCCESS); /* parent exits */
}
/* child (daemon) continues */
setsid(); /* obtain a new process group */
for (i=getdtablesize();i>=0;--i) {
close(i); /* close all descriptors */
}
/* handle standart I/O */
i = open("/dev/null", O_RDWR);
dup(i);
dup(i);
chdir("/"); /* change working directory */
umask(0022);
/* ignore signals from parent tty */
struct sigaction action;
sigemptyset(&action.sa_mask);
action.sa_flags = 0;
action.sa_handler = SIG_IGN;
sigaction(SIGCHLD, &action, NULL); /* ignore child */
sigaction(SIGTSTP, &action, NULL); /* ignore tty signals */
sigaction(SIGTTOU, &action, NULL);
sigaction(SIGTTIN, &action, NULL);
}
/**
* Cancel threads
*
@ -122,60 +212,158 @@ void print(int level, const char *format, void *id, ... ) {
*/
void quit(int sig) {
print(2, "Closing connections to terminate", NULL);
foreach(assocs, it) {
assoc_t *assoc = (assoc_t *) it->data;
pthread_cancel(assoc->thread);
foreach(assoc->channels, it) {
channel_t *ch = (channel_t *) it->data;
pthread_cancel(ch->thread);
}
}
}
/**
* The main loop
* Parse options from command line
*/
void parse_options(int argc, char * argv[], options_t * options) {
while (1) {
int c = getopt_long(argc, argv, "c:o:p:lhVdfv:", long_options, NULL);
/* detect the end of the options. */
if (c == -1) break;
switch (c) {
case 'v':
options->verbosity = atoi(optarg);
break;
#ifdef LOCAL_SUPPORT
case 'l':
options->local = 1;
break;
case 'p': /* port for local interface */
options->port = atoi(optarg);
break;
#endif /* LOCAL_SUPPORT */
case 'd':
options->daemon = 1;
break;
case 'f':
options->foreground = 1;
break;
case 'c': /* config file */
options->config = strdup(optarg);
break;
case 'o': /* log file */
options->log = strdup(optarg);
break;
case 'V':
printf("%s\n", VERSION);
exit(EXIT_SUCCESS);
break;
case '?':
case 'h':
default:
usage(argv);
exit((c == '?') ? EXIT_FAILURE : EXIT_SUCCESS);
}
}
}
/**
* The application entrypoint
*/
int main(int argc, char *argv[]) {
/* default options */
options.config = "/etc/vzlogger.conf";
options.log = "/var/log/vzlogger.log";
options.logfd = NULL;
options.port = 8080;
options.verbosity = 0;
options.comet_timeout = 30;
options.buffer_length = 600;
options.retry_pause = 15;
options.daemon = FALSE;
options.local = FALSE;
options.logging = TRUE;
/* bind signal handler */
struct sigaction action;
sigemptyset(&action.sa_mask);
action.sa_flags = 0;
action.sa_handler = quit;
sigaction(SIGINT, &action, NULL);
sigaction(SIGINT, &action, NULL); /* catch ctrl-c from terminal */
sigaction(SIGHUP, &action, NULL); /* catch hangup signal */
sigaction(SIGTERM, &action, NULL); /* catch kill signal */
/* initialize adts and apis */
curl_global_init(CURL_GLOBAL_ALL);
list_init(&assocs);
parse_options(argc, argv, &options); /* parse command line arguments */
parse_channels(options.config, &assocs);/* parse channels from configuration */
/* parse command line and file options */
// TODO command line should have a higher priority as file
parse_options(argc, argv, &options);
parse_configuration(options.config, &assocs, &options);
options.logging = (!options.local || options.daemon);
if (!options.foreground && (options.daemon || options.local)) {
print(1, "Daemonize process...", NULL);
daemonize();
}
/* open logfile */
if (options.log) {
FILE *logfd = fopen(options.log, "a");
if (logfd) {
options.logfd = logfd;
print(LOG_DEBUG, "Opened logfile %s", NULL, options.log);
}
else {
print(LOG_ERROR, "Cannot open logfile %s: %s", NULL, options.log, strerror(errno));
exit(EXIT_FAILURE);
}
}
if (assocs.size == 0) {
print(6, "No meters found!", NULL);
exit(EXIT_FAILURE);
}
/* open connection meters & start threads */
foreach(assocs, it) {
assoc_t *assoc = (assoc_t *) it->data;
meter_t *mtr = &assoc->meter;
int res = meter_open(mtr);
if (res < 0) {
print(LOG_ERROR, "Failed to open meter", mtr);
exit(EXIT_FAILURE);
}
print(5, "Meter connected", mtr);
pthread_create(&assoc->thread, NULL, &reading_thread, (void *) assoc);
print(5, "Meter thread started", mtr);
foreach(assoc->channels, it) {
channel_t *ch = (channel_t *) it->data;
/* set buffer length for perriodic meters */
if (mtr->type->periodic && options.local) {
ch->buffer.keep = ceil(BUFFER_KEEP / (double) ch->interval);
ch->buffer.keep = ceil(options.buffer_length / (double) assoc->interval);
}
if (ch->status != RUNNING && options.logging) {
pthread_create(&ch->thread, NULL, &logging_thread, (void *) ch);
print(5, "Logging thread started", ch);
@ -187,7 +375,7 @@ int main(int argc, char *argv[]) {
/* start webserver for local interface */
struct MHD_Daemon *httpd_handle = NULL;
if (options.local) {
print(5, "Starting local interface HTTPd on port %i", NULL, options.port);
print(5, "Starting local interface HTTPd on port %i", "http", options.port);
httpd_handle = MHD_start_daemon(
MHD_USE_THREAD_PER_CONNECTION,
options.port,
@ -202,27 +390,27 @@ int main(int argc, char *argv[]) {
foreach(assocs, it) {
assoc_t *assoc = (assoc_t *) it->data;
meter_t *mtr = &assoc->meter;
pthread_join(assoc->thread, NULL);
foreach(assoc->channels, it) {
channel_t *ch = (channel_t *) it->data;
pthread_join(ch->thread, NULL);
channel_free(ch);
}
list_free(&assoc->channels);
meter_close(mtr); /* closing connection */
meter_free(mtr);
}
#ifdef LOCAL_SUPPORT
/* stop webserver */
if (httpd_handle) {
print(8, "Stopping local interface HTTPd on port %i", NULL, options.port);
print(8, "Stopping local interface HTTPd", "http");
MHD_stop_daemon(httpd_handle);
}
#endif /* LOCAL_SUPPORT */
@ -230,7 +418,9 @@ int main(int argc, char *argv[]) {
/* householding */
list_free(&assocs);
curl_global_cleanup();
print(10, "Bye bye!", NULL);
if (options.logfd) {
fclose(options.logfd);
}
return EXIT_SUCCESS;
}

View file

@ -29,6 +29,8 @@
#ifndef _D0_H_
#define _D0_H_
#define D0_BUFFER_LENGTH 1024
#include <termios.h>
typedef struct {

View file

@ -110,7 +110,7 @@ double tvtod(struct timeval tv);
* @param type the type it should be initialized with
* @param connection type specific initialization arguments (connection settings, port, etc..)
*/
void meter_init(meter_t *mtr, meter_type_t *type, const char *connection);
void meter_init(meter_t *mtr, const meter_type_t *type, const char *connection);
/**
* Freeing all memory which has been allocated during the initialization

View file

@ -27,10 +27,11 @@
#define _S0_H_
#include <termios.h>
#include <signal.h>
typedef struct {
int fd; /* file descriptor of port */
struct termios oldtio; /* required to reset port */
int fd; /* file descriptor of port */
struct termios oldtio; /* required to reset port */
} meter_handle_s0_t;
struct meter; /* forward declaration */

View file

@ -34,34 +34,31 @@
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <regex.h>
#include "meter.h"
#include "obis.h"
#include "d0.h"
int meter_open_d0(meter_t *mtr) {
meter_handle_d0_t *handle = &mtr->handle.d0;
struct termios tio;
memset(&tio, 0, sizeof(tio));
/* open serial port */
handle->fd = open(mtr->connection, O_RDWR); // | O_NONBLOCK);
handle->fd = open(mtr->connection, O_RDWR);
if (handle->fd < 0) {
return -1;
}
// TODO save oldtio
tio.c_iflag = 0;
tio.c_oflag = 0;
tio.c_cflag = CS7|CREAD|CLOCAL; // 7n1, see termios.h for more information
tio.c_cflag = B9600 | CS7 | CREAD | CLOCAL;
tio.c_lflag = 0;
tio.c_cc[VMIN] = 1;
tio.c_cc[VTIME] = 5;
cfsetospeed(&tio, B9600); // 9600 baud
cfsetispeed(&tio, B9600); // 9600 baud
tio.c_cc[VTIME] = 20;
return 0;
}
@ -73,10 +70,53 @@ void meter_close_d0(meter_t *mtr) {
}
size_t meter_read_d0(meter_t *mtr, reading_t rds[], size_t n) {
// TODO implement
rds->value = 123.456;
gettimeofday(&rds->time, NULL);
meter_handle_d0_t *handle = &mtr->handle.d0;
struct timeval time;
enum { IDENTIFICATION, OBIS, VALUE, UNIT } context;
char identification[20]; /* 3 vendor + 1 baudrate + 16 meter specific */
char line[78]; /* 16 obis + '(' + 32 value + '*' + 16 unit + ')' + '\r' + '\n' */
char byte;
int fd = handle->fd;
int i, j, m;
/* wait for identification */
while (read(fd, &byte, 1)) {
if (byte == '/') { /* start of identification */
for (i = 0; i < 16 && byte != '\r'; i++) {
read(fd, &byte, 1);
identification[i] = byte;
}
identification[i] = '\0';
break;
}
}
/* debug */
printf("got message from %s\n", identification);
/* take timestamp */
gettimeofday(&time, NULL);
/* read data lines: "obis(value*unit)" */
m = 0;
while (m < n && byte != '!') {
meter_d0_parse_line(&rds[m], line, j);
}
return 1;
}
int meter_d0_parse_line(reading_t &rd, char *line, size_t n) {
char id[16];
char value[32];
char unit[16];
rds[m].time = time; /* use timestamp of data block arrival for all readings */
rds[m].value = ;
rds[m].identifier.obis = ;
}

View file

@ -46,8 +46,8 @@ const meter_type_t meter_types[] = {
double tvtod(struct timeval tv) {
return tv.tv_sec + tv.tv_usec / 1e6;
}
void meter_init(meter_t *mtr, meter_type_t *type, const char *connection) {
void meter_init(meter_t *mtr, const meter_type_t *type, const char *connection) {
static int instances; /* static to generate channel ids */
snprintf(mtr->id, 5, "mtr%i", instances++);

View file

@ -39,29 +39,33 @@ int meter_open_s0(meter_t *mtr) {
meter_handle_s0_t *handle = &mtr->handle.s0;
/* open port */
handle->fd = open(mtr->connection, O_RDWR | O_NOCTTY);
int fd = open(mtr->connection, O_RDWR | O_NOCTTY);
if (handle->fd < 0) {
if (fd < 0) {
perror(mtr->connection);
return -1;
}
/* save current port settings */
tcgetattr(handle->fd, &handle->oldtio);
tcgetattr(fd, &handle->oldtio);
/* configure port */
struct termios tio;
memset(&tio, 0, sizeof(struct termios));
tio.c_cflag = B300 | CS8 | CLOCAL | CREAD;
tio.c_iflag = IGNPAR;
tio.c_oflag = 0;
tio.c_lflag = 0; /* set input mode (non-canonical, no echo,...) */
tio.c_cc[VTIME] = 0; /* inter-character timer unused */
tio.c_cc[VMIN] = 1; /* blocking read until data is received */
tio.c_iflag = IGNPAR;
tio.c_oflag = 0;
tio.c_lflag = 0;
tio.c_cc[VMIN]=1;
tio.c_cc[VTIME]=0;
tcflush(fd, TCIFLUSH);
/* apply configuration */
tcsetattr(handle->fd, TCSANOW, &tio);
tcsetattr(fd, TCSANOW, &tio);
handle->fd = fd;
return 0;
}
@ -77,23 +81,21 @@ void meter_close_s0(meter_t *mtr) {
size_t meter_read_s0(meter_t *mtr, reading_t rds[], size_t n) {
meter_handle_s0_t *handle = &mtr->handle.s0;
char buf[8];
rds->value = 1;
/* clear input buffer */
tcflush(handle->fd, TCIOFLUSH);
/* blocking until one character/pulse is read */
read(handle->fd, buf, 8);
/* store current timestamp */
gettimeofday(&rds->time, NULL);
rds->value = 1;
/* wait some ms for debouncing */
usleep(30000);
return 1;
}