diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 551ac0435..f79b94729 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -301,30 +301,35 @@ secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) { static char * expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len) { - lws_strexp_t exp = {0}; - char* expbuf = NULL; + lws_strexp_t exp; + char *expbuf = NULL; size_t used_in = 0, used_out = 0, post_len = 0; + memset(&exp, 0, sizeof(exp)); + if (post) post_len = strlen(post); if (post_len > max_len) return NULL; - lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL, + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, NULL, max_len - post_len); if (lws_strexp_expand(&exp, str, strlen(str), &used_in, &used_out) != LSTRX_DONE) { lwsl_err("%s, failed to expand %s", __func__, str); + return NULL; } expbuf = lws_malloc(used_out + 1 + post_len, __func__); if (!expbuf) { lwsl_err("%s, failed to allocate str_exp for %s", __func__, str); + return NULL; } + lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf, used_out + 1 + post_len); @@ -332,11 +337,11 @@ expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t ma &used_out) != LSTRX_DONE) { lwsl_err("%s, failed to expand str_exp %s\n", __func__, str); lws_free(expbuf); + return NULL; } - if (post) { + if (post) strcat(expbuf, post); - } return expbuf; } @@ -356,7 +361,8 @@ secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic) expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN); if (!expbuf) { - lwsl_warn("%s, failed to expand Shadow topic", __func__); + lwsl_wsi_warn(wsi, "Failed to expand Shadow topic"); + return LMMTR_TOPIC_MATCH_ERROR; } for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) { @@ -377,9 +383,8 @@ secstream_mqtt_shadow_cleanup(struct lws *wsi) lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); uint32_t i = 0; - for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) { + for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name); - } h->u.mqtt.shadow_sub.num_topics = 0; @@ -398,16 +403,19 @@ secstream_mqtt_shadow_unsubscribe(struct lws *wsi) wsi->mqtt->send_shadow_unsubscribe = 0; wsi->mqtt->inside_shadow = 0; wsi->mqtt->done_shadow_subscribe = 0; + return LWSSSSRET_OK; } if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) { - lwsl_err("%s, failed to send MQTT unsubsribe", __func__); + lwsl_wsi_err(wsi, "Failed to send MQTT unsubsribe"); + return LWSSSSRET_DISCONNECT_ME; } /* Expect a UNSUBACK */ if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { - lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); + lwsl_wsi_err(wsi, "Unable to set LWS_POLLIN"); + return LWSSSSRET_DISCONNECT_ME; } wsi->mqtt->send_shadow_unsubscribe = 0; @@ -422,7 +430,7 @@ secstream_mqtt_shadow_subscribe(struct lws *wsi) char* expbuf = NULL; const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR, LWS_MQTT_SHADOW_RESP_REJECTED_STR }; - unsigned int suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]); + unsigned int i, suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]); if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow) return 0; @@ -434,17 +442,17 @@ secstream_mqtt_shadow_subscribe(struct lws *wsi) h->u.mqtt.shadow_sub.topic = lws_malloc( sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__); if (!h->u.mqtt.shadow_sub.topic) { - lwsl_err("%s, failed to allocate Shadow topics", __func__); + lwsl_ss_err(h, "Failed to allocate Shadow topics"); return -1; } h->u.mqtt.shadow_sub.num_topics = suffixes_len; - for (unsigned int i = 0; i < suffixes_len; i++) { + for (i = 0; i < suffixes_len; i++) { expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i], LWS_MQTT_MAX_AWSIOT_TOPICLEN); if (!expbuf) { - lwsl_err("%s, failed to allocate Shadow topic", - __func__); + lwsl_ss_err(h, "Failed to allocate Shadow topic"); secstream_mqtt_shadow_cleanup(wsi); + return -1; } h->u.mqtt.shadow_sub.topic[i].name = expbuf; @@ -453,7 +461,8 @@ secstream_mqtt_shadow_subscribe(struct lws *wsi) h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1); if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) { - lwsl_notice("%s: unable to subscribe Shadow topics", __func__); + lwsl_wsi_notice(wsi, "Unable to subscribe Shadow topics"); + return 0; } @@ -472,15 +481,18 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); - lws_mqtt_publish_param_t *pmqpp = NULL; - lws_ss_metadata_t *omd = NULL; - uint8_t buf[LWS_PRE + 1400] = {0}; - lws_ss_state_return_t r = LWSSSSRET_OK; - size_t buflen = sizeof(buf) - LWS_PRE; - int f = 0; - lws_strexp_t exp = {0}; size_t used_in = 0, used_out = 0, topic_len = 0; - char* sub_topic = NULL; + lws_mqtt_publish_param_t *pmqpp = NULL; + lws_ss_state_return_t r = LWSSSSRET_OK; + uint8_t buf[LWS_PRE + 1400]; + size_t buflen = sizeof(buf) - LWS_PRE; + lws_ss_metadata_t *omd = NULL; + char *sub_topic = NULL; + lws_strexp_t exp; + int f = 0; + + memset(buf, 0, sizeof(buf)); + memset(&exp, 0, sizeof(exp)); switch (reason) { @@ -665,19 +677,19 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload, len, f); - if (wsi->mqtt->inside_shadow) { + if (wsi->mqtt->inside_shadow) _lws_ss_set_metadata(omd, exp.name, &sub_topic, topic_len); - } if (r != LWSSSSRET_OK) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); if (wsi->mqtt->inside_shadow) { - uint32_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR); - uint32_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR); + size_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR); + size_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR); + uint32_t i; - for (uint32_t i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) { + for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) { /* * received response ('/accepted' or 'rejected') * and clean up Shadow operation @@ -689,15 +701,15 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, continue; if (!strcmp(pmqpp->topic + - (strlen(pmqpp->topic) - acc_n), - LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) || + (strlen(pmqpp->topic) - acc_n), + LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) || !strcmp(pmqpp->topic + - (strlen(pmqpp->topic) - rej_n), - LWS_MQTT_SHADOW_RESP_REJECTED_STR)) { - lws_sul_cancel( - &wsi->mqtt->sul_shadow_wait); + (strlen(pmqpp->topic) - rej_n), + LWS_MQTT_SHADOW_RESP_REJECTED_STR)) { + lws_sul_cancel(&wsi->mqtt->sul_shadow_wait); wsi->mqtt->send_shadow_unsubscribe = 1; lws_callback_on_writable(wsi); + return 0; } } @@ -708,6 +720,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (wsi->mqtt->inside_shadow) { wsi->mqtt->done_shadow_subscribe = 1; lws_callback_on_writable(wsi); + return 0; } /*