diff --git a/include/libwebsockets/lws-callbacks.h b/include/libwebsockets/lws-callbacks.h index baec7f18e..62848fa5a 100644 --- a/include/libwebsockets/lws-callbacks.h +++ b/include/libwebsockets/lws-callbacks.h @@ -882,6 +882,11 @@ enum lws_callback_reasons { * if we timed out waiting for a PUBACK or a PUBREC, and we must resend * the message. Return nonzero to close the wsi. */ + LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT = 211, + /**< When a UNSUBSCRIBE is sent, this callback is generated instead of + * the _UNSUBSCRIBED one if we timed out waiting for a UNSUBACK. + * Return nonzero to close the wsi. + */ /****** add new things just above ---^ ******/ diff --git a/lib/roles/mqtt/client/client-mqtt-handshake.c b/lib/roles/mqtt/client/client-mqtt-handshake.c index ab9cec79e..009838bef 100644 --- a/lib/roles/mqtt/client/client-mqtt-handshake.c +++ b/lib/roles/mqtt/client/client-mqtt-handshake.c @@ -176,3 +176,25 @@ lws_mqtt_client_send_connect(struct lws *wsi) return wsi; } + +struct lws * +lws_mqtt_client_send_disconnect(struct lws *wsi) +{ + uint8_t b[256 + LWS_PRE], *start = b + LWS_PRE, *p = start; + + /* 1. Fixed Headers */ + if (lws_mqtt_fill_fixed_header(p++, LMQCP_DISCONNECT, 0, 0, 0)) + { + lwsl_err("%s: Failled to fill fixed header\n", __func__); + return NULL; + } + *p++ = 0; + if (lws_write(wsi, (unsigned char *)&b[LWS_PRE], lws_ptr_diff_size_t(p, start), + LWS_WRITE_BINARY) != lws_ptr_diff(p, start)) { + lwsl_err("%s: write failed\n", __func__); + + return NULL; + } + + return wsi; +} diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index 7f3e1a174..2071903d8 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -1551,6 +1551,7 @@ bail1: } n = 1; + lws_sul_cancel(&w->mqtt->sul_unsuback_wait); if (requested_close) { __lws_close_free_wsi(w, 0, "unsub ack cb"); @@ -1927,6 +1928,20 @@ lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul) lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); } +static void +lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul) +{ + struct _lws_mqtt_related *mqtt = lws_container_of(sul, + struct _lws_mqtt_related, sul_unsuback_wait); + + lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi)); + + if (mqtt->wsi->a.protocol->callback(mqtt->wsi, + LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT, + mqtt->wsi->user_space, NULL, 0)) + lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC); +} + int lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, const void *buf, uint32_t len, int is_complete) @@ -2412,6 +2427,11 @@ lws_mqtt_client_send_unsubcribe(struct lws *wsi, wsi->mqtt->inside_unsubscribe = 1; + wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout; + __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend], + &wsi->mqtt->sul_unsuback_wait, + 3 * LWS_USEC_PER_SEC); + return 0; } diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h index 8ca47ceb4..fd312b77e 100644 --- a/lib/roles/mqtt/private-lib-roles-mqtt.h +++ b/lib/roles/mqtt/private-lib-roles-mqtt.h @@ -353,6 +353,9 @@ typedef struct lws_mqttc { struct _lws_mqtt_related { lws_mqttc_t client; lws_sorted_usec_list_t sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */ + lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */ + lws_sorted_usec_list_t sul_unsuback_wait; /* QoS1 unsuback wait TO */ + lws_sorted_usec_list_t sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */ struct lws *wsi; /**< so sul can use lws_container_of */ lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */ void *rx_cpkt_param; @@ -416,6 +419,9 @@ lws_create_client_mqtt_object(const struct lws_client_connect_info *i, struct lws * lws_mqtt_client_send_connect(struct lws *wsi); +struct lws * +lws_mqtt_client_send_disconnect(struct lws *wsi); + int lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type, uint8_t dup, lws_mqtt_qos_levels_t qos, diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index a5319028a..9b92a5682 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -24,6 +24,28 @@ #include +static void +secstream_mqtt_cleanup(lws_ss_handle_t *h) +{ + uint32_t i; + + if (h->u.mqtt.heap_baggage) { + lws_free(h->u.mqtt.heap_baggage); + h->u.mqtt.heap_baggage = NULL; + } + + if (h->u.mqtt.sub_info.topic) { + for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) { + if (h->u.mqtt.sub_info.topic[i].name) { + lws_free((void*)h->u.mqtt.sub_info.topic[i].name); + h->u.mqtt.sub_info.topic[i].name = NULL; + } + } + lws_free(h->u.mqtt.sub_info.topic); + h->u.mqtt.sub_info.topic = NULL; + } +} + static int secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) @@ -49,10 +71,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE); h->wsi = NULL; - if (h->u.mqtt.heap_baggage) { - lws_free(h->u.mqtt.heap_baggage); - h->u.mqtt.heap_baggage = NULL; - } + secstream_mqtt_cleanup(h); if (r == LWSSSSRET_DESTROY_ME) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); @@ -78,10 +97,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, lws_set_opaque_user_data(h->wsi, NULL); h->wsi = NULL; - if (h->u.mqtt.heap_baggage) { - lws_free(h->u.mqtt.heap_baggage); - h->u.mqtt.heap_baggage = NULL; - } + secstream_mqtt_cleanup(h); if (r) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); @@ -215,7 +231,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, } lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, - expbuf, used_out); + expbuf, used_out + 1); if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, strlen(h->policy->u.mqtt.subscribe), @@ -241,6 +257,10 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info)); h->u.mqtt.sub_info.num_topics = 1; h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top; + h->u.mqtt.sub_info.topic = lws_malloc(sizeof(lws_mqtt_topic_elem_t), + __func__); + h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf); + h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos; if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) { lwsl_notice("%s: unable to subscribe", __func__); @@ -266,6 +286,22 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (r == LWSSSSRET_TX_DONT_SEND) return 0; + if (r == LWSSSSRET_DISCONNECT_ME) { + lws_mqtt_subscribe_param_t lmsp; + if (h->u.mqtt.sub_info.num_topics) { + lmsp.num_topics = h->u.mqtt.sub_info.num_topics; + lmsp.topic = h->u.mqtt.sub_info.topic; + lmsp.packet_id = (uint16_t)(h->txord - 1); + if (lws_mqtt_client_send_unsubcribe(wsi, + &lmsp)) { + lwsl_err("%s, failed to send" + " MQTT unsubsribe", __func__); + return -1; + } + return 0; + } + } + if (r < 0) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); @@ -290,7 +326,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, } lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, - used_out); + used_out + 1); if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic, strlen(h->policy->u.mqtt.topic), &used_in, @@ -327,6 +363,23 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, return 0; } + + case LWS_CALLBACK_MQTT_UNSUBSCRIBED: + { + struct lws *nwsi = lws_get_network_wsi(wsi); + if (nwsi && (nwsi->mux.child_count == 1)) + lws_mqtt_client_send_disconnect(nwsi); + return -1; + } + + case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT: + if (wsi->mqtt->inside_unsubscribe) { + lwsl_warn("%s: %s: Unsubscribe timout.\n", __func__, + lws_ss_tag(h)); + return -1; + } + break; + default: break; }