1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

mqtt: Add support for QoS 2

Add support for QoS 2.
This commit is contained in:
Chunho Lee 2021-04-14 06:07:11 -07:00 committed by Andy Green
parent ae8b11aa83
commit 85cec16f95
4 changed files with 262 additions and 20 deletions

View file

@ -878,9 +878,9 @@ enum lws_callback_reasons {
* close the wsi.
*/
LWS_CALLBACK_MQTT_RESEND = 210,
/**< In QoS1, this callback is generated instead of the _ACK one if
* we timed out waiting for a PUBACK and we must resend the message.
* Return nonzero to close the wsi.
/**< In QoS1 or QoS2, this callback is generated instead of the _ACK one
* if we timed out waiting for a PUBACK or a PUBREC, and we must resend
* the message. Return nonzero to close the wsi.
*/
/****** add new things just above ---^ ******/

View file

@ -655,6 +655,114 @@ _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
}
break;
/* PUBREC */
case LMQCPP_PUBREC_PACKET:
lwsl_debug("%s: received PUBREC pkt\n", __func__);
lws_mqtt_vbi_init(&par->vbit);
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
case LMSPR_NEED_MORE:
break;
case LMSPR_COMPLETED:
par->cpkt_remlen = par->vbit.value;
lwsl_debug("%s: PUBREC pkt len = %d\n",
__func__, (int)par->cpkt_remlen);
if (par->cpkt_remlen < 2)
goto send_protocol_error_and_close;
par->state = LMQCPP_PUBREC_VH_PKT_ID;
break;
default:
lwsl_notice("%s: pubrec bad vbi\n", __func__);
goto send_protocol_error_and_close;
}
break;
case LMQCPP_PUBREC_VH_PKT_ID:
if (len < 2) {
lwsl_notice("%s: len breakage 3\n", __func__);
return -1;
}
par->cpkt_id = lws_ser_ru16be(buf);
wsi->mqtt->ack_pkt_id = par->cpkt_id;
buf += 2;
len -= 2;
par->cpkt_remlen -= 2;
par->n = 0;
goto cmd_completion;
/* PUBREL */
case LMQCPP_PUBREL_PACKET:
lwsl_debug("%s: received PUBREL pkt\n", __func__);
lws_mqtt_vbi_init(&par->vbit);
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
case LMSPR_NEED_MORE:
break;
case LMSPR_COMPLETED:
par->cpkt_remlen = par->vbit.value;
lwsl_debug("%s: PUBREL pkt len = %d\n",
__func__, (int)par->cpkt_remlen);
if (par->cpkt_remlen < 2)
goto send_protocol_error_and_close;
par->state = LMQCPP_PUBREL_VH_PKT_ID;
break;
default:
lwsl_err("%s: pubrel bad vbi\n", __func__);
goto send_protocol_error_and_close;
}
break;
case LMQCPP_PUBREL_VH_PKT_ID:
if (len < 2) {
lwsl_notice("%s: len breakage 3\n", __func__);
return -1;
}
par->cpkt_id = lws_ser_ru16be(buf);
wsi->mqtt->ack_pkt_id = par->cpkt_id;
buf += 2;
len -= 2;
par->cpkt_remlen -= 2;
par->n = 0;
goto cmd_completion;
/* PUBCOMP */
case LMQCPP_PUBCOMP_PACKET:
lwsl_debug("%s: received PUBCOMP pkt\n", __func__);
lws_mqtt_vbi_init(&par->vbit);
switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
case LMSPR_NEED_MORE:
break;
case LMSPR_COMPLETED:
par->cpkt_remlen = par->vbit.value;
lwsl_debug("%s: PUBCOMP pkt len = %d\n",
__func__, (int)par->cpkt_remlen);
if (par->cpkt_remlen < 2)
goto send_protocol_error_and_close;
par->state = LMQCPP_PUBCOMP_VH_PKT_ID;
break;
default:
lwsl_err("%s: pubcmp bad vbi\n", __func__);
goto send_protocol_error_and_close;
}
break;
case LMQCPP_PUBCOMP_VH_PKT_ID:
if (len < 2) {
lwsl_notice("%s: len breakage 3\n", __func__);
return -1;
}
par->cpkt_id = lws_ser_ru16be(buf);
wsi->mqtt->ack_pkt_id = par->cpkt_id;
buf += 2;
len -= 2;
par->cpkt_remlen -= 2;
par->n = 0;
goto cmd_completion;
case LMQCPP_PUBLISH_PACKET:
if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
lwsl_notice("%s: Topic rx before subscribing\n",
@ -1214,6 +1322,93 @@ bail1:
return 0;
case LMQCP_PUBREC:
lwsl_err("%s: cmd_completion: PUBREC\n",
__func__);
/*
* Figure out which child asked for this
*/
n = 0;
lws_start_foreach_ll(struct lws *, w,
wsi->mux.child_list) {
if (w->mqtt->unacked_publish &&
w->mqtt->ack_pkt_id == par->cpkt_id) {
char requested_close = 0;
w->mqtt->unacked_publish = 0;
w->mqtt->unacked_pubrel = 1;
if (user_callback_handle_rxflow(
w->a.protocol->callback,
w, LWS_CALLBACK_MQTT_ACK,
w->user_space, NULL, 0) < 0) {
lwsl_info("%s: MQTT_ACK requests close\n",
__func__);
requested_close = 1;
}
n = 1;
/*
* We got an assertive PUBREC,
* no need for timeout wait
* any more
*/
lws_sul_cancel(&w->mqtt->
sul_qos_puback_pubrec_wait);
if (requested_close) {
__lws_close_free_wsi(w,
0, "ack cb");
break;
}
break;
}
} lws_end_foreach_ll(w, mux.sibling_list);
if (!n) {
lwsl_err("%s: unsolicited PUBREC\n",
__func__);
return -1;
}
wsi->mqtt->send_pubrel = 1;
lws_callback_on_writable(wsi);
break;
case LMQCP_PUBCOMP:
lwsl_err("%s: cmd_completion: PUBCOMP\n",
__func__);
n = 0;
lws_start_foreach_ll(struct lws *, w,
wsi->mux.child_list) {
if (w->mqtt->unacked_pubrel > 0 &&
w->mqtt->ack_pkt_id == par->cpkt_id) {
w->mqtt->unacked_pubrel = 0;
n = 1;
}
} lws_end_foreach_ll(w, mux.sibling_list);
if (!n) {
lwsl_err("%s: unsolicited PUBCOMP\n",
__func__);
return -1;
}
/*
* If we published something and PUBCOMP arrived,
* our connection is definitely working in both
* directions at the moment.
*/
lws_validity_confirmed(wsi);
break;
case LMQCP_PUBREL:
lwsl_err("%s: cmd_completion: PUBREL\n",
__func__);
wsi->mqtt->send_pubcomp = 1;
lws_callback_on_writable(wsi);
break;
case LMQCP_PUBACK:
lwsl_info("%s: cmd_completion: PUBACK\n",
__func__);
@ -1245,7 +1440,7 @@ bail1:
* no need for ACK timeout wait
* any more
*/
lws_sul_cancel(&w->mqtt->sul_qos1_puback_wait);
lws_sul_cancel(&w->mqtt->sul_qos_puback_pubrec_wait);
if (requested_close) {
__lws_close_free_wsi(w,
@ -1440,10 +1635,14 @@ bail1:
break;
}
/* For QOS>0, send out PUBACK */
if (pub->qos) {
if (pub->qos == 1) {
/* For QOS = 1, send out PUBACK */
wsi->mqtt->send_puback = 1;
lws_callback_on_writable(wsi);
} else if (pub->qos == 2) {
/* For QOS = 2, send out PUBREC */
wsi->mqtt->send_pubrec = 1;
lws_callback_on_writable(wsi);
}
par->payload_consumed = 0;
@ -1711,15 +1910,15 @@ lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
}
/*
* This fires if the wsi did a PUBLISH under QoS1, but no PUBACK came before
* the timeout period
* This fires if the wsi did a PUBLISH under QoS1 or QoS2, but no PUBACK or
* PUBREC came before the timeout period
*/
static void
lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
{
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
struct _lws_mqtt_related, sul_qos1_puback_wait);
struct _lws_mqtt_related, sul_qos_puback_pubrec_wait);
lwsl_notice("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
@ -1874,17 +2073,16 @@ do_write:
lwsl_err("%s: ACK callback exited\n", __func__);
return 1;
}
return 0;
} else if (pub->qos == QOS1 || pub->qos == QOS2) {
/* For QoS1 or QoS2, if no PUBACK or PUBREC coming after 3s,
* we must RETRY the publish
*/
wsi->mqtt->sul_qos_puback_pubrec_wait.cb = lws_mqtt_publish_resend;
__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
&wsi->mqtt->sul_qos_puback_pubrec_wait,
3 * LWS_USEC_PER_SEC);
}
/* For QoS1, if no PUBACK coming after 3s, we must RETRY the publish */
wsi->mqtt->sul_qos1_puback_wait.cb = lws_mqtt_publish_resend;
__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
&wsi->mqtt->sul_qos1_puback_wait,
3 * LWS_USEC_PER_SEC);
return 0;
}

