1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

client: MQTT

Adds client support for MQTT QoS0 and QoS1, compatible with AWS IoT

Supports stream binding where independent client connections to the
same endpoint can mux on a single tcp + tls connection with topic
routing managed internally.
This commit is contained in:
Sakthi Kannan 2020-02-25 13:28:25 +00:00 committed by Andy Green
parent d88d41090e
commit 9d099ba7be
38 changed files with 5788 additions and 42 deletions

View file

@ -23,6 +23,7 @@ env:
- LWS_METHOD=nologs CMAKE_ARGS="-DLWS_WITH_NO_LOGS=ON"
- LWS_METHOD=smp CMAKE_ARGS="-DLWS_MAX_SMP=32 -DLWS_WITH_MINIMAL_EXAMPLES=1"
- LWS_METHOD=nows CMAKE_ARGS="-DLWS_ROLE_WS=0"
- LWS_METHOD=mqtt CMAKE_ARGS="-DLWS_ROLE_MQTT=1"
- LWS_METHOD=threadpool CMAKE_ARGS="-DLWS_WITH_THREADPOOL=1 -DLWS_WITH_MINIMAL_EXAMPLES=1"
os:

View file

@ -28,6 +28,7 @@ option(LWS_FOR_GITOHASHI "Enable features recommended for use with gitohashi" OF
option(LWS_WITH_NETWORK "Compile with network-related code" ON)
option(LWS_ROLE_H1 "Compile with support for http/1 (needed for ws)" ON)
option(LWS_ROLE_WS "Compile with support for websockets" ON)
option(LWS_ROLE_MQTT "Build with support for MQTT client" OFF)
option(LWS_ROLE_DBUS "Compile with support for DBUS" OFF)
option(LWS_ROLE_RAW_PROXY "Raw packet proxy" OFF)
option(LWS_ROLE_RAW_FILE "Compile with support for raw files" ON)
@ -241,9 +242,11 @@ if(LWS_WITH_DISTRO_RECOMMENDED)
set(LWS_WITH_STRUCT_SQLITE3 1)
set(LWS_WITH_SPAWN 1)
set(LWS_WITH_FSMOUNT 1)
set(LWS_ROLE_MQTT 1)
endif()
if (NOT LWS_WITH_NETWORK)
set(LWS_ROLE_MQTT 0)
set(LWS_ROLE_H1 0)
set(LWS_ROLE_WS 0)
set(LWS_ROLE_RAW 0)
@ -389,7 +392,7 @@ if (LWS_WITH_MBEDTLS)
include_directories(lib/tls/mbedtls/wrapper/include)
endif()
include_directories(include plugins lib/core lib/core-net lib/event-libs include/abstract lib/tls lib/roles lib/event-libs/libuv lib/event-libs/poll lib/event-libs/libevent lib/event-libs/glib lib/event-libs/libev lib/jose/jwe lib/jose/jws lib/jose lib/misc lib/roles/http lib/roles/http/compression lib/roles/h1 lib/roles/h2 lib/roles/ws lib/roles/cgi lib/roles/dbus lib/roles/raw-proxy lib/abstract lib/system/async-dns)
include_directories(include plugins lib/core lib/core-net lib/event-libs include/abstract lib/tls lib/roles lib/event-libs/libuv lib/event-libs/poll lib/event-libs/libevent lib/event-libs/glib lib/event-libs/libev lib/jose/jwe lib/jose/jws lib/jose lib/misc lib/roles/http lib/roles/http/compression lib/roles/h1 lib/roles/h2 lib/roles/ws lib/roles/cgi lib/roles/dbus lib/roles/raw-proxy lib/abstract lib/system/async-dns lib/roles/mqtt)
if (LWS_PLAT_FREERTOS)
include_directories(lib/plat/freertos lib/plat/freertos/esp32)
@ -1172,7 +1175,17 @@ endif()
if (LWS_WITH_DIR)
list(APPEND SOURCES lib/misc/dir.c)
endif()
if (LWS_ROLE_MQTT AND LWS_WITH_CLIENT)
list(APPEND SOURCES
lib/roles/mqtt/mqtt.c
lib/roles/mqtt/ops-mqtt.c
lib/roles/mqtt/primitives.c
lib/roles/mqtt/client/client-mqtt.c
lib/roles/mqtt/client/client-mqtt-handshake.c
)
endif()
if (LWS_WITH_THREADPOOL AND UNIX AND LWS_HAVE_PTHREAD_H)
list(APPEND SOURCES lib/misc/threadpool/threadpool.c)
endif()
@ -2916,6 +2929,7 @@ message(" LWS_WITH_GLIB = ${LWS_WITH_GLIB}")
message(" LWS_IPV6 = ${LWS_IPV6}")
message(" LWS_UNIX_SOCK = ${LWS_UNIX_SOCK}")
message(" LWS_WITH_HTTP2 = ${LWS_WITH_HTTP2}")
message(" LWS_ROLE_MQTT = ${LWS_ROLE_MQTT}")
message(" LWS_SSL_SERVER_WITH_ECDH_CERT = ${LWS_SSL_SERVER_WITH_ECDH_CERT}")
message(" LWS_MAX_SMP = ${LWS_MAX_SMP}")
message(" LWS_HAVE_PTHREAD_H = ${LWS_HAVE_PTHREAD_H}")

View file

@ -16,6 +16,11 @@ various scenarios, CC0-licensed (public domain) for cut-and-paste, allow you to
News
----
## mqtt client support
If you enable `-DLWS_ROLE_MQTT=1`, lws can now support QoS0 and QoS1 MQTT client
connections. See the examples at ./minimal-examples/mqtt-client
## libglib native event loop support
glib's event loop joins libuv, libevent and libev support in lws for both the

View file

@ -94,6 +94,7 @@
#cmakedefine LWS_ROLE_RAW_FILE
#cmakedefine LWS_ROLE_RAW_PROXY
#cmakedefine LWS_ROLE_WS
#cmakedefine LWS_ROLE_MQTT
#cmakedefine LWS_SHA1_USE_OPENSSL_NAME
#cmakedefine LWS_SSL_CLIENT_USE_OS_CA_CERTS
#cmakedefine LWS_SSL_SERVER_WITH_ECDH_CERT

View file

@ -548,6 +548,9 @@ struct lws;
#include <libwebsockets/lws-protocols-plugins.h>
#include <libwebsockets/lws-plugin-generic-sessions.h>
#include <libwebsockets/lws-context-vhost.h>
#if defined(LWS_ROLE_MQTT)
#include <libwebsockets/lws-mqtt.h>
#endif
#include <libwebsockets/lws-client.h>
#include <libwebsockets/lws-http.h>
#include <libwebsockets/lws-spa.h>

View file

@ -827,6 +827,30 @@ enum lws_callback_reasons {
* and failure. in points to optional JSON, and len represents the
* connection state using enum lws_cert_update_state */
/* ---------------------------------------------------------------------
* ----- Callbacks related to MQTT Client -----
*/
LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED = 200,
LWS_CALLBACK_MQTT_IDLE = 201,
LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED = 202,
LWS_CALLBACK_MQTT_SUBSCRIBED = 203,
LWS_CALLBACK_MQTT_CLIENT_WRITEABLE = 204,
LWS_CALLBACK_MQTT_CLIENT_RX = 205,
LWS_CALLBACK_MQTT_UNSUBSCRIBED = 206,
LWS_CALLBACK_MQTT_DROP_PROTOCOL = 207,
LWS_CALLBACK_MQTT_CLIENT_CLOSED = 208,
LWS_CALLBACK_MQTT_ACK = 209,
/**< When a message is fully sent, if QoS0 this callback is generated
* to locally "acknowledge" it. For QoS1, this callback is only
* generated when the matching PUBACK is received. Return nonzero to
* close the wsi.
*/
LWS_CALLBACK_MQTT_RESEND = 210,
/**< In QoS1, this callback is generated instead of the _ACK one if
* we timed out waiting for a PUBACK and we must resend the message.
* Return nonzero to close the wsi.
*/
/****** add new things just above ---^ ******/

View file

@ -157,6 +157,12 @@ struct lws_client_connect_info {
* to the client connection.
*/
#if defined(LWS_ROLE_MQTT)
const lws_mqtt_client_connect_param_t *mqtt_cp;
#else
void *mqtt_cp;
#endif
/* Add new things just above here ---^
* This is part of the ABI, don't needlessly break compatibility
*

View file

@ -0,0 +1,330 @@
/*
* libwebsockets - protocol - mqtt
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation:
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301 USA
*
* included from libwebsockets.h
*/
#ifndef _LWS_MQTT_H
#define _LWS_MQTT_H 1
struct _lws_mqtt_related;
typedef struct _lws_mqtt_related lws_mqtt_related_t;
struct lws_mqtt_str_st;
typedef struct lws_mqtt_str_st lws_mqtt_str_t;
#define MQTT_VER_3_1_1 4
#define LWS_MQTT_FINAL_PART 1
typedef enum {
QOS0,
QOS1,
QOS2, /* not supported */
RESERVED_QOS_LEVEL,
FAILURE_QOS_LEVEL = 0x80
} lws_mqtt_qos_levels_t;
typedef union {
struct {
uint8_t retain:1;
uint8_t qos:2;
uint8_t dup:1;
uint8_t ctrl_pkt_type:4;
} flags;
uint8_t bits;
} lws_mqtt_fixed_hdr_t;
/*
* MQTT connection parameters, passed into struct
* lws_client_connect_info to establish a connection using
* lws_client_connect_via_info().
*/
typedef struct lws_mqtt_client_connect_param_s {
const char *client_id; /* Client ID */
uint16_t keep_alive; /* MQTT keep alive
interval in
seconds */
uint8_t clean_start; /* MQTT clean
session */
struct {
const char *topic;
const char *message;
lws_mqtt_qos_levels_t qos;
uint8_t retain;
} will_param; /* MQTT LWT
parameters */
const char *username;
const char *password;
} lws_mqtt_client_connect_param_t;
/*
* MQTT publish parameters
*/
typedef struct lws_mqtt_publish_param_s {
char *topic; /* Topic Name */
uint16_t topic_len;
const void *payload; /* Publish Payload */
uint32_t payload_len; /* Size of the
complete payload */
uint32_t payload_pos; /* where we are in payload */
lws_mqtt_qos_levels_t qos;
/*--v-Following will be used by LWS-v--*/
uint16_t packet_id; /* Packet ID for QoS >
0 */
uint8_t dup:1; /* Retried PUBLISH,
for QoS > 0 */
} lws_mqtt_publish_param_t;
typedef struct topic_elem {
const char *name; /* Topic Name */
lws_mqtt_qos_levels_t qos; /* Requested QoS */
/*--v-Following will be used by LWS-v--*/
uint8_t acked;
} lws_mqtt_topic_elem_t;
/*
* MQTT publish parameters
*/
typedef struct lws_mqtt_subscribe_param_s {
uint32_t num_topics; /* Number of topics */
lws_mqtt_topic_elem_t *topic; /* Array of topic elements */
/*--v-Following will be used by LWS-v--*/
uint16_t packet_id;
} lws_mqtt_subscribe_param_t;
typedef enum {
LMQCP_RESERVED,
LMQCP_CTOS_CONNECT, /* Connection request */
LMQCP_STOC_CONNACK, /* Connection acknowledgment */
LMQCP_PUBLISH, /* Publish Message */
LMQCP_PUBACK, /* QoS 1: Publish acknowledgment */
LMQCP_PUBREC, /* QoS 2.1: Publish received */
LMQCP_PUBREL, /* QoS 2.2: Publish release */
LMQCP_PUBCOMP, /* QoS 2.3: Publish complete */
LMQCP_CTOS_SUBSCRIBE, /* Subscribe request */
LMQCP_STOC_SUBACK, /* Subscribe acknowledgment */
LMQCP_CTOS_UNSUBSCRIBE, /* Unsubscribe request */
LMQCP_STOC_UNSUBACK, /* Unsubscribe acknowledgment */
LMQCP_CTOS_PINGREQ, /* PING request */
LMQCP_STOC_PINGRESP, /* PONG response */
LMQCP_DISCONNECT, /* Disconnect notification */
LMQCP_AUTH /* Authentication exchange */
} lws_mqtt_control_packet_t;
/* flags from byte 8 of C_TO_S CONNECT */
typedef enum {
LMQCFT_USERNAME = (1 << 7),
LMQCFT_PASSWORD = (1 << 6),
LMQCFT_WILL_RETAIN = (1 << 5),
LMQCFT_WILL_QOS = (1 << 3),
LMQCFT_WILL_FLAG = (1 << 2),
LMQCFT_CLEAN_START = (1 << 1),
LMQCFT_RESERVED = (1 << 0),
LMQCFT_WILL_QOS_MASK = (3 << 3),
} lws_mqtt_connect_flags_t;
/* flags for S_TO_C CONNACK */
typedef enum {
LMQCFT_SESSION_PRESENT = (1 << 0),
} lws_mqtt_connack_flags_t;
typedef enum {
LMQCP_REASON_SUCCESS = 0x00,
LMQCP_REASON_NORMAL_DISCONNECTION = 0x00,
LMQCP_REASON_GRANTED_QOS0 = 0x00,
LMQCP_REASON_GRANTED_QOS1 = 0x01,
LMQCP_REASON_GRANTED_QOS2 = 0x02,
LMQCP_REASON_DISCONNECT_WILL = 0x04,
LMQCP_REASON_NO_MATCHING_SUBSCRIBER = 0x10,
LMQCP_REASON_NO_SUBSCRIPTION_EXISTED = 0x11,
LMQCP_REASON_CONTINUE_AUTHENTICATION = 0x18,
LMQCP_REASON_RE_AUTHENTICATE = 0x19,
LMQCP_REASON_UNSPECIFIED_ERROR = 0x80,
LMQCP_REASON_MALFORMED_PACKET = 0x81,
LMQCP_REASON_PROTOCOL_ERROR = 0x82,
LMQCP_REASON_IMPLEMENTATION_SPECIFIC_ERROR = 0x83,
/* Begin - Error codes for CONNACK */
LMQCP_REASON_UNSUPPORTED_PROTOCOL = 0x84,
LMQCP_REASON_CLIENT_ID_INVALID = 0x85,
LMQCP_REASON_BAD_CREDENTIALS = 0x86,
LMQCP_REASON_NOT_AUTHORIZED = 0x87,
/* End - Error codes for CONNACK */
LMQCP_REASON_SERVER_UNAVAILABLE = 0x88,
LMQCP_REASON_SERVER_BUSY = 0x89,
LMQCP_REASON_BANNED = 0x8a,
LMQCP_REASON_SERVER_SHUTTING_DOWN = 0x8b,
LMQCP_REASON_BAD_AUTHENTICATION_METHOD = 0x8c,
LMQCP_REASON_KEEPALIVE_TIMEOUT = 0x8d,
LMQCP_REASON_SESSION_TAKEN_OVER = 0x8e,
LMQCP_REASON_TOPIC_FILTER_INVALID = 0x8f,
LMQCP_REASON_TOPIC_NAME_INVALID = 0x90,
LMQCP_REASON_PACKET_ID_IN_USE = 0x91,
LMQCP_REASON_PACKET_ID_NOT_FOUND = 0x92,
LMQCP_REASON_MAX_RX_EXCEEDED = 0x93,
LMQCP_REASON_TOPIC_ALIAS_INVALID = 0x94,
LMQCP_REASON_PACKET_TOO_LARGE = 0x95,
LMQCP_REASON_RATELIMIT = 0x96,
LMQCP_REASON_QUOTA_EXCEEDED = 0x97,
LMQCP_REASON_ADMINISTRATIVE_ACTION = 0x98,
LMQCP_REASON_PAYLOAD_FORMAT_INVALID = 0x99,
LMQCP_REASON_RETAIN_NOT_SUPPORTED = 0x9a,
LMQCP_REASON_QOS_NOT_SUPPORTED = 0x9b,
LMQCP_REASON_USE_ANOTHER_SERVER = 0x9c,
LMQCP_REASON_SERVER_MOVED = 0x9d,
LMQCP_REASON_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED = 0x9e,
LMQCP_REASON_CONNECTION_RATE_EXCEEDED = 0x9f,
LMQCP_REASON_MAXIMUM_CONNECT_TIME = 0xa0,
LMQCP_REASON_SUBSCRIPTION_IDS_NOT_SUPPORTED = 0xa1,
LMQCP_REASON_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED = 0xa2,
} lws_mqtt_reason_t;
typedef enum {
LMQPROP_INVALID,
LMQPROP_PAYLOAD_FORMAT_INDICATOR = 0x01,
LMQPROP_MESSAGE_EXPIRY_INTERVAL = 0x02,
LMQPROP_CONTENT_TYPE = 0x03,
LMQPROP_RESPONSE_TOPIC = 0x08,
LMQPROP_CORRELATION_DATA = 0x09,
LMQPROP_SUBSCRIPTION_IDENTIFIER = 0x0b,
LMQPROP_SESSION_EXPIRY_INTERVAL = 0x11,
LMQPROP_ASSIGNED_CLIENT_IDENTIFIER = 0x12,
LMQPROP_SERVER_KEEP_ALIVE = 0x13,
LMQPROP_AUTHENTICATION_METHOD = 0x15,
LMQPROP_AUTHENTICATION_DATA = 0x16,
LMQPROP_REQUEST_PROBLEM_INFORMATION = 0x17,
LMQPROP_WILL_DELAY_INTERVAL = 0x18,
LMQPROP_REQUEST_RESPONSE_INFORMATION = 0x19,
LMQPROP_RESPONSE_INFORMATION = 0x1a,
LMQPROP_SERVER_REFERENCE = 0x1c,
LMQPROP_REASON_STRING = 0x1f,
LMQPROP_RECEIVE_MAXIMUM = 0x21,
LMQPROP_TOPIC_ALIAS_MAXIMUM = 0x22,
LMQPROP_TOPIC_ALIAS = 0x23,
LMQPROP_MAXIMUM_QOS = 0x24,
LMQPROP_RETAIN_AVAILABLE = 0x25,
LMQPROP_USER_PROPERTY = 0x26,
LMQPROP_MAXIMUM_PACKET_SIZE = 0x27,
LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL = 0x28,
LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL = 0x29,
LMQPROP_SHARED_SUBSCRIPTION_AVAIL = 0x2a
} lws_mqtt_property;
int
lws_read_mqtt(struct lws *wsi, unsigned char *buf, lws_filepos_t len);
/* returns 0 if bd1 and bd2 are "the same", that includes empty, else nonzero */
LWS_VISIBLE LWS_EXTERN int
lws_mqtt_bindata_cmp(const lws_mqtt_str_t *bd1, const lws_mqtt_str_t *bd2);
LWS_VISIBLE LWS_EXTERN void
lws_mqtt_str_init(lws_mqtt_str_t *s, uint8_t *buf, uint16_t lim, char nf);
LWS_VISIBLE LWS_EXTERN lws_mqtt_str_t *
lws_mqtt_str_create(uint16_t lim);
LWS_VISIBLE LWS_EXTERN lws_mqtt_str_t *
lws_mqtt_str_create_init(uint8_t *buf, uint16_t len, uint16_t lim);
LWS_VISIBLE LWS_EXTERN lws_mqtt_str_t *
lws_mqtt_str_create_cstr_dup(const char *buf, uint16_t lim);
LWS_VISIBLE LWS_EXTERN uint8_t *
lws_mqtt_str_next(lws_mqtt_str_t *s, uint16_t *budget);
LWS_VISIBLE LWS_EXTERN int
lws_mqtt_str_advance(lws_mqtt_str_t *s, int n);
LWS_VISIBLE LWS_EXTERN void
lws_mqtt_str_free(lws_mqtt_str_t **s);
/**
* lws_mqtt_client_send_publish() - lws_write a publish packet
*
* \param wsi: the mqtt child wsi
* \param pub: additional information on what we're publishing
* \param buf: payload to send
* \param len: length of data in buf
* \param final: flag indicating this is the last part
*
* Issues part of, or the whole of, a PUBLISH frame. The first part of the
* frame contains the header, and uses the .qos and .payload_len parts of \p pub
* since MQTT requires the frame to specify the PUBLISH message length at the
* start. The \p len paramter may be less than \p pub.payload_len, in which
* case subsequent calls with more payload are needed to complete the frame.
*
* Although the connection is stuck waiting for the remainder, in that it can't
* issue any other frames until the current one is completed, lws returns to the
* event loop normally and can continue the calls with additional payload even
* for huge frames as the data becomes available, consistent with timeout needs
* and latency to start any new frame (even, eg, related to ping / pong).
*
* If you're sending large frames, the OS will typically not allow the data to
* be sent all at once to kernel side. So you should ideally cut the payload
* up into 1 or 2- mtu sized chunks and send that.
*
* Final should be set when you're calling with the last part of the payload.
*/
LWS_VISIBLE LWS_EXTERN int
lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
const void *buf, uint32_t len, int final);
/**
* lws_mqtt_client_send_subcribe() - lws_write a subscribe packet
*
* \param wsi: the mqtt child wsi
* \param sub: which topic(s) we want to subscribe to
*
* For topics other child streams have not already subscribed to, send a packet
* to the server asking to subscribe to them. If all topics listed are already
* subscribed to be the shared network connection, just trigger the
* LWS_CALLBACK_MQTT_SUBSCRIBED callback as if a SUBACK had come.
*
* \p sub doesn't need to exist after the return from this function.
*/
LWS_VISIBLE LWS_EXTERN int
lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub);
/**
* lws_mqtt_client_send_unsubcribe() - lws_write a unsubscribe packet
*
* \param wsi: the mqtt child wsi
* \param sub: which topic(s) we want to unsubscribe from
*
* For topics other child streams are not subscribed to, send a packet
* to the server asking to unsubscribe from them. If all topics
* listed are already subscribed by other child streams on the shared
* network connection, just trigger the LWS_CALLBACK_MQTT_UNSUBSCRIBED
* callback as if a UNSUBACK had come.
*
* \p unsub doesn't need to exist after the return from this function.
*/
LWS_VISIBLE LWS_EXTERN int LWS_WARN_UNUSED_RESULT
lws_mqtt_client_send_unsubcribe(struct lws *wsi,
const lws_mqtt_subscribe_param_t *unsub);
#endif /* _LWS_MQTT_H */

View file

@ -79,10 +79,8 @@ __lws_reset_wsi(struct lws *wsi)
* or by specified the user. We should only free what we allocated.
*/
if (wsi->protocol && wsi->protocol->per_session_data_size &&
wsi->user_space && !wsi->user_space_externally_allocated) {
lws_free(wsi->user_space);
wsi->user_space = NULL;
}
wsi->user_space && !wsi->user_space_externally_allocated)
lws_free_set_NULL(wsi->user_space);
lws_buflist_destroy_all_segments(&wsi->buflist);
lws_buflist_destroy_all_segments(&wsi->buflist_out);

