PTLib  Version 2.14.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
threadpool.h
Go to the documentation of this file.
1 /*
2  * threadpool.h
3  *
4  * Generalised Thread Pooling functions
5  *
6  * Portable Tools Library
7  *
8  * Copyright (C) 2009 Post Increment
9  *
10  * The contents of this file are subject to the Mozilla Public License
11  * Version 1.0 (the "License"); you may not use this file except in
12  * compliance with the License. You may obtain a copy of the License at
13  * http://www.mozilla.org/MPL/
14  *
15  * Software distributed under the License is distributed on an "AS IS"
16  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17  * the License for the specific language governing rights and limitations
18  * under the License.
19  *
20  * The Original Code is Portable Windows Library.
21  *
22  * The Initial Developer of the Original Code is Post Increment
23  *
24  * Portions of this code were written with the financial assistance of
25  * Metreos Corporation (http://www.metros.com).
26  *
27  * Contributor(s): ______________________________________.
28  *
29  * $Revision: 32255 $
30  * $Author: rjongbloed $
31  * $Date: 2014-06-29 17:49:31 +1000 (Sun, 29 Jun 2014) $
32  */
33 
34 
35 #ifndef PTLIB_THREADPOOL_H
36 #define PTLIB_THREADPOOL_H
37 
38 #ifdef P_USE_PRAGMA
39 #pragma interface
40 #endif
41 
42 
43 #include <ptlib/thread.h>
44 #include <ptlib/safecoll.h>
45 #include <map>
46 #include <queue>
47 
48 
153 class PThreadPoolBase : public PObject
154 {
155  public:
156  class WorkerThreadBase : public PThread
157  {
158  protected:
159  WorkerThreadBase(Priority priority, const char * threadName)
160  : PThread(100, NoAutoDeleteThread, priority, threadName)
161  , m_shutdown(false)
162  { }
163 
164  public:
165  virtual void Shutdown() = 0;
166  virtual unsigned GetWorkSize() const = 0;
167 
170  };
171 
173  {
174  public:
175  InternalWorkBase(const char * group)
176  {
177  if (group != NULL)
178  m_group = group;
179  }
180  std::string m_group;
181  };
182 
184 
185  virtual WorkerThreadBase * CreateWorkerThread() = 0;
186  virtual WorkerThreadBase * AllocateWorker();
187  virtual WorkerThreadBase * NewWorker();
188 
189  unsigned GetMaxWorkers() const { return m_maxWorkerCount; }
190 
192  unsigned count
193  ) { m_maxWorkerCount = count; }
194 
195  unsigned GetMaxUnits() const { return m_maxWorkUnitCount; }
196 
198  unsigned count
199  ) { m_maxWorkUnitCount = count; }
200 
201  protected:
203  unsigned maxWorkerCount,
204  unsigned maxWorkUnitCount,
205  const char * threadName,
206  PThread::Priority priority
207  );
208 
209  virtual bool CheckWorker(WorkerThreadBase * worker);
210  void StopWorker(WorkerThreadBase * worker);
212 
213  typedef std::vector<WorkerThreadBase *> WorkerList_t;
215 
218  unsigned m_highWaterMark; // For logging
221 };
222 
223 
226 template <class Work_T>
228 {
229  PCLASSINFO(PThreadPool, PThreadPoolBase);
230  public:
231  //
232  // constructor
233  //
235  unsigned maxWorkers = 10,
236  unsigned maxWorkUnits = 0,
237  const char * threadName = NULL,
239  ) : PThreadPoolBase(maxWorkers, maxWorkUnits, threadName, priority)
240  { }
241 
242  //
243  // define the ancestor of the worker thread
244  //
245  class WorkerThread : public WorkerThreadBase
246  {
247  protected:
248  WorkerThread(PThreadPool & pool, Priority priority = NormalPriority, const char * threadName = NULL)
249  : WorkerThreadBase(priority, threadName)
250  , m_pool(pool)
251  {
252  }
253 
254  public:
255  virtual void AddWork(Work_T * work) = 0;
256  virtual void RemoveWork(Work_T * work) = 0;
257  virtual void Main() = 0;
258 
259  protected:
261  };
262 
263  //
264  // define internal worker wrapper class
265  //
266  class InternalWork : public InternalWorkBase
267  {
268  public:
269  InternalWork(WorkerThread * worker, Work_T * work, const char * group)
270  : InternalWorkBase(group)
271  , m_worker(worker)
272  , m_work(work)
273  {
274  }
275 
277  Work_T * m_work;
278  };
279 
280  //
281  // define map for external work units to internal work
282  //
283  typedef std::map<Work_T *, InternalWork> ExternalToInternalWorkMap_T;
285 
286 
287  //
288  // define class for storing group information
289  //
290  struct GroupInfo {
291  unsigned m_count;
293  };
294 
295 
296  //
297  // define map for group ID to group information
298  //
299  typedef std::map<std::string, GroupInfo> GroupInfoMap_t;
301 
302 
303  // Hand a unit of work to a worker thread. Returns false when no worker
304  // could be obtained; otherwise records the work (and its group, if any)
305  // and returns true after passing the work to the chosen worker.
306  bool AddWork(Work_T * work, const char * group = NULL)
307  {
309 
310  // no group (NULL or empty string): let AllocateWorker() pick the worker
311  // otherwise prefer the worker already registered for that group
312  WorkerThread * worker;
313  if ((group == NULL) || (strlen(group) == 0)) {
314  worker = (WorkerThread *)AllocateWorker();
315  }
316  else {
317 
318  // look the group up in the group map; if it is not registered yet,
319  // fall back to AllocateWorker() just like ungrouped work
320  typename GroupInfoMap_t::iterator g = m_groupInfoMap.find(group);
321  if (g == m_groupInfoMap.end())
322  worker = (WorkerThread *)AllocateWorker();
323  else {
324  worker = g->second.m_worker;
325  PTRACE(4, "ThreadPool\tAllocated worker thread by group Id " << group);
326  }
327  }
328 
329  // no worker available: report failure without recording anything
330  if (worker == NULL)
331  return false;
332 
333  // bundle worker, work and group name (a NULL group gives an empty name)
334  InternalWork internalWork(worker, work, group);
335 
336  // remember which worker owns this work so RemoveWork() can find it later
337  m_externalToInternalWorkMap.insert(typename ExternalToInternalWorkMap_T::value_type(work, internalWork));
338 
339  // grouped work: bump the group's count, registering the group with this worker on first use
340  if (!internalWork.m_group.empty()) {
341  typename GroupInfoMap_t::iterator r = m_groupInfoMap.find(internalWork.m_group);
342  if (r != m_groupInfoMap.end())
343  ++r->second.m_count;
344  else {
345  GroupInfo info;
346  info.m_count = 1;
347  info.m_worker = worker;
348  m_groupInfoMap.insert(typename GroupInfoMap_t::value_type(internalWork.m_group, info));
349  }
350  }
351 
352  // give the work to the worker
353  worker->AddWork(work);
354 
355  return true;
356  }
357 
358  //
359  // remove a unit of work from a worker thread
360  //
361  bool RemoveWork(Work_T * work, bool removeFromWorker = true)
362  {
363  PWaitAndSignal m(m_listMutex);
364 
365  // find worker with work unit to remove
366  typename ExternalToInternalWorkMap_T::iterator iterWork = m_externalToInternalWorkMap.find(work);
367  if (iterWork == m_externalToInternalWorkMap.end())
368  return false;
369 
370  InternalWork & internalWork = iterWork->second;
371 
372  // tell worker to stop processing work
373  if (removeFromWorker)
374  internalWork.m_worker->RemoveWork(work);
375 
376  // update group information
377  if (!internalWork.m_group.empty()) {
378  typename GroupInfoMap_t::iterator iterGroup = m_groupInfoMap.find(internalWork.m_group);
379  PAssert(iterGroup != m_groupInfoMap.end(), "Attempt to find thread from unknown work group");
380  if (iterGroup != m_groupInfoMap.end()) {
381  if (--iterGroup->second.m_count == 0)
382  m_groupInfoMap.erase(iterGroup);
383  }
384  }
385 
386  // see if workers need reorganising
387  CheckWorker(internalWork.m_worker);
388 
389  // remove element from work unit map
390  m_externalToInternalWorkMap.erase(iterWork);
391 
392  return true;
393  }
394 };
395 
396 
399 template <class Work_T>
400 class PQueuedThreadPool : public PThreadPool<Work_T>
401 {
402  public:
403  //
404  // constructor
405  //
407  unsigned maxWorkers = 10,
408  unsigned maxWorkUnits = 0,
409  const char * threadName = NULL,
411  ) : PThreadPool<Work_T>(maxWorkers, maxWorkUnits, threadName, priority)
412  { }
413 
415  {
416  public:
419  const char * threadName = NULL)
420  : PThreadPool<Work_T>::WorkerThread(pool, priority, threadName)
421  , m_available(0, INT_MAX)
422  {
423  }
424 
426  { }
427 
428  void AddWork(Work_T * work)
429  {
430  m_mutex.Wait();
431  m_queue.push(work);
433  m_mutex.Signal();
434  }
435 
436  void RemoveWork(Work_T * )
437  {
438  m_mutex.Wait();
439  Work_T * work = m_queue.front();
440  m_queue.pop();
441  m_mutex.Signal();
442  delete work;
443  }
444 
445  unsigned GetWorkSize() const
446  {
447  return (unsigned)m_queue.size();
448  }
449 
450  void Main()
451  {
452  for (;;) {
453  m_available.Wait();
455  break;
456 
457  m_mutex.Wait();
458  Work_T * work = m_queue.empty() ? NULL : m_queue.front();
459  m_mutex.Signal();
460 
461  if (work != NULL) {
462  work->Work();
464  }
465  }
466  }
467 
468  void Shutdown()
469  {
472  }
473 
474  protected:
475  typedef std::queue<Work_T *> Queue;
479  };
480 
481 
483  {
484  return new QueuedWorkerThread(*this, this->m_priority, this->m_threadName);
485  }
486 };
487 
488 
492 class PSafeWork : public PSafePtrBase {
493  public:
495  PSafeObject * ptr
496  ) : PSafePtrBase(ptr) { }
497 
498  virtual void Work()
499  {
500  PSafeObject * ptr = this->GetObject();
501  if (ptr != NULL) {
503  CallFunction(*ptr);
504  }
505  }
506 
507  virtual void CallFunction(PSafeObject & obj) = 0;
508 };
509 
510 
513 
514 
516 template <class PtrClass, typename FuncRet = void>
517 class PSafeWorkNoArg : public PSafeWork {
518  public:
519  typedef FuncRet (PtrClass::*Function)();
520 
521  protected:
522  P_ALIGN_FIELD(Function,m_function,16);
523 
524  public:
526  PtrClass * ptr,
527  Function function
528  ) : PSafeWork(ptr)
529  , m_function(function)
530  { }
531 
532  virtual void CallFunction(PSafeObject & obj)
533  {
534  (dynamic_cast<PtrClass&>(obj).*(this->m_function))();
535  }
536 };
537 
538 
540 template <
541  class PtrClass,
542  typename Arg1Type,
543  typename FuncRet = void
544 >
545 class PSafeWorkArg1 : public PSafeWork {
546  public:
547  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1);
548 
549  protected:
550  P_ALIGN_FIELD(Function,m_function,16);
551  Arg1Type m_arg1;
552 
553  public:
555  PtrClass * ptr,
556  Arg1Type arg1,
557  Function function
558  ) : PSafeWork(ptr)
559  , m_function(function)
560  , m_arg1(arg1)
561  { }
562 
563  virtual void CallFunction(PSafeObject & obj)
564  {
565  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1);
566  }
567 };
568 
569 
571 template <
572  class PtrClass,
573  typename Arg1Type,
574  typename Arg2Type,
575  typename FuncRet = void
576 >
577 class PSafeWorkArg2 : public PSafeWork {
578  public:
579  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1, Arg2Type arg2);
580 
581  protected:
582  P_ALIGN_FIELD(Function,m_function,16);
583  Arg1Type m_arg1;
584  Arg2Type m_arg2;
585 
586  public:
588  PtrClass * ptr,
589  Arg1Type arg1,
590  Arg2Type arg2,
591  Function function
592  ) : PSafeWork(ptr)
593  , m_function(function)
594  , m_arg1(arg1)
595  , m_arg2(arg2)
596  { }
597 
598  virtual void CallFunction(PSafeObject & obj)
599  {
600  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1, m_arg2);
601  }
602 };
603 
604 
606 template <
607  class PtrClass,
608  typename Arg1Type,
609  typename Arg2Type,
610  typename Arg3Type,
611  typename FuncRet = void
612 >
613 class PSafeWorkArg3 : public PSafeWork {
614  public:
615  typedef FuncRet (PtrClass::*Function)(Arg1Type arg1, Arg2Type arg2, Arg3Type arg3);
616 
617  protected:
618  P_ALIGN_FIELD(Function,m_function,16);
619  Arg1Type m_arg1;
620  Arg2Type m_arg2;
621  Arg3Type m_arg3;
622 
623  public:
625  PtrClass * ptr,
626  Arg1Type arg1,
627  Arg2Type arg2,
628  Arg2Type arg3,
629  Function function
630  ) : PSafeWork(ptr)
631  , m_function(function)
632  , m_arg1(arg1)
633  , m_arg2(arg2)
634  , m_arg3(arg3)
635  { }
636 
637  virtual void CallFunction(PSafeObject & obj)
638  {
639  (dynamic_cast<PtrClass&>(obj).*(this->m_function))(m_arg1, m_arg2, m_arg3);
640  }
641 };
642 
643 
644 #endif // PTLIB_THREADPOOL_H
645 
646 
647 // End Of File ///////////////////////////////////////////////////////////////