sockagg.h

Go to the documentation of this file.
00001 /*
00002  * sockagg.h
00003  *
00004  * Generalised Socket Aggregation functions
00005  *
00006  * Portable Windows Library
00007  *
00008  * Copyright (C) 2005 Post Increment
00009  *
00010  * The contents of this file are subject to the Mozilla Public License
00011  * Version 1.0 (the "License"); you may not use this file except in
00012  * compliance with the License. You may obtain a copy of the License at
00013  * http://www.mozilla.org/MPL/
00014  *
00015  * Software distributed under the License is distributed on an "AS IS"
00016  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
00017  * the License for the specific language governing rights and limitations
00018  * under the License.
00019  *
00020  * The Original Code is Portable Windows Library.
00021  *
00022  * The Initial Developer of the Original Code is Post Increment
00023  *
00024  * Portions of this code were written with the financial assistance of 
00025  * Metreos Corporation (http://www.metros.com).
00026  *
00027  * Contributor(s): ______________________________________.
00028  *
00029  * $Revision: 20385 $
00030  * $Author: rjongbloed $
00031  * $Date: 2008-06-04 10:40:38 +0000 (Wed, 04 Jun 2008) $
00032  */
00033 
00034 
00035 #ifndef _SOCKAGG_H
00036 #define _SOCKAGG_H
00037 
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041 
00042 #include <ptlib.h>
00043 #include <ptlib/sockets.h>
00044 
00045 #include <list>
00046 #include <map>
00047 
00048 
00049 /*
00050  *  These classes and templates implement a generic thread pooling mechanism
00051  */
00052 
00053 class PThreadPoolBase;
00054 
00055 class PThreadPoolWorkerBase : public PThread
00056 {
00057   public:
00058     PThreadPoolWorkerBase(PThreadPoolBase & threadPool);
00059 
00060     virtual unsigned GetWorkSize() const = 0;
00061     virtual void Shutdown() = 0;
00062 
00063     //virtual void OnAddWork(work_base *);
00064     //virtual void OnRemoveWork(work_base *);
00065 
00066     PThreadPoolBase & pool;
00067     PBoolean shutdown;
00068     PMutex workerMutex;
00069 };
00070 
00071 class PThreadPoolBase : public PObject
00072 {
00073   public:
00074     PThreadPoolBase(unsigned _max = 10);
00075     ~PThreadPoolBase();
00076 
00077     virtual PThreadPoolWorkerBase * CreateWorkerThread() = 0;
00078 
00079     virtual PThreadPoolWorkerBase * AllocateWorker();
00080 
00081   protected:
00082     virtual bool CheckWorker(PThreadPoolWorkerBase * worker);
00083     void StopWorker(PThreadPoolWorkerBase * worker);
00084     PMutex listMutex;
00085     typedef std::vector<PThreadPoolWorkerBase *> WorkerList_t;
00086     WorkerList_t workers;
00087 
00088     unsigned maxWorkerSize;
00089 };
00090 
00091 template <class WorkUnit_T, class WorkerThread_T>
00092 class PThreadPool : public PThreadPoolBase
00093 {
00094   PCLASSINFO(PThreadPool, PThreadPoolBase);
00095   public:
00096     typedef typename std::map<WorkUnit_T *, WorkerThread_T *> WorkUnitMap_T;
00097 
00098     PThreadPool(unsigned _max = 10)
00099       : PThreadPoolBase(_max) { }
00100 
00101     virtual PThreadPoolWorkerBase * CreateWorkerThread()
00102     { return new WorkerThread_T(*this); }
00103 
00104     bool AddWork(WorkUnit_T * workUnit)
00105     {
00106       PWaitAndSignal m(listMutex);
00107 
00108       PThreadPoolWorkerBase * _worker = AllocateWorker();
00109       if (_worker == NULL)
00110         return false;
00111 
00112       WorkerThread_T * worker = dynamic_cast<WorkerThread_T *>(_worker);
00113       workUnitMap.insert(typename WorkUnitMap_T::value_type(workUnit, worker));
00114 
00115       worker->OnAddWork(workUnit);
00116 
00117       return true;
00118     }
00119 
00120     bool RemoveWork(WorkUnit_T * workUnit)
00121     {
00122       PWaitAndSignal m(listMutex);
00123 
00124       // find worker with work unit to remove
00125       typename WorkUnitMap_T::iterator r = workUnitMap.find(workUnit);
00126       if (r == workUnitMap.end())
00127         return false;
00128 
00129       WorkerThread_T * worker = dynamic_cast<WorkerThread_T *>(r->second);
00130 
00131       workUnitMap.erase(r);
00132 
00133       worker->OnRemoveWork(workUnit);
00134 
00135       CheckWorker(worker);
00136 
00137       return true;
00138     }
00139 
00140   protected:
00141     WorkUnitMap_T workUnitMap;
00142 };
00143 
00144 #if 0 
00145 
00146 // aggregator code disabled pending reimplementation
00147 
00149 
00150 /*
00151 
00152 These classes implements a generalised method for aggregating sockets so that they can be handled by a single thread. It is
00153 intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323
00154 and OPAL with a model that provides a better thread to call ratio. A more complete explanation of this model can be
00155 found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL" which can be at:
00156 
00157          http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf
00158 
00159 There are two assumptions made by this code:
00160 
00161   1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other
00162      polling mechanism is less efficient
00163 
00164   2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of
00165      other calls that are handled in the same thread
00166 
00167 UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks
00168 for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking 
00169 protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect
00170 received data into a buffer until the handling routine decides that a full PDU has been received.
00171 
00172 At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed
00173 by the thread. One extra handle for a pipe (or on Windows, a local socket) is added to each handle list so that the thread can
00174 be woken up in order to allow the addition or removal of sockets to the list
00175 
00176 */
00177 
00178 #include <ptlib.h>
00179 #include <functional>
00180 #include <vector>
00181 
00183 //
00184 // this class encapsulates the system specific handle needed to specifiy a socket.
00185 // On Unix systems, this is a simple file handle. This file handle is used to uniquely
00186 // identify the socket and used in the "select" system call
00187 // On Windows systems the SOCKET handle is used to identify the socket, but a seperate WSAEVENT
00188 // handle is needed for the WSWaitForMultpleEvents call.
00189 // This is further complicated by the fact that we need to treat some pairs of sockets as a single
00190 // entity (i.e. RTP sockets) to avoid rewriting the RTP handler code.
00191 //
00192 
00193 class PAggregatedHandle;
00194 
00195 class PAggregatorFD 
00196 {
00197   public:
00198 #ifdef _WIN32
00199     typedef WSAEVENT FD;
00200     typedef SOCKET FDType;
00201     SOCKET socket;
00202 #else
00203     typedef int FD;
00204     typedef int FDType;
00205 #endif
00206 
00207     PAggregatorFD(FDType fd);
00208 
00209     FD fd;
00210 
00211     ~PAggregatorFD();
00212     bool IsValid();
00213 };
00214 
00215 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t;
00216 
00218 //
00219 // This class defines an abstract class used to define a handle that can be aggregated
00220 //
00221 // Normally this will correspond directly to a socket, but for RTP this actually corresponds to two sockets
00222 // which greatly complicates things
00223 //
00224 
00225 #ifdef _MSC_VER
00226 #pragma warning(push)
00227 #pragma warning(disable:4127)
00228 #endif
00229 
00230 class PAggregatedHandle : public PObject
00231 {
00232   PCLASSINFO(PAggregatedHandle, PObject);
00233   public:
00234     PAggregatedHandle(PBoolean _autoDelete = PFalse)
00235       : autoDelete(_autoDelete), closed(PFalse), beingProcessed(PFalse), preReadDone(PFalse)
00236     { }
00237 
00238     virtual PAggregatorFDList_t GetFDs() = 0;
00239 
00240     virtual PTimeInterval GetTimeout()
00241     { return PMaxTimeInterval; }
00242 
00243     virtual PBoolean Init()      { return PTrue; }
00244     virtual PBoolean PreRead()   { return PTrue; }
00245     virtual PBoolean OnRead() = 0;
00246     virtual void DeInit()    { }
00247     virtual void OnClose()   { }
00248 
00249     virtual PBoolean IsPreReadDone() const
00250     { return preReadDone; }
00251 
00252     virtual void SetPreReadDone(PBoolean v = PTrue)
00253     { preReadDone = v; }
00254 
00255     PBoolean autoDelete;
00256     PBoolean closed;
00257     PBoolean beingProcessed;
00258 
00259   protected:
00260     PBoolean preReadDone;
00261 };
00262 
00263 #ifdef _MSC_VER
00264 #pragma warning(pop)
00265 #endif
00266 
00267 
00269 //
00270 // This class is the actual socket aggregator
00271 //
00272 
00273 #ifdef _WIN32
00274 
00275 class EventBase
00276 {
00277   public:
00278     EventBase()
00279     { 
00280       event = ::CreateEvent(NULL, PTrue, PFalse,NULL); 
00281       PAssert(event != NULL, "CreateEvent failed");
00282     }
00283 
00284     ~EventBase()
00285     { CloseHandle(event); }
00286 
00287     PAggregatorFD::FD GetHandle()
00288     { return (PAggregatorFD::FD)event; }
00289 
00290     void Set()
00291     { SetEvent(event);  }
00292 
00293     void Reset()
00294     { ResetEvent(event); }
00295 
00296   protected:
00297     HANDLE event;
00298 };
00299 
00300 #endif
00301 
00302 typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t;
00303 
00304 class PAggregatorWorker : public PThreadPoolWorkerBase
00305 {
00306   public:
00307     PAggregatorWorker(PThreadPoolBase & _pool);
00308 
00309     unsigned GetWorkSize() const;
00310     void Shutdown();
00311 
00312     void OnAddWork(PAggregatedHandle *);
00313     void OnRemoveWork(PAggregatedHandle *);
00314 
00315     void Main();
00316     PAggregatedHandleList_t handleList;
00317 
00318     void Trigger()  { localEvent.Set(); }
00319     
00320     PBoolean listChanged;
00321 };
00322 
00323 typedef PThreadPool<PAggregatedHandle, PAggregatorWorker> PHandleAggregatorBase;
00324 
00325 class PHandleAggregator : public PHandleAggregatorBase
00326 {
00327   PCLASSINFO(PHandleAggregator, PHandleAggregatorBase)
00328   public:
00329     typedef std::list<PAggregatedHandle *> PAggregatedHandleList_t;
00330 
00331     PHandleAggregator(unsigned _max = 10);
00332 
00333     PBoolean AddHandle(PAggregatedHandle * handle);
00334 
00335     PBoolean RemoveHandle(PAggregatedHandle * handle);
00336 };
00337 
00338 
00340 //
00341 // This template class allows the creation of aggregators for sockets that are
00342 // descendants of PIPSocket
00343 //
00344 
00345 #if 0
00346 
00347 template <class PSocketType>
00348 class PSocketAggregator : public PHandleAggregator
00349 {
00350   PCLASSINFO(PSocketAggregator, PHandleAggregator)
00351   public:
00352     class AggregatedPSocket : public PAggregatedHandle
00353     {
00354       public:
00355         AggregatedPSocket(PSocketType * _s)
00356           : psocket(_s), fd(_s->GetHandle()) { }
00357 
00358         PBoolean OnRead()
00359         { return psocket->OnRead(); }
00360 
00361         PAggregatorFDList_t GetFDs()
00362         { PAggregatorFDList_t list; list.push_back(&fd); return list; }
00363 
00364       protected:
00365         PSocketType * psocket;
00366         PAggregatorFD fd;
00367     };
00368 
00369     typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t;
00370     SocketList_t socketList;
00371 
00372     PBoolean AddSocket(PSocketType * sock)
00373     { 
00374       PWaitAndSignal m(listMutex);
00375 
00376       AggregatedPSocket * handle = new AggregatedPSocket(sock);
00377       if (AddHandle(handle)) {
00378         socketList.insert(SocketList_t::value_type(sock, handle));
00379         return true;
00380       }
00381 
00382       delete handle;
00383       return false;
00384     }
00385 
00386     PBoolean RemoveSocket(PSocketType * sock)
00387     { 
00388       PWaitAndSignal m(listMutex);
00389 
00390       typename SocketList_t::iterator r = socketList.find(sock);
00391       if (r == socketList.end()) 
00392         return PFalse;
00393 
00394       AggregatedPSocket * handle = r->second;
00395       RemoveHandle(handle);
00396       delete handle;
00397       socketList.erase(r);
00398       return PTrue;
00399     }
00400 };
00401 #endif  // #if 0
00402 
00403 #endif
00404 
00405 // aggregator code disabled pending reimplementation
00406 
00407 #endif // _SOCKAGG_H

Generated on Mon Feb 23 01:57:54 2009 for PTLib by  doxygen 1.5.1