View file

@ -313,7 +313,8 @@ lws_client_connect_via_info(const struct lws_client_connect_info *i)
i->uri_replace_to);
#endif
if (i->method && (!strcmp(i->method, "RAW"))) {
if (i->method && (!strcmp(i->method, "RAW") ||
!strcmp(i->method, "MQTT"))) {
#if defined(LWS_WITH_TLS)

View file

@ -531,16 +531,20 @@ lws_callback_on_writable(struct lws *wsi)
#endif
if (wsi->role_ops->callback_on_writable) {
if (wsi->role_ops->callback_on_writable(wsi))
int q = wsi->role_ops->callback_on_writable(wsi);
//lwsl_notice("%s: rops_cow says %d\n", __func__, q);
if (q)
return 1;
w = lws_get_network_wsi(wsi);
}
} else
if (w->position_in_fds_table == LWS_NO_FDS_POS) {
lwsl_debug("%s: failed to find socket %d\n", __func__,
wsi->desc.sockfd);
return -1;
}
if (w->position_in_fds_table == LWS_NO_FDS_POS) {
lwsl_debug("%s: failed to find socket %d\n", __func__,
wsi->desc.sockfd);
return -1;
}
//lwsl_notice("%s: marking for POLLOUT %p (wsi %p)\n", __func__, w, wsi);
if (__lws_change_pollfd(w, 0, LWS_POLLOUT))
return -1;

View file

@ -41,6 +41,8 @@ struct lws_muxable {
unsigned int my_sid;
unsigned int child_count;
uint32_t highest_sid;
uint8_t requested_POLLOUT;
};
@ -417,7 +419,6 @@ struct lws_context_per_thread {
#if defined(LWS_ROLE_DBUS)
struct lws_pt_role_dbus dbus;
#endif
/* --- event library based members --- */
#if defined(LWS_WITH_LIBEV)
@ -476,7 +477,7 @@ struct lws_context_per_thread {
struct lws_conn_stats {
unsigned long long rx, tx;
unsigned long h1_conn, h1_trans, h2_trans, ws_upg, h2_alpn, h2_subs,
h2_upg, rejected;
h2_upg, rejected, mqtt_subs;
};
#endif
@ -643,8 +644,11 @@ struct lws {
#if defined(LWS_ROLE_DBUS)
struct _lws_dbus_mode_related dbus;
#endif
#if defined(LWS_ROLE_MQTT)
struct _lws_mqtt_related *mqtt;
#endif
#if defined(LWS_ROLE_H2)
#if defined(LWS_ROLE_H2) || defined(LWS_ROLE_MQTT)
struct lws_muxable mux;
struct lws_tx_credit txc;
#endif
@ -1368,6 +1372,7 @@ _lws_generic_transaction_completed_active_conn(struct lws **wsi);
#define ACTIVE_CONNS_SOLO 0
#define ACTIVE_CONNS_MUXED 1
#define ACTIVE_CONNS_QUEUED 2
#define ACTIVE_CONNS_FAILED 3
int
lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin);

View file

@ -146,11 +146,13 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
if (!wsi->role_ops->handle_POLLOUT)
goto bail_ok;
switch ((wsi->role_ops->handle_POLLOUT)(wsi)) {
n = wsi->role_ops->handle_POLLOUT(wsi);
switch (n) {
case LWS_HP_RET_BAIL_OK:
goto bail_ok;
case LWS_HP_RET_BAIL_DIE:
goto bail_die;
case LWS_HP_RET_DROP_POLLOUT:
case LWS_HP_RET_USER_SERVICE:
break;
default:
@ -192,6 +194,9 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
lwsi_state(wsi) != LRS_ISSUE_HTTP_BODY)
goto bail_ok;
if (n == LWS_HP_RET_DROP_POLLOUT)
goto bail_ok;
#ifdef LWS_WITH_CGI
user_service_go_again:

View file

@ -39,6 +39,9 @@ const struct lws_role_ops *available_roles[] = {
#endif
#if defined(LWS_ROLE_RAW_PROXY)
&role_ops_raw_proxy,
#endif
#if defined(LWS_ROLE_MQTT) && defined(LWS_WITH_CLIENT)
&role_ops_mqtt,
#endif
NULL
};
@ -1394,6 +1397,25 @@ lws_get_vhost_by_name(struct lws_context *context, const char *name)
int
lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
{
if (!lws_dll2_is_detached(&wsi->dll2_cli_txn_queue)) {
struct lws *w = lws_container_of(
wsi->dll2_cli_txn_queue.owner, struct lws,
dll2_cli_txn_queue_owner);
*nwsi = w;
return ACTIVE_CONNS_QUEUED;
}
if (wsi->mux.parent_wsi) {
/*
* We already decided...
*/
*nwsi = wsi->mux.parent_wsi;
return ACTIVE_CONNS_MUXED;
}
lws_vhost_lock(wsi->vhost); /* ----------------------------------- { */
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
@ -1401,8 +1423,8 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
struct lws *w = lws_container_of(d, struct lws,
dll_cli_active_conns);
lwsl_debug("%s: check %s %s %d %d\n", __func__, adsin,
w->cli_hostname_copy, wsi->c_port, w->c_port);
lwsl_debug("%s: check %p %p %s %s %d %d\n", __func__, wsi, w,
adsin, w->cli_hostname_copy, wsi->c_port, w->c_port);
if (w != wsi &&
/*
@ -1430,9 +1452,10 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
"keepalive on server\n");
goto solo;
}
#if defined (LWS_WITH_HTTP2)
#if defined(LWS_WITH_HTTP2)
/*
* h2: in usable state already: just use it without
* h2: if in usable state already: just use it without
* going through the queue
*/
if (w->client_h2_alpn && w->client_mux_migrated &&
@ -1460,8 +1483,36 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
}
#endif
lwsl_info("apply %p to txn queue on %p state 0x%lx\n",
wsi, w, (unsigned long)w->wsistate);
#if defined(LWS_ROLE_MQTT)
/*
* MQTT: if in usable state already: just use it without
* going through the queue
*/
if (lwsi_role_mqtt(wsi) && w->client_mux_migrated &&
lwsi_state(w) == LRS_ESTABLISHED) {
if (lws_wsi_mqtt_adopt(w, wsi)) {
lwsl_notice("%s: join mqtt directly\n", __func__);
lws_dll2_remove(&wsi->dll2_cli_txn_queue);
wsi->client_mux_substream = 1;
lws_vhost_unlock(wsi->vhost); /* } ---------- */
return ACTIVE_CONNS_MUXED;
}
}
#endif
/*
* If the connection is viable but not yet in a usable
* state, let's attach ourselves to it and wait for it
* to get there or fail.
*/
lwsl_notice("%s: apply %p to txn queue on %p state 0x%lx\n",
__func__, wsi, w, (unsigned long)w->wsistate);
/*
* ...let's add ourselves to his transaction queue...
* we are adding ourselves at the TAIL
@ -1475,7 +1526,7 @@ lws_vhost_active_conns(struct lws *wsi, struct lws **nwsi, const char *adsin)
}
/*
* h1: pipeline our headers out on him,
* For eg, h1 next we'd pipeline our headers out on him,
* and wait for our turn at client transaction_complete
* to take over parsing the rx.
*/

View file

@ -52,8 +52,10 @@ lws_vhost_bind_wsi(struct lws_vhost *vh, struct lws *wsi)
wsi->vhost = vh;
vh->count_bound_wsi++;
lws_context_unlock(vh->context); /* } context ---------- */
lwsl_info("%s: vh %s: count_bound_wsi %d\n",
__func__, vh->name, vh->count_bound_wsi);
lwsl_debug("%s: vh %s: wsi %s/%s, count_bound_wsi %d\n", __func__,
vh->name, wsi->role_ops ? wsi->role_ops->name : "none",
wsi->protocol ? wsi->protocol->name : "none",
vh->count_bound_wsi);
assert(wsi->vhost->count_bound_wsi > 0);
}
@ -67,8 +69,8 @@ lws_vhost_unbind_wsi(struct lws *wsi)
assert(wsi->vhost->count_bound_wsi > 0);
wsi->vhost->count_bound_wsi--;
lwsl_info("%s: vh %s: count_bound_wsi %d\n", __func__,
wsi->vhost->name, wsi->vhost->count_bound_wsi);
lwsl_debug("%s: vh %s: count_bound_wsi %d\n", __func__,
wsi->vhost->name, wsi->vhost->count_bound_wsi);
if (!wsi->vhost->count_bound_wsi &&
wsi->vhost->being_destroyed) {
@ -92,7 +94,7 @@ lws_get_network_wsi(struct lws *wsi)
if (!wsi)
return NULL;
#if defined(LWS_WITH_HTTP2)
#if defined(LWS_WITH_HTTP2) || defined(LWS_ROLE_MQTT)
if (!wsi->mux_substream
#if defined(LWS_WITH_CLIENT)
&& !wsi->client_mux_substream
@ -1062,7 +1064,7 @@ lws_wsi_client_stash_item(struct lws *wsi, int stash_idx, int hdr_idx)
}
#endif
#if defined(LWS_ROLE_H2)
#if defined(LWS_ROLE_H2) || defined(LWS_ROLE_MQTT)
void
lws_wsi_mux_insert(struct lws *wsi, struct lws *parent_wsi, int sid)
@ -1102,6 +1104,7 @@ lws_wsi_mux_dump_children(struct lws *wsi)
wsi->mux.parent_wsi->mux.child_list) {
lwsl_info(" \\---- child %s %p\n",
(*w)->role_ops ? (*w)->role_ops->name : "?", *w);
assert(*w != (*w)->mux.sibling_list);
} lws_end_foreach_llp(w, mux.sibling_list);
#endif
}
@ -1118,6 +1121,7 @@ lws_wsi_mux_close_children(struct lws *wsi, int reason)
lwsl_info(" closing child %p\n", *w);
/* disconnect from siblings */
wsi2 = (*w)->mux.sibling_list;
assert (wsi2 != *w);
(*w)->mux.sibling_list = NULL;
(*w)->socket_is_permanently_unusable = 1;
__lws_close_free_wsi(*w, reason, "mux child recurse");
@ -1146,6 +1150,7 @@ lws_wsi_mux_sibling_disconnect(struct lws *wsi)
}
} lws_end_foreach_llp(w, mux.sibling_list);
wsi->mux.parent_wsi->mux.child_count--;
wsi->mux.parent_wsi = NULL;
}
@ -1158,9 +1163,10 @@ lws_wsi_mux_dump_waiting_children(struct lws *wsi)
wsi = wsi->mux.child_list;
while (wsi) {
lwsl_info(" %c %p: sid %u: %s %s\n",
lwsl_info(" %c %p: sid %u: 0x%x %s %s\n",
wsi->mux.requested_POLLOUT ? '*' : ' ',
wsi, wsi->mux.my_sid, wsi->role_ops->name,
wsi, wsi->mux.my_sid, lwsi_state(wsi),
wsi->role_ops->name,
wsi->protocol ? wsi->protocol->name : "noprotocol");
wsi = wsi->mux.sibling_list;
@ -1233,11 +1239,14 @@ lws_wsi_mux_action_pending_writeable_reqs(struct lws *wsi)
if (w->mux.requested_POLLOUT) {
if (lws_change_pollfd(wsi, 0, LWS_POLLOUT))
return -1;
break;
return 0;
}
w = w->mux.sibling_list;
}
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0))
return -1;
return 0;
}
@ -1316,11 +1325,13 @@ lws_wsi_mux_apply_queue(struct lws *wsi)
/* we have a transaction queue that wants to pipeline */
lws_vhost_lock(wsi->vhost);
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
wsi->dll2_cli_txn_queue_owner.head) {
struct lws *w = lws_container_of(d, struct lws,
dll2_cli_txn_queue);
#if defined(LWS_ROLE_H2)
if (lwsi_role_http(wsi) &&
lwsi_state(w) == LRS_H2_WAITING_TO_SEND_HEADERS) {
lwsl_info("%s: cli pipeq %p to be h2\n", __func__, w);
@ -1333,8 +1344,23 @@ lws_wsi_mux_apply_queue(struct lws *wsi)
/* attach ourselves as an h2 stream */
lws_wsi_h2_adopt(wsi, w);
}
#endif
#if defined(LWS_ROLE_MQTT)
if (lwsi_role_mqtt(wsi) &&
lwsi_state(wsi) == LRS_ESTABLISHED) {
lwsl_info("%s: cli pipeq %p to be mqtt\n", __func__, w);
/* remove ourselves from client queue */
lws_dll2_remove(&w->dll2_cli_txn_queue);
/* attach ourselves as an h2 stream */
lws_wsi_mqtt_adopt(wsi, w);
}
#endif
} lws_end_foreach_dll_safe(d, d1);
lws_vhost_unlock(wsi->vhost);
return 0;

