/* * libwebsockets-test-server - libwebsockets test implementation * * Copyright (C) 2010-2016 Andy Green * * This file is made available under the Creative Commons CC0 1.0 * Universal Public Domain Dedication. * * The person who associated a work with this deed has dedicated * the work to the public domain by waiving all of his or her rights * to the work worldwide under copyright law, including all related * and neighboring rights, to the extent allowed by law. You can copy, * modify, distribute and perform the work, even for commercial purposes, * all without asking permission. * * The test apps are intended to be adapted for use in your code, which * may be proprietary. So unlike the library itself, they are licensed * Public Domain. */ #if !defined (LWS_PLUGIN_STATIC) #define LWS_DLL #define LWS_INTERNAL #include "../lib/libwebsockets.h" #endif #include #include /* lws-mirror_protocol */ #if defined(LWS_WITH_ESP8266) #define MAX_MESSAGE_QUEUE 64 #else #define MAX_MESSAGE_QUEUE 512 #endif #define MAX_MIRROR_INSTANCES 10 struct lws_mirror_instance; struct per_session_data__lws_mirror { struct lws *wsi; struct lws_mirror_instance *mi; struct per_session_data__lws_mirror *same_mi_pss_list; int ringbuffer_tail; }; struct a_message { void *payload; size_t len; }; struct lws_mirror_instance { struct lws_mirror_instance *next; struct per_session_data__lws_mirror *same_mi_pss_list; char name[30]; struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; int ringbuffer_head; }; struct per_vhost_data__lws_mirror { struct lws_mirror_instance *mi_list; }; static int callback_lws_mirror(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { struct per_session_data__lws_mirror *pss = (struct per_session_data__lws_mirror *)user, **ppss, *pss1; struct per_vhost_data__lws_mirror *v = (struct per_vhost_data__lws_mirror *) lws_protocol_vh_priv_get(lws_get_vhost(wsi), lws_get_protocol(wsi)); struct lws_mirror_instance *mi, **pmi; char name[30]; int n, m, count_mi = 0; switch (reason) { case LWS_CALLBACK_ESTABLISHED: lwsl_info("%s: LWS_CALLBACK_ESTABLISHED\n", __func__); /* * mirror instance name... defaults to "", but if URL includes * "?mirror=xxx", will be "xxx" */ name[0] = '\0'; lws_get_urlarg_by_name(wsi, "mirror", name, sizeof(name) - 1); lwsl_notice("mirror %s\n", name); /* is there already a mirror instance of this name? */ mi = v->mi_list; while (mi) { if (!strcmp(name, mi->name)) { lwsl_notice("Joining existing mi %p '%s'\n", mi, name); /* yes... we will join it */ break; } count_mi++; mi = mi->next; } if (!mi) { /* no existing mirror instance for name */ if (count_mi == MAX_MIRROR_INSTANCES) return -1; /* create one with this name, and join it */ mi = malloc(sizeof(*mi)); memset(mi, 0, sizeof(*mi)); mi->next = v->mi_list; v->mi_list = mi; strcpy(mi->name, name); mi->ringbuffer_head = 0; lwsl_notice("Created new mi %p '%s'\n", mi, name); } /* add our pss to list of guys bound to this mi */ pss->same_mi_pss_list = mi->same_mi_pss_list; mi->same_mi_pss_list = pss; /* init the pss */ pss->mi = mi; pss->ringbuffer_tail = mi->ringbuffer_head; pss->wsi = wsi; break; case LWS_CALLBACK_CLOSED: /* detach our pss from the mirror instance */ mi = pss->mi; if (!mi) break; ppss = &mi->same_mi_pss_list; while (*ppss) { if (*ppss == pss) { *ppss = pss->same_mi_pss_list; break; } ppss = &(*ppss)->same_mi_pss_list; } if (!mi->same_mi_pss_list) { /* last pss unbound from mi... delete mi */ pmi = &v->mi_list; while (*pmi) { if (*pmi == mi) { *pmi = (*pmi)->next; if (!pss->mi) break; lwsl_info("%s: mirror protocol cleaning up %p\n", __func__, v); for (n = 0; n < ARRAY_SIZE(pss->mi->ringbuffer); n++) if (pss->mi->ringbuffer[n].payload) { free(pss->mi->ringbuffer[n].payload); pss->mi->ringbuffer[n].payload = NULL; } free(mi); break; } count_mi++; pmi = &(*pmi)->next; } } break; case LWS_CALLBACK_PROTOCOL_INIT: /* per vhost */ lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct per_vhost_data__lws_mirror)); break; case LWS_CALLBACK_PROTOCOL_DESTROY: /* per vhost */ break; case LWS_CALLBACK_SERVER_WRITEABLE: while (pss->ringbuffer_tail != pss->mi->ringbuffer_head) { m = pss->mi->ringbuffer[pss->ringbuffer_tail].len; n = lws_write(wsi, (unsigned char *) pss->mi->ringbuffer[pss->ringbuffer_tail].payload + LWS_PRE, m, LWS_WRITE_TEXT); if (n < 0) { lwsl_err("ERROR %d writing to mirror socket\n", n); return -1; } if (n < m) lwsl_err("mirror partial write %d vs %d\n", n, m); if (pss->ringbuffer_tail == (MAX_MESSAGE_QUEUE - 1)) pss->ringbuffer_tail = 0; else pss->ringbuffer_tail++; if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) lws_rx_flow_allow_all_protocol(lws_get_context(wsi), lws_get_protocol(wsi)); if (lws_send_pipe_choked(wsi)) { lws_callback_on_writable(wsi); break; } } break; case LWS_CALLBACK_RECEIVE: if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 1)) { lwsl_err("dropping!\n"); goto choke; } if (pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload) free(pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload); pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload = malloc(LWS_PRE + len); pss->mi->ringbuffer[pss->mi->ringbuffer_head].len = len; memcpy((char *)pss->mi->ringbuffer[pss->mi->ringbuffer_head].payload + LWS_PRE, in, len); if (pss->mi->ringbuffer_head == (MAX_MESSAGE_QUEUE - 1)) pss->mi->ringbuffer_head = 0; else pss->mi->ringbuffer_head++; if (((pss->mi->ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)) != (MAX_MESSAGE_QUEUE - 2)) goto done; choke: lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi); lws_rx_flow_control(wsi, 0); done: /* * ask for WRITABLE callback for every wsi bound to this * mirror instance */ pss1 = pss->mi->same_mi_pss_list; while (pss1) { lws_callback_on_writable(pss1->wsi); pss1 = pss1->same_mi_pss_list; } break; default: break; } return 0; } #define LWS_PLUGIN_PROTOCOL_MIRROR { \ "lws-mirror-protocol", \ callback_lws_mirror, \ sizeof(struct per_session_data__lws_mirror), \ 128, /* rx buf size must be >= permessage-deflate rx size */ \ } #if !defined (LWS_PLUGIN_STATIC) static const struct lws_protocols protocols[] = { LWS_PLUGIN_PROTOCOL_MIRROR }; LWS_EXTERN LWS_VISIBLE int init_protocol_lws_mirror(struct lws_context *context, struct lws_plugin_capability *c) { if (c->api_magic != LWS_PLUGIN_API_MAGIC) { lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC, c->api_magic); return 1; } c->protocols = protocols; c->count_protocols = ARRAY_SIZE(protocols); c->extensions = NULL; c->count_extensions = 0; return 0; } LWS_EXTERN LWS_VISIBLE int destroy_protocol_lws_mirror(struct lws_context *context) { return 0; } #endif