1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-16 00:00:07 +01:00
libwebsockets/minimal-examples/mqtt-client/minimal-mqtt-client-multi/minimal-mqtt-client-multi.c
Sakthi Kannan 9d099ba7be client: MQTT
Adds client support for MQTT QoS0 and QoS1, compatible with AWS IoT

Supports stream binding where independent client connections to the
same endpoint can mux on a single tcp + tls connection with topic
routing managed internally.
2020-03-04 12:17:49 +00:00

437 lines
11 KiB
C

/*
* lws-minimal-mqtt-client
*
* Written in 2010-2020 by Andy Green <andy@warmcat.com>
* Sakthi Kannan <saktr@amazon.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include <assert.h>
/* Maximum number of simultaneous client connections the test can run */
#define COUNT 8
/*
 * Per-connection bookkeeping for one test stream.  One entry of items[] is
 * used for each connection; the entry's address is handed to lws as the
 * connection's opaque user data in connect_client().
 */
struct test_item {
/* owning lws context, filled in by system_notify_cb() before scheduling */
struct lws_context *context;
/* the client wsi for this item, written via i.pwsi in connect_client() */
struct lws *wsi;
/* scheduler entry used to fire start_conn() for this item */
lws_sorted_usec_list_t sul;
} items[COUNT];
/*
 * Per-connection test states, stored in struct pss.state.  They are laid
 * out in the order the test walks through them, since callback_mqtt()
 * advances with pss->state++ (and steps back with pss->state-- on resend).
 */
enum {
STATE_SUBSCRIBE, /* subscribe to the topic */
/* wait for LWS_CALLBACK_MQTT_SUBSCRIBED */
STATE_WAIT_SUBACK,
STATE_PUBLISH_QOS0, /* Send the message in QoS0 */
STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */
STATE_PUBLISH_QOS1, /* Send the message in QoS1 */
STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */
/* send the unsubscribe for the topics we subscribed to */
STATE_UNSUBSCRIBE,
/* wait for LWS_CALLBACK_MQTT_UNSUBSCRIBED */
STATE_WAIT_UNSUBACK,
/* reached after the unsuback: the callback then asks for the close */
STATE_TEST_FINISH
};
/*
 * Set to 1 to make the event loop in main() exit.  It is written from the
 * SIGINT handler as well as from normal code, so it must be a type that
 * may be safely assigned inside a signal handler.
 */
static volatile sig_atomic_t interrupted;

/*
 * Test configuration and running totals:
 *
 *  do_ssl      nonzero when -s was given: connect with tls on port 8883
 *  pipeline    nonzero when -p was given: adds LCCSCF_PIPELINE on connect
 *  stagger_us  delay in us between successive connection starts (-i)
 *  okay        number of streams that received their unsuback
 *  done        number of streams that closed or failed to connect
 *  count       number of streams to run (-c), at most COUNT
 */
static int do_ssl, pipeline, stagger_us = 5000, okay, done, count = COUNT;
/*
 * Idle / keepalive policy, applied to the whole context through
 * info.retry_and_idle_policy in main().
 */
static const lws_retry_bo_t retry = {
.secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */
.secs_since_valid_hangup = 25, /* hangup if still idle secs */
};
/*
 * MQTT connect parameters shared by every client connection in the test;
 * connect_client() passes this to lws through i.mqtt_cp.
 */
static const lws_mqtt_client_connect_param_t client_connect_param = {
/* NOTE(review): no explicit client id is given; confirm how lws picks one */
.client_id = NULL,
.keep_alive = 60,
.clean_start = 1,
/* presumably the MQTT "will" message registered at connect time */
.will_param = {
.topic = "good/bye",
.message = "sign-off",
.qos = 0,
.retain = 0,
},
.username = "lwsUser",
.password = "mySecretPassword",
};
/*
 * The topics each connection subscribes to: one at QoS0 and one at QoS1.
 * The names match the topics callback_mqtt() publishes to.
 */
static lws_mqtt_topic_elem_t topics[] = {
[0] = { .name = "test/topic0", .qos = QOS0 },
[1] = { .name = "test/topic1", .qos = QOS1 },
};
/*
 * Subscription descriptor covering all of topics[]; callback_mqtt() uses
 * the same descriptor for both the subscribe and the unsubscribe.
 */
static lws_mqtt_subscribe_param_t sub_param = {
.topic = &topics[0],
.num_topics = LWS_ARRAY_SIZE(topics),
};
/*
 * Payload published by every stream, sent in several fragments by
 * callback_mqtt().
 *
 * It is declared as an array rather than a pointer so that its length can
 * be taken with sizeof at compile time: a hand-maintained length constant
 * silently goes stale if the text is ever edited, and a too-large value
 * makes the publish code read past the end of the string.
 */
