From cb62e786889fc8cf4d1b4627bc121ff0eb3ca185 Mon Sep 17 00:00:00 2001 From: Sarang Bharadwaj Date: Sun, 3 Jun 2012 22:34:33 +0530 Subject: [PATCH] Muti-threaded login process --- backends/twitter/Requests/OAuthFlow.cpp | 28 +++ backends/twitter/Requests/OAuthFlow.h | 39 +++ .../twitter/Requests/PINExchangeProcess.cpp | 54 ++++ .../twitter/Requests/PINExchangeProcess.h | 39 +++ backends/twitter/TwitterPlugin.cpp | 218 ++++++++++++++++ backends/twitter/TwitterPlugin.h | 101 ++++++++ backends/twitter/libtwitcurl/twitcurl.cpp | 4 +- backends/twitter/libtwitcurl/twitcurl.h | 4 +- backends/twitter/main.cpp | 236 +++++++++--------- 9 files changed, 606 insertions(+), 117 deletions(-) create mode 100644 backends/twitter/Requests/OAuthFlow.cpp create mode 100644 backends/twitter/Requests/OAuthFlow.h create mode 100644 backends/twitter/Requests/PINExchangeProcess.cpp create mode 100644 backends/twitter/Requests/PINExchangeProcess.h create mode 100644 backends/twitter/TwitterPlugin.cpp create mode 100644 backends/twitter/TwitterPlugin.h diff --git a/backends/twitter/Requests/OAuthFlow.cpp b/backends/twitter/Requests/OAuthFlow.cpp new file mode 100644 index 00000000..880bcf1f --- /dev/null +++ b/backends/twitter/Requests/OAuthFlow.cpp @@ -0,0 +1,28 @@ +#include "OAuthFlow.h" +DEFINE_LOGGER(logger, "OAuthFlow") +void OAuthFlow::run() +{ + success = twitObj->oAuthRequestToken( authUrl ); +} + +void OAuthFlow::finalize() +{ + if (!success) { + LOG4CXX_ERROR(logger, "Error creating twitter authorization url!"); + np->handleLogoutRequest(user, username); + } else { + np->handleMessage(user, "twitter-account", std::string("Please visit the following link and authorize this application: ") + authUrl); + np->handleMessage(user, "twitter-account", std::string("Please reply with the PIN provided by twitter. Prefix the pin with 'pin:'. Ex. 'pin: 1234'")); + np->OAuthFlowComplete(user, twitObj); + } +} + +/*std::string authUrl; +if (sessions[user]->oAuthRequestToken( authUrl ) == false ) { + LOG4CXX_ERROR(logger, "Error creating twitter authorization url!"); + handleLogoutRequest(user, username); + return; +} +handleMessage(user, "twitter-account", std::string("Please visit the following link and authorize this application: ") + authUrl); +handleMessage(user, "twitter-account", std::string("Please reply with the PIN provided by twitter. Prefix the pin with 'pin:'. Ex. 'pin: 1234'")); +connectionState[user] = WAITING_FOR_PIN;*/ diff --git a/backends/twitter/Requests/OAuthFlow.h b/backends/twitter/Requests/OAuthFlow.h new file mode 100644 index 00000000..4ece9abb --- /dev/null +++ b/backends/twitter/Requests/OAuthFlow.h @@ -0,0 +1,39 @@ +#ifndef OAUTH_FLOW +#define OAUTH_FLOW + +#include "../ThreadPool.h" +#include "../libtwitcurl/twitcurl.h" +#include "../TwitterPlugin.h" +#include "transport/logging.h" + +#include +#include + +//class TwitterPlugin; +using namespace Transport; +class OAuthFlow : public Thread +{ + twitCurl *twitObj; + std::string username; + std::string user; + std::string authUrl; + TwitterPlugin *np; + bool success; + + public: + OAuthFlow(TwitterPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_username) { + twitObj = obj; + username = _username; + user = _user; + np = _np; + } + + ~OAuthFlow() { + //delete twitObj; + } + + void run(); + void finalize(); +}; + +#endif diff --git a/backends/twitter/Requests/PINExchangeProcess.cpp b/backends/twitter/Requests/PINExchangeProcess.cpp new file mode 100644 index 00000000..a54f0674 --- /dev/null +++ b/backends/twitter/Requests/PINExchangeProcess.cpp @@ -0,0 +1,54 @@ +#include "PINExchangeProcess.h" +DEFINE_LOGGER(logger, "PINExchangeProcess") +void PINExchangeProcess::run() +{ + LOG4CXX_INFO(logger, user << ": Sending PIN " << data) + LOG4CXX_INFO(logger, user << " " << twitObj->getProxyServerIp() << " " << twitObj->getProxyServerPort()) + twitObj->getOAuth().setOAuthPin( data ); + success = twitObj->oAuthAccessToken(); +} + +void PINExchangeProcess::finalize() +{ + if(!success) { + LOG4CXX_ERROR(logger, user << ": Error while exchanging PIN for Access Token!") + np->handleLogoutRequest(user, ""); + } else { + std::string OAuthAccessTokenKey, OAuthAccessTokenSecret; + twitObj->getOAuth().getOAuthTokenKey( OAuthAccessTokenKey ); + twitObj->getOAuth().getOAuthTokenSecret( OAuthAccessTokenSecret ); + + if(np->storeUserOAuthKeyAndSecret(user, OAuthAccessTokenKey, OAuthAccessTokenSecret) == false) { + np->handleLogoutRequest(user, ""); + return; + } + np->pinExchangeComplete(user, OAuthAccessTokenKey, OAuthAccessTokenSecret); + LOG4CXX_INFO(logger, user << ": Sent PIN " << data << " and obtained Access Token"); + } +} + +/*void handlePINExchange(const std::string &user, std::string &data) { + sessions[user]->getOAuth().setOAuthPin( data ); + if (sessions[user]->oAuthAccessToken() == false) { + LOG4CXX_ERROR(logger, user << ": Error while exchanging PIN for Access Token!") + handleLogoutRequest(user, ""); + return; + } + + std::string OAuthAccessTokenKey, OAuthAccessTokenSecret; + sessions[user]->getOAuth().getOAuthTokenKey( OAuthAccessTokenKey ); + sessions[user]->getOAuth().getOAuthTokenSecret( OAuthAccessTokenSecret ); + + UserInfo info; + if(storagebackend->getUser(user, info) == false) { + LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") + handleLogoutRequest(user, ""); + return; + } + + storagebackend->updateUserSetting((long)info.id, OAUTH_KEY, OAuthAccessTokenKey); + storagebackend->updateUserSetting((long)info.id, OAUTH_SECRET, OAuthAccessTokenSecret); + + connectionState[user] = CONNECTED; + LOG4CXX_INFO(logger, user << ": Sent PIN " << data << " and obtained Access Token"); +}*/ diff --git a/backends/twitter/Requests/PINExchangeProcess.h b/backends/twitter/Requests/PINExchangeProcess.h new file mode 100644 index 00000000..4aba98a1 --- /dev/null +++ b/backends/twitter/Requests/PINExchangeProcess.h @@ -0,0 +1,39 @@ +#ifndef PIN_EXCHANGE +#define PIN_EXCHANGE + +#include "../ThreadPool.h" +#include "../libtwitcurl/twitcurl.h" +#include "../TwitterPlugin.h" +#include "transport/networkplugin.h" +#include "transport/logging.h" + +#include +#include + +//class TwitterPlugin; +using namespace Transport; +class PINExchangeProcess : public Thread +{ + twitCurl *twitObj; + std::string data; + std::string user; + TwitterPlugin *np; + bool success; + + public: + PINExchangeProcess(TwitterPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) { + twitObj = obj; + data = _data; + user = _user; + np = _np; + } + + ~PINExchangeProcess() { + //delete twitObj; + } + + void run(); + void finalize(); +}; + +#endif diff --git a/backends/twitter/TwitterPlugin.cpp b/backends/twitter/TwitterPlugin.cpp new file mode 100644 index 00000000..512a1496 --- /dev/null +++ b/backends/twitter/TwitterPlugin.cpp @@ -0,0 +1,218 @@ +#include "TwitterPlugin.h" +#include "Requests/StatusUpdateRequest.h" +#include "Requests/DirectMessageRequest.h" +#include "Requests/TimelineRequest.h" +#include "Requests/FetchFriends.h" +#include "Requests/HelpMessageRequest.h" +#include "Requests/PINExchangeProcess.h" +#include "Requests/OAuthFlow.h" + +DEFINE_LOGGER(logger, "Twitter Backend"); + +TwitterPlugin *np = NULL; +Swift::SimpleEventLoop *loop_; // Event Loop +TwitterPlugin::TwitterPlugin(Config *config, Swift::SimpleEventLoop *loop, StorageBackend *storagebackend, const std::string &host, int port) : NetworkPlugin() +{ + this->config = config; + this->storagebackend = storagebackend; + + if(CONFIG_HAS_KEY(config, "twitter.consumer_key") == false || + CONFIG_HAS_KEY(config, "twitter.consumer_secret") == false) { + LOG4CXX_ERROR(logger, "Couldn't find consumer key and/or secret. Please check config file."); + exit(0); + } + consumerKey = CONFIG_STRING(config, "twitter.consumer_key"); + consumerSecret = CONFIG_STRING(config, "twitter.consumer_secret"); + OAUTH_KEY = "oauth_key"; + OAUTH_SECRET = "oauth_secret"; + + m_factories = new Swift::BoostNetworkFactories(loop); + m_conn = m_factories->getConnectionFactory()->createConnection(); + m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1)); + m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port)); + + tp = new ThreadPool(loop_, 10); + + LOG4CXX_INFO(logger, "Starting the plugin."); +} + +TwitterPlugin::~TwitterPlugin() +{ + delete storagebackend; + std::map::iterator it; + for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second; + delete tp; +} + +// Send data to NetworkPlugin server +void TwitterPlugin::sendData(const std::string &string) +{ + m_conn->write(Swift::createSafeByteArray(string)); +} + +// Receive date from the NetworkPlugin server and invoke the appropirate payload handler (implement in the NetworkPlugin class) +void TwitterPlugin::_handleDataRead(boost::shared_ptr data) +{ + std::string d(data->begin(), data->end()); + handleDataRead(d); +} + +// User trying to login into his twitter account +void TwitterPlugin::handleLoginRequest(const std::string &user, const std::string &legacyName, const std::string &password) +{ + if(connectionState.count(user) && (connectionState[user] == NEW || + connectionState[user] == CONNECTED || + connectionState[user] == WAITING_FOR_PIN)) { + LOG4CXX_INFO(logger, std::string("A session corresponding to ") + user + std::string(" is already active")) + return; + } + + LOG4CXX_INFO(logger, std::string("Received login request for ") + user) + + initUserSession(user, password); + + handleConnected(user); + handleBuddyChanged(user, "twitter-account", "twitter", std::vector(), pbnetwork::STATUS_ONLINE); + + LOG4CXX_INFO(logger, "Querying database for usersettings of " << user) + + std::string key, secret; + getUserOAuthKeyAndSecret(user, key, secret); + + if(key == "" || secret == "") { + LOG4CXX_INFO(logger, "Intiating OAuth Flow for user " << user) + tp->runAsThread(new OAuthFlow(np, sessions[user], user, sessions[user]->getTwitterUsername())); + } else { + LOG4CXX_INFO(logger, user << " is already registerd. Using the stored oauth key and secret") + LOG4CXX_INFO(logger, key << " " << secret) + pinExchangeComplete(user, key, secret); + } +} + +// User logging out +void TwitterPlugin::handleLogoutRequest(const std::string &user, const std::string &legacyName) +{ + delete sessions[user]; + sessions[user] = NULL; + connectionState[user] = DISCONNECTED; +} + + +void TwitterPlugin::handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml) +{ + + if(legacyName == "twitter-account") { + std::string cmd = message.substr(0, message.find(':')); + std::string data = message.substr(message.find(':') + 1); + + handleMessage(user, "twitter-account", cmd + " " + data); + + if(cmd == "#pin") tp->runAsThread(new PINExchangeProcess(np, sessions[user], user, data)); + else if(cmd == "#help") tp->runAsThread(new HelpMessageRequest(np, user)); + else if(cmd[0] == '@') { + std::string username = cmd.substr(1); + tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data)); + } + else if(cmd == "#status") tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data)); + else if(cmd == "#timeline") tp->runAsThread(new TimelineRequest(np, sessions[user], user)); + else if(cmd == "#friends") tp->runAsThread(new FetchFriends(np, sessions[user], user)); + } +} + +void TwitterPlugin::handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector &groups) +{ + LOG4CXX_INFO(logger, user << ": Added buddy " << buddyName << "."); + handleBuddyChanged(user, buddyName, alias, groups, pbnetwork::STATUS_ONLINE); +} + +void TwitterPlugin::handleBuddyRemovedRequest(const std::string &user, const std::string &buddyName, const std::vector &groups) +{ + +} + + +bool TwitterPlugin::getUserOAuthKeyAndSecret(const std::string user, std::string &key, std::string &secret) +{ + boost::mutex::scoped_lock lock(dblock); + + UserInfo info; + if(storagebackend->getUser(user, info) == false) { + LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") + return false; + } + + key="", secret=""; int type; + storagebackend->getUserSetting((long)info.id, OAUTH_KEY, type, key); + storagebackend->getUserSetting((long)info.id, OAUTH_SECRET, type, secret); + return true; +} + +bool TwitterPlugin::storeUserOAuthKeyAndSecret(const std::string user, const std::string OAuthKey, const std::string OAuthSecret) +{ + + boost::mutex::scoped_lock lock(dblock); + + UserInfo info; + if(storagebackend->getUser(user, info) == false) { + LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") + return false; + } + + storagebackend->updateUserSetting((long)info.id, OAUTH_KEY, OAuthKey); + storagebackend->updateUserSetting((long)info.id, OAUTH_SECRET, OAuthSecret); + return true; +} + +void TwitterPlugin::initUserSession(const std::string user, const std::string password) +{ + boost::mutex::scoped_lock lock(userlock); + + std::string username = user.substr(0,user.find('@')); + std::string passwd = password; + LOG4CXX_INFO(logger, username + " " + passwd) + + sessions[user] = new twitCurl(); + if(CONFIG_HAS_KEY(config,"proxy.server")) { + std::string ip = CONFIG_STRING(config,"proxy.server"); + + std::ostringstream out; + out << CONFIG_INT(config,"proxy.port"); + std::string port = out.str(); + + std::string puser = CONFIG_STRING(config,"proxy.user"); + std::string ppasswd = CONFIG_STRING(config,"proxy.password"); + + LOG4CXX_INFO(logger, ip << " " << port << " " << puser << " " << ppasswd) + + if(ip != "localhost" && port != "0") { + sessions[user]->setProxyServerIp(ip); + sessions[user]->setProxyServerPort(port); + sessions[user]->setProxyUserName(puser); + sessions[user]->setProxyPassword(ppasswd); + } + } + + connectionState[user] = NEW; + sessions[user]->setTwitterUsername(username); + sessions[user]->setTwitterPassword(passwd); + sessions[user]->getOAuth().setConsumerKey(consumerKey); + sessions[user]->getOAuth().setConsumerSecret(consumerSecret); +} + +void TwitterPlugin::OAuthFlowComplete(const std::string user, twitCurl *obj) +{ + boost::mutex::scoped_lock lock(userlock); + + //delete sessions[user]; + //sessions[user] = obj; + connectionState[user] = WAITING_FOR_PIN; +} + +void TwitterPlugin::pinExchangeComplete(const std::string user, const std::string OAuthAccessTokenKey, const std::string OAuthAccessTokenSecret) +{ + boost::mutex::scoped_lock lock(userlock); + + //sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey ); + //sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret ); + connectionState[user] = CONNECTED; +} diff --git a/backends/twitter/TwitterPlugin.h b/backends/twitter/TwitterPlugin.h new file mode 100644 index 00000000..979985f3 --- /dev/null +++ b/backends/twitter/TwitterPlugin.h @@ -0,0 +1,101 @@ +#ifndef TWITTER_PLUGIN +#define TWITTER_PLUGIN + +#include "transport/config.h" +#include "transport/networkplugin.h" +#include "transport/logging.h" +#include "transport/sqlite3backend.h" +#include "transport/mysqlbackend.h" +#include "transport/pqxxbackend.h" +#include "transport/storagebackend.h" + +#include "Swiften/Swiften.h" +#include "unistd.h" +#include "signal.h" +#include "sys/wait.h" +#include "sys/signal.h" + +#include +#include +#include +#include + +#include "twitcurl.h" +#include "TwitterResponseParser.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "ThreadPool.h" + +using namespace boost::filesystem; +using namespace boost::program_options; +using namespace Transport; + + +#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")") +class TwitterPlugin; +extern TwitterPlugin *np; +extern Swift::SimpleEventLoop *loop_; // Event Loop + +class TwitterPlugin : public NetworkPlugin { + public: + Swift::BoostNetworkFactories *m_factories; + Swift::BoostIOServiceThread m_boostIOServiceThread; + boost::shared_ptr m_conn; + StorageBackend *storagebackend; + + TwitterPlugin(Config *config, Swift::SimpleEventLoop *loop, StorageBackend *storagebackend, const std::string &host, int port); + ~TwitterPlugin(); + + // Send data to NetworkPlugin server + void sendData(const std::string &string); + + // Receive date from the NetworkPlugin server and invoke the appropirate payload handler (implement in the NetworkPlugin class) + void _handleDataRead(boost::shared_ptr data); + + // User trying to login into his twitter account + void handleLoginRequest(const std::string &user, const std::string &legacyName, const std::string &password); + + // User logging out + void handleLogoutRequest(const std::string &user, const std::string &legacyName); + + void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = ""); + + void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector &groups); + + void handleBuddyRemovedRequest(const std::string &user, const std::string &buddyName, const std::vector &groups); + + bool getUserOAuthKeyAndSecret(const std::string user, std::string &key, std::string &secret); + + bool storeUserOAuthKeyAndSecret(const std::string user, const std::string OAuthKey, const std::string OAuthSecret); + + void initUserSession(const std::string user, const std::string password); + + void OAuthFlowComplete(const std::string user, twitCurl *obj); + + void pinExchangeComplete(const std::string user, const std::string OAuthAccessTokenKey, const std::string OAuthAccessTokenSecret); + + private: + enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED}; + + Config *config; + + std::string consumerKey; + std::string consumerSecret; + std::string OAUTH_KEY; + std::string OAUTH_SECRET; + + boost::mutex dblock, userlock; + + ThreadPool *tp; + std::map sessions; + std::map connectionState; +}; + +#endif diff --git a/backends/twitter/libtwitcurl/twitcurl.cpp b/backends/twitter/libtwitcurl/twitcurl.cpp index b19e7860..24de2853 100644 --- a/backends/twitter/libtwitcurl/twitcurl.cpp +++ b/backends/twitter/libtwitcurl/twitcurl.cpp @@ -1497,7 +1497,7 @@ bool twitCurl::performGet( const std::string& getUrl ) /* Set http request and url */ curl_easy_setopt( m_curlHandle, CURLOPT_HTTPGET, 1 ); -// curl_easy_setopt( m_curlHandle, CURLOPT_VERBOSE, 1 ); + curl_easy_setopt( m_curlHandle, CURLOPT_VERBOSE, 1 ); curl_easy_setopt( m_curlHandle, CURLOPT_URL, getUrl.c_str() ); /* Send http request */ @@ -1544,6 +1544,7 @@ bool twitCurl::performGet( const std::string& getUrl, const std::string& oAuthHt /* Set http request and url */ curl_easy_setopt( m_curlHandle, CURLOPT_HTTPGET, 1 ); + curl_easy_setopt( m_curlHandle, CURLOPT_VERBOSE, 1 ); curl_easy_setopt( m_curlHandle, CURLOPT_URL, getUrl.c_str() ); /* Set header */ @@ -1675,6 +1676,7 @@ bool twitCurl::performPost( const std::string& postUrl, std::string dataStr ) /* Set http request, url and data */ curl_easy_setopt( m_curlHandle, CURLOPT_POST, 1 ); + curl_easy_setopt( m_curlHandle, CURLOPT_VERBOSE, 1 ); curl_easy_setopt( m_curlHandle, CURLOPT_URL, postUrl.c_str() ); if( dataStr.length() ) { diff --git a/backends/twitter/libtwitcurl/twitcurl.h b/backends/twitter/libtwitcurl/twitcurl.h index 3552e20c..e2d261fc 100644 --- a/backends/twitter/libtwitcurl/twitcurl.h +++ b/backends/twitter/libtwitcurl/twitcurl.h @@ -224,7 +224,7 @@ public: twitCurl* clone() { twitCurl *cloneObj = new twitCurl(); - + /* cURL flags */ cloneObj->m_curlProxyParamsSet = m_curlProxyParamsSet; cloneObj->m_curlLoginParamsSet = m_curlLoginParamsSet; @@ -234,7 +234,7 @@ public: cloneObj->m_proxyServerIp = m_proxyServerIp; cloneObj->m_proxyServerPort = m_proxyServerPort; cloneObj->m_proxyUserName = m_proxyUserName; - cloneObj->m_proxyPassword = m_proxyPassword; + /* Twitter data */ cloneObj->m_twitterUsername = m_twitterUsername; diff --git a/backends/twitter/main.cpp b/backends/twitter/main.cpp index a9fb47d9..d9db800d 100644 --- a/backends/twitter/main.cpp +++ b/backends/twitter/main.cpp @@ -1,4 +1,4 @@ -#include "transport/config.h" +/*#include "transport/config.h" #include "transport/networkplugin.h" #include "transport/logging.h" #include "transport/sqlite3backend.h" @@ -34,18 +34,21 @@ #include "Requests/TimelineRequest.h" #include "Requests/FetchFriends.h" #include "Requests/HelpMessageRequest.h" +#include "Requests/PINExchangeProcess.h" +#include "Requests/OAuthFlow.h" + + using namespace boost::filesystem; using namespace boost::program_options; -using namespace Transport; +using namespace Transport;*/ + +#include "TwitterPlugin.h" DEFINE_LOGGER(logger, "Twitter Backend"); -Swift::SimpleEventLoop *loop_; // Event Loop -class TwitterPlugin; // The plugin -TwitterPlugin * np = NULL; -StorageBackend *storagebackend; +//class TwitterPlugin; // The plugin -#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")") +/*#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")") class TwitterPlugin : public NetworkPlugin { private: @@ -95,7 +98,7 @@ class TwitterPlugin : public NetworkPlugin { std::string d(data->begin(), data->end()); handleDataRead(d); } - + // User trying to login into his twitter account void handleLoginRequest(const std::string &user, const std::string &legacyName, const std::string &password) { if(connectionState.count(user) && (connectionState[user] == NEW || @@ -107,14 +110,102 @@ class TwitterPlugin : public NetworkPlugin { LOG4CXX_INFO(logger, std::string("Received login request for ") + user) - std::string username = user.substr(0,user.find('@')); - std::string passwd = password; - LOG4CXX_INFO(logger, username + " " + passwd) - - sessions[user] = new twitCurl(); + initUserSession(user, password); + handleConnected(user); handleBuddyChanged(user, "twitter-account", "twitter", std::vector(), pbnetwork::STATUS_ONLINE); + LOG4CXX_INFO(logger, "Querying database for usersettings of " << user) + + std::string key, secret; + getUserOAuthKeyAndSecret(user, key, secret); + + if(key == "" || secret == "") { + LOG4CXX_INFO(logger, "Intiating OAuth Flow for user " << user) + tp->runAsThread(new OAuthFlow(np, sessions[user], user, sessions[user]->getTwitterUsername())); + } else { + LOG4CXX_INFO(logger, user << " is already registerd. Using the stored oauth key and secret") + LOG4CXX_INFO(logger, key << " " << secret) + pinExchangeComplete(user, key, secret); + } + } + + // User logging out + void handleLogoutRequest(const std::string &user, const std::string &legacyName) { + delete sessions[user]; + sessions[user] = NULL; + connectionState[user] = DISCONNECTED; + } + + + void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { + + if(legacyName == "twitter-account") { + std::string cmd = message.substr(0, message.find(':')); + std::string data = message.substr(message.find(':') + 1); + + handleMessage(user, "twitter-account", cmd + " " + data); + + if(cmd == "#pin") tp->runAsThread(new PINExchangeProcess(np, sessions[user], user, data)); + else if(cmd == "#help") tp->runAsThread(new HelpMessageRequest(np, user)); + else if(cmd[0] == '@') { + std::string username = cmd.substr(1); + tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data)); + } + else if(cmd == "#status") tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data)); + else if(cmd == "#timeline") tp->runAsThread(new TimelineRequest(np, sessions[user], user)); + else if(cmd == "#friends") tp->runAsThread(new FetchFriends(np, sessions[user], user)); + } + } + + void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector &groups) { + LOG4CXX_INFO(logger, user << ": Added buddy " << buddyName << "."); + handleBuddyChanged(user, buddyName, alias, groups, pbnetwork::STATUS_ONLINE); + } + + void handleBuddyRemovedRequest(const std::string &user, const std::string &buddyName, const std::vector &groups) { + + } + + + bool getUserOAuthKeyAndSecret(const std::string user, std::string &key, std::string &secret) { + boost::mutex::scoped_lock lock(dblock); + + UserInfo info; + if(storagebackend->getUser(user, info) == false) { + LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") + return false; + } + + key="", secret=""; int type; + storagebackend->getUserSetting((long)info.id, OAUTH_KEY, type, key); + storagebackend->getUserSetting((long)info.id, OAUTH_SECRET, type, secret); + return true; + } + + bool storeUserOAuthKeyAndSecret(const std::string user, const std::string OAuthKey, const std::string OAuthSecret) { + + boost::mutex::scoped_lock lock(dblock); + + UserInfo info; + if(storagebackend->getUser(user, info) == false) { + LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") + return false; + } + + storagebackend->updateUserSetting((long)info.id, OAUTH_KEY, OAuthKey); + storagebackend->updateUserSetting((long)info.id, OAUTH_SECRET, OAuthSecret); + return true; + } + + void initUserSession(const std::string user, const std::string password){ + boost::mutex::scoped_lock lock(userlock); + + std::string username = user.substr(0,user.find('@')); + std::string passwd = password; + LOG4CXX_INFO(logger, username + " " + passwd) + + sessions[user] = new twitCurl(); if(CONFIG_HAS_KEY(config,"proxy.server")) { std::string ip = CONFIG_STRING(config,"proxy.server"); @@ -140,110 +231,23 @@ class TwitterPlugin : public NetworkPlugin { sessions[user]->setTwitterPassword(passwd); sessions[user]->getOAuth().setConsumerKey(consumerKey); sessions[user]->getOAuth().setConsumerSecret(consumerSecret); - - LOG4CXX_INFO(logger, "Querying database for usersettings of " << user) - - UserInfo info; - storagebackend->getUser(user, info); - - std::string key="", secret=""; - int type; - storagebackend->getUserSetting((long)info.id, OAUTH_KEY, type, key); - storagebackend->getUserSetting((long)info.id, OAUTH_SECRET, type, secret); - - if(key == "" || secret == "") { - LOG4CXX_INFO(logger, "Intiating oauthflow for user " << user) - - std::string authUrl; - if (sessions[user]->oAuthRequestToken( authUrl ) == false ) { - LOG4CXX_ERROR(logger, "Error creating twitter authorization url!"); - handleLogoutRequest(user, username); - return; - } - handleMessage(user, "twitter-account", std::string("Please visit the following link and authorize this application: ") + authUrl); - handleMessage(user, "twitter-account", std::string("Please reply with the PIN provided by twitter. Prefix the pin with 'pin:'. Ex. 'pin: 1234'")); - connectionState[user] = WAITING_FOR_PIN; - } else { - LOG4CXX_INFO(logger, user << " is already registerd. Using the stored oauth key and secret") - LOG4CXX_INFO(logger, key << " " << secret) - - sessions[user]->getOAuth().setOAuthTokenKey( key ); - sessions[user]->getOAuth().setOAuthTokenSecret( secret ); - connectionState[user] = CONNECTED; - } } - // User logging out - void handleLogoutRequest(const std::string &user, const std::string &legacyName) { + void OAuthFlowComplete(const std::string user, twitCurl *obj) { + boost::mutex::scoped_lock lock(userlock); + delete sessions[user]; - sessions[user] = NULL; - connectionState[user] = DISCONNECTED; - } + sessions[user] = obj->clone(); + connectionState[user] = WAITING_FOR_PIN; + } - void handlePINExchange(const std::string &user, std::string &data) { - sessions[user]->getOAuth().setOAuthPin( data ); - if (sessions[user]->oAuthAccessToken() == false) { - LOG4CXX_ERROR(logger, user << ": Error while exchanging PIN for Access Token!") - handleLogoutRequest(user, ""); - return; - } - - std::string OAuthAccessTokenKey, OAuthAccessTokenSecret; - sessions[user]->getOAuth().getOAuthTokenKey( OAuthAccessTokenKey ); - sessions[user]->getOAuth().getOAuthTokenSecret( OAuthAccessTokenSecret ); - - UserInfo info; - if(storagebackend->getUser(user, info) == false) { - LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!") - handleLogoutRequest(user, ""); - return; - } - - storagebackend->updateUserSetting((long)info.id, OAUTH_KEY, OAuthAccessTokenKey); - storagebackend->updateUserSetting((long)info.id, OAUTH_SECRET, OAuthAccessTokenSecret); - - connectionState[user] = CONNECTED; - LOG4CXX_INFO(logger, user << ": Sent PIN " << data << " and obtained Access Token"); - } - - - void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { - - if(legacyName == "twitter-account") { - std::string cmd = message.substr(0, message.find(':')); - std::string data = message.substr(message.find(':') + 1); + void pinExchangeComplete(const std::string user, const std::string OAuthAccessTokenKey, const std::string OAuthAccessTokenSecret) { + boost::mutex::scoped_lock lock(userlock); - handleMessage(user, "twitter-account", cmd + " " + data); - - if(cmd == "#pin") handlePINExchange(user, data); - else if(cmd == "#help") { - tp->runAsThread(new HelpMessageRequest(np, user)); - } - else if(cmd[0] == '@') { - std::string username = cmd.substr(1); - tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data)); - } - else if(cmd == "#status") { - tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data)); - } - else if(cmd == "#timeline") { - tp->runAsThread(new TimelineRequest(np, sessions[user], user)); - //fetchTimeline(user); - } - else if(cmd == "#friends") { - tp->runAsThread(new FetchFriends(np, sessions[user], user)); - } - } - } - - void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector &groups) { - LOG4CXX_INFO(logger, user << ": Added buddy " << buddyName << "."); - handleBuddyChanged(user, buddyName, alias, groups, pbnetwork::STATUS_ONLINE); - } - - void handleBuddyRemovedRequest(const std::string &user, const std::string &buddyName, const std::vector &groups) { - - } + sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey ); + sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret ); + connectionState[user] = CONNECTED; + } private: enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED}; @@ -255,10 +259,12 @@ class TwitterPlugin : public NetworkPlugin { std::string OAUTH_KEY; std::string OAUTH_SECRET; + boost::mutex dblock, userlock; + ThreadPool *tp; std::map sessions; std::map connectionState; -}; +};*/ static void spectrum_sigchld_handler(int sig) { @@ -322,6 +328,8 @@ int main (int argc, char* argv[]) { Logging::initBackendLogging(&config); std::string error; + StorageBackend *storagebackend; + storagebackend = StorageBackend::createBackend(&config, error); if (storagebackend == NULL) { LOG4CXX_ERROR(logger, "Error creating StorageBackend! " << error) @@ -335,7 +343,7 @@ int main (int argc, char* argv[]) { Swift::SimpleEventLoop eventLoop; loop_ = &eventLoop; - np = new TwitterPlugin(&config, &eventLoop, host, port); + np = new TwitterPlugin(&config, &eventLoop, storagebackend, host, port); loop_->run(); return 0;