From b62ef77146eb874031e74809354936a8a13aaed5 Mon Sep 17 00:00:00 2001 From: Sarang Bharadwaj Date: Mon, 4 Jun 2012 00:08:16 +0530 Subject: [PATCH] Multi-threaded requests --- backends/twitter/Requests/OAuthFlow.h | 4 +- .../twitter/Requests/PINExchangeProcess.h | 4 +- backends/twitter/TwitterPlugin.cpp | 8 +- backends/twitter/libtwitcurl/twitcurl.cpp | 29 ++ backends/twitter/libtwitcurl/twitcurl.h | 29 +- backends/twitter/main.cpp | 265 ------------------ 6 files changed, 38 insertions(+), 301 deletions(-) diff --git a/backends/twitter/Requests/OAuthFlow.h b/backends/twitter/Requests/OAuthFlow.h index 4ece9abb..bbc730c6 100644 --- a/backends/twitter/Requests/OAuthFlow.h +++ b/backends/twitter/Requests/OAuthFlow.h @@ -22,14 +22,14 @@ class OAuthFlow : public Thread public: OAuthFlow(TwitterPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_username) { - twitObj = obj; + twitObj = obj->clone(); username = _username; user = _user; np = _np; } ~OAuthFlow() { - //delete twitObj; + delete twitObj; } void run(); diff --git a/backends/twitter/Requests/PINExchangeProcess.h b/backends/twitter/Requests/PINExchangeProcess.h index 4aba98a1..6d94085c 100644 --- a/backends/twitter/Requests/PINExchangeProcess.h +++ b/backends/twitter/Requests/PINExchangeProcess.h @@ -22,14 +22,14 @@ class PINExchangeProcess : public Thread public: PINExchangeProcess(TwitterPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) { - twitObj = obj; + twitObj = obj->clone(); data = _data; user = _user; np = _np; } ~PINExchangeProcess() { - //delete twitObj; + delete twitObj; } void run(); diff --git a/backends/twitter/TwitterPlugin.cpp b/backends/twitter/TwitterPlugin.cpp index 512a1496..6c4746f7 100644 --- a/backends/twitter/TwitterPlugin.cpp +++ b/backends/twitter/TwitterPlugin.cpp @@ -203,8 +203,8 @@ void TwitterPlugin::OAuthFlowComplete(const std::string user, twitCurl *obj) { boost::mutex::scoped_lock lock(userlock); - //delete sessions[user]; - //sessions[user] = obj; + delete sessions[user]; + sessions[user] = obj->clone(); connectionState[user] = WAITING_FOR_PIN; } @@ -212,7 +212,7 @@ void TwitterPlugin::pinExchangeComplete(const std::string user, const std::strin { boost::mutex::scoped_lock lock(userlock); - //sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey ); - //sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret ); + sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey ); + sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret ); connectionState[user] = CONNECTED; } diff --git a/backends/twitter/libtwitcurl/twitcurl.cpp b/backends/twitter/libtwitcurl/twitcurl.cpp index 24de2853..63f2112c 100644 --- a/backends/twitter/libtwitcurl/twitcurl.cpp +++ b/backends/twitter/libtwitcurl/twitcurl.cpp @@ -51,6 +51,35 @@ twitCurl::~twitCurl() } } +twitCurl* twitCurl::clone() +{ + twitCurl *cloneObj = new twitCurl(); + + /* cURL flags */ + //cloneObj->m_curlProxyParamsSet = false; + //cloneObj->m_curlLoginParamsSet = m_curlLoginParamsSet; + //cloneObj->m_curlCallbackParamsSet = m_curlCallbackParamsSet; + + /* cURL proxy data */ + cloneObj->setProxyServerIp(m_proxyServerIp); + cloneObj->setProxyServerPort(m_proxyServerPort); + cloneObj->setProxyUserName(m_proxyUserName); + cloneObj->setProxyPassword(m_proxyPassword); + + + /* Twitter data */ + cloneObj->setTwitterUsername(m_twitterUsername); + cloneObj->setTwitterPassword(m_twitterPassword); + + /* Twitter API type */ + cloneObj->setTwitterApiType(m_eApiFormatType); + + /* OAuth data */ + cloneObj->m_oAuth = m_oAuth.clone(); + + return cloneObj; +} + /*++ * @method: twitCurl::setTwitterApiType * diff --git a/backends/twitter/libtwitcurl/twitcurl.h b/backends/twitter/libtwitcurl/twitcurl.h index e2d261fc..b374cb6b 100644 --- a/backends/twitter/libtwitcurl/twitcurl.h +++ b/backends/twitter/libtwitcurl/twitcurl.h @@ -220,34 +220,7 @@ public: void setProxyUserName( std::string& proxyUserName /* in */ ); void setProxyPassword( std::string& proxyPassword /* in */ ); - - twitCurl* clone() - { - twitCurl *cloneObj = new twitCurl(); - - /* cURL flags */ - cloneObj->m_curlProxyParamsSet = m_curlProxyParamsSet; - cloneObj->m_curlLoginParamsSet = m_curlLoginParamsSet; - cloneObj->m_curlCallbackParamsSet = m_curlCallbackParamsSet; - - /* cURL proxy data */ - cloneObj->m_proxyServerIp = m_proxyServerIp; - cloneObj->m_proxyServerPort = m_proxyServerPort; - cloneObj->m_proxyUserName = m_proxyUserName; - - - /* Twitter data */ - cloneObj->m_twitterUsername = m_twitterUsername; - cloneObj->m_twitterPassword = m_twitterPassword; - - /* Twitter API type */ - cloneObj->m_eApiFormatType = m_eApiFormatType; - - /* OAuth data */ - cloneObj->m_oAuth = m_oAuth.clone(); - - return cloneObj; - } + twitCurl* clone(); private: /* cURL data */ diff --git a/backends/twitter/main.cpp b/backends/twitter/main.cpp index d9db800d..cf387fd7 100644 --- a/backends/twitter/main.cpp +++ b/backends/twitter/main.cpp @@ -1,270 +1,5 @@ -/*#include "transport/config.h" -#include "transport/networkplugin.h" -#include "transport/logging.h" -#include "transport/sqlite3backend.h" -#include "transport/mysqlbackend.h" -#include "transport/pqxxbackend.h" -#include "transport/storagebackend.h" - -#include "Swiften/Swiften.h" -#include "unistd.h" -#include "signal.h" -#include "sys/wait.h" -#include "sys/signal.h" - -#include -#include -#include -#include - -#include "twitcurl.h" -#include "TwitterResponseParser.h" - -#include -#include -#include -#include -#include -#include -#include - -#include "ThreadPool.h" -#include "Requests/StatusUpdateRequest.h" -#include "Requests/DirectMessageRequest.h" -#include "Requests/TimelineRequest.h" -#include "Requests/FetchFriends.h" -#include "Requests/HelpMessageRequest.h" -#include "Requests/PINExchangeProcess.h" -#include "Requests/OAuthFlow.h" - - - -using namespace boost::filesystem; -using namespace boost::program_options; -using namespace Transport;*/ - #include "TwitterPlugin.h" - DEFINE_LOGGER(logger, "Twitter Backend"); -//class TwitterPlugin; // The plugin - -/*#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")") - -class TwitterPlugin : public NetworkPlugin { - private: - struct Request; - public: - Swift::BoostNetworkFactories *m_factories; - Swift::BoostIOServiceThread m_boostIOServiceThread; - boost::shared_ptr m_conn; - - TwitterPlugin(Config *config, Swift::SimpleEventLoop *loop, const std::string &host, int port) : NetworkPlugin() { - this->config = config; - - if(CONFIG_HAS_KEY(config, "twitter.consumer_key") == false || - CONFIG_HAS_KEY(config, "twitter.consumer_secret") == false) { - LOG4CXX_ERROR(logger, "Couldn't find consumer key and/or secret. Please check config file."); - exit(0); - } - consumerKey = CONFIG_STRING(config, "twitter.consumer_key"); - consumerSecret = CONFIG_STRING(config, "twitter.consumer_secret"); - OAUTH_KEY = "oauth_key"; - OAUTH_SECRET = "oauth_secret"; - - m_factories = new Swift::BoostNetworkFactories(loop); - m_conn = m_factories->getConnectionFactory()->createConnection(); - m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1)); - m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port)); - - tp = new ThreadPool(loop_, 10); - - LOG4CXX_INFO(logger, "Starting the plugin."); - } - - ~TwitterPlugin() { - delete storagebackend; - std::map::iterator it; - for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second; - delete tp; - } - - // Send data to NetworkPlugin server - void sendData(const std::string &string) { - m_conn->write(Swift::createSafeByteArray(string)); - } - - // Receive date from the NetworkPlugin server and invoke the appropirate payload handler (implement in the NetworkPlugin class) - void _handleDataRead(boost::shared_ptr data) { - std::string d(data->begin(), data->end()); - handleDataRead(d); - } - - // User trying to login into his twitter account - void handleLoginRequest(const std::string &user, const std::string &legacyName, const std::string &password) { - if(connectionState.count(user) && (connectionState[user] == NEW || - connectionState[user] == CONNECTED || - connectionState[user] == WAITING_FOR_PIN)) { - LOG4CXX_INFO(logger, std::string("A session corresponding to ") + user + std::string(" is already active")) - return; - } - - LOG4CXX_INFO(logger, std::string("Received login request for ") + user) - - initUserSession(user, password); - - handleConnected(user); - handleBuddyChanged(user, "twitter-account", "twitter", std::vector(), pbnetwork::STATUS_ONLINE); - - LOG4CXX_INFO(logger, "Querying database for usersettings of " << user) - - std::string key, secret; - getUserOAuthKeyAndSecret(user, key, secret); - - if(key == "" || secret == "") { - LOG4CXX_INFO(logger, "Intiating OAuth Flow for user " << user) - tp->runAsThread(new OAuthFlow(np, sessions[user], user, sessions[user]->getTwitterUsername())); - } else { - LOG4CXX_INFO(logger, user << " is already registerd. Using the stored oauth key and secret") - LOG4CXX_INFO(logger, key << " " << secret) - pinExchangeComplete(user, key, secret); - } - } - - // User logging out - void handleLogoutRequest(const std::string &user, const std::string &legacyName) { - delete sessions[user]; - sessions[user] = NULL; - connectionState[user] = DISCONNECTED; - } - - - void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { - - if(legacyName == "twitter-account") { - std::string cmd = message.substr(0, message.find(':')); - std::string data = message.substr(message.find(':') + 1); - - handleMessage(user, "twitter-account", cmd + " " + data); - - if(cmd == "#pin") tp->runAsThread(new PINExchangeProcess(np, sessions[user], user, data)); - else if(cmd == "#help") tp->runAsThread(new HelpMessageRequest(np, user)); - else if(cmd[0] == '@') { - std::string username = cmd.substr(1); - tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data)); - } - else if(cmd == "#status") tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data)); - else if(cmd == "#timeline") tp->runAsThread(new TimelineRequest(np, sessions[user], user)); - else if(cmd == "#friends") tp->runAsThread(new FetchFriends(np, sessions[user], user)); - } - } - - void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector &groups) { - LOG4CXX_INFO(logger, user << ": Added buddy " << buddyName << "."); - handleBuddyChanged(user, buddyName, alias, groups, pbnetwork::STATUS_ONLINE); - } - - void handleBuddyRemovedRequest(const std::string &user, const std::string &buddyName, const std::vector &groups) { - - } - - - bool getUserOAuthKeyAndSecret(const std::string user, std::string &key, std::string &secret) { - boost::mutex::scoped_lock lock(dblock); - - UserInfo info; - if(storagebackend->getUser(user, info) == false) { - LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") - return false; - } - - key="", secret=""; int type; - storagebackend->getUserSetting((long)info.id, OAUTH_KEY, type, key); - storagebackend->getUserSetting((long)info.id, OAUTH_SECRET, type, secret); - return true; - } - - bool storeUserOAuthKeyAndSecret(const std::string user, const std::string OAuthKey, const std::string OAuthSecret) { - - boost::mutex::scoped_lock lock(dblock); - - UserInfo info; - if(storagebackend->getUser(user, info) == false) { - LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") - return false; - } - - storagebackend->updateUserSetting((long)info.id, OAUTH_KEY, OAuthKey); - storagebackend->updateUserSetting((long)info.id, OAUTH_SECRET, OAuthSecret); - return true; - } - - void initUserSession(const std::string user, const std::string password){ - boost::mutex::scoped_lock lock(userlock); - - std::string username = user.substr(0,user.find('@')); - std::string passwd = password; - LOG4CXX_INFO(logger, username + " " + passwd) - - sessions[user] = new twitCurl(); - if(CONFIG_HAS_KEY(config,"proxy.server")) { - std::string ip = CONFIG_STRING(config,"proxy.server"); - - std::ostringstream out; - out << CONFIG_INT(config,"proxy.port"); - std::string port = out.str(); - - std::string puser = CONFIG_STRING(config,"proxy.user"); - std::string ppasswd = CONFIG_STRING(config,"proxy.password"); - - LOG4CXX_INFO(logger, ip << " " << port << " " << puser << " " << ppasswd) - - if(ip != "localhost" && port != "0") { - sessions[user]->setProxyServerIp(ip); - sessions[user]->setProxyServerPort(port); - sessions[user]->setProxyUserName(puser); - sessions[user]->setProxyPassword(ppasswd); - } - } - - connectionState[user] = NEW; - sessions[user]->setTwitterUsername(username); - sessions[user]->setTwitterPassword(passwd); - sessions[user]->getOAuth().setConsumerKey(consumerKey); - sessions[user]->getOAuth().setConsumerSecret(consumerSecret); - } - - void OAuthFlowComplete(const std::string user, twitCurl *obj) { - boost::mutex::scoped_lock lock(userlock); - - delete sessions[user]; - sessions[user] = obj->clone(); - connectionState[user] = WAITING_FOR_PIN; - } - - void pinExchangeComplete(const std::string user, const std::string OAuthAccessTokenKey, const std::string OAuthAccessTokenSecret) { - boost::mutex::scoped_lock lock(userlock); - - sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey ); - sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret ); - connectionState[user] = CONNECTED; - } - - private: - enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED}; - - Config *config; - - std::string consumerKey; - std::string consumerSecret; - std::string OAUTH_KEY; - std::string OAUTH_SECRET; - - boost::mutex dblock, userlock; - - ThreadPool *tp; - std::map sessions; - std::map connectionState; -};*/ static void spectrum_sigchld_handler(int sig) {