/*
 * libwebsockets-test-server - libwebsockets test implementation
 *
 * Written in 2010-2021 by Andy Green <andy@warmcat.com>
 *
 * This file is made available under the Creative Commons CC0 1.0
 * Universal Public Domain Dedication.
 *
 * The person who associated a work with this deed has dedicated
 * the work to the public domain by waiving all of his or her rights
 * to the work worldwide under copyright law, including all related
 * and neighboring rights, to the extent allowed by law. You can copy,
 * modify, distribute and perform the work, even for commercial purposes,
 * all without asking permission.
 *
 * The test apps are intended to be adapted for use in your code, which
 * may be proprietary.  So unlike the library itself, they are licensed
 * Public Domain.
 *
 * Scrapeable, proxiable OpenMetrics metrics (compatible with Prometheus)
 *
 * https://tools.ietf.org/html/draft-richih-opsawg-openmetrics-00
 *
 * This plugin provides four protocols related to openmetrics handling:
 *
 * 1) "lws-openmetrics" direct http listener so scraper can directly get metrics
 *
 * 2) "lws-openmetrics-prox-agg" metrics proxy server that scraper can connect
 *    to locally to proxy through to connected remote clients at 3)
 *
 * 3) "lws-openmetrics-prox-server" metrics proxy server that remote clients can
 *    connect to, providing a path where scrapers at 2) can get metrics from
 *    clients connected to us
 *
 * 4) "lws-openmetrics-prox-client" nailed-up metrics proxy client that tries to
 *    keep up a connection to the server at 3), allowing the scraper to reach
 *    clients that have no reachable way to serve.
 *
 * These are provided like this to maximize flexibility in being able to add
 * openmetrics serving, proxying, or client->proxy to existing lws code.
 *
 * Openmetrics supports a "metric" at the top of its report that describes the
 * source aka "target metadata".
 *
 * Since we want to enable collection from devices that are not externally
 * reachable, we must provide a reachable server that the clients can attach to
 * and have their stats aggregated and then read by Prometheus or whatever.
 * Openmetrics says that it wants to present the aggregated stats in a flat
 * summary with only the aggregator's "target metadata" and contributor targets
 * getting their data tagged with the source
 *
 * "The above discussion is in the context of individual exposers.  An
 *  exposition from a general purpose monitoring system may contain
 *  metrics from many individual targets, and thus may expose multiple
 *  target info Metrics.  The metrics may already have had target
 *  metadata added to them as labels as part of ingestion.  The metric
 *  names MUST NOT be varied based on target metadata.  For example it
 *  would be incorrect for all metrics to end up being prefixed with
 *  staging_ even if they all originated from targets in a staging
 *  environment)."
 */

#if !defined (LWS_PLUGIN_STATIC)
#if !defined(LWS_DLL)
#define LWS_DLL
#endif
#if !defined(LWS_INTERNAL)
#define LWS_INTERNAL
#endif
#include <libwebsockets.h>
#endif
#include <string.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <fcntl.h>
#if !defined(WIN32)
#include <unistd.h>
#endif
#include <assert.h>

/*
 * Per-vhost private data, allocated at LWS_CALLBACK_PROTOCOL_INIT by the
 * "lws-openmetrics-prox-agg", "lws-openmetrics-prox-server" and
 * "lws-openmetrics-prox-client" protocols.  One layout is shared by all
 * three roles; each role only fills in the members it needs.
 */
struct vhd {
	struct lws_context	*cx;
	struct lws_vhost	*vhost;	/* only set by the prox-client role */

	/*
	 * prox-client only: private copies of the "ws-server-uri",
	 * "metrics-proxy-path" and "ba-secret" pvo strings
	 */
	char			ws_server_uri[128];
	char			metrics_proxy_path[128];
	char			ba_secret[128];

	const char		*proxy_side_bind_name;
	/**< name used to bind the two halves of the proxy together, must be
	 * the same name given in a pvo for both "lws-openmetrics-prox-agg"
	 * (the side local to the scraper) and "lws-openmetrics-prox-server"
	 * (the side the clients connect to)
	 */

	/* role tag, "isagg" or "isws", asserted on the partner when binding */
	char			sanity[8];

	/* list of struct pss, linked through their .list member */
	lws_dll2_owner_t	clients;

	lws_sorted_usec_list_t	sul;	     /* schedule connection retry */

	/* vhd of the other half of the proxy, NULL until it is found */
	struct vhd		*bind_partner_vhd;

	struct lws		*wsi;	     /* related wsi if any */
	uint16_t		retry_count; /* count of consecutive retries */
};

/*
 * Per-connection state, used as the per-session data for all four protocols
 * in this plugin.
 */
struct pss {
	lws_dll2_t		list;	/* membership of a vhd clients list */
	char			proxy_path[64]; /* path identifying the proxied client */
	struct lwsac		*ac;	/* the translated metrics, one ac per line */
	struct lwsac		*walk;	/* iterator for ac when writing */
	size_t			tot;	/* content-length computation */
	struct lws		*wsi;	/* the connection this pss belongs to */

	uint8_t			greet:1; /* set if client needs to send proxy path */
	uint8_t			trigger:1; /* we want to ask the client to dump */
};

#if defined(LWS_WITH_CLIENT)
/*
 * Delays in ms between successive reconnection attempts made by the
 * prox-client role; indexed by the retry count kept in vhd->retry_count.
 */
static const uint32_t backoff_ms[] = { 1000, 2000, 3000, 4000, 5000 };

/*
 * Retry and idle policy handed both to lws_retry_sul_schedule() and, via
 * i.retry_and_idle_policy, to the client connection itself.
 */
static const lws_retry_bo_t retry = {
	.retry_ms_table			= backoff_ms,
	.retry_ms_table_count		= LWS_ARRAY_SIZE(backoff_ms),
	/* same as the table length, so every table entry is used */
	.conceal_count			= LWS_ARRAY_SIZE(backoff_ms),

	.secs_since_valid_ping		= 400,  /* force PINGs after secs idle */
	.secs_since_valid_hangup	= 400, /* hangup after secs idle */

	.jitter_percent			= 0,
};

