#include "methods.h"

// NOTE(review): the header names of the angle-bracket includes were missing
// from the reviewed copy of this file. The list below is the set of headers
// the code in this translation unit uses; confirm it against the project's
// original include list.
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

#include <arpa/inet.h>
#include <stdint.h>
#include <unistd.h>

#include <boost/bind.hpp>
#include <boost/filesystem.hpp>
#include <boost/foreach.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/shared_ptr.hpp>

#include "signal.h"
#include "sys/wait.h"

// Wraps the serialized payload held in MESSAGE into a pbnetwork::WrapperMessage
// of the given TYPE and serializes the wrapper back into MESSAGE.
// Declares a local variable named `wrap`, so it can be used once per scope.
#define WRAP(MESSAGE, TYPE) 	pbnetwork::WrapperMessage wrap; \
	wrap.set_type(TYPE); \
	wrap.set_payload(MESSAGE); \
	wrap.SerializeToString(&MESSAGE);

using namespace Transport;
using namespace boost::filesystem;
using namespace boost;

// Bytes received from the backend connection that have not been parsed yet.
std::string _data;

// Last answer produced by handleDataRead()/handleConnected().
static std::string response;

// Returns a copy of the last response stored by the connection handlers.
std::string get_response() {
	return response;
}

// Reads the first whitespace-delimited token of `file` and converts it to int.
// Returns 0 when the file does not exist, is a directory, yields no token, or
// the token is not a valid integer.
static int readIntFromFile(const std::string &file) {
	path p(file);
	if (!exists(p) || is_directory(p)) {
		return 0;
	}

	std::ifstream f(p.string().c_str(), std::ios_base::in);
	std::string token;
	f >> token;
	if (token.empty()) {
		return 0;
	}

	try {
		return boost::lexical_cast<int>(token);
	}
	catch (const boost::bad_lexical_cast &) {
		// Malformed content is treated like a missing file instead of
		// letting the exception escape to callers that do not catch it.
		return 0;
	}
}

// Verifies that `p` (built from "service.config_directory") exists and is a
// directory. Prints an error and terminates the process with exit code 6
// (missing) or 7 (not a directory) otherwise.
static void requireConfigDirectory(ManagerConfig *config, const path &p) {
	if (!exists(p)) {
		std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n";
		exit(6);
	}
	if (!is_directory(p)) {
		std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " is not a directory\n";
		exit(7);
	}
}

// Returns true when `p` is a regular file with the ".cfg" extension.
static bool isConfigFile(const path &p) {
	return is_regular(p) && extension(p) == ".cfg";
}

// Returns every "vhosts.vhost" entry of `cfg` (when the key is present)
// followed by the value of "service.jid".
static std::vector<std::string> listVhosts(Config &cfg) {
	std::vector<std::string> vhosts;
	if (CONFIG_HAS_KEY(&cfg, "vhosts.vhost")) {
		vhosts = CONFIG_VECTOR(&cfg, "vhosts.vhost");
	}
	vhosts.push_back(CONFIG_STRING(&cfg, "service.jid"));
	return vhosts;
}

// Looks for `binary` in the ':'-separated directories of the PATH environment
// variable, in order.
// Returns "<directory>/<binary>" for the first entry that exists and is not a
// directory, or an empty string when PATH is unset or nothing matches.
std::string searchForBinary(const std::string &binary) {
	const char *env_path = getenv("PATH");
	if (env_path == NULL) {
		return "";
	}

	// Split PATH on ':'. A trailing empty segment is dropped; empty segments
	// elsewhere are kept, exactly as before.
	std::vector<std::string> path_list;
	std::string buffer;
	for (const char *c = env_path; *c != '\0'; ++c) {
		if (*c == ':') {
			path_list.push_back(buffer);
			buffer.clear();
		}
		else {
			buffer += *c;
		}
	}
	if (!buffer.empty()) {
		path_list.push_back(buffer);
	}

	for (std::vector<std::string>::const_iterator dit = path_list.begin(); dit != path_list.end(); ++dit) {
		const std::string bpath = *dit + "/" + binary;
		path p(bpath);
		if (exists(p) && !is_directory(p)) {
			return bpath;
		}
	}
	return "";
}

