From a2b385e88f06870058ca3944df3cadc09fd68257 Mon Sep 17 00:00:00 2001 From: Sarang Bharadwaj Date: Sun, 3 Jun 2012 15:01:50 +0530 Subject: [PATCH] Multi-Threaded Request --- .../twitter/Requests/DirectMessageRequest.cpp | 4 +- .../twitter/Requests/DirectMessageRequest.h | 8 +- backends/twitter/Requests/FetchFriends.cpp | 10 +- backends/twitter/Requests/FetchFriends.h | 8 +- .../twitter/Requests/HelpMessageRequest.cpp | 19 ++ .../twitter/Requests/HelpMessageRequest.h | 28 +++ .../twitter/Requests/StatusUpdateRequest.cpp | 18 +- .../twitter/Requests/StatusUpdateRequest.h | 9 +- backends/twitter/Requests/TimelineRequest.cpp | 6 +- backends/twitter/Requests/TimelineRequest.h | 9 +- backends/twitter/libtwitcurl/oauthlib.h | 16 +- backends/twitter/libtwitcurl/twitcurl.h | 29 +++ backends/twitter/main.cpp | 232 +----------------- 13 files changed, 145 insertions(+), 251 deletions(-) create mode 100644 backends/twitter/Requests/HelpMessageRequest.cpp create mode 100644 backends/twitter/Requests/HelpMessageRequest.h diff --git a/backends/twitter/Requests/DirectMessageRequest.cpp b/backends/twitter/Requests/DirectMessageRequest.cpp index 89288272..e8c6f292 100644 --- a/backends/twitter/Requests/DirectMessageRequest.cpp +++ b/backends/twitter/Requests/DirectMessageRequest.cpp @@ -2,11 +2,11 @@ DEFINE_LOGGER(logger, "DirectMessageRequest") void DirectMessageRequest::run() { - if(twitObj.directMessageSend(username, data, false) == false) { + if(twitObj->directMessageSend(username, data, false) == false) { LOG4CXX_ERROR(logger, user << ": Error while sending directed message to " << username ); return; } - twitObj.getLastWebResponse( replyMsg ); + twitObj->getLastWebResponse( replyMsg ); } void DirectMessageRequest::finalize() diff --git a/backends/twitter/Requests/DirectMessageRequest.h b/backends/twitter/Requests/DirectMessageRequest.h index 6159ed65..2dd5ae7a 100644 --- a/backends/twitter/Requests/DirectMessageRequest.h +++ b/backends/twitter/Requests/DirectMessageRequest.h @@ -12,7 +12,7 @@ using namespace Transport; class DirectMessageRequest : public Thread { - twitCurl twitObj; + twitCurl *twitObj; std::string data; std::string user; std::string username; @@ -21,13 +21,17 @@ class DirectMessageRequest : public Thread public: DirectMessageRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string & _username, const std::string &_data) { - twitObj = *obj; + twitObj = obj->clone(); data = _data; user = _user; username = _username; np = _np; } + ~DirectMessageRequest() { + delete twitObj; + } + void run(); void finalize(); }; diff --git a/backends/twitter/Requests/FetchFriends.cpp b/backends/twitter/Requests/FetchFriends.cpp index c9c90ae4..ebd29049 100644 --- a/backends/twitter/Requests/FetchFriends.cpp +++ b/backends/twitter/Requests/FetchFriends.cpp @@ -4,18 +4,18 @@ DEFINE_LOGGER(logger, "FetchFriends") void FetchFriends::run() { replyMsg = ""; - if( twitObj.friendsIdsGet(twitObj.getTwitterUsername())) { + if( twitObj->friendsIdsGet(twitObj->getTwitterUsername())) { while(replyMsg.length() == 0) { - twitObj.getLastWebResponse( replyMsg ); + twitObj->getLastWebResponse( replyMsg ); } LOG4CXX_INFO(logger, user << " - " << replyMsg.length() << " " << replyMsg << "\n" ); std::vector IDs = getIDs( replyMsg ); - twitObj.userLookup(IDs, true); - twitObj.getLastWebResponse( replyMsg ); + twitObj->userLookup(IDs, true); + twitObj->getLastWebResponse( replyMsg ); LOG4CXX_INFO(logger, user << " - UserLookUp web response - " << replyMsg.length() << " " << replyMsg << "\n" ); @@ -35,7 +35,7 @@ void FetchFriends::finalize() if(replyMsg != "" ) { np->handleMessage(user, "twitter-account", userlist); } else { - twitObj.getLastCurlError( replyMsg ); + twitObj->getLastCurlError( replyMsg ); LOG4CXX_INFO(logger, user << " - friendsIdsGet error - " << replyMsg ); } } diff --git a/backends/twitter/Requests/FetchFriends.h b/backends/twitter/Requests/FetchFriends.h index e26714cf..dbd5bf8f 100644 --- a/backends/twitter/Requests/FetchFriends.h +++ b/backends/twitter/Requests/FetchFriends.h @@ -13,7 +13,7 @@ using namespace Transport; class FetchFriends : public Thread { - twitCurl twitObj; + twitCurl *twitObj; std::string user; std::string replyMsg; std::string userlist; @@ -21,11 +21,15 @@ class FetchFriends : public Thread public: FetchFriends(NetworkPlugin *_np, twitCurl *obj, const std::string &_user) { - twitObj = *obj; + twitObj = obj->clone(); np = _np; user = _user; } + ~FetchFriends() { + delete twitObj; + } + void run(); void finalize(); }; diff --git a/backends/twitter/Requests/HelpMessageRequest.cpp b/backends/twitter/Requests/HelpMessageRequest.cpp new file mode 100644 index 00000000..8a123fd7 --- /dev/null +++ b/backends/twitter/Requests/HelpMessageRequest.cpp @@ -0,0 +1,19 @@ +#include "HelpMessageRequest.h" +DEFINE_LOGGER(logger, "HelpMessageRequest") +void HelpMessageRequest::run() +{ + std::string helpMsg = ""; + helpMsg = helpMsg + + "\n******************************HELP************************************\n" + + "#status: ==> Update your status\n" + + "#timeline ==> Retrieve your timeline\n" + + "@: ==> Send a directed message to the user \n" + + "#help ==> Print this help message\n" + + "************************************************************************\n"; + + np->handleMessage(user, "twitter-account", helpMsg); +} + +void HelpMessageRequest::finalize() +{ +} diff --git a/backends/twitter/Requests/HelpMessageRequest.h b/backends/twitter/Requests/HelpMessageRequest.h new file mode 100644 index 00000000..e87623fb --- /dev/null +++ b/backends/twitter/Requests/HelpMessageRequest.h @@ -0,0 +1,28 @@ +#ifndef HELPMESSAGE_H +#define HELPMESSAGE_H + +#include "../ThreadPool.h" +#include "../libtwitcurl/twitcurl.h" +#include "transport/networkplugin.h" +#include "transport/logging.h" +#include +#include + +using namespace Transport; + +class HelpMessageRequest : public Thread +{ + std::string user; + NetworkPlugin *np; + + public: + HelpMessageRequest(NetworkPlugin *_np, const std::string &_user) { + user = _user; + np = _np; + } + + void run(); + void finalize(); +}; + +#endif diff --git a/backends/twitter/Requests/StatusUpdateRequest.cpp b/backends/twitter/Requests/StatusUpdateRequest.cpp index 48db74c4..b7bd4d50 100644 --- a/backends/twitter/Requests/StatusUpdateRequest.cpp +++ b/backends/twitter/Requests/StatusUpdateRequest.cpp @@ -2,20 +2,22 @@ DEFINE_LOGGER(logger, "StatusUpdateRequest") void StatusUpdateRequest::run() { - if( twitObj.statusUpdate( data ) ) { - replyMsg = ""; + replyMsg = ""; + if( twitObj->statusUpdate( data ) ) { while(replyMsg.length() == 0) { - twitObj.getLastWebResponse( replyMsg ); + twitObj->getLastWebResponse( replyMsg ); } LOG4CXX_INFO(logger, user << "StatusUpdateRequest response " << replyMsg ); - } else { - twitObj.getLastCurlError( replyMsg ); - LOG4CXX_ERROR(logger, user << "Error - " << replyMsg ); - } + } } void StatusUpdateRequest::finalize() { - LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data); + if(replyMsg != "" ) { + LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data); + } else { + twitObj->getLastCurlError( replyMsg ); + LOG4CXX_ERROR(logger, user << "Error - " << replyMsg ); + } return; } diff --git a/backends/twitter/Requests/StatusUpdateRequest.h b/backends/twitter/Requests/StatusUpdateRequest.h index 7937a885..5164612d 100644 --- a/backends/twitter/Requests/StatusUpdateRequest.h +++ b/backends/twitter/Requests/StatusUpdateRequest.h @@ -11,18 +11,23 @@ using namespace Transport; class StatusUpdateRequest : public Thread { - twitCurl twitObj; + twitCurl *twitObj; std::string data; std::string user; std::string replyMsg; NetworkPlugin *np; public: StatusUpdateRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) { - twitObj = *obj; + twitObj = obj->clone(); data = _data; user = _user; np = _np; } + + ~StatusUpdateRequest() { + delete twitObj; + } + void run(); void finalize(); }; diff --git a/backends/twitter/Requests/TimelineRequest.cpp b/backends/twitter/Requests/TimelineRequest.cpp index 842d0ac6..3c8f66fe 100644 --- a/backends/twitter/Requests/TimelineRequest.cpp +++ b/backends/twitter/Requests/TimelineRequest.cpp @@ -3,11 +3,11 @@ DEFINE_LOGGER(logger, "TimelineRequest") void TimelineRequest::run() { replyMsg = ""; - if( twitObj.timelinePublicGet() ) { + if( twitObj->timelineHomeGet() ) { LOG4CXX_INFO(logger, "Sending timeline request for user " << user) while(replyMsg.length() == 0) { - twitObj.getLastWebResponse( replyMsg ); + twitObj->getLastWebResponse( replyMsg ); } LOG4CXX_INFO(logger, user << " - " << replyMsg.length() << " " << replyMsg << "\n" ); @@ -27,7 +27,7 @@ void TimelineRequest::finalize() np->handleMessage(user, "twitter-account", timeline); //send timeline } else { - twitObj.getLastCurlError( replyMsg ); + twitObj->getLastCurlError( replyMsg ); LOG4CXX_ERROR(logger, user << " - " << replyMsg ); } } diff --git a/backends/twitter/Requests/TimelineRequest.h b/backends/twitter/Requests/TimelineRequest.h index c84c7b45..02292373 100644 --- a/backends/twitter/Requests/TimelineRequest.h +++ b/backends/twitter/Requests/TimelineRequest.h @@ -13,7 +13,7 @@ using namespace Transport; class TimelineRequest : public Thread { - twitCurl twitObj; + twitCurl *twitObj; std::string user; std::string replyMsg; std::string timeline; @@ -21,11 +21,16 @@ class TimelineRequest : public Thread public: TimelineRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user) { - twitObj = *obj; + twitObj = obj->clone(); np = _np; user = _user; } + ~TimelineRequest() { + //std::cerr << "*****Timeline request: DESTROYING twitObj****" << std::endl; + delete twitObj; + } + void run(); void finalize(); }; diff --git a/backends/twitter/libtwitcurl/oauthlib.h b/backends/twitter/libtwitcurl/oauthlib.h index 3b158e7a..880a7b88 100644 --- a/backends/twitter/libtwitcurl/oauthlib.h +++ b/backends/twitter/libtwitcurl/oauthlib.h @@ -90,6 +90,20 @@ public: bool extractOAuthTokenKeySecret( const std::string& requestTokenResponse /* in */ ); + oAuth clone() + { + oAuth cloneObj; + cloneObj.m_consumerKey = m_consumerKey; + cloneObj.m_consumerSecret = m_consumerSecret; + cloneObj.m_oAuthTokenKey = m_oAuthTokenKey; + cloneObj.m_oAuthTokenSecret = m_oAuthTokenSecret; + cloneObj.m_oAuthPin = m_oAuthPin; + cloneObj.m_nonce = m_nonce; + cloneObj.m_timeStamp = m_timeStamp; + cloneObj.m_oAuthScreenName = m_oAuthScreenName; + return cloneObj; + } + private: /* OAuth data */ @@ -121,4 +135,4 @@ private: void generateNonceTimeStamp(); }; -#endif // __OAUTHLIB_H__ \ No newline at end of file +#endif // __OAUTHLIB_H__ diff --git a/backends/twitter/libtwitcurl/twitcurl.h b/backends/twitter/libtwitcurl/twitcurl.h index c4086cd1..3552e20c 100644 --- a/backends/twitter/libtwitcurl/twitcurl.h +++ b/backends/twitter/libtwitcurl/twitcurl.h @@ -220,6 +220,35 @@ public: void setProxyUserName( std::string& proxyUserName /* in */ ); void setProxyPassword( std::string& proxyPassword /* in */ ); + + twitCurl* clone() + { + twitCurl *cloneObj = new twitCurl(); + + /* cURL flags */ + cloneObj->m_curlProxyParamsSet = m_curlProxyParamsSet; + cloneObj->m_curlLoginParamsSet = m_curlLoginParamsSet; + cloneObj->m_curlCallbackParamsSet = m_curlCallbackParamsSet; + + /* cURL proxy data */ + cloneObj->m_proxyServerIp = m_proxyServerIp; + cloneObj->m_proxyServerPort = m_proxyServerPort; + cloneObj->m_proxyUserName = m_proxyUserName; + cloneObj->m_proxyPassword = m_proxyPassword; + + /* Twitter data */ + cloneObj->m_twitterUsername = m_twitterUsername; + cloneObj->m_twitterPassword = m_twitterPassword; + + /* Twitter API type */ + cloneObj->m_eApiFormatType = m_eApiFormatType; + + /* OAuth data */ + cloneObj->m_oAuth = m_oAuth.clone(); + + return cloneObj; + } + private: /* cURL data */ CURL* m_curlHandle; diff --git a/backends/twitter/main.cpp b/backends/twitter/main.cpp index e1c4b422..a8b3f687 100644 --- a/backends/twitter/main.cpp +++ b/backends/twitter/main.cpp @@ -33,6 +33,7 @@ #include "Requests/DirectMessageRequest.h" #include "Requests/TimelineRequest.h" #include "Requests/FetchFriends.h" +#include "Requests/HelpMessageRequest.h" using namespace boost::filesystem; using namespace boost::program_options; @@ -71,13 +72,9 @@ class TwitterPlugin : public NetworkPlugin { m_conn = m_factories->getConnectionFactory()->createConnection(); m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1)); m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port)); - onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2)); tp = new ThreadPool(10); - - activeThreadCount = 0; - MAX_THREADS = 50; - + LOG4CXX_INFO(logger, "Starting the plugin."); } @@ -209,193 +206,9 @@ class TwitterPlugin : public NetworkPlugin { LOG4CXX_INFO(logger, user << ": Sent PIN " << data << " and obtained Access Token"); } - void printHelpMessage(const std::string &user) { - std::string helpMsg = ""; - helpMsg = helpMsg - + "\nHELP\n" - + "#status: - Update your status\n" - + "#timeline - Retrieve your timeline\n" - + "@: - Send a directed message to the user \n" - + "#help - print this help message\n"; - - handleMessage(user, "twitter-account", helpMsg); - } - - void handleDirectMessage(const std::string &user, std::string &username, std::string &data) { - if(sessions[user]->directMessageSend(username, data, false) == false) { - LOG4CXX_ERROR(logger, user << ": Error while sending directed message to user " << username ); - return; - } - - LOG4CXX_INFO(logger, user << ": Sending " << data << " to " << username) - - std::string replyMsg; - sessions[user]->getLastWebResponse( replyMsg ); - LOG4CXX_INFO(logger, replyMsg); - } - - void handleStatusUpdate(const std::string &user, std::string &data) { - if(connectionState[user] != CONNECTED) { - LOG4CXX_ERROR(logger, "Trying to update status for " << user << " when not connected!"); - return; - } - - std::string replyMsg; - if( sessions[user]->statusUpdate( data ) ) { - replyMsg = ""; - while(replyMsg.length() == 0) { - sessions[user]->getLastWebResponse( replyMsg ); - } - LOG4CXX_INFO(logger, user << ": twitCurl:statusUpdate web response: " << replyMsg ); - } else { - sessions[user]->getLastCurlError( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::statusUpdate error: " << replyMsg ); - } - LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data); - } - - void fetchTimeline(const std::string &user) { - if(connectionState[user] != CONNECTED) { - LOG4CXX_ERROR(logger, "Trying to fetch timeline for " << user << " when not connected!"); - return; - } - - std::string replyMsg = ""; - if( sessions[user]->timelineHomeGet()) { - - while(replyMsg.length() == 0) { - sessions[user]->getLastWebResponse( replyMsg ); - } - - LOG4CXX_INFO(logger, user << ": twitCurl::timeline web response: " << replyMsg.length() << " " << replyMsg << "\n" ); - - std::vector tweets = getTimeline(replyMsg); - std::string timeline = "\n"; - for(int i=0 ; igetLastCurlError( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::timeline error: " << replyMsg ); - } - } - - void fetchFriends(const std::string &user) { - if(connectionState[user] != CONNECTED) { - LOG4CXX_ERROR(logger, "Trying to fetch friends of " << user << " when not connected!"); - return; - } - - std::string replyMsg = ""; - if( sessions[user]->friendsIdsGet(sessions[user]->getTwitterUsername())) { - - while(replyMsg.length() == 0) { - sessions[user]->getLastWebResponse( replyMsg ); - } - - LOG4CXX_INFO(logger, user << ": twitCurl::friendsIdsGet web response: " << replyMsg.length() << " " << replyMsg << "\n" ); - - std::vector IDs = getIDs( replyMsg ); - /*for(int i=0 ; iuserLookup(IDs, true); - sessions[user]->getLastWebResponse( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::UserLookUp web response: " << replyMsg.length() << " " << replyMsg << "\n" ); - - std::vector users = getUsers( replyMsg ); - - std::string userlist = "\n***************USER LIST****************\n"; - for(int i=0 ; i < users.size() ; i++) { - userlist += "*)" + users[i].getUserName() + " (" + users[i].getScreenName() + ")\n"; - } - userlist += "***************************************\n"; - handleMessage(user, "twitter-account", userlist); - - } else { - sessions[user]->getLastCurlError( replyMsg ); - LOG4CXX_INFO(logger, user << ": twitCurl::friendsIdsGet error: " << replyMsg ); - } - - } - - void spawnThreadForRequest(Request r) { - std::string &user = r.from; - std::string &legacyName = r.to; - std::string &message = r.message; - - LOG4CXX_INFO(logger, "Sending message from " << user << " to " << legacyName << "."); - - if(legacyName == "twitter-account") { - std::string cmd = message.substr(0, message.find(':')); - std::string data = message.substr(message.find(':') + 1); - - handleMessage(user, "twitter-account", cmd + " " + data); - - if(cmd == "#pin") handlePINExchange(user, data); - else if(cmd == "#help") printHelpMessage(user); - else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);} - else if(cmd == "#status") handleStatusUpdate(user, data); - else if(cmd == "#timeline") fetchTimeline(user); - else if(cmd == "#friends") fetchFriends(user); - } - updateActiveThreadCount(-1); - onDispatchRequest(r, false); - } - - /* - * usersBeingServed - set of users being served at present - * usersToServe - queue of users who have requests pending in their request queue and are yet to be served; Each user appears only once here. - */ - void requestDispatcher(Request r, bool incoming) { - - criticalRegion.lock(); - - if(incoming) { - std::string user = r.from; - if(getActiveThreadCount() < MAX_THREADS && usersBeingServed.count(user) == false) { - updateActiveThreadCount(1); - boost::thread(&TwitterPlugin::spawnThreadForRequest, this, r); - usersBeingServed.insert(user); - LOG4CXX_INFO(logger, user << ": Sending request " << STR(r) << " to twitter") - } else { - requests[user].push(r); - LOG4CXX_INFO(logger, user << " is already being served! Adding " << STR(r) << " to request queue"); - if (!usersBeingServed.count(user)) { - usersToServe.push(user); - } - } - } else { - usersBeingServed.erase(r.from); - if(requests[r.from].size()) usersToServe.push(r.from); - while(getActiveThreadCount() < MAX_THREADS && !usersToServe.empty()) { - std::string user = usersToServe.front(); usersToServe.pop(); - Request s = requests[user].front(); requests[user].pop(); - updateActiveThreadCount(1); - boost::thread(&TwitterPlugin::spawnThreadForRequest, this, s); - usersBeingServed.insert(user); - LOG4CXX_INFO(logger, user << ": Sending request " << STR(s) << " to twitter") - } - } - - criticalRegion.unlock(); - } - void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { - /*Request r; - r.from = user; - r.to = legacyName; - r.message = message; - LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r)) - onDispatchRequest(r,true);*/ - //requestDispatcher(r, true); - if(legacyName == "twitter-account") { std::string cmd = message.substr(0, message.find(':')); std::string data = message.substr(message.find(':') + 1); @@ -403,7 +216,9 @@ class TwitterPlugin : public NetworkPlugin { handleMessage(user, "twitter-account", cmd + " " + data); if(cmd == "#pin") handlePINExchange(user, data); - //else if(cmd == "#help") printHelpMessage(user); + else if(cmd == "#help") { + tp->runAsThread(new HelpMessageRequest(np, user)); + } else if(cmd[0] == '@') { std::string username = cmd.substr(1); tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data)); @@ -413,6 +228,7 @@ class TwitterPlugin : public NetworkPlugin { } else if(cmd == "#timeline") { tp->runAsThread(new TimelineRequest(np, sessions[user], user)); + //fetchTimeline(user); } else if(cmd == "#friends") { tp->runAsThread(new FetchFriends(np, sessions[user], user)); @@ -429,28 +245,9 @@ class TwitterPlugin : public NetworkPlugin { } - int getActiveThreadCount() { - int res; - threadLock.lock(); - res = activeThreadCount; - threadLock.unlock(); - return res; - } - - void updateActiveThreadCount(int k) { - threadLock.lock(); - activeThreadCount+=k; - threadLock.unlock(); - } - private: enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED}; - struct Request { - std::string from; - std::string to; - std::string message; - }; - + Config *config; std::string consumerKey; @@ -458,22 +255,9 @@ class TwitterPlugin : public NetworkPlugin { std::string OAUTH_KEY; std::string OAUTH_SECRET; - int activeThreadCount; - int MAX_THREADS; - - boost::mutex criticalRegion; - boost::mutex threadLock; - ThreadPool *tp; - std::map sessions; - std::map > requests; - - std::queue usersToServe; - std::set usersBeingServed; - - + std::map sessions; std::map connectionState; - boost::signal< void (Request, bool) > onDispatchRequest; }; static void spectrum_sigchld_handler(int sig)