/*
 * Sul callback: start (or restart) the nailed-up ws client connection from
 * the prox-client vhost to the metrics proxy server at vhd->ws_server_uri.
 *
 * sul: the lws_sorted_usec_list_t embedded in the struct vhd we should
 *      connect on behalf of
 *
 * If the connection attempt cannot even be started, another attempt is
 * scheduled according to the `retry` backoff policy.
 */
static void
omc_connect_client(lws_sorted_usec_list_t *sul)
{
	struct vhd *vhd = lws_container_of(sul, struct vhd, sul);
	struct lws_client_connect_info i;
	const char *prot;
	char url[128];

	memset(&i, 0, sizeof(i));

	/*
	 * vhd->ba_secret is a credential: it must never be written to the
	 * logs, so only the uri and the proxy path are reported here.
	 */
	lwsl_notice("%s: %s %s\n", __func__, vhd->ws_server_uri,
		    vhd->metrics_proxy_path);

	/* lws_parse_uri() works on a scratch copy, keep the vhd string intact */
	lws_strncpy(url, vhd->ws_server_uri, sizeof(url));

	if (lws_parse_uri(url, &prot, &i.address, &i.port, &i.path)) {
		lwsl_err("%s: unable to parse uri %s\n", __func__,
			 vhd->ws_server_uri);
		return;
	}

	i.context		= vhd->cx;
	i.origin		= i.address;
	i.host			= i.address;
	/* NOTE(review): tls is requested regardless of the scheme in prot */
	i.ssl_connection	= LCCSCF_USE_SSL;
	i.protocol		= "lws-openmetrics-prox-server"; /* public subprot */
	i.local_protocol_name	= "lws-openmetrics-prox-client";
	i.pwsi			= &vhd->wsi;
	i.retry_and_idle_policy = &retry;
	i.userdata		= vhd;
	i.vhost			= vhd->vhost;

	lwsl_notice("%s: %s %u %s\n", __func__, i.address, i.port, i.path);

	if (lws_client_connect_via_info(&i))
		return;

	/*
	 * Failed... schedule a retry... we can't use the _retry_wsi()
	 * convenience wrapper api here because no valid wsi at this
	 * point.
	 */
	if (!lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
				    omc_connect_client, &vhd->retry_count))
		return;

	/*
	 * The scheduler declined (nonzero return): restart the backoff
	 * sequence from the beginning and schedule again, so we never stop
	 * trying to reach the proxy.
	 */
	vhd->retry_count = 0;
	lws_retry_sul_schedule(vhd->cx, 0, sul, &retry,
			       omc_connect_client, &vhd->retry_count);
}
#endif

/*
 * Sanitize a metric name in place for OpenMetrics.
 *
 * nm: the characters to sanitize (modified in place)
 * nl: how many characters of nm to examine
 *
 * Every character that is not an ASCII letter, an ASCII digit or '_' is
 * overwritten with '_'.
 */
static void
openmetrics_san(char *nm, size_t nl)
{
	/* Openmetrics has a very restricted token charset */

	for (; nl; nl--, nm++) {
		char c = *nm;
		int upper = c >= 'A' && c <= 'Z';
		int lower = c >= 'a' && c <= 'z';
		int digit = c >= '0' && c <= '9';

		if (upper || lower || digit || c == '_')
			continue;

		*nm = '_';
	}
}

/*
 * Format one go / nogo side of an aggregate metric as OpenMetrics text.
 *
 * pub: the metric to report
 * nm:  the already-sanitized metric name
 * now: current time (not used by this function)
 * gng: index into the count[] / sum[] arrays; nonzero selects the "_nogo"
 *      suffix, zero selects "_go"
 * buf: where to write the text
 * len: space available at buf
 *
 * Returns the number of bytes placed in buf.
 */
static int
lws_metrics_om_format_agg(lws_metric_pub_t *pub, const char *nm, lws_usec_t now,
			  int gng, char *buf, size_t len)
{
	char *limit = buf + len - 1, *w = buf;
	const char *suffix;

	/* metrics that only report "go" don't get a result suffix at all */
	if (pub->flags & LWSMTFL_REPORT_ONLY_GO)
		suffix = "";
	else if (gng)
		suffix = "_nogo";
	else
		suffix = "_go";

	if (pub->flags & LWSMTFL_REPORT_MEAN) {
		unsigned long long mean = 0;

		/* avoid dividing by zero when there were no events */
		if (pub->u.agg.count[gng])
			mean = (unsigned long long)(pub->u.agg.sum[gng] /
						    pub->u.agg.count[gng]);

		w += lws_snprintf(w, lws_ptr_diff_size_t(limit, w),
				  "%s%s_count %u\n"
				  "%s%s_mean %llu\n",
				  nm, suffix,
				  (unsigned int)pub->u.agg.count[gng],
				  nm, suffix, mean);

		return lws_ptr_diff(w, buf);
	}

	/* from here on, only the sum is meaningful */

	if (pub->flags & LWSMTFL_REPORT_DUTY_WALLCLOCK_US) {
		unsigned long created_s =
				(unsigned long)(pub->us_first / 1000000);
		unsigned int created_us =
				(unsigned int)(pub->us_first % 1000000);

		w += lws_snprintf(w, lws_ptr_diff_size_t(limit, w),
				  "%s_count %u\n"
				  "%s_us_sum %llu\n"
				  "%s_created %lu.%06u\n",
				  nm, (unsigned int)pub->u.agg.count[gng],
				  nm, (unsigned long long)pub->u.agg.sum[gng],
				  nm, created_s, created_us);

		return lws_ptr_diff(w, buf);
	}

	/* it's a monotonic ordinal, like total tx */
	w += lws_snprintf(w, lws_ptr_diff_size_t(limit, w),
			  "%s%s_count %u\n"
			  "%s%s_sum %llu\n",
			  nm, suffix,
			  (unsigned int)pub->u.agg.count[gng],
			  nm, suffix,
			  (unsigned long long)pub->u.agg.sum[gng]);

	return lws_ptr_diff(w, buf);
}

