From: Pieter Wuille Date: Fri, 6 Apr 2012 11:11:14 +0000 (-0700) Subject: Merge pull request #1033 from sipa/wait X-Git-Tag: v0.4.0-unstable~129^2~99 X-Git-Url: https://git.novaco.in/?a=commitdiff_plain;h=9362da78b0d8f0dd7bbad259adcb3e4d0cf58e56;hp=-c;p=novacoin.git Merge pull request #1033 from sipa/wait Condition variables instead of polling --- 9362da78b0d8f0dd7bbad259adcb3e4d0cf58e56 diff --combined src/net.cpp index 7dc2d4c,2d9cf6e..59bace4 --- a/src/net.cpp +++ b/src/net.cpp @@@ -64,6 -64,9 +64,9 @@@ map mapAlreadyAskedFor set setservAddNodeAddresses; CCriticalSection cs_setservAddNodeAddresses; + static CWaitableCriticalSection csOutbound; + static int nOutbound = 0; + static CConditionVariable condOutbound; unsigned short GetListenPort() @@@ -289,6 -292,105 +292,6 @@@ void AddressCurrentlyConnected(const CS -void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1) -{ - // If the dialog might get closed before the reply comes back, - // call this in the destructor so it doesn't get called after it's deleted. - CRITICAL_BLOCK(cs_vNodes) - { - BOOST_FOREACH(CNode* pnode, vNodes) - { - CRITICAL_BLOCK(pnode->cs_mapRequests) - { - for (map::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();) - { - CRequestTracker& tracker = (*mi).second; - if (tracker.fn == fn && tracker.param1 == param1) - pnode->mapRequests.erase(mi++); - else - mi++; - } - } - } - } -} - - - - - - - -// -// Subscription methods for the broadcast and subscription system. -// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT. -// -// The subscription system uses a meet-in-the-middle strategy. -// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers -// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through. -// - -bool AnySubscribed(unsigned int nChannel) -{ - if (pnodeLocalHost->IsSubscribed(nChannel)) - return true; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode->IsSubscribed(nChannel)) - return true; - return false; -} - -bool CNode::IsSubscribed(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return false; - return vfSubscribe[nChannel]; -} - -void CNode::Subscribe(unsigned int nChannel, unsigned int nHops) -{ - if (nChannel >= vfSubscribe.size()) - return; - - if (!AnySubscribed(nChannel)) - { - // Relay subscribe - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("subscribe", nChannel, nHops); - } - - vfSubscribe[nChannel] = true; -} - -void CNode::CancelSubscribe(unsigned int nChannel) -{ - if (nChannel >= vfSubscribe.size()) - return; - - // Prevent from relaying cancel if wasn't subscribed - if (!vfSubscribe[nChannel]) - return; - vfSubscribe[nChannel] = false; - - if (!AnySubscribed(nChannel)) - { - // Relay subscription cancel - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (pnode != this) - pnode->PushMessage("sub-cancel", nChannel); - } -} - - - - - - - CNode* FindNode(const CNetAddr& ip) @@@ -361,6 -463,8 +364,8 @@@ CNode* ConnectNode(CAddress addrConnect pnode->AddRef(); CRITICAL_BLOCK(cs_vNodes) vNodes.push_back(pnode); + WAITABLE_CRITICAL_BLOCK(csOutbound) + nOutbound++; pnode->nTimeConnected = GetTime(); return pnode; @@@ -387,6 -491,13 +392,6 @@@ void CNode::CloseSocketDisconnect( void CNode::Cleanup() { - // All of a nodes broadcasts and subscriptions are automatically torn down - // when it goes down, so a node has to stay up to keep its broadcast going. - - // Cancel subscriptions - for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++) - if (vfSubscribe[nChannel]) - CancelSubscribe(nChannel); } @@@ -504,6 -615,15 +509,15 @@@ void ThreadSocketHandler2(void* parg // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); + if (!pnode->fInbound) + WAITABLE_CRITICAL_BLOCK(csOutbound) + { + nOutbound--; + + // Connection slot(s) were removed, notify connection creator(s) + NOTIFY(condOutbound); + } + // close socket and cleanup pnode->CloseSocketDisconnect(); pnode->Cleanup(); @@@ -1172,8 -1292,6 +1186,6 @@@ void ThreadOpenConnections2(void* parg int64 nStart = GetTime(); loop { - int nOutbound = 0; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; Sleep(500); vnThreadsRunning[THREAD_OPENCONNECTIONS]++; @@@ -1181,23 -1299,13 +1193,13 @@@ return; // Limit outbound connections - loop - { - nOutbound = 0; - CRITICAL_BLOCK(cs_vNodes) - BOOST_FOREACH(CNode* pnode, vNodes) - if (!pnode->fInbound) - nOutbound++; - int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS; - nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125)); - if (nOutbound < nMaxOutboundConnections) - break; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - Sleep(2000); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; - } + int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125)); + vnThreadsRunning[THREAD_OPENCONNECTIONS]--; + WAITABLE_CRITICAL_BLOCK(csOutbound) + WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound); + vnThreadsRunning[THREAD_OPENCONNECTIONS]++; + if (fShutdown) + return; bool fAddSeeds = false; @@@ -1646,6 -1754,7 +1648,7 @@@ bool StopNode( fShutdown = true; nTransactionsUpdated++; int64 nStart = GetTime(); + NOTIFY_ALL(condOutbound); do { int nThreadsRunning = 0; diff --combined src/util.cpp index 5c47551,09361ef..d55e7ae --- a/src/util.cpp +++ b/src/util.cpp @@@ -33,6 -33,16 +33,6 @@@ bool fNoListen = false bool fLogTimestamps = false; CMedianFilter vTimeOffsets(200,0); - - -// Workaround for "multiple definition of `_tls_used'" -// http://svn.boost.org/trac/boost/ticket/4258 -extern "C" void tss_cleanup_implemented() { } - - - - - // Init openssl library multithreading support static boost::interprocess::interprocess_mutex** ppmutexOpenSSL; void locking_callback(int mode, int i, const char* file, int line) @@@ -1183,62 -1193,14 +1183,14 @@@ static void pop_lock( dd_mutex.unlock(); } - void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine) + void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs) { - push_lock(this, CLockLocation(pszName, pszFile, nLine)); - #ifdef DEBUG_LOCKCONTENTION - bool result = mutex.try_lock(); - if (!result) - { - printf("LOCKCONTENTION: %s\n", pszName); - printf("Locker: %s:%d\n", pszFile, nLine); - mutex.lock(); - printf("Locked\n"); - } - #else - mutex.lock(); - #endif - } - void CCriticalSection::Leave() - { - mutex.unlock(); - pop_lock(); - } - bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine) - { - push_lock(this, CLockLocation(pszName, pszFile, nLine)); - bool result = mutex.try_lock(); - if (!result) pop_lock(); - return result; + push_lock(cs, CLockLocation(pszName, pszFile, nLine)); } - #else - - void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine) + void LeaveCritical() { - #ifdef DEBUG_LOCKCONTENTION - bool result = mutex.try_lock(); - if (!result) - { - printf("LOCKCONTENTION: %s\n", pszName); - printf("Locker: %s:%d\n", pszFile, nLine); - mutex.lock(); - } - #else - mutex.lock(); - #endif - } - - void CCriticalSection::Leave() - { - mutex.unlock(); - } - - bool CCriticalSection::TryEnter(const char*, const char*, int) - { - bool result = mutex.try_lock(); - return result; + pop_lock(); } #endif /* DEBUG_LOCKORDER */ -