1
0
Fork 0
mirror of https://github.com/warmcat/libwebsockets.git synced 2025-03-09 00:00:04 +01:00

threadpool

This commit is contained in:
Andy Green 2018-09-02 14:35:37 +08:00
parent edd7efd43d
commit ebed5e74cb
40 changed files with 2475 additions and 113 deletions

View file

@ -19,6 +19,7 @@ env:
- LWS_METHOD=nologs CMAKE_ARGS="-DLWS_WITH_NO_LOGS=ON"
- LWS_METHOD=smp CMAKE_ARGS="-DLWS_MAX_SMP=32 -DLWS_WITH_MINIMAL_EXAMPLES=1"
- LWS_METHOD=nows CMAKE_ARGS="-DLWS_ROLE_WS=0"
- LWS_METHOD=threadpool CMAKE_ARGS="-DLWS_WITH_THREADPOOL=1 -DLWS_WITH_MINIMAL_EXAMPLES=1"
os:
- linux

View file

@ -37,6 +37,7 @@ option(LWS_WITH_PEER_LIMITS "Track peers and restrict resources a single peer ca
option(LWS_WITH_ACCESS_LOG "Support generating Apache-compatible access logs" OFF)
option(LWS_WITH_RANGES "Support http ranges (RFC7233)" OFF)
option(LWS_WITH_SERVER_STATUS "Support json + jscript server monitoring" OFF)
option(LWS_WITH_THREADPOOL "Managed worker thread pool support (relies on pthreads)" ON)
option(LWS_WITH_HTTP_STREAM_COMPRESSION "Support HTTP stream compression" OFF)
option(LWS_WITH_HTTP_BROTLI "Also offer brotli http stream compression (requires LWS_WITH_HTTP_STREAM_COMPRESSION)" OFF)
option(LWS_WITH_ACME "Enable support for ACME automatic cert acquisition + maintenance (letsencrypt etc)" OFF)
@ -289,6 +290,7 @@ endif()
if (WIN32)
set(LWS_MAX_SMP 1)
set(LWS_WITH_THREADPOOL 0)
endif()
@ -749,6 +751,10 @@ set(SOURCES
lib/misc/base64-decode.c
lib/misc/lws-ring.c
lib/roles/pipe/ops-pipe.c)
if (LWS_WITH_THREADPOOL AND UNIX AND LWS_HAVE_PTHREAD_H)
list(APPEND SOURCES lib/misc/threadpool/threadpool.c)
endif()
if (LWS_ROLE_H1 OR LWS_ROLE_H2)
list(APPEND SOURCES

View file

@ -5,6 +5,10 @@
News
----
## v3.0.1 released
See the git log for the list of fixes.
## v3.0.0 released
See the changelog for info https://libwebsockets.org/git/libwebsockets/tree/changelog?h=v3.0-stable

View file

@ -182,5 +182,6 @@
#cmakedefine LWS_WITH_HTTP_STREAM_COMPRESSION
#cmakedefine LWS_WITH_HTTP_BROTLI
#cmakedefine LWS_WITH_THREADPOOL
${LWS_SIZEOFPTR_CODE}

View file

@ -0,0 +1,153 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Created with Inkscape (http://www.inkscape.org/) -->
<svg width="139.96mm" height="203.94mm" version="1.1" viewBox="0 0 139.95773 203.94206" xmlns="http://www.w3.org/2000/svg" xmlns:cc="http://creativecommons.org/ns#" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">
<defs>
<marker id="a" overflow="visible" orient="auto">
<path transform="matrix(-.4 0 0 -.4 -4 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="m" overflow="visible" orient="auto">
<path transform="matrix(-.4 0 0 -.4 -4 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill="#2a7fff" fill-rule="evenodd" stroke="#2a7fff" stroke-width="1pt"/>
</marker>
<marker id="o" overflow="visible" orient="auto">
<path transform="matrix(-.4 0 0 -.4 -4 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill="#2a7fff" fill-rule="evenodd" stroke="#2a7fff" stroke-width="1pt"/>
</marker>
<marker id="n" overflow="visible" orient="auto">
<path transform="matrix(-.4 0 0 -.4 -4 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill="#2a7fff" fill-rule="evenodd" stroke="#2a7fff" stroke-width="1pt"/>
</marker>
<marker id="p" overflow="visible" orient="auto">
<path transform="matrix(-.4 0 0 -.4 -4 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill="#2a7fff" fill-rule="evenodd" stroke="#2a7fff" stroke-width="1pt"/>
</marker>
<marker id="Arrow1Mend" overflow="visible" orient="auto">
<path transform="matrix(-.4 0 0 -.4 -4 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="e" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="f" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="g" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="h" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="b" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="i" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="c" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="j" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="d" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<marker id="k" overflow="visible" orient="auto">
<path transform="matrix(-.2 0 0 -.2 -1.2 0)" d="m0 0 5-5-17.5 5 17.5 5z" fill-rule="evenodd" stroke="#000" stroke-width="1pt"/>
</marker>
<filter id="l" x="-.028307" y="-.019087" width="1.0566" height="1.0382" color-interpolation-filters="sRGB">
<feGaussianBlur stdDeviation="1.5622839"/>
</filter>
</defs>
<metadata>
<rdf:RDF>
<cc:Work rdf:about="">
<dc:format>image/svg+xml</dc:format>
<dc:type rdf:resource="http://purl.org/dc/dcmitype/StillImage"/>
<dc:title/>
</cc:Work>
</rdf:RDF>
</metadata>
<g transform="translate(-63.825 34.781)">
<g>
<rect x="67.574" y="-31.031" width="132.46" height="196.44" filter="url(#l)"/>
<rect x="67.013" y="-31.592" width="132.46" height="196.44" fill="#fff"/>
<rect x="71.316" y="4.5158" width="60.617" height="156.03" fill="#e3e2db" opacity=".497"/>
<circle cx="101.26" cy="21.51" r="9.1221" fill="#fca" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<text x="101.37344" y="22.928638" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="101.37344" y="22.928638" font-family="'Open Sans'" stroke-width=".53093">queued</tspan></text>
<circle cx="101.06" cy="43.549" r="9.1221" fill="#ff0" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<text x="101.1737" y="44.968067" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="101.1737" y="44.968067" font-family="'Open Sans'" stroke-width=".53093">running</tspan></text>
<circle cx="88.275" cy="111.47" r="9.1221" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<text x="88.389488" y="112.88415" fill="#ffffff" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="88.389488" y="112.88415" fill="#ffffff" font-family="'Open Sans'" stroke-width=".53093">finished</tspan></text>
<circle cx="115.44" cy="111.6" r="9.1221" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<text x="115.55593" y="113.01734" fill="#ffffff" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="115.55593" y="113.01734" fill="#ffffff" font-family="'Open Sans'" stroke-width=".53093">stopped</tspan></text>
<circle cx="101.19" cy="89.093" r="9.1221" fill="#00f" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<text x="101.30688" y="90.511833" fill="#ffffff" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="101.30688" y="90.511833" fill="#ffffff" font-family="'Open Sans'" stroke-width=".53093">stopping</tspan></text>
</g>
<g fill="none" stroke="#000" stroke-width=".98901">
<path d="m92.669 25.172c-5.6792 3.7073-5.9327 8.623-3.0086 12.864 0.59949 0.92797 1.3564 1.75 2.2096 2.45" marker-end="url(#k)"/>
<path d="m92.794 46.725c-6.9332 3.511-5.3268 11.205-0.79902 15.314" marker-end="url(#d)"/>
<path d="m107.31 95.998c1.5624 2.1075 1.7621 1.7662 4.5277 6.3921" marker-end="url(#j)"/>
<path d="m96.922 73.892c-19.598 6.9719-17.845 24.544-15.314 29.697" marker-end="url(#c)"/>
<path d="m105.58 74.072c3.4663 2.6148 2.6371 4.0982 2.0086 6.4967" marker-end="url(#i)"/>
</g>
<text x="83.188652" y="26.350147" fill="#000000" font-family="'Open Sans'" font-size="1.9194px" letter-spacing="0px" stroke-width=".23992" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="83.188652" y="26.350147">threadpool</tspan><tspan x="83.188652" y="28.74935">worker</tspan><tspan x="83.188652" y="31.148552">thread</tspan><tspan x="83.188652" y="33.547756">takes</tspan><tspan x="83.188652" y="35.946957">task</tspan></text>
<text x="81.767693" y="52.351551" fill="#000000" font-family="'Open Sans'" font-size="1.9194px" letter-spacing="0px" stroke-width=".23992" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="81.767693" y="52.351551">worker</tspan><tspan x="81.767693" y="54.750755">produces</tspan><tspan x="81.767693" y="57.149956">a buffer</tspan><tspan x="81.767693" y="59.54916">of output</tspan></text>
<path d="m109.63 63.431c6.9332-3.511 5.3268-11.205 0.79901-15.314" fill="none" marker-end="url(#b)" stroke="#000" stroke-width=".98901"/>
<g fill="#000000" font-family="'Open Sans'" letter-spacing="0px" text-anchor="middle" word-spacing="0px">
<text x="85.033684" y="93.276115" font-size="1.9194px" stroke-width=".23992" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="85.033684" y="93.276115">nothing</tspan><tspan x="85.033684" y="95.675316">more</tspan><tspan x="85.033684" y="98.074524">to do</tspan></text>
<text x="120.33315" y="51.474964" font-size="2.1167px" stroke-width=".26458" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="120.33315" y="51.474964">buffer</tspan><tspan x="120.33315" y="54.120796">sent on</tspan><tspan x="120.33315" y="56.766632">and more</tspan><tspan x="120.33315" y="59.412464">to do</tspan></text>
<text x="113.87862" y="74.393372" font-size="2.1167px" stroke-width=".26458" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="113.87862" y="74.393372">problems</tspan><tspan x="113.87862" y="77.039207">sending</tspan></text>
</g>
<circle cx="101.19" cy="66.055" r="9.1221" fill="#f00" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<text x="101.30688" y="67.473564" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="101.30688" y="67.473564" font-family="'Open Sans'" stroke-width=".53093">sync</tspan></text>
<path d="m115.52 120.58c-1.0042 3.6297-3.8472 3.6305-6.9278 6.0658" fill="none" marker-end="url(#h)" stroke="#000" stroke-width=".98901"/>
<path d="m89.258 120.63c1.0042 3.6297 3.8472 3.6305 6.9278 6.0658" fill="none" marker-end="url(#g)" stroke="#000" stroke-width=".98901"/>
<text x="102.9891" y="153.14995" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="102.9891" y="153.14995" font-family="'Open Sans'" stroke-width=".53093">free task</tspan></text>
<path d="m103.43 141.5c-0.56428 2.168-0.208 2.0595-0.42484 6.8954" fill="none" marker-end="url(#f)" stroke="#000" stroke-width=".98901"/>
<circle cx="102.84" cy="133.7" r="9.1221" fill="#f00" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<g fill="#000000" font-family="'Open Sans'" letter-spacing="0px" text-anchor="middle" word-spacing="0px">
<text x="102.95511" y="135.11937" font-size="4.2475px" stroke-width=".53093" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="102.95511" y="135.11937" font-family="'Open Sans'" stroke-width=".53093">sync</tspan></text>
<text x="91.148003" y="3.3398941" font-size="3.8932px" stroke-width=".48665" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="91.148003" y="3.3398941" font-family="'Open Sans'" stroke-width=".48665">worker thread context</tspan></text>
<text x="158.78787" y="-23.464428" font-size="3.8932px" stroke-width=".48665" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="158.78787" y="-23.464428" font-family="'Open Sans'" stroke-width=".48665">lws service thread context</tspan></text>
</g>
<rect x="135.67" y="-21.491" width="60.617" height="182.23" fill="#9dac93" opacity=".497"/>
<text x="87.216949" y="130.01646" fill="#000000" font-family="'Open Sans'" font-size="1.9194px" letter-spacing="0px" stroke-width=".23992" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="87.216949" y="130.01646">wait until</tspan><tspan x="87.216949" y="132.41566">the lws</tspan><tspan x="87.216949" y="134.81487">service</tspan><tspan x="87.216949" y="137.21407">thread</tspan><tspan x="87.216949" y="139.61328">knows the</tspan><tspan x="87.216949" y="142.01248">task is done</tspan></text>
<path d="m167.23 0.99672c-0.56428 2.168-0.208 2.0595-0.42484 6.8954" fill="none" marker-end="url(#e)" stroke="#000" stroke-width=".98901"/>
<g>
<circle cx="166.68" cy="-6.6945" r="9.1221" fill="#cfa" stroke="#4d4d4d" stroke-linejoin="round" stroke-opacity=".99608" stroke-width=".4711"/>
<text x="166.52914" y="-9.9058332" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1" xml:space="preserve"><tspan x="166.52914" y="-9.9058332" style="line-height:1">new</tspan><tspan x="166.52914" y="-5.6583772" style="line-height:1">wsi on</tspan><tspan x="166.52914" y="-1.4109211" style="line-height:1">mount</tspan></text>
<rect x="153.45" y="8.1222" width="27.384" height="16.007" fill="#37c8ab" opacity=".497"/>
<text x="167.22537" y="13.081109" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1" xml:space="preserve"><tspan x="167.22537" y="13.081109" style="line-height:1">protocol</tspan><tspan x="167.22537" y="17.328566" style="line-height:1">_HTTP</tspan><tspan x="167.22537" y="21.576021" style="line-height:1">callback</tspan></text>
</g>
<path d="m153.38 15.565-42.877 5.5222" fill="none" marker-end="url(#Arrow1Mend)" stroke="#2a7fff" stroke-dasharray="0.26499999, 0.26499999" stroke-width=".265"/>
<text transform="rotate(-6.3716)" x="129.80591" y="31.348564" fill="#0044aa" font-family="'Open Sans'" font-size="2.9008px" letter-spacing="0px" stroke-width=".3626" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="129.80591" y="31.348564" fill="#0044aa" font-family="'Open Sans'" stroke-width=".3626">enqueue threadpool task</tspan></text>
<path d="m110.03 66.524 43.37-14.496" fill="none" marker-end="url(#p)" stroke="#2a7fff" stroke-dasharray="0.26499999, 0.26499999" stroke-width=".265"/>
<g>
<rect x="153.23" y="43.846" width="27.384" height="16.007" fill="#37c8ab" opacity=".497"/>
<text x="167.00159" y="51.049557" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1" xml:space="preserve"><tspan x="167.00159" y="51.049557" style="line-height:1">protocol</tspan><tspan x="167.00159" y="55.297012" style="line-height:1">WRITEABLE</tspan></text>
<text transform="rotate(-17.805)" x="111.54491" y="95.327446" fill="#0044aa" font-family="'Open Sans'" font-size="2.9008px" letter-spacing="0px" stroke-width=".3626" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="111.54491" y="95.327446" fill="#0044aa" font-family="'Open Sans'" stroke-width=".3626">cancel service</tspan></text>
</g>
<path d="m152.72 54.497-43.37 14.496" fill="none" marker-end="url(#n)" stroke="#2a7fff" stroke-dasharray="0.26499999, 0.26499999" stroke-width=".265"/>
<g>
<rect x="153.67" y="114.19" width="27.384" height="16.007" fill="#37c8ab" opacity=".497"/>
<text x="167.45035" y="121.39483" fill="#000000" font-family="'Open Sans'" font-size="4.2475px" letter-spacing="0px" stroke-width=".53093" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1" xml:space="preserve"><tspan x="167.45035" y="121.39483" style="line-height:1">protocol</tspan><tspan x="167.45035" y="125.64229" style="line-height:1">WRITEABLE</tspan></text>
<text transform="rotate(-17.805)" x="89.692116" y="162.38983" fill="#0044aa" font-family="'Open Sans'" font-size="2.9008px" letter-spacing="0px" stroke-width=".3626" text-align="center" text-anchor="middle" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="89.692116" y="162.38983" fill="#0044aa" font-family="'Open Sans'" stroke-width=".3626">cancel service</tspan></text>
</g>
<path d="m111.52 136.63 43.37-14.496" fill="none" marker-end="url(#o)" stroke="#2a7fff" stroke-dasharray="0.26499999, 0.26499999" stroke-width=".265"/>
<path d="m154.21 124.6-43.37 14.496" fill="none" marker-end="url(#m)" stroke="#2a7fff" stroke-dasharray="0.26499999, 0.26499999" stroke-width=".265"/>
<g font-family="'Open Sans'" letter-spacing="0px" text-anchor="middle" word-spacing="0px">
<text transform="rotate(-17.805)" x="107.68779" y="101.80786" fill="#0044aa" font-size="2.9008px" stroke-width=".3626" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="107.68779" y="101.80786" fill="#0044aa" font-family="'Open Sans'" stroke-width=".3626">lws_threadpool_task_sync</tspan></text>
<text transform="rotate(-17.805)" x="85.189613" y="168.89598" fill="#0044aa" font-size="2.9008px" stroke-width=".3626" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="85.189613" y="168.89598" fill="#0044aa" font-family="'Open Sans'" stroke-width=".3626">lws_threadpool_task_status_wsi</tspan></text>
<text x="132.67136" y="108.40496" fill="#000000" font-size="2.1167px" stroke-width=".26458" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="132.67136" y="108.40496">move to</tspan><tspan x="132.67136" y="111.0508">"done queue"</tspan><tspan x="132.67136" y="113.69662">idling</tspan><tspan x="132.67136" y="116.34246">worker thread</tspan></text>
</g>
<path d="m91.919 68.712c-21.074 4.8881-23.555 52.34-13.084 71.959 2.8493 5.3384 9.0772 11.691 14.142 11.914" fill="none" marker-end="url(#a)" stroke="#000" stroke-dasharray="0.265, 0.265" stroke-width=".265"/>
<g fill="#000000" font-family="'Open Sans'" letter-spacing="0px" text-anchor="middle" word-spacing="0px">
<text x="79.087029" y="66.727417" font-size="2.1167px" stroke-width=".26458" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="79.087029" y="66.727417">wsi has</tspan><tspan x="79.087029" y="69.373253">unexpect-</tspan><tspan x="79.087029" y="72.019081">edly gone</tspan></text>
<text x="167.50217" y="64.309265" font-size="3.5963px" stroke-width=".44954" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="167.50217" y="64.309265">write the</tspan><tspan x="167.50217" y="68.804657">buffer on</tspan><tspan x="167.50217" y="73.300049">the wsi</tspan></text>
<text x="167.53455" y="136.16283" font-size="3.5963px" stroke-width=".44954" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="167.53455" y="136.16283">acknowledge</tspan><tspan x="167.53455" y="140.65822">the task has</tspan><tspan x="167.53455" y="145.15361">ended</tspan></text>
<text x="100.54455" y="-20.110546" font-size="7.4216px" stroke-width=".9277" text-align="center" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="100.54455" y="-20.110546" font-family="'Open Sans'" stroke-width=".9277">Threadpool</tspan></text>
</g>
<g transform="matrix(1.3705 0 0 1.3705 152.73 -89.601)">
<path d="m-53.889 51.059c-0.2293-0.20876-0.45859-0.41752-0.68788-0.62628h-2.0842c-0.41073-0.4478-0.80598-0.91066-1.2269-1.3486-0.15635-0.18661-0.4411-0.26749-0.65538-0.13157-0.2532 0.13273-0.37597 0.41669-0.39448 0.69074-0.04758 0.31909 0.05354 0.68674 0.35175 0.85254 0.18832 0.10808 0.49386 0.04671 0.5437-0.19062 0.06699-0.21357-0.06488-0.51518-0.31343-0.50948-0.16072 3e-3 -0.26391 0.28316-0.07717 0.33226 0.08893 0.01617 0.10978-0.27326 0.17002-0.07939 0.0638 0.17507-0.16528 0.31938-0.28293 0.16402-0.213-0.21052-0.14192-0.60391 0.10406-0.75718 0.23529-0.07437 0.39199 0.16171 0.52824 0.31112 0.3873 0.42849 0.77336 0.85822 1.1601 1.2873 0.95481 0.0017 1.9097 0.0034 2.8645 0.0051z"/>
<path d="m-56.225 50.077c0.07138-0.08712 0.14278-0.17424 0.21416-0.26135 0.10648 0.11858 0.21295 0.23715 0.31943 0.35573 0.19118 0.0012 0.38235 0.0025 0.57352 0.0036-0.2069-0.22868-0.41381-0.45737-0.62072-0.68605 0.10043-0.11374 0.20086-0.22747 0.30128-0.34121 0.19601 0.22868 0.39203 0.45737 0.58804 0.68605-0.0012-0.219-0.0025-0.43801-0.0036-0.65701-0.10043-0.11252-0.20085-0.22505-0.30128-0.33758 0.07198-0.10615 0.24907-0.21551 0.08668-0.31451-0.36406-0.40214-0.72811-0.80428-1.0922-1.2064-0.76543-0.0026-1.5317 0.01021-2.2966-0.0021-0.1821-0.01604-0.40898-0.07372-0.45071-0.28286-0.088-0.27195 0.21101-0.59285 0.48564-0.46528 0.18982 0.03674 0.11004 0.42003-0.05779 0.29297 0.17429-0.25602-0.31955-0.22697-0.17976 0.02451 0.09575 0.22004 0.44485 0.25635 0.58255 0.05617 0.14323-0.2308-0.05154-0.4937-0.2705-0.58497-0.21915-0.10466-0.49117-0.07771-0.67434 0.08567-0.27378 0.20864-0.40621 0.61643-0.22752 0.92795 0.104 0.233 0.33446 0.38219 0.58455 0.41121 0.33507 0.04187 0.67431 0.01608 1.0114 0.02326h1.3407c0.25409 0.28192 0.50818 0.56384 0.76228 0.84577-0.22868 0.25288-0.45736 0.50576-0.68605 0.75865-0.16572-0.21381-0.41577-0.37628-0.51666-0.63121-0.08161-0.29871 0.35413-0.53156 0.5618-0.30555 0.19943 0.11035-0.01178 0.46291-0.15577 0.26978 0.1166-0.03951 0.16904-0.22582-0.01437-0.21638-0.21499 0.08183-0.12303 0.40655 0.0643 0.46926 0.17871 0.08651 0.40261-0.07556 0.3799-0.27118 0.01826-0.28278-0.18842-0.5907-0.48713-0.60388-0.31967-0.06227-0.68039 0.12472-0.75854 0.45263-0.10766 0.28228 0.06955 0.56541 0.26324 0.75966 0.22723 0.24585 0.44911 0.49691 0.67408 0.74475z" fill="#f00"/>
<path d="m-54.464 50.361c0.0017-0.54757 0.0034-1.0951 0.0051-1.6427-0.33832-0.38748-0.69457-0.75998-1.021-1.1574-0.10913-0.13496-0.19252-0.29467-0.18614-0.47289-0.0086-0.33136 0.19731-0.65612 0.50556-0.78212 0.23741-0.10513 0.52154-0.11525 0.754 0.0094 0.3243 0.16516 0.47791 0.61109 0.30157 0.93353-0.13519 0.19951-0.48216 0.19542-0.59404-0.0265-0.1079-0.14135-0.10151-0.43084 0.10373-0.47103 0.16696-0.0034 0.20764 0.24106 0.02397 0.25083-0.04085 0.15882 0.28306 0.12232 0.27255-0.04546 0.04138-0.23154-0.21514-0.42013-0.4314-0.37653-0.24357 0.02414-0.45758 0.28096-0.37332 0.52553 0.08871 0.24037 0.30285 0.40102 0.46218 0.59282 0.25613 0.27896 0.51148 0.55865 0.76757 0.83766-0.0017 0.61002-0.0034 1.22-0.0051 1.8301-0.19509-0.0017-0.3902-0.0035-0.58527-0.0051z"/>
</g>
<text x="70.290306" y="-13.629649" fill="#000000" font-family="'Open Sans'" font-size="2.8389px" letter-spacing="0px" stroke-width=".35487" text-align="center" word-spacing="0px" style="font-feature-settings:normal;font-variant-caps:normal;font-variant-ligatures:normal;font-variant-numeric:normal;line-height:1.25" xml:space="preserve"><tspan x="70.290306" y="-13.629649" text-align="start">synchronization with the lws service thread</tspan><tspan x="70.290306" y="-10.08099" text-align="start">(syncs to the correct service thread for the wsi)</tspan></text>
</g>
</svg>

After

Width:  |  Height:  |  Size: 26 KiB

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 15 KiB

View file

@ -407,6 +407,7 @@ struct lws;
#include <libwebsockets/lws-vfs.h>
#include <libwebsockets/lws-lejp.h>
#include <libwebsockets/lws-stats.h>
#include <libwebsockets/lws-threadpool.h>
#if defined(LWS_WITH_TLS)

View file

@ -729,8 +729,7 @@ enum lws_callback_reasons {
* these callbacks. The deadline can be continuously extended into the
* future by later calls to lws_set_timer_usecs() before the deadline
* expires, or cancelled by lws_set_timer_usecs(wsi, -1);
* See the note on lws_set_timer_usecs() about which event loops are
* supported. */
*/
LWS_CALLBACK_EVENT_WAIT_CANCELLED = 71,
/**< This is sent to every protocol of every vhost in response

View file

@ -867,33 +867,5 @@ struct lws_http_mount {
void *_unused[2]; /**< dummy */
};
/**
* lws_http_compression_apply() - apply an http compression transform
*
* \param wsi: the wsi to apply the compression transform to
* \param name: NULL, or the name of the compression transform, eg, "deflate"
* \param p: pointer to pointer to headers buffer
* \param end: pointer to end of headers buffer
* \param decomp: 0 = add compressor to wsi, 1 = add decompressor
*
* This allows transparent compression of dynamically generated HTTP. The
* requested compression (eg, "deflate") is only applied if the client headers
* indicated it was supported (and it has support in lws), otherwise it's a NOP.
*
* If the requested compression method is NULL, then the supported compression
* formats are tried, and for non-decompression (server) mode the first that's
* found on the client's accept-encoding header is chosen.
*
* NOTE: the compression transform, same as h2 support, relies on the user
* code using LWS_WRITE_HTTP and then LWS_WRITE_HTTP_FINAL on the last part
* written. The internal lws fileserving code already does this.
*
* If the library was built without the cmake option
* LWS_WITH_HTTP_STREAM_COMPRESSION set, then a NOP is provided for this api,
* allowing user code to build either way and use compression if available.
*/
LWS_VISIBLE int
lws_http_compression_apply(struct lws *wsi, const char *name,
unsigned char **p, unsigned char *end, char decomp);
///@}
///@}

View file

@ -646,5 +646,34 @@ lws_http_redirect(struct lws *wsi, int code, const unsigned char *loc, int len,
*/
LWS_VISIBLE LWS_EXTERN int LWS_WARN_UNUSED_RESULT
lws_http_transaction_completed(struct lws *wsi);
/**
* lws_http_compression_apply() - apply an http compression transform
*
* \param wsi: the wsi to apply the compression transform to
* \param name: NULL, or the name of the compression transform, eg, "deflate"
* \param p: pointer to pointer to headers buffer
* \param end: pointer to end of headers buffer
* \param decomp: 0 = add compressor to wsi, 1 = add decompressor
*
* This allows transparent compression of dynamically generated HTTP. The
* requested compression (eg, "deflate") is only applied if the client headers
* indicated it was supported (and it has support in lws), otherwise it's a NOP.
*
* If the requested compression method is NULL, then the supported compression
* formats are tried, and for non-decompression (server) mode the first that's
* found on the client's accept-encoding header is chosen.
*
* NOTE: the compression transform, same as h2 support, relies on the user
* code using LWS_WRITE_HTTP and then LWS_WRITE_HTTP_FINAL on the last part
* written. The internal lws fileserving code already does this.
*
* If the library was built without the cmake option
* LWS_WITH_HTTP_STREAM_COMPRESSION set, then a NOP is provided for this api,
* allowing user code to build either way and use compression if available.
*/
LWS_VISIBLE int
lws_http_compression_apply(struct lws *wsi, const char *name,
unsigned char **p, unsigned char *end, char decomp);
///@}

View file

@ -0,0 +1,225 @@
/*
* libwebsockets - small server side websockets and web server implementation
*
* Copyright (C) 2010-2018 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation:
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301 USA
*
* included from libwebsockets.h
*/
/** \defgroup threadpool Threadpool related functions
* ##Threadpool
* \ingroup lwsapi
*
* This allows you to create one or more pool of threads which can run tasks
* associated with a wsi. If the pool is busy, tasks wait on a queue.
*
* Tasks don't have to be atomic, if they will take more than a few tens of ms
* they should return back to the threadpool worker with a return of 0. This
* will allow them to abort cleanly.
*/
//@{
struct lws_threadpool;
struct lws_threadpool_task;
enum lws_threadpool_task_status {
LWS_TP_STATUS_QUEUED,
LWS_TP_STATUS_RUNNING,
LWS_TP_STATUS_SYNCING,
LWS_TP_STATUS_STOPPING,
LWS_TP_STATUS_FINISHED, /* lws_threadpool_task_status() frees task */
LWS_TP_STATUS_STOPPED, /* lws_threadpool_task_status() frees task */
};
enum lws_threadpool_task_return {
/** Still work to do, just confirming not being stopped */
LWS_TP_RETURN_CHECKING_IN,
/** Still work to do, enter cond_wait until service thread syncs. This
* is used if you have filled your buffer(s) of data to the service
* thread and are blocked until the service thread completes sending at
* least one.
*/
LWS_TP_RETURN_SYNC,
/** No more work to do... */
LWS_TP_RETURN_FINISHED,
/** Responding to request to stop */
LWS_TP_RETURN_STOPPED
};
struct lws_threadpool_create_args {
int threads;
int max_queue_depth;
};
struct lws_threadpool_task_args {
struct lws *wsi; /**< user must set to wsi task is bound to */
void *user; /**< user may set (user-private pointer) */
const char *name; /**< user may set to describe task */
enum lws_threadpool_task_return (*task)(void *user,
enum lws_threadpool_task_status s);
/**< user must set to actual task function */
void (*cleanup)(struct lws *wsi, void *user);
/**< socket lifecycle may end while task is not stoppable, so the task
* must be able to detach from any wsi and clean itself up when it does
* stop. If NULL, no cleanup necessary, otherwise point to a user-
* supplied function that destroys the stuff in \p user.
*
* wsi may be NULL on entry, indicating the task got detached due to the
* wsi closing before.
*/
};
/**
* lws_threadpool_create() - create a pool of worker threads
*
* \param context: the lws_context the threadpool will exist inside
* \param args: argument struct prepared by caller
* \param format: printf-type format for the task name
* \param ...: printf type args for the task name format
*
* Creates a pool of worker threads with \p threads and a queue of up to
* \p max_queue_depth waiting tasks if all the threads are busy.
*
* Returns NULL if OOM, or a struct lws_threadpool pointer that must be
* destroyed by lws_threadpool_destroy().
*/
LWS_VISIBLE LWS_EXTERN struct lws_threadpool *
lws_threadpool_create(struct lws_context *context,
const struct lws_threadpool_create_args *args,
const char *format, ...) LWS_FORMAT(3);
/**
* lws_threadpool_finish() - Stop all pending and running tasks
*
* \param tp: the threadpool object
*
* Marks the threadpool as under destruction. Removes everything from the
* pending queue and completes those tasks as LWS_TP_STATUS_STOPPED.
*
* Running tasks will also get LWS_TP_STATUS_STOPPED as soon as they
* "resurface".
*
* This doesn't reap tasks or free the threadpool, the reaping is done by the
* lws_threadpool_task_status() on the done task.
*/
LWS_VISIBLE LWS_EXTERN void
lws_threadpool_finish(struct lws_threadpool *tp);
/**
* lws_threadpool_destroy() - Destroy a threadpool
*
* \param tp: the threadpool object
*
* Waits for all worker threads to stop, ends the threads and frees the tp.
*/
LWS_VISIBLE LWS_EXTERN void
lws_threadpool_destroy(struct lws_threadpool *tp);
/**
* lws_threadpool_enqueue() - Queue the task and run it on a worker thread when possible
*
* \param tp: the threadpool to queue / run on
* \param args: information about what to run
* \param format: printf-type format for the task name
* \param ...: printf type args for the task name format
*
* This asks for a task to run ASAP on a worker thread in threadpool \p tp.
*
* The args defines the wsi, a user-private pointer, a timeout in secs and
* a pointer to the task function.
*
* Returns NULL or an opaque pointer to the queued (or running, or completed)
* task.
*
* Once a task is created and enqueued, it can only be destroyed by calling
* lws_threadpool_task_status() on it after it has reached the state
* LWS_TP_STATUS_FINISHED or LWS_TP_STATUS_STOPPED.
*/
LWS_VISIBLE LWS_EXTERN struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
const struct lws_threadpool_task_args *args,
const char *format, ...) LWS_FORMAT(3);
/**
* lws_threadpool_dequeue() - Dequeue or try to stop a running task
*
* \param wsi: the wsi whose current task we want to eliminate
*
* Returns 0 is the task was dequeued or already compeleted, or 1 if the task
* has been asked to stop asynchronously.
*
* This doesn't free the task. It only shortcuts it to state
* LWS_TP_STATUS_STOPPED. lws_threadpool_task_status() must be performed on
* the task separately once it is in LWS_TP_STATUS_STOPPED to free the task.
*/
LWS_VISIBLE LWS_EXTERN int
lws_threadpool_dequeue(struct lws *wsi);
/**
* lws_threadpool_task_status() - Dequeue or try to stop a running task
*
* \param wsi: the wsi to query the current task of
* \param task: receives a pointer to the opaque task
* \param user: receives a void * pointer to the task user data
*
* This is the equivalent of posix waitpid()... it returns the status of the
* task, and if the task is in state LWS_TP_STATUS_FINISHED or
* LWS_TP_STATUS_STOPPED, frees \p task. If in another state, the task
* continues to exist.
*
* This is designed to be called from the service thread.
*
* Its use is to make sure the service thread has seen the state of the task
* before deleting it.
*/
LWS_VISIBLE LWS_EXTERN enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
struct lws_threadpool_task **task, void **user);
/**
* lws_threadpool_task_sync() - Indicate to a stalled task it may continue
*
* \param task: the task to unblock
* \param stop: 0 = run after unblock, 1 = when he unblocks, stop him
*
* Inform the task that the service thread has finished with the shared data
* and that the task, if blocked in LWS_TP_RETURN_SYNC, may continue.
*
* If the lws service context determined that the task must be aborted, it
* should still call this but with stop = 1, causing the task to finish.
*/
LWS_VISIBLE LWS_EXTERN void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop);
/**
* lws_threadpool_dump() - dump the state of a threadpool to the log
*
* \param tp: The threadpool to dump
*
* This locks the threadpool and then dumps the pending queue, the worker
* threads and the done queue, together with time information for how long
* the tasks have been in their current state, how long they have occupied a
* thread, etc.
*
* This only does anything on lws builds with CMAKE_BUILD_TYPE=DEBUG, otherwise
* while it still exists, it's a NOP.
*/
LWS_VISIBLE LWS_EXTERN void
lws_threadpool_dump(struct lws_threadpool *tp);
//@}

View file

@ -61,6 +61,8 @@ enum pending_timeout {
PENDING_TIMEOUT_UDP_IDLE = 26,
PENDING_TIMEOUT_CLIENT_CONN_IDLE = 27,
PENDING_TIMEOUT_LAGGING = 28,
PENDING_TIMEOUT_THREADPOOL = 29,
PENDING_TIMEOUT_THREADPOOL_TASK = 30,
/****** add new things just above ---^ ******/

View file

@ -147,7 +147,7 @@ lws_client_connect_via_info(const struct lws_client_connect_info *i)
lwsl_info("%s: protocol binding to %s\n", __func__, local);
p = lws_vhost_name_to_protocol(wsi->vhost, local);
if (p)
lws_bind_protocol(wsi, p);
lws_bind_protocol(wsi, p, __func__);
}
/*

View file

@ -537,7 +537,7 @@ lws_remove_child_from_any_parent(struct lws *wsi)
}
int
lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p)
lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p, const char *reason)
{
// if (wsi->protocol == p)
// return 0;
@ -546,7 +546,7 @@ lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p)
if (wsi->protocol && wsi->protocol_bind_balance) {
wsi->protocol->callback(wsi,
wsi->role_ops->protocol_unbind_cb[!!lwsi_role_server(wsi)],
wsi->user_space, NULL, 0);
wsi->user_space, (void *)reason, 0);
wsi->protocol_bind_balance = 0;
}
if (!wsi->user_space_externally_allocated)
@ -759,7 +759,7 @@ __lws_close_free_wsi(struct lws *wsi, enum lws_close_status reason, const char *
wsi->protocol->callback(wsi,
wsi->role_ops->protocol_unbind_cb[
!!lwsi_role_server(wsi)],
wsi->user_space, NULL, 0);
wsi->user_space, (void *)__func__, 0);
wsi->protocol_bind_balance = 0;
}
@ -794,7 +794,7 @@ just_kill_connection:
wsi->protocol->callback(wsi,
wsi->role_ops->protocol_unbind_cb[
!!lwsi_role_server(wsi)],
wsi->user_space, NULL, 0);
wsi->user_space, (void *)__func__, 0);
wsi->protocol_bind_balance = 0;
}

View file

@ -29,7 +29,7 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
struct lws_context *context = lws_get_context(wsi);
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
size_t real_len = len;
unsigned int n;
unsigned int n, m;
// lwsl_notice("%s: len %d\n", __func__, (int)len);
@ -104,13 +104,15 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
/* nope, send it on the socket directly */
lws_latency_pre(context, wsi);
n = lws_ssl_capable_write(wsi, buf, n);
lws_latency(context, wsi, "send lws_issue_raw", n, n == len);
m = lws_ssl_capable_write(wsi, buf, n);
lws_latency(context, wsi, "send lws_issue_raw", n, n == m);
lwsl_info("%s: ssl_capable_write (%d) says %d\n", __func__, n, m);
/* something got written, it can have been truncated now */
wsi->could_have_pending = 1;
switch (n) {
switch (m) {
case LWS_SSL_CAPABLE_ERROR:
/* we're going to close, let close know sends aren't possible */
wsi->socket_is_permanently_unusable = 1;
@ -121,7 +123,7 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
* ie, implying treat it was a truncated send so it gets
* retried
*/
n = 0;
m = 0;
break;
}
@ -131,17 +133,17 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
* send in the buflist.
*/
if (lws_has_buffered_out(wsi)) {
if (n) {
lwsl_info("%p partial adv %d (vs %ld)\n", wsi, n,
if (m) {
lwsl_info("%p partial adv %d (vs %ld)\n", wsi, m,
(long)real_len);
lws_buflist_use_segment(&wsi->buflist_out, n);
lws_buflist_use_segment(&wsi->buflist_out, m);
}
if (!lws_has_buffered_out(wsi)) {
lwsl_info("%s: wsi %p: buflist_out flushed\n",
__func__, wsi);
n = (int)real_len;
m = (int)real_len;
if (lwsi_state(wsi) == LRS_FLUSHING_BEFORE_CLOSE) {
lwsl_info("** %p signalling to close now\n", wsi);
return -1; /* retry closing now */
@ -162,7 +164,7 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
/* always callback on writeable */
lws_callback_on_writable(wsi);
return n;
return m;
}
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
@ -170,9 +172,9 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
lws_callback_on_writable(wsi);
#endif
if ((unsigned int)n == real_len)
if (m == real_len)
/* what we just sent went out cleanly */
return n;
return m;
/*
* We were not able to send everything... and we were not sending from
@ -180,13 +182,13 @@ int lws_issue_raw(struct lws *wsi, unsigned char *buf, size_t len)
* buffering the unsent remainder on it.
* (it will get first priority next time the socket is writable).
*/
lwsl_debug("%p new partial sent %d from %lu total\n", wsi, n,
lwsl_debug("%p new partial sent %d from %lu total\n", wsi, m,
(unsigned long)real_len);
lws_buflist_append_segment(&wsi->buflist_out, buf + n, real_len - n);
lws_buflist_append_segment(&wsi->buflist_out, buf + m, real_len - m);
lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_C_WRITE_PARTIALS, 1);
lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_B_PARTIALS_ACCEPTED_PARTS, n);
lws_stats_atomic_bump(wsi->context, pt, LWSSTATS_B_PARTIALS_ACCEPTED_PARTS, m);
#if !defined(LWS_WITH_ESP32)
if (lws_wsi_is_udp(wsi)) {

View file

@ -474,7 +474,7 @@ lws_same_vh_protocol_insert(struct lws *wsi, int n)
{
if (wsi->same_vh_protocol_prev || wsi->same_vh_protocol_next) {
lws_same_vh_protocol_remove(wsi);
lwsl_notice("Attempted to attach wsi twice to same vh prot\n");
lwsl_info("Attempted to attach wsi twice to same vh prot\n");
}
lws_vhost_lock(wsi->vhost);

View file

@ -631,6 +631,11 @@ struct lws_context {
struct lws_vhost *vhost_pending_destruction_list;
struct lws_plugin *plugin_list;
struct lws_deferred_free *deferred_free_list;
#if defined(LWS_WITH_THREADPOOL)
struct lws_threadpool *tp_list_head;
#endif
#if defined(LWS_WITH_PEER_LIMITS)
struct lws_peer **pl_hash_table;
struct lws_peer *peer_wait_list;
@ -873,6 +878,10 @@ struct lws {
struct lws_dll_lws dll_hrtimer;
struct lws_dll_lws dll_buflist; /* guys with pending rxflow */
#if defined(LWS_WITH_THREADPOOL)
struct lws_threadpool_task *tp_task;
#endif
#if defined(LWS_WITH_PEER_LIMITS)
struct lws_peer *peer;
#endif
@ -1350,7 +1359,8 @@ int
lws_protocol_init(struct lws_context *context);
int
lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p);
lws_bind_protocol(struct lws *wsi, const struct lws_protocols *p,
const char *reason);
const struct lws_http_mount *
lws_find_mount(struct lws *wsi, const char *uri_ptr, int uri_len);
@ -1488,6 +1498,8 @@ hubbub_error
html_parser_cb(const hubbub_token *token, void *pw);
#endif
int
lws_threadpool_tsi_context(struct lws_context *context, int tsi);
void
__lws_remove_from_timeout_list(struct lws *wsi);

View file

@ -99,7 +99,7 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
wsi->http.comp_ctx.may_have_more) {
enum lws_write_protocol wp = LWS_WRITE_HTTP;
lwsl_debug("%s: completing comp partial (buflist_comp %p, may %d)\n",
lwsl_info("%s: completing comp partial (buflist_comp %p, may %d)\n",
__func__, wsi->http.comp_ctx.buflist_comp,
wsi->http.comp_ctx.may_have_more
);
@ -334,9 +334,10 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi)
{
struct lws_context_per_thread *pt = &context->pt[tsi];
/* Figure out if we really want to wait in poll()
* We only need to wait if really nothing already to do and we have
* to wait for something from network
/*
* Figure out if we really want to wait in poll()... we only need to
* wait if really nothing already to do and we have to wait for
* something from network
*/
#if defined(LWS_ROLE_WS) && !defined(LWS_WITHOUT_EXTENSIONS)
/* 1) if we know we are draining rx ext, do not wait in poll */
@ -347,11 +348,12 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi)
/* 2) if we know we have non-network pending data, do not wait in poll */
if (pt->context->tls_ops &&
pt->context->tls_ops->fake_POLLIN_for_buffered)
if (pt->context->tls_ops->fake_POLLIN_for_buffered(pt))
pt->context->tls_ops->fake_POLLIN_for_buffered &&
pt->context->tls_ops->fake_POLLIN_for_buffered(pt))
return 0;
/* 3) If there is any wsi with rxflow buffered and in a state to process
/*
* 3) If there is any wsi with rxflow buffered and in a state to process
* it, we should not wait in poll
*/
@ -361,6 +363,12 @@ lws_service_adjust_timeout(struct lws_context *context, int timeout_ms, int tsi)
if (lwsi_state(wsi) != LRS_DEFERRING_ACTION)
return 0;
/*
* 4) If any guys with http compression to spill, we shouldn't wait in
* poll but hurry along and service them
*/
} lws_end_foreach_dll(d);
return timeout_ms;

View file

@ -0,0 +1,175 @@
## Threadpool
### Overview
![overview](/doc-assets/threadpool.svg)
An api that lets you create a pool of worker threads, and a queue of tasks that
are bound to a wsi. Tasks in their own thread synchronize communication to the
lws service thread of the wsi via `LWS_CALLBACK_SERVER_WRITEABLE` and friends.
Tasks can produce some output, then return that they want to "sync" with the
service thread. That causes a `LWS_CALLBACK_SERVER_WRITEABLE` in the service
thread context, where the output can be consumed, and the task told to continue,
or completed tasks be reaped.
ALL of the details related to thread synchronization and an associated wsi in
the lws service thread context are handled by the threadpool api, without needing
any pthreads in user code.
### Example
https://libwebsockets.org/git/libwebsockets/tree/minimal-examples/ws-server/minimal-ws-server-threadpool
### Lifecycle considerations
#### Tasks vs wsi
Although all tasks start out as being associated to a wsi, in fact the lifetime
of a task and that of the wsi are not necessarily linked.
You may start a long task, eg, that runs atomically in its thread for 30s, and
at any time the client may close the connection, eg, close a browser window.
There are arrangements that a task can "check in" periodically with lws to see
if it has been asked to stop, allowing the task lifetime to be related to the
wsi lifetime somewhat, but some tasks are going to be atomic and longlived.
For that reason, at wsi close an ongoing task can detach from the wsi and
continue until it ends or understands it has been asked to stop. To make
that work, the task is created with a `cleanup` callback that performs any
freeing independent of still having a wsi around to do it... the task takes over
responsibility to free the user pointer on destruction when the task is created.
![Threadpool States](/doc-assets/threadpool-states.svg)
#### Reaping completed tasks
Once created, although tasks may run asynchronously, the task itself does not
get destroyed on completion but added to a "done queue". Only when the lws
service thread context queries the task state with `lws_threadpool_task_status()`
may the task be reaped and memory freed.
This is analogous to unix processes and `wait()`.
If a task became detached from its wsi, then joining the done queue is enough
to get the task reaped, since there's nobody left any more to synchronize the
reaping with.
### User interface
The api is declared at https://libwebsockets.org/git/libwebsockets/tree/include/libwebsockets/lws-threadpool.h
#### Threadpool creation / destruction
The threadpool should be created at program or vhost init using
`lws_threadpool_create()` and destroyed on exit or vhost destruction using
first `lws_threadpool_finish()` and then `lws_threadpool_destroy()`.
Threadpools should be named, varargs are provided on the create function
to facilite eg, naming the threadpool by the vhost it's associated with.
Threadpool creation takes an args struct with the following members:
Member|function
---|---
threads|The maxiumum number of independent threads in the pool
max_queue_depth|The maximum number of tasks allowed to wait for a place in the pool
#### Task creation / destruction
Tasks are created and queued using `lws_threadpool_enqueue()`, this takes an
args struct with the following members
Member|function
---|---
wsi|The wsi the task is initially associated with
user|An opaque user-private pointer used for communication with the lws service thread and private state / data
task|A pointer to the function that will run in the pool thread
cleanup|A pointer to a function that will clean up finished or stopped tasks (perhaps freeing user)
Tasks also should have a name, the creation function again provides varargs
to simplify naming the task with string elements related to who started it
and why.
#### The task function itself
The task function receives the task user pointer and the task state. The
possible task states are
State|Meaning
---|---
LWS_TP_STATUS_QUEUED|Task is still waiting for a pool thread
LWS_TP_STATUS_RUNNING|Task is supposed to do its work
LWS_TP_STATUS_SYNCING|Task is blocked waiting for sync from lws service thread
LWS_TP_STATUS_STOPPING|Task has been asked to stop but didn't stop yet
LWS_TP_STATUS_FINISHED|Task has reported it has completed
LWS_TP_STATUS_STOPPED|Task has aborted
The task function will only be told `LWS_TP_STATUS_RUNNING` or
`LWS_TP_STATUS_STOPPING` in its status argument... RUNNING means continue with the
user task and STOPPING means clean up and return `LWS_TP_RETURN_STOPPED`.
If possible every 100ms or so the task should return `LWS_TP_RETURN_CHECKING_IN`
to allow lws to inform it reasonably quickly that it has been asked to stop
(eg, because the related wsi has closed), or if it can continue. If not
possible, it's okay but eg exiting the application may experience delays
until the running task finishes, and since the wsi may have gone, the work
is wasted.
The task function may return one of
Return|Meaning
---|---
LWS_TP_RETURN_CHECKING_IN|Still wants to run, but confirming nobody asked him to stop. Will be called again immediately with `LWS_TP_STATUS_RUNNING` or `LWS_TP_STATUS_STOPPING`
LWS_TP_RETURN_SYNC|Task wants to trigger a WRITABLE callback and block until lws service thread restarts it with `lws_threadpool_task_sync()`
LWS_TP_RETURN_FINISHED|Task has finished, successfully as far as it goes
LWS_TP_RETURN_STOPPED|Task has finished, aborting in response to a request to stop
#### Synchronizing
The task can choose to "SYNC" with the lws service thread, in other words
cause a WRITABLE callback on the associated wsi in the lws service thread
context and block itself until it hears back from there via
`lws_threadpool_task_sync()` to resume the task.
This is typically used when, eg, the task has filled its buffer, or ringbuffer,
and needs to pause operations until what's done has been sent and some buffer
space is open again.
In the WRITABLE callback, in lws service thread context, the buffer can be
sent with `lws_write()` and then `lws_threadpool_task_sync()` to allow the task
to fill another buffer and continue that way.
If the WRITABLE callback determines that the task should stop, it can just call
`lws_threadpool_task_sync()` with the second argument as 1, to force the task
to stop immediately after it resumes.
#### The cleanup function
When a finished task is reaped, or a task that become detached from its initial
wsi completes or is stopped, it calls the `.cleanup` function defined in the
task creation args struct to free anything related to the user pointer.
With threadpool, responsibility for freeing allocations used by the task belongs
strictly with the task, via the `.cleanup` function, once the task has been
enqueued. That's different from a typical non-threadpool protocol where the
wsi lifecycle controls deallocation. This reflects the fact that the task
may outlive the wsi.
#### Protecting against WRITABLE and / or SYNC duplication
Care should be taken than data prepared by the task thread in the user priv
memory should only be sent once. For example, after sending data from a user
priv buffer of a given length stored in the priv, zero down the length.
Task execution and the SYNC writable callbacks are mutually exclusive, so there
is no danger of collision between the task thread and the lws service thread if
the reason for the callback is a SYNC operation from the task thread.
### Thread overcommit
If the tasks running on the threads are ultimately network-bound for all or some
of their processing (via the SYNC with the WRITEABLE callback), it's possible
to overcommit the number of threads in the pool compared to the number of
threads the processor has in hardware to get better occupancy in the CPU.

View file

@ -0,0 +1,979 @@
/*
* libwebsockets - threadpool api
*
* Copyright (C) 2018 Andy Green <andy@warmcat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation:
* version 2.1 of the License.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301 USA
*/
#include "core/private.h"
#include <pthread.h>
#include <string.h>
#include <stdio.h>
struct lws_threadpool;
struct lws_threadpool_task {
struct lws_threadpool_task *task_queue_next;
struct lws_threadpool *tp;
char name[32];
struct lws_threadpool_task_args args;
lws_usec_t created;
lws_usec_t acquired;
lws_usec_t done;
lws_usec_t entered_state;
lws_usec_t acc_running;
lws_usec_t acc_syncing;
pthread_cond_t wake_idle;
enum lws_threadpool_task_status status;
int late_sync_retries;
char wanted_writeable_cb;
};
struct lws_pool {
struct lws_threadpool *tp;
pthread_t thread;
pthread_mutex_t lock; /* part of task wake_idle */
struct lws_threadpool_task *task;
lws_usec_t acquired;
int worker_index;
};
struct lws_threadpool {
pthread_mutex_t lock; /* protects all pool lists */
pthread_cond_t wake_idle;
struct lws_pool *pool_list;
struct lws_context *context;
struct lws_threadpool *tp_list; /* context list of threadpools */
struct lws_threadpool_task *task_queue_head;
struct lws_threadpool_task *task_done_head;
char name[32];
int threads_in_pool;
int queue_depth;
int done_queue_depth;
int max_queue_depth;
int running_tasks;
unsigned int destroying:1;
};
static int
ms_delta(lws_usec_t now, lws_usec_t then)
{
return (int)((now - then) / 1000);
}
static void
us_accrue(lws_usec_t *acc, lws_usec_t then)
{
lws_usec_t now = lws_now_usecs();
*acc += now - then;
}
static int
pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
{
lws_usec_t delta = (now - then) + 1;
return (int)((us * 100) / delta);
}
static void
__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
{
lws_usec_t now = lws_now_usecs();
char *end = buf + len - 1;
int syncms = 0, runms = 0;
if (!task->acquired) {
buf += lws_snprintf(buf, end - buf,
"task: %s, QUEUED queued: %dms",
task->name, ms_delta(now, task->created));
return;
}
if (task->acc_running)
runms = task->acc_running;
if (task->acc_syncing)
syncms = task->acc_syncing;
if (!task->done) {
buf += lws_snprintf(buf, end - buf,
"task: %s, ONGOING state %d (%dms) alive: %dms "
"(queued %dms, acquired: %dms, "
"run: %d%%, sync: %d%%)", task->name, task->status,
ms_delta(now, task->entered_state),
ms_delta(now, task->created),
ms_delta(task->acquired, task->created),
ms_delta(now, task->acquired),
pc_delta(now, task->acquired, runms),
pc_delta(now, task->acquired, syncms));
return;
}
buf += lws_snprintf(buf, end - buf,
"task: %s, DONE state %d lived: %dms "
"(queued %dms, on thread: %dms, "
"ran: %d%%, synced: %d%%)", task->name, task->status,
ms_delta(task->done, task->created),
ms_delta(task->acquired, task->created),
ms_delta(task->done, task->acquired),
pc_delta(task->done, task->acquired, runms),
pc_delta(task->done, task->acquired, syncms));
}
void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if defined(_DEBUG)
struct lws_threadpool_task **c;
char buf[160];
int n, count;
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
tp->name, tp->queue_depth, tp->running_tasks,
tp->done_queue_depth);
count = 0;
c = &tp->task_queue_head;
while (*c) {
struct lws_threadpool_task *task = *c;
__lws_threadpool_task_dump(task, buf, sizeof(buf));
lwsl_thread(" - %s\n", buf);
count++;
c = &(*c)->task_queue_next;
}
if (count != tp->queue_depth)
lwsl_err("%s: tp says queue depth %d, but actually %d\n",
__func__, tp->queue_depth, count);
count = 0;
for (n = 0; n < tp->threads_in_pool; n++) {
struct lws_pool *pool = &tp->pool_list[n];
struct lws_threadpool_task *task = pool->task;
if (task) {
__lws_threadpool_task_dump(task, buf, sizeof(buf));
lwsl_thread(" - worker %d: %s\n", n, buf);
count++;
}
}
if (count != tp->running_tasks)
lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
__func__, tp->running_tasks, count);
count = 0;
c = &tp->task_done_head;
while (*c) {
struct lws_threadpool_task *task = *c;
__lws_threadpool_task_dump(task, buf, sizeof(buf));
lwsl_thread(" - %s\n", buf);
count++;
c = &(*c)->task_queue_next;
}
if (count != tp->done_queue_depth)
lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
__func__, tp->done_queue_depth, count);
pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}
static void
state_transition(struct lws_threadpool_task *task,
enum lws_threadpool_task_status status)
{
task->entered_state = lws_now_usecs();
task->status = status;
}
static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
{
if (task->args.cleanup)
task->args.cleanup(task->args.wsi, task->args.user);
if (task->args.wsi)
task->args.wsi->tp_task = NULL;
lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n",
__func__, task->tp, task->args.wsi);
lws_free(task);
}
static void
__lws_threadpool_reap(struct lws_threadpool_task *task)
{
struct lws_threadpool_task **c, *t = NULL;
struct lws_threadpool *tp = task->tp;
/* remove the task from the done queue */
c = &tp->task_done_head;
while (*c) {
if ((*c) == task) {
t = *c;
*c = t->task_queue_next;
t->task_queue_next = NULL;
tp->done_queue_depth--;
lwsl_thread("%s: tp %s: reaped task wsi %p\n", __func__,
tp->name, task->args.wsi);
break;
}
c = &(*c)->task_queue_next;
}
if (!t)
lwsl_err("%s: task %p not in done queue\n", __func__, task);
/* call the task's cleanup and delete the task itself */
lws_threadpool_task_cleanup_destroy(task);
}
/*
* this gets called from each tsi service context after the service was
* cancelled... we need to ask for the writable callback from the matching
* tsi context for any wsis bound to a worked thread that need it
*/
int
lws_threadpool_tsi_context(struct lws_context *context, int tsi)
{
struct lws_threadpool_task **c, *task = NULL;
struct lws_threadpool *tp;
struct lws *wsi;
lws_context_lock(context, __func__);
tp = context->tp_list_head;
while (tp) {
int n;
/* for the running (syncing...) tasks... */
for (n = 0; n < tp->threads_in_pool; n++) {
struct lws_pool *pool = &tp->pool_list[n];
task = pool->task;
if (!task)
continue;
wsi = task->args.wsi;
if (!wsi || wsi->tsi != tsi ||
!task->wanted_writeable_cb)
continue;
task->wanted_writeable_cb = 0;
lws_memory_barrier();
/*
* finally... we can ask for the callback on
* writable from the correct service thread
* context
*/
lws_callback_on_writable(wsi);
}
/* for the done tasks... */
c = &tp->task_done_head;
while (*c) {
task = *c;
wsi = task->args.wsi;
if (wsi && wsi->tsi == tsi &&
task->wanted_writeable_cb) {
task->wanted_writeable_cb = 0;
lws_memory_barrier();
/*
* finally... we can ask for the callback on
* writable from the correct service thread
* context
*/
lws_callback_on_writable(wsi);
}
c = &task->task_queue_next;
}
tp = tp->tp_list;
}
lws_context_unlock(context);
return 0;
}
static int
lws_threadpool_worker_sync(struct lws_pool *pool,
struct lws_threadpool_task *task)
{
enum lws_threadpool_task_status temp;
struct timespec abstime;
struct lws *wsi;
int tries = 15;
/* block until writable acknowledges */
lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
pthread_mutex_lock(&pool->lock); /* ======================= pool lock */
lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__,
pool->tp->name, task, task->name, task->args.wsi);
temp = task->status;
state_transition(task, LWS_TP_STATUS_SYNCING);
while (tries--) {
wsi = task->args.wsi;
/*
* if the wsi is no longer attached to this task, there is
* nothing we can sync to usefully. Since the work wants to
* sync, it means we should react to the situation by telling
* the task it can't continue usefully by stopping it.
*/
if (!wsi) {
lwsl_thread("%s: %s: task %p (%s): No longer bound to any "
"wsi to sync to\n", __func__, pool->tp->name,
task, task->name);
state_transition(task, LWS_TP_STATUS_STOPPING);
goto done;
}
/*
* So tries times this is the maximum time between SYNC asking
* for a callback on writable and actually getting it we are
* willing to sit still for.
*
* If it is exceeded, we will stop the task.
*/
abstime.tv_sec = time(NULL) + 2;
abstime.tv_nsec = 0;
task->wanted_writeable_cb = 1;
lws_memory_barrier();
/*
* This will cause lws_threadpool_tsi_context() to get called
* from each tsi service context, where we can safely ask for
* a callback on writeable on the wsi we are associated with.
*/
lws_cancel_service(lws_get_context(wsi));
/*
* so the danger here is that we asked for a writable callback
* on the wsi, but for whatever reason, we are never going to
* get one. To avoid deadlocking forever, we allow a set time
* for the sync to happen naturally, otherwise the cond wait
* times out and we stop the task.
*/
if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
&abstime) == ETIMEDOUT) {
task->late_sync_retries++;
if (!tries) {
lwsl_err("%s: %s: task %p (%s): SYNC timed out "
"(associated wsi %p)\n",
__func__, pool->tp->name, task,
task->name, task->args.wsi);
state_transition(task, LWS_TP_STATUS_STOPPING);
goto done;
}
continue;
} else
break;
}
if (task->status == LWS_TP_STATUS_SYNCING)
state_transition(task, temp);
lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);
done:
pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */
return 0;
}
static void *
lws_threadpool_worker(void *d)
{
struct lws_threadpool_task **c, **c2, *task;
struct lws_pool *pool = d;
struct lws_threadpool *tp = pool->tp;
char buf[160];
while (!tp->destroying) {
/* we have no running task... wait and get one from the queue */
pthread_mutex_lock(&tp->lock); /* =================== tp lock */
/*
* if there's no task already waiting in the queue, wait for
* the wake_idle condition to signal us that might have changed
*/
while (!tp->task_queue_head && !tp->destroying)
pthread_cond_wait(&tp->wake_idle, &tp->lock);
if (tp->destroying) {
pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */
continue;
}
c = &tp->task_queue_head;
c2 = NULL;
task = NULL;
pool->task = NULL;
/* look at the queue tail */
while (*c) {
c2 = c;
c = &(*c)->task_queue_next;
}
/* is there a task at the queue tail? */
if (c2 && *c2) {
pool->task = task = *c2;
task->acquired = pool->acquired = lws_now_usecs();
/* remove it from the queue */
*c2 = task->task_queue_next;
task->task_queue_next = NULL;
tp->queue_depth--;
/* mark it as running */
state_transition(task, LWS_TP_STATUS_RUNNING);
}
/* someone else got it first... wait and try again */
if (!task) {
pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */
continue;
}
task->wanted_writeable_cb = 0;
/* we have acquired a new task */
__lws_threadpool_task_dump(task, buf, sizeof(buf));
lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
__func__, tp->name, pool->worker_index, buf);
tp->running_tasks++;
pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
/*
* 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
* "resurface" periodically, and get called again with
* cont = 1 immediately to indicate it is picking up where it
* left off if the task is not being "stopped".
*
* This allows long tasks to respond to requests to stop in
* a clean and opaque way.
*
* 2) The task can return with LWS_TP_RETURN_SYNC to register
* a "callback on writable" request on the service thread and
* block until it hears back from the WRITABLE handler.
*
* This allows the work on the thread to be synchronized to the
* previous work being dispatched cleanly.
*
* 3) The task can return with LWS_TP_RETURN_FINISHED to
* indicate its work is completed nicely.
*
* 4) The task can return with LWS_TP_RETURN_STOPPED to indicate
* it stopped and cleaned up after incomplete work.
*/
do {
lws_usec_t then;
int n;
if (tp->destroying || !task->args.wsi)
state_transition(task, LWS_TP_STATUS_STOPPING);
then = lws_now_usecs();
n = task->args.task(task->args.user, task->status);
us_accrue(&task->acc_running, then);
switch (n) {
case LWS_TP_RETURN_CHECKING_IN:
/* if not destroying the tp, continue */
break;
case LWS_TP_RETURN_SYNC:
/* block until writable acknowledges */
then = lws_now_usecs();
lws_threadpool_worker_sync(pool, task);
us_accrue(&task->acc_syncing, then);
break;
case LWS_TP_RETURN_FINISHED:
state_transition(task, LWS_TP_STATUS_FINISHED);
break;
case LWS_TP_RETURN_STOPPED:
state_transition(task, LWS_TP_STATUS_STOPPED);
break;
}
} while (task->status == LWS_TP_STATUS_RUNNING);
pthread_mutex_lock(&tp->lock); /* =================== tp lock */
tp->running_tasks--;
if (pool->task->status == LWS_TP_STATUS_STOPPING)
state_transition(task, LWS_TP_STATUS_STOPPED);
/* move the task to the done queue */
pool->task->task_queue_next = tp->task_done_head;
tp->task_done_head = task;
tp->done_queue_depth++;
pool->task->done = lws_now_usecs();
if (!pool->task->args.wsi &&
(pool->task->status == LWS_TP_STATUS_STOPPED ||
pool->task->status == LWS_TP_STATUS_FINISHED)) {
__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
lwsl_thread("%s: %s: worker %d REAPING: %s\n",
__func__, tp->name, pool->worker_index,
buf);
/*
* there is no longer any wsi attached, so nothing is
* going to take care of reaping us. So we must take
* care of it ourselves.
*/
__lws_threadpool_reap(pool->task);
} else {
__lws_threadpool_task_dump(pool->task, buf, sizeof(buf));
lwsl_thread("%s: %s: worker %d DONE: %s\n",
__func__, tp->name, pool->worker_index,
buf);
/* signal the associated wsi to take a fresh look at
* task status */
if (pool->task->args.wsi) {
task->wanted_writeable_cb = 1;
lws_cancel_service(
lws_get_context(pool->task->args.wsi));
}
}
pool->task = NULL;
pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
}
/* threadpool is being destroyed */
pthread_exit(NULL);
return NULL;
}
struct lws_threadpool *
lws_threadpool_create(struct lws_context *context,
const struct lws_threadpool_create_args *args,
const char *format, ...)
{
struct lws_threadpool *tp;
va_list ap;
int n;
tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * args->threads),
"threadpool alloc");
if (!tp)
return NULL;
memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * args->threads));
tp->pool_list = (struct lws_pool *)(tp + 1);
tp->max_queue_depth = args->max_queue_depth;
va_start(ap, format);
n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
va_end(ap);
lws_context_lock(context, __func__);
tp->context = context;
tp->tp_list = context->tp_list_head;
context->tp_list_head = tp;
lws_context_unlock(context);
pthread_mutex_init(&tp->lock, NULL);
pthread_cond_init(&tp->wake_idle, NULL);
for (n = 0; n < args->threads; n++) {
tp->pool_list[n].tp = tp;
tp->pool_list[n].worker_index = n;
pthread_mutex_init(&tp->pool_list[n].lock, NULL);
if (pthread_create(&tp->pool_list[n].thread, NULL,
lws_threadpool_worker, &tp->pool_list[n])) {
lwsl_err("thread creation failed\n");
} else
tp->threads_in_pool++;
}
return tp;
}
void
lws_threadpool_finish(struct lws_threadpool *tp)
{
struct lws_threadpool_task **c, *task;
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
/* nothing new can start, running jobs will abort as STOPPED and the
* pool threads will exit ASAP (they are joined in destroy) */
tp->destroying = 1;
/* stop everyone in the pending queue and move to the done queue */
c = &tp->task_queue_head;
while (*c) {
task = *c;
*c = task->task_queue_next;
task->task_queue_next = tp->task_done_head;
tp->task_done_head = task;
state_transition(task, LWS_TP_STATUS_STOPPED);
tp->queue_depth--;
tp->done_queue_depth++;
task->done = lws_now_usecs();
c = &task->task_queue_next;
}
pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
pthread_cond_broadcast(&tp->wake_idle);
}
void
lws_threadpool_destroy(struct lws_threadpool *tp)
{
struct lws_threadpool_task *task, *next;
struct lws_threadpool **ptp;
void *retval;
int n;
/* remove us from the context list of threadpools */
lws_context_lock(tp->context, __func__);
ptp = &tp->context->tp_list_head;
while (*ptp) {
if (*ptp == tp) {
*ptp = tp->tp_list;
break;
}
ptp = &(*ptp)->tp_list;
}
lws_context_unlock(tp->context);
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
tp->destroying = 1;
pthread_cond_broadcast(&tp->wake_idle);
pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
lws_threadpool_dump(tp);
for (n = 0; n < tp->threads_in_pool; n++) {
task = tp->pool_list[n].task;
/* he could be sitting waiting for SYNC */
if (task != NULL)
pthread_cond_broadcast(&task->wake_idle);
pthread_join(tp->pool_list[n].thread, &retval);
pthread_mutex_destroy(&tp->pool_list[n].lock);
}
lwsl_info("%s: all threadpools exited\n", __func__);
task = tp->task_done_head;
while (task) {
next = task->task_queue_next;
lws_threadpool_task_cleanup_destroy(task);
tp->done_queue_depth--;
task = next;
}
pthread_mutex_destroy(&tp->lock);
lws_free(tp);
}
/*
* we want to stop and destroy the task and related priv. The wsi may no
* longer exist.
*/
int
lws_threadpool_dequeue(struct lws *wsi)
{
struct lws_threadpool *tp;
struct lws_threadpool_task **c, *task;
int n;
task = wsi->tp_task;
if (!task)
return 0;
tp = task->tp;
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
c = &tp->task_queue_head;
/* is he queued waiting for a chance to run? Mark him as stopped and
* move him on to the done queue */
while (*c) {
if ((*c) == task) {
*c = task->task_queue_next;
task->task_queue_next = tp->task_done_head;
tp->task_done_head = task;
state_transition(task, LWS_TP_STATUS_STOPPED);
tp->queue_depth--;
tp->done_queue_depth++;
task->done = lws_now_usecs();
lwsl_debug("%s: tp %p: removed queued task wsi %p\n",
__func__, tp, task->args.wsi);
break;
}
c = &(*c)->task_queue_next;
}
/* is he on the done queue? */
c = &tp->task_done_head;
while (*c) {
if ((*c) == task) {
*c = task->task_queue_next;
task->task_queue_next = NULL;
lws_threadpool_task_cleanup_destroy(task);
tp->done_queue_depth--;
goto bail;
}
c = &(*c)->task_queue_next;
}
/* he's not in the queue... is he already running on a thread? */
for (n = 0; n < tp->threads_in_pool; n++) {
if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
continue;
/*
* ensure we don't collide with tests or changes in the
* worker thread
*/
pthread_mutex_lock(&tp->pool_list[n].lock);
/*
* mark him as having been requested to stop...
* the caller will hear about it in his service thread
* context as a request to close
*/
state_transition(task, LWS_TP_STATUS_STOPPING);
/* disconnect from wsi, and wsi from task */
task->args.wsi->tp_task = NULL;
task->args.wsi = NULL;
pthread_mutex_unlock(&tp->pool_list[n].lock);
lwsl_debug("%s: tp %p: request stop running task "
"for wsi %p\n", __func__, tp, task->args.wsi);
break;
}
if (n == tp->threads_in_pool) {
/* can't find it */
lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n",
__func__, tp, task->args.wsi);
task->args.wsi->tp_task = NULL;
task->args.wsi = NULL;
}
bail:
pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
return 0;
}
struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
const struct lws_threadpool_task_args *args,
const char *format, ...)
{
struct lws_threadpool_task *task = NULL;
va_list ap;
if (tp->destroying)
return NULL;
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
/*
* if there's room on the queue, the job always goes on the queue
* first, then any free thread may pick it up after the wake_idle
*/
if (tp->queue_depth == tp->max_queue_depth) {
lwsl_notice("%s: queue reached limit %d\n", __func__,
tp->max_queue_depth);
goto bail;
}
/*
* create the task object
*/
task = lws_malloc(sizeof(*task), __func__);
if (!task)
goto bail;
memset(task, 0, sizeof(*task));
pthread_cond_init(&task->wake_idle, NULL);
task->args = *args;
task->tp = tp;
task->created = lws_now_usecs();
va_start(ap, format);
vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
va_end(ap);
/*
* add him on the tp task queue
*/
task->task_queue_next = tp->task_queue_head;
state_transition(task, LWS_TP_STATUS_QUEUED);
tp->task_queue_head = task;
tp->queue_depth++;
/*
* mark the wsi itself as depending on this tp (so wsi close for
* whatever reason can clean up)
*/
args->wsi->tp_task = task;
lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n",
__func__, tp->name, task, task->name, args->wsi,
tp->queue_depth);
/* alert any idle thread there's something new on the task list */
lws_memory_barrier();
pthread_cond_signal(&tp->wake_idle);
bail:
pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
return task;
}
/* this should be called from the service thread */
enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
struct lws_threadpool_task **task, void **user)
{
enum lws_threadpool_task_status status;
struct lws_threadpool *tp;
char buf[160];
*task = wsi->tp_task;
if (!*task)
return -1;
tp = (*task)->tp;
*user = (*task)->args.user;
status = (*task)->status;
if (status == LWS_TP_STATUS_FINISHED ||
status == LWS_TP_STATUS_STOPPED) {
pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
__lws_threadpool_task_dump(*task, buf, sizeof(buf));
lwsl_thread("%s: %s: service thread REAPING: %s\n",
__func__, tp->name, buf);
__lws_threadpool_reap(*task);
lws_memory_barrier();
pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
}
return status;
}
void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
{
lwsl_debug("%s\n", __func__);
if (stop)
state_transition(task, LWS_TP_STATUS_STOPPING);
pthread_cond_signal(&task->wake_idle);
}

