00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef _SOCKAGG_H
00036 #define _SOCKAGG_H
00037
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041
#include <ptlib.h>
#include <ptlib/sockets.h>

#include <list>
#include <map>
#include <vector>
00047
00048
00049
00050
00051
00052
00053 class PThreadPoolBase;
00054
00055 class PThreadPoolWorkerBase : public PThread
00056 {
00057 public:
00058 PThreadPoolWorkerBase(PThreadPoolBase & threadPool);
00059
00060 virtual unsigned GetWorkSize() const = 0;
00061 virtual void Shutdown() = 0;
00062
00063
00064
00065
00066 PThreadPoolBase & pool;
00067 PBoolean shutdown;
00068 PMutex workerMutex;
00069 };
00070
00071 class PThreadPoolBase : public PObject
00072 {
00073 public:
00074 PThreadPoolBase(unsigned _max = 10);
00075 ~PThreadPoolBase();
00076
00077 virtual PThreadPoolWorkerBase * CreateWorkerThread() = 0;
00078
00079 virtual PThreadPoolWorkerBase * AllocateWorker();
00080
00081 protected:
00082 virtual bool CheckWorker(PThreadPoolWorkerBase * worker);
00083 void StopWorker(PThreadPoolWorkerBase * worker);
00084 PMutex listMutex;
00085 typedef std::vector<PThreadPoolWorkerBase *> WorkerList_t;
00086 WorkerList_t workers;
00087
00088 unsigned maxWorkerSize;
00089 };
00090
00091 template <class WorkUnit_T, class WorkerThread_T>
00092 class PThreadPool : public PThreadPoolBase
00093 {
00094 PCLASSINFO(PThreadPool, PThreadPoolBase);
00095 public:
00096 typedef typename std::map<WorkUnit_T *, WorkerThread_T *> WorkUnitMap_T;
00097
00098 PThreadPool(unsigned _max = 10)
00099 : PThreadPoolBase(_max) { }
00100
00101 virtual PThreadPoolWorkerBase * CreateWorkerThread()
00102 { return new WorkerThread_T(*this); }
00103
00104 bool AddWork(WorkUnit_T * workUnit)
00105 {
00106 PWaitAndSignal m(listMutex);
00107
00108 PThreadPoolWorkerBase * _worker = AllocateWorker();
00109 if (_worker == NULL)
00110 return false;
00111
00112 WorkerThread_T * worker = dynamic_cast<WorkerThread_T *>(_worker);
00113 workUnitMap.insert(typename WorkUnitMap_T::value_type(workUnit, worker));
00114
00115 worker->OnAddWork(workUnit);
00116
00117 return true;
00118 }
00119
00120 bool RemoveWork(WorkUnit_T * workUnit)
00121 {
00122 PWaitAndSignal m(listMutex);
00123
00124
00125 typename WorkUnitMap_T::iterator r = workUnitMap.find(workUnit);
00126 if (r == workUnitMap.end())
00127 return false;
00128
00129 WorkerThread_T * worker = dynamic_cast<WorkerThread_T *>(r->second);
00130
00131 workUnitMap.erase(r);
00132
00133 worker->OnRemoveWork(workUnit);
00134
00135 CheckWorker(worker);
00136
00137 return true;
00138 }
00139
00140 protected:
00141 WorkUnitMap_T workUnitMap;
00142 };
00143
00144 #if 0
00145
00146
00147
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178 #include <ptlib.h>
00179 #include <functional>
00180 #include <vector>
00181
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193 class PAggregatedHandle;
00194
/** Platform wrapper for one descriptor handled by the aggregator.

    On Windows the waitable object (FD) is a WSAEVENT and the SOCKET it
    belongs to is kept alongside it; on other platforms both FD and FDType
    are plain int descriptors.
  */
class PAggregatorFD
{
  public:
#ifdef _WIN32
    typedef WSAEVENT FD;      ///< object that is waited on
    typedef SOCKET   FDType;  ///< type accepted by the constructor
#else
    typedef int FD;           ///< object that is waited on
    typedef int FDType;       ///< type accepted by the constructor
#endif

    /// Construct from a platform descriptor; defined out of line.
    PAggregatorFD(FDType fd);

    ~PAggregatorFD();

    /// Defined out of line; presumably reports whether #fd is usable.
    bool IsValid();

#ifdef _WIN32
    SOCKET socket;
#endif
    FD fd;
};

