PTLib  Version 2.18.8
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
threadpool.h
Go to the documentation of this file.
1 /*
2  * threadpool.h
3  *
4  * Generalised Thread Pooling functions
5  *
6  * Portable Tools Library
7  *
8  * Copyright (C) 2009 Post Increment
9  *
10  * The contents of this file are subject to the Mozilla Public License
11  * Version 1.0 (the "License"); you may not use this file except in
12  * compliance with the License. You may obtain a copy of the License at
13  * http://www.mozilla.org/MPL/
14  *
15  * Software distributed under the License is distributed on an "AS IS"
16  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17  * the License for the specific language governing rights and limitations
18  * under the License.
19  *
20  * The Original Code is Portable Windows Library.
21  *
22  * The Initial Developer of the Original Code is Post Increment
23  *
24  * Portions of this code were written with the financial assistance of
25  * Metreos Corporation (http://www.metros.com).
26  *
27  * Contributor(s): ______________________________________.
28  */
29 
30 
31 #ifndef PTLIB_THREADPOOL_H
32 #define PTLIB_THREADPOOL_H
33 
34 #ifdef P_USE_PRAGMA
35 #pragma interface
36 #endif
37 
38 
39 #include <ptlib/thread.h>
40 #include <ptlib/safecoll.h>
41 #include <map>
42 #include <queue>
43 
44 
45 #define PThreadPoolTraceModule "ThreadPool"
46 
150 class PThreadPoolBase : public PObject
151 {
152  public:
153  class WorkerThreadBase : public PThread
154  {
155  protected:
156  WorkerThreadBase(PThreadPoolBase & pool, Priority priority, const char * threadName);
157  virtual void Main();
158 
159  public:
160  virtual bool Work() = 0;
161  virtual void Shutdown() = 0;
162  virtual unsigned GetWorkSize() const = 0;
163 
166  PDECLARE_MUTEX( m_workerMutex);
167  };
168 
170  {
171  public:
172  InternalWorkBase(const char * group)
173  {
174  if (group != NULL)
175  m_group = group;
176  }
177  std::string m_group;
178  };
179 
181  {
182  Shutdown();
183  }
184 
185  void Shutdown();
186  virtual void ReclaimWorkers();
187 
188  unsigned GetMaxWorkers() const { return m_maxWorkerCount; }
189 
190  void SetMaxWorkers(
191  unsigned count
192  );
193 
194  unsigned GetMaxUnits() const { return m_maxWorkUnitCount; }
195 
197  unsigned count
198  ) { m_maxWorkUnitCount = count; }
199 
200  protected:
202  unsigned maxWorkerCount,
203  unsigned maxWorkUnitCount,
204  const char * threadName,
205  PThread::Priority priority
206  );
207 
208  virtual WorkerThreadBase * AllocateWorker();
209  virtual WorkerThreadBase * CreateWorkerThread() = 0;
210  virtual void StopWorker(WorkerThreadBase * worker);
211  virtual bool OnWorkerStarted(WorkerThreadBase & thread);
212 
213  typedef std::vector<WorkerThreadBase *> WorkerList_t;
215  PDECLARE_MUTEX(m_mutex);
216 
219 #if PTRACING
220  unsigned m_highWaterMark; // For logging
221 #endif
223  PThread::Priority m_priority;
224 };
225 
226 
229 template <class Work_T>
231 {
232  PCLASSINFO(PThreadPool, PThreadPoolBase);
233  public:
234  //
235  // constructor
236  //
238  unsigned maxWorkers = 10,
239  unsigned maxWorkUnits = 0,
240  const char * threadName = NULL,
241  PThread::Priority priority = PThread::NormalPriority
242  ) : PThreadPoolBase(maxWorkers, maxWorkUnits, threadName, priority)
243  { }
244 
245  //
246  // define the ancestor of the worker thread
247  //
248  class WorkerThread : public WorkerThreadBase
249  {
250  protected:
251  WorkerThread(PThreadPool & pool, Priority priority = NormalPriority, const char * threadName = NULL)
252  : WorkerThreadBase(pool, priority, threadName)
253  {
254  }
255 
256  public:
257  virtual void AddWork(Work_T * work, const string & group) = 0;
258  virtual void RemoveWork(Work_T * work) = 0;
259  };
260 
261  //
262  // define internal worker wrapper class
263  //
264  class InternalWork : public InternalWorkBase
265  {
266  public:
267  InternalWork(Work_T * work, const char * group)
268  : InternalWorkBase(group)
269  , m_work(work)
270  , m_worker(NULL)
271  {
272  }
273 
274  Work_T * m_work;
276  };
277 
278  //
279  // define map for external work units to internal work
280  //
281  typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
283 
284 
285  //
286  // define class for storing group informationm
287  //
288  struct GroupInfo {
289  unsigned m_count;
291  GroupInfo(WorkerThread * worker) : m_count(1), m_worker(worker) { }
292  };
293 
294 
295  //
296  // define map for group ID to group information
297  //
298  typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
300 
301 
302  //
303  // add a new unit of work to a worker thread
304  //
305  bool AddWork(Work_T * work, const char * group = NULL)
306  {
307  // create internal work structure
308  InternalWork internalWork(work, group);
309 
310  typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.end();
311 
312  PWaitAndSignal m(m_mutex);
313 
314  // allocate by group if specified, else allocate to least busy
315  if (internalWork.m_group.empty() || (iterGroup = m_groupInfoMap.find(group)) == m_groupInfoMap.end()) {
316  internalWork.m_worker = (WorkerThread *)AllocateWorker();
317 
318  // if cannot allocate worker, return
319  if (internalWork.m_worker == NULL)
320  return false;
321 
322  // add group ID to map
323  if (!internalWork.m_group.empty()) {
324  m_groupInfoMap.insert(make_pair(internalWork.m_group, GroupInfo(internalWork.m_worker)));
325  PTRACE(6, PThreadPoolTraceModule, "Setting worker thread \"" << *internalWork.m_worker << "\""
326  " with group Id \"" << internalWork.m_group << '"');
327  }
328  }
329  else {
330  internalWork.m_worker = iterGroup->second.m_worker;
331  ++iterGroup->second.m_count;
332  PTRACE(6, PThreadPoolTraceModule, "Using existing worker thread \"" << *internalWork.m_worker << "\""
333  " with group Id \"" << internalWork.m_group << "\", count=" << iterGroup->second.m_count);
334  }
335 
336  // add work to external to internal map
337  m_externalToInternalWorkMap.insert(make_pair(work, internalWork));
338 
339  // give the work to the worker
340  internalWork.m_worker->AddWork(work, internalWork.m_group);
341 
342  return true;
343  }
344 
345  //
346  // remove a unit of work from a worker thread
347  //
348  bool RemoveWork(Work_T * work)
349  {
350  PWaitAndSignal m(m_mutex);
351 
352  // find worker with work unit to remove
353  typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work);
354  if (!PAssert(iterWork != m_externalToInternalWorkMap.end(), "Missing work!"))
355  return false;
356 
357  InternalWork & internalWork = iterWork->second;
358 
359  // update group information
360  if (!internalWork.m_group.empty()) {
361  typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group);
362  if (PAssert(iterGroup != m_groupInfoMap.end(), "Unknown work group") && --iterGroup->second.m_count == 0) {
363  PTRACE(6, PThreadPoolTraceModule, "Removing worker thread \"" << *internalWork.m_worker << "\""
364  " from group Id \"" << internalWork.m_group << '"');
365  m_groupInfoMap.erase(iterGroup);
366  }
367  }
368 
369  // tell worker to stop processing work
370  internalWork.m_worker->RemoveWork(work);
371 
372  // remove element from work unit map
373  m_externalToInternalWorkMap.erase(iterWork);
374 
375  return true;
376  }
377 };
378 
379 
382 template <class Work_T>
383 class PQueuedThreadPool : public PThreadPool<Work_T>
384 {
385  protected:
389 
390  public:
391  //
392  // constructor
393  //
395  unsigned maxWorkers = std::max(PThread::GetNumProcessors(), 10U),
396  unsigned maxWorkUnits = 0,
397  const char * threadName = NULL,
398  PThread::Priority priority = PThread::NormalPriority,
399  const PTimeInterval & workerIncreaseLatency = PMaxTimeInterval,
400  unsigned workerIncreaseLimit = 0
401  ) : PThreadPool<Work_T>(maxWorkers, maxWorkUnits, threadName, priority)
402  , m_workerIncreaseLatency(workerIncreaseLatency)
403  , m_workerIncreaseLimit(std::max(maxWorkers, workerIncreaseLimit))
404  {
405  PTRACE(4, NULL, PThreadPoolTraceModule, "Thread pool created:"
406  " maxWorkers=" << maxWorkers << ","
407  " maxWorkUnits=" << maxWorkUnits << ","
408  " threadName=" << this->m_threadName << ","
409  " priority=" << priority << ","
410  " workerIncreaseLatency=" << workerIncreaseLatency << ","
411  " workerIncreaseLimit=" << workerIncreaseLimit);
412  }
413 
416  unsigned GetWorkerIncreaseLimit() const { return m_workerIncreaseLimit; }
417  void SetWorkerIncreaseLimit(unsigned limit) { m_workerIncreaseLimit = limit; }
418 
420  {
421  public:
423  PThread::Priority priority = PThread::NormalPriority,
424  const char * threadName = NULL)
425  : PThreadPool<Work_T>::WorkerThread(pool, priority, threadName)
426  , m_working(false)
427  {
428  }
429 
431  {
432  this->WaitForTermination();
433  }
434 
435  void AddWork(Work_T * work, const string & group)
436  {
437  if (PAssertNULL(work) != NULL)
438  this->m_queue.Enqueue(QueuedWork(work, group));
439  }
440 
441  void RemoveWork(Work_T * work)
442  {
443  delete work;
444  }
445 
446  unsigned GetWorkSize() const
447  {
448  unsigned work = this->m_queue.size();
449  if (this->m_working)
450  ++work;
451  return work;
452  }
453 
454  virtual bool Work()
455  {
456  QueuedWork item;
457  if (!this->m_queue.Dequeue(item))
458  return false;
459 
460  this->m_working = true;
461 
462  PQueuedThreadPool & pool = dynamic_cast<PQueuedThreadPool &>(this->m_pool);
463  PTimeInterval latency = item.m_time.GetElapsed();
464 
465  item.m_work->Work();
466 
467  if (!pool.RemoveWork(item.m_work))
468  this->RemoveWork(item.m_work);
469 
470  this->m_working = false;
471 
472  if (latency > pool.m_workerIncreaseLatency)
473  pool.OnMaxWaitTime(*this, latency, item.m_group);
474  return true;
475  }
476 
477  void Shutdown()
478  {
479  this->m_shutdown = true;
480  this->m_queue.Close(true);
481  }
482 
483  protected:
484  struct QueuedWork
485  {
486  QueuedWork() : m_time(0), m_work(NULL) { }
487  explicit QueuedWork(Work_T * work, const string & group) : m_work(work), m_group(group) { }
489  Work_T * m_work;
490  string m_group;
491  };
494  };
495 
496  P_REMOVE_VIRTUAL_VOID(OnMaxWaitTime(const PTimeInterval&,const string&))
497 
498  virtual void OnMaxWaitTime(const QueuedWorkerThread & PTRACE_PARAM(thread),
499  const PTimeInterval & PTRACE_PARAM(latency),
500  const string & PTRACE_PARAM(group))
501  {
502  if (this->m_maxWorkUnitCount > 0) {
503 #if PTRACING
504  TraceMaxWaitTime(2, "using max work units, threads=", 0, thread, latency, group);
505 #endif
506  return;
507  }
508 
509  PTime now;
510  if (this->m_nextWorkerIncreaseTime > now) {
511 #if PTRACING
512  TraceMaxWaitTime(5, "recent adjustment prevents increase, threads=", 0, thread, latency, group);
513 #endif
514  return;
515  }
516 
517  this->m_nextWorkerIncreaseTime = now + this->m_workerIncreaseLatency; // Don't increase again until had some time to clear backlog.
518 
519  unsigned newMaxWorkers = std::min((this->m_maxWorkerCount*11+9)/10, this->m_workerIncreaseLimit);
520  if (newMaxWorkers == this->m_maxWorkerCount) {
521 #if PTRACING
522  TraceMaxWaitTime(2, "cannot increase threads from ", newMaxWorkers, thread, latency, group);
523 #endif
524  return;
525  }
526 
527 #if PTRACING
528  TraceMaxWaitTime(2, "increasing pool threads from ", newMaxWorkers, thread, latency, group);
529 #endif
530  this->m_maxWorkerCount = newMaxWorkers;
531  }
532 
533  protected:
534 #if PTRACING
535  void TraceMaxWaitTime(unsigned level,
536  const char * msg,
537  size_t newMaxWorkers,
538  const QueuedWorkerThread & thread,
539  const PTimeInterval & latency,
540  const string & group)
541  {
542  if (PTrace::CanTrace(level)) {
543  ostream & trace = PTRACE_BEGIN(level, NULL, PThreadPoolTraceModule);
544  trace << "Thread pool";
545  if (!group.empty())
546  trace << " (group=\"" << group << "\")";
547  trace << " latency excessive (" << latency << "s > " << this->m_workerIncreaseLatency << "s,"
548  " work-size=" << thread.GetWorkSize() << "), "
549  << msg << this->m_maxWorkerCount;
550  if (newMaxWorkers > 0)
551  trace << " to " << newMaxWorkers;
552  trace << ", limit=" << this->m_workerIncreaseLimit
553  << PTrace::End;
554  }
555  }
556 #endif
557 
559  {
560  return new QueuedWorkerThread(*this, this->m_priority, this->m_threadName);
561  }
562 };
563 
564 
568 class PSafeWork : public PSafePtrBase {
569  public:
571  PSafeObject * ptr
572  ) : PSafePtrBase(ptr) { }
573 
574  virtual void Work()
575  {
576  PSafeObject * ptr = this->GetObject();
577  if (ptr != NULL) {
579  CallFunction(*ptr);
580  }
581  }
582 
583  virtual void CallFunction(PSafeObject & obj) = 0;
584 };
585 
586 
589 
590 
592 template <class PtrClass, typename FuncRet = void>
593 class PSafeWorkNoArg : public PSafeWork
594 {
595  PCLASSINFO_ALIGNED(PSafeWorkNoArg, PSafeWork, 16);
596 
597  public:
598  typedef FuncRet (PtrClass::*Function)();
599 
600  protected:
601  P_ALIGN_FIELD(Function,m_function,16);
602 
603  public:
605  PtrClass * ptr,
606  Function function
607  ) : PSafeWork(ptr)
608  , m_function(function)
609  { }
610 
611  virtual void CallFunction(PSafeObject & obj)
612  {
613  (dynamic_cast<PtrClass&>(obj).*(this->m_function))();
614  }
615 };
616 
617 
619 template <
620  class PtrClass,
621  typename Arg1Type,
622  typename FuncRet = void
623 >
624 class PSafeWorkArg1 : public PSafeWork
625 {
626  PCLASSINFO_ALIGNED(PSafeWorkArg1, PSafeWork, 16);
627 
628  public:
629  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1);
630 
631  protected:
632  P_ALIGN_FIELD(Function,m_function,16);
633  Arg1Type m_arg1;
634 
635  public:
637  PtrClass * ptr,
638  Arg1Type arg1,
639  Function function
640  ) : PSafeWork(ptr)
641  , m_function(function)
642  , m_arg1(arg1)
643  { }
644 
645  virtual void CallFunction(PSafeObject & obj)
646  {
647  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1);
648  }
649 };
650 
651 
653 template <
654  class PtrClass,
655  typename Arg1Type,
656  typename Arg2Type,
657  typename FuncRet = void
658 >
659 class PSafeWorkArg2 : public PSafeWork
660 {
661  PCLASSINFO_ALIGNED(PSafeWorkArg2, PSafeWork, 16);
662 
663  public:
664  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1, Arg2Type arg2);
665 
666  protected:
667  P_ALIGN_FIELD(Function,m_function,16);
668  Arg1Type m_arg1;
669  Arg2Type m_arg2;
670 
671  public:
673  PtrClass * ptr,
674  Arg1Type arg1,
675  Arg2Type arg2,
676  Function function
677  ) : PSafeWork(ptr)
678  , m_function(function)
679  , m_arg1(arg1)
680  , m_arg2(arg2)
681  { }
682 
683  virtual void CallFunction(PSafeObject & obj)
684  {
685  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1, m_arg2);
686  }
687 };
688 
689 
691 template <
692  class PtrClass,
693  typename Arg1Type,
694  typename Arg2Type,
695  typename Arg3Type,
696  typename FuncRet = void
697 >
698 class PSafeWorkArg3 : public PSafeWork
699 {
700  PCLASSINFO_ALIGNED(PSafeWorkArg3, PSafeWork, 16);
701 
702  public:
703  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1, Arg2Type arg2, Arg3Type arg3);
704 
705  protected:
706  P_ALIGN_FIELD(Function,m_function,16);
707  Arg1Type m_arg1;
708  Arg2Type m_arg2;
709  Arg3Type m_arg3;
710 
711  public:
713  PtrClass * ptr,
714  Arg1Type arg1,
715  Arg2Type arg2,
716  Arg3Type arg3,
717  Function function
718  ) : PSafeWork(ptr)
719  , m_function(function)
720  , m_arg1(arg1)
721  , m_arg2(arg2)
722  , m_arg3(arg3)
723  { }
724 
725  virtual void CallFunction(PSafeObject & obj)
726  {
727  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1, m_arg2, m_arg3);
728  }
729 };
730 
731 
732 #endif // PTLIB_THREADPOOL_H
733 
734 
735 // End Of File ///////////////////////////////////////////////////////////////
#define PMaxTimeInterval
Definition: timeint.h:31
This class waits for the semaphore on construction and automatically signals the semaphore on destruc...
Definition: psync.h:115
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:611
WorkerThreadBase(PThreadPoolBase &pool, Priority priority, const char *threadName)
void SetMaxUnits(unsigned count)
Definition: threadpool.h:196
#define PTRACE_CONTEXT_ID_PUSH_THREAD(obj)
Definition: object.h:1114
virtual void CallFunction(PSafeObject &obj)=0
Definition: threadpool.h:264
This class defines an arbitrary time interval to millisecond accuracy.
Definition: timeint.h:51
A PSafeWork thread pool item where call back has no arguments.
Definition: threadpool.h:593
FuncRet(PtrClass::* Function)(Arg1Type arg1)
Definition: threadpool.h:629
This class defines a base class for thread-safe pointer to an object.
Definition: safecoll.h:568
bool RemoveWork(Work_T *work)
Definition: threadpool.h:348
~PThreadPoolBase()
Definition: threadpool.h:180
virtual bool OnWorkerStarted(WorkerThreadBase &thread)
PSafeObject * GetObject() const
Return pointer to safe object.
Definition: safecoll.h:661
unsigned GetMaxUnits() const
Definition: threadpool.h:194
PSafeWorkNoArg(PtrClass *ptr, Function function)
Definition: threadpool.h:604
Work_T * m_work
Definition: threadpool.h:274
#define PTRACE_PARAM(...)
Definition: object.h:935
This class defines an absolute time and date.
Definition: ptime.h:49
std::vector< WorkerThreadBase * > WorkerList_t
Definition: threadpool.h:213
void SetMaxWorkers(unsigned count)
Definition: threadpool.h:419
virtual void StopWorker(WorkerThreadBase *worker)
PSafeWorkArg3(PtrClass *ptr, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Function function)
Definition: threadpool.h:712
~QueuedWorkerThread()
Definition: threadpool.h:430
WorkerList_t m_workers
Definition: threadpool.h:214
Definition: threadpool.h:169
PTimeInterval GetElapsed() const
Return time elapsed from "now".
Work_T * m_work
Definition: threadpool.h:489
P_ALIGN_FIELD(Function, m_function, 16)
P_ALIGN_FIELD(Function, m_function, 16)
unsigned m_count
Definition: threadpool.h:289
void SetWorkerIncreaseLatency(const PTimeInterval &time)
Definition: threadpool.h:415
PSyncQueue< QueuedWork > m_queue
Definition: threadpool.h:492
Arg1Type m_arg1
Definition: threadpool.h:668
PTimeInterval m_workerIncreaseLatency
Definition: threadpool.h:386
static ostream & End(ostream &strm)
End a trace output.
A PThreadPool work item template that uses PSafePtr to execute callback function. ...
Definition: threadpool.h:568
A synchronous queue of objects.
Definition: syncthrd.h:579
PQueuedThreadPool< PSafeWork > PSafeThreadPool
The thread pool for PSafeWork items.
Definition: threadpool.h:588
WorkerThread * m_worker
Definition: threadpool.h:290
ExternalToInternalWorkMap_T m_externalToInternalWorkMap
Definition: threadpool.h:282
PTime m_nextWorkerIncreaseTime
Definition: threadpool.h:388
void AddWork(Work_T *work, const string &group)
Definition: threadpool.h:435
GroupInfoMap_t m_groupInfoMap
Definition: threadpool.h:299
WorkerThread(PThreadPool &pool, Priority priority=NormalPriority, const char *threadName=NULL)
Definition: threadpool.h:251
PTime m_time
Definition: threadpool.h:488
PThreadPoolBase(unsigned maxWorkerCount, unsigned maxWorkUnitCount, const char *threadName, PThread::Priority priority)
P_ALIGN_FIELD(Function, m_function, 16)
GroupInfo(WorkerThread *worker)
Definition: threadpool.h:291
std::map< Work_T *, InternalWork > ExternalToInternalWorkMap_T
Definition: threadpool.h:281
P_ALIGN_FIELD(Function, m_function, 16)
Arg2Type m_arg2
Definition: threadpool.h:708
virtual void RemoveWork(Work_T *work)=0
PSafeWorkArg1(PtrClass *ptr, Arg1Type arg1, Function function)
Definition: threadpool.h:636
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:645
void RemoveWork(Work_T *work)
Definition: threadpool.h:441
#define PTRACE(...)
Output trace.
Definition: object.h:1039
unsigned m_maxWorkUnitCount
Definition: threadpool.h:218
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:725
A PSafeWork thread pool item where call back has 3 arguments.
Definition: threadpool.h:698
FuncRet(PtrClass::* Function)(Arg1Type arg1, Arg2Type arg2)
Definition: threadpool.h:664
Arg2Type m_arg2
Definition: threadpool.h:669
High Level (queued work item) thread pool.
Definition: threadpool.h:383
virtual void OnMaxWaitTime(const QueuedWorkerThread &PTRACE_PARAM(thread), const PTimeInterval &PTRACE_PARAM(latency), const string &PTRACE_PARAM(group))
Definition: threadpool.h:498
unsigned GetWorkSize() const
Definition: threadpool.h:446
FuncRet(PtrClass::* Function)()
Definition: threadpool.h:598
PSafeWorkArg2(PtrClass *ptr, Arg1Type arg1, Arg2Type arg2, Function function)
Definition: threadpool.h:672
These classes and templates implement a generic thread pooling mechanism.
Definition: threadpool.h:150
#define PAssertNULL(ptr)
This macro is used to assert that a pointer must be non-null.
Definition: object.h:428
This class defines a thread-safe object in a collection.
Definition: safecoll.h:123
virtual bool Work()
Definition: threadpool.h:454
virtual PThreadPoolBase::WorkerThreadBase * CreateWorkerThread()
Definition: threadpool.h:558
bool m_shutdown
Definition: threadpool.h:165
WorkerThread * m_worker
Definition: threadpool.h:275
void Shutdown()
Definition: threadpool.h:477
The character string class.
Definition: pstring.h:108
virtual void AddWork(Work_T *work, const string &group)=0
FuncRet(PtrClass::* Function)(Arg1Type arg1, Arg2Type arg2, Arg3Type arg3)
Definition: threadpool.h:703
virtual WorkerThreadBase * CreateWorkerThread()=0
const PTimeInterval & GetWorkerIncreaseLatency() const
Definition: threadpool.h:414
A PSafeWork thread pool item where call back has 1 argument.
Definition: threadpool.h:624
std::map< std::string, GroupInfo > GroupInfoMap_t
Definition: threadpool.h:298
Definition: threadpool.h:153
virtual void Work()
Definition: threadpool.h:574
Definition: threadpool.h:288
This class defines a thread of execution in the system.
Definition: thread.h:66
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:683
PThreadPool(unsigned maxWorkers=10, unsigned maxWorkUnits=0, const char *threadName=NULL, PThread::Priority priority=PThread::NormalPriority)
Definition: threadpool.h:237
Arg1Type m_arg1
Definition: threadpool.h:707
virtual void Main()
User override function for the main execution routine of the thread.
static unsigned GetNumProcessors()
Get number of processors, or processor cores, this machine has available.
virtual WorkerThreadBase * AllocateWorker()
bool AddWork(Work_T *work, const char *group=NULL)
Definition: threadpool.h:305
#define PTRACE_BEGIN(...)
Begin output trace.
Definition: object.h:1078
unsigned m_maxWorkerCount
Definition: threadpool.h:217
string m_group
Definition: threadpool.h:490
unsigned GetWorkerIncreaseLimit() const
Definition: threadpool.h:416
#define PThreadPoolTraceModule
Definition: threadpool.h:45
Arg1Type m_arg1
Definition: threadpool.h:633
#define PAssert(b, msg)
This macro is used to assert that a condition must be true.
Definition: object.h:400
InternalWorkBase(const char *group)
Definition: threadpool.h:172
A PSafeWork thread pool item where call back has 2 arguments.
Definition: threadpool.h:659
#define P_REMOVE_VIRTUAL_VOID(fn)
Definition: object.h:145
PThreadPoolBase & m_pool
Definition: threadpool.h:164
atomic< bool > m_working
Definition: threadpool.h:493
std::string m_group
Definition: threadpool.h:177
Definition: threadpool.h:248
InternalWork(Work_T *work, const char *group)
Definition: threadpool.h:267
PDECLARE_MUTEX(m_mutex)
virtual void ReclaimWorkers()
PSafeWork(PSafeObject *ptr)
Definition: threadpool.h:570
Ultimate parent class for all objects in the class library.
Definition: object.h:2204
unsigned GetMaxWorkers() const
Definition: threadpool.h:188
QueuedWorkerThread(PThreadPool< Work_T > &pool, PThread::Priority priority=PThread::NormalPriority, const char *threadName=NULL)
Definition: threadpool.h:422
Arg3Type m_arg3
Definition: threadpool.h:709
unsigned m_workerIncreaseLimit
Definition: threadpool.h:387
void SetWorkerIncreaseLimit(unsigned limit)
Definition: threadpool.h:417
PQueuedThreadPool(unsigned maxWorkers=std::max(PThread::GetNumProcessors(), 10U), unsigned maxWorkUnits=0, const char *threadName=NULL, PThread::Priority priority=PThread::NormalPriority, const PTimeInterval &workerIncreaseLatency=PMaxTimeInterval, unsigned workerIncreaseLimit=0)
Definition: threadpool.h:394
Low Level thread pool.
Definition: threadpool.h:230
PString m_threadName
Definition: threadpool.h:222
static PBoolean CanTrace(unsigned level)
Determine if the level may cause trace output.
PThread::Priority m_priority
Definition: threadpool.h:223
virtual unsigned GetWorkSize() const =0
QueuedWork(Work_T *work, const string &group)
Definition: threadpool.h:487