1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-30 00:00:16 +01:00

ss: mqtt: add support for retained message

This commit is contained in:
Chunho Lee 2022-02-05 15:10:44 -08:00 committed by Andy Green
parent 4bf39f55d4
commit 3af7a16531
7 changed files with 24 additions and 4 deletions

View file

@ -114,6 +114,7 @@ typedef struct lws_mqtt_publish_param_s {
0 */ 0 */
uint8_t dup:1; /* Retried PUBLISH, uint8_t dup:1; /* Retried PUBLISH,
for QoS > 0 */ for QoS > 0 */
uint8_t retain:1; /* Retained message */
} lws_mqtt_publish_param_t; } lws_mqtt_publish_param_t;
typedef struct topic_elem { typedef struct topic_elem {

View file

@ -297,6 +297,7 @@ typedef struct lws_ss_policy {
uint8_t birth_qos; uint8_t birth_qos;
uint8_t birth_retain; uint8_t birth_retain;
uint8_t aws_iot; uint8_t aws_iot;
uint8_t retain;
} mqtt; } mqtt;

View file

@ -1984,7 +1984,7 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
* payload (if any) * payload (if any)
*/ */
if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH, if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
0, pub->qos, 0)) { pub->dup, pub->qos, pub->retain)) {
lwsl_err("%s: Failed to fill fixed header\n", __func__); lwsl_err("%s: Failed to fill fixed header\n", __func__);
return 1; return 1;
} }

View file

@ -661,6 +661,10 @@ Set the topic this streamtype subscribes to
Set the QOS level for this streamtype Set the QOS level for this streamtype
### `mqtt_retain`
Set to true if this streamtype should use MQTT's "retain" feature.
### `mqtt_keep_alive` ### `mqtt_keep_alive`
16-bit number representing MQTT keep alive for the stream. 16-bit number representing MQTT keep alive for the stream.

View file

@ -111,6 +111,7 @@ static const char * const lejp_tokens_policy[] = {
"s[].*.mqtt_topic", "s[].*.mqtt_topic",
"s[].*.mqtt_subscribe", "s[].*.mqtt_subscribe",
"s[].*.mqtt_qos", "s[].*.mqtt_qos",
"s[].*.mqtt_retain",
"s[].*.mqtt_keep_alive", "s[].*.mqtt_keep_alive",
"s[].*.mqtt_clean_start", "s[].*.mqtt_clean_start",
"s[].*.mqtt_will_topic", "s[].*.mqtt_will_topic",
@ -218,6 +219,7 @@ typedef enum {
LSSPPT_MQTT_TOPIC, LSSPPT_MQTT_TOPIC,
LSSPPT_MQTT_SUBSCRIBE, LSSPPT_MQTT_SUBSCRIBE,
LSSPPT_MQTT_QOS, LSSPPT_MQTT_QOS,
LSSPPT_MQTT_RETAIN,
LSSPPT_MQTT_KEEPALIVE, LSSPPT_MQTT_KEEPALIVE,
LSSPPT_MQTT_CLEAN_START, LSSPPT_MQTT_CLEAN_START,
LSSPPT_MQTT_WILL_TOPIC, LSSPPT_MQTT_WILL_TOPIC,
@ -997,6 +999,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason)
a->curr[LTY_POLICY].p->u.mqtt.qos = (uint8_t)atoi(ctx->buf); a->curr[LTY_POLICY].p->u.mqtt.qos = (uint8_t)atoi(ctx->buf);
break; break;
case LSSPPT_MQTT_RETAIN:
a->curr[LTY_POLICY].p->u.mqtt.retain =
reason == LEJPCB_VAL_TRUE;
break;
case LSSPPT_MQTT_KEEPALIVE: case LSSPPT_MQTT_KEEPALIVE:
a->curr[LTY_POLICY].p->u.mqtt.keep_alive = (uint16_t)atoi(ctx->buf); a->curr[LTY_POLICY].p->u.mqtt.keep_alive = (uint16_t)atoi(ctx->buf);
break; break;

View file

@ -143,7 +143,7 @@ secstream_mqtt_subscribe(struct lws *wsi)
static int static int
secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
const char* topic, const char* topic,
lws_mqtt_qos_levels_t qos, int f) lws_mqtt_qos_levels_t qos, uint8_t retain, int f)
{ {
lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
size_t used_in, used_out, topic_limit; size_t used_in, used_out, topic_limit;
@ -188,6 +188,7 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen,
mqpp.topic_len = (uint16_t)strlen(mqpp.topic); mqpp.topic_len = (uint16_t)strlen(mqpp.topic);
mqpp.packet_id = (uint16_t)(h->txord - 1); mqpp.packet_id = (uint16_t)(h->txord - 1);
mqpp.qos = qos; mqpp.qos = qos;
mqpp.retain = !!retain;
mqpp.payload = buf; mqpp.payload = buf;
if (h->writeable_len) if (h->writeable_len)
mqpp.payload_len = (uint32_t)h->writeable_len; mqpp.payload_len = (uint32_t)h->writeable_len;
@ -407,7 +408,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
wsi->mqtt->inside_birth = 1; wsi->mqtt->inside_birth = 1;
return secstream_mqtt_publish(wsi, buf + LWS_PRE, return secstream_mqtt_publish(wsi, buf + LWS_PRE,
used_out, h->policy->u.mqtt.birth_topic, used_out, h->policy->u.mqtt.birth_topic,
h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM); h->policy->u.mqtt.birth_qos,
h->policy->u.mqtt.birth_retain,
LWSSS_FLAG_EOM);
} }
r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
&buflen, &f); &buflen, &f);
@ -435,7 +438,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen,
h->policy->u.mqtt.topic, h->policy->u.mqtt.topic,
h->policy->u.mqtt.qos, f); h->policy->u.mqtt.qos,
h->policy->u.mqtt.retain, f);
} }
case LWS_CALLBACK_MQTT_UNSUBSCRIBED: case LWS_CALLBACK_MQTT_UNSUBSCRIBED:

View file

@ -598,6 +598,9 @@ int main(int argc, const char **argv)
if (pol->u.mqtt.aws_iot) if (pol->u.mqtt.aws_iot)
printf("\t\t\t.aws_iot = %u,\n", printf("\t\t\t.aws_iot = %u,\n",
pol->u.mqtt.aws_iot); pol->u.mqtt.aws_iot);
if (pol->u.mqtt.retain)
printf("\t\t\t.retain = %u,\n",
pol->u.mqtt.retain);
printf("\t\t}\n\t},\n"); printf("\t\t}\n\t},\n");