406787c9fe62aef1d8a72ad7865014a3adc8ae8a
[p2pool.git] / p2pool / node.py
1 import random
2 import sys
3
4 from twisted.internet import defer, reactor, task
5 from twisted.python import log
6
7 from p2pool import data as p2pool_data, p2p
8 from p2pool.bitcoin import data as bitcoin_data, helper, height_tracker
9 from p2pool.util import deferral, variable
10
11
12 class P2PNode(p2p.Node):
13     def __init__(self, node, p2pool_port, p2pool_conns, addrs, connect_addrs):
14         self.node = node
15         p2p.Node.__init__(self,
16             best_share_hash_func=lambda: node.best_share_var.value,
17             port=p2pool_port,
18             net=node.net,
19             addr_store=addrs,
20             connect_addrs=connect_addrs,
21             max_incoming_conns=p2pool_conns,
22             known_txs_var=node.known_txs_var,
23             mining_txs_var=node.mining_txs_var,
24         )
25     
26     def handle_shares(self, shares, peer):
27         if len(shares) > 5:
28             print 'Processing %i shares from %s...' % (len(shares), '%s:%i' % peer.addr if peer is not None else None)
29         
30         new_count = 0
31         for share in shares:
32             if share.hash in self.node.tracker.items:
33                 #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
34                 continue
35             
36             new_count += 1
37             
38             #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
39             
40             self.node.tracker.add(share)
41         
42         if new_count:
43             self.node.set_best_share()
44         
45         if len(shares) > 5:
46             print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(self.node.tracker.items), 2*self.node.net.CHAIN_LENGTH)
47     
48     @defer.inlineCallbacks
49     def handle_share_hashes(self, hashes, peer):
50         new_hashes = [x for x in hashes if x not in self.node.tracker.items]
51         if not new_hashes:
52             return
53         try:
54             shares = yield peer.get_shares(
55                 hashes=new_hashes,
56                 parents=0,
57                 stops=[],
58             )
59         except:
60             log.err(None, 'in handle_share_hashes:')
61         else:
62             self.handle_shares(shares, peer)
63     
64     def handle_get_shares(self, hashes, parents, stops, peer):
65         parents = min(parents, 1000//len(hashes))
66         stops = set(stops)
67         shares = []
68         for share_hash in hashes:
69             for share in self.node.tracker.get_chain(share_hash, min(parents + 1, self.node.tracker.get_height(share_hash))):
70                 if share.hash in stops:
71                     break
72                 shares.append(share)
73         print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
74         return shares
75     
76     def handle_bestblock(self, header, peer):
77         if self.node.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(header)) > header['bits'].target:
78             raise p2p.PeerMisbehavingError('received block header fails PoW test')
79         self.node.handle_header(header)
80     
81     @defer.inlineCallbacks
82     def broadcast_share(self, share_hash):
83         shares = []
84         for share in self.node.tracker.get_chain(share_hash, min(5, self.node.tracker.get_height(share_hash))):
85             if share.hash in self.shared_share_hashes:
86                 break
87             self.shared_share_hashes.add(share.hash)
88             shares.append(share)
89         
90         for peer in list(self.peers.itervalues()):
91             yield peer.sendShares([share for share in shares if share.peer is not peer], self.node.tracker, self.node.known_txs_var.value, include_txs_with=[share_hash])
92     
93     def start(self):
94         p2p.Node.start(self)
95         
96         self.shared_share_hashes = set(self.node.tracker.items)
97         self.node.tracker.removed.watch_weakref(self, lambda self, share: self.shared_share_hashes.discard(share.hash))
98         
99         @apply
100         @defer.inlineCallbacks
101         def download_shares():
102             while True:
103                 desired = yield self.node.desired_var.get_when_satisfies(lambda val: len(val) != 0)
104                 peer2, share_hash = random.choice(desired)
105                 
106                 if len(self.peers) == 0:
107                     yield deferral.sleep(1)
108                     continue
109                 peer = random.choice(self.peers.values())
110                 
111                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
112                 try:
113                     shares = yield peer.get_shares(
114                         hashes=[share_hash],
115                         parents=500,
116                         stops=[],
117                     )
118                 except:
119                     log.err(None, 'in download_shares:')
120                     continue
121                 
122                 if not shares:
123                     yield deferral.sleep(1) # sleep so we don't keep rerequesting the same share nobody has
124                     continue
125                 self.handle_shares(shares, peer)
126         
127         
128         @self.node.best_block_header.changed.watch
129         def _(header):
130             for peer in self.peers.itervalues():
131                 peer.send_bestblock(header=header)
132         
133         # send share when the chain changes to their chain
134         self.node.best_share_var.changed.watch(self.broadcast_share)
135         
136         @self.node.tracker.verified.added.watch
137         def _(share):
138             if not (share.pow_hash <= share.header['bits'].target):
139                 return
140             
141             def spread():
142                 if (self.node.get_height_rel_highest(share.header['previous_block']) > -5 or
143                     self.node.bitcoind_work.value['previous_block'] in [share.header['previous_block'], share.header_hash]):
144                     self.broadcast_share(share.hash)
145             spread()
146             reactor.callLater(5, spread) # so get_height_rel_highest can update
147         
148
149 class Node(object):
150     def __init__(self, factory, bitcoind, shares, known_verified_share_hashes, net):
151         self.factory = factory
152         self.bitcoind = bitcoind
153         self.net = net
154         
155         self.tracker = p2pool_data.OkayTracker(self.net)
156         
157         for share in shares:
158             self.tracker.add(share)
159         
160         for share_hash in known_verified_share_hashes:
161             if share_hash in self.tracker.items:
162                 self.tracker.verified.add(self.tracker.items[share_hash])
163         
164         self.p2p_node = None # overwritten externally
165     
166     @defer.inlineCallbacks
167     def start(self):
168         stop_signal = variable.Event()
169         self.stop = stop_signal.happened
170         
171         # BITCOIND WORK
172         
173         self.bitcoind_work = variable.Variable((yield helper.getwork(self.bitcoind)))
174         @defer.inlineCallbacks
175         def work_poller():
176             while True:
177                 flag = self.factory.new_block.get_deferred()
178                 try:
179                     self.bitcoind_work.set((yield helper.getwork(self.bitcoind, self.bitcoind_work.value['use_getblocktemplate'])))
180                 except:
181                     log.err()
182                 yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
183         work_poller()
184         
185         # PEER WORK
186         
187         self.best_block_header = variable.Variable(None)
188         def handle_header(new_header):
189             # check that header matches current target
190             if not (self.net.PARENT.POW_FUNC(bitcoin_data.block_header_type.pack(new_header)) <= self.bitcoind_work.value['bits'].target):
191                 return
192             bitcoind_best_block = self.bitcoind_work.value['previous_block']
193             if (self.best_block_header.value is None
194                 or (
195                     new_header['previous_block'] == bitcoind_best_block and
196                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(self.best_block_header.value)) == bitcoind_best_block
197                 ) # new is child of current and previous is current
198                 or (
199                     bitcoin_data.hash256(bitcoin_data.block_header_type.pack(new_header)) == bitcoind_best_block and
200                     self.best_block_header.value['previous_block'] != bitcoind_best_block
201                 )): # new is current and previous is not a child of current
202                 self.best_block_header.set(new_header)
203         self.handle_header = handle_header
204         @defer.inlineCallbacks
205         def poll_header():
206             handle_header((yield self.factory.conn.value.get_block_header(self.bitcoind_work.value['previous_block'])))
207         self.bitcoind_work.changed.watch(lambda _: poll_header())
208         yield deferral.retry('Error while requesting best block header:')(poll_header)()
209         
210         # BEST SHARE
211         
212         self.known_txs_var = variable.Variable({}) # hash -> tx
213         self.mining_txs_var = variable.Variable({}) # hash -> tx
214         self.get_height_rel_highest = yield height_tracker.get_height_rel_highest_func(self.bitcoind, self.factory, lambda: self.bitcoind_work.value['previous_block'], self.net)
215         
216         self.best_share_var = variable.Variable(None)
217         self.desired_var = variable.Variable(None)
218         self.bitcoind_work.changed.watch(lambda _: self.set_best_share())
219         self.set_best_share()
220         
221         # setup p2p logic and join p2pool network
222         
223         # update mining_txs according to getwork results
224         @self.bitcoind_work.changed.run_and_watch
225         def _(_=None):
226             new_mining_txs = {}
227             new_known_txs = dict(self.known_txs_var.value)
228             for tx_hash, tx in zip(self.bitcoind_work.value['transaction_hashes'], self.bitcoind_work.value['transactions']):
229                 new_mining_txs[tx_hash] = tx
230                 new_known_txs[tx_hash] = tx
231             self.mining_txs_var.set(new_mining_txs)
232             self.known_txs_var.set(new_known_txs)
233         # add p2p transactions from bitcoind to known_txs
234         @self.factory.new_tx.watch
235         def _(tx):
236             new_known_txs = dict(self.known_txs_var.value)
237             new_known_txs[bitcoin_data.hash256(bitcoin_data.tx_type.pack(tx))] = tx
238             self.known_txs_var.set(new_known_txs)
239         # forward transactions seen to bitcoind
240         @self.known_txs_var.transitioned.watch
241         @defer.inlineCallbacks
242         def _(before, after):
243             yield deferral.sleep(random.expovariate(1/1))
244             for tx_hash in set(after) - set(before):
245                 self.factory.conn.value.send_tx(tx=after[tx_hash])
246         
247         @self.tracker.verified.added.watch
248         def _(share):
249             if not (share.pow_hash <= share.header['bits'].target):
250                 return
251             
252             block = share.as_block(self.tracker, self.known_txs_var.value)
253             if block is None:
254                 print >>sys.stderr, 'GOT INCOMPLETE BLOCK FROM PEER! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
255                 return
256             helper.submit_block(block, True, self.factory, self.bitcoind, self.bitcoind_work, self.net)
257             print
258             print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %s%064x' % (p2pool_data.format_hash(share.hash), self.net.PARENT.BLOCK_EXPLORER_URL_PREFIX, share.header_hash)
259             print
260         
261         def forget_old_txs():
262             new_known_txs = {}
263             if self.p2p_node is not None:
264                 for peer in self.p2p_node.peers.itervalues():
265                     new_known_txs.update(peer.remembered_txs)
266             new_known_txs.update(self.mining_txs_var.value)
267             for share in self.tracker.get_chain(self.best_share_var.value, min(120, self.tracker.get_height(self.best_share_var.value))):
268                 for tx_hash in share.new_transaction_hashes:
269                     if tx_hash in self.known_txs_var.value:
270                         new_known_txs[tx_hash] = self.known_txs_var.value[tx_hash]
271             self.known_txs_var.set(new_known_txs)
272         task.LoopingCall(forget_old_txs).start(10)
273     
274     def set_best_share(self):
275         best, desired = self.tracker.think(self.get_height_rel_highest, self.bitcoind_work.value['previous_block'], self.bitcoind_work.value['bits'], self.known_txs_var.value)
276         
277         self.best_share_var.set(best)
278         self.desired_var.set(desired)
279     
280     def get_current_txouts(self):
281         return p2pool_data.get_expected_payouts(self.tracker, self.best_share_var.value, self.bitcoind_work.value['bits'].target, self.bitcoind_work.value['subsidy'], self.net)