set<CNetAddr> setservAddNodeAddresses;
CCriticalSection cs_setservAddNodeAddresses;
+static CWaitableCriticalSection csOutbound;
+static int nOutbound = 0;
+static CConditionVariable condOutbound;
unsigned short GetListenPort()
-void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1)
-{
- // If the dialog might get closed before the reply comes back,
- // call this in the destructor so it doesn't get called after it's deleted.
- CRITICAL_BLOCK(cs_vNodes)
- {
- BOOST_FOREACH(CNode* pnode, vNodes)
- {
- CRITICAL_BLOCK(pnode->cs_mapRequests)
- {
- for (map<uint256, CRequestTracker>::iterator mi = pnode->mapRequests.begin(); mi != pnode->mapRequests.end();)
- {
- CRequestTracker& tracker = (*mi).second;
- if (tracker.fn == fn && tracker.param1 == param1)
- pnode->mapRequests.erase(mi++);
- else
- mi++;
- }
- }
- }
- }
-}
-
-
-
-
-
-
-
-//
-// Subscription methods for the broadcast and subscription system.
-// Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT.
-//
-// The subscription system uses a meet-in-the-middle strategy.
-// With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers
-// subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through.
-//
-
-bool AnySubscribed(unsigned int nChannel)
-{
- if (pnodeLocalHost->IsSubscribed(nChannel))
- return true;
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode->IsSubscribed(nChannel))
- return true;
- return false;
-}
-
-bool CNode::IsSubscribed(unsigned int nChannel)
-{
- if (nChannel >= vfSubscribe.size())
- return false;
- return vfSubscribe[nChannel];
-}
-
-void CNode::Subscribe(unsigned int nChannel, unsigned int nHops)
-{
- if (nChannel >= vfSubscribe.size())
- return;
-
- if (!AnySubscribed(nChannel))
- {
- // Relay subscribe
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode != this)
- pnode->PushMessage("subscribe", nChannel, nHops);
- }
-
- vfSubscribe[nChannel] = true;
-}
-
-void CNode::CancelSubscribe(unsigned int nChannel)
-{
- if (nChannel >= vfSubscribe.size())
- return;
-
- // Prevent from relaying cancel if wasn't subscribed
- if (!vfSubscribe[nChannel])
- return;
- vfSubscribe[nChannel] = false;
-
- if (!AnySubscribed(nChannel))
- {
- // Relay subscription cancel
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode != this)
- pnode->PushMessage("sub-cancel", nChannel);
- }
-}
-
-
-
-
-
-
-
CNode* FindNode(const CNetAddr& ip)
pnode->AddRef();
CRITICAL_BLOCK(cs_vNodes)
vNodes.push_back(pnode);
+ WAITABLE_CRITICAL_BLOCK(csOutbound)
+ nOutbound++;
pnode->nTimeConnected = GetTime();
return pnode;
void CNode::Cleanup()
{
- // All of a nodes broadcasts and subscriptions are automatically torn down
- // when it goes down, so a node has to stay up to keep its broadcast going.
-
- // Cancel subscriptions
- for (unsigned int nChannel = 0; nChannel < vfSubscribe.size(); nChannel++)
- if (vfSubscribe[nChannel])
- CancelSubscribe(nChannel);
}
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
+ if (!pnode->fInbound)
+ WAITABLE_CRITICAL_BLOCK(csOutbound)
+ {
+ nOutbound--;
+
+ // Connection slot(s) were removed, notify connection creator(s)
+ NOTIFY(condOutbound);
+ }
+
// close socket and cleanup
pnode->CloseSocketDisconnect();
pnode->Cleanup();
{
BOOST_FOREACH(CNetAddr& ip, vaddr)
{
+ int nOneDay = 24*3600;
CAddress addr = CAddress(CService(ip, GetDefaultPort()));
- addr.nTime = 0;
+ addr.nTime = GetTime() - 3*nOneDay - GetRand(4*nOneDay); // use a random age between 3 and 7 days old
vAdd.push_back(addr);
found++;
}
int64 nStart = GetTime();
loop
{
- int nOutbound = 0;
-
vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
Sleep(500);
vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
return;
// Limit outbound connections
- loop
- {
- nOutbound = 0;
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (!pnode->fInbound)
- nOutbound++;
- int nMaxOutboundConnections = MAX_OUTBOUND_CONNECTIONS;
- nMaxOutboundConnections = min(nMaxOutboundConnections, (int)GetArg("-maxconnections", 125));
- if (nOutbound < nMaxOutboundConnections)
- break;
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- Sleep(2000);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return;
- }
+ int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
+ vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
+ WAITABLE_CRITICAL_BLOCK(csOutbound)
+ WAIT(condOutbound, fShutdown || nOutbound < nMaxOutbound);
+ vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
+ if (fShutdown)
+ return;
bool fAddSeeds = false;
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
+ NOTIFY_ALL(condOutbound);
do
{
int nThreadsRunning = 0;