/// List of (non-owned, as far as this header shows) descriptor wrappers.
typedef std::vector<PAggregatorFD *> PAggregatorFDList_t;
00216
00218
00219
00220
00221
00222
00223
00224
00225 #ifdef _MSC_VER
00226 #pragma warning(push)
00227 #pragma warning(disable:4127)
00228 #endif
00229
00230 class PAggregatedHandle : public PObject
00231 {
00232 PCLASSINFO(PAggregatedHandle, PObject);
00233 public:
00234 PAggregatedHandle(PBoolean _autoDelete = PFalse)
00235 : autoDelete(_autoDelete), closed(PFalse), beingProcessed(PFalse), preReadDone(PFalse)
00236 { }
00237
00238 virtual PAggregatorFDList_t GetFDs() = 0;
00239
00240 virtual PTimeInterval GetTimeout()
00241 { return PMaxTimeInterval; }
00242
00243 virtual PBoolean Init() { return PTrue; }
00244 virtual PBoolean PreRead() { return PTrue; }
00245 virtual PBoolean OnRead() = 0;
00246 virtual void DeInit() { }
00247 virtual void OnClose() { }
00248
00249 virtual PBoolean IsPreReadDone() const
00250 { return preReadDone; }
00251
00252 virtual void SetPreReadDone(PBoolean v = PTrue)
00253 { preReadDone = v; }
00254
00255 PBoolean autoDelete;
00256 PBoolean closed;
00257 PBoolean beingProcessed;
00258
00259 protected:
00260 PBoolean preReadDone;
00261 };
00262
00263 #ifdef _MSC_VER
00264 #pragma warning(pop)
00265 #endif
00266
00267
00269
00270
00271
00272
00273 #ifdef _WIN32
00274
00275 class EventBase
00276 {
00277 public:
00278 EventBase()
00279 {
00280 event = ::CreateEvent(NULL, PTrue, PFalse,NULL);
00281 PAssert(event != NULL, "CreateEvent failed");
00282 }
00283
00284 ~EventBase()
00285 { CloseHandle(event); }
00286
00287 PAggregatorFD::FD GetHandle()
00288 { return (PAggregatorFD::FD)event; }
00289
00290 void Set()
00291 { SetEvent(event); }
00292
00293 void Reset()
00294 { ResetEvent(event); }
00295
00296 protected:
00297 HANDLE event;
00298 };
00299
00300 #endif
00301
00302 typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t;
00303
00304 class PAggregatorWorker : public PThreadPoolWorkerBase
00305 {
00306 public:
00307 PAggregatorWorker(PThreadPoolBase & _pool);
00308
00309 unsigned GetWorkSize() const;
00310 void Shutdown();
00311
00312 void OnAddWork(PAggregatedHandle *);
00313 void OnRemoveWork(PAggregatedHandle *);
00314
00315 void Main();
00316 PAggregatedHandleList_t handleList;
00317
00318 void Trigger() { localEvent.Set(); }
00319
00320 PBoolean listChanged;
00321 };
00322
00323 typedef PThreadPool<PAggregatedHandle, PAggregatorWorker> PHandleAggregatorBase;
00324
00325 class PHandleAggregator : public PHandleAggregatorBase
00326 {
00327 PCLASSINFO(PHandleAggregator, PHandleAggregatorBase)
00328 public:
00329 typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t;
00330
00331 PHandleAggregator(unsigned _max = 10);
00332
00333 PBoolean AddHandle(PAggregatedHandle * handle);
00334
00335 PBoolean RemoveHandle(PAggregatedHandle * handle);
00336 };
00337
00338
00340
00341
00342
00343
00344
00345 #if 0
00346
00347 template <class PSocketType>
00348 class PSocketAggregator : public PHandleAggregator
00349 {
00350 PCLASSINFO(PSocketAggregator, PHandleAggregator)
00351 public:
00352 class AggregatedPSocket : public PAggregatedHandle
00353 {
00354 public:
00355 AggregatedPSocket(PSocketType * _s)
00356 : psocket(_s), fd(_s->GetHandle()) { }
00357
00358 PBoolean OnRead()
00359 { return psocket->OnRead(); }
00360
00361 PAggregatorFDList_t GetFDs()
00362 { PAggregatorFDList_t list; list.push_back(&fd); return list; }
00363
00364 protected:
00365 PSocketType * psocket;
00366 PAggregatorFD fd;
00367 };
00368
00369 typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t;
00370 SocketList_t socketList;
00371
00372 PBoolean AddSocket(PSocketType * sock)
00373 {
00374 PWaitAndSignal m(listMutex);
00375
00376 AggregatedPSocket * handle = new AggregatedPSocket(sock);
00377 if (AddHandle(handle)) {
00378 socketList.insert(SocketList_t::value_type(sock, handle));
00379 return true;
00380 }
00381
00382 delete handle;
00383 return false;
00384 }
00385
00386 PBoolean RemoveSocket(PSocketType * sock)
00387 {
00388 PWaitAndSignal m(listMutex);
00389
00390 typename SocketList_t::iterator r = socketList.find(sock);
00391 if (r == socketList.end())
00392 return PFalse;
00393
00394 AggregatedPSocket * handle = r->second;
00395 RemoveHandle(handle);
00396 delete handle;
00397 socketList.erase(r);
00398 return PTrue;
00399 }
00400 };
00401 #endif // #if 0
00402
00403 #endif
00404
00405
00406
00407 #endif // _SOCKAGG_H