00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef PTLIB_THREADPOOL_H
00036 #define PTLIB_THREADPOOL_H
00037
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041
00042 #include <map>
00043
00044
00129 class PThreadPoolBase : public PObject
00130 {
00131 public:
00132 class WorkerThreadBase : public PThread
00133 {
00134 public:
00135 WorkerThreadBase()
00136 : PThread(100, NoAutoDeleteThread, NormalPriority, "Pool")
00137 , m_shutdown(false)
00138 { }
00139
00140 virtual void Shutdown() = 0;
00141 virtual unsigned GetWorkSize() const = 0;
00142
00143 bool m_shutdown;
00144 PMutex m_workerMutex;
00145 };
00146
00147 class InternalWorkBase
00148 {
00149 public:
00150 InternalWorkBase(const char * group)
00151 {
00152 if (group != NULL)
00153 m_group = group;
00154 }
00155 std::string m_group;
00156 };
00157
00158 PThreadPoolBase(unsigned maxWorkerCount = 10, unsigned maxWorkUnitCount = 0);
00159 ~PThreadPoolBase();
00160
00161 virtual WorkerThreadBase * CreateWorkerThread() = 0;
00162 virtual WorkerThreadBase * AllocateWorker();
00163 virtual WorkerThreadBase * NewWorker();
00164
00165 protected:
00166 virtual bool CheckWorker(WorkerThreadBase * worker);
00167 void StopWorker(WorkerThreadBase * worker);
00168 PMutex m_listMutex;
00169
00170 typedef std::vector<WorkerThreadBase *> WorkerList_t;
00171 WorkerList_t m_workers;
00172
00173 unsigned m_maxWorkerCount;
00174 unsigned m_maxWorkUnitCount;
00175 };
00176
00177
00178 template <class Work_T>
00179 class PThreadPool : public PThreadPoolBase
00180 {
00181 PCLASSINFO(PThreadPool, PThreadPoolBase);
00182 public:
00183
00184
00185
00186 class WorkerThread : public WorkerThreadBase
00187 {
00188 public:
00189 WorkerThread(PThreadPool & pool_)
00190 : m_pool(pool_)
00191 {
00192 }
00193
00194 virtual void AddWork(Work_T * work) = 0;
00195 virtual void RemoveWork(Work_T * work) = 0;
00196 virtual void Main() = 0;
00197
00198 protected:
00199 PThreadPool & m_pool;
00200 };
00201
00202
00203
00204
00205 class InternalWork : public InternalWorkBase
00206 {
00207 public:
00208 InternalWork(WorkerThread * worker, Work_T * work, const char * group)
00209 : InternalWorkBase(group)
00210 , m_worker(worker)
00211 , m_work(work)
00212 {
00213 }
00214
00215 WorkerThread * m_worker;
00216 Work_T * m_work;
00217 };
00218
00219
00220
00221
00222 typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
00223 ExternalToInternalWorkMap_T m_externalToInternalWorkMap;
00224
00225
00226
00227
00228
00229 struct GroupInfo {
00230 unsigned m_count;
00231 WorkerThread * m_worker;
00232 };
00233
00234
00235
00236
00237
00238 typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
00239 GroupInfoMap_t m_groupInfoMap;
00240
00241
00242
00243
00244
00245 PThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0)
00246 : PThreadPoolBase(maxWorkers, maxWorkUnits)
00247 { }
00248
00249
00250
00251
00252
00253 bool AddWork(Work_T * work, const char * group = NULL)
00254 {
00255 PWaitAndSignal m(m_listMutex);
00256
00257
00258
00259 WorkerThread * worker;
00260 if ((group == NULL) || (strlen(group) == 0)) {
00261 worker = (WorkerThread *)AllocateWorker();
00262 }
00263 else {
00264
00265
00266
00267 typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group);
00268 if (g == m_groupInfoMap.end())
00269 worker = (WorkerThread *)AllocateWorker();
00270 else {
00271 worker = g->second.m_worker;
00272 PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group);
00273 }
00274 }
00275
00276
00277 if (worker == NULL)
00278 return false;
00279
00280
00281 InternalWork internalWork(worker, work, group);
00282
00283
00284 m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork));
00285
00286
00287 if (!internalWork.m_group.empty()) {
00288 typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
00289 if (r != m_groupInfoMap.end())
00290 ++r->second.m_count;
00291 else {
00292 GroupInfo info;
00293 info.m_count = 1;
00294 info.m_worker = worker;
00295 m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info));
00296 }
00297 }
00298
00299
00300 worker->AddWork(work);
00301
00302 return true;
00303 }
00304
00305
00306
00307
00308 bool RemoveWork(Work_T * work, bool removeFromWorker = true)
00309 {
00310 PWaitAndSignal m(m_listMutex);
00311
00312
00313 typename ExternalToInternalWorkMap_T::iterator r = m_externalToInternalWorkMap.find(work);
00314 if (r == m_externalToInternalWorkMap.end())
00315 return false;
00316
00317 InternalWork & internalWork = r->second;
00318
00319
00320 if (removeFromWorker)
00321 internalWork.m_worker->RemoveWork(work);
00322
00323
00324 if (!internalWork.m_group.empty()) {
00325 typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
00326 PAssert(r != m_groupInfoMap.end(), "Attempt to find thread from unknown work group");
00327 if (r != m_groupInfoMap.end()) {
00328 if (--r->second.m_count == 0)
00329 m_groupInfoMap.erase(r);
00330 }
00331 }
00332
00333
00334 CheckWorker(internalWork.m_worker);
00335
00336
00337 m_externalToInternalWorkMap.erase(r);
00338
00339 return true;
00340 }
00341 };
00342
00343
00344 #endif // PTLIB_THREADPOOL_H
00345
00346
00347