diff --git a/misc/controller/vzlogger/src/api.c b/misc/controller/vzlogger/src/api.c index 344564a..1eb72c8 100644 --- a/misc/controller/vzlogger/src/api.c +++ b/misc/controller/vzlogger/src/api.c @@ -26,7 +26,7 @@ #include #include #include -#include +#include #include "api.h" #include "main.h" @@ -68,97 +68,169 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_ size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data) { size_t realsize = size * nmemb; - curl_response_t *response = (curl_response_t *) data; + CURLresponse *response = (CURLresponse *) data; response->data = realloc(response->data, response->size + realsize + 1); if (response->data == NULL) { /* out of memory! */ - print(-1, "Not enough memory (realloc returned NULL)\n", NULL); + print(-1, "Not enough memory", NULL); exit(EXIT_FAILURE); } memcpy(&(response->data[response->size]), ptr, realsize); response->size += realsize; response->data[response->size] = 0; + + print(1, "Addr: %lu", NULL, &(response->data)); return realsize; } -/** - * Log against the vz.org middleware with simple HTTP requests via CURL - */ -CURLcode api_log(channel_t *ch, reading_t rd) { +json_object * api_build_json(channel_t *ch) { + reading_t rd; + + json_object *json_obj = json_object_new_object(); + json_object *json_tuples = json_object_new_array(); + + for (int j = 0; j < ch->queue.size; j++) { + queue_deque(&ch->queue, &rd); + + if (rd.tv.tv_sec) { /* skip empty readings */ + json_object *json_tuple = json_object_new_array(); + json_object_array_add(json_tuple, json_object_new_int(rd.tv.tv_sec * 1000 + rd.tv.tv_usec / 1000)); + json_object_array_add(json_tuple, json_object_new_double(rd.value)); + json_object_array_add(json_tuples, json_tuple); + } + } + + json_object_object_add(json_obj, "tuples", json_tuples); + + return json_obj; +} + +CURL * api_curl_init(channel_t *ch) { CURL *curl; - CURLcode rc = -1; - int curl_code; - curl_response_t chunk = {NULL, 0}; - - char url[255], useragent[255], post[255]; - /* build request strings */ - sprintf(url, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */ - sprintf(useragent, "vzlogger/%s (%s)", "VZ_VERSION", curl_version()); - sprintf(post, "?timestamp=%lu%lu&value=%f", rd.tv.tv_sec, rd.tv.tv_usec / 1000, rd.value); - + char buffer[255]; + curl = curl_easy_init(); + if (!curl) { + print(-1, "Cannot create curl handle", ch); + exit(EXIT_FAILURE); + } - if (curl) { - curl_easy_setopt(curl, CURLOPT_URL, url); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post); - curl_easy_setopt(curl, CURLOPT_USERAGENT, useragent); - curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) opts.verbose); - curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback); - curl_easy_setopt(curl, CURLOPT_DEBUGDATA, (void *) ch); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &chunk); - - print(1, "Sending request: %s%s", ch, url, post); - - rc = curl_easy_perform(curl); - - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &curl_code); - - print((curl_code == 200) ? 1 : -1, "Request %s with code: %i", ch, (curl_code == 200) ? "succeded" : "failed", curl_code); - - if (rc != CURLE_OK) { - print(-1, "CURL error: %s", ch, curl_easy_strerror(rc)); - } - else if (chunk.size == 0 || chunk.data == NULL) { - print(-1, "No data received!", ch); - rc = -1; - } - else if (curl_code != 200) { /* parse exception */ - struct json_tokener * json_tok; - struct json_object * json_obj; + sprintf(buffer, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */ + curl_easy_setopt(curl, CURLOPT_URL, buffer); - json_tok = json_tokener_new(); - json_obj = json_tokener_parse_ex(json_tok, chunk.data, chunk.size); + sprintf(buffer, "vzlogger/%s (%s)", VZ_VERSION, curl_version()); /* build user agent */ + curl_easy_setopt(curl, CURLOPT_USERAGENT, buffer); + + curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) opts.verbose); + curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback); + curl_easy_setopt(curl, CURLOPT_DEBUGDATA, (void *) ch); - if (json_tok->err == json_tokener_success) { - json_obj = json_object_object_get(json_obj, "exception"); - - if (json_obj) { - print(-1, "%s : %s", ch, - json_object_get_string(json_object_object_get(json_obj, "type")), - json_object_get_string(json_object_object_get(json_obj, "message")) - ); - } - else { - print(-1, "Malformed middleware response: missing exception", ch); - } - } - else { - print(-1, "Malformed middleware response: %s", ch, json_tokener_errors[json_tok->err]); - } - - rc = -1; + return curl; +} + +char * api_parse_exception(CURLresponse response) { + struct json_tokener * json_tok; + struct json_object * json_obj; + char *errstr; + + json_tok = json_tokener_new(); + json_obj = json_tokener_parse_ex(json_tok, response.data, response.size); + if (json_tok->err == json_tokener_success) { + json_obj = json_object_object_get(json_obj, "exception"); + + if (json_obj) { + errstr = json_object_get_string(json_object_object_get(json_obj, "message")); + } + else { + errstr = "missing exception"; } - - curl_easy_cleanup(curl); /* always cleanup */ - free(chunk.data); /* free response */ } else { - print(-1, "Failed to create CURL handle", ch); + errstr = json_tokener_errors[json_tok->err]; } - return rc; + json_tokener_free(json_tok); + + return errstr; +} + + +/** + * Logging thread + * + * Logs buffered readings against middleware + */ +void *api_thread(void *arg) { + CURL *curl; + channel_t *ch = (channel_t *) arg; /* casting argument */ + + print(1, "Started logging thread", ch); + + curl = api_curl_init(ch); + + do { /* start thread mainloop */ + CURLresponse response; + int curl_code, http_code; + char *json_str; + + /* initialize response */ + response.data = NULL; + response.size = 0; + + //pthread_mutex_lock(&ch->mutex); + //while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */ + // pthread_cond_wait(&ch->condition, &ch->mutex); /* sleep until new data has been read */ + //} + //pthread_mutex_unlock(&ch->mutex); + + //if (opts.verbose > 5) queue_print(&ch->queue); /* Debugging */ + + //pthread_mutex_lock(&ch->mutex); + //json_str = json_object_to_json_string(api_build_json(ch)); + //pthread_mutex_unlock(&ch->mutex); + + //print(1, "JSON body: %s", ch, json_str); + + //curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_str); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &response); + + curl_code = curl_easy_perform(curl); + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code); + + print(1, "Addr: %lu", ch, &(response.data)); + print(1, "Response: %s", ch, response.data); + + if (curl_code == CURLE_OK && http_code == 200) { /* everything is ok */ + print(1, "Request succeeded with code: %i", ch, http_code); + + // TODO clear queue + } + else { /* error */ + if (curl_code != CURLE_OK) { + print(-1, "CURL failed: %s", ch, curl_easy_strerror(curl_code)); + } + else if (http_code != 200) { + print(-1, "Invalid middlware response: %s", ch, api_parse_exception(response)); + } + + print(2, "Sleeping %i seconds due to previous failure", ch, RETRY_PAUSE); + sleep(RETRY_PAUSE); + } + + /* householding */ + free(json_str); + // TODO free json objects + + //if (response.data) free(response.data); + + pthread_testcancel(); /* test for cancelation request */ + } while (opts.daemon); + + curl_easy_cleanup(curl); /* always cleanup */ + + return NULL; } diff --git a/misc/controller/vzlogger/src/api.h b/misc/controller/vzlogger/src/api.h index 1fe251f..dfd2fba 100644 --- a/misc/controller/vzlogger/src/api.h +++ b/misc/controller/vzlogger/src/api.h @@ -28,6 +28,7 @@ #include #include +#include #include "main.h" #include "protocol.h" @@ -35,10 +36,13 @@ typedef struct { char *data; size_t size; -} curl_response_t; +} CURLresponse; +/* curl callbacks */ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom); size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data); -CURLcode api_log(channel_t *ch, reading_t read); + +json_object * api_build_json(channel_t *ch); +void *api_thread(void *arg); #endif /* _API_H_ */ diff --git a/misc/controller/vzlogger/src/local.c b/misc/controller/vzlogger/src/local.c index d4b7466..a7dab34 100644 --- a/misc/controller/vzlogger/src/local.c +++ b/misc/controller/vzlogger/src/local.c @@ -52,15 +52,15 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url if (strcmp(url, "/") == 0 || strcmp(ch->uuid, url + 1) == 0) { pthread_mutex_lock(&ch->mutex); - pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data comet-like blocking of HTTP response */ + /* wait for new data comet-like blocking of HTTP response */ + pthread_cond_wait(&ch->condition, &ch->mutex); // TODO use pthread_cond_timedwait() pthread_mutex_unlock(&ch->mutex); struct json_object * json_tuples = json_object_new_array(); - int j = ch->queue.write_p; - do { + for (int j = 0; j < ch->queue.size; j++) { pthread_mutex_lock(&ch->mutex); - rd = ch->queue.buf[j]; + queue_deque(&ch->queue, &rd); pthread_mutex_unlock(&ch->mutex); if (rd.value != 0) { /* skip invalid / empty readings */ @@ -70,14 +70,10 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url json_object_array_add(json_tuple, json_object_new_int(timestamp)); json_object_array_add(json_tuple, json_object_new_double(rd.value)); - json_object_array_add(json_tuple, json_object_new_int(1)); /* just for middleware compability */ json_object_array_add(json_tuples, json_tuple); } - - j++; - j %= ch->queue.size; - } while (j != ch->queue.read_p); + } json_object_object_add(json_data, "uuid", json_object_new_string(ch->uuid)); json_object_object_add(json_data, "interval", json_object_new_int(ch->interval)); diff --git a/misc/controller/vzlogger/src/main.c b/misc/controller/vzlogger/src/main.c index d582970..f3a2bc3 100644 --- a/misc/controller/vzlogger/src/main.c +++ b/misc/controller/vzlogger/src/main.c @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -267,49 +266,6 @@ int parse_channels(char * filename, channel_t * chans) { return j; } -/** - * Logging thread - * - * Logs buffered readings against middleware - */ -void *log_thread(void *arg) { - channel_t *ch = (channel_t *) arg; /* casting argument */ - reading_t rd; - - CURLcode rc; - - print(1, "Started logging thread", ch); - - do { - pthread_mutex_lock(&ch->mutex); - while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */ - pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data */ - } - pthread_mutex_unlock(&ch->mutex); - - while (!queue_is_empty(&ch->queue)) { - pthread_mutex_lock(&ch->mutex); - queue_first(&ch->queue, &rd); - pthread_mutex_unlock(&ch->mutex); - - rc = api_log(ch, rd); /* log reading */ - - if (rc == CURLE_OK) { - pthread_mutex_lock(&ch->mutex); - queue_deque(&ch->queue, &rd); /* remove reading from buffer */ - pthread_mutex_unlock(&ch->mutex); - } - else { - print(1, "Delaying next transmission for %i seconds due to pervious error", ch, RETRY_PAUSE); - sleep(RETRY_PAUSE); - } - } - pthread_testcancel(); /* test for cancelation request */ - } while (opts.daemon); - - return NULL; -} - /** * Read thread * @@ -369,7 +325,7 @@ int main(int argc, char * argv[]) { /* start threads */ pthread_create(&ch->reading_thread, NULL, read_thread, (void *) ch); - pthread_create(&ch->logging_thread, NULL, log_thread, (void *) ch); + pthread_create(&ch->logging_thread, NULL, api_thread, (void *) ch); } /* start webserver for local interface */ diff --git a/misc/controller/vzlogger/src/main.h b/misc/controller/vzlogger/src/main.h index 8435e3b..3bad60d 100644 --- a/misc/controller/vzlogger/src/main.h +++ b/misc/controller/vzlogger/src/main.h @@ -35,7 +35,7 @@ #define VZ_VERSION "0.2" #define MAX_CHANNELS 16 -#define RETRY_PAUSE 600 /* seconds to wait after failed request */ +#define RETRY_PAUSE 10 //600 /* seconds to wait after failed request */ #define BUFFER_LENGTH 600 /* in seconds */ #ifndef TRUE diff --git a/misc/controller/vzlogger/src/queue.c b/misc/controller/vzlogger/src/queue.c index 49fda5c..1a4feab 100644 --- a/misc/controller/vzlogger/src/queue.c +++ b/misc/controller/vzlogger/src/queue.c @@ -54,7 +54,7 @@ bool_t queue_first(queue_t *q, reading_t *rd) { void queue_print(queue_t *q) { printf("Queue dump: [%.1f", q->buf[0].value); for (int i = 1; i < q->size; i++) { - printf("|%.1f", q->buf[i].value); + printf("|%.2f", q->buf[i].value); } printf("]\n"); }