Multi-threaded requests

This commit is contained in:
Sarang Bharadwaj 2012-06-04 00:08:16 +05:30
parent cb62e78688
commit b62ef77146
6 changed files with 38 additions and 301 deletions

View file

@ -22,14 +22,14 @@ class OAuthFlow : public Thread
public:
OAuthFlow(TwitterPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_username) {
twitObj = obj;
twitObj = obj->clone();
username = _username;
user = _user;
np = _np;
}
~OAuthFlow() {
//delete twitObj;
delete twitObj;
}
void run();

View file

@ -22,14 +22,14 @@ class PINExchangeProcess : public Thread
public:
PINExchangeProcess(TwitterPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) {
twitObj = obj;
twitObj = obj->clone();
data = _data;
user = _user;
np = _np;
}
~PINExchangeProcess() {
//delete twitObj;
delete twitObj;
}
void run();

View file

@ -203,8 +203,8 @@ void TwitterPlugin::OAuthFlowComplete(const std::string user, twitCurl *obj)
{
boost::mutex::scoped_lock lock(userlock);
//delete sessions[user];
//sessions[user] = obj;
delete sessions[user];
sessions[user] = obj->clone();
connectionState[user] = WAITING_FOR_PIN;
}
@ -212,7 +212,7 @@ void TwitterPlugin::pinExchangeComplete(const std::string user, const std::strin
{
boost::mutex::scoped_lock lock(userlock);
//sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey );
//sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret );
sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey );
sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret );
connectionState[user] = CONNECTED;
}

View file

@ -51,6 +51,35 @@ twitCurl::~twitCurl()
}
}
twitCurl* twitCurl::clone()
{
twitCurl *cloneObj = new twitCurl();
/* cURL flags */
//cloneObj->m_curlProxyParamsSet = false;
//cloneObj->m_curlLoginParamsSet = m_curlLoginParamsSet;
//cloneObj->m_curlCallbackParamsSet = m_curlCallbackParamsSet;
/* cURL proxy data */
cloneObj->setProxyServerIp(m_proxyServerIp);
cloneObj->setProxyServerPort(m_proxyServerPort);
cloneObj->setProxyUserName(m_proxyUserName);
cloneObj->setProxyPassword(m_proxyPassword);
/* Twitter data */
cloneObj->setTwitterUsername(m_twitterUsername);
cloneObj->setTwitterPassword(m_twitterPassword);
/* Twitter API type */
cloneObj->setTwitterApiType(m_eApiFormatType);
/* OAuth data */
cloneObj->m_oAuth = m_oAuth.clone();
return cloneObj;
}
/*++
* @method: twitCurl::setTwitterApiType
*

View file

@ -220,34 +220,7 @@ public:
void setProxyUserName( std::string& proxyUserName /* in */ );
void setProxyPassword( std::string& proxyPassword /* in */ );
twitCurl* clone()
{
twitCurl *cloneObj = new twitCurl();
/* cURL flags */
cloneObj->m_curlProxyParamsSet = m_curlProxyParamsSet;
cloneObj->m_curlLoginParamsSet = m_curlLoginParamsSet;
cloneObj->m_curlCallbackParamsSet = m_curlCallbackParamsSet;
/* cURL proxy data */
cloneObj->m_proxyServerIp = m_proxyServerIp;
cloneObj->m_proxyServerPort = m_proxyServerPort;
cloneObj->m_proxyUserName = m_proxyUserName;
/* Twitter data */
cloneObj->m_twitterUsername = m_twitterUsername;
cloneObj->m_twitterPassword = m_twitterPassword;
/* Twitter API type */
cloneObj->m_eApiFormatType = m_eApiFormatType;
/* OAuth data */
cloneObj->m_oAuth = m_oAuth.clone();
return cloneObj;
}
twitCurl* clone();
private:
/* cURL data */

View file

