spectrum2/src/networkpluginserver.cpp
2011-10-06 23:42:13 +02:00

1347 lines
42 KiB
C++

/**
* libtransport -- C++ library for easy XMPP Transports development
*
* Copyright (C) 2011, Jan Kaluza <hanzz.k@gmail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
*/
#include "transport/networkpluginserver.h"
#include "transport/user.h"
#include "transport/transport.h"
#include "transport/storagebackend.h"
#include "transport/rostermanager.h"
#include "transport/usermanager.h"
#include "transport/conversationmanager.h"
#include "transport/localbuddy.h"
#include "transport/config.h"
#include "transport/conversation.h"
#include "transport/vcardresponder.h"
#include "transport/rosterresponder.h"
#include "transport/memoryreadbytestream.h"
#include "blockresponder.h"
#include "Swiften/Swiften.h"
#include "Swiften/Server/ServerStanzaChannel.h"
#include "Swiften/Elements/StreamError.h"
#include "Swiften/Network/BoostConnectionServer.h"
#include "Swiften/Elements/AttentionPayload.h"
#include "Swiften/Elements/XHTMLIMPayload.h"
#include "Swiften/Elements/InvisiblePayload.h"
#include "pbnetwork.pb.h"
#include "log4cxx/logger.h"
#include <Swiften/FileTransfer/ReadBytestream.h>
#include <Swiften/Elements/StreamInitiationFileInfo.h>
#include <cerrno>
#ifdef _WIN32
#include "windows.h"
#else
#include "sys/wait.h"
#include "sys/signal.h"
#include "unistd.h"
#include "popt.h"
#endif
using namespace log4cxx;
namespace Transport {
// Counter substituted for the "BACKEND_ID" placeholder in the backend command line;
// exec_() increments it for every backend it starts.
static unsigned long backend_id;
// ID of the most recently registered file transfer; handleFTStartPayload() increments it
// whenever a transfer is stored in m_filetransfers.
static unsigned long bytestream_id;
// Logger used by everything in this file.
static LoggerPtr logger = Logger::getLogger("NetworkPluginServer");
// Conversation which re-emits every outgoing message through the onMessageToSend
// signal instead of delivering it itself.
class NetworkConversation : public Conversation {
	public:
		NetworkConversation(ConversationManager *conversationManager, const std::string &legacyName, bool muc = false)
			: Conversation(conversationManager, legacyName, muc) {
		}

		// Invoked for a new message travelling from the XMPP network to the legacy network.
		void sendMessage(boost::shared_ptr<Swift::Message> &message) {
			onMessageToSend(this, message);
		}

		// Emitted by sendMessage() with this conversation and the message to deliver.
		boost::signal<void (NetworkConversation *, boost::shared_ptr<Swift::Message> &)> onMessageToSend;
};
// Factory producing the Conversation and Buddy implementations used together with
// NetworkPluginServer.
class NetworkFactory : public Factory {
	public:
		NetworkFactory(NetworkPluginServer *nps) : m_nps(nps) {
		}

