diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index 08cff4427..b41d44d12 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -2027,7 +2027,9 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, /* Packet ID */ if (pub->qos != QOS0) { p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); - wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id; + if (!pub->dup) + nwsi->mqtt->pkt_id++; + wsi->mqtt->ack_pkt_id = pub->packet_id = nwsi->mqtt->pkt_id; lwsl_debug("%s: pkt_id = %d\n", __func__, (int)wsi->mqtt->ack_pkt_id); lws_ser_wu16be(p, pub->packet_id); diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h index d89f37187..575d2be58 100644 --- a/lib/roles/mqtt/private-lib-roles-mqtt.h +++ b/lib/roles/mqtt/private-lib-roles-mqtt.h @@ -45,6 +45,7 @@ extern struct lws_role_ops role_ops_mqtt; #define LWS_MQTT_RESPONSE_TIMEOUT (3 * LWS_US_PER_SEC) #define LWS_MQTT_RETRY_CEILING (60 * LWS_US_PER_SEC) +#define LWS_MQTT_MAX_PUBLISH_RETRY (3) typedef enum { LMSPR_COMPLETED = 0, @@ -354,7 +355,7 @@ struct _lws_mqtt_related { lws_mqttc_t client; lws_sorted_usec_list_t sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */ lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */ - lws_sorted_usec_list_t sul_unsuback_wait; /* QoS1 unsuback wait TO */ + lws_sorted_usec_list_t sul_unsuback_wait; /* unsuback wait TO */ lws_sorted_usec_list_t sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */ struct lws *wsi; /**< so sul can use lws_container_of */ lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */ diff --git a/lib/secure-streams/private-lib-secure-streams.h b/lib/secure-streams/private-lib-secure-streams.h index 6af59e470..b14a82876 100644 --- a/lib/secure-streams/private-lib-secure-streams.h +++ b/lib/secure-streams/private-lib-secure-streams.h @@ -151,6 +151,10 @@ typedef struct lws_ss_handle { void *heap_baggage; const char *subscribe_to; size_t subscribe_to_len; + struct lws_buflist *buflist_unacked; + uint32_t unacked_size; + uint8_t retry_count; + uint8_t send_unacked:1; } mqtt; #endif #if defined(LWS_WITH_SYS_SMD) diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 911f5789f..11ab7b27b 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -44,6 +44,7 @@ secstream_mqtt_cleanup(lws_ss_handle_t *h) lws_free(h->u.mqtt.sub_info.topic); h->u.mqtt.sub_info.topic = NULL; } + lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); } static int @@ -143,7 +144,8 @@ secstream_mqtt_subscribe(struct lws *wsi) static int secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, uint32_t payload_len, const char* topic, - lws_mqtt_qos_levels_t qos, uint8_t retain, int f) + lws_mqtt_qos_levels_t qos, uint8_t retain, uint8_t dup, + int f) { lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); size_t used_in, used_out, topic_limit; @@ -190,6 +192,7 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, mqpp.qos = qos; mqpp.retain = !!retain; mqpp.payload = buf; + mqpp.dup = !!dup; if (payload_len) mqpp.payload_len = payload_len; else @@ -207,6 +210,15 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buf_len, return -1; } lws_free(expbuf); + + if ((mqpp.qos == QOS1 || mqpp.qos == QOS2) && buf_len > 0) { + if (lws_buflist_append_segment(&h->u.mqtt.buflist_unacked, + buf, buf_len) < 0) { + lwsl_notice("%s: failed to store unacked\n", __func__); + return -1; + } + } + return 0; } @@ -229,10 +241,63 @@ secstream_mqtt_birth(struct lws *wsi, uint8_t *buf, size_t buflen) { return secstream_mqtt_publish(wsi, buf, used_out, 0, h->policy->u.mqtt.birth_topic, h->policy->u.mqtt.birth_qos, - h->policy->u.mqtt.birth_retain, + h->policy->u.mqtt.birth_retain, 0, LWSSS_FLAG_EOM); } +static int +secstream_mqtt_resend(struct lws *wsi, uint8_t *buf) { + uint8_t *buffered; + size_t len; + int f = 0, r; + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + + len = lws_buflist_next_segment_len(&h->u.mqtt.buflist_unacked, + &buffered); + + if (h->u.mqtt.unacked_size <= len) + f |= LWSSS_FLAG_EOM; + + if (!len) { + /* when the message does not have payload */ + buffered = buf; + } else { + h->u.mqtt.unacked_size -= (uint32_t)len; + } + + if (wsi->mqtt->inside_birth) { + r = secstream_mqtt_publish(wsi, buffered, len, 0, + h->policy->u.mqtt.birth_topic, + h->policy->u.mqtt.birth_qos, + h->policy->u.mqtt.birth_retain, + 1, f); + } else { + r = secstream_mqtt_publish(wsi, buffered, len, + (uint32_t)h->writeable_len, + h->policy->u.mqtt.topic, + h->policy->u.mqtt.qos, + h->policy->u.mqtt.retain, 1, f); + } + if (len) + lws_buflist_use_segment(&h->u.mqtt.buflist_unacked, len); + + if (r) { + lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); + h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; + + if (wsi->mqtt->inside_birth) { + lwsl_err("%s: %s: failed to send Birth\n", __func__, + lws_ss_tag(h)); + return -1; + } else { + r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); + if (r != LWSSSSRET_OK) + return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); + } + } + return 0; +} + static int secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -417,6 +482,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, case LWS_CALLBACK_MQTT_ACK: lws_sul_cancel(&h->sul_timeout); + if (h->u.mqtt.send_unacked) { + lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); + h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; + } + if (wsi->mqtt->inside_birth) { /* * Skip LWSSSCS_QOS_ACK_REMOTE for a Birth, notify @@ -436,11 +506,33 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, break; case LWS_CALLBACK_MQTT_RESEND: + lws_sul_cancel(&h->sul_timeout); + if (h->u.mqtt.retry_count++ < LWS_MQTT_MAX_PUBLISH_RETRY) { + h->u.mqtt.unacked_size = + (uint32_t)lws_buflist_total_len(&h->u.mqtt.buflist_unacked); + if (h->u.mqtt.unacked_size) { + lwsl_notice("%s: %s: resend unacked message (%d/%d) \n", + __func__, lws_ss_tag(h), + h->u.mqtt.retry_count, + LWS_MQTT_MAX_PUBLISH_RETRY); + h->u.mqtt.send_unacked = 1; + lws_callback_on_writable(wsi); + break; + } + } + + lws_buflist_destroy_all_segments(&h->u.mqtt.buflist_unacked); + h->u.mqtt.retry_count = h->u.mqtt.send_unacked = 0; + if (wsi->mqtt->inside_birth) { - lwsl_err("%s: %s: Failed to send Birth\n", __func__, + lwsl_err("%s: %s: failed to send Birth\n", __func__, lws_ss_tag(h)); return -1; } + + r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); + if (r != LWSSSSRET_OK) + return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); break; case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: @@ -457,6 +549,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) return secstream_mqtt_subscribe(wsi); + if (h->u.mqtt.send_unacked) + return secstream_mqtt_resend(wsi, buf + LWS_PRE); + if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) return secstream_mqtt_birth(wsi, buf + LWS_PRE, buflen); @@ -489,7 +584,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, (uint32_t)h->writeable_len, h->policy->u.mqtt.topic, h->policy->u.mqtt.qos, - h->policy->u.mqtt.retain, f) != 0) { + h->policy->u.mqtt.retain, 0, f) != 0) { r = lws_ss_event_helper(h, LWSSSCS_QOS_NACK_REMOTE); if (r != LWSSSSRET_OK) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); diff --git a/lib/secure-streams/secure-streams.c b/lib/secure-streams/secure-streams.c index c74d229ce..911f71321 100644 --- a/lib/secure-streams/secure-streams.c +++ b/lib/secure-streams/secure-streams.c @@ -147,11 +147,16 @@ const uint32_t ss_state_txn_validity[] = { (1 << LWSSSCS_TIMEOUT) | #if defined(LWS_ROLE_MQTT) (1 << LWSSSCS_QOS_ACK_REMOTE) | + (1 << LWSSSCS_QOS_NACK_REMOTE) | #endif (1 << LWSSSCS_DESTROYING), [LWSSSCS_QOS_NACK_REMOTE] = (1 << LWSSSCS_DISCONNECTED) | (1 << LWSSSCS_TIMEOUT) | +#if defined(LWS_ROLE_MQTT) + (1 << LWSSSCS_QOS_ACK_REMOTE) | + (1 << LWSSSCS_QOS_NACK_REMOTE) | +#endif (1 << LWSSSCS_DESTROYING), [LWSSSCS_QOS_ACK_LOCAL] = (1 << LWSSSCS_DISCONNECTED) |