Merge pull request #1033 from sipa/wait
authorPieter Wuille <pieter.wuille@gmail.com>
Fri, 6 Apr 2012 11:11:14 +0000 (04:11 -0700)
committerPieter Wuille <pieter.wuille@gmail.com>
Fri, 6 Apr 2012 11:11:14 +0000 (04:11 -0700)
Condition variables instead of polling

1  2 
src/net.cpp
src/util.cpp

diff --combined src/net.cpp
@@@ -64,6 -64,9 +64,9 @@@ map<CInv, int64> mapAlreadyAskedFor
  set<CNetAddr> setservAddNodeAddresses;
  CCriticalSection cs_setservAddNodeAddresses;
  
+ static CWaitableCriticalSection csOutbound;
+ static int nOutbound = 0;
+ static CConditionVariable condOutbound;
  
  
  unsigned short GetListenPort()
@@@ -289,6 -292,105 +292,6 @@@ void AddressCurrentlyConnected(const CS
  
  
  
 -void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1)
 -{
 -    // If the dialog might get closed before the reply comes back,
 -    // call this in the destructor so it doesn't get called after it's deleted.
 -    CRITICAL_BLOCK(cs_vNodes)
 -    {
 -        BOOST_FOREACH(CNode* pnode, vNodes)
 -        {
 -            CRITICAL_BLOCK(pnode->cs_mapRequests)
 -            {
 -                for (map<uint256, CRequestTracker>::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();)
 -                {
 -                    CRequestTracker& tracker = (*mi).second;
 -                    if (tracker.fn == fn && tracker.param1 == param1)
 -                        pnode->mapRequests.erase(mi++);
 -                    else
 -                        mi++;
 -                }
 -            }
 -        }
 -    }
 -}
 -
 -
 -
 -
 -
 -
 -
 -//
 -// Subscription methods for the broadcast and subscription system.
 -// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT.
 -//
 -// The subscription system uses a meet-in-the-middle strategy.
 -// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers
 -// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through.
 -//
 -
 -bool AnySubscribed(unsigned int nChannel)
 -{
 -    if (pnodeLocalHost->IsSubscribed(nChannel))
 -        return true;
 -    CRITICAL_BLOCK(cs_vNodes)
 -        BOOST_FOREACH(CNode* pnode, vNodes)
 -            if (pnode->IsSubscribed(nChannel))
 -                return true;
 -    return false;
 -}
 -
 -bool CNode::IsSubscribed(unsigned int nChannel)
 -{
 -    if (nChannel >= vfSubscribe.size())
 -        return false;
 -    return vfSubscribe[nChannel];
 -}
 -
 -void CNode::Subscribe(unsigned int nChannel, unsigned int nHops)
 -{
 -    if (nChannel >= vfSubscribe.size())
 -        return;
 -
 -    if (!AnySubscribed(nChannel))
 -    {
 -        // Relay subscribe
 -        CRITICAL_BLOCK(cs_vNodes)
 -            BOOST_FOREACH(CNode* pnode, vNodes)
 -                if (pnode != this)
 -                    pnode->PushMessage("subscribe", nChannel, nHops);
 -    }
 -
 -    vfSubscribe[nChannel] = true;
 -}
 -
 -void CNode::CancelSubscribe(unsigned int nChannel)
 -{
 -    if (nChannel >= vfSubscribe.size())
 -        return;
 -
 -    // Prevent from relaying cancel if wasn't subscribed
 -    if (!vfSubscribe[nChannel])
 -        return;
 -    vfSubscribe[nChannel] = false;
 -
 -    if (!AnySubscribed(nChannel))
 -    {
 -        // Relay subscription cancel
 -        CRITICAL_BLOCK(cs_vNodes)
 -            BOOST_FOREACH(CNode* pnode, vNodes)
 -                if (pnode != this)
 -                    pnode->PushMessage("sub-cancel", nChannel);
 -    }
 -}
 -
 -
 -
 -
 -
 -
 -
  
  
  CNode* FindNode(const CNetAddr& ip)