		// Builds a NetworkConversation whose outgoing messages are routed to
		// NetworkPluginServer::handleMessageReceived.
		Conversation *createConversation(ConversationManager *conversationManager, const std::string &legacyName) {
			NetworkConversation *conversation = new NetworkConversation(conversationManager, legacyName);
			conversation->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, m_nps, _1, _2));
			return conversation;
		}

		// Builds a LocalBuddy populated from the given BuddyInfo.
		Buddy *createBuddy(RosterManager *rosterManager, const BuddyInfo &buddyInfo) {
			LocalBuddy *localBuddy = new LocalBuddy(rosterManager, buddyInfo.id);
			localBuddy->setAlias(buddyInfo.alias);
			localBuddy->setName(buddyInfo.legacyName);
			localBuddy->setSubscription(buddyInfo.subscription);
			localBuddy->setGroups(buddyInfo.groups);
			localBuddy->setFlags((BuddyFlag) (buddyInfo.flags));

			// The icon hash is optional; it is applied only when the "icon_hash" setting exists.
			const bool hasIconHash = buddyInfo.settings.find("icon_hash") != buddyInfo.settings.end();
			if (hasIconHash) {
				localBuddy->setIconHash(buddyInfo.settings.find("icon_hash")->second.s);
			}
			return localBuddy;
		}

	private:
		NetworkPluginServer *m_nps;
};
// Wraps a serialized google protobuf payload (MESSAGE) into a WrapperMessage of the given
// TYPE and serializes the wrapper back into MESSAGE, i.e. MESSAGE is overwritten in place.
// NOTE: the macro expands to several statements and declares a local variable named `wrap`,
// so it can be used at most once per scope and must not be the sole body of an unbraced
// if/else.
#define WRAP(MESSAGE, TYPE) pbnetwork::WrapperMessage wrap; \
wrap.set_type(TYPE); \
wrap.set_payload(MESSAGE); \
wrap.SerializeToString(&MESSAGE);
// Executes new backend
static unsigned long exec_(std::string path, const char *host, const char *port, const char *config) {
std::string original_path = path;
// BACKEND_ID is replaced with unique ID. The ID is increasing for every backend.
boost::replace_all(path, "BACKEND_ID", boost::lexical_cast<std::string>(backend_id++));
// Add host and port.
path += std::string(" --host ") + host + " --port " + port + " " + config;
LOG4CXX_INFO(logger, "Starting new backend " << path);
#ifdef _WIN32
STARTUPINFO si;
PROCESS_INFORMATION pi;
ZeroMemory (&si, sizeof(si));
si.cb=sizeof (si);
if (! CreateProcess(
NULL,
(LPSTR)path.c_str(), // command line
0, // process attributes
0, // thread attributes
0, // inherit handles
0, // creation flags
0, // environment
0, // cwd
&si,
&pi
)
) {
LOG4CXX_ERROR(logger, "Could not start process");
}
return 0;
#else
// Create array of char * from string using -lpopt library
char *p = (char *) malloc(path.size() + 1);
strcpy(p, path.c_str());
int argc;
char **argv;
poptParseArgvString(p, &argc, (const char ***) &argv);
// fork and exec
pid_t pid = fork();
if ( pid == 0 ) {
// child process
exit(execv(argv[0], argv));
} else if ( pid < 0 ) {
// fork failed
}
free(p);
return (unsigned long) pid;
#endif
}
#ifndef _WIN32
// SIGCHLD handler. Reads exit code from all terminated children to not have zombies around.
// WARNING: Do not put LOG4CXX_ here, because it can lead to deadlock.
static void SigCatcher(int n) {
	(void) n;
	// waitpid() overwrites errno (e.g. with ECHILD once there is nothing left to reap).
	// The handler can interrupt code which is just about to inspect errno, so the
	// value is saved here and restored before returning.
	const int savedErrno = errno;
	int status;
	// pid -1 matches every child, including children which changed their process group.
	while (waitpid(-1, &status, WNOHANG) > 0) {
		// The exit status is intentionally ignored, because it can not be logged from here.
	}
	errno = savedErrno;
}
#endif
// Copies the fields of a pbnetwork::Buddy payload into the given LocalBuddy.
// Name, status, status message, icon hash and blocked flag are always applied;
// alias and groups only when the payload carries a non-empty value.
static void handleBuddyPayload(LocalBuddy *buddy, const pbnetwork::Buddy &payload) {
	buddy->setName(payload.buddyname());

	// Backends are allowed to send an empty alias if it has not changed.
	const bool aliasProvided = !payload.alias().empty();
	if (aliasProvided) {
		buddy->setAlias(payload.alias());
	}

	// The same applies to groups; the payload value is stored as a single group entry.
	const bool groupsProvided = !payload.groups().empty();
	if (groupsProvided) {
		std::vector<std::string> groupList;
		groupList.push_back(payload.groups());
		buddy->setGroups(groupList);
	}

	const Swift::StatusShow show((Swift::StatusShow::Type) payload.status());
	buddy->setStatus(show, payload.statusmessage());
	buddy->setIconHash(payload.iconhash());
	buddy->setBlocked(payload.blocked());
}
// Sets the plugin server up: installs the NetworkFactory into the component, hooks the
// user created/destroyed signals, starts the ping timer (and the memory collector timer
// when "service.memory_collector_time" is non-zero), starts the vCard, roster and block
// responders, starts listening for backends and finally executes the first backend.
NetworkPluginServer::NetworkPluginServer(Component *component, Config *config, UserManager *userManager, FileTransferManager *ftManager) {
	m_ftManager = ftManager;
	m_userManager = userManager;
	m_config = config;
	m_component = component;
	m_isNextLongRun = false;
	m_component->m_factory = new NetworkFactory(this);

	// User lifecycle.
	m_userManager->onUserCreated.connect(boost::bind(&NetworkPluginServer::handleUserCreated, this, _1));
	m_userManager->onUserDestroyed.connect(boost::bind(&NetworkPluginServer::handleUserDestroyed, this, _1));

	// Backends are pinged using a 20000 timer.
	m_pingTimer = component->getNetworkFactories()->getTimerFactory()->createTimer(20000);
	m_pingTimer->onTick.connect(boost::bind(&NetworkPluginServer::pingTimeout, this));
	m_pingTimer->start();

	// The memory collector is optional.
	if (CONFIG_INT(m_config, "service.memory_collector_time") != 0) {
		m_collectTimer = component->getNetworkFactories()->getTimerFactory()->createTimer(CONFIG_INT(m_config, "service.memory_collector_time"));
		m_collectTimer->onTick.connect(boost::bind(&NetworkPluginServer::collectBackend, this));
		m_collectTimer->start();
	}

	// IQ responders.
	m_vcardResponder = new VCardResponder(component->getIQRouter(), component->getNetworkFactories(), userManager);
	m_vcardResponder->onVCardRequired.connect(boost::bind(&NetworkPluginServer::handleVCardRequired, this, _1, _2, _3));
	m_vcardResponder->onVCardUpdated.connect(boost::bind(&NetworkPluginServer::handleVCardUpdated, this, _1, _2));
	m_vcardResponder->start();

	m_rosterResponder = new RosterResponder(component->getIQRouter(), userManager);
	m_rosterResponder->onBuddyAdded.connect(boost::bind(&NetworkPluginServer::handleBuddyAdded, this, _1, _2));
	m_rosterResponder->onBuddyRemoved.connect(boost::bind(&NetworkPluginServer::handleBuddyRemoved, this, _1));
	m_rosterResponder->onBuddyUpdated.connect(boost::bind(&NetworkPluginServer::handleBuddyUpdated, this, _1, _2));
	m_rosterResponder->start();

	m_blockResponder = new BlockResponder(component->getIQRouter(), userManager);
	m_blockResponder->onBlockToggled.connect(boost::bind(&NetworkPluginServer::handleBlockToggled, this, _1));
	m_blockResponder->start();

	// Server socket the backends connect to.
	const std::string backendHost = CONFIG_STRING(m_config, "service.backend_host");
	const std::string backendPort = CONFIG_STRING(m_config, "service.backend_port");
	m_server = component->getNetworkFactories()->getConnectionServerFactory()->createConnectionServer(Swift::HostAddress(backendHost), boost::lexical_cast<int>(backendPort));
	m_server->onNewConnection.connect(boost::bind(&NetworkPluginServer::handleNewClientConnection, this, _1));
	m_server->start();
	LOG4CXX_INFO(logger, "Listening on host " << backendHost << " port " << backendPort);

#ifndef _WIN32
	// Terminated backends are reaped from the SIGCHLD handler.
	signal(SIGCHLD, SigCatcher);
#endif

	// Start the first backend.
	exec_(CONFIG_STRING(m_config, "service.backend"), backendHost.c_str(), backendPort.c_str(), m_config->getConfigFile().c_str());
}
// Stops the timers and the backend listener and releases the factory and the
// responders created by the constructor.
NetworkPluginServer::~NetworkPluginServer() {
	m_pingTimer->stop();
	// The collector timer exists only when "service.memory_collector_time" is set.
	// It has to be stopped too, otherwise its tick could still be delivered to
	// collectBackend() of this already destroyed object.
	if (m_collectTimer) {
		m_collectTimer->stop();
	}
	m_server->stop();
	m_server.reset();
	delete m_component->m_factory;
	delete m_vcardResponder;
	delete m_rosterResponder;
	delete m_blockResponder;
}
// Called when a backend process connects. Registers a new Backend record for the
// connection, starts the component when this is the first backend, pings the backend
// and associates as many waiting users with backends as possible.
void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Connection> c) {
	// Create new Backend instance
	Backend *backend = new Backend;
	backend->connection = c;
	backend->pongReceived = -1;
	backend->res = 0;
	backend->init_res = 0;
	backend->shared = 0;
	// Backend does not accept new clients automatically if it's long-running
	backend->longRun = m_isNextLongRun;
	backend->acceptUsers = !m_isNextLongRun;

	const std::string longRunTag = backend->longRun ? " long-running" : "";
	LOG4CXX_INFO(logger, "New" + longRunTag + " backend " << backend << " connected. Current backend count=" << (m_clients.size() + 1));

	// first backend connected, start the server, we're ready.
	if (m_clients.empty()) {
		m_component->start();
	}

	m_clients.push_front(backend);
	c->onDisconnected.connect(boost::bind(&NetworkPluginServer::handleSessionFinished, this, backend));
	c->onDataRead.connect(boost::bind(&NetworkPluginServer::handleDataRead, this, backend, _1));

	sendPing(backend);
	// sendPing sets pongReceived to 0, but we want to have it -1 to ignore this backend
	// in first ::pingTimeout call, because it can be called right after this function
	// and backend wouldn't have any time to respond to ping.
	backend->pongReceived = -1;

	// Some users are in queue waiting for this backend. Stop associating them as soon as
	// there is no free backend and wait for the new backend, which has been already
	// spawned in getFreeClient() call.
	while (!m_waitingUsers.empty() && getFreeClient() != NULL) {
		User *waiting = m_waitingUsers.front();
		m_waitingUsers.pop_front();
		LOG4CXX_INFO(logger, "Associating " << waiting->getJID().toString() << " with this backend");

		// associate backend with user
		handleUserCreated(waiting);

		// connect user if it's ready
		if (waiting->isReadyToConnect()) {
			handleUserReadyToConnect(waiting);
		}
	}
}
// Tears a backend down: disconnects the users which are still associated with it,
// closes its connection, removes it from m_clients and deletes it.
void NetworkPluginServer::handleSessionFinished(Backend *c) {
	LOG4CXX_INFO(logger, "Backend " << c << " disconnected. Current backend count=" << (m_clients.size() - 1));

	// If there are users associated with this backend, it must have crashed, so print
	// error output and disconnect users
	BOOST_FOREACH(User *activeUser, c->users) {
		LOG4CXX_ERROR(logger, "Backend " << c << " disconnected (probably crashed) with active user " << activeUser->getJID().toString());
		// The user loses its backend before it is told about the disconnection.
		activeUser->setData(NULL);
		activeUser->handleDisconnected("Internal Server Error, please reconnect.");
	}

	// Detach the slots first, so closing the connection does not call back into this object.
	c->connection->onDisconnected.disconnect_all_slots();
	c->connection->onDataRead.disconnect_all_slots();
	c->connection->disconnect();
	c->connection.reset();

	m_clients.remove(c);
	delete c;
}
void NetworkPluginServer::handleConnectedPayload(const std::string &data) {
pbnetwork::Connected payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.user());
if (!user) {
return;
}
user->setConnected(true);
m_component->m_userRegistry->onPasswordValid(payload.user());
}
void NetworkPluginServer::handleDisconnectedPayload(const std::string &data) {
pbnetwork::Disconnected payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
m_component->m_userRegistry->onPasswordInvalid(payload.user());
User *user = m_userManager->getUser(payload.user());
if (!user) {
return;
}
user->handleDisconnected(payload.message());
}
void NetworkPluginServer::handleVCardPayload(const std::string &data) {
pbnetwork::VCard payload;
if (payload.ParseFromString(data) == false) {
std::cout << "PARSING ERROR\n";
// TODO: ERROR
return;
}
boost::shared_ptr<Swift::VCard> vcard(new Swift::VCard());
vcard->setFullName(payload.fullname());
vcard->setPhoto(Swift::createByteArray(payload.photo()));
vcard->setNickname(payload.nickname());
m_vcardResponder->sendVCard(payload.id(), vcard);
}
void NetworkPluginServer::handleAuthorizationPayload(const std::string &data) {
pbnetwork::Buddy payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
// Create subscribe presence and forward it to XMPP side
Swift::Presence::ref response = Swift::Presence::create();
response->setTo(user->getJID());
std::string name = payload.buddyname();
name = Swift::JID::getEscapedNode(name);
// if (name.find_last_of("@") != std::string::npos) { // OK when commented
// name.replace(name.find_last_of("@"), 1, "%"); // OK when commented
// }
response->setFrom(Swift::JID(name, m_component->getJID().toString()));
response->setType(Swift::Presence::Subscribe);
m_component->getStanzaChannel()->sendPresence(response);
}
void NetworkPluginServer::handleChatStatePayload(const std::string &data, Swift::ChatState::ChatStateType type) {
pbnetwork::Buddy payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
// We're not creating new Conversation just because of chatstates.
// Some networks/clients spams with chatstates a lot and it leads to bigger memory usage.
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname());
if (!conv) {
return;
}
// Forward chatstate
boost::shared_ptr<Swift::Message> msg(new Swift::Message());
msg->addPayload(boost::make_shared<Swift::ChatState>(type));
conv->handleMessage(msg);
}
void NetworkPluginServer::handleBuddyChangedPayload(const std::string &data) {
pbnetwork::Buddy payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
LocalBuddy *buddy = (LocalBuddy *) user->getRosterManager()->getBuddy(payload.buddyname());
if (buddy) {
handleBuddyPayload(buddy, payload);
buddy->handleBuddyChanged();
}
else {
buddy = new LocalBuddy(user->getRosterManager(), -1);
buddy->setFlags(BUDDY_JID_ESCAPING);
handleBuddyPayload(buddy, payload);
user->getRosterManager()->setBuddy(buddy);
}
}
void NetworkPluginServer::handleParticipantChangedPayload(const std::string &data) {
pbnetwork::Participant payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.room());
if (!conv) {
return;
}
conv->handleParticipantChanged(payload.nickname(), payload.flag(), payload.status(), payload.statusmessage(), payload.newname());
}
void NetworkPluginServer::handleRoomChangedPayload(const std::string &data) {
pbnetwork::Room payload;
if (payload.ParseFromString(data) == false) {
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.room());
if (!conv) {
return;
}
conv->setNickname(payload.nickname());
}
void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool subject) {
pbnetwork::ConversationMessage payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
// Message from legacy network triggers network acticity
user->updateLastActivity();
// Set proper body.
boost::shared_ptr<Swift::Message> msg(new Swift::Message());
if (subject) {
msg->setSubject(payload.message());
}
else {
msg->setBody(payload.message());
}
// Add xhtml-im payload.
if (!payload.xhtml().empty()) {
msg->addPayload(boost::make_shared<Swift::XHTMLIMPayload>(payload.xhtml()));
}
// Create new Conversation if it does not exist
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname());
if (!conv) {
conv = new NetworkConversation(user->getConversationManager(), payload.buddyname());
conv->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2));
}
// Forward it
conv->handleMessage(msg, payload.nickname());
}
void NetworkPluginServer::handleAttentionPayload(const std::string &data) {
pbnetwork::ConversationMessage payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
boost::shared_ptr<Swift::Message> msg(new Swift::Message());
msg->setBody(payload.message());
msg->addPayload(boost::make_shared<Swift::AttentionPayload>());
// Attentions trigger new Conversation creation
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname());
if (!conv) {
conv = new NetworkConversation(user->getConversationManager(), payload.buddyname());
conv->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2));
}
conv->handleMessage(msg);
}
// Stores the res, init_res and shared values reported in a Stats payload in the
// record of the backend which sent it.
void NetworkPluginServer::handleStatsPayload(Backend *c, const std::string &data) {
	pbnetwork::Stats stats;
	if (!stats.ParseFromString(data)) {
		// TODO: ERROR
		return;
	}

	c->res = stats.res();
	c->init_res = stats.init_res();
	c->shared = stats.shared();
}
void NetworkPluginServer::handleFTStartPayload(const std::string &data) {
pbnetwork::File payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
LOG4CXX_INFO(logger, "handleFTStartPayload " << payload.filename() << " " << payload.buddyname());
LocalBuddy *buddy = (LocalBuddy *) user->getRosterManager()->getBuddy(payload.buddyname());
if (!buddy) {
// TODO: escape? reject?
return;
}
Swift::StreamInitiationFileInfo fileInfo;
fileInfo.setSize(payload.size());
fileInfo.setName(payload.filename());
Backend *c = (Backend *) user->getData();
boost::shared_ptr<MemoryReadBytestream> bytestream(new MemoryReadBytestream(payload.size()));
bytestream->onDataNeeded.connect(boost::bind(&NetworkPluginServer::handleFTDataNeeded, this, c, bytestream_id + 1));
LOG4CXX_INFO(logger, "jid=" << buddy->getJID());
FileTransferManager::Transfer transfer = m_ftManager->sendFile(user, buddy, bytestream, fileInfo);
if (!transfer.ft) {
handleFTRejected(user, payload.buddyname(), payload.filename(), payload.size());
return;
}
m_filetransfers[++bytestream_id] = transfer;
transfer.ft->onStateChange.connect(boost::bind(&NetworkPluginServer::handleFTStateChanged, this, _1, payload.username(), payload.buddyname(), payload.filename(), payload.size(), bytestream_id));
transfer.ft->start();
}
void NetworkPluginServer::handleFTFinishPayload(const std::string &data) {
pbnetwork::File payload;
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
if (payload.has_ftid()) {
if (m_filetransfers.find(payload.ftid()) != m_filetransfers.end()) {
FileTransferManager::Transfer &transfer = m_filetransfers[payload.ftid()];
transfer.ft->cancel();
}
else {
LOG4CXX_ERROR(logger, "FTFinishPayload for unknown ftid=" << payload.ftid());
}
}
}
// Handles a chunk of file transfer data sent by backend `b`: appends it to the byte
// stream of the transfer registered under the payload's ftid. When appendData() reports
// more than 5000000, the backend is asked to pause sending (FT_PAUSE).
// Data for an unknown ftid is logged and dropped.
void NetworkPluginServer::handleFTDataPayload(Backend *b, const std::string &data) {
	pbnetwork::FileTransferData payload;
	if (payload.ParseFromString(data) == false) {
		// TODO: ERROR
		return;
	}

	// Look the transfer up first. operator[] would silently insert an empty Transfer
	// for an unknown ftid and the byte stream pointer used below would be NULL.
	if (m_filetransfers.find(payload.ftid()) == m_filetransfers.end()) {
		LOG4CXX_ERROR(logger, "FTDataPayload for unknown ftid=" << payload.ftid());
		return;
	}

	FileTransferManager::Transfer &transfer = m_filetransfers[payload.ftid()];
	MemoryReadBytestream *bytestream = (MemoryReadBytestream *) transfer.readByteStream.get();
	if (!bytestream) {
		LOG4CXX_ERROR(logger, "FTDataPayload for ftid=" << payload.ftid() << " without bytestream");
		return;
	}

	if (bytestream->appendData(payload.data()) > 5000000) {
		pbnetwork::FileTransferData f;
		f.set_ftid(payload.ftid());
		f.set_data("");
		std::string message;
		f.SerializeToString(&message);
		WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_FT_PAUSE);
		send(b->connection, message);
	}
}
// Asks backend `b` to continue sending data of the file transfer `ftid` by sending it
// an FT_CONTINUE message with empty data.
void NetworkPluginServer::handleFTDataNeeded(Backend *b, unsigned long ftid) {
	pbnetwork::FileTransferData request;
	request.set_ftid(ftid);
	request.set_data("");

	std::string serialized;
	request.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_FT_CONTINUE);

	send(b->connection, serialized);
}
// Called when data arrives from backend `c`. The data is appended to the backend's
// buffer and every complete message found there is parsed and dispatched.
//
// Wire format: a 4 byte header holding the size of the wrapper message in network
// (big-endian) byte order, followed by the serialized pbnetwork::WrapperMessage.
void NetworkPluginServer::handleDataRead(Backend *c, boost::shared_ptr<Swift::SafeByteArray> data) {
	// Append data to buffer
	c->data.insert(c->data.end(), data->begin(), data->end());

	// Parse data while there are some
	while (c->data.size() != 0) {
		// We need the whole 4 byte header to know the size of the wrapper message.
		if (c->data.size() < 4) {
			return;
		}

		// The header is decoded byte by byte. Reading it through an `unsigned int *`
		// cast of the buffer is an unaligned, aliasing-violating access.
		const unsigned int expected_size =
			(((unsigned int) (unsigned char) c->data[0]) << 24) |
			(((unsigned int) (unsigned char) c->data[1]) << 16) |
			(((unsigned int) (unsigned char) c->data[2]) << 8) |
			((unsigned int) (unsigned char) c->data[3]);

		// If we don't have whole wrapper message, wait for next
		// handleDataRead call.
		if (c->data.size() - 4 < expected_size) {
			return;
		}

		// Parse wrapper message and erase it from buffer.
		pbnetwork::WrapperMessage wrapper;
		if (wrapper.ParseFromArray(&c->data[4], expected_size) == false) {
			std::cout << "PARSING ERROR " << expected_size << "\n";
			c->data.erase(c->data.begin(), c->data.begin() + 4 + expected_size);
			continue;
		}
		c->data.erase(c->data.begin(), c->data.begin() + 4 + expected_size);

		// Handle payload in wrapper message
		switch(wrapper.type()) {
			case pbnetwork::WrapperMessage_Type_TYPE_CONNECTED:
				handleConnectedPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_DISCONNECTED:
				handleDisconnectedPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED:
				handleBuddyChangedPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_CONV_MESSAGE:
				handleConvMessagePayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_ROOM_SUBJECT_CHANGED:
				handleConvMessagePayload(wrapper.payload(), true);
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_PONG:
				c->pongReceived = true;
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_PARTICIPANT_CHANGED:
				handleParticipantChangedPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_ROOM_NICKNAME_CHANGED:
				handleRoomChangedPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_VCARD:
				handleVCardPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_BUDDY_TYPING:
				handleChatStatePayload(wrapper.payload(), Swift::ChatState::Composing);
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_BUDDY_TYPED:
				handleChatStatePayload(wrapper.payload(), Swift::ChatState::Paused);
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_BUDDY_STOPPED_TYPING:
				handleChatStatePayload(wrapper.payload(), Swift::ChatState::Active);
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_AUTH_REQUEST:
				handleAuthorizationPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_ATTENTION:
				handleAttentionPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_STATS:
				handleStatsPayload(c, wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_FT_START:
				handleFTStartPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_FT_FINISH:
				handleFTFinishPayload(wrapper.payload());
				break;
			case pbnetwork::WrapperMessage_Type_TYPE_FT_DATA:
				handleFTDataPayload(c, wrapper.payload());
				break;
			default:
				// Unknown message type. It has been already erased from the buffer, so
				// just skip it and keep processing: returning here would leave complete
				// messages waiting in the buffer until the next data arrive.
				break;
		}
	}
}
// Sends a serialized wrapper message over connection `c`. The message is prefixed with
// a 4 byte header holding its size in network byte order.
void NetworkPluginServer::send(boost::shared_ptr<Swift::Connection> &c, const std::string &data) {
	// generate header - size of wrapper message. The bytes of the converted value are
	// copied out through a char pointer; storing it through an `int *` cast of a char
	// array is an unaligned, aliasing-violating access.
	const unsigned int size = htonl(data.size());
	const std::string header(reinterpret_cast<const char *>(&size), 4);

	// send header together with wrapper message
	c->write(Swift::createSafeByteArray(header + data));
}
// Ping timer callback. It
//  1. moves users which have been inactive for more than "service.idle_reconnect_time"
//     to a long-running backend (when that option is non-zero),
//  2. pings backends which answered the previous ping and disconnects the ones which
//     did not answer or which have no users,
//  3. re-arms the ping timer.
void NetworkPluginServer::pingTimeout() {
	// TODO: move to separate timer, those 2 loops could be expensive
	// Some users are connected for weeks and they are blocking backend to be destroyed and its memory
	// to be freed. We are finding users who are inactive for more than "idle_reconnect_time" seconds and
	// reconnect them to long-running backend, where they can idle happily till the end of ages.
	time_t now = time(NULL);
	std::vector<User *> usersToMove;
	unsigned long diff = CONFIG_INT(m_config, "service.idle_reconnect_time");
	if (diff != 0) {
		for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
			// Users from long-running backends can't be moved
			if ((*it)->longRun) {
				continue;
			}

			// Find users which are inactive for more than 'diff'
			BOOST_FOREACH(User *u, (*it)->users) {
				if (now - u->getLastActivity() > diff) {
					usersToMove.push_back(u);
				}
			}
		}

		// Move inactive users to long-running backend. The users are collected first,
		// because moving them changes the user lists iterated above.
		BOOST_FOREACH(User *u, usersToMove) {
			LOG4CXX_INFO(logger, "Moving user " << u->getJID().toString() << " to long-running backend");
			if (!moveToLongRunBackend(u))
				break;
		}
	}

	// check ping responses
	std::vector<Backend *> toRemove;
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
		Backend *backend = *it;

		// pong has been received OR backend just connected and did not have time to answer the ping
		// request.
		const bool responsive = backend->pongReceived || backend->pongReceived == -1;
		if (!responsive) {
			LOG4CXX_INFO(logger, "Disconnecting backend " << backend << ". PING response not received.");
			toRemove.push_back(backend);
			// Every backend must be queued at most once: handleSessionFinished() deletes
			// it, so a second call for the same pointer would use freed memory.
			continue;
		}
		sendPing(backend);

		if (backend->users.size() == 0) {
			LOG4CXX_INFO(logger, "Disconnecting backend " << backend << ". There are no users.");
			toRemove.push_back(backend);
		}
	}

	// Backends are removed after the loop, because handleSessionFinished() modifies m_clients.
	BOOST_FOREACH(Backend *b, toRemove) {
		handleSessionFinished(b);
	}

	m_pingTimer->start();
}
// Memory collector callback. Stops accepting new users on the backend with the biggest
// reported `res` value. This prevents backends which are leaking from eating whole
// memory by connecting new users to legacy network.
void NetworkPluginServer::collectBackend() {
	LOG4CXX_INFO(logger, "Collect backend called, finding backend which will be set to die");

	// Find the backend with the biggest (non-zero) res.
	unsigned long max = 0;
	Backend *backend = NULL;
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
		if ((*it)->res > max) {
			max = (*it)->res;
			backend = (*it);
		}
	}

	// Re-arm the timer even if no backend has been chosen this time (for example when no
	// backend has reported its stats yet), otherwise the collector would never run again.
	// The timer exists only when the collector is configured.
	if (m_collectTimer) {
		m_collectTimer->start();
	}

	if (backend) {
		LOG4CXX_INFO(logger, "Backend " << backend << " is set to die");
		backend->acceptUsers = false;
	}
}
// Moves the user from its current backend to a long-running one and reconnects it there.
// Returns false only when no free long-running backend is available; users without
// a backend or already on a long-running backend are left untouched (true is returned).
bool NetworkPluginServer::moveToLongRunBackend(User *user) {
	Backend *previous = (Backend *) user->getData();

	// Check if user has already some backend
	if (previous == NULL) {
		LOG4CXX_INFO(logger, "User " << user->getJID().toString() << " does not have old backend. Not moving.");
		return true;
	}

	// if he's already on long run, do nothing
	if (previous->longRun) {
		LOG4CXX_INFO(logger, "User " << user->getJID().toString() << " is already on long-running backend. Not moving.");
		return true;
	}

	// Get free longrun backend, if there's no longrun backend, create one and wait
	// for its connection
	Backend *target = getFreeClient(false, true);
	if (target == NULL) {
		LOG4CXX_INFO(logger, "No free long-running backend for user " << user->getJID().toString() << ". Will try later");
		return false;
	}

	// old backend will trigger disconnection which has to be ignored to keep user online
	user->setIgnoreDisconnect(true);

	// remove user from the old backend
	// If backend is empty, it will be collected by pingTimeout
	previous->users.remove(user);

	// switch to new backend and connect
	user->setData(target);
	target->users.push_back(user);

	// connect him
	handleUserReadyToConnect(user);
	return true;
}
// Associates a user with a free backend and hooks the user's signals. When there is
// no free backend, the user is put into the waiting queue instead.
void NetworkPluginServer::handleUserCreated(User *user) {
	// Get free backend to handle this user or spawn new one if there's no free one.
	Backend *backend = getFreeClient();

	// Add user to queue if there's no free backend to handle him so far.
	if (backend == NULL) {
		LOG4CXX_INFO(logger, "There is no backend to handle user " << user->getJID().toString() << ". Adding him to queue.");
		m_waitingUsers.push_back(user);
		return;
	}

	// Associate users with backend
	user->setData(backend);
	backend->users.push_back(user);

	// Forward the user's events to the handlers of this server.
	user->onReadyToConnect.connect(boost::bind(&NetworkPluginServer::handleUserReadyToConnect, this, user));
	user->onPresenceChanged.connect(boost::bind(&NetworkPluginServer::handleUserPresenceChanged, this, user, _1));
	user->onRoomJoined.connect(boost::bind(&NetworkPluginServer::handleRoomJoined, this, user, _1, _2, _3));
	user->onRoomLeft.connect(boost::bind(&NetworkPluginServer::handleRoomLeft, this, user, _1));
}
// Sends a LOGIN message (bare JID, legacy name and password of the user) to the
// backend associated with the user. Nothing is sent when the user has no backend.
void NetworkPluginServer::handleUserReadyToConnect(User *user) {
	UserInfo info = user->getUserInfo();

	pbnetwork::Login request;
	request.set_user(user->getJID().toBare());
	request.set_legacyname(info.uin);
	request.set_password(info.password);

	std::string serialized;
	request.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_LOGIN);

	Backend *backend = (Backend *) user->getData();
	if (backend != NULL) {
		send(backend->connection, serialized);
	}
}
// Sends a STATUS_CHANGED message describing the user's new presence to the user's
// backend. Presences with show "None" are ignored; a presence carrying an
// InvisiblePayload is reported as STATUS_INVISIBLE.
void NetworkPluginServer::handleUserPresenceChanged(User *user, Swift::Presence::ref presence) {
	if (presence->getShow() == Swift::StatusShow::None) {
		return;
	}

	UserInfo userInfo = user->getUserInfo();

	pbnetwork::Status update;
	update.set_username(user->getJID().toBare());

	const bool invisible = presence->getPayload<Swift::InvisiblePayload>() != NULL;
	if (!invisible) {
		update.set_status((pbnetwork::StatusType) presence->getShow());
	}
	else {
		LOG4CXX_INFO(logger, "This presence is invisible");
		update.set_status((pbnetwork::STATUS_INVISIBLE));
	}
	update.set_statusmessage(presence->getStatus());

	std::string serialized;
	update.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_STATUS_CHANGED);

	Backend *backend = (Backend *) user->getData();
	if (backend != NULL) {
		send(backend->connection, serialized);
	}
}
// Sends a JOIN_ROOM message for room `r` to the user's backend and creates the MUC
// conversation for that room. Nothing happens when the user has no backend.
void NetworkPluginServer::handleRoomJoined(User *user, const std::string &r, const std::string &nickname, const std::string &password) {
	UserInfo userInfo = user->getUserInfo();

	pbnetwork::Room join;
	join.set_username(user->getJID().toBare());
	join.set_nickname(nickname);
	join.set_room(r);
	join.set_password(password);

	std::string serialized;
	join.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_JOIN_ROOM);

	Backend *backend = (Backend *) user->getData();
	if (backend == NULL) {
		return;
	}
	send(backend->connection, serialized);

	// The third constructor argument marks the conversation as a MUC.
	NetworkConversation *roomConversation = new NetworkConversation(user->getConversationManager(), r, true);
	roomConversation->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2));
	roomConversation->setNickname(nickname);
}
// Sends a LEAVE_ROOM message for room `r` to the user's backend and destroys the
// conversation of that room, if there is one. Nothing happens when the user has no backend.
void NetworkPluginServer::handleRoomLeft(User *user, const std::string &r) {
	UserInfo userInfo = user->getUserInfo();

	pbnetwork::Room leave;
	leave.set_username(user->getJID().toBare());
	leave.set_nickname("");
	leave.set_room(r);
	leave.set_password("");

	std::string serialized;
	leave.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_LEAVE_ROOM);

	Backend *backend = (Backend *) user->getData();
	if (backend == NULL) {
		return;
	}
	send(backend->connection, serialized);

	NetworkConversation *roomConversation = (NetworkConversation *) user->getConversationManager()->getConversation(r);
	if (roomConversation != NULL) {
		user->getConversationManager()->removeConversation(roomConversation);
		delete roomConversation;
	}
}
// Called when `user` is being destroyed: drops him from the waiting list,
// sends a TYPE_LOGOUT message to his backend and removes him from that
// backend's user list. Only the waiting-list removal happens when the user has
// no backend attached.
void NetworkPluginServer::handleUserDestroyed(User *user) {
	std::cout << "HANDLE_DESTROYED\n";
	m_waitingUsers.remove(user);

	pbnetwork::Logout logoutRequest;
	logoutRequest.set_user(user->getJID().toBare());
	logoutRequest.set_legacyname(user->getUserInfo().uin);

	std::string serialized;
	logoutRequest.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_LOGOUT);

	Backend *backend = static_cast<Backend *>(user->getData());
	if (backend == NULL) {
		return;
	}

	send(backend->connection, serialized);
	backend->users.remove(user);

	// Disabled: shutting the backend down once its last user is gone.
