1ff37e415b8c98c6ffe7a80790765f7622a225f7
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13 import traceback
14
15 from twisted.internet import defer, reactor
16 from twisted.web import server
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22
23 try:
24     __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
25 except:
26     __version__ = 'unknown'
27
28 class Chain(object):
29     def __init__(self, chain_id_data):
30         assert False
31         self.chain_id_data = chain_id_data
32         self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
33         
34         self.share2s = {} # hash -> share2
35         self.highest = variable.Variable(None) # hash
36         
37         self.requesting = set()
38         self.request_map = {}
39     
40     def accept(self, share, net):
41         if share.chain_id_data != self.chain_id_data:
42             raise ValueError('share does not belong to this chain')
43         
44         if share.hash in self.share2s:
45             return 'dup'
46         
47         if share.previous_share_hash is None:
48             previous_height, previous_share2 = -1, None
49         elif share.previous_share_hash not in self.share2s:
50             return 'orphan'
51         else:
52             previous_share2 = self.share2s[share.previous_share_hash]
53             previous_height = previous_share2.height
54         
55         height = previous_height + 1
56         
57         share2 = share.check(self, height, previous_share2, net) # raises exceptions
58         
59         if share2.share is not share:
60             raise ValueError()
61         
62         self.share2s[share.hash] = share2
63         
64         if self.highest.value is None or height > self.share2s[self.highest.value].height:
65             self.highest.set(share.hash)
66         
67         return 'good'
68     
69     def get_highest_share2(self):
70         return self.share2s[self.highest.value] if self.highest.value is not None else None
71     
72     def get_down(self, share_hash):
73         blocks = []
74         
75         while True:
76             blocks.append(share_hash)
77             if share_hash not in self.share2s:
78                 break
79             share2 = self.share2s[share_hash]
80             if share2.share.previous_share_hash is None:
81                 break
82             share_hash = share2.share.previous_share_hash
83         
84         return blocks
85
86 @defer.inlineCallbacks
87 def get_last_p2pool_block_hash(current_block_hash, get_block, net):
88     block_hash = current_block_hash
89     while True:
90         if block_hash == net.ROOT_BLOCK:
91             defer.returnValue(block_hash)
92         try:
93             block = yield get_block(block_hash)
94         except:
95             traceback.print_exc()
96             continue
97         coinbase_data = block['txs'][0]['tx_ins'][0]['script']
98         try:
99             coinbase = p2pool.coinbase_type.unpack(coinbase_data)
100         except bitcoin.data.EarlyEnd:
101             pass
102         else:
103             try:
104                 if coinbase['identifier'] == net.IDENTIFIER:
105                     payouts = {}
106                     for tx_out in block['txs'][0]['tx_outs']:
107                         payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
108                     subsidy = sum(payouts.itervalues())
109                     if coinbase['subsidy'] == subsidy:
110                         if payouts.get(net.SCRIPT, 0) >= subsidy//64:
111                             defer.returnValue(block_hash)
112             except Exception:
113                 print
114                 print 'Error matching block:'
115                 print 'block:', block
116                 traceback.print_exc()
117                 print
118         block_hash = block['header']['previous_block']
119
120 @deferral.retry('Error getting work from bitcoind:', 1)
121 @defer.inlineCallbacks
122 def getwork(bitcoind):
123     # a block could arrive in between these two queries
124     getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
125     try:
126         getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
127     finally:
128         # get rid of residual errors
129         getwork_df.addErrback(lambda fail: None)
130         height_df.addErrback(lambda fail: None)
131     defer.returnValue((getwork, height))
132
133 @defer.inlineCallbacks
134 def get_payout_script(factory):
135         while True:
136             try:
137                 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
138                 if res['reply'] != 'success':
139                     print
140                     print 'Error getting payout script:'
141                     print res
142                     print
143                     continue
144                 my_script = res['script']
145             except Exception:
146                 print
147                 print 'Error getting payout script:'
148                 traceback.print_exc()
149                 print
150             else:
151                 defer.returnValue(my_script)
152             yield deferral.sleep(1)
153
154 @defer.inlineCallbacks
155 def main(args):
156     try:
157         print 'p2pool (version %s)' % (__version__,)
158         print
159         
160         # connect to bitcoind over JSON-RPC and do initial getwork
161         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
162         print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
163         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
164         work, height = yield getwork(bitcoind)
165         print '    ...success!'
166         print '    Current block hash: %x height: %i' % (work.previous_block, height)
167         print
168         
169         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
170         print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
171         factory = bitcoin.p2p.ClientFactory(args.net)
172         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
173         my_script = yield get_payout_script(factory)
174         print '    ...success!'
175         print '    Payout script:', my_script.encode('hex')
176         print
177         
178         @defer.inlineCallbacks
179         def real_get_block(block_hash):
180             block = yield (yield factory.getProtocol()).get_block(block_hash)
181             print 'Got block %x' % (block_hash,)
182             defer.returnValue(block)
183         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
184         
185         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
186         
187         ht = bitcoin.p2p.HeightTracker(factory)
188         
189         tracker = p2pool.OkayTracker(args.net)
190         chains = expiring_dict.ExpiringDict(300)
191         def get_chain(chain_id_data):
192             return chains.setdefault(chain_id_data, Chain(chain_id_data))
193         
194         # information affecting work that should trigger a long-polling update
195         current_work = variable.Variable(None)
196         # information affecting work that should not trigger a long-polling update
197         current_work2 = variable.Variable(None)
198         
199         @defer.inlineCallbacks
200         def set_real_work():
201             work, height = yield getwork(bitcoind)
202             best, desired = tracker.think(ht)
203             # XXX desired?
204             current_work.set(dict(
205                 version=work.version,
206                 previous_block=work.previous_block,
207                 target=work.target,
208                 
209                 height=height + 1,
210                 
211                 best_share_hash=best,
212             ))
213             current_work2.set(dict(
214                 timestamp=work.timestamp,
215             ))
216         
217         print 'Initializing work...'
218         yield set_real_work()
219         print '    ...success!'
220         
221         # setup p2p logic and join p2pool network
222         
223         def share_share(share, ignore_peer=None):
224             for peer in p2p_node.peers.itervalues():
225                 if peer is ignore_peer:
226                     continue
227                 peer.send_share(share)
228             share.flag_shared()
229         
230         def p2p_share(share, peer=None):
231             if share.hash in tracker.shares:
232                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
233                 return
234             
235             #print "Received share %x" % (share.hash,)
236             
237             tracker.add(share)
238             best, desired = tracker.think(ht)
239             for peer2, share_hash in desired:
240                 print "Requesting parent share %x" % (share_hash,)
241                 peer2.send_getshares(hashes=[share_hash])
242             
243             if share.gentx is not None:
244                 if share.hash <= share.header['target']:
245                     print
246                     print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
247                     print
248                     if factory.conn.value is not None:
249                         factory.conn.value.send_block(block=share.as_block())
250                     else:
251                         print 'No bitcoind connection! Erp!'
252             
253             w = dict(current_work.value)
254             w['best_share_hash'] = best
255             current_work.set(w)
256             
257             if best == share.hash:
258                 print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
259             else:
260                 print 'Accepted share, not highest. Hash: %x' % (share.hash,)
261         
262         def p2p_share_hash(share_hash, peer):
263             if share_hash in tracker.shares:
264                 print "Got share hash, already have, ignoring. Hash: %x" % (share_hash,)
265             else:
266                 print "Got share hash, requesting! Hash: %x" % (share_hash,)
267                 peer.send_getshares(hashes=[share_hash])
268         
269         def p2p_get_to_best(chain_id_data, have, peer):
270             chain = get_chain(chain_id_data)
271             if chain.highest.value is None:
272                 return
273             
274             chain_hashes = chain.get_down(chain.highest.value)
275             
276             have2 = set()
277             for hash_ in have:
278                 have2 |= set(chain.get_down(hash_))
279             
280             for share_hash in reversed(chain_hashes):
281                 if share_hash in have2:
282                     continue
283                 peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
284         
285         def p2p_get_shares(share_hashes, peer):
286             for share_hash in share_hashes:
287                 if share_hash in tracker.shares:
288                     peer.send_share(tracker.shares[share_hash], full=True)
289         
290         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
291         
292         def parse(x):
293             if ':' in x:
294                 ip, port = x.split(':')
295                 return ip, int(port)
296             else:
297                 return x, args.net.P2P_PORT
298         
299         nodes = [
300             ('72.14.191.28', args.net.P2P_PORT),
301             ('62.204.197.159', args.net.P2P_PORT),
302         ]
303         try:
304             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
305         except:
306             traceback.print_exc()
307         
308         p2p_node = p2p.Node(
309             current_work=current_work,
310             port=args.p2pool_port,
311             net=args.net,
312             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
313             mode=0 if args.low_bandwidth else 1,
314             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
315         )
316         p2p_node.handle_share = p2p_share
317         p2p_node.handle_share_hash = p2p_share_hash
318         p2p_node.handle_get_to_best = p2p_get_to_best
319         p2p_node.handle_get_shares = p2p_get_shares
320         
321         p2p_node.start()
322         
323         # send share when the chain changes to their chain
324         def work_changed(new_work):
325             #print 'Work changed:', new_work
326             for share in tracker.get_chain_known(new_work['best_share_hash']):
327                 if share.shared:
328                     break
329                 share_share(share, share.peer)
330         current_work.changed.watch(work_changed)
331         
332         print '    ...success!'
333         print
334         
335         # start listening for workers with a JSON-RPC server
336         
337         print 'Listening for workers on port %i...' % (args.worker_port,)
338         
339         # setup worker logic
340         
341         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
342         
343         def compute(state):
344             extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
345             generate_tx = p2pool.generate_transaction(
346                 tracker=tracker,
347                 previous_share_hash=state['best_share_hash'],
348                 new_script=my_script,
349                 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
350                 nonce=struct.pack("<Q", random.randrange(2**64)),
351                 block_target=state['target'],
352                 net=args.net,
353             )
354             print 'Generating!', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']//1000000
355             print "Target: %x" % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
356             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
357             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
358             merkle_root = bitcoin.data.merkle_hash(transactions)
359             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
360             
361             timestamp = current_work2.value['timestamp']
362             if state['best_share_hash'] is not None:
363                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
364                 if timestamp2 > timestamp:
365                     print "Toff", timestamp2 - timestamp
366                     timestamp = timestamp2
367             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
368             #print "SENT", 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
369             return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
370         
371         def got_response(data):
372             # match up with transactions
373             header = bitcoin.getwork.decode_data(data)
374             transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
375             if transactions is None:
376                 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
377                 return False
378             block = dict(header=header, txs=transactions)
379             hash_ = bitcoin.data.block_header_type.hash256(block['header'])
380             if hash_ <= block['header']['target']:
381                 print
382                 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
383                 print
384                 if factory.conn.value is not None:
385                     factory.conn.value.send_block(block=block)
386                 else:
387                     print 'No bitcoind connection! Erp!'
388             share = p2pool.Share.from_block(block)
389             print 'GOT SHARE! %x' % (share.hash,)
390             try:
391                 p2p_share(share)
392             except:
393                 print
394                 print 'Error processing data received from worker:'
395                 traceback.print_exc()
396                 print
397                 return False
398             else:
399                 return True
400         
401         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
402         
403         print '    ...success!'
404         print
405         
406         # done!
407         
408         def get_blocks(start_hash):
409             while True:
410                 try:
411                     block = get_block.call_now(start_hash)
412                 except deferral.NotNowError:
413                     break
414                 yield start_hash, block
415                 start_hash = block['header']['previous_block']
416         
417         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
418         
419         class Tx(object):
420             def __init__(self, tx, seen_at_block):
421                 self.hash = bitcoin.data.tx_type.hash256(tx)
422                 self.tx = tx
423                 self.seen_at_block = seen_at_block
424                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
425                 #print
426                 #print "%x %r" % (seen_at_block, tx)
427                 #for mention in self.mentions:
428                 #    print "%x" % mention
429                 #print
430                 self.parents_all_in_blocks = False
431                 self.value_in = 0
432                 #print self.tx
433                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
434                 self._find_parents_in_blocks()
435             
436             @defer.inlineCallbacks
437             def _find_parents_in_blocks(self):
438                 for tx_in in self.tx['tx_ins']:
439                     try:
440                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
441                     except Exception:
442                         return
443                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
444                     #print raw_transaction
445                     if not raw_transaction['parent_blocks']:
446                         return
447                 self.parents_all_in_blocks = True
448             
449             def is_good(self):
450                 if not self.parents_all_in_blocks:
451                     return False
452                 x = self.is_good2()
453                 #print "is_good:", x
454                 return x
455             
456             def is_good2(self):
457                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
458                     if block_hash == self.seen_at_block:
459                         return True
460                     for tx in block['txs']:
461                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
462                         if mentions & self.mentions:
463                             return False
464                 return False
465         
466         @defer.inlineCallbacks
467         def new_tx(tx_hash):
468             try:
469                 assert isinstance(tx_hash, (int, long))
470                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
471                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
472             except:
473                 traceback.print_exc()
474         factory.new_tx.watch(new_tx)
475         
476         def new_block(block):
477             set_real_work()
478         factory.new_block.watch(new_block)
479         
480         print 'Started successfully!'
481         print
482         
483         while True:
484             yield deferral.sleep(1)
485             set_real_work()
486     except:
487         print
488         print 'Fatal error:'
489         traceback.print_exc()
490         print
491         reactor.stop()
492
493 def run():
494     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
495     parser.add_argument('--version', action='version', version=__version__)
496     parser.add_argument('--testnet',
497         help='use the testnet; make sure you change the ports too',
498         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
499     
500     p2pool_group = parser.add_argument_group('p2pool interface')
501     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
502         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
503         type=int, action='store', default=None, dest='p2pool_port')
504     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
505         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
506         type=str, action='append', default=[], dest='p2pool_nodes')
507     parser.add_argument('-l', '--low-bandwidth',
508         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
509         action='store_true', default=False, dest='low_bandwidth')
510     
511     worker_group = parser.add_argument_group('worker interface')
512     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
513         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
514         type=int, action='store', default=9332, dest='worker_port')
515     
516     bitcoind_group = parser.add_argument_group('bitcoind interface')
517     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
518         help='connect to a bitcoind at this address (default: 127.0.0.1)',
519         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
520     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
521         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
522         type=int, action='store', default=8332, dest='bitcoind_rpc_port')
523     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
524         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
525         type=int, action='store', default=None, dest='bitcoind_p2p_port')
526     
527     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
528         help='bitcoind RPC interface username',
529         type=str, action='store', dest='bitcoind_rpc_username')
530     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
531         help='bitcoind RPC interface password',
532         type=str, action='store', dest='bitcoind_rpc_password')
533     
534     args = parser.parse_args()
535     
536     if args.bitcoind_p2p_port is None:
537         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
538     
539     if args.p2pool_port is None:
540         args.p2pool_port = args.net.P2P_PORT
541     
542     reactor.callWhenRunning(main, args)
543     reactor.run()