last changes for new flukso protocol

This commit is contained in:
Steffen Vogel 2012-01-19 18:07:40 +01:00
parent a88ffd26d9
commit 75457b8ef5
5 changed files with 30 additions and 80 deletions

View file

@ -36,6 +36,7 @@ typedef struct channel {
char id[5]; /* only for internal usage & debugging */
reading_id_t identifier; /* channel identifier (OBIS, string) */
reading_t last; /* most recent reading */
buffer_t buffer; /* circular queue to buffer readings */
pthread_cond_t condition; /* pthread syncronization to notify logging thread and local webserver */
@ -44,15 +45,10 @@ typedef struct channel {
char *middleware; /* url to middleware */
char *uuid; /* unique identifier for middleware */
double last; /* last counter value */
int counter:1; /* TRUE if we want to send the diffrence between to values */
} channel_t;
/* prototypes */
void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier, int counter);
void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier);
void channel_free(channel_t *ch);
reading_t * channel_add_readings(channel_t *ch, meter_protocol_t protocol, reading_t *rds, size_t n);
#endif /* _CHANNEL_H_ */

View file

@ -86,7 +86,7 @@ int config_validate_uuid(const char *uuid);
*/
int config_parse(const char *filename, list_t *mappings, config_options_t *options);
struct channel * config_parse_channel(struct json_object *jso);
struct channel * config_parse_channel(struct json_object *jso, meter_protocol_t protocol);
struct map * config_parse_meter(struct json_object *jso);

View file

@ -30,14 +30,12 @@
#include "channel.h"
void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier, int counter) {
void channel_init(channel_t *ch, const char *uuid, const char *middleware, reading_id_t identifier) {
static int instances; /* static to generate channel ids */
snprintf(ch->id, 5, "ch%i", instances++);
ch->identifier = identifier;
ch->status = status_unknown;
ch->counter = counter;
ch->last = 0;
ch->uuid = strdup(uuid);
ch->middleware = strdup(middleware);
@ -57,48 +55,3 @@ void channel_free(channel_t *ch) {
free(ch->middleware);
}
int channel_compare_identifier(meter_protocol_t protocol, reading_id_t a, reading_id_t b) {
switch (protocol) {
case meter_protocol_d0:
case meter_protocol_sml:
/* intelligent protocols require matching ids */
return obis_compare(a.obis, b.obis);
case meter_protocol_file:
case meter_protocol_exec:
return strcmp(a.string, b.string);
break;
default:
/* no channel identifier, adding all readings to buffer */
return 0; /* equal */
}
}
reading_t * channel_add_readings(channel_t *ch, meter_protocol_t protocol, reading_t *rds, size_t n) {
reading_t *first = NULL; /* first unsent reading which has been added */
double delta;
for (int i = 0; i < n; i++) {
if (channel_compare_identifier(protocol, rds[i].identifier, ch->identifier) == 0) {
/* ignoring first reading when no preceeding reading is available (no consumption) */
delta = (ch->last == 0) ? 0 : rds[i].value - ch->last;
ch->last = rds[i].value;
print(log_info, "Adding reading to queue (value=%.2f delta=%.2f ts=%.3f)", ch, rds[i].value, delta, tvtod(rds[i].time));
if (ch->counter) { /* send relative consumption since last reading */
rds[i].value = delta;
}
reading_t *added = buffer_push(&ch->buffer, &rds[i]); /* remember last value to calculate relative consumption */
if (first == NULL) {
first = added;
}
}
}
return first;
}

View file

@ -93,7 +93,7 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url
json_object_object_add(json_ch, "uuid", json_object_new_string(ch->uuid));
json_object_object_add(json_ch, "middleware", json_object_new_string(ch->middleware));
json_object_object_add(json_ch, "last", json_object_new_double(ch->last));
json_object_object_add(json_ch, "last", json_object_new_double(ch->last.value));
json_object_object_add(json_ch, "interval", json_object_new_int(mapping->meter.interval));
json_object_object_add(json_ch, "protocol", json_object_new_string(meter_get_details(mapping->meter.protocol)->name));

View file

@ -56,7 +56,7 @@ void * reading_thread(void *arg) {
pthread_cleanup_push(&reading_thread_cleanup, rds);
do { /* start thread main loop */
/* fetch readings from meter and measure interval */
/* fetch readings from meter and calculate delta */
last = time(NULL);
n = meter_read(mtr, rds, details->max_readings);
delta = time(NULL) - last;
@ -64,48 +64,49 @@ void * reading_thread(void *arg) {
/* dumping meter output */
if (options.verbosity > log_debug) {
print(log_debug, "Got %i new readings from meter:", mtr, n);
char identifier[255]; // TODO choose correct length
char identifier[MAX_IDENTIFIER_LEN];
for (int i = 0; i < n; i++) {
switch (mtr->protocol) {
case meter_protocol_d0:
case meter_protocol_sml:
obis_unparse(rds[i].identifier.obis, identifier, 255);
break;
case meter_protocol_file:
case meter_protocol_exec:
if (rds[i].identifier.string != NULL) {
strncpy(identifier, rds[i].identifier.string, 255);
break;
}
default:
identifier[0] = '\0';
}
reading_id_unparse(mtr->protocol, rds[i].identifier, identifier, MAX_IDENTIFIER_LEN);
print(log_debug, "Reading: id=%s value=%.2f ts=%.3f", mtr, identifier, rds[i].value, tvtod(rds[i].time));
}
}
/* update buffer length with current interval */
if (!details->periodic && mtr->interval != delta) {
if (details->periodic == FALSE && delta > 0 && delta != mtr->interval) {
print(log_debug, "Updating interval to %i", mtr, delta);
mtr->interval = delta;
}
/* insert readings into channel queues */
foreach(mapping->channels, ch, channel_t) {
buffer_t *buf = &ch->buffer;
reading_t *added = channel_add_readings(ch, mtr->protocol, rds, n);
reading_t *add = NULL;
/* update buffer length to interval */
for (int i = 0; i < n; i++) {
if (reading_id_compare(mtr->protocol, rds[i].identifier, ch->identifier) == 0) {
if (tvtod(ch->last.time) < tvtod(rds[i].time)) {
ch->last = rds[i];
}
print(log_info, "Adding reading to queue (value=%.2f ts=%.3f)", ch, rds[i].value, tvtod(rds[i].time));
reading_t *rd = buffer_push(&ch->buffer, &rds[i]);
if (add == NULL) {
add = rd; /* remember first reading which has been added to the buffer */
}
}
}
/* update buffer length */
if (options.local) {
buf->keep = ceil(options.buffer_length / mtr->interval);
buf->keep = (mtr->interval > 0) ? ceil(options.buffer_length / mtr->interval) : 0;
}
/* queue reading into sending buffer logging thread if
logging is enabled & sent queue is empty */
if (options.logging && buf->sent == NULL) {
buf->sent = added;
buf->sent = add;
}
/* shrink buffer */