View file

@ -302,6 +302,37 @@ rops_handle_POLLOUT_mqtt(struct lws *wsi)
return LWS_HP_RET_BAIL_OK;
}
#endif
if (wsi->mqtt && !wsi->mqtt->inside_payload &&
(wsi->mqtt->send_pubrec || wsi->mqtt->send_pubrel ||
wsi->mqtt->send_pubcomp)) {
uint8_t buf[LWS_PRE + 4];
if (wsi->mqtt->send_pubrec) {
lwsl_notice("%s: issuing PUBREC for pkt id: %d\n",
__func__, wsi->mqtt->ack_pkt_id);
buf[LWS_PRE] = LMQCP_PUBREC << 4 | 0x2;
wsi->mqtt->send_pubrec = 0;
} else if (wsi->mqtt->send_pubrel) {
lwsl_notice("%s: issuing PUBREL for pkt id: %d\n",
__func__, wsi->mqtt->ack_pkt_id);
buf[LWS_PRE] = LMQCP_PUBREL << 4 | 0x2;
wsi->mqtt->send_pubrel = 0;
} else {
lwsl_notice("%s: issuing PUBCOMP for pkt id: %d\n",
__func__, wsi->mqtt->ack_pkt_id);
buf[LWS_PRE] = LMQCP_PUBCOMP << 4 | 0x2;
wsi->mqtt->send_pubcomp = 0;
}
/* Remaining len = 2 */
buf[LWS_PRE + 1] = 2;
/* Packet ID */
lws_ser_wu16be(&buf[LWS_PRE + 2],
wsi->mqtt->ack_pkt_id);
if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
LWS_WRITE_BINARY) != 4)
return LWS_HP_RET_BAIL_DIE;
return LWS_HP_RET_BAIL_OK;
}
wsi = lws_get_network_wsi(wsi);
@ -424,7 +455,7 @@ rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi)
c = &wsi->mqtt->client;
lws_sul_cancel(&wsi->mqtt->sul_qos1_puback_wait);
lws_sul_cancel(&wsi->mqtt->sul_qos_puback_pubrec_wait);
lws_mqtt_str_free(&c->username);
lws_mqtt_str_free(&c->password);

