mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-16 00:00:07 +01:00

This provides a way to get ahold of LWS_WITH_CONMON telemetry from Secure Streams, it works the same with direct onward connections or via the proxy. You can mark streamtypes with a "perf": true policy attribute... this causes the onward connections on those streamtypes to collect information about the connection performance, and the unsorted DNS results. Streams with that policy attribute receive extra data in their rx callback, with the LWSSS_FLAG_PERF_JSON flag set on it, containing JSON describing the performance of the onward connection taken from CONMON data, in a JSON representation. Streams without the "perf" attribute set never receive this extra rx. The received JSON is based on the CONMON struct info and looks like {"peer":"46.105.127.147","dns_us":596,"sockconn_us":31382,"tls_us":28180,"txn_resp_us:23015,"dns":["2001:41d0:2:ee93::1","46.105.127.147"]} A new minimal example minimal-secure-streams-perf is added that collects this data on an HTTP GET from warmcat.com, and is built with a -client version as well if LWS_WITH_SECURE_STREAMS_PROXY_API is set, that operates via the ss proxy and produces the same result at the client.
790 lines
19 KiB
C
790 lines
19 KiB
C
/*
|
|
* libwebsockets - small server side websockets and web server implementation
|
|
*
|
|
* Copyright (C) 2019 - 2020 Andy Green <andy@warmcat.com>
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
* of this software and associated documentation files (the "Software"), to
|
|
* deal in the Software without restriction, including without limitation the
|
|
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
|
* sell copies of the Software, and to permit persons to whom the Software is
|
|
* furnished to do so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in
|
|
* all copies or substantial portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
* IN THE SOFTWARE.
|
|
*
|
|
*
|
|
* When the user code is in a different process, a non-tls unix domain socket
|
|
* proxy is used to asynchronusly transfer buffers in each direction via the
|
|
* network stack, without explicit IPC
|
|
*
|
|
* user_process{ [user code] | shim | socket-}------ lws_process{ lws }
|
|
*
|
|
* Lws exposes a listening unix domain socket in this case, the user processes
|
|
* connect to it and pass just info.streamtype in an initial tx packet. All
|
|
* packets are prepended by a 1-byte type field when used in this mode. See
|
|
* lws-secure-streams.h for documentation and definitions.
|
|
*
|
|
* Proxying in either direction can face the situation it cannot send the onward
|
|
* packet immediately and is subject to separating the write request from the
|
|
* write action. To make the best use of memory, a single preallocated buffer
|
|
* stashes pending packets in all four directions (c->p, p->c, p->ss, ss->p).
|
|
* This allows it to adapt to different traffic patterns without wasted areas
|
|
* dedicated to traffic that isn't coming in a particular application.
|
|
*
|
|
* A shim is provided to monitor the process' unix domain socket and regenerate
|
|
* the secure sockets api there with callbacks happening in the process thread
|
|
* context.
|
|
*
|
|
* This file implements the listening unix domain socket proxy... this code is
|
|
* only going to run on a Linux-class device with its implications about memory
|
|
* availability.
|
|
*/
|
|
|
|
#include <private-lib-core.h>
|
|
|
|
struct raw_pss {
|
|
struct conn *conn;
|
|
};
|
|
|
|
/*
|
|
* Proxy - onward secure-stream handler
|
|
*/
|
|
|
|
typedef struct ss_proxy_onward {
|
|
lws_ss_handle_t *ss;
|
|
struct conn *conn;
|
|
} ss_proxy_t;
|
|
|
|
void
|
|
lws_proxy_clean_conn_ss(struct lws *wsi)
|
|
{
|
|
#if 0
|
|
struct conn *conn;
|
|
|
|
if (!wsi)
|
|
return;
|
|
|
|
conn = (struct conn *)wsi->a.opaque_user_data;
|
|
|
|
if (conn && conn->ss)
|
|
conn->ss->wsi = NULL;
|
|
#endif
|
|
}
|
|
|
|
|
|
void
|
|
ss_proxy_onward_link_req_writeable(lws_ss_handle_t *h_onward)
|
|
{
|
|
ss_proxy_t *m = (ss_proxy_t *)&h_onward[1];
|
|
|
|
if (m->conn->wsi) /* if possible, request client conn write */
|
|
lws_callback_on_writable(m->conn->wsi);
|
|
}
|
|
|
|
int
|
|
__lws_ss_proxy_bind_ss_to_conn_wsi(void *parconn, size_t dsh_size)
|
|
{
|
|
struct conn *conn = (struct conn *)parconn;
|
|
struct lws_context_per_thread *pt;
|
|
|
|
if (!conn || !conn->wsi || !conn->ss)
|
|
return -1;
|
|
|
|
pt = &conn->wsi->a.context->pt[(int)conn->wsi->tsi];
|
|
|
|
if (lws_fi(&conn->ss->fic, "ssproxy_dsh_create_oom"))
|
|
return -1;
|
|
conn->dsh = lws_dsh_create(&pt->ss_dsh_owner, dsh_size, 2);
|
|
if (!conn->dsh)
|
|
return -1;
|
|
|
|
__lws_lc_tag_append(&conn->wsi->lc, lws_ss_tag(conn->ss));
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Onward secure streams payload interface */
|
|
|
|
static lws_ss_state_return_t
|
|
ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
|
|
{
|
|
ss_proxy_t *m = (ss_proxy_t *)userobj;
|
|
const char *rsp = NULL;
|
|
int n;
|
|
|
|
// lwsl_notice("%s: len %d\n", __func__, (int)len);
|
|
|
|
/*
|
|
* The onward secure stream connection has received something.
|
|
*/
|
|
|
|
if (m->ss->rideshare != m->ss->policy && m->ss->rideshare) {
|
|
rsp = m->ss->rideshare->streamtype;
|
|
flags |= LWSSS_FLAG_RIDESHARE;
|
|
}
|
|
|
|
/*
|
|
* Apply SSS framing around this chunk of RX and stash it in the dsh
|
|
* in ss -> proxy [ -> client] direction. This can fail...
|
|
*/
|
|
|
|
if (lws_fi(&m->ss->fic, "ssproxy_dsh_rx_queue_oom"))
|
|
n = 1;
|
|
else
|
|
n = lws_ss_serialize_rx_payload(m->conn->dsh, buf, len,
|
|
flags, rsp);
|
|
if (n)
|
|
/*
|
|
* We couldn't buffer this rx, eg due to OOM, let's escalate it
|
|
* to be a "loss of connection", which it basically is...
|
|
*/
|
|
return LWSSSSRET_DISCONNECT_ME;
|
|
|
|
/*
|
|
* Manage rx flow on the SS (onward) side according to our situation
|
|
* in the dsh holding proxy->client serialized forwarding rx
|
|
*/
|
|
|
|
if (m->ss->policy->proxy_buflen_rxflow_on_above && m->ss->wsi &&
|
|
m->conn->dsh->oha[KIND_SS_TO_P].total_size >
|
|
m->ss->policy->proxy_buflen_rxflow_on_above) {
|
|
lwsl_notice("%s: %s: rxflow disabling rx\n", __func__,
|
|
lws_wsi_tag(m->ss->wsi));
|
|
/* stop receiving taking in rx once above the threshold */
|
|
lws_rx_flow_control(m->ss->wsi, 0);
|
|
}
|
|
|
|
if (m->conn->wsi) /* if possible, request client conn write */
|
|
lws_callback_on_writable(m->conn->wsi);
|
|
|
|
return LWSSSSRET_OK;
|
|
}
|
|
|
|
/*
|
|
* we are transmitting buffered payload originally from the client on to the ss
|
|
*/
|
|
|
|
static lws_ss_state_return_t
|
|
ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
|
|
size_t *len, int *flags)
|
|
{
|
|
ss_proxy_t *m = (ss_proxy_t *)userobj;
|
|
void *p;
|
|
size_t si;
|
|
|
|
if (!m->conn->ss || m->conn->state != LPCSPROX_OPERATIONAL) {
|
|
lwsl_notice("%s: ss not ready\n", __func__);
|
|
*len = 0;
|
|
|
|
return LWSSSSRET_TX_DONT_SEND;
|
|
}
|
|
|
|
/*
|
|
* The onward secure stream says that we could send something to it
|
|
* (by putting it in buf, and setting *len and *flags)... dredge the
|
|
* next thing out of the dsh
|
|
*/
|
|
|
|
if (lws_ss_deserialize_tx_payload(m->conn->dsh, m->ss->wsi,
|
|
ord, buf, len, flags))
|
|
return LWSSSSRET_TX_DONT_SEND;
|
|
|
|
/* ... there's more we want to send? */
|
|
if (!lws_dsh_get_head(m->conn->dsh, KIND_C_TO_P, (void **)&p, &si))
|
|
lws_ss_request_tx(m->conn->ss);
|
|
|
|
if (!*len && !*flags)
|
|
/* we don't actually want to send anything */
|
|
return LWSSSSRET_TX_DONT_SEND;
|
|
|
|
lwsl_info("%s: onward tx %d fl 0x%x\n", __func__, (int)*len, *flags);
|
|
|
|
#if 0
|
|
{
|
|
int ff = open("/tmp/z", O_RDWR | O_CREAT | O_APPEND, 0666);
|
|
if (ff == -1)
|
|
lwsl_err("%s: errno %d\n", __func__, errno);
|
|
write(ff, buf, *len);
|
|
close(ff);
|
|
}
|
|
#endif
|
|
|
|
return LWSSSSRET_OK;
|
|
}
|
|
|
|
static lws_ss_state_return_t
|
|
ss_proxy_onward_state(void *userobj, void *sh,
|
|
lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
|
|
{
|
|
ss_proxy_t *m = (ss_proxy_t *)userobj;
|
|
size_t dsh_size;
|
|
|
|
switch (state) {
|
|
case LWSSSCS_CREATING:
|
|
|
|
/*
|
|
* conn is private to -process.c, call thru to a) adjust
|
|
* the accepted incoming proxy link wsi tag name to be
|
|
* appended with the onward ss tag information now we
|
|
* have it, and b) allocate the dsh buffer now we
|
|
* can find out the policy about it for the streamtype.
|
|
*/
|
|
|
|
dsh_size = m->ss->policy->proxy_buflen ?
|
|
m->ss->policy->proxy_buflen : 32768;
|
|
|
|
lwsl_notice("%s: %s: initializing dsh max len %lu\n",
|
|
__func__, lws_ss_tag(m->ss),
|
|
(unsigned long)dsh_size);
|
|
|
|
/* this includes ssproxy_dsh_create_oom fault generation */
|
|
|
|
if (__lws_ss_proxy_bind_ss_to_conn_wsi(m->conn, dsh_size)) {
|
|
|
|
/* failed to allocate the dsh */
|
|
|
|
lwsl_notice("%s: dsh init failed\n", __func__);
|
|
|
|
return LWSSSSRET_DESTROY_ME;
|
|
}
|
|
break;
|
|
|
|
case LWSSSCS_DESTROYING:
|
|
if (!m->conn)
|
|
break;
|
|
if (!m->conn->wsi) {
|
|
/*
|
|
* Our onward secure stream is closing and our client
|
|
* connection has already gone away... destroy the conn.
|
|
*/
|
|
lwsl_info("%s: Destroying conn\n", __func__);
|
|
lws_dsh_destroy(&m->conn->dsh);
|
|
free(m->conn);
|
|
m->conn = NULL;
|
|
return 0;
|
|
} else
|
|
lwsl_info("%s: ss DESTROYING, wsi up\n", __func__);
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
if (!m->conn) {
|
|
lwsl_warn("%s: dropping state due to conn not up\n", __func__);
|
|
|
|
return LWSSSSRET_OK;
|
|
}
|
|
|
|
if (lws_ss_serialize_state(m->conn->wsi, m->conn->dsh, state, ack))
|
|
/*
|
|
* Failed to alloc state packet that we want to send in dsh,
|
|
* we will lose coherence and have to disconnect the link
|
|
*/
|
|
return LWSSSSRET_DISCONNECT_ME;
|
|
|
|
if (m->conn->wsi) /* if possible, request client conn write */
|
|
lws_callback_on_writable(m->conn->wsi);
|
|
|
|
return LWSSSSRET_OK;
|
|
}
|
|
|
|
void
|
|
ss_proxy_onward_txcr(void *userobj, int bump)
|
|
{
|
|
ss_proxy_t *m = (ss_proxy_t *)userobj;
|
|
|
|
if (!m->conn)
|
|
return;
|
|
|
|
lws_ss_serialize_txcr(m->conn->dsh, bump);
|
|
|
|
if (m->conn->wsi) /* if possible, request client conn write */
|
|
lws_callback_on_writable(m->conn->wsi);
|
|
}
|
|
|
|
/*
|
|
* Client <-> Proxy connection, usually on Unix Domain Socket
|
|
*/
|
|
|
|
static int
|
|
callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason,
|
|
void *user, void *in, size_t len)
|
|
{
|
|
struct raw_pss *pss = (struct raw_pss *)user;
|
|
const lws_ss_policy_t *rsp;
|
|
struct conn *conn = NULL;
|
|
lws_ss_metadata_t *md;
|
|
lws_ss_info_t ssi;
|
|
const uint8_t *cp;
|
|
char s[512];
|
|
uint8_t *p;
|
|
size_t si;
|
|
char pay;
|
|
int n;
|
|
|
|
if (pss)
|
|
conn = pss->conn;
|
|
|
|
switch (reason) {
|
|
case LWS_CALLBACK_PROTOCOL_INIT:
|
|
break;
|
|
|
|
case LWS_CALLBACK_PROTOCOL_DESTROY:
|
|
break;
|
|
|
|
/* callbacks related to raw socket descriptor "accepted side" */
|
|
|
|
case LWS_CALLBACK_RAW_ADOPT:
|
|
lwsl_info("LWS_CALLBACK_RAW_ADOPT\n");
|
|
if (!pss)
|
|
return -1;
|
|
|
|
if (lws_fi(&wsi->fic, "ssproxy_client_adopt_oom"))
|
|
pss->conn = NULL;
|
|
else
|
|
pss->conn = malloc(sizeof(struct conn));
|
|
if (!pss->conn)
|
|
return -1;
|
|
|
|
memset(pss->conn, 0, sizeof(*pss->conn));
|
|
|
|
/* dsh is allocated when the onward ss is done */
|
|
|
|
pss->conn->wsi = wsi;
|
|
wsi->bound_ss_proxy_conn = 1; /* opaque is conn */
|
|
|
|
pss->conn->state = LPCSPROX_WAIT_INITIAL_TX;
|
|
|
|
/*
|
|
* Client is expected to follow the unix domain socket
|
|
* acceptance up rapidly with an initial tx containing the
|
|
* streamtype name. We can't create the stream until then.
|
|
*/
|
|
lws_set_timeout(wsi,
|
|
PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
|
|
break;
|
|
|
|
case LWS_CALLBACK_RAW_CLOSE:
|
|
lwsl_info("LWS_CALLBACK_RAW_CLOSE:\n");
|
|
|
|
if (!conn)
|
|
break;
|
|
|
|
/*
|
|
* the client unix domain socket connection (wsi / conn->wsi)
|
|
* has closed... eg, client has exited or otherwise has
|
|
* definitively finished with the proxying and onward connection
|
|
*
|
|
* But right now, the SS and possibly the SS onward wsi are
|
|
* still live...
|
|
*/
|
|
|
|
assert(conn->wsi == wsi);
|
|
conn->wsi = NULL;
|
|
|
|
lwsl_notice("%s: cli->prox link %s closing\n", __func__,
|
|
lws_wsi_tag(wsi));
|
|
|
|
/* sever relationship with conn */
|
|
lws_set_opaque_user_data(wsi, NULL);
|
|
|
|
/*
|
|
* The current wsi is decoupled from the pss / conn and
|
|
* the conn no longer has a pointer on it.
|
|
*
|
|
* If there's an outgoing, proxied SS conn on our behalf, we
|
|
* have to destroy those
|
|
*/
|
|
|
|
if (conn->ss) {
|
|
struct lws *cw = conn->ss->wsi;
|
|
/*
|
|
* conn->ss is the onward connection SS
|
|
*/
|
|
|
|
lwsl_info("%s: destroying %s, wsi %s\n",
|
|
__func__, lws_ss_tag(conn->ss),
|
|
lws_wsi_tag(conn->ss->wsi));
|
|
|
|
/* sever conn relationship with ss about to be deleted */
|
|
|
|
conn->ss->wsi = NULL;
|
|
|
|
if (cw && wsi != cw) {
|
|
|
|
/* disconnect onward SS from its wsi */
|
|
|
|
lws_set_opaque_user_data(cw, NULL);
|
|
|
|
/*
|
|
* The wsi doing the onward connection can no
|
|
* longer relate to the conn... otherwise when
|
|
* he gets callbacks he wants to bind to
|
|
* the ss we are about to delete
|
|
*/
|
|
lws_wsi_close(cw, LWS_TO_KILL_ASYNC);
|
|
}
|
|
|
|
lws_ss_destroy(&conn->ss);
|
|
/*
|
|
* Conn may have gone, at ss destroy handler in
|
|
* ssi.state for proxied ss
|
|
*/
|
|
break;
|
|
}
|
|
|
|
if (conn->state == LPCSPROX_DESTROYED || !conn->ss) {
|
|
/*
|
|
* There's no onward secure stream and our client
|
|
* connection is closing. Destroy the conn.
|
|
*/
|
|
lws_dsh_destroy(&conn->dsh);
|
|
free(conn);
|
|
pss->conn = NULL;
|
|
} else
|
|
lwsl_debug("%s: CLOSE; %s\n", __func__, lws_ss_tag(conn->ss));
|
|
|
|
break;
|
|
|
|
case LWS_CALLBACK_RAW_RX:
|
|
/*
|
|
* ie, the proxy is receiving something from a client
|
|
*/
|
|
lwsl_info("%s: RX: rx %d\n", __func__, (int)len);
|
|
|
|
if (!conn || !conn->wsi) {
|
|
lwsl_err("%s: rx with bad conn state\n", __func__);
|
|
|
|
return -1;
|
|
}
|
|
|
|
// lwsl_hexdump_info(in, len);
|
|
|
|
if (conn->state == LPCSPROX_WAIT_INITIAL_TX) {
|
|
memset(&ssi, 0, sizeof(ssi));
|
|
ssi.user_alloc = sizeof(ss_proxy_t);
|
|
ssi.handle_offset = offsetof(ss_proxy_t, ss);
|
|
ssi.opaque_user_data_offset =
|
|
offsetof(ss_proxy_t, conn);
|
|
ssi.rx = ss_proxy_onward_rx;
|
|
ssi.tx = ss_proxy_onward_tx;
|
|
}
|
|
ssi.state = ss_proxy_onward_state;
|
|
ssi.flags = 0;
|
|
|
|
n = lws_ss_deserialize_parse(&conn->parser,
|
|
lws_get_context(wsi), conn->dsh, in, len,
|
|
&conn->state, conn, &conn->ss, &ssi, 0);
|
|
switch (n) {
|
|
case LWSSSSRET_OK:
|
|
break;
|
|
case LWSSSSRET_DISCONNECT_ME:
|
|
return -1;
|
|
case LWSSSSRET_DESTROY_ME:
|
|
if (conn->ss)
|
|
lws_ss_destroy(&conn->ss);
|
|
return -1;
|
|
}
|
|
|
|
if (conn->state == LPCSPROX_REPORTING_FAIL ||
|
|
conn->state == LPCSPROX_REPORTING_OK)
|
|
lws_callback_on_writable(conn->wsi);
|
|
|
|
break;
|
|
|
|
case LWS_CALLBACK_RAW_WRITEABLE:
|
|
|
|
lwsl_debug("%s: %s: LWS_CALLBACK_RAW_WRITEABLE, state 0x%x\n",
|
|
__func__, lws_wsi_tag(wsi), lwsi_state(wsi));
|
|
|
|
/*
|
|
* We can transmit something back to the client from the dsh
|
|
* of stuff we received on its behalf from the ss
|
|
*/
|
|
|
|
if (!conn || !conn->wsi)
|
|
break;
|
|
|
|
n = 0;
|
|
pay = 0;
|
|
|
|
s[3] = 0;
|
|
cp = (const uint8_t *)s;
|
|
switch (conn->state) {
|
|
case LPCSPROX_REPORTING_FAIL:
|
|
s[3] = 1;
|
|
/* fallthru */
|
|
case LPCSPROX_REPORTING_OK:
|
|
s[0] = LWSSS_SER_RXPRE_CREATE_RESULT;
|
|
s[1] = 0;
|
|
s[2] = 1;
|
|
|
|
n = 8;
|
|
|
|
lws_ser_wu32be((uint8_t *)&s[4], conn->ss &&
|
|
conn->ss->policy ?
|
|
conn->ss->policy->client_buflen : 0);
|
|
|
|
/*
|
|
* If there's rideshare sequencing, it's added after the
|
|
* first 4 bytes or the create result, comma-separated
|
|
*/
|
|
|
|
if (conn->ss) {
|
|
rsp = conn->ss->policy;
|
|
|
|
while (rsp) {
|
|
if (n != 4 && n < (int)sizeof(s) - 2)
|
|
s[n++] = ',';
|
|
n += lws_snprintf(&s[n], sizeof(s) - (unsigned int)n,
|
|
"%s", rsp->streamtype);
|
|
rsp = lws_ss_policy_lookup(wsi->a.context,
|
|
rsp->rideshare_streamtype);
|
|
}
|
|
}
|
|
s[2] = (char)(n - 3);
|
|
conn->state = LPCSPROX_OPERATIONAL;
|
|
lws_set_timeout(wsi, 0, 0);
|
|
break;
|
|
|
|
case LPCSPROX_OPERATIONAL:
|
|
|
|
/*
|
|
* returning [onward -> ] proxy]-> client
|
|
* rx metadata has priority 1
|
|
*/
|
|
|
|
md = conn->ss->metadata;
|
|
while (md) {
|
|
// lwsl_notice("%s: check %s: %d\n", __func__,
|
|
// md->name, md->pending_onward);
|
|
if (md->pending_onward) {
|
|
size_t naml = strlen(md->name);
|
|
|
|
// lwsl_notice("%s: proxy issuing rxmd\n", __func__);
|
|
|
|
if (4 + naml + md->length > sizeof(s)) {
|
|
lwsl_err("%s: rxmdata too big\n",
|
|
__func__);
|
|
goto hangup;
|
|
}
|
|
md->pending_onward = 0;
|
|
p = (uint8_t *)s;
|
|
p[0] = LWSSS_SER_RXPRE_METADATA;
|
|
lws_ser_wu16be(&p[1], (uint16_t)(1 + naml +
|
|
md->length));
|
|
p[3] = (uint8_t)naml;
|
|
memcpy(&p[4], md->name, naml);
|
|
p += 4 + naml;
|
|
memcpy(p, md->value__may_own_heap,
|
|
md->length);
|
|
p += md->length;
|
|
|
|
n = lws_ptr_diff(p, cp);
|
|
goto again;
|
|
}
|
|
|
|
md = md->next;
|
|
}
|
|
|
|
/*
|
|
* If we have performance data, render it in JSON
|
|
* and send that in LWSSS_SER_RXPRE_PERF has
|
|
* priority 2
|
|
*/
|
|
|
|
if (conn->ss->conmon_json) {
|
|
unsigned int xlen = conn->ss->conmon_len;
|
|
|
|
if (xlen > sizeof(s) - 3)
|
|
xlen = sizeof(s) - 3;
|
|
cp = (uint8_t *)s;
|
|
p = (uint8_t *)s;
|
|
p[0] = LWSSS_SER_RXPRE_PERF;
|
|
lws_ser_wu16be(&p[1], (uint16_t)xlen);
|
|
memcpy(&p[3], conn->ss->conmon_json, xlen);
|
|
|
|
lws_free_set_NULL(conn->ss->conmon_json);
|
|
n = (int)(xlen + 3);
|
|
|
|
pay = 0;
|
|
goto again;
|
|
}
|
|
|
|
/*
|
|
* if no fresh rx metadata, just pass through incoming
|
|
* dsh
|
|
*/
|
|
|
|
if (lws_dsh_get_head(conn->dsh, KIND_SS_TO_P,
|
|
(void **)&p, &si))
|
|
break;
|
|
|
|
cp = p;
|
|
|
|
#if 0
|
|
if (cp[0] == LWSSS_SER_RXPRE_RX_PAYLOAD &&
|
|
wsi->a.context->detailed_latency_cb) {
|
|
|
|
/*
|
|
* we're fulfilling rx that came in on ss
|
|
* by sending it back out to the client on
|
|
* the Unix Domain Socket
|
|
*
|
|
* + 7 u32 write will compute latency here...
|
|
* + 11 u32 ust we received from ss
|
|
*
|
|
* lws_write will report it and fill in
|
|
* LAT_DUR_PROXY_CLIENT_REQ_TO_WRITE
|
|
*/
|
|
|
|
us = lws_now_usecs();
|
|
lws_ser_wu32be(&p[7], us -
|
|
lws_ser_ru64be(&p[11]));
|
|
lws_ser_wu64be(&p[11], us);
|
|
|
|
wsi->detlat.acc_size =
|
|
wsi->detlat.req_size = si - 19;
|
|
/* time proxy held it */
|
|
wsi->detlat.latencies[
|
|
LAT_DUR_PROXY_RX_TO_ONWARD_TX] =
|
|
lws_ser_ru32be(&p[7]);
|
|
}
|
|
#endif
|
|
|
|
pay = 1;
|
|
n = (int)si;
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
again:
|
|
if (!n)
|
|
break;
|
|
|
|
if (lws_fi(&wsi->fic, "ssproxy_client_write_fail"))
|
|
n = -1;
|
|
else
|
|
n = lws_write(wsi, (uint8_t *)cp, (unsigned int)n, LWS_WRITE_RAW);
|
|
if (n < 0) {
|
|
lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
|
|
|
|
goto hangup;
|
|
}
|
|
|
|
switch (conn->state) {
|
|
case LPCSPROX_REPORTING_FAIL:
|
|
goto hangup;
|
|
case LPCSPROX_OPERATIONAL:
|
|
if (!conn)
|
|
break;
|
|
if (pay) {
|
|
lws_dsh_free((void **)&p);
|
|
|
|
/*
|
|
* Did we go below the rx flow threshold for
|
|
* this dsh?
|
|
*/
|
|
|
|
if (conn->ss->policy->proxy_buflen_rxflow_on_above &&
|
|
conn->ss->wsi &&
|
|
conn->dsh->oha[KIND_SS_TO_P].total_size <
|
|
conn->ss->policy->proxy_buflen_rxflow_off_below) {
|
|
lwsl_notice("%s: %s: rxflow re-enabling rx\n",
|
|
__func__,
|
|
lws_wsi_tag(conn->ss->wsi));
|
|
/*
|
|
* Resume receiving taking in rx once
|
|
* below the low threshold
|
|
*/
|
|
lws_rx_flow_control(conn->ss->wsi, 1);
|
|
}
|
|
}
|
|
if (!lws_dsh_get_head(conn->dsh, KIND_SS_TO_P,
|
|
(void **)&p, &si)) {
|
|
if (!lws_send_pipe_choked(wsi)) {
|
|
cp = p;
|
|
pay = 1;
|
|
n = (int)si;
|
|
goto again;
|
|
}
|
|
lws_callback_on_writable(wsi);
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
|
|
return lws_callback_http_dummy(wsi, reason, user, in, len);
|
|
|
|
hangup:
|
|
/* hang up on him */
|
|
|
|
return -1;
|
|
}
|
|
|
|
static const struct lws_protocols protocols[] = {
|
|
{
|
|
"ssproxy-protocol",
|
|
callback_ss_proxy,
|
|
sizeof(struct raw_pss),
|
|
2048, 2048, NULL, 0
|
|
},
|
|
{ NULL, NULL, 0, 0, 0, NULL, 0 }
|
|
};
|
|
|
|
/*
|
|
* called from create_context()
|
|
*/
|
|
|
|
int
|
|
lws_ss_proxy_create(struct lws_context *context, const char *bind, int port)
|
|
{
|
|
struct lws_context_creation_info info;
|
|
|
|
memset(&info, 0, sizeof(info));
|
|
|
|
info.vhost_name = "ssproxy";
|
|
info.options = LWS_SERVER_OPTION_ADOPT_APPLY_LISTEN_ACCEPT_CONFIG |
|
|
LWS_SERVER_OPTION_SS_PROXY;
|
|
info.port = port;
|
|
if (!port) {
|
|
if (!bind)
|
|
#if defined(__linux__)
|
|
bind = "@proxy.ss.lws";
|
|
#else
|
|
bind = "/tmp/proxy.ss.lws";
|
|
#endif
|
|
info.options |= LWS_SERVER_OPTION_UNIX_SOCK;
|
|
}
|
|
info.iface = bind;
|
|
#if defined(__linux__)
|
|
info.unix_socket_perms = "root:root";
|
|
#else
|
|
#endif
|
|
info.listen_accept_role = "raw-skt";
|
|
info.listen_accept_protocol = "ssproxy-protocol";
|
|
info.protocols = protocols;
|
|
|
|
if (!lws_create_vhost(context, &info)) {
|
|
lwsl_err("%s: Failed to create ss proxy vhost\n", __func__);
|
|
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|