1e1c55694899e2b1116ef877aaffca8e582d40f9
[p2pool.git] / p2pool / node.py
1 import random
2 import sys
3 import time
4
5 from twisted.internet import defer, reactor
6 from twisted.python import log
7
8 from p2pool import data as p2pool_data, p2p
9 from p2pool.bitcoin import data as bitcoin_data, helper, height_tracker
10 from p2pool.util import deferral, variable
11
12
13 class P2PNode(p2p.Node):
14     def __init__(self, node, **kwargs):
15         self.node = node
16         p2p.Node.__init__(self,
17             best_share_hash_func=lambda: node.best_share_var.value,
18             net=node.net,
19             known_txs_var=node.known_txs_var,
20             mining_txs_var=node.mining_txs_var,
21         **kwargs)
22     
23     def handle_shares(self, shares, peer):
24         if len(shares) > 5:
25             print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
26         
27         new_count = 0
28         all_new_txs = {}
29         for share, new_txs in shares:
30             if new_txs is not None:
31                 all_new_txs.update((bitcoin_data.hash256(bitcoin_data.tx_type.pack(new_tx)), new_tx) for new_tx in new_txs)
32             
33             if share.hash in self.node.tracker.items:
34                 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
35                 continue
36             
37             new_count += 1
38             
39             #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer_addr)
40             
41             self.node.tracker.add(share)
42         
43         new_known_txs = dict(self.node.known_txs_var.value)
44         new_known_txs.update(all_new_txs)
45         self.node.known_txs_var.set(new_known_txs)
46         
47         if new_count:
48             self.node.set_best_share()
49         
50         if len(shares) > 5:
51             print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(self.node.tracker.items), 2*self.node.net.CHAIN_LENGTH)
52     
53     @defer.inlineCallbacks
54     def handle_share_hashes(self, hashes, peer):
55         new_hashes = [x for x in hashes if x not in self.node.tracker.items]
56         if not new_hashes:
57             return
58         try:
59             shares = yield peer.get_shares(
60                 hashes=new_hashes,
61                 parents=0,
62                 stops=[],
63             )
64         except:
65             log.err(None, 'in handle_share_hashes:')
66         else:
67             self.handle_shares([(share, []) for share in shares], peer)
68     
69     def handle_get_shares(self, hashes, parents, stops, peer):
70         parents = min(parents, 1000//len(hashes))
71         stops = set(stops)
72         shares = []
73         for share_hash in hashes:
74             for share in self.node.tracker.get_chain(share_hash, min(parents + 1, self.node.tracker.get_height(share_hash))):
75                 if share.hash in stops:
76                     break
77                 shares.append(share)
78         if len(shares) > 0:
79             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
80         return shares
81     
82     def handle_bestblock(self, header, peer):
83         if self.node.net.PARENT.scrypt(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
84             raise p2p.PeerMisbehavingError('received block header fails PoW test')
85         self.node.handle_header(header)
86     
87     def broadcast_share(self, share_hash):
88         shares = []
89         for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
90             if share.hash in self.shared_share_hashes:
91                 break
92             self.shared_share_hashes.add(share.hash)
93             shares.append(share)
94         
95         for peer in self.peers.itervalues():
96             peer.sendShares([share for share in shares if share.peer_addr != peer.addr], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
97     
98     def start(self):
99         p2p.Node.start(self)
100         
101         self.shared_share_hashes = set(self.node.tracker.items)
102         self.node.tracker.removed.watch_weakref(self, lambda self, share: self.shared_share_hashes.discard(share.hash))
103         
104         @apply
105         @defer.inlineCallbacks
106         def download_shares():
107             while True:
108                 desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
109                 peer_addr, share_hash = random.choice(desired)
110                 
111                 if len(self.peers) == 0:
112                     yield deferral.sleep(1)
113                     continue
114                 peer = random.choice(self.peers.values())
115                 
116                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
117                 try:
118                     shares = yield peer.get_shares(
119                         hashes=[share_hash],
120                         parents=random.randrange(500), # randomize parents so that we eventually get past a too large block of shares
121                         stops=list(set(self.node.tracker.heads) | set(
122                             self.node.tracker.get_nth_parent_hash(head, min(max(0, self.node.tracker.get_height_and_last(head)[0] - 1), 10)) for head in self.node.tracker.heads
123                         ))[:100],
124                     )
125                 except defer.TimeoutError:
126                     print 'Share request timed out!'
127                     continue
128                 except:
129                     log.err(None, 'in download_shares:')
130                     continue
131                 
132                 if not shares:
133                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
134                     continue
135                 self.handle_shares([(share, []) for share in shares], peer)
136         
137         
138         @self.node.best_block_header.changed.watch
139         def _(header):
140             for peer in self.peers.itervalues():
141                 peer.send_bestblock(header=header)
142         
143         # send share when the chain changes to their chain
144         self.node.best_share_var.changed.watch(self.broadcast_share)
145         
146         @self.node.tracker.verified.added.watch
147         def _(share):
148             if share.timestamp < share.min_header['timestamp']:
149                 return
150
151             if not (share.pow_hash <= share.header['bits'].target):
152                 return
153             
154             def spread():
155                 if (self.node.get_height_rel_highest(share.header['previous_block']) > -5 or
156                     self.node.bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
157                     self.broadcast_share(share.hash)
158             spread()
159             reactor.callLater(5, spread) # so get_height_rel_highest can update
160         
161
162 class Node(object):
163     def __init__(self, factory, bitcoind, shares, known_verified_share_hashes, net):
164         self.factory = factory
165         self.bitcoind = bitcoind
166         self.net = net
167         
168         self.tracker = p2pool_data.OkayTracker(self.net)
169         
170         for share in shares:
171             self.tracker.add(share)
172         
173         for share_hash in known_verified_share_hashes:
174             if share_hash in self.tracker.items:
175                 self.tracker.verified.add(self.tracker.items[share_hash])
176         
177         self.p2p_node = None # overwritten externally
178     
179     @defer.inlineCallbacks
180     def start(self):
181         stop_signal = variable.Event()
182         self.stop = stop_signal.happened
183         
184         # BITCOIND WORK
185         
186         self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
187         @defer.inlineCallbacks
188         def work_poller():
189             while stop_signal.times == 0:
190                 flag = self.factory.new_block.get_deferred()
191                 try:
192                     self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
193                 except:
194                     log.err()
195                 yield defer.DeferredList([flag, deferral.sleep(5)], fireOnOneCallback=True)
196         work_poller()
197         
198         # PEER WORK
199         
200         self.best_block_header = variable.Variable(None)
201         self.pow_subsidy = 0
202         def handle_header(new_header):
203             self.pow_bits = self.bitcoind_work.value['bits']
204             self.pow_subsidy = self.bitcoind_work.value['subsidy']
205
206             # check that header matches current target
207             if not (bitcoin_data.scrypt(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
208                 return
209             bitcoind_best_block = self.bitcoind_work.value['previous_block']
210             if (self.best_block_header.value is None
211                 or (
212                     new_header['previous_block'] == bitcoind_best_block and
213                     bitcoin_data.scrypt(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
214                 ) # new is child of current and previous is current
215                 or (
216                     bitcoin_data.scrypt(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
217                     self.best_block_header.value['previous_block'] != bitcoind_best_block
218                 )): # new is current and previous is not a child of current
219                 self.best_block_header.set(new_header)
220         self.handle_header = handle_header
221         @defer.inlineCallbacks
222         def poll_header():
223             if self.factory.conn.value is None:
224                 return
225             handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
226         self.bitcoind_work.changed.watch(lambda _: poll_header())
227         yield deferral.retry('Error while requesting best block header:')(poll_header)()
228         
229         # BEST SHARE
230         
231         self.known_txs_var = variable.Variable({}) # hash -> tx
232         self.mining_txs_var = variable.Variable({}) # hash -> tx
233         self.get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(self.bitcoind, self.factory, lambda: self.bitcoind_work.value['previous_block'], self.net)
234         
235         self.best_share_var = variable.Variable(None)
236         self.desired_var = variable.Variable(None)
237         self.bitcoind_work.changed.watch(lambda _: self.set_best_share())
238         self.set_best_share()
239         
240         # setup p2p logic and join p2pool network
241         
242         # update mining_txs according to getwork results
243         @self.bitcoind_work.changed.run_and_watch
244         def _(_=None):
245             new_mining_txs = {}
246             new_known_txs = dict(self.known_txs_var.value)
247             for tx_hash, tx in zip(self.bitcoind_work.value['transaction_hashes'], self.bitcoind_work.value['transactions']):
248                 new_mining_txs[tx_hash] = tx
249                 new_known_txs[tx_hash] = tx
250             self.mining_txs_var.set(new_mining_txs)
251             self.known_txs_var.set(new_known_txs)
252         # add p2p transactions from bitcoind to known_txs
253         @self.factory.new_tx.watch
254         def _(tx):
255             if tx.timestamp > time.time() + 3600:
256                 return
257             if tx.timestamp > self.bitcoind_work.value['txn_timestamp']:
258                 self.bitcoind_work.value['txn_timestamp'] = tx.timestamp
259             new_known_txs = dict(self.known_txs_var.value)
260             new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
261             self.known_txs_var.set(new_known_txs)
262         # forward transactions seen to bitcoind
263         @self.known_txs_var.transitioned.watch
264         @defer.inlineCallbacks
265         def _(before, after):
266             yield deferral.sleep(random.expovariate(1/1))
267             if self.factory.conn.value is None:
268                 return
269             for tx_hash in set(after) - set(before):
270                 self.factory.conn.value.send_tx(tx=after[tx_hash])
271         
272         @self.tracker.verified.added.watch
273         def _(share):
274             if not (share.pow_hash <= share.header['bits'].target):
275                 return
276             
277             block = share.as_block(self.tracker, self.known_txs_var.value)
278             if block is None:
279                 print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
280                 return
281             helper.submit_block(block, True, self.factory, self.bitcoind, self.bitcoind_work, self.net)
282             print
283             print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
284             print
285         
286         def forget_old_txs():
287             new_known_txs = {}
288             if self.p2p_node is not None:
289                 for peer in self.p2p_node.peers.itervalues():
290                     new_known_txs.update(peer.remembered_txs)
291             new_known_txs.update(self.mining_txs_var.value)
292             for share in self.tracker.get_chain(self.best_share_var.value, min(120, self.tracker.get_height(self.best_share_var.value))):
293                 for tx_hash in share.new_transaction_hashes:
294                     if tx_hash in self.known_txs_var.value:
295                         new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
296             self.known_txs_var.set(new_known_txs)
297         t = deferral.RobustLoopingCall(forget_old_txs)
298         t.start(10)
299         stop_signal.watch(t.stop)
300         
301         t = deferral.RobustLoopingCall(self.clean_tracker)
302         t.start(5)
303         stop_signal.watch(t.stop)
304     
305     def set_best_share(self):
306         best, desired, decorated_heads, bad_peer_addresses = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
307         
308         self.best_share_var.set(best)
309         self.desired_var.set(desired)
310         if self.p2p_node is not None:
311             for bad_peer_address in bad_peer_addresses:
312                 # XXX O(n)
313                 for peer in self.p2p_node.peers.itervalues():
314                     if peer.addr == bad_peer_address:
315                         peer.badPeerHappened()
316                         break
317     
318     def get_current_txouts(self):
319         return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)
320     
321     def clean_tracker(self):
322         best, desired, decorated_heads, bad_peer_addresses = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
323         
324         # eat away at heads
325         if decorated_heads:
326             for i in xrange(1000):
327                 to_remove = set()
328                 for share_hash, tail in self.tracker.heads.iteritems():
329                     if share_hash in [head_hash for score, head_hash in decorated_heads[-5:]]:
330                         #print 1
331                         continue
332                     if self.tracker.items[share_hash].time_seen > time.time() - 300:
333                         #print 2
334                         continue
335                     if share_hash not in self.tracker.verified.items and max(self.tracker.items[after_tail_hash].time_seen for after_tail_hash in self.tracker.reverse.get(tail)) > time.time() - 120: # XXX stupid
336                         #print 3
337                         continue
338                     to_remove.add(share_hash)
339                 if not to_remove:
340                     break
341                 for share_hash in to_remove:
342                     if share_hash in self.tracker.verified.items:
343                         self.tracker.verified.remove(share_hash)
344                     self.tracker.remove(share_hash)
345                 #print "_________", to_remove
346         
347         # drop tails
348         for i in xrange(1000):
349             to_remove = set()
350             for tail, heads in self.tracker.tails.iteritems():
351                 if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10:
352                     continue
353                 to_remove.update(self.tracker.reverse.get(tail, set()))
354             if not to_remove:
355                 break
356             # if removed from this, it must be removed from verified
357             #start = time.time()
358             for aftertail in to_remove:
359                 if self.tracker.items[aftertail].previous_hash not in self.tracker.tails:
360                     print "erk", aftertail, self.tracker.items[aftertail].previous_hash
361                     continue
362                 if aftertail in self.tracker.verified.items:
363                     self.tracker.verified.remove(aftertail)
364                 self.tracker.remove(aftertail)
365             #end = time.time()
366             #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove))
367         
368         self.set_best_share()