00001 /* 00002 * sockagg.h 00003 * 00004 * Generalised Socket Aggregation functions 00005 * 00006 * Portable Windows Library 00007 * 00008 * Copyright (C) 2005 Post Increment 00009 * 00010 * The contents of this file are subject to the Mozilla Public License 00011 * Version 1.0 (the "License"); you may not use this file except in 00012 * compliance with the License. You may obtain a copy of the License at 00013 * http://www.mozilla.org/MPL/ 00014 * 00015 * Software distributed under the License is distributed on an "AS IS" 00016 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See 00017 * the License for the specific language governing rights and limitations 00018 * under the License. 00019 * 00020 * The Original Code is Portable Windows Library. 00021 * 00022 * The Initial Developer of the Original Code is Post Increment 00023 * 00024 * Portions of this code were written with the financial assistance of 00025 * Metreos Corporation (http://www.metros.com). 00026 * 00027 * Contributor(s): ______________________________________. 00028 * 00029 * $Log: sockagg.h,v $ 00030 * Revision 1.9 2006/03/09 05:32:59 csoutheren 00031 * Reverted to conservative locking strategy, with OnClose 00032 * 00033 * Revision 1.8 2006/03/07 07:38:02 csoutheren 00034 * Refine timing windows on socket handling and cleanup unused code 00035 * 00036 * Revision 1.7 2006/03/06 02:37:25 csoutheren 00037 * Change handle locking to help prevent aggregation threads from hogging list 00038 * access 00039 * 00040 * Revision 1.6 2006/03/02 07:50:37 csoutheren 00041 * Cleanup unused code 00042 * Add OnClose function 00043 * 00044 * Revision 1.5 2006/01/18 07:16:56 csoutheren 00045 * Latest version of socket aggregation code 00046 * 00047 * Revision 1.4 2006/01/03 04:23:32 csoutheren 00048 * Fixed Unix implementation 00049 * 00050 * Revision 1.3 2005/12/23 06:44:30 csoutheren 00051 * Working implementation 00052 * 00053 * Revision 1.2 2005/12/22 07:27:36 csoutheren 00054 * More implementation 00055 * 00056 * Revision 1.1 2005/12/22 03:55:52 csoutheren 00057 * Added initial version of socket aggregation classes 00058 * 00059 * 00060 */ 00061 00062 00063 #ifndef _SOCKAGG_H 00064 #define _SOCKAGG_H 00065 00066 #ifdef P_USE_PRAGMA 00067 #pragma interface 00068 #endif 00069 00070 #include <ptlib.h> 00071 #include <ptlib/sockets.h> 00072 00073 /* 00074 00075 These classes implements a generalised method for aggregating sockets so that they can be handled by a single thread. It is 00076 intended to provide a backwards-compatible mechanism to supplant the "one socket - one thread" model used by OpenH323 00077 and OPAL with a model that provides a better thread to call ratio. A more complete explanation of this model can be 00078 found in the white paper "Increasing the Maximum Call Density of OpenH323 and OPAL" which can be at: 00079 00080 http://www.voxgratia.org/docs/call%20thread%20handling%20model%201.0.pdf 00081 00082 There are two assumptions made by this code: 00083 00084 1) The most efficient way to handle I/O is for a thread to be blocked on I/O. Any sort of timer or other 00085 polling mechanism is less efficient 00086 00087 2) The time taken to handle a received PDU is relatively small, and will not interfere with the handling of 00088 other calls that are handled in the same thread 00089 00090 UDP and TCP sockets are aggregated in different ways. UDP sockets are aggregated on the basis of a simple loop that looks 00091 for received datagrams and then processes them. TCP sockets are more complex because there is no intrinsic record-marking 00092 protocol, so it is difficult to know when a complete PDU has been received. This problem is solved by having the loop collect 00093 received data into a buffer until the handling routine decides that a full PDU has been received. 00094 00095 At the heart of each socket aggregator is a select statement that contains all of the file descriptors that are managed 00096 by the thread. One extra handle for a pipe (or on Windows, a local socket) is added to each handle list so that the thread can 00097 be woken up in order to allow the addition or removal of sockets to the list 00098 00099 */ 00100 00101 #include <ptlib.h> 00102 #include <functional> 00103 #include <vector> 00104 00106 // 00107 // this class encapsulates the system specific handle needed to specifiy a socket. 00108 // On Unix systems, this is a simple file handle. This file handle is used to uniquely 00109 // identify the socket and used in the "select" system call 00110 // On Windows systems the SOCKET handle is used to identify the socket, but a seperate WSAEVENT 00111 // handle is needed for the WSWaitForMultpleEvents call. 00112 // This is further complicated by the fact that we need to treat some pairs of sockets as a single 00113 // entity (i.e. RTP sockets) to avoid rewriting the RTP handler code. 00114 // 00115 00116 class PAggregatedHandle; 00117 00118 class PAggregatorFD 00119 { 00120 public: 00121 #ifdef _WIN32 00122 typedef WSAEVENT FD; 00123 typedef SOCKET FDType; 00124 SOCKET socket; 00125 #else 00126 typedef int FD; 00127 typedef int FDType; 00128 #endif 00129 00130 PAggregatorFD(FDType fd); 00131 00132 FD fd; 00133 00134 ~PAggregatorFD(); 00135 bool IsValid(); 00136 }; 00137 00138 typedef std::vector<PAggregatorFD *> PAggregatorFDList_t; 00139 00141 // 00142 // This class defines an abstract class used to define a handle that can be aggregated 00143 // 00144 // Normally this will correspond directly to a socket, but for RTP this actually corresponds to two sockets 00145 // which greatly complicates things 00146 // 00147 00148 #ifdef _MSC_VER 00149 #pragma warning(push) 00150 #pragma warning(disable:4127) 00151 #endif 00152 00153 class PAggregatedHandle : public PObject 00154 { 00155 PCLASSINFO(PAggregatedHandle, PObject); 00156 public: 00157 PAggregatedHandle(BOOL _autoDelete = FALSE) 00158 : autoDelete(_autoDelete), closed(FALSE), beingProcessed(FALSE), preReadDone(FALSE) 00159 { } 00160 00161 virtual PAggregatorFDList_t GetFDs() = 0; 00162 00163 virtual PTimeInterval GetTimeout() 00164 { return PMaxTimeInterval; } 00165 00166 virtual BOOL Init() { return TRUE; } 00167 virtual BOOL PreRead() { return TRUE; } 00168 virtual BOOL OnRead() = 0; 00169 virtual void DeInit() { } 00170 virtual void OnClose() { } 00171 00172 virtual BOOL IsPreReadDone() const 00173 { return preReadDone; } 00174 00175 virtual void SetPreReadDone(BOOL v = TRUE) 00176 { preReadDone = v; } 00177 00178 BOOL autoDelete; 00179 BOOL closed; 00180 BOOL beingProcessed; 00181 00182 protected: 00183 BOOL preReadDone; 00184 }; 00185 00186 #ifdef _MSC_VER 00187 #pragma warning(pop) 00188 #endif 00189 00190 00192 // 00193 // This class is the actual socket aggregator 00194 // 00195 00196 class PHandleAggregator : public PObject 00197 { 00198 PCLASSINFO(PHandleAggregator, PObject) 00199 public: 00200 class EventBase 00201 { 00202 public: 00203 virtual PAggregatorFD::FD GetHandle() = 0; 00204 virtual void Set() = 0; 00205 virtual void Reset() = 0; 00206 }; 00207 00208 typedef std::vector<PAggregatedHandle *> PAggregatedHandleList_t; 00209 00210 class WorkerThreadBase : public PThread 00211 { 00212 public: 00213 WorkerThreadBase(EventBase & _event); 00214 00215 virtual void Trigger() = 0; 00216 void Main(); 00217 00218 PMutex workerMutex; 00219 00220 EventBase & event; 00221 PAggregatedHandleList_t handleList; 00222 BOOL listChanged; 00223 BOOL shutdown; 00224 }; 00225 00226 typedef std::vector<WorkerThreadBase *> WorkerList_t; 00227 00228 PHandleAggregator(unsigned _max = 10); 00229 00230 BOOL AddHandle(PAggregatedHandle * handle); 00231 00232 BOOL RemoveHandle(PAggregatedHandle * handle); 00233 00234 PMutex listMutex; 00235 WorkerList_t workers; 00236 unsigned maxWorkerSize; 00237 }; 00238 00239 00241 // 00242 // This template class allows the creation of aggregators for sockets that are 00243 // descendants of PIPSocket 00244 // 00245 00246 template <class PSocketType> 00247 class PSocketAggregator : public PHandleAggregator 00248 { 00249 PCLASSINFO(PSocketAggregator, PHandleAggregator) 00250 public: 00251 class AggregatedPSocket : public PAggregatedHandle 00252 { 00253 public: 00254 AggregatedPSocket(PSocketType * _s) 00255 : psocket(_s), fd(_s->GetHandle()) { } 00256 00257 BOOL OnRead() 00258 { return psocket->OnRead(); } 00259 00260 PAggregatorFDList_t GetFDs() 00261 { PAggregatorFDList_t list; list.push_back(&fd); return list; } 00262 00263 protected: 00264 PSocketType * psocket; 00265 PAggregatorFD fd; 00266 }; 00267 00268 typedef std::map<PSocketType *, AggregatedPSocket *> SocketList_t; 00269 SocketList_t socketList; 00270 00271 BOOL AddSocket(PSocketType * sock) 00272 { 00273 PWaitAndSignal m(listMutex); 00274 00275 AggregatedPSocket * handle = new AggregatedPSocket(sock); 00276 if (AddHandle(handle)) { 00277 socketList.insert(SocketList_t::value_type(sock, handle)); 00278 return true; 00279 } 00280 00281 delete handle; 00282 return false; 00283 } 00284 00285 BOOL RemoveSocket(PSocketType * sock) 00286 { 00287 PWaitAndSignal m(listMutex); 00288 00289 typename SocketList_t::iterator r = socketList.find(sock); 00290 if (r == socketList.end()) 00291 return FALSE; 00292 00293 AggregatedPSocket * handle = r->second; 00294 RemoveHandle(handle); 00295 delete handle; 00296 socketList.erase(r); 00297 return TRUE; 00298 } 00299 }; 00300 00301 #endif