mirror of
https://github.com/warmcat/libwebsockets.git
synced 2025-03-09 00:00:04 +01:00
ss-mqtt: Add support for Birth message
This provides Birth message on SS policy. The Birth message is a message published just after the MQTT connection has been established.
This commit is contained in:
parent
6decd5a7e7
commit
bf5744ab07
6 changed files with 164 additions and 86 deletions
|
@ -81,6 +81,13 @@ typedef struct lws_mqtt_client_connect_param_s {
|
|||
uint8_t retain;
|
||||
} will_param; /* MQTT LWT
|
||||
parameters */
|
||||
struct {
|
||||
const char *topic;
|
||||
const char *message;
|
||||
lws_mqtt_qos_levels_t qos;
|
||||
uint8_t retain;
|
||||
} birth_param; /* MQTT Birth
|
||||
parameters */
|
||||
const char *username;
|
||||
const char *password;
|
||||
uint8_t aws_iot;
|
||||
|
|
|
@ -312,11 +312,16 @@ typedef struct lws_ss_policy {
|
|||
const char *will_topic;
|
||||
const char *will_message;
|
||||
|
||||
const char *birth_topic;
|
||||
const char *birth_message;
|
||||
|
||||
uint16_t keep_alive;
|
||||
uint8_t qos;
|
||||
uint8_t clean_start;
|
||||
uint8_t will_qos;
|
||||
uint8_t will_retain;
|
||||
uint8_t birth_qos;
|
||||
uint8_t birth_retain;
|
||||
uint8_t aws_iot;
|
||||
|
||||
} mqtt;
|
||||
|
|
|
@ -2038,17 +2038,12 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
|
|||
return 1;
|
||||
}
|
||||
}
|
||||
/*
|
||||
* A non-empty Payload is expected and a chunk
|
||||
* is present
|
||||
*/
|
||||
if (pub->payload_len && len) {
|
||||
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
||||
memcpy(p, buf, len);
|
||||
if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
|
||||
return 1;
|
||||
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
||||
}
|
||||
|
||||
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
||||
memcpy(p, buf, len);
|
||||
if (lws_mqtt_str_advance(&mqtt_vh_payload, (int)len))
|
||||
return 1;
|
||||
p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
|
||||
|
||||
if (!is_complete)
|
||||
nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
|
||||
|
|
|
@ -371,6 +371,7 @@ struct _lws_mqtt_related {
|
|||
uint8_t inside_payload:1;
|
||||
uint8_t inside_subscribe:1;
|
||||
uint8_t inside_unsubscribe:1;
|
||||
uint8_t inside_birth:1;
|
||||
uint8_t inside_resume_session:1;
|
||||
uint8_t send_puback:1;
|
||||
uint8_t send_pubrel:1;
|
||||
|
@ -380,6 +381,7 @@ struct _lws_mqtt_related {
|
|||
uint8_t unacked_pubrel:1;
|
||||
|
||||
uint8_t done_subscribe:1;
|
||||
uint8_t done_birth:1;
|
||||
};
|
||||
|
||||
/*
|
||||
|
|
|
@ -117,6 +117,10 @@ static const char * const lejp_tokens_policy[] = {
|
|||
"s[].*.mqtt_will_message",
|
||||
"s[].*.mqtt_will_qos",
|
||||
"s[].*.mqtt_will_retain",
|
||||
"s[].*.mqtt_birth_topic",
|
||||
"s[].*.mqtt_birth_message",
|
||||
"s[].*.mqtt_birth_qos",
|
||||
"s[].*.mqtt_birth_retain",
|
||||
"s[].*.aws_iot",
|
||||
"s[].*.swake_validity",
|
||||
"s[].*.use_auth",
|
||||
|
@ -220,6 +224,10 @@ typedef enum {
|
|||
LSSPPT_MQTT_WILL_MESSAGE,
|
||||
LSSPPT_MQTT_WILL_QOS,
|
||||
LSSPPT_MQTT_WILL_RETAIN,
|
||||
LSSPPT_MQTT_BIRTH_TOPIC,
|
||||
LSSPPT_MQTT_BIRTH_MESSAGE,
|
||||
LSSPPT_MQTT_BIRTH_QOS,
|
||||
LSSPPT_MQTT_BIRTH_RETAIN,
|
||||
LSSPPT_MQTT_AWS_IOT,
|
||||
LSSPPT_SWAKE_VALIDITY,
|
||||
LSSPPT_USE_AUTH,
|
||||
|
@ -1035,6 +1043,21 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
|
|||
a->curr[LTY_POLICY].p->u.mqtt.will_retain =
|
||||
reason == LEJPCB_VAL_TRUE;
|
||||
break;
|
||||
case LSSPPT_MQTT_BIRTH_TOPIC:
|
||||
pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_topic;
|
||||
goto string2;
|
||||
|
||||
case LSSPPT_MQTT_BIRTH_MESSAGE:
|
||||
pp = (char **)&a->curr[LTY_POLICY].p->u.mqtt.birth_message;
|
||||
goto string2;
|
||||
|
||||
case LSSPPT_MQTT_BIRTH_QOS:
|
||||
a->curr[LTY_POLICY].p->u.mqtt.birth_qos = (uint8_t)atoi(ctx->buf);
|
||||
break;
|
||||
case LSSPPT_MQTT_BIRTH_RETAIN:
|
||||
a->curr[LTY_POLICY].p->u.mqtt.birth_retain =
|
||||
reason == LEJPCB_VAL_TRUE;
|
||||
break;
|
||||
case LSSPPT_MQTT_AWS_IOT:
|
||||
if (reason == LEJPCB_VAL_TRUE)
|
||||
a->curr[LTY_POLICY].p->u.mqtt.aws_iot =
|
||||
|
|
|
@ -140,15 +140,85 @@ secstream_mqtt_subscribe(struct lws *wsi)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
|
||||
const char* topic,
|
||||
lws_mqtt_qos_levels_t qos, int f)
|
||||
{
|
||||
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
|
||||
size_t used_in, used_out, topic_limit;
|
||||
lws_strexp_t exp;
|
||||
char *expbuf;
|
||||
lws_mqtt_publish_param_t mqpp;
|
||||
|
||||
if (h->policy->u.mqtt.aws_iot)
|
||||
topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
|
||||
else
|
||||
topic_limit = LWS_MQTT_MAX_TOPICLEN;
|
||||
|
||||
memset(&mqpp, 0, sizeof(mqpp));
|
||||
|
||||
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
|
||||
topic_limit);
|
||||
|
||||
if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
|
||||
&used_out) != LSTRX_DONE) {
|
||||
lwsl_err("%s, failed to expand MQTT publish"
|
||||
" topic with no output\n", __func__);
|
||||
return 1;
|
||||
}
|
||||
expbuf = lws_malloc(used_out + 1, __func__);
|
||||
if (!expbuf) {
|
||||
lwsl_err("%s, failed to allocate MQTT publish topic",
|
||||
__func__);
|
||||
return 1;
|
||||
}
|
||||
|
||||
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
|
||||
used_out + 1);
|
||||
|
||||
if (lws_strexp_expand(&exp, topic, strlen(topic), &used_in,
|
||||
&used_out) != LSTRX_DONE) {
|
||||
lws_free(expbuf);
|
||||
return 1;
|
||||
}
|
||||
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
|
||||
mqpp.topic = (char *)expbuf;
|
||||
|
||||
mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
|
||||
mqpp.packet_id = (uint16_t)(h->txord - 1);
|
||||
mqpp.payload = buf;
|
||||
if (h->writeable_len)
|
||||
mqpp.payload_len = (uint32_t)h->writeable_len;
|
||||
else
|
||||
mqpp.payload_len = (uint32_t)buflen;
|
||||
|
||||
lwsl_notice("%s: payload len %d\n", __func__,
|
||||
(int)mqpp.payload_len);
|
||||
|
||||
mqpp.qos = h->policy->u.mqtt.qos;
|
||||
|
||||
if (lws_mqtt_client_send_publish(wsi, &mqpp,
|
||||
(const char *)buf,
|
||||
(uint32_t)buflen,
|
||||
f & LWSSS_FLAG_EOM)) {
|
||||
lwsl_notice("%s: failed to publish\n", __func__);
|
||||
lws_free(expbuf);
|
||||
return -1;
|
||||
}
|
||||
lws_free(expbuf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
||||
void *in, size_t len)
|
||||
{
|
||||
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
|
||||
lws_mqtt_publish_param_t mqpp, *pmqpp;
|
||||
lws_mqtt_publish_param_t *pmqpp;
|
||||
uint8_t buf[LWS_PRE + 1400];
|
||||
lws_ss_state_return_t r;
|
||||
size_t buflen;
|
||||
size_t buflen = sizeof(buf) - LWS_PRE;
|
||||
int f = 0;
|
||||
|
||||
switch (reason) {
|
||||
|
@ -288,8 +358,7 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
|||
lws_sul_cancel(&h->sul);
|
||||
r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
|
||||
if (r != LWSSSSRET_OK)
|
||||
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r,
|
||||
wsi, &h);
|
||||
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
|
||||
}
|
||||
wsi->mqtt->done_subscribe = 1;
|
||||
lws_callback_on_writable(wsi);
|
||||
|
@ -297,6 +366,14 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
|||
|
||||
case LWS_CALLBACK_MQTT_ACK:
|
||||
lws_sul_cancel(&h->sul_timeout);
|
||||
if (wsi->mqtt->inside_birth) {
|
||||
/*
|
||||
* Skip LWSSSCS_QOS_ACK_REMOTE for birth topic.
|
||||
*/
|
||||
wsi->mqtt->inside_birth = 0;
|
||||
wsi->mqtt->done_birth = 1;
|
||||
break;
|
||||
}
|
||||
r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
|
||||
if (r != LWSSSSRET_OK)
|
||||
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
|
||||
|
@ -304,10 +381,6 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
|||
|
||||
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
|
||||
{
|
||||
size_t used_in, used_out, topic_limit;
|
||||
lws_strexp_t exp;
|
||||
char *expbuf;
|
||||
|
||||
if (!h || !h->info.tx)
|
||||
return 0;
|
||||
lwsl_notice("%s: %s: WRITEABLE\n", __func__, lws_ss_tag(h));
|
||||
|
@ -316,17 +389,29 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
|||
lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
|
||||
break;
|
||||
}
|
||||
if (h->policy->u.mqtt.aws_iot)
|
||||
topic_limit = LWS_MQTT_MAX_AWSIOT_TOPICLEN;
|
||||
else
|
||||
topic_limit = LWS_MQTT_MAX_TOPICLEN;
|
||||
|
||||
if (!wsi->mqtt->done_subscribe && h->policy->u.mqtt.subscribe)
|
||||
return secstream_mqtt_subscribe(wsi);
|
||||
|
||||
buflen = sizeof(buf) - LWS_PRE;
|
||||
if (!wsi->mqtt->done_birth && h->policy->u.mqtt.birth_topic) {
|
||||
lws_strexp_t exp;
|
||||
size_t used_in, used_out = 0;
|
||||
if (h->policy->u.mqtt.birth_message) {
|
||||
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata,
|
||||
(char *)(buf + LWS_PRE), buflen);
|
||||
if (lws_strexp_expand(&exp, h->policy->u.mqtt.birth_message,
|
||||
strlen(h->policy->u.mqtt.birth_message),
|
||||
&used_in, &used_out) != LSTRX_DONE) {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
wsi->mqtt->inside_birth = 1;
|
||||
return secstream_mqtt_publish(wsi, buf + LWS_PRE,
|
||||
used_out, h->policy->u.mqtt.birth_topic,
|
||||
h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM);
|
||||
}
|
||||
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
|
||||
&buflen, &f);
|
||||
&buflen, &f);
|
||||
if (r == LWSSSSRET_TX_DONT_SEND)
|
||||
return 0;
|
||||
|
||||
|
@ -349,62 +434,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
|
|||
if (r < 0)
|
||||
return _lws_ss_handle_state_ret_CAN_DESTROY_HANDLE(r, wsi, &h);
|
||||
|
||||
memset(&mqpp, 0, sizeof(mqpp));
|
||||
|
||||
lws_strexp_init(&exp, h, lws_ss_exp_cb_metadata, NULL,
|
||||
topic_limit);
|
||||
|
||||
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
|
||||
strlen(h->policy->u.mqtt.topic),
|
||||
&used_in, &used_out) != LSTRX_DONE) {
|
||||
lwsl_err("%s, failed to expand MQTT publish"
|
||||
" topic with no output\n", __func__);
|
||||
return 1;
|
||||
}
|
||||
expbuf = lws_malloc(used_out + 1, __func__);
|
||||
if (!expbuf) {
|
||||
lwsl_err("%s, failed to allocate MQTT publish topic",
|
||||
__func__);
|
||||
return 1;
|
||||
}
|
||||
|
||||
lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata, expbuf,
|
||||
used_out + 1);
|
||||
|
||||
if (lws_strexp_expand(&exp, h->policy->u.mqtt.topic,
|
||||
strlen(h->policy->u.mqtt.topic), &used_in,
|
||||
&used_out) != LSTRX_DONE) {
|
||||
lws_free(expbuf);
|
||||
return 1;
|
||||
}
|
||||
lwsl_notice("%s, expbuf - %s\n", __func__, expbuf);
|
||||
mqpp.topic = (char *)expbuf;
|
||||
|
||||
mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
|
||||
mqpp.packet_id = (uint16_t)(h->txord - 1);
|
||||
mqpp.payload = buf + LWS_PRE;
|
||||
if (h->writeable_len)
|
||||
mqpp.payload_len = (uint32_t)h->writeable_len;
|
||||
else
|
||||
mqpp.payload_len = (uint32_t)buflen;
|
||||
|
||||
lwsl_notice("%s: payload len %d\n", __func__,
|
||||
(int)mqpp.payload_len);
|
||||
|
||||
mqpp.qos = h->policy->u.mqtt.qos;
|
||||
|
||||
if (lws_mqtt_client_send_publish(wsi, &mqpp,
|
||||
(const char *)buf + LWS_PRE,
|
||||
(uint32_t)buflen,
|
||||
f & LWSSS_FLAG_EOM)) {
|
||||
lwsl_notice("%s: failed to publish\n", __func__);
|
||||
lws_free(expbuf);
|
||||
|
||||
return -1;
|
||||
}
|
||||
lws_free(expbuf);
|
||||
|
||||
return 0;
|
||||
return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
|
||||
h->policy->u.mqtt.topic,
|
||||
h->policy->u.mqtt.qos, f);
|
||||
}
|
||||
|
||||
case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
|
||||
|
@ -450,7 +482,9 @@ enum {
|
|||
SSCMM_STRSUB_WILL_TOPIC,
|
||||
SSCMM_STRSUB_WILL_MESSAGE,
|
||||
SSCMM_STRSUB_SUBSCRIBE,
|
||||
SSCMM_STRSUB_TOPIC
|
||||
SSCMM_STRSUB_TOPIC,
|
||||
SSCMM_STRSUB_BIRTH_TOPIC,
|
||||
SSCMM_STRSUB_BIRTH_MESSAGE
|
||||
};
|
||||
|
||||
static int
|
||||
|
@ -458,16 +492,18 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
|
|||
struct lws_client_connect_info *i,
|
||||
union lws_ss_contemp *ct)
|
||||
{
|
||||
const char *sources[4] = {
|
||||
const char *sources[6] = {
|
||||
/* we're going to string-substitute these before use */
|
||||
h->policy->u.mqtt.will_topic,
|
||||
h->policy->u.mqtt.will_message,
|
||||
h->policy->u.mqtt.subscribe,
|
||||
h->policy->u.mqtt.topic
|
||||
h->policy->u.mqtt.topic,
|
||||
h->policy->u.mqtt.birth_topic,
|
||||
h->policy->u.mqtt.birth_message
|
||||
};
|
||||
size_t used_in, olen[4] = { 0, 0, 0, 0 }, tot = 0;
|
||||
size_t used_in, olen[6] = { 0, 0, 0, 0, 0, 0 }, tot = 0;
|
||||
lws_strexp_t exp;
|
||||
char *ps[4];
|
||||
char *ps[6];
|
||||
uint8_t *p = NULL;
|
||||
int n = -1;
|
||||
size_t blen;
|
||||
|
@ -485,6 +521,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
|
|||
return -1;
|
||||
}
|
||||
p = (uint8_t *)lws_zalloc(blen+1, __func__);
|
||||
if (!p)
|
||||
return -1;
|
||||
n = lws_system_blob_get(b, p, &blen, 0);
|
||||
if (n) {
|
||||
ct->ccp.client_id = NULL;
|
||||
|
@ -504,6 +542,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
|
|||
/* If LWS_SYSBLOB_TYPE_MQTT_USERNAME is set */
|
||||
if (b && (blen = lws_system_blob_get_size(b))) {
|
||||
p = (uint8_t *)lws_zalloc(blen+1, __func__);
|
||||
if (!p)
|
||||
return -1;
|
||||
n = lws_system_blob_get(b, p, &blen, 0);
|
||||
if (n) {
|
||||
ct->ccp.username = NULL;
|
||||
|
@ -520,6 +560,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
|
|||
/* If LWS_SYSBLOB_TYPE_MQTT_PASSWORD is set */
|
||||
if (b && (blen = lws_system_blob_get_size(b))) {
|
||||
p = (uint8_t *)lws_zalloc(blen+1, __func__);
|
||||
if (!p)
|
||||
return -1;
|
||||
n = lws_system_blob_get(b, p, &blen, 0);
|
||||
if (n) {
|
||||
ct->ccp.password = NULL;
|
||||
|
@ -534,6 +576,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
|
|||
ct->ccp.clean_start = (h->policy->u.mqtt.clean_start & 1u);
|
||||
ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
|
||||
ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
|
||||
ct->ccp.birth_param.qos = h->policy->u.mqtt.birth_qos;
|
||||
ct->ccp.birth_param.retain = h->policy->u.mqtt.birth_retain;
|
||||
ct->ccp.aws_iot = h->policy->u.mqtt.aws_iot;
|
||||
h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
|
||||
|
||||
|
@ -608,6 +652,8 @@ secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
|
|||
h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE];
|
||||
h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE];
|
||||
h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC];
|
||||
ct->ccp.birth_param.topic = ps[SSCMM_STRSUB_BIRTH_TOPIC];
|
||||
ct->ccp.birth_param.message = ps[SSCMM_STRSUB_BIRTH_MESSAGE];
|
||||
|
||||
i->method = "MQTT";
|
||||
i->mqtt_cp = &ct->ccp;
|
||||
|
|
Loading…
Add table
Reference in a new issue