View file

@ -156,6 +156,15 @@ typedef enum {
LMQCPP_PUBACK_VH_PKT_ID,
LMQCPP_PUBACK_PROPERTIES_LEN_VBI,
LMQCPP_PUBREC_PACKET = LMQCP_PUBREC << 4,
LMQCPP_PUBREC_VH_PKT_ID,
LMQCPP_PUBREL_PACKET = LMQCP_PUBREL << 4,
LMQCPP_PUBREL_VH_PKT_ID,
LMQCPP_PUBCOMP_PACKET = LMQCP_PUBCOMP << 4,
LMQCPP_PUBCOMP_VH_PKT_ID,
LMQCPP_SUBACK_PACKET = LMQCP_STOC_SUBACK << 4,
LMQCPP_SUBACK_VH_PKT_ID,
LMQCPP_SUBACK_PAYLOAD,
@ -343,7 +352,7 @@ typedef struct lws_mqttc {
struct _lws_mqtt_related {
lws_mqttc_t client;
lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */
lws_sorted_usec_list_t sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */
struct lws *wsi; /**< so sul can use lws_container_of */
lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */
void *rx_cpkt_param;
@ -359,7 +368,11 @@ struct _lws_mqtt_related {
uint8_t inside_subscribe:1;
uint8_t inside_unsubscribe:1;
uint8_t send_puback:1;
uint8_t send_pubrel:1;
uint8_t send_pubrec:1;
uint8_t send_pubcomp:1;
uint8_t unacked_publish:1;
uint8_t unacked_pubrel:1;
uint8_t done_subscribe:1;
};