diff --git a/lib/secure-streams/private-lib-secure-streams.h b/lib/secure-streams/private-lib-secure-streams.h index 02e607f65..48e5efe68 100644 --- a/lib/secure-streams/private-lib-secure-streams.h +++ b/lib/secure-streams/private-lib-secure-streams.h @@ -115,6 +115,10 @@ typedef struct lws_ss_handle { lws_mqtt_topic_elem_t topic_qos; lws_mqtt_topic_elem_t sub_top; lws_mqtt_subscribe_param_t sub_info; + /* allocation that must be destroyed with conn */ + void *heap_baggage; + const char *subscribe_to; + size_t subscribe_to_len; } mqtt; #endif } u; diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 59d5495e1..eb2e4bf6b 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -44,6 +44,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, break; lws_ss_event_helper(h, LWSSSCS_UNREACHABLE); h->wsi = NULL; + if (h->u.mqtt.heap_baggage) { + lws_free(h->u.mqtt.heap_baggage); + h->u.mqtt.heap_baggage = NULL; + } + lws_ss_backoff(h); break; @@ -54,6 +59,12 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (h->wsi) lws_set_opaque_user_data(h->wsi, NULL); h->wsi = NULL; + + if (h->u.mqtt.heap_baggage) { + lws_free(h->u.mqtt.heap_baggage); + h->u.mqtt.heap_baggage = NULL; + } + if (f) { lws_ss_destroy(&h); break; @@ -118,17 +129,20 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, break; } - if (h->policy->u.mqtt.subscribe && !wsi->mqtt->done_subscribe) { + if (h->policy->u.mqtt.subscribe && + !wsi->mqtt->done_subscribe) { /* * The policy says to subscribe to something, and we - * haven't done it yet + * haven't done it yet. Do it using the pre-prepared + * string-substituted version of the policy string. */ - lwsl_warn("%s: subscribing %s\n", __func__, h->policy->u.mqtt.subscribe); + lwsl_notice("%s: subscribing %s\n", __func__, + h->u.mqtt.subscribe_to); memset(&h->u.mqtt.sub_top, 0, sizeof(h->u.mqtt.sub_top)); - h->u.mqtt.sub_top.name = h->policy->u.mqtt.subscribe; + h->u.mqtt.sub_top.name = h->u.mqtt.subscribe_to; h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos; memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info)); h->u.mqtt.sub_info.num_topics = 1; @@ -165,7 +179,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, } memset(&mqpp, 0, sizeof(mqpp)); - mqpp.topic = (char *)h->policy->u.mqtt.topic; + /* this is the string-substituted h->policy->u.mqtt.topic */ + mqpp.topic = (char *)h->u.mqtt.topic_qos.name; mqpp.topic_len = strlen(mqpp.topic); mqpp.packet_id = h->txord - 1; mqpp.payload = buf + LWS_PRE; @@ -174,7 +189,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, else mqpp.payload_len = buflen; - lwsl_notice("%s: payload len %d\n", __func__, (int)mqpp.payload_len); + lwsl_notice("%s: payload len %d\n", __func__, + (int)mqpp.payload_len); mqpp.qos = h->policy->u.mqtt.qos; @@ -212,25 +228,103 @@ const struct lws_protocols protocol_secstream_mqtt = { * For ws, protocol aux is ; */ +enum { + SSCMM_STRSUB_WILL_TOPIC, + SSCMM_STRSUB_WILL_MESSAGE, + SSCMM_STRSUB_SUBSCRIBE, + SSCMM_STRSUB_TOPIC +}; + static int secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, struct lws_client_connect_info *i, union lws_ss_contemp *ct) { + const char *sources[4] = { + /* we're going to string-substitute these before use */ + h->policy->u.mqtt.will_topic, + h->policy->u.mqtt.will_message, + h->policy->u.mqtt.subscribe, + h->policy->u.mqtt.topic + }; + size_t used_in, olen[4] = { 0, 0, 0, 0 }, tot = 0; + lws_strexp_t exp; + char *p, *ps[4]; + int n; + memset(&ct->ccp, 0, sizeof(ct->ccp)); ct->ccp.client_id = "lwsMqttClient"; ct->ccp.keep_alive = h->policy->u.mqtt.keep_alive; ct->ccp.clean_start = h->policy->u.mqtt.clean_start; - ct->ccp.will_param.topic = h->policy->u.mqtt.will_topic; - ct->ccp.will_param.message = h->policy->u.mqtt.will_message; ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos; ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain; + h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos; - lwsl_notice("%s\n", __func__); + /* + * We're going to string-substitute several of these parameters, which + * have unknown, possibly large size. And, as their usage is deferred + * inside the asynchronous lifetime of the MQTT connection, they need + * to live on the heap. + * + * Notice these allocations at h->u.mqtt.heap_baggage belong to the + * underlying MQTT stream lifetime, not the logical SS lifetime, and + * are destroyed if present at connection error or close of the + * underlying connection. + * + * + * First, compute the length of each without producing strsubst output, + * and keep a running total. + */ - h->u.mqtt.topic_qos.name = h->policy->u.mqtt.subscribe; - h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos; + for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) { + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, + NULL, (size_t)-1); + if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]), + &used_in, &olen[n]) != LSTRX_DONE) { + lwsl_err("%s: failed to subsitute %s\n", __func__, + sources[n]); + return 1; + } + tot += olen[n] + 1; + } + + /* + * Then, allocate enough space on the heap for the total of the + * substituted results + */ + + h->u.mqtt.heap_baggage = lws_malloc(tot, __func__); + if (!h->u.mqtt.heap_baggage) + return 1; + + /* + * Finally, issue the subsitutions one after the other into the single + * allocated result buffer and prepare pointers into them + */ + + p = h->u.mqtt.heap_baggage; + for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) { + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, + p, (size_t)-1); + ps[n] = p; + if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]), + &used_in, &olen[n]) != LSTRX_DONE) + return 1; + + p += olen[n] + 1; + } + + /* + * Point the guys who want the substituted content at the substituted + * strings + */ + + ct->ccp.will_param.topic = ps[SSCMM_STRSUB_WILL_TOPIC]; + ct->ccp.will_param.message = ps[SSCMM_STRSUB_WILL_MESSAGE]; + h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE]; + h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE]; + h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC]; i->method = "MQTT"; i->mqtt_cp = &ct->ccp; @@ -240,19 +334,6 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, /* share connections where possible */ i->ssl_connection |= LCCSCF_PIPELINE; -/* - if (!h->policy->u.http.url) - return 0; - - // protocol aux is the path part ; ws subprotocol name - - i->path = NULL; - lws_snprintf(buf, len, "/%s", h->policy->u.mqtt.topic); - -// i->protocol = h->policy->u.mqtt.u.ws.subprotocol; - - lwsl_notice("%s: url %s, ws subprotocol %s\n", __func__, buf, i->protocol); -*/ return 0; }