View file

@ -64,7 +64,7 @@ lws_client_connect_4_established(struct lws *wsi, struct lws *wsi_piggyback,
#if defined(LWS_CLIENT_HTTP_PROXYING)
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
#endif
const char *meth = NULL;
const char *meth;
struct lws_pollfd pfd;
const char *cce = "";
int n, m, rawish = 0;
@ -72,7 +72,11 @@ lws_client_connect_4_established(struct lws *wsi, struct lws *wsi_piggyback,
meth = lws_wsi_client_stash_item(wsi, CIS_METHOD,
_WSI_TOKEN_CLIENT_METHOD);
if (meth && !strcmp(meth, "RAW"))
if (meth && (!strcmp(meth, "RAW")
#if defined(LWS_ROLE_MQTT)
|| !strcmp(meth, "MQTT")
#endif
))
rawish = 1;
if (wsi_piggyback)
@ -224,6 +228,41 @@ send_hs:
/* service.c pollout processing wants this */
wsi->hdr_parsing_completed = 1;
#if defined(LWS_ROLE_MQTT)
if (!strcmp(meth, "MQTT")) {
#if defined(LWS_WITH_TLS)
if (wsi->tls.use_ssl & LCCSCF_USE_SSL) {
lwsi_set_state(wsi, LRS_WAITING_SSL);
return wsi;
}
#endif
lwsl_info("%s: settings LRS_MQTTC_IDLE\n",
__func__);
lwsi_set_state(wsi, LRS_MQTTC_IDLE);
/*
* provoke service to issue the CONNECT directly.
*/
lws_set_timeout(wsi, PENDING_TIMEOUT_SENT_CLIENT_HANDSHAKE,
AWAITING_TIMEOUT);
assert(lws_socket_is_valid(wsi->desc.sockfd));
pfd.fd = wsi->desc.sockfd;
pfd.events = LWS_POLLIN;
pfd.revents = LWS_POLLOUT;
lwsl_info("%s: going to service fd\n", __func__);
n = lws_service_fd(wsi->context, &pfd);
if (n < 0) {
cce = "first service failed";
goto failed;
}
if (n) /* returns 1 on failure after closing wsi */
return NULL;
return wsi;
}
#endif
lwsl_info("%s: setting ESTABLISHED\n", __func__);
lwsi_set_state(wsi, LRS_ESTABLISHED);
@ -722,8 +761,9 @@ lws_client_connect_2_dnsreq(struct lws *wsi)
int n, port = 0;
struct lws *w;
if (lwsi_state(wsi) == LRS_WAITING_DNS) {
lwsl_notice("%s: LRS_WAITING_DNS\n", __func__);
if (lwsi_state(wsi) == LRS_WAITING_DNS ||
lwsi_state(wsi) == LRS_WAITING_CONNECT) {
lwsl_info("%s: LRS_WAITING_DNS / CONNECT\n", __func__);
return wsi;
}
@ -747,7 +787,8 @@ lws_client_connect_2_dnsreq(struct lws *wsi)
/* only pipeline things we associate with being a stream */
if (meth && strcmp(meth, "RAW") && strcmp(meth, "GET") &&
strcmp(meth, "POST") && strcmp(meth, "PUT"))
strcmp(meth, "POST") && strcmp(meth, "PUT") &&
strcmp(meth, "UDP") && strcmp(meth, "MQTT"))
goto solo;
/* consult active connections to find out disposition */
@ -809,10 +850,12 @@ solo:
*/
if (meth && (!strcmp(meth, "RAW") || !strcmp(meth, "GET") ||
!strcmp(meth, "POST") || !strcmp(meth, "PUT")) &&
!strcmp(meth, "POST") || !strcmp(meth, "PUT") ||
!strcmp(meth, "MQTT")) &&
lws_dll2_is_detached(&wsi->dll2_cli_txn_queue) &&
lws_dll2_is_detached(&wsi->dll_cli_active_conns)) {
lws_vhost_lock(wsi->vhost);
lwsl_info("%s: adding active conn %p\n", __func__, wsi);
/* caution... we will have to unpick this on oom4 path */
lws_dll2_add_head(&wsi->dll_cli_active_conns,
&wsi->vhost->dll_cli_active_conns_owner);
@ -1338,7 +1381,8 @@ lws_http_client_connect_via_info2(struct lws *wsi)
wsi->opaque_user_data = wsi->stash->opaque_user_data;
if (stash->cis[CIS_METHOD] && !strcmp(stash->cis[CIS_METHOD], "RAW"))
if (stash->cis[CIS_METHOD] && (!strcmp(stash->cis[CIS_METHOD], "RAW") ||
!strcmp(stash->cis[CIS_METHOD], "MQTT")))
goto no_ah;
/*

View file

@ -211,6 +211,11 @@ lws_header_table_attach(struct lws *wsi, int autoservice)
struct lws_pollargs pa;
int n;
#if defined(LWS_ROLE_MQTT) && defined(LWS_WITH_CLIENT)
if (lwsi_role_mqtt(wsi))
goto connect_via_info2;
#endif
lwsl_info("%s: wsi %p: ah %p (tsi %d, count = %d) in\n", __func__,
(void *)wsi, (void *)wsi->http.ah, wsi->tsi,
pt->http.ah_count_in_use);
@ -281,6 +286,9 @@ reset:
lws_pt_unlock(pt);
#if defined(LWS_ROLE_MQTT)
connect_via_info2:
#endif
#if defined(LWS_WITH_CLIENT)
if (lwsi_role_client(wsi) && lwsi_state(wsi) == LRS_UNCONNECTED)
if (!lws_http_client_connect_via_info2(wsi))

View file

@ -0,0 +1,182 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
* Sakthi Kannan <saktr@amazon.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include <private-lib-core.h>
#define MQTT_CONNECT_MSG_BASE_LEN (12)
struct lws *
lws_mqtt_client_send_connect(struct lws *wsi)
{
/* static int */
/* lws_mqttc_abs_writeable(lws_abs_protocol_inst_t *api, size_t budget) */
const lws_mqttc_t *c = &wsi->mqtt->client;
uint8_t b[256 + LWS_PRE], *start = b + LWS_PRE, *p = start,
len = MQTT_CONNECT_MSG_BASE_LEN;
switch (lwsi_state(wsi)) {
case LRS_MQTTC_IDLE:
/*
* Transport connected - this is our chance to do the
* protocol connect action.
*/
/* 1. Fixed Headers */
if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_CONNECT, 0, 0, 0)) {
lwsl_err("%s: Failled to fill fixed header\n", __func__);
return NULL;
}
/*
* 2. Remaining length - Add the lengths of client ID,
* username and password and their length fields if
* the respective flags are set.
*/
len += c->id->len;
if (c->conn_flags & LMQCFT_USERNAME && c->username) {
len += c->username->len + 2;
if (c->conn_flags & LMQCFT_PASSWORD)
len += (c->password ? c->password->len : 0) + 2;
}
if (c->conn_flags & LMQCFT_WILL_FLAG && c->will.topic) {
len += c->will.topic->len + 2;
len += (c->will.message ? c->will.message->len : 0) + 2;
}
p += lws_mqtt_vbi_encode(len, p);
/*
* 3. Variable Header - Protocol name & level, Connect
* flags and keep alive time (in secs).
*/
lws_ser_wu16be(p, 4); /* Length of protocol name */
p += 2;
*p++ = 'M';
*p++ = 'Q';
*p++ = 'T';
*p++ = 'T';
*p++ = MQTT_VER_3_1_1;
*p++ = c->conn_flags;
lws_ser_wu16be(p, c->keep_alive_secs);
p += 2;
/*
* 4. Payload - Client ID, Will topic & message,
* Username & password.
*/
if (lws_mqtt_str_is_not_empty(c->id)) {
lws_ser_wu16be(p, c->id->len);
p += 2;
memcpy(p, c->id->buf, c->id->len);
p += c->id->len;
} else {
/*
* If the Client supplies a zero-byte
* ClientId, the Client MUST also set
* CleanSession to 1 [MQTT-3.1.3-7].
*/
if (!(c->conn_flags & LMQCFT_CLEAN_START)) {
lwsl_err("%s: Empty client ID needs a clean start\n",
__func__);
return NULL;
}
*p++ = 0;
}
if ((c->conn_flags & ~LMQCFT_CLEAN_START) == 0) {
*p++ = 0; /* no properties */
break;
}
if (c->conn_flags & LMQCFT_WILL_FLAG) {
if (lws_mqtt_str_is_not_empty(c->will.topic)) {
lws_ser_wu16be(p, c->will.topic->len);
p += 2;
memcpy(p, c->will.topic->buf, c->will.topic->len);
p += c->will.topic->len;
if (lws_mqtt_str_is_not_empty(c->will.message)) {
lws_ser_wu16be(p, c->will.topic->len);
p += 2;
memcpy(p, c->will.message->buf,
c->will.message->len);
p += c->will.message->len;
} else {
lws_ser_wu16be(p, 0);
p += 2;
}
} else {
lwsl_err("%s: Missing Will Topic\n", __func__);
return NULL;
}
}
if (c->conn_flags & LMQCFT_USERNAME) {
/*
* Detailed sanity check on the username and
* password strings.
*/
if (lws_mqtt_str_is_not_empty(c->username)) {
lws_ser_wu16be(p, c->username->len);
p += 2;
memcpy(p, c->username->buf, c->username->len);
p += c->username->len;
} else {
lwsl_err("%s: Empty / missing Username!\n",
__func__);
return NULL;
}
if (c->conn_flags & LMQCFT_PASSWORD) {
if (lws_mqtt_str_is_not_empty(c->password)) {
lws_ser_wu16be(p, c->password->len);
p += 2;
memcpy(p, c->password->buf,
c->password->len);
p += c->password->len;
} else {
lws_ser_wu16be(p, 0);
p += 2;
}
}
} else if (c->conn_flags & LMQCFT_PASSWORD) {
lwsl_err("%s: Unsupported - Password without username\n",
__func__);
return NULL;
}
break;
default:
lwsl_err("%s: unexpected state %d\n", __func__, lwsi_state(wsi));
return NULL;
}
/*
* Perform the actual write
*/
if (lws_write(wsi, (unsigned char *)&b[LWS_PRE], lws_ptr_diff(p, start),
LWS_WRITE_BINARY) != lws_ptr_diff(p, start)) {
lwsl_notice("%s: write failed\n", __func__);
return NULL;
}
return wsi;
}

View file

@ -0,0 +1,398 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
/*
* You can leave buf NULL, if so it will be allocated on the heap once the
* actual length is known. nf should be 0, it will be set at allocation time.
*
* Or you can ensure no allocation and use an external buffer by setting buf
* and lim. But buf must be in the ep context somehow, since it may have to
* survive returns to the event loop unchanged. Set nf to 0 in this case.
*
* Or you can set buf to an externally allocated buffer, in which case you may
* set nf so it will be freed when the string is "freed".
*/
#include "private-lib-core.h"
/* #include "lws-mqtt.h" */
/* 3.1.3.1-5: MUST allow... that contain only the characters... */
static const uint8_t *code = (const uint8_t *)
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
static int
lws_mqtt_generate_id(struct lws* wsi, lws_mqtt_str_t **ms, const char *client_id)
{
struct lws_context *context = wsi->context;
uint16_t ran[24]; /* 16-bit so wrap bias from %62 diluted by ~1000 */
size_t n, len;
uint8_t *buf;
if (client_id)
len = strlen(client_id);
else
len = 23;
if (len > 23) /* 3.1.3.1-5: Server MUST... between 1 and 23 chars... */
return 1;
*ms = lws_mqtt_str_create((uint16_t)(len + 1));
if (!*ms)
return 1;
buf = lws_mqtt_str_next(*ms, NULL);
if (client_id) {
lws_strnncpy((char *)buf, client_id, len, len + 1);
lwsl_notice("%s: User space provided a client ID '%s'\n",
__func__, (const char *)buf);
} else {
lwsl_notice("%s: generating random client id\n", __func__);
n = len * sizeof(ran[0]);
if (lws_get_random(context, ran, n) != n) {
lws_mqtt_str_free(ms);
return 1;
}
for (n = 0; n < len; n++)
buf[n] = code[ran[n] % 62];
buf[len] = '\0';
}
lws_mqtt_str_advance(*ms, (uint16_t)len);
return 0;
}
int
lws_read_mqtt(struct lws *wsi, unsigned char *buf, lws_filepos_t len)
{
lws_mqttc_t *c = &wsi->mqtt->client;
return _lws_mqtt_rx_parser(wsi, &c->par, buf, len);
}
int
lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
struct lws *wsi)
{
lws_mqttc_t *c;
const lws_mqtt_client_connect_param_t *cp = i->mqtt_cp;
/* allocate the ws struct for the wsi */
wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "client mqtt struct");
if (!wsi->mqtt)
goto oom;
wsi->mqtt->wsi = wsi;
c = &wsi->mqtt->client;
if (lws_mqtt_generate_id(wsi, &c->id, cp->client_id)) {
lwsl_err("%s: Error generating client ID\n", __func__);
return 1;
}
lwsl_info("%s: using client id '%.*s'\n", __func__, c->id->len,
(const char *)c->id->buf);
if (cp->clean_start || !cp->client_id[0])
c->conn_flags = LMQCFT_CLEAN_START;
c->keep_alive_secs = cp->keep_alive;
if (cp->will_param.topic &&
*cp->will_param.topic) {
c->will.topic = lws_mqtt_str_create_cstr_dup(
cp->will_param.topic, 0);
if (!c->will.topic)
goto oom1;
c->conn_flags |= LMQCFT_WILL_FLAG;
if (cp->will_param.message) {
c->will.message = lws_mqtt_str_create_cstr_dup(
cp->will_param.message, 0);
if (!c->will.message)
goto oom2;
}
c->conn_flags |= (cp->will_param.qos << 3) & LMQCFT_WILL_QOS_MASK;
c->conn_flags |= (!!cp->will_param.retain) * LMQCFT_WILL_RETAIN;
}
if (cp->username &&
*cp->username) {
c->username = lws_mqtt_str_create_cstr_dup(cp->username, 0);
if (!c->username)
goto oom3;
c->conn_flags |= LMQCFT_USERNAME;
if (cp->password) {
c->password =
lws_mqtt_str_create_cstr_dup(cp->password, 0);
if (!c->password)
goto oom4;
c->conn_flags |= LMQCFT_PASSWORD;
}
}
return 0;
oom4:
lws_mqtt_str_free(&c->username);
oom3:
lws_mqtt_str_free(&c->will.message);
oom2:
lws_mqtt_str_free(&c->will.topic);
oom1:
lws_mqtt_str_free(&c->id);
oom:
lwsl_err("%s: OOM!\n", __func__);
return 1;
}
int
lws_mqtt_client_socket_service(struct lws *wsi, struct lws_pollfd *pollfd,
struct lws *wsi_conn)
{
struct lws_context *context = wsi->context;
struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
int n = 0, m = 0;
struct lws_tokens ebuf;
int buffered = 0;
char pending = 0;
#if defined(LWS_WITH_TLS)
char erbuf[128];
#endif
const char *cce = NULL;
switch (lwsi_state(wsi)) {
#if defined(LWS_WITH_SOCKS5)
/* SOCKS Greeting Reply */
case LRS_WAITING_SOCKS_GREETING_REPLY:
case LRS_WAITING_SOCKS_AUTH_REPLY:
case LRS_WAITING_SOCKS_CONNECT_REPLY:
switch (lws_socks5c_handle_state(wsi, pollfd, &cce)) {
case LW5CHS_RET_RET0:
return 0;
case LW5CHS_RET_BAIL3:
goto bail3;
case LW5CHS_RET_STARTHS:
/*
* Now we got the socks5 connection, we need to go down
* the tls path on it if that's what we want
*/
if (!(wsi->tls.use_ssl & LCCSCF_USE_SSL))
goto start_ws_handshake;
/* we can retry this... just cook the SSL BIO the first time */
if (lws_ssl_client_bio_create(wsi) < 0) {
lwsl_err("%s: bio_create failed\n", __func__);
goto bail3;
}
if (wsi->tls.use_ssl & LCCSCF_USE_SSL) {
n = lws_ssl_client_connect1(wsi);
if (!n)
return 0;
if (n < 0) {
lwsl_err("%s: lws_ssl_client_connect1 failed\n",
__func__);
goto bail3;
}
}
default:
break;
}
break;
#endif
case LRS_WAITING_DNS:
/*
* we are under PENDING_TIMEOUT_SENT_CLIENT_HANDSHAKE
* timeout protection set in client-handshake.c
*/
if (!lws_client_connect_2_dnsreq(wsi)) {
/* closed */
lwsl_client("closed\n");
return -1;
}
/* either still pending connection, or changed mode */
return 0;
case LRS_WAITING_CONNECT:
/*
* we are under PENDING_TIMEOUT_SENT_CLIENT_HANDSHAKE
* timeout protection set in client-handshake.c
*/
if (pollfd->revents & LWS_POLLOUT)
lws_client_connect_3_connect(wsi, NULL, NULL, 0, NULL);
break;
#if defined(LWS_WITH_TLS)
case LRS_WAITING_SSL:
if (wsi->tls.use_ssl & LCCSCF_USE_SSL) {
n = lws_ssl_client_connect2(wsi, erbuf, sizeof(erbuf));
if (!n)
return 0;
if (n < 0) {
cce = erbuf;
goto bail3;
}
} else
wsi->tls.ssl = NULL;
#endif /* LWS_WITH_TLS */
#if defined(LWS_WITH_DETAILED_LATENCY)
if (context->detailed_latency_cb) {
wsi->detlat.type = LDLT_TLS_NEG_CLIENT;
wsi->detlat.latencies[LAT_DUR_PROXY_CLIENT_REQ_TO_WRITE] =
lws_now_usecs() -
wsi->detlat.earliest_write_req_pre_write;
wsi->detlat.latencies[LAT_DUR_USERCB] = 0;
lws_det_lat_cb(wsi->context, &wsi->detlat);
}
#endif
#if 0
if (wsi->client_h2_alpn) {
/*
* We connected to the server and set up tls, and
* negotiated "h2".
*
* So this is it, we are an h2 master client connection
* now, not an h1 client connection.
*/
#if defined(LWS_WITH_TLS)
lws_tls_server_conn_alpn(wsi);
#endif
/* send the H2 preface to legitimize the connection */
if (lws_h2_issue_preface(wsi)) {
cce = "error sending h2 preface";
goto bail3;
}
break;
}
#endif
/* fallthru */
#if defined(LWS_WITH_SOCKS5)
start_ws_handshake:
#endif
lwsi_set_state(wsi, LRS_MQTTC_IDLE);
lws_set_timeout(wsi, PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND,
context->timeout_secs);
/* fallthru */
case LRS_MQTTC_IDLE:
/*
* we should be ready to send out MQTT CONNECT
*/
lwsl_info("%s: wsi %p: Transport established, send out CONNECT\n",
__func__, wsi);
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0))
return -1;
if (!lws_mqtt_client_send_connect(wsi)) {
lwsl_err("%s: Unable to send MQTT CONNECT\n", __func__);
return -1;
}
if (lws_change_pollfd(wsi, 0, LWS_POLLIN))
return -1;
lwsi_set_state(wsi, LRS_MQTTC_AWAIT_CONNACK);
return 0;
case LRS_ESTABLISHED:
case LRS_MQTTC_AWAIT_CONNACK:
buffered = 0;
ebuf.token = pt->serv_buf;
ebuf.len = wsi->context->pt_serv_buf_size;
if ((unsigned int)ebuf.len > wsi->context->pt_serv_buf_size)
ebuf.len = wsi->context->pt_serv_buf_size;
if ((int)pending > ebuf.len)
pending = ebuf.len;
ebuf.len = lws_ssl_capable_read(wsi, ebuf.token,
pending ? (int)pending :
ebuf.len);
switch (ebuf.len) {
case 0:
lwsl_info("%s: zero length read\n",
__func__);
goto fail;
case LWS_SSL_CAPABLE_MORE_SERVICE:
lwsl_info("SSL Capable more service\n");
goto fail;
case LWS_SSL_CAPABLE_ERROR:
lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
__func__);
goto fail;
}
if (ebuf.len < 0)
n = -1;
else
n = lws_read_mqtt(wsi, ebuf.token, ebuf.len);
if (n < 0) {
lwsl_err("%s: Parsing packet failed\n", __func__);
goto fail;
}
m = ebuf.len - n;
// lws_buflist_describe(&wsi->buflist, wsi, __func__);
lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len);
if (lws_buflist_aware_finished_consuming(wsi, &ebuf, m,
buffered,
__func__))
return -1;
return 0;
#if defined(LWS_WITH_TLS) || defined(LWS_WITH_SOCKS5)
bail3:
#endif
lwsl_info("closing conn at LWS_CONNMODE...SERVER_REPLY\n");
if (cce)
lwsl_info("reason: %s\n", cce);
lws_inform_client_conn_fail(wsi, (void *)cce, strlen(cce));
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "cbail3");
return -1;
default:
break;
}
return 0;
fail:
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail");
return LWS_HPI_RET_WSI_ALREADY_DIED;
}

