From 2b47d32a916ce34844b338b72b3ed5ca9648ef4f Mon Sep 17 00:00:00 2001 From: Sarang Bharadwaj Date: Sat, 2 Jun 2012 20:10:42 +0530 Subject: [PATCH] ThreadPool implementation --- backends/twitter/CMakeLists.txt | 2 +- .../twitter/Requests/DirectMessageRequest.cpp | 16 +++ .../twitter/Requests/DirectMessageRequest.h | 35 +++++ .../twitter/Requests/StatusUpdateRequest.cpp | 21 +++ .../twitter/Requests/StatusUpdateRequest.h | 30 +++++ backends/twitter/ThreadPool.cpp | 122 ++++++++++++++++++ backends/twitter/ThreadPool.h | 72 +++++++++++ backends/twitter/main.cpp | 37 ++++-- 8 files changed, 323 insertions(+), 12 deletions(-) create mode 100644 backends/twitter/Requests/DirectMessageRequest.cpp create mode 100644 backends/twitter/Requests/DirectMessageRequest.h create mode 100644 backends/twitter/Requests/StatusUpdateRequest.cpp create mode 100644 backends/twitter/Requests/StatusUpdateRequest.h create mode 100644 backends/twitter/ThreadPool.cpp create mode 100644 backends/twitter/ThreadPool.h diff --git a/backends/twitter/CMakeLists.txt b/backends/twitter/CMakeLists.txt index f5ad1a9b..d090a5a0 100644 --- a/backends/twitter/CMakeLists.txt +++ b/backends/twitter/CMakeLists.txt @@ -1,5 +1,5 @@ include_directories (${libtransport_SOURCE_DIR}/backends/twitter/libtwitcurl) -FILE(GLOB SRC *.cpp libtwitcurl/*.cpp) +FILE(GLOB SRC *.cpp libtwitcurl/*.cpp Requests/*.cpp) add_executable(spectrum_twitter_backend ${SRC}) #add_executable(parser TwitterResponseParser.cpp test.cpp) target_link_libraries(spectrum_twitter_backend curl transport pthread sqlite3 ${Boost_LIBRARIES} ${SWIFTEN_LIBRARY} ${LOG4CXX_LIBRARIES}) diff --git a/backends/twitter/Requests/DirectMessageRequest.cpp b/backends/twitter/Requests/DirectMessageRequest.cpp new file mode 100644 index 00000000..89288272 --- /dev/null +++ b/backends/twitter/Requests/DirectMessageRequest.cpp @@ -0,0 +1,16 @@ +#include "DirectMessageRequest.h" +DEFINE_LOGGER(logger, "DirectMessageRequest") +void DirectMessageRequest::run() +{ + if(twitObj.directMessageSend(username, data, false) == false) { + LOG4CXX_ERROR(logger, user << ": Error while sending directed message to " << username ); + return; + } + twitObj.getLastWebResponse( replyMsg ); +} + +void DirectMessageRequest::finalize() +{ + LOG4CXX_INFO(logger, user << ": Sent " << data << " to " << username) + LOG4CXX_INFO(logger, user << ": Twitter reponse - " << replyMsg) +} diff --git a/backends/twitter/Requests/DirectMessageRequest.h b/backends/twitter/Requests/DirectMessageRequest.h new file mode 100644 index 00000000..6159ed65 --- /dev/null +++ b/backends/twitter/Requests/DirectMessageRequest.h @@ -0,0 +1,35 @@ +#ifndef DIRECT_MESSAGE +#define DIRECT_MESSAGE + +#include "../ThreadPool.h" +#include "../libtwitcurl/twitcurl.h" +#include "transport/networkplugin.h" +#include "transport/logging.h" +#include +#include + +using namespace Transport; + +class DirectMessageRequest : public Thread +{ + twitCurl twitObj; + std::string data; + std::string user; + std::string username; + std::string replyMsg; + NetworkPlugin *np; + + public: + DirectMessageRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string & _username, const std::string &_data) { + twitObj = *obj; + data = _data; + user = _user; + username = _username; + np = _np; + } + + void run(); + void finalize(); +}; + +#endif diff --git a/backends/twitter/Requests/StatusUpdateRequest.cpp b/backends/twitter/Requests/StatusUpdateRequest.cpp new file mode 100644 index 00000000..48db74c4 --- /dev/null +++ b/backends/twitter/Requests/StatusUpdateRequest.cpp @@ -0,0 +1,21 @@ +#include "StatusUpdateRequest.h" +DEFINE_LOGGER(logger, "StatusUpdateRequest") +void StatusUpdateRequest::run() +{ + if( twitObj.statusUpdate( data ) ) { + replyMsg = ""; + while(replyMsg.length() == 0) { + twitObj.getLastWebResponse( replyMsg ); + } + LOG4CXX_INFO(logger, user << "StatusUpdateRequest response " << replyMsg ); + } else { + twitObj.getLastCurlError( replyMsg ); + LOG4CXX_ERROR(logger, user << "Error - " << replyMsg ); + } +} + +void StatusUpdateRequest::finalize() +{ + LOG4CXX_INFO(logger, "Updated status for " << user << ": " << data); + return; +} diff --git a/backends/twitter/Requests/StatusUpdateRequest.h b/backends/twitter/Requests/StatusUpdateRequest.h new file mode 100644 index 00000000..7937a885 --- /dev/null +++ b/backends/twitter/Requests/StatusUpdateRequest.h @@ -0,0 +1,30 @@ +#ifndef STATUS_UPDATE +#define STATUS_UPDATE + +#include "../ThreadPool.h" +#include "../libtwitcurl/twitcurl.h" +#include "transport/networkplugin.h" +#include "transport/logging.h" +#include +#include + +using namespace Transport; +class StatusUpdateRequest : public Thread +{ + twitCurl twitObj; + std::string data; + std::string user; + std::string replyMsg; + NetworkPlugin *np; + public: + StatusUpdateRequest(NetworkPlugin *_np, twitCurl *obj, const std::string &_user, const std::string &_data) { + twitObj = *obj; + data = _data; + user = _user; + np = _np; + } + void run(); + void finalize(); +}; + +#endif diff --git a/backends/twitter/ThreadPool.cpp b/backends/twitter/ThreadPool.cpp new file mode 100644 index 00000000..0a810e5f --- /dev/null +++ b/backends/twitter/ThreadPool.cpp @@ -0,0 +1,122 @@ +#include "ThreadPool.h" +DEFINE_LOGGER(logger, "ThreadPool") +boost::signals2::signal< void (Thread*, int) > onWorkCompleted; + +void Worker(Thread *t, int wid) +{ + LOG4CXX_INFO(logger, "Starting thread " << wid) + t->run(); + onWorkCompleted(t, wid); +} + + +ThreadPool::ThreadPool(int maxthreads) : MAX_THREADS(maxthreads) +{ + activeThreads = 0; + worker = new boost::thread*[MAX_THREADS]; + for(int i=0 ; i MAX_THREADS) return; + pool_lock.lock(); + + delete worker[i]; + worker[i] = NULL; + freeThreads.push(i); + + updateActiveThreadCount(1); + + pool_lock.unlock(); +} + +void ThreadPool::cleandUp(Thread *t, int wid) +{ + LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID()) + t->finalize(); + delete t; + releaseThread(wid); + onWorkerAvailable(); +} + +void ThreadPool::scheduleFromQueue() +{ + criticalregion.lock(); + while(!requestQueue.empty()) { + int w = getFreeThread(); + if(w == -1) break; + + LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w) + Thread *t = requestQueue.front(); requestQueue.pop(); + t->setThreadID(w); + worker[w] = new boost::thread(Worker, t, w); + updateActiveThreadCount(-1); + } + criticalregion.unlock(); +} + + +void ThreadPool::runAsThread(Thread *t) +{ + int w; + if((w = getFreeThread()) != -1) { + LOG4CXX_INFO(logger, "Creating thread #" << w) + t->setThreadID(w); + worker[w] = new boost::thread(Worker, t, w); + updateActiveThreadCount(-1); + } + else { + LOG4CXX_INFO(logger, "No workers available! adding to queue.") + requestQueue.push(t); + } +} diff --git a/backends/twitter/ThreadPool.h b/backends/twitter/ThreadPool.h new file mode 100644 index 00000000..bd172859 --- /dev/null +++ b/backends/twitter/ThreadPool.h @@ -0,0 +1,72 @@ +#ifndef THREAD_POOL +#define THREAD_POOL + +#include +#include +#include +#include +#include +#include "transport/logging.h" + + +/* + * Thread serves as a base class for any code that has to be excuted as a thread + * by the ThreadPool class. The run method defines the code that has to be run + * as a theard. For example, code in run could be sendinga request to a server + * waiting for the response and storing the response. When the thread finishes + * execution, the ThreadPool invokes finalize where one could have the code necessary + * to collect all the responses and release any resources. + * + * NOTE: The object of the Thread class must be valid (in scope) throughout the + * execution of the thread. + */ + +class Thread +{ + int threadID; + + public: + + Thread() {} + virtual ~Thread() {} + virtual void run() = 0; + virtual void finalize() {} + int getThreadID() {return threadID;} + void setThreadID(int tid) {threadID = tid;} +}; + +/* + * ThreadPool provides the interface to manage a pool of threads. It schedules jobs + * on free threads and when the thread completes it automatically deletes the object + * corresponding to a Thread. If free threads are not available, the requests are + * added to a queue and scheduled later when threads become available. + */ + +class ThreadPool +{ + const int MAX_THREADS; + int activeThreads; + std::queue freeThreads; + + std::queue requestQueue; + boost::thread **worker; + + boost::mutex count_lock; + boost::mutex pool_lock; + boost::mutex criticalregion; + + boost::signals2::signal < void () > onWorkerAvailable; + + public: + ThreadPool(int maxthreads); + ~ThreadPool(); + void runAsThread(Thread *t); + int getActiveThreadCount(); + void updateActiveThreadCount(int k); + void cleandUp(Thread *, int); + void scheduleFromQueue(); + int getFreeThread(); + void releaseThread(int i); +}; + +#endif diff --git a/backends/twitter/main.cpp b/backends/twitter/main.cpp index 3d0b2a09..8ee6b805 100644 --- a/backends/twitter/main.cpp +++ b/backends/twitter/main.cpp @@ -5,6 +5,7 @@ #include "transport/mysqlbackend.h" #include "transport/pqxxbackend.h" #include "transport/storagebackend.h" + #include "Swiften/Swiften.h" #include "unistd.h" #include "signal.h" @@ -26,7 +27,10 @@ #include #include #include -#include "userdb.h" + +#include "ThreadPool.h" +#include "Requests/StatusUpdateRequest.h" +#include "Requests/DirectMessageRequest.h" using namespace boost::filesystem; using namespace boost::program_options; @@ -66,6 +70,8 @@ class TwitterPlugin : public NetworkPlugin { m_conn->onDataRead.connect(boost::bind(&TwitterPlugin::_handleDataRead, this, _1)); m_conn->connect(Swift::HostAddressPort(Swift::HostAddress(host), port)); onDispatchRequest.connect(boost::bind(&TwitterPlugin::requestDispatcher, this, _1, _2)); + + tp = new ThreadPool(10); activeThreadCount = 0; MAX_THREADS = 50; @@ -77,6 +83,7 @@ class TwitterPlugin : public NetworkPlugin { delete storagebackend; std::map::iterator it; for(it = sessions.begin() ; it != sessions.end() ; it++) delete it->second; + delete tp; } // Send data to NetworkPlugin server @@ -379,27 +386,34 @@ class TwitterPlugin : public NetworkPlugin { void handleMessageSendRequest(const std::string &user, const std::string &legacyName, const std::string &message, const std::string &xhtml = "") { - Request r; + /*Request r; r.from = user; r.to = legacyName; r.message = message; LOG4CXX_INFO(logger, user << "Dispatching request " << STR(r)) - onDispatchRequest(r,true); + onDispatchRequest(r,true);*/ //requestDispatcher(r, true); - /*if(legacyName == "twitter-account") { + if(legacyName == "twitter-account") { std::string cmd = message.substr(0, message.find(':')); std::string data = message.substr(message.find(':') + 1); handleMessage(user, "twitter-account", cmd + " " + data); if(cmd == "#pin") handlePINExchange(user, data); - else if(cmd == "#help") printHelpMessage(user); - else if(cmd[0] == '@') {std::string username = cmd.substr(1); handleDirectMessage(user, username, data);} - else if(cmd == "#status") handleStatusUpdate(user, data); - else if(cmd == "#timeline") fetchTimeline(user); - else if(cmd == "#friends") fetchFriends(user); - }*/ + //else if(cmd == "#help") printHelpMessage(user); + else if(cmd[0] == '@') { + std::string username = cmd.substr(1); + tp->runAsThread(new DirectMessageRequest(np, sessions[user], user, username, data)); + //handleDirectMessage(user, username, data); + } + else if(cmd == "#status") { + tp->runAsThread(new StatusUpdateRequest(np, sessions[user], user, data)); + //handleStatusUpdate(user, data); + } + //else if(cmd == "#timeline") fetchTimeline(user); + //else if(cmd == "#friends") fetchFriends(user); + } } void handleBuddyUpdatedRequest(const std::string &user, const std::string &buddyName, const std::string &alias, const std::vector &groups) { @@ -434,6 +448,7 @@ class TwitterPlugin : public NetworkPlugin { }; Config *config; + std::string consumerKey; std::string consumerSecret; std::string OAUTH_KEY; @@ -445,6 +460,7 @@ class TwitterPlugin : public NetworkPlugin { boost::mutex criticalRegion; boost::mutex threadLock; + ThreadPool *tp; std::map sessions; std::map > requests; @@ -453,7 +469,6 @@ class TwitterPlugin : public NetworkPlugin { std::map connectionState; - boost::signal< void (Request, bool) > onDispatchRequest; };