@ -1,270 +1,5 @@
/*#include "transport/config.h"
#include "transport/networkplugin.h"
#include "transport/logging.h"
#include "transport/sqlite3backend.h"
#include "transport/mysqlbackend.h"
#include "transport/pqxxbackend.h"
#include "transport/storagebackend.h"
#include "Swiften/Swiften.h"
#include "unistd.h"
#include "signal.h"
#include "sys/wait.h"
#include "sys/signal.h"
#include <boost/algorithm/string.hpp>
#include <boost/signal.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include "twitcurl.h"
#include "TwitterResponseParser.h"
#include <iostream>
#include <sstream>
#include <map>
#include <vector>
#include <queue>
#include <set>
#include <cstdio>
#include "ThreadPool.h"
#include "Requests/StatusUpdateRequest.h"
#include "Requests/DirectMessageRequest.h"
#include "Requests/TimelineRequest.h"
#include "Requests/FetchFriends.h"
#include "Requests/HelpMessageRequest.h"
#include "Requests/PINExchangeProcess.h"
#include "Requests/OAuthFlow.h"
using namespace boost::filesystem;
using namespace boost::program_options;
using namespace Transport;*/
#include "TwitterPlugin.h"
DEFINE_LOGGER(logger, "Twitter Backend");
//class TwitterPlugin; // The plugin
/*#define STR(x) (std::string("(") + x.from + ", " + x.to + ", " + x.message + ")")
class TwitterPlugin : public NetworkPlugin {
private:
struct Request;
public:
Swift::BoostNetworkFactories *m_factories;
Swift::BoostIOServiceThread m_boostIOServiceThread;
boost::shared_ptr<Swift::Connection> m_conn;
TwitterPlugin(Config *config, Swift::SimpleEventLoop *loop, const std::string &host, int port) : NetworkPlugin() {
this->config = config;
if(CONFIG_HAS_KEY(config, "twitter.consumer_key") == false ||
CONFIG_HAS_KEY(config, "twitter.consumer_secret") == false) {
LOG4CXX_ERROR(logger, "Couldn't find consumer key and/or secret. Please check config file.");
exit(0);
}
consumerKey = CONFIG_STRING(config, "twitter.consumer_key");
consumerSecret = CONFIG_STRING(config, "twitter.consumer_secret");
OAUTH_KEY = "oauth_key";
OAUTH_SECRET = "oauth_secret";
m_factories = new Swift::BoostNetworkFactories(loop);
m_conn = m_factories->getConnectionFactory()->createConnection();
m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1));
m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
tp = new ThreadPool(loop_, 10);
LOG4CXX_INFO(logger, "Starting the plugin.");
}
~TwitterPlugin() {
delete storagebackend;
std::map<std::string, twitCurl*>::iterator it;
for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second;
delete tp;
}
// Send data to NetworkPlugin server
void sendData(const std::string &string) {
m_conn->write(Swift::createSafeByteArray(string));
}
// Receive date from the NetworkPlugin server and invoke the appropirate payload handler (implement in the NetworkPlugin class)
void _handleDataRead(boost::shared_ptr<Swift::SafeByteArray> data) {
std::string d(data->begin(), data->end());
handleDataRead(d);
}
// User trying to login into his twitter account
void handleLoginRequest(const std::string &user, const std::string &legacyName, const std::string &password) {
if(connectionState.count(user) && (connectionState[user] == NEW ||
connectionState[user] == CONNECTED ||
connectionState[user] == WAITING_FOR_PIN)) {
LOG4CXX_INFO(logger, std::string("A session corresponding to ") + user + std::string(" is already active"))
return;
}
LOG4CXX_INFO(logger, std::string("Received login request for ") + user)
initUserSession(user, password);
handleConnected(user);
handleBuddyChanged(user, "twitter-account", "twitter", std::vector<std::string>(), pbnetwork::STATUS_ONLINE);
LOG4CXX_INFO(logger, "Querying database for usersettings of " << user)
std::string key, secret;
getUserOAuthKeyAndSecret(user, key, secret);
if(key == "" || secret == "") {
LOG4CXX_INFO(logger, "Intiating OAuth Flow for user " << user)
tp->runAsThread(new OAuthFlow(np, sessions[user], user, sessions[user]->getTwitterUsername()));
} else {
LOG4CXX_INFO(logger, user << " is already registerd. Using the stored oauth key and secret")
LOG4CXX_INFO(logger, key << " " << secret)
pinExchangeComplete(user, key, secret);
}
}
// User logging out
void handleLogoutRequest(const std::string &user, const std::string &legacyName) {
delete sessions[user];
sessions[user] = NULL;
connectionState[user] = DISCONNECTED;
}
void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
if(legacyName == "twitter-account") {
std::string cmd = message.substr(0, message.find(':'));
std::string data = message.substr(message.find(':') + 1);
handleMessage(user, "twitter-account", cmd + " " + data);
if(cmd == "#pin") tp->runAsThread(new PINExchangeProcess(np, sessions[user], user, data));
else if(cmd == "#help") tp->runAsThread(new HelpMessageRequest(np, user));
else if(cmd[0] == '@') {
std::string username = cmd.substr(1);
tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data));
}
else if(cmd == "#status") tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data));
else if(cmd == "#timeline") tp->runAsThread(new TimelineRequest(np, sessions[user], user));
else if(cmd == "#friends") tp->runAsThread(new FetchFriends(np, sessions[user], user));
}
}
void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector<std::string> &groups) {
LOG4CXX_INFO(logger, user << ": Added buddy " << buddyName << ".");
handleBuddyChanged(user, buddyName, alias, groups, pbnetwork::STATUS_ONLINE);
}
void handleBuddyRemovedRequest(const std::string &user, const std::string &buddyName, const std::vector<std::string> &groups) {
}
bool getUserOAuthKeyAndSecret(const std::string user, std::string &key, std::string &secret) {
boost::mutex::scoped_lock lock(dblock);
UserInfo info;
if(storagebackend->getUser(user, info) == false) {
LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!")
return false;
}
key="", secret=""; int type;
storagebackend->getUserSetting((long)info.id, OAUTH_KEY, type, key);
storagebackend->getUserSetting((long)info.id, OAUTH_SECRET, type, secret);
return true;
}
bool storeUserOAuthKeyAndSecret(const std::string user, const std::string OAuthKey, const std::string OAuthSecret) {
boost::mutex::scoped_lock lock(dblock);
UserInfo info;
if(storagebackend->getUser(user, info) == false) {
LOG4CXX_ERROR(logger, "Didn't find entry for " << user << " in the database!")
return false;
}
storagebackend->updateUserSetting((long)info.id, OAUTH_KEY, OAuthKey);
storagebackend->updateUserSetting((long)info.id, OAUTH_SECRET, OAuthSecret);
return true;
}
void initUserSession(const std::string user, const std::string password){
boost::mutex::scoped_lock lock(userlock);
std::string username = user.substr(0,user.find('@'));
std::string passwd = password;
LOG4CXX_INFO(logger, username + " " + passwd)
sessions[user] = new twitCurl();
if(CONFIG_HAS_KEY(config,"proxy.server")) {
std::string ip = CONFIG_STRING(config,"proxy.server");
std::ostringstream out;
out << CONFIG_INT(config,"proxy.port");
std::string port = out.str();
std::string puser = CONFIG_STRING(config,"proxy.user");
std::string ppasswd = CONFIG_STRING(config,"proxy.password");
LOG4CXX_INFO(logger, ip << " " << port << " " << puser << " " << ppasswd)
if(ip != "localhost" && port != "0") {
sessions[user]->setProxyServerIp(ip);
sessions[user]->setProxyServerPort(port);
sessions[user]->setProxyUserName(puser);
sessions[user]->setProxyPassword(ppasswd);
}
}
connectionState[user] = NEW;
sessions[user]->setTwitterUsername(username);
sessions[user]->setTwitterPassword(passwd);
sessions[user]->getOAuth().setConsumerKey(consumerKey);
sessions[user]->getOAuth().setConsumerSecret(consumerSecret);
}
void OAuthFlowComplete(const std::string user, twitCurl *obj) {
boost::mutex::scoped_lock lock(userlock);
delete sessions[user];
sessions[user] = obj->clone();
connectionState[user] = WAITING_FOR_PIN;
}
void pinExchangeComplete(const std::string user, const std::string OAuthAccessTokenKey, const std::string OAuthAccessTokenSecret) {
boost::mutex::scoped_lock lock(userlock);
sessions[user]->getOAuth().setOAuthTokenKey( OAuthAccessTokenKey );
sessions[user]->getOAuth().setOAuthTokenSecret( OAuthAccessTokenSecret );
connectionState[user] = CONNECTED;
}
private:
enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED};
Config *config;
std::string consumerKey;
std::string consumerSecret;
std::string OAUTH_KEY;
std::string OAUTH_SECRET;
boost::mutex dblock, userlock;
ThreadPool *tp;
std::map<std::string, twitCurl*> sessions;
std::map<std::string, status> connectionState;
};*/
static void spectrum_sigchld_handler(int sig)
{