diff --git a/include/libwebsockets/lws-callbacks.h b/include/libwebsockets/lws-callbacks.h index 7a7826226..baec7f18e 100644 --- a/include/libwebsockets/lws-callbacks.h +++ b/include/libwebsockets/lws-callbacks.h @@ -878,9 +878,9 @@ enum lws_callback_reasons { * close the wsi. */ LWS_CALLBACK_MQTT_RESEND = 210, - /**< In QoS1, this callback is generated instead of the _ACK one if - * we timed out waiting for a PUBACK and we must resend the message. - * Return nonzero to close the wsi. + /**< In QoS1 or QoS2, this callback is generated instead of the _ACK one + * if we timed out waiting for a PUBACK or a PUBREC, and we must resend + * the message. Return nonzero to close the wsi. */ /****** add new things just above ---^ ******/ diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index 9ec43096d..7f3e1a174 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -655,6 +655,114 @@ _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par, } break; + /* PUBREC */ + case LMQCPP_PUBREC_PACKET: + lwsl_debug("%s: received PUBREC pkt\n", __func__); + lws_mqtt_vbi_init(&par->vbit); + switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { + case LMSPR_NEED_MORE: + break; + case LMSPR_COMPLETED: + par->cpkt_remlen = par->vbit.value; + lwsl_debug("%s: PUBREC pkt len = %d\n", + __func__, (int)par->cpkt_remlen); + if (par->cpkt_remlen < 2) + goto send_protocol_error_and_close; + par->state = LMQCPP_PUBREC_VH_PKT_ID; + break; + default: + lwsl_notice("%s: pubrec bad vbi\n", __func__); + goto send_protocol_error_and_close; + } + break; + + case LMQCPP_PUBREC_VH_PKT_ID: + if (len < 2) { + lwsl_notice("%s: len breakage 3\n", __func__); + return -1; + } + + par->cpkt_id = lws_ser_ru16be(buf); + wsi->mqtt->ack_pkt_id = par->cpkt_id; + buf += 2; + len -= 2; + par->cpkt_remlen -= 2; + par->n = 0; + + goto cmd_completion; + + /* PUBREL */ + case LMQCPP_PUBREL_PACKET: + lwsl_debug("%s: received PUBREL pkt\n", __func__); + lws_mqtt_vbi_init(&par->vbit); + switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { + case LMSPR_NEED_MORE: + break; + case LMSPR_COMPLETED: + par->cpkt_remlen = par->vbit.value; + lwsl_debug("%s: PUBREL pkt len = %d\n", + __func__, (int)par->cpkt_remlen); + if (par->cpkt_remlen < 2) + goto send_protocol_error_and_close; + par->state = LMQCPP_PUBREL_VH_PKT_ID; + break; + default: + lwsl_err("%s: pubrel bad vbi\n", __func__); + goto send_protocol_error_and_close; + } + break; + + case LMQCPP_PUBREL_VH_PKT_ID: + if (len < 2) { + lwsl_notice("%s: len breakage 3\n", __func__); + return -1; + } + + par->cpkt_id = lws_ser_ru16be(buf); + wsi->mqtt->ack_pkt_id = par->cpkt_id; + buf += 2; + len -= 2; + par->cpkt_remlen -= 2; + par->n = 0; + + goto cmd_completion; + + /* PUBCOMP */ + case LMQCPP_PUBCOMP_PACKET: + lwsl_debug("%s: received PUBCOMP pkt\n", __func__); + lws_mqtt_vbi_init(&par->vbit); + switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) { + case LMSPR_NEED_MORE: + break; + case LMSPR_COMPLETED: + par->cpkt_remlen = par->vbit.value; + lwsl_debug("%s: PUBCOMP pkt len = %d\n", + __func__, (int)par->cpkt_remlen); + if (par->cpkt_remlen < 2) + goto send_protocol_error_and_close; + par->state = LMQCPP_PUBCOMP_VH_PKT_ID; + break; + default: + lwsl_err("%s: pubcmp bad vbi\n", __func__); + goto send_protocol_error_and_close; + } + break; + + case LMQCPP_PUBCOMP_VH_PKT_ID: + if (len < 2) { + lwsl_notice("%s: len breakage 3\n", __func__); + return -1; + } + + par->cpkt_id = lws_ser_ru16be(buf); + wsi->mqtt->ack_pkt_id = par->cpkt_id; + buf += 2; + len -= 2; + par->cpkt_remlen -= 2; + par->n = 0; + + goto cmd_completion; + case LMQCPP_PUBLISH_PACKET: if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) { lwsl_notice("%s: Topic rx before subscribing\n", @@ -1214,6 +1322,93 @@ bail1: return 0; + case LMQCP_PUBREC: + lwsl_err("%s: cmd_completion: PUBREC\n", + __func__); + /* + * Figure out which child asked for this + */ + n = 0; + lws_start_foreach_ll(struct lws *, w, + wsi->mux.child_list) { + if (w->mqtt->unacked_publish && + w->mqtt->ack_pkt_id == par->cpkt_id) { + char requested_close = 0; + + w->mqtt->unacked_publish = 0; + w->mqtt->unacked_pubrel = 1; + + if (user_callback_handle_rxflow( + w->a.protocol->callback, + w, LWS_CALLBACK_MQTT_ACK, + w->user_space, NULL, 0) < 0) { + lwsl_info("%s: MQTT_ACK requests close\n", + __func__); + requested_close = 1; + } + n = 1; + + /* + * We got an assertive PUBREC, + * no need for timeout wait + * any more + */ + lws_sul_cancel(&w->mqtt-> + sul_qos_puback_pubrec_wait); + + if (requested_close) { + __lws_close_free_wsi(w, + 0, "ack cb"); + break; + } + + break; + } + } lws_end_foreach_ll(w, mux.sibling_list); + + if (!n) { + lwsl_err("%s: unsolicited PUBREC\n", + __func__); + return -1; + } + wsi->mqtt->send_pubrel = 1; + lws_callback_on_writable(wsi); + break; + + case LMQCP_PUBCOMP: + lwsl_err("%s: cmd_completion: PUBCOMP\n", + __func__); + n = 0; + lws_start_foreach_ll(struct lws *, w, + wsi->mux.child_list) { + if (w->mqtt->unacked_pubrel > 0 && + w->mqtt->ack_pkt_id == par->cpkt_id) { + w->mqtt->unacked_pubrel = 0; + n = 1; + } + } lws_end_foreach_ll(w, mux.sibling_list); + + if (!n) { + lwsl_err("%s: unsolicited PUBCOMP\n", + __func__); + return -1; + } + + /* + * If we published something and PUBCOMP arrived, + * our connection is definitely working in both + * directions at the moment. + */ + lws_validity_confirmed(wsi); + break; + + case LMQCP_PUBREL: + lwsl_err("%s: cmd_completion: PUBREL\n", + __func__); + wsi->mqtt->send_pubcomp = 1; + lws_callback_on_writable(wsi); + break; + case LMQCP_PUBACK: lwsl_info("%s: cmd_completion: PUBACK\n", __func__); @@ -1245,7 +1440,7 @@ bail1: * no need for ACK timeout wait * any more */ - lws_sul_cancel(&w->mqtt->sul_qos1_puback_wait); + lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait); if (requested_close) { __lws_close_free_wsi(w, @@ -1440,10 +1635,14 @@ bail1: break; } - /* For QOS>0, send out PUBACK */ - if (pub->qos) { + if (pub->qos == 1) { + /* For QOS = 1, send out PUBACK */ wsi->mqtt->send_puback = 1; lws_callback_on_writable(wsi); + } else if (pub->qos == 2) { + /* For QOS = 2, send out PUBREC */ + wsi->mqtt->send_pubrec = 1; + lws_callback_on_writable(wsi); } par->payload_consumed = 0; @@ -1711,15 +1910,15 @@ lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type, } /* - * This fires if the wsi did a PUBLISH under QoS1, but no PUBACK came before - * the timeout period + * This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or + * PUBREC came before the timeout period */ static void lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul) { struct _lws_mqtt_related *mqtt = lws_container_of(sul, - struct _lws_mqtt_related, sul_qos1_puback_wait); + struct _lws_mqtt_related, sul_qos_puback_pubrec_wait); lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); @@ -1874,17 +2073,16 @@ do_write: lwsl_err("%s: ACK callback exited\n", __func__); return 1; } - - return 0; + } else if (pub->qos == QOS1 || pub->qos == QOS2) { + /* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s, + * we must RETRY the publish + */ + wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend; + __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], + &wsi->mqtt->sul_qos_puback_pubrec_wait, + 3 * LWS_USEC_PER_SEC); } - /* For QoS1, if no PUBACK coming after 3s, we must RETRY the publish */ - - wsi->mqtt->sul_qos1_puback_wait.cb = lws_mqtt_publish_resend; - __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], - &wsi->mqtt->sul_qos1_puback_wait, - 3 * LWS_USEC_PER_SEC); - return 0; } diff --git a/lib/roles/mqtt/ops-mqtt.c b/lib/roles/mqtt/ops-mqtt.c index 3f04bf9fa..f4caa6ebc 100644 --- a/lib/roles/mqtt/ops-mqtt.c +++ b/lib/roles/mqtt/ops-mqtt.c @@ -302,6 +302,37 @@ rops_handle_POLLOUT_mqtt(struct lws *wsi) return LWS_HP_RET_BAIL_OK; } #endif + if (wsi->mqtt && !wsi->mqtt->inside_payload && + (wsi->mqtt->send_pubrec || wsi->mqtt->send_pubrel || + wsi->mqtt->send_pubcomp)) { + uint8_t buf[LWS_PRE + 4]; + if (wsi->mqtt->send_pubrec) { + lwsl_notice("%s: issuing PUBREC for pkt id: %d\n", + __func__, wsi->mqtt->ack_pkt_id); + buf[LWS_PRE] = LMQCP_PUBREC << 4 | 0x2; + wsi->mqtt->send_pubrec = 0; + } else if (wsi->mqtt->send_pubrel) { + lwsl_notice("%s: issuing PUBREL for pkt id: %d\n", + __func__, wsi->mqtt->ack_pkt_id); + buf[LWS_PRE] = LMQCP_PUBREL << 4 | 0x2; + wsi->mqtt->send_pubrel = 0; + } else { + lwsl_notice("%s: issuing PUBCOMP for pkt id: %d\n", + __func__, wsi->mqtt->ack_pkt_id); + buf[LWS_PRE] = LMQCP_PUBCOMP << 4 | 0x2; + wsi->mqtt->send_pubcomp = 0; + } + /* Remaining len = 2 */ + buf[LWS_PRE + 1] = 2; + /* Packet ID */ + lws_ser_wu16be(&buf[LWS_PRE + 2], + wsi->mqtt->ack_pkt_id); + + if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4, + LWS_WRITE_BINARY) != 4) + return LWS_HP_RET_BAIL_DIE; + return LWS_HP_RET_BAIL_OK; + } wsi = lws_get_network_wsi(wsi); @@ -424,7 +455,7 @@ rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi) c = &wsi->mqtt->client; - lws_sul_cancel(&wsi->mqtt->sul_qos1_puback_wait); + lws_sul_cancel(&wsi->mqtt->sul_qos_puback_pubrec_wait); lws_mqtt_str_free(&c->username); lws_mqtt_str_free(&c->password); diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h index 6383ead2e..8ca47ceb4 100644 --- a/lib/roles/mqtt/private-lib-roles-mqtt.h +++ b/lib/roles/mqtt/private-lib-roles-mqtt.h @@ -156,6 +156,15 @@ typedef enum { LMQCPP_PUBACK_VH_PKT_ID, LMQCPP_PUBACK_PROPERTIES_LEN_VBI, + LMQCPP_PUBREC_PACKET = LMQCP_PUBREC << 4, + LMQCPP_PUBREC_VH_PKT_ID, + + LMQCPP_PUBREL_PACKET = LMQCP_PUBREL << 4, + LMQCPP_PUBREL_VH_PKT_ID, + + LMQCPP_PUBCOMP_PACKET = LMQCP_PUBCOMP << 4, + LMQCPP_PUBCOMP_VH_PKT_ID, + LMQCPP_SUBACK_PACKET = LMQCP_STOC_SUBACK << 4, LMQCPP_SUBACK_VH_PKT_ID, LMQCPP_SUBACK_PAYLOAD, @@ -343,7 +352,7 @@ typedef struct lws_mqttc { struct _lws_mqtt_related { lws_mqttc_t client; - lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */ + lws_sorted_usec_list_t sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */ struct lws *wsi; /**< so sul can use lws_container_of */ lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */ void *rx_cpkt_param; @@ -359,7 +368,11 @@ struct _lws_mqtt_related { uint8_t inside_subscribe:1; uint8_t inside_unsubscribe:1; uint8_t send_puback:1; + uint8_t send_pubrel:1; + uint8_t send_pubrec:1; + uint8_t send_pubcomp:1; uint8_t unacked_publish:1; + uint8_t unacked_pubrel:1; uint8_t done_subscribe:1; };