threadpool.h

Go to the documentation of this file.
00001 /*
00002  * threadpool.h
00003  *
00004  * Generalised Thread Pooling functions
00005  *
00006  * Portable Tools Library
00007  *
00008  * Copyright (C) 2009 Post Increment
00009  *
00010  * The contents of this file are subject to the Mozilla Public License
00011  * Version 1.0 (the "License"); you may not use this file except in
00012  * compliance with the License. You may obtain a copy of the License at
00013  * http://www.mozilla.org/MPL/
00014  *
00015  * Software distributed under the License is distributed on an "AS IS"
00016  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
00017  * the License for the specific language governing rights and limitations
00018  * under the License.
00019  *
00020  * The Original Code is Portable Windows Library.
00021  *
00022  * The Initial Developer of the Original Code is Post Increment
00023  *
00024  * Portions of this code were written with the financial assistance of 
00025  * Metreos Corporation (http://www.metros.com).
00026  *
00027  * Contributor(s): ______________________________________.
00028  *
00029  * $Revision: 22992 $
00030  * $Author: rjongbloed $
00031  * $Date: 2009-06-26 00:56:24 -0500 (Fri, 26 Jun 2009) $
00032  */
00033 
00034 
00035 #ifndef PTLIB_THREADPOOL_H
00036 #define PTLIB_THREADPOOL_H
00037 
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041 
00042 #include <map>
00043 
00044 
00129 class PThreadPoolBase : public PObject
00130 {
00131   public:
00132     class WorkerThreadBase : public PThread
00133     {
00134       public:
00135         WorkerThreadBase()
00136           : PThread(100, NoAutoDeleteThread, NormalPriority, "Pool")
00137           , m_shutdown(false)
00138         { }
00139 
00140         virtual void Shutdown() = 0;
00141         virtual unsigned GetWorkSize() const = 0;
00142 
00143         bool m_shutdown;
00144         PMutex m_workerMutex;
00145     };
00146 
00147     class InternalWorkBase
00148     {
00149       public:
00150         InternalWorkBase(const char * group)
00151         { 
00152           if (group != NULL)
00153             m_group = group;
00154         }
00155         std::string m_group;
00156     };
00157 
00158     PThreadPoolBase(unsigned maxWorkerCount = 10, unsigned maxWorkUnitCount = 0);
00159     ~PThreadPoolBase();
00160 
00161     virtual WorkerThreadBase * CreateWorkerThread() = 0;
00162     virtual WorkerThreadBase * AllocateWorker();
00163     virtual WorkerThreadBase * NewWorker();
00164 
00165   protected:
00166     virtual bool CheckWorker(WorkerThreadBase * worker);
00167     void StopWorker(WorkerThreadBase * worker);
00168     PMutex m_listMutex;
00169 
00170     typedef std::vector<WorkerThreadBase *> WorkerList_t;
00171     WorkerList_t m_workers;
00172 
00173     unsigned m_maxWorkerCount;
00174     unsigned m_maxWorkUnitCount;
00175 };
00176 
00177 
00178 template <class Work_T>
00179 class PThreadPool : public PThreadPoolBase
00180 {
00181   PCLASSINFO(PThreadPool, PThreadPoolBase);
00182   public:
00183     //
00184     // define the ancestor of the worker thread
00185     //
00186     class WorkerThread : public WorkerThreadBase
00187     {
00188       public:
00189         WorkerThread(PThreadPool & pool_)
00190           : m_pool(pool_)
00191         {
00192         }
00193 
00194         virtual void AddWork(Work_T * work) = 0;
00195         virtual void RemoveWork(Work_T * work) = 0;
00196         virtual void Main() = 0;
00197   
00198       protected:
00199         PThreadPool & m_pool;
00200     };
00201 
00202     //
00203     // define internal worker wrapper class
00204     //
00205     class InternalWork : public InternalWorkBase
00206     {
00207       public:
00208         InternalWork(WorkerThread * worker, Work_T * work, const char * group)
00209           : InternalWorkBase(group)
00210           , m_worker(worker)
00211           , m_work(work)
00212         { 
00213         }
00214 
00215         WorkerThread * m_worker;
00216         Work_T * m_work;
00217     };
00218 
00219     //
00220     // define map for external work units to internal work
00221     //
00222     typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
00223     ExternalToInternalWorkMap_T m_externalToInternalWorkMap;
00224 
00225 
00226     //
00227     // define class for storing group informationm
00228     //
00229     struct GroupInfo {
00230       unsigned m_count;
00231       WorkerThread * m_worker;
00232     };
00233 
00234 
00235     //
00236     //  define map for group ID to group information
00237     //
00238     typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
00239     GroupInfoMap_t m_groupInfoMap;
00240 
00241 
00242     //
00243     //  constructor
00244     //
00245     PThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0)
00246       : PThreadPoolBase(maxWorkers, maxWorkUnits) 
00247     { }
00248 
00249 
00250     //
00251     //  add a new unit of work to a worker thread
00252     //
00253     bool AddWork(Work_T * work, const char * group = NULL)
00254     {
00255       PWaitAndSignal m(m_listMutex);
00256 
00257       // allocate by group if specified
00258       // else allocate to least busy
00259       WorkerThread * worker;
00260       if ((group == NULL) || (strlen(group) == 0)) {
00261         worker = (WorkerThread *)AllocateWorker();
00262       }
00263       else {
00264 
00265         // find the worker thread with the matching group ID
00266         // if no matching Id, then create a new thread
00267         typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group);
00268         if (g == m_groupInfoMap.end()) 
00269           worker = (WorkerThread *)AllocateWorker();
00270         else {
00271           worker = g->second.m_worker;
00272           PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group);
00273         }
00274       }
00275 
00276       // if cannot allocate worker, return
00277       if (worker == NULL) 
00278         return false;
00279 
00280       // create internal work structure
00281       InternalWork internalWork(worker, work, group);
00282 
00283       // add work to external to internal map
00284       m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork));
00285 
00286       // add group ID to map
00287       if (!internalWork.m_group.empty()) {
00288         typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
00289         if (r != m_groupInfoMap.end())
00290           ++r->second.m_count;
00291         else {
00292           GroupInfo info;
00293           info.m_count  = 1;
00294           info.m_worker = worker;
00295           m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info));
00296         }
00297       }
00298       
00299       // give the work to the worker
00300       worker->AddWork(work);
00301     
00302       return true;
00303     }
00304 
00305     //
00306     //  remove a unit of work from a worker thread
00307     //
00308     bool RemoveWork(Work_T * work, bool removeFromWorker = true)
00309     {
00310       PWaitAndSignal m(m_listMutex);
00311 
00312       // find worker with work unit to remove
00313       typename ExternalToInternalWorkMap_T::iterator r = m_externalToInternalWorkMap.find(work);
00314       if (r == m_externalToInternalWorkMap.end())
00315         return false;
00316 
00317       InternalWork & internalWork = r->second;
00318 
00319       // tell worker to stop processing work
00320       if (removeFromWorker)
00321         internalWork.m_worker->RemoveWork(work);
00322 
00323       // update group information
00324       if (!internalWork.m_group.empty()) {
00325         typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
00326         PAssert(r != m_groupInfoMap.end(), "Attempt to find thread from unknown work group");
00327         if (r != m_groupInfoMap.end()) {
00328           if (--r->second.m_count == 0)
00329             m_groupInfoMap.erase(r);
00330         }
00331       }
00332 
00333       // see if workers need reorganising
00334       CheckWorker(internalWork.m_worker);
00335 
00336       // remove element from work unit map
00337       m_externalToInternalWorkMap.erase(r);
00338 
00339       return true;
00340     }
00341 };
00342 
00343 
00344 #endif // PTLIB_THREADPOOL_H
00345 
00346 
00347 // End Of File ///////////////////////////////////////////////////////////////

Generated on Thu May 27 01:36:48 2010 for PTLib by  doxygen 1.4.7