bug
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13 import traceback
14
15 from twisted.internet import defer, reactor
16 from twisted.web import server
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22
# Best-effort lookup of the Subversion revision of the directory containing
# the launched script. communicate() reads the output AND waits for the child,
# so no un-reaped svnversion process is left behind. Any ordinary failure
# (e.g. svnversion is not installed) falls back to 'unknown'.
try:
    __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).communicate()[0].strip()
except Exception:
    __version__ = 'unknown'
27
class Chain(object):
    """Store of checked shares ("share2" objects) belonging to one chain id.

    NOTE: the constructor starts with an unconditional failing assertion, so
    instances cannot currently be created through normal construction.
    """

    def __init__(self, chain_id_data):
        assert False
        self.chain_id_data = chain_id_data
        unpacked_id = p2pool.chain_id_type.unpack(chain_id_data)
        self.last_p2pool_block_hash = unpacked_id['last_p2pool_block_hash']

        # share hash -> share2
        self.share2s = {}
        # hash of the highest accepted share (None until one is accepted)
        self.highest = variable.Variable(None)

        self.requesting = set()
        self.request_map = {}

    def accept(self, share, net):
        """Try to add `share` to this chain.

        Returns 'dup' if the share hash is already stored, 'orphan' if its
        previous share is not stored, and 'good' once it has been checked and
        stored. Raises ValueError if the share carries a different chain id or
        if share.check() hands back a share2 wrapping a different share;
        exceptions raised by share.check() itself propagate unchanged.
        """
        if share.chain_id_data != self.chain_id_data:
            raise ValueError('share does not belong to this chain')

        if share.hash in self.share2s:
            return 'dup'

        parent_hash = share.previous_share_hash
        if parent_hash is None:
            # first share of the chain: it ends up at height 0
            parent_share2 = None
            parent_height = -1
        elif parent_hash in self.share2s:
            parent_share2 = self.share2s[parent_hash]
            parent_height = parent_share2.height
        else:
            return 'orphan'

        new_height = parent_height + 1

        checked = share.check(self, new_height, parent_share2, net)  # raises exceptions
        if checked.share is not share:
            raise ValueError()

        self.share2s[share.hash] = checked

        best_hash = self.highest.value
        if best_hash is None or new_height > self.share2s[best_hash].height:
            self.highest.set(share.hash)

        return 'good'

    def get_highest_share2(self):
        """Return the share2 of the highest accepted share, or None if there is none."""
        best_hash = self.highest.value
        if best_hash is None:
            return None
        return self.share2s[best_hash]

    def get_down(self, share_hash):
        """Return the hashes from `share_hash` back along previous_share_hash links.

        The list always starts with `share_hash`. The walk stops after the
        first hash that is not stored here (that hash is included) or at a
        stored share whose previous_share_hash is None.
        """
        hashes = [share_hash]
        while share_hash in self.share2s:
            share_hash = self.share2s[share_hash].share.previous_share_hash
            if share_hash is None:
                break
            hashes.append(share_hash)
        return hashes