/*
 * Append one chunk of report text to pss->ac.
 *
 * pss: the session whose lwsac receives the chunk
 * buf: the text to store
 * len: number of bytes at buf, at most 65535
 *
 * Each chunk is stored as LWS_PRE bytes of headroom, a two-byte big-endian
 * length, then the payload; pss->tot is increased by len.
 *
 * Returns 0 on success or -1 if the chunk was not stored.  On allocation
 * failure the whole of pss->ac is freed and pss->walk is cleared.
 */
static int
lws_metrics_om_ac_stash(struct pss *pss, const char *buf, size_t len)
{
	char *q;

	/*
	 * The length header is only two bytes wide: a larger chunk would be
	 * recorded with a truncated length and corrupt the later walk.
	 */
	if (len > 0xffff)
		return -1;

	q = lwsac_use(&pss->ac, LWS_PRE + len + 2, LWS_PRE + len + 2);
	if (!q) {
		lwsac_free(&pss->ac);
		/* the iterator pointed into what we just freed */
		pss->walk = NULL;

		return -1;
	}
	q[LWS_PRE] = (char)((len >> 8) & 0xff);
	q[LWS_PRE + 1] = (char)(len & 0xff);
	memcpy(q + LWS_PRE + 2, buf, len);
	pss->tot += len;

	return 0;
}

/*
 * We have to do the ac listing at this level, because there can be too many
 * metrics tags to iterate for the result to fit in a reasonable buffer.
 */

/*
 * Translate one lws metric into OpenMetrics text and stash it on pss->ac.
 *
 * pss: the session collecting the report
 * pub: the metric to translate
 * nm:  the already-sanitized metric name
 *
 * Histogram metrics produce one stashed chunk per bucket (the first one also
 * carries the total count line); aggregate metrics produce a single chunk.
 * Aggregates with no events at all produce nothing.
 *
 * Returns 0 on success, or nonzero if a chunk could not be stashed.
 */
static int
lws_metrics_om_format(struct pss *pss, lws_metric_pub_t *pub, const char *nm)
{
	char buf[1200], *p = buf, *end = buf + sizeof(buf) - 1, tmp[512];
	lws_usec_t t = lws_now_usecs();

	if (pub->flags & LWSMTFL_REPORT_HIST) {
		lws_metric_bucket_t *buck = pub->u.hist.head;

		p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
				  "%s_count %llu\n",
				  nm, (unsigned long long)
				  pub->u.hist.total_count);

		while (buck) {
			lws_strncpy(tmp, lws_metric_bucket_name(buck),
				    sizeof(tmp));

			p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
					  "%s{%s} %llu\n", nm, tmp,
					  (unsigned long long)buck->count);

			/*
			 * A failed stash has already freed the lwsac, so
			 * carrying on would produce a truncated report whose
			 * length no longer matches pss->tot.
			 */
			if (lws_metrics_om_ac_stash(pss, buf,
						lws_ptr_diff_size_t(p, buf)))
				return -1;
			p = buf;

			buck = buck->next;
		}

		goto happy;
	}

	if (!pub->u.agg.count[METRES_GO] && !pub->u.agg.count[METRES_NOGO])
		return 0;

	if (pub->u.agg.count[METRES_GO])
		p += lws_metrics_om_format_agg(pub, nm, t, METRES_GO, p,
					       lws_ptr_diff_size_t(end, p));

	if (!(pub->flags & LWSMTFL_REPORT_ONLY_GO) &&
	    pub->u.agg.count[METRES_NOGO])
		p += lws_metrics_om_format_agg(pub, nm, t, METRES_NOGO, p,
					       lws_ptr_diff_size_t(end, p));

	if (pub->flags & LWSMTFL_REPORT_MEAN)
		p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
				  "%s_min %llu\n"
				  "%s_max %llu\n",
				  nm, (unsigned long long)pub->u.agg.min,
				  nm, (unsigned long long)pub->u.agg.max);

happy:
	/*
	 * Nothing left pending (eg, every histogram bucket was already
	 * stashed from inside the loop): don't allocate an empty chunk.
	 */
	if (p == buf)
		return 0;

	return lws_metrics_om_ac_stash(pss, buf, lws_ptr_diff_size_t(p, buf));
}

/*
 * lws_metrics_foreach() callback: add one metric to the report being built.
 *
 * pub:  the metric to add
 * user: the struct pss collecting the report
 *
 * Returns whatever lws_metrics_om_format() returns.
 */
static int
append_om_metric(lws_metric_pub_t *pub, void *user)
{
	char safe_name[64];

	/*
	 * The lws metric is converted to openmetrics text and stashed into
	 * an lwsac without backfill, so the use areas sit in linear sequence
	 * and are simple to walk later.  Keeping each lwsac allocation under
	 * a typical mtu lets us write one per write efficiently.
	 */

	/* work on a private, bounded copy of the name so we may rewrite it */
	lws_strncpy(safe_name, pub->name, sizeof(safe_name));
	openmetrics_san(safe_name, strlen(safe_name));

	return lws_metrics_om_format((struct pss *)user, pub, safe_name);
}

#if defined(__linux__)
/*
 * Read the start of a file into a buffer as a NUL-terminated string.
 *
 * fi:  path of the file to read
 * buf: destination, always left NUL-terminated
 * len: size of buf; at most len - 1 bytes are read
 *
 * A single trailing newline is stripped.  Returns the resulting length, or
 * -1 (with buf set to the empty string) if the file can't be opened or read.
 */
static int
grabfile(const char *fi, char *buf, size_t len)
{
	int got, fd;

	fd = lws_open(fi, LWS_O_RDONLY);

	*buf = '\0';
	if (fd < 0)
		return -1;

	got = (int)read(fd, buf, len - 1);
	close(fd);

	if (got < 0) {
		*buf = '\0';

		return -1;
	}

	buf[got] = '\0';

	/* drop one trailing newline, if there is one */
	if (got > 0 && buf[got - 1] == '\n') {
		got--;
		buf[got] = '\0';
	}

	return got;
}
#endif

