PTLib  Version 2.12.9
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
threadpool.h
Go to the documentation of this file.
1 /*
2  * threadpool.h
3  *
4  * Generalised Thread Pooling functions
5  *
6  * Portable Tools Library
7  *
8  * Copyright (C) 2009 Post Increment
9  *
10  * The contents of this file are subject to the Mozilla Public License
11  * Version 1.0 (the "License"); you may not use this file except in
12  * compliance with the License. You may obtain a copy of the License at
13  * http://www.mozilla.org/MPL/
14  *
15  * Software distributed under the License is distributed on an "AS IS"
16  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17  * the License for the specific language governing rights and limitations
18  * under the License.
19  *
20  * The Original Code is Portable Windows Library.
21  *
22  * The Initial Developer of the Original Code is Post Increment
23  *
24  * Portions of this code were written with the financial assistance of
25  * Metreos Corporation (http://www.metros.com).
26  *
27  * Contributor(s): ______________________________________.
28  *
29  * $Revision: 29866 $
30  * $Author: rjongbloed $
31  * $Date: 2013-06-03 12:53:30 +1000 (Mon, 03 Jun 2013) $
32  */
33 
34 
35 #ifndef PTLIB_THREADPOOL_H
36 #define PTLIB_THREADPOOL_H
37 
38 #ifdef P_USE_PRAGMA
39 #pragma interface
40 #endif
41 
42 
43 #include <ptlib/thread.h>
44 #include <ptlib/safecoll.h>
45 #include <map>
46 #include <queue>
47 
48 
// Non-template base of the thread pools in this header. It declares the
// worker-thread and work-item base types and the accessors for the worker and
// work-unit limits.
// NOTE(review): the embedded listing numbers skip lines inside this class
// (168-169, 172, 183, 191, 197, 202, 211, 214, 216-217, 219-220), so some
// declarators and the declarations of members used below are not visible here.
153 class PThreadPoolBase : public PObject
154 {
155  public:
// Abstract worker thread; concrete workers must implement Shutdown() and
// GetWorkSize().
156  class WorkerThreadBase : public PThread
157  {
158  protected:
// Forwards priority and threadName to PThread together with the fixed
// arguments 100 and NoAutoDeleteThread, and starts with m_shutdown == false.
// NOTE(review): 100 is presumably PThread's stack-size argument - confirm.
159  WorkerThreadBase(Priority priority, const char * threadName)
160  : PThread(100, NoAutoDeleteThread, priority, threadName)
161  , m_shutdown(false)
162  { }
163 
164  public:
165  virtual void Shutdown() = 0;
166  virtual unsigned GetWorkSize() const = 0;
167 
170  };
171 
// Body of the work-item base type named by the InternalWorkBase(...)
// constructor below; its `class` line (172) is not in the listing.
173  {
174  public:
// Copies group into m_group when it is non-NULL; otherwise m_group keeps its
// default (empty) value.
175  InternalWorkBase(const char * group)
176  {
177  if (group != NULL)
178  m_group = group;
179  }
180  std::string m_group;
181  };
182 
184 
// Worker creation hooks; only CreateWorkerThread() is pure virtual here.
185  virtual WorkerThreadBase * CreateWorkerThread() = 0;
186  virtual WorkerThreadBase * AllocateWorker();
187  virtual WorkerThreadBase * NewWorker();
188 
189  unsigned GetMaxWorkers() const { return m_maxWorkerCount; }
190 
// Tail of a setter whose declarator (line 191) is missing from the listing:
// stores count in m_maxWorkerCount.
192  unsigned count
193  ) { m_maxWorkerCount = count; }
194 
195  unsigned GetMaxUnits() const { return m_maxWorkUnitCount; }
196 
// Tail of a setter whose declarator (line 197) is missing from the listing:
// stores count in m_maxWorkUnitCount.
198  unsigned count
199  ) { m_maxWorkUnitCount = count; }
200 
201  protected:
// Parameter list of a declaration whose first line (202) is missing.
// NOTE(review): PThreadPool's constructor passes exactly these four values to
// PThreadPoolBase(...), so this is presumably the base constructor - confirm.
203  unsigned maxWorkerCount,
204  unsigned maxWorkUnitCount,
205  const char * threadName,
206  PThread::Priority priority
207  );
208 
209  virtual bool CheckWorker(WorkerThreadBase * worker);
210  void StopWorker(WorkerThreadBase * worker);
212 
213  typedef std::vector<WorkerThreadBase *> WorkerList_t;
215 
218  unsigned m_highWaterMark; // For logging
221 };
222 
223 
// Thread pool that hands Work_T objects to worker threads. Work added under a
// non-empty group name is sent to the worker already recorded for that group,
// so one group's work stays on one worker while the group has entries.
// NOTE(review): the `class PThreadPool ...` declarator (line 227), the
// constructor name (234), its priority parameter (238) and several member
// declarations (260, 276, 284, 292, 300) are absent from this listing.
226 template <class Work_T>
228 {
229  PCLASSINFO(PThreadPool, PThreadPoolBase);
230  public:
231  //
232  // constructor
233  //
// All arguments are forwarded unchanged to PThreadPoolBase.
235  unsigned maxWorkers = 10,
236  unsigned maxWorkUnits = 0,
237  const char * threadName = NULL,
239  ) : PThreadPoolBase(maxWorkers, maxWorkUnits, threadName, priority)
240  { }
241 
242  //
243  // define the ancestor of the worker thread
244  //
// Abstract worker bound to its owning pool (stored in m_pool); derived workers
// supply AddWork(), RemoveWork() and Main().
245  class WorkerThread : public WorkerThreadBase
246  {
247  protected:
248  WorkerThread(PThreadPool & pool, Priority priority = NormalPriority, const char * threadName = NULL)
249  : WorkerThreadBase(priority, threadName)
250  , m_pool(pool)
251  {
252  }
253 
254  public:
255  virtual void AddWork(Work_T * work) = 0;
256  virtual void RemoveWork(Work_T * work) = 0;
257  virtual void Main() = 0;
258 
259  protected:
261  };
262 
263  //
264  // define internal worker wrapper class
265  //
// Record kept per work unit: the worker it was given to, the work pointer and
// (through InternalWorkBase) the group name.
266  class InternalWork : public InternalWorkBase
267  {
268  public:
269  InternalWork(WorkerThread * worker, Work_T * work, const char * group)
270  : InternalWorkBase(group)
271  , m_worker(worker)
272  , m_work(work)
273  {
274  }
275 
277  Work_T * m_work;
278  };
279 
280  //
281  // define map for external work units to internal work
282  //
283  typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
285 
286 
287  //
288  // define class for storing group information
289  //
// m_count is the number of work units currently recorded for the group; the
// m_worker member assigned in AddWork() is declared on a line (292) that is
// missing from this listing.
290  struct GroupInfo {
291  unsigned m_count;
293  };
294 
295 
296  //
297  // define map for group ID to group information
298  //
299  typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
301 
302 
303  //
304  // add a new unit of work to a worker thread
305  //
// Picks a worker for `work`, records the assignment and passes the work on.
//   work  - work unit; used as the key in m_externalToInternalWorkMap.
//   group - optional group name; NULL or "" means "no group".
// Returns false when no worker could be obtained, true otherwise.
306  bool AddWork(Work_T * work, const char * group = NULL)
307  {
// NOTE(review): line 308 is missing from this listing. RemoveWork() takes
// m_listMutex at the corresponding place, so a lock is presumably taken here
// as well - confirm against the real header.
309 
310  // allocate by group if specified
311  // else allocate to least busy
312  WorkerThread * worker;
313  if ((group == NULL) || (strlen(group) == 0)) {
314  worker = (WorkerThread *)AllocateWorker();
315  }
316  else {
317 
318  // find the worker thread with the matching group ID
319  // if no matching Id, then create a new thread
320  typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group);
321  if (g == m_groupInfoMap.end())
322  worker = (WorkerThread *)AllocateWorker();
323  else {
324  worker = g->second.m_worker;
325  PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group);
326  }
327  }
328 
329  // if cannot allocate worker, return
330  if (worker == NULL)
331  return false;
332 
333  // create internal work structure
334  InternalWork internalWork(worker, work, group);
335 
336  // add work to external to internal map
// NOTE(review): the result of insert() is not checked. If `work` is already a
// key the map is left unchanged, yet the group count below is still
// incremented and worker->AddWork() is still called.
337  m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork));
338 
339  // add group ID to map
340  if (!internalWork.m_group.empty()) {
341  typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
342  if (r != m_groupInfoMap.end())
343  ++r->second.m_count;
344  else {
345  GroupInfo info;
346  info.m_count = 1;
347  info.m_worker = worker;
348  m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info));
349  }
350  }
351 
352  // give the work to the worker
353  worker->AddWork(work);
354 
355  return true;
356  }
357 
358  //
359  // remove a unit of work from a worker thread
360  //
// Forgets a work unit previously recorded by AddWork().
//   work             - key to look up in m_externalToInternalWorkMap.
//   removeFromWorker - when true, the owning worker's RemoveWork() is called.
// Returns false if `work` is not in the map, true otherwise.
// m_listMutex is held for the whole call.
361  bool RemoveWork(Work_T * work, bool removeFromWorker = true)
362  {
363  PWaitAndSignal m(m_listMutex);
364 
365  // find worker with work unit to remove
366  typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work);
367  if (iterWork == m_externalToInternalWorkMap.end())
368  return false;
369 
// Reference into the map entry: it must not be used after the erase() at the
// end of this function.
370  InternalWork & internalWork = iterWork->second;
371 
372  // tell worker to stop processing work
373  if (removeFromWorker)
374  internalWork.m_worker->RemoveWork(work);
375 
376  // update group information
// The group entry is dropped once its last work unit has been removed.
377  if (!internalWork.m_group.empty()) {
378  typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group);
379  PAssert(iterGroup != m_groupInfoMap.end(), "Attempt to find thread from unknown work group");
380  if (iterGroup != m_groupInfoMap.end()) {
381  if (--iterGroup->second.m_count == 0)
382  m_groupInfoMap.erase(iterGroup);
383  }
384  }
385 
386  // see if workers need reorganising
387  CheckWorker(internalWork.m_worker);
388 
389  // remove element from work unit map
390  m_externalToInternalWorkMap.erase(iterWork);
391 
392  return true;
393  }
394 };
395 
396 
// PThreadPool specialisation whose workers keep their work in a std::queue and
// call Work() on the queued items from Main().
// NOTE(review): the embedded listing numbers skip lines throughout this class
// (constructor names, the QueuedWorkerThread declarator, member declarations
// and statements inside AddWork(), Main() and Shutdown()), so the comments
// below describe only what is visible.
399 template <class Work_T>
400 class PQueuedThreadPool : public PThreadPool<Work_T>
401 {
402  public:
403  //
404  // constructor
405  //
// All arguments are forwarded unchanged to PThreadPool<Work_T>.
407  unsigned maxWorkers = 10,
408  unsigned maxWorkUnits = 0,
409  const char * threadName = NULL,
411  ) : PThreadPool<Work_T>(maxWorkers, maxWorkUnits, threadName, priority)
412  { }
413 
// Body of the worker class that CreateWorkerThread() below instantiates as
// QueuedWorkerThread; its declarator (line 414) is not in the listing.
415  {
416  public:
// Constructor tail: forwards to PThreadPool<Work_T>::WorkerThread and
// constructs m_available with the arguments (0, INT_MAX).
419  const char * threadName = NULL)
420  : PThreadPool<Work_T>::WorkerThread(pool, priority, threadName)
421  , m_available(0, INT_MAX)
422  {
423  }
424 
426  { }
427 
// Appends work to m_queue while holding m_mutex.
// NOTE(review): line 432 is missing from the listing; Main() waits on
// m_available, so that line presumably signals it - confirm.
428  void AddWork(Work_T * work)
429  {
430  m_mutex.Wait();
431  m_queue.push(work);
433  m_mutex.Signal();
434  }
435 
// Pops the item at the FRONT of the queue and deletes it; the argument is
// unnamed and ignored.
// NOTE(review): front()/pop() are called without an empty() check, which is
// undefined behaviour on an empty std::queue - callers presumably only call
// this while an item is queued; verify.
436  void RemoveWork(Work_T * )
437  {
438  m_mutex.Wait();
439  Work_T * work = m_queue.front();
440  m_queue.pop();
441  m_mutex.Signal();
442  delete work;
443  }
444 
// Returns the current queue length; m_mutex is not taken in this function.
445  unsigned GetWorkSize() const
446  {
447  return (unsigned)m_queue.size();
448  }
449 
// Worker loop: wait on m_available, then peek (not pop) the front of the queue
// under m_mutex and call Work() on it outside the lock.
// NOTE(review): lines 454 (the condition guarding `break`) and 463 (the
// statement after Work()) are missing from the listing.
450  void Main()
451  {
452  for (;;) {
453  m_available.Wait();
455  break;
456 
457  m_mutex.Wait();
458  Work_T * work = m_queue.empty() ? NULL : m_queue.front();
459  m_mutex.Signal();
460 
461  if (work != NULL) {
462  work->Work();
464  }
465  }
466  }
467 
// NOTE(review): the body (lines 470-471) is missing from the listing.
468  void Shutdown()
469  {
472  }
473 
474  protected:
475  typedef std::queue<Work_T *> Queue;
479  };
480 
481 
// Body of a member whose declarator (line 482) is missing from the listing:
// returns a heap-allocated QueuedWorkerThread built from this pool's
// m_priority and m_threadName.
483  {
484  return new QueuedWorkerThread(*this, this->m_priority, this->m_threadName);
485  }
486 };
487 
488 
// Abstract unit of work bound to a PSafeObject: Work() fetches the object and,
// when there is one, passes it to CallFunction(), which subclasses implement.
492 class PSafeWork : public PSafePtrBase {
493  public:
// Constructor tail (its name line, 494, is missing from the listing): hands
// ptr to PSafePtrBase.
495  PSafeObject * ptr
496  ) : PSafePtrBase(ptr) { }
497 
// Does nothing when GetObject() returns NULL.
// NOTE(review): line 502, between the NULL check and the call, is missing from
// the listing.
498  virtual void Work()
499  {
500  PSafeObject * ptr = this->GetObject();
501  if (ptr != NULL) {
503  CallFunction(*ptr);
504  }
505  }
506 
507  virtual void CallFunction(PSafeObject & obj) = 0;
508 };
509 
510 
513 
514 
// PSafeWork that calls a no-argument member function of PtrClass on the safe
// object. FuncRet is the member function's return type; the returned value is
// discarded by CallFunction().
516 template <class PtrClass, typename FuncRet = void>
517 class PSafeWorkNoArg : public PSafeWork {
518  public:
519  typedef FuncRet (PtrClass::*Function)();
520 
521  protected:
// NOTE(review): the member declared inside this pack(16) region (line 523,
// presumably m_function) is missing from the listing, and the reason for the
// packing is not visible here.
522 #pragma pack(16)
524 #pragma pack()
525 
526  public:
// Constructor tail (its name line, 527, is missing from the listing): stores
// the member-function pointer and passes ptr to PSafeWork.
528  PtrClass * ptr,
529  Function function
530  ) : PSafeWork(ptr)
531  , m_function(function)
532  { }
533 
// Invokes the stored member function on obj viewed as a PtrClass. The cast is
// a reference dynamic_cast, so it throws std::bad_cast if obj is not a
// PtrClass.
534  virtual void CallFunction(PSafeObject & obj)
535  {
536  (dynamic_cast<PtrClass&>(obj).*(this->m_function))();
537  }
538 };
539 
540 
// PSafeWork that calls a one-argument member function of PtrClass on the safe
// object, passing a copy of the argument captured at construction. The
// function's return value (FuncRet) is discarded.
542 template <
543  class PtrClass,
544  typename Arg1Type,
545  typename FuncRet = void
546 >
547 class PSafeWorkArg1 : public PSafeWork {
548  public:
549  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1);
550 
551  protected:
// NOTE(review): line 553 (presumably the m_function declaration) is missing
// from the listing; the reason for pack(16) is not visible here.
552 #pragma pack(16)
554  Arg1Type m_arg1;
555 #pragma pack()
556 
557  public:
// Constructor tail (its name line, 558, is missing from the listing). Note the
// parameter order: the argument comes before the function pointer.
559  PtrClass * ptr,
560  Arg1Type arg1,
561  Function function
562  ) : PSafeWork(ptr)
563  , m_function(function)
564  , m_arg1(arg1)
565  { }
566 
// Invokes the stored member function with m_arg1 on obj viewed as a PtrClass;
// the reference dynamic_cast throws std::bad_cast if obj is not a PtrClass.
567  virtual void CallFunction(PSafeObject & obj)
568  {
569  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1);
570  }
571 };
572 
573 
// PSafeWork that calls a two-argument member function of PtrClass on the safe
// object, passing copies of the arguments captured at construction. The
// function's return value (FuncRet) is discarded.
575 template <
576  class PtrClass,
577  typename Arg1Type,
578  typename Arg2Type,
579  typename FuncRet = void
580 >
581 class PSafeWorkArg2 : public PSafeWork {
582  public:
583  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1, Arg2Type arg2);
584 
585  protected:
// NOTE(review): line 587 (presumably the m_function declaration) is missing
// from the listing; the reason for pack(16) is not visible here.
586 #pragma pack(16)
588  Arg1Type m_arg1;
589  Arg2Type m_arg2;
590 #pragma pack()
591 
592  public:
// Constructor tail (its name line, 593, is missing from the listing). Note the
// parameter order: the arguments come before the function pointer.
594  PtrClass * ptr,
595  Arg1Type arg1,
596  Arg2Type arg2,
597  Function function
598  ) : PSafeWork(ptr)
599  , m_function(function)
600  , m_arg1(arg1)
601  , m_arg2(arg2)
602  { }
603 
// Invokes the stored member function with (m_arg1, m_arg2) on obj viewed as a
// PtrClass; the reference dynamic_cast throws std::bad_cast if obj is not a
// PtrClass.
604  virtual void CallFunction(PSafeObject & obj)
605  {
606  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1, m_arg2);
607  }
608 };
609 
610 
// PSafeWork that calls a three-argument member function of PtrClass on the
// safe object, passing copies of the arguments captured at construction. The
// function's return value (FuncRet) is discarded.
612 template <
613  class PtrClass,
614  typename Arg1Type,
615  typename Arg2Type,
616  typename Arg3Type,
617  typename FuncRet = void
618 >
619 class PSafeWorkArg3 : public PSafeWork {
620  public:
621  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1, Arg2Type arg2, Arg3Type arg3);
622 
623  protected:
// NOTE(review): line 625 (presumably the m_function declaration) is missing
// from the listing; the reason for pack(16) is not visible here.
624 #pragma pack(16)
626  Arg1Type m_arg1;
627  Arg2Type m_arg2;
628  Arg3Type m_arg3;
629 #pragma pack()
630 
631  public:
// Constructor tail (its name line, 632, is missing from the listing). Note the
// parameter order: the arguments come before the function pointer.
// arg3 is declared as Arg3Type so that it reaches m_arg3 (and the third
// parameter of Function) without first being converted through Arg2Type.
633  PtrClass * ptr,
634  Arg1Type arg1,
635  Arg2Type arg2,
636  Arg3Type arg3,
637  Function function
638  ) : PSafeWork(ptr)
639  , m_function(function)
640  , m_arg1(arg1)
641  , m_arg2(arg2)
642  , m_arg3(arg3)
643  { }
644 
// Invokes the stored member function with (m_arg1, m_arg2, m_arg3) on obj
// viewed as a PtrClass; the reference dynamic_cast throws std::bad_cast if obj
// is not a PtrClass.
645  virtual void CallFunction(PSafeObject & obj)
646  {
647  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1, m_arg2, m_arg3);
648  }
649 };
650 
651 
652 #endif // PTLIB_THREADPOOL_H
653 
654 
655 // End Of File ///////////////////////////////////////////////////////////////