From 56dd6499981437f1aa0bbe597f98b194eab14970 Mon Sep 17 00:00:00 2001 From: Jan Kaluza Date: Wed, 7 Dec 2011 10:30:39 +0100 Subject: [PATCH] Reconnect users automatically in case of spectrum2 main instance crash/restart --- ChangeLog | 4 ++ include/transport/mysqlbackend.h | 3 + include/transport/sqlite3backend.h | 3 + include/transport/storagebackend.h | 3 + include/transport/usermanager.h | 4 +- include/transport/usersreconnecter.h | 56 +++++++++++++++++ spectrum/src/main.cpp | 14 ++++- src/mysqlbackend.cpp | 16 +++++ src/sqlite3backend.cpp | 19 ++++++ src/transport.cpp | 2 + src/usermanager.cpp | 8 +-- src/usersreconnecter.cpp | 89 ++++++++++++++++++++++++++++ 12 files changed, 212 insertions(+), 9 deletions(-) create mode 100644 ChangeLog create mode 100644 include/transport/usersreconnecter.h create mode 100644 src/usersreconnecter.cpp diff --git a/ChangeLog b/ChangeLog new file mode 100644 index 00000000..3372f46a --- /dev/null +++ b/ChangeLog @@ -0,0 +1,4 @@ +version 2.0.0 alpha (2011-12-06): + General: + * First Spectrum 2.0.0 alpha release, check more on + http://spectrum.im/projects/spectrum/wiki/Spectrum_200_alpha diff --git a/include/transport/mysqlbackend.h b/include/transport/mysqlbackend.h index b35a345d..47c8757b 100644 --- a/include/transport/mysqlbackend.h +++ b/include/transport/mysqlbackend.h @@ -82,6 +82,8 @@ class MySQLBackend : public StorageBackend /// \return true if user has been found in database and roster has been fetched bool getBuddies(long id, std::list &roster); + bool getOnlineUsers(std::vector &users); + long addBuddy(long userId, const BuddyInfo &buddyInfo); void updateBuddy(long userId, const BuddyInfo &buddyInfo); @@ -148,6 +150,7 @@ class MySQLBackend : public StorageBackend Statement *m_getBuddies; Statement *m_getBuddiesSettings; Statement *m_setUserOnline; + Statement *m_getOnlineUsers; }; } diff --git a/include/transport/sqlite3backend.h b/include/transport/sqlite3backend.h index 91275d69..5d7372bf 100644 --- a/include/transport/sqlite3backend.h +++ b/include/transport/sqlite3backend.h @@ -70,6 +70,8 @@ class SQLite3Backend : public StorageBackend /// \param online online state void setUserOnline(long id, bool online); + bool getOnlineUsers(std::vector &users); + /// Removes user and all connected data from database. /// \param id id of user - UserInfo.id /// \return true if user has been found in database and removed @@ -115,6 +117,7 @@ class SQLite3Backend : public StorageBackend sqlite3_stmt *m_getBuddies; sqlite3_stmt *m_getBuddiesSettings; sqlite3_stmt *m_setUserOnline; + sqlite3_stmt *m_getOnlineUsers; }; } diff --git a/include/transport/storagebackend.h b/include/transport/storagebackend.h index 190ec03b..6bac4aa7 100644 --- a/include/transport/storagebackend.h +++ b/include/transport/storagebackend.h @@ -108,6 +108,9 @@ class StorageBackend /// getBuddies virtual bool getBuddies(long id, std::list &roster) = 0; + /// getOnlineUsers + virtual bool getOnlineUsers(std::vector &users) = 0; + virtual long addBuddy(long userId, const BuddyInfo &buddyInfo) = 0; virtual void updateBuddy(long userId, const BuddyInfo &buddyInfo) = 0; virtual void removeBuddy(long id) = 0; diff --git a/include/transport/usermanager.h b/include/transport/usermanager.h index cd02aade..290c4235 100644 --- a/include/transport/usermanager.h +++ b/include/transport/usermanager.h @@ -78,9 +78,9 @@ class UserManager : public Swift::EntityCapsProvider { /// Removes user. This function disconnects user and safely removes /// User class. This does *not* remove user from StorageBackend. /// \param user User class to remove - void removeUser(User *user); + void removeUser(User *user, bool onUserBehalf = true); - void removeAllUsers(); + void removeAllUsers(bool onUserBehalf = true); Swift::DiscoInfo::ref getCaps(const Swift::JID&) const; diff --git a/include/transport/usersreconnecter.h b/include/transport/usersreconnecter.h new file mode 100644 index 00000000..52497e64 --- /dev/null +++ b/include/transport/usersreconnecter.h @@ -0,0 +1,56 @@ +/** + * XMPP - libpurple transport + * + * Copyright (C) 2009, Jan Kaluza + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA + */ + +#pragma once + +#include +#include +#include +#include "Swiften/Swiften.h" + +namespace Transport { + +class StorageBackend; +class Component; + +/// Tries to reconnect users who have been online before crash/restart. +class UsersReconnecter { + public: + /// Creates new UsersReconnecter. + /// \param component Transport instance associated with this roster. + /// \param storageBackend StorageBackend from which the users will be fetched. + UsersReconnecter(Component *component, StorageBackend *storageBackend); + + /// Destructor. + virtual ~UsersReconnecter(); + + void reconnectNextUser(); + + private: + void handleConnected(); + + Component *m_component; + StorageBackend *m_storageBackend; + bool m_started; + std::vector m_users; + Swift::Timer::ref m_nextUserTimer; +}; + +} diff --git a/spectrum/src/main.cpp b/spectrum/src/main.cpp index fb52de18..d4ca6f4e 100644 --- a/spectrum/src/main.cpp +++ b/spectrum/src/main.cpp @@ -9,6 +9,7 @@ #include "transport/networkpluginserver.h" #include "transport/admininterface.h" #include "transport/statsresponder.h" +#include "transport/usersreconnecter.h" #include "transport/util.h" #include "Swiften/EventLoop/SimpleEventLoop.h" #include @@ -42,7 +43,7 @@ Component *component_ = NULL; UserManager *userManager_ = NULL; static void stop_spectrum() { - userManager_->removeAllUsers(); + userManager_->removeAllUsers(false); component_->stop(); eventLoop_->stop(); } @@ -369,13 +370,15 @@ int main(int argc, char **argv) UserManager userManager(&transport, &userRegistry, storageBackend); userManager_ = &userManager; + UserRegistration *userRegistration = NULL; + UsersReconnecter *usersReconnecter = NULL; if (storageBackend) { userRegistration = new UserRegistration(&transport, &userManager, storageBackend); userRegistration->start(); -// logger.setUserRegistration(&userRegistration); + + usersReconnecter = new UsersReconnecter(&transport, storageBackend); } -// logger.setUserManager(&userManager); FileTransferManager ftManager(&transport, &userManager); @@ -393,6 +396,11 @@ int main(int argc, char **argv) userRegistration->stop(); delete userRegistration; } + + if (usersReconnecter) { + delete usersReconnecter; + } + delete storageBackend; delete factories; } diff --git a/src/mysqlbackend.cpp b/src/mysqlbackend.cpp index 62dcde06..4bc2eec4 100644 --- a/src/mysqlbackend.cpp +++ b/src/mysqlbackend.cpp @@ -300,6 +300,7 @@ void MySQLBackend::disconnect() { delete m_updateUserSetting; delete m_updateBuddySetting; delete m_setUserOnline; + delete m_getOnlineUsers; mysql_close(&m_conn); } @@ -339,6 +340,7 @@ bool MySQLBackend::connect() { m_updateUserSetting = new Statement(&m_conn, "sis", "UPDATE " + m_prefix + "users_settings SET value=? WHERE user_id=? AND var=?"); m_setUserOnline = new Statement(&m_conn, "bi", "UPDATE " + m_prefix + "users SET online=?, last_login=NOW() WHERE id=?"); + m_getOnlineUsers = new Statement(&m_conn, "|s", "SELECT jid FROM " + m_prefix + "users WHERE online=1"); return true; } @@ -442,6 +444,20 @@ void MySQLBackend::setUserOnline(long id, bool online) { EXEC(m_setUserOnline, setUserOnline(id, online)); } +bool MySQLBackend::getOnlineUsers(std::vector &users) { + EXEC(m_getOnlineUsers, getOnlineUsers(users)); + if (!exec_ok) + return false; + + std::string jid; + while (m_getOnlineUsers->fetch() == 0) { + *m_getOnlineUsers >> jid; + users.push_back(jid); + } + + return true; +} + long MySQLBackend::addBuddy(long userId, const BuddyInfo &buddyInfo) { // "INSERT INTO " + m_prefix + "buddies (user_id, uin, subscription, groups, nickname, flags) VALUES (?, ?, ?, ?, ?, ?)" std::string groups = Util::serializeGroups(buddyInfo.groups); diff --git a/src/sqlite3backend.cpp b/src/sqlite3backend.cpp index 4313fd67..ad03bf23 100644 --- a/src/sqlite3backend.cpp +++ b/src/sqlite3backend.cpp @@ -99,6 +99,7 @@ SQLite3Backend::~SQLite3Backend(){ FINALIZE_STMT(m_updateUserSetting); FINALIZE_STMT(m_updateBuddySetting); FINALIZE_STMT(m_setUserOnline); + FINALIZE_STMT(m_getOnlineUsers); sqlite3_close(m_db); } } @@ -132,6 +133,7 @@ bool SQLite3Backend::connect() { PREP_STMT(m_updateUserSetting, "UPDATE " + m_prefix + "users_settings SET value=? WHERE user_id=? AND var=?"); PREP_STMT(m_setUserOnline, "UPDATE " + m_prefix + "users SET online=?, last_login=DATETIME('NOW') WHERE id=?"); + PREP_STMT(m_getOnlineUsers, "SELECT jid FROM " + m_prefix + "users WHERE online=1"); return true; } @@ -249,6 +251,23 @@ void SQLite3Backend::setUserOnline(long id, bool online) { EXECUTE_STATEMENT(m_setUserOnline, "setUserOnline query"); } +bool SQLite3Backend::getOnlineUsers(std::vector &users) { + sqlite3_reset(m_getOnlineUsers); + + int ret; + while((ret = sqlite3_step(m_getOnlineUsers)) == SQLITE_ROW) { + std::string jid = (const char *) sqlite3_column_text(m_getOnlineUsers, 0); + users.push_back(jid); + } + + if (ret != SQLITE_DONE) { + LOG4CXX_ERROR(logger, "getOnlineUsers query"<< (sqlite3_errmsg(m_db) == NULL ? "" : sqlite3_errmsg(m_db))); + return false; + } + + return true; +} + long SQLite3Backend::addBuddy(long userId, const BuddyInfo &buddyInfo) { // "INSERT INTO " + m_prefix + "buddies (user_id, uin, subscription, groups, nickname, flags) VALUES (?, ?, ?, ?, ?, ?)" BEGIN(m_addBuddy); diff --git a/src/transport.cpp b/src/transport.cpp index 90e28deb..ddcda381 100644 --- a/src/transport.cpp +++ b/src/transport.cpp @@ -197,6 +197,8 @@ void Component::start() { else if (m_server) { LOG4CXX_INFO(logger, "Starting component in server mode on port " << CONFIG_INT(m_config, "service.port")); m_server->start(); + // We're connected right here, because we're in server mode... + handleConnected(); } } diff --git a/src/usermanager.cpp b/src/usermanager.cpp index f4d7867e..ccb036f4 100644 --- a/src/usermanager.cpp +++ b/src/usermanager.cpp @@ -101,7 +101,7 @@ Swift::DiscoInfo::ref UserManager::getCaps(const Swift::JID &jid) const { return user->getCaps(jid); } -void UserManager::removeUser(User *user) { +void UserManager::removeUser(User *user, bool onUserBehalf) { m_users.erase(user->getJID().toBare().toString()); if (m_cachedUser == user) m_cachedUser = NULL; @@ -110,7 +110,7 @@ void UserManager::removeUser(User *user) { disconnectUser(user->getJID()); } - if (m_storageBackend) { + if (m_storageBackend && onUserBehalf) { m_storageBackend->setUserOnline(user->getUserInfo().id, false); } @@ -122,9 +122,9 @@ void UserManager::removeUser(User *user) { // VALGRIND_DO_LEAK_CHECK; } -void UserManager::removeAllUsers() { +void UserManager::removeAllUsers(bool onUserBehalf) { while(m_users.begin() != m_users.end()) { - removeUser((*m_users.begin()).second); + removeUser((*m_users.begin()).second, onUserBehalf); } } diff --git a/src/usersreconnecter.cpp b/src/usersreconnecter.cpp new file mode 100644 index 00000000..0ba66afb --- /dev/null +++ b/src/usersreconnecter.cpp @@ -0,0 +1,89 @@ +/** + * XMPP - libpurple transport + * + * Copyright (C) 2009, Jan Kaluza + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA + */ + +#include "transport/usersreconnecter.h" + +#include +#include +#include "Swiften/Queries/IQRouter.h" +#include "Swiften/Swiften.h" +#include "transport/storagebackend.h" +#include "transport/transport.h" +#include "log4cxx/logger.h" + +using namespace log4cxx; + +using namespace Swift; +using namespace boost; + +namespace Transport { + +static LoggerPtr logger = Logger::getLogger("UserReconnecter"); + +UsersReconnecter::UsersReconnecter(Component *component, StorageBackend *storageBackend) { + m_component = component; + m_storageBackend = storageBackend; + m_started = false; + + m_nextUserTimer = m_component->getNetworkFactories()->getTimerFactory()->createTimer(1000); + m_nextUserTimer->onTick.connect(boost::bind(&UsersReconnecter::reconnectNextUser, this)); + + m_component->onConnected.connect(bind(&UsersReconnecter::handleConnected, this)); +} + +UsersReconnecter::~UsersReconnecter() { + m_component->onConnected.disconnect(bind(&UsersReconnecter::handleConnected, this)); + m_nextUserTimer->stop(); + m_nextUserTimer->onTick.disconnect(boost::bind(&UsersReconnecter::reconnectNextUser, this)); +} + +void UsersReconnecter::reconnectNextUser() { + if (m_users.empty()) { + LOG4CXX_INFO(logger, "All users reconnected, stopping UserReconnecter."); + return; + } + + std::string user = m_users.back(); + m_users.pop_back(); + + LOG4CXX_INFO(logger, "Sending probe presence to " << user); + Swift::Presence::ref response = Swift::Presence::create(); + response->setTo(user); + response->setFrom(m_component->getJID()); + response->setType(Swift::Presence::Probe); + + m_component->getStanzaChannel()->sendPresence(response); + m_nextUserTimer->start(); +} + +void UsersReconnecter::handleConnected() { + if (m_started) + return; + + LOG4CXX_INFO(logger, "Starting UserReconnecter."); + m_started = true; + + m_storageBackend->getOnlineUsers(m_users); + + reconnectNextUser(); +} + + +}