/* * libwebsockets-test-server - libwebsockets test implementation * * Written in 2010-2021 by Andy Green * * This file is made available under the Creative Commons CC0 1.0 * Universal Public Domain Dedication. * * The person who associated a work with this deed has dedicated * the work to the public domain by waiving all of his or her rights * to the work worldwide under copyright law, including all related * and neighboring rights, to the extent allowed by law. You can copy, * modify, distribute and perform the work, even for commercial purposes, * all without asking permission. * * The test apps are intended to be adapted for use in your code, which * may be proprietary. So unlike the library itself, they are licensed * Public Domain. * * Scrapeable, proxiable OpenMetrics metrics (compatible with Prometheus) * * https://tools.ietf.org/html/draft-richih-opsawg-openmetrics-00 * * This plugin provides four protocols related to openmetrics handling: * * 1) "lws-openmetrics" direct http listener so scraper can directly get metrics * * 2) "lws-openmetrics-prox-agg" metrics proxy server that scraper can connect * to locally to proxy through to connected remote clients at 3) * * 3) "lws-openmetrics-prox-server" metrics proxy server that remote clients can * connect to, providing a path where scrapers at 2) can get metrics from * clients connected us * * 4) "lws-openmetrics-prox-client" nailed-up metrics proxy client that tries to * keep up a connection to the server at 3), allowing to scraper to reach * clients that have no reachable way to serve. * * These are provided like this to maximize flexibility in being able to add * openmetrics serving, proxying, or client->proxy to existing lws code. * * Openmetrics supports a "metric" at the top of its report that describes the * source aka "target metadata". * * Since we want to enable collection from devices that are not externally * reachable, we must provide a reachable server that the clients can attach to * and have their stats aggregated and then read by Prometheus or whatever. * Openmetrics says that it wants to present the aggregated stats in a flat * summary with only the aggregator's "target metadata" and contributor targets * getting their data tagged with the source * * "The above discussion is in the context of individual exposers. An * exposition from a general purpose monitoring system may contain * metrics from many individual targets, and thus may expose multiple * target info Metrics. The metrics may already have had target * metadata added to them as labels as part of ingestion. The metric * names MUST NOT be varied based on target metadata. For example it * would be incorrect for all metrics to end up being prefixed with * staging_ even if they all originated from targets in a staging * environment)." */ #if !defined (LWS_PLUGIN_STATIC) #if !defined(LWS_DLL) #define LWS_DLL #endif #if !defined(LWS_INTERNAL) #define LWS_INTERNAL #endif #include #endif #include #include #include #include #if !defined(WIN32) #include #endif #include struct vhd { struct lws_context *cx; struct lws_vhost *vhost; char ws_server_uri[128]; char metrics_proxy_path[128]; char ba_secret[128]; const char *proxy_side_bind_name; /**< name used to bind the two halves of the proxy together, must be * the same name given in a pvo for both "lws-openmetrics-prox-agg" * (the side local to the scraper) and "lws-openmetrics-prox-server" * (the side the clients connect to) */ char sanity[8]; lws_dll2_owner_t clients; lws_sorted_usec_list_t sul; /* schedule connection retry */ struct vhd *bind_partner_vhd; struct lws *wsi; /* related wsi if any */ uint16_t retry_count; /* count of consequetive retries */ }; struct pss { lws_dll2_t list; char proxy_path[64]; struct lwsac *ac; /* the translated metrics, one ac per line */ struct lwsac *walk; /* iterator for ac when writing */ size_t tot; /* content-length computation */ struct lws *wsi; uint8_t greet:1; /* set if client needs to send proxy path */ uint8_t trigger:1; /* we want to ask the client to dump */ }; #if defined(LWS_WITH_CLIENT) static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 }; static const lws_retry_bo_t retry = { .retry_ms_table = backoff_ms, .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), .conceal_count = LWS_ARRAY_SIZE(backoff_ms), .secs_since_valid_ping = 400, /* force PINGs after secs idle */ .secs_since_valid_hangup = 400, /* hangup after secs idle */ .jitter_percent = 0, }; static void omc_connect_client(lws_sorted_usec_list_t *sul) { struct vhd *vhd = lws_container_of(sul, struct vhd, sul); struct lws_client_connect_info i; const char *prot; char url[128]; memset(&i, 0, sizeof(i)); lwsl_notice("%s: %s %s %s\n", __func__, vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret); lws_strncpy(url, vhd->ws_server_uri, sizeof(url)); if (lws_parse_uri(url, &prot, &i.address, &i.port, &i.path)) { lwsl_err("%s: unable to parse uri %s\n", __func__, vhd->ws_server_uri); return; } i.context = vhd->cx; i.origin = i.address; i.host = i.address; i.ssl_connection = LCCSCF_USE_SSL; i.protocol = "lws-openmetrics-prox-server"; /* public subprot */ i.local_protocol_name = "lws-openmetrics-prox-client"; i.pwsi = &vhd->wsi; i.retry_and_idle_policy = &retry; i.userdata = vhd; i.vhost = vhd->vhost; lwsl_notice("%s: %s %u %s\n", __func__, i.address, i.port, i.path); if (lws_client_connect_via_info(&i)) return; /* * Failed... schedule a retry... we can't use the _retry_wsi() * convenience wrapper api here because no valid wsi at this * point. */ if (!lws_retry_sul_schedule(vhd->cx, 0, sul, &retry, omc_connect_client, &vhd->retry_count)) return; vhd->retry_count = 0; lws_retry_sul_schedule(vhd->cx, 0, sul, &retry, omc_connect_client, &vhd->retry_count); } #endif static void openmetrics_san(char *nm, size_t nl) { size_t m; /* Openmetrics has a very restricted token charset */ for (m = 0; m < nl; m++) if ((nm[m] < 'A' || nm[m] > 'Z') && (nm[m] < 'a' || nm[m] > 'z') && (nm[m] < '0' || nm[m] > '9') && nm[m] != '_') nm[m] = '_'; } static int lws_metrics_om_format_agg(lws_metric_pub_t *pub, const char *nm, lws_usec_t now, int gng, char *buf, size_t len) { const char *_gng = gng ? "_nogo" : "_go"; char *end = buf + len - 1, *obuf = buf; if (pub->flags & LWSMTFL_REPORT_ONLY_GO) _gng = ""; if (!(pub->flags & LWSMTFL_REPORT_MEAN)) { /* only the sum is meaningful */ if (pub->flags & LWSMTFL_REPORT_DUTY_WALLCLOCK_US) { buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf), "%s_count %u\n" "%s_us_sum %llu\n" "%s_created %lu.%06u\n", nm, (unsigned int)pub->u.agg.count[gng], nm, (unsigned long long)pub->u.agg.sum[gng], nm, (unsigned long)(pub->us_first / 1000000), (unsigned int)(pub->us_first % 1000000)); return lws_ptr_diff(buf, obuf); } /* it's a monotonic ordinal, like total tx */ buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf), "%s%s_count %u\n" "%s%s_sum %llu\n", nm, _gng, (unsigned int)pub->u.agg.count[gng], nm, _gng, (unsigned long long)pub->u.agg.sum[gng]); } else buf += lws_snprintf(buf, lws_ptr_diff_size_t(end, buf), "%s%s_count %u\n" "%s%s_mean %llu\n", nm, _gng, (unsigned int)pub->u.agg.count[gng], nm, _gng, (unsigned long long) (pub->u.agg.count[gng] ? pub->u.agg.sum[gng] / pub->u.agg.count[gng] : 0)); return lws_ptr_diff(buf, obuf); } static int lws_metrics_om_ac_stash(struct pss *pss, const char *buf, size_t len) { char *q; q = lwsac_use(&pss->ac, LWS_PRE + len + 2, LWS_PRE + len + 2); if (!q) { lwsac_free(&pss->ac); return -1; } q[LWS_PRE] = (char)((len >> 8) & 0xff); q[LWS_PRE + 1] = (char)(len & 0xff); memcpy(q + LWS_PRE + 2, buf, len); pss->tot += len; return 0; } /* * We have to do the ac listing at this level, because there can be too large * a number to metrics tags to iterate that can fit in a reasonable buffer. */ static int lws_metrics_om_format(struct pss *pss, lws_metric_pub_t *pub, const char *nm) { char buf[1200], *p = buf, *end = buf + sizeof(buf) - 1, tmp[512]; lws_usec_t t = lws_now_usecs(); if (pub->flags & LWSMTFL_REPORT_HIST) { lws_metric_bucket_t *buck = pub->u.hist.head; p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "%s_count %llu\n", nm, (unsigned long long) pub->u.hist.total_count); while (buck) { lws_strncpy(tmp, lws_metric_bucket_name(buck), sizeof(tmp)); p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "%s{%s} %llu\n", nm, tmp, (unsigned long long)buck->count); lws_metrics_om_ac_stash(pss, buf, lws_ptr_diff_size_t(p, buf)); p = buf; buck = buck->next; } goto happy; } if (!pub->u.agg.count[METRES_GO] && !pub->u.agg.count[METRES_NOGO]) return 0; if (pub->u.agg.count[METRES_GO]) p += lws_metrics_om_format_agg(pub, nm, t, METRES_GO, p, lws_ptr_diff_size_t(end, p)); if (!(pub->flags & LWSMTFL_REPORT_ONLY_GO) && pub->u.agg.count[METRES_NOGO]) p += lws_metrics_om_format_agg(pub, nm, t, METRES_NOGO, p, lws_ptr_diff_size_t(end, p)); if (pub->flags & LWSMTFL_REPORT_MEAN) p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "%s_min %llu\n" "%s_max %llu\n", nm, (unsigned long long)pub->u.agg.min, nm, (unsigned long long)pub->u.agg.max); happy: return lws_metrics_om_ac_stash(pss, buf, lws_ptr_diff_size_t(p, buf)); } static int append_om_metric(lws_metric_pub_t *pub, void *user) { struct pss *pss = (struct pss *)user; char nm[64]; size_t nl; /* * Convert lws_metrics to openmetrics metrics data, stashing into an * lwsac without backfill. Since it's not backfilling, use areas are in * linear sequence simplifying walking them. Limiting the lwsac alloc * to less than a typical mtu means we can write one per write * efficiently */ lws_strncpy(nm, pub->name, sizeof(nm)); nl = strlen(nm); openmetrics_san(nm, nl); return lws_metrics_om_format(pss, pub, nm); } #if defined(__linux__) static int grabfile(const char *fi, char *buf, size_t len) { int n, fd = lws_open(fi, LWS_O_RDONLY); buf[0] = '\0'; if (fd < 0) return -1; n = (int)read(fd, buf, len - 1); close(fd); if (n < 0) { buf[0] = '\0'; return -1; } buf[n] = '\0'; if (n > 0 && buf[n - 1] == '\n') buf[--n] = '\0'; return n; } #endif /* * Let's pregenerate the output into an lwsac all at once and * then spool it back to the peer afterwards * * - there's not going to be that much of it (a few kB) * - we then know the content-length for the headers * - it's stretchy to arbitrary numbers of metrics * - lwsac block list provides the per-metric structure to * hold the data in a way we can walk to write it simply */ int ome_prepare(struct lws_context *ctx, struct pss *pss) { char buf[1224], *start = buf + LWS_PRE, *p = start, *end = buf + sizeof(buf) - 1; char hn[64]; pss->tot = 0; /* * Target metadata */ hn[0] = '\0'; gethostname(hn, sizeof(hn) - 1); p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "# TYPE target info\n" "# HELP target Target metadata\n" "target_info{hostname=\"%s\"", hn); #if defined(__linux__) if (grabfile("/proc/self/cmdline", hn, sizeof(hn))) p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p), ",cmdline=\"%s\"", hn); #endif p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "} 1\n"); if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE, lws_ptr_diff_size_t(p, buf + LWS_PRE))) return 1; /* lws version */ p = start; p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "# TYPE lws_info info\n" "# HELP lws_info Version of lws producing this\n" "lws_info{version=\"%s\"} 1\n", LWS_BUILD_HASH); if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE, lws_ptr_diff_size_t(p, buf + LWS_PRE))) return 1; /* system scalars */ #if defined(__linux__) if (grabfile("/proc/loadavg", hn, sizeof(hn))) { char *sp = strchr(hn, ' '); if (sp) { p = start; p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "load_1m %.*s\n", lws_ptr_diff(sp, hn), hn); if (lws_metrics_om_ac_stash(pss, (char *)buf + LWS_PRE, lws_ptr_diff_size_t(p, start))) return 1; } } #endif if (lws_metrics_foreach(ctx, pss, append_om_metric)) return 1; p = start; p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "# EOF\n"); if (lws_metrics_om_ac_stash(pss, (char *)buf + LWS_PRE, lws_ptr_diff_size_t(p, buf + LWS_PRE))) return 1; pss->walk = pss->ac; return 0; } #if defined(LWS_WITH_SERVER) /* 1) direct http export for scraper */ static int callback_lws_openmetrics_export(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { unsigned char buf[1224], *start = buf + LWS_PRE, *p = start, *end = buf + sizeof(buf) - 1, *ip; struct lws_context *cx = lws_get_context(wsi); struct pss *pss = (struct pss *)user; unsigned int m, wm; switch (reason) { case LWS_CALLBACK_HTTP: ome_prepare(cx, pss); p = start; if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK, "application/openmetrics-text; " "version=1.0.0; charset=utf-8", pss->tot, &p, end) || lws_finalize_write_http_header(wsi, start, &p, end)) return 1; lws_callback_on_writable(wsi); return 0; case LWS_CALLBACK_CLOSED_HTTP: lwsac_free(&pss->ac); break; case LWS_CALLBACK_HTTP_WRITEABLE: if (!pss->walk) return 0; do { ip = (uint8_t *)pss->walk + lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE; m = (unsigned int)((ip[0] << 8) | ip[1]); /* coverity */ if (m > lwsac_get_tail_pos(pss->walk) - lwsac_sizeof(pss->walk == pss->ac)) return -1; if (lws_ptr_diff_size_t(end, p) < m) break; memcpy(p, ip + 2, m); p += m; pss->walk = lwsac_get_next(pss->walk); } while (pss->walk); if (!lws_ptr_diff_size_t(p, start)) { lwsl_err("%s: stuck\n", __func__); return -1; } wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL; if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start), (enum lws_write_protocol)wm) < 0) return 1; if (!pss->walk) { if (lws_http_transaction_completed(wsi)) return -1; } else lws_callback_on_writable(wsi); return 0; default: break; } return lws_callback_http_dummy(wsi, reason, user, in, len); } static struct pss * omc_lws_om_get_other_side_pss_client(struct vhd *vhd, struct pss *pss) { /* * Search through our partner's clients list looking for one with the * same proxy path */ lws_start_foreach_dll(struct lws_dll2 *, d, vhd->bind_partner_vhd->clients.head) { struct pss *apss = lws_container_of(d, struct pss, list); if (!strcmp(pss->proxy_path, apss->proxy_path)) return apss; } lws_end_foreach_dll(d); return NULL; } /* 2) "lws-openmetrics-prox-agg": http server export via proxy to connected clients */ static int callback_lws_openmetrics_prox_agg(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { unsigned char buf[1224], *start = buf + LWS_PRE, *p = start, *end = buf + sizeof(buf) - 1, *ip; struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get( lws_get_vhost(wsi), lws_get_protocol(wsi)); struct lws_context *cx = lws_get_context(wsi); struct pss *pss = (struct pss *)user, *partner_pss; unsigned int m, wm; switch (reason) { case LWS_CALLBACK_PROTOCOL_INIT: lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi))); /* * We get told what to do when we are bound to the vhost */ vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct vhd)); if (!vhd) { lwsl_err("%s: vhd alloc failed\n", __func__); return 0; } vhd->cx = cx; /* * Try to bind to the counterpart server in the proxy, binding * to the right one by having a common bind name set in a pvo. * We don't know who will get instantiated last, so both parts * try to bind if not already bound */ if (!lws_pvo_get_str(in, "proxy-side-bind-name", &vhd->proxy_side_bind_name)) { /* * Attempt to find the vhd that belongs to a vhost * that has instantiated protocol * "lws-openmetrics-prox-server", and has set pvo * "proxy-side-bind-name" on it to whatever our * vhd->proxy_side_bind_name was also set to. * * If found, inform the two sides of the same proxy * what their partner vhd is */ lws_strncpy(vhd->sanity, "isagg", sizeof(vhd->sanity)); vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx, "lws-openmetrics-prox-server", "proxy-side-bind-name", vhd->proxy_side_bind_name); if (vhd->bind_partner_vhd) { assert(!strcmp(vhd->bind_partner_vhd->sanity, "isws")); lwsl_notice("%s: proxy binding OK\n", __func__); vhd->bind_partner_vhd->bind_partner_vhd = vhd; } } else { lwsl_warn("%s: proxy-side-bind-name required\n", __func__); return 1; } break; case LWS_CALLBACK_PROTOCOL_DESTROY: if (vhd) lws_sul_cancel(&vhd->sul); break; case LWS_CALLBACK_HTTP: /* * The scraper has connected to us, the local side of the proxy, * we need to match what it wants to */ if (!vhd->bind_partner_vhd) return 0; lws_strnncpy(pss->proxy_path, (const char *)in, len, sizeof(pss->proxy_path)); if (pss->list.owner) { lwsl_warn("%s: double HTTP?\n", __func__); return 0; } pss->wsi = wsi; lws_start_foreach_dll(struct lws_dll2 *, d, vhd->bind_partner_vhd->clients.head) { struct pss *apss = lws_container_of(d, struct pss, list); if (!strcmp((const char *)in, apss->proxy_path)) { apss->trigger = 1; lws_callback_on_writable(apss->wsi); /* let's add him on the http server vhd list */ lws_dll2_add_tail(&pss->list, &vhd->clients); return 0; } } lws_end_foreach_dll(d); return 0; case LWS_CALLBACK_CLOSED_HTTP: lwsac_free(&pss->ac); lws_dll2_remove(&pss->list); break; case LWS_CALLBACK_HTTP_WRITEABLE: if (!pss->walk) return 0; /* locate the wss side if it's still around */ partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss); if (!partner_pss) return -1; do { ip = (uint8_t *)pss->walk + lwsac_sizeof(pss->walk == partner_pss->ac) + LWS_PRE; m = (unsigned int)((ip[0] << 8) | ip[1]); /* coverity */ if (m > lwsac_get_tail_pos(pss->walk) - lwsac_sizeof(pss->walk == partner_pss->ac)) return -1; if (lws_ptr_diff_size_t(end, p) < m) break; memcpy(p, ip + 2, m); p += m; pss->walk = lwsac_get_next(pss->walk); } while (pss->walk); if (!lws_ptr_diff_size_t(p, start)) { lwsl_err("%s: stuck\n", __func__); return -1; } wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL; if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start), (enum lws_write_protocol)wm) < 0) return 1; if (!pss->walk) { lwsl_info("%s: whole msg proxied to scraper\n", __func__); lws_dll2_remove(&pss->list); lwsac_free(&partner_pss->ac); // if (lws_http_transaction_completed(wsi)) return -1; } else lws_callback_on_writable(wsi); return 0; default: break; } return lws_callback_http_dummy(wsi, reason, user, in, len); } /* 3) "lws-openmetrics-prox-server": ws server side of metrics proxy, for * ws clients to connect to */ static int callback_lws_openmetrics_prox_server(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { unsigned char buf[1224], *start = buf + LWS_PRE, *p = start, *end = buf + sizeof(buf) - 1; struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get( lws_get_vhost(wsi), lws_get_protocol(wsi)); struct lws_context *cx = lws_get_context(wsi); struct pss *pss = (struct pss *)user, *partner_pss; switch (reason) { case LWS_CALLBACK_PROTOCOL_INIT: /* * We get told what to do when we are bound to the vhost */ lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi))); vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct vhd)); if (!vhd) { lwsl_err("%s: vhd alloc failed\n", __func__); return 0; } vhd->cx = cx; /* * Try to bind to the counterpart server in the proxy, binding * to the right one by having a common bind name set in a pvo. * We don't know who will get instantiated last, so both parts * try to bind if not already bound */ if (!lws_pvo_get_str(in, "proxy-side-bind-name", &vhd->proxy_side_bind_name)) { /* * Attempt to find the vhd that belongs to a vhost * that has instantiated protocol * "lws-openmetrics-prox-server", and has set pvo * "proxy-side-bind-name" on it to whatever our * vhd->proxy_side_bind_name was also set to. * * If found, inform the two sides of the same proxy * what their partner vhd is */ lws_strncpy(vhd->sanity, "isws", sizeof(vhd->sanity)); vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx, "lws-openmetrics-prox-agg", "proxy-side-bind-name", vhd->proxy_side_bind_name); if (vhd->bind_partner_vhd) { assert(!strcmp(vhd->bind_partner_vhd->sanity, "isagg")); lwsl_notice("%s: proxy binding OK\n", __func__); vhd->bind_partner_vhd->bind_partner_vhd = vhd; } } else { lwsl_warn("%s: proxy-side-bind-name required\n", __func__); return 1; } break; case LWS_CALLBACK_PROTOCOL_DESTROY: break; case LWS_CALLBACK_ESTABLISHED: /* * a client has joined... we need to add his pss to our list * of live, joined clients */ /* mark us as waiting for the reference name from the client */ pss->greet = 1; pss->wsi = wsi; lws_validity_confirmed(wsi); return 0; case LWS_CALLBACK_CLOSED: /* * a client has parted */ lws_dll2_remove(&pss->list); lwsl_warn("%s: client %s left (%u)\n", __func__, pss->proxy_path, (unsigned int)vhd->clients.count); lwsac_free(&pss->ac); /* let's kill the scraper connection accordingly, if still up */ partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss); if (partner_pss) lws_wsi_close(partner_pss->wsi, LWS_TO_KILL_ASYNC); break; case LWS_CALLBACK_RECEIVE: if (pss->greet) { pss->greet = 0; lws_strnncpy(pss->proxy_path, (const char *)in, len, sizeof(pss->proxy_path)); lws_validity_confirmed(wsi); lwsl_notice("%s: received greet '%s'\n", __func__, pss->proxy_path); /* * we need to add his pss to our list of configured, * live, joined clients */ lws_dll2_add_tail(&pss->list, &vhd->clients); return 0; } /* * He's sending us his results... let's collect chunks into the * pss lwsac before worrying about anything else */ if (lws_is_first_fragment(wsi)) pss->tot = 0; lws_metrics_om_ac_stash(pss, (const char *)in, len); if (lws_is_final_fragment(wsi)) { struct pss *partner_pss; lwsl_info("%s: ws side received complete msg\n", __func__); /* the lwsac is complete */ pss->walk = pss->ac; partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss); if (!partner_pss) { lwsl_notice("%s: no partner A\n", __func__); return -1; } /* indicate to scraper side we want to issue now */ p = start; if (lws_add_http_common_headers(partner_pss->wsi, HTTP_STATUS_OK, "application/openmetrics-text; " "version=1.0.0; charset=utf-8", pss->tot, &p, end) || lws_finalize_write_http_header(partner_pss->wsi, start, &p, end)) return -1; /* indicate to scraper side we want to issue now */ partner_pss->walk = pss->ac; partner_pss->trigger = 1; lws_callback_on_writable(partner_pss->wsi); } return 0; case LWS_CALLBACK_SERVER_WRITEABLE: if (!pss->trigger) return 0; pss->trigger = 0; partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss); if (!partner_pss) { lwsl_err("%s: no partner\n", __func__); return 0; } lwsl_info("%s: sending trigger to client\n", __func__); *start = 'x'; if (lws_write(wsi, start, 1, (enum lws_write_protocol)LWS_WRITE_TEXT) < 0) return 1; lws_validity_confirmed(wsi); return 0; default: break; } return lws_callback_http_dummy(wsi, reason, user, in, len); } #endif #if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS) /* 4) ws client that keeps wss connection up to metrics proxy ws server */ static int callback_lws_openmetrics_prox_client(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { unsigned char buf[1224], *start = buf + LWS_PRE, *p = start, *end = buf + sizeof(buf) - 1, *ip; struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get( lws_get_vhost(wsi), lws_get_protocol(wsi)); struct lws_context *cx = lws_get_context(wsi); struct pss *pss = (struct pss *)user; unsigned int m, wm; const char *cp; char first; switch (reason) { case LWS_CALLBACK_PROTOCOL_INIT: lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi))); /* * We get told what to do when we are bound to the vhost */ vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct vhd)); if (!vhd) return 0; vhd->cx = cx; vhd->vhost = lws_get_vhost(wsi); /* the proxy server uri */ if (lws_pvo_get_str(in, "ws-server-uri", &cp)) { lwsl_err("%s: ws-server-uri pvo required\n", __func__); return 1; } lws_strncpy(vhd->ws_server_uri, cp, sizeof(vhd->ws_server_uri)); /* how we should be referenced at the proxy */ if (lws_pvo_get_str(in, "metrics-proxy-path", &cp)) { lwsl_err("%s: metrics-proxy-path pvo required\n", __func__); return 1; } lws_strncpy(vhd->metrics_proxy_path, cp, sizeof(vhd->metrics_proxy_path)); /* the shared secret to authenticate us as allowed to join */ if (lws_pvo_get_str(in, "ba-secret", &cp)) { lwsl_err("%s: ba-secret pvo required\n", __func__); return 1; } lws_strncpy(vhd->ba_secret, cp, sizeof(vhd->ba_secret)); lwsl_notice("%s: scheduling connect %s %s %s\n", __func__, vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret); lws_validity_confirmed(wsi); lws_sul_schedule(cx, 0, &vhd->sul, omc_connect_client, 1); break; case LWS_CALLBACK_PROTOCOL_DESTROY: if (vhd) lws_sul_cancel(&vhd->sul); break; case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: { unsigned char **pp = (unsigned char **)in, *pend = (*pp) + len; char b[128]; /* authorize ourselves to the metrics proxy using basic auth */ if (lws_http_basic_auth_gen("metricsclient", vhd->ba_secret, b, sizeof(b))) break; if (lws_add_http_header_by_token(wsi, WSI_TOKEN_HTTP_AUTHORIZATION, (unsigned char *)b, (int)strlen(b), pp, pend)) return -1; break; } case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char *)in : "(null)"); goto do_retry; case LWS_CALLBACK_CLIENT_ESTABLISHED: lwsl_warn("%s: connected to ws metrics agg server\n", __func__); pss->greet = 1; lws_callback_on_writable(wsi); lws_validity_confirmed(wsi); return 0; case LWS_CALLBACK_CLIENT_CLOSED: lwsl_notice("%s: client closed\n", __func__); lwsac_free(&pss->ac); goto do_retry; case LWS_CALLBACK_CLIENT_RECEIVE: /* * Proxy serverside sends us something to trigger us to create * our metrics message and send it back over the ws link */ ome_prepare(cx, pss); pss->walk = pss->ac; lws_callback_on_writable(wsi); lwsl_info("%s: dump requested\n", __func__); break; case LWS_CALLBACK_CLIENT_WRITEABLE: if (pss->greet) { /* * At first after establishing the we link, we send a * message indicating to the metrics proxy how we * should be referred to by the scraper to particularly * select to talk to us */ lwsl_info("%s: sending greet '%s'\n", __func__, vhd->metrics_proxy_path); lws_strncpy((char *)start, vhd->metrics_proxy_path, sizeof(buf) - LWS_PRE); if (lws_write(wsi, start, strlen(vhd->metrics_proxy_path), LWS_WRITE_TEXT) < 0) return 1; lws_validity_confirmed(wsi); pss->greet = 0; return 0; } if (!pss->walk) return 0; /* * We send the metrics dump in a single logical ws message, * using ws fragmentation to split it around 1 mtu boundary * and keep coming back until it's finished */ first = pss->walk == pss->ac; do { ip = (uint8_t *)pss->walk + lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE; m = (unsigned int)((ip[0] << 8) | ip[1]); /* coverity */ if (m > lwsac_get_tail_pos(pss->walk) - lwsac_sizeof(pss->walk == pss->ac)) { lwsl_err("%s: size blow\n", __func__); return -1; } if (lws_ptr_diff_size_t(end, p) < m) break; memcpy(p, ip + 2, m); p += m; pss->walk = lwsac_get_next(pss->walk); } while (pss->walk); if (!lws_ptr_diff_size_t(p, start)) { lwsl_err("%s: stuck\n", __func__); return -1; } wm = (unsigned int)lws_write_ws_flags(LWS_WRITE_TEXT, first, !pss->walk); if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start), (enum lws_write_protocol)wm) < 0) { lwsl_notice("%s: write fail\n", __func__); return 1; } lws_validity_confirmed(wsi); lwsl_info("%s: forwarded %d\n", __func__, lws_ptr_diff(p, start)); if (!pss->walk) { lwsl_info("%s: dump send completed\n", __func__); lwsac_free(&pss->ac); } else lws_callback_on_writable(wsi); return 0; default: break; } return lws_callback_http_dummy(wsi, reason, user, in, len); do_retry: if (!lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry, omc_connect_client, &vhd->retry_count)) return 0; vhd->retry_count = 0; lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry, omc_connect_client, &vhd->retry_count); return 0; } #endif LWS_VISIBLE const struct lws_protocols lws_openmetrics_export_protocols[] = { #if defined(LWS_WITH_SERVER) { /* for scraper directly: http export on listen socket */ "lws-openmetrics", callback_lws_openmetrics_export, sizeof(struct pss), 1024, }, { /* for scraper via ws proxy: http export on listen socket */ "lws-openmetrics-prox-agg", callback_lws_openmetrics_prox_agg, sizeof(struct pss), 1024, }, { /* metrics proxy server side: ws server for clients to connect to */ "lws-openmetrics-prox-server", callback_lws_openmetrics_prox_server, sizeof(struct pss), 1024, }, #endif #if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS) { /* client to metrics proxy: ws client to connect to metrics proxy*/ "lws-openmetrics-prox-client", callback_lws_openmetrics_prox_client, sizeof(struct pss), 1024, }, #endif }; LWS_VISIBLE const lws_plugin_protocol_t lws_openmetrics_export = { .hdr = { "lws OpenMetrics export", "lws_protocol_plugin", LWS_BUILD_HASH, LWS_PLUGIN_API_MAGIC }, .protocols = lws_openmetrics_export_protocols, .count_protocols = LWS_ARRAY_SIZE(lws_openmetrics_export_protocols), };