00001
00002
00003 #include "pch.h"
00004 #include "wait.h"
00005 #include "misc.h"
00006
00007 #ifdef SOCKETS_AVAILABLE
00008
00009 #ifdef USE_BERKELEY_STYLE_SOCKETS
00010 #include <errno.h>
00011 #include <sys/types.h>
00012 #include <sys/time.h>
00013 #include <unistd.h>
00014 #endif
00015
00016 NAMESPACE_BEGIN(CryptoPP)
00017
00018 WaitObjectContainer::WaitObjectContainer()
00019 {
00020 Clear();
00021 }
00022
00023 void WaitObjectContainer::Clear()
00024 {
00025 #ifdef USE_WINDOWS_STYLE_SOCKETS
00026 m_handles.clear();
00027 #else
00028 m_maxFd = 0;
00029 FD_ZERO(&m_readfds);
00030 FD_ZERO(&m_writefds);
00031 #endif
00032 m_noWait = false;
00033 }
00034
00035 #ifdef USE_WINDOWS_STYLE_SOCKETS
00036
00037 struct WaitingThreadData
00038 {
00039 bool waitingToWait, terminate;
00040 HANDLE startWaiting, stopWaiting;
00041 const HANDLE *waitHandles;
00042 unsigned int count;
00043 HANDLE threadHandle;
00044 DWORD threadId;
00045 DWORD* error;
00046 };
00047
00048 WaitObjectContainer::~WaitObjectContainer()
00049 {
00050 try
00051 {
00052 if (!m_threads.empty())
00053 {
00054 HANDLE threadHandles[MAXIMUM_WAIT_OBJECTS];
00055 unsigned int i;
00056 for (i=0; i<m_threads.size(); i++)
00057 {
00058 WaitingThreadData &thread = *m_threads[i];
00059 while (!thread.waitingToWait)
00060 Sleep(0);
00061 thread.terminate = true;
00062 threadHandles[i] = thread.threadHandle;
00063 }
00064 PulseEvent(m_startWaiting);
00065 ::WaitForMultipleObjects(m_threads.size(), threadHandles, TRUE, INFINITE);
00066 for (i=0; i<m_threads.size(); i++)
00067 CloseHandle(threadHandles[i]);
00068 CloseHandle(m_startWaiting);
00069 CloseHandle(m_stopWaiting);
00070 }
00071 }
00072 catch (...)
00073 {
00074 }
00075 }
00076
00077
00078 void WaitObjectContainer::AddHandle(HANDLE handle)
00079 {
00080 m_handles.push_back(handle);
00081 }
00082
00083 DWORD WINAPI WaitingThread(LPVOID lParam)
00084 {
00085 std::auto_ptr<WaitingThreadData> pThread((WaitingThreadData *)lParam);
00086 WaitingThreadData &thread = *pThread;
00087 std::vector<HANDLE> handles;
00088
00089 while (true)
00090 {
00091 thread.waitingToWait = true;
00092 ::WaitForSingleObject(thread.startWaiting, INFINITE);
00093 thread.waitingToWait = false;
00094
00095 if (thread.terminate)
00096 break;
00097 if (!thread.count)
00098 continue;
00099
00100 handles.resize(thread.count + 1);
00101 handles[0] = thread.stopWaiting;
00102 std::copy(thread.waitHandles, thread.waitHandles+thread.count, handles.begin()+1);
00103
00104 DWORD result = ::WaitForMultipleObjects(handles.size(), &handles[0], FALSE, INFINITE);
00105
00106 if (result == WAIT_OBJECT_0)
00107 continue;
00108 SetEvent(thread.stopWaiting);
00109 if (!(result > WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + handles.size()))
00110 {
00111 assert(!"error in WaitingThread");
00112 *thread.error = ::GetLastError();
00113 }
00114 }
00115
00116 return S_OK;
00117 }
00118
00119 void WaitObjectContainer::CreateThreads(unsigned int count)
00120 {
00121 unsigned int currentCount = m_threads.size();
00122 if (currentCount == 0)
00123 {
00124 m_startWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL);
00125 m_stopWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL);
00126 }
00127
00128 if (currentCount < count)
00129 {
00130 m_threads.resize(count);
00131 for (unsigned int i=currentCount; i<count; i++)
00132 {
00133 m_threads[i] = new WaitingThreadData;
00134 WaitingThreadData &thread = *m_threads[i];
00135 thread.terminate = false;
00136 thread.startWaiting = m_startWaiting;
00137 thread.stopWaiting = m_stopWaiting;
00138 thread.waitingToWait = false;
00139 thread.threadHandle = CreateThread(NULL, 0, &WaitingThread, &thread, 0, &thread.threadId);
00140 }
00141 }
00142 }
00143
00144 bool WaitObjectContainer::Wait(unsigned long milliseconds)
00145 {
00146 if (m_noWait || m_handles.empty())
00147 return true;
00148
00149 if (m_handles.size() > MAXIMUM_WAIT_OBJECTS)
00150 {
00151
00152 static const unsigned int WAIT_OBJECTS_PER_THREAD = MAXIMUM_WAIT_OBJECTS-1;
00153 unsigned int nThreads = (m_handles.size() + WAIT_OBJECTS_PER_THREAD - 1) / WAIT_OBJECTS_PER_THREAD;
00154 if (nThreads > MAXIMUM_WAIT_OBJECTS)
00155 throw Err("WaitObjectContainer: number of wait objects exceeds limit");
00156 CreateThreads(nThreads);
00157 DWORD error = S_OK;
00158
00159 for (unsigned int i=0; i<m_threads.size(); i++)
00160 {
00161 WaitingThreadData &thread = *m_threads[i];
00162 while (!thread.waitingToWait)
00163 Sleep(0);
00164 if (i<nThreads)
00165 {
00166 thread.waitHandles = &m_handles[i*WAIT_OBJECTS_PER_THREAD];
00167 thread.count = STDMIN(WAIT_OBJECTS_PER_THREAD, m_handles.size() - i*WAIT_OBJECTS_PER_THREAD);
00168 thread.error = &error;
00169 }
00170 else
00171 thread.count = 0;
00172 }
00173
00174 ResetEvent(m_stopWaiting);
00175 PulseEvent(m_startWaiting);
00176
00177 DWORD result = ::WaitForSingleObject(m_stopWaiting, milliseconds);
00178 if (result == WAIT_OBJECT_0)
00179 {
00180 if (error == S_OK)
00181 return true;
00182 else
00183 throw Err("WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(error));
00184 }
00185 SetEvent(m_stopWaiting);
00186 if (result == WAIT_TIMEOUT)
00187 return false;
00188 else
00189 throw Err("WaitObjectContainer: WaitForSingleObject failed with error " + IntToString(::GetLastError()));
00190 }
00191 else
00192 {
00193 DWORD result = ::WaitForMultipleObjects(m_handles.size(), &m_handles[0], FALSE, milliseconds);
00194 if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + m_handles.size())
00195 return true;
00196 else if (result == WAIT_TIMEOUT)
00197 return false;
00198 else
00199 throw Err("WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(::GetLastError()));
00200 }
00201 }
00202
00203 #else
00204
00205 void WaitObjectContainer::AddReadFd(int fd)
00206 {
00207 FD_SET(fd, &m_readfds);
00208 m_maxFd = STDMAX(m_maxFd, fd);
00209 }
00210
00211 void WaitObjectContainer::AddWriteFd(int fd)
00212 {
00213 FD_SET(fd, &m_writefds);
00214 m_maxFd = STDMAX(m_maxFd, fd);
00215 }
00216
00217 bool WaitObjectContainer::Wait(unsigned long milliseconds)
00218 {
00219 if (m_noWait || m_maxFd == 0)
00220 return true;
00221
00222 timeval tv, *timeout;
00223
00224 if (milliseconds == INFINITE_TIME)
00225 timeout = NULL;
00226 else
00227 {
00228 tv.tv_sec = milliseconds / 1000;
00229 tv.tv_usec = (milliseconds % 1000) * 1000;
00230 timeout = &tv;
00231 }
00232
00233 int result = select(m_maxFd+1, &m_readfds, &m_writefds, NULL, timeout);
00234
00235 if (result > 0)
00236 return true;
00237 else if (result == 0)
00238 return false;
00239 else
00240 throw Err("WaitObjectContainer: select failed with error " + errno);
00241 }
00242
00243 #endif
00244
00245
00246
00247 bool Waitable::Wait(unsigned long milliseconds)
00248 {
00249 WaitObjectContainer container;
00250 GetWaitObjects(container);
00251 return container.Wait(milliseconds);
00252 }
00253
00254 NAMESPACE_END
00255
00256 #endif