diff --git a/include/villas/nodes/ngsi.hpp b/include/villas/nodes/ngsi.hpp
index 922e89d7b..a8557dd52 100644
--- a/include/villas/nodes/ngsi.hpp
+++ b/include/villas/nodes/ngsi.hpp
@@ -62,9 +62,8 @@ struct ngsi {
struct curl_slist *headers; /**< List of HTTP request headers for libcurl */
- CURL *curl; /**< libcurl: handle */
-
struct {
+ CURL *curl; /**< libcurl: handle */
struct vlist signals; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */
} in, out;
};
diff --git a/lib/nodes/ngsi.cpp b/lib/nodes/ngsi.cpp
index 2275ccc4b..e9144b224 100644
--- a/lib/nodes/ngsi.cpp
+++ b/lib/nodes/ngsi.cpp
@@ -13,20 +13,23 @@
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
- * along with this program. If not, see .
+ * along with this program. If not, see .
**********************************************************************************/
#include
#include
-#include
+#include
#include
#include
-#include
+#include
+#include
+#include
+#include
#include
#include
@@ -39,14 +42,43 @@ using namespace villas;
using namespace villas::utils;
/* Some global settings */
-static char *name = nullptr;
+
+#if OPENSSL_VERSION_NUMBER < 0x010003000L
+ // See: https://curl.haxx.se/libcurl/c/opensslthreadlock.html
+ #define CURL_SSL_REQUIRES_LOCKING
+#endif
+
+#ifdef CURL_SSL_REQUIRES_LOCKING
+/** This array will store all of the mutexes available to OpenSSL. */
+static pthread_mutex_t *mutex_buf = NULL;
+
+void handle_error(const char *file, int lineno, const char *msg)
+{
+ error("** %s:%d %s\n", file, lineno, msg);
+ ERR_print_errors_fp(stderr);
+ /* exit(-1); */
+}
+
+static void curl_ssl_locking_function(int mode, int n, const char *file, int line)
+{
+ if (mode & CRYPTO_LOCK)
+ pthread_mutex_lock(&mutex_buf[n]);
+ else
+ pthread_mutex_unlock(&mutex_buf[n]);
+}
+
+static unsigned long curl_ssl_thread_id_function(void)
+{
+ return ((unsigned long) pthread_self());
+}
+#endif /* CURL_SSL_REQUIRES_LOCKING */
enum NgsiFlags {
- NGSI_ENTITY_ATTRIBUTES_IN = (1 << 0),
+ NGSI_ENTITY_ATTRIBUTES_IN = (1 << 0),
NGSI_ENTITY_ATTRIBUTES_OUT = (1 << 1),
- NGSI_ENTITY_ATTRIBUTES = NGSI_ENTITY_ATTRIBUTES_IN | NGSI_ENTITY_ATTRIBUTES_OUT,
- NGSI_ENTITY_VALUES = (1 << 2),
- NGSI_ENTITY_METADATA = (1 << 3),
+ NGSI_ENTITY_ATTRIBUTES = NGSI_ENTITY_ATTRIBUTES_IN | NGSI_ENTITY_ATTRIBUTES_OUT,
+ NGSI_ENTITY_VALUES = (1 << 2),
+ NGSI_ENTITY_METADATA = (1 << 3),
};
class NgsiMetadata {
@@ -185,8 +217,8 @@ public:
for (auto &meta : metadata) {
json_array_append_new(json_metadatas, json_pack("{ s: s, s: s, s: s }",
- "name", meta.name.c_str(),
- "type", meta.type.c_str(),
+ "name", meta.name.c_str(),
+ "type", meta.type.c_str(),
"value", meta.value.c_str()
));
}
@@ -367,7 +399,7 @@ static int ngsi_parse_entity(struct node *n, json_t *json_entity, struct sample
#ifdef NGSI_VECTORS
smp->flags |= (int) (SampleFlags::HAS_SEQUENCE |
- SampleFlags::HAS_TS_ORIGIN);
+ SampleFlags::HAS_TS_ORIGIN);
#endif
}
@@ -490,8 +522,10 @@ static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t
json_t *json_request = json_pack("{ s: [ o ] }", "entities", json_entity);
ret = ngsi_request(handle, endpoint, "queryContext", json_request, &json_response);
- if (ret)
+ if (ret) {
+ ret = -1;
goto out;
+ }
ret = ngsi_parse_context_response(json_response, &code, &reason, json_rentity);
if (ret)
@@ -532,12 +566,37 @@ out: json_decref(json_request);
int ngsi_type_start(villas::node::SuperNode *sn)
{
+#ifdef CURL_SSL_REQUIRES_LOCKING
+ mutex_buf = new pthread_mutex_t[CRYPTO_num_locks()];
+ if (!mutex_buf)
+ return -1;
+
+ for (int i = 0; i < CRYPTO_num_locks(); i++)
+ pthread_mutex_init(&mutex_buf[i], nullptr);
+
+ CRYPTO_set_id_callback(curl_ssl_thread_id_function);
+ CRYPTO_set_locking_callback(curl_ssl_locking_function);
+
+ info("Setup libcurl/openssl locking primitives");
+#endif /* CURL_SSL_REQUIRES_LOCKING */
+
return curl_global_init(CURL_GLOBAL_ALL);
}
int ngsi_type_stop()
{
- free(name);
+#ifdef CURL_SSL_REQUIRES_LOCKING
+ if (!mutex_buf)
+ return -1;
+
+ CRYPTO_set_id_callback(NULL);
+ CRYPTO_set_locking_callback(NULL);
+
+ for (int i = 0; i < CRYPTO_num_locks(); i++)
+ pthread_mutex_destroy(&mutex_buf[i]);
+
+ delete mutex_buf;
+#endif /* CURL_SSL_REQUIRES_LOCKING */
curl_global_cleanup();
@@ -605,7 +664,8 @@ int ngsi_start(struct node *n)
struct ngsi *i = (struct ngsi *) n->_vd;
int ret;
- i->curl = curl_easy_init();
+ i->in.curl = curl_easy_init();
+ i->out.curl = curl_easy_init();
i->headers = nullptr;
if (i->access_token) {
@@ -623,16 +683,20 @@ int ngsi_start(struct node *n)
i->headers = curl_slist_append(i->headers, "Accept: application/json");
i->headers = curl_slist_append(i->headers, "Content-Type: application/json");
- curl_easy_setopt(i->curl, CURLOPT_SSL_VERIFYPEER, i->ssl_verify);
- curl_easy_setopt(i->curl, CURLOPT_TIMEOUT_MS, i->timeout * 1e3);
- curl_easy_setopt(i->curl, CURLOPT_HTTPHEADER, i->headers);
- curl_easy_setopt(i->curl, CURLOPT_USERAGENT, USER_AGENT);
+ CURL *handles[] = { i->in.curl, i->out.curl };
+
+ for (unsigned p = 0; p < ARRAY_LEN(handles); p++) {
+ curl_easy_setopt(handles[p], CURLOPT_SSL_VERIFYPEER, i->ssl_verify);
+ curl_easy_setopt(handles[p], CURLOPT_TIMEOUT_MS, i->timeout * 1e3);
+ curl_easy_setopt(handles[p], CURLOPT_HTTPHEADER, i->headers);
+ curl_easy_setopt(handles[p], CURLOPT_USERAGENT, USER_AGENT);
+ }
/* Create entity and atributes */
if (i->create) {
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_ATTRIBUTES | NGSI_ENTITY_METADATA);
- ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", json_entity);
+ ret = ngsi_request_context_update(i->out.curl, i->endpoint, "APPEND", json_entity);
if (ret)
throw RuntimeError("Failed to create NGSI context for node {}", node_name(n));
@@ -652,11 +716,12 @@ int ngsi_stop(struct node *n)
/* Delete complete entity (not just attributes) */
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0);
- ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", json_entity);
+ ret = ngsi_request_context_update(i->out.curl, i->endpoint, "DELETE", json_entity);
json_decref(json_entity);
- curl_easy_cleanup(i->curl);
+ curl_easy_cleanup(i->in.curl);
+ curl_easy_cleanup(i->out.curl);
curl_slist_free_all(i->headers);
return ret;
@@ -673,7 +738,7 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
json_t *json_rentity;
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0);
- ret = ngsi_request_context_query(i->curl, i->endpoint, json_entity, &json_rentity);
+ ret = ngsi_request_context_query(i->in.curl, i->endpoint, json_entity, &json_rentity);
if (ret)
goto out;
@@ -694,7 +759,7 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
json_t *json_entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_ATTRIBUTES_OUT | NGSI_ENTITY_VALUES);
- ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", json_entity);
+ ret = ngsi_request_context_update(i->out.curl, i->endpoint, "UPDATE", json_entity);
json_decref(json_entity);