00001 /* 00002 * sockagg.h 00003 * 00004 * Generalised Socket Aggregation functions 00005 * 00006 * Portable Windows Library 00007 * 00008 * Copyright (C) 2005 Post Increment 00009 * 00010 * The contents of this file are subject to the Mozilla Public License 00011 * Version 1.0 (the "License"); you may not use this file except in 00012 * compliance with the License. You may obtain a copy of the License at 00013 * http://www.mozilla.org/MPL/ 00014 * 00015 * Software distributed under the License is distributed on an "AS IS" 00016 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 00017 * the License for the specific language governing rights and limitations 00018 * under the License. 00019 * 00020 * The Original Code is Portable Windows Library. 00021 * 00022 * The Initial Developer of the Original Code is Post Increment 00023 * 00024 * Portions of this code were written with the financial assistance of 00025 * Metreos Corporation (http://www.metros.com). 00026 * 00027 * Contributor(s): ______________________________________. 00028 * 00029 * $Revision: 19008 $ 00030 * $Author: rjongbloed $ 00031 * $Date: 2007-11-29 09:17:41 +0000 (Thu, 29 Nov 2007) $ 00032 */ 00033 00034 00035 #ifndef _SOCKAGG_H 00036 #define _SOCKAGG_H 00037 00038 #ifdef P_USE_PRAGMA 00039 #pragma interface 00040 #endif 00041 00042 #include <ptlib.h> 00043 #include <ptlib/sockets.h> 00044 00045 /* 00046 00047 These classes implements a generalised method for aggregating sockets so that they can be handled by a single thread. It is 00048 intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323 00049 and OPAL with a model that provides a better thread to call ratio. A more complete explanation of this model can be 00050 found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL" which can be at: 00051 00052 http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf 00053 00054 There are two assumptions made by this code: 00055 00056 1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other 00057 polling mechanism is less efficient 00058 00059 2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of 00060 other calls that are handled in the same thread 00061 00062 UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks 00063 for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking 00064 protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect 00065 received data into a buffer until the handling routine decides that a full PDU has been received. 00066 00067 At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed 00068 by the thread. One extra handle for a pipe (or on Windows, a local socket) is added to each handle list so that the thread can 00069 be woken up in order to allow the addition or removal of sockets to the list 00070 00071 */ 00072 00073 #include <ptlib.h> 00074 #include <functional> 00075 #include <vector> 00076 00078 // 00079 // this class encapsulates the system specific handle needed to specifiy a socket. 00080 // On Unix systems, this is a simple file handle. This file handle is used to uniquely 00081 // identify the socket and used in the "select" system call 00082 // On Windows systems the SOCKET handle is used to identify the socket, but a seperate WSAEVENT 00083 // handle is needed for the WSWaitForMultpleEvents call. 00084 // This is further complicated by the fact that we need to treat some pairs of sockets as a single 00085 // entity (i.e. RTP sockets) to avoid rewriting the RTP handler code. 00086 // 00087 00088 class PAggregatedHandle; 00089 00090 class PAggregatorFD 00091 { 00092 public: 00093 #ifdef _WIN32 00094 typedef WSAEVENT FD; 00095 typedef SOCKET FDType; 00096 SOCKET socket; 00097 #else 00098 typedef int FD; 00099 typedef int FDType; 00100 #endif 00101 00102 PAggregatorFD(FDType fd); 00103 00104 FD fd; 00105 00106 ~PAggregatorFD(); 00107 bool IsValid(); 00108 }; 00109 00110 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t; 00111 00113 // 00114 // This class defines an abstract class used to define a handle that can be aggregated 00115 // 00116 // Normally this will correspond directly to a socket, but for RTP this actually corresponds to two sockets 00117 // which greatly complicates things 00118 // 00119 00120 #ifdef _MSC_VER 00121 #pragma warning(push) 00122 #pragma warning(disable:4127) 00123 #endif 00124 00125 class PAggregatedHandle : public PObject 00126 { 00127 PCLASSINFO(PAggregatedHandle, PObject); 00128 public: 00129 PAggregatedHandle(PBoolean _autoDelete = PFalse) 00130 : autoDelete(_autoDelete), closed(PFalse), beingProcessed(PFalse), preReadDone(PFalse) 00131 { } 00132 00133 virtual PAggregatorFDList_t GetFDs() = 0; 00134 00135 virtual PTimeInterval GetTimeout() 00136 { return PMaxTimeInterval; } 00137 00138 virtual PBoolean Init() { return PTrue; } 00139 virtual PBoolean PreRead() { return PTrue; } 00140 virtual PBoolean OnRead() = 0; 00141 virtual void DeInit() { } 00142 virtual void OnClose() { } 00143 00144 virtual PBoolean IsPreReadDone() const 00145 { return preReadDone; } 00146 00147 virtual void SetPreReadDone(PBoolean v = PTrue) 00148 { preReadDone = v; } 00149 00150 PBoolean autoDelete; 00151 PBoolean closed; 00152 PBoolean beingProcessed; 00153 00154 protected: 00155 PBoolean preReadDone; 00156 }; 00157 00158 #ifdef _MSC_VER 00159 #pragma warning(pop) 00160 #endif 00161 00162 00164 // 00165 // This class is the actual socket aggregator 00166 // 00167 00168 class PHandleAggregator : public PObject 00169 { 00170 PCLASSINFO(PHandleAggregator, PObject) 00171 public: 00172 class EventBase 00173 { 00174 public: 00175 virtual PAggregatorFD::FD GetHandle() = 0; 00176 virtual void Set() = 0; 00177 virtual void Reset() = 0; 00178 }; 00179 00180 typedef std::vector<PAggregatedHandle *> PAggregatedHandleList_t; 00181 00182 class WorkerThreadBase : public PThread 00183 { 00184 public: 00185 WorkerThreadBase(EventBase & _event); 00186 00187 virtual void Trigger() = 0; 00188 void Main(); 00189 00190 PMutex workerMutex; 00191 00192 EventBase & event; 00193 PAggregatedHandleList_t handleList; 00194 PBoolean listChanged; 00195 PBoolean shutdown; 00196 }; 00197 00198 typedef std::vector<WorkerThreadBase *> WorkerList_t; 00199 00200 PHandleAggregator(unsigned _max = 10); 00201 00202 PBoolean AddHandle(PAggregatedHandle * handle); 00203 00204 PBoolean RemoveHandle(PAggregatedHandle * handle); 00205 00206 PMutex listMutex; 00207 WorkerList_t workers; 00208 unsigned maxWorkerSize; 00209 }; 00210 00211 00213 // 00214 // This template class allows the creation of aggregators for sockets that are 00215 // descendants of PIPSocket 00216 // 00217 00218 template <class PSocketType> 00219 class PSocketAggregator : public PHandleAggregator 00220 { 00221 PCLASSINFO(PSocketAggregator, PHandleAggregator) 00222 public: 00223 class AggregatedPSocket : public PAggregatedHandle 00224 { 00225 public: 00226 AggregatedPSocket(PSocketType * _s) 00227 : psocket(_s), fd(_s->GetHandle()) { } 00228 00229 PBoolean OnRead() 00230 { return psocket->OnRead(); } 00231 00232 PAggregatorFDList_t GetFDs() 00233 { PAggregatorFDList_t list; list.push_back(&fd); return list; } 00234 00235 protected: 00236 PSocketType * psocket; 00237 PAggregatorFD fd; 00238 }; 00239 00240 typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t; 00241 SocketList_t socketList; 00242 00243 PBoolean AddSocket(PSocketType * sock) 00244 { 00245 PWaitAndSignal m(listMutex); 00246 00247 AggregatedPSocket * handle = new AggregatedPSocket(sock); 00248 if (AddHandle(handle)) { 00249 socketList.insert(SocketList_t::value_type(sock, handle)); 00250 return true; 00251 } 00252 00253 delete handle; 00254 return false; 00255 } 00256 00257 PBoolean RemoveSocket(PSocketType * sock) 00258 { 00259 PWaitAndSignal m(listMutex); 00260 00261 typename SocketList_t::iterator r = socketList.find(sock); 00262 if (r == socketList.end()) 00263 return PFalse; 00264 00265 AggregatedPSocket * handle = r->second; 00266 RemoveHandle(handle); 00267 delete handle; 00268 socketList.erase(r); 00269 return PTrue; 00270 } 00271 }; 00272 00273 #endif