1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss-mqtt: Add support for MQTT UNSUBSCRIBE and DISCONNECT

Send UNSUBSCRIBE and DISCONNECT when LWSSSSRET_DISCONNECT_ME is
returned.
This commit is contained in:
Chunho Lee 2021-04-14 06:07:11 -07:00 committed by Andy Green
parent 85cec16f95
commit c5484c0232
5 changed files with 116 additions and 10 deletions

View file

@ -882,6 +882,11 @@ enum lws_callback_reasons {
* if we timed out waiting for a PUBACK or a PUBREC, and we must resend
* the message. Return nonzero to close the wsi.
*/
LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT = 211,
/**< When a UNSUBSCRIBE is sent, this callback is generated instead of
* the _UNSUBSCRIBED one if we timed out waiting for a UNSUBACK.
* Return nonzero to close the wsi.
*/
/****** add new things just above ---^ ******/

View file

@ -176,3 +176,25 @@ lws_mqtt_client_send_connect(struct lws *wsi)
return wsi;
}
struct lws *
lws_mqtt_client_send_disconnect(struct lws *wsi)
{
uint8_t b[256 + LWS_PRE], *start = b + LWS_PRE, *p = start;
/* 1. Fixed Headers */
if (lws_mqtt_fill_fixed_header(p++, LMQCP_DISCONNECT, 0, 0, 0))
{
lwsl_err("%s: Failled to fill fixed header\n", __func__);
return NULL;
}
*p++ = 0;
if (lws_write(wsi, (unsigned char *)&b[LWS_PRE], lws_ptr_diff_size_t(p, start),
LWS_WRITE_BINARY) != lws_ptr_diff(p, start)) {
lwsl_err("%s: write failed\n", __func__);
return NULL;
}
return wsi;
}

View file

@ -1551,6 +1551,7 @@ bail1:
}
n = 1;
lws_sul_cancel(&w->mqtt->sul_unsuback_wait);
if (requested_close) {
__lws_close_free_wsi(w,
0, "unsub ack cb");
@ -1927,6 +1928,20 @@ lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
}
static void
lws_mqtt_unsuback_timeout(struct lws_sorted_usec_list *sul)
{
struct _lws_mqtt_related *mqtt = lws_container_of(sul,
struct _lws_mqtt_related, sul_unsuback_wait);
lwsl_debug("%s: %s\n", __func__, lws_wsi_tag(mqtt->wsi));
if (mqtt->wsi->a.protocol->callback(mqtt->wsi,
LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT,
mqtt->wsi->user_space, NULL, 0))
lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
}
int
lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
const void *buf, uint32_t len, int is_complete)
@ -2412,6 +2427,11 @@ lws_mqtt_client_send_unsubcribe(struct lws *wsi,
wsi->mqtt->inside_unsubscribe = 1;
wsi->mqtt->sul_unsuback_wait.cb = lws_mqtt_unsuback_timeout;
__lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
&wsi->mqtt->sul_unsuback_wait,
3 * LWS_USEC_PER_SEC);
return 0;
}

View file

@ -353,6 +353,9 @@ typedef struct lws_mqttc {
struct _lws_mqtt_related {
lws_mqttc_t client;
lws_sorted_usec_list_t sul_qos_puback_pubrec_wait; /* QoS1 puback or QoS2 pubrec wait TO */
lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */
lws_sorted_usec_list_t sul_unsuback_wait; /* QoS1 unsuback wait TO */
lws_sorted_usec_list_t sul_qos2_pubrec_wait; /* QoS2 pubrec wait TO */
struct lws *wsi; /**< so sul can use lws_container_of */
lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */
void *rx_cpkt_param;
@ -416,6 +419,9 @@ lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
struct lws *
lws_mqtt_client_send_connect(struct lws *wsi);
struct lws *
lws_mqtt_client_send_disconnect(struct lws *wsi);
int
lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
uint8_t dup, lws_mqtt_qos_levels_t qos,

View file

@ -24,6 +24,28 @@
#include <private-lib-core.h>
static void
secstream_mqtt_cleanup(lws_ss_handle_t *h)
{
uint32_t i;
if (h->u.mqtt.heap_baggage) {
lws_free(h->u.mqtt.heap_baggage);
h->u.mqtt.heap_baggage = NULL;
}
if (h->u.mqtt.sub_info.topic) {
for (i = 0; i < h->u.mqtt.sub_info.num_topics; i++) {
if (h->u.mqtt.sub_info.topic[i].name) {
lws_free((void*)h->u.mqtt.sub_info.topic[i].name);
h->u.mqtt.sub_info.topic[i].name = NULL;
}
}
lws_free(h->u.mqtt.sub_info.topic);
h->u.mqtt.sub_info.topic = NULL;
}
}
static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
@ -49,10 +71,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
h->wsi = NULL;
if (h->u.mqtt.heap_baggage) {
lws_free(h->u.mqtt.heap_baggage);
h->u.mqtt.heap_baggage = NULL;
}
secstream_mqtt_cleanup(h);
if (r == LWSSSSRET_DESTROY_ME)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
@ -78,10 +97,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_set_opaque_user_data(h->wsi, NULL);
h->wsi = NULL;
if (h->u.mqtt.heap_baggage) {
lws_free(h->u.mqtt.heap_baggage);
h->u.mqtt.heap_baggage = NULL;
}
secstream_mqtt_cleanup(h);
if (r)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
@ -215,7 +231,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
}
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
expbuf, used_out);
expbuf, used_out + 1);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe),
@ -241,6 +257,10 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
h->u.mqtt.sub_info.num_topics = 1;
h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
h->u.mqtt.sub_info.topic = lws_malloc(sizeof(lws_mqtt_topic_elem_t),
__func__);
h->u.mqtt.sub_info.topic[0].name = lws_strdup(expbuf);
h->u.mqtt.sub_info.topic[0].qos = h->policy->u.mqtt.qos;
if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
lwsl_notice("%s: unable to subscribe", __func__);
@ -266,6 +286,22 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
if (r == LWSSSSRET_DISCONNECT_ME) {
lws_mqtt_subscribe_param_t lmsp;
if (h->u.mqtt.sub_info.num_topics) {
lmsp.num_topics = h->u.mqtt.sub_info.num_topics;
lmsp.topic = h->u.mqtt.sub_info.topic;
lmsp.packet_id = (uint16_t)(h->txord - 1);
if (lws_mqtt_client_send_unsubcribe(wsi,
&lmsp)) {
lwsl_err("%s, failed to send"
" MQTT unsubsribe", __func__);
return -1;
}
return 0;
}
}
if (r < 0)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
@ -290,7 +326,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
}
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
used_out);
used_out + 1);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
strlen(h->policy->u.mqtt.topic), &used_in,
@ -327,6 +363,23 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return 0;
}
case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
{
struct lws *nwsi = lws_get_network_wsi(wsi);
if (nwsi && (nwsi->mux.child_count == 1))
lws_mqtt_client_send_disconnect(nwsi);
return -1;
}
case LWS_CALLBACK_MQTT_UNSUBSCRIBE_TIMEOUT:
if (wsi->mqtt->inside_unsubscribe) {
lwsl_warn("%s: %s: Unsubscribe timout.\n", __func__,
lws_ss_tag(h));
return -1;
}
break;
default:
break;
}