mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
ngsi: fix segfaults caused by missing multithreading primitives
This commit is contained in:
parent
88de719edf
commit
bbcfab1721
2 changed files with 90 additions and 26 deletions
|
@ -62,9 +62,8 @@ struct ngsi {
|
|||
|
||||
struct curl_slist *headers; /**< List of HTTP request headers for libcurl */
|
||||
|
||||
CURL *curl; /**< libcurl: handle */
|
||||
|
||||
struct {
|
||||
CURL *curl; /**< libcurl: handle */
|
||||
struct vlist signals; /**< A mapping between indices of the VILLASnode samples and the attributes in ngsi::context */
|
||||
} in, out;
|
||||
};
|
||||
|
|
|
@ -13,20 +13,23 @@
|
|||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
**********************************************************************************/
|
||||
|
||||
#include <cstring>
|
||||
#include <cstdio>
|
||||
|
||||
#include <curl/curl.h>
|
||||
#include <unistd.h>
|
||||
#include <jansson.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
#include <stdio.h>
|
||||
#include <openssl/err.h>
|
||||
#include <openssl/opensslv.h>
|
||||
#include <curl/curl.h>
|
||||
|
||||
#include <villas/nodes/ngsi.hpp>
|
||||
#include <villas/utils.hpp>
|
||||
|
@ -39,14 +42,43 @@ using namespace villas;
|
|||
using namespace villas::utils;
|
||||
|
||||
/* Some global settings */
|
||||
static char *name = nullptr;
|
||||
|
||||
#if OPENSSL_VERSION_NUMBER < 0x010003000L
|
||||
// See: https://curl.haxx.se/libcurl/c/opensslthreadlock.html
|
||||
#define CURL_SSL_REQUIRES_LOCKING
|
||||
#endif
|
||||
|
||||
#ifdef CURL_SSL_REQUIRES_LOCKING
|
||||
/** This array will store all of the mutexes available to OpenSSL. */
|
||||
static pthread_mutex_t *mutex_buf = NULL;
|
||||
|
||||
void handle_error(const char *file, int lineno, const char *msg)
|
||||
{
|
||||
error("** %s:%d %s\n", file, lineno, msg);
|
||||
ERR_print_errors_fp(stderr);
|
||||
/* exit(-1); */
|
||||
}
|
||||
|
||||
static void curl_ssl_locking_function(int mode, int n, const char *file, int line)
|
||||
{
|
||||
if (mode & CRYPTO_LOCK)
|
||||
pthread_mutex_lock(&mutex_buf[n]);
|
||||
else
|
||||
pthread_mutex_unlock(&mutex_buf[n]);
|
||||
}
|
||||
|
||||
static unsigned long curl_ssl_thread_id_function(void)
|
||||
{
|
||||
return ((unsigned long) pthread_self());
|
||||
}
|
||||
#endif /* CURL_SSL_REQUIRES_LOCKING */
|
||||
|
||||
enum NgsiFlags {
|
||||
NGSI_ENTITY_ATTRIBUTES_IN = (1 << 0),
|
||||
NGSI_ENTITY_ATTRIBUTES_IN = (1 << 0),
|
||||
NGSI_ENTITY_ATTRIBUTES_OUT = (1 << 1),
|
||||
NGSI_ENTITY_ATTRIBUTES = NGSI_ENTITY_ATTRIBUTES_IN | NGSI_ENTITY_ATTRIBUTES_OUT,
|
||||
NGSI_ENTITY_VALUES = (1 << 2),
|
||||
NGSI_ENTITY_METADATA = (1 << 3),
|
||||
NGSI_ENTITY_ATTRIBUTES = NGSI_ENTITY_ATTRIBUTES_IN | NGSI_ENTITY_ATTRIBUTES_OUT,
|
||||
NGSI_ENTITY_VALUES = (1 << 2),
|
||||
NGSI_ENTITY_METADATA = (1 << 3),
|
||||
};
|
||||
|
||||
class NgsiMetadata {
|
||||
|
@ -185,8 +217,8 @@ public:
|
|||
|
||||
for (auto &meta : metadata) {
|
||||
json_array_append_new(json_metadatas, json_pack("{ s: s, s: s, s: s }",
|
||||
"name", meta.name.c_str(),
|
||||
"type", meta.type.c_str(),
|
||||
"name", meta.name.c_str(),
|
||||
"type", meta.type.c_str(),
|
||||
"value", meta.value.c_str()
|
||||
));
|
||||
}
|
||||
|
@ -367,7 +399,7 @@ static int ngsi_parse_entity(struct node *n, json_t *json_entity, struct sample
|
|||
|
||||
#ifdef NGSI_VECTORS
|
||||
smp->flags |= (int) (SampleFlags::HAS_SEQUENCE |
|
||||
SampleFlags::HAS_TS_ORIGIN);
|
||||
SampleFlags::HAS_TS_ORIGIN);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -490,8 +522,10 @@ static int ngsi_request_context_query(CURL *handle, const char *endpoint, json_t
|
|||
json_t *json_request = json_pack("{ s: [ o ] }", "entities", json_entity);
|
||||
|
||||
ret = ngsi_request(handle, endpoint, "queryContext", json_request, &json_response);
|
||||
if (ret)
|
||||
if (ret) {
|
||||
ret = -1;
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = ngsi_parse_context_response(json_response, &code, &reason, json_rentity);
|
||||
if (ret)
|
||||
|
@ -532,12 +566,37 @@ out: json_decref(json_request);
|
|||
|
||||
int ngsi_type_start(villas::node::SuperNode *sn)
|
||||
{
|
||||
#ifdef CURL_SSL_REQUIRES_LOCKING
|
||||
mutex_buf = new pthread_mutex_t[CRYPTO_num_locks()];
|
||||
if (!mutex_buf)
|
||||
return -1;
|
||||
|
||||
for (int i = 0; i < CRYPTO_num_locks(); i++)
|
||||
pthread_mutex_init(&mutex_buf[i], nullptr);
|
||||
|
||||
CRYPTO_set_id_callback(curl_ssl_thread_id_function);
|
||||
CRYPTO_set_locking_callback(curl_ssl_locking_function);
|
||||
|
||||
info("Setup libcurl/openssl locking primitives");
|
||||
#endif /* CURL_SSL_REQUIRES_LOCKING */
|
||||
|
||||
return curl_global_init(CURL_GLOBAL_ALL);
|
||||
}
|
||||
|
||||
int ngsi_type_stop()
|
||||
{
|
||||
free(name);
|
||||
#ifdef CURL_SSL_REQUIRES_LOCKING
|
||||
if (!mutex_buf)
|
||||
return -1;
|
||||
|
||||
CRYPTO_set_id_callback(NULL);
|
||||
CRYPTO_set_locking_callback(NULL);
|
||||
|
||||
for (int i = 0; i < CRYPTO_num_locks(); i++)
|
||||
pthread_mutex_destroy(&mutex_buf[i]);
|
||||
|
||||
delete mutex_buf;
|
||||
#endif /* CURL_SSL_REQUIRES_LOCKING */
|
||||
|
||||
curl_global_cleanup();
|
||||
|
||||
|
@ -605,7 +664,8 @@ int ngsi_start(struct node *n)
|
|||
struct ngsi *i = (struct ngsi *) n->_vd;
|
||||
int ret;
|
||||
|
||||
i->curl = curl_easy_init();
|
||||
i->in.curl = curl_easy_init();
|
||||
i->out.curl = curl_easy_init();
|
||||
i->headers = nullptr;
|
||||
|
||||
if (i->access_token) {
|
||||
|
@ -623,16 +683,20 @@ int ngsi_start(struct node *n)
|
|||
i->headers = curl_slist_append(i->headers, "Accept: application/json");
|
||||
i->headers = curl_slist_append(i->headers, "Content-Type: application/json");
|
||||
|
||||
curl_easy_setopt(i->curl, CURLOPT_SSL_VERIFYPEER, i->ssl_verify);
|
||||
curl_easy_setopt(i->curl, CURLOPT_TIMEOUT_MS, i->timeout * 1e3);
|
||||
curl_easy_setopt(i->curl, CURLOPT_HTTPHEADER, i->headers);
|
||||
curl_easy_setopt(i->curl, CURLOPT_USERAGENT, USER_AGENT);
|
||||
CURL *handles[] = { i->in.curl, i->out.curl };
|
||||
|
||||
for (unsigned p = 0; p < ARRAY_LEN(handles); p++) {
|
||||
curl_easy_setopt(handles[p], CURLOPT_SSL_VERIFYPEER, i->ssl_verify);
|
||||
curl_easy_setopt(handles[p], CURLOPT_TIMEOUT_MS, i->timeout * 1e3);
|
||||
curl_easy_setopt(handles[p], CURLOPT_HTTPHEADER, i->headers);
|
||||
curl_easy_setopt(handles[p], CURLOPT_USERAGENT, USER_AGENT);
|
||||
}
|
||||
|
||||
/* Create entity and atributes */
|
||||
if (i->create) {
|
||||
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, NGSI_ENTITY_ATTRIBUTES | NGSI_ENTITY_METADATA);
|
||||
|
||||
ret = ngsi_request_context_update(i->curl, i->endpoint, "APPEND", json_entity);
|
||||
ret = ngsi_request_context_update(i->out.curl, i->endpoint, "APPEND", json_entity);
|
||||
if (ret)
|
||||
throw RuntimeError("Failed to create NGSI context for node {}", node_name(n));
|
||||
|
||||
|
@ -652,11 +716,12 @@ int ngsi_stop(struct node *n)
|
|||
/* Delete complete entity (not just attributes) */
|
||||
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0);
|
||||
|
||||
ret = ngsi_request_context_update(i->curl, i->endpoint, "DELETE", json_entity);
|
||||
ret = ngsi_request_context_update(i->out.curl, i->endpoint, "DELETE", json_entity);
|
||||
|
||||
json_decref(json_entity);
|
||||
|
||||
curl_easy_cleanup(i->curl);
|
||||
curl_easy_cleanup(i->in.curl);
|
||||
curl_easy_cleanup(i->out.curl);
|
||||
curl_slist_free_all(i->headers);
|
||||
|
||||
return ret;
|
||||
|
@ -673,7 +738,7 @@ int ngsi_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
|
|||
json_t *json_rentity;
|
||||
json_t *json_entity = ngsi_build_entity(n, nullptr, 0, 0);
|
||||
|
||||
ret = ngsi_request_context_query(i->curl, i->endpoint, json_entity, &json_rentity);
|
||||
ret = ngsi_request_context_query(i->in.curl, i->endpoint, json_entity, &json_rentity);
|
||||
if (ret)
|
||||
goto out;
|
||||
|
||||
|
@ -694,7 +759,7 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
|
||||
json_t *json_entity = ngsi_build_entity(n, smps, cnt, NGSI_ENTITY_ATTRIBUTES_OUT | NGSI_ENTITY_VALUES);
|
||||
|
||||
ret = ngsi_request_context_update(i->curl, i->endpoint, "UPDATE", json_entity);
|
||||
ret = ngsi_request_context_update(i->out.curl, i->endpoint, "UPDATE", json_entity);
|
||||
|
||||
json_decref(json_entity);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue