Multi-Threaded Request

This commit is contained in:
Sarang Bharadwaj 2012-06-03 15:01:50 +05:30
parent f87f8cc356
commit a2b385e88f
13 changed files with 145 additions and 251 deletions

View file

@ -2,11 +2,11 @@
DEFINE_LOGGER(logger, "DirectMessageRequest")
void DirectMessageRequest::run()
{
if(twitObj.directMessageSend(username, data, false) == false) {
if(twitObj->directMessageSend(username, data, false) == false) {
LOG4CXX_ERROR(logger, user << ": Error while sending directed message to " << username );
return;
}
twitObj.getLastWebResponse( replyMsg );
twitObj->getLastWebResponse( replyMsg );
}
void DirectMessageRequest::finalize()

View file

@ -12,7 +12,7 @@ using namespace Transport;
class DirectMessageRequest : public Thread
{
twitCurl twitObj;
twitCurl *twitObj;
std::string data;
std::string user;
std::string username;
@ -21,13 +21,17 @@ class DirectMessageRequest : public Thread
public:
DirectMessageRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string & _username, const std::string &_data) {
twitObj = *obj;
twitObj = obj->clone();
data = _data;
user = _user;
username = _username;
np = _np;
}
~DirectMessageRequest() {
delete twitObj;
}
void run();
void finalize();
};

View file

@ -4,18 +4,18 @@ DEFINE_LOGGER(logger, "FetchFriends")
void FetchFriends::run()
{
replyMsg = "";
if( twitObj.friendsIdsGet(twitObj.getTwitterUsername())) {
if( twitObj->friendsIdsGet(twitObj->getTwitterUsername())) {
while(replyMsg.length() == 0) {
twitObj.getLastWebResponse( replyMsg );
twitObj->getLastWebResponse( replyMsg );
}
LOG4CXX_INFO(logger, user << " - " << replyMsg.length() << " " << replyMsg << "\n" );
std::vector<std::string> IDs = getIDs( replyMsg );
twitObj.userLookup(IDs, true);
twitObj.getLastWebResponse( replyMsg );
twitObj->userLookup(IDs, true);
twitObj->getLastWebResponse( replyMsg );
LOG4CXX_INFO(logger, user << " - UserLookUp web response - " << replyMsg.length() << " " << replyMsg << "\n" );
@ -35,7 +35,7 @@ void FetchFriends::finalize()
if(replyMsg != "" ) {
np->handleMessage(user, "twitter-account", userlist);
} else {
twitObj.getLastCurlError( replyMsg );
twitObj->getLastCurlError( replyMsg );
LOG4CXX_INFO(logger, user << " - friendsIdsGet error - " << replyMsg );
}
}

View file

@ -13,7 +13,7 @@ using namespace Transport;
class FetchFriends : public Thread
{
twitCurl twitObj;
twitCurl *twitObj;
std::string user;
std::string replyMsg;
std::string userlist;
@ -21,11 +21,15 @@ class FetchFriends : public Thread
public:
FetchFriends(NetworkPlugin *_np, twitCurl *obj, const std::string &_user) {
twitObj = *obj;
twitObj = obj->clone();
np = _np;
user = _user;
}
~FetchFriends() {
delete twitObj;
}
void run();
void finalize();
};

View file

@ -0,0 +1,19 @@
#include "HelpMessageRequest.h"
DEFINE_LOGGER(logger, "HelpMessageRequest")
void HelpMessageRequest::run()
{
std::string helpMsg = "";
helpMsg = helpMsg
+ "\n******************************HELP************************************\n"
+ "#status:<your status> ==> Update your status\n"
+ "#timeline ==> Retrieve your timeline\n"
+ "@<username>:<message> ==> Send a directed message to the user <username>\n"
+ "#help ==> Print this help message\n"
+ "************************************************************************\n";
np->handleMessage(user, "twitter-account", helpMsg);
}
void HelpMessageRequest::finalize()
{
}

View file

@ -0,0 +1,28 @@
#ifndef HELPMESSAGE_H
#define HELPMESSAGE_H
#include "../ThreadPool.h"
#include "../libtwitcurl/twitcurl.h"
#include "transport/networkplugin.h"
#include "transport/logging.h"
#include <string>
#include <iostream>
using namespace Transport;
class HelpMessageRequest : public Thread
{
std::string user;
NetworkPlugin *np;
public:
HelpMessageRequest(NetworkPlugin *_np, const std::string &_user) {
user = _user;
np = _np;
}
void run();
void finalize();
};
#endif

View file

@ -2,20 +2,22 @@
DEFINE_LOGGER(logger, "StatusUpdateRequest")
void StatusUpdateRequest::run()
{
if( twitObj.statusUpdate( data ) ) {
replyMsg = "";
replyMsg = "";
if( twitObj->statusUpdate( data ) ) {
while(replyMsg.length() == 0) {
twitObj.getLastWebResponse( replyMsg );
twitObj->getLastWebResponse( replyMsg );
}
LOG4CXX_INFO(logger, user << "StatusUpdateRequest response " << replyMsg );
} else {
twitObj.getLastCurlError( replyMsg );
LOG4CXX_ERROR(logger, user << "Error - " << replyMsg );
}
}
}
void StatusUpdateRequest::finalize()
{
LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data);
if(replyMsg != "" ) {
LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data);
} else {
twitObj->getLastCurlError( replyMsg );
LOG4CXX_ERROR(logger, user << "Error - " << replyMsg );
}
return;
}

