moving forgetting about old shares into node
[p2pool.git] / p2pool / node.py
1 import random
2 import sys
3 import time
4
5 from twisted.internet import defer, reactor, task
6 from twisted.python import log
7
8 from p2pool import data as p2pool_data, p2p
9 from p2pool.bitcoin import data as bitcoin_data, helper, height_tracker
10 from p2pool.util import deferral, variable
11
12
13 class P2PNode(p2p.Node):
14     def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs):
15         self.node = node
16         p2p.Node.__init__(self,
17             best_share_hash_func=lambda: node.best_share_var.value,
18             port=p2pool_port,
19             net=node.net,
20             addr_store=addrs,
21             connect_addrs=connect_addrs,
22             max_incoming_conns=p2pool_conns,
23             known_txs_var=node.known_txs_var,
24             mining_txs_var=node.mining_txs_var,
25         )
26     
27     def handle_shares(self, shares, peer):
28         if len(shares) > 5:
29             print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
30         
31         new_count = 0
32         for share in shares:
33             if share.hash in self.node.tracker.items:
34                 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
35                 continue
36             
37             new_count += 1
38             
39             #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
40             
41             self.node.tracker.add(share)
42         
43         if new_count:
44             self.node.set_best_share()
45         
46         if len(shares) > 5:
47             print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(self.node.tracker.items), 2*self.node.net.CHAIN_LENGTH)
48     
49     @defer.inlineCallbacks
50     def handle_share_hashes(self, hashes, peer):
51         new_hashes = [x for x in hashes if x not in self.node.tracker.items]
52         if not new_hashes:
53             return
54         try:
55             shares = yield peer.get_shares(
56                 hashes=new_hashes,
57                 parents=0,
58                 stops=[],
59             )
60         except:
61             log.err(None, 'in handle_share_hashes:')
62         else:
63             self.handle_shares(shares, peer)
64     
65     def handle_get_shares(self, hashes, parents, stops, peer):
66         parents = min(parents, 1000//len(hashes))
67         stops = set(stops)
68         shares = []
69         for share_hash in hashes:
70             for share in self.node.tracker.get_chain(share_hash, min(parents + 1, self.node.tracker.get_height(share_hash))):
71                 if share.hash in stops:
72                     break
73                 shares.append(share)
74         print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
75         return shares
76     
77     def handle_bestblock(self, header, peer):
78         if self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
79             raise p2p.PeerMisbehavingError('received block header fails PoW test')
80         self.node.handle_header(header)
81     
82     @defer.inlineCallbacks
83     def broadcast_share(self, share_hash):
84         shares = []
85         for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
86             if share.hash in self.shared_share_hashes:
87                 break
88             self.shared_share_hashes.add(share.hash)
89             shares.append(share)
90         
91         for peer in list(self.peers.itervalues()):
92             yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
93     
94     def start(self):
95         p2p.Node.start(self)
96         
97         self.shared_share_hashes = set(self.node.tracker.items)
98         self.node.tracker.removed.watch_weakref(self, lambda self, share: self.shared_share_hashes.discard(share.hash))
99         
100         @apply
101         @defer.inlineCallbacks
102         def download_shares():
103             while True:
104                 desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
105                 peer2, share_hash = random.choice(desired)
106                 
107                 if len(self.peers) == 0:
108                     yield deferral.sleep(1)
109                     continue
110                 peer = random.choice(self.peers.values())
111                 
112                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
113                 try:
114                     shares = yield peer.get_shares(
115                         hashes=[share_hash],
116                         parents=500,
117                         stops=[],
118                     )
119                 except:
120                     log.err(None, 'in download_shares:')
121                     continue
122                 
123                 if not shares:
124                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
125                     continue
126                 self.handle_shares(shares, peer)
127         
128         
129         @self.node.best_block_header.changed.watch
130         def _(header):
131             for peer in self.peers.itervalues():
132                 peer.send_bestblock(header=header)
133         
134         # send share when the chain changes to their chain
135         self.node.best_share_var.changed.watch(self.broadcast_share)
136         
137         @self.node.tracker.verified.added.watch
138         def _(share):
139             if not (share.pow_hash <= share.header['bits'].target):
140                 return
141             
142             def spread():
143                 if (self.node.get_height_rel_highest(share.header['previous_block']) > -5 or
144                     self.node.bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
145                     self.broadcast_share(share.hash)
146             spread()
147             reactor.callLater(5, spread) # so get_height_rel_highest can update
148         
149
150 class Node(object):
151     def __init__(self, factory, bitcoind, shares, known_verified_share_hashes, net):
152         self.factory = factory
153         self.bitcoind = bitcoind
154         self.net = net
155         
156         self.tracker = p2pool_data.OkayTracker(self.net)
157         
158         for share in shares:
159             self.tracker.add(share)
160         
161         for share_hash in known_verified_share_hashes:
162             if share_hash in self.tracker.items:
163                 self.tracker.verified.add(self.tracker.items[share_hash])
164         
165         self.p2p_node = None # overwritten externally
166     
167     @defer.inlineCallbacks
168     def start(self):
169         stop_signal = variable.Event()
170         self.stop = stop_signal.happened
171         
172         # BITCOIND WORK
173         
174         self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
175         @defer.inlineCallbacks
176         def work_poller():
177             while True:
178                 flag = self.factory.new_block.get_deferred()
179                 try:
180                     self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
181                 except:
182                     log.err()
183                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
184         work_poller()
185         
186         # PEER WORK
187         
188         self.best_block_header = variable.Variable(None)
189         def handle_header(new_header):
190             # check that header matches current target
191             if not (self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
192                 return
193             bitcoind_best_block = self.bitcoind_work.value['previous_block']
194             if (self.best_block_header.value is None
195                 or (
196                     new_header['previous_block'] == bitcoind_best_block and
197                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
198                 ) # new is child of current and previous is current
199                 or (
200                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
201                     self.best_block_header.value['previous_block'] != bitcoind_best_block
202                 )): # new is current and previous is not a child of current
203                 self.best_block_header.set(new_header)
204         self.handle_header = handle_header
205         @defer.inlineCallbacks
206         def poll_header():
207             handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
208         self.bitcoind_work.changed.watch(lambda _: poll_header())
209         yield deferral.retry('Error while requesting best block header:')(poll_header)()
210         
211         # BEST SHARE
212         
213         self.known_txs_var = variable.Variable({}) # hash -> tx
214         self.mining_txs_var = variable.Variable({}) # hash -> tx
215         self.get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(self.bitcoind, self.factory, lambda: self.bitcoind_work.value['previous_block'], self.net)
216         
217         self.best_share_var = variable.Variable(None)
218         self.desired_var = variable.Variable(None)
219         self.bitcoind_work.changed.watch(lambda _: self.set_best_share())
220         self.set_best_share()
221         
222         # setup p2p logic and join p2pool network
223         
224         # update mining_txs according to getwork results
225         @self.bitcoind_work.changed.run_and_watch
226         def _(_=None):
227             new_mining_txs = {}
228             new_known_txs = dict(self.known_txs_var.value)
229             for tx_hash, tx in zip(self.bitcoind_work.value['transaction_hashes'], self.bitcoind_work.value['transactions']):
230                 new_mining_txs[tx_hash] = tx
231                 new_known_txs[tx_hash] = tx
232             self.mining_txs_var.set(new_mining_txs)
233             self.known_txs_var.set(new_known_txs)
234         # add p2p transactions from bitcoind to known_txs
235         @self.factory.new_tx.watch
236         def _(tx):
237             new_known_txs = dict(self.known_txs_var.value)
238             new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
239             self.known_txs_var.set(new_known_txs)
240         # forward transactions seen to bitcoind
241         @self.known_txs_var.transitioned.watch
242         @defer.inlineCallbacks
243         def _(before, after):
244             yield deferral.sleep(random.expovariate(1/1))
245             if self.factory.conn.value is None:
246                 return
247             for tx_hash in set(after) - set(before):
248                 self.factory.conn.value.send_tx(tx=after[tx_hash])
249         
250         @self.tracker.verified.added.watch
251         def _(share):
252             if not (share.pow_hash <= share.header['bits'].target):
253                 return
254             
255             block = share.as_block(self.tracker, self.known_txs_var.value)
256             if block is None:
257                 print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
258                 return
259             helper.submit_block(block, True, self.factory, self.bitcoind, self.bitcoind_work, self.net)
260             print
261             print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
262             print
263         
264         def forget_old_txs():
265             new_known_txs = {}
266             if self.p2p_node is not None:
267                 for peer in self.p2p_node.peers.itervalues():
268                     new_known_txs.update(peer.remembered_txs)
269             new_known_txs.update(self.mining_txs_var.value)
270             for share in self.tracker.get_chain(self.best_share_var.value, min(120, self.tracker.get_height(self.best_share_var.value))):
271                 for tx_hash in share.new_transaction_hashes:
272                     if tx_hash in self.known_txs_var.value:
273                         new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
274             self.known_txs_var.set(new_known_txs)
275         task.LoopingCall(forget_old_txs).start(10)
276         
277         task.LoopingCall(self.clean_tracker).start(5)
278     
279     def set_best_share(self):
280         best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
281         
282         self.best_share_var.set(best)
283         self.desired_var.set(desired)
284     
285     def get_current_txouts(self):
286         return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)
287     
288     def clean_tracker(self):
289         best, desired, decorated_heads = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
290         
291         # eat away at heads
292         if decorated_heads:
293             for i in xrange(1000):
294                 to_remove = set()
295                 for share_hash, tail in self.tracker.heads.iteritems():
296                     if share_hash in [head_hash for score, head_hash in decorated_heads[-5:]]:
297                         #print 1
298                         continue
299                     if self.tracker.items[share_hash].time_seen > time.time() - 300:
300                         #print 2
301                         continue
302                     if share_hash not in self.tracker.verified.items and max(self.tracker.items[after_tail_hash].time_seen for after_tail_hash in self.tracker.reverse.get(tail)) > time.time() - 120: # XXX stupid
303                         #print 3
304                         continue
305                     to_remove.add(share_hash)
306                 if not to_remove:
307                     break
308                 for share_hash in to_remove:
309                     if share_hash in self.tracker.verified.items:
310                         self.tracker.verified.remove(share_hash)
311                     self.tracker.remove(share_hash)
312                 #print "_________", to_remove
313         
314         # drop tails
315         for i in xrange(1000):
316             to_remove = set()
317             for tail, heads in self.tracker.tails.iteritems():
318                 if min(self.tracker.get_height(head) for head in heads) < 2*self.tracker.net.CHAIN_LENGTH + 10:
319                     continue
320                 for aftertail in self.tracker.reverse.get(tail, set()):
321                     if len(self.tracker.reverse[self.tracker.items[aftertail].previous_hash]) > 1: # XXX
322                         print "raw"
323                         continue
324                     to_remove.add(aftertail)
325             if not to_remove:
326                 break
327             # if removed from this, it must be removed from verified
328             #start = time.time()
329             for aftertail in to_remove:
330                 if self.tracker.items[aftertail].previous_hash not in self.tracker.tails:
331                     print "erk", aftertail, self.tracker.items[aftertail].previous_hash
332                     continue
333                 if aftertail in self.tracker.verified.items:
334                     self.tracker.verified.remove(aftertail)
335                 self.tracker.remove(aftertail)
336             #end = time.time()
337             #print "removed! %i %f" % (len(to_remove), (end - start)/len(to_remove))