mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-30 00:00:16 +01:00
mqtt: topic validation for different mqtt servers
AWS IoT enforces limits topic level and length. If 'aws_iot' is set on the policy, the topic limits will be enforced for AWS IoT.
This commit is contained in:
parent
f3531ef673
commit
25ae9facc9
7 changed files with 89 additions and 16 deletions
|
@ -33,7 +33,8 @@ typedef struct lws_mqtt_str_st lws_mqtt_str_t;
|
||||||
|
|
||||||
#define LWS_MQTT_FINAL_PART 1
|
#define LWS_MQTT_FINAL_PART 1
|
||||||
|
|
||||||
#define LWS_MQTT_MAX_TOPICLEN 256
|
#define LWS_MQTT_MAX_AWSIOT_TOPICLEN 256
|
||||||
|
#define LWS_MQTT_MAX_TOPICLEN 65535
|
||||||
#define LWS_MQTT_MAX_CIDLEN 128
|
#define LWS_MQTT_MAX_CIDLEN 128
|
||||||
#define LWS_MQTT_RANDOM_CIDLEN 23 /* 3.1.3.1-5: Server MUST... between
|
#define LWS_MQTT_RANDOM_CIDLEN 23 /* 3.1.3.1-5: Server MUST... between
|
||||||
1 and 23 chars... */
|
1 and 23 chars... */
|
||||||
|
@ -77,6 +78,7 @@ typedef struct lws_mqtt_client_connect_param_s {
|
||||||
parameters */
|
parameters */
|
||||||
const char *username;
|
const char *username;
|
||||||
const char *password;
|
const char *password;
|
||||||
|
uint8_t aws_iot;
|
||||||
} lws_mqtt_client_connect_param_t;
|
} lws_mqtt_client_connect_param_t;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -305,6 +305,7 @@ typedef struct lws_ss_policy {
|
||||||
uint8_t clean_start;
|
uint8_t clean_start;
|
||||||
uint8_t will_qos;
|
uint8_t will_qos;
|
||||||
uint8_t will_retain;
|
uint8_t will_retain;
|
||||||
|
uint8_t aws_iot;
|
||||||
|
|
||||||
} mqtt;
|
} mqtt;
|
||||||
|
|
||||||
|
|
|
@ -123,6 +123,7 @@ lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
|
||||||
lws_free((void *)cp->client_id);
|
lws_free((void *)cp->client_id);
|
||||||
|
|
||||||
c->keep_alive_secs = cp->keep_alive;
|
c->keep_alive_secs = cp->keep_alive;
|
||||||
|
c->aws_iot = cp->aws_iot;
|
||||||
|
|
||||||
if (cp->will_param.topic &&
|
if (cp->will_param.topic &&
|
||||||
*cp->will_param.topic) {
|
*cp->will_param.topic) {
|
||||||
|
|
|
@ -346,19 +346,25 @@ lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
|
||||||
}
|
}
|
||||||
|
|
||||||
static lws_mqtt_validate_topic_return_t
|
static lws_mqtt_validate_topic_return_t
|
||||||
lws_mqtt_validate_topic(const char *topic, size_t topiclen)
|
lws_mqtt_validate_topic(const char *topic, size_t topiclen, uint8_t awsiot)
|
||||||
{
|
{
|
||||||
size_t spos = 0;
|
size_t spos = 0;
|
||||||
const char *sub = topic;
|
const char *sub = topic;
|
||||||
int8_t slashes = 0;
|
int8_t slashes = 0;
|
||||||
lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
|
lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
|
||||||
|
|
||||||
if (topiclen > LWS_MQTT_MAX_TOPICLEN)
|
if (awsiot) {
|
||||||
return LMVTR_FAILED_OVERSIZE;
|
if (topiclen > LWS_MQTT_MAX_AWSIOT_TOPICLEN)
|
||||||
|
return LMVTR_FAILED_OVERSIZE;
|
||||||
if (topic[0] == '$') {
|
if (topic[0] == '$') {
|
||||||
ret = LMVTR_VALID_SHADOW;
|
ret = LMVTR_VALID_SHADOW;
|
||||||
slashes = -3;
|
slashes = -3;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (topiclen > LWS_MQTT_MAX_TOPICLEN)
|
||||||
|
return LMVTR_FAILED_OVERSIZE;
|
||||||
|
if (topic[0] == '$')
|
||||||
|
return LMVTR_FAILED_WILDCARD_FORMAT;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (*sub != 0) {
|
while (*sub != 0) {
|
||||||
|
@ -389,7 +395,7 @@ lws_mqtt_validate_topic(const char *topic, size_t topiclen)
|
||||||
sub++;
|
sub++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (slashes < 0 || slashes > 7)
|
if (awsiot && (slashes < 0 || slashes > 7))
|
||||||
return LMVTR_FAILED_SHADOW_FORMAT;
|
return LMVTR_FAILED_SHADOW_FORMAT;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -402,7 +408,7 @@ lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
|
||||||
size_t topiclen = strlen(topic);
|
size_t topiclen = strlen(topic);
|
||||||
lws_mqtt_validate_topic_return_t flag;
|
lws_mqtt_validate_topic_return_t flag;
|
||||||
|
|
||||||
flag = lws_mqtt_validate_topic(topic, topiclen);
|
flag = lws_mqtt_validate_topic(topic, topiclen, mqtt->client.aws_iot);
|
||||||
switch (flag) {
|
switch (flag) {
|
||||||
case LMVTR_FAILED_OVERSIZE:
|
case LMVTR_FAILED_OVERSIZE:
|
||||||
lwsl_err("%s: Topic is too long\n",
|
lwsl_err("%s: Topic is too long\n",
|
||||||
|
|
|
@ -338,6 +338,7 @@ typedef struct lws_mqttc {
|
||||||
} will;
|
} will;
|
||||||
uint16_t keep_alive_secs;
|
uint16_t keep_alive_secs;
|
||||||
uint8_t conn_flags;
|
uint8_t conn_flags;
|
||||||
|
uint8_t aws_iot;
|
||||||
} lws_mqttc_t;
|
} lws_mqttc_t;
|
||||||
|
|
||||||
struct _lws_mqtt_related {
|
struct _lws_mqtt_related {
|
||||||
|
|
|
@ -114,6 +114,7 @@ static const char * const lejp_tokens_policy[] = {
|
||||||
"s[].*.mqtt_will_message",
|
"s[].*.mqtt_will_message",
|
||||||
"s[].*.mqtt_will_qos",
|
"s[].*.mqtt_will_qos",
|
||||||
"s[].*.mqtt_will_retain",
|
"s[].*.mqtt_will_retain",
|
||||||
|
"s[].*.aws_iot",
|
||||||
"s[].*.swake_validity",
|
"s[].*.swake_validity",
|
||||||
"s[].*.use_auth",
|
"s[].*.use_auth",
|
||||||
"s[].*.aws_region",
|
"s[].*.aws_region",
|
||||||
|
@ -212,6 +213,7 @@ typedef enum {
|
||||||
LSSPPT_MQTT_WILL_MESSAGE,
|
LSSPPT_MQTT_WILL_MESSAGE,
|
||||||
LSSPPT_MQTT_WILL_QOS,
|
LSSPPT_MQTT_WILL_QOS,
|
||||||
LSSPPT_MQTT_WILL_RETAIN,
|
LSSPPT_MQTT_WILL_RETAIN,
|
||||||
|
LSSPPT_MQTT_AWS_IOT,
|
||||||
LSSPPT_SWAKE_VALIDITY,
|
LSSPPT_SWAKE_VALIDITY,
|
||||||
LSSPPT_USE_AUTH,
|
LSSPPT_USE_AUTH,
|
||||||
LSSPPT_AWS_REGION,
|
LSSPPT_AWS_REGION,
|
||||||
|
@ -1010,6 +1012,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
|
||||||
a->curr[LTY_POLICY].p->u.mqtt.will_retain =
|
a->curr[LTY_POLICY].p->u.mqtt.will_retain =
|
||||||
reason == LEJPCB_VAL_TRUE;
|
reason == LEJPCB_VAL_TRUE;
|
||||||
break;
|
break;
|
||||||
|
case LSSPPT_MQTT_AWS_IOT:
|
||||||
|
if (reason == LEJPCB_VAL_TRUE)
|
||||||
|
a->curr[LTY_POLICY].p->u.mqtt.aws_iot =
|
||||||
|
reason == LEJPCB_VAL_TRUE;
|
||||||
|
break;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
case LSSPPT_PROTOCOL:
|
case LSSPPT_PROTOCOL:
|
||||||
|
|
|
@ -164,9 +164,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
||||||
|
|
||||||
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
|
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
|
||||||
{
|
{
|
||||||
size_t used_in, used_out;
|
size_t used_in, used_out, topic_limit;
|
||||||
lws_strexp_t exp;
|
lws_strexp_t exp;
|
||||||
char expbuf[LWS_MQTT_MAX_TOPICLEN+1];
|
char *expbuf;
|
||||||
|
|
||||||
if (!h || !h->info.tx)
|
if (!h || !h->info.tx)
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -176,16 +176,44 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
||||||
lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
|
lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (h->policy->u.mqtt.aws_iot)
|
||||||
|
topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
|
||||||
|
else
|
||||||
|
topic_limit = LWS_MQTT_MAX_TOPICLEN;
|
||||||
|
|
||||||
if (h->policy->u.mqtt.subscribe &&
|
if (h->policy->u.mqtt.subscribe &&
|
||||||
!wsi->mqtt->done_subscribe) {
|
!wsi->mqtt->done_subscribe) {
|
||||||
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, sizeof(expbuf));
|
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
|
||||||
|
NULL, topic_limit);
|
||||||
|
/*
|
||||||
|
* Expand with no output first to calculate the size of
|
||||||
|
* expanded string then, allocate new buffer and expand
|
||||||
|
* again with the buffer
|
||||||
|
*/
|
||||||
|
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
|
||||||
|
strlen(h->policy->u.mqtt.subscribe),
|
||||||
|
&used_in, &used_out) != LSTRX_DONE) {
|
||||||
|
lwsl_err("%s, failed to expand MQTT subscribe"
|
||||||
|
" topic with no output\n", __func__);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
expbuf = lws_malloc(used_out + 1, __func__);
|
||||||
|
if (!expbuf) {
|
||||||
|
lwsl_err("%s, failed to allocate MQTT subscribe"
|
||||||
|
"topic", __func__);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
|
||||||
|
expbuf, used_out);
|
||||||
|
|
||||||
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
|
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
|
||||||
strlen(h->policy->u.mqtt.subscribe),
|
strlen(h->policy->u.mqtt.subscribe),
|
||||||
&used_in, &used_out) != LSTRX_DONE) {
|
&used_in, &used_out) != LSTRX_DONE) {
|
||||||
lwsl_err("%s, faled to expand MQTT subscribe topic\n",
|
lwsl_err("%s, failed to expand MQTT subscribe topic\n",
|
||||||
__func__);
|
__func__);
|
||||||
|
lws_free(expbuf);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
|
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
|
||||||
|
@ -207,8 +235,12 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
||||||
|
|
||||||
if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
|
if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
|
||||||
lwsl_notice("%s: unable to subscribe", __func__);
|
lwsl_notice("%s: unable to subscribe", __func__);
|
||||||
|
lws_free(expbuf);
|
||||||
|
h->u.mqtt.sub_top.name = NULL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
lws_free(expbuf);
|
||||||
|
h->u.mqtt.sub_top.name = NULL;
|
||||||
/* Expect a SUBACK */
|
/* Expect a SUBACK */
|
||||||
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
|
if (lws_change_pollfd(wsi, 0, LWS_POLLIN)) {
|
||||||
lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
|
lwsl_err("%s: Unable to set LWS_POLLIN\n", __func__);
|
||||||
|
@ -231,12 +263,32 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
||||||
memset(&mqpp, 0, sizeof(mqpp));
|
memset(&mqpp, 0, sizeof(mqpp));
|
||||||
/* this is the string-substituted h->policy->u.mqtt.topic */
|
/* this is the string-substituted h->policy->u.mqtt.topic */
|
||||||
mqpp.topic = (char *)h->u.mqtt.topic_qos.name;
|
mqpp.topic = (char *)h->u.mqtt.topic_qos.name;
|
||||||
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf, sizeof(expbuf));
|
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
|
||||||
|
topic_limit);
|
||||||
|
|
||||||
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
|
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
|
||||||
strlen(h->policy->u.mqtt.topic),
|
strlen(h->policy->u.mqtt.topic),
|
||||||
&used_in, &used_out) != LSTRX_DONE)
|
&used_in, &used_out) != LSTRX_DONE) {
|
||||||
|
lwsl_err("%s, failed to expand MQTT publish"
|
||||||
|
" topic with no output\n", __func__);
|
||||||
return 1;
|
return 1;
|
||||||
|
}
|
||||||
|
expbuf = lws_malloc(used_out + 1, __func__);
|
||||||
|
if (!expbuf) {
|
||||||
|
lwsl_err("%s, failed to allocate MQTT publish topic",
|
||||||
|
__func__);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
|
||||||
|
used_out);
|
||||||
|
|
||||||
|
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
|
||||||
|
strlen(h->policy->u.mqtt.topic), &used_in,
|
||||||
|
&used_out) != LSTRX_DONE) {
|
||||||
|
lws_free(expbuf);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
|
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
|
||||||
mqpp.topic = (char *)expbuf;
|
mqpp.topic = (char *)expbuf;
|
||||||
|
|
||||||
|
@ -258,9 +310,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
||||||
(uint32_t)buflen,
|
(uint32_t)buflen,
|
||||||
f & LWSSS_FLAG_EOM)) {
|
f & LWSSS_FLAG_EOM)) {
|
||||||
lwsl_notice("%s: failed to publish\n", __func__);
|
lwsl_notice("%s: failed to publish\n", __func__);
|
||||||
|
lws_free(expbuf);
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
lws_free(expbuf);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -376,6 +430,7 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
|
||||||
ct->ccp.clean_start = h->policy->u.mqtt.clean_start;
|
ct->ccp.clean_start = h->policy->u.mqtt.clean_start;
|
||||||
ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
|
ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
|
||||||
ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
|
ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
|
||||||
|
ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot;
|
||||||
h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
|
h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Add table
Reference in a new issue