From d2ed4065362ca76f5ddcd7ea2d8685bcfb893f8d Mon Sep 17 00:00:00 2001 From: Sarang Bharadwaj Date: Fri, 1 Jun 2012 22:52:01 +0530 Subject: [PATCH] Multi-threaded twitter requests --- backends/twitter/main.cpp | 130 +++++++++++++++++++++++++++++++++++--- 1 file changed, 122 insertions(+), 8 deletions(-) diff --git a/backends/twitter/main.cpp b/backends/twitter/main.cpp index 6899b3be..3d0b2a09 100644 --- a/backends/twitter/main.cpp +++ b/backends/twitter/main.cpp @@ -10,7 +10,12 @@ #include "signal.h" #include "sys/wait.h" #include "sys/signal.h" + #include +#include +#include +#include + #include "twitcurl.h" #include "TwitterResponseParser.h" @@ -18,6 +23,8 @@ #include #include #include +#include +#include #include #include "userdb.h" @@ -31,7 +38,11 @@ class TwitterPlugin; // The plugin TwitterPlugin * np = NULL; StorageBackend *storagebackend; +#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")") + class TwitterPlugin : public NetworkPlugin { + private: + struct Request; public: Swift::BoostNetworkFactories *m_factories; Swift::BoostIOServiceThread m_boostIOServiceThread; @@ -54,15 +65,15 @@ class TwitterPlugin : public NetworkPlugin { m_conn = m_factories->getConnectionFactory()->createConnection(); m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1)); m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port)); + onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2)); - //db = new UserDB(std::string("user.db")); - //registeredUsers = db->getRegisteredUsers(); + activeThreadCount = 0; + MAX_THREADS = 50; LOG4CXX_INFO(logger, "Starting the plugin."); } ~TwitterPlugin() { - //delete db; delete storagebackend; std::map::iterator it; for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second; @@ -303,9 +314,13 @@ class TwitterPlugin : public NetworkPlugin { } + void spawnThreadForRequest(Request r) { + std::string &user = r.from; + std::string &legacyName = r.to; + std::string &message = r.message; - void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { LOG4CXX_INFO(logger, "Sending message from " << user << " to " << legacyName << "."); + if(legacyName == "twitter-account") { std::string cmd = message.substr(0, message.find(':')); std::string data = message.substr(message.find(':') + 1); @@ -319,6 +334,72 @@ class TwitterPlugin : public NetworkPlugin { else if(cmd == "#timeline") fetchTimeline(user); else if(cmd == "#friends") fetchFriends(user); } + updateActiveThreadCount(-1); + onDispatchRequest(r, false); + } + + /* + * usersBeingServed - set of users being served at present + * usersToServe - queue of users who have requests pending in their request queue and are yet to be served; Each user appears only once here. + */ + void requestDispatcher(Request r, bool incoming) { + + criticalRegion.lock(); + + if(incoming) { + std::string user = r.from; + if(getActiveThreadCount() < MAX_THREADS && usersBeingServed.count(user) == false) { + updateActiveThreadCount(1); + boost::thread(&TwitterPlugin::spawnThreadForRequest, this, r); + usersBeingServed.insert(user); + LOG4CXX_INFO(logger, user << ": Sending request " << STR(r) << " to twitter") + } else { + requests[user].push(r); + LOG4CXX_INFO(logger, user << " is already being served! Adding " << STR(r) << " to request queue"); + if (!usersBeingServed.count(user)) { + usersToServe.push(user); + } + } + } else { + usersBeingServed.erase(r.from); + if(requests[r.from].size()) usersToServe.push(r.from); + while(getActiveThreadCount() < MAX_THREADS && !usersToServe.empty()) { + std::string user = usersToServe.front(); usersToServe.pop(); + Request s = requests[user].front(); requests[user].pop(); + updateActiveThreadCount(1); + boost::thread(&TwitterPlugin::spawnThreadForRequest, this, s); + usersBeingServed.insert(user); + LOG4CXX_INFO(logger, user << ": Sending request " << STR(s) << " to twitter") + } + } + + criticalRegion.unlock(); + } + + + void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { + + Request r; + r.from = user; + r.to = legacyName; + r.message = message; + LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r)) + onDispatchRequest(r,true); + //requestDispatcher(r, true); + + /*if(legacyName == "twitter-account") { + std::string cmd = message.substr(0, message.find(':')); + std::string data = message.substr(message.find(':') + 1); + + handleMessage(user, "twitter-account", cmd + " " + data); + + if(cmd == "#pin") handlePINExchange(user, data); + else if(cmd == "#help") printHelpMessage(user); + else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);} + else if(cmd == "#status") handleStatusUpdate(user, data); + else if(cmd == "#timeline") fetchTimeline(user); + else if(cmd == "#friends") fetchFriends(user); + }*/ } void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector &groups) { @@ -330,17 +411,50 @@ class TwitterPlugin : public NetworkPlugin { } + int getActiveThreadCount() { + int res; + threadLock.lock(); + res = activeThreadCount; + threadLock.unlock(); + return res; + } + + void updateActiveThreadCount(int k) { + threadLock.lock(); + activeThreadCount+=k; + threadLock.unlock(); + } + private: enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED}; + struct Request { + std::string from; + std::string to; + std::string message; + }; + Config *config; - //UserDB *db; std::string consumerKey; std::string consumerSecret; - //std::set registeredUsers; - std::map sessions; - std::map connectionState; std::string OAUTH_KEY; std::string OAUTH_SECRET; + + int activeThreadCount; + int MAX_THREADS; + + boost::mutex criticalRegion; + boost::mutex threadLock; + + std::map sessions; + std::map > requests; + + std::queue usersToServe; + std::set usersBeingServed; + + + std::map connectionState; + + boost::signal< void (Request, bool) > onDispatchRequest; }; static void spectrum_sigchld_handler(int sig)