// Executes new backend
//
// Forks and runs `path` with `config` as its argument; when `jid` is not
// empty, "-j <jid>" is passed before the config file. The parent waits for
// the child to terminate.
// Returns the value returned by fork() converted to unsigned long (so a
// failed fork yields (unsigned long) -1).
unsigned long exec_(std::string path, std::string config, std::string jid) {
	// fork and exec
	pid_t pid = fork();
	if (pid == 0) {
		// child process
		// The variadic argument list must end with a null *pointer*; a bare
		// NULL may be passed as an int.
		if (jid.empty()) {
			execl(path.c_str(), path.c_str(), config.c_str(), static_cast<char *>(NULL));
		}
		else {
			execl(path.c_str(), path.c_str(), "-j", jid.c_str(), config.c_str(), static_cast<char *>(NULL));
		}
		// execl() only returns on failure. Use _exit() so the child does not
		// flush its copy of the parent's stdio buffers or run atexit handlers.
		_exit(127);
	}
	else if (pid < 0) {
		// fork failed
		std::cerr << "Can't fork to execute " << path << "\n";
	}
	else {
		waitpid(pid, 0, 0);
	}

	return (unsigned long) pid;
}

// Returns the port number stored in `portfile`, or 0 when the file is
// missing, is a directory, is empty or does not contain an integer.
int getPort(const std::string &portfile) {
	return readIntFromFile(portfile);
}

// Returns the PID stored in `pidfile` when kill(pid, 0) succeeds for it.
// Returns 0 when the file is missing/unreadable/malformed, when the stored
// value is not a positive PID, or when kill(pid, 0) fails.
int isRunning(const std::string &pidfile) {
	const int pid = readIntFromFile(pidfile);

	// kill() gives 0 and negative values a special meaning (process groups /
	// every process), so they must never be treated as an instance PID.
	if (pid <= 0) {
		return 0;
	}

	if (kill(pid, 0) != 0) {
		return 0;
	}
	return pid;
}

// Starts a spectrum2 process for every vhost of every *.cfg file found in
// "service.config_directory". When `_jid` is not empty only the vhost equal
// to `_jid` is started. Vhosts whose pidfile points to a running process are
// reported as already started.
// Exits the process with code 6/7 (bad config directory), 8 (spectrum2 not
// found in PATH) or 5 (boost filesystem error).
void start_instances(ManagerConfig *config, const std::string &_jid) {
	path p(CONFIG_STRING(config, "service.config_directory"));

	try {
		requireConfigDirectory(config, p);

		std::string spectrum2_binary = searchForBinary("spectrum2");
		if (spectrum2_binary.empty()) {
			std::cerr << "spectrum2 binary not found in PATH\n";
			exit(8);
		}

		directory_iterator end_itr;
		for (directory_iterator itr(p); itr != end_itr; ++itr) {
			if (!isConfigFile(itr->path())) {
				continue;
			}

			Config cfg;
			if (!cfg.load(itr->path().string())) {
				std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
				continue;
			}

			std::vector<std::string> vhosts = listVhosts(cfg);
			BOOST_FOREACH(std::string &vhost, vhosts) {
				Config vhostCfg;
				if (!vhostCfg.load(itr->path().string(), vhost)) {
					std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
					continue;
				}

				if (!_jid.empty() && _jid != vhost) {
					continue;
				}

				int pid = isRunning(CONFIG_STRING(&vhostCfg, "service.pidfile"));
				if (pid == 0) {
					std::cout << "Starting " << itr->path() << ": OK\n";
					exec_(spectrum2_binary, itr->path().string(), vhost);
				}
				else {
					std::cout << "Starting " << itr->path() << ": Already started (PID=" << pid << ")\n";
				}
			}
		}
	}
	catch (const filesystem_error& ex) {
		std::cerr << "boost filesystem error\n";
		exit(5);
	}
}

// Sends SIGTERM to the running process of every vhost of every *.cfg file
// found in "service.config_directory" and waits up to about 20 seconds for it
// to disappear. When `_jid` is not empty only the vhost equal to `_jid` is
// stopped.
// Exits the process with code 6/7 (bad config directory) or 5 (boost
// filesystem error).
void stop_instances(ManagerConfig *config, const std::string &_jid) {
	path p(CONFIG_STRING(config, "service.config_directory"));

	try {
		requireConfigDirectory(config, p);

		directory_iterator end_itr;
		for (directory_iterator itr(p); itr != end_itr; ++itr) {
			if (!isConfigFile(itr->path())) {
				continue;
			}

			Config cfg;
			if (!cfg.load(itr->path().string())) {
				std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
				// Actually skip the file, as the message says and as
				// start_instances() does.
				continue;
			}

			std::vector<std::string> vhosts = listVhosts(cfg);
			BOOST_FOREACH(std::string &vhost, vhosts) {
				Config vhostCfg;
				if (!vhostCfg.load(itr->path().string(), vhost)) {
					std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
					continue;
				}

				if (!_jid.empty() && _jid != vhost) {
					continue;
				}

				int pid = isRunning(CONFIG_STRING(&vhostCfg, "service.pidfile"));
				if (!pid) {
					std::cout << "Stopping " << itr->path() << ": Not running\n";
					continue;
				}

				std::cout << "Stopping " << itr->path() << ": " << std::flush;
				kill(pid, SIGTERM);

				sleep(1);
				int remaining = 20;
				while (kill(pid, 0) == 0 && remaining != 0) {
					// Flush so the progress dots show up while waiting.
					std::cout << "." << std::flush;
					sleep(1);
					remaining--;
				}

				// Decide on the process state itself, not on the counter: the
				// process may have exited during the very last wait.
				if (kill(pid, 0) == 0) {
					std::cout << " ERROR (timeout)\n";
				}
				else {
					std::cout << " OK\n";
				}
			}
		}
	}
	catch (const filesystem_error& ex) {
		std::cerr << "boost filesystem error\n";
		exit(5);
	}
}

