diff --git a/include/transport/networkpluginserver.h b/include/transport/networkpluginserver.h index 7a2d569b..f04db01b 100644 --- a/include/transport/networkpluginserver.h +++ b/include/transport/networkpluginserver.h @@ -52,6 +52,7 @@ class NetworkPluginServer { unsigned long init_res; unsigned long shared; bool acceptUsers; + bool longRun; }; NetworkPluginServer(Component *component, Config *config, UserManager *userManager); @@ -68,6 +69,8 @@ class NetworkPluginServer { void collectBackend(); + void moveToLongRunBackend(User *user); + void handleMessageReceived(NetworkConversation *conv, boost::shared_ptr &message); private: @@ -107,7 +110,7 @@ class NetworkPluginServer { void pingTimeout(); void sendPing(Backend *c); - Backend *getFreeClient(); + Backend *getFreeClient(bool acceptUsers = true, bool longRun = false); UserManager *m_userManager; VCardResponder *m_vcardResponder; @@ -120,6 +123,7 @@ class NetworkPluginServer { Swift::Timer::ref m_collectTimer; Component *m_component; std::list m_waitingUsers; + bool m_isNextLongRun; }; } diff --git a/include/transport/user.h b/include/transport/user.h index 135f82f2..840736c8 100644 --- a/include/transport/user.h +++ b/include/transport/user.h @@ -75,6 +75,14 @@ class User { void handleSubscription(Swift::Presence::ref presence); + time_t &getLastActivity() { + return m_lastActivity; + } + + void updateLastActivity() { + m_lastActivity = time(NULL); + } + /// Returns language. /// \return language const char *getLang() { return "en"; } @@ -87,8 +95,12 @@ class User { void setConnected(bool connected) { m_connected = connected; + setIgnoreDisconnect(false); + updateLastActivity(); } + void setIgnoreDisconnect(bool ignoreDisconnect); + bool isConnected() { return m_connected; } @@ -113,8 +125,10 @@ class User { void *m_data; bool m_connected; bool m_readyForConnect; + bool m_ignoreDisconnect; Swift::Timer::ref m_reconnectTimer; boost::shared_ptr connection; + time_t m_lastActivity; }; } diff --git a/spectrum/src/sample.cfg b/spectrum/src/sample.cfg index c7aea261..39d761b4 100644 --- a/spectrum/src/sample.cfg +++ b/spectrum/src/sample.cfg @@ -8,6 +8,7 @@ backend_host=localhost # < this option doesn't work yet backend_port=10001 admin_username=admin admin_password=test +#idle_reconnect_time=10 #cert= #patch to PKCS#12 certificate #cert_password= #password to that certificate if any users_per_backend=10 diff --git a/src/config.cpp b/src/config.cpp index 36e7f05f..7a772b5d 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -47,6 +47,7 @@ bool Config::load(const std::string &configfile, boost::program_options::options ("service.admin_username", value()->default_value(""), "Administrator username.") ("service.admin_password", value()->default_value(""), "Administrator password.") ("service.reuse_old_backends", value()->default_value(true), "True if Spectrum should use old backends which were full in the past.") + ("service.idle_reconnect_time", value()->default_value(4*3600), "Time in seconds after which idle users are reconnected to let their backend die.") ("identity.name", value()->default_value("Spectrum 2 Transport"), "Name showed in service discovery.") ("identity.category", value()->default_value("gateway"), "Disco#info identity category. 'gateway' by default.") ("identity.type", value()->default_value(""), "Type of transport ('icq','msn','gg','irc', ...)") diff --git a/src/networkpluginserver.cpp b/src/networkpluginserver.cpp index bd0c9204..bfbbbb30 100644 --- a/src/networkpluginserver.cpp +++ b/src/networkpluginserver.cpp @@ -155,6 +155,7 @@ NetworkPluginServer::NetworkPluginServer(Component *component, Config *config, U m_userManager = userManager; m_config = config; m_component = component; + m_isNextLongRun = false; m_component->m_factory = new NetworkFactory(this); m_userManager->onUserCreated.connect(boost::bind(&NetworkPluginServer::handleUserCreated, this, _1)); m_userManager->onUserDestroyed.connect(boost::bind(&NetworkPluginServer::handleUserDestroyed, this, _1)); @@ -210,9 +211,10 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptrres = 0; client->init_res = 0; client->shared = 0; - client->acceptUsers = true; + client->acceptUsers = !m_isNextLongRun; + client->longRun = m_isNextLongRun; - LOG4CXX_INFO(logger, "New backend " << client << " connected. Current backend count=" << (m_clients.size() + 1)); + LOG4CXX_INFO(logger, "New" + (client->longRun ? std::string(" long-running") : "") + " backend " << client << " connected. Current backend count=" << (m_clients.size() + 1)); if (m_clients.size() == 0) { // first backend connected, start the server, we're ready. @@ -264,12 +266,12 @@ void NetworkPluginServer::handleSessionFinished(Backend *c) { delete c; // Execute new session only if there's no free one after this crash/disconnection - for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { - if ((*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend")) { - return; - } - } - exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str()); +// for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { +// if ((*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend")) { +// return; +// } +// } +// exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str()); } void NetworkPluginServer::handleConnectedPayload(const std::string &data) { @@ -452,6 +454,8 @@ void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool if (!user) return; + user->updateLastActivity(); + boost::shared_ptr msg(new Swift::Message()); if (subject) { msg->setSubject(payload.message()); @@ -590,6 +594,28 @@ void NetworkPluginServer::send(boost::shared_ptr &c, const st } void NetworkPluginServer::pingTimeout() { + // TODO: move to separate timer, those 2 loops could be expensive + time_t now = time(NULL); + std::vector usersToMove; + unsigned long diff = CONFIG_INT(m_config, "service.idle_reconnect_time"); + for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { + if ((*it)->longRun) { + continue; + } + + BOOST_FOREACH(User *u, (*it)->users) { + if (now - u->getLastActivity() > diff) { + usersToMove.push_back(u); + } + } + } + + BOOST_FOREACH(User *u, usersToMove) { + LOG4CXX_INFO(logger, "Moving user " << u->getJID().toString() << " to long-running backend"); + moveToLongRunBackend(u); + } + + // check ping responses for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { if ((*it)->pongReceived || (*it)->pongReceived == -1) { @@ -601,7 +627,12 @@ void NetworkPluginServer::pingTimeout() { (*it)->connection.reset(); // handleSessionFinished((*it)); } - + + if ((*it)->users.size() == 0) { + LOG4CXX_INFO(logger, "Disconnecting backend " << (*it) << ". There are no users."); + (*it)->connection->disconnect(); + (*it)->connection.reset(); + } } m_pingTimer->start(); } @@ -624,6 +655,43 @@ void NetworkPluginServer::collectBackend() { } } +void NetworkPluginServer::moveToLongRunBackend(User *user) { + // Check if user has already some backend + Backend *old = (Backend *) user->getData(); + if (!old) { + LOG4CXX_INFO(logger, "User " << user->getJID().toString() << " does not have old backend. Not moving."); + return; + } + + // if he's already on long run, do nothing + if (old->longRun) { + LOG4CXX_INFO(logger, "User " << user->getJID().toString() << " is already on long-running backend. Not moving."); + return; + } + + // Get free longrun backend, if there's no longrun backend, create one and wait + // for its connection + Backend *backend = getFreeClient(false, true); + if (!backend) { + LOG4CXX_INFO(logger, "No free long-running backend for user " << user->getJID().toString() << ". Will try later"); + return; + } + + // old backend will trigger disconnection which has to be ignored to keep user online + user->setIgnoreDisconnect(true); + + // remove user from the old backend + // If backend is empty, it will be collected by pingTimeout + old->users.remove(user); + + // switch to new backend and connect + user->setData(backend); + backend->users.push_back(user); + + // connect him + handleUserReadyToConnect(user); +} + void NetworkPluginServer::handleUserCreated(User *user) { Backend *c = getFreeClient(); @@ -771,7 +839,7 @@ void NetworkPluginServer::handleUserDestroyed(User *user) { } void NetworkPluginServer::handleMessageReceived(NetworkConversation *conv, boost::shared_ptr &msg) { - + conv->getConversationManager()->getUser()->updateLastActivity(); boost::shared_ptr statePayload = msg->getPayload(); if (statePayload) { pbnetwork::WrapperMessage_Type type = pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED; @@ -978,12 +1046,12 @@ void NetworkPluginServer::sendPing(Backend *c) { // LOG4CXX_INFO(logger, "PING to " << c); } -NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient() { +NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUsers, bool longRun) { NetworkPluginServer::Backend *c = NULL; // bool spawnNew = false; for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { // This backend is free. - if ((*it)->acceptUsers && (*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend") && (*it)->connection) { + if ((*it)->acceptUsers == acceptUsers && (*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend") && (*it)->connection && (*it)->longRun == longRun) { c = *it; if (!CONFIG_BOOL(m_config, "service.reuse_old_backends")) { if (c->users.size() + 1 >= CONFIG_INT(m_config, "service.users_per_backend")) { @@ -995,6 +1063,7 @@ NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient() { } if (c == NULL) { + m_isNextLongRun = longRun; exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str()); } diff --git a/src/user.cpp b/src/user.cpp index e3157724..ac519980 100644 --- a/src/user.cpp +++ b/src/user.cpp @@ -51,6 +51,7 @@ User::User(const Swift::JID &jid, UserInfo &userInfo, Component *component, User m_userInfo = userInfo; m_connected = false; m_readyForConnect = false; + m_ignoreDisconnect = false; m_reconnectTimer = m_component->getNetworkFactories()->getTimerFactory()->createTimer(10000); m_reconnectTimer->onTick.connect(boost::bind(&User::onConnectingTimeout, this)); @@ -58,6 +59,7 @@ User::User(const Swift::JID &jid, UserInfo &userInfo, Component *component, User m_rosterManager = new RosterManager(this, m_component); m_conversationManager = new ConversationManager(this, m_component); LOG4CXX_INFO(logger, m_jid.toString() << ": Created"); + updateLastActivity(); } User::~User(){ @@ -175,7 +177,16 @@ void User::onConnectingTimeout() { onReadyToConnect(); } +void User::setIgnoreDisconnect(bool ignoreDisconnect) { + m_ignoreDisconnect = ignoreDisconnect; + LOG4CXX_INFO(logger, m_jid.toString() << ": Setting ignoreDisconnect=" << m_ignoreDisconnect); +} + void User::handleDisconnected(const std::string &error) { + if (m_ignoreDisconnect) { + LOG4CXX_INFO(logger, m_jid.toString() << ": Disconnecting from legacy network ignored (probably moving between backends)"); + return; + } if (error.empty()) { LOG4CXX_INFO(logger, m_jid.toString() << ": Disconnected from legacy network");