#include <arpa/inet.h>
#endif
+#include "mruset.h"
#include "netbase.h"
#include "protocol.h"
+#include "addrman.h"
class CAddrDB;
class CRequestTracker;
inline unsigned int ReceiveBufferSize() { return 1000*GetArg("-maxreceivebuffer", 10*1000); }
inline unsigned int SendBufferSize() { return 1000*GetArg("-maxsendbuffer", 10*1000); }
-static const unsigned int PUBLISH_HOPS = 5;
+bool RecvLine(SOCKET hSocket, std::string& strLine);
bool GetMyExternalIP(CNetAddr& ipRet);
-bool AddAddress(CAddress addr, int64 nTimePenalty=0, CAddrDB *pAddrDB=NULL);
void AddressCurrentlyConnected(const CService& addr);
CNode* FindNode(const CNetAddr& ip);
CNode* FindNode(const CService& ip);
CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0);
-void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1);
-bool AnySubscribed(unsigned int nChannel);
void MapPort(bool fMapPort);
bool BindListenPort(std::string& strError=REF(std::string()));
void StartNode(void* parg);
};
-
+/** Thread types */
enum threadId
{
THREAD_SOCKETHANDLER,
THREAD_UPNP,
THREAD_DNSSEED,
THREAD_ADDEDCONNECTIONS,
+ THREAD_DUMPADDRESS,
THREAD_MAX
};
extern CAddress addrLocalHost;
extern uint64 nLocalHostNonce;
extern boost::array<int, THREAD_MAX> vnThreadsRunning;
+extern CAddrMan addrman;
extern std::vector<CNode*> vNodes;
extern CCriticalSection cs_vNodes;
-extern std::map<std::vector<unsigned char>, CAddress> mapAddresses;
-extern CCriticalSection cs_mapAddresses;
extern std::map<CInv, CDataStream> mapRelay;
extern std::deque<std::pair<int64, CInv> > vRelayExpiration;
extern CCriticalSection cs_mapRelay;
-
+/** Information about a peer */
class CNode
{
public:
std::set<uint256> setKnown;
// inventory based relay
- std::set<CInv> setInventoryKnown;
+ mruset<CInv> setInventoryKnown;
std::vector<CInv> vInventoryToSend;
CCriticalSection cs_inventory;
std::multimap<int64, CInv> mapAskFor;
- // publish and subscription
- std::vector<char> vfSubscribe;
-
CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false)
{
nServices = 0;
hSocket = hSocketIn;
vSend.SetType(SER_NETWORK);
vRecv.SetType(SER_NETWORK);
- vSend.SetVersion(209);
- vRecv.SetVersion(209);
+ vSend.SetVersion(MIN_PROTO_VERSION);
+ vRecv.SetVersion(MIN_PROTO_VERSION);
nLastSend = 0;
nLastRecv = 0;
nLastSendEmpty = GetTime();
hashLastGetBlocksEnd = 0;
nStartingHeight = -1;
fGetAddr = false;
- vfSubscribe.assign(256, false);
nMisbehavior = 0;
+ setInventoryKnown.max_size(SendBufferSize() / 1000);
// Be shy and don't send version until we hear
if (!fInbound)
void AddInventoryKnown(const CInv& inv)
{
- CRITICAL_BLOCK(cs_inventory)
+ {
+ LOCK(cs_inventory);
setInventoryKnown.insert(inv);
+ }
}
void PushInventory(const CInv& inv)
{
- CRITICAL_BLOCK(cs_inventory)
+ {
+ LOCK(cs_inventory);
if (!setInventoryKnown.count(inv))
vInventoryToSend.push_back(inv);
+ }
}
void AskFor(const CInv& inv)
uint256 hashReply;
RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
- CRITICAL_BLOCK(cs_mapRequests)
+ {
+ LOCK(cs_mapRequests);
mapRequests[hashReply] = CRequestTracker(fn, param1);
+ }
PushMessage(pszCommand, hashReply);
}
uint256 hashReply;
RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
- CRITICAL_BLOCK(cs_mapRequests)
+ {
+ LOCK(cs_mapRequests);
mapRequests[hashReply] = CRequestTracker(fn, param1);
+ }
PushMessage(pszCommand, hashReply, a1);
}
uint256 hashReply;
RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));
- CRITICAL_BLOCK(cs_mapRequests)
+ {
+ LOCK(cs_mapRequests);
mapRequests[hashReply] = CRequestTracker(fn, param1);
+ }
PushMessage(pszCommand, hashReply, a1, a2);
}
inline void RelayInventory(const CInv& inv)
{
// Put on lists to offer to the other nodes
- CRITICAL_BLOCK(cs_vNodes)
+ {
+ LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes)
pnode->PushInventory(inv);
+ }
}
template<typename T>
template<>
inline void RelayMessage<>(const CInv& inv, const CDataStream& ss)
{
- CRITICAL_BLOCK(cs_mapRelay)
{
+ LOCK(cs_mapRelay);
// Expire old relay messages
while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
{
}
-
-
-
-
-
-
-//
-// Templates for the publish and subscription system.
-// The object being published as T& obj needs to have:
-// a set<unsigned int> setSources member
-// specializations of AdvertInsert and AdvertErase
-// Currently implemented for CTable and CProduct.
-//
-
-template<typename T>
-void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- // Add to sources
- obj.setSources.insert(pfrom->addr.ip);
-
- if (!AdvertInsert(obj))
- return;
-
- // Relay
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
- pnode->PushMessage("publish", nChannel, nHops, obj);
-}
-
-template<typename T>
-void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- uint256 hash = obj.GetHash();
-
- CRITICAL_BLOCK(cs_vNodes)
- BOOST_FOREACH(CNode* pnode, vNodes)
- if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))
- pnode->PushMessage("pub-cancel", nChannel, nHops, hash);
-
- AdvertErase(obj);
-}
-
-template<typename T>
-void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)
-{
- // Remove a source
- obj.setSources.erase(pfrom->addr.ip);
-
- // If no longer supported by any sources, cancel it
- if (obj.setSources.empty())
- AdvertStopPublish(pfrom, nChannel, nHops, obj);
-}
-
#endif