// Prints "Running" or "Stopped" for every vhost of every *.cfg file found in
// "service.config_directory".
// Returns 0 when every listed vhost is running and 3 when at least one is
// stopped. Exits the process with code 6/7 (bad config directory) or 5 (boost
// filesystem error).
int show_status(ManagerConfig *config) {
	int ret = 0;
	path p(CONFIG_STRING(config, "service.config_directory"));

	try {
		requireConfigDirectory(config, p);

		directory_iterator end_itr;
		for (directory_iterator itr(p); itr != end_itr; ++itr) {
			if (!isConfigFile(itr->path())) {
				continue;
			}

			Config cfg;
			if (!cfg.load(itr->path().string())) {
				std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
				// Actually skip the file, as the message says and as
				// start_instances() does.
				continue;
			}

			std::vector<std::string> vhosts = listVhosts(cfg);
			BOOST_FOREACH(std::string &vhost, vhosts) {
				Config vhostCfg;
				if (!vhostCfg.load(itr->path().string(), vhost)) {
					std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
					continue;
				}

				int pid = isRunning(CONFIG_STRING(&vhostCfg, "service.pidfile"));
				if (pid) {
					std::cout << itr->path() << ": " << vhost << " Running\n";
				}
				else {
					ret = 3;
					std::cout << itr->path() << ": " << vhost << " Stopped\n";
				}
			}
		}
	}
	catch (const filesystem_error& ex) {
		std::cerr << "boost filesystem error\n";
		exit(5);
	}
	return ret;
}

// Connection read handler. Appends `data` to the global receive buffer and
// consumes every complete frame in it. A frame is a 4-byte length in network
// byte order followed by a serialized pbnetwork::WrapperMessage of that
// length. For a TYPE_QUERY wrapper the BackendConfig payload's config string
// is stored as the response and printed, and this handler is disconnected
// from `m_conn`.
static void handleDataRead(boost::shared_ptr<Swift::Connection> m_conn, boost::shared_ptr<Swift::SafeByteArray> data) {
	_data += std::string(data->begin(), data->end());

	// Parse data while there are some
	while (_data.size() != 0) {
		// Without the 4-byte header the expected size is unknown; wait for
		// the next handleDataRead call.
		if (_data.size() < 4) {
			return;
		}

		// expected_size of wrapper message. Copy the header out instead of
		// dereferencing a casted pointer into the string buffer, which is not
		// guaranteed to be suitably aligned.
		uint32_t expected_size;
		std::memcpy(&expected_size, _data.data(), sizeof(expected_size));
		expected_size = ntohl(expected_size);

		// If we don't have whole wrapper message, wait for next
		// handleDataRead call.
		if (_data.size() - 4 < expected_size) {
			return;
		}

		// Parse wrapper message and erase it from buffer (also when parsing
		// fails, so a broken frame cannot block the ones after it).
		pbnetwork::WrapperMessage wrapper;
		const bool parsed = wrapper.ParseFromArray(_data.data() + 4, expected_size);
		_data.erase(0, static_cast<std::string::size_type>(expected_size) + 4);
		if (!parsed) {
			std::cout << "PARSING ERROR " << expected_size << "\n";
			continue;
		}

		if (wrapper.type() == pbnetwork::WrapperMessage_Type_TYPE_QUERY) {
			pbnetwork::BackendConfig payload;
			if (payload.ParseFromString(wrapper.payload()) == false) {
				std::cout << "PARSING ERROR\n";
				// TODO: ERROR
				continue;
			}

			m_conn->onDataRead.disconnect(boost::bind(&handleDataRead, m_conn, _1));

			response = payload.config();
			std::cout << payload.config() << "\n";
// 			exit(0);
		}
	}
}

