1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

ss: mqtt: add support for retained message

This commit is contained in:
Chunho Lee 2022-02-05 15:10:44 -08:00 committed by Andy Green
parent 7e3125b818
commit a80fbeb820
7 changed files with 24 additions and 4 deletions

View file

@ -114,6 +114,7 @@ typedef struct lws_mqtt_publish_param_s {
0 */
uint8_t dup:1; /* Retried PUBLISH,
for QoS > 0 */
uint8_t retain:1; /* Retained message */
} lws_mqtt_publish_param_t;
typedef struct topic_elem {

View file

@ -323,6 +323,7 @@ typedef struct lws_ss_policy {
uint8_t birth_qos;
uint8_t birth_retain;
uint8_t aws_iot;
uint8_t retain;
} mqtt;

View file

@ -1982,7 +1982,7 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
* payload (if any)
*/
if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
0, pub->qos, 0)) {
pub->dup, pub->qos, pub->retain)) {
lwsl_err("%s: Failed to fill fixed header\n", __func__);
return 1;
}

View file

@ -661,6 +661,10 @@ Set the topic this streamtype subscribes to
Set the QOS level for this streamtype
### `mqtt_retain`
Set to true if this streamtype should use MQTT's "retain" feature.
### `mqtt_keep_alive`
16-bit number representing MQTT keep alive for the stream.

View file

@ -111,6 +111,7 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.mqtt_topic",
"s[].*.mqtt_subscribe",
"s[].*.mqtt_qos",
"s[].*.mqtt_retain",
"s[].*.mqtt_keep_alive",
"s[].*.mqtt_clean_start",
"s[].*.mqtt_will_topic",
@ -218,6 +219,7 @@ typedef enum {
LSSPPT_MQTT_TOPIC,
LSSPPT_MQTT_SUBSCRIBE,
LSSPPT_MQTT_QOS,
LSSPPT_MQTT_RETAIN,
LSSPPT_MQTT_KEEPALIVE,
LSSPPT_MQTT_CLEAN_START,
LSSPPT_MQTT_WILL_TOPIC,
@ -1020,6 +1022,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
a->curr[LTY_POLICY].p->u.mqtt.qos = (uint8_t)atoi(ctx->buf);
break;
case LSSPPT_MQTT_RETAIN:
a->curr[LTY_POLICY].p->u.mqtt.retain =
reason == LEJPCB_VAL_TRUE;
break;
case LSSPPT_MQTT_KEEPALIVE:
a->curr[LTY_POLICY].p->u.mqtt.keep_alive = (uint16_t)atoi(ctx->buf);
break;

View file

@ -143,7 +143,7 @@ secstream_mqtt_subscribe(struct lws *wsi)
static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
const char* topic,
lws_mqtt_qos_levels_t qos, int f)
lws_mqtt_qos_levels_t qos, uint8_t retain, int f)
{
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
size_t used_in, used_out, topic_limit;
@ -188,6 +188,7 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
mqpp.packet_id = (uint16_t)(h->txord - 1);
mqpp.qos = qos;
mqpp.retain = !!retain;
mqpp.payload = buf;
if (h->writeable_len)
mqpp.payload_len = (uint32_t)h->writeable_len;
@ -407,7 +408,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf + LWS_PRE,
used_out, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM);
h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
LWSSS_FLAG_EOM);
}
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f);
@ -435,7 +438,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos, f);
h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, f);
}
case LWS_CALLBACK_MQTT_UNSUBSCRIBED:

View file

@ -598,6 +598,9 @@ int main(int argc, const char **argv)
if (pol->u.mqtt.aws_iot)
printf("\t\t\t.aws_iot = %u,\n",
pol->u.mqtt.aws_iot);
if (pol->u.mqtt.retain)
printf("\t\t\t.retain = %u,\n",
pol->u.mqtt.retain);
printf("\t\t}\n\t},\n");