1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-30 00:00:16 +01:00

ss: mqtt: add support for AWS IoT Shadow topic

This introduces AWS IoT Shadow topic support. This subscribes and
unsubscribes shadow response topics before and after shadow topic
is transmitted.
This commit is contained in:
Chunho Lee 2022-04-28 23:22:20 -07:00 committed by Andy Green
parent c398dd546b
commit 8b1693a05e
6 changed files with 457 additions and 112 deletions

View file

@ -887,6 +887,11 @@ enum lws_callback_reasons {
* the _UNSUBSCRIBED one if we timed out waiting for a UNSUBACK.
* Return nonzero to close the wsi.
*/
LWS_CALLBACK_MQTT_SHADOW_TIMEOUT = 212,
/**< When a Device Shadow is sent, this callback is generated if we
* timed out waiting for a response from AWS IoT.
* Return nonzero to close the wsi.
*/
/****** add new things just above ---^ ******/

View file

@ -42,6 +42,29 @@ typedef struct lws_mqtt_str_st lws_mqtt_str_t;
#define LWS_MQTT_RANDOM_CIDLEN 23 /* 3.1.3.1-5: Server MUST... between
1 and 23 chars... */
#define LWS_MQTT_SHADOW_MAX_THING_LEN 128
#define LWS_MQTT_SHADOW_MAX_SHADOW_LEN 64
#define LWS_MQTT_SHADOW_UPDATE_STR "/update"
#define LWS_MQTT_SHADOW_DELETE_STR "/delete"
#define LWS_MQTT_SHADOW_GET_STR "/get"
#define LWS_MQTT_SHADOW_RESP_ACCEPTED_STR "/accepted"
#define LWS_MQTT_SHADOW_RESP_REJECTED_STR "/rejected"
#define LWS_MQTT_SHADOW_RESP_DELTA_STR "/delta"
#define LWS_MQTT_SHADOW_RESP_DOCUMENT_STR "/documents"
#define LWS_MQTT_SHADOW_UPDATE_ACCEPTED_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_ACCEPTED_STR
#define LWS_MQTT_SHADOW_UPDATE_REJECTED_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_REJECTED_STR
#define LWS_MQTT_SHADOW_UPDATE_DELTA_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_DELTA_STR
#define LWS_MQTT_SHADOW_UPDATE_DOCUMENT_STR LWS_MQTT_SHADOW_UPDATE_STR LWS_MQTT_SHADOW_RESP_DOCUMENT_STR
#define LWS_MQTT_SHADOW_DELETE_ACCEPTED_STR LWS_MQTT_SHADOW_DELETE_STR LWS_MQTT_SHADOW_RESP_ACCEPTED_STR
#define LWS_MQTT_SHADOW_DELETE_REJECTED_STR LWS_MQTT_SHADOW_DELETE_STR LWS_MQTT_SHADOW_RESP_REJECTED_STR
#define LWS_MQTT_SHADOW_GET_ACCEPTED_STR LWS_MQTT_SHADOW_GET_STR LWS_MQTT_SHADOW_RESP_ACCEPTED_STR
#define LWS_MQTT_SHADOW_GET_REJECTED_STR LWS_MQTT_SHADOW_GET_STR LWS_MQTT_SHADOW_RESP_REJECTED_STR
#define LWS_MQTT_SHADOW_PREFIX_FORMAT "$aws/things/%s"
#define LWS_MQTT_SHADOW_NAMED_SHADOW_TOPIC_FORMAT LWS_MQTT_SHADOW_PREFIX_FORMAT "/shadow/name/%s%s"
#define LWS_MQTT_SHADOW_UNNAMED_SHADOW_TOPIC_FORMAT LWS_MQTT_SHADOW_PREFIX_FORMAT "/shadow%s"
#define LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH "$aws/things/+/shadow/+"
#define LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH "$aws/things/+/shadow/name/+/+"
typedef enum {
QOS0,
QOS1,

View file

@ -214,13 +214,6 @@ static const uint8_t map_flags[] = {
LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
};
void
lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
{
lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
c->estate = s;
}
static int
lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
{
@ -277,74 +270,6 @@ lws_mqtt_set_client_established(struct lws *wsi)
return 0;
}
static lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char* sub, const char* pub)
{
const char *ppos = pub, *spos = sub;
if (!ppos || !spos) {
return LMMTR_TOPIC_MATCH_ERROR;
}
while (*spos) {
if (*ppos == '#' || *ppos == '+') {
lwsl_err("%s: PUBLISH to wildcard "
"topic \"%s\" not supported\n",
__func__, pub);
return LMMTR_TOPIC_MATCH_ERROR;
}
/* foo/+/bar == foo/xyz/bar ? */
if (*spos == '+') {
/* Skip ahead */
while (*ppos != '\0' && *ppos != '/') {
ppos++;
}
} else if (*spos == '#') {
return LMMTR_TOPIC_MATCH;
} else {
if (*ppos == '\0') {
/* foo/bar == foo/bar/# ? */
if (!strncmp(spos, "/#", 2))
return LMMTR_TOPIC_MATCH;
return LMMTR_TOPIC_NOMATCH;
/* Non-matching character */
} else if (*ppos != *spos) {
return LMMTR_TOPIC_NOMATCH;
}
ppos++;
}
spos++;
}
if (*spos == '\0' && *ppos == '\0')
return LMMTR_TOPIC_MATCH;
return LMMTR_TOPIC_NOMATCH;
}
lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
const char* ptopic) {
lws_mqtt_subs_t *s = mqtt->subs_head;
while (s) {
/* SUB topic == PUB topic ? */
/* foo/bar/xyz == foo/bar/xyz ? */
if (!s->wildcard) {
if (!strcmp((const char*)s->topic, ptopic))
return s;
} else {
if (lws_mqtt_is_topic_matched(
s->topic, ptopic) == LMMTR_TOPIC_MATCH)
return s;
}
s = s->next;
}
return NULL;
}
static lws_mqtt_validate_topic_return_t
lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
{
@ -477,6 +402,126 @@ lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
return 1;
}
/*
* This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
* PUBREC came before the timeout period
*/
static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
{
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
mqtt->wsi->user_space, NULL, 0))
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
}
static void
lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
{
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
struct _lws_mqtt_related, sul_unsuback_wait);
lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
mqtt->wsi->user_space, NULL, 0))
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
}
static void
lws_mqtt_shadow_timeout(struct lws_sorted_usec_list *sul)
{
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
struct _lws_mqtt_related, sul_shadow_wait);
lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
LWS_CALLBACK_MQTT_SHADOW_TIMEOUT,
mqtt->wsi->user_space, NULL, 0))
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
}
void
lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
{
lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
c->estate = s;
}
lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char* sub, const char* pub)
{
const char *ppos = pub, *spos = sub;
if (!ppos || !spos) {
return LMMTR_TOPIC_MATCH_ERROR;
}
while (*spos) {
if (*ppos == '#' || *ppos == '+') {
lwsl_err("%s: PUBLISH to wildcard "
"topic \"%s\" not supported\n",
__func__, pub);
return LMMTR_TOPIC_MATCH_ERROR;
}
/* foo/+/bar == foo/xyz/bar ? */
if (*spos == '+') {
/* Skip ahead */
while (*ppos != '\0' && *ppos != '/') {
ppos++;
}
} else if (*spos == '#') {
return LMMTR_TOPIC_MATCH;
} else {
if (*ppos == '\0') {
/* foo/bar == foo/bar/# ? */
if (!strncmp(spos, "/#", 2))
return LMMTR_TOPIC_MATCH;
return LMMTR_TOPIC_NOMATCH;
/* Non-matching character */
} else if (*ppos != *spos) {
return LMMTR_TOPIC_NOMATCH;
}
ppos++;
}
spos++;
}
if (*spos == '\0' && *ppos == '\0')
return LMMTR_TOPIC_MATCH;
return LMMTR_TOPIC_NOMATCH;
}
lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
const char* ptopic) {
lws_mqtt_subs_t *s = mqtt->subs_head;
while (s) {
/* SUB topic == PUB topic ? */
/* foo/bar/xyz == foo/bar/xyz ? */
if (!s->wildcard) {
if (!strcmp((const char*)s->topic, ptopic))
return s;
} else {
if (lws_mqtt_is_topic_matched(
s->topic, ptopic) == LMMTR_TOPIC_MATCH)
return s;
}
s = s->next;
}
return NULL;
}
int
_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
const uint8_t *buf, size_t len)
@ -1908,38 +1953,6 @@ lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
return 0;
}
/*
* This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
* PUBREC came before the timeout period
*/
static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
{
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
mqtt->wsi->user_space, NULL, 0))
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
}
static void
lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
{
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
struct _lws_mqtt_related, sul_unsuback_wait);
lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
mqtt->wsi->user_space, NULL, 0))
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
}
int
lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
const void *buf, uint32_t len, int is_complete)
@ -2093,6 +2106,13 @@ do_write:
3 * LWS_USEC_PER_SEC);
}
if (wsi->mqtt->inside_shadow) {
wsi->mqtt->sul_shadow_wait.cb = lws_mqtt_shadow_timeout;
__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
&wsi->mqtt->sul_shadow_wait,
60 * LWS_USEC_PER_SEC);
}
return 0;
}