// Connect-finished handler. On error the failure text is stored as the
// response and the read handler is disconnected. On success `msg` is sent to
// the server as a TYPE_QUERY wrapper message prefixed with its 4-byte length
// in network byte order.
static void handleConnected(boost::shared_ptr<Swift::Connection> m_conn, const std::string &msg, bool error) {
	// Copy first: msg may refer to the argument stored in the very slot that
	// is disconnected on the next line.
	const std::string query(msg);

	m_conn->onConnectFinished.disconnect(boost::bind(&handleConnected, m_conn, msg, _1));
	if (error) {
		std::cerr << "Can't connect the server\n";
		response = "Can't connect the server\n";
		m_conn->onDataRead.disconnect(boost::bind(&handleDataRead, m_conn, _1));
// 		exit(50);
	}
	else {
		pbnetwork::BackendConfig m;
		m.set_config(query);

		std::string message;
		m.SerializeToString(&message);

		WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_QUERY);

		uint32_t size = htonl(message.size());
		char *header = (char *) &size;

		// send header together with wrapper message
		m_conn->write(Swift::createSafeByteArray(std::string(header, 4) + message));
	}
}

// Opens a connection to the local Spectrum instance whose "service.jid"
// equals `jid` (address from "service.backend_host", port read from the file
// named by "service.portfile") and sends `message` to it once connected. The
// answer is delivered asynchronously through handleDataRead() and can be read
// with get_response().
// Exits the process with code 6/7 (bad config directory), 20 (no config file
// for `jid`) or 5 (boost filesystem error).
void ask_local_server(ManagerConfig *config, Swift::BoostNetworkFactories &networkFactories, const std::string &jid, const std::string &message) {
	path p(CONFIG_STRING(config, "service.config_directory"));

	try {
		requireConfigDirectory(config, p);

		bool found = false;
		directory_iterator end_itr;
		for (directory_iterator itr(p); itr != end_itr; ++itr) {
			if (!isConfigFile(itr->path())) {
				continue;
			}

			Config cfg;
			if (!cfg.load(itr->path().string())) {
				std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
				continue;
			}

			if (CONFIG_STRING(&cfg, "service.jid") != jid) {
				continue;
			}

			found = true;

			boost::shared_ptr<Swift::Connection> m_conn;
			m_conn = networkFactories.getConnectionFactory()->createConnection();
			m_conn->onDataRead.connect(boost::bind(&handleDataRead, m_conn, _1));
			m_conn->onConnectFinished.connect(boost::bind(&handleConnected, m_conn, message, _1));
			m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(CONFIG_STRING(&cfg, "service.backend_host")), getPort(CONFIG_STRING(&cfg, "service.portfile"))));

// 			finished++;
// 			Swift::Client *client = new Swift::Client(CONFIG_VECTOR(&cfg, "service.admin_jid")[0], CONFIG_STRING(&cfg, "service.admin_password"), &networkFactories);
// 			client->setAlwaysTrustCertificates();
// 			client->onConnected.connect(boost::bind(&handleConnected, client, CONFIG_STRING(&cfg, "service.jid")));
// 			client->onDisconnected.connect(bind(&handleDisconnected, client, _1, CONFIG_STRING(&cfg, "service.jid")));
// 			client->onMessageReceived.connect(bind(&handleMessageReceived, client, _1, CONFIG_STRING(&cfg, "service.jid")));
// 			Swift::ClientOptions opt;
// 			opt.allowPLAINWithoutTLS = true;
// 			client->connect(opt);
		}

		if (!found) {
			std::cerr << "Config file for Spectrum instance with this JID was not found\n";
			exit(20);
		}
	}
	catch (const filesystem_error& ex) {
		std::cerr << "boost filesystem error\n";
		exit(5);
	}
}

// Collects the "service.jid" of every loadable *.cfg file found in
// "service.config_directory"; when `show` is true each JID is also printed on
// its own line.
// Exits the process with code 6/7 when the config directory is unusable. A
// boost filesystem error is only reported; the JIDs collected so far are
// still returned.
std::vector<std::string> show_list(ManagerConfig *config, bool show) {
	path p(CONFIG_STRING(config, "service.config_directory"));
	std::vector<std::string> list;

	try {
		requireConfigDirectory(config, p);

		directory_iterator end_itr;
		for (directory_iterator itr(p); itr != end_itr; ++itr) {
			if (!isConfigFile(itr->path())) {
				continue;
			}

			Config cfg;
			if (!cfg.load(itr->path().string())) {
				std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n";
				continue;
			}

			if (show) {
				std::cout << CONFIG_STRING(&cfg, "service.jid") << "\n";
			}
			list.push_back(CONFIG_STRING(&cfg, "service.jid"));
		}
	}
	catch (const filesystem_error& ex) {
		std::cerr << "boost filesystem error\n";
	}
	return list;
}