/*
 * Let's pregenerate the output into an lwsac all at once and
 * then spool it back to the peer afterwards
 *
 * - there's not going to be that much of it (a few kB)
 * - we then know the content-length for the headers
 * - it's stretchy to arbitrary numbers of metrics
 * - lwsac block list provides the per-metric structure to
 *   hold the data in a way we can walk to write it simply
 */

/*
 * Generate the complete OpenMetrics report for this process into pss->ac.
 *
 * ctx: the lws context whose metrics are reported
 * pss: the session that will own the report
 *
 * Any report already held by pss is discarded first.  On success pss->tot is
 * the total payload length, pss->walk points at the first chunk and 0 is
 * returned.  On failure 1 is returned and pss holds no report at all.
 */
int
ome_prepare(struct lws_context *ctx, struct pss *pss)
{
	char buf[1224], *start = buf + LWS_PRE, *p = start,
	     *end = buf + sizeof(buf) - 1;
	char hn[64];

	/*
	 * pss->tot only counts what we add below, so anything left over
	 * from an earlier, unfinished dump must not remain on the lwsac.
	 */
	pss->walk = NULL;
	lwsac_free(&pss->ac);
	pss->tot = 0;

	/*
	 * Target metadata
	 */

	hn[0] = '\0';
	if (gethostname(hn, sizeof(hn) - 1))
		hn[0] = '\0';
	/* a truncated hostname is not guaranteed to be NUL-terminated */
	hn[sizeof(hn) - 1] = '\0';

	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
			  "# TYPE target info\n"
			  "# HELP target Target metadata\n"
			  "target_info{hostname=\"%s\"", hn);

#if defined(__linux__)
	/* grabfile() returns -1 on failure: only use a non-empty result */
	if (grabfile("/proc/self/cmdline", hn, sizeof(hn)) > 0)
		p += lws_snprintf((char *)p, lws_ptr_diff_size_t(end, p),
				  ",cmdline=\"%s\"", hn);
#endif

	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p), "} 1\n");

	if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
				    lws_ptr_diff_size_t(p, buf + LWS_PRE)))
		goto bail;

	/* lws version */

	p = start;
	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
			  "# TYPE lws_info info\n"
			  "# HELP lws_info Version of lws producing this\n"
			  "lws_info{version=\"%s\"} 1\n", LWS_BUILD_HASH);
	if (lws_metrics_om_ac_stash(pss, (const char *)buf + LWS_PRE,
				    lws_ptr_diff_size_t(p, buf + LWS_PRE)))
		goto bail;

	/* system scalars */

#if defined(__linux__)
	if (grabfile("/proc/loadavg", hn, sizeof(hn)) > 0) {
		/* the 1-minute load is everything before the first space */
		char *sp = strchr(hn, ' ');
		if (sp) {
			p = start;
			p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
					  "load_1m %.*s\n",
					  lws_ptr_diff(sp, hn), hn);
			if (lws_metrics_om_ac_stash(pss,
						    (char *)buf + LWS_PRE,
						    lws_ptr_diff_size_t(p,
								start)))
				goto bail;
		}
	}
#endif

	if (lws_metrics_foreach(ctx, pss, append_om_metric))
		goto bail;

	p = start;
	p += lws_snprintf(p, lws_ptr_diff_size_t(end, p),
			  "# EOF\n");
	if (lws_metrics_om_ac_stash(pss, (char *)buf + LWS_PRE,
				    lws_ptr_diff_size_t(p, buf + LWS_PRE)))
		goto bail;

	pss->walk = pss->ac;

	return 0;

bail:
	/* never leave a partial report behind for the caller to send */
	lwsac_free(&pss->ac);
	pss->walk = NULL;
	pss->tot = 0;

	return 1;
}

#if defined(LWS_WITH_SERVER)

/* 1) direct http export for scraper */

/*
 * Protocol callback for "lws-openmetrics": serves this process' metrics
 * directly over http.
 *
 * On LWS_CALLBACK_HTTP the whole report is generated and the response
 * headers (including the content-length) are issued; the body is then
 * written from the stashed chunks on each LWS_CALLBACK_HTTP_WRITEABLE, as
 * many chunks per write as fit in the local buffer.
 *
 * Reasons not handled here are passed to lws_callback_http_dummy().
 */
static int
callback_lws_openmetrics_export(struct lws *wsi,
				enum lws_callback_reasons reason,
				void *user, void *in, size_t len)
{
	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
		      *end = buf + sizeof(buf) - 1, *ip;
	struct lws_context *cx = lws_get_context(wsi);
	struct pss *pss = (struct pss *)user;
	unsigned int m, wm;

	switch (reason) {
	case LWS_CALLBACK_HTTP:

		/*
		 * If the report can't be generated there is nothing coherent
		 * to send: drop the connection rather than issuing headers
		 * for a partial or missing body.
		 */
		if (ome_prepare(cx, pss))
			return 1;

		p = start;
		if (lws_add_http_common_headers(wsi, HTTP_STATUS_OK,
						"application/openmetrics-text; "
						"version=1.0.0; charset=utf-8",
						pss->tot, &p, end) ||
		    lws_finalize_write_http_header(wsi, start, &p, end))
			return 1;

		lws_callback_on_writable(wsi);

		return 0;

	case LWS_CALLBACK_CLOSED_HTTP:
		lwsac_free(&pss->ac);
		break;

	case LWS_CALLBACK_HTTP_WRITEABLE:
		if (!pss->walk)
			return 0;

		/* gather as many whole chunks as fit into buf */
		do {
			/* skip the lwsac header and headroom to the length */
			ip = (uint8_t *)pss->walk +
				lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
			m = (unsigned int)((ip[0] << 8) | ip[1]);

			/* coverity */
			if (m > lwsac_get_tail_pos(pss->walk) -
				lwsac_sizeof(pss->walk == pss->ac))
				return -1;

			if (lws_ptr_diff_size_t(end, p) < m)
				break;

			memcpy(p, ip + 2, m);
			p += m;

			pss->walk = lwsac_get_next(pss->walk);
		} while (pss->walk);

		/* not even one chunk fitted: we can't make progress */
		if (!lws_ptr_diff_size_t(p, start)) {
			lwsl_err("%s: stuck\n", __func__);
			return -1;
		}

		wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;

		if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
			      (enum lws_write_protocol)wm) < 0)
			return 1;

		if (!pss->walk) {
			 if (lws_http_transaction_completed(wsi))
				return -1;
		} else
			lws_callback_on_writable(wsi);

		return 0;

	default:
		break;
	}

	return lws_callback_http_dummy(wsi, reason, user, in, len);
}

