diff --git a/src/networkpluginserver.cpp b/src/networkpluginserver.cpp index a9711faa..577348c4 100644 --- a/src/networkpluginserver.cpp +++ b/src/networkpluginserver.cpp @@ -56,6 +56,7 @@ class NetworkConversation : public Conversation { NetworkConversation(ConversationManager *conversationManager, const std::string &legacyName, bool muc = false) : Conversation(conversationManager, legacyName, muc) { } + // Called when there's new message to legacy network from XMPP network void sendMessage(boost::shared_ptr &message) { onMessageToSend(this, message); } @@ -68,13 +69,15 @@ class NetworkFactory : public Factory { NetworkFactory(NetworkPluginServer *nps) { m_nps = nps; } - + + // Creates new conversation (NetworkConversation in this case) Conversation *createConversation(ConversationManager *conversationManager, const std::string &legacyName) { NetworkConversation *nc = new NetworkConversation(conversationManager, legacyName); nc->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, m_nps, _1, _2)); return nc; } + // Creates new LocalBuddy Buddy *createBuddy(RosterManager *rosterManager, const BuddyInfo &buddyInfo) { LocalBuddy *buddy = new LocalBuddy(rosterManager, buddyInfo.id); buddy->setAlias(buddyInfo.alias); @@ -86,26 +89,34 @@ class NetworkFactory : public Factory { buddy->setIconHash(buddyInfo.settings.find("icon_hash")->second.s); return buddy; } + private: NetworkPluginServer *m_nps; }; +// Wraps google protobuf payload into WrapperMessage and serialize it to string #define WRAP(MESSAGE, TYPE) pbnetwork::WrapperMessage wrap; \ wrap.set_type(TYPE); \ wrap.set_payload(MESSAGE); \ wrap.SerializeToString(&MESSAGE); +// Executes new backend static pid_t exec_(std::string path, const char *host, const char *port, const char *config) { + // BACKEND_ID is replaced with unique ID. The ID is increasing for every backend. boost::replace_all(path, "BACKEND_ID", boost::lexical_cast(backend_id++)); + + // Add host and port. path += std::string(" --host ") + host + " --port " + port + " " + config; LOG4CXX_INFO(logger, "Starting new backend " << path); + + // Create array of char * from string using -lpopt library char *p = (char *) malloc(path.size() + 1); strcpy(p, path.c_str()); int argc; char **argv; poptParseArgvString(p, &argc, (const char ***) &argv); -// char *argv[] = {(char*)script_name, '\0'}; + // fork and exec pid_t pid = fork(); if ( pid == 0 ) { // child process @@ -121,6 +132,8 @@ static pid_t exec_(std::string path, const char *host, const char *port, const c static void SigCatcher(int n) { pid_t result; int status; + // Read exit code from all children to not have zombies arround + // WARNING: Do not put LOG4CXX_ here, because it can lead to deadlock while ((result = waitpid(0, &status, WNOHANG)) > 0) { if (result != 0) { if (WIFEXITED(status)) { @@ -137,9 +150,13 @@ static void SigCatcher(int n) { static void handleBuddyPayload(LocalBuddy *buddy, const pbnetwork::Buddy &payload) { buddy->setName(payload.buddyname()); + // Set alias only if it's not empty. Backends are allowed to send empty alias if it has + // not changed. if (!payload.alias().empty()) { buddy->setAlias(payload.alias()); } + + // Change groups if it's not empty. The same as above... if (!payload.groups().empty()) { std::vector groups; groups.push_back(payload.groups()); @@ -205,12 +222,14 @@ NetworkPluginServer::~NetworkPluginServer() { } void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr c) { + // Create new Backend instance Backend *client = new Backend; client->pongReceived = -1; client->connection = c; client->res = 0; client->init_res = 0; client->shared = 0; + // Backend does not accept new clients automatically if it's long-running client->acceptUsers = !m_isNextLongRun; client->longRun = m_isNextLongRun; @@ -226,6 +245,10 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptronDisconnected.connect(boost::bind(&NetworkPluginServer::handleSessionFinished, this, client)); c->onDataRead.connect(boost::bind(&NetworkPluginServer::handleDataRead, this, client, _1)); sendPing(client); + + // sendPing sets pongReceived to 0, but we want to have it -1 to ignore this backend + // in first ::pingTimeout call, because it can be called right after this function + // and backend wouldn't have any time to response to ping. client->pongReceived = -1; // some users are in queue waiting for this backend @@ -253,6 +276,9 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptrgetJID().toString()); (*it)->setData(NULL); @@ -264,14 +290,6 @@ void NetworkPluginServer::handleSessionFinished(Backend *c) { m_clients.remove(c); delete c; - - // Execute new session only if there's no free one after this crash/disconnection -// for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { -// if ((*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend")) { -// return; -// } -// } -// exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str()); } void NetworkPluginServer::handleConnectedPayload(const std::string &data) { @@ -333,10 +351,12 @@ void NetworkPluginServer::handleAuthorizationPayload(const std::string &data) { if (!user) return; + // Create subscribe presence and forward it to XMPP side Swift::Presence::ref response = Swift::Presence::create(); response->setTo(user->getJID()); std::string name = payload.buddyname(); + // TODO: // name = Swift::JID::getEscapedNode(name) if (name.find_last_of("@") != std::string::npos) { @@ -359,11 +379,14 @@ void NetworkPluginServer::handleChatStatePayload(const std::string &data, Swift: if (!user) return; + // We're not creating new Conversation just because of chatstates. + // Some networks/clients spams with chatstates a lot and it leads to bigger memory usage. NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname()); if (!conv) { return; } + // Forward chatstate boost::shared_ptr msg(new Swift::Message()); msg->addPayload(boost::make_shared(type)); @@ -410,18 +433,6 @@ void NetworkPluginServer::handleParticipantChangedPayload(const std::string &dat } conv->handleParticipantChanged(payload.nickname(), payload.flag(), payload.status(), payload.statusmessage(), payload.newname()); - -// LocalBuddy *buddy = (LocalBuddy *) user->getRosterManager()->getBuddy(payload.buddyname()); -// if (buddy) { -// handleBuddyPayload(buddy, payload); -// buddy->buddyChanged(); -// } -// else { -// buddy = new LocalBuddy(user->getRosterManager(), -1); -// handleBuddyPayload(buddy, payload); -// user->getRosterManager()->setBuddy(buddy); -// } -// std::cout << payload.nickname() << "\n"; } void NetworkPluginServer::handleRoomChangedPayload(const std::string &data) { @@ -444,18 +455,20 @@ void NetworkPluginServer::handleRoomChangedPayload(const std::string &data) { void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool subject) { pbnetwork::ConversationMessage payload; -// std::cout << "payload...\n"; + if (payload.ParseFromString(data) == false) { // TODO: ERROR return; } -// std::cout << "payload 2...\n"; + User *user = m_userManager->getUser(payload.username()); if (!user) return; + // Message from legacy network triggers network acticity user->updateLastActivity(); + // Set proper body. boost::shared_ptr msg(new Swift::Message()); if (subject) { msg->setSubject(payload.message()); @@ -464,16 +477,19 @@ void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool msg->setBody(payload.message()); } + // Add xhtml-im payload. if (!payload.xhtml().empty()) { msg->addPayload(boost::make_shared(payload.xhtml())); } + // Create new Conversation if it does not exist NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname()); if (!conv) { conv = new NetworkConversation(user->getConversationManager(), payload.buddyname()); conv->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2)); } + // Forward it conv->handleMessage(msg, payload.nickname()); } @@ -492,6 +508,7 @@ void NetworkPluginServer::handleAttentionPayload(const std::string &data) { msg->setBody(payload.message()); msg->addPayload(boost::make_shared()); + // Attentions trigger new Conversation creation NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname()); if (!conv) { conv = new NetworkConversation(user->getConversationManager(), payload.buddyname()); @@ -507,19 +524,28 @@ void NetworkPluginServer::handleStatsPayload(Backend *c, const std::string &data // TODO: ERROR return; } + c->res = payload.res(); c->init_res = payload.init_res(); c->shared = payload.shared(); } void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray &data) { + // Append data to buffer c->data.insert(c->data.end(), data.begin(), data.end()); + + // Parse data while there are some while (c->data.size() != 0) { + // expected_size of wrapper message unsigned int expected_size; + // if data is >= 4, we have whole header and we can + // read expected_size. if (c->data.size() >= 4) { expected_size = *((unsigned int*) &c->data[0]); expected_size = ntohl(expected_size); + // If we don't have whole wrapper message, wait for next + // handleDataRead call. if (c->data.size() - 4 < expected_size) return; } @@ -527,6 +553,7 @@ void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray return; } + // Parse wrapper message and erase it from buffer. pbnetwork::WrapperMessage wrapper; if (wrapper.ParseFromArray(&c->data[4], expected_size) == false) { std::cout << "PARSING ERROR " << expected_size << "\n"; @@ -535,6 +562,7 @@ void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray } c->data.erase(c->data.begin(), c->data.begin() + 4 + expected_size); + // Handle payload in wrapper message switch(wrapper.type()) { case pbnetwork::WrapperMessage_Type_TYPE_CONNECTED: handleConnectedPayload(wrapper.payload()); @@ -588,21 +616,29 @@ void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray } void NetworkPluginServer::send(boost::shared_ptr &c, const std::string &data) { + // generate header - size of wrapper message char header[4]; *((int*)(header)) = htonl(data.size()); + + // send header together with wrapper message c->write(Swift::createSafeByteArray(std::string(header, 4) + data)); } void NetworkPluginServer::pingTimeout() { // TODO: move to separate timer, those 2 loops could be expensive + // Some users are connected for weeks and they are blocking backend to be destroyed and its memory + // to be freed. We are finding users who are inactive for more than "idle_reconnect_time" seconds and + // reconnect them to long-running backend, where they can idle hapilly till the end of ages. time_t now = time(NULL); std::vector usersToMove; unsigned long diff = CONFIG_INT(m_config, "service.idle_reconnect_time"); for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { + // Users from long-running backends can't be moved if ((*it)->longRun) { continue; } + // Find users which are inactive for more than 'diff' BOOST_FOREACH(User *u, (*it)->users) { if (now - u->getLastActivity() > diff) { usersToMove.push_back(u); @@ -610,6 +646,7 @@ void NetworkPluginServer::pingTimeout() { } } + // Move inactive users to long-running backend. BOOST_FOREACH(User *u, usersToMove) { LOG4CXX_INFO(logger, "Moving user " << u->getJID().toString() << " to long-running backend"); if (!moveToLongRunBackend(u)) @@ -619,6 +656,8 @@ void NetworkPluginServer::pingTimeout() { // check ping responses for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { + // pong has been received OR backend just connected and did not have time to answer the ping + // request. if ((*it)->pongReceived || (*it)->pongReceived == -1) { sendPing((*it)); } @@ -639,6 +678,8 @@ void NetworkPluginServer::pingTimeout() { } void NetworkPluginServer::collectBackend() { + // Stop accepting new users to backend with the biggest memory usage. This prevents backends + // which are leaking to eat whole memory by connectin new users to legacy network. LOG4CXX_INFO(logger, "Collect backend called, finding backend which will be set to die"); unsigned long max = 0; Backend *backend = NULL; @@ -695,13 +736,17 @@ bool NetworkPluginServer::moveToLongRunBackend(User *user) { } void NetworkPluginServer::handleUserCreated(User *user) { + // Get free backend to handle this user or spawn new one if there's no free one. Backend *c = getFreeClient(); + // Add user to queue if there's no free backend to handle him so far. if (!c) { LOG4CXX_INFO(logger, "There is no backend to handle user " << user->getJID().toString() << ". Adding him to queue."); m_waitingUsers.push_back(user); return; } + + // Associate users with backend user->setData(c); c->users.push_back(user); @@ -1050,11 +1095,12 @@ void NetworkPluginServer::sendPing(Backend *c) { NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUsers, bool longRun) { NetworkPluginServer::Backend *c = NULL; -// bool spawnNew = false; + + // Check all backends and find free one for (std::list::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) { - // This backend is free. if ((*it)->acceptUsers == acceptUsers && (*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend") && (*it)->connection && (*it)->longRun == longRun) { c = *it; + // if we're not reusing all backends and backend is full, stop accepting new users on this backend if (!CONFIG_BOOL(m_config, "service.reuse_old_backends")) { if (c->users.size() + 1 >= CONFIG_INT(m_config, "service.users_per_backend")) { c->acceptUsers = false; @@ -1064,6 +1110,7 @@ NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUser } } + // there's no free backend, so spawn one. if (c == NULL) { m_isNextLongRun = longRun; exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str());