Multi-threaded twitter requests

This commit is contained in:
Sarang Bharadwaj 2012-06-01 22:52:01 +05:30
parent 9111a39937
commit d2ed406536

View file

@ -10,7 +10,12 @@
#include "signal.h"
#include "sys/wait.h"
#include "sys/signal.h"
#include <boost/algorithm/string.hpp>
#include <boost/signal.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include "twitcurl.h"
#include "TwitterResponseParser.h"
@ -18,6 +23,8 @@
#include <sstream>
#include <map>
#include <vector>
#include <queue>
#include <set>
#include <cstdio>
#include "userdb.h"
@ -31,7 +38,11 @@ class TwitterPlugin; // The plugin
TwitterPlugin * np = NULL;
StorageBackend *storagebackend;
#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")")
class TwitterPlugin : public NetworkPlugin {
private:
struct Request;
public:
Swift::BoostNetworkFactories *m_factories;
Swift::BoostIOServiceThread m_boostIOServiceThread;
@ -54,15 +65,15 @@ class TwitterPlugin : public NetworkPlugin {
m_conn = m_factories->getConnectionFactory()->createConnection();
m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1));
m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2));
//db = new UserDB(std::string("user.db"));
//registeredUsers = db->getRegisteredUsers();
activeThreadCount = 0;
MAX_THREADS = 50;
LOG4CXX_INFO(logger, "Starting the plugin.");
}
~TwitterPlugin() {
//delete db;
delete storagebackend;
std::map<std::string, twitCurl*>::iterator it;
for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second;
@ -303,9 +314,13 @@ class TwitterPlugin : public NetworkPlugin {
}
void spawnThreadForRequest(Request r) {
std::string &user = r.from;
std::string &legacyName = r.to;
std::string &message = r.message;
void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
LOG4CXX_INFO(logger, "Sending message from " << user << " to " << legacyName << ".");
if(legacyName == "twitter-account") {
std::string cmd = message.substr(0, message.find(':'));
std::string data = message.substr(message.find(':') + 1);
@ -319,6 +334,72 @@ class TwitterPlugin : public NetworkPlugin {
else if(cmd == "#timeline") fetchTimeline(user);
else if(cmd == "#friends") fetchFriends(user);
}
updateActiveThreadCount(-1);
onDispatchRequest(r, false);
}
/*
* usersBeingServed - set of users being served at present
* usersToServe - queue of users who have requests pending in their request queue and are yet to be served; Each user appears only once here.
*/
void requestDispatcher(Request r, bool incoming) {
criticalRegion.lock();
if(incoming) {
std::string user = r.from;
if(getActiveThreadCount() < MAX_THREADS && usersBeingServed.count(user) == false) {
updateActiveThreadCount(1);
boost::thread(&TwitterPlugin::spawnThreadForRequest, this, r);
usersBeingServed.insert(user);
LOG4CXX_INFO(logger, user << ": Sending request " << STR(r) << " to twitter")
} else {
requests[user].push(r);
LOG4CXX_INFO(logger, user << " is already being served! Adding " << STR(r) << " to request queue");
if (!usersBeingServed.count(user)) {
usersToServe.push(user);
}
}
} else {
usersBeingServed.erase(r.from);
if(requests[r.from].size()) usersToServe.push(r.from);
while(getActiveThreadCount() < MAX_THREADS && !usersToServe.empty()) {
std::string user = usersToServe.front(); usersToServe.pop();
Request s = requests[user].front(); requests[user].pop();
updateActiveThreadCount(1);
boost::thread(&TwitterPlugin::spawnThreadForRequest, this, s);
usersBeingServed.insert(user);
LOG4CXX_INFO(logger, user << ": Sending request " << STR(s) << " to twitter")
}
}
criticalRegion.unlock();
}
void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
Request r;
r.from = user;
r.to = legacyName;
r.message = message;
LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r))
onDispatchRequest(r,true);
//requestDispatcher(r, true);
/*if(legacyName == "twitter-account") {
std::string cmd = message.substr(0, message.find(':'));
std::string data = message.substr(message.find(':') + 1);
handleMessage(user, "twitter-account", cmd + " " + data);
if(cmd == "#pin") handlePINExchange(user, data);
else if(cmd == "#help") printHelpMessage(user);
else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);}
else if(cmd == "#status") handleStatusUpdate(user, data);
else if(cmd == "#timeline") fetchTimeline(user);
else if(cmd == "#friends") fetchFriends(user);
}*/
}
void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector<std::string> &groups) {
@ -330,17 +411,50 @@ class TwitterPlugin : public NetworkPlugin {
}
int getActiveThreadCount() {
int res;
threadLock.lock();
res = activeThreadCount;
threadLock.unlock();
return res;
}
void updateActiveThreadCount(int k) {
threadLock.lock();
activeThreadCount+=k;
threadLock.unlock();
}
private:
enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED};
struct Request {
std::string from;
std::string to;
std::string message;
};
Config *config;
//UserDB *db;
std::string consumerKey;
std::string consumerSecret;
//std::set<std::string> registeredUsers;
std::map<std::string, twitCurl*> sessions;
std::map<std::string, status> connectionState;
std::string OAUTH_KEY;
std::string OAUTH_SECRET;
int activeThreadCount;
int MAX_THREADS;
boost::mutex criticalRegion;
boost::mutex threadLock;
std::map<std::string, twitCurl*> sessions;
std::map<std::string, std::queue<Request> > requests;
std::queue<std::string> usersToServe;
std::set<std::string> usersBeingServed;
std::map<std::string, status> connectionState;
boost::signal< void (Request, bool) > onDispatchRequest;
};
static void spectrum_sigchld_handler(int sig)