static struct pss *
omc_lws_om_get_other_side_pss_client(struct vhd *vhd, struct pss *pss)
{
	/*
	 * Search through our partner's clients list looking for one with the
	 * same proxy path
	 */
	lws_start_foreach_dll(struct lws_dll2 *, d,
			vhd->bind_partner_vhd->clients.head) {
		struct pss *apss = lws_container_of(d, struct pss, list);

		if (!strcmp(pss->proxy_path, apss->proxy_path))
			return apss;

	} lws_end_foreach_dll(d);

	return NULL;
}

/* 2) "lws-openmetrics-prox-agg": http server export via proxy to connected clients */

/*
 * Protocol callback for "lws-openmetrics-prox-agg": the http side of the
 * metrics proxy, which the scraper connects to.
 *
 * - PROTOCOL_INIT allocates the vhd and tries to bind to the vhd of the
 *   "lws-openmetrics-prox-server" half sharing our "proxy-side-bind-name".
 * - HTTP looks for a connected ws client whose proxy path equals the request
 *   path, and if found asks the ws side to trigger a dump from it.
 * - HTTP_WRITEABLE forwards the chunks the ws side collected to the scraper;
 *   pss->walk iterates over the ws-side pss' lwsac, not over our own.
 *
 * Reasons not handled here are passed to lws_callback_http_dummy().
 */
static int
callback_lws_openmetrics_prox_agg(struct lws *wsi,
				  enum lws_callback_reasons reason,
				  void *user, void *in, size_t len)
{
	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
		      *end = buf + sizeof(buf) - 1, *ip;
	struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
				lws_get_vhost(wsi), lws_get_protocol(wsi));
	struct lws_context *cx = lws_get_context(wsi);
	struct pss *pss = (struct pss *)user, *partner_pss;
	unsigned int m, wm;

	switch (reason) {

	case LWS_CALLBACK_PROTOCOL_INIT:
		lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));
		/*
		 * We get told what to do when we are bound to the vhost
		 */
		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
				lws_get_protocol(wsi), sizeof(struct vhd));
		if (!vhd) {
			lwsl_err("%s: vhd alloc failed\n", __func__);
			return 0;
		}

		vhd->cx = cx;

		/*
		 * Try to bind to the counterpart server in the proxy, binding
		 * to the right one by having a common bind name set in a pvo.
		 * We don't know who will get instantiated last, so both parts
		 * try to bind if not already bound
		 */

		if (!lws_pvo_get_str(in, "proxy-side-bind-name",
				     &vhd->proxy_side_bind_name)) {
			/*
			 * Attempt to find the vhd that belongs to a vhost
			 * that has instantiated protocol
			 * "lws-openmetrics-prox-server", and has set pvo
			 * "proxy-side-bind-name" on it to whatever our
			 * vhd->proxy_side_bind_name was also set to.
			 *
			 * If found, inform the two sides of the same proxy
			 * what their partner vhd is
			 */
			lws_strncpy(vhd->sanity, "isagg", sizeof(vhd->sanity));
			vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
						"lws-openmetrics-prox-server",
						"proxy-side-bind-name",
						vhd->proxy_side_bind_name);
			if (vhd->bind_partner_vhd) {
				assert(!strcmp(vhd->bind_partner_vhd->sanity, "isws"));
				lwsl_notice("%s: proxy binding OK\n", __func__);
				vhd->bind_partner_vhd->bind_partner_vhd = vhd;
			}
		} else {
			lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
			return 1;
		}

		break;

	case LWS_CALLBACK_PROTOCOL_DESTROY:
		if (vhd)
			lws_sul_cancel(&vhd->sul);
		break;

	case LWS_CALLBACK_HTTP:

		/*
		 * The scraper has connected to us, the local side of the proxy,
		 * we need to match what it wants to
		 */

		/* vhd is NULL if its allocation failed at PROTOCOL_INIT */
		if (!vhd || !vhd->bind_partner_vhd)
			return 0;

		lws_strnncpy(pss->proxy_path, (const char *)in, len,
			     sizeof(pss->proxy_path));

		if (pss->list.owner) {
			lwsl_warn("%s: double HTTP?\n", __func__);
			return 0;
		}

		pss->wsi = wsi;

		lws_start_foreach_dll(struct lws_dll2 *, d,
				      vhd->bind_partner_vhd->clients.head) {
			struct pss *apss = lws_container_of(d, struct pss, list);

			if (!strcmp((const char *)in, apss->proxy_path)) {
				/* have the ws side ask this client to dump */
				apss->trigger = 1;
				lws_callback_on_writable(apss->wsi);

				/* let's add him on the http server vhd list */

				lws_dll2_add_tail(&pss->list, &vhd->clients);
				return 0;
			}

		} lws_end_foreach_dll(d);

		return 0;

	case LWS_CALLBACK_CLOSED_HTTP:
		lwsac_free(&pss->ac);
		lws_dll2_remove(&pss->list);
		break;

	case LWS_CALLBACK_HTTP_WRITEABLE:

		if (!pss->walk)
			return 0;

		/* locate the wss side if it's still around */

		if (!vhd || !vhd->bind_partner_vhd)
			return -1;

		partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
		if (!partner_pss)
			return -1;

		/* gather as many whole chunks as fit into buf */
		do {
			/* the chunks live on the ws-side pss' lwsac */
			ip = (uint8_t *)pss->walk +
				lwsac_sizeof(pss->walk == partner_pss->ac) + LWS_PRE;
			m = (unsigned int)((ip[0] << 8) | ip[1]);

			/* coverity */
			if (m > lwsac_get_tail_pos(pss->walk) -
				lwsac_sizeof(pss->walk == partner_pss->ac))
				return -1;

			if (lws_ptr_diff_size_t(end, p) < m)
				break;

			memcpy(p, ip + 2, m);
			p += m;

			pss->walk = lwsac_get_next(pss->walk);
		} while (pss->walk);

		/* not even one chunk fitted: we can't make progress */
		if (!lws_ptr_diff_size_t(p, start)) {
			lwsl_err("%s: stuck\n", __func__);
			return -1;
		}

		wm = pss->walk ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;

		if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
			      (enum lws_write_protocol)wm) < 0)
			return 1;

		if (!pss->walk) {
			lwsl_info("%s: whole msg proxied to scraper\n", __func__);
			lws_dll2_remove(&pss->list);
			lwsac_free(&partner_pss->ac);
			/* don't leave the ws side pointing at freed chunks */
			partner_pss->walk = NULL;

			/*
			 * The scraper connection is always closed once the
			 * report has been sent; it is not kept for reuse.
			 */
			return -1;
		} else
			lws_callback_on_writable(wsi);

		return 0;

	default:
		break;
	}

	return lws_callback_http_dummy(wsi, reason, user, in, len);
}

