1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss: mqtt: add QOS_NACK_REMOTE state on PUBLISH failure

This adds setting QOS_NACK_REMOTE state when QoS 1/2 PUBLISH
transmissions and all retries are unacked and failed. Also this
allows state transitions between QOS_ACK_REMOTE and QOS_NACK_REMOTE.
This commit is contained in:
Chunho Lee 2022-04-09 10:26:33 -07:00 committed by Andy Green
parent a51d3564a2
commit 21baf47aed
5 changed files with 113 additions and 6 deletions

View file

@ -2027,7 +2027,9 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
/* Packet ID */
if (pub->qos != QOS0) {
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
if (!pub->dup)
nwsi->mqtt->pkt_id++;
wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id;
lwsl_debug("%s: pkt_id = %d\n", __func__,
(int)wsi->mqtt->ack_pkt_id);
lws_ser_wu16be(p, pub->packet_id);

View file

@ -45,6 +45,7 @@ extern struct lws_role_ops role_ops_mqtt;
#define LWS_MQTT_RESPONSE_TIMEOUT (3 * LWS_US_PER_SEC)
#define LWS_MQTT_RETRY_CEILING (60 * LWS_US_PER_SEC)
#define LWS_MQTT_MAX_PUBLISH_RETRY (3)
typedef enum {
LMSPR_COMPLETED = 0,
@ -354,7 +355,7 @@ struct _lws_mqtt_related {
lws_mqttc_t client;
lws_sorted_usec_list_t sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */
lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */
lws_sorted_usec_list_t sul_unsuback_wait; /* QoS1 unsuback wait TO */
lws_sorted_usec_list_t sul_unsuback_wait; /* unsuback wait TO */
lws_sorted_usec_list_t sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */
struct lws *wsi; /**< so sul can use lws_container_of */
lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */

View file

@ -151,6 +151,10 @@ typedef struct lws_ss_handle {
void *heap_baggage;
const char *subscribe_to;
size_t subscribe_to_len;
struct lws_buflist *buflist_unacked;
uint32_t unacked_size;
uint8_t retry_count;
uint8_t send_unacked:1;
} mqtt;
#endif
#if defined(LWS_WITH_SYS_SMD)

View file

@ -44,6 +44,7 @@ secstream_mqtt_cleanup(lws_ss_handle_t *h)
lws_free(h->u.mqtt.sub_info.topic);
h->u.mqtt.sub_info.topic = NULL;
}
lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
}
static int
@ -143,7 +144,8 @@ secstream_mqtt_subscribe(struct lws *wsi)
static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
uint32_t payload_len, const char* topic,
lws_mqtt_qos_levels_t qos, uint8_t retain, int f)
lws_mqtt_qos_levels_t qos, uint8_t retain, uint8_t dup,
int f)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
size_t used_in, used_out, topic_limit;
@ -190,6 +192,7 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
mqpp.qos = qos;
mqpp.retain = !!retain;
mqpp.payload = buf;
mqpp.dup = !!dup;
if (payload_len)
mqpp.payload_len = payload_len;
else
@ -207,6 +210,15 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len,
return -1;
}
lws_free(expbuf);
if ((mqpp.qos == QOS1 || mqpp.qos == QOS2) && buf_len > 0) {
if (lws_buflist_append_segment(&h->u.mqtt.buflist_unacked,
buf, buf_len) < 0) {
lwsl_notice("%s: failed to store unacked\n", __func__);
return -1;
}
}
return 0;
}
@ -229,10 +241,63 @@ secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) {
return secstream_mqtt_publish(wsi, buf,
used_out, 0, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
h->policy->u.mqtt.birth_retain, 0,
LWSSS_FLAG_EOM);
}
static int
secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) {
uint8_t *buffered;
size_t len;
int f = 0, r;
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
len = lws_buflist_next_segment_len(&h->u.mqtt.buflist_unacked,
&buffered);
if (h->u.mqtt.unacked_size <= len)
f |= LWSSS_FLAG_EOM;
if (!len) {
/* when the message does not have payload */
buffered = buf;
} else {
h->u.mqtt.unacked_size -= (uint32_t)len;
}
if (wsi->mqtt->inside_birth) {
r = secstream_mqtt_publish(wsi, buffered, len, 0,
h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
1, f);
} else {
r = secstream_mqtt_publish(wsi, buffered, len,
(uint32_t)h->writeable_len,
h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, 1, f);
}
if (len)
lws_buflist_use_segment(&h->u.mqtt.buflist_unacked, len);
if (r) {
lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
if (wsi->mqtt->inside_birth) {
lwsl_err("%s: %s: failed to send Birth\n", __func__,
lws_ss_tag(h));
return -1;
} else {
r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
}
}
return 0;
}
static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
@ -417,6 +482,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_ACK:
lws_sul_cancel(&h->sul_timeout);
if (h->u.mqtt.send_unacked) {
lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
}
if (wsi->mqtt->inside_birth) {
/*
* Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify
@ -436,11 +506,33 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
break;
case LWS_CALLBACK_MQTT_RESEND:
lws_sul_cancel(&h->sul_timeout);
if (h->u.mqtt.retry_count++ < LWS_MQTT_MAX_PUBLISH_RETRY) {
h->u.mqtt.unacked_size =
(uint32_t)lws_buflist_total_len(&h->u.mqtt.buflist_unacked);
if (h->u.mqtt.unacked_size) {
lwsl_notice("%s: %s: resend unacked message (%d/%d) \n",
__func__, lws_ss_tag(h),
h->u.mqtt.retry_count,
LWS_MQTT_MAX_PUBLISH_RETRY);
h->u.mqtt.send_unacked = 1;
lws_callback_on_writable(wsi);
break;
}
}
lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked);
h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0;
if (wsi->mqtt->inside_birth) {
lwsl_err("%s: %s: Failed to send Birth\n", __func__,
lwsl_err("%s: %s: failed to send Birth\n", __func__,
lws_ss_tag(h));
return -1;
}
r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
break;
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
@ -457,6 +549,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
return secstream_mqtt_subscribe(wsi);
if (h->u.mqtt.send_unacked)
return secstream_mqtt_resend(wsi, buf + LWS_PRE);
if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic)
return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen);
@ -489,7 +584,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
(uint32_t)h->writeable_len,
h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, f) != 0) {
h->policy->u.mqtt.retain, 0, f) != 0) {
r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);

View file

@ -147,11 +147,16 @@ const uint32_t ss_state_txn_validity[] = {
(1 << LWSSSCS_TIMEOUT) |
#if defined(LWS_ROLE_MQTT)
(1 << LWSSSCS_QOS_ACK_REMOTE) |
(1 << LWSSSCS_QOS_NACK_REMOTE) |
#endif
(1 << LWSSSCS_DESTROYING),
[LWSSSCS_QOS_NACK_REMOTE] = (1 << LWSSSCS_DISCONNECTED) |
(1 << LWSSSCS_TIMEOUT) |
#if defined(LWS_ROLE_MQTT)
(1 << LWSSSCS_QOS_ACK_REMOTE) |
(1 << LWSSSCS_QOS_NACK_REMOTE) |
#endif
(1 << LWSSSCS_DESTROYING),
[LWSSSCS_QOS_ACK_LOCAL] = (1 << LWSSSCS_DISCONNECTED) |