From 7917fe858ea30dc64199ee947e506af3aadfc1c7 Mon Sep 17 00:00:00 2001 From: Andy Green Date: Wed, 19 Jul 2017 04:39:14 +0800 Subject: [PATCH] lws-meta --- CMakeLists.txt | 2 + README.lws-meta.md | 192 +++++++ lib/libwebsockets.c | 89 +++- lib/libwebsockets.h | 116 ++++- lib/output.c | 17 + lib/parsers.c | 18 +- lib/pollfd.c | 15 + lib/private-libwebsockets.h | 4 + lib/server.c | 108 ++-- lib/service.c | 7 + lwsws/etc-lwsws-conf.d-localhost-EXAMPLE | 3 + plugins/protocol_lws_meta.c | 613 +++++++++++++++++++++++ test-server/lws-common.js | 279 +++++++++++ test-server/test-server-v2.0.c | 9 +- test-server/test-server.c | 5 + test-server/test.html | 35 +- 16 files changed, 1446 insertions(+), 66 deletions(-) create mode 100644 README.lws-meta.md create mode 100644 plugins/protocol_lws_meta.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ad0174e..e784ae2a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1443,6 +1443,8 @@ if (NOT LWS_WITHOUT_TESTAPPS) endmacro() + create_plugin(protocol_lws_meta + "plugins/protocol_lws_meta.c" "" "") create_plugin(protocol_dumb_increment "plugins/protocol_dumb_increment.c" "" "") create_plugin(protocol_lws_mirror diff --git a/README.lws-meta.md b/README.lws-meta.md new file mode 100644 index 00000000..dbca4c0a --- /dev/null +++ b/README.lws-meta.md @@ -0,0 +1,192 @@ +# lws-meta protocol + +lws-meta is a lightweight ws subprotocol that accepts other ws connections +to the same server inside it and multiplexes their access to the connection. + +``` + Client Server + + conn1: \ / :conn1 + conn2: = mux ------ lws-meta ws protocol ----- mux = :conn2 + conn3: / \ :conn3 +``` + +You may have n client ws connections back to the server, but you now +only have one tcp connection (and one SSL wrapper if using SSL) instead +of n of those. + +If you currently make multiple ws connections back to the server, so you +can have different protocols active in one webpage, this if for you. + + - The subprotocol code for the connections inside a lws-meta connection + need zero changes from being a normal ws connection. It is unaware + it is inside an lws-meta parent connection. + + - The traffic on the lws-meta connection is indistinguishable from + standard ws traffic, so intermediaries won't object to it + + - The multiplexing is done in the protocol, **not by an extension**. So + it's compatible with all browsers. + + - Javascript helper code is provided to very simply use lws-meta + protocol instead of direct connections. The lws test server has + been converted to use this by default. + +# Converting your server + +1) include the provided lws-meta plugin (plugins/protocl_lws_meta.c) as an +active protocol for your server. You can do that using runtime plugins, or +include the plugin sources into your server at build-time. The lws test +server uses the latter approach. + +That's all you need to do on the server side. + +# Converting your browser JS + +1) import lws-common.js + +2) Instantiate a parent lws-meta connection object + +``` +var lws_meta = new lws_meta_ws(); +``` + +3) Connect the lws-meta object to your server + +``` +lws_meta.new_parent(get_appropriate_ws_url("?mirror=" + mirror_name)); +``` + +4) Convert your actual ws connections to go via the lws_meta object + +``` +var my_ws = lws_meta.new_ws("", "dumb-increment-protocol"); +``` + +The first arg is the URL path, the second arg is the ws protocol you want. + +That's it. my_ws will get `onopen()`, `onmessage()` etc calls as before. + +# lws-meta wire protocol + +lws-meta works by adding some bytes at the start of a message indicating +which channel the message applies to. + +Channel messages are atomic on the wire. The reason is if we tried to +intersperse other channel fragments between one channels message fragments, +an intermediary would observe violations of the ws framing rule about +having to start a message with TEXT or BINARY, and use only CONTINUATION +for the subsequent fragments. Eg + +``` + [ ch1 TEXT NOFIN ] [ ch2 BINARY FIN ] [ ch1 CONTINUATION FIN ] +``` + +is illegal to an observer that doesn't understand lws-meta headers in the +packet payloads. So to avoid this situation, only complete messages may +be sent from one subchannel in each direction at a time. + +Consequently, only the first fragment of each message is modified to +have the extra two bytes identifying the subchannel it is aimed at, since +the rest of the message from the same subchannel is defined to follow. + +If it makes latencies, modify the protocol sending large messages to +send smaller messages, so the transmission of messages from other channels +can be sent inbetween the smaller messages. + +## lws-meta commands + +1) CSTRING indicates a string terminated by 0x00 byte + +2) Channel IDs are sent with 0x20 added to them, to guarantee valid UTF-8 + +### 0x41: RX: LWS_META_CMD_OPEN_SUBCHANNEL + + - CSTRING: protocol name + - CSTRING: url + - CSTRING: cookie (7 bytes max) + +Client is requesting to open a new channel with the given protocol name, +at the given url. The cookie (eg, channel name) is only used in +LWS_META_CMD_OPEN_RESULT, when the channel id is assigned, so it is +applied to the right channel. + +### 0x42: TX: LWS_META_CMD_OPEN_RESULT + + - CSTRING cookie + - BYTE channel id (0 indicates failed) + - CSTRING: selected protocol name + +The server is informing the client of the results of a previous +open request. The cookie the client sent to identify the request +is returned along with a channel id to be used subsequently. If +the channel ID is 0 (after subtracting the transport offset of +0x20) then the open request has failed. + +### 0x43: TX: LWS_META_CMD_CLOSE_NOTIFY + + - BYTE channel id + - BYTE: payload length + 0x20 + - BYTE: close code MSB + - BYTE: close code LSB + - PAYLOAD: payload (< 123 bytes) + +Server notifies the client that a child has closed, for whatever reason. + +### 0x44: RX: LWS_META_CMD_CLOSE_RQ + - BYTE: channel id + - BYTE: payload length + 0x20 + - BYTE: close code MSB + - BYTE: close code LSB + - PAYLOAD: payload (< 123 bytes) + +The client requests to close a child connection + +### 0x45: TX: LWS_META_CMD_WRITE + + - BYTE: channel id + +Normal write of payload n from lws-meta perspective is actually +LWS_META_CMD_WRITE, channel id, then (n - 2) bytes of payload + +The command only appears at the start of a message, continuations do +not have the command. + +## Protocol Notes + + - Once the subchannel is up, overhead is only +2 bytes per message + + - Close reasons are supported in both directions + + - Ping and Pong are only supported at the lws-meta level, using normal ws ping and pong packets. + + - Only the final close of the tcp lws-meta connection itself goes out as + a normal ws close frame. Subchannels close is done in a normal TEXT + message using LWS_META_CMD_CLOSE_RQ and then the close packet payload. + This is so intermediaries do not mistake subchannel closures for the + tcp / ws link going down. + + Messages that start with LWS_META_CMD_OPEN_SUBCHANNEL only contain those + commands but may contain any number of them for the whole duration of the + message. The lws-meta js support collects child open requests made before + the parent lws-meta connection is open, and dumps them all in a single + message when it does open. + + Messages that start with LWS_META_CMD_OPEN_RESULT or LWS_META_CMD_CLOSE_NOTIFY + only contain those two commands, but they may contain any number of them + for the whole duration of the message. + + +# Current Implemention Limitations + + - only server side is supported in lws. The client side JS for + a browser is supported. + + - max number of child connections per parent at the moment is 8 + + - child connection URL paramter when opening the connection is + ignored + + - there is no ah attached when the child connections are + established inside the lws-meta parent. So header access + functions will fail. diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index 51d86181..5cd0c03e 100755 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -116,6 +116,13 @@ lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs) struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; time_t now; + if (secs == LWS_TO_KILL_SYNC) { + lws_remove_from_timeout_list(wsi); + lwsl_debug("synchronously killing %p\n", wsi); + lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS); + return; + } + lws_pt_lock(pt); time(&now); @@ -157,6 +164,12 @@ lws_remove_child_from_any_parent(struct lws *wsi) if (*pwsi == wsi) { lwsl_info("%s: detach %p from parent %p\n", __func__, wsi, wsi->parent); + + if (wsi->parent->protocol) + wsi->parent->protocol->callback(wsi, + LWS_CALLBACK_CHILD_CLOSING, + wsi->parent->user_space, wsi, 0); + *pwsi = wsi->sibling_list; seen = 1; break; @@ -228,6 +241,8 @@ lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason) struct lws_tokens eff_buf; int n, m, ret; + lwsl_debug("%s: %p\n", __func__, wsi); + if (!wsi) return; @@ -568,6 +583,8 @@ just_kill_connection: /* checking return redundant since we anyway close */ if (wsi->desc.sockfd != LWS_SOCK_INVALID) remove_wsi_socket_from_fds(wsi); + else + lws_same_vh_protocol_remove(wsi); #if defined(LWS_WITH_ESP8266) espconn_disconnect(wsi->desc.sockfd); @@ -670,17 +687,18 @@ async_close: wsi->socket_is_permanently_unusable = 1; #ifdef LWS_USE_LIBUV - if (LWS_LIBUV_ENABLED(context)) { - if (wsi->listener) { - lwsl_debug("%s: stopping listner libuv poll\n", __func__); - uv_poll_stop(&wsi->w_read.uv_watcher); - } - lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi); - /* libuv has to do his own close handle processing asynchronously */ - lws_libuv_closehandle(wsi); + if (!wsi->parent_carries_io) + if (LWS_LIBUV_ENABLED(context)) { + if (wsi->listener) { + lwsl_debug("%s: stopping listner libuv poll\n", __func__); + uv_poll_stop(&wsi->w_read.uv_watcher); + } + lwsl_debug("%s: lws_libuv_closehandle: wsi %p\n", __func__, wsi); + /* libuv has to do his own close handle processing asynchronously */ + lws_libuv_closehandle(wsi); - return; - } + return; + } #endif lws_close_free_wsi_final(wsi); @@ -856,6 +874,9 @@ lws_get_peer_simple(struct lws *wsi, char *name, int namelen) int af = AF_INET; void *p, *q; + if (wsi->parent_carries_io) + wsi = wsi->parent; + #ifdef LWS_USE_IPV6 if (LWS_IPV6_ENABLED(wsi->vhost)) { len = sizeof(sin6); @@ -1407,6 +1428,12 @@ lws_is_final_fragment(struct lws *wsi) return wsi->u.ws.final && !wsi->u.ws.rx_packet_length && !wsi->u.ws.rx_draining_ext; } +LWS_VISIBLE int +lws_is_first_fragment(struct lws *wsi) +{ + return wsi->u.ws.first_fragment; +} + LWS_VISIBLE unsigned char lws_get_reserved_bits(struct lws *wsi) { @@ -1650,6 +1677,48 @@ lws_get_child(const struct lws *wsi) return wsi->child_list; } +LWS_VISIBLE LWS_EXTERN void +lws_set_parent_carries_io(struct lws *wsi) +{ + wsi->parent_carries_io = 1; +} + +LWS_VISIBLE LWS_EXTERN void * +lws_get_opaque_parent_data(const struct lws *wsi) +{ + return wsi->opaque_parent_data; +} + +LWS_VISIBLE LWS_EXTERN void +lws_set_opaque_parent_data(struct lws *wsi, void *data) +{ + wsi->opaque_parent_data = data; +} + +LWS_VISIBLE LWS_EXTERN int +lws_get_child_pending_on_writable(const struct lws *wsi) +{ + return wsi->parent_pending_cb_on_writable; +} + +LWS_VISIBLE LWS_EXTERN void +lws_clear_child_pending_on_writable(struct lws *wsi) +{ + wsi->parent_pending_cb_on_writable = 0; +} + +LWS_VISIBLE LWS_EXTERN int +lws_get_close_length(struct lws *wsi) +{ + return wsi->u.ws.close_in_ping_buffer_len; +} + +LWS_VISIBLE LWS_EXTERN unsigned char * +lws_get_close_payload(struct lws *wsi) +{ + return &wsi->u.ws.ping_payload_buf[LWS_PRE]; +} + LWS_VISIBLE LWS_EXTERN void lws_close_reason(struct lws *wsi, enum lws_close_status status, unsigned char *buf, size_t len) diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index fcd5d99b..7faa5f31 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -823,6 +823,38 @@ struct lws_context; /* needed even with extensions disabled for create context */ struct lws_extension; +/*! \defgroup lwsmeta lws-meta + * + * ##lws-meta protocol + * + * The protocol wraps other muxed connections inside one tcp connection. + * + * Commands are assigned from 0x41 up (so they are valid unicode) + */ +///@{ + +enum lws_meta_commands { + LWS_META_CMD_OPEN_SUBCHANNEL = 'A', + /**< Client requests to open new subchannel + */ + LWS_META_CMD_OPEN_RESULT, + /**< Result of client request to open new subchannel */ + LWS_META_CMD_CLOSE_NOTIFY, + /**< Notification of subchannel closure */ + LWS_META_CMD_CLOSE_RQ, + /**< client requests to close a subchannel */ + LWS_META_CMD_WRITE, + /**< connection writes something to specific channel index */ + + /****** add new things just above ---^ ******/ +}; + +/* channel numbers are transported offset by 0x20 so they are valid unicode */ + +#define LWS_META_CHANNEL_OFFSET_TRANSPORT 0x20 + +///@} + /*! \defgroup usercb User Callback * * ##User protocol callback @@ -1252,6 +1284,16 @@ enum lws_callback_reasons { * using the vhost. @in is a pointer to a * struct lws_ssl_info containing information about the * event*/ + LWS_CALLBACK_CHILD_WRITE_VIA_PARENT = 68, + /**< Child has been marked with parent_carries_io attribute, so + * lws_write directs the to this callback at the parent, + * @in is a struct lws_write_passthru containing the args + * the lws_write() was called with. + */ + LWS_CALLBACK_CHILD_CLOSING = 69, + /**< Sent to parent to notify them a child is closing / being + * destroyed. @in is the child wsi. + */ /****** add new things just above ---^ ******/ @@ -2783,6 +2825,9 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi) /* Backwards compatibility */ #define lws_plat_service_tsi lws_service_tsi +LWS_VISIBLE LWS_EXTERN int +lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd); + ///@} /*! \defgroup http HTTP @@ -3625,6 +3670,19 @@ enum pending_timeout { /****** add new things just above ---^ ******/ }; +#define LWS_TO_KILL_ASYNC -1 +/**< If LWS_TO_KILL_ASYNC is given as the timeout sec in a lws_set_timeout() + * call, then the connection is marked to be killed at the next timeout + * check. This is how you should force-close the wsi being serviced if + * you are doing it outside the callback (where you should close by nonzero + * return). + */ +#define LWS_TO_KILL_SYNC -2 +/**< If LWS_TO_KILL_SYNC is given as the timeout sec in a lws_set_timeout() + * call, then the connection is closed before returning (which may delete + * the wsi). This should only be used where the wsi being closed is not the + * wsi currently being serviced. + */ /** * lws_set_timeout() - marks the wsi as subject to a timeout * @@ -3632,7 +3690,10 @@ enum pending_timeout { * * \param wsi: Websocket connection instance * \param reason: timeout reason - * \param secs: how many seconds + * \param secs: how many seconds. You may set to LWS_TO_KILL_ASYNC to + * force the connection to timeout at the next opportunity, or + * LWS_TO_KILL_SYNC to close it synchronously if you know the + * wsi is not the one currently being serviced. */ LWS_VISIBLE LWS_EXTERN void lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs); @@ -3657,7 +3718,8 @@ lws_set_timeout(struct lws *wsi, enum pending_timeout reason, int secs); #endif #define _LWS_PAD(n) (((n) % _LWS_PAD_SIZE) ? \ ((n) + (_LWS_PAD_SIZE - ((n) % _LWS_PAD_SIZE))) : (n)) -#define LWS_PRE _LWS_PAD(4 + 10) +/* last 2 is for lws-meta */ +#define LWS_PRE _LWS_PAD(4 + 10 + 2) /* used prior to 1.7 and retained for backward compatibility */ #define LWS_SEND_BUFFER_PRE_PADDING LWS_PRE #define LWS_SEND_BUFFER_POST_PADDING 0 @@ -3709,6 +3771,15 @@ enum lws_write_protocol { * decode the content if used */ }; +/* used with LWS_CALLBACK_CHILD_WRITE_VIA_PARENT */ + +struct lws_write_passthru { + struct lws *wsi; + unsigned char *buf; + size_t len; + enum lws_write_protocol wp; +}; + /** * lws_write() - Apply protocol then write data to client @@ -4053,7 +4124,11 @@ typedef enum { LWS_ADOPT_RAW_FILE_DESC = 0, /* convenience constant */ LWS_ADOPT_HTTP = 1, /* flag: absent implies RAW */ LWS_ADOPT_SOCKET = 2, /* flag: absent implies file descr */ - LWS_ADOPT_ALLOW_SSL = 4 /* flag: if set requires LWS_ADOPT_SOCKET */ + LWS_ADOPT_ALLOW_SSL = 4, /* flag: if set requires LWS_ADOPT_SOCKET */ + LWS_ADOPT_WS_PARENTIO = 8, /* flag: ws mode parent handles IO + * if given must be only flag + * wsi put directly into ws mode + */ } lws_adoption_type; typedef union { @@ -4411,6 +4486,32 @@ lws_get_parent(const struct lws *wsi); LWS_VISIBLE LWS_EXTERN struct lws * LWS_WARN_UNUSED_RESULT lws_get_child(const struct lws *wsi); +/** + * lws_parent_carries_io() - mark wsi as needing to send messages via parent + * + * \param wsi: child lws connection + */ + +LWS_VISIBLE LWS_EXTERN void +lws_set_parent_carries_io(struct lws *wsi); + +LWS_VISIBLE LWS_EXTERN void * +lws_get_opaque_parent_data(const struct lws *wsi); + +LWS_VISIBLE LWS_EXTERN void +lws_set_opaque_parent_data(struct lws *wsi, void *data); + +LWS_VISIBLE LWS_EXTERN int +lws_get_child_pending_on_writable(const struct lws *wsi); + +LWS_VISIBLE LWS_EXTERN void +lws_clear_child_pending_on_writable(struct lws *wsi); + +LWS_VISIBLE LWS_EXTERN int +lws_get_close_length(struct lws *wsi); + +LWS_VISIBLE LWS_EXTERN unsigned char * +lws_get_close_payload(struct lws *wsi); /* * \deprecated DEPRECATED Note: this is not normally needed as a user api. @@ -4448,11 +4549,20 @@ lws_send_pipe_choked(struct lws *wsi); /** * lws_is_final_fragment() - tests if last part of ws message + * * \param wsi: lws connection */ LWS_VISIBLE LWS_EXTERN int lws_is_final_fragment(struct lws *wsi); +/** + * lws_is_first_fragment() - tests if first part of ws message + * + * \param wsi: lws connection + */ +LWS_VISIBLE LWS_EXTERN int +lws_is_first_fragment(struct lws *wsi); + /** * lws_get_reserved_bits() - access reserved bits of ws frame * \param wsi: lws connection diff --git a/lib/output.c b/lib/output.c index be0c0108..9f72443e 100644 --- a/lib/output.c +++ b/lib/output.c @@ -250,6 +250,23 @@ LWS_VISIBLE int lws_write(struct lws *wsi, unsigned char *buf, size_t len, int pre = 0, n; size_t orig_len = len; + if (wsi->parent_carries_io) { + struct lws_write_passthru pas; + + pas.buf = buf; + pas.len = len; + pas.wp = wp; + pas.wsi = wsi; + + if (wsi->parent->protocol->callback(wsi->parent, + LWS_CALLBACK_CHILD_WRITE_VIA_PARENT, + wsi->parent->user_space, + (void *)&pas, 0)) + return 1; + + return len; + } + lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_API_LWS_WRITE, 1); if ((int)len < 0) { diff --git a/lib/parsers.c b/lib/parsers.c index e41bdf02..172de1df 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -385,6 +385,9 @@ lws_hdr_fragment_length(struct lws *wsi, enum lws_token_indexes h, int frag_idx) { int n; + if (!wsi->u.hdr.ah) + return 0; + n = wsi->u.hdr.ah->frag_index[h]; if (!n) return 0; @@ -402,6 +405,9 @@ LWS_VISIBLE int lws_hdr_total_length(struct lws *wsi, enum lws_token_indexes h) int n; int len = 0; + if (!wsi->u.hdr.ah) + return 0; + n = wsi->u.hdr.ah->frag_index[h]; if (!n) return 0; @@ -417,7 +423,12 @@ LWS_VISIBLE int lws_hdr_copy_fragment(struct lws *wsi, char *dst, int len, enum lws_token_indexes h, int frag_idx) { int n = 0; - int f = wsi->u.hdr.ah->frag_index[h]; + int f; + + if (!wsi->u.hdr.ah) + return -1; + + f = wsi->u.hdr.ah->frag_index[h]; if (!f) return -1; @@ -448,6 +459,9 @@ LWS_VISIBLE int lws_hdr_copy(struct lws *wsi, char *dst, int len, if (toklen >= len) return -1; + if (!wsi->u.hdr.ah) + return -1; + n = wsi->u.hdr.ah->frag_index[h]; if (!n) return 0; @@ -1145,6 +1159,7 @@ handle_first: wsi->u.ws.rsv_first_msg = (c & 0x70); wsi->u.ws.frame_is_binary = wsi->u.ws.opcode == LWSWSOPC_BINARY_FRAME; + wsi->u.ws.first_fragment = 1; break; case 3: case 4: @@ -1492,6 +1507,7 @@ drain_extension: /* eff_buf may be pointing somewhere completely different now, * it's the output */ + wsi->u.ws.first_fragment = 0; if (n < 0) { /* * we may rely on this to get RX, just drop connection diff --git a/lib/pollfd.c b/lib/pollfd.c index 7c3a0efa..f3f2fbba 100644 --- a/lib/pollfd.c +++ b/lib/pollfd.c @@ -236,6 +236,11 @@ remove_wsi_socket_from_fds(struct lws *wsi) #endif int m, ret = 0; + if (wsi->parent_carries_io) { + lws_same_vh_protocol_remove(wsi); + return 0; + } + #if !defined(_WIN32) && !defined(LWS_WITH_ESP8266) if (wsi->desc.sockfd > context->max_fds) { lwsl_err("fd %d too high (%d)\n", wsi->desc.sockfd, context->max_fds); @@ -348,6 +353,16 @@ lws_callback_on_writable(struct lws *wsi) if (wsi->socket_is_permanently_unusable) return 0; + if (wsi->parent_carries_io) { + int n = lws_callback_on_writable(wsi->parent); + + if (n < 0) + return n; + + wsi->parent_pending_cb_on_writable = 1; + return 1; + } + pt = &wsi->context->pt[(int)wsi->tsi]; lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_WRITEABLE_CB_REQ, 1); #if defined(LWS_WITH_STATS) diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 1c903d79..a527661b 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -1484,6 +1484,7 @@ struct _lws_websocket_related { unsigned int rx_draining_ext:1; unsigned int tx_draining_ext:1; unsigned int send_check_ping:1; + unsigned int first_fragment:1; }; #ifdef LWS_WITH_CGI @@ -1588,6 +1589,7 @@ struct lws { struct lws_access_log access_log; #endif void *user_space; + void *opaque_parent_data; /* rxflow handling */ unsigned char *rxflow_buffer; /* truncated send handling */ @@ -1651,6 +1653,8 @@ struct lws { unsigned int told_user_closed:1; unsigned int waiting_to_send_close_frame:1; unsigned int ipv6:1; + unsigned int parent_carries_io:1; + unsigned int parent_pending_cb_on_writable:1; #if defined(LWS_WITH_ESP8266) unsigned int pending_send_completion:3; diff --git a/lib/server.c b/lib/server.c index 78b2221a..668f6f1f 100644 --- a/lib/server.c +++ b/lib/server.c @@ -1284,6 +1284,57 @@ transaction_result_n: #endif } +static int +lws_server_init_wsi_for_ws(struct lws *wsi) +{ + int n; + + wsi->state = LWSS_ESTABLISHED; + lws_restart_ws_ping_pong_timer(wsi); + + /* + * create the frame buffer for this connection according to the + * size mentioned in the protocol definition. If 0 there, use + * a big default for compatibility + */ + + n = wsi->protocol->rx_buffer_size; + if (!n) + n = wsi->context->pt_serv_buf_size; + n += LWS_PRE; + wsi->u.ws.rx_ubuf = lws_malloc(n + 4 /* 0x0000ffff zlib */); + if (!wsi->u.ws.rx_ubuf) { + lwsl_err("Out of Mem allocating rx buffer %d\n", n); + return 1; + } + wsi->u.ws.rx_ubuf_alloc = n; + lwsl_debug("Allocating RX buffer %d\n", n); + +#if LWS_POSIX && !defined(LWS_WITH_ESP32) + if (!wsi->parent_carries_io) + if (setsockopt(wsi->desc.sockfd, SOL_SOCKET, SO_SNDBUF, + (const char *)&n, sizeof n)) { + lwsl_warn("Failed to set SNDBUF to %d", n); + return 1; + } +#endif + + /* notify user code that we're ready to roll */ + + if (wsi->protocol->callback) + if (wsi->protocol->callback(wsi, LWS_CALLBACK_ESTABLISHED, + wsi->user_space, +#ifdef LWS_OPENSSL_SUPPORT + wsi->ssl, +#else + NULL, +#endif + 0)) + return 1; + + return 0; +} + int lws_handshake_server(struct lws *wsi, unsigned char **buf, size_t len) { @@ -1652,49 +1703,10 @@ upgrade_ws: wsi->u.hdr = hdr; lws_pt_unlock(pt); - lws_restart_ws_ping_pong_timer(wsi); - - /* - * create the frame buffer for this connection according to the - * size mentioned in the protocol definition. If 0 there, use - * a big default for compatibility - */ - - n = wsi->protocol->rx_buffer_size; - if (!n) - n = context->pt_serv_buf_size; - n += LWS_PRE; - wsi->u.ws.rx_ubuf = lws_malloc(n + 4 /* 0x0000ffff zlib */); - if (!wsi->u.ws.rx_ubuf) { - lwsl_err("Out of Mem allocating rx buffer %d\n", n); - return 1; - } - wsi->u.ws.rx_ubuf_alloc = n; - lwsl_debug("Allocating RX buffer %d\n", n); -#if LWS_POSIX && !defined(LWS_WITH_ESP32) - if (setsockopt(wsi->desc.sockfd, SOL_SOCKET, SO_SNDBUF, - (const char *)&n, sizeof n)) { - lwsl_warn("Failed to set SNDBUF to %d", n); - return 1; - } -#endif - + lws_server_init_wsi_for_ws(wsi); lwsl_parser("accepted v%02d connection\n", wsi->ietf_spec_revision); - /* notify user code that we're ready to roll */ - - if (wsi->protocol->callback) - if (wsi->protocol->callback(wsi, LWS_CALLBACK_ESTABLISHED, - wsi->user_space, -#ifdef LWS_OPENSSL_SUPPORT - wsi->ssl, -#else - NULL, -#endif - 0)) - return 1; - /* !!! drop ah unreservedly after ESTABLISHED */ if (!wsi->more_rx_waiting) { lws_header_table_force_to_detachable_state(wsi); @@ -1782,6 +1794,8 @@ lws_create_new_server_wsi(struct lws_vhost *vhost) new_wsi->user_space = NULL; new_wsi->ietf_spec_revision = 0; new_wsi->desc.sockfd = LWS_SOCK_INVALID; + new_wsi->position_in_fds_table = -1; + vhost->context->count_wsi_allocated++; /* @@ -1889,7 +1903,7 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type, int n, ssl = 0; if (!new_wsi) { - if (type & LWS_ADOPT_SOCKET) + if (type & LWS_ADOPT_SOCKET && !(type & LWS_ADOPT_WS_PARENTIO)) compatible_close(fd.sockfd); return NULL; } @@ -1900,6 +1914,9 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type, new_wsi->parent = parent; new_wsi->sibling_list = parent->child_list; parent->child_list = new_wsi; + + if (type & LWS_ADOPT_WS_PARENTIO) + new_wsi->parent_carries_io = 1; } new_wsi->desc = fd; @@ -1916,6 +1933,15 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type, lwsl_notice("OOM trying to get user_space\n"); goto bail; } + if (type & LWS_ADOPT_WS_PARENTIO) { + new_wsi->desc.sockfd = LWS_SOCK_INVALID; + lwsl_debug("binding to %s\n", new_wsi->protocol->name); + lws_bind_protocol(new_wsi, new_wsi->protocol); + lws_union_transition(new_wsi, LWSCM_WS_SERVING); + lws_server_init_wsi_for_ws(new_wsi); + + return new_wsi; + } } else if (type & LWS_ADOPT_HTTP) /* he will transition later */ new_wsi->protocol = diff --git a/lib/service.c b/lib/service.c index 65868e05..2f703f8b 100644 --- a/lib/service.c +++ b/lib/service.c @@ -329,6 +329,13 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd) user_service: /* one shot */ + if (wsi->parent_carries_io) { + wsi->handling_pollout = 0; + wsi->leave_pollout_active = 0; + + return lws_calllback_as_writeable(wsi); + } + if (pollfd) { int eff = wsi->leave_pollout_active; diff --git a/lwsws/etc-lwsws-conf.d-localhost-EXAMPLE b/lwsws/etc-lwsws-conf.d-localhost-EXAMPLE index 42ac2753..2aa85e58 100644 --- a/lwsws/etc-lwsws-conf.d-localhost-EXAMPLE +++ b/lwsws/etc-lwsws-conf.d-localhost-EXAMPLE @@ -32,6 +32,9 @@ # vhost-specific config options for the protocol # "ws-protocols": [{ + "lws-meta": { + "status": "ok" + }, "dumb-increment-protocol": { "status": "ok" }, diff --git a/plugins/protocol_lws_meta.c b/plugins/protocol_lws_meta.c new file mode 100644 index 00000000..633283fa --- /dev/null +++ b/plugins/protocol_lws_meta.c @@ -0,0 +1,613 @@ +/* + * lws meta protocol handler + * + * Copyright (C) 2017 Andy Green + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation: + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + * MA 02110-1301 USA* + * + */ + +#if !defined (LWS_PLUGIN_STATIC) +#define LWS_DLL +#define LWS_INTERNAL +#include "../lib/libwebsockets.h" +#endif + +#include +#include + +#define MAX_SUBCHANNELS 8 + +enum lws_meta_parser_state { + MP_IDLE, /* in body of message */ + + MP_CMD, /* await cmd */ + + MP_OPEN_SUBCHANNEL_PROTOCOL, + MP_OPEN_SUBCHANNEL_URL, + MP_OPEN_SUBCHANNEL_COOKIE, + + MP_CLOSE_CHID, + MP_CLOSE_LEN, + MP_CLOSE_CODEM, + MP_CLOSE_CODEL, + MP_CLOSE_PAYLOAD, + + MP_WRITE_CHID, +}; + +enum { + PENDING_TYPE_OPEN_RESULT = 0, + PENDING_TYPE_CHILD_CLOSE +}; + +/* + * while we haven't reported the result yet, we keep a linked-list of + * connection opens and their result. + */ + +struct pending_conn { + struct pending_conn *next; + char protocol[123]; + char cookie[8]; + int ch; + int len; + + unsigned char type; +}; + +/* the parent, lws-meta connection */ + +struct per_session_data__lws_meta { + struct lws *wsi[MAX_SUBCHANNELS + 1]; + char told_closing[MAX_SUBCHANNELS + 1]; + struct pending_conn *first; + struct pending_conn *pend; + char suburl[64]; + unsigned char close[126]; + int active_subchannel_tx, active_subchannel_rx; + enum lws_meta_parser_state state; + int pos; + int count_pending; + int round_robin; + int close_status_16; + int close_len; + int which_close; + int ch; +}; + +static int +lws_find_free_channel(struct per_session_data__lws_meta *pss) +{ + int n; + + for (n = 1; n <= MAX_SUBCHANNELS; n++) + if (pss->wsi[n] == NULL) + return n; + + return 0; /* none free */ +} + +static struct lws * +lws_get_channel_wsi(struct per_session_data__lws_meta *pss, int ch) +{ + if (!ch) + return 0; + return pss->wsi[ch]; +} + +static int +lws_get_channel_id(struct lws *wsi) +{ + return (lws_intptr_t)lws_get_opaque_parent_data(wsi); +} + +static void +lws_set_channel_id(struct lws *wsi, int id) +{ + lws_set_opaque_parent_data(wsi, (void *)(lws_intptr_t)id); +} + +static struct pending_conn * +new_pending(struct per_session_data__lws_meta *pss) +{ + struct pending_conn *pend; + + if (pss->count_pending >= MAX_SUBCHANNELS * 2) { + lwsl_notice("too many pending open subchannel\n"); + return NULL; + } + + pss->count_pending++; + + pend = malloc(sizeof(*pend)); + if (!pend) { + lwsl_notice("OOM\n"); + return NULL; + } + + memset(pend, 0, sizeof(*pend)); + + return pend; +} + + +static int +callback_lws_meta(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + struct per_session_data__lws_meta *pss = + (struct per_session_data__lws_meta *)user; + struct lws_write_passthru *pas; + struct pending_conn *pend, *pend1; + struct lws *cwsi; + lws_sock_file_fd_type fd; + unsigned char *bin, buf[LWS_PRE + 512], *start = &buf[LWS_PRE], + *end = &buf[sizeof(buf) - 1], *p = start; + int n, m; + + switch (reason) { + + case LWS_CALLBACK_ESTABLISHED: + lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__); + pss->state = MP_CMD; + pss->pos = 0; + break; + + case LWS_CALLBACK_CLOSED: + break; + + case LWS_CALLBACK_CHILD_CLOSING: + cwsi = (struct lws *)in; + + /* remove it from our tracking */ + pss->wsi[lws_get_channel_id(cwsi)] = NULL; + + if (pss->told_closing[lws_get_channel_id(cwsi)]) { + pss->told_closing[lws_get_channel_id(cwsi)] = 0; + break; + } + + pend = new_pending(pss); + if (!pend) + return -1; + + /* note which channel id */ + pend->ch = lws_get_channel_id(cwsi); + + + if (lws_get_close_length(cwsi)) { + pend->len = lws_get_close_length(cwsi); + memcpy(pend->protocol, lws_get_close_payload(cwsi), + pend->len); + } + + pend->type = PENDING_TYPE_CHILD_CLOSE; + pend->next = pss->first; + pss->first = pend; + + /* + * nothing else will complete from this wsi, so abandon + * tracking in-process messages from this wsi. + */ + + if (pss->active_subchannel_tx == pend->ch) + pss->active_subchannel_tx = 0; + + if (pss->active_subchannel_rx == pend->ch) + pss->active_subchannel_rx = 0; + break; + + case LWS_CALLBACK_SERVER_WRITEABLE: + + if (!pss->active_subchannel_tx) { + + /* not in the middle of a message... + * + * PRIORITY 1: pending open and close notifications + */ + + pend = pss->first; + while (pend && p < end - 128) { + switch (pend->type) { + case PENDING_TYPE_OPEN_RESULT: + lwsl_debug("open result %s %s\n", pend->cookie, pend->protocol); + *p++ = LWS_META_CMD_OPEN_RESULT; + memcpy(p, pend->cookie, + strlen(pend->cookie) + 1); + p += strlen(pend->cookie) + 1; + *p++ = LWS_META_CHANNEL_OFFSET_TRANSPORT + pend->ch; + memcpy(p, pend->protocol, + strlen(pend->protocol) + 1); + p += strlen(pend->protocol) + 1; + break; + case PENDING_TYPE_CHILD_CLOSE: + *p++ = LWS_META_CMD_CLOSE_NOTIFY; + *p++ = LWS_META_CHANNEL_OFFSET_TRANSPORT + pend->ch; + for (n = 0; n < pend->len; n++) + *p++ = pend->protocol[n]; + break; + } + + pss->count_pending--; + pend1 = pend; + pend = pend->next; + free(pend1); + pss->first = pend; + } + + if (p != start) { + if (lws_write(wsi, start, p - start, LWS_WRITE_BINARY) < 0) + return 1; + if (pend) /* still more */ + lws_callback_on_writable(wsi); + break; + } + + /* PRIORITY 2: pick a child to get the writable callback */ + + cwsi = NULL; + for (n = 0; n < MAX_SUBCHANNELS; n++) { + m = ((pss->round_robin + n) % MAX_SUBCHANNELS) + 1; + if (pss->wsi[m] && + lws_get_child_pending_on_writable(pss->wsi[m])) { + pss->round_robin = m; + cwsi = pss->wsi[m]; + break; + } + } + + } else + /* one child is in middle of message, stay with it */ + cwsi = pss->wsi[pss->active_subchannel_tx]; + + if (!cwsi) + break; + + lws_clear_child_pending_on_writable(cwsi); + if (lws_handle_POLLOUT_event(cwsi, NULL)) + return -1; + break; + + case LWS_CALLBACK_RECEIVE: + bin = (unsigned char *)in; + + /* + * at the start of a message, we may have one or more + * lws_meta command blocks. + */ + + while (pss->state != MP_IDLE && + (unsigned int)(bin - (unsigned char *)in) < len) { + + switch (pss->state) { + case MP_IDLE: /* in body of message */ + + if (!lws_is_first_fragment(wsi)) + break; + + pss->state = MP_CMD; + + /* fallthru */ + + case MP_CMD: /* await cmd */ + + pss->pos = 0; + + switch (*bin++) { + case LWS_META_CMD_OPEN_SUBCHANNEL: + + pss->pend = new_pending(pss); + if (!pss->pend) + return -1; + + pss->state = MP_OPEN_SUBCHANNEL_PROTOCOL; + + break; + case LWS_META_CMD_CLOSE_NOTIFY: + case LWS_META_CMD_CLOSE_RQ: + pss->which_close = bin[-1]; + pss->state = MP_CLOSE_CHID; + break; + case LWS_META_CMD_WRITE: + pss->state = MP_WRITE_CHID; + break; + + // open result is also illegal to receive + default: + lwsl_notice("illegal lws_meta cmd 0x%x\n", bin[-1]); + + return -1; + } + + break; + + case MP_OPEN_SUBCHANNEL_PROTOCOL: + pss->pend->protocol[pss->pos++] = *bin++; + if (pss->pos == sizeof(pss->pend->protocol) - 1) { + lwsl_notice("protocol name too long\n"); + return -1; + } + + if (bin[-1] != '\0') + break; + + pss->state = MP_OPEN_SUBCHANNEL_URL; + pss->pos = 0; + break; + + case MP_OPEN_SUBCHANNEL_URL: + pss->suburl[pss->pos++] = *bin++; + if (pss->pos == sizeof(pss->suburl) - 1) { + lwsl_notice("suburl too long\n"); + return -1; + } + + if (bin[-1] != '\0') + break; + + pss->state = MP_OPEN_SUBCHANNEL_COOKIE; + pss->pos = 0; + break; + + case MP_OPEN_SUBCHANNEL_COOKIE: + pss->pend->cookie[pss->pos++] = *bin++; + if (pss->pos == sizeof(pss->pend->cookie) - 1) { + lwsl_notice("cookie too long\n"); + return -1; + } + + if (bin[-1] != '\0') + break; + + lwsl_debug("%s: %s / %s / %s\n", __func__, + pss->pend->protocol, + pss->suburl, + pss->pend->cookie); + + pss->pend->ch = lws_find_free_channel(pss); + if (pss->pend->ch) { + + fd.sockfd = 0; // not going to be used + + cwsi = lws_adopt_descriptor_vhost( + lws_get_vhost(wsi), + LWS_ADOPT_WS_PARENTIO, + fd, pss->pend->protocol, + wsi); + + if (!cwsi) { + lwsl_notice("open failed\n"); + pss->pend->ch = 0; + } else { + pss->wsi[pss->pend->ch] = cwsi; + lws_set_channel_id(cwsi, + pss->pend->ch); + lwsl_debug("cwsi %p on parent %p open OK %s\n", + cwsi, wsi, pss->pend->protocol); + } + + } else + lwsl_notice("no free subchannels\n"); + + pss->pend->type = PENDING_TYPE_OPEN_RESULT; + pss->pend->next = pss->first; + pss->first = pss->pend; + + lws_callback_on_writable(wsi); + + pss->state = MP_CMD; + pss->pos = 0; + break; + + case MP_CLOSE_CHID: + pss->ch = (*bin++) - LWS_META_CHANNEL_OFFSET_TRANSPORT; + pss->state = MP_CLOSE_LEN; + pss->pos = 0; + break; + case MP_CLOSE_LEN: + pss->close_len = (*bin++) - LWS_META_CHANNEL_OFFSET_TRANSPORT; + lwsl_debug("close len %d\n", pss->close_len); + pss->state = MP_CLOSE_CODEM; + pss->pos = 0; + break; + case MP_CLOSE_CODEM: + pss->close[pss->pos++] = *bin; + pss->close_status_16 = (*bin++) * 256; + pss->state = MP_CLOSE_CODEL; + break; + case MP_CLOSE_CODEL: + pss->close[pss->pos++] = *bin; + pss->close_status_16 |= *bin++; + pss->state = MP_CLOSE_PAYLOAD; + break; + case MP_CLOSE_PAYLOAD: + pss->close[pss->pos++] = *bin++; + if (pss->pos == sizeof(pss->close) - 1) { + lwsl_notice("close payload too long\n"); + return -1; + } + if (--pss->close_len) + break; + + cwsi = lws_get_channel_wsi(pss, pss->ch); + if (!cwsi) + lwsl_notice("received close (%d) for unknown ch %d\n", pss->which_close, pss->ch); + else { + + if (pss->which_close == LWS_META_CMD_CLOSE_RQ) { + + if (lws_get_protocol(cwsi)->callback(cwsi, + LWS_CALLBACK_WS_PEER_INITIATED_CLOSE, + lws_wsi_user(cwsi), + &pss->close, pss->pos)) + return -1; + + /* + * we need to echo back the close payload + * when we send the close notification + */ + + lws_close_reason(cwsi, + pss->close_status_16, + &pss->close[2], pss->pos - 2); + } + + /* so force him closed */ + + lws_set_timeout(cwsi, + PENDING_TIMEOUT_KILLED_BY_PARENT, + LWS_TO_KILL_SYNC); + } + + pss->state = MP_CMD; + break; + + case MP_WRITE_CHID: + pss->active_subchannel_rx = (*bin++) - + LWS_META_CHANNEL_OFFSET_TRANSPORT; + pss->state = MP_IDLE; + break; + } + + } + + len -= bin - (unsigned char *)in; + + if (!len) + break; + + cwsi = lws_get_channel_wsi(pss, pss->active_subchannel_rx); + if (!cwsi) { + lwsl_notice("invalid ch %d\n", pss->active_subchannel_rx); + + return -1; + } + + lwsl_debug("%s: RX len %d\n", __func__, (int)len); + + if (lws_get_protocol(cwsi)->callback( + cwsi, + LWS_CALLBACK_RECEIVE, + lws_wsi_user(cwsi), + bin, len)) { + lws_set_timeout(cwsi, + PENDING_TIMEOUT_KILLED_BY_PARENT, + LWS_TO_KILL_SYNC); + } + + if (lws_is_final_fragment(wsi)) { + pss->active_subchannel_rx = 0; + pss->state = MP_CMD; + } + break; + + /* + * child wrote something via lws_write.... which passed it up to us to + * deal with, because we are the parent. Prepend two bytes for + * lws-meta command and channel index, and send it out on parent + */ + case LWS_CALLBACK_CHILD_WRITE_VIA_PARENT: + pas = in; + bin = ((unsigned char *)pas->buf); + + if ((pas->wp & 7) == 4 /*LWS_WRITE_CLOSE */) { + *p++ = LWS_META_CMD_CLOSE_NOTIFY; + *p++ = LWS_META_CHANNEL_OFFSET_TRANSPORT + + lws_get_channel_id(pas->wsi); + *p++ = pas->len - 2 + LWS_META_CHANNEL_OFFSET_TRANSPORT; + *p++ = *bin++; + *p++ = *bin++; + for (n = 0; n < (int)pas->len - 2; n++) + *p++ = bin[n]; + + if (lws_write(wsi, start, p - start, LWS_WRITE_BINARY) < 0) + return 1; + + pss->told_closing[lws_get_channel_id(pas->wsi)] = 1; + break; + } + + if ((pas->wp & 7) == LWS_WRITE_TEXT || + (pas->wp & 7) == LWS_WRITE_BINARY) { + + if (pas->wp & LWS_WRITE_NO_FIN) + pss->active_subchannel_tx = + lws_get_channel_id(pas->wsi); + + /* start of message, prepend the subchannel id */ + + bin -= 2; + bin[0] = LWS_META_CMD_WRITE; + bin[1] = lws_get_channel_id(pas->wsi) + + LWS_META_CHANNEL_OFFSET_TRANSPORT; + if (lws_write(wsi, bin, pas->len + 2, pas->wp) < 0) + return 1; + + } else + if (lws_write(wsi, bin, pas->len, pas->wp) < 0) + return 1; + + /* track EOM */ + + if (!(pas->wp & LWS_WRITE_NO_FIN)) + pss->active_subchannel_tx = 0; + break; + + default: + break; + } + + return 0; +} + +#define LWS_PLUGIN_PROTOCOL_LWS_META { \ + "lws-meta", \ + callback_lws_meta, \ + sizeof(struct per_session_data__lws_meta), \ + 1024, /* rx buf size must be >= permessage-deflate rx size */ \ + } + +#if !defined (LWS_PLUGIN_STATIC) + +static const struct lws_protocols protocols[] = { + LWS_PLUGIN_PROTOCOL_LWS_META +}; + +LWS_EXTERN LWS_VISIBLE int +init_protocol_lws_meta(struct lws_context *context, + struct lws_plugin_capability *c) +{ + if (c->api_magic != LWS_PLUGIN_API_MAGIC) { + lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC, + c->api_magic); + return 1; + } + + c->protocols = protocols; + c->count_protocols = ARRAY_SIZE(protocols); + c->extensions = NULL; + c->count_extensions = 0; + + return 0; +} + +LWS_EXTERN LWS_VISIBLE int +destroy_protocol_lws_meta(struct lws_context *context) +{ + return 0; +} +#endif diff --git a/test-server/lws-common.js b/test-server/lws-common.js index 917aedf5..1e94eaae 100644 --- a/test-server/lws-common.js +++ b/test-server/lws-common.js @@ -108,6 +108,285 @@ function removeEvent( obj, type, fn ) { * end of grayOut related stuff */ +/* + * lws-meta helpers + */ + +var lws_meta_cmd = { + OPEN_SUBCHANNEL: 0x41, + /**< Client requests to open new subchannel + */ + OPEN_RESULT: 0x42, + /**< Result of client request to open new subchannel */ + CLOSE_NOT: 0x43, + CLOSE_RQ: 0x44, + /**< client requests to close a subchannel */ + WRITE: 0x45, + /**< connection writes something to specific channel index */ + RX: 0x46, +}; + +function new_ws(urlpath, protocol) +{ + if (typeof MozWebSocket != "undefined") + return new MozWebSocket(urlpath, protocol); + + return new WebSocket(urlpath, protocol); +} + +function lws_meta_ws() { + var real; + + var channel_id_to_child; + var pending_children; + var active_children; +} + +function lws_meta_ws_child() { + var onopen; + var onmessage; + var onclose; + + var channel_id; + + var subprotocol; + var suburl; + var cookie; + + var extensions; + + var parent; +} + +lws_meta_ws_child.prototype.send = function(data) +{ + + if (typeof data == "string") { + data = String.fromCharCode(lws_meta_cmd.WRITE) + + String.fromCharCode(this.channel_id) + + data; + + return this.parent.real.send(data); + } + + { + + var ab = new Uint8Array(data.length + 2); + + ab[0] = lws_meta_cmd.WRITE; + ab[1] = this.channel_id; + ab.set(data, 2); + + return this.parent.real.send(ab); + } +} + +lws_meta_ws_child.prototype.close = function(close_code, close_string) +{ + var pkt = new Uint8Array(129), m = 0, pkt1; + + pkt[m++] = lws_meta_cmd.CLOSE_RQ; + pkt[m++] = this.channel_id; + + pkt[m++] = close_string.length + 0x20; + + pkt[m++] = close_code / 256; + pkt[m++] = close_code % 256; + + for (i = 0; i < close_string.length; i++) + pkt[m++] = close_string.charCodeAt(i); + + pkt1 = new Uint8Array(m); + for (n = 0; n < m; n++) + pkt1[n] = pkt[n]; + + this.parent.real.send(pkt1.buffer); +} + +/* make a real ws connection using lws_meta*/ +lws_meta_ws.prototype.new_parent = function(urlpath) +{ + var n, i, m = 0, pkt1; + + this.ordinal = 1; + this.pending_children = []; + this.active_children = []; + this.real = new_ws(urlpath, "lws-meta"); + + this.real.binaryType = 'arraybuffer'; + this.real.myparent = this; + + this.real.onopen = function() { + pkt = new Uint8Array(1024); + var n, i, m = 0, pkt1; + console.log("real open - pending children " + this.myparent.pending_children.length); + for (n = 0; n < this.myparent.pending_children.length; n++) { + + var p = this.myparent.pending_children[n]; + + pkt[m++] = lws_meta_cmd.OPEN_SUBCHANNEL; + for (i = 0; i < p.subprotocol.length; i++) + pkt[m++] = p.subprotocol.charCodeAt(i); + pkt[m++] = 0; + for (i = 0; i < p.suburl.length; i++) + pkt[m++] = p.suburl.charCodeAt(i); + pkt[m++] = 0; + for (i = 0; i < p.cookie.length; i++) + pkt[m++] = p.cookie.charCodeAt(i); + pkt[m++] = 0; + } + + pkt1 = new Uint8Array(m); + for (n = 0; n < m; n++) + pkt1[n] = pkt[n]; + + console.log(this.myparent.pending_children[0].subprotocol); + console.log(pkt1); + + this.send(pkt1.buffer); + } + + + this.real.onmessage = function(msg) { + + if (typeof msg.data != "string") { + var ba = new Uint8Array(msg.data), n = 0; + + while (n < ba.length) { + + switch (ba[n++]) { + case lws_meta_cmd.OPEN_RESULT: + { + var m = 0, cookie = "", protocol = "", ch = 0; + var ws = this.myparent; + /* cookie NUL + * channel index + 0x20 + * protocol NUL + */ + while (ba[n]) + cookie = cookie + String.fromCharCode(ba[n++]); + n++; + ch = ba[n++]; + + while (ba[n]) + protocol = protocol + String.fromCharCode(ba[n++]); + + console.log("open result " + cookie + " " + protocol + " " + ch + " pending len " + ws.pending_children.length); + + for (m = 0; m < ws.pending_children.length; m++) { + if (ws.pending_children[m].cookie == cookie) { + var newchild = ws.pending_children[m]; + + /* found it */ + ws.pending_children[m].channel_id = ch; + /* add to active children array */ + ws.active_children.push(ws.pending_children[m]); + /* remove from pending children array */ + ws.pending_children.splice(m, 1); + + newchild.parent = ws; + newchild.extensions = this.extensions; + + newchild.onopen(); + + console.log("made active " + cookie); + break; + } + } + break; + } + + case lws_meta_cmd.CLOSE_NOT: + { + var code = 0, str = "", ch = 0, m, le; + var ba = new Uint8Array(msg.data); + /* + * BYTE: channel + * BYTE: MSB status code + * BYTE: LSB status code + * BYTES: rest of message is close status string + */ + + ch = ba[n++]; + le = ba[n++] - 0x20; + code = ba[n++] * 256; + code += ba[n++]; + + while (le--) + str += String.fromCharCode(ba[n++]); + + console.log("channel id " + ch + " code " + code + " str " + str + " len " + str.length); + + for (m = 0; m < this.myparent.active_children.length; m++) + if (this.myparent.active_children[m].channel_id == ch) { + var child = this.myparent.active_children[m]; + var ms = new CloseEvent("close", { code:code, reason:str } ); + + /* reply with close ack */ + this.send(msg.data); + + if (child.onclose) + child.onclose(ms); + + this.myparent.active_children.splice(m, 1); + break; + } + + } + } // switch + } + } else { + if (msg.data.charCodeAt(0) == lws_meta_cmd.WRITE ) { + var ch = msg.data.charCodeAt(1), m, ms; + var ws = this.myparent, ms; + + for (m = 0; m < ws.active_children.length; m++) { + if (ws.active_children[m].channel_id == ch) { + ms = new MessageEvent("WebSocket", { data: msg.data.substr(2, msg.data.length - 2) } ); + if (ws.active_children[m].onmessage) + ws.active_children[m].onmessage(ms); + break; + } + } + } + } + } + this.real.onclose = function() { + var ws = this.myparent, m; + for (m = 0; m < ws.active_children.length; m++) { + var child = ws.active_children[m]; + var ms = new CloseEvent("close", { code:1000, reason:"parent closed" } ); + + if (child.onclose) + child.onclose(ms); + } + } + +} + + + +/* make a child connection using existing lws_meta real ws connection */ +lws_meta_ws.prototype.new_ws = function(suburl, protocol) +{ + var ch = new lws_meta_ws_child(); + + ch.suburl = suburl; + ch.subprotocol = protocol; + ch.cookie = "C" + this.ordinal++; + + this.pending_children.push(ch); + + if (this.real.readyState == 1) + this.real.onopen(); + + return ch; +} + + +/* + * end of lws-meta helpers + */ function lws_san(s) { diff --git a/test-server/test-server-v2.0.c b/test-server/test-server-v2.0.c index 9623df21..d26e2b2a 100644 --- a/test-server/test-server-v2.0.c +++ b/test-server/test-server-v2.0.c @@ -226,8 +226,15 @@ static const struct lws_protocol_vhost_options pvo_opt4 = { * linked-list. We can also give the plugin per-vhost options here. */ -static const struct lws_protocol_vhost_options pvo_4 = { +static const struct lws_protocol_vhost_options pvo_5 = { NULL, + NULL, + "lws-meta", + "" /* ignored, just matches the protocol name above */ +}; + +static const struct lws_protocol_vhost_options pvo_4 = { + &pvo_5, &pvo_opt4, /* set us as the protocol who gets raw connections */ "protocol-lws-raw-test", "" /* ignored, just matches the protocol name above */ diff --git a/test-server/test-server.c b/test-server/test-server.c index 0765cfbe..dd02fd24 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -68,6 +68,7 @@ char crl_path[1024] = ""; #define LWS_PLUGIN_STATIC #include "../plugins/protocol_lws_mirror.c" #include "../plugins/protocol_lws_status.c" +#include "../plugins/protocol_lws_meta.c" /* singlethreaded version --> no locks */ @@ -104,6 +105,8 @@ enum demo_protocols { PROTOCOL_LWS_MIRROR, PROTOCOL_LWS_STATUS, + PROTOCOL_LWS_META, + /* always last */ DEMO_PROTOCOL_COUNT }; @@ -130,6 +133,8 @@ static struct lws_protocols protocols[] = { }, LWS_PLUGIN_PROTOCOL_MIRROR, LWS_PLUGIN_PROTOCOL_LWS_STATUS, + + LWS_PLUGIN_PROTOCOL_LWS_META, { NULL, NULL, 0, 0 } /* terminator */ }; diff --git a/test-server/test.html b/test-server/test.html index e7868eff..46426180 100644 --- a/test-server/test.html +++ b/test-server/test.html @@ -533,21 +533,31 @@ if (params.mirror) mirror_name = params.mirror; console.log(mirror_name); + -function new_ws(urlpath, protocol) -{ - if (typeof MozWebSocket != "undefined") - return new MozWebSocket(urlpath, protocol); +/* + * if using lws-meta to carry the other ws connections, declare the + * parent connection object and start its connection to the server. + * + * These helpers are defined in lws-common.js + */ - return new WebSocket(get_appropriate_ws_url(urlpath), protocol); -} + +var lws_meta = new lws_meta_ws(); +lws_meta.new_parent(get_appropriate_ws_url("?mirror=" + mirror_name)); document.getElementById("number").textContent = get_appropriate_ws_url(mirror_name); /* dumb increment protocol */ + + /* + * to connect via an lws-meta connection, start the connection using + * lws_meta.new_ws(). To connect by independent connection, start + * the connection using just new_ws() + */ - var socket_di = new_ws("", "dumb-increment-protocol"); + var socket_di = lws_meta.new_ws("", "dumb-increment-protocol"); try { socket_di.onopen = function() { @@ -571,8 +581,8 @@ document.getElementById("number").textContent = get_appropriate_ws_url(mirror_na var socket_status, jso, s; - socket_status = new_ws(get_appropriate_ws_url(""), "lws-status"); + socket_status = lws_meta.new_ws(get_appropriate_ws_url(""), "lws-status"); try { socket_status.onopen = function() { @@ -585,6 +595,8 @@ document.getElementById("number").textContent = get_appropriate_ws_url(mirror_na socket_status.onmessage =function got_packet(msg) { var s; + console.log(msg.data); + jso = JSON.parse(msg.data); document.getElementById("servinfo").innerHTML = @@ -637,7 +649,9 @@ var socket_ot; function ot_open() { - socket_ot = new_ws(get_appropriate_ws_url(""), "dumb-increment-protocol"); + socket_ot = lws_meta.new_ws(get_appropriate_ws_url(""), "dumb-increment-protocol"); + + console.log("ot_open"); try { socket_ot.onopen = function() { @@ -646,6 +660,7 @@ function ot_open() { document.getElementById("ot_open_btn").disabled = true; document.getElementById("ot_close_btn").disabled = false; document.getElementById("ot_req_close_btn").disabled = false; + console.log("ot_open.onopen"); } socket_ot.onclose = function(e){ @@ -680,7 +695,7 @@ function ot_req_close() { var socket_lm; var color = "#000000"; - socket_lm = new_ws(get_appropriate_ws_url("?mirror=" + mirror_name), + socket_lm = lws_meta.new_ws(get_appropriate_ws_url("?mirror=" + mirror_name), "lws-mirror-protocol"); try {