Make the backend usable only after it responds to initial PING

This commit is contained in:
Jan Kaluza 2012-07-18 13:54:17 +02:00
parent 981513b2a5
commit fd54120de7
4 changed files with 70 additions and 30 deletions

View file

@ -84,6 +84,7 @@ class NetworkPluginServer {
private:
void handleNewClientConnection(boost::shared_ptr<Swift::Connection> c);
void handleSessionFinished(Backend *c);
void handlePongReceived(Backend *c);
void handleDataRead(Backend *c, boost::shared_ptr<Swift::SafeByteArray> data);
void handleConnectedPayload(const std::string &payload);

View file

@ -207,6 +207,16 @@ int main(int argc, char **argv)
std::cerr << "Can't create service.working_dir directory " << CONFIG_STRING(&config, "service.working_dir") << ".\n";
return 1;
}
// create directories
try {
boost::filesystem::create_directories(
boost::filesystem::path(CONFIG_STRING(&config, "service.portfile")).parent_path().string()
);
}
catch (...) {
std::cerr << "Can't create service.portfile directory " << CONFIG_STRING(&config, "service.portfile") << ".\n";
return 1;
}
#ifndef WIN32
if (!CONFIG_STRING(&config, "service.group").empty() ||!CONFIG_STRING(&config, "service.user").empty() ) {
@ -223,6 +233,20 @@ int main(int argc, char **argv)
chown(CONFIG_STRING(&config, "service.working_dir").c_str(), pw->pw_uid, gr->gr_gid);
}
char backendport[20];
FILE* port_file_f;
port_file_f = fopen(CONFIG_STRING(&config, "service.portfile").c_str(), "w+");
if (port_file_f == NULL) {
std::cerr << "Cannot create port_file file " << CONFIG_STRING(&config, "service.portfile").c_str() << ". Exiting\n";
exit(1);
}
sprintf(backendport,"%s\n",CONFIG_STRING(&config, "service.backend_port").c_str());
if (fwrite(backendport,1,strlen(backendport),port_file_f) < strlen(backendport)) {
std::cerr << "Cannot write to port file " << CONFIG_STRING(&config, "service.portfile") << ". Exiting\n";
exit(1);
}
fclose(port_file_f);
if (!no_daemon) {
// daemonize
daemonize(CONFIG_STRING(&config, "service.working_dir").c_str(), CONFIG_STRING(&config, "service.pidfile").c_str());

View file

@ -75,6 +75,7 @@ bool Config::load(std::istream &ifs, boost::program_options::options_description
("service.backend", value<std::string>()->default_value("libpurple_backend"), "Backend")
("service.protocol", value<std::string>()->default_value(""), "Protocol")
("service.pidfile", value<std::string>()->default_value("/var/run/spectrum2/$jid.pid"), "Full path to pid file")
("service.portfile", value<std::string>()->default_value("/var/run/spectrum2/$jid.port"), "File to store backend_port to. It's used by spectrum2_manager.")
("service.working_dir", value<std::string>()->default_value("/var/lib/spectrum2/$jid"), "Working dir")
("service.allowed_servers", value<std::vector<std::string> >()->multitoken(), "Only users from these servers can connect")
("service.server_mode", value<bool>()->default_value(false), "True if Spectrum should behave as server")

View file

@ -318,18 +318,14 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Con
client->res = 0;
client->init_res = 0;
client->shared = 0;
client->willDie = 0;
// Until we receive first PONG from backend, backend is in willDie state.
client->willDie = true;
// Backend does not accept new clients automatically if it's long-running
client->acceptUsers = !m_isNextLongRun;
client->longRun = m_isNextLongRun;
LOG4CXX_INFO(logger, "New" + (client->longRun ? std::string(" long-running") : "") + " backend " << client << " connected. Current backend count=" << (m_clients.size() + 1));
if (m_clients.size() == 0) {
// first backend connected, start the server, we're ready.
m_component->start();
}
m_clients.push_front(client);
c->onDisconnected.connect(boost::bind(&NetworkPluginServer::handleSessionFinished, this, client));
@ -340,28 +336,6 @@ void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Con
// in first ::pingTimeout call, because it can be called right after this function
// and backend wouldn't have any time to response to ping.
client->pongReceived = -1;
// some users are in queue waiting for this backend
while(!m_waitingUsers.empty()) {
// There's no new backend, so stop associating users and wait for new backend,
// which has been already spawned in getFreeClient() call.
if (getFreeClient() == NULL)
break;
User *u = m_waitingUsers.front();
m_waitingUsers.pop_front();
LOG4CXX_INFO(logger, "Associating " << u->getJID().toString() << " with this backend");
// associate backend with user
handleUserCreated(u);
// connect user if it's ready
if (u->isReadyToConnect()) {
handleUserReadyToConnect(u);
}
}
}
void NetworkPluginServer::handleSessionFinished(Backend *c) {
@ -753,6 +727,42 @@ void NetworkPluginServer::handleFTDataNeeded(Backend *b, unsigned long ftid) {
send(b->connection, message);
}
void NetworkPluginServer::handlePongReceived(Backend *c) {
// This could be first PONG from the backend
if (c->pongReceived == -1) {
// Backend is fully ready to handle requests
c->willDie = false;
if (m_clients.size() == 1) {
// first backend connected, start the server, we're ready.
m_component->start();
}
// some users are in queue waiting for this backend
while(!m_waitingUsers.empty()) {
// There's no new backend, so stop associating users and wait for new backend,
// which has been already spawned in getFreeClient() call.
if (getFreeClient() == NULL)
break;
User *u = m_waitingUsers.front();
m_waitingUsers.pop_front();
LOG4CXX_INFO(logger, "Associating " << u->getJID().toString() << " with this backend");
// associate backend with user
handleUserCreated(u);
// connect user if it's ready
if (u->isReadyToConnect()) {
handleUserReadyToConnect(u);
}
}
}
c->pongReceived = true;
}
void NetworkPluginServer::handleDataRead(Backend *c, boost::shared_ptr<Swift::SafeByteArray> data) {
// Append data to buffer
c->data.insert(c->data.end(), data->begin(), data->end());
@ -803,7 +813,7 @@ void NetworkPluginServer::handleDataRead(Backend *c, boost::shared_ptr<Swift::Sa
handleConvMessagePayload(wrapper.payload(), true);
break;
case pbnetwork::WrapperMessage_Type_TYPE_PONG:
c->pongReceived = true;
handlePongReceived(c);
break;
case pbnetwork::WrapperMessage_Type_TYPE_PARTICIPANT_CHANGED:
handleParticipantChangedPayload(wrapper.payload());
@ -897,7 +907,11 @@ void NetworkPluginServer::pingTimeout() {
// pong has been received OR backend just connected and did not have time to answer the ping
// request.
if ((*it)->pongReceived || (*it)->pongReceived == -1) {
sendPing((*it));
// Don't send another ping if pongReceived == -1, because we've already sent one
// when registering backend.
if ((*it)->pongReceived) {
sendPing((*it));
}
}
else {
LOG4CXX_INFO(logger, "Disconnecting backend " << (*it) << " (ID=" << (*it)->id << "). PING response not received.");