View file

@ -42,7 +42,7 @@ lws_read_h1(struct lws *wsi, unsigned char *buf, lws_filepos_t len)
lws_filepos_t body_chunk_len;
size_t n;
// lwsl_notice("%s: h1 path: wsi state 0x%x\n", __func__, lwsi_state(wsi));
lwsl_debug("%s: h1 path: wsi state 0x%x\n", __func__, lwsi_state(wsi));
switch (lwsi_state(wsi)) {
@ -225,7 +225,7 @@ ws_mode:
break;
case LRS_DEFERRING_ACTION:
lwsl_debug("%s: LRS_DEFERRING_ACTION\n", __func__);
lwsl_notice("%s: LRS_DEFERRING_ACTION\n", __func__);
break;
case LRS_SSL_ACK_PENDING:
@ -543,6 +543,37 @@ rops_handle_POLLIN_h1(struct lws_context_per_thread *pt, struct lws *wsi,
}
#endif
/* Priority 2: pre- compression transform */
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
if (wsi->http.comp_ctx.buflist_comp ||
wsi->http.comp_ctx.may_have_more) {
enum lws_write_protocol wp = LWS_WRITE_HTTP;
lwsl_info("%s: completing comp partial (buflist_comp %p, may %d)\n",
__func__, wsi->http.comp_ctx.buflist_comp,
wsi->http.comp_ctx.may_have_more
);
if (wsi->role_ops->write_role_protocol(wsi, NULL, 0, &wp) < 0) {
lwsl_info("%s signalling to close\n", __func__);
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
lws_callback_on_writable(wsi);
if (!wsi->http.comp_ctx.buflist_comp &&
!wsi->http.comp_ctx.may_have_more &&
wsi->http.deferred_transaction_completed) {
wsi->http.deferred_transaction_completed = 0;
if (lws_http_transaction_completed(wsi))
return LWS_HPI_RET_PLEASE_CLOSE_ME;
}
return LWS_HPI_RET_HANDLED;
}
#endif
if (lws_is_flowcontrolled(wsi))
/* We cannot deal with any kind of new RX because we are
* RX-flowcontrolled.
@ -652,7 +683,7 @@ rops_write_role_protocol_h1(struct lws *wsi, unsigned char *buf, size_t len,
if (n)
return n;
lwsl_debug("%s: %p: transformed %d bytes to %d "
lwsl_info("%s: %p: transformed %d bytes to %d "
"(wp 0x%x, more %d)\n", __func__, wsi, (int)len,
(int)o, (int)*wp, wsi->http.comp_ctx.may_have_more);
@ -665,7 +696,7 @@ rops_write_role_protocol_h1(struct lws *wsi, unsigned char *buf, size_t len,
* pipelining
*/
n = lws_snprintf(c, sizeof(c), "%X\x0d\x0a", (int)o);
// lwsl_notice("%s: chunk %s\n", __func__, c);
lwsl_info("%s: chunk (%d) %s", __func__, (int)o, c);
out -= n;
o += n;
memcpy(out, c, n);
@ -673,6 +704,7 @@ rops_write_role_protocol_h1(struct lws *wsi, unsigned char *buf, size_t len,
out[o++] = '\x0a';
if (((*wp) & 0x1f) == LWS_WRITE_HTTP_FINAL) {
lwsl_info("%s: final chunk\n", __func__);
out[o++] = '0';
out[o++] = '\x0d';
out[o++] = '\x0a';
@ -902,7 +934,7 @@ rops_perform_user_POLLOUT_h1(struct lws *wsi)
wsi->http.comp_ctx.may_have_more) {
enum lws_write_protocol wp = LWS_WRITE_HTTP;
lwsl_debug("%s: completing comp partial"
lwsl_info("%s: completing comp partial"
"(buflist_comp %p, may %d)\n",
__func__, wsi->http.comp_ctx.buflist_comp,
wsi->http.comp_ctx.may_have_more);

View file

@ -1424,6 +1424,10 @@ lws_h2_parse_end_of_frame(struct lws *wsi)
}
}
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
lws_http_compression_validate(h2n->swsi);
#endif
wsi->vhost->conn_stats.h2_trans++;
p = lws_hdr_simple_ptr(h2n->swsi, WSI_TOKEN_HTTP_COLON_METHOD);
if (!strcmp(p, "POST"))

View file

@ -363,7 +363,7 @@ rops_write_role_protocol_h2(struct lws *wsi, unsigned char *buf, size_t len,
size_t olen = len;
int n;
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
unsigned char mtubuf[1450 + LWS_PRE];
unsigned char mtubuf[4096 + LWS_PRE];
#endif
/* if not in a state to send stuff, then just send nothing */
@ -396,7 +396,7 @@ rops_write_role_protocol_h2(struct lws *wsi, unsigned char *buf, size_t len,
if (n)
return n;
lwsl_debug("%s: %p: transformed %d bytes to %d "
lwsl_info("%s: %p: transformed %d bytes to %d "
"(wp 0x%x, more %d)\n", __func__,
wsi, (int)len, (int)o, (int)*wp,
wsi->http.comp_ctx.may_have_more);
@ -776,7 +776,7 @@ lws_h2_bind_for_post_before_action(struct lws *wsi)
return 1;
}
if (lws_bind_protocol(wsi, pp))
if (lws_bind_protocol(wsi, pp, __func__))
return 1;
}
@ -885,7 +885,7 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
w->http.comp_ctx.may_have_more) {
enum lws_write_protocol wp = LWS_WRITE_HTTP;
lwsl_debug("%s: completing comp partial"
lwsl_info("%s: completing comp partial"
"(buflist_comp %p, may %d)\n",
__func__, w->http.comp_ctx.buflist_comp,
w->http.comp_ctx.may_have_more);

View file

@ -720,7 +720,7 @@ lws_client_interpret_server_handshake(struct lws *wsi)
* set-cookie:.test=LWS_1456736240_336776_COOKIE;Max-Age=360000
*/
wsi->http.connection_type = HTTP_CONNECTION_KEEP_ALIVE;
wsi->http.conn_type = HTTP_CONNECTION_KEEP_ALIVE;
if (!wsi->client_h2_substream) {
p = lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP);
if (wsi->do_ws && !p) {
@ -730,7 +730,7 @@ lws_client_interpret_server_handshake(struct lws *wsi)
}
if (!p) {
p = lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP1_0);
wsi->http.connection_type = HTTP_CONNECTION_CLOSE;
wsi->http.conn_type = HTTP_CONNECTION_CLOSE;
}
if (!p) {
cce = "HS: URI missing";
@ -828,7 +828,7 @@ lws_client_interpret_server_handshake(struct lws *wsi)
/* if h1 KA is allowed, enable the queued pipeline guys */
if (!wsi->client_h2_alpn && !wsi->client_h2_substream && w == wsi) { /* ie, coming to this for the first time */
if (wsi->http.connection_type == HTTP_CONNECTION_KEEP_ALIVE)
if (wsi->http.conn_type == HTTP_CONNECTION_KEEP_ALIVE)
wsi->keepalive_active = 1;
else {
/*
@ -916,7 +916,7 @@ lws_client_interpret_server_handshake(struct lws *wsi)
wsi->http.rx_content_length;
} else /* can't do 1.1 without a content length or chunked */
if (!wsi->chunked)
wsi->http.connection_type =
wsi->http.conn_type =
HTTP_CONNECTION_CLOSE;
/*
@ -1033,7 +1033,7 @@ lws_generate_client_handshake(struct lws *wsi, char *pkt)
return NULL;
}
lws_bind_protocol(wsi, pr);
lws_bind_protocol(wsi, pr, __func__);
}
if ((wsi->protocol->callback)(wsi, LWS_CALLBACK_RAW_ADOPT,

View file

@ -141,6 +141,10 @@ lws_add_http_common_headers(struct lws *wsi, unsigned int code,
const char *content_type, lws_filepos_t content_len,
unsigned char **p, unsigned char *end)
{
const char *ka[] = { "close", "keep-alive" };
int types[] = { HTTP_CONNECTION_CLOSE, HTTP_CONNECTION_KEEP_ALIVE },
t = 0;
if (lws_add_http_header_status(wsi, code, p, end))
return 1;
@ -149,16 +153,60 @@ lws_add_http_common_headers(struct lws *wsi, unsigned int code,
(int)strlen(content_type), p, end))
return 1;
if (content_len != LWS_ILLEGAL_HTTP_CONTENT_LEN) {
if (lws_add_http_header_content_length(wsi, content_len, p, end))
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
if (!wsi->http.lcs &&
(!strncmp(content_type, "text/", 5) ||
!strcmp(content_type, "application/javascript") ||
!strcmp(content_type, "image/svg+xml")))
lws_http_compression_apply(wsi, NULL, p, end, 0);
#endif
/*
* if we decided to compress it, we don't know the content length...
* the compressed data will go out chunked on h1
*/
if (
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
!wsi->http.lcs &&
#endif
content_len != LWS_ILLEGAL_HTTP_CONTENT_LEN) {
if (lws_add_http_header_content_length(wsi, content_len,
p, end))
return 1;
} else {
if (lws_add_http_header_by_token(wsi, WSI_TOKEN_CONNECTION,
(unsigned char *)"close", 5,
p, end))
return 1;
/* there was no length... it normally means CONNECTION_CLOSE */
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
wsi->http.connection_type = HTTP_CONNECTION_CLOSE;
if (!wsi->http2_substream && wsi->http.lcs) {
/* so...
* - h1 connection
* - http compression transform active
* - did not send content length
*
* then mark as chunked...
*/
wsi->http.comp_ctx.chunking = 1;
if (lws_add_http_header_by_token(wsi,
WSI_TOKEN_HTTP_TRANSFER_ENCODING,
(unsigned char *)"chunked", 7, p, end))
return -1;
/* ... but h1 compression is chunked, if active we can
* still pipeline
*/
if (wsi->http.lcs &&
wsi->http.conn_type == HTTP_CONNECTION_KEEP_ALIVE)
t = 1;
}
#endif
if (!wsi->http2_substream) {
if (lws_add_http_header_by_token(wsi, WSI_TOKEN_CONNECTION,
(unsigned char *)ka[t],
(int)strlen(ka[t]), p, end))
return 1;
wsi->http.conn_type = types[t];
}
}
return 0;
@ -246,6 +294,7 @@ lws_add_http_header_status(struct lws *wsi, unsigned int _code,
end))
return 1;
}
headers = wsi->vhost->headers;
while (headers) {
if (lws_add_http_header_by_name(wsi,
@ -303,9 +352,9 @@ lws_return_http_status(struct lws *wsi, unsigned int code,
code == HTTP_STATUS_NOT_FOUND)
/* we should do a redirect, and do the 404 there */
if (lws_http_redirect(wsi, HTTP_STATUS_FOUND,
(uint8_t *)wsi->vhost->http.error_document_404,
(int)strlen(wsi->vhost->http.error_document_404),
&p, end) > 0)
(uint8_t *)wsi->vhost->http.error_document_404,
(int)strlen(wsi->vhost->http.error_document_404),
&p, end) > 0)
return 0;
#endif

View file

@ -39,7 +39,7 @@ enum http_version {
HTTP_VERSION_2
};
enum http_connection_type {
enum http_conn_type {
HTTP_CONNECTION_CLOSE,
HTTP_CONNECTION_KEEP_ALIVE
};
@ -226,10 +226,11 @@ struct _lws_http_mode_related {
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
struct lws_compression_support *lcs;
lws_comp_ctx_t comp_ctx;
unsigned char comp_accept_mask;
#endif
enum http_version request_version;
enum http_connection_type connection_type;
enum http_conn_type conn_type;
lws_filepos_t tx_content_length;
lws_filepos_t tx_content_remain;
lws_filepos_t rx_content_length;

View file

@ -1123,6 +1123,7 @@ excessive:
set_parsing_complete:
if (ah->ues != URIES_IDLE)
goto forbid;
if (lws_hdr_total_length(wsi, WSI_TOKEN_UPGRADE)) {
if (lws_hdr_total_length(wsi, WSI_TOKEN_VERSION))
wsi->rx_frame_type = /* temp for ws version index */

View file

@ -37,7 +37,7 @@ LWS_EXTERN int
lws_rewrite_parse(struct lws_rewrite *r,
const unsigned char *in, int in_len)
{
if (hubbub_parser_parse_chunk(r->parser, in, in_len) != HUBBUB_OK)
if (r && hubbub_parser_parse_chunk(r->parser, in, in_len) != HUBBUB_OK)
return -1;
return 0;

View file

@ -673,7 +673,7 @@ lws_http_serve(struct lws *wsi, char *uri, const char *origin,
const struct lws_protocols *pp = lws_vhost_name_to_protocol(
wsi->vhost, m->protocol);
if (lws_bind_protocol(wsi, pp))
if (lws_bind_protocol(wsi, pp, __func__))
return -1;
args.p = (char *)p;
args.max_len = lws_ptr_diff(end, p);
@ -887,7 +887,7 @@ int
lws_http_action(struct lws *wsi)
{
struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
enum http_connection_type connection_type;
enum http_conn_type conn_type;
enum http_version request_version;
char content_length_str[32];
struct lws_process_html_args args;
@ -971,9 +971,9 @@ lws_http_action(struct lws *wsi)
/* HTTP/1.1 defaults to "keep-alive", 1.0 to "close" */
if (request_version == HTTP_VERSION_1_1)
connection_type = HTTP_CONNECTION_KEEP_ALIVE;
conn_type = HTTP_CONNECTION_KEEP_ALIVE;
else
connection_type = HTTP_CONNECTION_CLOSE;
conn_type = HTTP_CONNECTION_CLOSE;
/* Override default if http "Connection:" header: */
if (lws_hdr_total_length(wsi, WSI_TOKEN_CONNECTION)) {
@ -982,12 +982,12 @@ lws_http_action(struct lws *wsi)
WSI_TOKEN_CONNECTION);
http_conn_str[sizeof(http_conn_str) - 1] = '\0';
if (!strcasecmp(http_conn_str, "keep-alive"))
connection_type = HTTP_CONNECTION_KEEP_ALIVE;
conn_type = HTTP_CONNECTION_KEEP_ALIVE;
else
if (!strcasecmp(http_conn_str, "close"))
connection_type = HTTP_CONNECTION_CLOSE;
conn_type = HTTP_CONNECTION_CLOSE;
}
wsi->http.connection_type = connection_type;
wsi->http.conn_type = conn_type;
}
n = wsi->protocol->callback(wsi, LWS_CALLBACK_FILTER_HTTP_CONNECTION,
@ -1040,7 +1040,7 @@ lws_http_action(struct lws *wsi)
lwsl_info("no hit\n");
if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0]))
if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0], "no mount hit"))
return 1;
lwsi_set_state(wsi, LRS_DOING_TRANSACTION);
@ -1260,7 +1260,7 @@ lws_http_action(struct lws *wsi)
return 1;
}
if (lws_bind_protocol(wsi, pp))
if (lws_bind_protocol(wsi, pp, "http action CALLBACK bind"))
return 1;
args.p = uri_ptr;
@ -1344,7 +1344,9 @@ lws_http_action(struct lws *wsi)
lws_vhost_name_to_protocol(
wsi->vhost, hit->protocol);
if (lws_bind_protocol(wsi, pp))
lwsi_set_state(wsi, LRS_DOING_TRANSACTION);
if (lws_bind_protocol(wsi, pp, "http_action HTTP"))
return 1;
m = pp->callback(wsi, LWS_CALLBACK_HTTP,
@ -1497,7 +1499,8 @@ raw_transition:
lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
lws_bind_protocol(wsi, &wsi->vhost->protocols[
wsi->vhost->
raw_protocol_index]);
raw_protocol_index],
__func__);
lwsl_info("transition to raw vh %s prot %d\n",
wsi->vhost->name,
wsi->vhost->raw_protocol_index);
@ -1635,6 +1638,10 @@ raw_transition:
lwsi_set_state(wsi, LRS_ESTABLISHED);
wsi->http.fop_fd = NULL;
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
lws_http_compression_validate(wsi);
#endif
lwsl_debug("%s: wsi %p: ah %p\n", __func__, (void *)wsi,
(void *)wsi->http.ah);
@ -1737,7 +1744,7 @@ lws_http_transaction_completed(struct lws *wsi)
return 0;
}
lwsl_info("%s: wsi %p\n", __func__, wsi);
lwsl_debug("%s: wsi %p\n", __func__, wsi);
#if defined(LWS_WITH_HTTP_STREAM_COMPRESSION)
lws_http_compression_destroy(wsi);
@ -1760,12 +1767,12 @@ lws_http_transaction_completed(struct lws *wsi)
if (wsi->seen_zero_length_recv)
return 1;
if (wsi->http.connection_type != HTTP_CONNECTION_KEEP_ALIVE) {
if (wsi->http.conn_type != HTTP_CONNECTION_KEEP_ALIVE) {
lwsl_info("%s: %p: close connection\n", __func__, wsi);
return 1;
}
if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0]))
if (lws_bind_protocol(wsi, &wsi->vhost->protocols[0], __func__))
return 1;
/*
@ -1804,7 +1811,7 @@ lws_http_transaction_completed(struct lws *wsi)
if (wsi->http.ah) {
// lws_buflist_describe(&wsi->buflist, wsi);
if (!lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
lwsl_info("%s: %p: nothing in buflist so detaching ah\n",
lwsl_debug("%s: %p: nothing in buflist so detaching ah\n",
__func__, wsi);
lws_header_table_detach(wsi, 1);
#ifdef LWS_WITH_TLS
@ -1840,13 +1847,14 @@ lws_http_transaction_completed(struct lws *wsi)
if (wsi->http.ah)
wsi->http.ah->ues = URIES_IDLE;
//lwsi_set_state(wsi, LRS_ESTABLISHED);
//lwsi_set_state(wsi, LRS_ESTABLISHED); // !!!
} else
if (lws_buflist_next_segment_len(&wsi->buflist, NULL))
if (lws_header_table_attach(wsi, 0))
lwsl_debug("acquired ah\n");
lwsl_info("%s: %p: keep-alive await new transaction\n", __func__, wsi);
lwsl_debug("%s: %p: keep-alive await new transaction (state 0x%x)\n",
__func__, wsi, wsi->wsistate);
lws_callback_on_writable(wsi);
return 0;
@ -2096,11 +2104,6 @@ lws_serve_http_file(struct lws *wsi, const char *file, const char *content_type,
(unsigned char *)cc, cclen, &p, end))
return -1;
if (wsi->http.connection_type == HTTP_CONNECTION_KEEP_ALIVE)
if (lws_add_http_header_by_token(wsi, WSI_TOKEN_CONNECTION,
(unsigned char *)"keep-alive", 10, &p, end))
return -1;
if (other_headers) {
if ((end - p) < other_headers_len)
return -1;
@ -2159,7 +2162,7 @@ LWS_VISIBLE int lws_serve_http_file_fragment(struct lws *wsi)
wsi->http.comp_ctx.may_have_more) {
enum lws_write_protocol wp = LWS_WRITE_HTTP;
lwsl_debug("%s: completing comp partial (buflist_comp %p, may %d)\n",
lwsl_info("%s: completing comp partial (buflist_comp %p, may %d)\n",
__func__, wsi->http.comp_ctx.buflist_comp,
wsi->http.comp_ctx.may_have_more);

View file

@ -39,6 +39,18 @@ rops_handle_POLLIN_pipe(struct lws_context_per_thread *pt, struct lws *wsi,
if (n < 0)
return LWS_HPI_RET_PLEASE_CLOSE_ME;
#endif
#if defined(LWS_WITH_THREADPOOL)
/*
* threadpools that need to call for on_writable callbacks do it by
* marking the task as needing one for its wsi, then cancelling service.
*
* Each tsi will call this to perform the actual callback_on_writable
* from the correct service thread context
*/
lws_threadpool_tsi_context(pt->context, pt->tid);
#endif
/*
* the poll() wait, or the event loop for libuv etc is a
* process-wide resource that we interrupted. So let every

View file

@ -156,11 +156,12 @@ rops_adoption_bind_raw_skt(struct lws *wsi, int type, const char *vh_prot_name)
LRS_ESTABLISHED, &role_ops_raw_skt);
if (vh_prot_name)
lws_bind_protocol(wsi, wsi->protocol);
lws_bind_protocol(wsi, wsi->protocol, __func__);
else
/* this is the only time he will transition */
lws_bind_protocol(wsi,
&wsi->vhost->protocols[wsi->vhost->raw_protocol_index]);
&wsi->vhost->protocols[wsi->vhost->raw_protocol_index],
__func__);
return 1; /* bound */
}