/* 3) "lws-openmetrics-prox-server": ws server side of metrics proxy, for
 *    ws clients to connect to */

/*
 * Protocol callback for "lws-openmetrics-prox-server": the ws server side of
 * the metrics proxy, which remote metrics clients connect to.
 *
 * - PROTOCOL_INIT allocates the vhd and tries to bind to the vhd of the
 *   "lws-openmetrics-prox-agg" half sharing our "proxy-side-bind-name".
 * - The first message from a client is its proxy path (the "greet"); the
 *   client is only added to vhd->clients once that has arrived.
 * - Later messages are metrics dumps: fragments are collected on pss->ac,
 *   and when the message is complete the http headers are issued on the
 *   scraper's connection and the scraper side is asked to write the body.
 * - SERVER_WRITEABLE with pss->trigger set sends the one-byte message that
 *   asks the client to produce a dump.
 *
 * Reasons not handled here are passed to lws_callback_http_dummy().
 */
static int
callback_lws_openmetrics_prox_server(struct lws *wsi,
				     enum lws_callback_reasons reason,
				     void *user, void *in, size_t len)
{
	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
		      *end = buf + sizeof(buf) - 1;
	struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
				lws_get_vhost(wsi), lws_get_protocol(wsi));
	struct lws_context *cx = lws_get_context(wsi);
	struct pss *pss = (struct pss *)user, *partner_pss;

	switch (reason) {

	case LWS_CALLBACK_PROTOCOL_INIT:
		/*
		 * We get told what to do when we are bound to the vhost
		 */

		lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__, lws_vh_tag(lws_get_vhost(wsi)));

		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
				lws_get_protocol(wsi), sizeof(struct vhd));
		if (!vhd) {
			lwsl_err("%s: vhd alloc failed\n", __func__);
			return 0;
		}

		vhd->cx = cx;

		/*
		 * Try to bind to the counterpart server in the proxy, binding
		 * to the right one by having a common bind name set in a pvo.
		 * We don't know who will get instantiated last, so both parts
		 * try to bind if not already bound
		 */

		if (!lws_pvo_get_str(in, "proxy-side-bind-name",
				     &vhd->proxy_side_bind_name)) {
			/*
			 * Attempt to find the vhd that belongs to a vhost
			 * that has instantiated protocol
			 * "lws-openmetrics-prox-agg", and has set pvo
			 * "proxy-side-bind-name" on it to whatever our
			 * vhd->proxy_side_bind_name was also set to.
			 *
			 * If found, inform the two sides of the same proxy
			 * what their partner vhd is
			 */
			lws_strncpy(vhd->sanity, "isws", sizeof(vhd->sanity));
			vhd->bind_partner_vhd = lws_vhd_find_by_pvo(cx,
						"lws-openmetrics-prox-agg",
						"proxy-side-bind-name",
						vhd->proxy_side_bind_name);
			if (vhd->bind_partner_vhd) {
				assert(!strcmp(vhd->bind_partner_vhd->sanity, "isagg"));
				lwsl_notice("%s: proxy binding OK\n", __func__);
				vhd->bind_partner_vhd->bind_partner_vhd = vhd;
			}
		} else {
			lwsl_warn("%s: proxy-side-bind-name required\n", __func__);
			return 1;
		}

		break;

	case LWS_CALLBACK_PROTOCOL_DESTROY:
		break;

	case LWS_CALLBACK_ESTABLISHED:
		/*
		 * a client has joined... he only goes on our list of live,
		 * joined clients after he told us his reference name
		 */

		/* mark us as waiting for the reference name from the client */
		pss->greet = 1;
		pss->wsi = wsi;
		lws_validity_confirmed(wsi);

		return 0;

	case LWS_CALLBACK_CLOSED:
		/*
		 * a client has parted
		 */
		lws_dll2_remove(&pss->list);
		lwsl_warn("%s: client %s left (%u)\n", __func__,
				pss->proxy_path,
				vhd ? (unsigned int)vhd->clients.count : 0u);
		lwsac_free(&pss->ac);
		pss->walk = NULL;

		/* without a bound agg side there can be no scraper to kill */
		if (!vhd || !vhd->bind_partner_vhd)
			break;

		/* let's kill the scraper connection accordingly, if still up */
		partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
		if (partner_pss) {
			/* his iterator pointed into the lwsac we just freed */
			partner_pss->walk = NULL;
			lws_wsi_close(partner_pss->wsi, LWS_TO_KILL_ASYNC);
		}
		break;

	case LWS_CALLBACK_RECEIVE:
		/* vhd is NULL if its allocation failed at PROTOCOL_INIT */
		if (!vhd)
			return -1;

		if (pss->greet) {
			pss->greet = 0;
			lws_strnncpy(pss->proxy_path, (const char *)in, len,
				     sizeof(pss->proxy_path));

			lws_validity_confirmed(wsi);
			lwsl_notice("%s: received greet '%s'\n", __func__,
				    pss->proxy_path);
			/*
			 * we need to add his pss to our list of configured,
			 * live, joined clients
			 */
			lws_dll2_add_tail(&pss->list, &vhd->clients);
			return 0;
		}

		/*
		 * He's sending us his results... let's collect chunks into the
		 * pss lwsac before worrying about anything else
		 */

		if (lws_is_first_fragment(wsi))
			pss->tot = 0;

		/*
		 * If a fragment can't be stored the report is incomplete and
		 * pss->tot no longer describes it: give up on this client
		 * connection rather than proxy a corrupt body.
		 */
		if (lws_metrics_om_ac_stash(pss, (const char *)in, len)) {
			lwsl_err("%s: unable to stash fragment\n", __func__);
			return -1;
		}

		if (lws_is_final_fragment(wsi)) {
			lwsl_info("%s: ws side received complete msg\n",
					__func__);

			/* the lwsac is complete */
			pss->walk = pss->ac;
			partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
			if (!partner_pss) {
				lwsl_notice("%s: no partner A\n", __func__);
				return -1;
			}

			/*
			 * now the total length is known, issue the http
			 * response headers on the scraper's connection
			 */

			p = start;
			if (lws_add_http_common_headers(partner_pss->wsi, HTTP_STATUS_OK,
							"application/openmetrics-text; "
							"version=1.0.0; charset=utf-8",
							pss->tot, &p, end) ||
			    lws_finalize_write_http_header(partner_pss->wsi,
							    start, &p, end))
				return -1;

			/* indicate to scraper side we want to issue now */

			partner_pss->walk = pss->ac;
			partner_pss->trigger = 1;
			lws_callback_on_writable(partner_pss->wsi);
		}

		return 0;

	case LWS_CALLBACK_SERVER_WRITEABLE:
		if (!pss->trigger)
			return 0;

		pss->trigger = 0;

		/* no point asking for a dump nobody is waiting for */
		if (!vhd || !vhd->bind_partner_vhd) {
			lwsl_err("%s: no partner\n", __func__);
			return 0;
		}

		partner_pss = omc_lws_om_get_other_side_pss_client(vhd, pss);
		if (!partner_pss) {
			lwsl_err("%s: no partner\n", __func__);
			return 0;
		}

		lwsl_info("%s: sending trigger to client\n", __func__);

		/* the content is irrelevant, any message triggers a dump */
		*start = 'x';
		if (lws_write(wsi, start, 1,
			      (enum lws_write_protocol)LWS_WRITE_TEXT) < 0)
			return 1;

		lws_validity_confirmed(wsi);

		return 0;

	default:
		break;
	}

	return lws_callback_http_dummy(wsi, reason, user, in, len);
}
#endif

