diff --git a/.gitignore b/.gitignore
index 88e5d6a51..b4a5b4000 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,6 @@
.cproject
.settings/
.vscode/
+
+# YouCompleteMe
+.ycm_extra_conf.py
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3b8e23313..e32c1c49e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -20,10 +20,7 @@
# along with this program. If not, see .
###################################################################################
-cmake_minimum_required(VERSION 3.3)
-
-# Policies
-cmake_policy(SET CMP0068 NEW)
+cmake_minimum_required(VERSION 3.6)
project(VILLASnode C CXX)
@@ -31,7 +28,6 @@ project(VILLASnode C CXX)
set(CMAKE_C_STANDARD 11)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_THREAD_PREFER_PTHREAD ON)
-#set(CMAKE_SKIP_INSTALL_RPATH ON)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
if(APPLE)
diff --git a/include/villas/common.h b/include/villas/common.h
index 64de9bb8c..a7093ac38 100644
--- a/include/villas/common.h
+++ b/include/villas/common.h
@@ -38,7 +38,8 @@ enum state {
STATE_OPENED = 4, /* alias for STATE_STARTED used by struct io */
STATE_STOPPED = 5,
STATE_UNLOADED = 5, /* alias for STATE_STARTED used by struct plugin */
- STATE_CLOSED = 5 /* alias for STATE_STARTED used by struct io */
+ STATE_CLOSED = 5, /* alias for STATE_STOPPED used by struct io */
+ STATE_CONNECTED = 6
};
/** Callback to destroy list elements.
diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
index 0e0b3136c..cf2cb286d 100644
--- a/include/villas/nodes/infiniband.h
+++ b/include/villas/nodes/infiniband.h
@@ -38,6 +38,7 @@
/* Function pointer typedefs */
typedef void (*ib_on_completion) (struct node*, struct ibv_wc*, int*);
typedef void * (*ib_poll_function) (void*);
+typedef void * (*ib_event_function) (void*);
/* Enums */
enum poll_mode_e {
@@ -75,10 +76,10 @@ struct infiniband {
/* Poll thread */
pthread_t cq_poller_thread;
-
- int stopThread;
} poll;
+ int stopThreads;
+
/* Connection specific variables */
struct connection_s {
struct addrinfo *src_addr;
@@ -88,8 +89,7 @@ struct infiniband {
struct r_addr_key_s *r_addr_key;
- pthread_t stop_thread;
- int rdma_disconnect_called;
+ pthread_t rdma_cm_event_thread;
int available_recv_wrs;
} conn;
diff --git a/lib/formats/villas.pb-c.c b/lib/formats/villas.pb-c.c
deleted file mode 100644
index 0266a9cd6..000000000
--- a/lib/formats/villas.pb-c.c
+++ /dev/null
@@ -1,438 +0,0 @@
-/* Generated by the protocol buffer compiler. DO NOT EDIT! */
-/* Generated from: lib/formats/villas.proto */
-
-/* Do not generate deprecated warnings for self */
-#ifndef PROTOBUF_C__NO_DEPRECATED
-#define PROTOBUF_C__NO_DEPRECATED
-#endif
-
-#include "lib/formats/villas.pb-c.h"
-void villas__node__message__init
- (Villas__Node__Message *message)
-{
- static const Villas__Node__Message init_value = VILLAS__NODE__MESSAGE__INIT;
- *message = init_value;
-}
-size_t villas__node__message__get_packed_size
- (const Villas__Node__Message *message)
-{
- assert(message->base.descriptor == &villas__node__message__descriptor);
- return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
-}
-size_t villas__node__message__pack
- (const Villas__Node__Message *message,
- uint8_t *out)
-{
- assert(message->base.descriptor == &villas__node__message__descriptor);
- return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
-}
-size_t villas__node__message__pack_to_buffer
- (const Villas__Node__Message *message,
- ProtobufCBuffer *buffer)
-{
- assert(message->base.descriptor == &villas__node__message__descriptor);
- return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
-}
-Villas__Node__Message *
- villas__node__message__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data)
-{
- return (Villas__Node__Message *)
- protobuf_c_message_unpack (&villas__node__message__descriptor,
- allocator, len, data);
-}
-void villas__node__message__free_unpacked
- (Villas__Node__Message *message,
- ProtobufCAllocator *allocator)
-{
- if(!message)
- return;
- assert(message->base.descriptor == &villas__node__message__descriptor);
- protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
-}
-void villas__node__sample__init
- (Villas__Node__Sample *message)
-{
- static const Villas__Node__Sample init_value = VILLAS__NODE__SAMPLE__INIT;
- *message = init_value;
-}
-size_t villas__node__sample__get_packed_size
- (const Villas__Node__Sample *message)
-{
- assert(message->base.descriptor == &villas__node__sample__descriptor);
- return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
-}
-size_t villas__node__sample__pack
- (const Villas__Node__Sample *message,
- uint8_t *out)
-{
- assert(message->base.descriptor == &villas__node__sample__descriptor);
- return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
-}
-size_t villas__node__sample__pack_to_buffer
- (const Villas__Node__Sample *message,
- ProtobufCBuffer *buffer)
-{
- assert(message->base.descriptor == &villas__node__sample__descriptor);
- return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
-}
-Villas__Node__Sample *
- villas__node__sample__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data)
-{
- return (Villas__Node__Sample *)
- protobuf_c_message_unpack (&villas__node__sample__descriptor,
- allocator, len, data);
-}
-void villas__node__sample__free_unpacked
- (Villas__Node__Sample *message,
- ProtobufCAllocator *allocator)
-{
- if(!message)
- return;
- assert(message->base.descriptor == &villas__node__sample__descriptor);
- protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
-}
-void villas__node__timestamp__init
- (Villas__Node__Timestamp *message)
-{
- static const Villas__Node__Timestamp init_value = VILLAS__NODE__TIMESTAMP__INIT;
- *message = init_value;
-}
-size_t villas__node__timestamp__get_packed_size
- (const Villas__Node__Timestamp *message)
-{
- assert(message->base.descriptor == &villas__node__timestamp__descriptor);
- return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
-}
-size_t villas__node__timestamp__pack
- (const Villas__Node__Timestamp *message,
- uint8_t *out)
-{
- assert(message->base.descriptor == &villas__node__timestamp__descriptor);
- return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
-}
-size_t villas__node__timestamp__pack_to_buffer
- (const Villas__Node__Timestamp *message,
- ProtobufCBuffer *buffer)
-{
- assert(message->base.descriptor == &villas__node__timestamp__descriptor);
- return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
-}
-Villas__Node__Timestamp *
- villas__node__timestamp__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data)
-{
- return (Villas__Node__Timestamp *)
- protobuf_c_message_unpack (&villas__node__timestamp__descriptor,
- allocator, len, data);
-}
-void villas__node__timestamp__free_unpacked
- (Villas__Node__Timestamp *message,
- ProtobufCAllocator *allocator)
-{
- if(!message)
- return;
- assert(message->base.descriptor == &villas__node__timestamp__descriptor);
- protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
-}
-void villas__node__value__init
- (Villas__Node__Value *message)
-{
- static const Villas__Node__Value init_value = VILLAS__NODE__VALUE__INIT;
- *message = init_value;
-}
-size_t villas__node__value__get_packed_size
- (const Villas__Node__Value *message)
-{
- assert(message->base.descriptor == &villas__node__value__descriptor);
- return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
-}
-size_t villas__node__value__pack
- (const Villas__Node__Value *message,
- uint8_t *out)
-{
- assert(message->base.descriptor == &villas__node__value__descriptor);
- return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
-}
-size_t villas__node__value__pack_to_buffer
- (const Villas__Node__Value *message,
- ProtobufCBuffer *buffer)
-{
- assert(message->base.descriptor == &villas__node__value__descriptor);
- return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
-}
-Villas__Node__Value *
- villas__node__value__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data)
-{
- return (Villas__Node__Value *)
- protobuf_c_message_unpack (&villas__node__value__descriptor,
- allocator, len, data);
-}
-void villas__node__value__free_unpacked
- (Villas__Node__Value *message,
- ProtobufCAllocator *allocator)
-{
- if(!message)
- return;
- assert(message->base.descriptor == &villas__node__value__descriptor);
- protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
-}
-static const ProtobufCFieldDescriptor villas__node__message__field_descriptors[1] =
-{
- {
- "samples",
- 1,
- PROTOBUF_C_LABEL_REPEATED,
- PROTOBUF_C_TYPE_MESSAGE,
- offsetof(Villas__Node__Message, n_samples),
- offsetof(Villas__Node__Message, samples),
- &villas__node__sample__descriptor,
- NULL,
- 0, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
-};
-static const unsigned villas__node__message__field_indices_by_name[] = {
- 0, /* field[0] = samples */
-};
-static const ProtobufCIntRange villas__node__message__number_ranges[1 + 1] =
-{
- { 1, 0 },
- { 0, 1 }
-};
-const ProtobufCMessageDescriptor villas__node__message__descriptor =
-{
- PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
- "villas.node.Message",
- "Message",
- "Villas__Node__Message",
- "villas.node",
- sizeof(Villas__Node__Message),
- 1,
- villas__node__message__field_descriptors,
- villas__node__message__field_indices_by_name,
- 1, villas__node__message__number_ranges,
- (ProtobufCMessageInit) villas__node__message__init,
- NULL,NULL,NULL /* reserved[123] */
-};
-static const ProtobufCEnumValue villas__node__sample__type__enum_values_by_number[3] =
-{
- { "DATA", "VILLAS__NODE__SAMPLE__TYPE__DATA", 1 },
- { "START", "VILLAS__NODE__SAMPLE__TYPE__START", 2 },
- { "STOP", "VILLAS__NODE__SAMPLE__TYPE__STOP", 3 },
-};
-static const ProtobufCIntRange villas__node__sample__type__value_ranges[] = {
-{1, 0},{0, 3}
-};
-static const ProtobufCEnumValueIndex villas__node__sample__type__enum_values_by_name[3] =
-{
- { "DATA", 0 },
- { "START", 1 },
- { "STOP", 2 },
-};
-const ProtobufCEnumDescriptor villas__node__sample__type__descriptor =
-{
- PROTOBUF_C__ENUM_DESCRIPTOR_MAGIC,
- "villas.node.Sample.Type",
- "Type",
- "Villas__Node__Sample__Type",
- "villas.node",
- 3,
- villas__node__sample__type__enum_values_by_number,
- 3,
- villas__node__sample__type__enum_values_by_name,
- 1,
- villas__node__sample__type__value_ranges,
- NULL,NULL,NULL,NULL /* reserved[1234] */
-};
-static const Villas__Node__Sample__Type villas__node__sample__type__default_value = VILLAS__NODE__SAMPLE__TYPE__DATA;
-static const ProtobufCFieldDescriptor villas__node__sample__field_descriptors[4] =
-{
- {
- "type",
- 1,
- PROTOBUF_C_LABEL_REQUIRED,
- PROTOBUF_C_TYPE_ENUM,
- 0, /* quantifier_offset */
- offsetof(Villas__Node__Sample, type),
- &villas__node__sample__type__descriptor,
- &villas__node__sample__type__default_value,
- 0, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
- {
- "sequence",
- 2,
- PROTOBUF_C_LABEL_OPTIONAL,
- PROTOBUF_C_TYPE_UINT32,
- offsetof(Villas__Node__Sample, has_sequence),
- offsetof(Villas__Node__Sample, sequence),
- NULL,
- NULL,
- 0, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
- {
- "timestamp",
- 4,
- PROTOBUF_C_LABEL_OPTIONAL,
- PROTOBUF_C_TYPE_MESSAGE,
- 0, /* quantifier_offset */
- offsetof(Villas__Node__Sample, timestamp),
- &villas__node__timestamp__descriptor,
- NULL,
- 0, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
- {
- "values",
- 5,
- PROTOBUF_C_LABEL_REPEATED,
- PROTOBUF_C_TYPE_MESSAGE,
- offsetof(Villas__Node__Sample, n_values),
- offsetof(Villas__Node__Sample, values),
- &villas__node__value__descriptor,
- NULL,
- 0, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
-};
-static const unsigned villas__node__sample__field_indices_by_name[] = {
- 1, /* field[1] = sequence */
- 2, /* field[2] = timestamp */
- 0, /* field[0] = type */
- 3, /* field[3] = values */
-};
-static const ProtobufCIntRange villas__node__sample__number_ranges[2 + 1] =
-{
- { 1, 0 },
- { 4, 2 },
- { 0, 4 }
-};
-const ProtobufCMessageDescriptor villas__node__sample__descriptor =
-{
- PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
- "villas.node.Sample",
- "Sample",
- "Villas__Node__Sample",
- "villas.node",
- sizeof(Villas__Node__Sample),
- 4,
- villas__node__sample__field_descriptors,
- villas__node__sample__field_indices_by_name,
- 2, villas__node__sample__number_ranges,
- (ProtobufCMessageInit) villas__node__sample__init,
- NULL,NULL,NULL /* reserved[123] */
-};
-static const ProtobufCFieldDescriptor villas__node__timestamp__field_descriptors[2] =
-{
- {
- "sec",
- 1,
- PROTOBUF_C_LABEL_REQUIRED,
- PROTOBUF_C_TYPE_UINT32,
- 0, /* quantifier_offset */
- offsetof(Villas__Node__Timestamp, sec),
- NULL,
- NULL,
- 0, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
- {
- "nsec",
- 2,
- PROTOBUF_C_LABEL_REQUIRED,
- PROTOBUF_C_TYPE_UINT32,
- 0, /* quantifier_offset */
- offsetof(Villas__Node__Timestamp, nsec),
- NULL,
- NULL,
- 0, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
-};
-static const unsigned villas__node__timestamp__field_indices_by_name[] = {
- 1, /* field[1] = nsec */
- 0, /* field[0] = sec */
-};
-static const ProtobufCIntRange villas__node__timestamp__number_ranges[1 + 1] =
-{
- { 1, 0 },
- { 0, 2 }
-};
-const ProtobufCMessageDescriptor villas__node__timestamp__descriptor =
-{
- PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
- "villas.node.Timestamp",
- "Timestamp",
- "Villas__Node__Timestamp",
- "villas.node",
- sizeof(Villas__Node__Timestamp),
- 2,
- villas__node__timestamp__field_descriptors,
- villas__node__timestamp__field_indices_by_name,
- 1, villas__node__timestamp__number_ranges,
- (ProtobufCMessageInit) villas__node__timestamp__init,
- NULL,NULL,NULL /* reserved[123] */
-};
-static const ProtobufCFieldDescriptor villas__node__value__field_descriptors[2] =
-{
- {
- "f",
- 1,
- PROTOBUF_C_LABEL_OPTIONAL,
- PROTOBUF_C_TYPE_FLOAT,
- offsetof(Villas__Node__Value, value_case),
- offsetof(Villas__Node__Value, f),
- NULL,
- NULL,
- 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
- {
- "i",
- 2,
- PROTOBUF_C_LABEL_OPTIONAL,
- PROTOBUF_C_TYPE_INT32,
- offsetof(Villas__Node__Value, value_case),
- offsetof(Villas__Node__Value, i),
- NULL,
- NULL,
- 0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
- 0,NULL,NULL /* reserved1,reserved2, etc */
- },
-};
-static const unsigned villas__node__value__field_indices_by_name[] = {
- 0, /* field[0] = f */
- 1, /* field[1] = i */
-};
-static const ProtobufCIntRange villas__node__value__number_ranges[1 + 1] =
-{
- { 1, 0 },
- { 0, 2 }
-};
-const ProtobufCMessageDescriptor villas__node__value__descriptor =
-{
- PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
- "villas.node.Value",
- "Value",
- "Villas__Node__Value",
- "villas.node",
- sizeof(Villas__Node__Value),
- 2,
- villas__node__value__field_descriptors,
- villas__node__value__field_indices_by_name,
- 1, villas__node__value__number_ranges,
- (ProtobufCMessageInit) villas__node__value__init,
- NULL,NULL,NULL /* reserved[123] */
-};
diff --git a/lib/formats/villas.pb-c.h b/lib/formats/villas.pb-c.h
deleted file mode 100644
index 557064744..000000000
--- a/lib/formats/villas.pb-c.h
+++ /dev/null
@@ -1,222 +0,0 @@
-/* Generated by the protocol buffer compiler. DO NOT EDIT! */
-/* Generated from: lib/formats/villas.proto */
-
-#ifndef PROTOBUF_C_lib_2fformats_2fvillas_2eproto__INCLUDED
-#define PROTOBUF_C_lib_2fformats_2fvillas_2eproto__INCLUDED
-
-#include
-
-PROTOBUF_C__BEGIN_DECLS
-
-#if PROTOBUF_C_VERSION_NUMBER < 1000000
-# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
-#elif 1003000 < PROTOBUF_C_MIN_COMPILER_VERSION
-# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
-#endif
-
-
-typedef struct _Villas__Node__Message Villas__Node__Message;
-typedef struct _Villas__Node__Sample Villas__Node__Sample;
-typedef struct _Villas__Node__Timestamp Villas__Node__Timestamp;
-typedef struct _Villas__Node__Value Villas__Node__Value;
-
-
-/* --- enums --- */
-
-typedef enum _Villas__Node__Sample__Type {
- /*
- * Message contains float / integer data values
- */
- VILLAS__NODE__SAMPLE__TYPE__DATA = 1,
- /*
- * Message marks the beginning of a new simulation case
- */
- VILLAS__NODE__SAMPLE__TYPE__START = 2,
- /*
- * Message marks the end of a simulation case
- */
- VILLAS__NODE__SAMPLE__TYPE__STOP = 3
- PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(VILLAS__NODE__SAMPLE__TYPE)
-} Villas__Node__Sample__Type;
-
-/* --- messages --- */
-
-struct _Villas__Node__Message
-{
- ProtobufCMessage base;
- size_t n_samples;
- Villas__Node__Sample **samples;
-};
-#define VILLAS__NODE__MESSAGE__INIT \
- { PROTOBUF_C_MESSAGE_INIT (&villas__node__message__descriptor) \
- , 0,NULL }
-
-
-struct _Villas__Node__Sample
-{
- ProtobufCMessage base;
- Villas__Node__Sample__Type type;
- /*
- * The sequence number is incremented by one for consecutive messages.
- */
- protobuf_c_boolean has_sequence;
- uint32_t sequence;
- Villas__Node__Timestamp *timestamp;
- size_t n_values;
- Villas__Node__Value **values;
-};
-#define VILLAS__NODE__SAMPLE__INIT \
- { PROTOBUF_C_MESSAGE_INIT (&villas__node__sample__descriptor) \
- , VILLAS__NODE__SAMPLE__TYPE__DATA, 0, 0, NULL, 0,NULL }
-
-
-struct _Villas__Node__Timestamp
-{
- ProtobufCMessage base;
- /*
- * Seconds since 1970-01-01 00:00:00
- */
- uint32_t sec;
- /*
- * Nanoseconds of the current second.
- */
- uint32_t nsec;
-};
-#define VILLAS__NODE__TIMESTAMP__INIT \
- { PROTOBUF_C_MESSAGE_INIT (&villas__node__timestamp__descriptor) \
- , 0, 0 }
-
-
-typedef enum {
- VILLAS__NODE__VALUE__VALUE__NOT_SET = 0,
- VILLAS__NODE__VALUE__VALUE_F = 1,
- VILLAS__NODE__VALUE__VALUE_I = 2
- PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(VILLAS__NODE__VALUE__VALUE)
-} Villas__Node__Value__ValueCase;
-
-struct _Villas__Node__Value
-{
- ProtobufCMessage base;
- Villas__Node__Value__ValueCase value_case;
- union {
- /*
- * Floating point values.
- */
- float f;
- /*
- * Integer values.
- */
- int32_t i;
- };
-};
-#define VILLAS__NODE__VALUE__INIT \
- { PROTOBUF_C_MESSAGE_INIT (&villas__node__value__descriptor) \
- , VILLAS__NODE__VALUE__VALUE__NOT_SET, {0} }
-
-
-/* Villas__Node__Message methods */
-void villas__node__message__init
- (Villas__Node__Message *message);
-size_t villas__node__message__get_packed_size
- (const Villas__Node__Message *message);
-size_t villas__node__message__pack
- (const Villas__Node__Message *message,
- uint8_t *out);
-size_t villas__node__message__pack_to_buffer
- (const Villas__Node__Message *message,
- ProtobufCBuffer *buffer);
-Villas__Node__Message *
- villas__node__message__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data);
-void villas__node__message__free_unpacked
- (Villas__Node__Message *message,
- ProtobufCAllocator *allocator);
-/* Villas__Node__Sample methods */
-void villas__node__sample__init
- (Villas__Node__Sample *message);
-size_t villas__node__sample__get_packed_size
- (const Villas__Node__Sample *message);
-size_t villas__node__sample__pack
- (const Villas__Node__Sample *message,
- uint8_t *out);
-size_t villas__node__sample__pack_to_buffer
- (const Villas__Node__Sample *message,
- ProtobufCBuffer *buffer);
-Villas__Node__Sample *
- villas__node__sample__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data);
-void villas__node__sample__free_unpacked
- (Villas__Node__Sample *message,
- ProtobufCAllocator *allocator);
-/* Villas__Node__Timestamp methods */
-void villas__node__timestamp__init
- (Villas__Node__Timestamp *message);
-size_t villas__node__timestamp__get_packed_size
- (const Villas__Node__Timestamp *message);
-size_t villas__node__timestamp__pack
- (const Villas__Node__Timestamp *message,
- uint8_t *out);
-size_t villas__node__timestamp__pack_to_buffer
- (const Villas__Node__Timestamp *message,
- ProtobufCBuffer *buffer);
-Villas__Node__Timestamp *
- villas__node__timestamp__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data);
-void villas__node__timestamp__free_unpacked
- (Villas__Node__Timestamp *message,
- ProtobufCAllocator *allocator);
-/* Villas__Node__Value methods */
-void villas__node__value__init
- (Villas__Node__Value *message);
-size_t villas__node__value__get_packed_size
- (const Villas__Node__Value *message);
-size_t villas__node__value__pack
- (const Villas__Node__Value *message,
- uint8_t *out);
-size_t villas__node__value__pack_to_buffer
- (const Villas__Node__Value *message,
- ProtobufCBuffer *buffer);
-Villas__Node__Value *
- villas__node__value__unpack
- (ProtobufCAllocator *allocator,
- size_t len,
- const uint8_t *data);
-void villas__node__value__free_unpacked
- (Villas__Node__Value *message,
- ProtobufCAllocator *allocator);
-/* --- per-message closures --- */
-
-typedef void (*Villas__Node__Message_Closure)
- (const Villas__Node__Message *message,
- void *closure_data);
-typedef void (*Villas__Node__Sample_Closure)
- (const Villas__Node__Sample *message,
- void *closure_data);
-typedef void (*Villas__Node__Timestamp_Closure)
- (const Villas__Node__Timestamp *message,
- void *closure_data);
-typedef void (*Villas__Node__Value_Closure)
- (const Villas__Node__Value *message,
- void *closure_data);
-
-/* --- services --- */
-
-
-/* --- descriptors --- */
-
-extern const ProtobufCMessageDescriptor villas__node__message__descriptor;
-extern const ProtobufCMessageDescriptor villas__node__sample__descriptor;
-extern const ProtobufCEnumDescriptor villas__node__sample__type__descriptor;
-extern const ProtobufCMessageDescriptor villas__node__timestamp__descriptor;
-extern const ProtobufCMessageDescriptor villas__node__value__descriptor;
-
-PROTOBUF_C__END_DECLS
-
-
-#endif /* PROTOBUF_C_lib_2fformats_2fvillas_2eproto__INCLUDED */
diff --git a/lib/log.c b/lib/log.c
index fab1a89bf..988239409 100644
--- a/lib/log.c
+++ b/lib/log.c
@@ -304,12 +304,14 @@ void log_vprint(struct log *l, const char *lvl, const char *fmt, va_list ap)
/* Timestamp & Severity */
strcatf(&buf, "%10.3f %-5s ", time_delta(&l->epoch, &ts), lvl);
- /* Indention */
+ /* Indention in case we log to the terminal */
#ifdef __GNUC__
- for (int i = 0; i < indent; i++)
- strcatf(&buf, "%s ", BOX_UD);
+ if (l->file == stderr || l->file == stdout) {
+ for (int i = 0; i < indent; i++)
+ strcatf(&buf, "%s ", BOX_UD);
- strcatf(&buf, "%s ", BOX_UDR);
+ strcatf(&buf, "%s ", BOX_UDR);
+ }
#endif
/* Format String */
@@ -319,7 +321,11 @@ void log_vprint(struct log *l, const char *lvl, const char *fmt, va_list ap)
#ifdef ENABLE_OPAL_ASYNC
OpalPrint("VILLASnode: %s\n", buf);
#endif
- fprintf(l->file ? l->file : stderr, "%s\n", buf);
+ if (l->file)
+ fprintf(l->file, "%s\n", buf);
+
+ if (l->syslog)
+ syslog(LOG_INFO, "%s", buf); /* NOTE(review): log the already formatted buf; 'ap' is presumably consumed when buf is built above, so reusing it in vsyslog() would be undefined - confirm */
free(buf);
}
diff --git a/lib/memory/ib.c b/lib/memory/ib.c
index 993b5398d..3a5f24da6 100644
--- a/lib/memory/ib.c
+++ b/lib/memory/ib.c
@@ -55,6 +55,9 @@ static struct memory_allocation * memory_ib_alloc(struct memory_type *m, size_t
ma->parent = mi->parent->alloc(mi->parent, len + sizeof(struct ibv_mr *), alignment);
ma->address = ma->parent->address;
+ if(!mi->pd)
+ error("Protection domain is not registered!");
+
ma->ib.mr = ibv_reg_mr(mi->pd, ma->address, ma->length, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if(!ma->ib.mr) {
mi->parent->free(mi->parent, ma->parent);
diff --git a/lib/node.c b/lib/node.c
index aa7eb696b..267eccff7 100644
--- a/lib/node.c
+++ b/lib/node.c
@@ -351,7 +351,7 @@ int node_stop(struct node *n)
{
int ret;
- if (n->state != STATE_STARTED)
+ if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
return 0;
info("Stopping node %s", node_name(n));
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index ba82bb1f7..5b103b71e 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -32,13 +32,13 @@
#include
#include
-#include
-
-int ib_cleanup(struct node *n)
+int ib_disconnect(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
debug(LOG_IB | 1, "Starting to clean up");
+ rdma_disconnect(ib->ctx.id);
+
// Destroy QP
rdma_destroy_qp(ib->ctx.id);
debug(LOG_IB | 3, "Destroyed QP");
@@ -54,13 +54,8 @@ int ib_cleanup(struct node *n)
pool_destroy(&ib->mem.p_send);
debug(LOG_IB | 3, "Destroyed memory pools");
- // Destroy RDMA CM ID
- rdma_destroy_id(ib->ctx.id);
- debug(LOG_IB | 3, "Destroyed rdma_cm_id");
-
- // Destroy event channel
- rdma_destroy_event_channel(ib->ctx.ec);
- debug(LOG_IB | 3, "Destroyed event channel");
+ // Set available receive work requests to zero
+ ib->conn.available_recv_wrs = 0;
return 0;
}
@@ -93,17 +88,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
{
- struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd;
-
for (int i = 0; i < *size; i++) {
- //On disconnect, the QP set to error state and will be flushed
- if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
- debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_completion_source. Stopping thread.");
-
- ib->poll.stopThread = 1;
- return;
- }
-
if (wc[i].status != IBV_WC_SUCCESS)
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[i].status);
@@ -147,7 +132,7 @@ void * ib_busy_poll_thread(void *n)
while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
ib->poll.on_compl(n, wc, &size);
- if (ib->poll.stopThread)
+ if (ib->stopThreads)
return NULL;
}
}
@@ -170,21 +155,13 @@ static void ib_init_wc_poll(struct node *n)
}
// Create completion queues and bind to channel (or NULL)
- ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs,
- ib->cq_size,
- NULL,
- NULL,
- 0);
+ ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0);
if (!ib->ctx.recv_cq)
error("Could not create receive completion queue in node %s", node_name(n));
debug(LOG_IB | 3, "Created receive Completion Queue");
- ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs,
- ib->cq_size,
- NULL,
- ib->ctx.comp_channel,
- 0);
+ ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, ib->ctx.comp_channel, 0);
if (!ib->ctx.send_cq)
error("Could not create send completion queue in node %s", node_name(n));
@@ -216,13 +193,6 @@ static void ib_build_ibv(struct node *n)
debug(LOG_IB | 1, "Starting to build IBV components");
- //Allocate protection domain
- ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs);
- if (!ib->ctx.pd)
- error("Could not allocate protection domain in node %s", node_name(n));
-
- debug(LOG_IB | 3, "Allocated Protection Domain");
-
// Initiate poll mode
ib_init_wc_poll(n);
@@ -245,10 +215,7 @@ static void ib_build_ibv(struct node *n)
ib->mem.p_recv.queue.state = STATE_DESTROYED;
// Set pool size to maximum size of Receive Queue
- pool_init(&ib->mem.p_recv,
- ib->qp_init.cap.max_recv_wr,
- SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN),
- &memory_type_heap);
+ ret = pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN), &memory_type_heap);
if (ret)
error("Failed to init recv memory pool of node %s: %s",
node_name(n), gai_strerror(ret));
@@ -260,8 +227,7 @@ static void ib_build_ibv(struct node *n)
// Register memory for IB Device. Not necessary if data is send
// exclusively inline
- ib->mem.mr_recv = ibv_reg_mr(
- ib->ctx.pd,
+ ib->mem.mr_recv = ibv_reg_mr(ib->ctx.pd,
(char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off,
ib->mem.p_recv.len,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
@@ -276,29 +242,22 @@ static void ib_build_ibv(struct node *n)
ib->mem.p_send.queue.state = STATE_DESTROYED;
// Set pool size to maximum size of Receive Queue
- pool_init(&ib->mem.p_send,
- ib->qp_init.cap.max_send_wr,
- sizeof(double),
- &memory_type_heap);
+ ret = pool_init(&ib->mem.p_send, ib->qp_init.cap.max_send_wr, sizeof(double), &memory_type_heap);
if (ret)
- error("Failed to init send memory of node %s: %s",
- node_name(n), gai_strerror(ret));
+ error("Failed to init send memory of node %s: %s", node_name(n), gai_strerror(ret));
- debug(LOG_IB | 3, "Created internal send pool with %i elements",
- ib->qp_init.cap.max_recv_wr);
+ debug(LOG_IB | 3, "Created internal send pool with %i elements", ib->qp_init.cap.max_send_wr);
//ToDo: initialize r_addr_key struct if mode is RDMA
// Register memory for IB Device. Not necessary if data is send
// exclusively inline
- ib->mem.mr_send = ibv_reg_mr(
- ib->ctx.pd,
+ ib->mem.mr_send = ibv_reg_mr(ib->ctx.pd,
(char*)&ib->mem.p_send+ib->mem.p_send.buffer_off,
ib->mem.p_send.len,
IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
if (!ib->mem.mr_send)
- error("Failed to register mr_send with ibv_reg_mr of node %s",
- node_name(n));
+ error("Failed to register mr_send with ibv_reg_mr of node %s", node_name(n));
debug(LOG_IB | 3, "Registered send pool with ibv_reg_mr");
}
@@ -362,51 +321,6 @@ static int ib_connect_request(struct node *n, struct rdma_cm_id *id)
return 0;
}
-static int ib_event(struct node *n, struct rdma_cm_event *event)
-{
- int ret = 0;
-
- switch(event->event) {
- case RDMA_CM_EVENT_ADDR_RESOLVED:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED");
- ret = ib_addr_resolved(n);
- break;
- case RDMA_CM_EVENT_ADDR_ERROR:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR");
- error("Address resolution (rdma_resolve_addr) failed!");
- case RDMA_CM_EVENT_ROUTE_RESOLVED:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED");
- ret = ib_route_resolved(n);
- break;
- case RDMA_CM_EVENT_ROUTE_ERROR:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR");
- error("Route resolution (rdma_resovle_route) failed!");
- case RDMA_CM_EVENT_CONNECT_REQUEST:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST");
- ret = ib_connect_request(n, event->id);
- break;
- case RDMA_CM_EVENT_CONNECT_ERROR:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR");
- error("An error has occurred trying to establish a connection!");
- case RDMA_CM_EVENT_REJECTED:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED");
- error("Connection request or response was rejected by the remote end point!");
- case RDMA_CM_EVENT_ESTABLISHED:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED");
- info("Connection established in node %s", node_name(n));
- ret = 1;
- break;
- case RDMA_CM_EVENT_DISCONNECTED:
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
- ret = ib_cleanup(n);
- break;
- default:
- error("Unknown event occurred: %u", event->event);
- }
-
- return ret;
-}
-
int ib_reverse(struct node *n)
{
return 0;
@@ -516,7 +430,7 @@ int ib_parse(struct node *n, json_t *cfg)
//Check if node is a source and connect to target
if (remote) {
- debug(LOG_IB | 3, "Node %s is set up to be able to send data (source and target)", node_name(n));
+ debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n));
ib->is_source = 1;
@@ -529,13 +443,13 @@ int ib_parse(struct node *n, json_t *cfg)
error("Failed to resolve remote address '%s' of node %s: %s",
remote, node_name(n), gai_strerror(ret));
- debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo in node %s", ip_adr, port, node_name(n));
+ debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port);
// Set correct Work Completion function
ib->poll.on_compl = ib_completion_source;
}
else {
- debug(LOG_IB | 3, "Node %s is set up to be able to only receive data (target)", node_name(n));
+ debug(LOG_IB | 3, "Node %s is set up as target", node_name(n));
ib->is_source = 0;
@@ -557,11 +471,11 @@ int ib_check(struct node *n)
int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));
if (ib->qp_init.cap.max_send_wr != max_send_pow)
- warn("Max nr. of send WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
+ warn("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
ib->qp_init.cap.max_send_wr, max_send_pow);
if (ib->qp_init.cap.max_recv_wr != max_recv_pow)
- warn("Max nr. of recv WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
+ warn("Max nr. of recv WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
ib->qp_init.cap.max_recv_wr, max_recv_pow);
@@ -587,32 +501,101 @@ int ib_destroy(struct node *n)
return 0;
}
-void * ib_disconnect_thread(void *n)
+void * ib_rdma_cm_event_thread(void *n)
{
struct node *node = (struct node *) n;
struct infiniband *ib = (struct infiniband *) node->_vd;
struct rdma_cm_event *event;
+ int ret = 0;
- debug(LOG_IB | 1, "Started disconnect thread of node %s", node_name(node));
+ debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node));
+
+ // Wait until node is completely started
+ while (node->state != STATE_STARTED);
+
+ // Monitor event channel
while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
- if (event->event == RDMA_CM_EVENT_DISCONNECTED) {
- debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
- rdma_ack_cm_event(event);
- ib->conn.rdma_disconnect_called = 1;
+ switch(event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED");
- node_stop(node);
- return NULL;
+ ret = ib_addr_resolved(n);
+ break;
+
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR");
+
+ error("Address resolution (rdma_resolve_addr) failed!");
+ break;
+
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED");
+
+ ret = ib_route_resolved(n);
+ break;
+
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR");
+
+ error("Route resolution (rdma_resovle_route) failed!");
+ break;
+
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST");
+
+ ret = ib_connect_request(n, event->id);
+ break;
+
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR");
+
+ error("An error has occurred trying to establish a connection!");
+ break;
+
+ case RDMA_CM_EVENT_REJECTED:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED");
+
+ error("Connection request or response was rejected by the remote end point!");
+ break;
+ case RDMA_CM_EVENT_ESTABLISHED:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED");
+
+ node->state = STATE_CONNECTED;
+
+ info("Connection established in node %s", node_name(n));
+ break;
+
+ case RDMA_CM_EVENT_DISCONNECTED:
+ debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
+
+ node->state = STATE_STARTED;
+ ret = ib_disconnect(n);
+
+ info("Host disconnected. Ready to accept new connections.");
+
+ break;
+
+ case RDMA_CM_EVENT_TIMEWAIT_EXIT:
+ break;
+
+ default:
+ error("Unknown event occurred: %u", event->event);
}
+
+ rdma_ack_cm_event(event);
+
+ if (ret || ib->stopThreads)
+ break;
}
+
return NULL;
}
int ib_start(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
- struct rdma_cm_event *event = NULL;
int ret;
debug(LOG_IB | 1, "Started ib_start");
@@ -626,8 +609,7 @@ int ib_start(struct node *n)
ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
if (ret)
- error("Failed to create rdma_cm_id of node %s: %s",
- node_name(n), gai_strerror(ret));
+ error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret));
debug(LOG_IB | 3, "Created rdma_cm_id");
@@ -639,22 +621,20 @@ int ib_start(struct node *n)
debug(LOG_IB | 3, "Bound rdma_cm_id to Infiniband device");
+ // The ID will be overwritten for the target. If the event type is
+ // RDMA_CM_EVENT_CONNECT_REQUEST, then this references a new id for
+ // that communication.
+ ib->ctx.listen_id = ib->ctx.id;
+
+
if (ib->is_source) {
// Resolve address
- ret = rdma_resolve_addr(ib->ctx.id,
- NULL,
- ib->conn.dst_addr->ai_addr,
- ib->conn.timeout);
+ ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout);
if (ret)
error("Failed to resolve remote address after %ims of node %s: %s",
ib->conn.timeout, node_name(n), gai_strerror(ret));
}
else {
- // The ID will be overwritten for the target. If the event type is
- // RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for
- // that communication.
- ib->ctx.listen_id = ib->ctx.id;
-
// Listen on rdma_cm_id for events
ret = rdma_listen(ib->ctx.listen_id, 10);
if (ret)
@@ -663,24 +643,22 @@ int ib_start(struct node *n)
debug(LOG_IB | 3, "Started to listen to rdma_cm_id");
}
+ //Allocate protection domain
+ ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs);
+ if (!ib->ctx.pd)
+ error("Could not allocate protection domain in node %s", node_name(n));
+
+ debug(LOG_IB | 3, "Allocated Protection Domain");
+
+
// Several events should occur on the event channel, to make
// sure the nodes are succesfully connected.
debug(LOG_IB | 1, "Starting to monitor events on rdma_cm_id");
- while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
- struct rdma_cm_event event_copy;
-
- memcpy(&event_copy, event, sizeof(*event));
-
- rdma_ack_cm_event(event);
-
- if (ib_event(n, &event_copy))
- break;
- }
-
- ret = pthread_create(&ib->conn.stop_thread, NULL, ib_disconnect_thread, n);
+ //Create thread to monitor rdma_cm_event channel
+ ret = pthread_create(&ib->conn.rdma_cm_event_thread, NULL, ib_rdma_cm_event_thread, n);
if (ret)
- error("Failed to create thread to monitor disconnects in node %s: %s",
+ error("Failed to create thread to monitor rdma_cm events in node %s: %s",
node_name(n), gai_strerror(ret));
return 0;
@@ -689,32 +667,56 @@ int ib_start(struct node *n)
int ib_stop(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
- struct rdma_cm_event *event = NULL;
int ret;
+ debug(LOG_IB | 1, "Called ib_stop");
+
+ ib->stopThreads = 1;
+
// Call RDMA disconnect function
// Will flush all outstanding WRs to the Completion Queue and
// will call RDMA_CM_EVENT_DISCONNECTED if that is done.
- ret = rdma_disconnect(ib->ctx.id);
+ if(! ib->is_source && n->state == STATE_CONNECTED)
+ ret = rdma_disconnect(ib->ctx.id);
+ else
+ ret = rdma_disconnect(ib->ctx.listen_id);
+
if (ret)
error("Error while calling rdma_disconnect in node %s: %s",
- node_name(n), gai_strerror(ret));
+ node_name(n), gai_strerror(ret));
debug(LOG_IB | 3, "Called rdma_disconnect");
+ info("Disconnecting... Please give me a few seconds.");
- // If disconnected event already occured, directly call cleanup function
- if (ib->conn.rdma_disconnect_called)
- ib_cleanup(n);
- else {
- // Else, wait for event to occur
- ib->conn.rdma_disconnect_called = 1;
- rdma_get_cm_event(ib->ctx.ec, &event);
+ // Wait for event thread to join
+ ret = pthread_join(ib->conn.rdma_cm_event_thread, NULL);
+ if (ret)
+ error("Error while joining rdma_cm_event_thread in node %s: %i", node_name(n), ret);
- rdma_ack_cm_event(event);
+ debug(LOG_IB | 3, "Joined rdma_cm_event_thread");
- ib_event(n, event);
+ // Wait for polling thread to join
+ if (ib->is_source) {
+ ret = pthread_join(ib->poll.cq_poller_thread, NULL);
+ if (ret)
+ error("Error while joining cq_poller_thread in node %s: %i", node_name(n), ret);
}
+
+ // Destroy RDMA CM ID
+ rdma_destroy_id(ib->ctx.id);
+ debug(LOG_IB | 3, "Destroyed rdma_cm_id");
+
+ // Dealloc Protection Domain
+ ibv_dealloc_pd(ib->ctx.pd);
+ debug(LOG_IB | 3, "Destroyed protection domain");
+
+ // Destroy event channel
+ rdma_destroy_event_channel(ib->ctx.ec);
+ debug(LOG_IB | 3, "Destroyed event channel");
+
+ info("Successfully stopped %s", node_name(n));
+
return 0;
}
@@ -735,78 +737,81 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_mr *mr;
- int ret;
+ int ret = 0;
debug(LOG_IB | 15, "ib_read is called");
- if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) {
- // Get Memory Region
- mr = memory_ib_get_mr(smps[0]);
+ if (n->state == STATE_CONNECTED) {
- for (int i = 0; i < cnt; i++) {
- // Increase refcnt of sample
- sample_get(smps[i]);
+ if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize) {
+ // Get Memory Region
+ mr = memory_ib_get_mr(smps[0]);
- // Prepare receive Scatter/Gather element
- sge[i].addr = (uint64_t) &smps[i]->data;
- sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
- sge[i].lkey = mr->lkey;
+ for (int i = 0; i < cnt; i++) {
+ // Increase refcnt of sample
+ sample_get(smps[i]);
- // Prepare a receive Work Request
- wr[i].wr_id = (uintptr_t) smps[i];
- wr[i].next = &wr[i+1];
- wr[i].sg_list = &sge[i];
- wr[i].num_sge = 1;
+ // Prepare receive Scatter/Gather element
+ sge[i].addr = (uint64_t) &smps[i]->data;
+ sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
+ sge[i].lkey = mr->lkey;
- ib->conn.available_recv_wrs++;
+ // Prepare a receive Work Request
+ wr[i].wr_id = (uintptr_t) smps[i];
+ wr[i].next = &wr[i+1];
+ wr[i].sg_list = &sge[i];
+ wr[i].num_sge = 1;
- if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) {
- debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
+ ib->conn.available_recv_wrs++;
- wr[i].next = NULL;
- break;
+ if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) {
+ debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
+
+ wr[i].next = NULL;
+ break;
+ }
}
+
+ // Post list of Work Requests
+ ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
+ if (ret)
+ error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx",
+ node_name(n), ret, bad_wr->wr_id);
+
+ debug(LOG_IB | 10, "Succesfully posted receive Work Requests");
+
}
- // Post list of Work Requests
- ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
- if (ret)
- error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx",
- node_name(n), ret, bad_wr->wr_id);
+ // Poll Completion Queue
+ ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc);
- debug(LOG_IB | 10, "Succesfully posted receive Work Requests");
+ if (ret) {
+ debug(LOG_IB | 10, "Received %i Work Completions", ret);
- }
+ ib->conn.available_recv_wrs -= ret;
- // Poll Completion Queue
- ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc);
+ for (int i = 0; i < ret; i++) {
+ if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
+ debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");
- if (ret) {
- debug(LOG_IB | 10, "Received %i Work Completions", ret);
+ ret = 0;
+ }
+ else if (wc[i].status != IBV_WC_SUCCESS) {
+ warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
+ node_name(n), wc[i].status);
+ ret = 0;
+ }
+ else if (wc[i].opcode & IBV_WC_RECV) {
+ smps[i] = (struct sample*)(wc[i].wr_id);
+ smps[i]->length = wc[i].byte_len/sizeof(double);
+ }
+ else
+ ret = 0;
- ib->conn.available_recv_wrs -= ret;
-
- for (int i = 0; i < ret; i++) {
- if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
- debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it.");
-
- ret = 0;
+ //Release sample
+ sample_put((struct sample *) (wc[i].wr_id));
+ debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id));
}
- else if (wc[i].status != IBV_WC_SUCCESS) {
- warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
- node_name(n), wc[i].status);
- ret = 0;
- }
- else if (wc[i].opcode & IBV_WC_RECV) {
- smps[i] = (struct sample*)(wc[i].wr_id);
- smps[i]->length = wc[i].byte_len/sizeof(double);
- }
- else
- ret = 0;
-
- //Release sample
- sample_put((struct sample *) (wc[i].wr_id));
- debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id));
}
}
@@ -823,53 +828,55 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
debug(LOG_IB | 10, "ib_write is called");
- memset(&wr, 0, sizeof(wr));
+ if (n->state == STATE_CONNECTED) {
+ memset(&wr, 0, sizeof(wr));
- //ToDo: Place this into configuration and create checks if settings are valid
- int send_inline = 1;
+ //ToDo: Place this into configuration and create checks if settings are valid
+ int send_inline = 1;
- debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline);
+ debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline);
- // Get Memory Region
- mr = memory_ib_get_mr(smps[0]);
+ // Get Memory Region
+ mr = memory_ib_get_mr(smps[0]);
- for (int i = 0; i < cnt; i++) {
- // Increase refcnt of sample
- sample_get(smps[i]);
+ for (int i = 0; i < cnt; i++) {
+ // Increase refcnt of sample
+ sample_get(smps[i]);
- //Set Scatter/Gather element to data of sample
- sge[i].addr = (uint64_t)&smps[i]->data;
- sge[i].length = smps[i]->length*sizeof(double);
- sge[i].lkey = mr->lkey;
+ //Set Scatter/Gather element to data of sample
+ sge[i].addr = (uint64_t)&smps[i]->data;
+ sge[i].length = smps[i]->length*sizeof(double);
+ sge[i].lkey = mr->lkey;
- // Set Send Work Request
- wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC
- wr[i].sg_list = &sge[i];
- wr[i].num_sge = 1;
+ // Set Send Work Request
+ wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC
+ wr[i].sg_list = &sge[i];
+ wr[i].num_sge = 1;
- if (i == (cnt-1)) {
- debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1));
- wr[i].next = NULL;
+ if (i == (cnt-1)) {
+ debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1));
+ wr[i].next = NULL;
+ }
+ else
+ wr[i].next = &wr[i+1];
+
+ wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
+ wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
+ wr[i].opcode = IBV_WR_SEND_WITH_IMM;
}
- else
- wr[i].next = &wr[i+1];
- wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
- wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
- wr[i].opcode = IBV_WR_SEND_WITH_IMM;
+ //Send linked list of Work Requests
+ ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
+ if (ret) {
+ error("Failed to send message in node %s: %i, bad WR ID: 0x%lx",
+ node_name(n), ret, bad_wr->wr_id);
+
+ return -ret;
+ }
+
+ debug(LOG_IB | 4, "Succesfully posted receive Work Requests");
}
- //Send linked list of Work Requests
- ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
- if (ret) {
- error("Failed to send message in node %s: %i, bad WR ID: 0x%lx",
- node_name(n), ret, bad_wr->wr_id);
-
- return -ret;
- }
-
- debug(LOG_IB | 4, "Succesfully posted receive Work Requests");
-
return cnt;
}