2015-12-04 01:47:49 +01:00
/** Node type: Websockets (libwebsockets)
*
2022-12-14 17:41:58 +01:00
* @ author Steffen Vogel < post @ steffenvogel . de >
2022-03-15 09:28:57 -04:00
* @ copyright 2014 - 2022 , Institute for Automation of Complex Power Systems , EONERC
2022-07-04 18:20:03 +02:00
* @ license Apache 2.0
2015-12-04 01:47:49 +01:00
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2019-06-23 16:57:00 +02:00
# include <cstdio>
# include <cstdlib>
2015-12-02 13:55:58 +01:00
# include <unistd.h>
2019-06-23 16:57:00 +02:00
# include <cstring>
2015-12-02 13:55:58 +01:00
# include <signal.h>
2015-12-13 02:01:57 +01:00
2018-08-02 10:28:08 +02:00
# include <libwebsockets.h>
2021-08-10 10:12:48 -04:00
# include <villas/timing.hpp>
2022-12-24 15:02:05 +01:00
# include <villas/exceptions.hpp>
2019-04-23 13:15:00 +02:00
# include <villas/utils.hpp>
2021-08-10 10:12:48 -04:00
# include <villas/node_compat.hpp>
2019-04-23 00:12:31 +02:00
# include <villas/nodes/websocket.hpp>
2019-04-23 13:14:47 +02:00
# include <villas/super_node.hpp>
2017-03-12 17:13:37 -03:00
2020-06-08 04:03:07 +02:00
using namespace villas ;
2021-05-10 00:12:30 +02:00
using namespace villas : : node ;
2019-06-04 16:55:38 +02:00
using namespace villas : : utils ;
2018-05-24 09:10:56 +02:00
# define DEFAULT_WEBSOCKET_BUFFER_SIZE (1 << 12)
2015-12-13 02:01:57 +01:00
/* Private static storage */
2022-01-11 07:38:19 -05:00
static std : : list < struct websocket_connection * > connections ; /**< List of active libwebsocket connections which receive samples from all nodes (catch all) */
static std : : mutex connections_lock ;
2018-10-21 10:28:07 +02:00
2019-04-23 13:14:47 +02:00
static villas : : node : : Web * web ;
2021-02-16 14:15:14 +01:00
static villas : : Logger logger = logging . get ( " websocket " ) ;
2016-11-07 22:19:30 -05:00
2016-07-11 18:18:20 +02:00
/* Forward declarations */
2021-08-10 10:12:48 -04:00
static NodeCompatType p ;
static NodeCompatFactory ncp ( & p ) ;
2015-12-04 01:47:49 +01:00
2021-08-10 10:12:48 -04:00
static
void websocket_destination_destroy ( struct websocket_destination * d )
2016-11-08 00:24:57 -05:00
{
free ( d - > uri ) ;
2017-05-05 19:24:16 +00:00
2017-04-24 19:28:45 +02:00
free ( ( char * ) d - > info . path ) ;
free ( ( char * ) d - > info . address ) ;
2016-11-08 00:24:57 -05:00
}
2021-08-10 10:12:48 -04:00
static
int websocket_connection_init ( struct websocket_connection * c )
2018-06-12 18:36:59 +02:00
{
int ret ;
2019-10-26 13:34:03 +02:00
ret = queue_init ( & c - > queue , DEFAULT_QUEUE_LENGTH ) ;
2018-06-12 18:36:59 +02:00
if ( ret )
return ret ;
2021-08-10 10:12:48 -04:00
c - > formatter - > start ( c - > node - > getInputSignals ( false ) , ~ ( int ) SampleFlags : : HAS_OFFSET ) ;
2018-08-20 18:27:45 +02:00
2020-06-08 04:03:07 +02:00
c - > buffers . recv = new Buffer ( DEFAULT_WEBSOCKET_BUFFER_SIZE ) ;
c - > buffers . send = new Buffer ( DEFAULT_WEBSOCKET_BUFFER_SIZE ) ;
2018-06-12 18:36:59 +02:00
2020-07-04 16:22:10 +02:00
if ( ! c - > buffers . recv | | ! c - > buffers . send )
throw MemoryAllocationError ( ) ;
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : INITIALIZED ;
2018-06-12 18:36:59 +02:00
return 0 ;
}
2021-08-10 10:12:48 -04:00
static
int websocket_connection_destroy ( struct websocket_connection * c )
2018-06-12 18:36:59 +02:00
{
int ret ;
2019-06-23 16:13:23 +02:00
assert ( c - > state ! = websocket_connection : : State : : DESTROYED ) ;
2018-06-12 18:36:59 +02:00
/* Return all samples to pool */
int avail ;
2021-08-10 10:12:48 -04:00
struct Sample * smp ;
2022-01-11 07:33:43 -05:00
2018-06-12 18:36:59 +02:00
while ( ( avail = queue_pull ( & c - > queue , ( void * * ) & smp ) ) )
2018-08-07 09:22:26 +02:00
sample_decref ( smp ) ;
2018-06-12 18:36:59 +02:00
ret = queue_destroy ( & c - > queue ) ;
if ( ret )
return ret ;
2021-05-10 00:12:30 +02:00
delete c - > formatter ;
2020-06-08 04:03:07 +02:00
delete c - > buffers . recv ;
delete c - > buffers . send ;
2018-06-12 18:36:59 +02:00
2019-04-22 23:45:38 +02:00
c - > wsi = nullptr ;
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : DESTROYED ;
2018-06-12 18:36:59 +02:00
return 0 ;
}
2021-08-10 10:12:48 -04:00
static
int websocket_connection_write ( struct websocket_connection * c , struct Sample * const smps [ ] , unsigned cnt )
2016-11-08 00:24:57 -05:00
{
2017-08-27 17:05:34 +02:00
int pushed ;
2017-09-04 14:28:55 +02:00
2022-01-11 07:38:19 -05:00
if ( c - > state ! = websocket_connection : : State : : ESTABLISHED )
2018-06-12 18:36:59 +02:00
return - 1 ;
2017-08-27 17:05:34 +02:00
pushed = queue_push_many ( & c - > queue , ( void * * ) smps , cnt ) ;
2019-04-22 23:43:46 +02:00
if ( pushed < ( int ) cnt )
2023-01-10 15:25:27 +01:00
c - > node - > logger - > warn ( " Queue overrun in WebSocket connection: {} " , c - > toString ( ) ) ;
2017-09-04 14:28:55 +02:00
2018-08-07 09:22:26 +02:00
sample_incref_many ( smps , pushed ) ;
2017-09-04 14:28:55 +02:00
2023-01-10 15:25:27 +01:00
c - > node - > logger - > debug ( " Enqueued {} samples to {} " , pushed , c - > toString ( ) ) ;
2017-09-04 14:28:55 +02:00
2021-08-10 10:12:48 -04:00
/* Client connections which are currently connecting don't have an associate c->wsi yet */
2019-01-12 19:05:09 +01:00
if ( c - > wsi )
2019-04-23 13:14:47 +02:00
web - > callbackOnWritable ( c - > wsi ) ;
2022-01-11 07:38:19 -05:00
else
c - > node - > logger - > warn ( " No WSI for conn? " ) ;
2017-05-05 19:24:16 +00:00
2016-11-08 00:24:57 -05:00
return 0 ;
}
2021-08-10 10:12:48 -04:00
static
void websocket_connection_close ( struct websocket_connection * c , struct lws * wsi , enum lws_close_status status , const char * reason )
2017-04-24 19:28:45 +02:00
{
lws_close_reason ( wsi , status , ( unsigned char * ) reason , strlen ( reason ) ) ;
2023-01-10 15:25:27 +01:00
c - > node - > logger - > debug ( " Closing WebSocket connection with {}: status={}, reason={} " , c - > toString ( ) , status , reason ) ;
2022-01-11 09:13:51 -05:00
c - > state = websocket_connection : : State : : CLOSED ;
2017-04-24 19:28:45 +02:00
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_protocol_cb ( struct lws * wsi , enum lws_callback_reasons reason , void * user , void * in , size_t len )
2015-12-04 01:47:49 +01:00
{
2017-08-27 17:05:34 +02:00
int ret , recvd , pulled , cnt = 128 ;
2019-04-22 23:43:46 +02:00
struct websocket_connection * c = ( struct websocket_connection * ) user ;
2017-09-04 14:28:55 +02:00
2015-12-02 13:55:58 +01:00
switch ( reason ) {
2016-07-11 18:18:20 +02:00
case LWS_CALLBACK_CLIENT_ESTABLISHED :
2017-04-03 09:01:14 +02:00
case LWS_CALLBACK_ESTABLISHED :
2018-06-12 18:36:59 +02:00
if ( reason = = LWS_CALLBACK_CLIENT_ESTABLISHED )
2019-06-23 16:13:23 +02:00
c - > mode = websocket_connection : : Mode : : CLIENT ;
2018-06-12 18:36:59 +02:00
else {
2019-06-23 16:13:23 +02:00
c - > mode = websocket_connection : : Mode : : SERVER ;
2018-06-12 18:36:59 +02:00
/* We use the URI to associate this connection to a node
* and choose a protocol .
*
* Example : ws : //example.com/node_1.json
* Will select the node with the name ' node_1 '
* and format ' json ' .
*/
/* Get path of incoming request */
2019-02-18 01:09:33 +01:00
char * node , * format , * lasts ;
2018-06-12 18:36:59 +02:00
char uri [ 64 ] ;
lws_hdr_copy ( wsi , uri , sizeof ( uri ) , WSI_TOKEN_GET_URI ) ; /* The path component of the*/
if ( strlen ( uri ) < = 0 ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_PROTOCOL_ERR , " Invalid URL " ) ;
2021-02-16 14:15:14 +01:00
logger - > warn ( " Failed to get request URI " ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2017-09-04 14:28:55 +02:00
2019-02-18 01:09:33 +01:00
node = strtok_r ( uri , " /. " , & lasts ) ;
2018-06-12 18:36:59 +02:00
if ( ! node ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Unknown node " ) ;
2021-02-16 14:15:14 +01:00
logger - > warn ( " Failed to tokenize request URI " ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2017-09-04 14:28:55 +02:00
2019-04-22 23:45:38 +02:00
format = strtok_r ( nullptr , " " , & lasts ) ;
2018-06-12 18:36:59 +02:00
if ( ! format )
2019-04-22 23:43:46 +02:00
format = ( char * ) " villas.web " ;
2017-05-05 19:24:16 +00:00
2018-06-12 18:36:59 +02:00
/* Search for node whose name matches the URI. */
2021-08-10 10:12:48 -04:00
auto * n = ncp . instances . lookup ( node ) ;
if ( ! n ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Unknown node " ) ;
logger - > warn ( " Failed to find node: {} " , node ) ;
return - 1 ;
}
c - > node = dynamic_cast < NodeCompat * > ( n ) ;
2018-06-12 18:36:59 +02:00
if ( ! c - > node ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Unknown node " ) ;
2021-02-16 14:15:14 +01:00
logger - > warn ( " Failed to find node: {} " , node ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2018-03-28 14:29:55 +02:00
2021-05-10 00:12:30 +02:00
c - > formatter = FormatFactory : : make ( format ) ;
if ( ! c - > formatter ) {
2018-06-12 18:36:59 +02:00
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Unknown format " ) ;
2021-02-16 14:15:14 +01:00
c - > node - > logger - > warn ( " Failed to find format: format={} " , format ) ;
2018-06-12 18:36:59 +02:00
return - 1 ;
}
2017-08-14 14:42:07 +02:00
}
2017-09-04 14:28:55 +02:00
2018-06-12 18:36:59 +02:00
ret = websocket_connection_init ( c ) ;
if ( ret ) {
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_POLICY_VIOLATION , " Internal error " ) ;
2021-02-16 14:15:14 +01:00
c - > node - > logger - > warn ( " Failed to intialize WebSocket connection: reason={} " , ret ) ;
2016-11-07 22:19:30 -05:00
return - 1 ;
2018-06-12 18:36:59 +02:00
}
2015-12-04 01:47:49 +01:00
2022-01-11 07:38:19 -05:00
c - > wsi = wsi ;
c - > state = websocket_connection : : State : : ESTABLISHED ;
2023-01-10 15:25:27 +01:00
c - > node - > logger - > info ( " Established WebSocket connection: {} " , c - > toString ( ) ) ;
2020-07-04 17:15:27 +02:00
2022-01-11 07:38:19 -05:00
{
std : : lock_guard guard ( connections_lock ) ;
connections . push_back ( c ) ;
}
2017-08-14 14:42:07 +02:00
break ;
2017-04-24 19:28:45 +02:00
2017-08-27 17:05:34 +02:00
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR :
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : ERROR ;
2017-05-05 19:24:16 +00:00
2021-02-16 14:15:14 +01:00
logger - > warn ( " Failed to establish WebSocket connection: reason={} " , in ? ( char * ) in : " unknown " ) ;
2016-02-04 18:25:13 +01:00
2017-08-27 17:05:34 +02:00
return - 1 ;
2017-09-04 14:28:55 +02:00
2015-12-04 01:47:49 +01:00
case LWS_CALLBACK_CLOSED :
2022-01-11 09:13:51 -05:00
c - > state = websocket_connection : : State : : CLOSED ;
2023-01-10 15:25:27 +01:00
c - > node - > logger - > debug ( " Closed WebSocket connection: {} " , c - > toString ( ) ) ;
2017-09-04 14:28:55 +02:00
2022-01-11 09:13:51 -05:00
if ( c - > state ! = websocket_connection : : State : : CLOSING ) {
2017-08-27 17:05:34 +02:00
/** @todo Attempt reconnect here */
2016-02-04 16:30:36 +01:00
}
2022-01-11 07:38:19 -05:00
{
std : : lock_guard guard ( connections_lock ) ;
connections . remove ( c ) ;
}
2017-05-05 19:24:16 +00:00
2022-01-11 09:13:51 -05:00
ret = websocket_connection_destroy ( c ) ;
if ( ret )
return ret ;
2017-04-24 19:28:45 +02:00
2019-06-23 16:13:23 +02:00
if ( c - > mode = = websocket_connection : : Mode : : CLIENT )
2020-01-21 16:26:51 +01:00
delete c ;
2017-04-24 19:28:45 +02:00
2017-08-27 17:05:34 +02:00
break ;
2017-05-05 19:24:16 +00:00
2016-07-11 18:18:20 +02:00
case LWS_CALLBACK_CLIENT_WRITEABLE :
2018-08-20 18:27:45 +02:00
case LWS_CALLBACK_SERVER_WRITEABLE : {
2021-08-10 10:12:48 -04:00
struct Sample * smps [ cnt ] ;
2017-07-13 22:39:38 +02:00
2017-08-27 17:05:34 +02:00
pulled = queue_pull_many ( & c - > queue , ( void * * ) smps , cnt ) ;
if ( pulled > 0 ) {
2018-05-23 00:17:41 +02:00
size_t wbytes ;
2021-05-10 00:12:30 +02:00
c - > formatter - > sprint ( c - > buffers . send - > data ( ) + LWS_PRE , c - > buffers . send - > size ( ) - LWS_PRE , & wbytes , smps , pulled ) ;
2017-09-04 14:28:55 +02:00
2022-02-24 07:31:00 -05:00
auto isBinary = dynamic_cast < BinaryFormat * > ( c - > formatter ) ! = nullptr ;
ret = lws_write ( wsi , ( unsigned char * ) c - > buffers . send - > data ( ) + LWS_PRE , wbytes , isBinary ? LWS_WRITE_BINARY : LWS_WRITE_TEXT ) ;
2017-05-05 19:24:16 +00:00
2018-08-07 09:22:26 +02:00
sample_decref_many ( smps , pulled ) ;
2017-05-05 19:24:16 +00:00
2019-01-14 10:00:09 +01:00
if ( ret < 0 )
return ret ;
2023-01-10 15:25:27 +01:00
c - > node - > logger - > debug ( " Send {} samples to connection: {}, bytes={} " , pulled , c - > toString ( ) , ret ) ;
2017-07-13 22:39:38 +02:00
}
2017-09-04 14:28:55 +02:00
2017-08-27 17:59:24 +02:00
if ( queue_available ( & c - > queue ) > 0 )
lws_callback_on_writable ( wsi ) ;
2022-01-11 09:13:51 -05:00
else if ( c - > state = = websocket_connection : : State : : CLOSING ) {
2018-08-20 18:27:45 +02:00
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_GOINGAWAY , " Node stopped " ) ;
return - 1 ;
}
2017-09-04 14:28:55 +02:00
2017-08-27 17:05:34 +02:00
break ;
2018-08-20 18:27:45 +02:00
}
2015-12-02 13:55:58 +01:00
2016-07-11 18:18:20 +02:00
case LWS_CALLBACK_CLIENT_RECEIVE :
2017-09-04 14:28:55 +02:00
case LWS_CALLBACK_RECEIVE :
2017-08-27 17:05:34 +02:00
if ( lws_is_first_fragment ( wsi ) )
2020-06-08 04:03:07 +02:00
c - > buffers . recv - > clear ( ) ;
2017-09-04 14:28:55 +02:00
2020-10-20 22:17:55 +02:00
c - > buffers . recv - > append ( ( char * ) in , len ) ;
2017-09-04 14:28:55 +02:00
2017-08-27 17:05:34 +02:00
/* We dont try to parse the frame yet, as we have to wait for the remaining fragments */
if ( lws_is_final_fragment ( wsi ) ) {
struct timespec ts_recv = time_now ( ) ;
2021-08-10 10:12:48 -04:00
auto * n = c - > node ;
2018-02-06 23:30:37 +01:00
2018-05-07 18:48:34 +02:00
int avail , enqueued ;
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
struct Sample * smps [ cnt ] ;
2017-09-04 14:28:55 +02:00
2018-05-07 18:48:34 +02:00
avail = sample_alloc_many ( & w - > pool , smps , cnt ) ;
if ( avail < cnt )
2023-01-10 15:25:27 +01:00
c - > node - > logger - > warn ( " Pool underrun for connection: {} " , c - > toString ( ) ) ;
2017-05-05 19:24:16 +00:00
2021-05-10 00:12:30 +02:00
recvd = c - > formatter - > sscan ( c - > buffers . recv - > data ( ) , c - > buffers . recv - > size ( ) , nullptr , smps , avail ) ;
2017-08-27 17:05:34 +02:00
if ( recvd < 0 ) {
2023-01-10 15:25:27 +01:00
c - > node - > logger - > warn ( " Failed to parse sample data received on connection: {} " , c - > toString ( ) ) ;
2017-08-27 17:05:34 +02:00
break ;
2017-04-24 19:28:45 +02:00
}
2017-05-05 19:24:16 +00:00
2023-01-10 15:25:27 +01:00
c - > node - > logger - > debug ( " Received {} samples from connection: {} " , recvd , c - > toString ( ) ) ;
2017-05-05 19:24:16 +00:00
2017-08-27 17:05:34 +02:00
/* Set receive timestamp */
2017-09-04 14:28:55 +02:00
for ( int i = 0 ; i < recvd ; i + + ) {
2017-08-27 17:05:34 +02:00
smps [ i ] - > ts . received = ts_recv ;
2019-06-23 16:13:23 +02:00
smps [ i ] - > flags | = ( int ) SampleFlags : : HAS_TS_RECEIVED ;
2017-09-04 14:28:55 +02:00
}
2017-04-24 19:28:45 +02:00
2018-05-07 18:48:34 +02:00
enqueued = queue_signalled_push_many ( & w - > queue , ( void * * ) smps , recvd ) ;
if ( enqueued < recvd )
2023-01-10 15:25:27 +01:00
c - > node - > logger - > warn ( " Queue overrun in connection: {} " , c - > toString ( ) ) ;
2017-09-04 14:28:55 +02:00
2018-05-07 18:48:34 +02:00
/* Release unused samples back to pool */
if ( enqueued < avail )
2018-08-07 09:22:26 +02:00
sample_decref_many ( & smps [ enqueued ] , avail - enqueued ) ;
2018-05-07 18:48:34 +02:00
2020-06-08 04:03:07 +02:00
c - > buffers . recv - > clear ( ) ;
2018-05-26 01:13:22 +02:00
2022-01-11 09:13:51 -05:00
if ( c - > state = = websocket_connection : : State : : CLOSING ) {
2017-08-27 17:05:34 +02:00
websocket_connection_close ( c , wsi , LWS_CLOSE_STATUS_GOINGAWAY , " Node stopped " ) ;
return - 1 ;
2016-11-07 22:19:30 -05:00
}
2016-01-14 23:17:39 +01:00
}
2017-05-05 19:24:16 +00:00
2017-08-27 17:05:34 +02:00
break ;
2016-07-11 18:18:20 +02:00
2015-12-02 13:55:58 +01:00
default :
2017-08-14 14:42:07 +02:00
break ;
2015-12-02 13:55:58 +01:00
}
2017-09-04 14:28:55 +02:00
2017-08-14 14:42:07 +02:00
return 0 ;
2015-12-02 13:55:58 +01:00
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_type_start ( villas : : node : : SuperNode * sn )
2017-04-24 19:28:45 +02:00
{
2022-12-24 15:02:05 +01:00
if ( sn = = nullptr )
throw RuntimeError ( " WebSocket node-type requires super-node " ) ;
2019-04-23 13:14:47 +02:00
web = sn - > getWeb ( ) ;
2021-09-14 10:50:05 +02:00
if ( ! web - > isEnabled ( ) )
2018-12-02 02:56:52 +01:00
return - 1 ;
2017-04-24 19:28:45 +02:00
return 0 ;
}
2022-01-11 07:35:28 -05:00
int villas : : node : : websocket_init ( NodeCompat * n )
{
auto * w = n - > getData < struct websocket > ( ) ;
w - > wait = false ;
int ret = list_init ( & w - > destinations ) ;
if ( ret )
return ret ;
return 0 ;
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_start ( NodeCompat * n )
2015-12-02 13:55:58 +01:00
{
2016-07-11 18:18:20 +02:00
int ret ;
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
2017-05-05 19:24:16 +00:00
2021-08-10 10:12:48 -04:00
ret = pool_init ( & w - > pool , DEFAULT_WEBSOCKET_QUEUE_LENGTH , SAMPLE_LENGTH ( n - > getInputSignals ( false ) - > size ( ) ) ) ;
2016-07-11 18:18:20 +02:00
if ( ret )
return ret ;
2017-05-05 19:24:16 +00:00
2019-10-26 13:34:03 +02:00
ret = queue_signalled_init ( & w - > queue , DEFAULT_WEBSOCKET_QUEUE_LENGTH ) ;
2017-04-02 13:02:49 +02:00
if ( ret )
return ret ;
2016-11-07 22:19:30 -05:00
2021-08-10 10:12:48 -04:00
for ( size_t i = 0 ; i < list_length ( & w - > destinations ) ; i + + ) {
2018-08-20 18:27:45 +02:00
const char * format ;
2021-08-10 10:12:48 -04:00
auto * d = ( struct websocket_destination * ) list_at ( & w - > destinations , i ) ;
2020-01-21 16:26:51 +01:00
auto * c = new struct websocket_connection ;
2020-07-04 16:22:10 +02:00
if ( ! c )
throw MemoryAllocationError ( ) ;
2017-05-05 19:24:16 +00:00
2019-06-23 16:13:23 +02:00
c - > state = websocket_connection : : State : : CONNECTING ;
2018-06-12 18:36:59 +02:00
2018-08-20 18:27:45 +02:00
format = strchr ( d - > info . path , ' . ' ) ;
2019-01-12 19:06:23 +01:00
if ( format )
2019-03-26 07:09:55 +01:00
format = format + 1 ; /* Removes "." */
2019-01-12 19:06:23 +01:00
else
2018-08-20 18:27:45 +02:00
format = " villas.web " ;
2021-05-10 00:12:30 +02:00
c - > formatter = FormatFactory : : make ( format ) ;
if ( ! c - > formatter )
2018-08-20 18:27:45 +02:00
return - 1 ;
2017-04-24 19:28:45 +02:00
c - > node = n ;
2017-08-14 14:42:07 +02:00
c - > destination = d ;
2017-05-05 19:24:16 +00:00
2019-04-23 13:14:47 +02:00
d - > info . context = web - > getContext ( ) ;
d - > info . vhost = web - > getVHost ( ) ;
2017-04-24 19:28:45 +02:00
d - > info . userdata = c ;
2017-09-04 14:28:55 +02:00
2017-04-24 19:28:45 +02:00
lws_client_connect_via_info ( & d - > info ) ;
}
2017-03-16 22:42:58 -03:00
2022-01-11 07:33:43 -05:00
/* Wait until all destinations are connected */
if ( w - > wait ) {
2022-01-11 09:13:51 -05:00
unsigned connected = 0 , total = list_length ( & w - > destinations ) ;
2022-01-11 07:33:43 -05:00
do {
{
std : : lock_guard guard ( connections_lock ) ;
connected = 0 ;
for ( auto * c : connections ) {
if ( c - > mode = = websocket_connection : : Mode : : CLIENT & &
c - > state = = websocket_connection : : State : : ESTABLISHED & &
c - > node = = n )
connected + + ;
}
}
2022-01-11 09:13:51 -05:00
if ( connected < total ) {
n - > logger - > info ( " Wait until all destinations are connected: pending={} " , total - connected ) ;
sleep ( 1 ) ;
}
} while ( connected < total ) ;
2022-01-11 07:33:43 -05:00
}
2015-12-02 13:55:58 +01:00
return 0 ;
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_stop ( NodeCompat * n )
2016-02-04 17:13:28 +01:00
{
2022-01-11 07:38:19 -05:00
int ret ;
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
2017-09-04 14:28:55 +02:00
2022-01-11 07:38:19 -05:00
unsigned open_connections ;
do {
{
std : : lock_guard guard ( connections_lock ) ;
2017-05-05 19:24:16 +00:00
2022-01-11 07:38:19 -05:00
open_connections = 0 ;
for ( auto * c : connections ) {
if ( c - > node = = n ) {
2022-01-11 09:13:51 -05:00
if ( c - > state ! = websocket_connection : : State : : CLOSED ) {
open_connections + + ;
2017-04-24 19:28:45 +02:00
2022-01-11 09:13:51 -05:00
c - > state = websocket_connection : : State : : CLOSING ;
2022-01-11 07:38:19 -05:00
lws_callback_on_writable ( c - > wsi ) ;
}
}
}
}
2018-08-20 18:27:45 +02:00
2022-01-11 09:13:51 -05:00
if ( open_connections > 0 ) {
n - > logger - > info ( " Waiting for open connections to be closed: pending={} " , open_connections ) ;
sleep ( 1 ) ;
}
2022-01-11 07:38:19 -05:00
} while ( open_connections > 0 ) ;
2018-08-20 18:27:45 +02:00
2019-03-31 12:52:07 +02:00
ret = queue_signalled_close ( & w - > queue ) ;
if ( ret )
return ret ;
2016-02-04 17:13:28 +01:00
return 0 ;
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_destroy ( NodeCompat * n )
2015-12-02 13:55:58 +01:00
{
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
2018-05-26 01:15:23 +02:00
int ret ;
2017-03-16 22:42:58 -03:00
2022-01-11 07:38:19 -05:00
ret = queue_signalled_destroy ( & w - > queue ) ;
if ( ret )
return ret ;
ret = pool_destroy ( & w - > pool ) ;
if ( ret )
return ret ;
2021-08-10 10:12:48 -04:00
ret = list_destroy ( & w - > destinations , ( dtor_cb_t ) websocket_destination_destroy , true ) ;
2018-05-26 01:15:23 +02:00
if ( ret )
return ret ;
2015-12-04 01:47:49 +01:00
2015-12-02 13:55:58 +01:00
return 0 ;
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_read ( NodeCompat * n , struct Sample * const smps [ ] , unsigned cnt )
2015-12-02 13:55:58 +01:00
{
2017-04-24 19:28:45 +02:00
int avail ;
2017-03-16 22:42:58 -03:00
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
struct Sample * cpys [ cnt ] ;
2017-03-16 22:42:58 -03:00
2018-07-11 18:14:29 +02:00
avail = queue_signalled_pull_many ( & w - > queue , ( void * * ) cpys , cnt ) ;
2017-08-27 17:05:34 +02:00
if ( avail < 0 )
return avail ;
2017-04-24 19:28:45 +02:00
2018-03-28 14:29:55 +02:00
sample_copy_many ( smps , cpys , avail ) ;
2018-08-07 09:22:26 +02:00
sample_decref_many ( cpys , avail ) ;
2016-06-08 22:39:17 +02:00
2017-04-24 19:28:45 +02:00
return avail ;
2015-12-02 13:55:58 +01:00
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_write ( NodeCompat * n , struct Sample * const smps [ ] , unsigned cnt )
2015-12-02 13:55:58 +01:00
{
2017-04-24 19:28:45 +02:00
int avail ;
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
struct Sample * cpys [ cnt ] ;
2017-05-05 19:24:16 +00:00
2017-04-24 19:28:45 +02:00
/* Make copies of all samples */
2018-07-11 18:14:29 +02:00
avail = sample_alloc_many ( & w - > pool , cpys , cnt ) ;
2019-04-22 23:43:46 +02:00
if ( avail < ( int ) cnt )
2021-02-16 14:15:14 +01:00
n - > logger - > warn ( " Pool underrun: avail={} " , avail ) ;
2017-04-24 19:28:45 +02:00
2018-03-28 14:29:55 +02:00
sample_copy_many ( cpys , smps , avail ) ;
2017-05-05 19:24:16 +00:00
2022-01-11 07:38:19 -05:00
{
std : : lock_guard guard ( connections_lock ) ;
for ( auto * c : connections ) {
if ( c - > node = = n )
websocket_connection_write ( c , cpys , cnt ) ;
}
2017-03-25 21:23:31 +01:00
}
2017-05-05 19:24:16 +00:00
2018-08-07 09:22:26 +02:00
sample_decref_many ( cpys , avail ) ;
2016-06-08 22:39:17 +02:00
2018-07-11 18:14:29 +02:00
return cnt ;
2015-12-02 13:55:58 +01:00
}
2016-07-11 18:18:20 +02:00
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_parse ( NodeCompat * n , json_t * json )
2016-11-07 22:19:30 -05:00
{
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
2016-11-08 00:24:57 -05:00
int ret ;
2017-03-16 22:42:58 -03:00
2018-08-07 18:40:32 +02:00
size_t i ;
2019-04-22 23:45:38 +02:00
json_t * json_dests = nullptr ;
2017-10-16 08:08:35 +02:00
json_t * json_dest ;
2017-08-03 00:19:27 +02:00
json_error_t err ;
2022-01-11 07:33:43 -05:00
int wc = - 1 ;
2017-08-03 00:19:27 +02:00
2017-03-16 22:42:58 -03:00
2022-01-11 07:33:43 -05:00
ret = json_unpack_ex ( json , & err , 0 , " { s?: o, s?: b } " ,
" destinations " , & json_dests ,
" wait_connected " , & wc
) ;
2017-08-03 00:19:27 +02:00
if ( ret )
2021-02-16 14:15:14 +01:00
throw ConfigError ( json , err , " node-config-node-websocket " ) ;
2017-08-03 00:19:27 +02:00
2022-01-11 07:33:43 -05:00
if ( wc > = 0 )
w - > wait = wc ! = 0 ;
2022-01-11 07:35:28 -05:00
list_clear ( & w - > destinations ) ;
2017-10-16 08:08:35 +02:00
if ( json_dests ) {
if ( ! json_is_array ( json_dests ) )
2021-02-16 14:15:14 +01:00
throw ConfigError ( json_dests , err , " node-config-node-websocket-destinations " , " The 'destinations' setting must be an array of URLs " ) ;
2017-05-05 19:24:16 +00:00
2018-08-07 18:40:32 +02:00
json_array_foreach ( json_dests , i , json_dest ) {
2017-03-11 23:50:46 -03:00
const char * uri , * prot , * ads , * path ;
2017-05-05 19:24:16 +00:00
2017-10-16 08:08:35 +02:00
uri = json_string_value ( json_dest ) ;
2017-03-11 23:50:46 -03:00
if ( ! uri )
2021-02-16 14:15:14 +01:00
throw ConfigError ( json_dest , err , " node-config-node-websocket-destinations " , " The 'destinations' setting must be an array of URLs " ) ;
2017-05-05 19:24:16 +00:00
2020-01-21 16:26:51 +01:00
auto * d = new struct websocket_destination ;
2020-07-04 16:22:10 +02:00
if ( ! d )
throw MemoryAllocationError ( ) ;
memset ( d , 0 , sizeof ( struct websocket_destination ) ) ;
2017-05-05 19:24:16 +00:00
2017-07-09 14:36:09 +02:00
d - > uri = strdup ( uri ) ;
2017-05-05 19:24:16 +00:00
2017-07-09 14:36:09 +02:00
ret = lws_parse_uri ( d - > uri , & prot , & ads , & d - > info . port , & path ) ;
2017-03-11 23:50:46 -03:00
if ( ret )
2021-02-16 14:15:14 +01:00
throw ConfigError ( json_dest , err , " node-config-node-websocket-destinations " , " Failed to parse WebSocket URI: '{}' " , uri ) ;
2017-05-05 19:24:16 +00:00
2017-07-09 14:36:09 +02:00
d - > info . ssl_connection = ! strcmp ( prot , " https " ) ;
d - > info . address = strdup ( ads ) ;
2020-09-30 16:07:39 +02:00
d - > info . path = strf ( " /%s " , path ) ;
2017-07-09 14:36:09 +02:00
d - > info . host = d - > info . address ;
d - > info . origin = d - > info . address ;
d - > info . ietf_version_or_minus_one = - 1 ;
d - > info . protocol = " live " ;
2017-05-05 19:24:16 +00:00
2021-08-10 10:12:48 -04:00
list_push ( & w - > destinations , d ) ;
2017-03-11 23:50:46 -03:00
}
2016-07-11 18:18:20 +02:00
}
2017-03-08 09:53:28 -03:00
2016-11-08 00:24:57 -05:00
return 0 ;
2016-11-07 22:19:30 -05:00
}
2021-08-10 10:12:48 -04:00
char * villas : : node : : websocket_print ( NodeCompat * n )
2016-11-07 22:19:30 -05:00
{
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
2016-11-08 00:24:57 -05:00
2019-04-22 23:45:38 +02:00
char * buf = nullptr ;
2017-05-05 19:24:16 +00:00
2017-04-24 18:11:05 +02:00
buf = strcatf ( & buf , " destinations=[ " ) ;
2017-05-05 19:24:16 +00:00
2021-08-10 10:12:48 -04:00
for ( size_t i = 0 ; i < list_length ( & w - > destinations ) ; i + + ) {
struct websocket_destination * d = ( struct websocket_destination * ) list_at ( & w - > destinations , i ) ;
2017-03-25 21:23:31 +01:00
2017-08-27 17:05:34 +02:00
buf = strcatf ( & buf , " %s://%s:%d/%s " ,
2017-04-24 18:11:05 +02:00
d - > info . ssl_connection ? " wss " : " ws " ,
d - > info . address ,
d - > info . port ,
d - > info . path
2016-11-08 00:24:57 -05:00
) ;
2016-07-11 18:18:20 +02:00
}
2017-05-05 19:24:16 +00:00
2017-04-24 18:11:05 +02:00
buf = strcatf ( & buf , " ] " ) ;
2017-05-05 19:24:16 +00:00
2016-11-08 00:24:57 -05:00
return buf ;
2015-12-02 13:55:58 +01:00
}
2021-08-10 10:12:48 -04:00
int villas : : node : : websocket_poll_fds ( NodeCompat * n , int fds [ ] )
2017-08-30 13:30:31 +02:00
{
2021-08-10 10:12:48 -04:00
auto * w = n - > getData < struct websocket > ( ) ;
2017-09-04 14:28:55 +02:00
2019-01-21 15:47:34 +01:00
fds [ 0 ] = queue_signalled_fd ( & w - > queue ) ;
return 1 ;
2017-08-30 13:30:31 +02:00
}
2019-04-22 23:43:46 +02:00
/* Fill in the node-type descriptor when the object is loaded (constructor priority 110) */
__attribute__((constructor(110))) static void UNIQUE(__ctor)() {
	/* General properties */
	p.name		= "websocket";
	p.description	= "Send and receive samples of a WebSocket connection (libwebsockets)";
	p.vectorize	= 0;
	p.size		= sizeof(struct websocket);
	p.flags		= (int) NodeFactory::Flags::REQUIRES_WEB;

	/* Node-type wide hook */
	p.type.start	= websocket_type_start;

	/* Per-node life cycle hooks */
	p.init		= websocket_init;
	p.destroy	= websocket_destroy;
	p.parse		= websocket_parse;
	p.print		= websocket_print;
	p.start		= websocket_start;
	p.stop		= websocket_stop;

	/* Data path hooks */
	p.read		= websocket_read;
	p.write		= websocket_write;
	p.poll_fds	= websocket_poll_fds;
}