threadpool.h

Go to the documentation of this file.
00001 /*
00002  * threadpool.h
00003  *
00004  * Generalised Thread Pooling functions
00005  *
00006  * Portable Tools Library
00007  *
00008  * Copyright (C) 2009 Post Increment
00009  *
00010  * The contents of this file are subject to the Mozilla Public License
00011  * Version 1.0 (the "License"); you may not use this file except in
00012  * compliance with the License. You may obtain a copy of the License at
00013  * http://www.mozilla.org/MPL/
00014  *
00015  * Software distributed under the License is distributed on an "AS IS"
00016  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
00017  * the License for the specific language governing rights and limitations
00018  * under the License.
00019  *
00020  * The Original Code is Portable Windows Library.
00021  *
00022  * The Initial Developer of the Original Code is Post Increment
00023  *
00024  * Portions of this code were written with the financial assistance of 
00025  * Metreos Corporation (http://www.metros.com).
00026  *
00027  * Contributor(s): ______________________________________.
00028  *
00029  * $Revision: 24047 $
00030  * $Author: rjongbloed $
00031  * $Date: 2010-02-10 17:15:13 -0600 (Wed, 10 Feb 2010) $
00032  */
00033 
00034 
00035 #ifndef PTLIB_THREADPOOL_H
00036 #define PTLIB_THREADPOOL_H
00037 
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041 
00042 #include <map>
00043 #include <queue>
00044 
00045 
00130 class PThreadPoolBase : public PObject
00131 {
00132   public:
00133     class WorkerThreadBase : public PThread
00134     {
00135       public:
00136         WorkerThreadBase()
00137           : PThread(100, NoAutoDeleteThread, NormalPriority, "Pool")
00138           , m_shutdown(false)
00139         { }
00140 
00141         virtual void Shutdown() = 0;
00142         virtual unsigned GetWorkSize() const = 0;
00143 
00144         bool   m_shutdown;
00145         PMutex m_workerMutex;
00146     };
00147 
00148     class InternalWorkBase
00149     {
00150       public:
00151         InternalWorkBase(const char * group)
00152         { 
00153           if (group != NULL)
00154             m_group = group;
00155         }
00156         std::string m_group;
00157     };
00158 
00159     PThreadPoolBase(unsigned maxWorkerCount = 10, unsigned maxWorkUnitCount = 0);
00160     ~PThreadPoolBase();
00161 
00162     virtual WorkerThreadBase * CreateWorkerThread() = 0;
00163     virtual WorkerThreadBase * AllocateWorker();
00164     virtual WorkerThreadBase * NewWorker();
00165 
00166   protected:
00167     virtual bool CheckWorker(WorkerThreadBase * worker);
00168     void StopWorker(WorkerThreadBase * worker);
00169     PMutex m_listMutex;
00170 
00171     typedef std::vector<WorkerThreadBase *> WorkerList_t;
00172     WorkerList_t m_workers;
00173 
00174     unsigned m_maxWorkerCount;
00175     unsigned m_maxWorkUnitCount;
00176 };
00177 
00178 
00179 template <class Work_T>
00180 class PThreadPool : public PThreadPoolBase
00181 {
00182   PCLASSINFO(PThreadPool, PThreadPoolBase);
00183   public:
00184     //
00185     // define the ancestor of the worker thread
00186     //
00187     class WorkerThread : public WorkerThreadBase
00188     {
00189       public:
00190         WorkerThread(PThreadPool & pool_)
00191           : m_pool(pool_)
00192         {
00193         }
00194 
00195         virtual void AddWork(Work_T * work) = 0;
00196         virtual void RemoveWork(Work_T * work) = 0;
00197         virtual void Main() = 0;
00198   
00199       protected:
00200         PThreadPool & m_pool;
00201     };
00202 
00203     class QueuedWorkerThread : public WorkerThread
00204     {
00205       public:
00206         QueuedWorkerThread(PThreadPool & pool)
00207           : WorkerThread(pool)
00208           , m_available(0, INT_MAX)
00209         {
00210         }
00211 
00212         void AddWork(Work_T * work)
00213         {
00214           m_mutex.Wait();
00215           m_queue.push(work);
00216           m_available.Signal();
00217           m_mutex.Signal();
00218         }
00219 
00220         void RemoveWork(Work_T * )
00221         {
00222           m_mutex.Wait();
00223           delete m_queue.front();
00224           m_queue.pop();
00225           m_mutex.Signal();
00226         }
00227 
00228         unsigned GetWorkSize() const
00229         {
00230           return m_queue.size();
00231         }
00232 
00233         void Main()
00234         {
00235           for (;;) {
00236             m_available.Wait();
00237             if (WorkerThread::m_shutdown)
00238               break;
00239 
00240             m_mutex.Wait();
00241 
00242             if (!m_queue.empty()) {
00243               Work_T * work = m_queue.front();
00244               if (work != NULL) {
00245                 work->Work();
00246                 WorkerThread::m_pool.RemoveWork(work);
00247               }
00248             }
00249 
00250             m_mutex.Signal();
00251           }
00252         }
00253 
00254         void Shutdown()
00255         {
00256           WorkerThread::m_shutdown = true;
00257           m_available.Signal();
00258         }
00259 
00260       protected:
00261         typedef std::queue<Work_T *> Queue;
00262         Queue      m_queue;
00263         PMutex     m_mutex;
00264         PSemaphore m_available;
00265     };
00266 
00267     //
00268     // define internal worker wrapper class
00269     //
00270     class InternalWork : public InternalWorkBase
00271     {
00272       public:
00273         InternalWork(WorkerThread * worker, Work_T * work, const char * group)
00274           : InternalWorkBase(group)
00275           , m_worker(worker)
00276           , m_work(work)
00277         { 
00278         }
00279 
00280         WorkerThread * m_worker;
00281         Work_T * m_work;
00282     };
00283 
00284     //
00285     // define map for external work units to internal work
00286     //
00287     typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
00288     ExternalToInternalWorkMap_T m_externalToInternalWorkMap;
00289 
00290 
00291     //
00292     // define class for storing group informationm
00293     //
00294     struct GroupInfo {
00295       unsigned m_count;
00296       WorkerThread * m_worker;
00297     };
00298 
00299 
00300     //
00301     //  define map for group ID to group information
00302     //
00303     typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
00304     GroupInfoMap_t m_groupInfoMap;
00305 
00306 
00307     //
00308     //  constructor
00309     //
00310     PThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0)
00311       : PThreadPoolBase(maxWorkers, maxWorkUnits) 
00312     { }
00313 
00314 
00315     //
00316     //  add a new unit of work to a worker thread
00317     //
00318     bool AddWork(Work_T * work, const char * group = NULL)
00319     {
00320       PWaitAndSignal m(m_listMutex);
00321 
00322       // allocate by group if specified
00323       // else allocate to least busy
00324       WorkerThread * worker;
00325       if ((group == NULL) || (strlen(group) == 0)) {
00326         worker = (WorkerThread *)AllocateWorker();
00327       }
00328       else {
00329 
00330         // find the worker thread with the matching group ID
00331         // if no matching Id, then create a new thread
00332         typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group);
00333         if (g == m_groupInfoMap.end()) 
00334           worker = (WorkerThread *)AllocateWorker();
00335         else {
00336           worker = g->second.m_worker;
00337           PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group);
00338         }
00339       }
00340 
00341       // if cannot allocate worker, return
00342       if (worker == NULL) 
00343         return false;
00344 
00345       // create internal work structure
00346       InternalWork internalWork(worker, work, group);
00347 
00348       // add work to external to internal map
00349       m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork));
00350 
00351       // add group ID to map
00352       if (!internalWork.m_group.empty()) {
00353         typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
00354         if (r != m_groupInfoMap.end())
00355           ++r->second.m_count;
00356         else {
00357           GroupInfo info;
00358           info.m_count  = 1;
00359           info.m_worker = worker;
00360           m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info));
00361         }
00362       }
00363       
00364       // give the work to the worker
00365       worker->AddWork(work);
00366     
00367       return true;
00368     }
00369 
00370     //
00371     //  remove a unit of work from a worker thread
00372     //
00373     bool RemoveWork(Work_T * work, bool removeFromWorker = true)
00374     {
00375       PWaitAndSignal m(m_listMutex);
00376 
00377       // find worker with work unit to remove
00378       typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work);
00379       if (iterWork == m_externalToInternalWorkMap.end())
00380         return false;
00381 
00382       InternalWork & internalWork = iterWork->second;
00383 
00384       // tell worker to stop processing work
00385       if (removeFromWorker)
00386         internalWork.m_worker->RemoveWork(work);
00387 
00388       // update group information
00389       if (!internalWork.m_group.empty()) {
00390         typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group);
00391         PAssert(iterGroup != m_groupInfoMap.end(), "Attempt to find thread from unknown work group");
00392         if (iterGroup != m_groupInfoMap.end()) {
00393           if (--iterGroup->second.m_count == 0)
00394             m_groupInfoMap.erase(iterGroup);
00395         }
00396       }
00397 
00398       // see if workers need reorganising
00399       CheckWorker(internalWork.m_worker);
00400 
00401       // remove element from work unit map
00402       m_externalToInternalWorkMap.erase(iterWork);
00403 
00404       return true;
00405     }
00406 };
00407 
00408 
00409 #endif // PTLIB_THREADPOOL_H
00410 
00411 
00412 // End Of File ///////////////////////////////////////////////////////////////

Generated on Fri Oct 14 01:44:10 2011 for PTLib by  doxygen 1.4.7