ThreadPool implementation

This commit is contained in:
Sarang Bharadwaj 2012-06-02 20:10:42 +05:30
parent d2ed406536
commit 2b47d32a91
8 changed files with 323 additions and 12 deletions

View file

@ -1,5 +1,5 @@
include_directories (${libtransport_SOURCE_DIR}/backends/twitter/libtwitcurl)
FILE(GLOB SRC *.cpp libtwitcurl/*.cpp)
FILE(GLOB SRC *.cpp libtwitcurl/*.cpp Requests/*.cpp)
add_executable(spectrum_twitter_backend ${SRC})
#add_executable(parser TwitterResponseParser.cpp test.cpp)
target_link_libraries(spectrum_twitter_backend curl transport pthread sqlite3 ${Boost_LIBRARIES} ${SWIFTEN_LIBRARY} ${LOG4CXX_LIBRARIES})

View file

@ -0,0 +1,16 @@
#include "DirectMessageRequest.h"
DEFINE_LOGGER(logger, "DirectMessageRequest")
void DirectMessageRequest::run()
{
if(twitObj.directMessageSend(username, data, false) == false) {
LOG4CXX_ERROR(logger, user << ": Error while sending directed message to " << username );
return;
}
twitObj.getLastWebResponse( replyMsg );
}
void DirectMessageRequest::finalize()
{
LOG4CXX_INFO(logger, user << ": Sent " << data << " to " << username)
LOG4CXX_INFO(logger, user << ": Twitter reponse - " << replyMsg)
}

View file

@ -0,0 +1,35 @@
#ifndef DIRECT_MESSAGE
#define DIRECT_MESSAGE
#include "../ThreadPool.h"
#include "../libtwitcurl/twitcurl.h"
#include "transport/networkplugin.h"
#include "transport/logging.h"
#include <string>
#include <iostream>
using namespace Transport;
class DirectMessageRequest : public Thread
{
twitCurl twitObj;
std::string data;
std::string user;
std::string username;
std::string replyMsg;
NetworkPlugin *np;
public:
DirectMessageRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string & _username, const std::string &_data) {
twitObj = *obj;
data = _data;
user = _user;
username = _username;
np = _np;
}
void run();
void finalize();
};
#endif

View file

@ -0,0 +1,21 @@
#include "StatusUpdateRequest.h"
DEFINE_LOGGER(logger, "StatusUpdateRequest")
void StatusUpdateRequest::run()
{
if( twitObj.statusUpdate( data ) ) {
replyMsg = "";
while(replyMsg.length() == 0) {
twitObj.getLastWebResponse( replyMsg );
}
LOG4CXX_INFO(logger, user << "StatusUpdateRequest response " << replyMsg );
} else {
twitObj.getLastCurlError( replyMsg );
LOG4CXX_ERROR(logger, user << "Error - " << replyMsg );
}
}
void StatusUpdateRequest::finalize()
{
LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data);
return;
}

View file

@ -0,0 +1,30 @@
#ifndef STATUS_UPDATE
#define STATUS_UPDATE
#include "../ThreadPool.h"
#include "../libtwitcurl/twitcurl.h"
#include "transport/networkplugin.h"
#include "transport/logging.h"
#include <string>
#include <iostream>
using namespace Transport;
class StatusUpdateRequest : public Thread
{
twitCurl twitObj;
std::string data;
std::string user;
std::string replyMsg;
NetworkPlugin *np;
public:
StatusUpdateRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) {
twitObj = *obj;
data = _data;
user = _user;
np = _np;
}
void run();
void finalize();
};
#endif

View file

@ -0,0 +1,122 @@
#include "ThreadPool.h"
DEFINE_LOGGER(logger, "ThreadPool")
boost::signals2::signal< void (Thread*, int) > onWorkCompleted;
void Worker(Thread *t, int wid)
{
LOG4CXX_INFO(logger, "Starting thread " << wid)
t->run();
onWorkCompleted(t, wid);
}
ThreadPool::ThreadPool(int maxthreads) : MAX_THREADS(maxthreads)
{
activeThreads = 0;
worker = new boost::thread*[MAX_THREADS];
for(int i=0 ; i<MAX_THREADS ; i++) {
worker[i] = NULL;
freeThreads.push(i);
}
onWorkCompleted.connect(boost::bind(&ThreadPool::cleandUp, this, _1, _2));
onWorkerAvailable.connect(boost::bind(&ThreadPool::scheduleFromQueue, this));
}
ThreadPool::~ThreadPool()
{
for(int i=0; i<MAX_THREADS ; i++) {
if(worker[i]) {
delete worker[i];
}
}
delete worker;
while(!requestQueue.empty()) {
Thread *t = requestQueue.front(); requestQueue.pop();
delete t;
}
}
int ThreadPool::getActiveThreadCount()
{
int res;
count_lock.lock();
res = activeThreads;
count_lock.unlock();
return res;
}
void ThreadPool::updateActiveThreadCount(int k)
{
count_lock.lock();
activeThreads += k;
count_lock.unlock();
}
int ThreadPool::getFreeThread()
{
int res = -1;
pool_lock.lock();
if(!freeThreads.empty()){
res = freeThreads.front();
freeThreads.pop();
updateActiveThreadCount(-1);
}
pool_lock.unlock();
return res;
}
void ThreadPool::releaseThread(int i)
{
if(i < 0 || i > MAX_THREADS) return;
pool_lock.lock();
delete worker[i];
worker[i] = NULL;
freeThreads.push(i);
updateActiveThreadCount(1);
pool_lock.unlock();
}
void ThreadPool::cleandUp(Thread *t, int wid)
{
LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID())
t->finalize();
delete t;
releaseThread(wid);
onWorkerAvailable();
}
void ThreadPool::scheduleFromQueue()
{
criticalregion.lock();
while(!requestQueue.empty()) {
int w = getFreeThread();
if(w == -1) break;
LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w)
Thread *t = requestQueue.front(); requestQueue.pop();
t->setThreadID(w);
worker[w] = new boost::thread(Worker, t, w);
updateActiveThreadCount(-1);
}
criticalregion.unlock();
}
void ThreadPool::runAsThread(Thread *t)
{
int w;
if((w = getFreeThread()) != -1) {
LOG4CXX_INFO(logger, "Creating thread #" << w)
t->setThreadID(w);
worker[w] = new boost::thread(Worker, t, w);
updateActiveThreadCount(-1);
}
else {
LOG4CXX_INFO(logger, "No workers available! adding to queue.")
requestQueue.push(t);
}
}

View file

@ -0,0 +1,72 @@
#ifndef THREAD_POOL
#define THREAD_POOL
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/signals2/signal.hpp>
#include <queue>
#include <iostream>
#include "transport/logging.h"
/*
* Thread serves as a base class for any code that has to be excuted as a thread
* by the ThreadPool class. The run method defines the code that has to be run
* as a theard. For example, code in run could be sendinga request to a server
* waiting for the response and storing the response. When the thread finishes
* execution, the ThreadPool invokes finalize where one could have the code necessary
* to collect all the responses and release any resources.
*
* NOTE: The object of the Thread class must be valid (in scope) throughout the
* execution of the thread.
*/
class Thread
{
int threadID;
public:
Thread() {}
virtual ~Thread() {}
virtual void run() = 0;
virtual void finalize() {}
int getThreadID() {return threadID;}
void setThreadID(int tid) {threadID = tid;}
};
/*
* ThreadPool provides the interface to manage a pool of threads. It schedules jobs
* on free threads and when the thread completes it automatically deletes the object
* corresponding to a Thread. If free threads are not available, the requests are
* added to a queue and scheduled later when threads become available.
*/
class ThreadPool
{
const int MAX_THREADS;
int activeThreads;
std::queue<int> freeThreads;
std::queue<Thread*> requestQueue;
boost::thread **worker;
boost::mutex count_lock;
boost::mutex pool_lock;
boost::mutex criticalregion;
boost::signals2::signal < void () > onWorkerAvailable;
public:
ThreadPool(int maxthreads);
~ThreadPool();
void runAsThread(Thread *t);
int getActiveThreadCount();
void updateActiveThreadCount(int k);
void cleandUp(Thread *, int);
void scheduleFromQueue();
int getFreeThread();
void releaseThread(int i);
};
#endif

View file

@ -5,6 +5,7 @@
#include "transport/mysqlbackend.h"
#include "transport/pqxxbackend.h"
#include "transport/storagebackend.h"
#include "Swiften/Swiften.h"
#include "unistd.h"
#include "signal.h"
@ -26,7 +27,10 @@
#include <queue>
#include <set>
#include <cstdio>
#include "userdb.h"
#include "ThreadPool.h"
#include "Requests/StatusUpdateRequest.h"
#include "Requests/DirectMessageRequest.h"
using namespace boost::filesystem;
using namespace boost::program_options;
@ -66,6 +70,8 @@ class TwitterPlugin : public NetworkPlugin {
m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1));
m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port));
onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2));
tp = new ThreadPool(10);
activeThreadCount = 0;
MAX_THREADS = 50;
@ -77,6 +83,7 @@ class TwitterPlugin : public NetworkPlugin {
delete storagebackend;
std::map<std::string, twitCurl*>::iterator it;
for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second;
delete tp;
}
// Send data to NetworkPlugin server
@ -379,27 +386,34 @@ class TwitterPlugin : public NetworkPlugin {
void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") {
Request r;
/*Request r;
r.from = user;
r.to = legacyName;
r.message = message;
LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r))
onDispatchRequest(r,true);
onDispatchRequest(r,true);*/
//requestDispatcher(r, true);
/*if(legacyName == "twitter-account") {
if(legacyName == "twitter-account") {
std::string cmd = message.substr(0, message.find(':'));
std::string data = message.substr(message.find(':') + 1);
handleMessage(user, "twitter-account", cmd + " " + data);
if(cmd == "#pin") handlePINExchange(user, data);
else if(cmd == "#help") printHelpMessage(user);
else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);}
else if(cmd == "#status") handleStatusUpdate(user, data);
else if(cmd == "#timeline") fetchTimeline(user);
else if(cmd == "#friends") fetchFriends(user);
}*/
//else if(cmd == "#help") printHelpMessage(user);
else if(cmd[0] == '@') {
std::string username = cmd.substr(1);
tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data));
//handleDirectMessage(user, username, data);
}
else if(cmd == "#status") {
tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data));
//handleStatusUpdate(user, data);
}
//else if(cmd == "#timeline") fetchTimeline(user);
//else if(cmd == "#friends") fetchFriends(user);
}
}
void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector<std::string> &groups) {
@ -434,6 +448,7 @@ class TwitterPlugin : public NetworkPlugin {
};
Config *config;
std::string consumerKey;
std::string consumerSecret;
std::string OAUTH_KEY;
@ -445,6 +460,7 @@ class TwitterPlugin : public NetworkPlugin {
boost::mutex criticalRegion;
boost::mutex threadLock;
ThreadPool *tp;
std::map<std::string, twitCurl*> sessions;
std::map<std::string, std::queue<Request> > requests;
@ -453,7 +469,6 @@ class TwitterPlugin : public NetworkPlugin {
std::map<std::string, status> connectionState;
boost::signal< void (Request, bool) > onDispatchRequest;
};