diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 11489d250..911f5789f 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -141,8 +141,8 @@ secstream_mqtt_subscribe(struct lws *wsi) } static int -secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, - const char* topic, +secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, + uint32_t payload_len, const char* topic, lws_mqtt_qos_levels_t qos, uint8_t retain, int f) { lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); @@ -190,17 +190,17 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, mqpp.qos = qos; mqpp.retain = !!retain; mqpp.payload = buf; - if (h->writeable_len) - mqpp.payload_len = (uint32_t)h->writeable_len; + if (payload_len) + mqpp.payload_len = payload_len; else - mqpp.payload_len = (uint32_t)buflen; + mqpp.payload_len = (uint32_t)buf_len; lwsl_notice("%s: payload len %d\n", __func__, (int)mqpp.payload_len); if (lws_mqtt_client_send_publish(wsi, &mqpp, (const char *)buf, - (uint32_t)buflen, + (uint32_t)buf_len, f & LWSSS_FLAG_EOM)) { lwsl_notice("%s: failed to publish\n", __func__); lws_free(expbuf); @@ -210,6 +210,29 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, return 0; } +static int +secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) { + lws_strexp_t exp; + size_t used_in, used_out = 0; + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + + if (h->policy->u.mqtt.birth_message) { + lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, + (char *)buf, buflen); + if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message, + strlen(h->policy->u.mqtt.birth_message), + &used_in, &used_out) != LSTRX_DONE) { + return 1; + } + } + wsi->mqtt->inside_birth = 1; + return secstream_mqtt_publish(wsi, buf, + used_out, 0, h->policy->u.mqtt.birth_topic, + h->policy->u.mqtt.birth_qos, + h->policy->u.mqtt.birth_retain, + LWSSS_FLAG_EOM); +} + static int secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -285,6 +308,23 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, h->retry = 0; h->seqstate = SSSEQ_CONNECTED; + if (h->policy->u.mqtt.birth_topic && + !wsi->mqtt->done_birth) { + struct lws *nwsi = lws_get_network_wsi(wsi); + lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) { + if (w != wsi && + (w->mqtt->done_birth || w->mqtt->inside_birth)) { + /* + * If any Birth was sent out or + * is pending on other stream, + * skip sending Birth. + */ + wsi->mqtt->done_birth = 1; + break; + } + } lws_end_foreach_ll(w, mux.sibling_list); + } + if (!h->policy->u.mqtt.subscribe || !h->policy->u.mqtt.subscribe[0]) { /* @@ -313,6 +353,17 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, lws_callback_on_writable(wsi); break; } + + if (h->policy->u.mqtt.birth_topic && + !wsi->mqtt->done_birth) { + /* + * If a Birth is pending on the stream, then make + * sure the Birth is done before signaling the + * user application. + */ + lws_callback_on_writable(wsi); + break; + } lws_sul_cancel(&h->sul); #if defined(LWS_WITH_SYS_METRICS) /* @@ -351,10 +402,10 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, case LWS_CALLBACK_MQTT_SUBSCRIBED: /* - * Stream demanded a subscribe while connecting, once + * Stream demanded a subscribe without a Birth while connecting, once * done notify CONNECTED event to the application. */ - if (wsi->mqtt->done_subscribe == 0) { + if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) { lws_sul_cancel(&h->sul); r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); if (r != LWSSSSRET_OK) @@ -368,10 +419,15 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, lws_sul_cancel(&h->sul_timeout); if (wsi->mqtt->inside_birth) { /* - * Skip LWSSSCS_QOS_ACK_REMOTE for birth topic. + * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify + * CONNECTED event to the application. */ wsi->mqtt->inside_birth = 0; wsi->mqtt->done_birth = 1; + r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); + if (r != LWSSSSRET_OK) + return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); + lws_callback_on_writable(wsi); break; } r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE); @@ -379,6 +435,14 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); break; + case LWS_CALLBACK_MQTT_RESEND: + if (wsi->mqtt->inside_birth) { + lwsl_err("%s: %s: Failed to send Birth\n", __func__, + lws_ss_tag(h)); + return -1; + } + break; + case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: { if (!h || !h->info.tx) @@ -393,27 +457,12 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) return secstream_mqtt_subscribe(wsi); - if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) { - lws_strexp_t exp; - size_t used_in, used_out = 0; - if (h->policy->u.mqtt.birth_message) { - lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, - (char *)(buf + LWS_PRE), buflen); - if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message, - strlen(h->policy->u.mqtt.birth_message), - &used_in, &used_out) != LSTRX_DONE) { - return 1; - } - } - wsi->mqtt->inside_birth = 1; - return secstream_mqtt_publish(wsi, buf + LWS_PRE, - used_out, h->policy->u.mqtt.birth_topic, - h->policy->u.mqtt.birth_qos, - h->policy->u.mqtt.birth_retain, - LWSSS_FLAG_EOM); - } + if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) + return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen); + r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, &buflen, &f); + if (r == LWSSSSRET_TX_DONT_SEND) return 0; @@ -436,10 +485,16 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (r < 0) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); - return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, - h->policy->u.mqtt.topic, - h->policy->u.mqtt.qos, - h->policy->u.mqtt.retain, f); + if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, + (uint32_t)h->writeable_len, + h->policy->u.mqtt.topic, + h->policy->u.mqtt.qos, + h->policy->u.mqtt.retain, f) != 0) { + r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); + if (r != LWSSSSRET_OK) + return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); + } + return 0; } case LWS_CALLBACK_MQTT_UNSUBSCRIBED: