1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-16 00:00:07 +01:00

mqtt: topic validation for different mqtt servers

AWS IoT enforces limits topic level and length. If 'aws_iot' is set
on the policy, the topic limits will be enforced for AWS IoT.
This commit is contained in:
Chunho Lee 2021-03-29 07:46:14 -07:00 committed by Andy Green
parent d464df74a3
commit 8fc7dc33a3
7 changed files with 89 additions and 16 deletions

View file

@ -33,7 +33,8 @@ typedef struct lws_mqtt_str_st lws_mqtt_str_t;
#define LWS_MQTT_FINAL_PART 1
#define LWS_MQTT_MAX_TOPICLEN 256
#define LWS_MQTT_MAX_AWSIOT_TOPICLEN 256
#define LWS_MQTT_MAX_TOPICLEN 65535
#define LWS_MQTT_MAX_CIDLEN 128
#define LWS_MQTT_RANDOM_CIDLEN 23 /* 3.1.3.1-5: Server MUST... between
1 and 23 chars... */
@ -77,6 +78,7 @@ typedef struct lws_mqtt_client_connect_param_s {
parameters */
const char *username;
const char *password;
uint8_t aws_iot;
} lws_mqtt_client_connect_param_t;
/*

View file

@ -305,6 +305,7 @@ typedef struct lws_ss_policy {
uint8_t clean_start;
uint8_t will_qos;
uint8_t will_retain;
uint8_t aws_iot;
} mqtt;

View file

@ -123,6 +123,7 @@ lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
lws_free((void *)cp->client_id);
c->keep_alive_secs = cp->keep_alive;
c->aws_iot = cp->aws_iot;
if (cp->will_param.topic &&
*cp->will_param.topic) {

View file

@ -346,19 +346,25 @@ lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
}
static lws_mqtt_validate_topic_return_t
lws_mqtt_validate_topic(const char *topic, size_t topiclen)
lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
{
size_t spos = 0;
const char *sub = topic;
int8_t slashes = 0;
lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
if (topiclen > LWS_MQTT_MAX_TOPICLEN)
return LMVTR_FAILED_OVERSIZE;
if (topic[0] == '$') {
ret = LMVTR_VALID_SHADOW;
slashes = -3;
if (awsiot) {
if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
return LMVTR_FAILED_OVERSIZE;
if (topic[0] == '$') {
ret = LMVTR_VALID_SHADOW;
slashes = -3;
}
} else {
if (topiclen > LWS_MQTT_MAX_TOPICLEN)
return LMVTR_FAILED_OVERSIZE;
if (topic[0] == '$')
return LMVTR_FAILED_WILDCARD_FORMAT;
}
while (*sub != 0) {
@ -389,7 +395,7 @@ lws_mqtt_validate_topic(const char *topic, size_t topiclen)
sub++;
}
if (slashes < 0 || slashes > 7)
if (awsiot && (slashes < 0 || slashes > 7))
return LMVTR_FAILED_SHADOW_FORMAT;
return ret;
@ -402,7 +408,7 @@ lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
size_t topiclen = strlen(topic);
lws_mqtt_validate_topic_return_t flag;
flag = lws_mqtt_validate_topic(topic, topiclen);
flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
switch (flag) {
case LMVTR_FAILED_OVERSIZE:
lwsl_err("%s: Topic is too long\n",

View file

@ -338,6 +338,7 @@ typedef struct lws_mqttc {
} will;
uint16_t keep_alive_secs;
uint8_t conn_flags;
uint8_t aws_iot;
} lws_mqttc_t;
struct _lws_mqtt_related {

View file

@ -114,6 +114,7 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.mqtt_will_message",
"s[].*.mqtt_will_qos",
"s[].*.mqtt_will_retain",
"s[].*.aws_iot",
"s[].*.swake_validity",
"s[].*.use_auth",
"s[].*.aws_region",
@ -212,6 +213,7 @@ typedef enum {
LSSPPT_MQTT_WILL_MESSAGE,
LSSPPT_MQTT_WILL_QOS,
LSSPPT_MQTT_WILL_RETAIN,
LSSPPT_MQTT_AWS_IOT,
LSSPPT_SWAKE_VALIDITY,
LSSPPT_USE_AUTH,
LSSPPT_AWS_REGION,
@ -1010,6 +1012,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
a->curr[LTY_POLICY].p->u.mqtt.will_retain =
reason == LEJPCB_VAL_TRUE;
break;
case LSSPPT_MQTT_AWS_IOT:
if (reason == LEJPCB_VAL_TRUE)
a->curr[LTY_POLICY].p->u.mqtt.aws_iot =
reason == LEJPCB_VAL_TRUE;
break;
#endif
case LSSPPT_PROTOCOL:

View file

@ -164,9 +164,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
{
size_t used_in, used_out;
size_t used_in, used_out, topic_limit;
lws_strexp_t exp;
char expbuf[LWS_MQTT_MAX_TOPICLEN+1];
char *expbuf;
if (!h || !h->info.tx)
return 0;
@ -176,16 +176,44 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
break;
}
if (h->policy->u.mqtt.aws_iot)
topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
else
topic_limit = LWS_MQTT_MAX_TOPICLEN;
if (h->policy->u.mqtt.subscribe &&
!wsi->mqtt->done_subscribe) {
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, sizeof(expbuf));
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
NULL, topic_limit);
/*
* Expand with no output first to calculate the size of
* expanded string then, allocate new buffer and expand
* again with the buffer
*/
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe),
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand MQTT subscribe"
" topic with no output\n", __func__);
return 1;
}
expbuf = lws_malloc(used_out + 1, __func__);
if (!expbuf) {
lwsl_err("%s, failed to allocate MQTT subscribe"
"topic", __func__);
return 1;
}
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
expbuf, used_out);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe),
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, faled to expand MQTT subscribe topic\n",
lwsl_err("%s, failed to expand MQTT subscribe topic\n",
__func__);
lws_free(expbuf);
return 1;
}
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
@ -207,8 +235,12 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
lwsl_notice("%s: unable to subscribe", __func__);
lws_free(expbuf);
h->u.mqtt.sub_top.name = NULL;
return -1;
}
lws_free(expbuf);
h->u.mqtt.sub_top.name = NULL;
/* Expect a SUBACK */
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
@ -231,12 +263,32 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
memset(&mqpp, 0, sizeof(mqpp));
/* this is the string-substituted h->policy->u.mqtt.topic */
mqpp.topic = (char *)h->u.mqtt.topic_qos.name;
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, sizeof(expbuf));
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
topic_limit);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
strlen(h->policy->u.mqtt.topic),
&used_in, &used_out) != LSTRX_DONE)
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand MQTT publish"
" topic with no output\n", __func__);
return 1;
}
expbuf = lws_malloc(used_out + 1, __func__);
if (!expbuf) {
lwsl_err("%s, failed to allocate MQTT publish topic",
__func__);
return 1;
}
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
used_out);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
strlen(h->policy->u.mqtt.topic), &used_in,
&used_out) != LSTRX_DONE) {
lws_free(expbuf);
return 1;
}
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
mqpp.topic = (char *)expbuf;
@ -258,9 +310,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
(uint32_t)buflen,
f & LWSSS_FLAG_EOM)) {
lwsl_notice("%s: failed to publish\n", __func__);
lws_free(expbuf);
return -1;
}
lws_free(expbuf);
return 0;
}
@ -376,6 +430,7 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
ct->ccp.clean_start = h->policy->u.mqtt.clean_start;
ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot;
h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
/*