static const char test_string[] =
	"No one would have believed in the last years of the nineteenth "
	"century that this world was being watched keenly and closely by "
	"intelligences greater than man's and yet as mortal as his own; that as "
	"men busied themselves about their various concerns they were "
	"scrutinised and studied, perhaps almost as narrowly as a man with a "
	"microscope might scrutinise the transient creatures that swarm and "
	"multiply in a drop of water. With infinite complacency men went to "
	"and fro over this globe about their little affairs, serene in their "
	"assurance of their empire over matter. It is possible that the "
	"infusoria under the microscope do the same. No one gave a thought to "
	"the older worlds of space as sources of human danger, or thought of "
	"them only to dismiss the idea of life upon them as impossible or "
	"improbable. It is curious to recall some of the mental habits of "
	"those departed days. At most terrestrial men fancied there might be "
	"other men upon Mars, perhaps inferior to themselves and ready to "
	"welcome a missionary enterprise. Yet across the gulf of space, minds "
	"that are to our minds as ours are to those of the beasts that perish, "
	"intellects vast and cool and unsympathetic, regarded this earth with "
	"envious eyes, and slowly and surely drew their plans against us. And "
	"early in the twentieth century came the great disillusionment. ";

/* length of the string above, excluding the terminating NUL */
#define TEST_STRING_LEN (sizeof(test_string) - 1)
/*
 * Per-session data lws allocates for each connection on the "test-mqtt"
 * protocol (see per_session_data_size in protocols[]); it reaches
 * callback_mqtt() as the `user` pointer.
 */
struct pss {
/* publish parameters, filled in before each fragment is sent */
lws_mqtt_publish_param_t pub_param;
/* current STATE_... value for this connection */
int state;
/* offset into test_string of the next fragment to publish */
size_t pos;
/* number of LWS_CALLBACK_MQTT_RESEND events seen so far */
int retries;
};
/*
 * SIGINT handler: raise the flag that makes the event loop in main() exit.
 */
static void
sigint_handler(int sig)
{
	(void)sig;	/* the signal number itself is not needed */

	interrupted = 1;
}
static int
connect_client(struct lws_context *context, struct test_item *item)
{
struct lws_client_connect_info i;
memset(&i, 0, sizeof i);
i.mqtt_cp = &client_connect_param;
i.opaque_user_data = item;
i.protocol = "test-mqtt";
i.address = "localhost";
i.host = "localhost";
i.pwsi = &item->wsi;
i.context = context;
i.method = "MQTT";
i.alpn = "mqtt";
i.port = 1883;
if (do_ssl) {
i.ssl_connection = LCCSCF_USE_SSL;
i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED;
i.port = 8883;
}
if (pipeline)
i.ssl_connection |= LCCSCF_PIPELINE;
if (!lws_client_connect_via_info(&i)) {
lwsl_err("%s: Client Connect Failed\n", __func__);
return 1;
}
return 0;
}
static void
start_conn(struct lws_sorted_usec_list *sul)
{
struct test_item *item = lws_container_of(sul, struct test_item, sul);
lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0]));
if (connect_client(item->context, item))
interrupted = 1;
}
/*
 * System state notification hook, registered in main() through
 * info.register_notifier_list.
 *
 * Nothing is done until both the current and the target state are
 * LWS_SYSTATE_OPERATIONAL.  The client connections are held back until
 * then because protocols have been initialized for each vhost by that
 * point, which happens after network and time are available so tls cert
 * validity can be judged.
 *
 * Always returns 0.
 */
static int
system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link,
		 int current, int target)
{
	struct lws_context *cx = mgr->parent;
	int idx = 0;

	if (!(current == LWS_SYSTATE_OPERATIONAL &&
	      target == LWS_SYSTATE_OPERATIONAL))
		return 0;

	/*
	 * Space the connection attempts out by stagger_us each, so that some
	 * streams join before the first connection has completed and some
	 * join afterwards.
	 */
	while (idx < count) {
		items[idx].context = cx;
		lws_sul_schedule(cx, 0, &items[idx].sul, start_conn,
				 idx * stagger_us);
		idx++;
	}

	return 0;
}
/*
 * Index of a test item inside the global items[] array; used to tag log
 * lines with the stream they belong to.
 */
static int
item_index(const struct test_item *item)
{
	return (int)(item - &items[0]);
}