#if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)

/* 4) ws client that keeps wss connection up to metrics proxy ws server */

static int
callback_lws_openmetrics_prox_client(struct lws *wsi,
				     enum lws_callback_reasons reason,
				     void *user, void *in, size_t len)
{
	unsigned char buf[1224], *start = buf + LWS_PRE, *p = start,
		      *end = buf + sizeof(buf) - 1, *ip;
	struct vhd *vhd = (struct vhd *)lws_protocol_vh_priv_get(
				lws_get_vhost(wsi), lws_get_protocol(wsi));
	struct lws_context *cx = lws_get_context(wsi);
	struct pss *pss = (struct pss *)user;
	unsigned int m, wm;
	const char *cp;
	char first;

	switch (reason) {

	case LWS_CALLBACK_PROTOCOL_INIT:

		lwsl_notice("%s: PROTOCOL_INIT on %s\n", __func__,
					lws_vh_tag(lws_get_vhost(wsi)));


		/*
		 * We get told what to do when we are bound to the vhost
		 */
		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
				lws_get_protocol(wsi), sizeof(struct vhd));
		if (!vhd)
			return 0;

		vhd->cx = cx;
		vhd->vhost = lws_get_vhost(wsi);

		/* the proxy server uri */

		if (lws_pvo_get_str(in, "ws-server-uri", &cp)) {
			lwsl_err("%s: ws-server-uri pvo required\n", __func__);

			return 1;
		}
		lws_strncpy(vhd->ws_server_uri, cp, sizeof(vhd->ws_server_uri));

		/* how we should be referenced at the proxy */

		if (lws_pvo_get_str(in, "metrics-proxy-path", &cp)) {
			lwsl_err("%s: metrics-proxy-path pvo required\n", __func__);

			return 1;
		}
		lws_strncpy(vhd->metrics_proxy_path, cp, sizeof(vhd->metrics_proxy_path));

		/* the shared secret to authenticate us as allowed to join */

		if (lws_pvo_get_str(in, "ba-secret", &cp)) {
			lwsl_err("%s: ba-secret pvo required\n", __func__);

			return 1;
		}
		lws_strncpy(vhd->ba_secret, cp, sizeof(vhd->ba_secret));

		lwsl_notice("%s: scheduling connect %s %s %s\n", __func__,
				vhd->ws_server_uri, vhd->metrics_proxy_path, vhd->ba_secret);

		lws_validity_confirmed(wsi);
		lws_sul_schedule(cx, 0, &vhd->sul, omc_connect_client, 1);
		break;

	case LWS_CALLBACK_PROTOCOL_DESTROY:
		if (vhd)
			lws_sul_cancel(&vhd->sul);
		break;

	case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
	{
		unsigned char **pp = (unsigned char **)in, *pend = (*pp) + len;
		char b[128];

		/* authorize ourselves to the metrics proxy using basic auth */

		if (lws_http_basic_auth_gen("metricsclient", vhd->ba_secret,
					    b, sizeof(b)))
			break;

		if (lws_add_http_header_by_token(wsi,
						 WSI_TOKEN_HTTP_AUTHORIZATION,
						 (unsigned char *)b,
						 (int)strlen(b), pp, pend))
			return -1;

		break;
	}

	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
			 in ? (char *)in : "(null)");
		goto do_retry;

	case LWS_CALLBACK_CLIENT_ESTABLISHED:
		lwsl_warn("%s: connected to ws metrics agg server\n", __func__);
		pss->greet = 1;
		lws_callback_on_writable(wsi);
		lws_validity_confirmed(wsi);
		return 0;

	case LWS_CALLBACK_CLIENT_CLOSED:
		lwsl_notice("%s: client closed\n", __func__);
		lwsac_free(&pss->ac);
		goto do_retry;

	case LWS_CALLBACK_CLIENT_RECEIVE:
		/*
		 * Proxy serverside sends us something to trigger us to create
		 * our metrics message and send it back over the ws link
		 */
		ome_prepare(cx, pss);
		pss->walk = pss->ac;
		lws_callback_on_writable(wsi);
		lwsl_info("%s: dump requested\n", __func__);
		break;

	case LWS_CALLBACK_CLIENT_WRITEABLE:
		if (pss->greet) {
			/*
			 * At first after establishing the we link, we send a
			 * message indicating to the metrics proxy how we
			 * should be referred to by the scraper to particularly
			 * select to talk to us
			 */
			lwsl_info("%s: sending greet '%s'\n", __func__,
					vhd->metrics_proxy_path);
			lws_strncpy((char *)start, vhd->metrics_proxy_path,
					sizeof(buf) - LWS_PRE);
			if (lws_write(wsi, start,
				      strlen(vhd->metrics_proxy_path),
				      LWS_WRITE_TEXT) < 0)
				return 1;

			lws_validity_confirmed(wsi);

			pss->greet = 0;
			return 0;
		}

		if (!pss->walk)
			return 0;

		/*
		 * We send the metrics dump in a single logical ws message,
		 * using ws fragmentation to split it around 1 mtu boundary
		 * and keep coming back until it's finished
		 */

		first = pss->walk == pss->ac;

		do {
			ip = (uint8_t *)pss->walk +
				lwsac_sizeof(pss->walk == pss->ac) + LWS_PRE;
			m = (unsigned int)((ip[0] << 8) | ip[1]);

			/* coverity */
			if (m > lwsac_get_tail_pos(pss->walk) -
				lwsac_sizeof(pss->walk == pss->ac)) {
				lwsl_err("%s: size blow\n", __func__);
				return -1;
			}

			if (lws_ptr_diff_size_t(end, p) < m)
				break;

			memcpy(p, ip + 2, m);
			p += m;

			pss->walk = lwsac_get_next(pss->walk);
		} while (pss->walk);

		if (!lws_ptr_diff_size_t(p, start)) {
			lwsl_err("%s: stuck\n", __func__);
			return -1;
		}

		wm = (unsigned int)lws_write_ws_flags(LWS_WRITE_TEXT, first,
						      !pss->walk);

		if (lws_write(wsi, start, lws_ptr_diff_size_t(p, start),
			      (enum lws_write_protocol)wm) < 0) {
			lwsl_notice("%s: write fail\n", __func__);
			return 1;
		}

		lws_validity_confirmed(wsi);
		lwsl_info("%s: forwarded %d\n", __func__, lws_ptr_diff(p, start));

		if (!pss->walk) {
			lwsl_info("%s: dump send completed\n", __func__);
			lwsac_free(&pss->ac);
		} else
			lws_callback_on_writable(wsi);

		return 0;

	default:
		break;
	}

	return lws_callback_http_dummy(wsi, reason, user, in, len);