View file

@ -11,18 +11,23 @@
using namespace Transport;
class StatusUpdateRequest : public Thread
{
twitCurl twitObj;
twitCurl *twitObj;
std::string data;
std::string user;
std::string replyMsg;
NetworkPlugin *np;
public:
StatusUpdateRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) {
twitObj = *obj;
twitObj = obj->clone();
data = _data;
user = _user;
np = _np;
}
~StatusUpdateRequest() {
delete twitObj;
}
void run();
void finalize();
};

View file

@ -3,11 +3,11 @@ DEFINE_LOGGER(logger, "TimelineRequest")
void TimelineRequest::run()
{
replyMsg = "";
if( twitObj.timelinePublicGet() ) {
if( twitObj->timelineHomeGet() ) {
LOG4CXX_INFO(logger, "Sending timeline request for user " << user)
while(replyMsg.length() == 0) {
twitObj.getLastWebResponse( replyMsg );
twitObj->getLastWebResponse( replyMsg );
}
LOG4CXX_INFO(logger, user << " - " << replyMsg.length() << " " << replyMsg << "\n" );
@ -27,7 +27,7 @@ void TimelineRequest::finalize()
np->handleMessage(user, "twitter-account", timeline); //send timeline
}
else {
twitObj.getLastCurlError( replyMsg );
twitObj->getLastCurlError( replyMsg );
LOG4CXX_ERROR(logger, user << " - " << replyMsg );
}
}

View file

@ -13,7 +13,7 @@ using namespace Transport;
class TimelineRequest : public Thread
{
twitCurl twitObj;
twitCurl *twitObj;
std::string user;
std::string replyMsg;
std::string timeline;
@ -21,11 +21,16 @@ class TimelineRequest : public Thread
public:
TimelineRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user) {
twitObj = *obj;
twitObj = obj->clone();
np = _np;
user = _user;
}
~TimelineRequest() {
//std::cerr << "*****Timeline request: DESTROYING twitObj****" << std::endl;
delete twitObj;
}
void run();
void finalize();
};

View file