2116
lib/roles/mqtt/mqtt.c Normal file

File diff suppressed because it is too large Load diff

603
lib/roles/mqtt/ops-mqtt.c Normal file
View file

@ -0,0 +1,603 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "private-lib-core.h"
static int
rops_handle_POLLIN_mqtt(struct lws_context_per_thread *pt, struct lws *wsi,
struct lws_pollfd *pollfd)
{
unsigned int pending = 0;
struct lws_tokens ebuf;
int n = 0;
char buffered = 0;
lwsl_debug("%s: wsistate 0x%x, %s pollout %d\n", __func__,
(unsigned int)wsi->wsistate, wsi->protocol->name,
pollfd->revents);
/*
* After the CONNACK and nwsi establishment, the first logical
* stream is migrated out of the nwsi to be child sid 1, and the
* nwsi no longer has a wsi->mqtt of its own.
*
* RX events on the nwsi must be converted to events seen or not
* seen by one or more child streams.
*
* SUBACK - reflected to child stream that asked for it
* PUBACK - routed to child that did the related publish
*/
ebuf.token = NULL;
ebuf.len = 0;
if (lwsi_state(wsi) != LRS_ESTABLISHED) {
#if defined(LWS_WITH_CLIENT)
if (lwsi_state(wsi) == LRS_WAITING_SSL &&
((pollfd->revents & LWS_POLLOUT)) &&
lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
lwsl_info("failed at set pollfd\n");
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
if ((pollfd->revents & LWS_POLLOUT) &&
lws_handle_POLLOUT_event(wsi, pollfd)) {
lwsl_debug("POLLOUT event closed it\n");
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
n = lws_mqtt_client_socket_service(wsi, pollfd, NULL);
if (n)
return LWS_HPI_RET_WSI_ALREADY_DIED;
#endif
return LWS_HPI_RET_HANDLED;
}
/* 1: something requested a callback when it was OK to write */
if ((pollfd->revents & LWS_POLLOUT) &&
lwsi_state_can_handle_POLLOUT(wsi) &&
lws_handle_POLLOUT_event(wsi, pollfd)) {
if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
/* 3: buflist needs to be drained
*/
read:
// lws_buflist_describe(&wsi->buflist, wsi, __func__);
ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist, &ebuf.token);
if (ebuf.len) {
lwsl_info("draining buflist (len %d)\n", ebuf.len);
buffered = 1;
goto drain;
}
if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
return LWS_HPI_RET_HANDLED;
/* if (lws_is_flowcontrolled(wsi)) { */
/* lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n", */
/* __func__, wsi, wsi->rxflow_bitmap); */
/* return LWS_HPI_RET_HANDLED; */
/* } */
if (!(lwsi_role_client(wsi) && lwsi_state(wsi) != LRS_ESTABLISHED)) {
/*
* In case we are going to react to this rx by scheduling
* writes, we need to restrict the amount of rx to the size
* the protocol reported for rx buffer.
*
* Otherwise we get a situation we have to absorb possibly a
* lot of reads before we get a chance to drain them by writing
* them, eg, with echo type tests in autobahn.
*/
buffered = 0;
ebuf.token = pt->serv_buf;
ebuf.len = wsi->context->pt_serv_buf_size;
if ((unsigned int)ebuf.len > wsi->context->pt_serv_buf_size)
ebuf.len = wsi->context->pt_serv_buf_size;
if ((int)pending > ebuf.len)
pending = ebuf.len;
ebuf.len = lws_ssl_capable_read(wsi, ebuf.token,
pending ? (int)pending :
ebuf.len);
switch (ebuf.len) {
case 0:
lwsl_info("%s: zero length read\n",
__func__);
return LWS_HPI_RET_PLEASE_CLOSE_ME;
case LWS_SSL_CAPABLE_MORE_SERVICE:
lwsl_info("SSL Capable more service\n");
return LWS_HPI_RET_HANDLED;
case LWS_SSL_CAPABLE_ERROR:
lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
__func__);
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
/*
* coverity thinks ssl_capable_read() may read over
* 2GB. Dissuade it...
*/
ebuf.len &= 0x7fffffff;
}
drain:
/* service incoming data */
//lws_buflist_describe(&wsi->buflist, wsi, __func__);
if (ebuf.len) {
n = lws_read_mqtt(wsi, ebuf.token, ebuf.len);
if (n < 0) {
lwsl_notice("%s: lws_read_mqtt returned %d\n",
__func__, n);
/* we closed wsi */
n = 0;
goto fail;
}
// lws_buflist_describe(&wsi->buflist, wsi, __func__);
lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len);
if (lws_buflist_aware_finished_consuming(wsi, &ebuf, ebuf.len,
buffered, __func__))
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
ebuf.token = NULL;
ebuf.len = 0;
pending = lws_ssl_pending(wsi);
if (pending) {
pending = pending > wsi->context->pt_serv_buf_size ?
wsi->context->pt_serv_buf_size : pending;
goto read;
}
if (buffered && /* were draining, now nothing left */
!lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
lwsl_info("%s: %p flow buf: drained\n", __func__, wsi);
/* having drained the rxflow buffer, can rearm POLLIN */
#if !defined(LWS_WITH_SERVER)
n =
#endif
__lws_rx_flow_control(wsi);
/* n ignored, needed for NO_SERVER case */
}
/* n = 0 */
return LWS_HPI_RET_HANDLED;
fail:
lwsl_err("%s: Failed, bailing\n", __func__);
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail");
return LWS_HPI_RET_WSI_ALREADY_DIED;
}
#if 0 /* defined(LWS_WITH_SERVER) */
static int
rops_adoption_bind_mqtt(struct lws *wsi, int type, const char *vh_prot_name)
{
/* no http but socket... must be mqtt */
if ((type & LWS_ADOPT_HTTP) || !(type & LWS_ADOPT_SOCKET) ||
(type & _LWS_ADOPT_FINISH))
return 0; /* no match */
lws_role_transition(wsi, 0, (type & LWS_ADOPT_ALLOW_SSL) ? LRS_SSL_INIT :
LRS_ESTABLISHED, &role_ops_mqtt);
if (vh_prot_name)
lws_bind_protocol(wsi, wsi->protocol, __func__);
else
/* this is the only time he will transition */
lws_bind_protocol(wsi,
&wsi->vhost->protocols[wsi->vhost->mqtt_protocol_index],
__func__);
return 1; /* bound */
}
#endif
static int
rops_client_bind_mqtt(struct lws *wsi, const struct lws_client_connect_info *i)
{
lwsl_debug("%s: i = %p\n", __func__, i);
if (!i) {
/* finalize */
if (!wsi->user_space && wsi->stash->cis[CIS_METHOD])
if (lws_ensure_user_space(wsi))
return 1;
if (!wsi->stash->cis[CIS_METHOD] && !wsi->stash->cis[CIS_ALPN])
wsi->stash->cis[CIS_ALPN] = "x-amzn-mqtt-ca";
/* if we went on the ah waiting list, it's ok, we can
* wait.
*
* When we do get the ah, now or later, he will end up
* at lws_http_client_connect_via_info2().
*/
#if defined(LWS_WITH_CLIENT)
if (lws_header_table_attach(wsi, 0) < 0)
/*
* if we failed here, the connection is already closed
* and freed.
*/
return -1;
#else
if (lws_header_table_attach(wsi, 0))
return 0;
#endif
return 0;
}
/* if a recognized mqtt method, bind to it */
if (strcmp(i->method, "MQTT"))
return 0; /* no match */
if (lws_create_client_mqtt_object(i, wsi))
return 1;
lws_role_transition(wsi, LWSIFR_CLIENT, LRS_UNCONNECTED,
&role_ops_mqtt);
return 1; /* matched */
}
static int
rops_handle_POLLOUT_mqtt(struct lws *wsi)
{
struct lws **wsi2;
lwsl_debug("%s\n", __func__);
#if defined(LWS_WITH_CLIENT)
if (wsi->mqtt && wsi->mqtt->send_pingreq && !wsi->mqtt->inside_payload) {
uint8_t buf[LWS_PRE + 2];
/*
* We are swallowing this POLLOUT in order to send a PINGREQ
* autonomously
*/
wsi->mqtt->send_pingreq = 0;
lwsl_notice("%s: issuing PINGREQ\n", __func__);
buf[LWS_PRE] = LMQCP_CTOS_PINGREQ << 4;
buf[LWS_PRE + 1] = 0;
if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 2,
LWS_WRITE_BINARY) != 2)
return LWS_HP_RET_BAIL_DIE;
return LWS_HP_RET_BAIL_OK;
}
#endif
wsi = lws_get_network_wsi(wsi);
wsi->mux.requested_POLLOUT = 0;
wsi2 = &wsi->mux.child_list;
if (!*wsi2) {
lwsl_debug("%s: no children\n", __func__);
return LWS_HP_RET_DROP_POLLOUT;
}
lws_wsi_mux_dump_waiting_children(wsi);
do {
struct lws *w, **wa;
wa = &(*wsi2)->mux.sibling_list;
if (!(*wsi2)->mux.requested_POLLOUT)
goto next_child;
if (!lwsi_state_can_handle_POLLOUT(wsi))
goto next_child;
/*
* If the nwsi is in the middle of a frame, we can only
* continue to send that
*/
if (wsi->mqtt->inside_payload && !(*wsi2)->mqtt->inside_payload)
goto next_child;
/*
* we're going to do writable callback for this child.
* move him to be the last child
*/
w = lws_wsi_mux_move_child_to_tail(wsi2);
if (!w) {
wa = &wsi->mux.child_list;
goto next_child;
}
lwsl_debug("%s: child %p (wsistate 0x%x)\n", __func__, w,
(unsigned int)w->wsistate);
if (lwsi_state(wsi) == LRS_ESTABLISHED &&
!wsi->mqtt->inside_payload &&
wsi->mqtt->send_puback) {
uint8_t buf[LWS_PRE + 4];
lwsl_notice("%s: issuing PUBACK for pkt id: %d\n",
__func__, wsi->mqtt->ack_pkt_id);
/* Fixed header */
buf[LWS_PRE] = LMQCP_PUBACK << 4;
/* Remaining len = 2 */
buf[LWS_PRE + 1] = 2;
/* Packet ID */
lws_ser_wu16be(&buf[LWS_PRE + 2], wsi->mqtt->ack_pkt_id);
if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
LWS_WRITE_BINARY) != 4)
return LWS_HP_RET_BAIL_DIE;
wsi->mqtt->send_puback = 0;
w->mux.requested_POLLOUT = 1;
wa = &wsi->mux.child_list;
goto next_child;
}
if (lws_callback_as_writeable(w)) {
lwsl_notice("%s: Closing child %p\n", __func__, w);
lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
"mqtt pollout handle");
wa = &wsi->mux.child_list;
}
next_child:
wsi2 = wa;
} while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi));
// lws_wsi_mux_dump_waiting_children(wsi);
if (lws_wsi_mux_action_pending_writeable_reqs(wsi))
return LWS_HP_RET_BAIL_DIE;
return LWS_HP_RET_BAIL_OK;
}
#if defined(LWS_WITH_CLIENT)
static int
rops_issue_keepalive_mqtt(struct lws *wsi, int isvalid)
{
struct lws *nwsi = lws_get_network_wsi(wsi);
if (isvalid) {
_lws_validity_confirmed_role(nwsi);
return 0;
}
nwsi->mqtt->send_pingreq = 1;
lws_callback_on_writable(nwsi);
return 0;
}
#endif
static int
rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi)
{
struct lws *nwsi = lws_get_network_wsi(wsi);
lws_mqtt_subs_t *s, *s1, *mysub;
lws_mqttc_t *c;
if (!wsi->mqtt)
return 0;
c = &wsi->mqtt->client;
__lws_sul_insert(&pt->pt_sul_owner, &wsi->mqtt->sul_qos1_puback_wait,
LWS_SET_TIMER_USEC_CANCEL);
lws_mqtt_str_free(&c->username);
lws_mqtt_str_free(&c->password);
lws_mqtt_str_free(&c->will.message);
lws_mqtt_str_free(&c->will.topic);
lws_mqtt_str_free(&c->id);
/* clean up any subscription allocations */
s = wsi->mqtt->subs_head;
wsi->mqtt->subs_head = NULL;
while (s) {
s1 = s->next;
/*
* Account for children no longer using nwsi subscription
*/
mysub = lws_mqtt_find_sub(nwsi->mqtt, (const char *)&s[1]);
// assert(mysub); /* if child subscribed, nwsi must feel the same */
if (mysub) {
assert(mysub->ref_count);
mysub->ref_count--;
}
lws_free(s);
s = s1;
}
lws_mqtt_publish_param_t *pub =
(lws_mqtt_publish_param_t *)
wsi->mqtt->rx_cpkt_param;
if (pub)
lws_free_set_NULL(pub->topic);
lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
lws_free_set_NULL(wsi->mqtt);
return 0;
}
static int
rops_callback_on_writable_mqtt(struct lws *wsi)
{
#if defined(LWS_WITH_CLIENT)
struct lws *network_wsi;
#endif
int already;
lwsl_debug("%s: %p (wsistate 0x%x)\n", __func__, wsi, (unsigned int)wsi->wsistate);
if (wsi->mux.requested_POLLOUT
#if defined(LWS_WITH_CLIENT)
&& !wsi->client_h2_alpn
#endif
) {
lwsl_debug("already pending writable\n");
return 1;
}
#if 0
/* is this for DATA or for control messages? */
if (wsi->upgraded_to_http2 && !wsi->h2.h2n->pps &&
!lws_h2_tx_cr_get(wsi)) {
/*
* other side is not able to cope with us sending DATA
* anything so no matter if we have POLLOUT on our side if it's
* DATA we want to send.
*
* Delay waiting for our POLLOUT until peer indicates he has
* space for more using tx window command in http2 layer
*/
lwsl_notice("%s: %p: skint (%d)\n", __func__, wsi,
wsi->h2.tx_cr);
wsi->h2.skint = 1;
return 0;
}
wsi->h2.skint = 0;
#endif
#if defined(LWS_WITH_CLIENT)
network_wsi = lws_get_network_wsi(wsi);
#endif
already = lws_wsi_mux_mark_parents_needing_writeable(wsi);
/* for network action, act only on the network wsi */
if (already
#if defined(LWS_WITH_CLIENT)
&& !network_wsi->client_mux_substream
#endif
)
return 1;
return 0;
}
static int
rops_close_kill_connection_mqtt(struct lws *wsi, enum lws_close_status reason)
{
lwsl_info(" wsi: %p, his parent %p: child list %p, siblings:\n", wsi,
wsi->mux.parent_wsi, wsi->mux.child_list);
//lws_wsi_mux_dump_children(wsi);
if (wsi->mux_substream
#if defined(LWS_WITH_CLIENT)
|| wsi->client_mux_substream
#endif
) {
lwsl_info("closing %p: parent %p: first child %p\n", wsi,
wsi->mux.parent_wsi, wsi->mux.child_list);
if (wsi->mux.child_list && lwsl_visible(LLL_INFO)) {
lwsl_info(" parent %p: closing children: list:\n", wsi);
lws_wsi_mux_dump_children(wsi);
}
lws_wsi_mux_close_children(wsi, reason);
}
if ((
#if defined(LWS_WITH_CLIENT)
wsi->client_mux_substream ||
#endif
wsi->mux_substream) &&
wsi->mux.parent_wsi) {
lws_wsi_mux_sibling_disconnect(wsi);
}
return 0;
}
struct lws_role_ops role_ops_mqtt = {
/* role name */ "mqtt",
/* alpn id */ "x-amzn-mqtt-ca", /* "mqtt/3.1.1" */
/* check_upgrades */ NULL,
/* pt_init_destroy */ NULL,
/* init_vhost */ NULL,
/* destroy_vhost */ NULL,
/* service_flag_pending */ NULL,
.handle_POLLIN = rops_handle_POLLIN_mqtt,
.handle_POLLOUT = rops_handle_POLLOUT_mqtt,
/* perform_user_POLLOUT */ NULL,
/* callback_on_writable */ rops_callback_on_writable_mqtt,
/* tx_credit */ NULL,
.write_role_protocol = NULL,
/* encapsulation_parent */ NULL,
/* alpn_negotiated */ NULL,
/* close_via_role_protocol */ NULL,
.close_role = rops_close_role_mqtt,
.close_kill_connection = rops_close_kill_connection_mqtt,
/* destroy_role */ NULL,
#if 0 /* defined(LWS_WITH_SERVER) */
/* adoption_bind */ rops_adoption_bind_mqtt,
#else
NULL,
#endif
#if defined(LWS_WITH_CLIENT)
.client_bind = rops_client_bind_mqtt,
.issue_keepalive = rops_issue_keepalive_mqtt,
#else
.client_bind = NULL,
.issue_keepalive = NULL,
#endif
.adoption_cb = { LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED,
LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED },
.rx_cb = { LWS_CALLBACK_MQTT_CLIENT_RX,
LWS_CALLBACK_MQTT_CLIENT_RX },
.writeable_cb = { LWS_CALLBACK_MQTT_CLIENT_WRITEABLE,
LWS_CALLBACK_MQTT_CLIENT_WRITEABLE },
.close_cb = { LWS_CALLBACK_MQTT_CLIENT_CLOSED,
LWS_CALLBACK_MQTT_CLIENT_CLOSED },
.protocol_bind_cb = { LWS_CALLBACK_MQTT_IDLE,
LWS_CALLBACK_MQTT_IDLE },
.protocol_unbind_cb = { LWS_CALLBACK_MQTT_DROP_PROTOCOL,
LWS_CALLBACK_MQTT_DROP_PROTOCOL },
.file_handle = 0,
};