View file

@ -326,7 +326,8 @@ lws_process_ws_upgrade(struct lws *wsi)
!strcmp(wsi->vhost->protocols[n].name,
protocol_name)) {
lws_bind_protocol(wsi,
&wsi->vhost->protocols[n]);
&wsi->vhost->protocols[n],
"ws upgrade select pcol");
hit = 1;
break;
}
@ -353,7 +354,8 @@ lws_process_ws_upgrade(struct lws *wsi)
wsi->vhost->default_protocol_index);
n = wsi->vhost->default_protocol_index;
lws_bind_protocol(wsi, &wsi->vhost->protocols[
(int)wsi->vhost->default_protocol_index]);
(int)wsi->vhost->default_protocol_index],
"ws upgrade default pcol");
}
/* allocate the ws struct for the wsi */

View file

@ -5,6 +5,7 @@ minimal-ws-server-echo|Simple ws server that listens and echos back anything cli
minimal-ws-server-pmd-bulk|Simple ws server showing how to pass bulk data with permessage-deflate
minimal-ws-server-pmd|Simple ws server with permessage-deflate support
minimal-ws-server-ring|Like minimal-ws-server but holds the chat in a multi-tail ringbuffer
minimal-ws-server-threadpool|Demonstrates how to use a worker thread pool with lws
minimal-ws-server-threads|Simple ws server where data is produced by different threads
minimal-ws-server|Serves an index.html over http that opens a ws shared chat client in a browser

