diff --git a/include/libwebsockets/lws-mqtt.h b/include/libwebsockets/lws-mqtt.h index 614eaaf19..ff0ebb8a0 100644 --- a/include/libwebsockets/lws-mqtt.h +++ b/include/libwebsockets/lws-mqtt.h @@ -33,7 +33,8 @@ typedef struct lws_mqtt_str_st lws_mqtt_str_t; #define LWS_MQTT_FINAL_PART 1 -#define LWS_MQTT_MAX_TOPICLEN 256 +#define LWS_MQTT_MAX_AWSIOT_TOPICLEN 256 +#define LWS_MQTT_MAX_TOPICLEN 65535 #define LWS_MQTT_MAX_CIDLEN 128 #define LWS_MQTT_RANDOM_CIDLEN 23 /* 3.1.3.1-5: Server MUST... between 1 and 23 chars... */ @@ -77,6 +78,7 @@ typedef struct lws_mqtt_client_connect_param_s { parameters */ const char *username; const char *password; + uint8_t aws_iot; } lws_mqtt_client_connect_param_t; /* diff --git a/include/libwebsockets/lws-secure-streams-policy.h b/include/libwebsockets/lws-secure-streams-policy.h index f3924605d..a6d0f8cb7 100644 --- a/include/libwebsockets/lws-secure-streams-policy.h +++ b/include/libwebsockets/lws-secure-streams-policy.h @@ -305,6 +305,7 @@ typedef struct lws_ss_policy { uint8_t clean_start; uint8_t will_qos; uint8_t will_retain; + uint8_t aws_iot; } mqtt; diff --git a/lib/roles/mqtt/client/client-mqtt.c b/lib/roles/mqtt/client/client-mqtt.c index 93b539eb3..5ad4471a6 100644 --- a/lib/roles/mqtt/client/client-mqtt.c +++ b/lib/roles/mqtt/client/client-mqtt.c @@ -123,6 +123,7 @@ lws_create_client_mqtt_object(const struct lws_client_connect_info *i, lws_free((void *)cp->client_id); c->keep_alive_secs = cp->keep_alive; + c->aws_iot = cp->aws_iot; if (cp->will_param.topic && *cp->will_param.topic) { diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index 08a899394..3c253fb11 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -346,19 +346,25 @@ lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt, } static lws_mqtt_validate_topic_return_t -lws_mqtt_validate_topic(const char *topic, size_t topiclen) +lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot) { size_t spos = 0; const char *sub = topic; int8_t slashes = 0; lws_mqtt_validate_topic_return_t ret = LMVTR_VALID; - if (topiclen > LWS_MQTT_MAX_TOPICLEN) - return LMVTR_FAILED_OVERSIZE; - - if (topic[0] == '$') { - ret = LMVTR_VALID_SHADOW; - slashes = -3; + if (awsiot) { + if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN) + return LMVTR_FAILED_OVERSIZE; + if (topic[0] == '$') { + ret = LMVTR_VALID_SHADOW; + slashes = -3; + } + } else { + if (topiclen > LWS_MQTT_MAX_TOPICLEN) + return LMVTR_FAILED_OVERSIZE; + if (topic[0] == '$') + return LMVTR_FAILED_WILDCARD_FORMAT; } while (*sub != 0) { @@ -389,7 +395,7 @@ lws_mqtt_validate_topic(const char *topic, size_t topiclen) sub++; } - if (slashes < 0 || slashes > 7) + if (awsiot && (slashes < 0 || slashes > 7)) return LMVTR_FAILED_SHADOW_FORMAT; return ret; @@ -402,7 +408,7 @@ lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic) size_t topiclen = strlen(topic); lws_mqtt_validate_topic_return_t flag; - flag = lws_mqtt_validate_topic(topic, topiclen); + flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot); switch (flag) { case LMVTR_FAILED_OVERSIZE: lwsl_err("%s: Topic is too long\n", diff --git a/lib/roles/mqtt/private-lib-roles-mqtt.h b/lib/roles/mqtt/private-lib-roles-mqtt.h index 1f91c5a81..6383ead2e 100644 --- a/lib/roles/mqtt/private-lib-roles-mqtt.h +++ b/lib/roles/mqtt/private-lib-roles-mqtt.h @@ -338,6 +338,7 @@ typedef struct lws_mqttc { } will; uint16_t keep_alive_secs; uint8_t conn_flags; + uint8_t aws_iot; } lws_mqttc_t; struct _lws_mqtt_related { diff --git a/lib/secure-streams/policy-json.c b/lib/secure-streams/policy-json.c index 3876ae2c9..7ecba8c34 100644 --- a/lib/secure-streams/policy-json.c +++ b/lib/secure-streams/policy-json.c @@ -114,6 +114,7 @@ static const char * const lejp_tokens_policy[] = { "s[].*.mqtt_will_message", "s[].*.mqtt_will_qos", "s[].*.mqtt_will_retain", + "s[].*.aws_iot", "s[].*.swake_validity", "s[].*.use_auth", "s[].*.aws_region", @@ -212,6 +213,7 @@ typedef enum { LSSPPT_MQTT_WILL_MESSAGE, LSSPPT_MQTT_WILL_QOS, LSSPPT_MQTT_WILL_RETAIN, + LSSPPT_MQTT_AWS_IOT, LSSPPT_SWAKE_VALIDITY, LSSPPT_USE_AUTH, LSSPPT_AWS_REGION, @@ -1010,6 +1012,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) a->curr[LTY_POLICY].p->u.mqtt.will_retain = reason == LEJPCB_VAL_TRUE; break; + case LSSPPT_MQTT_AWS_IOT: + if (reason == LEJPCB_VAL_TRUE) + a->curr[LTY_POLICY].p->u.mqtt.aws_iot = + reason == LEJPCB_VAL_TRUE; + break; #endif case LSSPPT_PROTOCOL: diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 4b44f5e20..a42831dd6 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -164,9 +164,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: { - size_t used_in, used_out; + size_t used_in, used_out, topic_limit; lws_strexp_t exp; - char expbuf[LWS_MQTT_MAX_TOPICLEN+1]; + char *expbuf; if (!h || !h->info.tx) return 0; @@ -176,16 +176,44 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate); break; } + if (h->policy->u.mqtt.aws_iot) + topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN; + else + topic_limit = LWS_MQTT_MAX_TOPICLEN; if (h->policy->u.mqtt.subscribe && !wsi->mqtt->done_subscribe) { - lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, sizeof(expbuf)); + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, + NULL, topic_limit); + /* + * Expand with no output first to calculate the size of + * expanded string then, allocate new buffer and expand + * again with the buffer + */ + if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, + strlen(h->policy->u.mqtt.subscribe), + &used_in, &used_out) != LSTRX_DONE) { + lwsl_err("%s, failed to expand MQTT subscribe" + " topic with no output\n", __func__); + return 1; + } + + expbuf = lws_malloc(used_out + 1, __func__); + if (!expbuf) { + lwsl_err("%s, failed to allocate MQTT subscribe" + "topic", __func__); + return 1; + } + + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, + expbuf, used_out); if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe, strlen(h->policy->u.mqtt.subscribe), &used_in, &used_out) != LSTRX_DONE) { - lwsl_err("%s, faled to expand MQTT subscribe topic\n", + lwsl_err("%s, failed to expand MQTT subscribe topic\n", __func__); + lws_free(expbuf); return 1; } lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); @@ -207,8 +235,12 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) { lwsl_notice("%s: unable to subscribe", __func__); + lws_free(expbuf); + h->u.mqtt.sub_top.name = NULL; return -1; } + lws_free(expbuf); + h->u.mqtt.sub_top.name = NULL; /* Expect a SUBACK */ if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) { lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__); @@ -231,12 +263,32 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, memset(&mqpp, 0, sizeof(mqpp)); /* this is the string-substituted h->policy->u.mqtt.topic */ mqpp.topic = (char *)h->u.mqtt.topic_qos.name; - lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, sizeof(expbuf)); + lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL, + topic_limit); if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic, strlen(h->policy->u.mqtt.topic), - &used_in, &used_out) != LSTRX_DONE) + &used_in, &used_out) != LSTRX_DONE) { + lwsl_err("%s, failed to expand MQTT publish" + " topic with no output\n", __func__); return 1; + } + expbuf = lws_malloc(used_out + 1, __func__); + if (!expbuf) { + lwsl_err("%s, failed to allocate MQTT publish topic", + __func__); + return 1; + } + + lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, + used_out); + + if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic, + strlen(h->policy->u.mqtt.topic), &used_in, + &used_out) != LSTRX_DONE) { + lws_free(expbuf); + return 1; + } lwsl_notice("%s, expbuf - %s\n", __func__, expbuf); mqpp.topic = (char *)expbuf; @@ -258,9 +310,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, (uint32_t)buflen, f & LWSSS_FLAG_EOM)) { lwsl_notice("%s: failed to publish\n", __func__); + lws_free(expbuf); return -1; } + lws_free(expbuf); return 0; } @@ -376,6 +430,7 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len, ct->ccp.clean_start = h->policy->u.mqtt.clean_start; ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos; ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain; + ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot; h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos; /*