From 9c7843fbe6dab4203321a103b10d4e5243d211aa Mon Sep 17 00:00:00 2001 From: Sakthi Kannan Date: Mon, 22 Mar 2021 11:58:28 -0700 Subject: [PATCH] mqtt: Setting the CONNECTED state only when SUBACK is received Setting the CONNECTED state only when SUBACK is received if the stream has defined a subscription topic. This is to avoid SS from sending out SUBSCRIBE right after CONNACK, even when the connection is not valid. --- lib/secure-streams/protocols/ss-mqtt.c | 28 +++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 0d719d568..420bec369 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -1,7 +1,7 @@ /* * libwebsockets - small server side websockets and web server implementation * - * Copyright (C) 2019 - 2020 Andy Green + * Copyright (C) 2019 - 2021 Andy Green * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to @@ -93,6 +93,16 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, h->wsi = wsi; h->retry = 0; h->seqstate = SSSEQ_CONNECTED; + /* + * If a subscribe is pending on the stream, then make + * sure the SUBSCRIBE is done before signaling the + * user application. + */ + if (h->policy->u.mqtt.subscribe && + !wsi->mqtt->done_subscribe) { + lws_callback_on_writable(wsi); + break; + } lws_sul_cancel(&h->sul); #if defined(LWS_WITH_SYS_METRICS) /* @@ -130,6 +140,17 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, return 0; /* don't passthru */ case LWS_CALLBACK_MQTT_SUBSCRIBED: + /* + * Stream demanded a subscribe while connecting, once + * done notify CONNECTED event to the application. + */ + if (wsi->mqtt->done_subscribe == 0) { + lws_sul_cancel(&h->sul); + r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); + if (r != LWSSSSRET_OK) + return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, + wsi, &h); + } wsi->mqtt->done_subscribe = 1; lws_callback_on_writable(wsi); break; @@ -185,6 +206,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, lwsl_notice("%s: unable to subscribe", __func__); return -1; } + /* Expect a SUBACK */ + if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { + lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); + return -1; + } return 0; }