From 7e4051b8cb85a40d7a430a7d6960135ad9a84019 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 22 Jul 2017 14:07:43 +0200 Subject: [PATCH] opal: map RecvID / SendID of Asnynchronous process to msg->id field in order to support multiple send / receive blocks --- .../opal/models/send_receive/include/config.h | 10 ++--- .../models/send_receive/include/msg_format.h | 13 +++--- clients/opal/models/send_receive/src/main.c | 44 ++++++++++++++----- clients/opal/models/send_receive/src/msg.c | 10 ++--- clients/opal/models/send_receive/src/socket.c | 31 ++++++------- include/villas/msg_format.h | 5 ++- lib/msg.c | 10 ++--- 7 files changed, 72 insertions(+), 51 deletions(-) diff --git a/clients/opal/models/send_receive/include/config.h b/clients/opal/models/send_receive/include/config.h index 6a206a659..9e854f783 100644 --- a/clients/opal/models/send_receive/include/config.h +++ b/clients/opal/models/send_receive/include/config.h @@ -11,21 +11,21 @@ * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. - * + * * You should have received a copy of the GNU General Public License * along with this program. If not, see . *********************************************************************************/ - + #ifndef _CONFIG_H_ #define _CONFIG_H_ #define PROGNAME "VILLASnode-OPAL-UDP" -#define VERSION "0.6" +#define VERSION "0.7" #define MAX_VALUES 64 @@ -38,4 +38,4 @@ #define PROTOCOL VILLAS #endif -#endif /* _CONFIG_H_ */ \ No newline at end of file +#endif /* _CONFIG_H_ */ diff --git a/clients/opal/models/send_receive/include/msg_format.h b/clients/opal/models/send_receive/include/msg_format.h index 0a9ba3202..3b544e09f 100644 --- a/clients/opal/models/send_receive/include/msg_format.h +++ b/clients/opal/models/send_receive/include/msg_format.h @@ -11,12 +11,12 @@ * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. - * + * * You should have received a copy of the GNU General Public License * along with this program. If not, see . *********************************************************************************/ @@ -47,7 +47,8 @@ .version = MSG_VERSION, \ .type = MSG_TYPE_DATA, \ .length = len, \ - .sequence = seq \ + .sequence = seq, \ + .id = 0 \ } /** The timestamp of a message in struct timespec format */ @@ -74,10 +75,10 @@ struct msg #error Invalid byte-order #endif - uint8_t rsvd2; /**< Reserved bits */ + uint8_t id; /**< An id which identifies the source of this sample */ uint16_t length; /**< The number of values in msg::data[]. */ uint32_t sequence; /**< The sequence number is incremented by one for consecutive messages. */ - + /** A timestamp per message. */ struct { uint32_t sec; /**< Seconds since 1970-01-01 00:00:00 */ @@ -89,4 +90,4 @@ struct msg float f; /**< Floating point values. */ uint32_t i; /**< Integer values. */ } data[]; -} __attribute__((packed)); \ No newline at end of file +} __attribute__((packed)); diff --git a/clients/opal/models/send_receive/src/main.c b/clients/opal/models/send_receive/src/main.c index 9f3044cf9..e89138002 100644 --- a/clients/opal/models/send_receive/src/main.c +++ b/clients/opal/models/send_receive/src/main.c @@ -60,7 +60,7 @@ struct socket skt; static void * SendToIPPort(void *arg) { - unsigned int ModelState, SendID = 1, Sequence = 0; + unsigned int ModelState, SendID, Sequence = 0; int nbSend = 0, ret, cnt, len; /* Data from OPAL-RT model */ @@ -119,7 +119,7 @@ static void * SendToIPPort(void *arg) msg->version = MSG_VERSION; msg->type = MSG_TYPE_DATA; msg->rsvd1 = 0; - msg->rsvd2 = 0; + msg->id = SendID; msg->length = cnt; msg->sequence = Sequence++; msg->ts.sec = now.tv_sec; @@ -170,7 +170,7 @@ static void * SendToIPPort(void *arg) static void * RecvFromIPPort(void *arg) { - unsigned int ModelState, RecvID = 1; + unsigned int ModelState, RecvID; int nbRecv = 0, ret, cnt; /* Data from OPAL-RT model */ @@ -195,6 +195,14 @@ static void * RecvFromIPPort(void *arg) return NULL; } + /* Get list of RecvIds */ + unsigned int RecvIDs[nbRecv]; + ret = OpalGetAsyncRecvIDList(RecvIDs, sizeof(RecvIDs)); + if (ret != EOK) { + OpalPrint("%s: Failed to get list of RecvIDs\n", PROGNAME); + return NULL; + } + do { /* Receive message */ ret = socket_recv(&skt, (char *) msg, sizeof(buf), 1.0); @@ -211,7 +219,23 @@ static void * RecvFromIPPort(void *arg) break; } - /* Get the number of signals to send back to the model */ +#if PROTOCOL == VILLAS + RecvID = msg->id; +#elif PROTOCOL == GTNET_SKT + RecvID = 1; +#else + #error Unknown protocol +#endif + /* Check if this RecvID exists */ + for (int i = 0; i < nbRecv; i++) { + if (RecvIDs[i] == RecvID) + goto found; + } + + OpalPrint("%s: Received message with non-existent RecvID=%d. Changing to RecvID=%d...\n", PROGNAME, RecvID, RecvIDs[0]); + RecvID = RecvIDs[0]; + +found: /* Get the number of signals to send back to the model */ OpalGetAsyncRecvIconDataLength(&mdldata_size, RecvID); cnt = mdldata_size / sizeof(double); if (cnt > MAX_VALUES) { @@ -308,26 +332,26 @@ int main(int argc, char *argv[]) /* Initialize socket */ ret = socket_init(&skt, IconCtrlStruct); if (ret != EOK) { - OpalPrint("%s: ERROR: Initialization failed.\n", PROGNAME); + OpalPrint("%s: ERROR: Failed to create socket.\n", PROGNAME); exit(EXIT_FAILURE); } /* Start send/receive threads */ ret = pthread_create(&tid_send, NULL, SendToIPPort, NULL); - if (ret == -1) + if (ret < 0) OpalPrint("%s: ERROR: Could not create thread (SendToIPPort), errno %d\n", PROGNAME, errno); ret = pthread_create(&tid_recv, NULL, RecvFromIPPort, NULL); - if (ret == -1) + if (ret < 0) OpalPrint("%s: ERROR: Could not create thread (RecvFromIPPort), errno %d\n", PROGNAME, errno); /* Wait for both threads to finish */ ret = pthread_join(tid_send, NULL); - if (ret != 0) + if (ret) OpalPrint("%s: ERROR: pthread_join (SendToIPPort), errno %d\n", PROGNAME, ret); - + ret = pthread_join(tid_recv, NULL); - if (ret != 0) + if (ret) OpalPrint("%s: ERROR: pthread_join (RecvFromIPPort), errno %d\n", PROGNAME, ret); /* Close the ip port and shared memories */ diff --git a/clients/opal/models/send_receive/src/msg.c b/clients/opal/models/send_receive/src/msg.c index 4794634f4..65fea1606 100644 --- a/clients/opal/models/send_receive/src/msg.c +++ b/clients/opal/models/send_receive/src/msg.c @@ -10,12 +10,12 @@ * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. - * + * * You should have received a copy of the GNU General Public License * along with this program. If not, see . *********************************************************************************/ @@ -28,7 +28,7 @@ void msg_ntoh(struct msg *m) { msg_hdr_ntoh(m); - + for (int i = 0; i < m->length; i++) m->data[i].i = ntohl(m->data[i].i); } @@ -63,8 +63,8 @@ int msg_verify(struct msg *m) return -1; else if (m->type != MSG_TYPE_DATA) return -2; - else if ((m->rsvd1 != 0) || (m->rsvd2 != 0)) + else if (m->rsvd1 != 0) return -3; else return 0; -} \ No newline at end of file +} diff --git a/clients/opal/models/send_receive/src/socket.c b/clients/opal/models/send_receive/src/socket.c index 9eba835db..d260049cd 100644 --- a/clients/opal/models/send_receive/src/socket.c +++ b/clients/opal/models/send_receive/src/socket.c @@ -75,7 +75,7 @@ int socket_init(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct) /* Bind local port and address to socket. */ ret = bind(s->sd, (struct sockaddr *) &s->recv_ad, sizeof(struct sockaddr_in)); - if (ret == -1) { + if (ret < 0) { OpalPrint("%s: ERROR: Could not bind local port to socket\n", PROGNAME); return EIO; } @@ -112,13 +112,10 @@ int socket_init(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct) return EIO; } - OpalPrint("%s: Added process to multicast group (%s)\n", - PROGNAME, IconCtrlStruct.StringParam[1]); - } - else { - OpalPrint("%s: WARNING: IP address for multicast group is not in multicast range. Ignored\n", - PROGNAME); + OpalPrint("%s: Added process to multicast group (%s)\n", PROGNAME, IconCtrlStruct.StringParam[1]); } + else + OpalPrint("%s: WARNING: IP address for multicast group is not in multicast range. Ignored\n", PROGNAME); } return EOK; @@ -126,20 +123,21 @@ int socket_init(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct) int socket_close(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct) { - if (s->sd < 0) { - shutdown(s->sd, SHUT_RDWR); - close(s->sd); - } + int ret; + + ret = shutdown(s->sd, SHUT_RDWR); + if (ret) + return ret; + + ret = close(s->sd); + if (ret) + return ret; return 0; } int socket_send(struct socket *s, char *data, int len) { - if (s->sd < 0) - return -1; - - /* Send the packet */ return sendto(s->sd, data, len, 0, (struct sockaddr *) &s->send_ad, sizeof(s->send_ad)); } @@ -151,9 +149,6 @@ int socket_recv(struct socket *s, char *data, int len, double timeout) socklen_t client_ad_size = sizeof(client_ad); fd_set sd_set; - if (s->sd < 0) - return -1; - /* Set the descriptor set for the select() call */ FD_ZERO(&sd_set); FD_SET(s->sd, &sd_set); diff --git a/include/villas/msg_format.h b/include/villas/msg_format.h index cbeadfbe1..3b544e09f 100644 --- a/include/villas/msg_format.h +++ b/include/villas/msg_format.h @@ -47,7 +47,8 @@ .version = MSG_VERSION, \ .type = MSG_TYPE_DATA, \ .length = len, \ - .sequence = seq \ + .sequence = seq, \ + .id = 0 \ } /** The timestamp of a message in struct timespec format */ @@ -74,7 +75,7 @@ struct msg #error Invalid byte-order #endif - uint8_t rsvd2; /**< Reserved bits */ + uint8_t id; /**< An id which identifies the source of this sample */ uint16_t length; /**< The number of values in msg::data[]. */ uint32_t sequence; /**< The sequence number is incremented by one for consecutive messages. */ diff --git a/lib/msg.c b/lib/msg.c index 183b42669..08622f8ce 100644 --- a/lib/msg.c +++ b/lib/msg.c @@ -66,7 +66,7 @@ int msg_verify(struct msg *m) return -1; else if (m->type != MSG_TYPE_DATA) return -2; - else if ((m->rsvd1 != 0) || (m->rsvd2 != 0)) + else if (m->rsvd1 != 0) return -3; else return 0; @@ -89,7 +89,7 @@ int msg_to_sample(struct msg *msg, struct sample *smp) smp->ts.received.tv_nsec = -1; memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length)); - + return 0; } @@ -125,7 +125,7 @@ ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, msg = (struct msg *) ptr; smp = smps[++i]; } - + return ptr - buf; } @@ -147,6 +147,6 @@ int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t msg = (struct msg *) ptr; smp = smps[++i]; } - + return i; -} \ No newline at end of file +}