From 668812aff9881042e40b1bf8bc5323ada46970d1 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 15 Oct 2011 23:58:50 +0200 Subject: [PATCH] introduced new JSON configuration format and some structural changes (meter => channel mapping) --- bin/logger/Makefile.am | 2 +- bin/logger/Makefile.in | 28 +- bin/logger/include/channel.h | 3 +- .../include/{options.h => configuration.h} | 31 +-- bin/logger/include/local.h | 1 + bin/logger/include/vzlogger.h | 38 ++- bin/logger/src/api.c | 8 +- bin/logger/src/buffer.c | 2 +- bin/logger/src/channel.c | 5 +- bin/logger/src/configuration.c | 263 ++++++++++++++++++ bin/logger/src/local.c | 58 ++-- bin/logger/src/options.c | 224 --------------- bin/logger/src/threads.c | 63 +++-- bin/logger/src/vzlogger.c | 262 ++++++++++++++--- include/d0.h | 2 + include/meter.h | 2 +- include/s0.h | 5 +- src/d0.c | 64 ++++- src/meter.c | 4 +- src/s0.c | 42 +-- 20 files changed, 711 insertions(+), 396 deletions(-) rename bin/logger/include/{options.h => configuration.h} (62%) create mode 100644 bin/logger/src/configuration.c delete mode 100644 bin/logger/src/options.c diff --git a/bin/logger/Makefile.am b/bin/logger/Makefile.am index e1b2746..876883c 100644 --- a/bin/logger/Makefile.am +++ b/bin/logger/Makefile.am @@ -2,7 +2,7 @@ AM_CFLAGS = -Wall -D_REENTRANT -std=gnu99 $(DEPS_VZ_CFLAGS) AM_CPPFLAGS = -I$(top_srcdir)/include -Iinclude bin_PROGRAMS = vzlogger -vzlogger_SOURCES = src/vzlogger.c src/channel.c src/api.c src/options.c src/threads.c src/buffer.c +vzlogger_SOURCES = src/vzlogger.c src/channel.c src/api.c src/configuration.c src/threads.c src/buffer.c vzlogger_LDADD = $(top_srcdir)/src/libmeter.a vzlogger_LDFLAGS = -lpthread -lm $(DEPS_VZ_LIBS) diff --git a/bin/logger/Makefile.in b/bin/logger/Makefile.in index c250e11..707c3b8 100644 --- a/bin/logger/Makefile.in +++ b/bin/logger/Makefile.in @@ -57,10 +57,10 @@ CONFIG_CLEAN_VPATH_FILES = am__installdirs = "$(DESTDIR)$(bindir)" PROGRAMS = $(bin_PROGRAMS) am__vzlogger_SOURCES_DIST = src/vzlogger.c src/channel.c src/api.c \ - src/options.c src/threads.c src/buffer.c src/local.c + src/configuration.c src/threads.c src/buffer.c src/local.c @LOCAL_SUPPORT_TRUE@am__objects_1 = local.$(OBJEXT) am_vzlogger_OBJECTS = vzlogger.$(OBJEXT) channel.$(OBJEXT) \ - api.$(OBJEXT) options.$(OBJEXT) threads.$(OBJEXT) \ + api.$(OBJEXT) configuration.$(OBJEXT) threads.$(OBJEXT) \ buffer.$(OBJEXT) $(am__objects_1) vzlogger_OBJECTS = $(am_vzlogger_OBJECTS) am__DEPENDENCIES_1 = @@ -183,7 +183,7 @@ AM_CFLAGS = -Wall -D_REENTRANT -std=gnu99 $(DEPS_VZ_CFLAGS) \ $(am__append_3) $(am__append_5) AM_CPPFLAGS = -I$(top_srcdir)/include -Iinclude vzlogger_SOURCES = src/vzlogger.c src/channel.c src/api.c \ - src/options.c src/threads.c src/buffer.c $(am__append_1) + src/configuration.c src/threads.c src/buffer.c $(am__append_1) vzlogger_LDADD = $(top_srcdir)/src/libmeter.a $(am__append_2) \ $(am__append_4) vzlogger_LDFLAGS = -lpthread -lm $(DEPS_VZ_LIBS) @@ -271,8 +271,8 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/api.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/buffer.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/channel.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/configuration.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/local.Po@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/options.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/threads.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/vzlogger.Po@am__quote@ @@ -332,19 +332,19 @@ api.obj: src/api.c @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o api.obj `if test -f 'src/api.c'; then $(CYGPATH_W) 'src/api.c'; else $(CYGPATH_W) '$(srcdir)/src/api.c'; fi` -options.o: src/options.c -@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT options.o -MD -MP -MF $(DEPDIR)/options.Tpo -c -o options.o `test -f 'src/options.c' || echo '$(srcdir)/'`src/options.c -@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/options.Tpo $(DEPDIR)/options.Po -@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/options.c' object='options.o' libtool=no @AMDEPBACKSLASH@ +configuration.o: src/configuration.c +@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT configuration.o -MD -MP -MF $(DEPDIR)/configuration.Tpo -c -o configuration.o `test -f 'src/configuration.c' || echo '$(srcdir)/'`src/configuration.c +@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/configuration.Tpo $(DEPDIR)/configuration.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/configuration.c' object='configuration.o' libtool=no @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o options.o `test -f 'src/options.c' || echo '$(srcdir)/'`src/options.c +@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o configuration.o `test -f 'src/configuration.c' || echo '$(srcdir)/'`src/configuration.c -options.obj: src/options.c -@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT options.obj -MD -MP -MF $(DEPDIR)/options.Tpo -c -o options.obj `if test -f 'src/options.c'; then $(CYGPATH_W) 'src/options.c'; else $(CYGPATH_W) '$(srcdir)/src/options.c'; fi` -@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/options.Tpo $(DEPDIR)/options.Po -@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/options.c' object='options.obj' libtool=no @AMDEPBACKSLASH@ +configuration.obj: src/configuration.c +@am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT configuration.obj -MD -MP -MF $(DEPDIR)/configuration.Tpo -c -o configuration.obj `if test -f 'src/configuration.c'; then $(CYGPATH_W) 'src/configuration.c'; else $(CYGPATH_W) '$(srcdir)/src/configuration.c'; fi` +@am__fastdepCC_TRUE@ $(am__mv) $(DEPDIR)/configuration.Tpo $(DEPDIR)/configuration.Po +@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='src/configuration.c' object='configuration.obj' libtool=no @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o options.obj `if test -f 'src/options.c'; then $(CYGPATH_W) 'src/options.c'; else $(CYGPATH_W) '$(srcdir)/src/options.c'; fi` +@am__fastdepCC_FALSE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -c -o configuration.obj `if test -f 'src/configuration.c'; then $(CYGPATH_W) 'src/configuration.c'; else $(CYGPATH_W) '$(srcdir)/src/configuration.c'; fi` threads.o: src/threads.c @am__fastdepCC_TRUE@ $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) -MT threads.o -MD -MP -MF $(DEPDIR)/threads.Tpo -c -o threads.o `test -f 'src/threads.c' || echo '$(srcdir)/'`src/threads.c diff --git a/bin/logger/include/channel.h b/bin/logger/include/channel.h index dcd6c1d..f1a1675 100644 --- a/bin/logger/include/channel.h +++ b/bin/logger/include/channel.h @@ -36,7 +36,6 @@ typedef struct { char id[5]; /* only for internal usage & debugging */ char *middleware; /* url to middleware */ char *uuid; /* unique identifier for middleware */ - unsigned long interval; /* polling interval (== 0 for meters) */ reading_id_t identifier; /* channel identifier (OBIS, string) */ buffer_t buffer; /* circular queue to buffer readings */ @@ -47,7 +46,7 @@ typedef struct { } channel_t; /* Prototypes */ -void channel_init(channel_t *ch, char *uuid, char *middleware, unsigned long interval, reading_id_t identifier); +void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier); void channel_free(channel_t *ch); #endif /* _CHANNEL_H_ */ diff --git a/bin/logger/include/options.h b/bin/logger/include/configuration.h similarity index 62% rename from bin/logger/include/options.h rename to bin/logger/include/configuration.h index b4ad524..cd4c205 100644 --- a/bin/logger/include/options.h +++ b/bin/logger/include/configuration.h @@ -23,28 +23,19 @@ * along with volkszaehler.org. If not, see . */ -#ifndef _OPTIONS_H_ -#define _OPTIONS_H_ +#ifndef _CONFIGURATION_H_ +#define _CONFIGURATION_H_ + +#include -#include "list.h" #include "channel.h" +#include "list.h" +#include "vzlogger.h" -/** - * Options from command line - */ -typedef struct { - char *config; /* path to config file */ - unsigned int port; /* tcp port for local interface */ - int verbose; /* verbosity level */ +void parse_configuration(char *filename, list_t *assocs, options_t *opts); +channel_t * parse_channel(struct json_object *jso); +assoc_t * parse_meter(struct json_object *jso); - /* boolean bitfields, padding at the end of struct */ - int daemon:1; /* run in background */ - int local:1; /* enable local interface */ - int logging:1; /* start logging threads */ -} options_t; +int check_type(char *key, struct json_object *jso, enum json_type type); -/* Prototypes */ -void parse_options(int argc, char *argv[], options_t *options); -void parse_channels(const char *filename, list_t *meters); - -#endif /* _OPTIONS_H_ */ +#endif /* _CONFIGURATION_H_ */ diff --git a/bin/logger/include/local.h b/bin/logger/include/local.h index 49a8af7..51ec990 100644 --- a/bin/logger/include/local.h +++ b/bin/logger/include/local.h @@ -26,6 +26,7 @@ #ifndef _LOCAL_H_ #define _LOCAL_H_ +#include /* required for MHD */ #include int handle_request( diff --git a/bin/logger/include/vzlogger.h b/bin/logger/include/vzlogger.h index ee3ca6d..38e974d 100644 --- a/bin/logger/include/vzlogger.h +++ b/bin/logger/include/vzlogger.h @@ -30,13 +30,15 @@ #include #include "config.h" - #include "list.h" -/* some hard coded configuration */ -#define RETRY_PAUSE 30 /* seconds to wait after failed request */ -#define BUFFER_KEEP 600 /* for the local interface; in seconds */ -#define COMET_TIMEOUT 30 /* in seconds */ +enum { + LOG_ERROR = -1, + LOG_STD = 0, + LOG_INFO = 5, + LOG_DEBUG = 10, + LOG_FINEST = 15 +}; typedef enum { UNKNOWN, @@ -51,12 +53,38 @@ typedef enum { typedef struct { meter_t meter; list_t channels; + int interval; + pthread_t thread; pthread_status_t status; } assoc_t; +/** + * Options from command line + */ +typedef struct { + char *config; /* filename of configuration */ + char *log; /* filename for logging */ + FILE *logfd; + + int port; /* TCP port for local interface */ + int verbosity; /* verbosity level */ + int comet_timeout; /* in seconds; */ + int buffer_length; /* in seconds; how long to buffer readings for local interfalce */ + int retry_pause; /* in seconds; how long to pause after an unsuccessful HTTP request */ + + /* boolean bitfields, padding at the end of struct */ + int channel_index:1; /* give a index of all available channels via local interface */ + int daemon:1; /* run in background */ + int foreground:1; /* dont fork in background */ + int local:1; /* enable local interface */ + int logging:1; /* start logging threads, depends on local & daemon */ +} options_t; + /* Prototypes */ void print(int level, const char *format, void *id, ... ); void usage(char ** argv); +void quit(int sig); +void parse_options(int argc, char *argv[], options_t *options); #endif /* _VZLOGGER_H_ */ diff --git a/bin/logger/src/api.c b/bin/logger/src/api.c index 1b075ac..3826f83 100644 --- a/bin/logger/src/api.c +++ b/bin/logger/src/api.c @@ -31,7 +31,7 @@ #include #include "api.h" -#include "options.h" +#include "vzlogger.h" extern options_t options; @@ -87,11 +87,11 @@ json_object * api_json_tuples(buffer_t *buf, reading_t *first, reading_t *last) json_object *json_tuples = json_object_new_array(); reading_t *it; - for (it = first; it != last->next; it = it->next) { + for (it = first; it != NULL && it != last->next; it = it->next) { struct json_object *json_tuple = json_object_new_array(); pthread_mutex_lock(&buf->mutex); - + // TODO use long int of new json-c version // API requires milliseconds => * 1000 double timestamp = tvtod(it->time) * 1000; @@ -128,7 +128,7 @@ CURL * api_curl_init(channel_t *ch) { curl_easy_setopt(curl, CURLOPT_URL, url); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, header); - curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) options.verbose); + curl_easy_setopt(curl, CURLOPT_VERBOSE, options.verbosity); curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback); curl_easy_setopt(curl, CURLOPT_DEBUGDATA, (void *) ch); diff --git a/bin/logger/src/buffer.c b/bin/logger/src/buffer.c index b99aeb0..149dcfb 100644 --- a/bin/logger/src/buffer.c +++ b/bin/logger/src/buffer.c @@ -45,7 +45,7 @@ void buffer_init(buffer_t *buf) { reading_t * buffer_push(buffer_t *buf, reading_t *rd) { reading_t *new; - + /* allocate memory for new reading */ new = malloc(sizeof(reading_t)); diff --git a/bin/logger/src/channel.c b/bin/logger/src/channel.c index 7dab98e..c12acc5 100644 --- a/bin/logger/src/channel.c +++ b/bin/logger/src/channel.c @@ -30,11 +30,10 @@ #include "channel.h" -void channel_init(channel_t *ch, char *uuid, char *middleware, unsigned long interval, reading_id_t identifier) { +void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier) { static int instances; /* static to generate channel ids */ snprintf(ch->id, 5, "ch%i", instances++); - - ch->interval = interval; + ch->identifier = identifier; ch->status = UNKNOWN; diff --git a/bin/logger/src/configuration.c b/bin/logger/src/configuration.c new file mode 100644 index 0000000..eda1a25 --- /dev/null +++ b/bin/logger/src/configuration.c @@ -0,0 +1,263 @@ +/** + * Parsing Apache HTTPd-like configuration + * + * @author Steffen Vogel + * @copyright Copyright (c) 2011, The volkszaehler.org project + * @package vzlogger + * @license http://opensource.org/licenses/gpl-license.php GNU Public License + */ +/* + * This file is part of volkzaehler.org + * + * volkzaehler.org is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * volkzaehler.org is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with volkszaehler.org. If not, see . + */ + +#include +#include + +#include "configuration.h" +#include "channel.h" + +extern const meter_type_t meter_types[]; + +void parse_configuration(char *filename, list_t *assocs, options_t *options) { + struct json_object *json_config; + struct json_tokener *json_tok = json_tokener_new(); + + char buf[JSON_FILE_BUF_SIZE]; + int line = 0; + + /* open configuration file */ + FILE *file = fopen(filename, "r"); + + if (file == NULL) { + print(LOG_ERROR, "Cannot open configfile %s: %s", NULL, filename, strerror(errno)); /* why didn't the file open? */ + exit(EXIT_FAILURE); + } + else { + print(2, "Start parsing configuration from %s", NULL, filename); + } + + /* parse JSON */ + while(fgets(buf, JSON_FILE_BUF_SIZE, file)) { + line++; + + json_config = json_tokener_parse_ex(json_tok, buf, strlen(buf)); + + if (json_tok->err > 1) { + print(-1, "Error in %s:%d %s at offset %d", NULL, filename, line, json_tokener_errors[json_tok->err], json_tok->char_offset); + exit(EXIT_FAILURE); + } + } + + fclose(file); + json_tokener_free(json_tok); + + /* parse configuration */ + json_object_object_foreach(json_config, key, value) { + if (strcmp(key, "daemon") == 0 && check_type(key, value, json_type_boolean)) { + options->daemon = json_object_get_boolean(value); + } + else if (strcmp(key, "foreground") == 0 && check_type(key, value, json_type_boolean)) { + options->foreground = json_object_get_boolean(value); + } + else if (strcmp(key, "log") == 0 && check_type(key, value, json_type_string)) { + options->log = strdup(json_object_get_string(value)); + } + else if (strcmp(key, "retry") == 0 && check_type(key, value, json_type_int)) { + options->retry_pause = json_object_get_int(value); + } + else if (strcmp(key, "verbosity") == 0 && check_type(key, value, json_type_int)) { + options->verbosity = json_object_get_int(value); + } + else if (strcmp(key, "local") == 0) { + json_object_object_foreach(value, key, local_value) { + if (strcmp(key, "enabled") == 0 && check_type(key, local_value, json_type_boolean)) { + options->local = json_object_get_boolean(local_value); + } + else if (strcmp(key, "port") == 0 && check_type(key, local_value, json_type_int)) { + options->port = json_object_get_int(local_value); + } + else if (strcmp(key, "timeout") == 0 && check_type(key, local_value, json_type_int)) { + options->comet_timeout = json_object_get_int(local_value); + } + else if (strcmp(key, "buffer") == 0 && check_type(key, local_value, json_type_int)) { + options->buffer_length = json_object_get_int(local_value); + } + else if (strcmp(key, "index") == 0 && check_type(key, local_value, json_type_boolean)) { + options->channel_index = json_object_get_boolean(local_value); + } + else { + print(-1, "Invalid field: %s", NULL, key); + exit(EXIT_FAILURE); + } + } + } + else if ((strcmp(key, "sensors") == 0 || strcmp(key, "meters") == 0) && check_type(key, value, json_type_array)) { + int len = json_object_array_length(value); + for (int i = 0; i < len; i++) { + assoc_t *as = parse_meter(json_object_array_get_idx(value, i)); + + if (as != NULL) { + list_push(assocs, as); + } + } + } + else { + print(-1, "Invalid field: %s", NULL, key); + exit(EXIT_FAILURE); + } + } + + json_object_put(json_config); +} + +assoc_t * parse_meter(struct json_object *jso) { + assoc_t *assoc = malloc(sizeof(assoc_t)); + list_t json_channels; + + const meter_type_t *type = NULL; + const char *connection = NULL; + const char *protocol = NULL; + int enabled = TRUE; + + list_init(&assoc->channels); + list_init(&json_channels); + assoc->interval = 0; + + json_object_object_foreach(jso, key, value) { + if (strcmp(key, "enabled") == 0 && check_type(key, value, json_type_boolean)) { + enabled = json_object_get_boolean(value); + } + else if (strcmp(key, "protocol") == 0 && check_type(key, value, json_type_string)) { + protocol = json_object_get_string(value); + + for (type = meter_types; type->name != NULL; type++) { /* linear search */ + if (strcmp(type->name, protocol) == 0) break; + } + + if (type == NULL) { + print(-1, "Invalid protocol: %s", NULL, protocol); + } + } + else if (strcmp(key, "connection") == 0 && check_type(key, value, json_type_string)) { + connection = json_object_get_string(value); + } + else if (strcmp(key, "interval") == 0 && check_type(key, value, json_type_int)) { + assoc->interval = json_object_get_int(value); + } + else if (strcmp(key, "channels") == 0 && check_type(key, value, json_type_array)) { + int len = json_object_array_length(value); + for (int i = 0; i < len; i++) { + list_push(&json_channels, value); + } + } + else if (strcmp(key, "channel") == 0 && check_type(key, value, json_type_object)) { + list_push(&json_channels, value); + } + else { + print(-1, "Invalid field: %s", NULL, key); + exit(EXIT_FAILURE); + } + } + + if (type == NULL) { + print(-1, "Missing protocol", NULL); + exit(EXIT_FAILURE); + } + else if (connection == NULL) { + print(-1, "Missing connection", NULL); + exit(EXIT_FAILURE); + } + else if (enabled == TRUE) { + /* init meter */ + meter_init(&assoc->meter, type, connection); + print(5, "New meter initialized (protocol=%s, connection=%s, interval=%d)", assoc, protocol, connection, assoc->interval); + + /* init channels */ + foreach(json_channels, it) { + struct json_object *jso = (struct json_object *) it->data; + channel_t *ch = parse_channel(jso); + + if (ch != NULL) { + list_push(&assoc->channels, ch); + } + } + + return assoc; + } + else { + return NULL; + } +} + +channel_t * parse_channel(struct json_object *jso) { + const char *uuid = NULL; + const char *middleware = NULL; + const char *identifier = NULL; + int enabled = TRUE; + + json_object_object_foreach(jso, key, value) { + if (strcmp(key, "uuid") == 0 && check_type(key, value, json_type_string)) { + uuid = json_object_get_string(value); + } + else if (strcmp(key, "middleware") == 0 && check_type(key, value, json_type_string)) { + middleware = json_object_get_string(value); + } + else if (strcmp(key, "identifier") == 0 && check_type(key, value, json_type_string)) { + identifier = json_object_get_string(value); + } + else if (strcmp(key, "enabled") == 0 && check_type(key, value, json_type_boolean)) { + enabled = json_object_get_boolean(value); + } + else { + print(-1, "Invalid field: %s", NULL, key); + exit(EXIT_FAILURE); + } + } + + if (uuid == NULL) { + print(-1, "Missing UUID", NULL); + exit(EXIT_FAILURE); + } + else if (middleware == NULL) { + print(-1, "Missing middleware", NULL); + exit(EXIT_FAILURE); + } + else if (enabled == TRUE) { + reading_id_t id = { (identifier == NULL) ? obis_init(NULL) : obis_parse(identifier) }; + + channel_t *ch = malloc(sizeof(channel_t)); + channel_init(ch, uuid, middleware, id); + print(5, "New channel initialized (uuid=...%s middleware=%s identifier=%s)", ch, uuid+30, middleware, identifier); + + return ch; + } + else { + return NULL; + } +} + +int check_type(char *key, struct json_object *jso, enum json_type type) { + char *json_types[] = { "null", "boolean", "double", "int", "object", "array", "string" }; + + if (json_object_get_type(jso) != type) { + print(-1, "Invalid variable type for field %s: %s (%s)", NULL, key, json_object_get_string(jso), json_types[json_object_get_type(jso)]); + exit(EXIT_FAILURE); + } + else { + return 1; + } +} + diff --git a/bin/logger/src/local.c b/bin/logger/src/local.c index 5f04ec9..0e036e3 100644 --- a/bin/logger/src/local.c +++ b/bin/logger/src/local.c @@ -33,6 +33,8 @@ #include "local.h" #include "api.h" +extern options_t options; + int handle_request(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **con_cls) { @@ -44,48 +46,63 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url struct MHD_Response *response; const char *mode = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "mode"); - print(2, "Local request received: method=%s url=%s mode=%s", NULL, method, url, mode); - + print(2, "Local request received: method=%s url=%s mode=%s", "http", method, url, mode); + if (strcmp(method, "GET") == 0) { struct timespec ts; struct timeval tp; - + struct json_object *json_obj = json_object_new_object(); struct json_object *json_data = json_object_new_array(); - - const char *uuid = url+1; + struct json_object *json_exception = NULL; + + const char *uuid = url + 1; /* strip leading slash */ const char *json_str; - + int show_all = 0; + + if (strcmp(url, "/") == 0) { + if (options.channel_index) { + show_all = TRUE; + } + else { + json_exception = json_object_new_object(); + + json_object_object_add(json_exception, "message", json_object_new_string("channel indexing is disabled")); + json_object_object_add(json_exception, "code", json_object_new_int((int) 0)); + } + } + foreach(*assocs, it) { assoc_t *assoc = (assoc_t *) it->data; - + foreach(assoc->channels, it) { channel_t *ch = (channel_t *) it->data; - - if (strcmp(url, "/") == 0 || strcmp(ch->uuid, uuid) == 0) { + + if (strcmp(ch->uuid, uuid) == 0 || show_all) { response_code = MHD_HTTP_OK; - /* blocking until new data arrives (comet-like blocking of HTTP response) */ - if (strcmp(url, "/") != 0 && mode && strcmp(mode, "comet") == 0) { + /* blocking until new data arrives (comet-like blocking of HTTP response) */ + if (mode && strcmp(mode, "comet") == 0) { /* convert from timeval to timespec */ gettimeofday(&tp, NULL); - ts.tv_sec = tp.tv_sec + COMET_TIMEOUT; + ts.tv_sec = tp.tv_sec + options.comet_timeout; ts.tv_nsec = tp.tv_usec * 1000; - + pthread_mutex_lock(&ch->buffer.mutex); pthread_cond_timedwait(&ch->condition, &ch->buffer.mutex, &ts); pthread_mutex_unlock(&ch->buffer.mutex); } - + struct json_object *json_ch = json_object_new_object(); json_object_object_add(json_ch, "uuid", json_object_new_string(ch->uuid)); json_object_object_add(json_ch, "middleware", json_object_new_string(ch->middleware)); - json_object_object_add(json_ch, "interval", json_object_new_int(ch->interval)); + json_object_object_add(json_ch, "interval", json_object_new_int(assoc->interval)); + json_object_object_add(json_ch, "protocol", json_object_new_int(assoc->meter.type->name)); struct json_object *json_tuples = api_json_tuples(&ch->buffer, ch->buffer.head, ch->buffer.tail); json_object_object_add(json_ch, "tuples", json_tuples); - + json_object_array_add(json_data, json_ch); } } @@ -95,6 +112,10 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url json_object_object_add(json_obj, "generator", json_object_new_string(PACKAGE)); json_object_object_add(json_obj, "data", json_data); + if (json_exception) { + json_object_object_add(json_obj, "exception", json_exception); + } + json_str = json_object_to_json_string(json_obj); response = MHD_create_response_from_data(strlen(json_str), (void *) json_str, FALSE, TRUE); json_object_put(json_obj); @@ -109,10 +130,9 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url MHD_add_response_header(response, "Content-type", "text/text"); } - - + status = MHD_queue_response(connection, response_code, response); - + MHD_destroy_response(response); return status; diff --git a/bin/logger/src/options.c b/bin/logger/src/options.c deleted file mode 100644 index a6fb17e..0000000 --- a/bin/logger/src/options.c +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Parsing commandline options and channel list - * - * @author Steffen Vogel - * @copyright Copyright (c) 2011, The volkszaehler.org project - * @package vzlogger - * @license http://opensource.org/licenses/gpl-license.php GNU Public License - */ -/* - * This file is part of volkzaehler.org - * - * volkzaehler.org is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * volkzaehler.org is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with volkszaehler.org. If not, see . - */ - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "options.h" -#include "channel.h" -#include "vzlogger.h" - -extern meter_type_t meter_types[]; - -options_t options = { /* setting default options */ - "/etc/vzlogger.conf", /* config file */ - 8080, /* port for local interface */ - 0, /* verbosity level */ - 0, /* daemon mode */ - 0 /* local interface */ -}; - -/** - * Command line options - */ -const struct option long_options[] = { - {"config", required_argument, 0, 'c'}, - {"daemon", required_argument, 0, 'd'}, -#ifdef LOCAL_SUPPORT - {"local", no_argument, 0, 'l'}, - {"local-port", required_argument, 0, 'p'}, -#endif /* LOCAL_SUPPORT */ - {"verbose", optional_argument, 0, 'v'}, - {"help", no_argument, 0, 'h'}, - {"version", no_argument, 0, 'V'}, - {} /* stop condition for iterator */ -}; - -/** - * Options for config file - */ -struct { - char *key; - int level; -} - - -/** - * Descriptions vor command line options - */ -char *long_options_descs[] = { - "config file with channel -> uuid mapping", - "run as daemon", -#ifdef LOCAL_SUPPORT - "activate local interface (tiny webserver)", - "TCP port for local interface", -#endif /* LOCAL_SUPPORT */ - "enable verbose output", - "show this help", - "show version of vzlogger", - "" /* stop condition for iterator */ -}; - -/** - * Parse options from command line - */ -void parse_options(int argc, char * argv[], options_t * opts) { - while (1) { - int c = getopt_long(argc, argv, "i:c:p:lhVdv::", long_options, NULL); - - /* detect the end of the options. */ - if (c == -1) break; - - switch (c) { - case 'v': - opts->verbose = (optarg == NULL) ? 1 : atoi(optarg); - break; - -#ifdef LOCAL_SUPPORT - case 'l': - opts->local = 1; - break; - - case 'p': /* port for local interface */ - opts->port = atoi(optarg); - break; -#endif /* LOCAL_SUPPORT */ - - case 'd': - opts->daemon = 1; - break; - - case 'c': /* read config file */ - opts->config = (char *) malloc(strlen(optarg)+1); - strcpy(opts->config, optarg); - break; - - case 'V': - printf("%s\n", VERSION); - exit(EXIT_SUCCESS); - break; - - case '?': - case 'h': - default: - usage(argv); - exit((c == '?') ? EXIT_FAILURE : EXIT_SUCCESS); - } - } - - opts->logging = (!opts->local || opts->daemon); -} - -void parse_channels(const char *filename, list_t *assocs) { - FILE *file = fopen(filename, "r"); /* open configuration */ - - if (!filename) { /* nothing found */ - print(-1, "No config file found! Please specify with --config!\n", NULL); - exit(EXIT_FAILURE); - } - - if (file == NULL) { - perror(filename); /* why didn't the file open? */ - exit(EXIT_FAILURE); - } - else { - print(2, "Start parsing configuration from %s", NULL, filename); - } - - /* compile regular expressions */ - regex_t re_uuid, re_middleware, re_section, re_value; - regcomp(&re_uuid, "^[:xdigit:]{8}-([:xdigit:]{4}-){3}[:xdigit:]{12}$", REG_EXTENDED | REG_NOSUB); - regcomp(&re_middleware, "^https?://[[:alnum:].-]+(\\.[:alpha:]{2,6})?(:[:digit:]{1,5})?(/\\S*)?$", REG_EXTENDED | REG_NOSUB); - - regcomp(&re_section, "^[:blank:]*<(/?)([:alpha]+)>[:blank:]*$", REG_EXTENDED); - regcomp(&re_value, "^[:blank:]*([:alpha]+)[:blank:]+(.+)[:blank:]*$", REG_EXTENDED); - - struct options { - char *key; - enum context; - //char regex; - } - - /* read a line */ - while ((line = fgets(buffer, 256, file)) != NULL) { - if (regex(&re_section) == 0) { - - } - else if (regex(&re_value) == 0) { - switch (context) { - case GLOBAL: - /* no values allowed here */ - break; - - case METER: - if (strcasecmp(key, "protocol") == 0) { - - } - else if (strcasecmp(key, "interval") == 0) { - - } - else if (strcasecmp(key, "host") == 0) { - - } - else if (strcasecmp(key, "port") == 0) { - - } - break; - - case CHANNEL: - if (strcasecmp(key, "uuid") == 0) { - - } - else if (strcasecmp(key, "middlware") == 0) { - - } - else if (strcasecmp(key, "identifier") == 0) { - - } - - break; - - char *key, *value; - - - } - } - - } - - fclose(file); - free(buffer); - regfree(&re_middleware); - regfree(&re_uuid); -} - - diff --git a/bin/logger/src/threads.c b/bin/logger/src/threads.c index de323e9..abcad77 100644 --- a/bin/logger/src/threads.c +++ b/bin/logger/src/threads.c @@ -28,7 +28,7 @@ #include "threads.h" #include "api.h" -#include "options.h" +#include "vzlogger.h" extern options_t options; @@ -42,21 +42,26 @@ void * reading_thread(void *arg) { reading_t *rds = malloc(sizeof(reading_t) * mtr->type->max_readings); time_t last, delta; size_t n = 0; - + pthread_cleanup_push(&reading_thread_cleanup, rds); - + do { /* start thread main loop */ /* fetch readings from meter */ - last = time(NULL); n = meter_read(mtr, rds, mtr->type->max_readings); delta = time(NULL) - last; - + + /* update buffer length with current interval */ + if (!mtr->type->periodic && delta != assoc->interval) { + print(15, "Updating interval to %i", mtr, delta); + assoc->interval = delta; + } + foreach(assoc->channels, it) { channel_t *ch = (channel_t *) it->data; buffer_t *buf = &ch->buffer; reading_t *added; - + for (int i = 0; i < n; i++) { switch (mtr->type->id) { case SML: @@ -66,51 +71,49 @@ void * reading_thread(void *arg) { added = buffer_push(buf, &rds[i]); } break; - + default: /* no channel identifier, adding all readings to buffer */ - print(5, "New reading (value=%.2f ts=%f)", ch, ch->id, rds[i].value, tvtod(rds[i].time)); + print(LOG_INFO, "New reading (value=%.2f ts=%f)", ch, ch->id, rds[i].value, tvtod(rds[i].time)); added = buffer_push(buf, &rds[i]); } } - - /* update buffer length with current interval */ - if (!mtr->type->periodic && options.local && delta) { - print(15, "Updating interval to %i", ch, delta); - ch->interval = delta; - ch->buffer.keep = ceil(BUFFER_KEEP / (double) delta); + + /* update buffer length to interval */ + if (options.local) { + ch->buffer.keep = ceil(options.buffer_length / assoc->interval); } - + /* queue reading into sending buffer logging thread if logging is enabled & sent queue is empty */ if (options.logging && !buf->sent) { buf->sent = added; } - + /* shrink buffer */ buffer_clean(buf); - + /* notify webserver and logging thread */ pthread_mutex_lock(&buf->mutex); pthread_cond_broadcast(&ch->condition); pthread_mutex_unlock(&buf->mutex); - + /* debugging */ - if (options.verbose >= 10) { + if (options.verbosity >= 10) { char dump[1024]; buffer_dump(buf, dump, 1024); - print(10, "Buffer dump: %s (size=%i, memory=%i)", ch, dump, buf->size, buf->keep); + print(LOG_DEBUG, "Buffer dump: %s (size=%i, keep=%i)", ch, dump, buf->size, buf->keep); } } if ((options.daemon || options.local) && mtr->type->periodic) { - print(8, "Next reading in %i seconds", NULL, 5); + print(LOG_INFO, "Next reading in %i seconds", mtr, 5); sleep(5); // TODO handle parsing } } while (options.daemon || options.local); - + pthread_cleanup_pop(1); - + return NULL; } @@ -141,11 +144,11 @@ void * logging_thread(void *arg) { pthread_cond_wait(&ch->condition, &ch->buffer.mutex); /* sleep until new data has been read */ } pthread_mutex_unlock(&ch->buffer.mutex); - + last = ch->buffer.tail; json_obj = api_json_tuples(&ch->buffer, ch->buffer.sent, last); json_str = json_object_to_json_string(json_obj); - + print(10, "JSON request body: %s", ch, json_str); curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_str); @@ -170,17 +173,17 @@ void * logging_thread(void *arg) { print(-1, "Error from middleware: %s", ch, err); } } - + /* householding */ free(response.data); json_object_put(json_obj); - + if (options.daemon && (curl_code != CURLE_OK || http_code != 200)) { - print(2, "Sleeping %i seconds due to previous failure", ch, RETRY_PAUSE); - sleep(RETRY_PAUSE); + print(1, "Waiting %i secs for next request due to previous failure", ch, options.retry_pause); + sleep(options.retry_pause); } } while (options.daemon); - + pthread_cleanup_pop(1); return NULL; diff --git a/bin/logger/src/vzlogger.c b/bin/logger/src/vzlogger.c index c64cdcf..25f9476 100644 --- a/bin/logger/src/vzlogger.c +++ b/bin/logger/src/vzlogger.c @@ -25,18 +25,23 @@ #include /* for print() */ +#include #include +#include +#include #include #include -#include #include #include #include +#include +#include +#include #include "vzlogger.h" #include "list.h" #include "channel.h" -#include "options.h" +#include "configuration.h" #include "threads.h" #ifdef LOCAL_SUPPORT @@ -44,13 +49,47 @@ #include "local.h" #endif /* LOCAL_SUPPORT */ -list_t assocs; /* mapping between meters and channels */ - -extern options_t options; -extern const char *long_options_descs[]; -extern const struct option long_options[]; extern const meter_type_t meter_types[]; +list_t assocs; /* mapping between meters and channels */ +options_t options; /* global application options */ + +/** + * Command line options + */ +const struct option long_options[] = { + {"config", required_argument, 0, 'c'}, + {"log", required_argument, 0, 'o'}, + {"daemon", required_argument, 0, 'd'}, + {"foreground", required_argument, 0, 'f'}, +#ifdef LOCAL_SUPPORT + {"httpd", no_argument, 0, 'l'}, + {"httpd-port", required_argument, 0, 'p'}, +#endif /* LOCAL_SUPPORT */ + {"verbose", required_argument, 0, 'v'}, + {"help", no_argument, 0, 'h'}, + {"version", no_argument, 0, 'V'}, + {} /* stop condition for iterator */ +}; + +/** + * Descriptions vor command line options + */ +const char *long_options_descs[] = { + "configuration file", + "log file", + "run as periodically", + "do not run in background", +#ifdef LOCAL_SUPPORT + "activate local interface (tiny HTTPd which serves live readings)", + "TCP port for HTTPd", +#endif /* LOCAL_SUPPORT */ + "enable verbose output", + "show this help", + "show version of vzlogger", + NULL /* stop condition for iterator */ +}; + /** * Print available options and some other usefull information */ @@ -62,7 +101,7 @@ void usage(char *argv[]) { printf("Usage: %s [options]\n\n", argv[0]); printf(" following options are available:\n"); - while (op->name && *desc) { + while (op->name && desc) { printf("\t-%c, --%-12s\t%s\n", op->val, op->name, *desc); op++; desc++; } @@ -93,18 +132,22 @@ void print(int level, const char *format, void *id, ... ) { struct tm * timeinfo; char buffer[1024], *pos = buffer; - if (level <= options.verbose) { + if (level <= options.verbosity) { gettimeofday(&now, NULL); timeinfo = localtime(&now.tv_sec); - + pos += sprintf(pos, "["); pos += strftime(pos, 16, "%b %d %H:%M:%S", timeinfo); if (id != NULL) { - pos += sprintf(pos, "][%s]\t", (char *) id); + pos += sprintf(pos, "][%s]", (char *) id); } else { - pos += sprintf(pos, "]\t"); + pos += sprintf(pos, "]"); + } + + while(pos - buffer < 24) { + pos += sprintf(pos, " "); } va_start(args, id); @@ -112,9 +155,56 @@ void print(int level, const char *format, void *id, ... ) { va_end(args); fprintf((level > 0) ? stdout : stderr, "%s\n", buffer); + + if (options.logfd) { + fprintf(options.logfd, "%s\n", buffer); + fflush(options.logfd); + } } } +/* http://www.enderunix.org/docs/eng/daemon.php */ +void daemonize() { + if(getppid() == 1) { + return; /* already a daemon */ + } + + int i = fork(); + if (i < 0) { + exit(EXIT_FAILURE); /* fork error */ + } + else if (i > 0) { + exit(EXIT_SUCCESS); /* parent exits */ + } + + /* child (daemon) continues */ + + setsid(); /* obtain a new process group */ + + for (i=getdtablesize();i>=0;--i) { + close(i); /* close all descriptors */ + } + + /* handle standart I/O */ + i = open("/dev/null", O_RDWR); + dup(i); + dup(i); + + chdir("/"); /* change working directory */ + umask(0022); + + /* ignore signals from parent tty */ + struct sigaction action; + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + action.sa_handler = SIG_IGN; + + sigaction(SIGCHLD, &action, NULL); /* ignore child */ + sigaction(SIGTSTP, &action, NULL); /* ignore tty signals */ + sigaction(SIGTTOU, &action, NULL); + sigaction(SIGTTIN, &action, NULL); +} + /** * Cancel threads * @@ -122,60 +212,158 @@ void print(int level, const char *format, void *id, ... ) { */ void quit(int sig) { print(2, "Closing connections to terminate", NULL); - + foreach(assocs, it) { assoc_t *assoc = (assoc_t *) it->data; - + pthread_cancel(assoc->thread); - + foreach(assoc->channels, it) { channel_t *ch = (channel_t *) it->data; - pthread_cancel(ch->thread); } } } /** - * The main loop + * Parse options from command line + */ +void parse_options(int argc, char * argv[], options_t * options) { + while (1) { + int c = getopt_long(argc, argv, "c:o:p:lhVdfv:", long_options, NULL); + + /* detect the end of the options. */ + if (c == -1) break; + + switch (c) { + case 'v': + options->verbosity = atoi(optarg); + break; + +#ifdef LOCAL_SUPPORT + case 'l': + options->local = 1; + break; + + case 'p': /* port for local interface */ + options->port = atoi(optarg); + break; +#endif /* LOCAL_SUPPORT */ + + case 'd': + options->daemon = 1; + break; + + case 'f': + options->foreground = 1; + break; + + case 'c': /* config file */ + options->config = strdup(optarg); + break; + + case 'o': /* log file */ + options->log = strdup(optarg); + break; + + case 'V': + printf("%s\n", VERSION); + exit(EXIT_SUCCESS); + break; + + case '?': + case 'h': + default: + usage(argv); + exit((c == '?') ? EXIT_FAILURE : EXIT_SUCCESS); + } + } +} + +/** + * The application entrypoint */ int main(int argc, char *argv[]) { + /* default options */ + options.config = "/etc/vzlogger.conf"; + options.log = "/var/log/vzlogger.log"; + options.logfd = NULL; + options.port = 8080; + options.verbosity = 0; + options.comet_timeout = 30; + options.buffer_length = 600; + options.retry_pause = 15; + options.daemon = FALSE; + options.local = FALSE; + options.logging = TRUE; /* bind signal handler */ struct sigaction action; sigemptyset(&action.sa_mask); action.sa_flags = 0; action.sa_handler = quit; - sigaction(SIGINT, &action, NULL); + + sigaction(SIGINT, &action, NULL); /* catch ctrl-c from terminal */ + sigaction(SIGHUP, &action, NULL); /* catch hangup signal */ + sigaction(SIGTERM, &action, NULL); /* catch kill signal */ /* initialize adts and apis */ curl_global_init(CURL_GLOBAL_ALL); list_init(&assocs); - - parse_options(argc, argv, &options); /* parse command line arguments */ - parse_channels(options.config, &assocs);/* parse channels from configuration */ + + /* parse command line and file options */ + // TODO command line should have a higher priority as file + parse_options(argc, argv, &options); + parse_configuration(options.config, &assocs, &options); + + options.logging = (!options.local || options.daemon); + + if (!options.foreground && (options.daemon || options.local)) { + print(1, "Daemonize process...", NULL); + daemonize(); + } + + /* open logfile */ + if (options.log) { + FILE *logfd = fopen(options.log, "a"); + + if (logfd) { + options.logfd = logfd; + print(LOG_DEBUG, "Opened logfile %s", NULL, options.log); + } + else { + print(LOG_ERROR, "Cannot open logfile %s: %s", NULL, options.log, strerror(errno)); + exit(EXIT_FAILURE); + } + } + + if (assocs.size == 0) { + print(6, "No meters found!", NULL); + exit(EXIT_FAILURE); + } /* open connection meters & start threads */ foreach(assocs, it) { assoc_t *assoc = (assoc_t *) it->data; meter_t *mtr = &assoc->meter; - + int res = meter_open(mtr); if (res < 0) { + print(LOG_ERROR, "Failed to open meter", mtr); exit(EXIT_FAILURE); } print(5, "Meter connected", mtr); pthread_create(&assoc->thread, NULL, &reading_thread, (void *) assoc); print(5, "Meter thread started", mtr); - + foreach(assoc->channels, it) { channel_t *ch = (channel_t *) it->data; - + /* set buffer length for perriodic meters */ if (mtr->type->periodic && options.local) { - ch->buffer.keep = ceil(BUFFER_KEEP / (double) ch->interval); + ch->buffer.keep = ceil(options.buffer_length / (double) assoc->interval); } - + if (ch->status != RUNNING && options.logging) { pthread_create(&ch->thread, NULL, &logging_thread, (void *) ch); print(5, "Logging thread started", ch); @@ -187,7 +375,7 @@ int main(int argc, char *argv[]) { /* start webserver for local interface */ struct MHD_Daemon *httpd_handle = NULL; if (options.local) { - print(5, "Starting local interface HTTPd on port %i", NULL, options.port); + print(5, "Starting local interface HTTPd on port %i", "http", options.port); httpd_handle = MHD_start_daemon( MHD_USE_THREAD_PER_CONNECTION, options.port, @@ -202,27 +390,27 @@ int main(int argc, char *argv[]) { foreach(assocs, it) { assoc_t *assoc = (assoc_t *) it->data; meter_t *mtr = &assoc->meter; - + pthread_join(assoc->thread, NULL); - + foreach(assoc->channels, it) { channel_t *ch = (channel_t *) it->data; - + pthread_join(ch->thread, NULL); - + channel_free(ch); } - + list_free(&assoc->channels); meter_close(mtr); /* closing connection */ meter_free(mtr); } - + #ifdef LOCAL_SUPPORT /* stop webserver */ if (httpd_handle) { - print(8, "Stopping local interface HTTPd on port %i", NULL, options.port); + print(8, "Stopping local interface HTTPd", "http"); MHD_stop_daemon(httpd_handle); } #endif /* LOCAL_SUPPORT */ @@ -230,7 +418,9 @@ int main(int argc, char *argv[]) { /* householding */ list_free(&assocs); curl_global_cleanup(); - - print(10, "Bye bye!", NULL); + if (options.logfd) { + fclose(options.logfd); + } + return EXIT_SUCCESS; } diff --git a/include/d0.h b/include/d0.h index 7577247..5aaa576 100644 --- a/include/d0.h +++ b/include/d0.h @@ -29,6 +29,8 @@ #ifndef _D0_H_ #define _D0_H_ +#define D0_BUFFER_LENGTH 1024 + #include typedef struct { diff --git a/include/meter.h b/include/meter.h index ac3fd79..16d504d 100644 --- a/include/meter.h +++ b/include/meter.h @@ -110,7 +110,7 @@ double tvtod(struct timeval tv); * @param type the type it should be initialized with * @param connection type specific initialization arguments (connection settings, port, etc..) */ -void meter_init(meter_t *mtr, meter_type_t *type, const char *connection); +void meter_init(meter_t *mtr, const meter_type_t *type, const char *connection); /** * Freeing all memory which has been allocated during the initialization diff --git a/include/s0.h b/include/s0.h index 7eb533f..5b1a3b4 100644 --- a/include/s0.h +++ b/include/s0.h @@ -27,10 +27,11 @@ #define _S0_H_ #include +#include typedef struct { - int fd; /* file descriptor of port */ - struct termios oldtio; /* required to reset port */ + int fd; /* file descriptor of port */ + struct termios oldtio; /* required to reset port */ } meter_handle_s0_t; struct meter; /* forward declaration */ diff --git a/src/d0.c b/src/d0.c index 0110106..798db9a 100644 --- a/src/d0.c +++ b/src/d0.c @@ -34,34 +34,31 @@ #include #include #include +#include #include "meter.h" +#include "obis.h" #include "d0.h" int meter_open_d0(meter_t *mtr) { meter_handle_d0_t *handle = &mtr->handle.d0; struct termios tio; - + memset(&tio, 0, sizeof(tio)); /* open serial port */ - handle->fd = open(mtr->connection, O_RDWR); // | O_NONBLOCK); + handle->fd = open(mtr->connection, O_RDWR); if (handle->fd < 0) { return -1; } - // TODO save oldtio - tio.c_iflag = 0; tio.c_oflag = 0; - tio.c_cflag = CS7|CREAD|CLOCAL; // 7n1, see termios.h for more information + tio.c_cflag = B9600 | CS7 | CREAD | CLOCAL; tio.c_lflag = 0; tio.c_cc[VMIN] = 1; - tio.c_cc[VTIME] = 5; - - cfsetospeed(&tio, B9600); // 9600 baud - cfsetispeed(&tio, B9600); // 9600 baud + tio.c_cc[VTIME] = 20; return 0; } @@ -73,10 +70,53 @@ void meter_close_d0(meter_t *mtr) { } size_t meter_read_d0(meter_t *mtr, reading_t rds[], size_t n) { - // TODO implement - rds->value = 123.456; - gettimeofday(&rds->time, NULL); + meter_handle_d0_t *handle = &mtr->handle.d0; + + struct timeval time; + enum { IDENTIFICATION, OBIS, VALUE, UNIT } context; + + char identification[20]; /* 3 vendor + 1 baudrate + 16 meter specific */ + char line[78]; /* 16 obis + '(' + 32 value + '*' + 16 unit + ')' + '\r' + '\n' */ + char byte; + + int fd = handle->fd; + int i, j, m; + + /* wait for identification */ + while (read(fd, &byte, 1)) { + if (byte == '/') { /* start of identification */ + for (i = 0; i < 16 && byte != '\r'; i++) { + read(fd, &byte, 1); + identification[i] = byte; + } + identification[i] = '\0'; + break; + } + } + + /* debug */ + printf("got message from %s\n", identification); + + /* take timestamp */ + gettimeofday(&time, NULL); + + /* read data lines: "obis(value*unit)" */ + m = 0; + while (m < n && byte != '!') { + + meter_d0_parse_line(&rds[m], line, j); + } return 1; } +int meter_d0_parse_line(reading_t &rd, char *line, size_t n) { + char id[16]; + char value[32]; + char unit[16]; + + rds[m].time = time; /* use timestamp of data block arrival for all readings */ + rds[m].value = ; + rds[m].identifier.obis = ; +} + diff --git a/src/meter.c b/src/meter.c index bbe1913..a11d59d 100644 --- a/src/meter.c +++ b/src/meter.c @@ -46,8 +46,8 @@ const meter_type_t meter_types[] = { double tvtod(struct timeval tv) { return tv.tv_sec + tv.tv_usec / 1e6; } - -void meter_init(meter_t *mtr, meter_type_t *type, const char *connection) { + +void meter_init(meter_t *mtr, const meter_type_t *type, const char *connection) { static int instances; /* static to generate channel ids */ snprintf(mtr->id, 5, "mtr%i", instances++); diff --git a/src/s0.c b/src/s0.c index 34c8174..084e7c8 100644 --- a/src/s0.c +++ b/src/s0.c @@ -39,29 +39,33 @@ int meter_open_s0(meter_t *mtr) { meter_handle_s0_t *handle = &mtr->handle.s0; /* open port */ - handle->fd = open(mtr->connection, O_RDWR | O_NOCTTY); + int fd = open(mtr->connection, O_RDWR | O_NOCTTY); - if (handle->fd < 0) { + if (fd < 0) { + perror(mtr->connection); return -1; } /* save current port settings */ - tcgetattr(handle->fd, &handle->oldtio); + tcgetattr(fd, &handle->oldtio); /* configure port */ struct termios tio; memset(&tio, 0, sizeof(struct termios)); - + tio.c_cflag = B300 | CS8 | CLOCAL | CREAD; - tio.c_iflag = IGNPAR; - tio.c_oflag = 0; - tio.c_lflag = 0; /* set input mode (non-canonical, no echo,...) */ - tio.c_cc[VTIME] = 0; /* inter-character timer unused */ - tio.c_cc[VMIN] = 1; /* blocking read until data is received */ - + tio.c_iflag = IGNPAR; + tio.c_oflag = 0; + tio.c_lflag = 0; + tio.c_cc[VMIN]=1; + tio.c_cc[VTIME]=0; + + tcflush(fd, TCIFLUSH); + /* apply configuration */ - tcsetattr(handle->fd, TCSANOW, &tio); - + tcsetattr(fd, TCSANOW, &tio); + handle->fd = fd; + return 0; } @@ -77,23 +81,21 @@ void meter_close_s0(meter_t *mtr) { size_t meter_read_s0(meter_t *mtr, reading_t rds[], size_t n) { meter_handle_s0_t *handle = &mtr->handle.s0; - char buf[8]; - - rds->value = 1; - + /* clear input buffer */ tcflush(handle->fd, TCIOFLUSH); - + /* blocking until one character/pulse is read */ read(handle->fd, buf, 8); - + /* store current timestamp */ gettimeofday(&rds->time, NULL); - + rds->value = 1; + /* wait some ms for debouncing */ usleep(30000); - + return 1; }