diff --git a/include/libwebsockets/lws-callbacks.h b/include/libwebsockets/lws-callbacks.h index 62848fa5a..fd36f7c7d 100644 --- a/include/libwebsockets/lws-callbacks.h +++ b/include/libwebsockets/lws-callbacks.h @@ -887,6 +887,11 @@ enum lws_callback_reasons { * the _UNSUBSCRIBED one if we timed out waiting for a UNSUBACK. * Return nonzero to close the wsi. */ + LWS_CALLBACK_MQTT_SHADOW_TIMEOUT = 212, + /**< When a Device Shadow is sent, this callback is generated if we + * timed out waiting for a response from AWS IoT. + * Return nonzero to close the wsi. + */ /****** add new things just above ---^ ******/ diff --git a/include/libwebsockets/lws-mqtt.h b/include/libwebsockets/lws-mqtt.h index b228315a1..cbf8b3637 100644 --- a/include/libwebsockets/lws-mqtt.h +++ b/include/libwebsockets/lws-mqtt.h @@ -42,6 +42,29 @@ typedef struct lws_mqtt_str_st lws_mqtt_str_t; #define LWS_MQTT_RANDOM_CIDLEN 23 /* 3.1.3.1-5: Server MUST... between 1 and 23 chars... */ +#define LWS_MQTT_SHADOW_MAX_THING_LEN 128 +#define LWS_MQTT_SHADOW_MAX_SHADOW_LEN 64 +#define LWS_MQTT_SHADOW_UPDATE_STR "/update" +#define LWS_MQTT_SHADOW_DELETE_STR "/delete" +#define LWS_MQTT_SHADOW_GET_STR "/get" +#define LWS_MQTT_SHADOW_RESP_ACCEPTED_STR "/accepted" +#define LWS_MQTT_SHADOW_RESP_REJECTED_STR "/rejected" +#define LWS_MQTT_SHADOW_RESP_DELTA_STR "/delta" +#define LWS_MQTT_SHADOW_RESP_DOCUMENT_STR "/documents" +#define LWS_MQTT_SHADOW_UPDATE_ACCEPTED_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_ACCEPTED_STR +#define LWS_MQTT_SHADOW_UPDATE_REJECTED_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_REJECTED_STR +#define LWS_MQTT_SHADOW_UPDATE_DELTA_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_DELTA_STR +#define LWS_MQTT_SHADOW_UPDATE_DOCUMENT_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_DOCUMENT_STR +#define LWS_MQTT_SHADOW_DELETE_ACCEPTED_STR LWS_MQTT_SHADOW_DELETE_STR LWS_MQTT_SHADOW_RESP_ACCEPTED_STR +#define LWS_MQTT_SHADOW_DELETE_REJECTED_STR LWS_MQTT_SHADOW_DELETE_STR LWS_MQTT_SHADOW_RESP_REJECTED_STR +#define LWS_MQTT_SHADOW_GET_ACCEPTED_STR LWS_MQTT_SHADOW_GET_STR LWS_MQTT_SHADOW_RESP_ACCEPTED_STR +#define LWS_MQTT_SHADOW_GET_REJECTED_STR LWS_MQTT_SHADOW_GET_STR LWS_MQTT_SHADOW_RESP_REJECTED_STR +#define LWS_MQTT_SHADOW_PREFIX_FORMAT "$aws/things/%s" +#define LWS_MQTT_SHADOW_NAMED_SHADOW_TOPIC_FORMAT LWS_MQTT_SHADOW_PREFIX_FORMAT "/shadow/name/%s%s" +#define LWS_MQTT_SHADOW_UNNAMED_SHADOW_TOPIC_FORMAT LWS_MQTT_SHADOW_PREFIX_FORMAT "/shadow%s" +#define LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH "$aws/things/+/shadow/+" +#define LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH "$aws/things/+/shadow/name/+/+" + typedef enum { QOS0, QOS1, diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index b41d44d12..aaac84b8b 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -214,13 +214,6 @@ static const uint8_t map_flags[] = { LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00, }; -void -lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s) -{ - lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s); - c->estate = s; -} - static int lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed) { @@ -277,74 +270,6 @@ lws_mqtt_set_client_established(struct lws *wsi) return 0; } - -static lws_mqtt_match_topic_return_t -lws_mqtt_is_topic_matched(const char* sub, const char* pub) -{ - const char *ppos = pub, *spos = sub; - - if (!ppos || !spos) { - return LMMTR_TOPIC_MATCH_ERROR; - } - - while (*spos) { - if (*ppos == '#' || *ppos == '+') { - lwsl_err("%s: PUBLISH to wildcard " - "topic \"%s\" not supported\n", - __func__, pub); - return LMMTR_TOPIC_MATCH_ERROR; - } - /* foo/+/bar == foo/xyz/bar ? */ - if (*spos == '+') { - /* Skip ahead */ - while (*ppos != '\0' && *ppos != '/') { - ppos++; - } - } else if (*spos == '#') { - return LMMTR_TOPIC_MATCH; - } else { - if (*ppos == '\0') { - /* foo/bar == foo/bar/# ? */ - if (!strncmp(spos, "/#", 2)) - return LMMTR_TOPIC_MATCH; - return LMMTR_TOPIC_NOMATCH; - /* Non-matching character */ - } else if (*ppos != *spos) { - return LMMTR_TOPIC_NOMATCH; - } - ppos++; - } - spos++; - } - - if (*spos == '\0' && *ppos == '\0') - return LMMTR_TOPIC_MATCH; - - return LMMTR_TOPIC_NOMATCH; -} - -lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt, - const char* ptopic) { - lws_mqtt_subs_t *s = mqtt->subs_head; - - while (s) { - /* SUB topic == PUB topic ? */ - /* foo/bar/xyz == foo/bar/xyz ? */ - if (!s->wildcard) { - if (!strcmp((const char*)s->topic, ptopic)) - return s; - } else { - if (lws_mqtt_is_topic_matched( - s->topic, ptopic) == LMMTR_TOPIC_MATCH) - return s; - } - - s = s->next; - } - - return NULL; -} - static lws_mqtt_validate_topic_return_t lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot) { @@ -477,6 +402,126 @@ lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt) return 1; } +/* + * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or + * PUBREC came before the timeout period + */ + +static void +lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul) +{ + struct _lws_mqtt_related *mqtt = lws_container_of(sul, + struct _lws_mqtt_related, sul_qos_puback_pubrec_wait); + + lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); + + if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND, + mqtt->wsi->user_space, NULL, 0)) + lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); +} + +static void +lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul) +{ + struct _lws_mqtt_related *mqtt = lws_container_of(sul, + struct _lws_mqtt_related, sul_unsuback_wait); + + lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); + + if (mqtt->wsi->a.protocol->callback(mqtt->wsi, + LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT, + mqtt->wsi->user_space, NULL, 0)) + lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); +} + +static void +lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul) +{ + struct _lws_mqtt_related *mqtt = lws_container_of(sul, + struct _lws_mqtt_related, sul_shadow_wait); + + lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); + + if (mqtt->wsi->a.protocol->callback(mqtt->wsi, + LWS_CALLBACK_MQTT_SHADOW_TIMEOUT, + mqtt->wsi->user_space, NULL, 0)) + lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); +} + +void +lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s) +{ + lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s); + c->estate = s; +} + +lws_mqtt_match_topic_return_t +lws_mqtt_is_topic_matched(const char* sub, const char* pub) +{ + const char *ppos = pub, *spos = sub; + + if (!ppos || !spos) { + return LMMTR_TOPIC_MATCH_ERROR; + } + + while (*spos) { + if (*ppos == '#' || *ppos == '+') { + lwsl_err("%s: PUBLISH to wildcard " + "topic \"%s\" not supported\n", + __func__, pub); + return LMMTR_TOPIC_MATCH_ERROR; + } + /* foo/+/bar == foo/xyz/bar ? */ + if (*spos == '+') { + /* Skip ahead */ + while (*ppos != '\0' && *ppos != '/') { + ppos++; + } + } else if (*spos == '#') { + return LMMTR_TOPIC_MATCH; + } else { + if (*ppos == '\0') { + /* foo/bar == foo/bar/# ? */ + if (!strncmp(spos, "/#", 2)) + return LMMTR_TOPIC_MATCH; + return LMMTR_TOPIC_NOMATCH; + /* Non-matching character */ + } else if (*ppos != *spos) { + return LMMTR_TOPIC_NOMATCH; + } + ppos++; + } + spos++; + } + + if (*spos == '\0' && *ppos == '\0') + return LMMTR_TOPIC_MATCH; + + return LMMTR_TOPIC_NOMATCH; +} + +lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt, + const char* ptopic) { + lws_mqtt_subs_t *s = mqtt->subs_head; + + while (s) { + /* SUB topic == PUB topic ? */ + /* foo/bar/xyz == foo/bar/xyz ? */ + if (!s->wildcard) { + if (!strcmp((const char*)s->topic, ptopic)) + return s; + } else { + if (lws_mqtt_is_topic_matched( + s->topic, ptopic) == LMMTR_TOPIC_MATCH) + return s; + } + + s = s->next; + } + + return NULL; +} + int _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par, const uint8_t *buf, size_t len) @@ -1908,38 +1953,6 @@ lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type, return 0; } -/* - * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or - * PUBREC came before the timeout period - */ - -static void -lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul) -{ - struct _lws_mqtt_related *mqtt = lws_container_of(sul, - struct _lws_mqtt_related, sul_qos_puback_pubrec_wait); - - lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); - - if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND, - mqtt->wsi->user_space, NULL, 0)) - lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); -} - -static void -lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul) -{ - struct _lws_mqtt_related *mqtt = lws_container_of(sul, - struct _lws_mqtt_related, sul_unsuback_wait); - - lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); - - if (mqtt->wsi->a.protocol->callback(mqtt->wsi, - LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT, - mqtt->wsi->user_space, NULL, 0)) - lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); -} - int lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, const void *buf, uint32_t len, int is_complete) @@ -2093,6 +2106,13 @@ do_write: 3 * LWS_USEC_PER_SEC); } + if (wsi->mqtt->inside_shadow) { + wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout; + __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], + &wsi->mqtt->sul_shadow_wait, + 60 * LWS_USEC_PER_SEC); + } + return 0; } diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h index 575d2be58..dc46a9362 100644 --- a/lib/roles/mqtt/private-lib-roles-mqtt.h +++ b/lib/roles/mqtt/private-lib-roles-mqtt.h @@ -357,6 +357,7 @@ struct _lws_mqtt_related { lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */ lws_sorted_usec_list_t sul_unsuback_wait; /* unsuback wait TO */ lws_sorted_usec_list_t sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */ + lws_sorted_usec_list_t sul_shadow_wait; /* Device Shadow wait TO */ struct lws *wsi; /**< so sul can use lws_container_of */ lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */ void *rx_cpkt_param; @@ -383,6 +384,9 @@ struct _lws_mqtt_related { uint8_t done_subscribe:1; uint8_t done_birth:1; + uint8_t inside_shadow:1; + uint8_t done_shadow_subscribe:1; + uint8_t send_shadow_unsubscribe:1; }; /* @@ -438,5 +442,8 @@ lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi); lws_mqtt_subs_t * lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic); +lws_mqtt_match_topic_return_t +lws_mqtt_is_topic_matched(const char* sub, const char* pub); + #endif /* _PRIVATE_LIB_ROLES_MQTT */ diff --git a/lib/secure-streams/private-lib-secure-streams.h b/lib/secure-streams/private-lib-secure-streams.h index b14a82876..cc2fc4cfb 100644 --- a/lib/secure-streams/private-lib-secure-streams.h +++ b/lib/secure-streams/private-lib-secure-streams.h @@ -147,6 +147,7 @@ typedef struct lws_ss_handle { lws_mqtt_topic_elem_t topic_qos; lws_mqtt_topic_elem_t sub_top; lws_mqtt_subscribe_param_t sub_info; + lws_mqtt_subscribe_param_t shadow_sub; /* allocation that must be destroyed with conn */ void *heap_baggage; const char *subscribe_to; diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 11ab7b27b..551ac0435 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -298,16 +298,189 @@ secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) { return 0; } +static char * +expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len) +{ + lws_strexp_t exp = {0}; + char* expbuf = NULL; + size_t used_in = 0, used_out = 0, post_len = 0; + + if (post) + post_len = strlen(post); + + if (post_len > max_len) + return NULL; + + lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL, + max_len - post_len); + + if (lws_strexp_expand(&exp, str, strlen(str), &used_in, + &used_out) != LSTRX_DONE) { + lwsl_err("%s, failed to expand %s", __func__, str); + return NULL; + } + + expbuf = lws_malloc(used_out + 1 + post_len, __func__); + if (!expbuf) { + lwsl_err("%s, failed to allocate str_exp for %s", __func__, str); + return NULL; + } + lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf, + used_out + 1 + post_len); + + if (lws_strexp_expand(&exp, str, strlen(str), &used_in, + &used_out) != LSTRX_DONE) { + lwsl_err("%s, failed to expand str_exp %s\n", __func__, str); + lws_free(expbuf); + return NULL; + } + if (post) { + strcat(expbuf, post); + } + + return expbuf; +} + +static lws_mqtt_match_topic_return_t +secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic) +{ + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + const char *match[] = { LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH, + LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH }; + char *expbuf = NULL; + unsigned int i = 0; + lws_mqtt_match_topic_return_t ret = LMMTR_TOPIC_NOMATCH; + + if (!topic) + return LMMTR_TOPIC_MATCH_ERROR; + + expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN); + if (!expbuf) { + lwsl_warn("%s, failed to expand Shadow topic", __func__); + return LMMTR_TOPIC_MATCH_ERROR; + } + for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) { + if (lws_mqtt_is_topic_matched( + match[i], expbuf) == LMMTR_TOPIC_MATCH) { + ret = LMMTR_TOPIC_MATCH; + break; + } + } + lws_free(expbuf); + + return ret; +} + +static void +secstream_mqtt_shadow_cleanup(struct lws *wsi) +{ + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + uint32_t i = 0; + + for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) { + lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name); + } + + h->u.mqtt.shadow_sub.num_topics = 0; + + if (h->u.mqtt.shadow_sub.topic) { + lws_free(h->u.mqtt.shadow_sub.topic); + h->u.mqtt.shadow_sub.topic = NULL; + } +} + +static lws_ss_state_return_t +secstream_mqtt_shadow_unsubscribe(struct lws *wsi) +{ + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + + if (h->u.mqtt.shadow_sub.num_topics == 0) { + wsi->mqtt->send_shadow_unsubscribe = 0; + wsi->mqtt->inside_shadow = 0; + wsi->mqtt->done_shadow_subscribe = 0; + return LWSSSSRET_OK; + } + + if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) { + lwsl_err("%s, failed to send MQTT unsubsribe", __func__); + return LWSSSSRET_DISCONNECT_ME; + } + /* Expect a UNSUBACK */ + if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { + lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); + return LWSSSSRET_DISCONNECT_ME; + } + wsi->mqtt->send_shadow_unsubscribe = 0; + + return LWSSSSRET_OK; +} + +static int +secstream_mqtt_shadow_subscribe(struct lws *wsi) +{ + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + char* expbuf = NULL; + const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR, + LWS_MQTT_SHADOW_RESP_REJECTED_STR }; + unsigned int suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]); + + if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow) + return 0; + + if (h->u.mqtt.shadow_sub.num_topics > 0) + secstream_mqtt_shadow_cleanup(wsi); + + memset(&h->u.mqtt.shadow_sub, 0, sizeof(lws_mqtt_subscribe_param_t)); + h->u.mqtt.shadow_sub.topic = lws_malloc( + sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__); + if (!h->u.mqtt.shadow_sub.topic) { + lwsl_err("%s, failed to allocate Shadow topics", __func__); + return -1; + } + h->u.mqtt.shadow_sub.num_topics = suffixes_len; + for (unsigned int i = 0; i < suffixes_len; i++) { + expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i], + LWS_MQTT_MAX_AWSIOT_TOPICLEN); + if (!expbuf) { + lwsl_err("%s, failed to allocate Shadow topic", + __func__); + secstream_mqtt_shadow_cleanup(wsi); + return -1; + } + h->u.mqtt.shadow_sub.topic[i].name = expbuf; + h->u.mqtt.shadow_sub.topic[i].qos = h->policy->u.mqtt.qos; + } + h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1); + + if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) { + lwsl_notice("%s: unable to subscribe Shadow topics", __func__); + return 0; + } + + /* Expect a SUBACK */ + if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { + lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); + return -1; + } + wsi->mqtt->inside_shadow = 1; + + return 0; +} + static int secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); - lws_mqtt_publish_param_t *pmqpp; - uint8_t buf[LWS_PRE + 1400]; - lws_ss_state_return_t r; + lws_mqtt_publish_param_t *pmqpp = NULL; + lws_ss_metadata_t *omd = NULL; + uint8_t buf[LWS_PRE + 1400] = {0}; + lws_ss_state_return_t r = LWSSSSRET_OK; size_t buflen = sizeof(buf) - LWS_PRE; int f = 0; + lws_strexp_t exp = {0}; + size_t used_in = 0, used_out = 0, topic_len = 0; + char* sub_topic = NULL; switch (reason) { @@ -458,14 +631,85 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, h->subseq = 1; + if (wsi->mqtt->inside_shadow) { + /* + * When Shadow is used, the stream receives multiple + * topics including Shadow response, set received + * topic on the metadata + */ + lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, + NULL, (size_t)-1); + + if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, + strlen(h->policy->u.mqtt.subscribe), + &used_in, &used_out) != LSTRX_DONE) { + lwsl_err("%s, failed to expand subscribe topic", + __func__); + return -1; + } + omd = lws_ss_get_handle_metadata(h, exp.name); + + if (!omd) { + lwsl_err("%s, failed to find metadata for subscribe", + __func__); + return -1; + } + sub_topic = omd->value__may_own_heap; + topic_len = omd->length; + + _lws_ss_set_metadata(omd, exp.name, + (const void *)pmqpp->topic, + pmqpp->topic_len); + } + r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload, len, f); + + if (wsi->mqtt->inside_shadow) { + _lws_ss_set_metadata(omd, exp.name, &sub_topic, + topic_len); + } + if (r != LWSSSSRET_OK) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); + if (wsi->mqtt->inside_shadow) { + uint32_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR); + uint32_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR); + + for (uint32_t i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) { + /* + * received response ('/accepted' or 'rejected') + * and clean up Shadow operation + */ + if (strncmp(h->u.mqtt.shadow_sub.topic[i].name, + pmqpp->topic, pmqpp->topic_len) || + (strlen(pmqpp->topic) < acc_n || + strlen(pmqpp->topic) < rej_n)) + continue; + + if (!strcmp(pmqpp->topic + + (strlen(pmqpp->topic) - acc_n), + LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) || + !strcmp(pmqpp->topic + + (strlen(pmqpp->topic) - rej_n), + LWS_MQTT_SHADOW_RESP_REJECTED_STR)) { + lws_sul_cancel( + &wsi->mqtt->sul_shadow_wait); + wsi->mqtt->send_shadow_unsubscribe = 1; + lws_callback_on_writable(wsi); + return 0; + } + } + } return 0; /* don't passthru */ case LWS_CALLBACK_MQTT_SUBSCRIBED: + if (wsi->mqtt->inside_shadow) { + wsi->mqtt->done_shadow_subscribe = 1; + lws_callback_on_writable(wsi); + return 0; + } /* * Stream demanded a subscribe without a Birth while connecting, once * done notify CONNECTED event to the application. @@ -555,11 +799,25 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen); + if (h->policy->u.mqtt.aws_iot) { + if (secstream_mqtt_is_shadow_matched(wsi, + h->policy->u.mqtt.topic) == LMMTR_TOPIC_MATCH) { + if (!wsi->mqtt->done_shadow_subscribe) + return secstream_mqtt_shadow_subscribe(wsi); + if (wsi->mqtt->send_shadow_unsubscribe) + return secstream_mqtt_shadow_unsubscribe(wsi); + } + } + r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, &buflen, &f); - if (r == LWSSSSRET_TX_DONT_SEND) + if (r == LWSSSSRET_TX_DONT_SEND) { + if (wsi->mqtt->done_shadow_subscribe) { + return secstream_mqtt_shadow_unsubscribe(wsi); + } return 0; + } if (r == LWSSSSRET_DISCONNECT_ME) { lws_mqtt_subscribe_param_t lmsp; @@ -595,19 +853,50 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, case LWS_CALLBACK_MQTT_UNSUBSCRIBED: { struct lws *nwsi = lws_get_network_wsi(wsi); + + if (wsi->mqtt->inside_shadow) { + secstream_mqtt_shadow_cleanup(wsi); + wsi->mqtt->inside_shadow = 0; + wsi->mqtt->done_shadow_subscribe = 0; + break; + } if (nwsi && (nwsi->mux.child_count == 1)) lws_mqtt_client_send_disconnect(nwsi); return -1; } case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT: + if (!wsi || !wsi->mqtt) + return -1; + + if (wsi->mqtt->inside_shadow) { + secstream_mqtt_shadow_cleanup(wsi); + wsi->mqtt->inside_shadow = 0; + wsi->mqtt->done_shadow_subscribe = 0; + lwsl_warn("%s: %s: Unsubscribe (Shadow) timeout.\n", + __func__, lws_ss_tag(h)); + break; + } + if (wsi->mqtt->inside_unsubscribe) { - lwsl_warn("%s: %s: Unsubscribe timout.\n", __func__, + lwsl_warn("%s: %s: Unsubscribe timeout.\n", __func__, lws_ss_tag(h)); return -1; } break; + case LWS_CALLBACK_MQTT_SHADOW_TIMEOUT: + if (!wsi || !wsi->mqtt) + return -1; + + if (wsi->mqtt->inside_shadow) { + lwsl_warn("%s: %s: Shadow timeout.\n", __func__, + lws_ss_tag(h)); + wsi->mqtt->send_shadow_unsubscribe = 1; + lws_callback_on_writable(wsi); + } + break; + default: break; }