31 #ifndef PTLIB_THREADPOOL_H
32 #define PTLIB_THREADPOOL_H
45 #define PThreadPoolTraceModule "ThreadPool"
160 virtual bool Work() = 0;
202 unsigned maxWorkerCount,
203 unsigned maxWorkUnitCount,
204 const char * threadName,
205 PThread::Priority priority
210 virtual void StopWorker(WorkerThreadBase * worker);
220 unsigned m_highWaterMark;
229 template <
class Work_T>
238 unsigned maxWorkers = 10,
239 unsigned maxWorkUnits = 0,
240 const char * threadName = NULL,
241 PThread::Priority priority = PThread::NormalPriority
252 : WorkerThreadBase(pool, priority, threadName)
257 virtual void AddWork(Work_T * work,
const string & group) = 0;
268 : InternalWorkBase(group)
305 bool AddWork(Work_T * work,
const char * group = NULL)
308 InternalWork internalWork(work, group);
310 typename GroupInfoMap_t::iterator iterGroup =
m_groupInfoMap.end();
319 if (internalWork.m_worker == NULL)
323 if (!internalWork.m_group.empty()) {
324 m_groupInfoMap.insert(make_pair(internalWork.m_group, GroupInfo(internalWork.m_worker)));
326 " with group Id \"" << internalWork.m_group <<
'"');
330 internalWork.m_worker = iterGroup->second.m_worker;
331 ++iterGroup->second.m_count;
333 " with group Id \"" << internalWork.m_group <<
"\", count=" << iterGroup->second.m_count);
340 internalWork.m_worker->AddWork(work, internalWork.m_group);
357 InternalWork & internalWork = iterWork->second;
360 if (!internalWork.m_group.empty()) {
361 typename GroupInfoMap_t::iterator iterGroup =
m_groupInfoMap.find(internalWork.m_group);
362 if (
PAssert(iterGroup !=
m_groupInfoMap.end(),
"Unknown work group") && --iterGroup->second.m_count == 0) {
364 " from group Id \"" << internalWork.m_group <<
'"');
370 internalWork.m_worker->RemoveWork(work);
382 template <
class Work_T>
396 unsigned maxWorkUnits = 0,
397 const char * threadName = NULL,
398 PThread::Priority priority = PThread::NormalPriority,
400 unsigned workerIncreaseLimit = 0
401 ) :
PThreadPool<Work_T>(maxWorkers, maxWorkUnits, threadName, priority)
406 " maxWorkers=" << maxWorkers <<
","
407 " maxWorkUnits=" << maxWorkUnits <<
","
409 " priority=" << priority <<
","
410 " workerIncreaseLatency=" << workerIncreaseLatency <<
","
411 " workerIncreaseLimit=" << workerIncreaseLimit);
423 PThread::Priority priority = PThread::NormalPriority,
424 const char * threadName = NULL)
432 this->WaitForTermination();
435 void AddWork(Work_T * work,
const string & group)
448 unsigned work = this->
m_queue.size();
457 if (!this->
m_queue.Dequeue(item))
479 this->m_shutdown =
true;
504 TraceMaxWaitTime(2,
"using max work units, threads=", 0, thread, latency, group);
512 TraceMaxWaitTime(5,
"recent adjustment prevents increase, threads=", 0, thread, latency, group);
522 TraceMaxWaitTime(2,
"cannot increase threads from ", newMaxWorkers, thread, latency, group);
528 TraceMaxWaitTime(2,
"increasing pool threads from ", newMaxWorkers, thread, latency, group);
535 void TraceMaxWaitTime(
unsigned level,
537 size_t newMaxWorkers,
538 const QueuedWorkerThread & thread,
540 const string & group)
544 trace <<
"Thread pool";
546 trace <<
" (group=\"" << group <<
"\")";
548 " work-size=" << thread.GetWorkSize() <<
"), "
550 if (newMaxWorkers > 0)
551 trace <<
" to " << newMaxWorkers;
592 template <
class PtrClass,
typename FuncRet =
void>
608 , m_function(function)
613 (
dynamic_cast<PtrClass&
>(obj).*(this->m_function))();
622 typename FuncRet =
void
629 typedef FuncRet (PtrClass::*
Function)(Arg1Type arg1);
641 , m_function(function)
647 (
dynamic_cast<PtrClass&
>(obj).*(this->m_function))(
m_arg1);
657 typename FuncRet =
void
664 typedef FuncRet (PtrClass::*
Function)(Arg1Type arg1, Arg2Type arg2);
678 , m_function(function)
685 (
dynamic_cast<PtrClass&
>(obj).*(this->m_function))(
m_arg1,
m_arg2);
696 typename FuncRet =
void
703 typedef FuncRet (PtrClass::*
Function)(Arg1Type arg1, Arg2Type arg2, Arg3Type arg3);
719 , m_function(function)
732 #endif // PTLIB_THREADPOOL_H
#define PMaxTimeInterval
Definition: timeint.h:31
This class waits for the semaphore on construction and automatically signals the semaphore on destruction.
Definition: psync.h:115
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:611
WorkerThreadBase(PThreadPoolBase &pool, Priority priority, const char *threadName)
void SetMaxUnits(unsigned count)
Definition: threadpool.h:196
#define PTRACE_CONTEXT_ID_PUSH_THREAD(obj)
Definition: object.h:1114
virtual void CallFunction(PSafeObject &obj)=0
Definition: threadpool.h:264
This class defines an arbitrary time interval to millisecond accuracy.
Definition: timeint.h:51
A PSafeWork thread pool item where the callback has no arguments.
Definition: threadpool.h:593
FuncRet(PtrClass::* Function)(Arg1Type arg1)
Definition: threadpool.h:629
This class defines a base class for thread-safe pointer to an object.
Definition: safecoll.h:568
bool RemoveWork(Work_T *work)
Definition: threadpool.h:348
~PThreadPoolBase()
Definition: threadpool.h:180
virtual bool OnWorkerStarted(WorkerThreadBase &thread)
PSafeObject * GetObject() const
Return pointer to safe object.
Definition: safecoll.h:661
unsigned GetMaxUnits() const
Definition: threadpool.h:194
PSafeWorkNoArg(PtrClass *ptr, Function function)
Definition: threadpool.h:604
Work_T * m_work
Definition: threadpool.h:274
#define PTRACE_PARAM(...)
Definition: object.h:935
This class defines an absolute time and date.
Definition: ptime.h:49
std::vector< WorkerThreadBase * > WorkerList_t
Definition: threadpool.h:213
void SetMaxWorkers(unsigned count)
Definition: threadpool.h:419
virtual void StopWorker(WorkerThreadBase *worker)
Definition: threadpool.h:484
PSafeWorkArg3(PtrClass *ptr, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Function function)
Definition: threadpool.h:712
~QueuedWorkerThread()
Definition: threadpool.h:430
WorkerList_t m_workers
Definition: threadpool.h:214
Definition: threadpool.h:169
PTimeInterval GetElapsed() const
Return time elapsed from "now".
Work_T * m_work
Definition: threadpool.h:489
P_ALIGN_FIELD(Function, m_function, 16)
P_ALIGN_FIELD(Function, m_function, 16)
unsigned m_count
Definition: threadpool.h:289
void SetWorkerIncreaseLatency(const PTimeInterval &time)
Definition: threadpool.h:415
PSyncQueue< QueuedWork > m_queue
Definition: threadpool.h:492
Arg1Type m_arg1
Definition: threadpool.h:668
PTimeInterval m_workerIncreaseLatency
Definition: threadpool.h:386
static ostream & End(ostream &strm)
End a trace output.
A PThreadPool work item template that uses PSafePtr to execute callback function. ...
Definition: threadpool.h:568
A synchronous queue of objects.
Definition: syncthrd.h:579
PQueuedThreadPool< PSafeWork > PSafeThreadPool
The thread pool for PSafeWork items.
Definition: threadpool.h:588
WorkerThread * m_worker
Definition: threadpool.h:290
ExternalToInternalWorkMap_T m_externalToInternalWorkMap
Definition: threadpool.h:282
PTime m_nextWorkerIncreaseTime
Definition: threadpool.h:388
void AddWork(Work_T *work, const string &group)
Definition: threadpool.h:435
GroupInfoMap_t m_groupInfoMap
Definition: threadpool.h:299
WorkerThread(PThreadPool &pool, Priority priority=NormalPriority, const char *threadName=NULL)
Definition: threadpool.h:251
PTime m_time
Definition: threadpool.h:488
PThreadPoolBase(unsigned maxWorkerCount, unsigned maxWorkUnitCount, const char *threadName, PThread::Priority priority)
P_ALIGN_FIELD(Function, m_function, 16)
GroupInfo(WorkerThread *worker)
Definition: threadpool.h:291
std::map< Work_T *, InternalWork > ExternalToInternalWorkMap_T
Definition: threadpool.h:281
P_ALIGN_FIELD(Function, m_function, 16)
Arg2Type m_arg2
Definition: threadpool.h:708
virtual void RemoveWork(Work_T *work)=0
PSafeWorkArg1(PtrClass *ptr, Arg1Type arg1, Function function)
Definition: threadpool.h:636
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:645
void RemoveWork(Work_T *work)
Definition: threadpool.h:441
#define PTRACE(...)
Output trace.
Definition: object.h:1039
unsigned m_maxWorkUnitCount
Definition: threadpool.h:218
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:725
A PSafeWork thread pool item where the callback has 3 arguments.
Definition: threadpool.h:698
FuncRet(PtrClass::* Function)(Arg1Type arg1, Arg2Type arg2)
Definition: threadpool.h:664
Arg2Type m_arg2
Definition: threadpool.h:669
High Level (queued work item) thread pool.
Definition: threadpool.h:383
virtual void OnMaxWaitTime(const QueuedWorkerThread &PTRACE_PARAM(thread), const PTimeInterval &PTRACE_PARAM(latency), const string &PTRACE_PARAM(group))
Definition: threadpool.h:498
unsigned GetWorkSize() const
Definition: threadpool.h:446
FuncRet(PtrClass::* Function)()
Definition: threadpool.h:598
PSafeWorkArg2(PtrClass *ptr, Arg1Type arg1, Arg2Type arg2, Function function)
Definition: threadpool.h:672
These classes and templates implement a generic thread pooling mechanism.
Definition: threadpool.h:150
#define PAssertNULL(ptr)
This macro is used to assert that a pointer must be non-null.
Definition: object.h:428
This class defines a thread-safe object in a collection.
Definition: safecoll.h:123
virtual bool Work()
Definition: threadpool.h:454
virtual PThreadPoolBase::WorkerThreadBase * CreateWorkerThread()
Definition: threadpool.h:558
QueuedWork()
Definition: threadpool.h:486
bool m_shutdown
Definition: threadpool.h:165
WorkerThread * m_worker
Definition: threadpool.h:275
void Shutdown()
Definition: threadpool.h:477
The character string class.
Definition: pstring.h:108
virtual void AddWork(Work_T *work, const string &group)=0
FuncRet(PtrClass::* Function)(Arg1Type arg1, Arg2Type arg2, Arg3Type arg3)
Definition: threadpool.h:703
virtual WorkerThreadBase * CreateWorkerThread()=0
const PTimeInterval & GetWorkerIncreaseLatency() const
Definition: threadpool.h:414
A PSafeWork thread pool item where the callback has 1 argument.
Definition: threadpool.h:624
std::map< std::string, GroupInfo > GroupInfoMap_t
Definition: threadpool.h:298
Definition: threadpool.h:153
virtual void Work()
Definition: threadpool.h:574
Definition: threadpool.h:288
This class defines a thread of execution in the system.
Definition: thread.h:66
virtual void CallFunction(PSafeObject &obj)
Definition: threadpool.h:683
PDECLARE_MUTEX(m_workerMutex)
PThreadPool(unsigned maxWorkers=10, unsigned maxWorkUnits=0, const char *threadName=NULL, PThread::Priority priority=PThread::NormalPriority)
Definition: threadpool.h:237
Arg1Type m_arg1
Definition: threadpool.h:707
virtual void Main()
User override function for the main execution routine of the thread.
static unsigned GetNumProcessors()
Get number of processors, or processor cores, this machine has available.
virtual WorkerThreadBase * AllocateWorker()
virtual void Shutdown()=0
bool AddWork(Work_T *work, const char *group=NULL)
Definition: threadpool.h:305
#define PTRACE_BEGIN(...)
Begin output trace.
Definition: object.h:1078
unsigned m_maxWorkerCount
Definition: threadpool.h:217
string m_group
Definition: threadpool.h:490
unsigned GetWorkerIncreaseLimit() const
Definition: threadpool.h:416
#define PThreadPoolTraceModule
Definition: threadpool.h:45
Arg1Type m_arg1
Definition: threadpool.h:633
#define PAssert(b, msg)
This macro is used to assert that a condition must be true.
Definition: object.h:400
InternalWorkBase(const char *group)
Definition: threadpool.h:172
A PSafeWork thread pool item where the callback has 2 arguments.
Definition: threadpool.h:659
#define P_REMOVE_VIRTUAL_VOID(fn)
Definition: object.h:145
PThreadPoolBase & m_pool
Definition: threadpool.h:164
atomic< bool > m_working
Definition: threadpool.h:493
std::string m_group
Definition: threadpool.h:177
Definition: threadpool.h:248
InternalWork(Work_T *work, const char *group)
Definition: threadpool.h:267
virtual void ReclaimWorkers()
PSafeWork(PSafeObject *ptr)
Definition: threadpool.h:570
Ultimate parent class for all objects in the class library.
Definition: object.h:2204
unsigned GetMaxWorkers() const
Definition: threadpool.h:188
QueuedWorkerThread(PThreadPool< Work_T > &pool, PThread::Priority priority=PThread::NormalPriority, const char *threadName=NULL)
Definition: threadpool.h:422
Arg3Type m_arg3
Definition: threadpool.h:709
unsigned m_workerIncreaseLimit
Definition: threadpool.h:387
void SetWorkerIncreaseLimit(unsigned limit)
Definition: threadpool.h:417
PQueuedThreadPool(unsigned maxWorkers=std::max(PThread::GetNumProcessors(), 10U), unsigned maxWorkUnits=0, const char *threadName=NULL, PThread::Priority priority=PThread::NormalPriority, const PTimeInterval &workerIncreaseLatency=PMaxTimeInterval, unsigned workerIncreaseLimit=0)
Definition: threadpool.h:394
Low Level thread pool.
Definition: threadpool.h:230
PString m_threadName
Definition: threadpool.h:222
static PBoolean CanTrace(unsigned level)
Determine if the level may cause trace output.
PThread::Priority m_priority
Definition: threadpool.h:223
virtual unsigned GetWorkSize() const =0
QueuedWork(Work_T *work, const string &group)
Definition: threadpool.h:487