85
86 @defer.inlineCallbacks
87 def get_last_p2pool_block_hash(current_block_hash, get_block, net):
88     block_hash = current_block_hash
89     while True:
90         if block_hash == net.ROOT_BLOCK:
91             defer.returnValue(block_hash)
92         try:
93             block = yield get_block(block_hash)
94         except:
95             traceback.print_exc()
96             continue
97         coinbase_data = block['txs'][0]['tx_ins'][0]['script']
98         try:
99             coinbase = p2pool.coinbase_type.unpack(coinbase_data)
100         except bitcoin.data.EarlyEnd:
101             pass
102         else:
103             try:
104                 if coinbase['identifier'] == net.IDENTIFIER:
105                     payouts = {}
106                     for tx_out in block['txs'][0]['tx_outs']:
107                         payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
108                     subsidy = sum(payouts.itervalues())
109                     if coinbase['subsidy'] == subsidy:
110                         if payouts.get(net.SCRIPT, 0) >= subsidy//64:
111                             defer.returnValue(block_hash)
112             except Exception:
113                 print
114                 print 'Error matching block:'
115                 print 'block:', block
116                 traceback.print_exc()
117                 print
118         block_hash = block['header']['previous_block']
119
@deferral.retry('Error getting work from bitcoind:', 1)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work and block number from bitcoind.

    The Deferred fires with a (BlockAttempt, block number) tuple, built from
    the rpc_getwork and rpc_getblocknumber results respectively.
    """
    # both requests are issued before either result is awaited;
    # a block could arrive in between these two queries
    work_df = bitcoind.rpc_getwork()
    number_df = bitcoind.rpc_getblocknumber()
    try:
        raw_work = yield work_df
        attempt = bitcoin.getwork.BlockAttempt.from_getwork(raw_work)
        block_number = yield number_df
    finally:
        # get rid of residual errors
        for pending in (work_df, number_df):
            pending.addErrback(lambda fail: None)
    defer.returnValue((attempt, block_number))
132
133
134 @defer.inlineCallbacks
135 def main(args):
136     try:
137         print 'p2pool (version %s)' % (__version__,)
138         print
139         
140         # connect to bitcoind over JSON-RPC and do initial getwork
141         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
142         print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
143         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
144         
145         work, height = yield getwork(bitcoind)
146         
147         print '    ...success!'
148         print '    Current block hash: %x height: %i' % (work.previous_block, height)
149         print
150         
151         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
152         print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
153         factory = bitcoin.p2p.ClientFactory(args.net)
154         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
155         
156         while True:
157             try:
158                 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
159                 if res['reply'] != 'success':
160                     print
161                     print 'Error getting payout script:'
162                     print res
163                     print
164                     continue
165                 my_script = res['script']
166             except:
167                 print
168                 print 'Error getting payout script:'
169                 traceback.print_exc()
170                 print
171             else:
172                 break
173             yield deferral.sleep(1)
174         
175         print '    ...success!'
176         print '    Payout script:', my_script.encode('hex')
177         print
178         
179         @defer.inlineCallbacks
180         def real_get_block(block_hash):
181             block = yield (yield factory.getProtocol()).get_block(block_hash)
182             print 'Got block %x' % (block_hash,)
183             defer.returnValue(block)
184         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
185         
186         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
187         
188         tracker = p2pool.OkayTracker(args.net)
189         chains = expiring_dict.ExpiringDict(300)
190         def get_chain(chain_id_data):
191             return chains.setdefault(chain_id_data, Chain(chain_id_data))
192         
193         # information affecting work that should trigger a long-polling update
194         current_work = variable.Variable(None)
195         # information affecting work that should not trigger a long-polling update
196         current_work2 = variable.Variable(None)
197         
198         @defer.inlineCallbacks
199         def set_real_work():
200             work, height = yield getwork(bitcoind)
201             best, desired = tracker.think()
202             # XXX desired?
203             current_work.set(dict(
204                 version=work.version,
205                 previous_block=work.previous_block,
206                 target=work.target,
207                 
208                 height=height + 1,
209                 
210                 best_share_hash=best,
211             ))
212             current_work2.set(dict(
213                 timestamp=work.timestamp,
214             ))
215         
216         print 'Initializing work...'
217         yield set_real_work()
218         print '    ...success!'
219         
220         # setup p2p logic and join p2pool network
221         
222         def share_share(share, ignore_peer=None):
223             for peer in p2p_node.peers.itervalues():
224                 if peer is ignore_peer:
225                     continue
226                 peer.send_share(share.share)
227             share.flag_shared()
228         
229         def p2p_share(share, peer=None):
230             if share.hash <= share.header['target']:
231                 print
232                 print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
233                 print
234                 if factory.conn is not None:
235                     factory.conn.send_block(block=share.as_block())
236                 else:
237                     print 'No bitcoind connection! Erp!'
238             
239             print "Received share %x" % (share.hash,)
240             
241             tracker.add(share)
242             best, desired = tracker.think()
243             for peer2, share_hash in desired:
244                 peer2.get_shares([share_hash])
245             
246             w = dict(current_work.value)
247             w['best_share_hash'] = best
248             current_work.set(w)
249             '''
250             if res == 'good':
251                 share2 = chain.share2s[share.hash]
252                 
253                 if chain is current_work.value['current_chain']:
254                     if share.hash == chain.highest.value:
255                         print 'Accepted share, passing to peers. Height: %i Hash: %x Script: %s' % (share2.height, share.hash, share2.shares[-1].encode('hex'))
256                         share_share2(share2, peer)
257                     else:
258                         print 'Accepted share, not highest. Height: %i Hash: %x' % (share2.height, share.hash,)
259                 else:
260                     print 'Accepted share to non-current chain. Height: %i Hash: %x' % (share2.height, share.hash,)
261             elif res == 'dup':
262                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
263             elif res == 'orphan':
264                 print 'Got share referencing unknown share, requesting past shares from peer. Hash: %x' % (share.hash,)
265                 if peer is None:
266                     raise ValueError()
267                 peer.send_gettobest(
268                     chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
269                     have=random.sample(chain.share2s.keys(), min(8, len(chain.share2s))) + [chain.share2s[chain.highest.value].share.hash] if chain.highest.value is not None else [],
270                 )
271             else:
272                 raise ValueError('unknown result from chain.accept - %r' % (res,))
273             
274             w = dict(current_work.value)
275             w['best_share_hash'] = w['current_chain'].get_highest_share_hash()
276             current_work.set(w)
277             '''
278         
279         def p2p_share_hash(chain_id_data, hash, peer):
280             chain = get_chain(chain_id_data)
281             if chain is current_work.value['current_chain']:
282                 if hash not in chain.share2s:
283                     print "Got share hash, requesting! Hash: %x" % (hash,)
284                     peer.send_getshares(chain_id=p2pool.chain_id_type.unpack(chain_id_data), hashes=[hash])
285                 else:
286                     print "Got share hash, already have, ignoring. Hash: %x" % (hash,)
287             else:
288                 print "Got share hash to non-current chain, storing. Hash: %x" % (hash,)
289                 chain.request_map.setdefault(hash, []).append(peer)
290         
291         def p2p_get_to_best(chain_id_data, have, peer):
292             chain = get_chain(chain_id_data)
293             if chain.highest.value is None:
294                 return
295             
296             chain_hashes = chain.get_down(chain.highest.value)
297             
298             have2 = set()
299             for hash_ in have:
300                 have2 |= set(chain.get_down(hash_))
301             
302             for share_hash in reversed(chain_hashes):
303                 if share_hash in have2:
304                     continue
305                 peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
306         
307         def p2p_get_shares(share_hashes, peer):
308             for share_hash in share_hashes:
309                 if share_hash in tracker.shares:
310                     peer.send_share(chain.shares[share_hash], full=True)
311         
312         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
313         
314         def parse(x):
315             if ':' in x:
316                 ip, port = x.split(':')
317                 return ip, int(port)
318             else:
319                 return x, args.net.P2P_PORT
320         
321         nodes = [('72.14.191.28', args.net.P2P_PORT)]
322         try:
323             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
324         except:
325             traceback.print_exc()
326         
327         p2p_node = p2p.Node(
328             current_work=current_work,
329             port=args.p2pool_port,
330             net=args.net,
331             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
332             mode=0 if args.low_bandwidth else 1,
333             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
334         )
335         p2p_node.handle_share = p2p_share
336         p2p_node.handle_share_hash = p2p_share_hash
337         p2p_node.handle_get_to_best = p2p_get_to_best
338         p2p_node.handle_get_shares = p2p_get_shares
339         
340         p2p_node.start()
341         
342         # send share when the chain changes to their chain
343         def work_changed(new_work):
344             #print 'Work changed:', new_work
345             for share in tracker.get_chain_known(new_work['best_share_hash']):
346                 if share.shared:
347                     break
348                 share_share(share)
349             return
350             chain = new_work['current_chain']
351             if chain.highest.value is not None:
352                 for share_hash in chain.get_down(chain.highest.value):
353                     share2 = chain.share2s[share_hash]
354                     if not share2.shared:
355                         print 'Sharing share of switched to chain. Hash:', share2.share.hash
356                         share_share2(share2)
357             # XXX ???
358             for hash, peers in chain.request_map.iteritems():
359                 if hash not in chain.share2s:
360                     random.choice(peers).send_getshares(hashes=[hash])
361         current_work.changed.watch(work_changed)
362         
363         print '    ...success!'
364         print
365         
366         # start listening for workers with a JSON-RPC server
367         
368         print 'Listening for workers on port %i...' % (args.worker_port,)
369         
370         # setup worker logic
371         
372         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
373         
374         def compute(state):
375             extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
376             generate_tx = p2pool.generate_transaction(
377                 tracker=tracker,
378                 previous_share_hash=state['best_share_hash'],
379                 new_script=my_script,
380                 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
381                 nonce=struct.pack("<Q", random.randrange(2**64)),
382                 block_target=state['target'],
383                 net=args.net,
384             )
385             print 'Generating!' #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
386             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
387             merkle_root = bitcoin.data.merkle_hash(transactions)
388             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
389             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['target'])
390             print "SENT", 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
391             return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
392         
393         def got_response(data):
394             # match up with transactions
395             header = bitcoin.getwork.decode_data(data)
396             transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
397             if transactions is None:
398                 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
399                 return False
400             share = p2pool.Share.from_block(dict(header=header, txs=transactions))
401             print 'GOT SHARE! %x' % (share.hash,)
402             try:
403                 p2p_share(share)
404             except:
405                 print
406                 print 'Error processing data received from worker:'
407                 traceback.print_exc()
408                 print
409                 return False
410             else:
411                 return True
412         
413         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
414         
415         print '    ...success!'
416         print
417         
418         # done!
419         
420         def get_blocks(start_hash):
421             while True:
422                 try:
423                     block = get_block.call_now(start_hash)
424                 except deferral.NotNowError:
425                     break
426                 yield start_hash, block
427                 start_hash = block['header']['previous_block']
428         
429         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
430         
431         class Tx(object):
432             def __init__(self, tx, seen_at_block):
433                 self.hash = bitcoin.data.tx_type.hash256(tx)
434                 self.tx = tx
435                 self.seen_at_block = seen_at_block
436                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
437                 #print
438                 #print "%x %r" % (seen_at_block, tx)
439                 #for mention in self.mentions:
440                 #    print "%x" % mention
441                 #print
442                 self.parents_all_in_blocks = False
443                 self.value_in = 0
444                 #print self.tx
445                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
446                 self._find_parents_in_blocks()
447             
448             @defer.inlineCallbacks
449             def _find_parents_in_blocks(self):
450                 for tx_in in self.tx['tx_ins']:
451                     try:
452                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
453                     except Exception:
454                         return
455                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
456                     #print raw_transaction
457                     if not raw_transaction['parent_blocks']:
458                         return
459                 self.parents_all_in_blocks = True
460             
461             def is_good(self):
462                 if not self.parents_all_in_blocks:
463                     return False
464                 x = self.is_good2()
465                 #print "is_good:", x
466                 return x
467             
468             def is_good2(self):
469                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
470                     if block_hash == self.seen_at_block:
471                         return True
472                     for tx in block['txs']:
473                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
474                         if mentions & self.mentions:
475                             return False
476                 return False
477         
478         @defer.inlineCallbacks
479         def new_tx(tx_hash):
480             try:
481                 assert isinstance(tx_hash, (int, long))
482                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
483                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
484             except:
485                 traceback.print_exc()
486         factory.new_tx.watch(new_tx)
487         
488         def new_block(block):
489             set_real_work()
490         factory.new_block.watch(new_block)
491         
492         print 'Started successfully!'
493         print
494         
495         while True:
496             yield deferral.sleep(1)
497             set_real_work()
498     except:
499         print
500         print 'Fatal error:'
501         traceback.print_exc()
502         print
503         reactor.stop()
504
def run():
    """Parse the command line, fill in network-dependent ports and run main().

    Ports left unset on the command line are taken from the selected network
    (args.net). main(args) is scheduled to start once the reactor is running,
    and the reactor is then run.
    """
    cli = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
    cli.add_argument('--version', action='version', version=__version__)
    cli.add_argument(
        '--testnet',
        help='use the testnet; make sure you change the ports too',
        action='store_const',
        const=p2pool.Testnet,
        default=p2pool.Mainnet,
        dest='net',
    )

    # --- p2pool network options ---
    pool_opts = cli.add_argument_group('p2pool interface')
    pool_opts.add_argument(
        '--p2pool-port',
        metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int,
        action='store',
        default=None,
        dest='p2pool_port',
    )
    pool_opts.add_argument(
        '-n', '--p2pool-node',
        metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str,
        action='append',
        default=[],
        dest='p2pool_nodes',
    )
    # registered on the parser itself, not on the p2pool group
    cli.add_argument(
        '-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true',
        default=False,
        dest='low_bandwidth',
    )

    # --- miner-facing options ---
    worker_opts = cli.add_argument_group('worker interface')
    worker_opts.add_argument(
        '-w', '--worker-port',
        metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int,
        action='store',
        default=9332,
        dest='worker_port',
    )

    # --- bitcoind connection options ---
    bitcoind_opts = cli.add_argument_group('bitcoind interface')
    bitcoind_opts.add_argument(
        '--bitcoind-address',
        metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str,
        action='store',
        default='127.0.0.1',
        dest='bitcoind_address',
    )
    bitcoind_opts.add_argument(
        '--bitcoind-rpc-port',
        metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int,
        action='store',
        default=8332,
        dest='bitcoind_rpc_port',
    )
    bitcoind_opts.add_argument(
        '--bitcoind-p2p-port',
        metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int,
        action='store',
        default=None,
        dest='bitcoind_p2p_port',
    )

    # the two positional arguments: RPC username, then RPC password
    bitcoind_opts.add_argument(
        metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str,
        action='store',
        dest='bitcoind_rpc_username',
    )
    bitcoind_opts.add_argument(
        metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str,
        action='store',
        dest='bitcoind_rpc_password',
    )

    options = cli.parse_args()

    # ports not given on the command line come from the chosen network
    if options.bitcoind_p2p_port is None:
        options.bitcoind_p2p_port = options.net.BITCOIN_P2P_PORT

    if options.p2pool_port is None:
        options.p2pool_port = options.net.P2P_PORT

    reactor.callWhenRunning(main, options)
    reactor.run()