first more or less working version of multiple-backend support

This commit is contained in:
Jan Kaluza 2011-05-26 08:53:05 +02:00
parent 2b338d77da
commit 98687145b0
5 changed files with 92 additions and 47 deletions

View file

@ -74,6 +74,7 @@ class NetworkPlugin {
void send(const std::string &data);
void sendPong();
void pingTimeout();
std::string m_data;
std::string m_host;
@ -81,7 +82,8 @@ class NetworkPlugin {
Swift::BoostNetworkFactories *m_factories;
Swift::BoostIOServiceThread m_boostIOServiceThread;
boost::shared_ptr<Swift::Connection> m_conn;
Swift::Timer::ref m_reconnectTimer;
Swift::Timer::ref m_pingTimer;
bool m_pingReceived;
};

View file

@ -40,6 +40,12 @@ class NetworkConversation;
class NetworkPluginServer {
public:
struct Client {
bool pongReceived;
std::list<User *> users;
std::string data;
};
NetworkPluginServer(Component *component, Config *config, UserManager *userManager);
virtual ~NetworkPluginServer();
@ -67,15 +73,13 @@ class NetworkPluginServer {
void send(boost::shared_ptr<Swift::Connection> &, const std::string &data);
void pingTimeout();
void sendPing();
void sendPing(boost::shared_ptr<Swift::Connection> c);
boost::shared_ptr<Swift::Connection> getFreeClient();
std::string m_command;
std::string m_data;
UserManager *m_userManager;
Config *m_config;
boost::shared_ptr<Swift::ConnectionServer> m_server;
boost::shared_ptr<Swift::Connection> m_client;
bool m_pongReceived;
std::map<boost::shared_ptr<Swift::Connection>, Client > m_clients;
Swift::Timer::ref m_pingTimer;
};

View file

@ -96,6 +96,8 @@ class User {
bool m_connected;
bool m_readyForConnect;
Swift::Timer::ref m_reconnectTimer;
boost::shared_ptr<Swift::Connection> connection;
friend class NetworkPluginServer;
};
}

View file

@ -41,13 +41,14 @@ NetworkPlugin::NetworkPlugin(Swift::EventLoop *loop, const std::string &host, in
m_factories = new Swift::BoostNetworkFactories(loop);
m_host = host;
m_port = port;
m_pingReceived = false;
m_conn = m_factories->getConnectionFactory()->createConnection();
m_conn->onDataRead.connect(boost::bind(&NetworkPlugin::handleDataRead, this, _1));
m_conn->onConnectFinished.connect(boost::bind(&NetworkPlugin::handleConnected, this, _1));
m_conn->onDisconnected.connect(boost::bind(&NetworkPlugin::handleDisconnected, this));
m_reconnectTimer = m_factories->getTimerFactory()->createTimer(1000);
m_reconnectTimer->onTick.connect(boost::bind(&NetworkPlugin::connect, this));
m_pingTimer = m_factories->getTimerFactory()->createTimer(30000);
m_pingTimer->onTick.connect(boost::bind(&NetworkPlugin::pingTimeout, this));
connect();
}
@ -156,26 +157,25 @@ void NetworkPlugin::handleRoomChanged(const std::string &user, const std::string
void NetworkPlugin::handleConnected(bool error) {
if (error) {
std::cout << "Connecting error\n";
// m_reconnectTimer->start();
std::cerr << "Connecting error\n";
m_pingTimer->stop();
exit(1);
}
else {
std::cout << "Connected\n";
m_reconnectTimer->stop();
m_pingTimer->start();
}
}
void NetworkPlugin::handleDisconnected() {
std::cout << "Disconnected\n";
// m_reconnectTimer->start();
std::cerr << "Disconnected\n";
m_pingTimer->stop();
exit(1);
}
void NetworkPlugin::connect() {
std::cout << "Trying to connect the server\n";
m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(m_host), m_port));
m_reconnectTimer->stop();
}
void NetworkPlugin::handleLoginPayload(const std::string &data) {
@ -288,6 +288,7 @@ void NetworkPlugin::send(const std::string &data) {
}
void NetworkPlugin::sendPong() {
m_pingReceived = true;
std::string message;
pbnetwork::WrapperMessage wrap;
wrap.set_type(pbnetwork::WrapperMessage_Type_TYPE_PONG);
@ -297,4 +298,11 @@ void NetworkPlugin::sendPong() {
// std::cout << "SENDING PONG\n";
}
void NetworkPlugin::pingTimeout() {
if (m_pingReceived == false) {
exit(1);
}
m_pingReceived = false;
}
}

View file

@ -106,7 +106,6 @@ NetworkPluginServer::NetworkPluginServer(Component *component, Config *config, U
m_userManager = userManager;
m_config = config;
component->m_factory = new NetworkFactory(this);
m_pongReceived = false;
m_userManager->onUserCreated.connect(boost::bind(&NetworkPluginServer::handleUserCreated, this, _1));
m_userManager->onUserDestroyed.connect(boost::bind(&NetworkPluginServer::handleUserDestroyed, this, _1));
@ -127,24 +126,30 @@ NetworkPluginServer::~NetworkPluginServer() {
}
void NetworkPluginServer::handleNewClientConnection(boost::shared_ptr<Swift::Connection> c) {
if (m_client) {
c->disconnect();
}
m_client = c;
m_pongReceived = false;
Client client;
client.pongReceived = true;
m_clients[c] = client;
c->onDisconnected.connect(boost::bind(&NetworkPluginServer::handleSessionFinished, this, c));
c->onDataRead.connect(boost::bind(&NetworkPluginServer::handleDataRead, this, c, _1));
sendPing();
sendPing(c);
m_pingTimer->start();
}
void NetworkPluginServer::handleSessionFinished(boost::shared_ptr<Swift::Connection> c) {
if (c == m_client) {
m_client.reset();
for (std::list<User *>::const_iterator it = m_clients[c].users.begin(); it != m_clients[c].users.end(); it++) {
(*it)->handleDisconnected("Internal Server Error, please reconnect.");
}
m_clients.erase(c);
// Execute new session only if there's no free one after this crash/disconnection
for (std::map<boost::shared_ptr<Swift::Connection>, Client>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
if ((*it).second.users.size() < 1) {
return;
}
}
m_pingTimer->stop();
exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
}
@ -274,23 +279,23 @@ void NetworkPluginServer::handleConvMessagePayload(const std::string &data, bool
void NetworkPluginServer::handleDataRead(boost::shared_ptr<Swift::Connection> c, const Swift::ByteArray &data) {
long expected_size = 0;
m_data += data.toString();
m_clients[c].data += data.toString();
// std::cout << "received data; size = " << m_data.size() << "\n";
while (m_data.size() != 0) {
if (m_data.size() >= 4) {
unsigned char * head = (unsigned char*) m_data.c_str();
while (m_clients[c].data.size() != 0) {
if (m_clients[c].data.size() >= 4) {
unsigned char * head = (unsigned char*) m_clients[c].data.c_str();
expected_size = (((((*head << 8) | *(head + 1)) << 8) | *(head + 2)) << 8) | *(head + 3);
//expected_size = m_data[0];
// std::cout << "expected_size=" << expected_size << "\n";
if (m_data.size() - 4 < expected_size)
if (m_clients[c].data.size() - 4 < expected_size)
return;
}
else {
return;
}
std::string msg = m_data.substr(4, expected_size);
m_data.erase(0, 4 + expected_size);
std::string msg = m_clients[c].data.substr(4, expected_size);
m_clients[c].data.erase(0, 4 + expected_size);
pbnetwork::WrapperMessage wrapper;
if (wrapper.ParseFromString(msg) == false) {
@ -315,7 +320,7 @@ void NetworkPluginServer::handleDataRead(boost::shared_ptr<Swift::Connection> c,
handleConvMessagePayload(wrapper.payload(), true);
break;
case pbnetwork::WrapperMessage_Type_TYPE_PONG:
m_pongReceived = true;
m_clients[c].pongReceived = true;
break;
case pbnetwork::WrapperMessage_Type_TYPE_PARTICIPANT_CHANGED:
handleParticipantChangedPayload(wrapper.payload());
@ -339,12 +344,14 @@ void NetworkPluginServer::send(boost::shared_ptr<Swift::Connection> &c, const st
void NetworkPluginServer::pingTimeout() {
std::cout << "pingtimeout\n";
if (m_pongReceived) {
sendPing();
m_pingTimer->start();
}
else {
exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
for (std::map<boost::shared_ptr<Swift::Connection>, Client>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
if ((*it).second.pongReceived) {
sendPing((*it).first);
m_pingTimer->start();
}
else {
exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
}
}
}
@ -367,8 +374,10 @@ void NetworkPluginServer::handleUserReadyToConnect(User *user) {
login.SerializeToString(&message);
WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_LOGIN);
user->connection = getFreeClient();
m_clients[user->connection].users.push_back(user);
send(m_client, message);
send(user->connection, message);
}
void NetworkPluginServer::handleRoomJoined(User *user, const std::string &r, const std::string &nickname, const std::string &password) {
@ -385,7 +394,7 @@ void NetworkPluginServer::handleRoomJoined(User *user, const std::string &r, con
WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_JOIN_ROOM);
send(m_client, message);
send(user->connection, message);
NetworkConversation *conv = new NetworkConversation(user->getConversationManager(), r, true);
conv->onMessageToSend.connect(boost::bind(&NetworkPluginServer::handleMessageReceived, this, _1, _2));
@ -406,7 +415,7 @@ void NetworkPluginServer::handleRoomLeft(User *user, const std::string &r) {
WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_LEAVE_ROOM);
send(m_client, message);
send(user->connection, message);
NetworkConversation *conv = (NetworkConversation *) user->getConversationManager()->getConversation(r);
if (!conv) {
@ -428,7 +437,15 @@ void NetworkPluginServer::handleUserDestroyed(User *user) {
WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_LOGOUT);
send(m_client, message);
send(user->connection, message);
m_clients[user->connection].users.remove(user);
if (m_clients[user->connection].users.size() == 0) {
std::cout << "DISCONNECTING\n";
user->connection->disconnect();
user->connection.reset();
m_clients.erase(user->connection);
}
}
void NetworkPluginServer::handleMessageReceived(NetworkConversation *conv, boost::shared_ptr<Swift::Message> &msg) {
@ -442,19 +459,31 @@ void NetworkPluginServer::handleMessageReceived(NetworkConversation *conv, boost
WRAP(message, pbnetwork::WrapperMessage_Type_TYPE_CONV_MESSAGE);
send(m_client, message);
send(conv->getConversationManager()->getUser()->connection, message);
}
void NetworkPluginServer::sendPing() {
void NetworkPluginServer::sendPing(boost::shared_ptr<Swift::Connection> c) {
std::string message;
pbnetwork::WrapperMessage wrap;
wrap.set_type(pbnetwork::WrapperMessage_Type_TYPE_PING);
wrap.SerializeToString(&message);
send(m_client, message);
m_pongReceived = false;
send(c, message);
m_clients[c].pongReceived = false;
std::cout << "SENDING PING\n";
}
boost::shared_ptr<Swift::Connection> NetworkPluginServer::getFreeClient() {
for (std::map<boost::shared_ptr<Swift::Connection>, Client>::const_iterator it = m_clients.begin(); it != m_clients.end(); it++) {
if ((*it).second.users.size() < 1) {
if ((*it).second.users.size() + 1 == 1) {
exec_(CONFIG_STRING(m_config, "service.backend").c_str(), "localhost", "10000", m_config->getConfigFile().c_str());
}
return (*it).first;
}
}
return boost::shared_ptr<Swift::Connection>();
}
}