diff --git a/include/libwebsockets/lws-mqtt.h b/include/libwebsockets/lws-mqtt.h index 71193e662..eb9dcbb2a 100644 --- a/include/libwebsockets/lws-mqtt.h +++ b/include/libwebsockets/lws-mqtt.h @@ -81,6 +81,13 @@ typedef struct lws_mqtt_client_connect_param_s { uint8_t retain; } will_param; /* MQTT LWT parameters */ + struct { + const char *topic; + const char *message; + lws_mqtt_qos_levels_t qos; + uint8_t retain; + } birth_param; /* MQTT Birth + parameters */ const char *username; const char *password; uint8_t aws_iot; diff --git a/include/libwebsockets/lws-secure-streams-policy.h b/include/libwebsockets/lws-secure-streams-policy.h index f84edec99..863140d74 100644 --- a/include/libwebsockets/lws-secure-streams-policy.h +++ b/include/libwebsockets/lws-secure-streams-policy.h @@ -312,11 +312,16 @@ typedef struct lws_ss_policy { const char *will_topic; const char *will_message; + const char *birth_topic; + const char *birth_message; + uint16_t keep_alive; uint8_t qos; uint8_t clean_start; uint8_t will_qos; uint8_t will_retain; + uint8_t birth_qos; + uint8_t birth_retain; uint8_t aws_iot; } mqtt; diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index 2a73d6f5e..b66134362 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -2038,17 +2038,12 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, return 1; } } - /* - * A non-empty Payload is expected and a chunk - * is present - */ - if (pub->payload_len && len) { - p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); - memcpy(p, buf, len); - if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len)) - return 1; - p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); - } + + p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); + memcpy(p, buf, len); + if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len)) + return 1; + p = lws_mqtt_str_next(&mqtt_vh_payload, NULL); if (!is_complete) nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1; diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h index 779e2971c..d89f37187 100644 --- a/lib/roles/mqtt/private-lib-roles-mqtt.h +++ b/lib/roles/mqtt/private-lib-roles-mqtt.h @@ -371,6 +371,7 @@ struct _lws_mqtt_related { uint8_t inside_payload:1; uint8_t inside_subscribe:1; uint8_t inside_unsubscribe:1; + uint8_t inside_birth:1; uint8_t inside_resume_session:1; uint8_t send_puback:1; uint8_t send_pubrel:1; @@ -380,6 +381,7 @@ struct _lws_mqtt_related { uint8_t unacked_pubrel:1; uint8_t done_subscribe:1; + uint8_t done_birth:1; }; /* diff --git a/lib/secure-streams/policy-json.c b/lib/secure-streams/policy-json.c index c49879e1e..3427ae6d4 100644 --- a/lib/secure-streams/policy-json.c +++ b/lib/secure-streams/policy-json.c @@ -117,6 +117,10 @@ static const char * const lejp_tokens_policy[] = { "s[].*.mqtt_will_message", "s[].*.mqtt_will_qos", "s[].*.mqtt_will_retain", + "s[].*.mqtt_birth_topic", + "s[].*.mqtt_birth_message", + "s[].*.mqtt_birth_qos", + "s[].*.mqtt_birth_retain", "s[].*.aws_iot", "s[].*.swake_validity", "s[].*.use_auth", @@ -220,6 +224,10 @@ typedef enum { LSSPPT_MQTT_WILL_MESSAGE, LSSPPT_MQTT_WILL_QOS, LSSPPT_MQTT_WILL_RETAIN, + LSSPPT_MQTT_BIRTH_TOPIC, + LSSPPT_MQTT_BIRTH_MESSAGE, + LSSPPT_MQTT_BIRTH_QOS, + LSSPPT_MQTT_BIRTH_RETAIN, LSSPPT_MQTT_AWS_IOT, LSSPPT_SWAKE_VALIDITY, LSSPPT_USE_AUTH, @@ -1035,6 +1043,21 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) a->curr[LTY_POLICY].p->u.mqtt.will_retain = reason == LEJPCB_VAL_TRUE; break; + case LSSPPT_MQTT_BIRTH_TOPIC: + pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_topic; + goto string2; + + case LSSPPT_MQTT_BIRTH_MESSAGE: + pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_message; + goto string2; + + case LSSPPT_MQTT_BIRTH_QOS: + a->curr[LTY_POLICY].p->u.mqtt.birth_qos = (uint8_t)atoi(ctx->buf); + break; + case LSSPPT_MQTT_BIRTH_RETAIN: + a->curr[LTY_POLICY].p->u.mqtt.birth_retain = + reason == LEJPCB_VAL_TRUE; + break; case LSSPPT_MQTT_AWS_IOT: if (reason == LEJPCB_VAL_TRUE) a->curr[LTY_POLICY].p->u.mqtt.aws_iot = diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 1cd7d09bf..f13eea4a5 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -140,15 +140,85 @@ secstream_mqtt_subscribe(struct lws *wsi) return 0; } +static int +secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, + const char* topic, + lws_mqtt_qos_levels_t qos, int f) +{ + lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); + size_t used_in, used_out, topic_limit; + lws_strexp_t exp; + char *expbuf; + lws_mqtt_publish_param_t mqpp; + + if (h->policy->u.mqtt.aws_iot) + topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; + else + topic_limit = LWS_MQTT_MAX_TOPICLEN; + + memset(&mqpp, 0, sizeof(mqpp)); + + lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL, + topic_limit); + + if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in, + &used_out) != LSTRX_DONE) { + lwsl_err("%s, failed to expand MQTT publish" + " topic with no output\n", __func__); + return 1; + } + expbuf = lws_malloc(used_out + 1, __func__); + if (!expbuf) { + lwsl_err("%s, failed to allocate MQTT publish topic", + __func__); + return 1; + } + + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, + used_out + 1); + + if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in, + &used_out) != LSTRX_DONE) { + lws_free(expbuf); + return 1; + } + lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); + mqpp.topic = (char *)expbuf; + + mqpp.topic_len = (uint16_t)strlen(mqpp.topic); + mqpp.packet_id = (uint16_t)(h->txord - 1); + mqpp.payload = buf; + if (h->writeable_len) + mqpp.payload_len = (uint32_t)h->writeable_len; + else + mqpp.payload_len = (uint32_t)buflen; + + lwsl_notice("%s: payload len %d\n", __func__, + (int)mqpp.payload_len); + + mqpp.qos = h->policy->u.mqtt.qos; + + if (lws_mqtt_client_send_publish(wsi, &mqpp, + (const char *)buf, + (uint32_t)buflen, + f & LWSSS_FLAG_EOM)) { + lwsl_notice("%s: failed to publish\n", __func__); + lws_free(expbuf); + return -1; + } + lws_free(expbuf); + return 0; +} + static int secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); - lws_mqtt_publish_param_t mqpp, *pmqpp; + lws_mqtt_publish_param_t *pmqpp; uint8_t buf[LWS_PRE + 1400]; lws_ss_state_return_t r; - size_t buflen; + size_t buflen = sizeof(buf) - LWS_PRE; int f = 0; switch (reason) { @@ -288,8 +358,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, lws_sul_cancel(&h->sul); r = lws_ss_event_helper(h, LWSSSCS_CONNECTED); if (r != LWSSSSRET_OK) - return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, - wsi, &h); + return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); } wsi->mqtt->done_subscribe = 1; lws_callback_on_writable(wsi); @@ -297,6 +366,14 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, case LWS_CALLBACK_MQTT_ACK: lws_sul_cancel(&h->sul_timeout); + if (wsi->mqtt->inside_birth) { + /* + * Skip LWSSSCS_QOS_ACK_REMOTE for birth topic. + */ + wsi->mqtt->inside_birth = 0; + wsi->mqtt->done_birth = 1; + break; + } r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE); if (r != LWSSSSRET_OK) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); @@ -304,10 +381,6 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: { - size_t used_in, used_out, topic_limit; - lws_strexp_t exp; - char *expbuf; - if (!h || !h->info.tx) return 0; lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h)); @@ -316,17 +389,29 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate); break; } - if (h->policy->u.mqtt.aws_iot) - topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; - else - topic_limit = LWS_MQTT_MAX_TOPICLEN; if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe) return secstream_mqtt_subscribe(wsi); - buflen = sizeof(buf) - LWS_PRE; + if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) { + lws_strexp_t exp; + size_t used_in, used_out = 0; + if (h->policy->u.mqtt.birth_message) { + lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, + (char *)(buf + LWS_PRE), buflen); + if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message, + strlen(h->policy->u.mqtt.birth_message), + &used_in, &used_out) != LSTRX_DONE) { + return 1; + } + } + wsi->mqtt->inside_birth = 1; + return secstream_mqtt_publish(wsi, buf + LWS_PRE, + used_out, h->policy->u.mqtt.birth_topic, + h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM); + } r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, - &buflen, &f); + &buflen, &f); if (r == LWSSSSRET_TX_DONT_SEND) return 0; @@ -349,62 +434,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (r < 0) return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h); - memset(&mqpp, 0, sizeof(mqpp)); - - lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL, - topic_limit); - - if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic, - strlen(h->policy->u.mqtt.topic), - &used_in, &used_out) != LSTRX_DONE) { - lwsl_err("%s, failed to expand MQTT publish" - " topic with no output\n", __func__); - return 1; - } - expbuf = lws_malloc(used_out + 1, __func__); - if (!expbuf) { - lwsl_err("%s, failed to allocate MQTT publish topic", - __func__); - return 1; - } - - lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, - used_out + 1); - - if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic, - strlen(h->policy->u.mqtt.topic), &used_in, - &used_out) != LSTRX_DONE) { - lws_free(expbuf); - return 1; - } - lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); - mqpp.topic = (char *)expbuf; - - mqpp.topic_len = (uint16_t)strlen(mqpp.topic); - mqpp.packet_id = (uint16_t)(h->txord - 1); - mqpp.payload = buf + LWS_PRE; - if (h->writeable_len) - mqpp.payload_len = (uint32_t)h->writeable_len; - else - mqpp.payload_len = (uint32_t)buflen; - - lwsl_notice("%s: payload len %d\n", __func__, - (int)mqpp.payload_len); - - mqpp.qos = h->policy->u.mqtt.qos; - - if (lws_mqtt_client_send_publish(wsi, &mqpp, - (const char *)buf + LWS_PRE, - (uint32_t)buflen, - f & LWSSS_FLAG_EOM)) { - lwsl_notice("%s: failed to publish\n", __func__); - lws_free(expbuf); - - return -1; - } - lws_free(expbuf); - - return 0; + return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, + h->policy->u.mqtt.topic, + h->policy->u.mqtt.qos, f); } case LWS_CALLBACK_MQTT_UNSUBSCRIBED: @@ -450,7 +482,9 @@ enum { SSCMM_STRSUB_WILL_TOPIC, SSCMM_STRSUB_WILL_MESSAGE, SSCMM_STRSUB_SUBSCRIBE, - SSCMM_STRSUB_TOPIC + SSCMM_STRSUB_TOPIC, + SSCMM_STRSUB_BIRTH_TOPIC, + SSCMM_STRSUB_BIRTH_MESSAGE }; static int @@ -458,16 +492,18 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, struct lws_client_connect_info *i, union lws_ss_contemp *ct) { - const char *sources[4] = { + const char *sources[6] = { /* we're going to string-substitute these before use */ h->policy->u.mqtt.will_topic, h->policy->u.mqtt.will_message, h->policy->u.mqtt.subscribe, - h->policy->u.mqtt.topic + h->policy->u.mqtt.topic, + h->policy->u.mqtt.birth_topic, + h->policy->u.mqtt.birth_message }; - size_t used_in, olen[4] = { 0, 0, 0, 0 }, tot = 0; + size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0; lws_strexp_t exp; - char *ps[4]; + char *ps[6]; uint8_t *p = NULL; int n = -1; size_t blen; @@ -485,6 +521,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, return -1; } p = (uint8_t *)lws_zalloc(blen+1, __func__); + if (!p) + return -1; n = lws_system_blob_get(b, p, &blen, 0); if (n) { ct->ccp.client_id = NULL; @@ -504,6 +542,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, /* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */ if (b && (blen = lws_system_blob_get_size(b))) { p = (uint8_t *)lws_zalloc(blen+1, __func__); + if (!p) + return -1; n = lws_system_blob_get(b, p, &blen, 0); if (n) { ct->ccp.username = NULL; @@ -520,6 +560,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, /* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */ if (b && (blen = lws_system_blob_get_size(b))) { p = (uint8_t *)lws_zalloc(blen+1, __func__); + if (!p) + return -1; n = lws_system_blob_get(b, p, &blen, 0); if (n) { ct->ccp.password = NULL; @@ -534,6 +576,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u); ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos; ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain; + ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos; + ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain; ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot; h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos; @@ -608,6 +652,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE]; h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE]; h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC]; + ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC]; + ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE]; i->method = "MQTT"; i->mqtt_cp = &ct->ccp;