// Implementation of Transport::ThreadPool: a fixed number of worker slots,
// each backed by a boost::thread, with a FIFO queue for requests that arrive
// while no slot is free. Finished work is cleaned up via the Swift event loop.
//
// NOTE(review): this copy of the file appears to be damaged -- see the
// NOTE(review) markers below before attempting to build it.
#include "transport/ThreadPool.h"
#include "transport/Logging.h"
#include "Swiften/SwiftenCompat.h"

namespace Transport {

DEFINE_LOGGER(logger, "ThreadPool")

// Constructs a pool limited to `maxthreads` worker slots (stored in
// MAX_THREADS). Remembers the event loop, resets activeThreads to 0 and
// allocates the `worker` array (MAX_THREADS boost::thread pointers) with
// malloc.
ThreadPool::ThreadPool(Swift::EventLoop *loop, int maxthreads) : MAX_THREADS(maxthreads)
{
	this->loop = loop;
	activeThreads = 0;
	worker = (boost::thread **) malloc(sizeof(boost::thread *) * MAX_THREADS);
	// NOTE(review): text appears to be missing on the next line, between
	// `i` and `MAX_THREADS) return;` (it looks as if everything from a '<'
	// up to a later '>' was stripped). As written this does not parse.
	// Presumably the remainder of this constructor, and possibly further
	// member definitions, were lost here -- restore from version control.
	for(int i=0 ; i MAX_THREADS) return;
	// NOTE(review): the statements below use `i` as a worker slot index and
	// match the releaseThread(wid) call made in cleandUp, so they presumably
	// form the body of ThreadPool::releaseThread(int i) -- TODO confirm.
	// Under pool_lock: destroy the slot's boost::thread object, clear the
	// slot, put the index back on freeThreads and adjust the thread count.
	pool_lock.lock();
	delete worker[i];
	worker[i] = NULL;
	freeThreads.push(i);
	updateActiveThreadCount(1);
	pool_lock.unlock();
}

// Tears down a finished request: calls t->finalize(), deletes `t`, releases
// worker slot `wid` and then invokes onWorkerAvailable().
// workerBody posts this function to the event loop once t->run() returns.
// NOTE(review): onWorkerAvailable is not defined in the visible code;
// presumably it triggers scheduleFromQueue -- confirm.
void ThreadPool::cleandUp(Thread *t, int wid)
{
	LOG4CXX_INFO(logger, "Cleaning up thread #" << t->getThreadID());
	t->finalize();
	delete t;
	releaseThread(wid);
	onWorkerAvailable();
}

// Starts queued requests for as long as both a queued request and a free
// worker slot exist. The whole loop runs with `criticalregion` locked.
void ThreadPool::scheduleFromQueue()
{
	criticalregion.lock();
	while(!requestQueue.empty()) {
		int w = getFreeThread();
		// -1 means no slot is free; leave the remaining requests queued.
		if(w == -1) break;

		LOG4CXX_INFO(logger, "Worker Available. Creating thread #" << w);
		Thread *t = requestQueue.front();
		requestQueue.pop();
		t->setThreadID(w);
		// The bound functor only has placeholders for (t, w); `loop` is passed
		// to boost::thread as a third argument. boost::bind function objects
		// ignore surplus arguments, so it is presumably unused -- confirm.
		worker[w] = new boost::thread(boost::bind(&ThreadPool::workerBody, this, _1, _2), t, w, loop);
		// NOTE(review): the count is adjusted by -1 when a worker starts and
		// by +1 when a slot is released; updateActiveThreadCount is not visible
		// here, so confirm the intended sign convention.
		updateActiveThreadCount(-1);
	}
	criticalregion.unlock();
}

// Runs `t` on a worker thread immediately if a slot is free; otherwise
// appends it to requestQueue to be started later by scheduleFromQueue.
// NOTE(review): unlike scheduleFromQueue, this function touches requestQueue
// without locking `criticalregion` -- verify which threads may call it.
void ThreadPool::runAsThread(Thread *t)
{
	int w;
	if((w = getFreeThread()) != -1) {
		LOG4CXX_INFO(logger, "Creating thread #" << w);
		t->setThreadID(w);
		// Same thread start-up sequence as in scheduleFromQueue.
		worker[w] = new boost::thread(boost::bind(&ThreadPool::workerBody, this, _1, _2), t, w, loop);
		updateActiveThreadCount(-1);
	}
	else {
		LOG4CXX_INFO(logger, "No workers available! adding to queue.");
		requestQueue.push(t);
	}
}

// Entry point executed on the worker thread for slot `wid`: runs the request
// and then hands clean-up over to the event loop by posting cleandUp(t, wid),
// so cleandUp itself is not called directly from this function.
void ThreadPool::workerBody(Thread *t, int wid)
{
	LOG4CXX_INFO(logger, "Starting thread " << wid);
	t->run();
	// NOTE(review): `shared_ptr` is a class template; its template argument
	// list appears to have been stripped from this copy (same damage as noted
	// in the constructor) -- TODO restore the owner type from the original.
	loop->postEvent(boost::bind(&ThreadPool::cleandUp, this, t, wid), SWIFTEN_SHRPTR_NAMESPACE::shared_ptr());
}

}