/*
 * Protocol callback for the "test-mqtt" protocol.
 *
 * Each connection walks through the STATE_... sequence kept in its struct
 * pss: subscribe, publish test_string at QoS0, publish it again at QoS1,
 * then unsubscribe.  WRITEABLE callbacks perform the "send" states; the
 * SUBSCRIBED / ACK / UNSUBSCRIBED callbacks move on from the matching
 * "wait" states.
 *
 * wsi:    the connection the event is for
 * reason: which event happened
 * user:   this connection's struct pss
 * in:     event data; the error string for CLIENT_CONNECTION_ERROR, the
 *         lws_mqtt_publish_param_t for MQTT_CLIENT_RX
 * len:    event data length, only logged (for MQTT_CLIENT_RX)
 *
 * Returns 0 to carry on, or nonzero to have the connection closed.  Once
 * every stream has closed or failed to connect, the event loop in main()
 * is told to stop.
 */
static int
callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason,
	      void *user, void *in, size_t len)
{
	struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi);
	struct pss *pss = (struct pss *)user;
	lws_mqtt_publish_param_t *pub;
	size_t chunk;

	switch (reason) {
	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
		lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
			 in ? (char *)in : "(null)");
		/* a failed connection still counts as a finished stream */
		if (++done == count)
			goto finish_test;
		break;

	case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
		lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__,
			  item_index(item), wsi);
		if (++done == count)
			goto finish_test;
		break;

	case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
		lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi);
		/* the first WRITEABLE will send the subscribe */
		lws_callback_on_writable(wsi);
		return 0;

	case LWS_CALLBACK_MQTT_SUBSCRIBED:
		lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__);
		/* then we can get on with the actual test part */
		pss->state++;
		lws_callback_on_writable(wsi);
		break;

	case LWS_CALLBACK_MQTT_UNSUBSCRIBED:
		lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n",
			  __func__, item_index(item), wsi);
		okay++;
		if (++pss->state == STATE_TEST_FINISH) {
			lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n",
				    __func__, item_index(item), okay, count);
			/* We are done, request to close */
			return -1;
		}
		break;

	case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
		/*
		 * Extra WRITEABLE may appear here other than ones we asked
		 * for, so we must consult our own state to decide if we want
		 * to make use of the opportunity
		 */
		switch (pss->state) {
		case STATE_SUBSCRIBE:
			lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n",
				  __func__, item_index(item), wsi);
			if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) {
				lwsl_notice("%s: subscribe failed\n", __func__);
				return -1;
			}
			pss->state++;
			break;

		case STATE_PUBLISH_QOS0:
		case STATE_PUBLISH_QOS1:
			lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n",
				  __func__, item_index(item), wsi);

			pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ?
						"test/topic0" : "test/topic1";
			pss->pub_param.topic_len =
				(uint16_t)strlen(pss->pub_param.topic);
			pss->pub_param.qos =
				pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1;
			/* total message size, not the size of this fragment */
			pss->pub_param.payload_len = TEST_STRING_LEN;

			/* We send the message out 300 bytes or less at a time */
			chunk = 300;
			if (chunk > TEST_STRING_LEN - pss->pos)
				chunk = TEST_STRING_LEN - pss->pos;

			lwsl_notice("%s: sending %d at +%d\n", __func__,
				    (int)chunk, (int)pss->pos);

			/* the last argument flags the final fragment */
			if (lws_mqtt_client_send_publish(wsi, &pss->pub_param,
					test_string + pss->pos, chunk,
					(pss->pos + chunk == TEST_STRING_LEN))) {
				lwsl_notice("%s: publish failed\n", __func__);
				return -1;
			}

			pss->pos += chunk;
			if (pss->pos == TEST_STRING_LEN) {
				/* whole message is out: rewind and await the ack */
				lwsl_debug("%s: sent message\n", __func__);
				pss->pos = 0;
				pss->state++;
			}
			break;

		case STATE_UNSUBSCRIBE:
			lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n",
				  __func__, item_index(item), wsi);
			pss->state++;
			if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) {
				lwsl_notice("%s: unsubscribe failed\n", __func__);
				return -1;
			}
			break;

		default:
			/* a "wait" state: nothing to send on this WRITEABLE */
			break;
		}
		return 0;

	case LWS_CALLBACK_MQTT_ACK:
		lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__,
			  item_index(item), pss->state);
		/*
		 * The message we just sent is done with.  Move on to the next
		 * state and, unless that is the end of the test, ask for a
		 * WRITEABLE callback so it can be acted on.
		 */
		pss->state++;
		if (pss->state != STATE_TEST_FINISH)
			lws_callback_on_writable(wsi);
		break;

	case LWS_CALLBACK_MQTT_RESEND:
		lwsl_user("%s: MQTT_RESEND\n", __func__);
		/*
		 * We must resend the packet ID mentioned in len
		 */
		if (++pss->retries == 3) {
			lwsl_notice("%s: too many retries\n", __func__);
			return 1; /* kill the connection */
		}
		/*
		 * Step back from the "wait" state to its "publish" state and
		 * rewind, so the next WRITEABLE handled there republishes the
		 * message from its start.
		 */
		pss->state--;
		pss->pos = 0;
		break;

	case LWS_CALLBACK_MQTT_CLIENT_RX:
		pub = (lws_mqtt_publish_param_t *)in;
		assert(pub);
		lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n",
			  __func__, item_index(item), pub->topic,
			  (int)pub->payload_pos, (int)pub->payload_len,
			  (int)len);
		return 0;

	default:
		break;
	}

	return 0;

