diff --git a/include/libwebsockets/lws-mqtt.h b/include/libwebsockets/lws-mqtt.h index 22865801e..b228315a1 100644 --- a/include/libwebsockets/lws-mqtt.h +++ b/include/libwebsockets/lws-mqtt.h @@ -114,6 +114,7 @@ typedef struct lws_mqtt_publish_param_s { 0 */ uint8_t dup:1; /* Retried PUBLISH, for QoS > 0 */ + uint8_t retain:1; /* Retained message */ } lws_mqtt_publish_param_t; typedef struct topic_elem { diff --git a/include/libwebsockets/lws-secure-streams-policy.h b/include/libwebsockets/lws-secure-streams-policy.h index 863140d74..2c81f9fd8 100644 --- a/include/libwebsockets/lws-secure-streams-policy.h +++ b/include/libwebsockets/lws-secure-streams-policy.h @@ -323,6 +323,7 @@ typedef struct lws_ss_policy { uint8_t birth_qos; uint8_t birth_retain; uint8_t aws_iot; + uint8_t retain; } mqtt; diff --git a/lib/roles/mqtt/mqtt.c b/lib/roles/mqtt/mqtt.c index 67d64dbc0..4ba49c74f 100644 --- a/lib/roles/mqtt/mqtt.c +++ b/lib/roles/mqtt/mqtt.c @@ -1982,7 +1982,7 @@ lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub, * payload (if any) */ if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH, - 0, pub->qos, 0)) { + pub->dup, pub->qos, pub->retain)) { lwsl_err("%s: Failed to fill fixed header\n", __func__); return 1; } diff --git a/lib/secure-streams/README.md b/lib/secure-streams/README.md index b7a9c3d50..5d707aed6 100644 --- a/lib/secure-streams/README.md +++ b/lib/secure-streams/README.md @@ -661,6 +661,10 @@ Set the topic this streamtype subscribes to Set the QOS level for this streamtype +### `mqtt_retain` + +Set to true if this streamtype should use MQTT's "retain" feature. + ### `mqtt_keep_alive` 16-bit number representing MQTT keep alive for the stream. diff --git a/lib/secure-streams/policy-json.c b/lib/secure-streams/policy-json.c index 7e5656f67..e230ce5eb 100644 --- a/lib/secure-streams/policy-json.c +++ b/lib/secure-streams/policy-json.c @@ -111,6 +111,7 @@ static const char * const lejp_tokens_policy[] = { "s[].*.mqtt_topic", "s[].*.mqtt_subscribe", "s[].*.mqtt_qos", + "s[].*.mqtt_retain", "s[].*.mqtt_keep_alive", "s[].*.mqtt_clean_start", "s[].*.mqtt_will_topic", @@ -218,6 +219,7 @@ typedef enum { LSSPPT_MQTT_TOPIC, LSSPPT_MQTT_SUBSCRIBE, LSSPPT_MQTT_QOS, + LSSPPT_MQTT_RETAIN, LSSPPT_MQTT_KEEPALIVE, LSSPPT_MQTT_CLEAN_START, LSSPPT_MQTT_WILL_TOPIC, @@ -1020,6 +1022,11 @@ lws_ss_policy_parser_cb(struct lejp_ctx *ctx, char reason) a->curr[LTY_POLICY].p->u.mqtt.qos = (uint8_t)atoi(ctx->buf); break; + case LSSPPT_MQTT_RETAIN: + a->curr[LTY_POLICY].p->u.mqtt.retain = + reason == LEJPCB_VAL_TRUE; + break; + case LSSPPT_MQTT_KEEPALIVE: a->curr[LTY_POLICY].p->u.mqtt.keep_alive = (uint16_t)atoi(ctx->buf); break; diff --git a/lib/secure-streams/protocols/ss-mqtt.c b/lib/secure-streams/protocols/ss-mqtt.c index 9d58cf887..11489d250 100644 --- a/lib/secure-streams/protocols/ss-mqtt.c +++ b/lib/secure-streams/protocols/ss-mqtt.c @@ -143,7 +143,7 @@ secstream_mqtt_subscribe(struct lws *wsi) static int secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, const char* topic, - lws_mqtt_qos_levels_t qos, int f) + lws_mqtt_qos_levels_t qos, uint8_t retain, int f) { lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi); size_t used_in, used_out, topic_limit; @@ -188,6 +188,7 @@ secstream_mqtt_publish(struct lws *wsi, uint8_t *buf, size_t buflen, mqpp.topic_len = (uint16_t)strlen(mqpp.topic); mqpp.packet_id = (uint16_t)(h->txord - 1); mqpp.qos = qos; + mqpp.retain = !!retain; mqpp.payload = buf; if (h->writeable_len) mqpp.payload_len = (uint32_t)h->writeable_len; @@ -407,7 +408,9 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, wsi->mqtt->inside_birth = 1; return secstream_mqtt_publish(wsi, buf + LWS_PRE, used_out, h->policy->u.mqtt.birth_topic, - h->policy->u.mqtt.birth_qos, LWSSS_FLAG_EOM); + h->policy->u.mqtt.birth_qos, + h->policy->u.mqtt.birth_retain, + LWSSS_FLAG_EOM); } r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE, &buflen, &f); @@ -435,7 +438,8 @@ secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, return secstream_mqtt_publish(wsi, buf + LWS_PRE, buflen, h->policy->u.mqtt.topic, - h->policy->u.mqtt.qos, f); + h->policy->u.mqtt.qos, + h->policy->u.mqtt.retain, f); } case LWS_CALLBACK_MQTT_UNSUBSCRIBED: diff --git a/minimal-examples/secure-streams/minimal-secure-streams-policy2c/minimal-secure-streams.c b/minimal-examples/secure-streams/minimal-secure-streams-policy2c/minimal-secure-streams.c index 6f777656d..a89d0c8d6 100644 --- a/minimal-examples/secure-streams/minimal-secure-streams-policy2c/minimal-secure-streams.c +++ b/minimal-examples/secure-streams/minimal-secure-streams-policy2c/minimal-secure-streams.c @@ -598,6 +598,9 @@ int main(int argc, const char **argv) if (pol->u.mqtt.aws_iot) printf("\t\t\t.aws_iot = %u,\n", pol->u.mqtt.aws_iot); + if (pol->u.mqtt.retain) + printf("\t\t\t.retain = %u,\n", + pol->u.mqtt.retain); printf("\t\t}\n\t},\n");