First commit
[novacoin.git] / net.h
1 // Copyright (c) 2009 Satoshi Nakamoto\r
2 // Distributed under the MIT/X11 software license, see the accompanying\r
3 // file license.txt or http://www.opensource.org/licenses/mit-license.php.\r
4 \r
5 class CMessageHeader;\r
6 class CAddress;\r
7 class CInv;\r
8 class CRequestTracker;\r
9 class CNode;\r
10 \r
11 \r
12 \r
13 static const unsigned short DEFAULT_PORT = htons(8333);\r
14 static const unsigned int PUBLISH_HOPS = 5;\r
15 enum\r
16 {\r
17     NODE_NETWORK = (1 << 0),\r
18 };\r
19 \r
20 \r
21 \r
22 \r
23 \r
24 \r
25 bool ConnectSocket(const CAddress& addrConnect, SOCKET& hSocketRet);\r
26 bool GetMyExternalIP(unsigned int& ipRet);\r
27 bool AddAddress(CAddrDB& addrdb, const CAddress& addr);\r
28 CNode* FindNode(unsigned int ip);\r
29 CNode* ConnectNode(CAddress addrConnect, int64 nTimeout=0);\r
30 void AbandonRequests(void (*fn)(void*, CDataStream&), void* param1);\r
31 bool AnySubscribed(unsigned int nChannel);\r
32 void ThreadBitcoinMiner(void* parg);\r
33 bool StartNode(string& strError=REF(string()));\r
34 bool StopNode();\r
35 void CheckForShutdown(int n);\r
36 \r
37 \r
38 \r
39 \r
40 \r
41 \r
42 \r
43 \r
44 \r
45 //\r
46 // Message header\r
47 //  (4) message start\r
48 //  (12) command\r
49 //  (4) size\r
50 \r
51 // The message start string is designed to be unlikely to occur in normal data.\r
52 // The characters are rarely used upper ascii, not valid as UTF-8, and produce\r
53 // a large 4-byte int at any alignment.\r
54 static const char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 };\r
55 \r
56 class CMessageHeader\r
57 {\r
58 public:\r
59     enum { COMMAND_SIZE=12 };\r
60     char pchMessageStart[sizeof(::pchMessageStart)];\r
61     char pchCommand[COMMAND_SIZE];\r
62     unsigned int nMessageSize;\r
63 \r
64     CMessageHeader()\r
65     {\r
66         memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));\r
67         memset(pchCommand, 0, sizeof(pchCommand));\r
68         pchCommand[1] = 1;\r
69         nMessageSize = -1;\r
70     }\r
71 \r
72     CMessageHeader(const char* pszCommand, unsigned int nMessageSizeIn)\r
73     {\r
74         memcpy(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart));\r
75         strncpy(pchCommand, pszCommand, COMMAND_SIZE);\r
76         nMessageSize = nMessageSizeIn;\r
77     }\r
78 \r
79     IMPLEMENT_SERIALIZE\r
80     (\r
81         READWRITE(FLATDATA(pchMessageStart));\r
82         READWRITE(FLATDATA(pchCommand));\r
83         READWRITE(nMessageSize);\r
84     )\r
85 \r
86     string GetCommand()\r
87     {\r
88         if (pchCommand[COMMAND_SIZE-1] == 0)\r
89             return string(pchCommand, pchCommand + strlen(pchCommand));\r
90         else\r
91             return string(pchCommand, pchCommand + COMMAND_SIZE);\r
92     }\r
93 \r
94     bool IsValid()\r
95     {\r
96         // Check start string\r
97         if (memcmp(pchMessageStart, ::pchMessageStart, sizeof(pchMessageStart)) != 0)\r
98             return false;\r
99 \r
100         // Check the command string for errors\r
101         for (char* p1 = pchCommand; p1 < pchCommand + COMMAND_SIZE; p1++)\r
102         {\r
103             if (*p1 == 0)\r
104             {\r
105                 // Must be all zeros after the first zero\r
106                 for (; p1 < pchCommand + COMMAND_SIZE; p1++)\r
107                     if (*p1 != 0)\r
108                         return false;\r
109             }\r
110             else if (*p1 < ' ' || *p1 > 0x7E)\r
111                 return false;\r
112         }\r
113 \r
114         // Message size\r
115         if (nMessageSize > 0x10000000)\r
116         {\r
117             printf("CMessageHeader::IsValid() : nMessageSize too large %u\n", nMessageSize);\r
118             return false;\r
119         }\r
120 \r
121         return true;\r
122     }\r
123 };\r
124 \r
125 \r
126 \r
127 \r
128 \r
129 \r
130 static const unsigned char pchIPv4[12] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff };\r
131 \r
132 class CAddress\r
133 {\r
134 public:\r
135     uint64 nServices;\r
136     unsigned char pchReserved[12];\r
137     unsigned int ip;\r
138     unsigned short port;\r
139 \r
140     // disk only\r
141     unsigned int nTime;\r
142 \r
143     // memory only\r
144     unsigned int nLastFailed;\r
145 \r
146     CAddress()\r
147     {\r
148         nServices = 0;\r
149         memcpy(pchReserved, pchIPv4, sizeof(pchReserved));\r
150         ip = 0;\r
151         port = DEFAULT_PORT;\r
152         nTime = GetAdjustedTime();\r
153         nLastFailed = 0;\r
154     }\r
155 \r
156     CAddress(unsigned int ipIn, unsigned short portIn=DEFAULT_PORT, uint64 nServicesIn=0)\r
157     {\r
158         nServices = nServicesIn;\r
159         memcpy(pchReserved, pchIPv4, sizeof(pchReserved));\r
160         ip = ipIn;\r
161         port = portIn;\r
162         nTime = GetAdjustedTime();\r
163         nLastFailed = 0;\r
164     }\r
165 \r
166     explicit CAddress(const struct sockaddr_in& sockaddr, uint64 nServicesIn=0)\r
167     {\r
168         nServices = nServicesIn;\r
169         memcpy(pchReserved, pchIPv4, sizeof(pchReserved));\r
170         ip = sockaddr.sin_addr.s_addr;\r
171         port = sockaddr.sin_port;\r
172         nTime = GetAdjustedTime();\r
173         nLastFailed = 0;\r
174     }\r
175 \r
176     explicit CAddress(const char* pszIn, uint64 nServicesIn=0)\r
177     {\r
178         nServices = nServicesIn;\r
179         memcpy(pchReserved, pchIPv4, sizeof(pchReserved));\r
180         ip = 0;\r
181         port = DEFAULT_PORT;\r
182         nTime = GetAdjustedTime();\r
183         nLastFailed = 0;\r
184 \r
185         char psz[100];\r
186         if (strlen(pszIn) > ARRAYLEN(psz)-1)\r
187             return;\r
188         strcpy(psz, pszIn);\r
189         unsigned int a, b, c, d, e;\r
190         if (sscanf(psz, "%u.%u.%u.%u:%u", &a, &b, &c, &d, &e) < 4)\r
191             return;\r
192         char* pszPort = strchr(psz, ':');\r
193         if (pszPort)\r
194         {\r
195             *pszPort++ = '\0';\r
196             port = htons(atoi(pszPort));\r
197         }\r
198         ip = inet_addr(psz);\r
199     }\r
200 \r
201     IMPLEMENT_SERIALIZE\r
202     (\r
203         if (nType & SER_DISK)\r
204         {\r
205             READWRITE(nVersion);\r
206             READWRITE(nTime);\r
207         }\r
208         READWRITE(nServices);\r
209         READWRITE(FLATDATA(pchReserved));\r
210         READWRITE(ip);\r
211         READWRITE(port);\r
212     )\r
213 \r
214     friend inline bool operator==(const CAddress& a, const CAddress& b)\r
215     {\r
216         return (memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved)) == 0 &&\r
217                 a.ip   == b.ip &&\r
218                 a.port == b.port);\r
219     }\r
220 \r
221     friend inline bool operator<(const CAddress& a, const CAddress& b)\r
222     {\r
223         int ret = memcmp(a.pchReserved, b.pchReserved, sizeof(a.pchReserved));\r
224         if (ret < 0)\r
225             return true;\r
226         else if (ret == 0)\r
227         {\r
228             if (ntohl(a.ip) < ntohl(b.ip))\r
229                 return true;\r
230             else if (a.ip == b.ip)\r
231                 return ntohs(a.port) < ntohs(b.port);\r
232         }\r
233         return false;\r
234     }\r
235 \r
236     vector<unsigned char> GetKey() const\r
237     {\r
238         CDataStream ss;\r
239         ss.reserve(18);\r
240         ss << FLATDATA(pchReserved) << ip << port;\r
241 \r
242         #if defined(_MSC_VER) && _MSC_VER < 1300\r
243         return vector<unsigned char>((unsigned char*)&ss.begin()[0], (unsigned char*)&ss.end()[0]);\r
244         #else\r
245         return vector<unsigned char>(ss.begin(), ss.end());\r
246         #endif\r
247     }\r
248 \r
249     struct sockaddr_in GetSockAddr() const\r
250     {\r
251         struct sockaddr_in sockaddr;\r
252         sockaddr.sin_family = AF_INET;\r
253         sockaddr.sin_addr.s_addr = ip;\r
254         sockaddr.sin_port = port;\r
255         return sockaddr;\r
256     }\r
257 \r
258     bool IsIPv4() const\r
259     {\r
260         return (memcmp(pchReserved, pchIPv4, sizeof(pchIPv4)) == 0);\r
261     }\r
262 \r
263     bool IsRoutable() const\r
264     {\r
265         return !(GetByte(3) == 10 || (GetByte(3) == 192 && GetByte(2) == 168) || GetByte(3) == 127 || GetByte(3) == 0);\r
266     }\r
267 \r
268     unsigned char GetByte(int n) const\r
269     {\r
270         return ((unsigned char*)&ip)[3-n];\r
271     }\r
272 \r
273     string ToStringIPPort() const\r
274     {\r
275         return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));\r
276     }\r
277 \r
278     string ToStringIP() const\r
279     {\r
280         return strprintf("%u.%u.%u.%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0));\r
281     }\r
282 \r
283     string ToString() const\r
284     {\r
285         return strprintf("%u.%u.%u.%u:%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0), ntohs(port));\r
286         //return strprintf("%u.%u.%u.%u", GetByte(3), GetByte(2), GetByte(1), GetByte(0));\r
287     }\r
288 \r
289     void print() const\r
290     {\r
291         printf("CAddress(%s)\n", ToString().c_str());\r
292     }\r
293 };\r
294 \r
295 \r
296 \r
297 \r
298 \r
299 \r
300 \r
301 enum\r
302 {\r
303     MSG_TX = 1,\r
304     MSG_BLOCK,\r
305     MSG_REVIEW,\r
306     MSG_PRODUCT,\r
307     MSG_TABLE,\r
308 };\r
309 \r
310 static const char* ppszTypeName[] =\r
311 {\r
312     "ERROR",\r
313     "tx",\r
314     "block",\r
315     "review",\r
316     "product",\r
317     "table",\r
318 };\r
319 \r
320 class CInv\r
321 {\r
322 public:\r
323     int type;\r
324     uint256 hash;\r
325 \r
326     CInv()\r
327     {\r
328         type = 0;\r
329         hash = 0;\r
330     }\r
331 \r
332     CInv(int typeIn, const uint256& hashIn)\r
333     {\r
334         type = typeIn;\r
335         hash = hashIn;\r
336     }\r
337 \r
338     CInv(const string& strType, const uint256& hashIn)\r
339     {\r
340         int i;\r
341         for (i = 1; i < ARRAYLEN(ppszTypeName); i++)\r
342         {\r
343             if (strType == ppszTypeName[i])\r
344             {\r
345                 type = i;\r
346                 break;\r
347             }\r
348         }\r
349         if (i == ARRAYLEN(ppszTypeName))\r
350             throw std::out_of_range(strprintf("CInv::CInv(string, uint256) : unknown type '%s'", strType.c_str()));\r
351         hash = hashIn;\r
352     }\r
353 \r
354     IMPLEMENT_SERIALIZE\r
355     (\r
356         READWRITE(type);\r
357         READWRITE(hash);\r
358     )\r
359 \r
360     friend inline bool operator<(const CInv& a, const CInv& b)\r
361     {\r
362         return (a.type < b.type || (a.type == b.type && a.hash < b.hash));\r
363     }\r
364 \r
365     bool IsKnownType() const\r
366     {\r
367         return (type >= 1 && type < ARRAYLEN(ppszTypeName));\r
368     }\r
369 \r
370     const char* GetCommand() const\r
371     {\r
372         if (!IsKnownType())\r
373             throw std::out_of_range(strprintf("CInv::GetCommand() : type=% unknown type", type));\r
374         return ppszTypeName[type];\r
375     }\r
376 \r
377     string ToString() const\r
378     {\r
379         return strprintf("%s %s", GetCommand(), hash.ToString().substr(0,14).c_str());\r
380     }\r
381 \r
382     void print() const\r
383     {\r
384         printf("CInv(%s)\n", ToString().c_str());\r
385     }\r
386 };\r
387 \r
388 \r
389 \r
390 \r
391 \r
392 class CRequestTracker\r
393 {\r
394 public:\r
395     void (*fn)(void*, CDataStream&);\r
396     void* param1;\r
397 \r
398     explicit CRequestTracker(void (*fnIn)(void*, CDataStream&)=NULL, void* param1In=NULL)\r
399     {\r
400         fn = fnIn;\r
401         param1 = param1In;\r
402     }\r
403 \r
404     bool IsNull()\r
405     {\r
406         return fn == NULL;\r
407     }\r
408 };\r
409 \r
410 \r
411 \r
412 \r
413 \r
414 extern bool fClient;\r
415 extern uint64 nLocalServices;\r
416 extern CAddress addrLocalHost;\r
417 extern CNode* pnodeLocalHost;\r
418 extern bool fShutdown;\r
419 extern array<bool, 10> vfThreadRunning;\r
420 extern vector<CNode*> vNodes;\r
421 extern CCriticalSection cs_vNodes;\r
422 extern map<vector<unsigned char>, CAddress> mapAddresses;\r
423 extern CCriticalSection cs_mapAddresses;\r
424 extern map<CInv, CDataStream> mapRelay;\r
425 extern deque<pair<int64, CInv> > vRelayExpiration;\r
426 extern CCriticalSection cs_mapRelay;\r
427 extern map<CInv, int64> mapAlreadyAskedFor;\r
428 extern CAddress addrProxy;\r
429 \r
430 \r
431 \r
432 \r
433 \r
434 class CNode\r
435 {\r
436 public:\r
437     // socket\r
438     uint64 nServices;\r
439     SOCKET hSocket;\r
440     CDataStream vSend;\r
441     CDataStream vRecv;\r
442     CCriticalSection cs_vSend;\r
443     CCriticalSection cs_vRecv;\r
444     unsigned int nPushPos;\r
445     CAddress addr;\r
446     int nVersion;\r
447     bool fClient;\r
448     bool fInbound;\r
449     bool fNetworkNode;\r
450     bool fDisconnect;\r
451 protected:\r
452     int nRefCount;\r
453 public:\r
454     int64 nReleaseTime;\r
455     map<uint256, CRequestTracker> mapRequests;\r
456     CCriticalSection cs_mapRequests;\r
457 \r
458     // flood\r
459     vector<CAddress> vAddrToSend;\r
460     set<CAddress> setAddrKnown;\r
461 \r
462     // inventory based relay\r
463     set<CInv> setInventoryKnown;\r
464     set<CInv> setInventoryKnown2;\r
465     vector<CInv> vInventoryToSend;\r
466     CCriticalSection cs_inventory;\r
467     multimap<int64, CInv> mapAskFor;\r
468 \r
469     // publish and subscription\r
470     vector<char> vfSubscribe;\r
471 \r
472 \r
473     CNode(SOCKET hSocketIn, CAddress addrIn, bool fInboundIn=false)\r
474     {\r
475         nServices = 0;\r
476         hSocket = hSocketIn;\r
477         vSend.SetType(SER_NETWORK);\r
478         vRecv.SetType(SER_NETWORK);\r
479         nPushPos = -1;\r
480         addr = addrIn;\r
481         nVersion = 0;\r
482         fClient = false; // set by version message\r
483         fInbound = fInboundIn;\r
484         fNetworkNode = false;\r
485         fDisconnect = false;\r
486         nRefCount = 0;\r
487         nReleaseTime = 0;\r
488         vfSubscribe.assign(256, false);\r
489 \r
490         // Push a version message\r
491         /// when NTP implemented, change to just nTime = GetAdjustedTime()\r
492         int64 nTime = (fInbound ? GetAdjustedTime() : GetTime());\r
493         PushMessage("version", VERSION, nLocalServices, nTime, addr);\r
494     }\r
495 \r
496     ~CNode()\r
497     {\r
498         if (hSocket != INVALID_SOCKET)\r
499             closesocket(hSocket);\r
500     }\r
501 \r
502 private:\r
503     CNode(const CNode&);\r
504     void operator=(const CNode&);\r
505 public:\r
506 \r
507 \r
508     bool ReadyToDisconnect()\r
509     {\r
510         return fDisconnect || GetRefCount() <= 0;\r
511     }\r
512 \r
513     int GetRefCount()\r
514     {\r
515         return max(nRefCount, 0) + (GetTime() < nReleaseTime ? 1 : 0);\r
516     }\r
517 \r
518     void AddRef(int64 nTimeout=0)\r
519     {\r
520         if (nTimeout != 0)\r
521             nReleaseTime = max(nReleaseTime, GetTime() + nTimeout);\r
522         else\r
523             nRefCount++;\r
524     }\r
525 \r
526     void Release()\r
527     {\r
528         nRefCount--;\r
529     }\r
530 \r
531 \r
532 \r
533     void AddInventoryKnown(const CInv& inv)\r
534     {\r
535         CRITICAL_BLOCK(cs_inventory)\r
536             setInventoryKnown.insert(inv);\r
537     }\r
538 \r
539     void PushInventory(const CInv& inv)\r
540     {\r
541         CRITICAL_BLOCK(cs_inventory)\r
542             if (!setInventoryKnown.count(inv))\r
543                 vInventoryToSend.push_back(inv);\r
544     }\r
545 \r
546     void AskFor(const CInv& inv)\r
547     {\r
548         // We're using mapAskFor as a priority queue,\r
549         // the key is the earliest time the request can be sent\r
550         int64& nRequestTime = mapAlreadyAskedFor[inv];\r
551         printf("askfor %s  %I64d\n", inv.ToString().c_str(), nRequestTime);\r
552 \r
553         // Make sure not to reuse time indexes to keep things in the same order\r
554         int64 nNow = (GetTime() - 1) * 1000000;\r
555         static int64 nLastTime;\r
556         nLastTime = nNow = max(nNow, ++nLastTime);\r
557 \r
558         // Each retry is 2 minutes after the last\r
559         nRequestTime = max(nRequestTime + 2 * 60 * 1000000, nNow);\r
560         mapAskFor.insert(make_pair(nRequestTime, inv));\r
561     }\r
562 \r
563 \r
564 \r
565     void BeginMessage(const char* pszCommand)\r
566     {\r
567         EnterCriticalSection(&cs_vSend);\r
568         if (nPushPos != -1)\r
569             AbortMessage();\r
570         nPushPos = vSend.size();\r
571         vSend << CMessageHeader(pszCommand, 0);\r
572         printf("sending: %-12s ", pszCommand);\r
573     }\r
574 \r
575     void AbortMessage()\r
576     {\r
577         if (nPushPos == -1)\r
578             return;\r
579         vSend.resize(nPushPos);\r
580         nPushPos = -1;\r
581         LeaveCriticalSection(&cs_vSend);\r
582         printf("(aborted)\n");\r
583     }\r
584 \r
585     void EndMessage()\r
586     {\r
587         extern int nDropMessagesTest;\r
588         if (nDropMessagesTest > 0 && GetRand(nDropMessagesTest) == 0)\r
589         {\r
590             printf("dropmessages DROPPING SEND MESSAGE\n");\r
591             AbortMessage();\r
592             return;\r
593         }\r
594 \r
595         if (nPushPos == -1)\r
596             return;\r
597 \r
598         // Patch in the size\r
599         unsigned int nSize = vSend.size() - nPushPos - sizeof(CMessageHeader);\r
600         memcpy((char*)&vSend[nPushPos] + offsetof(CMessageHeader, nMessageSize), &nSize, sizeof(nSize));\r
601 \r
602         printf("(%d bytes)  ", nSize);\r
603         //for (int i = nPushPos+sizeof(CMessageHeader); i < min(vSend.size(), nPushPos+sizeof(CMessageHeader)+20U); i++)\r
604         //    printf("%02x ", vSend[i] & 0xff);\r
605         printf("\n");\r
606 \r
607         nPushPos = -1;\r
608         LeaveCriticalSection(&cs_vSend);\r
609     }\r
610 \r
611     void EndMessageAbortIfEmpty()\r
612     {\r
613         if (nPushPos == -1)\r
614             return;\r
615         int nSize = vSend.size() - nPushPos - sizeof(CMessageHeader);\r
616         if (nSize > 0)\r
617             EndMessage();\r
618         else\r
619             AbortMessage();\r
620     }\r
621 \r
622     const char* GetMessageCommand() const\r
623     {\r
624         if (nPushPos == -1)\r
625             return "";\r
626         return &vSend[nPushPos] + offsetof(CMessageHeader, pchCommand);\r
627     }\r
628 \r
629 \r
630 \r
631 \r
632     void PushMessage(const char* pszCommand)\r
633     {\r
634         try\r
635         {\r
636             BeginMessage(pszCommand);\r
637             EndMessage();\r
638         }\r
639         catch (...)\r
640         {\r
641             AbortMessage();\r
642             throw;\r
643         }\r
644     }\r
645 \r
646     template<typename T1>\r
647     void PushMessage(const char* pszCommand, const T1& a1)\r
648     {\r
649         try\r
650         {\r
651             BeginMessage(pszCommand);\r
652             vSend << a1;\r
653             EndMessage();\r
654         }\r
655         catch (...)\r
656         {\r
657             AbortMessage();\r
658             throw;\r
659         }\r
660     }\r
661 \r
662     template<typename T1, typename T2>\r
663     void PushMessage(const char* pszCommand, const T1& a1, const T2& a2)\r
664     {\r
665         try\r
666         {\r
667             BeginMessage(pszCommand);\r
668             vSend << a1 << a2;\r
669             EndMessage();\r
670         }\r
671         catch (...)\r
672         {\r
673             AbortMessage();\r
674             throw;\r
675         }\r
676     }\r
677 \r
678     template<typename T1, typename T2, typename T3>\r
679     void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3)\r
680     {\r
681         try\r
682         {\r
683             BeginMessage(pszCommand);\r
684             vSend << a1 << a2 << a3;\r
685             EndMessage();\r
686         }\r
687         catch (...)\r
688         {\r
689             AbortMessage();\r
690             throw;\r
691         }\r
692     }\r
693 \r
694     template<typename T1, typename T2, typename T3, typename T4>\r
695     void PushMessage(const char* pszCommand, const T1& a1, const T2& a2, const T3& a3, const T4& a4)\r
696     {\r
697         try\r
698         {\r
699             BeginMessage(pszCommand);\r
700             vSend << a1 << a2 << a3 << a4;\r
701             EndMessage();\r
702         }\r
703         catch (...)\r
704         {\r
705             AbortMessage();\r
706             throw;\r
707         }\r
708     }\r
709 \r
710 \r
711     void PushRequest(const char* pszCommand,\r
712                      void (*fn)(void*, CDataStream&), void* param1)\r
713     {\r
714         uint256 hashReply;\r
715         RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));\r
716 \r
717         CRITICAL_BLOCK(cs_mapRequests)\r
718             mapRequests[hashReply] = CRequestTracker(fn, param1);\r
719 \r
720         PushMessage(pszCommand, hashReply);\r
721     }\r
722 \r
723     template<typename T1>\r
724     void PushRequest(const char* pszCommand, const T1& a1,\r
725                      void (*fn)(void*, CDataStream&), void* param1)\r
726     {\r
727         uint256 hashReply;\r
728         RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));\r
729 \r
730         CRITICAL_BLOCK(cs_mapRequests)\r
731             mapRequests[hashReply] = CRequestTracker(fn, param1);\r
732 \r
733         PushMessage(pszCommand, hashReply, a1);\r
734     }\r
735 \r
736     template<typename T1, typename T2>\r
737     void PushRequest(const char* pszCommand, const T1& a1, const T2& a2,\r
738                      void (*fn)(void*, CDataStream&), void* param1)\r
739     {\r
740         uint256 hashReply;\r
741         RAND_bytes((unsigned char*)&hashReply, sizeof(hashReply));\r
742 \r
743         CRITICAL_BLOCK(cs_mapRequests)\r
744             mapRequests[hashReply] = CRequestTracker(fn, param1);\r
745 \r
746         PushMessage(pszCommand, hashReply, a1, a2);\r
747     }\r
748 \r
749 \r
750 \r
751     bool IsSubscribed(unsigned int nChannel);\r
752     void Subscribe(unsigned int nChannel, unsigned int nHops=0);\r
753     void CancelSubscribe(unsigned int nChannel);\r
754     void Disconnect();\r
755 };\r
756 \r
757 \r
758 \r
759 \r
760 \r
761 \r
762 \r
763 \r
764 \r
765 \r
766 inline void RelayInventory(const CInv& inv)\r
767 {\r
768     // Put on lists to offer to the other nodes\r
769     CRITICAL_BLOCK(cs_vNodes)\r
770         foreach(CNode* pnode, vNodes)\r
771             pnode->PushInventory(inv);\r
772 }\r
773 \r
774 template<typename T>\r
775 void RelayMessage(const CInv& inv, const T& a)\r
776 {\r
777     CDataStream ss(SER_NETWORK);\r
778     ss.reserve(10000);\r
779     ss << a;\r
780     RelayMessage(inv, ss);\r
781 }\r
782 \r
783 template<>\r
784 inline void RelayMessage<>(const CInv& inv, const CDataStream& ss)\r
785 {\r
786     CRITICAL_BLOCK(cs_mapRelay)\r
787     {\r
788         // Expire old relay messages\r
789         while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())\r
790         {\r
791             mapRelay.erase(vRelayExpiration.front().second);\r
792             vRelayExpiration.pop_front();\r
793         }\r
794 \r
795         // Save original serialized message so newer versions are preserved\r
796         mapRelay[inv] = ss;\r
797         vRelayExpiration.push_back(make_pair(GetTime() + 15 * 60, inv));\r
798     }\r
799 \r
800     RelayInventory(inv);\r
801 }\r
802 \r
803 \r
804 \r
805 \r
806 \r
807 \r
808 \r
809 \r
810 //\r
811 // Templates for the publish and subscription system.\r
812 // The object being published as T& obj needs to have:\r
813 //   a set<unsigned int> setSources member\r
814 //   specializations of AdvertInsert and AdvertErase\r
815 // Currently implemented for CTable and CProduct.\r
816 //\r
817 \r
818 template<typename T>\r
819 void AdvertStartPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)\r
820 {\r
821     // Add to sources\r
822     obj.setSources.insert(pfrom->addr.ip);\r
823 \r
824     if (!AdvertInsert(obj))\r
825         return;\r
826 \r
827     // Relay\r
828     CRITICAL_BLOCK(cs_vNodes)\r
829         foreach(CNode* pnode, vNodes)\r
830             if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))\r
831                 pnode->PushMessage("publish", nChannel, nHops, obj);\r
832 }\r
833 \r
834 template<typename T>\r
835 void AdvertStopPublish(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)\r
836 {\r
837     uint256 hash = obj.GetHash();\r
838 \r
839     CRITICAL_BLOCK(cs_vNodes)\r
840         foreach(CNode* pnode, vNodes)\r
841             if (pnode != pfrom && (nHops < PUBLISH_HOPS || pnode->IsSubscribed(nChannel)))\r
842                 pnode->PushMessage("pub-cancel", nChannel, nHops, hash);\r
843 \r
844     AdvertErase(obj);\r
845 }\r
846 \r
847 template<typename T>\r
848 void AdvertRemoveSource(CNode* pfrom, unsigned int nChannel, unsigned int nHops, T& obj)\r
849 {\r
850     // Remove a source\r
851     obj.setSources.erase(pfrom->addr.ip);\r
852 \r
853     // If no longer supported by any sources, cancel it\r
854     if (obj.setSources.empty())\r
855         AdvertStopPublish(pfrom, nChannel, nHops, obj);\r
856 }\r