diff --git a/CMakeLists.txt b/CMakeLists.txt index bc0fa191..d33dcdab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -120,6 +120,7 @@ option(LWS_WITH_ZIP_FOPS "Support serving pre-zipped files" ON) option(LWS_AVOID_SIGPIPE_IGN "Android 7+ seems to need this" OFF) option(LWS_WITH_STATS "Keep statistics of lws internal operations" OFF) option(LWS_WITH_SOCKS5 "Allow use of SOCKS5 proxy on client connections" OFF) +option(LWS_WITH_PEER_LIMITS "Track peers and restrict resources a single peer can allocate" OFF) macro(confirm_command CMD NOCMD) find_program (HAVE_CMD_${CMD} ${CMD} ) @@ -137,6 +138,7 @@ if (LWS_WITH_LWSWS) set(LWS_WITH_SERVER_STATUS 1) set(LWS_WITH_LEJP 1) set(LWS_WITH_LEJP_CONF 1) + set(LWS_WITH_PEER_LIMITS 1) endif() if (LWS_WITH_PLUGINS AND NOT LWS_WITH_LIBUV) @@ -1916,6 +1918,7 @@ message(" LWS_WITH_STATS = ${LWS_WITH_STATS}") message(" LWS_WITH_SOCKS5 = ${LWS_WITH_SOCKS5}") message(" LWS_HAVE_SYS_CAPABILITY_H = ${LWS_HAVE_SYS_CAPABILITY_H}") message(" LWS_HAVE_LIBCAP = ${LWS_HAVE_LIBCAP}") +message(" LWS_WITH_PEER_LIMITS = ${LWS_WITH_PEER_LIMITS}") message(" LWS_HAVE_ATOLL = ${LWS_HAVE_ATOLL}") message(" LWS_HAVE__ATOI64 = ${LWS_HAVE__ATOI64}") message(" LWS_HAVE_STAT32I64 = ${LWS_HAVE_STAT32I64}") diff --git a/README.coding.md b/README.coding.md index 97ecc739..34ce55c2 100644 --- a/README.coding.md +++ b/README.coding.md @@ -43,6 +43,22 @@ If you want to restrict that allocation, or increase it, you can use ulimit or similar to change the available number of file descriptors, and when restarted **libwebsockets** will adapt accordingly. +@section peer_limits optional LWS_WITH_PEER_LIMITS + +If you select `LWS_WITH_PEER_LIMITS` at cmake, then lws will track peer IPs +and monitor how many connections and ah resources they are trying to use +at one time. You can choose to limit these at context creation time, using +`info.ip_limit_ah` and `info.ip_limit_wsi`. + +Note that although the ah limit is 'soft', ie, the connection will just wait +until the IP is under the ah limit again before attaching a new ah, the +wsi limit is 'hard', lws will drop any additional connections from the +IP until it's under the limit again. + +If you use these limits, you should consider multiple clients may simultaneously +try to access the site through NAT, etc. So the limits should err on the side +of being generous, while still making it impossible for one IP to exhaust +all the server resources. @section evtloop Libwebsockets is singlethreaded diff --git a/lib/context.c b/lib/context.c index 85d2e1fb..a2ef9609 100644 --- a/lib/context.c +++ b/lib/context.c @@ -801,6 +801,10 @@ lws_create_context(struct lws_context_creation_info *info) else context->pt_serv_buf_size = 4096; +#if LWS_MAX_SMP > 1 + pthread_mutex_init(&context->lock, NULL); +#endif + #if defined(LWS_WITH_ESP32) context->last_free_heap = esp_get_free_heap_size(); #endif @@ -973,6 +977,19 @@ lws_create_context(struct lws_context_creation_info *info) context->lws_event_sigint_cb = &lws_event_sigint_cb; #endif /* LWS_USE_LIBEVENT */ +#if defined(LWS_WITH_PEER_LIMITS) + /* scale the peer hash table according to the max fds for the process, + * so that the max list depth averages 16. Eg, 1024 fd -> 64, + * 102400 fd -> 6400 + */ + context->pl_hash_elements = + (context->count_threads * context->fd_limit_per_thread) / 16; + context->pl_hash_table = lws_zalloc(sizeof(struct lws_peer *) * + context->pl_hash_elements); + context->ip_limit_ah = info->ip_limit_ah; + context->ip_limit_wsi = info->ip_limit_wsi; +#endif + lwsl_info(" mem: context: %5lu bytes (%ld ctx + (%ld thr x %d))\n", (long)sizeof(struct lws_context) + (context->count_threads * context->pt_serv_buf_size), @@ -1455,6 +1472,9 @@ LWS_VISIBLE void lws_context_destroy2(struct lws_context *context) { struct lws_vhost *vh = NULL, *vh1; +#if defined(LWS_WITH_PEER_LIMITS) + uint32_t n; +#endif lwsl_notice("%s: ctx %p\n", __func__, context); @@ -1481,10 +1501,26 @@ lws_context_destroy2(struct lws_context *context) lws_ssl_context_destroy(context); lws_plat_context_late_destroy(context); +#if defined(LWS_WITH_PEER_LIMITS) + for (n = 0; n < context->pl_hash_elements; n++) { + lws_start_foreach_llp(struct lws_peer **, peer, context->pl_hash_table[n]) { + struct lws_peer *df = *peer; + *peer = df->next; + lws_free(df); + continue; + } lws_end_foreach_llp(peer, next); + } + lws_free(context->pl_hash_table); +#endif + if (context->external_baggage_free_on_destroy) free(context->external_baggage_free_on_destroy); lws_check_deferred_free(context, 1); +#if LWS_MAX_SMP > 1 + pthread_mutex_destroy(&context->lock, NULL); +#endif + lws_free(context); } diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index 3ce469f0..629af776 100755 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -99,6 +99,12 @@ lws_free_wsi(struct lws *wsi) pt->ah_count_in_use--; } } + +#if defined(LWS_WITH_PEER_LIMITS) + lws_peer_track_wsi_close(wsi->context, wsi->peer); + wsi->peer = NULL; +#endif + lws_pt_unlock(pt); /* since we will destroy the wsi, make absolutely sure now */ @@ -3582,7 +3588,9 @@ LWS_VISIBLE LWS_EXTERN void lws_stats_log_dump(struct lws_context *context) { struct lws_vhost *v = context->vhost_list; - int n; + int n, m; + + (void)m; if (!context->updated) return; @@ -3603,6 +3611,8 @@ lws_stats_log_dump(struct lws_context *context) lwsl_notice("LWSSTATS_C_SSL_CONNECTIONS_FAILED: %8llu\n", (unsigned long long)lws_stats_get(context, LWSSTATS_C_SSL_CONNECTIONS_FAILED)); lwsl_notice("LWSSTATS_C_SSL_CONNECTIONS_ACCEPTED: %8llu\n", (unsigned long long)lws_stats_get(context, LWSSTATS_C_SSL_CONNECTIONS_ACCEPTED)); lwsl_notice("LWSSTATS_C_SSL_CONNS_HAD_RX: %8llu\n", (unsigned long long)lws_stats_get(context, LWSSTATS_C_SSL_CONNS_HAD_RX)); + lwsl_notice("LWSSTATS_C_PEER_LIMIT_AH_DENIED: %8llu\n", (unsigned long long)lws_stats_get(context, LWSSTATS_C_PEER_LIMIT_AH_DENIED)); + lwsl_notice("LWSSTATS_C_PEER_LIMIT_WSI_DENIED: %8llu\n", (unsigned long long)lws_stats_get(context, LWSSTATS_C_PEER_LIMIT_WSI_DENIED)); lwsl_notice("LWSSTATS_C_TIMEOUTS: %8llu\n", (unsigned long long)lws_stats_get(context, LWSSTATS_C_TIMEOUTS)); lwsl_notice("LWSSTATS_C_SERVICE_ENTRY: %8llu\n", (unsigned long long)lws_stats_get(context, LWSSTATS_C_SERVICE_ENTRY)); @@ -3633,9 +3643,7 @@ lws_stats_log_dump(struct lws_context *context) lwsl_notice("Live wsi: %8d\n", context->count_wsi_allocated); -#if defined(LWS_WITH_STATS) context->updated = 1; -#endif while (v) { if (v->lserv_wsi) { @@ -3677,6 +3685,41 @@ lws_stats_log_dump(struct lws_context *context) lws_pt_unlock(pt); } +#if defined(LWS_WITH_PEER_LIMITS) + m = 0; + for (n = 0; n < context->pl_hash_elements; n++) { + lws_start_foreach_llp(struct lws_peer **, peer, context->pl_hash_table[n]) { + m++; + } lws_end_foreach_llp(peer, next); + } + + lwsl_notice(" Peers: total active %d\n", m); + if (m > 10) { + m = 10; + lwsl_notice(" (showing 10 peers only)\n"); + } + + if (m) { + for (n = 0; n < context->pl_hash_elements; n++) { + char buf[72]; + + lws_start_foreach_llp(struct lws_peer **, peer, context->pl_hash_table[n]) { + struct lws_peer *df = *peer; + + if (!lws_plat_inet_ntop(df->af, df->addr, buf, + sizeof(buf) - 1)) + strcpy(buf, "unknown"); + + lwsl_notice(" peer %s: count wsi: %d, count ah: %d\n", + buf, df->count_wsi, df->count_ah); + + if (!--m) + break; + } lws_end_foreach_llp(peer, next); + } + } +#endif + lwsl_notice("\n"); } diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index ebc5026b..834f3952 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -2289,14 +2289,6 @@ struct lws_context_creation_info { * your local machine use your lo / loopback interface and will be * disallowed by this. */ - - /* Add new things just above here ---^ - * This is part of the ABI, don't needlessly break compatibility - * - * The below is to ensure later library versions with new - * members added above will see 0 (default) even if the app - * was not built against the newer headers. - */ int ssl_info_event_mask; /**< VHOST: mask of ssl events to be reported on LWS_CALLBACK_SSL_INFO * callback for connections on this vhost. The mask values are of @@ -2306,6 +2298,29 @@ struct lws_context_creation_info { unsigned int timeout_secs_ah_idle; /**< VHOST: seconds to allow a client to hold an ah without using it. * 0 defaults to 10s. */ + unsigned short ip_limit_ah; + /**< CONTEXT: max number of ah a single IP may use simultaneously + * 0 is no limit. This is a soft limit: if the limit is + * reached, connections from that IP will wait in the ah + * waiting list and not be able to acquire an ah until + * a connection belonging to the IP relinquishes one it + * already has. + */ + unsigned short ip_limit_wsi; + /**< CONTEXT: max number of wsi a single IP may use simultaneously. + * 0 is no limit. This is a hard limit, connections from + * the same IP will simply be dropped once it acquires the + * amount of simultanoues wsi / accepted connections + * given here. + */ + + /* Add new things just above here ---^ + * This is part of the ABI, don't needlessly break compatibility + * + * The below is to ensure later library versions with new + * members added above will see 0 (default) even if the app + * was not built against the newer headers. + */ void *_unused[8]; /**< dummy */ }; @@ -5379,6 +5394,8 @@ enum { LWSSTATS_MS_WRITABLE_DELAY, /**< aggregate delay between asking for writable and getting cb */ LWSSTATS_MS_WORST_WRITABLE_DELAY, /**< single worst delay between asking for writable and getting cb */ LWSSTATS_MS_SSL_RX_DELAY, /**< aggregate delay between ssl accept complete and first RX */ + LWSSTATS_C_PEER_LIMIT_AH_DENIED, /**< number of times we would have given an ah but for the peer limit */ + LWSSTATS_C_PEER_LIMIT_WSI_DENIED, /**< number of times we would have given a wsi but for the peer limit */ /* Add new things just above here ---^ * This is part of the ABI, don't needlessly break compatibility */ diff --git a/lib/parsers.c b/lib/parsers.c index b944c236..f67ede0b 100644 --- a/lib/parsers.c +++ b/lib/parsers.c @@ -96,6 +96,8 @@ lws_header_table_reset(struct lws *wsi, int autoservice) lws_set_timeout(wsi, PENDING_TIMEOUT_HOLDING_AH, wsi->vhost->timeout_secs_ah_idle); + time(&ah->assigned); + /* * if we inherited pending rx (from socket adoption deferred * processing), apply and free it. @@ -122,58 +124,99 @@ lws_header_table_reset(struct lws *wsi, int autoservice) } } +static void +_lws_header_ensure_we_are_on_waiting_list(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + struct lws_pollargs pa; + struct lws **pwsi = &pt->ah_wait_list; + + while (*pwsi) { + if (*pwsi == wsi) + return; + pwsi = &(*pwsi)->u.hdr.ah_wait_list; + } + + lwsl_info("%s: wsi: %p\n", __func__, wsi); + wsi->u.hdr.ah_wait_list = pt->ah_wait_list; + pt->ah_wait_list = wsi; + pt->ah_wait_list_length++; + + /* we cannot accept input then */ + + _lws_change_pollfd(wsi, LWS_POLLIN, 0, &pa); +} + +static int +__lws_remove_from_ah_waiting_list(struct lws *wsi) +{ + struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; + struct lws **pwsi =&pt->ah_wait_list; + + //if (wsi->u.hdr.ah) + // return 0; + + while (*pwsi) { + if (*pwsi == wsi) { + lwsl_info("%s: wsi %p\n", __func__, wsi); + /* point prev guy to our next */ + *pwsi = wsi->u.hdr.ah_wait_list; + /* we shouldn't point anywhere now */ + wsi->u.hdr.ah_wait_list = NULL; + pt->ah_wait_list_length--; + + return 1; + } + pwsi = &(*pwsi)->u.hdr.ah_wait_list; + } + + return 0; +} + int LWS_WARN_UNUSED_RESULT lws_header_table_attach(struct lws *wsi, int autoservice) { struct lws_context *context = wsi->context; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; struct lws_pollargs pa; - struct lws **pwsi; int n; - lwsl_info("%s: wsi %p: ah %p (tsi %d, count = %d) in\n", __func__, (void *)wsi, - (void *)wsi->u.hdr.ah, wsi->tsi, pt->ah_count_in_use); + lwsl_info("%s: wsi %p: ah %p (tsi %d, count = %d) in\n", __func__, + (void *)wsi, (void *)wsi->u.hdr.ah, wsi->tsi, + pt->ah_count_in_use); /* if we are already bound to one, just clear it down */ if (wsi->u.hdr.ah) { - lwsl_info("cleardown\n"); + lwsl_info("%s: cleardown\n", __func__); goto reset; } lws_pt_lock(pt); - pwsi = &pt->ah_wait_list; - while (*pwsi) { - if (*pwsi == wsi) { - /* if already waiting on list, if no new ah just ret */ - if (pt->ah_count_in_use == - context->max_http_header_pool) { - lwsl_notice("%s: no free ah to attach\n", __func__); - goto bail; - } - /* new ah.... remove ourselves from waiting list */ - *pwsi = wsi->u.hdr.ah_wait_list; /* set our prev to our next */ - wsi->u.hdr.ah_wait_list = NULL; /* no next any more */ - pt->ah_wait_list_length--; - break; - } - pwsi = &(*pwsi)->u.hdr.ah_wait_list; + + n = pt->ah_count_in_use == context->max_http_header_pool; +#if defined(LWS_WITH_PEER_LIMITS) + if (!n) { + n = lws_peer_confirm_ah_attach_ok(context, wsi->peer); + if (n) + lws_stats_atomic_bump(wsi->context, pt, + LWSSTATS_C_PEER_LIMIT_AH_DENIED, 1); } - /* - * pool is all busy... add us to waiting list and return that we - * weren't able to deliver it right now - */ - if (pt->ah_count_in_use == context->max_http_header_pool) { - lwsl_info("%s: adding %p to ah waiting list\n", __func__, wsi); - wsi->u.hdr.ah_wait_list = pt->ah_wait_list; - pt->ah_wait_list = wsi; - pt->ah_wait_list_length++; +#endif + if (n) { + /* + * Pool is either all busy, or we don't want to give this + * particular guy an ah right now... + * + * Make sure we are on the waiting list, and return that we + * weren't able to provide the ah + */ + _lws_header_ensure_we_are_on_waiting_list(wsi); - /* we cannot accept input then */ - - _lws_change_pollfd(wsi, LWS_POLLIN, 0, &pa); goto bail; } + __lws_remove_from_ah_waiting_list(wsi); + for (n = 0; n < context->max_http_header_pool; n++) if (!pt->ah_pool[n].in_use) break; @@ -186,6 +229,11 @@ lws_header_table_attach(struct lws *wsi, int autoservice) pt->ah_pool[n].wsi = wsi; /* mark our owner */ pt->ah_count_in_use++; +#if defined(LWS_WITH_PEER_LIMITS) + if (wsi->peer) + wsi->peer->count_ah++; +#endif + _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa); lwsl_info("%s: did attach wsi %p: ah %p: count %d (on exit)\n", __func__, @@ -200,7 +248,6 @@ reset: wsi->u.hdr.ah->rxlen = 0; lws_header_table_reset(wsi, autoservice); - time(&wsi->u.hdr.ah->assigned); #ifndef LWS_NO_CLIENT if (wsi->state == LWSS_CLIENT_UNCONNECTED) @@ -237,36 +284,13 @@ lws_header_table_is_in_detachable_state(struct lws *wsi) return ah && ah->rxpos == ah->rxlen && wsi->hdr_parsing_completed; } -void -__lws_remove_from_ah_waiting_list(struct lws *wsi) -{ - struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi]; - struct lws **pwsi =&pt->ah_wait_list; - - if (wsi->u.hdr.ah) - return; - - while (*pwsi) { - if (*pwsi == wsi) { - lwsl_info("%s: wsi %p, remv wait\n", - __func__, wsi); - *pwsi = wsi->u.hdr.ah_wait_list; - wsi->u.hdr.ah_wait_list = NULL; - pt->ah_wait_list_length--; - return; - } - pwsi = &(*pwsi)->u.hdr.ah_wait_list; - } -} - - int lws_header_table_detach(struct lws *wsi, int autoservice) { struct lws_context *context = wsi->context; struct allocated_headers *ah = wsi->u.hdr.ah; struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi]; struct lws_pollargs pa; - struct lws **pwsi; + struct lws **pwsi, **pwsi_eligible; time_t now; lws_pt_lock(pt); @@ -285,8 +309,9 @@ int lws_header_table_detach(struct lws *wsi, int autoservice) /* may not be detached while he still has unprocessed rx */ if (!lws_header_table_is_in_detachable_state(wsi)) { - lwsl_err("%s: %p: CANNOT DETACH rxpos:%d, rxlen:%d, wsi->hdr_parsing_completed = %d\n", __func__, wsi, - ah->rxpos, ah->rxlen, wsi->hdr_parsing_completed); + lwsl_err("%s: %p: CANNOT DETACH rxpos:%d, rxlen:%d, " + "wsi->hdr_parsing_completed = %d\n", __func__, wsi, + ah->rxpos, ah->rxlen, wsi->hdr_parsing_completed); return 0; } @@ -299,7 +324,7 @@ int lws_header_table_detach(struct lws *wsi, int autoservice) * we're detaching the ah, but it was held an * unreasonably long time */ - lwsl_info("%s: wsi %p: ah held %ds, " + lwsl_debug("%s: wsi %p: ah held %ds, " "ah.rxpos %d, ah.rxlen %d, mode/state %d %d," "wsi->more_rx_waiting %d\n", __func__, wsi, (int)(now - ah->assigned), @@ -315,46 +340,70 @@ int lws_header_table_detach(struct lws *wsi, int autoservice) assert(ah->in_use); wsi->u.hdr.ah = NULL; ah->wsi = NULL; /* no owner */ +#if defined(LWS_WITH_PEER_LIMITS) + lws_peer_track_ah_detach(context, wsi->peer); +#endif pwsi = &pt->ah_wait_list; - /* oh there is nobody on the waiting list... leave it at that then */ - if (!*pwsi) { - ah->in_use = 0; - pt->ah_count_in_use--; + /* oh there is nobody on the waiting list... leave the ah unattached */ + if (!*pwsi) + goto nobody_usable_waiting; - goto bail; + /* + * at least one wsi on the same tsi is waiting, give it to oldest guy + * who is allowed to take it (if any) + */ + lwsl_info("pt wait list %p\n", *pwsi); + wsi = NULL; + pwsi_eligible = NULL; + + while (*pwsi) { +#if defined(LWS_WITH_PEER_LIMITS) + /* are we willing to give this guy an ah? */ + if (!lws_peer_confirm_ah_attach_ok(context, (*pwsi)->peer)) +#endif + { + wsi = *pwsi; + pwsi_eligible = pwsi; + } +#if defined(LWS_WITH_PEER_LIMITS) + else + if (!(*pwsi)->u.hdr.ah_wait_list) + lws_stats_atomic_bump(wsi->context, pt, + LWSSTATS_C_PEER_LIMIT_AH_DENIED, 1); +#endif + pwsi = &(*pwsi)->u.hdr.ah_wait_list; } - /* somebody else on same tsi is waiting, give it to oldest guy */ + if (!wsi) /* everybody waiting already has too many ah... */ + goto nobody_usable_waiting; - lwsl_info("pt wait list %p\n", *pwsi); - while ((*pwsi)->u.hdr.ah_wait_list) - pwsi = &(*pwsi)->u.hdr.ah_wait_list; - - wsi = *pwsi; - lwsl_info("last wsi in wait list %p\n", wsi); + lwsl_info("%s: last eligible wsi in wait list %p\n", __func__, wsi); wsi->u.hdr.ah = ah; ah->wsi = wsi; /* new owner */ + /* and reset the rx state */ ah->rxpos = 0; ah->rxlen = 0; lws_header_table_reset(wsi, autoservice); - time(&wsi->u.hdr.ah->assigned); +#if defined(LWS_WITH_PEER_LIMITS) + if (wsi->peer) + wsi->peer->count_ah++; +#endif /* clients acquire the ah and then insert themselves in fds table... */ if (wsi->position_in_fds_table != -1) { lwsl_info("%s: Enabling %p POLLIN\n", __func__, wsi); - /* he has been stuck waiting for an ah, but now his wait is over, - * let him progress - */ + /* he has been stuck waiting for an ah, but now his wait is + * over, let him progress */ _lws_change_pollfd(wsi, 0, LWS_POLLIN, &pa); } /* point prev guy to next guy in list instead */ - *pwsi = wsi->u.hdr.ah_wait_list; + *pwsi_eligible = wsi->u.hdr.ah_wait_list; /* the guy who got one is out of the list */ wsi->u.hdr.ah_wait_list = NULL; pt->ah_wait_list_length--; @@ -377,11 +426,18 @@ int lws_header_table_detach(struct lws *wsi, int autoservice) assert(!!pt->ah_wait_list_length == !!(lws_intptr_t)pt->ah_wait_list); bail: lwsl_info("%s: wsi %p: ah %p (tsi=%d, count = %d)\n", __func__, - (void *)wsi, (void *)ah, wsi->tsi, - pt->ah_count_in_use); + (void *)wsi, (void *)ah, pt->tid, pt->ah_count_in_use); + lws_pt_unlock(pt); return 0; + +nobody_usable_waiting: + lwsl_info("%s: nobody usable waiting\n", __func__); + ah->in_use = 0; + pt->ah_count_in_use--; + + goto bail; } LWS_VISIBLE int diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h index 2fa434ca..fd99b23e 100644 --- a/lib/private-libwebsockets.h +++ b/lib/private-libwebsockets.h @@ -931,6 +931,34 @@ struct lws_deferred_free void *payload; }; +typedef union { +#ifdef LWS_USE_IPV6 + struct sockaddr_in6 sa6; +#endif + struct sockaddr_in sa4; +} sockaddr46; + + +#if defined(LWS_WITH_PEER_LIMITS) +struct lws_peer { + struct lws_peer *next; + struct lws_peer *peer_wait_list; + + time_t time_created; + time_t time_closed_all; + + uint8_t addr[32]; + uint32_t hash; + uint32_t count_wsi; + uint32_t count_ah; + + uint32_t total_wsi; + uint32_t total_ah; + + uint8_t af; +}; +#endif + /* * the rest is managed per-context, that includes * @@ -949,6 +977,10 @@ struct lws_context { #endif struct lws_context_per_thread pt[LWS_MAX_SMP]; struct lws_conn_stats conn_stats; +#if LWS_MAX_SMP > 1 + pthread_mutex_t lock; + int lock_depth; +#endif #ifdef _WIN32 /* different implementation between unix and windows */ struct lws_fd_hashtable fd_hashtable[FD_HASHTABLE_MODULUS]; @@ -966,6 +998,11 @@ struct lws_context { struct lws_vhost *vhost_pending_destruction_list; struct lws_plugin *plugin_list; struct lws_deferred_free *deferred_free_list; +#if defined(LWS_WITH_PEER_LIMITS) + struct lws_peer **pl_hash_table; + struct lws_peer *peer_wait_list; + time_t next_cull; +#endif void *external_baggage_free_on_destroy; const struct lws_token_limits *token_limits; @@ -1033,7 +1070,12 @@ struct lws_context { int max_http_header_data; int simultaneous_ssl_restriction; int simultaneous_ssl; - +#if defined(LWS_WITH_PEER_LIMITS) + uint32_t pl_hash_elements; /* protected by context->lock */ + uint32_t count_peers; /* protected by context->lock */ + unsigned short ip_limit_ah; + unsigned short ip_limit_wsi; +#endif unsigned int deprecated:1; unsigned int being_destroyed:1; unsigned int being_destroyed1:1; @@ -1056,6 +1098,7 @@ struct lws_context { short server_string_len; unsigned short ws_ping_pong_interval; unsigned short deprecation_pending_listen_close_count; + uint8_t max_fi; }; @@ -1198,13 +1241,6 @@ LWS_EXTERN void lws_feature_status_libevent(struct lws_context_creation_info *in #define LWS_UNIX_SOCK_ENABLED(vhost) (0) #endif -typedef union { -#ifdef LWS_USE_IPV6 - struct sockaddr_in6 sa6; -#endif - struct sockaddr_in sa4; -} sockaddr46; - enum uri_path_states { URIPS_IDLE, URIPS_SEEN_SLASH, @@ -1590,6 +1626,9 @@ struct lws { #endif #if defined(LWS_USE_LIBEV) || defined(LWS_USE_LIBEVENT) struct lws_io_watcher w_write; +#endif +#ifdef LWS_WITH_ACCESS_LOG + struct lws_access_log access_log; #endif time_t pending_timeout_limit; @@ -1607,9 +1646,10 @@ struct lws { struct lws **same_vh_protocol_prev, *same_vh_protocol_next; struct lws *timeout_list; struct lws **timeout_list_prev; -#ifdef LWS_WITH_ACCESS_LOG - struct lws_access_log access_log; +#if defined(LWS_WITH_PEER_LIMITS) + struct lws_peer *peer; #endif + void *user_space; void *opaque_parent_data; /* rxflow handling */ @@ -2088,11 +2128,27 @@ lws_pt_unlock(struct lws_context_per_thread *pt) if (!(--pt->lock_depth)) pthread_mutex_unlock(&pt->lock); } +static LWS_INLINE void +lws_context_lock(struct lws_context *context) +{ + if (!context->lock_depth++) + pthread_mutex_lock(&context->lock); +} + +static LWS_INLINE void +lws_context_unlock(struct lws_context *context) +{ + if (!(--context->lock_depth)) + pthread_mutex_unlock(&context->lock); +} + #else #define lws_pt_mutex_init(_a) (void)(_a) #define lws_pt_mutex_destroy(_a) (void)(_a) #define lws_pt_lock(_a) (void)(_a) #define lws_pt_unlock(_a) (void)(_a) +#define lws_context_lock(_a) (void)(_a) +#define lws_context_unlock(_a) (void)(_a) #endif LWS_EXTERN int LWS_WARN_UNUSED_RESULT @@ -2287,6 +2343,17 @@ static inline uint64_t lws_stats_atomic_max(struct lws_context * context, void socks_generate_msg(struct lws *wsi, enum socks_msg_type type, ssize_t *msg_len); +#if defined(LWS_WITH_PEER_LIMITS) +void +lws_peer_track_wsi_close(struct lws_context *context, struct lws_peer *peer); +int +lws_peer_confirm_ah_attach_ok(struct lws_context *context, struct lws_peer *peer); +void +lws_peer_track_ah_detach(struct lws_context *context, struct lws_peer *peer); +void +lws_peer_cull_peer_wait_list(struct lws_context *context); +#endif + #ifdef __cplusplus }; #endif diff --git a/lib/server.c b/lib/server.c index ba69a653..a9dfe937 100644 --- a/lib/server.c +++ b/lib/server.c @@ -1787,6 +1787,239 @@ bail_nuke_ah: return 1; } +#if defined(LWS_WITH_PEER_LIMITS) + +/* requires context->lock */ +static void +__lws_peer_remove_from_peer_wait_list(struct lws_context *context, + struct lws_peer *peer) +{ + struct lws_peer *df; + + lws_start_foreach_llp(struct lws_peer **, p, context->peer_wait_list) { + if (*p == peer) { + df = *p; + + *p = df->peer_wait_list; + df->peer_wait_list = NULL; + + return; + } + } lws_end_foreach_llp(p, peer_wait_list); +} + +/* requires context->lock */ +static void +__lws_peer_add_to_peer_wait_list(struct lws_context *context, + struct lws_peer *peer) +{ + __lws_peer_remove_from_peer_wait_list(context, peer); + + peer->peer_wait_list = context->peer_wait_list; + context->peer_wait_list = peer; +} + + +static struct lws_peer * +lws_get_or_create_peer(struct lws_vhost *vhost, lws_sockfd_type sockfd) +{ + struct lws_context *context = vhost->context; + socklen_t rlen = 0; + void *q; + uint8_t *q8; + struct lws_peer *peer; + uint32_t hash = 0; + int n, af = AF_INET; + struct sockaddr_storage addr; + +#ifdef LWS_USE_IPV6 + if (LWS_IPV6_ENABLED(vhost)) { + af = AF_INET6; + } +#endif + rlen = sizeof(addr); + if (getpeername(sockfd, (struct sockaddr*)&addr, &rlen)) + return NULL; + + if (af == AF_INET) { + struct sockaddr_in *s = (struct sockaddr_in *)&addr; + q = &s->sin_addr; + rlen = sizeof(&s->sin_addr); + } else +#ifdef LWS_USE_IPV6 + { + struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; + q = &s->sin6_addr; + rlen = sizeof(s->sin6_addr); + } +#else + return NULL; +#endif + + q8 = q; + for (n = 0; n < rlen; n++) + hash = (((hash << 4) | (hash >> 28)) * n) ^ q8[n]; + + hash = hash % context->pl_hash_elements; + + lws_context_lock(context); /* <====================================== */ + + lws_start_foreach_ll(struct lws_peer *, peerx, + context->pl_hash_table[hash]) { + if (peerx->af == af && !memcmp(q, peerx->addr, rlen)) { + lws_context_unlock(context); /* === */ + return peerx; + } + } lws_end_foreach_ll(peerx, next); + + lwsl_info("%s: creating new peer\n", __func__); + + peer = lws_zalloc(sizeof(*peer)); + if (!peer) { + lws_context_unlock(context); /* === */ + return NULL; + } + + context->count_peers++; + peer->next = context->pl_hash_table[hash]; + peer->hash = hash; + peer->af = af; + context->pl_hash_table[hash] = peer; + memcpy(peer->addr, q, rlen); + time(&peer->time_created); + /* + * On creation, the peer has no wsi attached, so is created on the + * wait list. When a wsi is added it is removed from the wait list. + */ + time(&peer->time_closed_all); + __lws_peer_add_to_peer_wait_list(context, peer); + + lws_context_unlock(context); /* ====================================> */ + + return peer; +} + +/* requires context->lock */ +static int +__lws_peer_destroy(struct lws_context *context, struct lws_peer *peer) +{ + lws_start_foreach_llp(struct lws_peer **, p, + context->pl_hash_table[peer->hash]) { + if (*p == peer) { + struct lws_peer *df = *p; + *p = df->next; + lws_free(df); + context->count_peers--; + + return 0; + } + } lws_end_foreach_llp(p, next); + + return 1; +} + +void +lws_peer_cull_peer_wait_list(struct lws_context *context) +{ + struct lws_peer *df; + time_t t; + + time(&t); + + if (context->next_cull && t < context->next_cull) + return; + + lws_context_lock(context); /* <====================================== */ + + context->next_cull = t + 5; + + lws_start_foreach_llp(struct lws_peer **, p, context->peer_wait_list) { + if (t - (*p)->time_closed_all > 10) { + df = *p; + + /* remove us from the peer wait list */ + *p = df->peer_wait_list; + df->peer_wait_list = NULL; + + __lws_peer_destroy(context, df); + continue; /* we already point to next, if any */ + } + } lws_end_foreach_llp(p, peer_wait_list); + + lws_context_unlock(context); /* ====================================> */ +} + +static void +lws_peer_add_wsi(struct lws_context *context, struct lws_peer *peer, + struct lws *wsi) +{ + if (!peer) + return; + + lws_context_lock(context); /* <====================================== */ + + peer->count_wsi++; + wsi->peer = peer; + __lws_peer_remove_from_peer_wait_list(context, peer); + + lws_context_unlock(context); /* ====================================> */ +} + +void +lws_peer_track_wsi_close(struct lws_context *context, struct lws_peer *peer) +{ + if (!peer) + return; + + lws_context_lock(context); /* <====================================== */ + + assert(peer->count_wsi); + peer->count_wsi--; + + if (!peer->count_wsi && !peer->count_ah) { + /* + * in order that we can accumulate peer activity correctly + * allowing for periods when the peer has no connections, + * we don't synchronously destroy the peer when his last + * wsi closes. Instead we mark the time his last wsi + * closed and add him to a peer_wait_list to be reaped + * later if no further activity is coming. + */ + time(&peer->time_closed_all); + __lws_peer_add_to_peer_wait_list(context, peer); + } + + lws_context_unlock(context); /* ====================================> */ +} + +int +lws_peer_confirm_ah_attach_ok(struct lws_context *context, struct lws_peer *peer) +{ + if (!peer) + return 0; + + if (context->ip_limit_ah && peer->count_ah >= context->ip_limit_ah) { + lwsl_info("peer reached ah limit %d, deferring\n", + context->ip_limit_ah); + + return 1; + } + + return 0; +} + +void +lws_peer_track_ah_detach(struct lws_context *context, struct lws_peer *peer) +{ + if (!peer) + return; + + assert(peer->count_ah); + peer->count_ah--; +} + +#endif + static int lws_get_idlest_tsi(struct lws_context *context) { @@ -1964,15 +2197,41 @@ lws_adopt_descriptor_vhost(struct lws_vhost *vh, lws_adoption_type type, struct lws *parent) { struct lws_context *context = vh->context; - struct lws *new_wsi = lws_create_new_server_wsi(vh); + struct lws *new_wsi; struct lws_context_per_thread *pt; int n, ssl = 0; +#if defined(LWS_WITH_PEER_LIMITS) + struct lws_peer *peer = NULL; + + if (type & LWS_ADOPT_SOCKET && !(type & LWS_ADOPT_WS_PARENTIO)) { + peer = lws_get_or_create_peer(vh, fd.sockfd); + + if (!peer) { + lwsl_err("OOM creating peer\n"); + return NULL; + } + if (context->ip_limit_wsi && + peer->count_wsi >= context->ip_limit_wsi) { + lwsl_notice("Peer reached wsi limit %d\n", + context->ip_limit_wsi); + lws_stats_atomic_bump(context, &context->pt[0], + LWSSTATS_C_PEER_LIMIT_WSI_DENIED, 1); + return NULL; + } + } +#endif + + new_wsi = lws_create_new_server_wsi(vh); if (!new_wsi) { if (type & LWS_ADOPT_SOCKET && !(type & LWS_ADOPT_WS_PARENTIO)) compatible_close(fd.sockfd); return NULL; } +#if defined(LWS_WITH_PEER_LIMITS) + if (peer) + lws_peer_add_wsi(context, peer, new_wsi); +#endif pt = &context->pt[(int)new_wsi->tsi]; lws_stats_atomic_bump(context, pt, LWSSTATS_C_CONNECTIONS, 1); @@ -2519,6 +2778,13 @@ try_pollout: clilen = sizeof(cli_addr); lws_latency_pre(context, wsi); + + /* + * We cannot identify the peer who is in the listen + * socket connect queue before we accept it; even if + * we could, not accepting it due to PEER_LIMITS would + * block the connect queue for other legit peers. + */ accept_fd = accept(pollfd->fd, (struct sockaddr *)&cli_addr, &clilen); lws_latency(context, wsi, "listener accept", accept_fd, diff --git a/lib/service.c b/lib/service.c index b5b9200d..73c884d0 100644 --- a/lib/service.c +++ b/lib/service.c @@ -839,6 +839,10 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t lws_check_deferred_free(context, 0); +#if defined(LWS_WITH_PEER_LIMITS) + lws_peer_cull_peer_wait_list(context); +#endif + /* retire unused deprecated context */ #if !defined(LWS_PLAT_OPTEE) && !defined(LWS_WITH_ESP32) #if LWS_POSIX && !defined(_WIN32) @@ -988,6 +992,7 @@ lws_service_fd_tsi(struct lws_context *context, struct lws_pollfd *pollfd, int t } } + /* the socket we came to service timed out, nothing to do */ if (timed_out) return 0; diff --git a/lws_config.h.in b/lws_config.h.in index 514afebb..3f2488ab 100644 --- a/lws_config.h.in +++ b/lws_config.h.in @@ -112,6 +112,7 @@ #cmakedefine LWS_WITH_SERVER_STATUS #cmakedefine LWS_WITH_STATEFUL_URLDECODE +#cmakedefine LWS_WITH_PEER_LIMITS /* Maximum supported service threads */ #define LWS_MAX_SMP ${LWS_MAX_SMP} diff --git a/test-server/test-server.c b/test-server/test-server.c index dd02fd24..fe903b26 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -442,6 +442,8 @@ int main(int argc, char **argv) "!DHE-RSA-AES256-SHA256:" "!AES256-GCM-SHA384:" "!AES256-SHA256"; + info.ip_limit_ah = 4; /* for testing */ + info.ip_limit_wsi = 105; /* for testing */ if (use_ssl) /* redirect guys coming on http */