1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss: mqtt: set the CONNECTED state after Birth

This sets the CONNECTED state after Birth topic is processed if
the stream has defined a Birth topic to avoid any confict when
the connection is not stable and the Birth is delayed.
This commit is contained in:
Chunho Lee 2022-04-09 09:42:54 -07:00 committed by Andy Green
parent fe0a5b8bb9
commit a51d3564a2

View file

@ -141,8 +141,8 @@ secstream_mqtt_subscribe(struct lws *wsi)
}
static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
const char* topic,
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
uint32_t payload_len, const char* topic,
lws_mqtt_qos_levels_t qos, uint8_t retain, int f)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
@ -190,17 +190,17 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
mqpp.qos = qos;
mqpp.retain = !!retain;
mqpp.payload = buf;
if (h->writeable_len)
mqpp.payload_len = (uint32_t)h->writeable_len;
if (payload_len)
mqpp.payload_len = payload_len;
else
mqpp.payload_len = (uint32_t)buflen;
mqpp.payload_len = (uint32_t)buf_len;
lwsl_notice("%s: payload len %d\n", __func__,
(int)mqpp.payload_len);
if (lws_mqtt_client_send_publish(wsi, &mqpp,
(const char *)buf,
(uint32_t)buflen,
(uint32_t)buf_len,
f & LWSSS_FLAG_EOM)) {
lwsl_notice("%s: failed to publish\n", __func__);
lws_free(expbuf);
@ -210,6 +210,29 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
return 0;
}
static int
secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) {
lws_strexp_t exp;
size_t used_in, used_out = 0;
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
if (h->policy->u.mqtt.birth_message) {
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
(char *)buf, buflen);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
strlen(h->policy->u.mqtt.birth_message),
&used_in, &used_out) != LSTRX_DONE) {
return 1;
}
}
wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf,
used_out, 0, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
LWSSS_FLAG_EOM);
}
static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
@ -285,6 +308,23 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->retry = 0;
h->seqstate = SSSEQ_CONNECTED;
if (h->policy->u.mqtt.birth_topic &&
!wsi->mqtt->done_birth) {
struct lws *nwsi = lws_get_network_wsi(wsi);
lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) {
if (w != wsi &&
(w->mqtt->done_birth || w->mqtt->inside_birth)) {
/*
* If any Birth was sent out or
* is pending on other stream,
* skip sending Birth.
*/
wsi->mqtt->done_birth = 1;
break;
}
} lws_end_foreach_ll(w, mux.sibling_list);
}
if (!h->policy->u.mqtt.subscribe ||
!h->policy->u.mqtt.subscribe[0]) {
/*
@ -313,6 +353,17 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_callback_on_writable(wsi);
break;
}
if (h->policy->u.mqtt.birth_topic &&
!wsi->mqtt->done_birth) {
/*
* If a Birth is pending on the stream, then make
* sure the Birth is done before signaling the
* user application.
*/
lws_callback_on_writable(wsi);
break;
}
lws_sul_cancel(&h->sul);
#if defined(LWS_WITH_SYS_METRICS)
/*
@ -351,10 +402,10 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_SUBSCRIBED:
/*
* Stream demanded a subscribe while connecting, once
* Stream demanded a subscribe without a Birth while connecting, once
* done notify CONNECTED event to the application.
*/
if (wsi->mqtt->done_subscribe == 0) {
if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) {
lws_sul_cancel(&h->sul);
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
@ -368,10 +419,15 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_sul_cancel(&h->sul_timeout);
if (wsi->mqtt->inside_birth) {
/*
* Skip LWSSSCS_QOS_ACK_REMOTE for birth topic.
* Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify
* CONNECTED event to the application.
*/
wsi->mqtt->inside_birth = 0;
wsi->mqtt->done_birth = 1;
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
lws_callback_on_writable(wsi);
break;
}
r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
@ -379,6 +435,14 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
break;
case LWS_CALLBACK_MQTT_RESEND:
if (wsi->mqtt->inside_birth) {
lwsl_err("%s: %s: Failed to send Birth\n", __func__,
lws_ss_tag(h));
return -1;
}
break;
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
{
if (!h || !h->info.tx)
@ -393,27 +457,12 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
return secstream_mqtt_subscribe(wsi);
if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) {
lws_strexp_t exp;
size_t used_in, used_out = 0;
if (h->policy->u.mqtt.birth_message) {
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
(char *)(buf + LWS_PRE), buflen);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
strlen(h->policy->u.mqtt.birth_message),
&used_in, &used_out) != LSTRX_DONE) {
return 1;
}
}
wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf + LWS_PRE,
used_out, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
LWSSS_FLAG_EOM);
}
if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic)
return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen);
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
@ -436,10 +485,16 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (r < 0)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, f);
if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
(uint32_t)h->writeable_len,
h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, f) != 0) {
r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
}
return 0;
}
case LWS_CALLBACK_MQTT_UNSUBSCRIBED: