1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

async-dns: update for lws_retry udp

This commit is contained in:
Andy Green 2019-09-22 03:25:09 -07:00
parent 04f99f1499
commit dabd865a5c
6 changed files with 192 additions and 139 deletions

View file

@ -70,10 +70,12 @@ lws_async_dns_query(struct lws_context *context, int tsi, const char *name,
/**
* lws_async_dns_freeaddrinfo() - decrement refcount on cached addrinfo results
*
* \param ai: the first addrinfo returned as result in the callback
* \param pai: a pointert to a pointer to first addrinfo returned as result in the callback
*
* Decrements the cache object's reference count. When it reaches zero, the
* cached object may be reaped subject to LRU rules.
*
* The pointer to the first addrinfo give in the argument is set to NULL.
*/
LWS_VISIBLE LWS_EXTERN void
lws_async_dns_freeaddrinfo(const struct addrinfo *ai);
lws_async_dns_freeaddrinfo(const struct addrinfo **ai);

View file

@ -552,7 +552,6 @@ lws_create_adopt_udp(struct lws_vhost *vhost, const char *ads, int port,
{
struct addrinfo *r, h;
char buf[16];
int n;
memset(&h, 0, sizeof(h));
h.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */

View file

@ -485,22 +485,13 @@ lws_adns_parse_udp(lws_async_dns_t *dns, const uint8_t *pkt, size_t len)
/* match both A and AAAA queries if any */
q = lws_adns_get_query(dns, 0, &dns->waiting_resp,
q = lws_adns_get_query(dns, 0, &dns->waiting,
lws_ser_ru16be(pkt + DHO_TID), NULL);
if (!q) {
/*
* if he's still waiting to send the second query, he's still
* on the .waiting_send list
*/
q = lws_adns_get_query(dns, 0, &dns->waiting_send,
lws_ser_ru16be(pkt + DHO_TID), NULL);
lwsl_notice("%s: dropping unknown query tid 0x%x\n",
__func__, lws_ser_ru16be(pkt + DHO_TID));
if (!q) {
lwsl_notice("%s: dropping unknown query tid 0x%x\n",
__func__, lws_ser_ru16be(pkt + DHO_TID));
return;
}
return;
}
/* we can get dups... drop any that have already happened */
@ -583,6 +574,7 @@ lws_adns_parse_udp(lws_async_dns_t *dns, const uint8_t *pkt, size_t len)
} else {
q->firstcache = c;
c->incomplete = q->responded != q->asked;
/*
* Only register the first one into the cache...
@ -613,6 +605,7 @@ lws_adns_parse_udp(lws_async_dns_t *dns, const uint8_t *pkt, size_t len)
* addrinfo results, if any, to all interested wsi, if any...
*/
c->incomplete = 0;
lws_async_dns_complete(q, q->firstcache);
/*

View file

@ -25,6 +25,12 @@
#include "private-lib-core.h"
#include "private-lib-async-dns.h"
static const uint32_t botable[] = { 500, 1000, 1250, 5000
/* in case everything just dog slow */ };
static const lws_retry_bo_t retry_policy = {
botable, LWS_ARRAY_SIZE(botable), LWS_ARRAY_SIZE(botable),
/* don't conceal after the last table entry */ 0, 0, 20 };
void
lws_adns_q_destroy(lws_adns_q_t *q)
{
@ -101,31 +107,150 @@ lws_async_dns_complete(lws_adns_q_t *q, lws_adns_cache_t *c)
}
static void
sul_cb_timeout(struct lws_sorted_usec_list *sul)
lws_async_dns_sul_cb_retry(struct lws_sorted_usec_list *sul)
{
lws_adns_q_t *q = lws_container_of(sul, lws_adns_q_t, sul);
lws_async_dns_complete(q, NULL);
lws_adns_q_destroy(q);
// lwsl_notice("%s\n", __func__);
lws_callback_on_writable(q->dns->wsi);
}
static void
lws_async_dns_writeable(struct lws *wsi, lws_adns_q_t *q)
{
uint8_t pkt[LWS_PRE + DNS_PACKET_LEN], *e = &pkt[sizeof(pkt)], *p, *pl;
int m, n, which;
const char *name;
// lwsl_notice("%s: %p\n", __func__, q);
/*
* our policy is to force reloading the dns server info if our
* connection ever timed out, in case it or the routing state changed
* UDP is not reliable, it can be locally dropped, or dropped
* by any intermediary or the remote peer. So even though we
* will do the write in a moment, we schedule another request
* for rewrite according to the wsi retry policy.
*
* If the result came before, we'll cancel it as part of the
* wsi close.
*
* If we have already reached the end of our concealed retries
* in the policy, just close without another write.
*/
if (lws_dll2_is_detached(&q->sul.list) &&
lws_retry_sul_schedule_retry_wsi(wsi, &q->sul, lws_async_dns_sul_cb_retry,
&q->retry)) {
/* we have reached the end of our concealed retries */
lwsl_notice("%s: failing query\n", __func__);
/*
* our policy is to force reloading the dns server info
* if our connection ever timed out, in case it or the
* routing state changed
*/
lws_async_dns_drop_server(q->context);
lws_async_dns_drop_server(q->context);
goto qfail;
}
name = (const char *)&q[1];
p = &pkt[LWS_PRE];
memset(p, 0, DHO_SIZEOF);
#if defined(LWS_WITH_IPV6)
if (!q->responded) {
/* must pick between ipv6 and ipv4 */
which = q->sent[0] >= q->sent[1];
q->sent[which]++;
q->asked = 3; /* want results for 4 & 6 before done */
} else
which = q->responded & 1;
#else
which = 0;
q->asked = 1;
#endif
/* we hack b0 of the tid to be 0 = A, 1 = AAAA */
lws_ser_wu16be(&p[DHO_TID], which ? q->tid | 1 : q->tid);
lws_ser_wu16be(&p[DHO_FLAGS], (1 << 8));
lws_ser_wu16be(&p[DHO_NQUERIES], 1);
p += DHO_SIZEOF;
/* start of label-formatted qname */
pl = p++;
do {
if (*name == '.' || !*name) {
*pl = lws_ptr_diff(p, pl + 1);
pl = p;
*p++ = 0; /* also serves as terminal length */
if (!*name++)
break;
} else
*p++ = *name++;
} while (p + 6 < e);
if (p + 6 >= e) {
assert(0);
lwsl_err("%s: name too big\n", __func__);
goto qfail;
}
lws_ser_wu16be(p, which ? LWS_ADNS_RECORD_AAAA :
LWS_ADNS_RECORD_A);
p += 2;
lws_ser_wu16be(p, 1); /* IN class */
p += 2;
assert(p < pkt + sizeof(pkt) - LWS_PRE);
n = lws_ptr_diff(p, pkt + LWS_PRE);
/*
fd = lws_get_socket_fd(wsi);
if (fd < 0)
goto qfail;
m = send(fd, pkt + LWS_PRE, n, 0);
*/
m = lws_write(wsi, pkt + LWS_PRE, n, 0);
if (m != n) {
lwsl_notice("%s: dns write failed %d %d\n", __func__,
m, n);
goto qfail;
}
#if defined(LWS_WITH_IPV6)
if (!q->responded && q->sent[0] != q->sent[1])
lws_callback_on_writable(wsi);
#endif
/* if we did anything, check one more time */
lws_callback_on_writable(wsi);
return;
qfail:
lwsl_warn("%s: failing query doing NULL completion\n", __func__);
/*
* in ipv6 case, we made a cache entry for the first response but
* evidently the second response didn't come in time, purge the
* incomplete cache entry
*/
if (q->firstcache)
lws_adns_cache_destroy(q->firstcache);
lws_async_dns_complete(q, NULL);
lws_adns_q_destroy(q);
}
static int
callback_async_dns(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
uint8_t pkt[LWS_PRE + DNS_PACKET_LEN], *e = &pkt[sizeof(pkt)], *p, *pl;
struct lws_async_dns *dns = &(lws_get_context(wsi)->async_dns);
struct lws_dll2 *d;
const char *name;
lws_adns_q_t *q;
int fd, m, n;
switch (reason) {
@ -143,102 +268,26 @@ callback_async_dns(struct lws *wsi, enum lws_callback_reasons reason,
// lwsl_user("LWS_CALLBACK_RAW_RX (%d)\n", (int)len);
//lwsl_hexdump_level(LLL_NOTICE, in, len);
lws_adns_parse_udp(dns, in, len);
return 0;
break;
case LWS_CALLBACK_RAW_WRITEABLE:
// lwsl_notice("%s: WRITABLE\n", __func__);
d = lws_dll2_get_head(&dns->waiting_send);
if (!d)
return 0;
q = lws_container_of(d, lws_adns_q_t, list);
name = (const char *)&q[1];
p = &pkt[LWS_PRE];
memset(p, 0, DHO_SIZEOF);
/* we hack b0 of the tid to be 0 = A, 1 = AAAA */
lws_ser_wu16be(&p[DHO_TID], q->asked ? q->tid | 1 :
q->tid);
lws_ser_wu16be(&p[DHO_FLAGS], (1 << 8));
lws_ser_wu16be(&p[DHO_NQUERIES], 1);
p += DHO_SIZEOF;
/* start of label-formatted qname */
pl = p++;
do {
if (*name == '.' || !*name) {
*pl = lws_ptr_diff(p, pl + 1);
pl = p;
*p++ = 0; /* also serves as terminal length */
if (!*name++)
break;
} else
*p++ = *name++;
} while (p + 6 < e);
if (p + 6 >= e) {
assert(0);
lwsl_err("%s: name too big\n", __func__);
goto qfail;
}
lws_ser_wu16be(p, q->asked ? LWS_ADNS_RECORD_AAAA :
LWS_ADNS_RECORD_A);
p += 2;
lws_ser_wu16be(p, 1); /* IN class */
p += 2;
assert(p < pkt + sizeof(pkt) - LWS_PRE);
n = lws_ptr_diff(p, pkt + LWS_PRE);
fd = lws_get_socket_fd(wsi);
if (fd < 0)
break;
m = send(fd, pkt + LWS_PRE, n, 0);
if (m != n) {
lwsl_notice("%s: dns write failed %d %d\n", __func__,
m, n);
goto qfail;
}
/* move us to the "waiting for response" list */
#if defined(LWS_WITH_IPV6)
/* don't move to waiting resp until we sent both */
if (q->asked) {
q->asked |= 2;
#endif
lws_dll2_remove(&q->list);
lws_dll2_add_head(&q->list, &dns->waiting_resp);
#if defined(LWS_WITH_IPV6)
} else
lws_callback_on_writable(wsi);
#endif
q->asked |= 1;
if (lws_dll2_get_head(&dns->waiting_send))
/* more to do */
lws_callback_on_writable(wsi);
lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
dns->waiting.head) {
lws_adns_q_t *q = lws_container_of(d, lws_adns_q_t,
list);
if (lws_dll2_is_detached(&q->sul.list) &&
(!q->asked || q->responded != q->asked))
lws_async_dns_writeable(wsi, q);
} lws_end_foreach_dll_safe(d, d1);
break;
default:
break;
}
return 0;
qfail:
lws_async_dns_complete(q, NULL);
lws_adns_q_destroy(q);
return 0;
}
@ -262,12 +311,16 @@ lws_async_dns_init(struct lws_context *context)
return 1;
}
if (n != LADNS_CONF_SERVER_CHANGED)
return 0;
dns->sa46.sa4.sin_port = htons(53);
lws_write_numeric_address((uint8_t *)&dns->sa46.sa4.sin_addr.s_addr, 4,
ads, sizeof(ads));
context->async_dns.wsi = lws_create_adopt_udp(context->vhost_list, ads,
53, 0, lws_async_dns_protocol.name, NULL);
53, 0, lws_async_dns_protocol.name, NULL,
&retry_policy);
if (!dns->wsi) {
lwsl_err("%s: foreign socket adoption failed\n", __func__);
return 1;
@ -289,7 +342,7 @@ lws_adns_get_cache(lws_async_dns_t *dns, const char *name)
c = lws_container_of(d, lws_adns_cache_t, list);
cn = (const char *)&c[1];
if (name && !strcasecmp(name, cn)) {
if (name && !c->incomplete && !strcasecmp(name, cn)) {
/* Keep sorted by LRU: move to the head */
lws_dll2_remove(&c->list);
lws_dll2_add_head(&c->list, &dns->cached);
@ -328,11 +381,11 @@ sul_cb_expire(struct lws_sorted_usec_list *sul)
}
void
lws_async_dns_freeaddrinfo(const struct addrinfo *ai)
lws_async_dns_freeaddrinfo(const struct addrinfo **pai)
{
lws_adns_cache_t *c;
if (!ai)
if (!*pai)
return;
/*
@ -344,7 +397,7 @@ lws_async_dns_freeaddrinfo(const struct addrinfo *ai)
* Adjust c to the firstcache in this case.
*/
c = &((lws_adns_cache_t *)ai)[-1];
c = &((lws_adns_cache_t *)(*pai))[-1];
if (c->firstcache)
c = c->firstcache;
@ -355,6 +408,7 @@ lws_async_dns_freeaddrinfo(const struct addrinfo *ai)
assert(c->refcount > 0);
c->refcount--;
*pai = NULL;
}
void
@ -386,8 +440,7 @@ clean(struct lws_dll2 *d, void *user)
void
lws_async_dns_deinit(lws_async_dns_t *dns)
{
lws_dll2_foreach_safe(&dns->waiting_send, NULL, clean);
lws_dll2_foreach_safe(&dns->waiting_resp, NULL, clean);
lws_dll2_foreach_safe(&dns->waiting, NULL, clean);
lws_dll2_foreach_safe(&dns->cached, NULL, cache_clean);
}
@ -398,7 +451,7 @@ cancel(struct lws_dll2 *d, void *user)
lws_adns_q_t *q = lws_container_of(d, lws_adns_q_t, list);
lws_start_foreach_dll_safe(struct lws_dll2 *, d3, d4,
lws_dll2_get_head(&q->wsi_adns)) {
lws_dll2_get_head(&q->wsi_adns)) {
struct lws *w = lws_container_of(d3, struct lws, adns);
if (user == w) {
@ -417,8 +470,7 @@ lws_async_dns_cancel(struct lws *wsi)
{
lws_async_dns_t *dns = &wsi->context->async_dns;
if (!lws_dll2_foreach_safe(&dns->waiting_send, wsi, cancel))
lws_dll2_foreach_safe(&dns->waiting_resp, wsi, cancel);
lws_dll2_foreach_safe(&dns->waiting, wsi, cancel);
}
@ -443,11 +495,7 @@ lws_async_dns_get_new_tid(struct lws_context *context, lws_adns_q_t *q)
if (lws_get_random(context, &q->tid, 2) != 2)
return -1;
if (lws_dll2_foreach_safe(&dns->waiting_send,
(void *)(long)q->tid, check_tid))
continue;
if (lws_dll2_foreach_safe(&dns->waiting_resp,
if (lws_dll2_foreach_safe(&dns->waiting,
(void *)(long)q->tid, check_tid))
continue;
@ -514,9 +562,13 @@ lws_async_dns_query(struct lws_context *context, int tsi, const char *name,
c = lws_adns_get_cache(dns, name);
if (c) {
cb(wsi, name, c->results, 0, opaque);
lwsl_err("%s: using cached, c->results %p\n", __func__, c->results);
m = c->results ? LADNS_RET_FOUND : LADNS_RET_FAILED;
if (c->results)
c->refcount++;
cb(wsi, name, c->results, m, opaque);
return LADNS_RET_FOUND;
return m;
}
/*
@ -599,9 +651,7 @@ lws_async_dns_query(struct lws_context *context, int tsi, const char *name,
/* there's an ongoing query we can share the result of */
q = lws_adns_get_query(dns, qtype, &dns->waiting_send, 0, name);
if (!q)
q = lws_adns_get_query(dns, qtype, &dns->waiting_resp, 0, name);
q = lws_adns_get_query(dns, qtype, &dns->waiting, 0, name);
if (q) {
lwsl_debug("%s: dns piggybacking: %d:%s\n", __func__,
qtype, name);
@ -643,13 +693,14 @@ lws_async_dns_query(struct lws_context *context, int tsi, const char *name,
q->context = context;
q->tsi = tsi;
q->opaque = opaque;
q->dns = dns;
if (!wsi)
q->standalone_cb = cb;
lws_sul_schedule(context, tsi, &q->sul, sul_cb_timeout,
lws_now_usecs() +
(DNS_QUERY_TIMEOUT * LWS_US_PER_SEC));
/* schedule a retry according to the retry policy on the wsi */
lws_retry_sul_schedule_retry_wsi(dns->wsi, &q->sul,
lws_async_dns_sul_cb_retry, &q->retry);
p = (char *)&q[1];
while (nlen--)
@ -658,7 +709,7 @@ lws_async_dns_query(struct lws_context *context, int tsi, const char *name,
lws_callback_on_writable(dns->wsi);
lws_dll2_add_head(&q->list, &dns->waiting_send);
lws_dll2_add_head(&q->list, &dns->waiting);
lwsl_debug("%s: created new query\n", __func__);

View file

@ -43,6 +43,7 @@ typedef struct lws_adns_cache {
struct addrinfo *results;
uint8_t flags; /* b0 = has ipv4, b1 = has ipv6 */
char refcount;
char incomplete;
/* name, and then result struct addrinfos overallocated here */
} lws_adns_cache_t;
@ -51,7 +52,7 @@ typedef struct lws_adns_cache {
*/
typedef struct {
lws_sorted_usec_list_t sul; /* for query network timeout */
lws_sorted_usec_list_t sul; /* per-query write retry timer */
lws_dll2_t list;
lws_dll2_owner_t wsi_adns;
@ -59,14 +60,21 @@ typedef struct {
struct lws_context *context;
void *opaque;
struct addrinfo **last;
lws_async_dns_t *dns;
lws_adns_cache_t *firstcache;
lws_async_dns_retcode_t ret;
uint16_t tid;
uint16_t qtype;
uint16_t retry;
uint8_t tsi;
#if defined(LWS_WITH_IPV6)
uint8_t sent[2];
#else
uint8_t sent[1];
#endif
uint8_t asked;
uint8_t responded;

View file

@ -190,7 +190,7 @@ again:
fail++;
next:
lws_async_dns_freeaddrinfo(a);
lws_async_dns_freeaddrinfo(&a);
if (dtest == (int)LWS_ARRAY_SIZE(adt))
interrupted = 1;
else