326
lib/roles/mqtt/primitives.c Normal file
View file

@ -0,0 +1,326 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*
* MQTT v5
*
* http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
*/
#include "private-lib-core.h"
/* #include "lws-mqtt.h" */
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#include <assert.h>
/*
* Encode is done into a buffer of at least 4 bytes space.
*
* Returns -1 for error, or number of bytes used
*/
int
lws_mqtt_vbi_encode(uint32_t value, void *buf)
{
uint8_t *p = (uint8_t *)buf, b;
if (value > 0xfffffff) {
assert(0);
return -1;
}
do {
b = value & 0x7f;
value >>= 7;
if (value)
*p++ = (0x80 | b);
else
*p++ = b;
} while (value);
return lws_ptr_diff(p, (uint8_t *)buf);
}
void
lws_mqtt_vbi_init(lws_mqtt_vbi *vbi)
{
vbi->value = 0;
vbi->consumed = 0;
vbi->budget = 4;
}
void
lws_mqtt_2byte_init(lws_mqtt_vbi *vbi)
{
vbi->value = 0;
vbi->consumed = 0;
vbi->budget = 2;
}
void
lws_mqtt_4byte_init(lws_mqtt_vbi *vbi)
{
vbi->value = 0;
vbi->consumed = 0;
vbi->budget = 4;
}
lws_mqtt_stateful_primitive_return_t
lws_mqtt_vbi_r(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len)
{
uint8_t multiplier = 0;
if (!vbi->budget) {
lwsl_info("%s: bad vbi\n", __func__);
return LMSPR_FAILED_ALREADY_COMPLETED;
}
while (*len && vbi->budget--) {
uint8_t u = *((*in)++);
(*len)--;
vbi->consumed++;
vbi->value += (u & 0x7f) << multiplier;
multiplier += 7;
if (!(u & 0x80))
return LMSPR_COMPLETED; /* finished */
}
if (!vbi->budget) { /* should have ended on b7 = 0 and exited then... */
lwsl_info("%s: bad vbi\n", __func__);
return LMSPR_FAILED_FORMAT;
}
return LMSPR_NEED_MORE;
}
lws_mqtt_stateful_primitive_return_t
lws_mqtt_mb_parse(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len)
{
if (!vbi->budget)
return LMSPR_FAILED_ALREADY_COMPLETED;
while (*len && vbi->budget--) {
vbi->value = (vbi->value << 8) | *((*in)++);
(*len)--;
vbi->consumed++;
}
return vbi->budget ? LMSPR_NEED_MORE : LMSPR_COMPLETED;
}
/*
* You can leave buf NULL, if so it will be allocated on the heap once the
* actual length is known. nf should be 0, it will be set at allocation time.
*
* Or you can ensure no allocation and use an external buffer by setting buf
* and lim. But buf must be in the ep context somehow, since it may have to
* survive returns to the event loop unchanged. Set nf to 0 in this case.
*
* Or you can set buf to an externally allocated buffer, in which case you may
* set nf so it will be freed when the string is "freed".
*/
void
lws_mqtt_str_init(lws_mqtt_str_t *s, uint8_t *buf, uint16_t lim, char nf)
{
s->len = 0; /* at COMPLETED, consumed count is s->len + 2 */
s->pos = 0;
s->buf = buf;
s->limit = lim;
s->len_valid = 0;
s->needs_freeing = nf;
}
lws_mqtt_str_t *
lws_mqtt_str_create(uint16_t lim)
{
lws_mqtt_str_t *s = lws_malloc(sizeof(*s) + lim + 1, __func__);
if (!s)
return NULL;
s->len = 0;
s->pos = 0;
s->buf = (uint8_t *)&s[1];
s->limit = lim;
s->len_valid = 0;
s->needs_freeing = 1;
return s;
}
lws_mqtt_str_t *
lws_mqtt_str_create_init(uint8_t *buf, uint16_t len, uint16_t lim)
{
lws_mqtt_str_t *s;
if (!lim)
lim = len;
s = lws_mqtt_str_create(lim);
if (!s)
return NULL;
memcpy(s->buf, buf, len);
s->len = len;
s->len_valid = 1;
s->pos = len;
return s;
}
lws_mqtt_str_t *
lws_mqtt_str_create_cstr_dup(const char *buf, uint16_t lim)
{
size_t len = strlen(buf);
if (!lim)
lim = (uint16_t)len;
return lws_mqtt_str_create_init((uint8_t *)buf, (uint16_t)len, lim);
}
uint8_t *
lws_mqtt_str_next(lws_mqtt_str_t *s, uint16_t *budget)
{
if (budget)
*budget = s->limit - s->pos;
return &s->buf[s->pos];
}
int
lws_mqtt_str_advance(lws_mqtt_str_t *s, int n)
{
if (n > s->limit - s->pos) {
lwsl_err("%s: attempted overflow %d vs %d\n", __func__,
n, s->limit - s->pos);
return 1;
}
s->pos += n;
s->len += n;
return 0;
}
void
lws_mqtt_str_free(lws_mqtt_str_t **ps)
{
lws_mqtt_str_t *s = *ps;
if (!s || !s->needs_freeing)
return;
/* buf may be independently allocated or allocated along with the
* lws_mqtt_str_t at the end... if so the whole lws_mqtt_str_t is freed.
*/
if (s->buf != (uint8_t *)&s[1])
lws_free_set_NULL(s->buf);
else
lws_free_set_NULL(*ps);
}
/*
* Parses and allocates for lws_mqtt_str_t in a fragmentation-immune, but
* efficient for bulk data way.
*
* Returns: LMSPR_NEED_MORE if needs more data,
* LMSPR_COMPLETED if complete, <0 for error
*
* *len is reduced by, and *in is advanced by, the amount of data actually used,
* except in error case
*
* lws_mqtt_str_free() must be called after calling this successfully
* or not.
*/
lws_mqtt_stateful_primitive_return_t
lws_mqtt_str_parse(lws_mqtt_str_t *s, const uint8_t **in, size_t *len)
{
const uint8_t *oin = *in;
/* handle the length + allocation if needed */
while (*len && !s->len_valid && s->pos < 2) {
s->len = (s->len << 8) | *((*in)++);
(*len)--;
oin = *in;
if (++s->pos == 2) {
if (s->len > s->limit)
return LMSPR_FAILED_OVERSIZE;
s->pos = 0;
s->len_valid = 1;
if (!s->len) /* do not need to allocate */
return LMSPR_COMPLETED;
if (!s->buf) {
s->buf = lws_malloc(s->len, __func__);
if (!s->buf)
return LMSPR_FAILED_OOM;
s->needs_freeing = 1;
}
}
}
/* handle copying bulk data into allocation */
if (s->len_valid && *len) {
uint16_t span = s->len - s->pos;
if (span > *len)
span = (uint16_t)*len;
memcpy(s->buf + s->pos, *in, span);
*in += span;
s->pos += span;
}
*len -= *in - oin;
return s->buf && s->pos == s->len ? LMSPR_COMPLETED : LMSPR_NEED_MORE;
}
int
lws_mqtt_bindata_cmp(const lws_mqtt_str_t *bd1,
const lws_mqtt_str_t *bd2)
{
if (bd1->len != bd2->len)
return 1;
if (!!bd1->buf != !!bd2->buf)
return 1;
if (!bd1->buf && !bd2->buf)
return 0;
return memcmp(bd1->buf, bd2->buf, bd1->len);
}

View file

