00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef PTLIB_THREADPOOL_H
00036 #define PTLIB_THREADPOOL_H
00037
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041
00042 #include <map>
00043 #include <queue>
00044
00045
00130 class PThreadPoolBase : public PObject
00131 {
00132 public:
00133 class WorkerThreadBase : public PThread
00134 {
00135 public:
00136 WorkerThreadBase()
00137 : PThread(100, NoAutoDeleteThread, NormalPriority, "Pool")
00138 , m_shutdown(false)
00139 { }
00140
00141 virtual void Shutdown() = 0;
00142 virtual unsigned GetWorkSize() const = 0;
00143
00144 bool m_shutdown;
00145 PMutex m_workerMutex;
00146 };
00147
00148 class InternalWorkBase
00149 {
00150 public:
00151 InternalWorkBase(const char * group)
00152 {
00153 if (group != NULL)
00154 m_group = group;
00155 }
00156 std::string m_group;
00157 };
00158
00159 PThreadPoolBase(unsigned maxWorkerCount = 10, unsigned maxWorkUnitCount = 0);
00160 ~PThreadPoolBase();
00161
00162 virtual WorkerThreadBase * CreateWorkerThread() = 0;
00163 virtual WorkerThreadBase * AllocateWorker();
00164 virtual WorkerThreadBase * NewWorker();
00165
00166 protected:
00167 virtual bool CheckWorker(WorkerThreadBase * worker);
00168 void StopWorker(WorkerThreadBase * worker);
00169 PMutex m_listMutex;
00170
00171 typedef std::vector<WorkerThreadBase *> WorkerList_t;
00172 WorkerList_t m_workers;
00173
00174 unsigned m_maxWorkerCount;
00175 unsigned m_maxWorkUnitCount;
00176 };
00177
00178
00179 template <class Work_T>
00180 class PThreadPool : public PThreadPoolBase
00181 {
00182 PCLASSINFO(PThreadPool, PThreadPoolBase);
00183 public:
00184
00185
00186
00187 class WorkerThread : public WorkerThreadBase
00188 {
00189 public:
00190 WorkerThread(PThreadPool & pool_)
00191 : m_pool(pool_)
00192 {
00193 }
00194
00195 virtual void AddWork(Work_T * work) = 0;
00196 virtual void RemoveWork(Work_T * work) = 0;
00197 virtual void Main() = 0;
00198
00199 protected:
00200 PThreadPool & m_pool;
00201 };
00202
00203 class QueuedWorkerThread : public WorkerThread
00204 {
00205 public:
00206 QueuedWorkerThread(PThreadPool & pool)
00207 : WorkerThread(pool)
00208 , m_available(0, INT_MAX)
00209 {
00210 }
00211
00212 void AddWork(Work_T * work)
00213 {
00214 m_mutex.Wait();
00215 m_queue.push(work);
00216 m_available.Signal();
00217 m_mutex.Signal();
00218 }
00219
00220 void RemoveWork(Work_T * )
00221 {
00222 m_mutex.Wait();
00223 delete m_queue.front();
00224 m_queue.pop();
00225 m_mutex.Signal();
00226 }
00227
00228 unsigned GetWorkSize() const
00229 {
00230 return m_queue.size();
00231 }
00232
00233 void Main()
00234 {
00235 for (;;) {
00236 m_available.Wait();
00237 if (WorkerThread::m_shutdown)
00238 break;
00239
00240 m_mutex.Wait();
00241
00242 if (!m_queue.empty()) {
00243 Work_T * work = m_queue.front();
00244 if (work != NULL) {
00245 work->Work();
00246 WorkerThread::m_pool.RemoveWork(work);
00247 }
00248 }
00249
00250 m_mutex.Signal();
00251 }
00252 }
00253
00254 void Shutdown()
00255 {
00256 WorkerThread::m_shutdown = true;
00257 m_available.Signal();
00258 }
00259
00260 protected:
00261 typedef std::queue<Work_T *> Queue;
00262 Queue m_queue;
00263 PMutex m_mutex;
00264 PSemaphore m_available;
00265 };
00266
00267
00268
00269
00270 class InternalWork : public InternalWorkBase
00271 {
00272 public:
00273 InternalWork(WorkerThread * worker, Work_T * work, const char * group)
00274 : InternalWorkBase(group)
00275 , m_worker(worker)
00276 , m_work(work)
00277 {
00278 }
00279
00280 WorkerThread * m_worker;
00281 Work_T * m_work;
00282 };
00283
00284
00285
00286
00287 typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
00288 ExternalToInternalWorkMap_T m_externalToInternalWorkMap;
00289
00290
00291
00292
00293
00294 struct GroupInfo {
00295 unsigned m_count;
00296 WorkerThread * m_worker;
00297 };
00298
00299
00300
00301
00302
00303 typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
00304 GroupInfoMap_t m_groupInfoMap;
00305
00306
00307
00308
00309
00310 PThreadPool(unsigned maxWorkers = 10, unsigned maxWorkUnits = 0)
00311 : PThreadPoolBase(maxWorkers, maxWorkUnits)
00312 { }
00313
00314
00315
00316
00317
00318 bool AddWork(Work_T * work, const char * group = NULL)
00319 {
00320 PWaitAndSignal m(m_listMutex);
00321
00322
00323
00324 WorkerThread * worker;
00325 if ((group == NULL) || (strlen(group) == 0)) {
00326 worker = (WorkerThread *)AllocateWorker();
00327 }
00328 else {
00329
00330
00331
00332 typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group);
00333 if (g == m_groupInfoMap.end())
00334 worker = (WorkerThread *)AllocateWorker();
00335 else {
00336 worker = g->second.m_worker;
00337 PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group);
00338 }
00339 }
00340
00341
00342 if (worker == NULL)
00343 return false;
00344
00345
00346 InternalWork internalWork(worker, work, group);
00347
00348
00349 m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork));
00350
00351
00352 if (!internalWork.m_group.empty()) {
00353 typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
00354 if (r != m_groupInfoMap.end())
00355 ++r->second.m_count;
00356 else {
00357 GroupInfo info;
00358 info.m_count = 1;
00359 info.m_worker = worker;
00360 m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info));
00361 }
00362 }
00363
00364
00365 worker->AddWork(work);
00366
00367 return true;
00368 }
00369
00370
00371
00372
00373 bool RemoveWork(Work_T * work, bool removeFromWorker = true)
00374 {
00375 PWaitAndSignal m(m_listMutex);
00376
00377
00378 typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work);
00379 if (iterWork == m_externalToInternalWorkMap.end())
00380 return false;
00381
00382 InternalWork & internalWork = iterWork->second;
00383
00384
00385 if (removeFromWorker)
00386 internalWork.m_worker->RemoveWork(work);
00387
00388
00389 if (!internalWork.m_group.empty()) {
00390 typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group);
00391 PAssert(iterGroup != m_groupInfoMap.end(), "Attempt to find thread from unknown work group");
00392 if (iterGroup != m_groupInfoMap.end()) {
00393 if (--iterGroup->second.m_count == 0)
00394 m_groupInfoMap.erase(iterGroup);
00395 }
00396 }
00397
00398
00399 CheckWorker(internalWork.m_worker);
00400
00401
00402 m_externalToInternalWorkMap.erase(iterWork);
00403
00404 return true;
00405 }
00406 };
00407
00408
00409 #endif // PTLIB_THREADPOOL_H
00410
00411
00412