1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

opal: map RecvID / SendID of Asnynchronous process to msg->id field in order to support multiple send / receive blocks

This commit is contained in:
Steffen Vogel 2017-07-22 14:07:43 +02:00
parent 7482107000
commit 7e4051b8cb
7 changed files with 72 additions and 51 deletions

View file

@ -11,21 +11,21 @@
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
#ifndef _CONFIG_H_
#define _CONFIG_H_
#define PROGNAME "VILLASnode-OPAL-UDP"
#define VERSION "0.6"
#define VERSION "0.7"
#define MAX_VALUES 64
@ -38,4 +38,4 @@
#define PROTOCOL VILLAS
#endif
#endif /* _CONFIG_H_ */
#endif /* _CONFIG_H_ */

View file

@ -11,12 +11,12 @@
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
@ -47,7 +47,8 @@
.version = MSG_VERSION, \
.type = MSG_TYPE_DATA, \
.length = len, \
.sequence = seq \
.sequence = seq, \
.id = 0 \
}
/** The timestamp of a message in struct timespec format */
@ -74,10 +75,10 @@ struct msg
#error Invalid byte-order
#endif
uint8_t rsvd2; /**< Reserved bits */
uint8_t id; /**< An id which identifies the source of this sample */
uint16_t length; /**< The number of values in msg::data[]. */
uint32_t sequence; /**< The sequence number is incremented by one for consecutive messages. */
/** A timestamp per message. */
struct {
uint32_t sec; /**< Seconds since 1970-01-01 00:00:00 */
@ -89,4 +90,4 @@ struct msg
float f; /**< Floating point values. */
uint32_t i; /**< Integer values. */
} data[];
} __attribute__((packed));
} __attribute__((packed));

View file

@ -60,7 +60,7 @@ struct socket skt;
static void * SendToIPPort(void *arg)
{
unsigned int ModelState, SendID = 1, Sequence = 0;
unsigned int ModelState, SendID, Sequence = 0;
int nbSend = 0, ret, cnt, len;
/* Data from OPAL-RT model */
@ -119,7 +119,7 @@ static void * SendToIPPort(void *arg)
msg->version = MSG_VERSION;
msg->type = MSG_TYPE_DATA;
msg->rsvd1 = 0;
msg->rsvd2 = 0;
msg->id = SendID;
msg->length = cnt;
msg->sequence = Sequence++;
msg->ts.sec = now.tv_sec;
@ -170,7 +170,7 @@ static void * SendToIPPort(void *arg)
static void * RecvFromIPPort(void *arg)
{
unsigned int ModelState, RecvID = 1;
unsigned int ModelState, RecvID;
int nbRecv = 0, ret, cnt;
/* Data from OPAL-RT model */
@ -195,6 +195,14 @@ static void * RecvFromIPPort(void *arg)
return NULL;
}
/* Get list of RecvIds */
unsigned int RecvIDs[nbRecv];
ret = OpalGetAsyncRecvIDList(RecvIDs, sizeof(RecvIDs));
if (ret != EOK) {
OpalPrint("%s: Failed to get list of RecvIDs\n", PROGNAME);
return NULL;
}
do {
/* Receive message */
ret = socket_recv(&skt, (char *) msg, sizeof(buf), 1.0);
@ -211,7 +219,23 @@ static void * RecvFromIPPort(void *arg)
break;
}
/* Get the number of signals to send back to the model */
#if PROTOCOL == VILLAS
RecvID = msg->id;
#elif PROTOCOL == GTNET_SKT
RecvID = 1;
#else
#error Unknown protocol
#endif
/* Check if this RecvID exists */
for (int i = 0; i < nbRecv; i++) {
if (RecvIDs[i] == RecvID)
goto found;
}
OpalPrint("%s: Received message with non-existent RecvID=%d. Changing to RecvID=%d...\n", PROGNAME, RecvID, RecvIDs[0]);
RecvID = RecvIDs[0];
found: /* Get the number of signals to send back to the model */
OpalGetAsyncRecvIconDataLength(&mdldata_size, RecvID);
cnt = mdldata_size / sizeof(double);
if (cnt > MAX_VALUES) {
@ -308,26 +332,26 @@ int main(int argc, char *argv[])
/* Initialize socket */
ret = socket_init(&skt, IconCtrlStruct);
if (ret != EOK) {
OpalPrint("%s: ERROR: Initialization failed.\n", PROGNAME);
OpalPrint("%s: ERROR: Failed to create socket.\n", PROGNAME);
exit(EXIT_FAILURE);
}
/* Start send/receive threads */
ret = pthread_create(&tid_send, NULL, SendToIPPort, NULL);
if (ret == -1)
if (ret < 0)
OpalPrint("%s: ERROR: Could not create thread (SendToIPPort), errno %d\n", PROGNAME, errno);
ret = pthread_create(&tid_recv, NULL, RecvFromIPPort, NULL);
if (ret == -1)
if (ret < 0)
OpalPrint("%s: ERROR: Could not create thread (RecvFromIPPort), errno %d\n", PROGNAME, errno);
/* Wait for both threads to finish */
ret = pthread_join(tid_send, NULL);
if (ret != 0)
if (ret)
OpalPrint("%s: ERROR: pthread_join (SendToIPPort), errno %d\n", PROGNAME, ret);
ret = pthread_join(tid_recv, NULL);
if (ret != 0)
if (ret)
OpalPrint("%s: ERROR: pthread_join (RecvFromIPPort), errno %d\n", PROGNAME, ret);
/* Close the ip port and shared memories */

View file

@ -10,12 +10,12 @@
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
@ -28,7 +28,7 @@
void msg_ntoh(struct msg *m)
{
msg_hdr_ntoh(m);
for (int i = 0; i < m->length; i++)
m->data[i].i = ntohl(m->data[i].i);
}
@ -63,8 +63,8 @@ int msg_verify(struct msg *m)
return -1;
else if (m->type != MSG_TYPE_DATA)
return -2;
else if ((m->rsvd1 != 0) || (m->rsvd2 != 0))
else if (m->rsvd1 != 0)
return -3;
else
return 0;
}
}