finish_test:
	/* every stream is accounted for: stop the event loop in main() */
	interrupted = 1;
	lws_cancel_service(lws_get_context(wsi));

	return 0;
}
/*
 * Protocols offered by this context, handed to lws via info.protocols in
 * main().  The only real entry binds the name "test-mqtt" (the name
 * connect_client() asks for) to callback_mqtt(), with one struct pss
 * of per-session data for each connection.
 */
static const struct lws_protocols protocols[] = {
{
.name = "test-mqtt",
.callback = callback_mqtt,
.per_session_data_size = sizeof(struct pss)
},
/* all-NULL entry marking the end of the list */
{ NULL, NULL, 0, 0 }
};
/*
 * Entry point: parse the command line, create the lws context and run the
 * event loop until every stream has finished or SIGINT arrives.
 *
 * Options handled here (in addition to the lws builtin ones):
 *   -s        use tls
 *   -p        ask for pipelining on the client connections
 *   -i <us>   stagger between connection starts, in microseconds
 *   -c <n>    number of streams to run, 1..COUNT
 *
 * Returns 0 if every stream completed through to its unsuback, else 1.
 */
int main(int argc, const char **argv)
{
	/*
	 * { 0 } rather than an empty-brace initializer: the latter is a
	 * GNU / C23 extension and is rejected by strict ISO C compilers.
	 */
	lws_state_notify_link_t notifier = { { 0 }, system_notify_cb, "app" };
	lws_state_notify_link_t *na[] = { &notifier, NULL };
	struct lws_context_creation_info info;
	struct lws_context *context;
	const char *p;
	int n = 0;

	signal(SIGINT, sigint_handler);

	memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
	lws_cmdline_option_handle_builtin(argc, argv, &info);

	do_ssl = !!lws_cmdline_option(argc, argv, "-s");
	if (do_ssl)
		info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;

	if (lws_cmdline_option(argc, argv, "-p"))
		pipeline = 1;

	if ((p = lws_cmdline_option(argc, argv, "-i")))
		stagger_us = atoi(p);
	if (stagger_us < 0) {
		/* a negative delay is meaningless for scheduling the starts */
		stagger_us = 0;
		lwsl_err("%s: negative stagger ignored, using %d\n", __func__,
			 stagger_us);
	}

	if ((p = lws_cmdline_option(argc, argv, "-c")))
		count = atoi(p);
	if (count > COUNT) {
		count = COUNT;
		lwsl_err("%s: clipped count at max %d\n", __func__, count);
	}
	if (count < 1) {
		/*
		 * With no streams at all, the "all streams done" condition
		 * in callback_mqtt() can never be reached and the event loop
		 * would only ever exit on SIGINT.
		 */
		count = 1;
		lwsl_err("%s: raised count to min %d\n", __func__, count);
	}

	lwsl_user("LWS minimal MQTT client %s [-d<verbosity>][-s]\n",
		  do_ssl ? "tls enabled": "unencrypted");

	info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
	info.protocols = protocols;
	info.register_notifier_list = na;
	info.fd_limit_per_thread = 1 + COUNT + 1;
	info.retry_and_idle_policy = &retry;

#if defined(LWS_WITH_MBEDTLS)
	/*
	 * OpenSSL uses the system trust store. mbedTLS has to be told which
	 * CA to trust explicitly.
	 */
	info.client_ssl_ca_filepath = "./mosq-ca.crt";
#endif

	context = lws_create_context(&info);
	if (!context) {
		lwsl_err("lws init failed\n");
		return 1;
	}

	/* Event loop */
	while (n >= 0 && !interrupted)
		n = lws_service(context, 0);

	lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count,
		  okay != count ? "failed" : "OK");

	lws_context_destroy(context);

	return okay != count;
}