2015-12-04 01:47:49 +01:00
/** Node type: Websockets (libwebsockets)
*
* @ author Steffen Vogel < stvogel @ eonerc . rwth - aachen . de >
2019-01-13 00:42:39 +01:00
* @ copyright 2014 - 2019 , Institute for Automation of Complex Power Systems , EONERC
2017-04-27 12:56:43 +02:00
* @ license GNU General Public License ( version 3 )
*
* VILLASnode
*
* This program is free software : you can redistribute it and / or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation , either version 3 of the License , or
* any later version .
2017-05-05 19:24:16 +00:00
*
2017-04-27 12:56:43 +02:00
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU General Public License for more details .
2017-05-05 19:24:16 +00:00
*
2017-04-27 12:56:43 +02:00
* You should have received a copy of the GNU General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
2015-12-04 01:47:49 +01:00
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2019-06-23 16:57:00 +02:00
# include <cstdio>
# include <cstdlib>
2015-12-02 13:55:58 +01:00
# include <unistd.h>
2019-06-23 16:57:00 +02:00
# include <cstring>
2015-12-02 13:55:58 +01:00
# include <signal.h>
2015-12-13 02:01:57 +01:00
2018-08-02 10:28:08 +02:00
# include <libwebsockets.h>
2017-12-09 02:19:28 +08:00
# include <villas/timing.h>
2019-04-23 13:15:00 +02:00
# include <villas/utils.hpp>
2017-12-09 02:19:28 +08:00
# include <villas/buffer.h>
# include <villas/plugin.h>
2019-04-23 00:12:31 +02:00
# include <villas/nodes/websocket.hpp>
2018-05-12 13:56:12 +02:00
# include <villas/format_type.h>
# include <villas/formats/msg_format.h>
2019-04-23 13:14:47 +02:00
# include <villas/super_node.hpp>
2017-03-12 17:13:37 -03:00
2019-06-04 16:55:38 +02:00
using namespace villas : : utils ;
2018-05-24 09:10:56 +02:00
/* Size handed to buffer_init() for the per-connection receive and send buffers */
# define DEFAULT_WEBSOCKET_BUFFER_SIZE (1 << 12)
2015-12-13 02:01:57 +01:00
/* Private static storage */
2019-06-23 16:13:23 +02:00
static struct vlist connections = { . state = State : : DESTROYED } ; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
2018-10-21 10:28:07 +02:00
2019-04-23 13:14:47 +02:00
/* Web instance of the super-node; assigned in websocket_type_start() */
static villas : : node : : Web * web ;
2016-11-07 22:19:30 -05:00
2016-07-11 18:18:20 +02:00
/* Forward declarations */
2017-03-12 17:13:37 -03:00
/* Plugin descriptor of this node-type; filled in by the constructor at the end of this file */
static struct plugin p ;
2015-12-04 01:47:49 +01:00
2017-03-06 12:28:06 -04:00
static char * websocket_connection_name ( struct websocket_connection * c )
2016-11-08 00:24:57 -05:00
{
if ( ! c - > _name ) {
2017-08-27 17:05:34 +02:00
if ( c - > wsi ) {
2017-08-27 18:44:03 +02:00
char name [ 128 ] ;
char ip [ 128 ] ;
2017-09-04 14:28:55 +02:00
2017-08-27 18:44:03 +02:00
lws_get_peer_addresses ( c - > wsi , lws_get_socket_fd ( c - > wsi ) , name , sizeof ( name ) , ip , sizeof ( ip ) ) ;
2017-09-04 14:28:55 +02:00
2017-08-27 18:44:03 +02:00
strcatf ( & c - > _name , " remote.ip=%s, remote.name=%s " , ip , name ) ;
2017-08-27 17:05:34 +02:00
}
2019-06-23 16:13:23 +02:00
else if ( c - > mode = = websocket_connection : : Mode : : CLIENT & & c - > destination ! = nullptr )
2018-06-12 18:36:59 +02:00
strcatf ( & c - > _name , " dest=%s:%d " , c - > destination - > info . address , c - > destination - > info . port ) ;
2017-05-05 19:24:16 +00:00
2016-11-08 00:24:57 -05:00
if ( c - > node )
2017-08-27 18:44:03 +02:00
strcatf ( & c - > _name , " , node=%s " , node_name ( c - > node ) ) ;
2017-05-05 19:24:16 +00:00
2019-06-23 16:13:23 +02:00
strcatf ( & c - > _name , " , mode=%s " , c - > mode = = websocket_connection : : Mode : : CLIENT ? " client " : " server " ) ;
2016-11-08 00:24:57 -05:00
}
2017-05-05 19:24:16 +00:00
2016-11-08 00:24:57 -05:00
return c - > _name ;
}
2017-03-29 06:01:50 +02:00
static void websocket_destination_destroy ( struct websocket_destination * d )
2016-11-08 00:24:57 -05:00
{
free ( d - > uri ) ;
2017-05-05 19:24:16 +00:00
2017-04-24 19:28:45 +02:00
free ( ( char * ) d - > info . path ) ;
free ( ( char * ) d - > info . address ) ;
2016-11-08 00:24:57 -05:00
}
2018-06-12 18:36:59 +02:00
static int websocket_connection_init ( struct websocket_connection * c )
{
int ret ;
2019-04-22 23:45:38 +02:00
c - > _name = nullptr ;
2018-06-12 18:36:59 +02:00
2018-08-02 10:43:49 +02:00
ret = queue_init ( & c - > queue , DEFAULT_QUEUE_LENGTH , & memory_hugepage ) ;
2018-06-12 18:36:59 +02:00
if ( ret )
return ret ;
2019-06-23 16:13:23 +02:00
ret = io_init ( & c - > io , c - > format , & c - > node - > in . signals , ( int ) SampleFlags : : HAS_ALL & ~ ( int ) SampleFlags : : HAS_OFFSET ) ;
2018-08-20 18:27:45 +02:00
if ( ret )
return ret ;
ret = io_check ( & c - > io ) ;
2018-06-12 18:36:59 +02:00
if ( ret )
return ret ;
ret = buffer_init ( & c - > buffers . recv , DEFAULT_WEBSOCKET_BUFFER_SIZE ) ;
if ( ret )
return ret ;
ret = buffer_init ( & c - > buffers . send , DEFAULT_WEBSOCKET_BUFFER_SIZE ) ;
if ( ret )
return ret ;
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : INITIALIZED ;
2018-06-12 18:36:59 +02:00
return 0 ;
}
static int websocket_connection_destroy ( struct websocket_connection * c )
{
int ret ;
2019-06-23 16:13:23 +02:00
assert ( c - > state ! = websocket_connection : : State : : DESTROYED ) ;
2018-06-12 18:36:59 +02:00
if ( c - > _name )
free ( c - > _name ) ;
/* Return all samples to pool */
int avail ;
struct sample * smp ;
while ( ( avail = queue_pull ( & c - > queue , ( void * * ) & smp ) ) )
2018-08-07 09:22:26 +02:00
sample_decref ( smp ) ;
2018-06-12 18:36:59 +02:00
ret = queue_destroy ( & c - > queue ) ;
if ( ret )
return ret ;
ret = io_destroy ( & c - > io ) ;
if ( ret )
return ret ;
ret = buffer_destroy ( & c - > buffers . recv ) ;
if ( ret )
return ret ;
ret = buffer_destroy ( & c - > buffers . send ) ;
if ( ret )
return ret ;
2019-04-22 23:45:38 +02:00
c - > wsi = nullptr ;
c - > _name = nullptr ;
2018-06-12 18:36:59 +02:00
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : DESTROYED ;
2018-06-12 18:36:59 +02:00
return 0 ;
}
2017-03-06 12:28:06 -04:00
static int websocket_connection_write ( struct websocket_connection * c , struct sample * smps [ ] , unsigned cnt )
2016-11-08 00:24:57 -05:00
{
2017-08-27 17:05:34 +02:00
int pushed ;
2017-09-04 14:28:55 +02:00
2019-06-23 16:13:23 +02:00
if ( c - > state ! = websocket_connection : : State : : INITIALIZED )
2018-06-12 18:36:59 +02:00
return - 1 ;
2017-08-27 17:05:34 +02:00
pushed = queue_push_many ( & c - > queue , ( void * * ) smps , cnt ) ;
2019-04-22 23:43:46 +02:00
if ( pushed < ( int ) cnt )
2018-10-21 21:36:08 +01:00
warning ( " Queue overrun in WebSocket connection: %s " , websocket_connection_name ( c ) ) ;
2017-09-04 14:28:55 +02:00
2018-08-07 09:22:26 +02:00
sample_incref_many ( smps , pushed ) ;
2017-09-04 14:28:55 +02:00
2017-08-27 17:05:34 +02:00
debug ( LOG_WEBSOCKET | 10 , " Enqueued %u samples to %s " , pushed , websocket_connection_name ( c ) ) ;
2017-09-04 14:28:55 +02:00
2017-08-27 17:05:34 +02:00
/* Client connections which are currently conecting don't have an associate c->wsi yet */
2019-01-12 19:05:09 +01:00
if ( c - > wsi )
2019-04-23 13:14:47 +02:00
web - > callbackOnWritable ( c - > wsi ) ;
2017-05-05 19:24:16 +00:00
2016-11-08 00:24:57 -05:00
return 0 ;
}
2017-04-24 19:28:45 +02:00
static void websocket_connection_close ( struct websocket_connection * c , struct lws * wsi , enum lws_close_status status , const char * reason )
{
lws_close_reason ( wsi , status , ( unsigned char * ) reason , strlen ( reason ) ) ;
2017-08-27 17:05:34 +02:00
debug ( LOG_WEBSOCKET | 10 , " Closing WebSocket connection with %s: status=%u, reason=%s " , websocket_connection_name ( c ) , status , reason ) ;
2017-04-24 19:28:45 +02:00
}
2017-03-06 12:28:06 -04:00
int websocket_protocol_cb ( struct lws * wsi , enum lws_callback_reasons reason , void * user , void * in , size_t len )
2015-12-04 01:47:49 +01:00
{
2017-08-27 17:05:34 +02:00
int ret , recvd , pulled , cnt = 128 ;
2019-04-22 23:43:46 +02:00
struct websocket_connection * c = ( struct websocket_connection * ) user ;
2017-09-04 14:28:55 +02:00
2015-12-02 13:55:58 +01:00
switch ( reason ) {
2016-07-11 18:18:20 +02:00
case LWS_CALLBACK_CLIENT_ESTABLISHED :
2017-04-03 09:01:14 +02:00
case LWS_CALLBACK_ESTABLISHED :
2017-08-27 17:05:34 +02:00
c - > wsi = wsi ;
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : ESTABLISHED ;
2018-06-12 18:36:59 +02:00
2019-01-12 19:05:32 +01:00
info ( " Established WebSocket connection: %s " , websocket_connection_name ( c ) ) ;
2018-06-12 20:02:43 +02:00
2018-06-12 18:36:59 +02:00
if ( reason = = LWS_CALLBACK_CLIENT_ESTABLISHED )
2019-06-23 16:13:23 +02:00
c - > mode = websocket_connection : : Mode : : CLIENT ;
2018-06-12 18:36:59 +02:00
else {
2019-06-23 16:13:23 +02:00
c - > mode = websocket_connection : : Mode : : SERVER ;
2018-06-12 18:36:59 +02:00
/* We use the URI to associate this connection to a node
* and choose a protocol .
*
* Example : ws : //example.com/node_1.json
* Will select the node with the name ' node_1 '
* and format ' json ' .
*/
/* Get path of incoming request */
2019-02-18 01:09:33 +01:00
char * node , * format , * lasts ;
2018-06-12 18:36:59 +02:00
char uri [ 64 ] ;
lws_hdr_copy ( wsi , uri , sizeof ( uri ) , WSI_TOKEN_GET_URI ) ; /* The path component of the*/
if ( strlen ( uri ) < = 0 ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_PROTOCOL_ERR , " Invalid URL " ) ;
2018-10-21 21:36:08 +01:00
warning ( " Failed to get request URI " ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2017-09-04 14:28:55 +02:00
2019-02-18 01:09:33 +01:00
node = strtok_r ( uri , " /. " , & lasts ) ;
2018-06-12 18:36:59 +02:00
if ( ! node ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Unknown node " ) ;
2018-10-21 21:36:08 +01:00
warning ( " Failed to tokenize request URI " ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2017-09-04 14:28:55 +02:00
2019-04-22 23:45:38 +02:00
format = strtok_r ( nullptr , " " , & lasts ) ;
2018-06-12 18:36:59 +02:00
if ( ! format )
2019-04-22 23:43:46 +02:00
format = ( char * ) " villas.web " ;
2017-05-05 19:24:16 +00:00
2018-06-12 18:36:59 +02:00
/* Search for node whose name matches the URI. */
2019-04-22 23:43:46 +02:00
c - > node = ( struct node * ) vlist_lookup ( & p . node . instances , node ) ;
2018-06-12 18:36:59 +02:00
if ( ! c - > node ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Unknown node " ) ;
2018-10-21 21:36:08 +01:00
warning ( " Failed to find node: node=%s " , node ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2018-03-28 14:29:55 +02:00
2018-06-12 18:36:59 +02:00
c - > format = format_type_lookup ( format ) ;
if ( ! c - > format ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Unknown format " ) ;
2018-10-21 21:36:08 +01:00
warning ( " Failed to find format: format=%s " , format ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2017-08-14 14:42:07 +02:00
}
2017-09-04 14:28:55 +02:00
2018-06-12 18:36:59 +02:00
ret = websocket_connection_init ( c ) ;
if ( ret ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Internal error " ) ;
2018-10-21 21:36:08 +01:00
warning ( " Failed to intialize websocket connection: reason=%d " , ret ) ;
2016-11-07 22:19:30 -05:00
return - 1 ;
2018-06-12 18:36:59 +02:00
}
2015-12-04 01:47:49 +01:00
2019-01-07 10:28:55 +01:00
vlist_push ( & connections , c ) ;
2017-09-04 14:28:55 +02:00
2018-06-12 20:02:43 +02:00
debug ( LOG_WEBSOCKET | 10 , " Initialized WebSocket connection: %s " , websocket_connection_name ( c ) ) ;
2017-08-14 14:42:07 +02:00
break ;
2017-04-24 19:28:45 +02:00
2017-08-27 17:05:34 +02:00
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR :
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : ERROR ;
2017-05-05 19:24:16 +00:00
2018-10-21 21:36:08 +01:00
warning ( " Failed to establish WebSocket connection: %s, reason=%s " , websocket_connection_name ( c ) , in ? ( char * ) in : " unkown " ) ;
2016-02-04 18:25:13 +01:00
2017-08-27 17:05:34 +02:00
return - 1 ;
2017-09-04 14:28:55 +02:00
2015-12-04 01:47:49 +01:00
case LWS_CALLBACK_CLOSED :
2017-08-27 17:05:34 +02:00
debug ( LOG_WEBSOCKET | 10 , " Closed WebSocket connection: %s " , websocket_connection_name ( c ) ) ;
2017-09-04 14:28:55 +02:00
2019-06-23 16:13:23 +02:00
if ( c - > state ! = websocket_connection : : State : : SHUTDOWN ) {
2017-08-27 17:05:34 +02:00
/** @todo Attempt reconnect here */
2016-02-04 16:30:36 +01:00
}
2019-06-23 16:13:23 +02:00
if ( connections . state = = State : : INITIALIZED )
2019-02-24 09:23:31 +01:00
vlist_remove_all ( & connections , c ) ;
2017-05-05 19:24:16 +00:00
2019-06-23 16:13:23 +02:00
if ( c - > state = = websocket_connection : : State : : INITIALIZED )
2018-06-12 20:02:43 +02:00
websocket_connection_destroy ( c ) ;
2017-04-24 19:28:45 +02:00
2019-06-23 16:13:23 +02:00
if ( c - > mode = = websocket_connection : : Mode : : CLIENT )
2017-04-24 19:28:45 +02:00
free ( c ) ;
2017-08-27 17:05:34 +02:00
break ;
2017-05-05 19:24:16 +00:00
2016-07-11 18:18:20 +02:00
case LWS_CALLBACK_CLIENT_WRITEABLE :
2018-08-20 18:27:45 +02:00
case LWS_CALLBACK_SERVER_WRITEABLE : {
2019-04-22 23:43:46 +02:00
struct sample * * smps = ( struct sample * * ) alloca ( cnt * sizeof ( struct sample * ) ) ;
2017-07-13 22:39:38 +02:00
2017-08-27 17:05:34 +02:00
pulled = queue_pull_many ( & c - > queue , ( void * * ) smps , cnt ) ;
if ( pulled > 0 ) {
2018-05-23 00:17:41 +02:00
size_t wbytes ;
2018-05-12 15:25:29 +02:00
io_sprint ( & c - > io , c - > buffers . send . buf + LWS_PRE , c - > buffers . send . size - LWS_PRE , & wbytes , smps , pulled ) ;
2017-09-04 14:28:55 +02:00
2019-06-23 16:13:23 +02:00
ret = lws_write ( wsi , ( unsigned char * ) c - > buffers . send . buf + LWS_PRE , wbytes , c - > io . flags & ( int ) IOFlags : : HAS_BINARY_PAYLOAD ? LWS_WRITE_BINARY : LWS_WRITE_TEXT ) ;
2017-05-05 19:24:16 +00:00
2018-08-07 09:22:26 +02:00
sample_decref_many ( smps , pulled ) ;
2017-05-05 19:24:16 +00:00
2019-01-14 10:00:09 +01:00
if ( ret < 0 )
return ret ;
2017-08-27 18:44:03 +02:00
debug ( LOG_WEBSOCKET | 10 , " Send %d samples to connection: %s, bytes=%d " , pulled , websocket_connection_name ( c ) , ret ) ;
2017-07-13 22:39:38 +02:00
}
2017-09-04 14:28:55 +02:00
2017-08-27 17:59:24 +02:00
if ( queue_available ( & c - > queue ) > 0 )
lws_callback_on_writable ( wsi ) ;
2019-06-23 16:13:23 +02:00
else if ( c - > state = = websocket_connection : : State : : SHUTDOWN ) {
2018-08-20 18:27:45 +02:00
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_GOINGAWAY , " Node stopped " ) ;
return - 1 ;
}
2017-09-04 14:28:55 +02:00
2017-08-27 17:05:34 +02:00
break ;
2018-08-20 18:27:45 +02:00
}
2015-12-02 13:55:58 +01:00
2016-07-11 18:18:20 +02:00
case LWS_CALLBACK_CLIENT_RECEIVE :
2017-09-04 14:28:55 +02:00
case LWS_CALLBACK_RECEIVE :
2017-08-27 17:05:34 +02:00
if ( lws_is_first_fragment ( wsi ) )
buffer_clear ( & c - > buffers . recv ) ;
2017-09-04 14:28:55 +02:00
2019-04-22 23:43:46 +02:00
ret = buffer_append ( & c - > buffers . recv , ( char * ) in , len ) ;
2017-08-27 17:05:34 +02:00
if ( ret ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_UNACCEPTABLE_OPCODE , " Failed to process data " ) ;
2015-12-13 20:24:56 +01:00
return - 1 ;
2017-04-24 19:28:45 +02:00
}
2017-09-04 14:28:55 +02:00
2017-08-27 17:05:34 +02:00
/* We dont try to parse the frame yet, as we have to wait for the remaining fragments */
if ( lws_is_final_fragment ( wsi ) ) {
struct timespec ts_recv = time_now ( ) ;
2018-02-06 23:30:37 +01:00
struct node * n = c - > node ;
2018-05-07 18:48:34 +02:00
int avail , enqueued ;
2018-02-06 23:30:37 +01:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2019-04-22 23:43:46 +02:00
struct sample * * smps = ( struct sample * * ) alloca ( cnt * sizeof ( struct sample * ) ) ;
2018-05-26 01:12:00 +02:00
if ( ! smps ) {
2018-10-21 21:36:08 +01:00
warning ( " Failed to allocate memory for connection: %s " , websocket_connection_name ( c ) ) ;
2018-05-26 01:12:00 +02:00
break ;
}
2017-09-04 14:28:55 +02:00
2018-05-07 18:48:34 +02:00
avail = sample_alloc_many ( & w - > pool , smps , cnt ) ;
if ( avail < cnt )
2018-10-21 21:36:08 +01:00
warning ( " Pool underrun for connection: %s " , websocket_connection_name ( c ) ) ;
2017-05-05 19:24:16 +00:00
2019-04-22 23:45:38 +02:00
recvd = io_sscan ( & c - > io , c - > buffers . recv . buf , c - > buffers . recv . len , nullptr , smps , avail ) ;
2017-08-27 17:05:34 +02:00
if ( recvd < 0 ) {
2018-10-21 21:36:08 +01:00
warning ( " Failed to parse sample data received on connection: %s " , websocket_connection_name ( c ) ) ;
2017-08-27 17:05:34 +02:00
break ;
2017-04-24 19:28:45 +02:00
}
2017-05-05 19:24:16 +00:00
2018-05-26 01:15:34 +02:00
debug ( LOG_WEBSOCKET | 10 , " Received %d samples from connection: %s " , recvd , websocket_connection_name ( c ) ) ;
2017-05-05 19:24:16 +00:00
2017-08-27 17:05:34 +02:00
/* Set receive timestamp */
2017-09-04 14:28:55 +02:00
for ( int i = 0 ; i < recvd ; i + + ) {
2017-08-27 17:05:34 +02:00
smps [ i ] - > ts . received = ts_recv ;
2019-06-23 16:13:23 +02:00
smps [ i ] - > flags | = ( int ) SampleFlags : : HAS_TS_RECEIVED ;
2017-09-04 14:28:55 +02:00
}
2017-04-24 19:28:45 +02:00
2018-05-07 18:48:34 +02:00
enqueued = queue_signalled_push_many ( & w - > queue , ( void * * ) smps , recvd ) ;
if ( enqueued < recvd )
2018-10-21 21:36:08 +01:00
warning ( " Queue overrun in connection: %s " , websocket_connection_name ( c ) ) ;
2017-09-04 14:28:55 +02:00
2018-05-07 18:48:34 +02:00
/* Release unused samples back to pool */
if ( enqueued < avail )
2018-08-07 09:22:26 +02:00
sample_decref_many ( & smps [ enqueued ] , avail - enqueued ) ;
2018-05-07 18:48:34 +02:00
2018-05-26 01:13:22 +02:00
buffer_clear ( & c - > buffers . recv ) ;
2019-06-23 16:13:23 +02:00
if ( c - > state = = websocket_connection : : State : : SHUTDOWN ) {
2017-08-27 17:05:34 +02:00
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_GOINGAWAY , " Node stopped " ) ;
return - 1 ;
2016-11-07 22:19:30 -05:00
}
2016-01-14 23:17:39 +01:00
}
2017-05-05 19:24:16 +00:00
2017-08-27 17:05:34 +02:00
break ;
2016-07-11 18:18:20 +02:00
2015-12-02 13:55:58 +01:00
default :
2017-08-14 14:42:07 +02:00
break ;
2015-12-02 13:55:58 +01:00
}
2017-09-04 14:28:55 +02:00
2017-08-14 14:42:07 +02:00
return 0 ;
2015-12-02 13:55:58 +01:00
}
2019-04-23 13:14:47 +02:00
int websocket_type_start ( villas : : node : : SuperNode * sn )
2017-04-24 19:28:45 +02:00
{
2019-01-07 10:28:55 +01:00
vlist_init ( & connections ) ;
2017-05-05 19:24:16 +00:00
2019-04-23 13:14:47 +02:00
web = sn - > getWeb ( ) ;
2019-06-23 16:13:23 +02:00
if ( web - > getState ( ) ! = State : : STARTED )
2018-12-02 02:56:52 +01:00
return - 1 ;
2017-04-24 19:28:45 +02:00
return 0 ;
}
2017-03-11 23:30:24 -03:00
int websocket_start ( struct node * n )
2015-12-02 13:55:58 +01:00
{
2016-07-11 18:18:20 +02:00
int ret ;
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2017-05-05 19:24:16 +00:00
2018-08-02 10:45:15 +02:00
ret = pool_init ( & w - > pool , DEFAULT_WEBSOCKET_QUEUE_LENGTH , SAMPLE_LENGTH ( DEFAULT_WEBSOCKET_SAMPLE_LENGTH ) , & memory_hugepage ) ;
2016-07-11 18:18:20 +02:00
if ( ret )
return ret ;
2017-05-05 19:24:16 +00:00
2019-06-23 16:13:23 +02:00
ret = queue_signalled_init ( & w - > queue , DEFAULT_WEBSOCKET_QUEUE_LENGTH , & memory_hugepage ) ;
2017-04-02 13:02:49 +02:00
if ( ret )
return ret ;
2016-11-07 22:19:30 -05:00
2019-04-22 23:43:46 +02:00
for ( size_t i = 0 ; i < vlist_length ( & w - > destinations ) ; i + + ) {
2018-08-20 18:27:45 +02:00
const char * format ;
2019-01-07 10:28:55 +01:00
struct websocket_destination * d = ( struct websocket_destination * ) vlist_at ( & w - > destinations , i ) ;
2017-10-18 15:39:53 +02:00
struct websocket_connection * c = ( struct websocket_connection * ) alloc ( sizeof ( struct websocket_connection ) ) ;
2017-05-05 19:24:16 +00:00
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : CONNECTING ;
2018-06-12 18:36:59 +02:00
2018-08-20 18:27:45 +02:00
format = strchr ( d - > info . path , ' . ' ) ;
2019-01-12 19:06:23 +01:00
if ( format )
2019-03-26 07:09:55 +01:00
format = format + 1 ; /* Removes "." */
2019-01-12 19:06:23 +01:00
else
2018-08-20 18:27:45 +02:00
format = " villas.web " ;
2019-01-12 19:06:23 +01:00
c - > format = format_type_lookup ( format ) ;
2018-08-20 18:27:45 +02:00
if ( ! c - > format )
return - 1 ;
2017-04-24 19:28:45 +02:00
c - > node = n ;
2017-08-14 14:42:07 +02:00
c - > destination = d ;
2017-05-05 19:24:16 +00:00
2019-04-23 13:14:47 +02:00
d - > info . context = web - > getContext ( ) ;
d - > info . vhost = web - > getVHost ( ) ;
2017-04-24 19:28:45 +02:00
d - > info . userdata = c ;
2017-09-04 14:28:55 +02:00
2017-04-24 19:28:45 +02:00
lws_client_connect_via_info ( & d - > info ) ;
}
2017-03-16 22:42:58 -03:00
2015-12-02 13:55:58 +01:00
return 0 ;
}
2017-03-11 23:30:24 -03:00
int websocket_stop ( struct node * n )
2016-02-04 17:13:28 +01:00
{
2019-03-31 12:52:07 +02:00
int ret , open_connections = 0 ; ;
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2017-09-04 14:28:55 +02:00
2019-01-07 10:28:55 +01:00
for ( size_t i = 0 ; i < vlist_length ( & connections ) ; i + + ) {
struct websocket_connection * c = ( struct websocket_connection * ) vlist_at ( & connections , i ) ;
2017-09-04 14:28:55 +02:00
2017-08-27 17:05:34 +02:00
if ( c - > node ! = n )
continue ;
2017-05-05 19:24:16 +00:00
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : SHUTDOWN ;
2017-03-29 06:01:50 +02:00
2016-11-07 22:19:30 -05:00
lws_callback_on_writable ( c - > wsi ) ;
}
2017-04-24 19:28:45 +02:00
2019-03-31 12:52:07 +02:00
/* Count open connections belonging to this node */
2019-04-22 23:43:46 +02:00
for ( size_t i = 0 ; i < vlist_length ( & connections ) ; i + + ) {
2019-03-31 12:52:07 +02:00
struct websocket_connection * c = ( struct websocket_connection * ) vlist_at ( & connections , i ) ;
2018-08-20 18:27:45 +02:00
2019-03-31 12:52:07 +02:00
if ( c - > node = = n )
open_connections + + ;
}
2018-08-20 18:27:45 +02:00
2019-03-31 12:52:07 +02:00
if ( open_connections > 0 ) {
info ( " Waiting for shutdown of %u connections... " , open_connections ) ;
2018-08-20 18:27:45 +02:00
sleep ( 1 ) ;
}
2019-03-31 12:52:07 +02:00
ret = queue_signalled_close ( & w - > queue ) ;
if ( ret )
return ret ;
2017-04-07 12:25:17 +02:00
ret = queue_signalled_destroy ( & w - > queue ) ;
2017-04-02 13:02:49 +02:00
if ( ret )
return ret ;
2017-05-05 19:24:16 +00:00
2017-04-24 19:28:45 +02:00
ret = pool_destroy ( & w - > pool ) ;
if ( ret )
return ret ;
2016-02-04 17:13:28 +01:00
return 0 ;
}
int websocket_destroy ( struct node * n )
2015-12-02 13:55:58 +01:00
{
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2018-05-26 01:15:23 +02:00
int ret ;
2017-03-16 22:42:58 -03:00
2019-01-07 10:28:55 +01:00
ret = vlist_destroy ( & w - > destinations , ( dtor_cb_t ) websocket_destination_destroy , true ) ;
2018-05-26 01:15:23 +02:00
if ( ret )
return ret ;
2015-12-04 01:47:49 +01:00
2015-12-02 13:55:58 +01:00
return 0 ;
}
2018-07-11 18:14:29 +02:00
int websocket_read ( struct node * n , struct sample * smps [ ] , unsigned cnt , unsigned * release )
2015-12-02 13:55:58 +01:00
{
2017-04-24 19:28:45 +02:00
int avail ;
2017-03-16 22:42:58 -03:00
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2018-07-11 18:14:29 +02:00
struct sample * cpys [ cnt ] ;
2017-03-16 22:42:58 -03:00
2018-07-11 18:14:29 +02:00
avail = queue_signalled_pull_many ( & w - > queue , ( void * * ) cpys , cnt ) ;
2017-08-27 17:05:34 +02:00
if ( avail < 0 )
return avail ;
2017-04-24 19:28:45 +02:00
2018-03-28 14:29:55 +02:00
sample_copy_many ( smps , cpys , avail ) ;
2018-08-07 09:22:26 +02:00
sample_decref_many ( cpys , avail ) ;
2016-06-08 22:39:17 +02:00
2017-04-24 19:28:45 +02:00
return avail ;
2015-12-02 13:55:58 +01:00
}
2018-07-11 18:14:29 +02:00
int websocket_write ( struct node * n , struct sample * smps [ ] , unsigned cnt , unsigned * release )
2015-12-02 13:55:58 +01:00
{
2017-04-24 19:28:45 +02:00
int avail ;
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2018-07-11 18:14:29 +02:00
struct sample * cpys [ cnt ] ;
2017-05-05 19:24:16 +00:00
2017-04-24 19:28:45 +02:00
/* Make copies of all samples */
2018-07-11 18:14:29 +02:00
avail = sample_alloc_many ( & w - > pool , cpys , cnt ) ;
2019-04-22 23:43:46 +02:00
if ( avail < ( int ) cnt )
2018-10-21 21:36:08 +01:00
warning ( " Pool underrun for node %s: avail=%u " , node_name ( n ) , avail ) ;
2017-04-24 19:28:45 +02:00
2018-03-28 14:29:55 +02:00
sample_copy_many ( cpys , smps , avail ) ;
2017-05-05 19:24:16 +00:00
2019-01-07 10:28:55 +01:00
for ( size_t i = 0 ; i < vlist_length ( & connections ) ; i + + ) {
struct websocket_connection * c = ( struct websocket_connection * ) vlist_at ( & connections , i ) ;
2017-09-04 14:28:55 +02:00
2018-03-28 14:29:55 +02:00
if ( c - > node = = n )
2018-07-11 18:14:29 +02:00
websocket_connection_write ( c , cpys , cnt ) ;
2017-03-25 21:23:31 +01:00
}
2017-05-05 19:24:16 +00:00
2018-08-07 09:22:26 +02:00
sample_decref_many ( cpys , avail ) ;
2016-06-08 22:39:17 +02:00
2018-07-11 18:14:29 +02:00
return cnt ;
2015-12-02 13:55:58 +01:00
}
2016-07-11 18:18:20 +02:00
2017-08-03 00:19:27 +02:00
int websocket_parse ( struct node * n , json_t * cfg )
2016-11-07 22:19:30 -05:00
{
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2016-11-08 00:24:57 -05:00
int ret ;
2017-03-16 22:42:58 -03:00
2018-08-07 18:40:32 +02:00
size_t i ;
2019-04-22 23:45:38 +02:00
json_t * json_dests = nullptr ;
2017-10-16 08:08:35 +02:00
json_t * json_dest ;
2017-08-03 00:19:27 +02:00
json_error_t err ;
2019-01-07 10:28:55 +01:00
vlist_init ( & w - > destinations ) ;
2017-03-16 22:42:58 -03:00
2017-10-16 08:08:35 +02:00
ret = json_unpack_ex ( cfg , & err , 0 , " { s?: o } " , " destinations " , & json_dests ) ;
2017-08-03 00:19:27 +02:00
if ( ret )
jerror ( & err , " Failed to parse configuration of node %s " , node_name ( n ) ) ;
2017-10-16 08:08:35 +02:00
if ( json_dests ) {
if ( ! json_is_array ( json_dests ) )
2017-08-03 00:19:27 +02:00
error ( " The 'destinations' setting of node %s must be an array of URLs " , node_name ( n ) ) ;
2017-05-05 19:24:16 +00:00
2018-08-07 18:40:32 +02:00
json_array_foreach ( json_dests , i , json_dest ) {
2017-03-11 23:50:46 -03:00
const char * uri , * prot , * ads , * path ;
2017-05-05 19:24:16 +00:00
2017-10-16 08:08:35 +02:00
uri = json_string_value ( json_dest ) ;
2017-03-11 23:50:46 -03:00
if ( ! uri )
2017-08-03 00:19:27 +02:00
error ( " The 'destinations' setting of node %s must be an array of URLs " , node_name ( n ) ) ;
2017-05-05 19:24:16 +00:00
2017-10-18 15:39:53 +02:00
struct websocket_destination * d = ( struct websocket_destination * ) alloc ( sizeof ( struct websocket_destination ) ) ;
2017-05-05 19:24:16 +00:00
2017-07-09 14:36:09 +02:00
d - > uri = strdup ( uri ) ;
2017-05-05 19:24:16 +00:00
2017-07-09 14:36:09 +02:00
ret = lws_parse_uri ( d - > uri , & prot , & ads , & d - > info . port , & path ) ;
2017-03-11 23:50:46 -03:00
if ( ret )
2017-08-03 00:19:27 +02:00
error ( " Failed to parse WebSocket URI: '%s' " , uri ) ;
2017-05-05 19:24:16 +00:00
2017-07-09 14:36:09 +02:00
d - > info . ssl_connection = ! strcmp ( prot , " https " ) ;
d - > info . address = strdup ( ads ) ;
2017-08-27 17:05:34 +02:00
d - > info . path = strdup ( path ) ;
2017-07-09 14:36:09 +02:00
d - > info . host = d - > info . address ;
d - > info . origin = d - > info . address ;
d - > info . ietf_version_or_minus_one = - 1 ;
d - > info . protocol = " live " ;
2017-05-05 19:24:16 +00:00
2019-01-07 10:28:55 +01:00
vlist_push ( & w - > destinations , d ) ;
2017-03-11 23:50:46 -03:00
}
2016-07-11 18:18:20 +02:00
}
2017-03-08 09:53:28 -03:00
2016-11-08 00:24:57 -05:00
return 0 ;
2016-11-07 22:19:30 -05:00
}
char * websocket_print ( struct node * n )
{
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2016-11-08 00:24:57 -05:00
2019-04-22 23:45:38 +02:00
char * buf = nullptr ;
2017-05-05 19:24:16 +00:00
2017-04-24 18:11:05 +02:00
buf = strcatf ( & buf , " destinations=[ " ) ;
2017-05-05 19:24:16 +00:00
2019-01-07 10:28:55 +01:00
for ( size_t i = 0 ; i < vlist_length ( & w - > destinations ) ; i + + ) {
struct websocket_destination * d = ( struct websocket_destination * ) vlist_at ( & w - > destinations , i ) ;
2017-03-25 21:23:31 +01:00
2017-08-27 17:05:34 +02:00
buf = strcatf ( & buf , " %s://%s:%d/%s " ,
2017-04-24 18:11:05 +02:00
d - > info . ssl_connection ? " wss " : " ws " ,
d - > info . address ,
d - > info . port ,
d - > info . path
2016-11-08 00:24:57 -05:00
) ;
2016-07-11 18:18:20 +02:00
}
2017-05-05 19:24:16 +00:00
2017-04-24 18:11:05 +02:00
buf = strcatf ( & buf , " ] " ) ;
2017-05-05 19:24:16 +00:00
2016-11-08 00:24:57 -05:00
return buf ;
2015-12-02 13:55:58 +01:00
}
2019-01-21 15:47:34 +01:00
int websocket_poll_fds ( struct node * n , int fds [ ] )
2017-08-30 13:30:31 +02:00
{
2017-10-18 15:39:53 +02:00
struct websocket * w = ( struct websocket * ) n - > _vd ;
2017-09-04 14:28:55 +02:00
2019-01-21 15:47:34 +01:00
fds [ 0 ] = queue_signalled_fd ( & w - > queue ) ;
return 1 ;
2017-08-30 13:30:31 +02:00
}
2019-04-22 23:43:46 +02:00
/** Fill in the plugin descriptor of the WebSocket node-type and add it to the global plugin list. */
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
	/* Initialize the global plugin list if it is still in the DESTROYED state */
	if (plugins.state == State::DESTROYED)
		vlist_init(&plugins);

	/* General plugin information */
	p.type = PluginType::NODE;
	p.name = " websocket ";
	p.description = " Send and receive samples of a WebSocket connection (libwebsockets) ";

	/* Node-type properties */
	p.node.size = sizeof(struct websocket);
	p.node.vectorize = 0; /* unlimited */

	/* Callbacks */
	p.node.type.start = websocket_type_start;
	p.node.parse = websocket_parse;
	p.node.print = websocket_print;
	p.node.start = websocket_start;
	p.node.stop = websocket_stop;
	p.node.destroy = websocket_destroy;
	p.node.read = websocket_read;
	p.node.write = websocket_write;
	p.node.poll_fds = websocket_poll_fds;

	/* List of node instances; the state is set before vlist_init() is called on it */
	p.node.instances.state = State::DESTROYED;
	vlist_init(&p.node.instances);

	vlist_push(&plugins, &p);
}
2015-12-04 17:32:51 +01:00
2019-04-22 23:43:46 +02:00
/** Remove the plugin descriptor from the global plugin list, unless that list is already destroyed. */
__attribute__((destructor(110))) static void UNIQUE(__dtor)() {
	if (plugins.state == State::DESTROYED)
		return;

	vlist_remove_all(&plugins, &p);
}