/* * lws-minimal-mqtt-client * * Written in 2010-2020 by Andy Green * Sakthi Kannan * * This file is made available under the Creative Commons CC0 1.0 * Universal Public Domain Dedication. */ #include #include #include #if defined(WIN32) #define HAVE_STRUCT_TIMESPEC #if defined(pid_t) #undef pid_t #endif #endif #include #include #define COUNT 8 struct test_item { struct lws_context *context; struct lws *wsi; lws_sorted_usec_list_t sul; } items[COUNT]; enum { STATE_SUBSCRIBE, /* subscribe to the topic */ STATE_WAIT_SUBACK, STATE_PUBLISH_QOS0, /* Send the message in QoS0 */ STATE_WAIT_ACK0, /* Wait for the synthetic "ack" */ STATE_PUBLISH_QOS1, /* Send the message in QoS1 */ STATE_WAIT_ACK1, /* Wait for the real ack (or timeout + retry) */ STATE_UNSUBSCRIBE, STATE_WAIT_UNSUBACK, STATE_TEST_FINISH }; static int interrupted, do_ssl, pipeline, stagger_us = 5000, okay, done, count = COUNT; static const lws_retry_bo_t retry = { .secs_since_valid_ping = 20, /* if idle, PINGREQ after secs */ .secs_since_valid_hangup = 25, /* hangup if still idle secs */ }; static const lws_mqtt_client_connect_param_t client_connect_param = { .client_id = NULL, .keep_alive = 60, .clean_start = 1, .will_param = { .topic = "good/bye", .message = "sign-off", .qos = 0, .retain = 0, }, .username = "lwsUser", .password = "mySecretPassword", }; static lws_mqtt_topic_elem_t topics[] = { [0] = { .name = "test/topic0", .qos = QOS0 }, [1] = { .name = "test/topic1", .qos = QOS1 }, }; static lws_mqtt_subscribe_param_t sub_param = { .topic = &topics[0], .num_topics = LWS_ARRAY_SIZE(topics), }; static const char * const test_string = "No one would have believed in the last years of the nineteenth " "century that this world was being watched keenly and closely by " "intelligences greater than man's and yet as mortal as his own; that as " "men busied themselves about their various concerns they were " "scrutinised and studied, perhaps almost as narrowly as a man with a " "microscope might scrutinise the transient creatures that swarm and " "multiply in a drop of water. With infinite complacency men went to " "and fro over this globe about their little affairs, serene in their " "assurance of their empire over matter. It is possible that the " "infusoria under the microscope do the same. No one gave a thought to " "the older worlds of space as sources of human danger, or thought of " "them only to dismiss the idea of life upon them as impossible or " "improbable. It is curious to recall some of the mental habits of " "those departed days. At most terrestrial men fancied there might be " "other men upon Mars, perhaps inferior to themselves and ready to " "welcome a missionary enterprise. Yet across the gulf of space, minds " "that are to our minds as ours are to those of the beasts that perish, " "intellects vast and cool and unsympathetic, regarded this earth with " "envious eyes, and slowly and surely drew their plans against us. And " "early in the twentieth century came the great disillusionment. "; /* this reflects the length of the string above */ #define TEST_STRING_LEN 1337 struct pss { lws_mqtt_publish_param_t pub_param; int state; size_t pos; int retries; }; static void sigint_handler(int sig) { interrupted = 1; } static int connect_client(struct lws_context *context, struct test_item *item) { struct lws_client_connect_info i; memset(&i, 0, sizeof i); i.mqtt_cp = &client_connect_param; i.opaque_user_data = item; i.protocol = "test-mqtt"; i.address = "localhost"; i.host = "localhost"; i.pwsi = &item->wsi; i.context = context; i.method = "MQTT"; i.alpn = "mqtt"; i.port = 1883; if (do_ssl) { i.ssl_connection = LCCSCF_USE_SSL; i.ssl_connection |= LCCSCF_ALLOW_SELFSIGNED; i.port = 8883; } if (pipeline) i.ssl_connection |= LCCSCF_PIPELINE; if (!lws_client_connect_via_info(&i)) { lwsl_err("%s: Client Connect Failed\n", __func__); return 1; } return 0; } static void start_conn(struct lws_sorted_usec_list *sul) { struct test_item *item = lws_container_of(sul, struct test_item, sul); lwsl_notice("%s: item %d\n", __func__, (int)(item - &items[0])); if (connect_client(item->context, item)) interrupted = 1; } static int system_notify_cb(lws_state_manager_t *mgr, lws_state_notify_link_t *link, int current, int target) { struct lws_context *context = mgr->parent; int n; if (current != LWS_SYSTATE_OPERATIONAL || target != LWS_SYSTATE_OPERATIONAL) return 0; /* * We delay trying to do the client connection until the protocols have * been initialized for each vhost... this happens after we have network * and time so we can judge tls cert validity. * * Stagger the connection attempts so we get some joining before the * first has connected and some afterwards */ for (n = 0; n < count; n++) { items[n].context = context; lws_sul_schedule(context, 0, &items[n].sul, start_conn, n * stagger_us); } return 0; } static int callback_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { struct test_item *item = (struct test_item *)lws_get_opaque_user_data(wsi); struct pss *pss = (struct pss *)user; lws_mqtt_publish_param_t *pub; size_t chunk; switch (reason) { case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: lwsl_err("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__, in ? (char *)in : "(null)"); if (++done == count) goto finish_test; break; case LWS_CALLBACK_MQTT_CLIENT_CLOSED: lwsl_user("%s: item %d: CLIENT_CLOSED %p\n", __func__, (int)(item - &items[0]), wsi); if (++done == count) goto finish_test; break; case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED: lwsl_user("%s: MQTT_CLIENT_ESTABLISHED: %p\n", __func__, wsi); lws_callback_on_writable(wsi); return 0; case LWS_CALLBACK_MQTT_SUBSCRIBED: lwsl_user("%s: MQTT_SUBSCRIBED\n", __func__); /* then we can get on with the actual test part */ pss->state++; lws_callback_on_writable(wsi); break; case LWS_CALLBACK_MQTT_UNSUBSCRIBED: lwsl_user("%s: item %d: UNSUBSCRIBED: %p: Received unsuback\n", __func__, (int)(item - &item[0]), wsi); okay++; if (++pss->state == STATE_TEST_FINISH) { lwsl_notice("%s: MQTT_UNSUBACK ending stream %d successfully(%d/%d)\n", __func__, (int)(item - &items[0]), okay, count); /* We are done, request to close */ return -1; } break; case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE: /* * Extra WRITEABLE may appear here other than ones we asked * for, so we must consult our own state to decide if we want * to make use of the opportunity */ switch (pss->state) { case STATE_SUBSCRIBE: lwsl_user("%s: item %d: WRITEABLE: %p: Subscribing\n", __func__, (int)(item - &items[0]), wsi); if (lws_mqtt_client_send_subcribe(wsi, &sub_param)) { lwsl_notice("%s: subscribe failed\n", __func__); return -1; } pss->state++; break; case STATE_PUBLISH_QOS0: case STATE_PUBLISH_QOS1: lwsl_user("%s: item %d: WRITEABLE: %p: Publish\n", __func__, (int)(item - &items[0]), wsi); pss->pub_param.topic = pss->state == STATE_PUBLISH_QOS0 ? "test/topic0" : "test/topic1"; pss->pub_param.topic_len = (uint16_t)strlen(pss->pub_param.topic); pss->pub_param.qos = pss->state == STATE_PUBLISH_QOS0 ? QOS0 : QOS1; pss->pub_param.payload_len = TEST_STRING_LEN; /* We send the message out 300 bytes or less at at time */ chunk = 300; if (chunk > TEST_STRING_LEN - pss->pos) chunk = TEST_STRING_LEN - pss->pos; lwsl_notice("%s: sending %d at +%d\n", __func__, (int)chunk, (int)pss->pos); if (lws_mqtt_client_send_publish(wsi, &pss->pub_param, test_string + pss->pos, chunk, (pss->pos + chunk == TEST_STRING_LEN))) { lwsl_notice("%s: publish failed\n", __func__); return -1; } pss->pos += chunk; if (pss->pos == TEST_STRING_LEN) { lwsl_debug("%s: sent message\n", __func__); pss->pos = 0; pss->state++; } break; case STATE_UNSUBSCRIBE: lwsl_user("%s: item %d: UNSUBSCRIBE: %p: Send unsub\n", __func__, (int)(item - &item[0]), wsi); pss->state++; if (lws_mqtt_client_send_unsubcribe(wsi, &sub_param)) { lwsl_notice("%s: subscribe failed\n", __func__); return -1; } break; default: break; } return 0; case LWS_CALLBACK_MQTT_ACK: lwsl_user("%s: item %d: MQTT_ACK (state %d)\n", __func__, (int)(item - &items[0]), pss->state); /* * We can forget about the message we just sent, it's done. * * For our test, that's the indication we can close the wsi. */ pss->state++; if (pss->state != STATE_TEST_FINISH) { lws_callback_on_writable(wsi); break; } break; case LWS_CALLBACK_MQTT_RESEND: lwsl_user("%s: MQTT_RESEND\n", __func__); /* * We must resend the packet ID mentioned in len */ if (++pss->retries == 3) { lwsl_notice("%s: too many retries\n", __func__); return 1; /* kill the connection */ } pss->state--; pss->pos = 0; break; case LWS_CALLBACK_MQTT_CLIENT_RX: pub = (lws_mqtt_publish_param_t *)in; assert(pub); lwsl_user("%s: item %d: MQTT_CLIENT_RX (%s) pos %d/%d len %d\n", __func__, (int)(item - &items[0]), pub->topic, (int)pub->payload_pos, (int)pub->payload_len, (int)len); //lwsl_hexdump_info(pub->payload, len); return 0; default: break; } return 0; finish_test: interrupted = 1; lws_cancel_service(lws_get_context(wsi)); return 0; } static const struct lws_protocols protocols[] = { { .name = "test-mqtt", .callback = callback_mqtt, .per_session_data_size = sizeof(struct pss) }, { NULL, NULL, 0, 0 } }; int main(int argc, const char **argv) { lws_state_notify_link_t notifier = { {}, system_notify_cb, "app" }; lws_state_notify_link_t *na[] = { ¬ifier, NULL }; struct lws_context_creation_info info; struct lws_context *context; const char *p; int n = 0; signal(SIGINT, sigint_handler); memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ lws_cmdline_option_handle_builtin(argc, argv, &info); do_ssl = !!lws_cmdline_option(argc, argv, "-s"); if (do_ssl) info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; if (lws_cmdline_option(argc, argv, "-p")) pipeline = 1; if ((p = lws_cmdline_option(argc, argv, "-i"))) stagger_us = atoi(p); if ((p = lws_cmdline_option(argc, argv, "-c"))) count = atoi(p); if (count > COUNT) { count = COUNT; lwsl_err("%s: clipped count at max %d\n", __func__, count); } lwsl_user("LWS minimal MQTT client %s [-d][-s]\n", do_ssl ? "tls enabled": "unencrypted"); info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ info.protocols = protocols; info.register_notifier_list = na; info.fd_limit_per_thread = 1 + COUNT + 1; info.retry_and_idle_policy = &retry; #if defined(LWS_WITH_MBEDTLS) /* * OpenSSL uses the system trust store. mbedTLS has to be told which * CA to trust explicitly. */ info.client_ssl_ca_filepath = "./mosq-ca.crt"; #endif context = lws_create_context(&info); if (!context) { lwsl_err("lws init failed\n"); return 1; } /* Event loop */ while (n >= 0 && !interrupted) n = lws_service(context, 0); lwsl_user("%s: Completed: %d/%d ok, %s\n", __func__, okay, count, okay != count ? "failed" : "OK"); lws_context_destroy(context); return okay != count; }