sockagg.h

Go to the documentation of this file.
00001 /*
00002  * sockagg.h
00003  *
00004  * Generalised Socket Aggregation functions
00005  *
00006  * Portable Windows Library
00007  *
00008  * Copyright (C) 2005 Post Increment
00009  *
00010  * The contents of this file are subject to the Mozilla Public License
00011  * Version 1.0 (the "License"); you may not use this file except in
00012  * compliance with the License. You may obtain a copy of the License at
00013  * http://www.mozilla.org/MPL/
00014  *
00015  * Software distributed under the License is distributed on an "AS IS"
00016  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
00017  * the License for the specific language governing rights and limitations
00018  * under the License.
00019  *
00020  * The Original Code is Portable Windows Library.
00021  *
00022  * The Initial Developer of the Original Code is Post Increment
00023  *
00024  * Portions of this code were written with the financial assistance of 
00025  * Metreos Corporation (http://www.metros.com).
00026  *
00027  * Contributor(s): ______________________________________.
00028  *
00029  * $Log: sockagg.h,v $
00030  * Revision 1.9  2006/03/09 05:32:59  csoutheren
00031  * Reverted to conservative locking strategy, with OnClose
00032  *
00033  * Revision 1.8  2006/03/07 07:38:02  csoutheren
00034  * Refine timing windows on socket handling and cleanup unused code
00035  *
00036  * Revision 1.7  2006/03/06 02:37:25  csoutheren
00037  * Change handle locking to help prevent aggregation threads from hogging list
00038  *  access
00039  *
00040  * Revision 1.6  2006/03/02 07:50:37  csoutheren
00041  * Cleanup unused code
00042  * Add OnClose function
00043  *
00044  * Revision 1.5  2006/01/18 07:16:56  csoutheren
00045  * Latest version of socket aggregation code
00046  *
00047  * Revision 1.4  2006/01/03 04:23:32  csoutheren
00048  * Fixed Unix implementation
00049  *
00050  * Revision 1.3  2005/12/23 06:44:30  csoutheren
00051  * Working implementation
00052  *
00053  * Revision 1.2  2005/12/22 07:27:36  csoutheren
00054  * More implementation
00055  *
00056  * Revision 1.1  2005/12/22 03:55:52  csoutheren
00057  * Added initial version of socket aggregation classes
00058  *
00059  *
00060  */
00061 
00062 
00063 #ifndef _SOCKAGG_H
00064 #define _SOCKAGG_H
00065 
00066 #ifdef P_USE_PRAGMA
00067 #pragma interface
00068 #endif
00069 
00070 #include <ptlib.h>
00071 #include <ptlib/sockets.h>
00072 
00073 /*
00074 
00075 These classes implement a generalised method for aggregating sockets so that they can be handled by a single thread. It is
00076 intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323
00077 and OPAL with a model that provides a better thread to call ratio. A more complete explanation of this model can be
00078 found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL" which can be found at:
00079 
00080          http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf
00081 
00082 There are two assumptions made by this code:
00083 
00084   1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other
00085      polling mechanism is less efficient
00086 
00087   2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of
00088      other calls that are handled in the same thread
00089 
00090 UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks
00091 for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking 
00092 protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect
00093 received data into a buffer until the handling routine decides that a full PDU has been received.
00094 
00095 At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed
00096 by the thread. One extra handle for a pipe (or on Windows, a local socket) is added to each handle list so that the thread can
00097 be woken up in order to allow the addition of sockets to, or the removal of sockets from, the list.
00098 
00099 */
00100 
00101 #include <ptlib.h>
00102 #include <functional>
00103 #include <vector>
00104 
00106 //
00107 // This class encapsulates the system specific handle needed to specify a socket.
00108 // On Unix systems, this is a simple file handle. This file handle is used to uniquely
00109 // identify the socket and is used in the "select" system call.
00110 // On Windows systems the SOCKET handle is used to identify the socket, but a separate WSAEVENT
00111 // handle is needed for the WSAWaitForMultipleEvents call.
00112 // This is further complicated by the fact that we need to treat some pairs of sockets as a single
00113 // entity (i.e. RTP sockets) to avoid rewriting the RTP handler code.
00114 //
00115 
00116 class PAggregatedHandle;
00117 
//
// Thin wrapper around the platform handle that the aggregator waits on.
//
// Only declarations live here; the constructor, destructor and IsValid()
// are defined outside this header.
//
class PAggregatorFD
{
  public:
    // Platform handle types:
    //   FD     - type of the handle held in the `fd` member
    //   FDType - type of the value handed to the constructor
#ifndef _WIN32
    typedef int FD;
    typedef int FDType;
#else
    typedef WSAEVENT FD;
    typedef SOCKET   FDType;
#endif

    // Build a wrapper from a platform socket/file handle.
    PAggregatorFD(FDType fd);
    ~PAggregatorFD();

    // Presumably reports whether the wrapped handle is usable - the
    // definition is not visible in this header.
    bool IsValid();

#ifdef _WIN32
    // Windows only: the SOCKET, kept alongside the separately typed
    // event handle stored in `fd`.
    SOCKET socket;
#endif