@ -0,0 +1,398 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010 - 2020 Andy Green <andy@warmcat.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#ifndef _PRIVATE_LIB_ROLES_MQTT
#define _PRIVATE_LIB_ROLES_MQTT 1
#include <libwebsockets/lws-mqtt.h>
extern struct lws_role_ops role_ops_mqtt;
#define lwsi_role_mqtt(wsi) (wsi->role_ops == &role_ops_mqtt)
#define LWS_MQTT_MAX_CHILDREN 8 /* max child streams on same parent */
#define LMQCP_LUT_FLAG_RESERVED_FLAGS 0x10
#define LMQCP_LUT_FLAG_PACKET_ID_NONE 0x00
#define LMQCP_LUT_FLAG_PACKET_ID_HAS 0x20
#define LMQCP_LUT_FLAG_PACKET_ID_QOS12 0x40
#define LMQCP_LUT_FLAG_PACKET_ID_MASK 0x60
#define LMQCP_LUT_FLAG_PAYLOAD 0x80 /* payload req (publish = opt)*/
#define lws_mqtt_str_is_not_empty(s) ( ((s)) && \
((s))->len && \
((s))->buf && \
*((s))->buf )
#define LWS_MQTT_RESPONSE_TIMEOUT (3 * LWS_US_PER_SEC)
#define LWS_MQTT_RETRY_CEILING (60 * LWS_US_PER_SEC)
typedef enum {
LMSPR_COMPLETED = 0,
LMSPR_NEED_MORE = 1,
LMSPR_FAILED_OOM = -1,
LMSPR_FAILED_OVERSIZE = -2,
LMSPR_FAILED_FORMAT = -3,
LMSPR_FAILED_ALREADY_COMPLETED = -4,
} lws_mqtt_stateful_primitive_return_t;
typedef struct {
uint32_t value;
char budget;
char consumed;
} lws_mqtt_vbi;
/* works for vbi, 2-byte and 4-byte fixed length */
static inline int
lws_mqtt_mb_first(lws_mqtt_vbi *vbi) { return !vbi->consumed; }
int
lws_mqtt_vbi_encode(uint32_t value, void *buf);
/*
* Decode is done statefully on an arbitrary amount of input data (which may
* be one byte). It's like this so it can continue seamlessly if a buffer ends
* partway through the primitive, and the api matches the bulk binary data case.
*
* VBI decode:
*
* Initialize the lws_mqtt_vbi state by calling lws_mqtt_vbi_init() on it, then
* feed lws_mqtt_vbi_r() bytes to decode.
*
* Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
* LMSPR_NEED_MORE if more calls to lws_mqtt_vbi_r() with subsequent bytes
* needed.
*
* *in and *len are updated accordingly.
*
* 2-byte and 4-byte decode:
*
* Initialize the lws_mqtt_vbi state by calling lws_mqtt_2byte_init() or
* lws_mqtt_4byte_init() on it, then feed lws_mqtt_mb_parse() bytes
* to decode.
*
* Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
* LMSPR_NEED_MORE if more calls to lws_mqtt_mb_parse() with subsequent
* bytes needed.
*
* *in and *len are updated accordingly.
*/
void
lws_mqtt_vbi_init(lws_mqtt_vbi *vbi);
void
lws_mqtt_2byte_init(lws_mqtt_vbi *vbi);
void
lws_mqtt_4byte_init(lws_mqtt_vbi *vbi);
lws_mqtt_stateful_primitive_return_t
lws_mqtt_vbi_r(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
lws_mqtt_stateful_primitive_return_t
lws_mqtt_mb_parse(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
typedef struct lws_mqtt_str_st {
uint8_t *buf;
uint16_t len;
uint16_t limit; /* it's cheaper to add the state here than
* the pointer to point to it elsewhere */
uint16_t pos;
char len_valid;
char needs_freeing;
} lws_mqtt_str_t;
static inline int
lws_mqtt_str_first(lws_mqtt_str_t *s) { return !s->buf && !s->pos; }
lws_mqtt_stateful_primitive_return_t
lws_mqtt_str_parse(lws_mqtt_str_t *bd, const uint8_t **in, size_t *len);
typedef enum {
LMQCPP_IDLE,
/* receive packet type part of fixed header took us out of idle... */
LMQCPP_CONNECT_PACKET = LMQCP_CTOS_CONNECT << 4,
LMQCPP_CONNECT_REMAINING_LEN_VBI,
LMQCPP_CONNECT_VH_PNAME,
LMQCPP_CONNECT_VH_PVERSION,
LMQCPP_CONNECT_VH_FLAGS,
LMQCPP_CONNECT_VH_KEEPALIVE,
LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN,
LMQCPP_CONNACK_PACKET = LMQCP_STOC_CONNACK << 4,
LMQCPP_CONNACK_VH_FLAGS,
LMQCPP_CONNACK_VH_RETURN_CODE,
LMQCPP_PUBLISH_PACKET = LMQCP_PUBLISH << 4,
LMQCPP_PUBLISH_REMAINING_LEN_VBI,
LMQCPP_PUBLISH_VH_TOPIC,
LMQCPP_PUBLISH_VH_PKT_ID,
LMQCPP_PUBACK_PACKET = LMQCP_PUBACK << 4,
LMQCPP_PUBACK_VH_PKT_ID,
LMQCPP_PUBACK_PROPERTIES_LEN_VBI,
LMQCPP_SUBACK_PACKET = LMQCP_STOC_SUBACK << 4,
LMQCPP_SUBACK_VH_PKT_ID,
LMQCPP_SUBACK_PAYLOAD,
LMQCPP_UNSUBACK_PACKET = LMQCP_STOC_UNSUBACK << 4,
LMQCPP_UNSUBACK_VH_PKT_ID,
LMQCPP_PINGRESP_ZERO = LMQCP_STOC_PINGRESP << 4,
LMQCPP_PAYLOAD,
LMQCPP_EAT_PROPERTIES_AND_COMPLETE,
LMQCPP_PROP_ID_VBI,
/* all possible property payloads */
/* 3.3.2.3.2 */
LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE = 0x101,
LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE = 0x102,
LMQCPP_PROP_CONTENT_TYPE_UTF8S = 0x103,
LMQCPP_PROP_RESPONSE_TOPIC_UTF8S = 0x108,
LMQCPP_PROP_CORRELATION_BINDATA = 0x109,
LMQCPP_PROP_SUBSCRIPTION_ID_VBI = 0x10b,
LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE = 0x111,
LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S = 0x112,
LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE = 0x113,
LMQCPP_PROP_AUTH_METHOD_UTF8S = 0x115,
LMQCPP_PROP_AUTH_DATA_BINDATA = 0x116,
LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE = 0x117,
LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE = 0x118,
LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE = 0x119,
LMQCPP_PROP_RESPONSE_INFO_UTF8S = 0x11a,
LMQCPP_PROP_SERVER_REFERENCE_UTF8S = 0x11c,
LMQCPP_PROP_REASON_STRING_UTF8S = 0x11f,
LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE = 0x121,
LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE = 0x122,
LMQCPP_PROP_TOPIC_ALIAS_2BYTE = 0x123,
LMQCPP_PROP_MAXIMUM_QOS_1BYTE = 0x124,
LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE = 0x125,
LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S = 0x126,
LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S = 0x226,
LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE = 0x127,
LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE = 0x128,
LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE = 0x129,
LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE = 0x12a,
} lws_mqtt_packet_parse_state_t;
/*
* the states an MQTT connection can be in
*/
typedef enum {
LGSMQTT_UNKNOWN,
LGSMQTT_IDLE,
LGSMQTT_TRANSPORT_CONNECTED,
LGSMQTT_SENT_CONNECT,
LGSMQTT_ESTABLISHED,
LGSMQTT_SENT_SUBSCRIBE,
LGSMQTT_SUBSCRIBED,
} lwsgs_mqtt_states_t;
typedef struct lws_mqtt_parser_st {
/* lws_mqtt_str_t s_content_type; */
lws_mqtt_packet_parse_state_t state;
lws_mqtt_vbi vbit;
lws_mqtt_reason_t reason;
lws_mqtt_str_t s_temp;
uint8_t fixed_seen[4];
uint8_t props_seen[8];
uint8_t cpkt_flags;
uint32_t cpkt_remlen;
uint32_t props_len;
uint32_t consumed;
uint32_t prop_id;
uint32_t props_consumed;
uint32_t payload_consumed;
uint16_t keepalive;
uint16_t cpkt_id;
uint32_t n;
uint8_t temp[32];
uint8_t conn_rc;
uint8_t payload_format;
uint8_t packet_type_flags;
uint8_t conn_protocol_version;
uint8_t fixed;
uint8_t flag_pending_send_connack_close:1;
uint8_t flag_pending_send_reason_close:1;
uint8_t flag_prop_multi:1;
uint8_t flag_server:1;
} lws_mqtt_parser_t;
typedef struct lws_mqtt_subs {
struct lws_mqtt_subs *next;
uint8_t ref_count; /* number of children referencing */
/* subscription name + NUL overallocated here */
char topic[];
} lws_mqtt_subs_t;
typedef struct lws_mqtts {
lws_mqtt_parser_t par;
lwsgs_mqtt_states_t estate;
struct lws_dll2 active_session_list_head;
struct lws_dll2 limbo_session_list_head;
} lws_mqtts_t;
typedef struct lws_mqttc {
lws_mqtt_parser_t par;
lwsgs_mqtt_states_t estate;
lws_mqtt_str_t *id;
lws_mqtt_str_t *username;
lws_mqtt_str_t *password;
struct {
lws_mqtt_str_t *topic;
lws_mqtt_str_t *message;
lws_mqtt_qos_levels_t qos;
uint8_t retain;
} will;
uint16_t keep_alive_secs;
uint8_t conn_flags;
} lws_mqttc_t;
struct _lws_mqtt_related {
lws_mqttc_t client;
lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */
struct lws *wsi; /**< so sul can use lws_container_of */
lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */
void *rx_cpkt_param;
uint16_t pkt_id;
uint16_t ack_pkt_id;
uint16_t sub_size;
#if defined(LWS_WITH_CLIENT)
uint8_t send_pingreq:1;
uint8_t session_resumed:1;
#endif
uint8_t inside_payload:1;
uint8_t inside_subscribe:1;
uint8_t inside_unsubscribe:1;
uint8_t send_puback:1;
uint8_t unacked_publish:1;
uint8_t done_subscribe:1;
};
/*
* New sessions are created by starting CONNECT. If the ClientID sent
* by the client matches a different, extant session, then the
* existing one is taken over and the new one created for duration of
* CONNECT processing is destroyed.
*
* On the server side, bearing in mind multiple simultaneous,
* fragmented CONNECTs may be interleaved ongoing, all state and
* parsing temps for a session must live in the session object.
*/
struct lws_mqtt_endpoint_st;
typedef struct lws_mqtts_session_st {
struct lws_dll2 session_list;
} lws_mqtts_session_t;
#define ctl_pkt_type(x) (x->packet_type_flags >> 4)
void
lws_mqttc_state_transition(lws_mqttc_t *ep, lwsgs_mqtt_states_t s);
int
_lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
const uint8_t *buf, size_t len);
int
lws_mqtt_client_socket_service(struct lws *wsi, struct lws_pollfd *pollfd,
struct lws *wsi_conn);
int
lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
struct lws *wsi);
struct lws *
lws_mqtt_client_send_connect(struct lws *wsi);
int
lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
uint8_t dup, lws_mqtt_qos_levels_t qos,
uint8_t retain);
struct lws *
lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi);
lws_mqtt_subs_t *
lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic);
#endif /* _PRIVATE_LIB_ROLES_MQTT */

View file

@ -110,6 +110,8 @@ enum lwsi_state {
LRS_H2_AWAIT_PREFACE = LWSIFS_NOT_EST | 12,
LRS_H2_AWAIT_SETTINGS = LWSIFS_NOT_EST |
LWSIFS_POCB | 13,
LRS_MQTTC_IDLE = LWSIFS_POCB | 33,
LRS_MQTTC_AWAIT_CONNACK = 34,
/* Phase 5: protocol logically established */
@ -125,6 +127,7 @@ enum lwsi_state {
LRS_BODY = 23,
LRS_DISCARD_BODY = 24,
LRS_ESTABLISHED = LWSIFS_POCB | 25,
/* we are established, but we have embarked on serving a single
* transaction. Other transaction input may be pending, but we will
* not service it while we are busy dealing with the current
@ -317,10 +320,17 @@ extern const struct lws_role_ops role_ops_raw_skt, role_ops_raw_file,
#define lwsi_role_raw_proxy(wsi) (0)
#endif
#if defined(LWS_ROLE_MQTT)
#include "mqtt/private-lib-roles-mqtt.h"
#else
#define lwsi_role_mqtt(wsi) (0)
#endif
enum {
LWS_HP_RET_BAIL_OK,
LWS_HP_RET_BAIL_DIE,
LWS_HP_RET_USER_SERVICE,
LWS_HP_RET_DROP_POLLOUT,
LWS_HPI_RET_WSI_ALREADY_DIED, /* we closed it */
LWS_HPI_RET_HANDLED, /* no probs */

View file

@ -0,0 +1,4 @@
|name|demonstrates|
---|---
minimal-mqtt-client|Simple demo for mqtt client operation
minimal-mqtt-client-multi|Demonstrates automatic binding / muxing of independent connections to share a single tcp / tls connection

View file

@ -0,0 +1,79 @@
cmake_minimum_required(VERSION 2.8)
include(CheckCSourceCompiles)
set(SAMP lws-minimal-mqtt-client-multi)
set(SRCS minimal-mqtt-client-multi.c)
# If we are being built as part of lws, confirm current build config supports
# reqconfig, else skip building ourselves.
#
# If we are being built externally, confirm installed lws was configured to
# support reqconfig, else error out with a helpful message about the problem.
#
MACRO(require_lws_config reqconfig _val result)
if (DEFINED ${reqconfig})
if (${reqconfig})
set (rq 1)
else()
set (rq 0)
endif()
else()
set(rq 0)
endif()
if (${_val} EQUAL ${rq})
set(SAME 1)
else()
set(SAME 0)
endif()
if (LWS_WITH_MINIMAL_EXAMPLES AND NOT ${SAME})
if (${_val})
message("${SAMP}: skipping as lws being built without ${reqconfig}")
else()
message("${SAMP}: skipping as lws built with ${reqconfig}")
endif()
set(${result} 0)
else()
if (LWS_WITH_MINIMAL_EXAMPLES)
set(MET ${SAME})
else()
CHECK_C_SOURCE_COMPILES("#include <libwebsockets.h>\nint main(void) {\n#if defined(${reqconfig})\n return 0;\n#else\n fail;\n#endif\n return 0;\n}\n" HAS_${reqconfig})
if (NOT DEFINED HAS_${reqconfig} OR NOT HAS_${reqconfig})
set(HAS_${reqconfig} 0)
else()
set(HAS_${reqconfig} 1)
endif()
if ((HAS_${reqconfig} AND ${_val}) OR (NOT HAS_${reqconfig} AND NOT ${_val}))
set(MET 1)
else()
set(MET 0)
endif()
endif()
if (NOT MET)
if (${_val})
message(FATAL_ERROR "This project requires lws must have been configured with ${reqconfig}")
else()
message(FATAL_ERROR "Lws configuration of ${reqconfig} is incompatible with this project")
endif()
endif()
endif()
ENDMACRO()
set(requirements 1)
require_lws_config(LWS_ROLE_MQTT 1 requirements)
require_lws_config(LWS_WITH_CLIENT 1 requirements)
if (requirements)
add_executable(${SAMP} ${SRCS})
if (websockets_shared)
target_link_libraries(${SAMP} websockets_shared)
add_dependencies(${SAMP} websockets_shared)
else()
target_link_libraries(${SAMP} websockets)
endif()
endif()

View file

@ -0,0 +1,24 @@
# lws minimal MQTT client multi
## build
```
$ cmake . && make
```
## usage
The application goes to https://warmcat.com and receives the page data
same as minimal http client.
However it does it for 8 client connections concurrently.
## Commandline Options
Option|Meaning
---|---
-c <conns>|Count of simultaneous connections (default 8)
-s|Stagger the connections by 100ms, the last by 1s
-p|Use stream binding

View file

@ -0,0 +1,437 @@
/*
* lws-minimal-mqtt-client
*
* Written in 2010-2020 by Andy Green <andy@warmcat.com>
* Sakthi Kannan <saktr@amazon.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <assert.h>
#define COUNT 8
struct test_item {
struct lws_context *context;
struct lws *wsi;
lws_sorted_usec_list_t sul;
} items[COUNT];
enum {
STATE_SUBSCRIBE, /* subscribe to the topic */
STATE_WAIT_SUBACK,
STATE_PUBLISH_QOS0, /* Send the message in QoS0 */
STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */
STATE_PUBLISH_QOS1, /* Send the message in QoS1 */
STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */
STATE_UNSUBSCRIBE,
STATE_WAIT_UNSUBACK,
STATE_TEST_FINISH
};
static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay,
done, count = COUNT;
static const lws_retry_bo_t retry = {
.secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
.secs_since_valid_hangup = 25, /* hangup if still idle secs */
};
static const lws_mqtt_client_connect_param_t client_connect_param = {
.client_id = NULL,
.keep_alive = 60,
.clean_start = 1,
.will_param = {
.topic = "good/bye",
.message = "sign-off",
.qos = 0,
.retain = 0,
},
.username = "lwsUser",
.password = "mySecretPassword",
};
static lws_mqtt_topic_elem_t topics[] = {
[0] = { .name = "test/topic0", .qos = QOS0 },
[1] = { .name = "test/topic1", .qos = QOS1 },
};
static lws_mqtt_subscribe_param_t sub_param = {
.topic = &topics[0],
.num_topics = LWS_ARRAY_SIZE(topics),
};
static const char * const test_string =
"No one would have believed in the last years of the nineteenth "
"century that this world was being watched keenly and closely by "
"intelligences greater than man's and yet as mortal as his own; that as "
"men busied themselves about their various concerns they were "
"scrutinised and studied, perhaps almost as narrowly as a man with a "
"microscope might scrutinise the transient creatures that swarm and "
"multiply in a drop of water. With infinite complacency men went to "
"and fro over this globe about their little affairs, serene in their "
"assurance of their empire over matter. It is possible that the "
"infusoria under the microscope do the same. No one gave a thought to "
"the older worlds of space as sources of human danger, or thought of "
"them only to dismiss the idea of life upon them as impossible or "
"improbable. It is curious to recall some of the mental habits of "
"those departed days. At most terrestrial men fancied there might be "
"other men upon Mars, perhaps inferior to themselves and ready to "
"welcome a missionary enterprise. Yet across the gulf of space, minds "
"that are to our minds as ours are to those of the beasts that perish, "
"intellects vast and cool and unsympathetic, regarded this earth with "
"envious eyes, and slowly and surely drew their plans against us. And "
"early in the twentieth century came the great disillusionment. ";
/* this reflects the length of the string above */
#define TEST_STRING_LEN 1337
struct pss {
lws_mqtt_publish_param_t pub_param;
int state;
size_t pos;
int retries;
};
static void
sigint_handler(int sig)
{
interrupted = 1;
}
static int
connect_client(struct lws_context *context, struct test_item *item)
{
struct lws_client_connect_info i;
memset(&i, 0, sizeof i);
i.mqtt_cp = &client_connect_param;
i.opaque_user_data = item;
i.protocol = "test-mqtt";
i.address = "localhost";
i.host = "localhost";
i.pwsi = &item->wsi;
i.context = context;
i.method = "MQTT";
i.alpn = "mqtt";
i.port = 1883;
if (do_ssl) {
i.ssl_connection = LCCSCF_USE_SSL;
i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
i.port = 8883;
}
if (pipeline)
i.ssl_connection |= LCCSCF_PIPELINE;
if (!lws_client_connect_via_info(&i)) {
lwsl_err("%s: Client Connect Failed\n", __func__);
return 1;
}
return 0;
}
static void
start_conn(struct lws_sorted_usec_list *sul)
{
struct test_item *item = lws_container_of(sul, struct test_item, sul);
lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
if (connect_client(item->context, item))
interrupted = 1;
}
static int
system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
int current, int target)
{
struct lws_context *context = mgr->parent;
int n;
if (current != LWS_SYSTATE_OPERATIONAL ||
target != LWS_SYSTATE_OPERATIONAL)
return 0;
/*
* We delay trying to do the client connection until the protocols have
* been initialized for each vhost... this happens after we have network
* and time so we can judge tls cert validity.
*
* Stagger the connection attempts so we get some joining before the
* first has connected and some afterwards
*/
for (n = 0; n < count; n++) {
items[n].context = context;
lws_sul_schedule(context, 0, &items[n].sul, start_conn,
n * stagger_us);
}
return 0;
}
static int
callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
struct pss *pss = (struct pss *)user;
lws_mqtt_publish_param_t *pub;
size_t chunk;
switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
in ? (char *)in : "(null)");
if (++done == count)
goto finish_test;
break;
case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi);
if (++done == count)
goto finish_test;
break;
case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
lws_callback_on_writable(wsi);
return 0;
case LWS_CALLBACK_MQTT_SUBSCRIBED:
lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
/* then we can get on with the actual test part */
pss->state++;
lws_callback_on_writable(wsi);
break;
case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
__func__, (int)(item - &item[0]), wsi);
okay++;
if (++pss->state == STATE_TEST_FINISH) {
lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
__func__, (int)(item - &items[0]), okay, count);
/* We are done, request to close */
return -1;
}
break;
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
/*
* Extra WRITEABLE may appear here other than ones we asked
* for, so we must consult our own state to decide if we want
* to make use of the opportunity
*/
switch (pss->state) {
case STATE_SUBSCRIBE:
lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi);
if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
lwsl_notice("%s: subscribe failed\n", __func__);
return -1;
}
pss->state++;
break;
case STATE_PUBLISH_QOS0:
case STATE_PUBLISH_QOS1:
lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi);
pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ?
"test/topic0" : "test/topic1";
pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic);
pss->pub_param.qos =
pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
pss->pub_param.payload_len = TEST_STRING_LEN;
/* We send the message out 300 bytes or less at at time */
chunk = 300;
if (chunk > TEST_STRING_LEN - pss->pos)
chunk = TEST_STRING_LEN - pss->pos;
lwsl_notice("%s: sending %d at +%d\n", __func__,
(int)chunk, (int)pss->pos);
if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
test_string + pss->pos, chunk,
(pss->pos + chunk == TEST_STRING_LEN))) {
lwsl_notice("%s: publish failed\n", __func__);
return -1;
}
pss->pos += chunk;
if (pss->pos == TEST_STRING_LEN) {
lwsl_debug("%s: sent message\n", __func__);
pss->pos = 0;
pss->state++;
}
break;
case STATE_UNSUBSCRIBE:
lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
__func__, (int)(item - &item[0]), wsi);
pss->state++;
if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
lwsl_notice("%s: subscribe failed\n", __func__);
return -1;
}
break;
default:
break;
}
return 0;
case LWS_CALLBACK_MQTT_ACK:
lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state);
/*
* We can forget about the message we just sent, it's done.
*
* For our test, that's the indication we can close the wsi.
*/
pss->state++;
if (pss->state != STATE_TEST_FINISH) {
lws_callback_on_writable(wsi);
break;
}
break;
case LWS_CALLBACK_MQTT_RESEND:
lwsl_user("%s: MQTT_RESEND\n", __func__);
/*
* We must resend the packet ID mentioned in len
*/
if (++pss->retries == 3) {
lwsl_notice("%s: too many retries\n", __func__);
return 1; /* kill the connection */
}
pss->state--;
pss->pos = 0;
break;
case LWS_CALLBACK_MQTT_CLIENT_RX:
pub = (lws_mqtt_publish_param_t *)in;
assert(pub);
lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__,
(int)(item - &items[0]), pub->topic, (int)pub->payload_pos,
(int)pub->payload_len, (int)len);
//lwsl_hexdump_info(pub->payload, len);
return 0;
default:
break;
}
return 0;
finish_test:
interrupted = 1;
lws_cancel_service(lws_get_context(wsi));
return 0;
}
static const struct lws_protocols protocols[] = {
{
.name = "test-mqtt",
.callback = callback_mqtt,
.per_session_data_size = sizeof(struct pss)
},
{ NULL, NULL, 0, 0 }
};
int main(int argc, const char **argv)
{
lws_state_notify_link_t notifier = { {}, system_notify_cb, "app" };
lws_state_notify_link_t *na[] = { &notifier, NULL };
struct lws_context_creation_info info;
struct lws_context *context;
const char *p;
int n = 0;
signal(SIGINT, sigint_handler);
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
lws_cmdline_option_handle_builtin(argc, argv, &info);
do_ssl = !!lws_cmdline_option(argc, argv, "-s");
if (do_ssl)
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
if (lws_cmdline_option(argc, argv, "-p"))
pipeline = 1;
if ((p = lws_cmdline_option(argc, argv, "-i")))
stagger_us = atoi(p);
if ((p = lws_cmdline_option(argc, argv, "-c")))
count = atoi(p);
if (count > COUNT) {
count = COUNT;
lwsl_err("%s: clipped count at max %d\n", __func__, count);
}
lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
do_ssl ? "tls enabled": "unencrypted");
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
info.register_notifier_list = na;
info.fd_limit_per_thread = 1 + COUNT + 1;
info.retry_and_idle_policy = &retry;
#if defined(LWS_WITH_MBEDTLS)
/*
* OpenSSL uses the system trust store. mbedTLS has to be told which
* CA to trust explicitly.
*/
info.client_ssl_ca_filepath = "./mosq-ca.crt";
#endif
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
/* Event loop */
while (n >= 0 && !interrupted)
n = lws_service(context, 0);
lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
okay != count ? "failed" : "OK");
lws_context_destroy(context);
return okay != count;
}

