1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

mqtt: wildcard topic and topic to 256 chars

Adding supports to MQTT wildcard support, topic to 256 chars,
incorrect topic validation.
This commit is contained in:
Sakthi Kannan 2021-03-22 12:38:08 -07:00 committed by Andy Green
parent a088b72696
commit f3531ef673
4 changed files with 171 additions and 11 deletions

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - protocol - mqtt
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@ -33,6 +33,7 @@ typedef struct lws_mqtt_str_st lws_mqtt_str_t;
#define LWS_MQTT_FINAL_PART 1
#define LWS_MQTT_MAX_TOPICLEN 256
#define LWS_MQTT_MAX_CIDLEN 128
#define LWS_MQTT_RANDOM_CIDLEN 23 /* 3.1.3.1-5: Server MUST... between
1 and 23 chars... */

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -277,28 +277,163 @@ lws_mqtt_set_client_established(struct lws *wsi)
return 0;
}
lws_mqtt_subs_t *
lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic)
static lws_mqtt_match_topic_return_t
lws_mqtt_is_topic_matched(const char* sub, const char* pub)
{
const char *ppos = pub, *spos = sub;
if (!ppos || !spos) {
return LMMTR_TOPIC_MATCH_ERROR;
}
while (*spos) {
if (*ppos == '#' || *ppos == '+') {
lwsl_err("%s: PUBLISH to wildcard "
"topic \"%s\" not supported\n",
__func__, pub);
return LMMTR_TOPIC_MATCH_ERROR;
}
/* foo/+/bar == foo/xyz/bar ? */
if (*spos == '+') {
/* Skip ahead */
while (*ppos != '\0' && *ppos != '/') {
ppos++;
}
} else if (*spos == '#') {
return LMMTR_TOPIC_MATCH;
} else {
if (*ppos == '\0') {
/* foo/bar == foo/bar/# ? */
if (!strncmp(spos, "/#", 2))
return LMMTR_TOPIC_MATCH;
return LMMTR_TOPIC_NOMATCH;
/* Non-matching character */
} else if (*ppos != *spos) {
return LMMTR_TOPIC_NOMATCH;
}
ppos++;
}
spos++;
}
if (*spos == '\0' && *ppos == '\0')
return LMMTR_TOPIC_MATCH;
return LMMTR_TOPIC_NOMATCH;
}
lws_mqtt_subs_t* lws_mqtt_find_sub(struct _lws_mqtt_related* mqtt,
const char* ptopic) {
lws_mqtt_subs_t *s = mqtt->subs_head;
while (s) {
if (!strcmp((const char *)s->topic, topic))
return s;
/* SUB topic == PUB topic ? */
/* foo/bar/xyz == foo/bar/xyz ? */
if (!s->wildcard) {
if (!strcmp((const char*)s->topic, ptopic))
return s;
} else {
if (lws_mqtt_is_topic_matched(
s->topic, ptopic) == LMMTR_TOPIC_MATCH)
return s;
}
s = s->next;
}
return NULL;
}
static lws_mqtt_validate_topic_return_t
lws_mqtt_validate_topic(const char *topic, size_t topiclen)
{
size_t spos = 0;
const char *sub = topic;
int8_t slashes = 0;
lws_mqtt_validate_topic_return_t ret = LMVTR_VALID;
if (topiclen > LWS_MQTT_MAX_TOPICLEN)
return LMVTR_FAILED_OVERSIZE;
if (topic[0] == '$') {
ret = LMVTR_VALID_SHADOW;
slashes = -3;
}
while (*sub != 0) {
if (sub[0] == '+') {
/* topic == "+foo" || "a/+foo" ? */
if (spos > 0 && sub[-1] != '/')
return LMVTR_FAILED_WILDCARD_FORMAT;
/* topic == "foo+" or "foo+/a" ? */
if (sub[1] != 0 && sub[1] != '/')
return LMVTR_FAILED_WILDCARD_FORMAT;
ret = LMVTR_VALID_WILDCARD;
} else if (sub[0] == '#') {
/* topic == "foo#" ? */
if (spos > 0 && sub[-1] != '/')
return LMVTR_FAILED_WILDCARD_FORMAT;
/* topic == "#foo" ? */
if (sub[1] != 0)
return LMVTR_FAILED_WILDCARD_FORMAT;
ret = LMVTR_VALID_WILDCARD;
} else if (sub[0] == '/') {
slashes++;
}
spos++;
sub++;
}
if (slashes < 0 || slashes > 7)
return LMVTR_FAILED_SHADOW_FORMAT;
return ret;
}
static lws_mqtt_subs_t *
lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
{
lws_mqtt_subs_t *mysub;
size_t topiclen = strlen(topic);
lws_mqtt_validate_topic_return_t flag;
mysub = lws_malloc(sizeof(*mysub) + strlen(topic) + 1, "sub");
if (!mysub)
flag = lws_mqtt_validate_topic(topic, topiclen);
switch (flag) {
case LMVTR_FAILED_OVERSIZE:
lwsl_err("%s: Topic is too long\n",
__func__);
return NULL;
case LMVTR_FAILED_SHADOW_FORMAT:
case LMVTR_FAILED_WILDCARD_FORMAT:
lwsl_err("%s: Invalid topic format \"%s\"\n",
__func__, topic);
return NULL;
case LMVTR_VALID:
case LMVTR_VALID_WILDCARD:
case LMVTR_VALID_SHADOW:
mysub = lws_malloc(sizeof(*mysub) + topiclen + 1, "sub");
if (!mysub) {
lwsl_err("%s: Error allocating mysub\n",
__func__);
return NULL;
}
if (flag == LMVTR_VALID_WILDCARD)
mysub->wildcard = 1;
else if (flag == LMVTR_VALID_SHADOW)
mysub->shadow = 1;
break;
default:
lwsl_err("%s: Unknown flag - %d\n",
__func__, flag);
return NULL;
}
mysub->next = mqtt->subs_head;
mqtt->subs_head = mysub;

View file

@ -1,7 +1,7 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
* Copyright (C) 2010 - 2021 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
@ -287,11 +287,32 @@ typedef struct lws_mqtt_parser_st {
} lws_mqtt_parser_t;
typedef enum {
LMVTR_VALID = 0,
LMVTR_VALID_WILDCARD = 1,
LMVTR_VALID_SHADOW = 2,
LMVTR_FAILED_OVERSIZE = -1,
LMVTR_FAILED_WILDCARD_FORMAT = -2,
LMVTR_FAILED_SHADOW_FORMAT = -3,
} lws_mqtt_validate_topic_return_t;
typedef enum {
LMMTR_TOPIC_NOMATCH = 0,
LMMTR_TOPIC_MATCH = 1,
LMMTR_TOPIC_MATCH_ERROR = -1
} lws_mqtt_match_topic_return_t;
typedef struct lws_mqtt_subs {
struct lws_mqtt_subs *next;
uint8_t ref_count; /* number of children referencing */
/* Flags */
uint8_t wildcard:1;
uint8_t shadow:1;
/* subscription name + NUL overallocated here */
char topic[];
} lws_mqtt_subs_t;

View file

@ -166,7 +166,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
{
size_t used_in, used_out;
lws_strexp_t exp;
char expbuf[128];
char expbuf[LWS_MQTT_MAX_TOPICLEN+1];
if (!h || !h->info.tx)
return 0;
@ -183,8 +183,11 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (lws_strexp_expand(&exp, h->policy->u.mqtt.subscribe,
strlen(h->policy->u.mqtt.subscribe),
&used_in, &used_out) != LSTRX_DONE)
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, faled to expand MQTT subscribe topic\n",
__func__);
return 1;
}
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
h->u.mqtt.sub_top.name = expbuf;