comments in networkpluginserver.cpp

This commit is contained in:
Jan Kaluza 2011-08-08 17:07:38 +02:00
parent d94de26489
commit 2781d97a66

View file

@ -56,6 +56,7 @@ class NetworkConversation : public Conversation {
NetworkConversation(ConversationManager *conversationManager, const std::string &legacyName, bool muc = false) : Conversation(conversationManager, legacyName, muc) {
}
// Called when there's new message to legacy network from XMPP network
void sendMessage(boost::shared_ptr<Swift::Message> &message) {
onMessageToSend(this, message);
}
@ -68,13 +69,15 @@ class NetworkFactory : public Factory {
NetworkFactory(NetworkPluginServer *nps) {
m_nps = nps;
}
// Creates new conversation (NetworkConversation in this case)
Conversation *createConversation(ConversationManager *conversationManager, const std::string &legacyName) {
NetworkConversation *nc = new NetworkConversation(conversationManager, legacyName);
nc->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, m_nps, _1, _2));
return nc;
}
// Creates new LocalBuddy
Buddy *createBuddy(RosterManager *rosterManager, const BuddyInfo &buddyInfo) {
LocalBuddy *buddy = new LocalBuddy(rosterManager, buddyInfo.id);
buddy->setAlias(buddyInfo.alias);
@ -86,26 +89,34 @@ class NetworkFactory : public Factory {
buddy->setIconHash(buddyInfo.settings.find("icon_hash")->second.s);
return buddy;
}
private:
NetworkPluginServer *m_nps;
};
// Wraps google protobuf payload into WrapperMessage and serialize it to string
#define WRAP(MESSAGE, TYPE) pbnetwork::WrapperMessage wrap; \
wrap.set_type(TYPE); \
wrap.set_payload(MESSAGE); \
wrap.SerializeToString(&MESSAGE);
// Executes new backend
static pid_t exec_(std::string path, const char *host, const char *port, const char *config) {
// BACKEND_ID is replaced with unique ID. The ID is increasing for every backend.
boost::replace_all(path, "BACKEND_ID", boost::lexical_cast<std::string>(backend_id++));
// Add host and port.
path += std::string(" --host ") + host + " --port " + port + " " + config;
LOG4CXX_INFO(logger, "Starting new backend " << path);
// Create array of char * from string using -lpopt library
char *p = (char *) malloc(path.size() + 1);
strcpy(p, path.c_str());
int argc;
char **argv;
poptParseArgvString(p, &argc, (const char ***) &argv);
// char *argv[] = {(char*)script_name, '\0'};
// fork and exec
pid_t pid = fork();
if ( pid == 0 ) {
// child process
@ -121,6 +132,8 @@ static pid_t exec_(std::string path, const char *host, const char *port, const c
static void SigCatcher(int n) {
pid_t result;
int status;
// Read exit code from all children to not have zombies arround
// WARNING: Do not put LOG4CXX_ here, because it can lead to deadlock
while ((result = waitpid(0, &status, WNOHANG)) > 0) {
if (result != 0) {
if (WIFEXITED(status)) {
@ -137,9 +150,13 @@ static void SigCatcher(int n) {
static void handleBuddyPayload(LocalBuddy *buddy, const pbnetwork::Buddy &payload) {
buddy->setName(payload.buddyname());
// Set alias only if it's not empty. Backends are allowed to send empty alias if it has
// not changed.
if (!payload.alias().empty()) {
buddy->setAlias(payload.alias());
}
// Change groups if it's not empty. The same as above...
if (!payload.groups().empty()) {
std::vector<std::string> groups;
groups.push_back(payload.groups());
@ -205,12 +222,14 @@ NetworkPluginServer::~NetworkPluginServer() {
}
void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Connection> c) {
// Create new Backend instance
Backend *client = new Backend;
client->pongReceived = -1;
client->connection = c;
client->res = 0;
client->init_res = 0;
client->shared = 0;
// Backend does not accept new clients automatically if it's long-running
client->acceptUsers = !m_isNextLongRun;
client->longRun = m_isNextLongRun;
@ -226,6 +245,10 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Con
c->onDisconnected.connect(boost::bind(&NetworkPluginServer::handleSessionFinished, this, client));
c->onDataRead.connect(boost::bind(&NetworkPluginServer::handleDataRead, this, client, _1));
sendPing(client);
// sendPing sets pongReceived to 0, but we want to have it -1 to ignore this backend
// in first ::pingTimeout call, because it can be called right after this function
// and backend wouldn't have any time to response to ping.
client->pongReceived = -1;
// some users are in queue waiting for this backend
@ -253,6 +276,9 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Con
void NetworkPluginServer::handleSessionFinished(Backend *c) {
LOG4CXX_INFO(logger, "Backend " << c << " disconnected. Current backend count=" << (m_clients.size() - 1));
// If there are users associated with this backend, it must have crashed, so print error output
// and disconnect users
for (std::list<User *>::const_iterator it = c->users.begin(); it != c->users.end(); it++) {
LOG4CXX_ERROR(logger, "Backend " << c << " disconnected (probably crashed) with active user " << (*it)->getJID().toString());
(*it)->setData(NULL);
@ -264,14 +290,6 @@ void NetworkPluginServer::handleSessionFinished(Backend *c) {
m_clients.remove(c);
delete c;
// Execute new session only if there's no free one after this crash/disconnection
// for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
// if ((*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend")) {
// return;
// }
// }
// exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str());
}
void NetworkPluginServer::handleConnectedPayload(const std::string &data) {
@ -333,10 +351,12 @@ void NetworkPluginServer::handleAuthorizationPayload(const std::string &data) {
if (!user)
return;
// Create subscribe presence and forward it to XMPP side
Swift::Presence::ref response = Swift::Presence::create();
response->setTo(user->getJID());
std::string name = payload.buddyname();
// TODO:
// name = Swift::JID::getEscapedNode(name)
if (name.find_last_of("@") != std::string::npos) {
@ -359,11 +379,14 @@ void NetworkPluginServer::handleChatStatePayload(const std::string &data, Swift:
if (!user)
return;
// We're not creating new Conversation just because of chatstates.
// Some networks/clients spams with chatstates a lot and it leads to bigger memory usage.
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname());
if (!conv) {
return;
}
// Forward chatstate
boost::shared_ptr<Swift::Message> msg(new Swift::Message());
msg->addPayload(boost::make_shared<Swift::ChatState>(type));
@ -410,18 +433,6 @@ void NetworkPluginServer::handleParticipantChangedPayload(const std::string &dat
}
conv->handleParticipantChanged(payload.nickname(), payload.flag(), payload.status(), payload.statusmessage(), payload.newname());
// LocalBuddy *buddy = (LocalBuddy *) user->getRosterManager()->getBuddy(payload.buddyname());
// if (buddy) {
// handleBuddyPayload(buddy, payload);
// buddy->buddyChanged();
// }
// else {
// buddy = new LocalBuddy(user->getRosterManager(), -1);
// handleBuddyPayload(buddy, payload);
// user->getRosterManager()->setBuddy(buddy);
// }
// std::cout << payload.nickname() << "\n";
}
void NetworkPluginServer::handleRoomChangedPayload(const std::string &data) {
@ -444,18 +455,20 @@ void NetworkPluginServer::handleRoomChangedPayload(const std::string &data) {
void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool subject) {
pbnetwork::ConversationMessage payload;
// std::cout << "payload...\n";
if (payload.ParseFromString(data) == false) {
// TODO: ERROR
return;
}
// std::cout << "payload 2...\n";
User *user = m_userManager->getUser(payload.username());
if (!user)
return;
// Message from legacy network triggers network acticity
user->updateLastActivity();
// Set proper body.
boost::shared_ptr<Swift::Message> msg(new Swift::Message());
if (subject) {
msg->setSubject(payload.message());
@ -464,16 +477,19 @@ void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool
msg->setBody(payload.message());
}
// Add xhtml-im payload.
if (!payload.xhtml().empty()) {
msg->addPayload(boost::make_shared<Swift::XHTMLIMPayload>(payload.xhtml()));
}
// Create new Conversation if it does not exist
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname());
if (!conv) {
conv = new NetworkConversation(user->getConversationManager(), payload.buddyname());
conv->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2));
}
// Forward it
conv->handleMessage(msg, payload.nickname());
}
@ -492,6 +508,7 @@ void NetworkPluginServer::handleAttentionPayload(const std::string &data) {
msg->setBody(payload.message());
msg->addPayload(boost::make_shared<Swift::AttentionPayload>());
// Attentions trigger new Conversation creation
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(payload.buddyname());
if (!conv) {
conv = new NetworkConversation(user->getConversationManager(), payload.buddyname());
@ -507,19 +524,28 @@ void NetworkPluginServer::handleStatsPayload(Backend *c, const std::string &data
// TODO: ERROR
return;
}
c->res = payload.res();
c->init_res = payload.init_res();
c->shared = payload.shared();
}
void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray &data) {
// Append data to buffer
c->data.insert(c->data.end(), data.begin(), data.end());
// Parse data while there are some
while (c->data.size() != 0) {
// expected_size of wrapper message
unsigned int expected_size;
// if data is >= 4, we have whole header and we can
// read expected_size.
if (c->data.size() >= 4) {
expected_size = *((unsigned int*) &c->data[0]);
expected_size = ntohl(expected_size);
// If we don't have whole wrapper message, wait for next
// handleDataRead call.
if (c->data.size() - 4 < expected_size)
return;
}
@ -527,6 +553,7 @@ void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray
return;
}
// Parse wrapper message and erase it from buffer.
pbnetwork::WrapperMessage wrapper;
if (wrapper.ParseFromArray(&c->data[4], expected_size) == false) {
std::cout << "PARSING ERROR " << expected_size << "\n";
@ -535,6 +562,7 @@ void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray
}
c->data.erase(c->data.begin(), c->data.begin() + 4 + expected_size);
// Handle payload in wrapper message
switch(wrapper.type()) {
case pbnetwork::WrapperMessage_Type_TYPE_CONNECTED:
handleConnectedPayload(wrapper.payload());
@ -588,21 +616,29 @@ void NetworkPluginServer::handleDataRead(Backend *c, const Swift::SafeByteArray
}
void NetworkPluginServer::send(boost::shared_ptr<Swift::Connection> &c, const std::string &data) {
// generate header - size of wrapper message
char header[4];
*((int*)(header)) = htonl(data.size());
// send header together with wrapper message
c->write(Swift::createSafeByteArray(std::string(header, 4) + data));
}
void NetworkPluginServer::pingTimeout() {
// TODO: move to separate timer, those 2 loops could be expensive
// Some users are connected for weeks and they are blocking backend to be destroyed and its memory
// to be freed. We are finding users who are inactive for more than "idle_reconnect_time" seconds and
// reconnect them to long-running backend, where they can idle hapilly till the end of ages.
time_t now = time(NULL);
std::vector<User *> usersToMove;
unsigned long diff = CONFIG_INT(m_config, "service.idle_reconnect_time");
for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
// Users from long-running backends can't be moved
if ((*it)->longRun) {
continue;
}
// Find users which are inactive for more than 'diff'
BOOST_FOREACH(User *u, (*it)->users) {
if (now - u->getLastActivity() > diff) {
usersToMove.push_back(u);
@ -610,6 +646,7 @@ void NetworkPluginServer::pingTimeout() {
}
}
// Move inactive users to long-running backend.
BOOST_FOREACH(User *u, usersToMove) {
LOG4CXX_INFO(logger, "Moving user " << u->getJID().toString() << " to long-running backend");
if (!moveToLongRunBackend(u))
@ -619,6 +656,8 @@ void NetworkPluginServer::pingTimeout() {
// check ping responses
for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
// pong has been received OR backend just connected and did not have time to answer the ping
// request.
if ((*it)->pongReceived || (*it)->pongReceived == -1) {
sendPing((*it));
}
@ -639,6 +678,8 @@ void NetworkPluginServer::pingTimeout() {
}
void NetworkPluginServer::collectBackend() {
// Stop accepting new users to backend with the biggest memory usage. This prevents backends
// which are leaking to eat whole memory by connectin new users to legacy network.
LOG4CXX_INFO(logger, "Collect backend called, finding backend which will be set to die");
unsigned long max = 0;
Backend *backend = NULL;
@ -695,13 +736,17 @@ bool NetworkPluginServer::moveToLongRunBackend(User *user) {
}
void NetworkPluginServer::handleUserCreated(User *user) {
// Get free backend to handle this user or spawn new one if there's no free one.
Backend *c = getFreeClient();
// Add user to queue if there's no free backend to handle him so far.
if (!c) {
LOG4CXX_INFO(logger, "There is no backend to handle user " << user->getJID().toString() << ". Adding him to queue.");
m_waitingUsers.push_back(user);
return;
}
// Associate users with backend
user->setData(c);
c->users.push_back(user);
@ -1050,11 +1095,12 @@ void NetworkPluginServer::sendPing(Backend *c) {
NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUsers, bool longRun) {
NetworkPluginServer::Backend *c = NULL;
// bool spawnNew = false;
// Check all backends and find free one
for (std::list<Backend *>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
// This backend is free.
if ((*it)->acceptUsers == acceptUsers && (*it)->users.size() < CONFIG_INT(m_config, "service.users_per_backend") && (*it)->connection && (*it)->longRun == longRun) {
c = *it;
// if we're not reusing all backends and backend is full, stop accepting new users on this backend
if (!CONFIG_BOOL(m_config, "service.reuse_old_backends")) {
if (c->users.size() + 1 >= CONFIG_INT(m_config, "service.users_per_backend")) {
c->acceptUsers = false;
@ -1064,6 +1110,7 @@ NetworkPluginServer::Backend *NetworkPluginServer::getFreeClient(bool acceptUser
}
}
// there's no free backend, so spawn one.
if (c == NULL) {
m_isNextLongRun = longRun;
exec_(CONFIG_STRING(m_config, "service.backend"), CONFIG_STRING(m_config, "service.backend_host").c_str(), CONFIG_STRING(m_config, "service.backend_port").c_str(), m_config->getConfigFile().c_str());