View file

@ -0,0 +1,32 @@
#!/bin/bash
#
# $1: path to minimal example binaries...
# if lws is built with -DLWS_WITH_MINIMAL_EXAMPLES=1
# that will be ./bin from your build dir
#
# $2: path for logs and results. The results will go
# in a subdir named after the directory this script
# is in
#
# $3: offset for test index count
#
# $4: total test count
#
# $5: path to ./minimal-examples dir in lws
#
# Test return code 0: OK, 254: timed out, other: error indication
. $5/selftests-library.sh
COUNT_TESTS=1
#dotest $1 $2 warmcat
Q=`which mosquitto`
spawn "" /tmp $Q -v
dotest $1 $2 -p-i100000 -p -i 100000
kill $SPID 2>/dev/null
wait $SPID 2>/dev/null
exit $FAILS

View file

@ -0,0 +1,58 @@
-----BEGIN CERTIFICATE-----
MIIFUDCCBDigAwIBAgISA4mJfIm3iCGbU9+o8YQa+4nUMA0GCSqGSIb3DQEBCwUA
MEoxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MSMwIQYDVQQD
ExpMZXQncyBFbmNyeXB0IEF1dGhvcml0eSBYMzAeFw0xOTA5MDcwNzA5MjNaFw0x
OTEyMDYwNzA5MjNaMBYxFDASBgNVBAMTC3dhcm1jYXQuY29tMIIBIjANBgkqhkiG
9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwnEoH9JW3GvpadpxHGZPb5wv1Q6KfAIMWtdq
YCOfotFxaYULuzHVxmrTTgmEqJr+eBqUBkXKmGuRR/9UipOmTu5j02qFyWHotFdF
ZGyp//8z+Rle9Qt1nL68oNIZLDtWkybh5x00b1uo4eyEszXUaa0aLqKP3lH7Q4jI
aSVARZ8snrJR640Gp3ByudvNTYkGz469bpWzRC/8wSNtzzY02DvHs1GxQx9tMXw+
BbtUxeP7lpYFKEFBjgZaIKLv+4g8ItJIuO7gMSzG2JfpQHxdhrlhxpx7dsaMUcyM
nnYXysNL5JG3KEMhkxbtdpCaEQ8jLSPbl/rnF/+mgce+lSjMuQIDAQABo4ICYjCC
Al4wDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcD
AjAMBgNVHRMBAf8EAjAAMB0GA1UdDgQWBBSI9ai12zLFeNTEDHKI9Ghkqcpa2TAf
BgNVHSMEGDAWgBSoSmpjBH3duubRObemRWXv86jsoTBvBggrBgEFBQcBAQRjMGEw
LgYIKwYBBQUHMAGGImh0dHA6Ly9vY3NwLmludC14My5sZXRzZW5jcnlwdC5vcmcw
LwYIKwYBBQUHMAKGI2h0dHA6Ly9jZXJ0LmludC14My5sZXRzZW5jcnlwdC5vcmcv
MBYGA1UdEQQPMA2CC3dhcm1jYXQuY29tMEwGA1UdIARFMEMwCAYGZ4EMAQIBMDcG
CysGAQQBgt8TAQEBMCgwJgYIKwYBBQUHAgEWGmh0dHA6Ly9jcHMubGV0c2VuY3J5
cHQub3JnMIIBBgYKKwYBBAHWeQIEAgSB9wSB9ADyAHcAY/Lbzeg7zCzPC3KEJ1dr
M6SNYXePvXWmOLHHaFRL2I0AAAFtCsVHHAAABAMASDBGAiEAy0q1cR4VwPL3iviL
cBWN67kjJRXk+DwhodmeoM3kb3gCIQC2soAHFs0Umo+0RNdFrL41+hMuidh2cXbb
Ovc6nh5tOQB3AOJpS64m6OlACeiGG7Y7g9Q+5/50iPukjyiTAZ3d8dv+AAABbQrF
R48AAAQDAEgwRgIhANqKQm4t9by263CJ7/DLOaZCjtcK29KgJjPwhv08UMn1AiEA
h35nGTASR8/E7xz+56ZUleqD7U1ABFgWZRZskIzsFO8wDQYJKoZIhvcNAQELBQAD
ggEBADDJBVbKe2LPHmi8k2vxErB3Y0Ty+3gwgPEXKYtEvQ7tos89eE+QmOXAzH5J
GwRarFf7kzmKeJv04tMebiEtshpap47oJfxCxfrtpja8hP8Cdu/v/Ae6eEzu3yet
0N08GJdxQKfgCFaoGUptbaF2RCIZS12SVcX4TPpdP+xaiZdmIx4dGM6tReQ8+y8B
10b4Hi2+d/zW0W1z6+FAemU6yleWriJDUik5oas9XZF5LAAMDb/WgF5eIB6P9CUG
LuAO8lWlk9nBgXvMLTxZ74SJb17H4kFEIrIjvABNshz5gBW8xw9nfr5YIfANtwEj
BDsq06Df3UORYVs/j3T97gPAEZ4=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIEkjCCA3qgAwIBAgIQCgFBQgAAAVOFc2oLheynCDANBgkqhkiG9w0BAQsFADA/
MSQwIgYDVQQKExtEaWdpdGFsIFNpZ25hdHVyZSBUcnVzdCBDby4xFzAVBgNVBAMT
DkRTVCBSb290IENBIFgzMB4XDTE2MDMxNzE2NDA0NloXDTIxMDMxNzE2NDA0Nlow
SjELMAkGA1UEBhMCVVMxFjAUBgNVBAoTDUxldCdzIEVuY3J5cHQxIzAhBgNVBAMT
GkxldCdzIEVuY3J5cHQgQXV0aG9yaXR5IFgzMIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEAnNMM8FrlLke3cl03g7NoYzDq1zUmGSXhvb418XCSL7e4S0EF
q6meNQhY7LEqxGiHC6PjdeTm86dicbp5gWAf15Gan/PQeGdxyGkOlZHP/uaZ6WA8
SMx+yk13EiSdRxta67nsHjcAHJyse6cF6s5K671B5TaYucv9bTyWaN8jKkKQDIZ0
Z8h/pZq4UmEUEz9l6YKHy9v6Dlb2honzhT+Xhq+w3Brvaw2VFn3EK6BlspkENnWA
a6xK8xuQSXgvopZPKiAlKQTGdMDQMc2PMTiVFrqoM7hD8bEfwzB/onkxEz0tNvjj
/PIzark5McWvxI0NHWQWM6r6hCm21AvA2H3DkwIDAQABo4IBfTCCAXkwEgYDVR0T
AQH/BAgwBgEB/wIBADAOBgNVHQ8BAf8EBAMCAYYwfwYIKwYBBQUHAQEEczBxMDIG
CCsGAQUFBzABhiZodHRwOi8vaXNyZy50cnVzdGlkLm9jc3AuaWRlbnRydXN0LmNv
bTA7BggrBgEFBQcwAoYvaHR0cDovL2FwcHMuaWRlbnRydXN0LmNvbS9yb290cy9k
c3Ryb290Y2F4My5wN2MwHwYDVR0jBBgwFoAUxKexpHsscfrb4UuQdf/EFWCFiRAw
VAYDVR0gBE0wSzAIBgZngQwBAgEwPwYLKwYBBAGC3xMBAQEwMDAuBggrBgEFBQcC
ARYiaHR0cDovL2Nwcy5yb290LXgxLmxldHNlbmNyeXB0Lm9yZzA8BgNVHR8ENTAz
MDGgL6AthitodHRwOi8vY3JsLmlkZW50cnVzdC5jb20vRFNUUk9PVENBWDNDUkwu
Y3JsMB0GA1UdDgQWBBSoSmpjBH3duubRObemRWXv86jsoTANBgkqhkiG9w0BAQsF
AAOCAQEA3TPXEfNjWDjdGBX7CVW+dla5cEilaUcne8IkCJLxWh9KEik3JHRRHGJo
uM2VcGfl96S8TihRzZvoroed6ti6WqEBmtzw3Wodatg+VyOeph4EYpr/1wXKtx8/
wApIvJSwtmVi4MFU5aMqrSDE6ea73Mj2tcMyo5jMd6jmeWUHK8so/joWUoHOUgwu
X4Po1QYz+3dszkDqMp4fklxBwXRsW10KXzPMTZ+sOPAveyxindmjkW8lGy+QsRlG
PfZ+G6Z6h7mjem0Y+iWlkYcV4PIWL1iwBi8saCbGS5jN2p8M+X+Q7UNKEkROb3N6
KOqkqm57TH2H3eDJAkSnh6/DNFu0Qg==
-----END CERTIFICATE-----

View file

@ -0,0 +1,11 @@
--2018-11-25 07:54:30-- https://www.gravatar.com/avatar/c50933ca2aa61e0fe2c43d46bb6b59cb/?s=128
Resolving www.gravatar.com (www.gravatar.com)... 192.0.73.2, 2a04:fa87:fffe::c000:4902
Connecting to www.gravatar.com (www.gravatar.com)|192.0.73.2|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24761 (24K) [image/png]
Saving to: /tmp/q
/tmp/q 0%[ ] 0 --.-KB/s /tmp/q 100%[================================>] 24.18K --.-KB/s in 0.01s
2018-11-25 07:54:31 (2.04 MB/s) - /tmp/q saved [24761/24761]

View file

@ -0,0 +1,79 @@
cmake_minimum_required(VERSION 2.8)
include(CheckCSourceCompiles)
set(SAMP lws-minimal-mqtt-client)
set(SRCS minimal-mqtt-client.c)
# If we are being built as part of lws, confirm current build config supports
# reqconfig, else skip building ourselves.
#
# If we are being built externally, confirm installed lws was configured to
# support reqconfig, else error out with a helpful message about the problem.
#
MACRO(require_lws_config reqconfig _val result)
if (DEFINED ${reqconfig})
if (${reqconfig})
set (rq 1)
else()
set (rq 0)
endif()
else()
set(rq 0)
endif()
if (${_val} EQUAL ${rq})
set(SAME 1)
else()
set(SAME 0)
endif()
if (LWS_WITH_MINIMAL_EXAMPLES AND NOT ${SAME})
if (${_val})
message("${SAMP}: skipping as lws being built without ${reqconfig}")
else()
message("${SAMP}: skipping as lws built with ${reqconfig}")
endif()
set(${result} 0)
else()
if (LWS_WITH_MINIMAL_EXAMPLES)
set(MET ${SAME})
else()
CHECK_C_SOURCE_COMPILES("#include <libwebsockets.h>\nint main(void) {\n#if defined(${reqconfig})\n return 0;\n#else\n fail;\n#endif\n return 0;\n}\n" HAS_${reqconfig})
if (NOT DEFINED HAS_${reqconfig} OR NOT HAS_${reqconfig})
set(HAS_${reqconfig} 0)
else()
set(HAS_${reqconfig} 1)
endif()
if ((HAS_${reqconfig} AND ${_val}) OR (NOT HAS_${reqconfig} AND NOT ${_val}))
set(MET 1)
else()
set(MET 0)
endif()
endif()
if (NOT MET)
if (${_val})
message(FATAL_ERROR "This project requires lws must have been configured with ${reqconfig}")
else()
message(FATAL_ERROR "Lws configuration of ${reqconfig} is incompatible with this project")
endif()
endif()
endif()
ENDMACRO()
set(requirements 1)
require_lws_config(LWS_ROLE_MQTT 1 requirements)
require_lws_config(LWS_WITH_CLIENT 1 requirements)
if (requirements)
add_executable(${SAMP} ${SRCS})
if (websockets_shared)
target_link_libraries(${SAMP} websockets_shared)
add_dependencies(${SAMP} websockets_shared)
else()
target_link_libraries(${SAMP} websockets)
endif()
endif()

View file

@ -0,0 +1,51 @@
# lws minimal MQTT client
The application connects to a broker at localhost 1883 (unencrypted) or
8883 (tls)
## build
```
$ cmake . && make
```
## usage
Commandline option|Meaning
---|---
-d <loglevel>|Debug verbosity in decimal, eg, -d15
-s| Use tls and connect to port 8883 instead of 1883
Start mosquitto server locally
```
$ mosquitto
```
Run the example
```
[2020/01/31 10:40:23:7789] U: LWS minimal MQTT client unencrypted [-d<verbosity>][-s]
[2020/01/31 10:40:23:8539] N: lws_mqtt_generate_id: User space provided a client ID 'lwsMqttClient'
[2020/01/31 10:40:23:9893] N: _lws_mqtt_rx_parser: migrated nwsi 0x50febd0 to sid 1 0x5106820
[2020/01/31 10:40:23:9899] U: callback_mqtt: MQTT_CLIENT_ESTABLISHED
[2020/01/31 10:40:23:9967] U: callback_mqtt: WRITEABLE: Subscribing
[2020/01/31 10:40:24:0068] U: callback_mqtt: MQTT_SUBSCRIBED
```
Send something to the test client
```
mosquitto_pub -h 127.0.0.1 -p 1883 -t test/topic0 -m "hello"
```
Observe it received at the test client
```
[2020/01/31 10:40:27:1845] U: callback_mqtt: MQTT_CLIENT_RX
[2020/01/31 10:40:27:1870] N:
[2020/01/31 10:40:27:1945] N: 0000: 74 65 73 74 2F 74 6F 70 69 63 30 test/topic0
[2020/01/31 10:40:27:1952] N:
```

View file