View file

@ -0,0 +1,92 @@
cmake_minimum_required(VERSION 2.8)
include(CheckIncludeFile)
include(CheckCSourceCompiles)
set(SAMP lws-minimal-ws-server-threadpool)
set(SRCS minimal-ws-server-threadpool.c)
MACRO(require_pthreads result)
CHECK_INCLUDE_FILE(pthread.h LWS_HAVE_PTHREAD_H)
if (NOT LWS_HAVE_PTHREAD_H)
if (LWS_WITH_MINIMAL_EXAMPLES)
set(result 0)
else()
message(FATAL_ERROR "threading support requires pthreads")
endif()
endif()
ENDMACRO()
# If we are being built as part of lws, confirm current build config supports
# reqconfig, else skip building ourselves.
#
# If we are being built externally, confirm installed lws was configured to
# support reqconfig, else error out with a helpful message about the problem.
#
MACRO(require_lws_config reqconfig _val result)
if (DEFINED ${reqconfig})
if (${reqconfig})
set (rq 1)
else()
set (rq 0)
endif()
else()
set(rq 0)
endif()
if (${_val} EQUAL ${rq})
set(SAME 1)
else()
set(SAME 0)
endif()
if (LWS_WITH_MINIMAL_EXAMPLES AND NOT ${SAME})
if (${_val})
message("${SAMP}: skipping as lws being built without ${reqconfig}")
else()
message("${SAMP}: skipping as lws built with ${reqconfig}")
endif()
set(${result} 0)
else()
if (LWS_WITH_MINIMAL_EXAMPLES)
set(MET ${SAME})
else()
CHECK_C_SOURCE_COMPILES("#include <libwebsockets.h>\nint main(void) {\n#if defined(${reqconfig})\n return 0;\n#else\n fail;\n#endif\n return 0;\n}\n" HAS_${reqconfig})
if (NOT DEFINED HAS_${reqconfig} OR NOT HAS_${reqconfig})
set(HAS_${reqconfig} 0)
else()
set(HAS_${reqconfig} 1)
endif()
if ((HAS_${reqconfig} AND ${_val}) OR (NOT HAS_${reqconfig} AND NOT ${_val}))
set(MET 1)
else()
set(MET 0)
endif()
endif()
if (NOT MET)
if (${_val})
message(FATAL_ERROR "This project requires lws must have been configured with ${reqconfig}")
else()
message(FATAL_ERROR "Lws configuration of ${reqconfig} is incompatible with this project")
endif()
endif()
endif()
ENDMACRO()
set(requirements 1)
require_pthreads(requirements)
require_lws_config(LWS_ROLE_WS 1 requirements)
require_lws_config(LWS_WITHOUT_SERVER 0 requirements)
require_lws_config(LWS_WITH_THREADPOOL 1 requirements)
if (requirements)
add_executable(${SAMP} ${SRCS})
if (websockets_shared)
target_link_libraries(${SAMP} websockets_shared pthread)
add_dependencies(${SAMP} websockets_shared)
else()
target_link_libraries(${SAMP} websockets pthread)
endif()
endif()