View file

@ -357,6 +357,7 @@ struct _lws_mqtt_related {
lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */
lws_sorted_usec_list_t sul_unsuback_wait; /* unsuback wait TO */
lws_sorted_usec_list_t sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */
lws_sorted_usec_list_t sul_shadow_wait; /* Device Shadow wait TO */
struct lws *wsi; /**< so sul can use lws_container_of */
lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */
void *rx_cpkt_param;
@ -383,6 +384,9 @@ struct _lws_mqtt_related {
uint8_t done_subscribe:1;
uint8_t done_birth:1;
uint8_t inside_shadow:1;
uint8_t done_shadow_subscribe:1;
uint8_t send_shadow_unsubscribe:1;
};
/*
@ -438,5 +442,8 @@ lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi);
lws_mqtt_subs_t *
lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic);
lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char* sub, const char* pub);
#endif /* _PRIVATE_LIB_ROLES_MQTT */

View file

@ -147,6 +147,7 @@ typedef struct lws_ss_handle {
lws_mqtt_topic_elem_t topic_qos;
lws_mqtt_topic_elem_t sub_top;
lws_mqtt_subscribe_param_t sub_info;
lws_mqtt_subscribe_param_t shadow_sub;
/* allocation that must be destroyed with conn */
void *heap_baggage;
const char *subscribe_to;

View file

@ -298,16 +298,189 @@ secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) {
return 0;
}
static char *
expand_metadata(lws_ss_handle_t *h, const char* str, const char* post, size_t max_len)
{
lws_strexp_t exp = {0};
char* expbuf = NULL;
size_t used_in = 0, used_out = 0, post_len = 0;
if (post)
post_len = strlen(post);
if (post_len > max_len)
return NULL;
lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
max_len - post_len);
if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
&used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand %s", __func__, str);
return NULL;
}
expbuf = lws_malloc(used_out + 1 + post_len, __func__);
if (!expbuf) {
lwsl_err("%s, failed to allocate str_exp for %s", __func__, str);
return NULL;
}
lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
used_out + 1 + post_len);
if (lws_strexp_expand(&exp, str, strlen(str), &used_in,
&used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand str_exp %s\n", __func__, str);
lws_free(expbuf);
return NULL;
}
if (post) {
strcat(expbuf, post);
}
return expbuf;
}
static lws_mqtt_match_topic_return_t
secstream_mqtt_is_shadow_matched(struct lws *wsi, const char *topic)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
const char *match[] = { LWS_MQTT_SHADOW_UNNAMED_TOPIC_MATCH,
LWS_MQTT_SHADOW_NAMED_TOPIC_MATCH };
char *expbuf = NULL;
unsigned int i = 0;
lws_mqtt_match_topic_return_t ret = LMMTR_TOPIC_NOMATCH;
if (!topic)
return LMMTR_TOPIC_MATCH_ERROR;
expbuf = expand_metadata(h, topic, NULL, LWS_MQTT_MAX_AWSIOT_TOPICLEN);
if (!expbuf) {
lwsl_warn("%s, failed to expand Shadow topic", __func__);
return LMMTR_TOPIC_MATCH_ERROR;
}
for (i = 0; i < (sizeof(match) / sizeof(match[0])); i++) {
if (lws_mqtt_is_topic_matched(
match[i], expbuf) == LMMTR_TOPIC_MATCH) {
ret = LMMTR_TOPIC_MATCH;
break;
}
}
lws_free(expbuf);
return ret;
}
static void
secstream_mqtt_shadow_cleanup(struct lws *wsi)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
uint32_t i = 0;
for (i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) {
lws_free((void *)h->u.mqtt.shadow_sub.topic[i].name);
}
h->u.mqtt.shadow_sub.num_topics = 0;
if (h->u.mqtt.shadow_sub.topic) {
lws_free(h->u.mqtt.shadow_sub.topic);
h->u.mqtt.shadow_sub.topic = NULL;
}
}
static lws_ss_state_return_t
secstream_mqtt_shadow_unsubscribe(struct lws *wsi)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
if (h->u.mqtt.shadow_sub.num_topics == 0) {
wsi->mqtt->send_shadow_unsubscribe = 0;
wsi->mqtt->inside_shadow = 0;
wsi->mqtt->done_shadow_subscribe = 0;
return LWSSSSRET_OK;
}
if (lws_mqtt_client_send_unsubcribe(wsi, &h->u.mqtt.shadow_sub)) {
lwsl_err("%s, failed to send MQTT unsubsribe", __func__);
return LWSSSSRET_DISCONNECT_ME;
}
/* Expect a UNSUBACK */
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
return LWSSSSRET_DISCONNECT_ME;
}
wsi->mqtt->send_shadow_unsubscribe = 0;
return LWSSSSRET_OK;
}
static int
secstream_mqtt_shadow_subscribe(struct lws *wsi)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
char* expbuf = NULL;
const char *suffixes[] = { LWS_MQTT_SHADOW_RESP_ACCEPTED_STR,
LWS_MQTT_SHADOW_RESP_REJECTED_STR };
unsigned int suffixes_len = sizeof(suffixes) / sizeof(suffixes[0]);
if (!h->policy->u.mqtt.topic || wsi->mqtt->inside_shadow)
return 0;
if (h->u.mqtt.shadow_sub.num_topics > 0)
secstream_mqtt_shadow_cleanup(wsi);
memset(&h->u.mqtt.shadow_sub, 0, sizeof(lws_mqtt_subscribe_param_t));
h->u.mqtt.shadow_sub.topic = lws_malloc(
sizeof(lws_mqtt_topic_elem_t) * suffixes_len, __func__);
if (!h->u.mqtt.shadow_sub.topic) {
lwsl_err("%s, failed to allocate Shadow topics", __func__);
return -1;
}
h->u.mqtt.shadow_sub.num_topics = suffixes_len;
for (unsigned int i = 0; i < suffixes_len; i++) {
expbuf = expand_metadata(h, h->policy->u.mqtt.topic, suffixes[i],
LWS_MQTT_MAX_AWSIOT_TOPICLEN);
if (!expbuf) {
lwsl_err("%s, failed to allocate Shadow topic",
__func__);
secstream_mqtt_shadow_cleanup(wsi);
return -1;
}
h->u.mqtt.shadow_sub.topic[i].name = expbuf;
h->u.mqtt.shadow_sub.topic[i].qos = h->policy->u.mqtt.qos;
}
h->u.mqtt.shadow_sub.packet_id = (uint16_t)(h->txord - 1);
if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.shadow_sub)) {
lwsl_notice("%s: unable to subscribe Shadow topics", __func__);
return 0;
}
/* Expect a SUBACK */
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
return -1;
}
wsi->mqtt->inside_shadow = 1;
return 0;
}
static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
lws_mqtt_publish_param_t *pmqpp;
uint8_t buf[LWS_PRE + 1400];
lws_ss_state_return_t r;
lws_mqtt_publish_param_t *pmqpp = NULL;
lws_ss_metadata_t *omd = NULL;
uint8_t buf[LWS_PRE + 1400] = {0};
lws_ss_state_return_t r = LWSSSSRET_OK;
size_t buflen = sizeof(buf) - LWS_PRE;
int f = 0;
lws_strexp_t exp = {0};
size_t used_in = 0, used_out = 0, topic_len = 0;
char* sub_topic = NULL;
switch (reason) {
@ -458,14 +631,85 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->subseq = 1;
if (wsi->mqtt->inside_shadow) {
/*
* When Shadow is used, the stream receives multiple
* topics including Shadow response, set received
* topic on the metadata
*/
lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata,
NULL, (size_t)-1);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe),
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand subscribe topic",
__func__);
return -1;
}
omd = lws_ss_get_handle_metadata(h, exp.name);
if (!omd) {
lwsl_err("%s, failed to find metadata for subscribe",
__func__);
return -1;
}
sub_topic = omd->value__may_own_heap;
topic_len = omd->length;
_lws_ss_set_metadata(omd, exp.name,
(const void *)pmqpp->topic,
pmqpp->topic_len);
}
r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
len, f);
if (wsi->mqtt->inside_shadow) {
_lws_ss_set_metadata(omd, exp.name, &sub_topic,
topic_len);
}
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
if (wsi->mqtt->inside_shadow) {
uint32_t acc_n = strlen(LWS_MQTT_SHADOW_RESP_ACCEPTED_STR);
uint32_t rej_n = strlen(LWS_MQTT_SHADOW_RESP_REJECTED_STR);
for (uint32_t i = 0; i < h->u.mqtt.shadow_sub.num_topics; i++) {
/*
* received response ('/accepted' or 'rejected')
* and clean up Shadow operation
*/
if (strncmp(h->u.mqtt.shadow_sub.topic[i].name,
pmqpp->topic, pmqpp->topic_len) ||
(strlen(pmqpp->topic) < acc_n ||
strlen(pmqpp->topic) < rej_n))
continue;
if (!strcmp(pmqpp->topic +
(strlen(pmqpp->topic) - acc_n),
LWS_MQTT_SHADOW_RESP_ACCEPTED_STR) ||
!strcmp(pmqpp->topic +
(strlen(pmqpp->topic) - rej_n),
LWS_MQTT_SHADOW_RESP_REJECTED_STR)) {
lws_sul_cancel(
&wsi->mqtt->sul_shadow_wait);
wsi->mqtt->send_shadow_unsubscribe = 1;
lws_callback_on_writable(wsi);
return 0;
}
}
}
return 0; /* don't passthru */
case LWS_CALLBACK_MQTT_SUBSCRIBED:
if (wsi->mqtt->inside_shadow) {
wsi->mqtt->done_shadow_subscribe = 1;
lws_callback_on_writable(wsi);
return 0;
}
/*
* Stream demanded a subscribe without a Birth while connecting, once
* done notify CONNECTED event to the application.
@ -555,11 +799,25 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic)
return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen);
if (h->policy->u.mqtt.aws_iot) {
if (secstream_mqtt_is_shadow_matched(wsi,
h->policy->u.mqtt.topic) == LMMTR_TOPIC_MATCH) {
if (!wsi->mqtt->done_shadow_subscribe)
return secstream_mqtt_shadow_subscribe(wsi);
if (wsi->mqtt->send_shadow_unsubscribe)
return secstream_mqtt_shadow_unsubscribe(wsi);
}
}
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND)
if (r == LWSSSSRET_TX_DONT_SEND) {
if (wsi->mqtt->done_shadow_subscribe) {
return secstream_mqtt_shadow_unsubscribe(wsi);
}
return 0;
}
if (r == LWSSSSRET_DISCONNECT_ME) {
lws_mqtt_subscribe_param_t lmsp;
@ -595,19 +853,50 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
{
struct lws *nwsi = lws_get_network_wsi(wsi);
if (wsi->mqtt->inside_shadow) {
secstream_mqtt_shadow_cleanup(wsi);
wsi->mqtt->inside_shadow = 0;
wsi->mqtt->done_shadow_subscribe = 0;
break;
}
if (nwsi && (nwsi->mux.child_count == 1))
lws_mqtt_client_send_disconnect(nwsi);
return -1;
}
case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT:
if (!wsi || !wsi->mqtt)
return -1;
if (wsi->mqtt->inside_shadow) {
secstream_mqtt_shadow_cleanup(wsi);
wsi->mqtt->inside_shadow = 0;
wsi->mqtt->done_shadow_subscribe = 0;
lwsl_warn("%s: %s: Unsubscribe (Shadow) timeout.\n",
__func__, lws_ss_tag(h));
break;
}
if (wsi->mqtt->inside_unsubscribe) {
lwsl_warn("%s: %s: Unsubscribe timout.\n", __func__,
lwsl_warn("%s: %s: Unsubscribe timeout.\n", __func__,
lws_ss_tag(h));
return -1;
}
break;
case LWS_CALLBACK_MQTT_SHADOW_TIMEOUT:
if (!wsi || !wsi->mqtt)
return -1;
if (wsi->mqtt->inside_shadow) {
lwsl_warn("%s: %s: Shadow timeout.\n", __func__,
lws_ss_tag(h));
wsi->mqtt->send_shadow_unsubscribe = 1;
lws_callback_on_writable(wsi);
}
break;
default:
break;
}