From 43bab87be6173e17dc6dcb5f7c5afadc614ef20a Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 20 Jun 2023 17:38:15 +0000 Subject: [PATCH] webrtc: Support signaling messages spread over multiple WebSocket fragments Signed-off-by: Steffen Vogel --- .../villas/nodes/webrtc/signaling_client.hpp | 4 +- lib/nodes/webrtc/signaling_client.cpp | 37 ++++++++----------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/include/villas/nodes/webrtc/signaling_client.hpp b/include/villas/nodes/webrtc/signaling_client.hpp index 6e54df87e..7239de76a 100644 --- a/include/villas/nodes/webrtc/signaling_client.hpp +++ b/include/villas/nodes/webrtc/signaling_client.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -68,6 +69,8 @@ protected: std::atomic running; + Buffer buffer; // A buffer for received fragments before JSON decoding. + Logger logger; int protocolCallback(struct lws *wsi, enum lws_callback_reasons reason, void *in, size_t len); @@ -75,7 +78,6 @@ protected: static void connectStatic(struct lws_sorted_usec_list *sul); - int receive(void *in, size_t len); int writable(); public: diff --git a/lib/nodes/webrtc/signaling_client.cpp b/lib/nodes/webrtc/signaling_client.cpp index 4bb8ba49e..edc4c7043 100644 --- a/lib/nodes/webrtc/signaling_client.cpp +++ b/lib/nodes/webrtc/signaling_client.cpp @@ -114,9 +114,22 @@ int SignalingClient::protocolCallback(struct lws *wsi, enum lws_callback_reasons goto do_retry; case LWS_CALLBACK_CLIENT_RECEIVE: - ret = receive(in, len); - if (ret) - goto do_retry; + if (lws_is_first_fragment(wsi)) + buffer.clear(); + + buffer.append((char *) in, len); + + if (lws_is_final_fragment(wsi)) { + auto *json = buffer.decode(); + if (json == nullptr) { + logger->error("Failed to decode JSON"); + goto do_retry; + } + + cbMessage(SignalingMessage::fromJSON(json)); + + json_decref(json); + } break; @@ -197,24 +210,6 @@ int SignalingClient::writable() return 0; } -int SignalingClient::receive(void *in, size_t len) -{ - json_error_t err; - json_t *json = json_loadb((char *) in, len, 0, &err); - if (!json) { - logger->error("Failed to decode json: {} at ({}:{})", err.text, err.line, err.column); - return -1; - } - - logger->debug("Signaling message received: {:.{}}", (char *)in, len); - - cbMessage(SignalingMessage::fromJSON(json)); - - json_decref(json); - - return 0; -} - void SignalingClient::sendMessage(SignalingMessage msg) { outgoingMessages.push(msg);