View file

@ -75,7 +75,7 @@ int socket_init(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct)
/* Bind local port and address to socket. */
ret = bind(s->sd, (struct sockaddr *) &s->recv_ad, sizeof(struct sockaddr_in));
if (ret == -1) {
if (ret < 0) {
OpalPrint("%s: ERROR: Could not bind local port to socket\n", PROGNAME);
return EIO;
}
@ -112,13 +112,10 @@ int socket_init(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct)
return EIO;
}
OpalPrint("%s: Added process to multicast group (%s)\n",
PROGNAME, IconCtrlStruct.StringParam[1]);
}
else {
OpalPrint("%s: WARNING: IP address for multicast group is not in multicast range. Ignored\n",
PROGNAME);
OpalPrint("%s: Added process to multicast group (%s)\n", PROGNAME, IconCtrlStruct.StringParam[1]);
}
else
OpalPrint("%s: WARNING: IP address for multicast group is not in multicast range. Ignored\n", PROGNAME);
}
return EOK;
@ -126,20 +123,21 @@ int socket_init(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct)
int socket_close(struct socket *s, Opal_GenAsyncParam_Ctrl IconCtrlStruct)
{
if (s->sd < 0) {
shutdown(s->sd, SHUT_RDWR);
close(s->sd);
}
int ret;
ret = shutdown(s->sd, SHUT_RDWR);
if (ret)
return ret;
ret = close(s->sd);
if (ret)
return ret;
return 0;
}
int socket_send(struct socket *s, char *data, int len)
{
if (s->sd < 0)
return -1;
/* Send the packet */
return sendto(s->sd, data, len, 0, (struct sockaddr *) &s->send_ad, sizeof(s->send_ad));
}
@ -151,9 +149,6 @@ int socket_recv(struct socket *s, char *data, int len, double timeout)
socklen_t client_ad_size = sizeof(client_ad);
fd_set sd_set;
if (s->sd < 0)
return -1;
/* Set the descriptor set for the select() call */
FD_ZERO(&sd_set);
FD_SET(s->sd, &sd_set);

View file

@ -47,7 +47,8 @@
.version = MSG_VERSION, \
.type = MSG_TYPE_DATA, \
.length = len, \
.sequence = seq \
.sequence = seq, \
.id = 0 \
}
/** The timestamp of a message in struct timespec format */
@ -74,7 +75,7 @@ struct msg
#error Invalid byte-order
#endif
uint8_t rsvd2; /**< Reserved bits */
uint8_t id; /**< An id which identifies the source of this sample */
uint16_t length; /**< The number of values in msg::data[]. */
uint32_t sequence; /**< The sequence number is incremented by one for consecutive messages. */

View file

@ -66,7 +66,7 @@ int msg_verify(struct msg *m)
return -1;
else if (m->type != MSG_TYPE_DATA)
return -2;
else if ((m->rsvd1 != 0) || (m->rsvd2 != 0))
else if (m->rsvd1 != 0)
return -3;
else
return 0;
@ -89,7 +89,7 @@ int msg_to_sample(struct msg *msg, struct sample *smp)
smp->ts.received.tv_nsec = -1;
memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length));
return 0;
}
@ -125,7 +125,7 @@ ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf,
msg = (struct msg *) ptr;
smp = smps[++i];
}
return ptr - buf;
}
@ -147,6 +147,6 @@ int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t
msg = (struct msg *) ptr;
smp = smps[++i];
}
return i;
}
}