diff --git a/include/villas/api/server.hpp b/include/villas/api/server.hpp index 0f7d7a709..b1b9c30bd 100644 --- a/include/villas/api/server.hpp +++ b/include/villas/api/server.hpp @@ -25,6 +25,7 @@ #include #include +#include #include @@ -54,7 +55,7 @@ protected: int sd; std::vector pfds; - std::vector sessions; + std::map sessions; void acceptNewSession(); void closeSession(sessions::Socket *s); diff --git a/include/villas/api/sessions/socket.hpp b/include/villas/api/sessions/socket.hpp index b7eb2d2d3..901ab8761 100644 --- a/include/villas/api/sessions/socket.hpp +++ b/include/villas/api/sessions/socket.hpp @@ -47,6 +47,8 @@ public: int write(); virtual std::string getName(); + + int getSd() const { return sd; } }; } // sessions diff --git a/lib/api/server.cpp b/lib/api/server.cpp index df7469d4e..da4cacc10 100644 --- a/lib/api/server.cpp +++ b/lib/api/server.cpp @@ -36,8 +36,11 @@ #include #include +using namespace villas; using namespace villas::node::api; +static Logger logger = logging.get("api"); + Server::Server(Api *a) : state(STATE_INITIALIZED), api(a) @@ -66,7 +69,6 @@ void Server::start() }; pfds.push_back(pfd); - sessions.push_back(nullptr); struct sockaddr_un sun = { .sun_family = AF_UNIX }; @@ -127,30 +129,40 @@ void Server::run(int timeout) assert(state == STATE_STARTED); - ret = poll(pfds.data(), pfds.size(), timeout); + logger->info("pfds.size() = {}", pfds.size()); + + auto len = pfds.size(); + + ret = poll(pfds.data(), len, timeout); if (ret < 0) - throw SystemError("Failed to poll on API socket");; + throw SystemError("Failed to poll on API socket"); - for (unsigned i = 0; i < pfds.size(); i++) { + std::vector closing; + + for (unsigned i = 1; i < len; i++) { auto &pfd = pfds[i]; - auto s = sessions[i]; - if (pfd.revents & POLLOUT) { - if (s) - s->write(); - } + /* pfds[0] is the server socket */ + auto s = sessions[pfd.fd]; if (pfd.revents & POLLIN) { - /* New connection */ - if (s) { - ret = s->read(); - if (ret < 0) - closeSession(s); - } - else - acceptNewSession(); + ret = s->read(); + if (ret < 0) + closing.push_back(s); + } + + if (pfd.revents & POLLOUT) { + s->write(); } } + + /* Destroy closed sessions */ + for (auto *s : closing) + closeSession(s); + + /* Accept new connections */ + if (pfds[0].revents & POLLIN) + acceptNewSession(); } void Server::acceptNewSession() { @@ -164,21 +176,21 @@ void Server::acceptNewSession() { }; pfds.push_back(pfd); - sessions.push_back(s); + sessions[fd] = s; api->sessions.push_back(s); } void Server::closeSession(sessions::Socket *s) { + int sd = s->getSd(); + + sessions.erase(sd); api->sessions.remove(s); - ptrdiff_t pos = std::find(sessions.begin(), sessions.end(), s) - sessions.begin(); + pfds.erase(std::remove_if(pfds.begin(), pfds.end(), + [sd](const pollfd &p){ return p.fd == sd; }) + ); - if (pos < (ptrdiff_t) sessions.size()) { - pfds.erase(pfds.begin() + pos); - sessions.erase(sessions.begin() + pos); - - delete s; - } + delete s; }