sockagg.h

Go to the documentation of this file.
00001 /*
00002  * sockagg.h
00003  *
00004  * Generalised Socket Aggregation functions
00005  *
00006  * Portable Windows Library
00007  *
00008  * Copyright (C) 2005 Post Increment
00009  *
00010  * The contents of this file are subject to the Mozilla Public License
00011  * Version 1.0 (the "License"); you may not use this file except in
00012  * compliance with the License. You may obtain a copy of the License at
00013  * http://www.mozilla.org/MPL/
00014  *
00015  * Software distributed under the License is distributed on an "AS IS"
00016  * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
00017  * the License for the specific language governing rights and limitations
00018  * under the License.
00019  *
00020  * The Original Code is Portable Windows Library.
00021  *
00022  * The Initial Developer of the Original Code is Post Increment
00023  *
00024  * Portions of this code were written with the financial assistance of 
00025  * Metreos Corporation (http://www.metros.com).
00026  *
00027  * Contributor(s): ______________________________________.
00028  *
00029  * $Revision: 19008 $
00030  * $Author: rjongbloed $
00031  * $Date: 2007-11-29 09:17:41 +0000 (Thu, 29 Nov 2007) $
00032  */
00033 
00034 
00035 #ifndef _SOCKAGG_H
00036 #define _SOCKAGG_H
00037 
00038 #ifdef P_USE_PRAGMA
00039 #pragma interface
00040 #endif
00041 
00042 #include <ptlib.h>
00043 #include <ptlib/sockets.h>
00044 
00045 /*
00046 
00047 These classes implement a generalised method for aggregating sockets so that they can be handled by a single thread. It is
00048 intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323
00049 and OPAL with a model that provides a better thread to call ratio. A more complete explanation of this model can be
00050 found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL", which is available at:
00051 
00052          http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf
00053 
00054 There are two assumptions made by this code:
00055 
00056   1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other
00057      polling mechanism is less efficient
00058 
00059   2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of
00060      other calls that are handled in the same thread
00061 
00062 UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks
00063 for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking 
00064 protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect
00065 received data into a buffer until the handling routine decides that a full PDU has been received.
00066 
00067 At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed
00068 by the thread. One extra handle for a pipe (or on Windows, a local socket) is added to each handle list so that the thread can
00069 be woken up in order to allow the addition of sockets to, or their removal from, the list.
00070 
00071 */
00072 
#include <ptlib.h>
#include <functional>
#include <map>
#include <vector>
00076 
00078 //
00079 // This class encapsulates the system-specific handle needed to specify a socket.
00080 // On Unix systems, this is a simple file handle. This file handle is used to uniquely
00081 // identify the socket and is used in the "select" system call.
00082 // On Windows systems the SOCKET handle is used to identify the socket, but a separate WSAEVENT
00083 // handle is needed for the WSAWaitForMultipleEvents call.
00084 // This is further complicated by the fact that we need to treat some pairs of sockets as a single
00085 // entity (i.e. RTP sockets) to avoid rewriting the RTP handler code.
00086 //
00087 
00088 class PAggregatedHandle;
00089 
00090 class PAggregatorFD 
00091 {
00092   public:
00093 #ifdef _WIN32
00094     typedef WSAEVENT FD;
00095     typedef SOCKET FDType;
00096     SOCKET socket;
00097 #else
00098     typedef int FD;
00099     typedef int FDType;
00100 #endif
00101 
00102     PAggregatorFD(FDType fd);
00103 
00104     FD fd;
00105 
00106     ~PAggregatorFD();
00107     bool IsValid();
00108 };
00109 
00110 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t;
00111 
00113 //
00114 // This class defines an abstract class used to define a handle that can be aggregated
00115 //
00116 // Normally this will correspond directly to a socket, but for RTP this actually corresponds to two sockets
00117 // which greatly complicates things
00118 //
00119 
00120 #ifdef _MSC_VER
00121 #pragma warning(push)
00122 #pragma warning(disable:4127)
00123 #endif
00124 
00125 class PAggregatedHandle : public PObject
00126 {
00127   PCLASSINFO(PAggregatedHandle, PObject);
00128   public:
00129     PAggregatedHandle(PBoolean _autoDelete = PFalse)
00130       : autoDelete(_autoDelete), closed(PFalse), beingProcessed(PFalse), preReadDone(PFalse)
00131     { }
00132 
00133     virtual PAggregatorFDList_t GetFDs() = 0;
00134 
00135     virtual PTimeInterval GetTimeout()
00136     { return PMaxTimeInterval; }
00137 
00138     virtual PBoolean Init()      { return PTrue; }
00139     virtual PBoolean PreRead()   { return PTrue; }
00140     virtual PBoolean OnRead() = 0;
00141     virtual void DeInit()    { }
00142     virtual void OnClose()   { }
00143 
00144     virtual PBoolean IsPreReadDone() const
00145     { return preReadDone; }
00146 
00147     virtual void SetPreReadDone(PBoolean v = PTrue)
00148     { preReadDone = v; }
00149 
00150     PBoolean autoDelete;
00151     PBoolean closed;
00152     PBoolean beingProcessed;
00153 
00154   protected:
00155     PBoolean preReadDone;
00156 };
00157 
00158 #ifdef _MSC_VER
00159 #pragma warning(pop)
00160 #endif
00161 
00162 
00164 //
00165 // This class is the actual socket aggregator
00166 //
00167 
00168 class PHandleAggregator : public PObject
00169 {
00170   PCLASSINFO(PHandleAggregator, PObject)
00171   public:
00172     class EventBase
00173     {
00174       public:
00175         virtual PAggregatorFD::FD GetHandle() = 0;
00176         virtual void Set() = 0;
00177         virtual void Reset() = 0;
00178     };
00179 
00180     typedef std::vector<PAggregatedHandle *> PAggregatedHandleList_t;
00181 
00182     class WorkerThreadBase : public PThread
00183     {
00184       public:
00185         WorkerThreadBase(EventBase & _event);
00186 
00187         virtual void Trigger() = 0;
00188         void Main();
00189 
00190         PMutex workerMutex;
00191 
00192         EventBase & event;
00193         PAggregatedHandleList_t handleList;
00194         PBoolean listChanged;
00195         PBoolean shutdown;
00196     };
00197 
00198     typedef std::vector<WorkerThreadBase *> WorkerList_t;
00199 
00200     PHandleAggregator(unsigned _max = 10);
00201 
00202     PBoolean AddHandle(PAggregatedHandle * handle);
00203 
00204     PBoolean RemoveHandle(PAggregatedHandle * handle);
00205 
00206     PMutex listMutex;
00207     WorkerList_t workers;
00208     unsigned maxWorkerSize;
00209 };
00210 
00211 
00213 //
00214 // This template class allows the creation of aggregators for sockets that are
00215 // descendants of PIPSocket
00216 //
00217 
00218 template <class PSocketType>
00219 class PSocketAggregator : public PHandleAggregator
00220 {
00221   PCLASSINFO(PSocketAggregator, PHandleAggregator)
00222   public:
00223     class AggregatedPSocket : public PAggregatedHandle
00224     {
00225       public:
00226         AggregatedPSocket(PSocketType * _s)
00227           : psocket(_s), fd(_s->GetHandle()) { }
00228 
00229         PBoolean OnRead()
00230         { return psocket->OnRead(); }
00231 
00232         PAggregatorFDList_t GetFDs()
00233         { PAggregatorFDList_t list; list.push_back(&fd); return list; }
00234 
00235       protected:
00236         PSocketType * psocket;
00237         PAggregatorFD fd;
00238     };
00239 
00240     typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t;
00241     SocketList_t socketList;
00242 
00243     PBoolean AddSocket(PSocketType * sock)
00244     { 
00245       PWaitAndSignal m(listMutex);
00246 
00247       AggregatedPSocket * handle = new AggregatedPSocket(sock);
00248       if (AddHandle(handle)) {
00249         socketList.insert(SocketList_t::value_type(sock, handle));
00250         return true;
00251       }
00252 
00253       delete handle;
00254       return false;
00255     }
00256 
00257     PBoolean RemoveSocket(PSocketType * sock)
00258     { 
00259       PWaitAndSignal m(listMutex);
00260 
00261       typename SocketList_t::iterator r = socketList.find(sock);
00262       if (r == socketList.end()) 
00263         return PFalse;
00264 
00265       AggregatedPSocket * handle = r->second;
00266       RemoveHandle(handle);
00267       delete handle;
00268       socketList.erase(r);
00269       return PTrue;
00270     }
00271 };
00272 
00273 #endif

Generated on Mon Dec 10 11:18:57 2007 for PTLib by  doxygen 1.5.1