@ -90,6 +90,20 @@ public:
bool extractOAuthTokenKeySecret( const std::string& requestTokenResponse /* in */ );
oAuth clone()
{
oAuth cloneObj;
cloneObj.m_consumerKey = m_consumerKey;
cloneObj.m_consumerSecret = m_consumerSecret;
cloneObj.m_oAuthTokenKey = m_oAuthTokenKey;
cloneObj.m_oAuthTokenSecret = m_oAuthTokenSecret;
cloneObj.m_oAuthPin = m_oAuthPin;
cloneObj.m_nonce = m_nonce;
cloneObj.m_timeStamp = m_timeStamp;
cloneObj.m_oAuthScreenName = m_oAuthScreenName;
return cloneObj;
}
private:
/* OAuth data */
@ -121,4 +135,4 @@ private:
void generateNonceTimeStamp();
};
#endif // __OAUTHLIB_H__
#endif // __OAUTHLIB_H__

View file

@ -220,6 +220,35 @@ public:
void setProxyUserName( std::string& proxyUserName /* in */ );
void setProxyPassword( std::string& proxyPassword /* in */ );
twitCurl* clone()
{
twitCurl *cloneObj = new twitCurl();
/* cURL flags */
cloneObj->m_curlProxyParamsSet = m_curlProxyParamsSet;
cloneObj->m_curlLoginParamsSet = m_curlLoginParamsSet;
cloneObj->m_curlCallbackParamsSet = m_curlCallbackParamsSet;
/* cURL proxy data */
cloneObj->m_proxyServerIp = m_proxyServerIp;
cloneObj->m_proxyServerPort = m_proxyServerPort;
cloneObj->m_proxyUserName = m_proxyUserName;
cloneObj->m_proxyPassword = m_proxyPassword;
/* Twitter data */
cloneObj->m_twitterUsername = m_twitterUsername;
cloneObj->m_twitterPassword = m_twitterPassword;
/* Twitter API type */
cloneObj->m_eApiFormatType = m_eApiFormatType;
/* OAuth data */
cloneObj->m_oAuth = m_oAuth.clone();
return cloneObj;
}
private:
/* cURL data */
CURL* m_curlHandle;

View file

