1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss-mqtt: Add support for Birth message

This provides Birth message on SS policy. The Birth message is
a message published just after the MQTT connection has been
established.
This commit is contained in:
Chunho Lee 2021-10-06 20:09:40 -07:00 committed by Andy Green
parent fa50cf23b0
commit f440a67ec8
6 changed files with 164 additions and 86 deletions

View file

@ -81,6 +81,13 @@ typedef struct lws_mqtt_client_connect_param_s {
uint8_t retain;
} will_param; /* MQTT LWT
parameters */
struct {
const char *topic;
const char *message;
lws_mqtt_qos_levels_t qos;
uint8_t retain;
} birth_param; /* MQTT Birth
parameters */
const char *username;
const char *password;
uint8_t aws_iot;

View file

@ -312,11 +312,16 @@ typedef struct lws_ss_policy {
const char *will_topic;
const char *will_message;
const char *birth_topic;
const char *birth_message;
uint16_t keep_alive;
uint8_t qos;
uint8_t clean_start;
uint8_t will_qos;
uint8_t will_retain;
uint8_t birth_qos;
uint8_t birth_retain;
uint8_t aws_iot;
} mqtt;

View file

@ -2038,17 +2038,12 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
return 1;
}
}
/*
* A non-empty Payload is expected and a chunk
* is present
*/
if (pub->payload_len && len) {
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
memcpy(p, buf, len);
if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
return 1;
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
}
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
memcpy(p, buf, len);
if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
return 1;
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
if (!is_complete)
nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;

View file

