diff --git a/bin/logger/include/channel.h b/bin/logger/include/channel.h index ce2b9b4..ba0bc92 100644 --- a/bin/logger/include/channel.h +++ b/bin/logger/include/channel.h @@ -36,6 +36,7 @@ typedef struct channel { char id[5]; /* only for internal usage & debugging */ reading_id_t identifier; /* channel identifier (OBIS, string) */ + reading_t last; /* most recent reading */ buffer_t buffer; /* circular queue to buffer readings */ pthread_cond_t condition; /* pthread syncronization to notify logging thread and local webserver */ @@ -44,15 +45,10 @@ typedef struct channel { char *middleware; /* url to middleware */ char *uuid; /* unique identifier for middleware */ - - double last; /* last counter value */ - int counter:1; /* TRUE if we want to send the diffrence between to values */ } channel_t; /* prototypes */ -void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier, int counter); +void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier); void channel_free(channel_t *ch); -reading_t * channel_add_readings(channel_t *ch, meter_protocol_t protocol, reading_t *rds, size_t n); - #endif /* _CHANNEL_H_ */ diff --git a/bin/logger/include/config.h b/bin/logger/include/config.h index c168e84..1ece813 100644 --- a/bin/logger/include/config.h +++ b/bin/logger/include/config.h @@ -86,7 +86,7 @@ int config_validate_uuid(const char *uuid); */ int config_parse(const char *filename, list_t *mappings, config_options_t *options); -struct channel * config_parse_channel(struct json_object *jso); +struct channel * config_parse_channel(struct json_object *jso, meter_protocol_t protocol); struct map * config_parse_meter(struct json_object *jso); diff --git a/bin/logger/src/channel.c b/bin/logger/src/channel.c index 34008c0..5ed8fef 100644 --- a/bin/logger/src/channel.c +++ b/bin/logger/src/channel.c @@ -30,14 +30,12 @@ #include "channel.h" -void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier, int counter) { +void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier) { static int instances; /* static to generate channel ids */ snprintf(ch->id, 5, "ch%i", instances++); ch->identifier = identifier; ch->status = status_unknown; - ch->counter = counter; - ch->last = 0; ch->uuid = strdup(uuid); ch->middleware = strdup(middleware); @@ -57,48 +55,3 @@ void channel_free(channel_t *ch) { free(ch->middleware); } -int channel_compare_identifier(meter_protocol_t protocol, reading_id_t a, reading_id_t b) { - switch (protocol) { - case meter_protocol_d0: - case meter_protocol_sml: - /* intelligent protocols require matching ids */ - return obis_compare(a.obis, b.obis); - - case meter_protocol_file: - case meter_protocol_exec: - return strcmp(a.string, b.string); - break; - default: - /* no channel identifier, adding all readings to buffer */ - return 0; /* equal */ - } -} - -reading_t * channel_add_readings(channel_t *ch, meter_protocol_t protocol, reading_t *rds, size_t n) { - reading_t *first = NULL; /* first unsent reading which has been added */ - - double delta; - - for (int i = 0; i < n; i++) { - if (channel_compare_identifier(protocol, rds[i].identifier, ch->identifier) == 0) { - - /* ignoring first reading when no preceeding reading is available (no consumption) */ - delta = (ch->last == 0) ? 0 : rds[i].value - ch->last; - ch->last = rds[i].value; - - print(log_info, "Adding reading to queue (value=%.2f delta=%.2f ts=%.3f)", ch, rds[i].value, delta, tvtod(rds[i].time)); - - if (ch->counter) { /* send relative consumption since last reading */ - rds[i].value = delta; - } - - reading_t *added = buffer_push(&ch->buffer, &rds[i]); /* remember last value to calculate relative consumption */ - - if (first == NULL) { - first = added; - } - } - } - - return first; -} diff --git a/bin/logger/src/local.c b/bin/logger/src/local.c index df3c952..90015c5 100644 --- a/bin/logger/src/local.c +++ b/bin/logger/src/local.c @@ -93,7 +93,7 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url json_object_object_add(json_ch, "uuid", json_object_new_string(ch->uuid)); json_object_object_add(json_ch, "middleware", json_object_new_string(ch->middleware)); - json_object_object_add(json_ch, "last", json_object_new_double(ch->last)); + json_object_object_add(json_ch, "last", json_object_new_double(ch->last.value)); json_object_object_add(json_ch, "interval", json_object_new_int(mapping->meter.interval)); json_object_object_add(json_ch, "protocol", json_object_new_string(meter_get_details(mapping->meter.protocol)->name)); diff --git a/bin/logger/src/threads.c b/bin/logger/src/threads.c index 7a32ed5..7ee8607 100644 --- a/bin/logger/src/threads.c +++ b/bin/logger/src/threads.c @@ -56,7 +56,7 @@ void * reading_thread(void *arg) { pthread_cleanup_push(&reading_thread_cleanup, rds); do { /* start thread main loop */ - /* fetch readings from meter and measure interval */ + /* fetch readings from meter and calculate delta */ last = time(NULL); n = meter_read(mtr, rds, details->max_readings); delta = time(NULL) - last; @@ -64,48 +64,49 @@ void * reading_thread(void *arg) { /* dumping meter output */ if (options.verbosity > log_debug) { print(log_debug, "Got %i new readings from meter:", mtr, n); - char identifier[255]; // TODO choose correct length + + char identifier[MAX_IDENTIFIER_LEN]; for (int i = 0; i < n; i++) { - switch (mtr->protocol) { - case meter_protocol_d0: - case meter_protocol_sml: - obis_unparse(rds[i].identifier.obis, identifier, 255); - break; - - case meter_protocol_file: - case meter_protocol_exec: - if (rds[i].identifier.string != NULL) { - strncpy(identifier, rds[i].identifier.string, 255); - break; - } - - default: - identifier[0] = '\0'; - } - + reading_id_unparse(mtr->protocol, rds[i].identifier, identifier, MAX_IDENTIFIER_LEN); print(log_debug, "Reading: id=%s value=%.2f ts=%.3f", mtr, identifier, rds[i].value, tvtod(rds[i].time)); } } /* update buffer length with current interval */ - if (!details->periodic && mtr->interval != delta) { + if (details->periodic == FALSE && delta > 0 && delta != mtr->interval) { print(log_debug, "Updating interval to %i", mtr, delta); mtr->interval = delta; } + /* insert readings into channel queues */ foreach(mapping->channels, ch, channel_t) { buffer_t *buf = &ch->buffer; - reading_t *added = channel_add_readings(ch, mtr->protocol, rds, n); + reading_t *add = NULL; - /* update buffer length to interval */ + for (int i = 0; i < n; i++) { + if (reading_id_compare(mtr->protocol, rds[i].identifier, ch->identifier) == 0) { + if (tvtod(ch->last.time) < tvtod(rds[i].time)) { + ch->last = rds[i]; + } + + print(log_info, "Adding reading to queue (value=%.2f ts=%.3f)", ch, rds[i].value, tvtod(rds[i].time)); + reading_t *rd = buffer_push(&ch->buffer, &rds[i]); + + if (add == NULL) { + add = rd; /* remember first reading which has been added to the buffer */ + } + } + } + + /* update buffer length */ if (options.local) { - buf->keep = ceil(options.buffer_length / mtr->interval); + buf->keep = (mtr->interval > 0) ? ceil(options.buffer_length / mtr->interval) : 0; } /* queue reading into sending buffer logging thread if logging is enabled & sent queue is empty */ if (options.logging && buf->sent == NULL) { - buf->sent = added; + buf->sent = add; } /* shrink buffer */