@@@ -361,6 -463,8 +364,8 @@@ CNode* ConnectNode(CAddress addrConnect
              pnode->AddRef();
          CRITICAL_BLOCK(cs_vNodes)
              vNodes.push_back(pnode);
+         WAITABLE_CRITICAL_BLOCK(csOutbound)
+             nOutbound++;
  
          pnode->nTimeConnected = GetTime();
          return pnode;
@@@ -387,6 -491,13 +392,6 @@@ void CNode::CloseSocketDisconnect(
  
  void CNode::Cleanup()
  {
 -    // All of a nodes broadcasts and subscriptions are automatically torn down
 -    // when it goes down, so a node has to stay up to keep its broadcast going.
 -
 -    // Cancel subscriptions
 -    for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++)
 -        if (vfSubscribe[nChannel])
 -            CancelSubscribe(nChannel);
  }
  
  
@@@ -504,6 -615,15 +509,15 @@@ void ThreadSocketHandler2(void* parg
                      // remove from vNodes
                      vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
  
+                     if (!pnode->fInbound)
+                         WAITABLE_CRITICAL_BLOCK(csOutbound)
+                         {
+                             nOutbound--;
+                             // Connection slot(s) were removed, notify connection creator(s)
+                             NOTIFY(condOutbound);
+                         }
                      // close socket and cleanup
                      pnode->CloseSocketDisconnect();
                      pnode->Cleanup();
@@@ -1172,8 -1292,6 +1186,6 @@@ void ThreadOpenConnections2(void* parg
      int64 nStart = GetTime();
      loop
      {
-         int nOutbound = 0;
          vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
          Sleep(500);
          vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
              return;
  
          // Limit outbound connections
-         loop
-         {
-             nOutbound = 0;
-             CRITICAL_BLOCK(cs_vNodes)
-                 BOOST_FOREACH(CNode* pnode, vNodes)
-                     if (!pnode->fInbound)
-                         nOutbound++;
-             int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
-             nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
-             if (nOutbound < nMaxOutboundConnections)
-                 break;
-             vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
-             Sleep(2000);
-             vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
-             if (fShutdown)
-                 return;
-         }
+         int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
+         vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
+         WAITABLE_CRITICAL_BLOCK(csOutbound)
+             WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
+         vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
+         if (fShutdown)
+             return;
  
          bool fAddSeeds = false;
  
@@@ -1646,6 -1754,7 +1648,7 @@@ bool StopNode(
      fShutdown = true;
      nTransactionsUpdated++;
      int64 nStart = GetTime();
+     NOTIFY_ALL(condOutbound);
      do
      {
          int nThreadsRunning = 0;
diff --combined src/util.cpp
@@@ -33,6 -33,16 +33,6 @@@ bool fNoListen = false
  bool fLogTimestamps = false;
  CMedianFilter<int64> vTimeOffsets(200,0);
  
 -
 -
 -// Workaround for "multiple definition of `_tls_used'"
 -// http://svn.boost.org/trac/boost/ticket/4258
 -extern "C" void tss_cleanup_implemented() { }
 -
 -
 -
 -
 -
  // Init openssl library multithreading support
  static boost::interprocess::interprocess_mutex** ppmutexOpenSSL;
  void locking_callback(int mode, int i, const char* file, int line)
@@@ -1183,62 -1193,14 +1183,14 @@@ static void pop_lock(
      dd_mutex.unlock();
  }
  
- void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
+ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs)
  {
-     push_lock(this, CLockLocation(pszName, pszFile, nLine));
- #ifdef DEBUG_LOCKCONTENTION
-     bool result = mutex.try_lock();
-     if (!result)
-     {
-         printf("LOCKCONTENTION: %s\n", pszName);
-         printf("Locker: %s:%d\n", pszFile, nLine);
-         mutex.lock();
-         printf("Locked\n");
-     }
- #else
-     mutex.lock();
- #endif
- }
- void CCriticalSection::Leave()
- {
-     mutex.unlock();
-     pop_lock();
- }
- bool CCriticalSection::TryEnter(const char* pszName, const char* pszFile, int nLine)
- {
-     push_lock(this, CLockLocation(pszName, pszFile, nLine));
-     bool result = mutex.try_lock();
-     if (!result) pop_lock();
-     return result;
+     push_lock(cs, CLockLocation(pszName, pszFile, nLine));
  }
  
- #else
- void CCriticalSection::Enter(const char* pszName, const char* pszFile, int nLine)
+ void LeaveCritical()
  {
- #ifdef DEBUG_LOCKCONTENTION
-     bool result = mutex.try_lock();
-     if (!result)
-     {
-         printf("LOCKCONTENTION: %s\n", pszName);
-         printf("Locker: %s:%d\n", pszFile, nLine);
-         mutex.lock();
-     }
- #else
-     mutex.lock();
- #endif
- }
- void CCriticalSection::Leave()
- {
-     mutex.unlock();
- }
- bool CCriticalSection::TryEnter(const char*, const char*, int)
- {
-     bool result = mutex.try_lock();
-     return result;
+     pop_lock();
  }
  
  #endif /* DEBUG_LOCKORDER */