1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-30 00:00:16 +01:00

ss: mqtt: set the CONNECTED state after Birth

This sets the CONNECTED state after Birth topic is processed if
the stream has defined a Birth topic to avoid any confict when
the connection is not stable and the Birth is delayed.
This commit is contained in:
Chunho Lee 2022-04-09 09:42:54 -07:00 committed by Andy Green
parent 4975000aa2
commit 49f505304e

View file

@ -141,8 +141,8 @@ secstream_mqtt_subscribe(struct lws *wsi)
} }
static int static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
const char* topic, uint32_t payload_len, const char* topic,
lws_mqtt_qos_levels_t qos, uint8_t retain, int f) lws_mqtt_qos_levels_t qos, uint8_t retain, int f)
{ {
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
@ -190,17 +190,17 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
mqpp.qos = qos; mqpp.qos = qos;
mqpp.retain = !!retain; mqpp.retain = !!retain;
mqpp.payload = buf; mqpp.payload = buf;
if (h->writeable_len) if (payload_len)
mqpp.payload_len = (uint32_t)h->writeable_len; mqpp.payload_len = payload_len;
else else
mqpp.payload_len = (uint32_t)buflen; mqpp.payload_len = (uint32_t)buf_len;
lwsl_notice("%s: payload len %d\n", __func__, lwsl_notice("%s: payload len %d\n", __func__,
(int)mqpp.payload_len); (int)mqpp.payload_len);
if (lws_mqtt_client_send_publish(wsi, &mqpp, if (lws_mqtt_client_send_publish(wsi, &mqpp,
(const char *)buf, (const char *)buf,
(uint32_t)buflen, (uint32_t)buf_len,
f & LWSSS_FLAG_EOM)) { f & LWSSS_FLAG_EOM)) {
lwsl_notice("%s: failed to publish\n", __func__); lwsl_notice("%s: failed to publish\n", __func__);
lws_free(expbuf); lws_free(expbuf);
@ -210,6 +210,29 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
return 0; return 0;
} }
static int
secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) {
lws_strexp_t exp;
size_t used_in, used_out = 0;
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
if (h->policy->u.mqtt.birth_message) {
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
(char *)buf, buflen);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
strlen(h->policy->u.mqtt.birth_message),
&used_in, &used_out) != LSTRX_DONE) {
return 1;
}
}
wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf,
used_out, 0, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
LWSSS_FLAG_EOM);
}
static int static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len) void *in, size_t len)
@ -285,6 +308,23 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
h->retry = 0; h->retry = 0;
h->seqstate = SSSEQ_CONNECTED; h->seqstate = SSSEQ_CONNECTED;
if (h->policy->u.mqtt.birth_topic &&
!wsi->mqtt->done_birth) {
struct lws *nwsi = lws_get_network_wsi(wsi);
lws_start_foreach_ll(struct lws *, w, nwsi->mux.child_list) {
if (w != wsi &&
(w->mqtt->done_birth || w->mqtt->inside_birth)) {
/*
* If any Birth was sent out or
* is pending on other stream,
* skip sending Birth.
*/
wsi->mqtt->done_birth = 1;
break;
}
} lws_end_foreach_ll(w, mux.sibling_list);
}
if (!h->policy->u.mqtt.subscribe || if (!h->policy->u.mqtt.subscribe ||
!h->policy->u.mqtt.subscribe[0]) { !h->policy->u.mqtt.subscribe[0]) {
/* /*
@ -313,6 +353,17 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_callback_on_writable(wsi); lws_callback_on_writable(wsi);
break; break;
} }
if (h->policy->u.mqtt.birth_topic &&
!wsi->mqtt->done_birth) {
/*
* If a Birth is pending on the stream, then make
* sure the Birth is done before signaling the
* user application.
*/
lws_callback_on_writable(wsi);
break;
}
lws_sul_cancel(&h->sul); lws_sul_cancel(&h->sul);
#if defined(LWS_WITH_SYS_METRICS) #if defined(LWS_WITH_SYS_METRICS)
/* /*
@ -351,10 +402,10 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_SUBSCRIBED: case LWS_CALLBACK_MQTT_SUBSCRIBED:
/* /*
* Stream demanded a subscribe while connecting, once * Stream demanded a subscribe without a Birth while connecting, once
* done notify CONNECTED event to the application. * done notify CONNECTED event to the application.
*/ */
if (wsi->mqtt->done_subscribe == 0) { if (!wsi->mqtt->done_subscribe && !h->policy->u.mqtt.birth_topic) {
lws_sul_cancel(&h->sul); lws_sul_cancel(&h->sul);
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK) if (r != LWSSSSRET_OK)
@ -368,10 +419,15 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_sul_cancel(&h->sul_timeout); lws_sul_cancel(&h->sul_timeout);
if (wsi->mqtt->inside_birth) { if (wsi->mqtt->inside_birth) {
/* /*
* Skip LWSSSCS_QOS_ACK_REMOTE for birth topic. * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify
* CONNECTED event to the application.
*/ */
wsi->mqtt->inside_birth = 0; wsi->mqtt->inside_birth = 0;
wsi->mqtt->done_birth = 1; wsi->mqtt->done_birth = 1;
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
lws_callback_on_writable(wsi);
break; break;
} }
r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE); r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
@ -379,6 +435,14 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
break; break;
case LWS_CALLBACK_MQTT_RESEND:
if (wsi->mqtt->inside_birth) {
lwsl_err("%s: %s: Failed to send Birth\n", __func__,
lws_ss_tag(h));
return -1;
}
break;
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
{ {
if (!h || !h->info.tx) if (!h || !h->info.tx)
@ -393,27 +457,12 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
return secstream_mqtt_subscribe(wsi); return secstream_mqtt_subscribe(wsi);
if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) { if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic)
lws_strexp_t exp; return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen);
size_t used_in, used_out = 0;
if (h->policy->u.mqtt.birth_message) {
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
(char *)(buf + LWS_PRE), buflen);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
strlen(h->policy->u.mqtt.birth_message),
&used_in, &used_out) != LSTRX_DONE) {
return 1;
}
}
wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf + LWS_PRE,
used_out, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
LWSSS_FLAG_EOM);
}
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f); &buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND) if (r == LWSSSSRET_TX_DONT_SEND)
return 0; return 0;
@ -436,10 +485,16 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (r < 0) if (r < 0)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, if (secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
(uint32_t)h->writeable_len,
h->policy->u.mqtt.topic, h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos, h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, f); h->policy->u.mqtt.retain, f) != 0) {
r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
}
return 0;
} }
case LWS_CALLBACK_MQTT_UNSUBSCRIBED: case LWS_CALLBACK_MQTT_UNSUBSCRIBED: