searching bugs :-S

This commit is contained in:
Steffen Vogel 2011-06-05 13:16:27 +02:00
parent 3dfe790f66
commit 22568636be
6 changed files with 156 additions and 128 deletions

View file

@ -26,7 +26,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <json/json.h>
#include <unistd.h>
#include "api.h"
#include "main.h"
@ -68,97 +68,169 @@ int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data) {
size_t realsize = size * nmemb;
curl_response_t *response = (curl_response_t *) data;
CURLresponse *response = (CURLresponse *) data;
response->data = realloc(response->data, response->size + realsize + 1);
if (response->data == NULL) { /* out of memory! */
print(-1, "Not enough memory (realloc returned NULL)\n", NULL);
print(-1, "Not enough memory", NULL);
exit(EXIT_FAILURE);
}
memcpy(&(response->data[response->size]), ptr, realsize);
response->size += realsize;
response->data[response->size] = 0;
print(1, "Addr: %lu", NULL, &(response->data));
return realsize;
}
/**
* Log against the vz.org middleware with simple HTTP requests via CURL
*/
CURLcode api_log(channel_t *ch, reading_t rd) {
json_object * api_build_json(channel_t *ch) {
reading_t rd;
json_object *json_obj = json_object_new_object();
json_object *json_tuples = json_object_new_array();
for (int j = 0; j < ch->queue.size; j++) {
queue_deque(&ch->queue, &rd);
if (rd.tv.tv_sec) { /* skip empty readings */
json_object *json_tuple = json_object_new_array();
json_object_array_add(json_tuple, json_object_new_int(rd.tv.tv_sec * 1000 + rd.tv.tv_usec / 1000));
json_object_array_add(json_tuple, json_object_new_double(rd.value));
json_object_array_add(json_tuples, json_tuple);
}
}
json_object_object_add(json_obj, "tuples", json_tuples);
return json_obj;
}
CURL * api_curl_init(channel_t *ch) {
CURL *curl;
CURLcode rc = -1;
int curl_code;
curl_response_t chunk = {NULL, 0};
char url[255], useragent[255], post[255];
/* build request strings */
sprintf(url, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */
sprintf(useragent, "vzlogger/%s (%s)", "VZ_VERSION", curl_version());
sprintf(post, "?timestamp=%lu%lu&value=%f", rd.tv.tv_sec, rd.tv.tv_usec / 1000, rd.value);
char buffer[255];
curl = curl_easy_init();
if (!curl) {
print(-1, "Cannot create curl handle", ch);
exit(EXIT_FAILURE);
}
if (curl) {
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post);
curl_easy_setopt(curl, CURLOPT_USERAGENT, useragent);
curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) opts.verbose);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(curl, CURLOPT_DEBUGDATA, (void *) ch);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &chunk);
print(1, "Sending request: %s%s", ch, url, post);
rc = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &curl_code);
print((curl_code == 200) ? 1 : -1, "Request %s with code: %i", ch, (curl_code == 200) ? "succeded" : "failed", curl_code);
if (rc != CURLE_OK) {
print(-1, "CURL error: %s", ch, curl_easy_strerror(rc));
}
else if (chunk.size == 0 || chunk.data == NULL) {
print(-1, "No data received!", ch);
rc = -1;
}
else if (curl_code != 200) { /* parse exception */
struct json_tokener * json_tok;
struct json_object * json_obj;
sprintf(buffer, "%s/data/%s.json", ch->middleware, ch->uuid); /* build url */
curl_easy_setopt(curl, CURLOPT_URL, buffer);
json_tok = json_tokener_new();
json_obj = json_tokener_parse_ex(json_tok, chunk.data, chunk.size);
sprintf(buffer, "vzlogger/%s (%s)", VZ_VERSION, curl_version()); /* build user agent */
curl_easy_setopt(curl, CURLOPT_USERAGENT, buffer);
curl_easy_setopt(curl, CURLOPT_VERBOSE, (int) opts.verbose);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, curl_custom_debug_callback);
curl_easy_setopt(curl, CURLOPT_DEBUGDATA, (void *) ch);
if (json_tok->err == json_tokener_success) {
json_obj = json_object_object_get(json_obj, "exception");
if (json_obj) {
print(-1, "%s : %s", ch,
json_object_get_string(json_object_object_get(json_obj, "type")),
json_object_get_string(json_object_object_get(json_obj, "message"))
);
}
else {
print(-1, "Malformed middleware response: missing exception", ch);
}
}
else {
print(-1, "Malformed middleware response: %s", ch, json_tokener_errors[json_tok->err]);
}
rc = -1;
return curl;
}
char * api_parse_exception(CURLresponse response) {
struct json_tokener * json_tok;
struct json_object * json_obj;
char *errstr;
json_tok = json_tokener_new();
json_obj = json_tokener_parse_ex(json_tok, response.data, response.size);
if (json_tok->err == json_tokener_success) {
json_obj = json_object_object_get(json_obj, "exception");
if (json_obj) {
errstr = json_object_get_string(json_object_object_get(json_obj, "message"));
}
else {
errstr = "missing exception";
}
curl_easy_cleanup(curl); /* always cleanup */
free(chunk.data); /* free response */
}
else {
print(-1, "Failed to create CURL handle", ch);
errstr = json_tokener_errors[json_tok->err];
}
return rc;
json_tokener_free(json_tok);
return errstr;
}
/**
* Logging thread
*
* Logs buffered readings against middleware
*/
void *api_thread(void *arg) {
CURL *curl;
channel_t *ch = (channel_t *) arg; /* casting argument */
print(1, "Started logging thread", ch);
curl = api_curl_init(ch);
do { /* start thread mainloop */
CURLresponse response;
int curl_code, http_code;
char *json_str;
/* initialize response */
response.data = NULL;
response.size = 0;
//pthread_mutex_lock(&ch->mutex);
//while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */
// pthread_cond_wait(&ch->condition, &ch->mutex); /* sleep until new data has been read */
//}
//pthread_mutex_unlock(&ch->mutex);
//if (opts.verbose > 5) queue_print(&ch->queue); /* Debugging */
//pthread_mutex_lock(&ch->mutex);
//json_str = json_object_to_json_string(api_build_json(ch));
//pthread_mutex_unlock(&ch->mutex);
//print(1, "JSON body: %s", ch, json_str);
//curl_easy_setopt(curl, CURLOPT_POSTFIELDS, json_str);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, curl_custom_write_callback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *) &response);
curl_code = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
print(1, "Addr: %lu", ch, &(response.data));
print(1, "Response: %s", ch, response.data);
if (curl_code == CURLE_OK && http_code == 200) { /* everything is ok */
print(1, "Request succeeded with code: %i", ch, http_code);
// TODO clear queue
}
else { /* error */
if (curl_code != CURLE_OK) {
print(-1, "CURL failed: %s", ch, curl_easy_strerror(curl_code));
}
else if (http_code != 200) {
print(-1, "Invalid middlware response: %s", ch, api_parse_exception(response));
}
print(2, "Sleeping %i seconds due to previous failure", ch, RETRY_PAUSE);
sleep(RETRY_PAUSE);
}
/* householding */
free(json_str);
// TODO free json objects
//if (response.data) free(response.data);
pthread_testcancel(); /* test for cancelation request */
} while (opts.daemon);
curl_easy_cleanup(curl); /* always cleanup */
return NULL;
}

View file

@ -28,6 +28,7 @@
#include <stddef.h>
#include <curl/curl.h>
#include <json/json.h>
#include "main.h"
#include "protocol.h"
@ -35,10 +36,13 @@
typedef struct {
char *data;
size_t size;
} curl_response_t;
} CURLresponse;
/* curl callbacks */
int curl_custom_debug_callback(CURL *curl, curl_infotype type, char *data, size_t size, void *custom);
size_t curl_custom_write_callback(void *ptr, size_t size, size_t nmemb, void *data);
CURLcode api_log(channel_t *ch, reading_t read);
json_object * api_build_json(channel_t *ch);
void *api_thread(void *arg);
#endif /* _API_H_ */

View file

@ -52,15 +52,15 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url
if (strcmp(url, "/") == 0 || strcmp(ch->uuid, url + 1) == 0) {
pthread_mutex_lock(&ch->mutex);
pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data comet-like blocking of HTTP response */
/* wait for new data comet-like blocking of HTTP response */
pthread_cond_wait(&ch->condition, &ch->mutex); // TODO use pthread_cond_timedwait()
pthread_mutex_unlock(&ch->mutex);
struct json_object * json_tuples = json_object_new_array();
int j = ch->queue.write_p;
do {
for (int j = 0; j < ch->queue.size; j++) {
pthread_mutex_lock(&ch->mutex);
rd = ch->queue.buf[j];
queue_deque(&ch->queue, &rd);
pthread_mutex_unlock(&ch->mutex);
if (rd.value != 0) { /* skip invalid / empty readings */
@ -70,14 +70,10 @@ int handle_request(void *cls, struct MHD_Connection *connection, const char *url
json_object_array_add(json_tuple, json_object_new_int(timestamp));
json_object_array_add(json_tuple, json_object_new_double(rd.value));
json_object_array_add(json_tuple, json_object_new_int(1)); /* just for middleware compability */
json_object_array_add(json_tuples, json_tuple);
}
j++;
j %= ch->queue.size;
} while (j != ch->queue.read_p);
}
json_object_object_add(json_data, "uuid", json_object_new_string(ch->uuid));
json_object_object_add(json_data, "interval", json_object_new_int(ch->interval));

View file

@ -29,7 +29,6 @@
#include <stdlib.h>
#include <string.h>
#include <curl/curl.h>
#include <unistd.h>
#include <math.h>
#include <stdint.h>
#include <microhttpd.h>
@ -267,49 +266,6 @@ int parse_channels(char * filename, channel_t * chans) {
return j;
}
/**
* Logging thread
*
* Logs buffered readings against middleware
*/
void *log_thread(void *arg) {
channel_t *ch = (channel_t *) arg; /* casting argument */
reading_t rd;
CURLcode rc;
print(1, "Started logging thread", ch);
do {
pthread_mutex_lock(&ch->mutex);
while (queue_is_empty(&ch->queue)) { /* detect spurious wakeups */
pthread_cond_wait(&ch->condition, &ch->mutex); /* wait for new data */
}
pthread_mutex_unlock(&ch->mutex);
while (!queue_is_empty(&ch->queue)) {
pthread_mutex_lock(&ch->mutex);
queue_first(&ch->queue, &rd);
pthread_mutex_unlock(&ch->mutex);
rc = api_log(ch, rd); /* log reading */
if (rc == CURLE_OK) {
pthread_mutex_lock(&ch->mutex);
queue_deque(&ch->queue, &rd); /* remove reading from buffer */
pthread_mutex_unlock(&ch->mutex);
}
else {
print(1, "Delaying next transmission for %i seconds due to pervious error", ch, RETRY_PAUSE);
sleep(RETRY_PAUSE);
}
}
pthread_testcancel(); /* test for cancelation request */
} while (opts.daemon);
return NULL;
}
/**
* Read thread
*
@ -369,7 +325,7 @@ int main(int argc, char * argv[]) {
/* start threads */
pthread_create(&ch->reading_thread, NULL, read_thread, (void *) ch);
pthread_create(&ch->logging_thread, NULL, log_thread, (void *) ch);
pthread_create(&ch->logging_thread, NULL, api_thread, (void *) ch);
}
/* start webserver for local interface */

View file

@ -35,7 +35,7 @@
#define VZ_VERSION "0.2"
#define MAX_CHANNELS 16
#define RETRY_PAUSE 600 /* seconds to wait after failed request */
#define RETRY_PAUSE 10 //600 /* seconds to wait after failed request */
#define BUFFER_LENGTH 600 /* in seconds */
#ifndef TRUE

View file

@ -54,7 +54,7 @@ bool_t queue_first(queue_t *q, reading_t *rd) {
void queue_print(queue_t *q) {
printf("Queue dump: [%.1f", q->buf[0].value);
for (int i = 1; i < q->size; i++) {
printf("|%.1f", q->buf[i].value);
printf("|%.2f", q->buf[i].value);
}
printf("]\n");
}