@ -371,6 +371,7 @@ struct _lws_mqtt_related {
uint8_t inside_payload:1;
uint8_t inside_subscribe:1;
uint8_t inside_unsubscribe:1;
uint8_t inside_birth:1;
uint8_t inside_resume_session:1;
uint8_t send_puback:1;
uint8_t send_pubrel:1;
@ -380,6 +381,7 @@ struct _lws_mqtt_related {
uint8_t unacked_pubrel:1;
uint8_t done_subscribe:1;
uint8_t done_birth:1;
};
/*

View file

@ -117,6 +117,10 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.mqtt_will_message",
"s[].*.mqtt_will_qos",
"s[].*.mqtt_will_retain",
"s[].*.mqtt_birth_topic",
"s[].*.mqtt_birth_message",
"s[].*.mqtt_birth_qos",
"s[].*.mqtt_birth_retain",
"s[].*.aws_iot",
"s[].*.swake_validity",
"s[].*.use_auth",
@ -220,6 +224,10 @@ typedef enum {
LSSPPT_MQTT_WILL_MESSAGE,
LSSPPT_MQTT_WILL_QOS,
LSSPPT_MQTT_WILL_RETAIN,
LSSPPT_MQTT_BIRTH_TOPIC,
LSSPPT_MQTT_BIRTH_MESSAGE,
LSSPPT_MQTT_BIRTH_QOS,
LSSPPT_MQTT_BIRTH_RETAIN,
LSSPPT_MQTT_AWS_IOT,
LSSPPT_SWAKE_VALIDITY,
LSSPPT_USE_AUTH,
@ -1035,6 +1043,21 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
a->curr[LTY_POLICY].p->u.mqtt.will_retain =
reason == LEJPCB_VAL_TRUE;
break;
case LSSPPT_MQTT_BIRTH_TOPIC:
pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_topic;
goto string2;
case LSSPPT_MQTT_BIRTH_MESSAGE:
pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_message;
goto string2;
case LSSPPT_MQTT_BIRTH_QOS:
a->curr[LTY_POLICY].p->u.mqtt.birth_qos = (uint8_t)atoi(ctx->buf);
break;
case LSSPPT_MQTT_BIRTH_RETAIN:
a->curr[LTY_POLICY].p->u.mqtt.birth_retain =
reason == LEJPCB_VAL_TRUE;
break;
case LSSPPT_MQTT_AWS_IOT:
if (reason == LEJPCB_VAL_TRUE)
a->curr[LTY_POLICY].p->u.mqtt.aws_iot =

View file

@ -140,15 +140,85 @@ secstream_mqtt_subscribe(struct lws *wsi)
return 0;
}
static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
const char* topic,
lws_mqtt_qos_levels_t qos, int f)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
size_t used_in, used_out, topic_limit;
lws_strexp_t exp;
char *expbuf;
lws_mqtt_publish_param_t mqpp;
if (h->policy->u.mqtt.aws_iot)
topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
else
topic_limit = LWS_MQTT_MAX_TOPICLEN;
memset(&mqpp, 0, sizeof(mqpp));
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
topic_limit);
if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
&used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand MQTT publish"
" topic with no output\n", __func__);
return 1;
}
expbuf = lws_malloc(used_out + 1, __func__);
if (!expbuf) {
lwsl_err("%s, failed to allocate MQTT publish topic",
__func__);
return 1;
}
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
used_out + 1);
if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
&used_out) != LSTRX_DONE) {
lws_free(expbuf);
return 1;
}
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
mqpp.topic = (char *)expbuf;
mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
mqpp.packet_id = (uint16_t)(h->txord - 1);
mqpp.payload = buf;
if (h->writeable_len)
mqpp.payload_len = (uint32_t)h->writeable_len;
else
mqpp.payload_len = (uint32_t)buflen;
lwsl_notice("%s: payload len %d\n", __func__,
(int)mqpp.payload_len);
mqpp.qos = h->policy->u.mqtt.qos;
if (lws_mqtt_client_send_publish(wsi, &mqpp,
(const char *)buf,
(uint32_t)buflen,
f & LWSSS_FLAG_EOM)) {
lwsl_notice("%s: failed to publish\n", __func__);
lws_free(expbuf);
return -1;
}
lws_free(expbuf);
return 0;
}
static int
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
void *in, size_t len)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
lws_mqtt_publish_param_t mqpp, *pmqpp;
lws_mqtt_publish_param_t *pmqpp;
uint8_t buf[LWS_PRE + 1400];
lws_ss_state_return_t r;
size_t buflen;
size_t buflen = sizeof(buf) - LWS_PRE;
int f = 0;
switch (reason) {
@ -288,8 +358,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lws_sul_cancel(&h->sul);
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r,
wsi, &h);
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
}
wsi->mqtt->done_subscribe = 1;
lws_callback_on_writable(wsi);
@ -297,6 +366,14 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_ACK:
lws_sul_cancel(&h->sul_timeout);
if (wsi->mqtt->inside_birth) {
/*
* Skip LWSSSCS_QOS_ACK_REMOTE for birth topic.
*/
wsi->mqtt->inside_birth = 0;
wsi->mqtt->done_birth = 1;
break;
}
r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
if (r != LWSSSSRET_OK)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
@ -304,10 +381,6 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
{
size_t used_in, used_out, topic_limit;
lws_strexp_t exp;
char *expbuf;
if (!h || !h->info.tx)
return 0;
lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
@ -316,17 +389,29 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
break;
}
if (h->policy->u.mqtt.aws_iot)
topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
else
topic_limit = LWS_MQTT_MAX_TOPICLEN;
if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
return secstream_mqtt_subscribe(wsi);
buflen = sizeof(buf) - LWS_PRE;
if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) {
lws_strexp_t exp;
size_t used_in, used_out = 0;
if (h->policy->u.mqtt.birth_message) {
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
(char *)(buf + LWS_PRE), buflen);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
strlen(h->policy->u.mqtt.birth_message),
&used_in, &used_out) != LSTRX_DONE) {
return 1;
}
}
wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf + LWS_PRE,
used_out, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM);
}
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f);
&buflen, &f);
if (r == LWSSSSRET_TX_DONT_SEND)
return 0;
@ -349,62 +434,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
if (r < 0)
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
memset(&mqpp, 0, sizeof(mqpp));
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
topic_limit);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
strlen(h->policy->u.mqtt.topic),
&used_in, &used_out) != LSTRX_DONE) {
lwsl_err("%s, failed to expand MQTT publish"
" topic with no output\n", __func__);
return 1;
}
expbuf = lws_malloc(used_out + 1, __func__);
if (!expbuf) {
lwsl_err("%s, failed to allocate MQTT publish topic",
__func__);
return 1;
}
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
used_out + 1);
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
strlen(h->policy->u.mqtt.topic), &used_in,
&used_out) != LSTRX_DONE) {
lws_free(expbuf);
return 1;
}
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
mqpp.topic = (char *)expbuf;
mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
mqpp.packet_id = (uint16_t)(h->txord - 1);
mqpp.payload = buf + LWS_PRE;
if (h->writeable_len)
mqpp.payload_len = (uint32_t)h->writeable_len;
else
mqpp.payload_len = (uint32_t)buflen;
lwsl_notice("%s: payload len %d\n", __func__,
(int)mqpp.payload_len);
mqpp.qos = h->policy->u.mqtt.qos;
if (lws_mqtt_client_send_publish(wsi, &mqpp,
(const char *)buf + LWS_PRE,
(uint32_t)buflen,
f & LWSSS_FLAG_EOM)) {
lwsl_notice("%s: failed to publish\n", __func__);
lws_free(expbuf);
return -1;
}
lws_free(expbuf);
return 0;
return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos, f);
}
case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
@ -450,7 +482,9 @@ enum {
SSCMM_STRSUB_WILL_TOPIC,
SSCMM_STRSUB_WILL_MESSAGE,
SSCMM_STRSUB_SUBSCRIBE,
SSCMM_STRSUB_TOPIC
SSCMM_STRSUB_TOPIC,
SSCMM_STRSUB_BIRTH_TOPIC,
SSCMM_STRSUB_BIRTH_MESSAGE
};
static int
@ -458,16 +492,18 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
struct lws_client_connect_info *i,
union lws_ss_contemp *ct)
{
const char *sources[4] = {
const char *sources[6] = {
/* we're going to string-substitute these before use */
h->policy->u.mqtt.will_topic,
h->policy->u.mqtt.will_message,
h->policy->u.mqtt.subscribe,
h->policy->u.mqtt.topic
h->policy->u.mqtt.topic,
h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_message
};
size_t used_in, olen[4] = { 0, 0, 0, 0 }, tot = 0;
size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
lws_strexp_t exp;
char *ps[4];
char *ps[6];
uint8_t *p = NULL;
int n = -1;
size_t blen;
@ -485,6 +521,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
return -1;
}
p = (uint8_t *)lws_zalloc(blen+1, __func__);
if (!p)
return -1;
n = lws_system_blob_get(b, p, &blen, 0);
if (n) {
ct->ccp.client_id = NULL;
@ -504,6 +542,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
/* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
if (b && (blen = lws_system_blob_get_size(b))) {
p = (uint8_t *)lws_zalloc(blen+1, __func__);
if (!p)
return -1;
n = lws_system_blob_get(b, p, &blen, 0);
if (n) {
ct->ccp.username = NULL;
@ -520,6 +560,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
/* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
if (b && (blen = lws_system_blob_get_size(b))) {
p = (uint8_t *)lws_zalloc(blen+1, __func__);
if (!p)
return -1;
n = lws_system_blob_get(b, p, &blen, 0);
if (n) {
ct->ccp.password = NULL;
@ -534,6 +576,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u);
ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos;
ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain;
ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot;
h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
@ -608,6 +652,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE];
h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE];
h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC];
ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC];
ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE];
i->method = "MQTT";
i->mqtt_cp = &ct->ccp;