@ -0,0 +1,343 @@
/*
* lws-minimal-mqtt-client
*
* Written in 2010-2020 by Andy Green <andy@warmcat.com>
* Sakthi Kannan <saktr@amazon.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <assert.h>
enum {
STATE_SUBSCRIBE, /* subscribe to the topic */
STATE_PUBLISH_QOS0, /* Send the message in QoS0 */
STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */
STATE_PUBLISH_QOS1, /* Send the message in QoS1 */
STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */
STATE_TEST_FINISH
};
static int interrupted, bad = 1, do_ssl;
static const lws_retry_bo_t retry = {
.secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
.secs_since_valid_hangup = 25, /* hangup if still idle secs */
};
static const lws_mqtt_client_connect_param_t client_connect_param = {
.client_id = "lwsMqttClient",
.keep_alive = 60,
.clean_start = 1,
.will_param = {
.topic = "good/bye",
.message = "sign-off",
.qos = 0,
.retain = 0,
},
.username = "lwsUser",
.password = "mySecretPassword",
};
static lws_mqtt_publish_param_t pub_param;
static lws_mqtt_topic_elem_t topics[] = {
[0] = { .name = "test/topic0", .qos = QOS0 },
[1] = { .name = "test/topic1", .qos = QOS1 },
};
static lws_mqtt_subscribe_param_t sub_param = {
.topic = &topics[0],
.num_topics = LWS_ARRAY_SIZE(topics),
};
static const char * const test_string =
"No one would have believed in the last years of the nineteenth "
"century that this world was being watched keenly and closely by "
"intelligences greater than man's and yet as mortal as his own; that as "
"men busied themselves about their various concerns they were "
"scrutinised and studied, perhaps almost as narrowly as a man with a "
"microscope might scrutinise the transient creatures that swarm and "
"multiply in a drop of water. With infinite complacency men went to "
"and fro over this globe about their little affairs, serene in their "
"assurance of their empire over matter. It is possible that the "
"infusoria under the microscope do the same. No one gave a thought to "
"the older worlds of space as sources of human danger, or thought of "
"them only to dismiss the idea of life upon them as impossible or "
"improbable. It is curious to recall some of the mental habits of "
"those departed days. At most terrestrial men fancied there might be "
"other men upon Mars, perhaps inferior to themselves and ready to "
"welcome a missionary enterprise. Yet across the gulf of space, minds "
"that are to our minds as ours are to those of the beasts that perish, "
"intellects vast and cool and unsympathetic, regarded this earth with "
"envious eyes, and slowly and surely drew their plans against us. And "
"early in the twentieth century came the great disillusionment. ";
/* this reflects the length of the string above */
#define TEST_STRING_LEN 1337
struct pss {
int state;
size_t pos;
int retries;
};
static void
sigint_handler(int sig)
{
interrupted = 1;
}
static int
connect_client(struct lws_context *context)
{
struct lws_client_connect_info i;
memset(&i, 0, sizeof i);
i.mqtt_cp = &client_connect_param;
i.address = "localhost";
i.host = "localhost";
i.protocol = "mqtt";
i.context = context;
i.method = "MQTT";
i.alpn = "mqtt";
i.port = 1883;
if (do_ssl) {
i.ssl_connection = LCCSCF_USE_SSL;
i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
i.port = 8883;
}
if (!lws_client_connect_via_info(&i)) {
lwsl_err("%s: Client Connect Failed\n", __func__);
return 1;
}
return 0;
}
static int
system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
int current, int target)
{
struct lws_context *context = mgr->parent;
if (current != LWS_SYSTATE_OPERATIONAL ||
target != LWS_SYSTATE_OPERATIONAL)
return 0;
/*
* We delay trying to do the client connection until
* the protocols have been initialized for each
* vhost... this happens after we have network and
* time so we can judge tls cert validity.
*/
if (connect_client(context))
interrupted = 1;
return 0;
}
static int
callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
struct pss *pss = (struct pss *)user;
lws_mqtt_publish_param_t *pub;
size_t chunk;
switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
in ? (char *)in : "(null)");
interrupted = 1;
break;
case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
lwsl_user("%s: CLIENT_CLOSED\n", __func__);
interrupted = 1;
break;
case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
lwsl_user("%s: MQTT_CLIENT_ESTABLISHED\n", __func__);
lws_callback_on_writable(wsi);
return 0;
case LWS_CALLBACK_MQTT_SUBSCRIBED:
lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
break;
case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
/*
* Extra WRITEABLE may appear here other than ones we asked
* for, so we must consult our own state to decide if we want
* to make use of the opportunity
*/
switch (pss->state) {
case STATE_SUBSCRIBE:
lwsl_user("%s: WRITEABLE: Subscribing\n", __func__);
if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
lwsl_notice("%s: subscribe failed\n", __func__);
return -1;
}
pss->state++;
break;
case STATE_PUBLISH_QOS0:
case STATE_PUBLISH_QOS1:
lwsl_user("%s: WRITEABLE: Publish\n", __func__);
pub_param.topic = "test/topic";
pub_param.topic_len = (uint16_t)strlen(pub_param.topic);
pub_param.qos = pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
pub_param.payload_len = TEST_STRING_LEN;
/* We send the message out 300 bytes or less at at time */
chunk = 300;
if (chunk > TEST_STRING_LEN - pss->pos)
chunk = TEST_STRING_LEN - pss->pos;
if (lws_mqtt_client_send_publish(wsi, &pub_param,
test_string + pss->pos, chunk,
(pss->pos + chunk == TEST_STRING_LEN)))
return -1;
pss->pos += chunk;
if (pss->pos == TEST_STRING_LEN) {
pss->pos = 0;
pss->state++;
}
break;
default:
break;
}
return 0;
case LWS_CALLBACK_MQTT_ACK:
lwsl_user("%s: MQTT_ACK\n", __func__);
/*
* We can forget about the message we just sent, it's done.
*
* For our test, that's the indication we can close the wsi.
*/
pss->state++;
if (pss->state != STATE_TEST_FINISH)
break;
/* Oh we are done then */
bad = 0;
interrupted = 1;
lws_cancel_service(lws_get_context(wsi));
break;
case LWS_CALLBACK_MQTT_RESEND:
lwsl_user("%s: MQTT_RESEND\n", __func__);
/*
* We must resend the packet ID mentioned in len
*/
if (++pss->retries == 3) {
interrupted = 1;
break;
}
pss->state--;
pss->pos = 0;
break;
case LWS_CALLBACK_MQTT_CLIENT_RX:
lwsl_user("%s: MQTT_CLIENT_RX\n", __func__);
pub = (lws_mqtt_publish_param_t *)in;
assert(pub);
lwsl_hexdump_notice(pub->topic, pub->topic_len);
lwsl_hexdump_notice(pub->payload, pub->payload_len);
return 0;
default:
break;
}
return 0;
}
static const struct lws_protocols protocols[] = {
{
.name = "mqtt",
.callback = callback_mqtt,
.per_session_data_size = sizeof(struct pss)
},
{ NULL, NULL, 0, 0 }
};
int main(int argc, const char **argv)
{
lws_state_notify_link_t notifier = { {}, system_notify_cb, "app" };
lws_state_notify_link_t *na[] = { &notifier, NULL };
struct lws_context_creation_info info;
struct lws_context *context;
int n = 0;
signal(SIGINT, sigint_handler);
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
lws_cmdline_option_handle_builtin(argc, argv, &info);
do_ssl = !!lws_cmdline_option(argc, argv, "-s");
if (do_ssl)
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
do_ssl ? "tls enabled": "unencrypted");
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
info.protocols = protocols;
info.register_notifier_list = na;
info.fd_limit_per_thread = 1 + 1 + 1;
info.retry_and_idle_policy = &retry;
#if defined(LWS_WITH_MBEDTLS)
/*
* OpenSSL uses the system trust store. mbedTLS has to be told which
* CA to trust explicitly.
*/
info.client_ssl_ca_filepath = "./mosq-ca.crt";
#endif
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
/* Event loop */
while (n >= 0 && !interrupted)
n = lws_service(context, 0);
lwsl_user("Completed: %s\n", bad ? "failed" : "OK");
lws_context_destroy(context);
return bad;
}

View file

@ -0,0 +1,22 @@
-----BEGIN CERTIFICATE-----
MIIDjzCCAnegAwIBAgIUAVMnfaOq8yiLnvIB/obE689mulMwDQYJKoZIhvcNAQEL
BQAwVjELMAkGA1UEBhMCWFgxFTATBgNVBAcMDERlZmF1bHQgQ2l0eTEcMBoGA1UE
CgwTRGVmYXVsdCBDb21wYW55IEx0ZDESMBAGA1UEAwwJbG9jYWxob3N0MCAXDTE5
MTEyMDA1NTYyNFoYDzIxMTkxMDI3MDU1NjI0WjBWMQswCQYDVQQGEwJYWDEVMBMG
A1UEBwwMRGVmYXVsdCBDaXR5MRwwGgYDVQQKDBNEZWZhdWx0IENvbXBhbnkgTHRk
MRIwEAYDVQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQCyw+kBLg9lCGlBceil0lNqgh7fyguin8IFm5X60bfSJ/pV6i8dZZplVjE+
g75iFEFBYyfn+6bOPdinfQ7Uu+l6t6y2HWbK6MkoypF/g7cdtUFy9s4cUX0467BZ
hMPJUc4UfnD+bYcXoguPJ6/OH84+Ayg6uvm5nJ32pDiXr6gMd5YljdXaJpCeeh4w
O2UBD1HffyPIklIPT59lxv2ZvKnZbE4UE1uaLLvTWiT+X+gA3i0Syxkq5RlZ61DE
3MyIYAUVSf3coNXCSdJ9wrOsGoP+X+T+aDjnFCCnqus3QX3JOHTKf4+tBoF65cNP
mnHXb5/ZQCcR9HMofacalMpjiGb7AgMBAAGjUzBRMB0GA1UdDgQWBBTl3poLE/22
R4RXTMoXPHMlc3QRjzAfBgNVHSMEGDAWgBTl3poLE/22R4RXTMoXPHMlc3QRjzAP
BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQCwWVnNjKRH9CCBv3yT
Djah51q3NH3E+f1IcBZz2c5WbJHxEtP4QC57ou2x3hC7Cur9iOqIO57VW8vnFP2Y
bD9oHb46grsGhwuaSuA2AlFZ5EuUAe2cgEj5/3Ihd3HYsXN3rfRO1PVGN1iRG1sE
xAxENNm6nOS1Ht1Zy5YmMiSPzghcsTnpg44AqsmowbIED75EpumLwY2NbAl9/7JL
EJil3cxEZ8rl2DVWPU3hAwrOfhl/rkQTCcigyPvZvAqsJ9vYhZftrF6njUsqr5kL
KHENu5ySKPNk5gFR17WjWoqT6iEOZN25qyfFhBRzjpCX6zD1gx0sYcVryCnTH5Y4
Drjh
-----END CERTIFICATE-----

View file

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDNTCCAh0CFFu5XIMrh5gPYnjTr8UrXA3UiWqHMA0GCSqGSIb3DQEBCwUAMFYx
CzAJBgNVBAYTAlhYMRUwEwYDVQQHDAxEZWZhdWx0IENpdHkxHDAaBgNVBAoME0Rl
ZmF1bHQgQ29tcGFueSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAgFw0xOTExMjAw
NTU4NTdaGA8yMTE5MTAyNzA1NTg1N1owVjELMAkGA1UEBhMCWFgxFTATBgNVBAcM
DERlZmF1bHQgQ2l0eTEcMBoGA1UECgwTRGVmYXVsdCBDb21wYW55IEx0ZDESMBAG
A1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA
tldZ5yGrBsLR/7G4b48pwQSSG6fp4egiZdeFV7SRNfbMzpuIDlFdZM9zdcoQQrTl
24aVIGwkvfsMD33Hb/D1WW+r8UFnq4CutigwXArXUxoFX6fa0rwEEjuxwG3f7+xm
vb6p/KXomyWcdAUmAvALaDXIUDEc3tH+Hxik5z36YjIqRjH16jhhs/6T8B3xAWuR
jnDknJWv36QruMIyPUqYYkl2zl4VXUKBgWZr31Opm08kb/FrWJ6lQ7912jZC8G2L
rtwZJB/1psBrX3Oj/Quj+BWHmzkosqVae2G5zAhphZ2NMrdSVfxdctNmakH8oTwf
hRas8DE2olW3whUkfKG2DQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQAKEQ7LpPdU
XbJKushJ7wmuljQn3pmW9SjzFMlL9o59KLHWAmxzTDaAm6r3SGgHeSz3ZLwqtJ8I
7pCxQxI6V1ySMkWI1mfi4KPSavxBRaST4o8+YIKJt4c5aLB1seHoghx3Q/jXEGEB
9dFyLMK6u3EhYSletQNeMVGaeK1q/nVZdHNk4LXVIHsXnKlxyMnW3v18iaV3ZhVd
doAWMpnbY91AyCXjOmQrfQaHLL6n3r1Xk2L+cRO3nSor54UIXqIJxHZtj+ZYOy3Z
C5AkQ1yyTTOtEz9WB0Bk2O4ZfNgJO+1MbQSfL0m0YKpuaFnMHD9g5ufUlJGR2aMI
nw1F/oGZoNUl
-----END CERTIFICATE-----

View file

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAtldZ5yGrBsLR/7G4b48pwQSSG6fp4egiZdeFV7SRNfbMzpuI
DlFdZM9zdcoQQrTl24aVIGwkvfsMD33Hb/D1WW+r8UFnq4CutigwXArXUxoFX6fa
0rwEEjuxwG3f7+xmvb6p/KXomyWcdAUmAvALaDXIUDEc3tH+Hxik5z36YjIqRjH1
6jhhs/6T8B3xAWuRjnDknJWv36QruMIyPUqYYkl2zl4VXUKBgWZr31Opm08kb/Fr
WJ6lQ7912jZC8G2LrtwZJB/1psBrX3Oj/Quj+BWHmzkosqVae2G5zAhphZ2NMrdS
VfxdctNmakH8oTwfhRas8DE2olW3whUkfKG2DQIDAQABAoIBACMctwc3CIQIx/+A
7Y8t9lBg3PHOZ89EsDsEQX0eHEhT+iRe9tgq+t0KxaUNAAyYYRrg056mtHyQ90WU
Zu87a0OJqYaPnbL82KfjHUzcGZK7FAXTgOPLqM0KCbSQc+rzjuVC7eDk4eHeYD5H
L4apSskKckRe8LxHm7PJPxf4a1q1EuMEfAyJhh7Tot0oVsG/wABGFUuJVJWXnec1
0ukPowKh9bg7UyEecwyeYGzXqNqvbjhS3J0dBkjG5vfxuVHae2yIeXk6ZNsCw6tO
K8bklmsmbWAFR5SKpsNve8X/6nlclP0taDDZsz0KSbxJEd2DuRhFcdiRWEoryZVp
7DOORFECgYEA5sdsRjQoHaU85QZuM7ff6NpNT7kMIJbjHRdiauEBakLHs8yVLNEp
Vvg5fcZY4PumqPKyGEjUD6DenlLvb4OBGqzKGGhAJaLz9cpVoWWPz8y1NRBfPjlB
FQdB4GdtBQGXwnZoD9kXPjYHlk4nwZZ/Sitm2w6RibiIxE0adnwLhP8CgYEAykTE
5NZ88OGGf0RWUt54OxTl4fChAcvK93KkdlK9nbokXHs7VIl4QpKPFu1nuMDrkVI4
fVYwRDcZUjyxqbpBSf/M6T/kuEsMWBYYGv5c9/U87y0UWHbphN0TSdML2DJp9BTy
uy4RleQovof2kOr6sOsKP8lhBGSlhXyJDKn1iPMCgYEAnpvc7HsYPxe7vGQpBV6Q
g0bV777seNF7EhlqSK6P/GodOpOWyxCN6vn6+ViC6U3Lgz4Z7NrQ9FTJ6+JwMSIe
byjmVNQBklxmcz02kRBuQJEe0XOJIgjTlBJC0moC4Xfwx3P9nTbE5LrZiBH6/O/k
WCNwM4nVuOOdC906HMiwWh0CgYEAqn3m3ODydXQTk2i9vqIpA9vsnVLf1Ay8a3El
sVqy26VQCugQrYQmay7wD6pS2Ec9CMQeO3+PtaAf5tKkCmWlrMNCLIWfu7v+jq0o
6m/nW1ZKY2xDDwJEeaqDHKIZBMYRyxxxMVd2mTq1IUynh6WZY9DqVbPf4/0WC/tZ
5ePIxAMCgYEAwwBNT2xjG1mWD4eANvKjQgrsxKFttmaXXCiixZJR+tsQc5bff5Yb
IgvvkIwLHoNpL2Nk7sEjS4sUtAKwzCtIMwvPnhQedICnOEteZ8NPfaFmPewcovcL
gv9k+mFActZ7H8i9FXLrZHyEzOXZaM/vY/mHbrlJSWnSDZsvnVzQv+o=
-----END RSA PRIVATE KEY-----

View file

@ -8,7 +8,7 @@ then
if [ "$LWS_METHOD" == "lwsws" -o "$LWS_METHOD" == "lwsws2" ];
then
sudo apt-get install -y -qq realpath libjemalloc1 libev4 libuv-dev libdbus-1-dev valgrind
sudo apt-get install -y -qq realpath libjemalloc1 libev4 libuv-dev libdbus-1-dev valgrind mosquitto
sudo apt-get remove python-six
sudo pip install "six>=1.9"
sudo pip install "Twisted==16.0.0"