1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss-mqtt: Skip SUBSCRIBE when MQTT session is resumed

This commit is contained in:
Chunho Lee 2021-10-06 19:20:47 -07:00 committed by Andy Green
parent 97023b6512
commit fa50cf23b0
3 changed files with 130 additions and 82 deletions

View file

@ -2272,6 +2272,9 @@ lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
return 1;
}
if (wsi->mqtt->inside_resume_session)
return 0;
if (lws_write(nwsi, start, lws_ptr_diff_size_t(p, start), LWS_WRITE_BINARY) !=
lws_ptr_diff(p, start))
return 1;

View file

@ -371,6 +371,7 @@ struct _lws_mqtt_related {
uint8_t inside_payload:1;
uint8_t inside_subscribe:1;
uint8_t inside_unsubscribe:1;
uint8_t inside_resume_session:1;
uint8_t send_puback:1;
uint8_t send_pubrel:1;
uint8_t send_pubrec:1;

View file

@ -46,6 +46,100 @@ secstream_mqtt_cleanup(lws_ss_handle_t *h)
}
}
static int
secstream_mqtt_subscribe(struct lws *wsi)
{
size_t used_in, used_out, topic_limit;
lws_strexp_t exp;
char* expbuf;
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
if (!h || !h->policy)
return -1;
if (h->policy->u.mqtt.aws_iot)
topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
else
topic_limit = LWS_MQTT_MAX_TOPICLEN;
if (!h->policy->u.mqtt.subscribe || wsi->mqtt->done_subscribe)
return 0;
lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, NULL,
topic_limit);
/*
* Expand with no output first to calculate the size of
* expanded string then, allocate new buffer and expand
* again with the buffer
*/
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe), &used_in,
&used_out) != LSTRX_DONE) {
lwsl_err(
"%s, failed to expand MQTT subscribe"
" topic with no output\n",
__func__);
return 1;
}
expbuf = lws_malloc(used_out + 1, __func__);
if (!expbuf) {
lwsl_err(
"%s, failed to allocate MQTT subscribe"
"topic",
__func__);
return 1;
}
lws_strexp_init(&exp, (void*)h, lws_ss_exp_cb_metadata, expbuf,
used_out + 1);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe), &used_in,
&used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand MQTT subscribe topic\n",
__func__);
lws_free(expbuf);
return 1;
}
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
h->u.mqtt.sub_top.name = expbuf;
/*
* The policy says to subscribe to something, and we
* haven't done it yet. Do it using the pre-prepared
* string-substituted version of the policy string.
*/
lwsl_notice("%s: subscribing %s\n", __func__,
h->u.mqtt.sub_top.name);
h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
h->u.mqtt.sub_info.num_topics = 1;
h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
h->u.mqtt.sub_info.topic =
lws_malloc(sizeof(lws_mqtt_topic_elem_t), __func__);
h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
lwsl_notice("%s: unable to subscribe", __func__);
lws_free(expbuf);
h->u.mqtt.sub_top.name = NULL;
return -1;
}
lws_free(expbuf);
h->u.mqtt.sub_top.name = NULL;
/* Expect a SUBACK */
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
return -1;
}
return 0;
}
static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
@ -120,13 +214,32 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->wsi = wsi;
h->retry = 0;
h->seqstate = SSSEQ_CONNECTED;
/*
* If a subscribe is pending on the stream, then make
* sure the SUBSCRIBE is done before signaling the
* user application.
*/
if (h->policy->u.mqtt.subscribe &&
!wsi->mqtt->done_subscribe) {
if (!h->policy->u.mqtt.subscribe ||
!h->policy->u.mqtt.subscribe[0]) {
/*
* If subscribe is empty in the policy, then,
* skip sending SUBSCRIBE and signal the user
* application.
*/
wsi->mqtt->done_subscribe = 1;
} else if (!h->policy->u.mqtt.clean_start &&
wsi->mqtt->session_resumed) {
wsi->mqtt->inside_resume_session = 1;
/*
* If the previous session is resumed and Server has
* stored session, then, do not subscribe.
*/
if (!secstream_mqtt_subscribe(wsi))
wsi->mqtt->done_subscribe = 1;
wsi->mqtt->inside_resume_session = 0;
} else if (h->policy->u.mqtt.subscribe &&
!wsi->mqtt->done_subscribe) {
/*
* If a subscribe is pending on the stream, then make
* sure the SUBSCRIBE is done before signaling the
* user application.
*/
lws_callback_on_writable(wsi);
break;
}
@ -208,79 +321,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
else
topic_limit = LWS_MQTT_MAX_TOPICLEN;
if (h->policy->u.mqtt.subscribe &&
!wsi->mqtt->done_subscribe) {
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
NULL, topic_limit);
/*
* Expand with no output first to calculate the size of
* expanded string then, allocate new buffer and expand
* again with the buffer
*/
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe),
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand MQTT subscribe"
" topic with no output\n", __func__);
return 1;
}
expbuf = lws_malloc(used_out + 1, __func__);
if (!expbuf) {
lwsl_err("%s, failed to allocate MQTT subscribe"
"topic", __func__);
return 1;
}
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
expbuf, used_out + 1);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe),
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand MQTT subscribe topic\n",
__func__);
lws_free(expbuf);
return 1;
}
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
h->u.mqtt.sub_top.name = expbuf;
/*
* The policy says to subscribe to something, and we
* haven't done it yet. Do it using the pre-prepared
* string-substituted version of the policy string.
*/
lwsl_notice("%s: subscribing %s\n", __func__,
h->u.mqtt.sub_top.name);
h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
h->u.mqtt.sub_info.num_topics = 1;
h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
h->u.mqtt.sub_info.topic = lws_malloc(sizeof(lws_mqtt_topic_elem_t),
__func__);
h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
lwsl_notice("%s: unable to subscribe", __func__);
lws_free(expbuf);
h->u.mqtt.sub_top.name = NULL;
return -1;
}
lws_free(expbuf);
h->u.mqtt.sub_top.name = NULL;
/* Expect a SUBACK */
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
return -1;
}
return 0;
}
if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
return secstream_mqtt_subscribe(wsi);
buflen = sizeof(buf) - LWS_PRE;
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
@ -308,8 +350,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
memset(&mqpp, 0, sizeof(mqpp));
/* this is the string-substituted h->policy->u.mqtt.topic */
mqpp.topic = (char *)h->u.mqtt.topic_qos.name;
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
topic_limit);
@ -513,6 +554,9 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
*/
for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
if (!sources[n])
continue;
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
NULL, (size_t)-1);
if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),