// 	if (c->users.size() == 0) {
// 		LOG4CXX_INFO(logger, "Disconnecting backend " << c << ". There are no users.");
// 		handleSessionFinished(c);
// 		m_clients.erase(user->connection);
// 	}
}
// Forwards a message stanza belonging to conversation `conv` to the backend of
// the user who owns that conversation.
//
// conv - conversation the stanza belongs to; supplies the user and the legacy
//        name of the other party.
// msg  - the stanza. Depending on its payloads up to two backend messages are
//        produced:
//          * a chat state (Active/Composing/Paused) becomes
//            TYPE_BUDDY_STOPPED_TYPING / TYPE_BUDDY_TYPING / TYPE_BUDDY_TYPED;
//          * an attention payload becomes TYPE_ATTENTION and ends processing;
//          * otherwise a non-empty body or XHTML body becomes TYPE_CONV_MESSAGE.
//
// The user's last-activity timestamp is always refreshed. Nothing is sent when
// the user has no backend attached.
void NetworkPluginServer::handleMessageReceived(NetworkConversation *conv, boost::shared_ptr<Swift::Message> &msg) {
	User *user = conv->getConversationManager()->getUser();
	user->updateLastActivity();

	// Every branch below needs the backend, so resolve and validate it once.
	// Previously the attention branch dereferenced it without a null check.
	Backend *c = static_cast<Backend *>(user->getData());
	if (!c) {
		return;
	}

	boost::shared_ptr<Swift::ChatState> statePayload = msg->getPayload<Swift::ChatState>();
	if (statePayload) {
		// TYPE_BUDDY_CHANGED is used as the "no typing notification" marker.
		pbnetwork::WrapperMessage_Type type = pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED;
		switch (statePayload->getChatState()) {
			case Swift::ChatState::Active:
				type = pbnetwork::WrapperMessage_Type_TYPE_BUDDY_STOPPED_TYPING;
				break;
			case Swift::ChatState::Composing:
				type = pbnetwork::WrapperMessage_Type_TYPE_BUDDY_TYPING;
				break;
			case Swift::ChatState::Paused:
				type = pbnetwork::WrapperMessage_Type_TYPE_BUDDY_TYPED;
				break;
			default:
				break;
		}
		if (type != pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED) {
			pbnetwork::Buddy buddy;
			buddy.set_username(user->getJID().toBare());
			buddy.set_buddyname(conv->getLegacyName());

			std::string message;
			buddy.SerializeToString(&message);
			WRAP(message, type);

			send(c->connection, message);
		}
	}

	boost::shared_ptr<Swift::AttentionPayload> attentionPayload = msg->getPayload<Swift::AttentionPayload>();
	if (attentionPayload) {
		pbnetwork::ConversationMessage m;
		m.set_username(user->getJID().toBare());
		m.set_buddyname(conv->getLegacyName());
		m.set_message(msg->getBody());

		std::string message;
		m.SerializeToString(&message);
		WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_ATTENTION);

		send(c->connection, message);
		// An attention request is never additionally sent as a normal message.
		return;
	}

	std::string xhtml;
	boost::shared_ptr<Swift::XHTMLIMPayload> xhtmlPayload = msg->getPayload<Swift::XHTMLIMPayload>();
	if (xhtmlPayload) {
		xhtml = xhtmlPayload->getBody();
	}

	// Send normal message
	if (!msg->getBody().empty() || !xhtml.empty()) {
		pbnetwork::ConversationMessage m;
		m.set_username(user->getJID().toBare());
		m.set_buddyname(conv->getLegacyName());
		m.set_message(msg->getBody());
		m.set_xhtml(xhtml);

		std::string message;
		m.SerializeToString(&message);
		WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_CONV_MESSAGE);

		send(c->connection, message);
	}
}
// Tells the backend of the buddy's owner that buddy `b` was removed
// (TYPE_BUDDY_REMOVED). Only the first group of the buddy is transmitted.
// Nothing is sent when the owner has no backend attached.
void NetworkPluginServer::handleBuddyRemoved(Buddy *b) {
	User *owner = b->getRosterManager()->getUser();

	pbnetwork::Buddy payload;
	payload.set_username(owner->getJID().toBare());
	payload.set_buddyname(b->getName());
	payload.set_alias(b->getAlias());
	if (b->getGroups().size() == 0) {
		payload.set_groups("");
	}
	else {
		payload.set_groups(b->getGroups()[0]);
	}
	payload.set_status(pbnetwork::STATUS_NONE);

	std::string serialized;
	payload.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_BUDDY_REMOVED);

	Backend *backend = static_cast<Backend *>(owner->getData());
	if (backend != NULL) {
		send(backend->connection, serialized);
	}
}
// Applies a roster item change to buddy `b`, stores the buddy and notifies the
// backend of the buddy's owner with TYPE_BUDDY_CHANGED.
//
// b    - buddy to update; it has to be a LocalBuddy, otherwise the update is
//        logged and ignored.
// item - roster item carrying the new alias (its name) and groups.
//
// The buddy is updated and stored even when the owner has no backend; only the
// notification is skipped in that case. Only the first group is transmitted.
void NetworkPluginServer::handleBuddyUpdated(Buddy *b, const Swift::RosterItemPayload &item) {
	User *user = b->getRosterManager()->getUser();

	// Cast once and verify the result: dereferencing a failed dynamic_cast is
	// undefined behaviour.
	LocalBuddy *localBuddy = dynamic_cast<LocalBuddy *>(b);
	if (!localBuddy) {
		LOG4CXX_ERROR(logger, "Buddy " << b->getName() << " is not a LocalBuddy, ignoring roster update");
		return;
	}
	localBuddy->setAlias(item.getName());
	localBuddy->setGroups(item.getGroups());
	user->getRosterManager()->storeBuddy(b);

	pbnetwork::Buddy buddy;
	buddy.set_username(user->getJID().toBare());
	buddy.set_buddyname(b->getName());
	buddy.set_alias(b->getAlias());
	buddy.set_groups(b->getGroups().size() == 0 ? "" : b->getGroups()[0]);
	buddy.set_status(pbnetwork::STATUS_NONE);

	std::string message;
	buddy.SerializeToString(&message);
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED);

	Backend *c = static_cast<Backend *>(user->getData());
	if (!c) {
		return;
	}
	send(c->connection, message);
}
// Handles a newly added buddy. Adding is treated exactly like updating: the
// call is forwarded unchanged to handleBuddyUpdated().
void NetworkPluginServer::handleBuddyAdded(Buddy *buddy, const Swift::RosterItemPayload &item) {
handleBuddyUpdated(buddy, item);
}
// Notifies the backend of the buddy's owner that the blocked state of buddy
// `b` should change (sent as TYPE_BUDDY_CHANGED). Only the first group of the
// buddy is transmitted. Nothing is sent when the owner has no backend.
void NetworkPluginServer::handleBlockToggled(Buddy *b) {
	User *owner = b->getRosterManager()->getUser();

	pbnetwork::Buddy payload;
	payload.set_username(owner->getJID().toBare());
	payload.set_buddyname(b->getName());
	payload.set_alias(b->getAlias());
	if (b->getGroups().size() == 0) {
		payload.set_groups("");
	}
	else {
		payload.set_groups(b->getGroups()[0]);
	}
	payload.set_status(pbnetwork::STATUS_NONE);
	// The inverse of the current state is sent.
	// NOTE(review): presumably the buddy has not been toggled yet at this
	// point, so this is the requested new state - confirm against the caller.
	payload.set_blocked(!b->isBlocked());

	std::string serialized;
	payload.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_BUDDY_CHANGED);

	Backend *backend = static_cast<Backend *>(owner->getData());
	if (backend != NULL) {
		send(backend->connection, serialized);
	}
}
// Sends the user's own updated vCard (photo and nickname) to his backend as a
// TYPE_VCARD message with an empty buddy name and id 0.
//
// user - user whose vCard changed; his backend is taken from user->getData().
// v    - the new vCard.
//
// Nothing is sent when the user has no backend attached.
void NetworkPluginServer::handleVCardUpdated(User *user, boost::shared_ptr<Swift::VCard> v) {
	pbnetwork::VCard vcard;
	vcard.set_username(user->getJID().toBare());
	vcard.set_buddyname("");
	vcard.set_id(0);
	// Element 0 may only be addressed when the photo actually has data;
	// indexing an empty buffer is out of bounds. An empty photo is still set
	// explicitly so the field is present in the message.
	if (v->getPhoto().size() > 0) {
		vcard.set_photo(&v->getPhoto()[0], v->getPhoto().size());
	}
	else {
		vcard.set_photo("");
	}
	vcard.set_nickname(v->getNickname());

	std::string message;
	vcard.SerializeToString(&message);
	WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_VCARD);

	Backend *c = static_cast<Backend *>(user->getData());
	if (!c) {
		return;
	}
	send(c->connection, message);
}
// Sends a TYPE_VCARD message carrying only the user name, the buddy name
// `name` and the request id `id` to the backend attached to `user`.
// NOTE(review): presumably the backend treats this as a vCard request for
// that buddy and echoes `id` in its answer - confirm.
// Nothing is sent when the user has no backend attached.
void NetworkPluginServer::handleVCardRequired(User *user, const std::string &name, unsigned int id) {
	pbnetwork::VCard request;
	request.set_username(user->getJID().toBare());
	request.set_buddyname(name);
	request.set_id(id);

	std::string serialized;
	request.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_VCARD);

	Backend *backend = static_cast<Backend *>(user->getData());
	if (backend != NULL) {
		send(backend->connection, serialized);
	}
}
// Sends a TYPE_FT_START message describing the file transfer `ftID` (buddy,
// file name and size) to the backend attached to `user`.
// Nothing is sent when the user has no backend attached.
void NetworkPluginServer::handleFTAccepted(User *user, const std::string &buddyName, const std::string &fileName, unsigned long size, unsigned long ftID) {
	pbnetwork::File transfer;
	transfer.set_username(user->getJID().toBare());
	transfer.set_buddyname(buddyName);
	transfer.set_filename(fileName);
	transfer.set_size(size);
	transfer.set_ftid(ftID);

	std::string serialized;
	transfer.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_FT_START);

	Backend *backend = static_cast<Backend *>(user->getData());
	if (backend != NULL) {
		send(backend->connection, serialized);
	}
}
// Sends a TYPE_FT_FINISH message for a file transfer (buddy, file name and
// size, with the transfer id fixed to 0) to the backend attached to `user`.
// Nothing is sent when the user has no backend attached.
void NetworkPluginServer::handleFTRejected(User *user, const std::string &buddyName, const std::string &fileName, unsigned long size) {
	pbnetwork::File transfer;
	transfer.set_username(user->getJID().toBare());
	transfer.set_buddyname(buddyName);
	transfer.set_filename(fileName);
	transfer.set_size(size);
	transfer.set_ftid(0);

	std::string serialized;
	transfer.SerializeToString(&serialized);
	WRAP(serialized, pbnetwork::WrapperMessage_Type_TYPE_FT_FINISH);

	Backend *backend = static_cast<Backend *>(user->getData());
	if (backend != NULL) {
		send(backend->connection, serialized);
	}
}
// Reacts to a state change of a file transfer by notifying the user's backend.
//
// state     - new transfer state; only Transferring and Canceled are acted on.
// userName  - name used to look the user up in the user manager.
// buddyName - legacy name of the other party.
// fileName  - name of the transferred file.
// size      - file size.
// id        - file transfer id, forwarded when the transfer starts.
//
// Transferring is forwarded through handleFTAccepted(), Canceled through
// handleFTRejected(). The event is dropped when the user is not known.
void NetworkPluginServer::handleFTStateChanged(Swift::FileTransfer::State state, const std::string &userName, const std::string &buddyName, const std::string &fileName, unsigned long size, unsigned long id) {
	User *user = m_userManager->getUser(userName);
	if (!user) {
		// TODO: reject and remove filetransfer
		// Both handlers below dereference the user, so a missing user must
		// not be passed on.
		LOG4CXX_WARN(logger, "File transfer state changed for unknown user " << userName);
		return;
	}

	if (state.state == Swift::FileTransfer::State::Transferring) {
		handleFTAccepted(user, buddyName, fileName, size, id);
	}
	else if (state.state == Swift::FileTransfer::State::Canceled) {
		handleFTRejected(user, buddyName, fileName, size);
	}
}
// Sends a TYPE_PING wrapper message to backend `c` and clears its
// pongReceived flag. Does nothing when the backend has no connection.
void NetworkPluginServer::sendPing(Backend *c) {
	pbnetwork::WrapperMessage ping;
	ping.set_type(pbnetwork::WrapperMessage_Type_TYPE_PING);

	std::string serialized;
	ping.SerializeToString(&serialized);

	if (!c->connection) {
		return;
	}

	LOG4CXX_INFO(logger, "PING to " << c);
	send(c->connection, serialized);
	// Cleared after every ping that is actually sent.
	// NOTE(review): presumably set again by the pong handler - confirm.
	c->pongReceived = false;
}
// Returns the first connected backend whose acceptUsers and longRun flags
// match the arguments and which serves fewer users than
// "service.users_per_backend".
//
// Unless "service.reuse_old_backends" is enabled, a backend that becomes full
// with the user about to be added stops accepting further users.
//
// When no backend qualifies, a new backend process is started (remembering
// `longRun` in m_isNextLongRun) and NULL is returned.
NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUsers, bool longRun) {
	for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); ++it) {
		Backend *candidate = *it;

		// Skip backends which cannot take this user.
		if (candidate->acceptUsers != acceptUsers) {
			continue;
		}
		if (!(candidate->users.size() < CONFIG_INT(m_config, "service.users_per_backend"))) {
			continue;
		}
		if (!candidate->connection) {
			continue;
		}
		if (candidate->longRun != longRun) {
			continue;
		}

		// if we're not reusing all backends and backend is full, stop accepting new users on this backend
		if (!CONFIG_BOOL(m_config, "service.reuse_old_backends")
			&& candidate->users.size() + 1 >= CONFIG_INT(m_config, "service.users_per_backend")) {
			candidate->acceptUsers = false;
		}
		return candidate;
	}

	// there's no free backend, so spawn one.
	m_isNextLongRun = longRun;
	exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str());
	return NULL;
}
}