View file

@ -0,0 +1,26 @@
# lws minimal ws server (threadpool)
## build
```
$ cmake . && make
```
Pthreads is required on your system.
This demonstrates how to cleanly assign tasks bound to a wsi to a thread pool,
with a queue if the pool is occupied.
It creates a threadpool with 3 worker threads and a maxiumum queue size of 4.
The web page at http://localhost:7681 then starts up 8 x ws connections.
## usage
```
$ ./lws-minimal-ws-server-threadpool
[2018/03/13 13:09:52:2208] USER: LWS minimal ws server + threadpool | visit http://localhost:7681
[2018/03/13 13:09:52:2365] NOTICE: Creating Vhost 'default' port 7681, 2 protocols, IPv6 off
```

View file

@ -0,0 +1,127 @@
/*
* lws-minimal-ws-server=threadpool
*
* Copyright (C) 2018 Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* This demonstrates a minimal ws server that can cooperate with
* other threads cleanly. Two other threads are started, which fill
* a ringbuffer with strings at 10Hz.
*
* The actual work and thread spawning etc are done in the protocol
* implementation in protocol_lws_minimal.c.
*
* To keep it simple, it serves stuff in the subdirectory "./mount-origin" of
* the directory it was started in.
* You can change that by changing mount.origin.
*/
#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#define LWS_PLUGIN_STATIC
#include "protocol_lws_minimal_threadpool.c"
static struct lws_protocols protocols[] = {
{ "http", lws_callback_http_dummy, 0, 0 },
LWS_PLUGIN_PROTOCOL_MINIMAL,
{ NULL, NULL, 0, 0 } /* terminator */
};
static int interrupted;
static const struct lws_http_mount mount = {
/* .mount_next */ NULL, /* linked-list "next" */
/* .mountpoint */ "/", /* mountpoint URL */
/* .origin */ "./mount-origin", /* serve from dir */
/* .def */ "index.html", /* default filename */
/* .protocol */ NULL,
/* .cgienv */ NULL,
/* .extra_mimetypes */ NULL,
/* .interpret */ NULL,
/* .cgi_timeout */ 0,
/* .cache_max_age */ 0,
/* .auth_mask */ 0,
/* .cache_reusable */ 0,
/* .cache_revalidate */ 0,
/* .cache_intermediaries */ 0,
/* .origin_protocol */ LWSMPRO_FILE, /* files in a dir */
/* .mountpoint_len */ 1, /* char count */
/* .basic_auth_login_file */ NULL,
};
/*
* This demonstrates how to pass a pointer into a specific protocol handler
* running on a specific vhost. In this case, it's our default vhost and
* we pass the pvo named "config" with the value a const char * "myconfig".
*
* This is the preferred way to pass configuration into a specific vhost +
* protocol instance.
*/
static const struct lws_protocol_vhost_options pvo_ops = {
NULL,
NULL,
"config", /* pvo name */
(void *)"myconfig" /* pvo value */
};
static const struct lws_protocol_vhost_options pvo = {
NULL, /* "next" pvo linked-list */
&pvo_ops, /* "child" pvo linked-list */
"lws-minimal", /* protocol name we belong to on this vhost */
"" /* ignored */
};
void sigint_handler(int sig)
{
interrupted = 1;
}
int main(int argc, const char **argv)
{
struct lws_context_creation_info info;
struct lws_context *context;
const char *p;
int logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE
/* for LLL_ verbosity above NOTICE to be built into lws,
* lws must have been configured and built with
* -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE */
/* | LLL_INFO */ /* | LLL_PARSER */ /* | LLL_HEADER */
/* | LLL_EXT */ /* | LLL_CLIENT */ /* | LLL_LATENCY */
/* | LLL_DEBUG */;
signal(SIGINT, sigint_handler);
if ((p = lws_cmdline_option(argc, argv, "-d")))
logs = atoi(p);
lws_set_log_level(logs, NULL);
lwsl_user("LWS minimal ws server + threadpool | visit http://localhost:7681\n");
memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */
info.port = 7681;
info.mounts = &mount;
info.protocols = protocols;
info.pvo = &pvo; /* per-vhost options */
context = lws_create_context(&info);
if (!context) {
lwsl_err("lws init failed\n");
return 1;
}
/* start the threads that create content */
while (!interrupted)
if (lws_service(context, 1000))
interrupted = 1;
lws_context_destroy(context);
return 0;
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

View file

@ -0,0 +1,97 @@
<meta charset="UTF-8">
<html>
<body>
<img src="libwebsockets.org-logo.png"><br>
<b>Minimal ws server threadpool example</b>.<br>
8 x ws connections are opened back to the example server.<br>
There are three threads in the pool to service them, the<br>
remainder are queued until a thread in the pool is free.<p>
The textarea show the last 50 lines received.
<br>
<br>
<textarea id=r readonly cols=40 rows=50></textarea><br>
</body>
<script>
var head = 0, tail = 0, ring = new Array();
function get_appropriate_ws_url(extra_url)
{
var pcol;
var u = document.URL;
/*
* We open the websocket encrypted if this page came on an
* https:// url itself, otherwise unencrypted
*/
if (u.substring(0, 5) == "https") {
pcol = "wss://";
u = u.substr(8);
} else {
pcol = "ws://";
if (u.substring(0, 4) == "http")
u = u.substr(7);
}
u = u.split('/');
/* + "/xxx" bit is for IE10 workaround */
return pcol + u[0] + "/" + extra_url;
}
function new_ws(urlpath, protocol)
{
if (typeof MozWebSocket != "undefined")
return new MozWebSocket(urlpath, protocol);
return new WebSocket(urlpath, protocol);
}
var n, wsa = new Array, alive = 0;
for (n = 0; n < 8; n++) {
ws = new_ws(get_appropriate_ws_url(""), "lws-minimal");
wsa.push(ws);
try {
ws.onopen = function() {
document.getElementById("r").disabled = 0;
alive++;
}
ws.onmessage =function got_packet(msg) {
var n, s = "";
ring[head] = msg.data + "\n";
head = (head + 1) % 50;
if (tail == head)
tail = (tail + 1) % 50;
n = tail;
do {
s = s + ring[n];
n = (n + 1) % 50;
} while (n != head);
document.getElementById("r").value = s;
document.getElementById("r").scrollTop =
document.getElementById("r").scrollHeight;
}
ws.onclose = function(){
if (--alive == 0)
document.getElementById("r").disabled = 1;
}
} catch(exception) {
alert('<p>Error' + exception);
}
}
</script>
</html>

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.9 KiB

View file

@ -0,0 +1,343 @@
/*
* ws protocol handler plugin for "lws-minimal" demonstrating lws threadpool
*
* Copyright (C) 2010-2018 Andy Green <andy@warmcat.com>
*
* This file is made available under the Creative Commons CC0 1.0
* Universal Public Domain Dedication.
*
* The main reason some things are as they are is that the task lifecycle may
* be unrelated to the wsi lifecycle that queued that task.
*
* Consider the task may call an external library and run for 30s without
* "checking in" to see if it should stop. The wsi that started the task may
* have closed at any time before the 30s are up, with the browser window
* closing or whatever.
*
* So data shared between the asynchronous task and the wsi must have its
* lifecycle determined by the task, not the wsi. That means a separate struct
* that can be freed by the task.
*
* In the case the wsi outlives the task, the tasks do not get destroyed until
* the service thread has called lws_threadpool_task_status() on the completed
* task. So there is no danger of the shared task private data getting randomly
* freed.
*/
#if !defined (LWS_PLUGIN_STATIC)
#define LWS_DLL
#define LWS_INTERNAL
#include <libwebsockets.h>
#endif
#include <string.h>
struct per_vhost_data__minimal {
struct lws_threadpool *tp;
const char *config;
};
struct task_data {
char result[64];
uint64_t pos, end;
};
/*
* Create the private data for the task
*
* Notice we hand over responsibility for the cleanup and freeing of the
* allocated task_data to the threadpool, because the wsi it was originally
* bound to may close while the thread is still running. So we allocate
* something discrete for the task private data that can be definitively owned
* and freed by the threadpool, not the wsi... the pss won't do, as it only
* exists for the lifecycle of the wsi connection.
*
* When the task is created, we also tell it how to destroy the private data
* by giving it args.cleanup as cleanup_task_private_data() defined below.
*/
static struct task_data *
create_task_private_data(void)
{
struct task_data *priv = malloc(sizeof(*priv));
return priv;
}
/*
* Destroy the private data for the task
*
* Notice the wsi the task was originally bound to may be long gone, in the
* case we are destroying the lws context and the thread was doing something
* for a long time without checking in.
*/
static void
cleanup_task_private_data(struct lws *wsi, void *user)
{
struct task_data *priv = (struct task_data *)user;
free(priv);
}
/*
* This runs in its own thread, from the threadpool.
*
* The implementation behind this in lws uses pthreads, but no pthreadisms are
* required in the user code.
*
* The example counts to 10M, "checking in" to see if it should stop after every
* 100K and pausing to sync with the service thread to send a ws message every
* 1M. It resumes after the service thread determines the wsi is writable and
* the LWS_CALLBACK_SERVER_WRITEABLE indicates the task thread can continue by
* calling lws_threadpool_task_sync().
*/
static enum lws_threadpool_task_return
task_function(void *user, enum lws_threadpool_task_status s)
{
struct task_data *priv = (struct task_data *)user;
int budget = 100 * 1000;
if (priv->pos == priv->end)
return LWS_TP_RETURN_FINISHED;
/*
* Preferably replace this with ~100ms of your real task, so it
* can "check in" at short intervals to see if it has been asked to
* stop.
*
* You can just run tasks atomically here with the thread dedicated
* to it, but it will cause odd delays while shutting down etc and
* the task will run to completion even if the wsi that started it
* has since closed.
*/
while (budget--)
priv->pos++;
usleep(100000);
if (!(priv->pos % (1000 * 1000))) {
lws_snprintf(priv->result + LWS_PRE,
sizeof(priv->result) - LWS_PRE,
"pos %llu", (unsigned long long)priv->pos);
return LWS_TP_RETURN_SYNC;
}
return LWS_TP_RETURN_CHECKING_IN;
}
static int
callback_minimal(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{
struct per_vhost_data__minimal *vhd =
(struct per_vhost_data__minimal *)
lws_protocol_vh_priv_get(lws_get_vhost(wsi),
lws_get_protocol(wsi));
const struct lws_protocol_vhost_options *pvo;
struct lws_threadpool_create_args cargs;
struct lws_threadpool_task_args args;
struct lws_threadpool_task *task;
struct task_data *priv;
int n, m, r = 0;
char name[32];
void *_user;
switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT:
/* create our per-vhost struct */
vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
lws_get_protocol(wsi),
sizeof(struct per_vhost_data__minimal));
if (!vhd)
return 1;
/* recover the pointer to the globals struct */
pvo = lws_pvo_search(
(const struct lws_protocol_vhost_options *)in,
"config");
if (!pvo || !pvo->value) {
lwsl_err("%s: Can't find \"config\" pvo\n", __func__);
return 1;
}
vhd->config = pvo->value;
memset(&cargs, 0, sizeof(cargs));
cargs.max_queue_depth = 8;
cargs.threads = 3;
vhd->tp = lws_threadpool_create(lws_get_context(wsi),
&cargs, "%s",
lws_get_vhost_name(lws_get_vhost(wsi)));
if (!vhd->tp)
return 1;
lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
lws_get_protocol(wsi),
LWS_CALLBACK_USER, 1);
break;
case LWS_CALLBACK_PROTOCOL_DESTROY:
lws_threadpool_finish(vhd->tp);
lws_threadpool_destroy(vhd->tp);
break;
case LWS_CALLBACK_USER:
/*
* in debug mode, dump the threadpool stat to the logs once
* a second
*/
lws_threadpool_dump(vhd->tp);
lws_timed_callback_vh_protocol(lws_get_vhost(wsi),
lws_get_protocol(wsi),
LWS_CALLBACK_USER, 1);
break;
case LWS_CALLBACK_ESTABLISHED:
memset(&args, 0, sizeof(args));
priv = args.user = create_task_private_data();
if (!args.user)
return 1;
priv->pos = 0;
priv->end = 10 * 1000 * 1000;
/* queue the task... the task takes on responsibility for
* destroying args.user. pss->priv just has a copy of it */
args.wsi = wsi;
args.task = task_function;
args.cleanup = cleanup_task_private_data;
lws_get_peer_simple(wsi, name, sizeof(name));
if (!lws_threadpool_enqueue(vhd->tp, &args, "ws %s", name)) {
lwsl_user("%s: Couldn't enqueue task\n", __func__);
cleanup_task_private_data(wsi, priv);
return 1;
}
lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL, 30);
/*
* so the asynchronous worker will let us know the next step
* by causing LWS_CALLBACK_SERVER_WRITEABLE
*/
break;
case LWS_CALLBACK_CLOSED:
break;
case LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL:
lwsl_debug("LWS_CALLBACK_WS_SERVER_DROP_PROTOCOL: %p\n", wsi);
lws_threadpool_dequeue(wsi);
break;
case LWS_CALLBACK_SERVER_WRITEABLE:
/*
* even completed tasks wait in a queue until we call the
* below on them. Then they may destroy themselves and their
* args.user data (by calling the cleanup callback).
*
* If you need to get things from the still-valid private task
* data, copy it here before calling
* lws_threadpool_task_status() that may free the task and the
* private task data.
*/
n = lws_threadpool_task_status_wsi(wsi, &task, &_user);
lwsl_debug("%s: LWS_CALLBACK_SERVER_WRITEABLE: status %d\n",
__func__, n);
switch(n) {
case LWS_TP_STATUS_FINISHED:
case LWS_TP_STATUS_STOPPED:
case LWS_TP_STATUS_QUEUED:
case LWS_TP_STATUS_RUNNING:
case LWS_TP_STATUS_STOPPING:
return 0;
case LWS_TP_STATUS_SYNCING:
/* the task has paused for us to do something */
break;
default:
return -1;
}
priv = (struct task_data *)_user;
lws_set_timeout(wsi, PENDING_TIMEOUT_THREADPOOL_TASK, 5);
n = strlen(priv->result + LWS_PRE);
m = lws_write(wsi, (unsigned char *)priv->result + LWS_PRE,
n, LWS_WRITE_TEXT);
if (m < n) {
lwsl_err("ERROR %d writing to ws socket\n", m);
lws_threadpool_task_sync(task, 1);
return -1;
}
/*
* service thread has done whatever it wanted to do with the
* data the task produced: if it's waiting to do more it can
* continue now.
*/
lws_threadpool_task_sync(task, 0);
break;
default:
break;
}
return r;
}
#define LWS_PLUGIN_PROTOCOL_MINIMAL \
{ \
"lws-minimal", \
callback_minimal, \
0, \
128, \
0, NULL, 0 \
}
#if !defined (LWS_PLUGIN_STATIC)
/* boilerplate needed if we are built as a dynamic plugin */
static const struct lws_protocols protocols[] = {
LWS_PLUGIN_PROTOCOL_MINIMAL
};
LWS_EXTERN LWS_VISIBLE int
init_protocol_minimal(struct lws_context *context,
struct lws_plugin_capability *c)
{
if (c->api_magic != LWS_PLUGIN_API_MAGIC) {
lwsl_err("Plugin API %d, library API %d", LWS_PLUGIN_API_MAGIC,
c->api_magic);
return 1;
}
c->protocols = protocols;
c->count_protocols = LWS_ARRAY_SIZE(protocols);
c->extensions = NULL;
c->count_extensions = 0;
return 0;
}
LWS_EXTERN LWS_VISIBLE int
destroy_protocol_minimal(struct lws_context *context)
{
return 0;
}
#endif