@ -33,6 +33,7 @@
#include "Requests/DirectMessageRequest.h"
#include "Requests/TimelineRequest.h"
#include "Requests/FetchFriends.h"
#include "Requests/HelpMessageRequest.h"
using namespace boost::filesystem;
using namespace boost::program_options;
@ -71,13 +72,9 @@ class TwitterPlugin : public NetworkPlugin {
m_conn = m_factories->getConnectionFactory()->createConnection();
m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1));
m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2));
tp = new ThreadPool(10);
activeThreadCount = 0;
MAX_THREADS = 50;
LOG4CXX_INFO(logger, "Starting the plugin.");
}
@ -209,193 +206,9 @@ class TwitterPlugin : public NetworkPlugin {
LOG4CXX_INFO(logger, user << ": Sent PIN " << data << " and obtained Access Token");
}
void printHelpMessage(const std::string &user) {
std::string helpMsg = "";
helpMsg = helpMsg
+ "\nHELP\n"
+ "#status:<your status> - Update your status\n"
+ "#timeline - Retrieve your timeline\n"
+ "@<username>:<message> - Send a directed message to the user <username>\n"
+ "#help - print this help message\n";
handleMessage(user, "twitter-account", helpMsg);
}
void handleDirectMessage(const std::string &user, std::string &username, std::string &data) {
if(sessions[user]->directMessageSend(username, data, false) == false) {
LOG4CXX_ERROR(logger, user << ": Error while sending directed message to user " << username );
return;
}
LOG4CXX_INFO(logger, user << ": Sending " << data << " to " << username)
std::string replyMsg;
sessions[user]->getLastWebResponse( replyMsg );
LOG4CXX_INFO(logger, replyMsg);
}
void handleStatusUpdate(const std::string &user, std::string &data) {
if(connectionState[user] != CONNECTED) {
LOG4CXX_ERROR(logger, "Trying to update status for " << user << " when not connected!");
return;
}
std::string replyMsg;
if( sessions[user]->statusUpdate( data ) ) {
replyMsg = "";
while(replyMsg.length() == 0) {
sessions[user]->getLastWebResponse( replyMsg );
}
LOG4CXX_INFO(logger, user << ": twitCurl:statusUpdate web response: " << replyMsg );
} else {
sessions[user]->getLastCurlError( replyMsg );
LOG4CXX_INFO(logger, user << ": twitCurl::statusUpdate error: " << replyMsg );
}
LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data);
}
void fetchTimeline(const std::string &user) {
if(connectionState[user] != CONNECTED) {
LOG4CXX_ERROR(logger, "Trying to fetch timeline for " << user << " when not connected!");
return;
}
std::string replyMsg = "";
if( sessions[user]->timelineHomeGet()) {
while(replyMsg.length() == 0) {
sessions[user]->getLastWebResponse( replyMsg );
}
LOG4CXX_INFO(logger, user << ": twitCurl::timeline web response: " << replyMsg.length() << " " << replyMsg << "\n" );
std::vector<Status> tweets = getTimeline(replyMsg);
std::string timeline = "\n";
for(int i=0 ; i<tweets.size() ; i++) {
timeline += tweets[i].getTweet() + "\n";
}
handleMessage(user, "twitter-account", timeline);
} else {
sessions[user]->getLastCurlError( replyMsg );
LOG4CXX_INFO(logger, user << ": twitCurl::timeline error: " << replyMsg );
}
}
void fetchFriends(const std::string &user) {
if(connectionState[user] != CONNECTED) {
LOG4CXX_ERROR(logger, "Trying to fetch friends of " << user << " when not connected!");
return;
}
std::string replyMsg = "";
if( sessions[user]->friendsIdsGet(sessions[user]->getTwitterUsername())) {
while(replyMsg.length() == 0) {
sessions[user]->getLastWebResponse( replyMsg );
}
LOG4CXX_INFO(logger, user << ": twitCurl::friendsIdsGet web response: " << replyMsg.length() << " " << replyMsg << "\n" );
std::vector<std::string> IDs = getIDs( replyMsg );
/*for(int i=0 ; i<IDs.size() ; i++) {
//LOG4CXX_INFO(logger, "ID #" << i+1 << ": " << IDs[i]);
}*/
sessions[user]->userLookup(IDs, true);
sessions[user]->getLastWebResponse( replyMsg );
LOG4CXX_INFO(logger, user << ": twitCurl::UserLookUp web response: " << replyMsg.length() << " " << replyMsg << "\n" );
std::vector<User> users = getUsers( replyMsg );
std::string userlist = "\n***************USER LIST****************\n";
for(int i=0 ; i < users.size() ; i++) {
userlist += "*)" + users[i].getUserName() + " (" + users[i].getScreenName() + ")\n";
}
userlist += "***************************************\n";
handleMessage(user, "twitter-account", userlist);
} else {
sessions[user]->getLastCurlError( replyMsg );
LOG4CXX_INFO(logger, user << ": twitCurl::friendsIdsGet error: " << replyMsg );
}
}
void spawnThreadForRequest(Request r) {
std::string &user = r.from;
std::string &legacyName = r.to;
std::string &message = r.message;
LOG4CXX_INFO(logger, "Sending message from " << user << " to " << legacyName << ".");
if(legacyName == "twitter-account") {
std::string cmd = message.substr(0, message.find(':'));
std::string data = message.substr(message.find(':') + 1);
handleMessage(user, "twitter-account", cmd + " " + data);
if(cmd == "#pin") handlePINExchange(user, data);
else if(cmd == "#help") printHelpMessage(user);
else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);}
else if(cmd == "#status") handleStatusUpdate(user, data);
else if(cmd == "#timeline") fetchTimeline(user);
else if(cmd == "#friends") fetchFriends(user);
}
updateActiveThreadCount(-1);
onDispatchRequest(r, false);
}
/*
* usersBeingServed - set of users being served at present
* usersToServe - queue of users who have requests pending in their request queue and are yet to be served; Each user appears only once here.
*/
void requestDispatcher(Request r, bool incoming) {
criticalRegion.lock();
if(incoming) {
std::string user = r.from;
if(getActiveThreadCount() < MAX_THREADS && usersBeingServed.count(user) == false) {
updateActiveThreadCount(1);
boost::thread(&TwitterPlugin::spawnThreadForRequest, this, r);
usersBeingServed.insert(user);
LOG4CXX_INFO(logger, user << ": Sending request " << STR(r) << " to twitter")
} else {
requests[user].push(r);
LOG4CXX_INFO(logger, user << " is already being served! Adding " << STR(r) << " to request queue");
if (!usersBeingServed.count(user)) {
usersToServe.push(user);
}
}
} else {
usersBeingServed.erase(r.from);
if(requests[r.from].size()) usersToServe.push(r.from);
while(getActiveThreadCount() < MAX_THREADS && !usersToServe.empty()) {
std::string user = usersToServe.front(); usersToServe.pop();
Request s = requests[user].front(); requests[user].pop();
updateActiveThreadCount(1);
boost::thread(&TwitterPlugin::spawnThreadForRequest, this, s);
usersBeingServed.insert(user);
LOG4CXX_INFO(logger, user << ": Sending request " << STR(s) << " to twitter")
}
}
criticalRegion.unlock();
}
void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
/*Request r;
r.from = user;
r.to = legacyName;
r.message = message;
LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r))
onDispatchRequest(r,true);*/
//requestDispatcher(r, true);
if(legacyName == "twitter-account") {
std::string cmd = message.substr(0, message.find(':'));
std::string data = message.substr(message.find(':') + 1);
@ -403,7 +216,9 @@ class TwitterPlugin : public NetworkPlugin {
handleMessage(user, "twitter-account", cmd + " " + data);
if(cmd == "#pin") handlePINExchange(user, data);
//else if(cmd == "#help") printHelpMessage(user);
else if(cmd == "#help") {
tp->runAsThread(new HelpMessageRequest(np, user));
}
else if(cmd[0] == '@') {
std::string username = cmd.substr(1);
tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data));
@ -413,6 +228,7 @@ class TwitterPlugin : public NetworkPlugin {
}
else if(cmd == "#timeline") {
tp->runAsThread(new TimelineRequest(np, sessions[user], user));
//fetchTimeline(user);
}
else if(cmd == "#friends") {
tp->runAsThread(new FetchFriends(np, sessions[user], user));
@ -429,28 +245,9 @@ class TwitterPlugin : public NetworkPlugin {
}
int getActiveThreadCount() {
int res;
threadLock.lock();
res = activeThreadCount;
threadLock.unlock();
return res;
}
void updateActiveThreadCount(int k) {
threadLock.lock();
activeThreadCount+=k;
threadLock.unlock();
}
private:
enum status {NEW, WAITING_FOR_PIN, CONNECTED, DISCONNECTED};
struct Request {
std::string from;
std::string to;
std::string message;
};
Config *config;
std::string consumerKey;
@ -458,22 +255,9 @@ class TwitterPlugin : public NetworkPlugin {
std::string OAUTH_KEY;
std::string OAUTH_SECRET;
int activeThreadCount;
int MAX_THREADS;
boost::mutex criticalRegion;
boost::mutex threadLock;
ThreadPool *tp;
std::map<std::string, twitCurl*> sessions;
std::map<std::string, std::queue<Request> > requests;
std::queue<std::string> usersToServe;
std::set<std::string> usersBeingServed;
std::map<std::string, twitCurl*> sessions;
std::map<std::string, status> connectionState;
boost::signal< void (Request, bool) > onDispatchRequest;
};
static void spectrum_sigchld_handler(int sig)