    // The handle itself (an int on Unix, a WSAEVENT on Windows).
    FD fd;
};
00137 
00138 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t;
00139 
00141 //
00142 // This class defines an abstract class used to define a handle that can be aggregated
00143 //
00144 // Normally this will correspond directly to a socket, but for RTP this actually corresponds to two sockets
00145 // which greatly complicates things
00146 //
00147 
00148 #ifdef _MSC_VER
00149 #pragma warning(push)
00150 #pragma warning(disable:4127)
00151 #endif
00152 
00153 class PAggregatedHandle : public PObject
00154 {
00155   PCLASSINFO(PAggregatedHandle, PObject);
00156   public:
00157     PAggregatedHandle(BOOL _autoDelete = FALSE)
00158       : autoDelete(_autoDelete), closed(FALSE), beingProcessed(FALSE), preReadDone(FALSE)
00159     { }
00160 
00161     virtual PAggregatorFDList_t GetFDs() = 0;
00162 
00163     virtual PTimeInterval GetTimeout()
00164     { return PMaxTimeInterval; }
00165 
00166     virtual BOOL Init()      { return TRUE; }
00167     virtual BOOL PreRead()   { return TRUE; }
00168     virtual BOOL OnRead() = 0;
00169     virtual void DeInit()    { }
00170     virtual void OnClose()   { }
00171 
00172     virtual BOOL IsPreReadDone() const
00173     { return preReadDone; }
00174 
00175     virtual void SetPreReadDone(BOOL v = TRUE)
00176     { preReadDone = v; }
00177 
00178     BOOL autoDelete;
00179     BOOL closed;
00180     BOOL beingProcessed;
00181 
00182   protected:
00183     BOOL preReadDone;
00184 };
00185 
00186 #ifdef _MSC_VER
00187 #pragma warning(pop)
00188 #endif
00189 
00190 
00192 //
00193 // This class is the actual socket aggregator
00194 //
00195 
00196 class PHandleAggregator : public PObject
00197 {
00198   PCLASSINFO(PHandleAggregator, PObject)
00199   public:
00200     class EventBase
00201     {
00202       public:
00203         virtual PAggregatorFD::FD GetHandle() = 0;
00204         virtual void Set() = 0;
00205         virtual void Reset() = 0;
00206     };
00207 
00208     typedef std::vector<PAggregatedHandle *> PAggregatedHandleList_t;
00209 
00210     class WorkerThreadBase : public PThread
00211     {
00212       public:
00213         WorkerThreadBase(EventBase & _event);
00214 
00215         virtual void Trigger() = 0;
00216         void Main();
00217 
00218         PMutex workerMutex;
00219 
00220         EventBase & event;
00221         PAggregatedHandleList_t handleList;
00222         BOOL listChanged;
00223         BOOL shutdown;
00224     };
00225 
00226     typedef std::vector<WorkerThreadBase *> WorkerList_t;
00227 
00228     PHandleAggregator(unsigned _max = 10);
00229 
00230     BOOL AddHandle(PAggregatedHandle * handle);
00231 
00232     BOOL RemoveHandle(PAggregatedHandle * handle);
00233 
00234     PMutex listMutex;
00235     WorkerList_t workers;
00236     unsigned maxWorkerSize;
00237 };
00238 
00239 
00241 //
00242 // This template class allows the creation of aggregators for sockets that are
00243 // descendants of PIPSocket
00244 //
00245 
00246 template <class PSocketType>
00247 class PSocketAggregator : public PHandleAggregator
00248 {
00249   PCLASSINFO(PSocketAggregator, PHandleAggregator)
00250   public:
00251     class AggregatedPSocket : public PAggregatedHandle
00252     {
00253       public:
00254         AggregatedPSocket(PSocketType * _s)
00255           : psocket(_s), fd(_s->GetHandle()) { }
00256 
00257         BOOL OnRead()
00258         { return psocket->OnRead(); }
00259 
00260         PAggregatorFDList_t GetFDs()
00261         { PAggregatorFDList_t list; list.push_back(&fd); return list; }
00262 
00263       protected:
00264         PSocketType * psocket;
00265         PAggregatorFD fd;
00266     };
00267 
00268     typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t;
00269     SocketList_t socketList;
00270 
00271     BOOL AddSocket(PSocketType * sock)
00272     { 
00273       PWaitAndSignal m(listMutex);
00274 
00275       AggregatedPSocket * handle = new AggregatedPSocket(sock);
00276       if (AddHandle(handle)) {
00277         socketList.insert(SocketList_t::value_type(sock, handle));
00278         return true;
00279       }
00280 
00281       delete handle;
00282       return false;
00283     }
00284 
00285     BOOL RemoveSocket(PSocketType * sock)
00286     { 
00287       PWaitAndSignal m(listMutex);
00288 
00289       typename SocketList_t::iterator r = socketList.find(sock);
00290       if (r == socketList.end()) 
00291         return FALSE;
00292 
00293       AggregatedPSocket * handle = r->second;
00294       RemoveHandle(handle);
00295       delete handle;
00296       socketList.erase(r);
00297       return TRUE;
00298     }
00299 };
00300 
00301 #endif

Generated on Fri Mar 7 06:25:03 2008 for PTLib by  doxygen 1.5.1