diff --git a/include/transport/admininterface.h b/include/transport/admininterface.h index 324d4dd6..64082dd3 100644 --- a/include/transport/admininterface.h +++ b/include/transport/admininterface.h @@ -38,6 +38,8 @@ class AdminInterface { ~AdminInterface(); + void handleQuery(Swift::Message::ref message); + private: void handleMessageReceived(Swift::Message::ref message); diff --git a/include/transport/networkpluginserver.h b/include/transport/networkpluginserver.h index 2930ba24..f8a601be 100644 --- a/include/transport/networkpluginserver.h +++ b/include/transport/networkpluginserver.h @@ -42,6 +42,7 @@ class VCardResponder; class RosterResponder; class BlockResponder; class DummyReadBytestream; +class AdminInterface; class NetworkPluginServer { public: @@ -63,6 +64,10 @@ class NetworkPluginServer { virtual ~NetworkPluginServer(); + void setAdminInterface(AdminInterface *adminInterface) { + m_adminInterface = adminInterface; + } + int getBackendCount() { return m_clients.size(); } @@ -101,7 +106,8 @@ class NetworkPluginServer { void handleStatsPayload(Backend *c, const std::string &payload); void handleFTStartPayload(const std::string &payload); void handleFTFinishPayload(const std::string &payload); - void handleFTDataPayload(Backend *b ,const std::string &payload); + void handleFTDataPayload(Backend *b, const std::string &payload); + void handleQueryPayload(Backend *b, const std::string &payload); void handleUserCreated(User *user); void handleRoomJoined(User *user, const Swift::JID &who, const std::string &room, const std::string &nickname, const std::string &password); @@ -145,6 +151,7 @@ class NetworkPluginServer { std::map m_filetransfers; FileTransferManager *m_ftManager; std::vector m_crashedBackends; + AdminInterface *m_adminInterface; }; } diff --git a/include/transport/protocol.proto b/include/transport/protocol.proto index 8dddaf7e..929c02b7 100644 --- a/include/transport/protocol.proto +++ b/include/transport/protocol.proto @@ -158,6 +158,7 @@ message WrapperMessage { TYPE_FT_CONTINUE = 28; TYPE_EXIT = 29; TYPE_BACKEND_CONFIG = 30; + TYPE_QUERY = 31; } required Type type = 1; optional bytes payload = 2; diff --git a/spectrum/src/main.cpp b/spectrum/src/main.cpp index c2505c27..8e1c1e1a 100644 --- a/spectrum/src/main.cpp +++ b/spectrum/src/main.cpp @@ -334,6 +334,8 @@ int main(int argc, char **argv) NetworkPluginServer plugin(&transport, &config, &userManager, &ftManager); AdminInterface adminInterface(&transport, &userManager, &plugin, storageBackend, userRegistration); + plugin.setAdminInterface(&adminInterface); + StatsResponder statsResponder(&transport, &userManager, &plugin, storageBackend); statsResponder.start(); diff --git a/spectrum_manager/src/CMakeLists.txt b/spectrum_manager/src/CMakeLists.txt index 5f93dd9c..ac9f30f3 100644 --- a/spectrum_manager/src/CMakeLists.txt +++ b/spectrum_manager/src/CMakeLists.txt @@ -1,9 +1,9 @@ cmake_minimum_required(VERSION 2.6) FILE(GLOB SRC *.cpp) -ADD_EXECUTABLE(spectrum2_manager ${SRC} ../../src/config.cpp ../../src/util.cpp) +ADD_EXECUTABLE(spectrum2_manager ${SRC} ../../src/config.cpp ../../src/util.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../../include/transport/protocol.pb.cc) -target_link_libraries(spectrum2_manager ${SWIFTEN_LIBRARY}) +target_link_libraries(spectrum2_manager ${SWIFTEN_LIBRARY} ${PROTOBUF_LIBRARIES}) INSTALL(TARGETS spectrum2_manager RUNTIME DESTINATION bin) diff --git a/spectrum_manager/src/main.cpp b/spectrum_manager/src/main.cpp index 0ab40980..2c485490 100644 --- a/spectrum_manager/src/main.cpp +++ b/spectrum_manager/src/main.cpp @@ -1,5 +1,6 @@ #include "managerconfig.h" #include "transport/config.h" +#include "transport/protocol.pb.h" #include "Swiften/Swiften.h" #include "Swiften/EventLoop/SimpleEventLoop.h" @@ -12,6 +13,11 @@ #include "signal.h" #include "sys/wait.h" +#define WRAP(MESSAGE, TYPE) pbnetwork::WrapperMessage wrap; \ + wrap.set_type(TYPE); \ + wrap.set_payload(MESSAGE); \ + wrap.SerializeToString(&MESSAGE); + using namespace Transport; @@ -19,33 +25,7 @@ using namespace boost::filesystem; using namespace boost; -static int finished; -static std::string *m; - -static void handleDisconnected(Swift::Client *client, const boost::optional &, const std::string &server) { - std::cout << "[ DISCONNECTED ] " << server << "\n"; - if (--finished == 0) { - exit(0); - } -} - -static void handleConnected(Swift::Client *client, const std::string &server) { - boost::shared_ptr message(new Swift::Message()); - message->setTo(server); - message->setFrom(client->getJID()); - message->setBody(*m); - - client->sendMessage(message); -} - -static void handleMessageReceived(Swift::Client *client, Swift::Message::ref message, const std::string &server) { - std::string body = message->getBody(); - boost::replace_all(body, "\n", "\n[ OK ] " + server + ": "); - std::cout << "[ OK ] " << server << ": " << body << "\n"; - if (--finished == 0) { - exit(0); - } -} +std::string _data; static std::string searchForBinary(const std::string &binary) { std::vector path_list; @@ -103,6 +83,22 @@ static unsigned long exec_(std::string path, std::string config, std::string jid return (unsigned long) pid; } +static int getPort(const std::string &portfile) { + path p(portfile); + if (!exists(p) || is_directory(p)) { + return 0; + } + + std::ifstream f(p.string().c_str(), std::ios_base::in); + std::string port; + f >> port; + + if (port.empty()) + return 0; + + return boost::lexical_cast(port); +} + static int isRunning(const std::string &pidfile) { path p(pidfile); if (!exists(p) || is_directory(p)) { @@ -288,7 +284,75 @@ static int show_status(ManagerConfig *config) { return ret; } -static void ask_local_servers(ManagerConfig *config, Swift::BoostNetworkFactories &networkFactories, const std::string &message) { +static void handleDataRead(boost::shared_ptr m_conn, boost::shared_ptr data) { + _data += std::string(data->begin(), data->end()); + + // Parse data while there are some + while (_data.size() != 0) { + // expected_size of wrapper message + unsigned int expected_size; + + // if data is >= 4, we have whole header and we can + // read expected_size. + if (_data.size() >= 4) { + expected_size = *((unsigned int*) &_data[0]); + expected_size = ntohl(expected_size); + // If we don't have whole wrapper message, wait for next + // handleDataRead call. + if (_data.size() - 4 < expected_size) + return; + } + else { + return; + } + + // Parse wrapper message and erase it from buffer. + pbnetwork::WrapperMessage wrapper; + if (wrapper.ParseFromArray(&_data[4], expected_size) == false) { + std::cout << "PARSING ERROR " << expected_size << "\n"; + _data.erase(_data.begin(), _data.begin() + 4 + expected_size); + continue; + } + _data.erase(_data.begin(), _data.begin() + 4 + expected_size); + + if (wrapper.type() == pbnetwork::WrapperMessage_Type_TYPE_QUERY) { + pbnetwork::BackendConfig payload; + if (payload.ParseFromString(wrapper.payload()) == false) { + std::cout << "PARSING ERROR\n"; + // TODO: ERROR + continue; + } + + std::cout << payload.config() << "\n"; + exit(0); + } + } +} + +static void handleConnected(boost::shared_ptr m_conn, const std::string &msg, bool error) { + if (error) { + std::cerr << "Can't connect the server\n"; + exit(50); + } + else { + pbnetwork::BackendConfig m; + m.set_config(msg); + + std::string message; + m.SerializeToString(&message); + + WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_QUERY); + + uint32_t size = htonl(message.size()); + char *header = (char *) &size; + + + // send header together with wrapper message + m_conn->write(Swift::createSafeByteArray(std::string(header, 4) + message)); + } +} + +static void ask_local_server(ManagerConfig *config, Swift::BoostNetworkFactories &networkFactories, const std::string &jid, const std::string &message) { path p(CONFIG_STRING(config, "service.config_directory")); try { @@ -302,6 +366,7 @@ static void ask_local_servers(ManagerConfig *config, Swift::BoostNetworkFactorie exit(7); } + bool found = false; directory_iterator end_itr; for (directory_iterator itr(p); itr != end_itr; ++itr) { if (is_regular(itr->path()) && extension(itr->path()) == ".cfg") { @@ -311,22 +376,34 @@ static void ask_local_servers(ManagerConfig *config, Swift::BoostNetworkFactorie continue; } - if (CONFIG_VECTOR(&cfg, "service.admin_jid").empty() || CONFIG_STRING(&cfg, "service.admin_password").empty()) { - std::cerr << itr->path().string() << ": service.admin_jid or service.admin_password empty. This server can't be queried over XMPP.\n"; + if (CONFIG_STRING(&cfg, "service.jid") != jid) { continue; } - finished++; - Swift::Client *client = new Swift::Client(CONFIG_VECTOR(&cfg, "service.admin_jid")[0], CONFIG_STRING(&cfg, "service.admin_password"), &networkFactories); - client->setAlwaysTrustCertificates(); - client->onConnected.connect(boost::bind(&handleConnected, client, CONFIG_STRING(&cfg, "service.jid"))); - client->onDisconnected.connect(bind(&handleDisconnected, client, _1, CONFIG_STRING(&cfg, "service.jid"))); - client->onMessageReceived.connect(bind(&handleMessageReceived, client, _1, CONFIG_STRING(&cfg, "service.jid"))); - Swift::ClientOptions opt; - opt.allowPLAINWithoutTLS = true; - client->connect(opt); + found = true; + + boost::shared_ptr m_conn; + m_conn = networkFactories.getConnectionFactory()->createConnection(); + m_conn->onDataRead.connect(boost::bind(&handleDataRead, m_conn, _1)); + m_conn->onConnectFinished.connect(boost::bind(&handleConnected, m_conn, message, _1)); + m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(CONFIG_STRING(&cfg, "service.backend_host")), getPort(CONFIG_STRING(&cfg, "service.portfile")))); + +// finished++; +// Swift::Client *client = new Swift::Client(CONFIG_VECTOR(&cfg, "service.admin_jid")[0], CONFIG_STRING(&cfg, "service.admin_password"), &networkFactories); +// client->setAlwaysTrustCertificates(); +// client->onConnected.connect(boost::bind(&handleConnected, client, CONFIG_STRING(&cfg, "service.jid"))); +// client->onDisconnected.connect(bind(&handleDisconnected, client, _1, CONFIG_STRING(&cfg, "service.jid"))); +// client->onMessageReceived.connect(bind(&handleMessageReceived, client, _1, CONFIG_STRING(&cfg, "service.jid"))); +// Swift::ClientOptions opt; +// opt.allowPLAINWithoutTLS = true; +// client->connect(opt); } } + + if (!found) { + std::cerr << "Config file for Spectrum instance with this JID was not found\n"; + exit(20) + } } catch (const filesystem_error& ex) { std::cerr << "boost filesystem error\n"; @@ -334,24 +411,71 @@ static void ask_local_servers(ManagerConfig *config, Swift::BoostNetworkFactorie } } +// static void ask_local_servers(ManagerConfig *config, Swift::BoostNetworkFactories &networkFactories, const std::string &message) { +// path p(CONFIG_STRING(config, "service.config_directory")); +// +// try { +// if (!exists(p)) { +// std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; +// exit(6); +// } +// +// if (!is_directory(p)) { +// std::cerr << "Config directory " << CONFIG_STRING(config, "service.config_directory") << " does not exist\n"; +// exit(7); +// } +// +// directory_iterator end_itr; +// for (directory_iterator itr(p); itr != end_itr; ++itr) { +// if (is_regular(itr->path()) && extension(itr->path()) == ".cfg") { +// Config cfg; +// if (cfg.load(itr->path().string()) == false) { +// std::cerr << "Can't load config file " << itr->path().string() << ". Skipping...\n"; +// continue; +// } +// +// if (CONFIG_VECTOR(&cfg, "service.admin_jid").empty() || CONFIG_STRING(&cfg, "service.admin_password").empty()) { +// std::cerr << itr->path().string() << ": service.admin_jid or service.admin_password empty. This server can't be queried over XMPP.\n"; +// continue; +// } +// +// finished++; +// Swift::Client *client = new Swift::Client(CONFIG_VECTOR(&cfg, "service.admin_jid")[0], CONFIG_STRING(&cfg, "service.admin_password"), &networkFactories); +// client->setAlwaysTrustCertificates(); +// client->onConnected.connect(boost::bind(&handleConnected, client, CONFIG_STRING(&cfg, "service.jid"))); +// client->onDisconnected.connect(bind(&handleDisconnected, client, _1, CONFIG_STRING(&cfg, "service.jid"))); +// client->onMessageReceived.connect(bind(&handleMessageReceived, client, _1, CONFIG_STRING(&cfg, "service.jid"))); +// Swift::ClientOptions opt; +// opt.allowPLAINWithoutTLS = true; +// client->connect(opt); +// } +// } +// } +// catch (const filesystem_error& ex) { +// std::cerr << "boost filesystem error\n"; +// exit(5); +// } +// } + int main(int argc, char **argv) { ManagerConfig config; std::string config_file; - std::string command; + std::vector command; boost::program_options::variables_map vm; - boost::program_options::options_description desc("Usage: spectrum [OPTIONS] \nCommands:\n" + boost::program_options::options_description desc("Usage: spectrum [OPTIONS] \n" + " spectrum [OPTIONS] \nCommands:\n" " start - start all local Spectrum2 instances\n" - " stop - stop all local Spectrum2 instances\n" + " stop - stop all local Spectrum2 instances\n" " status - status of local Spectrum2 instances\n" - " - send command to all local + remote Spectrum2 instances and print output\n" + " - send command to local Spectrum2 instance and print output\n" "Allowed options"); desc.add_options() ("help,h", "Show help output") ("config,c", boost::program_options::value(&config_file)->default_value("/etc/spectrum2/spectrum_manager.cfg"), "Spectrum manager config file") - ("command", boost::program_options::value(&command)->default_value(""), "Command") + ("command", boost::program_options::value >(&command), "Command") ; try { @@ -388,37 +512,32 @@ int main(int argc, char **argv) return 1; } - if (command == "start") { + if (command[0] == "start") { start_all_instances(&config); } - else if (command == "stop") { + else if (command[0] == "stop") { stop_all_instances(&config); } - else if (command == "status") { + else if (command[0] == "status") { return show_status(&config); } else { + if (command.size() < 2) { + std::cout << desc << "\n"; + return 11; + } Swift::SimpleEventLoop eventLoop; Swift::BoostNetworkFactories networkFactories(&eventLoop); - std::string message = command; - m = &message; + std::string jid = command[0]; + command.erase(command.begin()); + std::string cmd = boost::algorithm::join(command, " "); - ask_local_servers(&config, networkFactories, message); + ask_local_server(&config, networkFactories, jid, cmd); +// std::string message = command; +// m = &message; - std::vector servers = CONFIG_VECTOR(&config, "servers.server"); - for (std::vector::const_iterator it = servers.begin(); it != servers.end(); it++) { - finished++; - Swift::Client *client = new Swift::Client(CONFIG_STRING(&config, "service.admin_username") + "@" + *it, CONFIG_STRING(&config, "service.admin_password"), &networkFactories); - client->setAlwaysTrustCertificates(); - client->onConnected.connect(boost::bind(&handleConnected, client, *it)); - client->onDisconnected.connect(bind(&handleDisconnected, client, _1, *it)); - client->onMessageReceived.connect(bind(&handleMessageReceived, client, _1, *it)); - Swift::ClientOptions opt; - opt.allowPLAINWithoutTLS = true; - client->connect(opt); - // std::cout << *it << "\n"; - } +// ask_local_server(&config, networkFactories, message); eventLoop.run(); } diff --git a/src/admininterface.cpp b/src/admininterface.cpp index 913484be..85dca619 100644 --- a/src/admininterface.cpp +++ b/src/admininterface.cpp @@ -57,22 +57,7 @@ AdminInterface::AdminInterface(Component *component, UserManager *userManager, N AdminInterface::~AdminInterface() { } -void AdminInterface::handleMessageReceived(Swift::Message::ref message) { - if (!message->getTo().getNode().empty()) - return; - - std::vector const &x = CONFIG_VECTOR(m_component->getConfig(),"service.admin_jid"); - if (std::find(x.begin(), x.end(), message->getFrom().toBare().toString()) == x.end()) { - LOG4CXX_WARN(logger, "Message not from admin user, but from " << message->getFrom().toBare().toString()); - return; - - } - - // Ignore empty messages - if (message->getBody().empty()) { - return; - } - +void AdminInterface::handleQuery(Swift::Message::ref message) { LOG4CXX_INFO(logger, "Message from admin received"); message->setTo(message->getFrom()); message->setFrom(m_component->getJID()); @@ -345,6 +330,25 @@ void AdminInterface::handleMessageReceived(Swift::Message::ref message) { else { message->setBody("Unknown command. Try \"help\""); } +} + +void AdminInterface::handleMessageReceived(Swift::Message::ref message) { + if (!message->getTo().getNode().empty()) + return; + + std::vector const &x = CONFIG_VECTOR(m_component->getConfig(),"service.admin_jid"); + if (std::find(x.begin(), x.end(), message->getFrom().toBare().toString()) == x.end()) { + LOG4CXX_WARN(logger, "Message not from admin user, but from " << message->getFrom().toBare().toString()); + return; + + } + + // Ignore empty messages + if (message->getBody().empty()) { + return; + } + + handleQuery(message); m_component->getStanzaChannel()->sendMessage(message); } diff --git a/src/networkpluginserver.cpp b/src/networkpluginserver.cpp index 17a55ec2..27298a55 100644 --- a/src/networkpluginserver.cpp +++ b/src/networkpluginserver.cpp @@ -32,6 +32,7 @@ #include "transport/rosterresponder.h" #include "transport/memoryreadbytestream.h" #include "transport/logging.h" +#include "transport/admininterface.h" #include "blockresponder.h" #include "Swiften/Swiften.h" #include "Swiften/Server/ServerStanzaChannel.h" @@ -228,6 +229,7 @@ NetworkPluginServer::NetworkPluginServer(Component *component, Config *config, U m_config = config; m_component = component; m_isNextLongRun = false; + m_adminInterface = NULL; m_component->m_factory = new NetworkFactory(this); m_userManager->onUserCreated.connect(boost::bind(&NetworkPluginServer::handleUserCreated, this, _1)); m_userManager->onUserDestroyed.connect(boost::bind(&NetworkPluginServer::handleUserDestroyed, this, _1)); @@ -763,6 +765,32 @@ void NetworkPluginServer::handlePongReceived(Backend *c) { c->pongReceived = true; } +void NetworkPluginServer::handleQueryPayload(Backend *b, const std::string &data) { + pbnetwork::BackendConfig payload; + if (payload.ParseFromString(data) == false) { + // TODO: ERROR + return; + } + + if (!m_adminInterface) { + return; + } + + boost::shared_ptr msg(new Swift::Message()); + msg->setBody(payload.config()); + m_adminInterface->handleQuery(msg); + + pbnetwork::BackendConfig vcard; + vcard.set_config(msg->getBody()); + + std::string message; + vcard.SerializeToString(&message); + + WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_QUERY); + + send(b->connection, message); +} + void NetworkPluginServer::handleDataRead(Backend *c, boost::shared_ptr data) { // Append data to buffer c->data.insert(c->data.end(), data->begin(), data->end()); @@ -854,6 +882,9 @@ void NetworkPluginServer::handleDataRead(Backend *c, boost::shared_ptr