do_retry:
	if (!lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
				    omc_connect_client, &vhd->retry_count))
		return 0;

	vhd->retry_count = 0;
	lws_retry_sul_schedule(cx, 0, &vhd->sul, &retry,
			       omc_connect_client, &vhd->retry_count);

	return 0;
}
#endif


/*
 * The protocols this plugin provides.  Which ones are present depends on the
 * lws build options; all of them use struct pss as their per-session data
 * and the same rx buffer size.
 */
LWS_VISIBLE const struct lws_protocols lws_openmetrics_export_protocols[] = {
#if defined(LWS_WITH_SERVER)
	{ /* for scraper directly: http export on listen socket */
		.name			= "lws-openmetrics",
		.callback		= callback_lws_openmetrics_export,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
	{ /* for scraper via ws proxy: http export on listen socket */
		.name			= "lws-openmetrics-prox-agg",
		.callback		= callback_lws_openmetrics_prox_agg,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
	{ /* metrics proxy server side: ws server for clients to connect to */
		.name			= "lws-openmetrics-prox-server",
		.callback		= callback_lws_openmetrics_prox_server,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
#endif
#if defined(LWS_WITH_CLIENT) && defined(LWS_ROLE_WS)
	{ /* client to metrics proxy: ws client to connect to metrics proxy */
		.name			= "lws-openmetrics-prox-client",
		.callback		= callback_lws_openmetrics_prox_client,
		.per_session_data_size	= sizeof(struct pss),
		.rx_buffer_size		= 1024,
	},
#endif
};

/*
 * Plugin descriptor exported by this file: identifies it as an lws protocol
 * plugin and publishes the protocols table above.
 */
LWS_VISIBLE const lws_plugin_protocol_t lws_openmetrics_export = {
	.hdr = {
		/* positional: the order must match the plugin header struct */
		"lws OpenMetrics export",
		"lws_protocol_plugin",
		LWS_BUILD_HASH,
		LWS_PLUGIN_API_MAGIC
	},

	.protocols = lws_openmetrics_export_protocols,
	.count_protocols = LWS_ARRAY_SIZE(lws_openmetrics_export_protocols),
};