6eaf294c35a91fe6e3d3ac0dd13d3fcffc586b2d
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
import argparse
import itertools
import os
import random
import sqlite3
import struct
import subprocess
import sys
import traceback

from twisted.internet import defer, reactor
from twisted.web import server

import bitcoin.p2p, bitcoin.getwork, bitcoin.data
import db
import expiring_dict
import jsonrpc
import p2p
import p2pool.data as p2pool
import util
import worker_interface
25
# Derive the running version from Subversion working-copy metadata; fall back
# to 'unknown' when svnversion is missing or fails for any reason.
try:
    __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
except Exception:  # was a bare except: - don't swallow SystemExit/KeyboardInterrupt
    __version__ = 'unknown'

# Frozen executables (py2exe etc.) have no meaningful __file__; point it at the
# binary so os.path.dirname(__file__) (used below for addrs.dat) still works.
if hasattr(sys, "frozen"):
    __file__ = sys.executable
33
class Chain(object):
    """One p2pool share chain, identified by its packed chain-id data.
    
    Holds verified shares (share2 wrappers) by hash and tracks the tallest
    known share in an observable util.Variable.
    """
    def __init__(self, chain_id_data):
        self.chain_id_data = chain_id_data
        self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
        
        # share hash -> verified share2 wrapper
        self.share2s = {}
        # hash of the tallest known share; observers can watch for changes
        self.highest = util.Variable(None)
        
        self.requesting = set()
        self.request_map = {}
    
    def accept(self, share, net):
        """Try to add a share to this chain.
        
        Returns 'good', 'dup', or 'orphan'. Raises ValueError when the share
        belongs to another chain; share.check may raise on invalid shares.
        """
        if share.chain_id_data != self.chain_id_data:
            raise ValueError('share does not belong to this chain')
        
        if share.hash in self.share2s:
            return 'dup'
        
        prev_hash = share.previous_share_hash
        if prev_hash is None:
            # chain genesis: no parent, height becomes 0
            parent, parent_height = None, -1
        else:
            if prev_hash not in self.share2s:
                return 'orphan'
            parent = self.share2s[prev_hash]
            parent_height = parent.height
        
        new_height = parent_height + 1
        
        share2 = share.check(self, new_height, parent, net) # raises exceptions
        
        if share2.share is not share:
            raise ValueError()
        
        self.share2s[share.hash] = share2
        
        # promote to chain tip when strictly taller than the current best
        best = self.highest.value
        if best is None or new_height > self.share2s[best].height:
            self.highest.set(share.hash)
        
        return 'good'
    
    def get_highest_share2(self):
        """Return the share2 at the chain tip, or None for an empty chain."""
        best = self.highest.value
        if best is None:
            return None
        return self.share2s[best]
    
    def get_down(self, share_hash):
        """Walk from share_hash toward genesis; returns hashes, tip first.
        
        The walk stops at a hash we do not hold (it is still included) or at
        a share with no predecessor.
        """
        result = [share_hash]
        while share_hash in self.share2s:
            prev = self.share2s[share_hash].share.previous_share_hash
            if prev is None:
                break
            result.append(prev)
            share_hash = prev
        return result
90
@defer.inlineCallbacks
def get_last_p2pool_block_hash(current_block_hash, get_block, net):
    """Walk the bitcoin block chain backwards from current_block_hash until a
    p2pool-generated block is found, firing with that block's hash.
    
    A block counts as p2pool-generated when its coinbase script unpacks as a
    p2pool coinbase with the network's IDENTIFIER, the declared subsidy equals
    the sum of the generation outputs, and our own script (net.SCRIPT) receives
    at least subsidy//64. net.ROOT_BLOCK acts as the hard stop for the scan.
    """
    block_hash = current_block_hash
    while True:
        if block_hash == net.ROOT_BLOCK:
            defer.returnValue(block_hash)
        try:
            block = yield get_block(block_hash)
        except:
            # fetch failed (e.g. no p2p connection yet) - log and retry the
            # same hash; get_block is expected to eventually succeed
            traceback.print_exc()
            continue
        coinbase_data = block['txs'][0]['tx_ins'][0]['script']
        try:
            coinbase = p2pool.coinbase_type.unpack(coinbase_data)
        except bitcoin.data.EarlyEnd:
            # coinbase script too short to be a p2pool coinbase - not ours
            pass
        else:
            try:
                if coinbase['identifier'] == net.IDENTIFIER:
                    # total up generation outputs per destination script
                    payouts = {}
                    for tx_out in block['txs'][0]['tx_outs']:
                        payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
                    subsidy = sum(payouts.itervalues())
                    if coinbase['subsidy'] == subsidy:
                        if payouts.get(net.SCRIPT, 0) >= subsidy//64:
                            defer.returnValue(block_hash)
            except Exception:
                # malformed-but-parseable coinbase: report and keep scanning
                print
                print 'Error matching block:'
                print 'block:', block
                traceback.print_exc()
                print
        # step to the parent block and keep scanning backwards
        block_hash = block['header']['previous_block']
124
125 @defer.inlineCallbacks
126 def getwork(bitcoind):
127     while True:
128         try:
129             # a block could arrive in between these two queries
130             getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
131             try:
132                 getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
133             finally:
134                 # get rid of residual errors
135                 getwork_df.addErrback(lambda fail: None)
136                 height_df.addErrback(lambda fail: None)
137         except:
138             print
139             print 'Error getting work from bitcoind:'
140             traceback.print_exc()
141             print
142             
143             
144             yield util.sleep(1)
145             
146             continue
147         defer.returnValue((getwork, height))
148
149
@defer.inlineCallbacks
def main(args):
    """Start the p2pool node: connect to bitcoind (RPC + p2p), join the p2pool
    network, and serve getwork to miners. Runs under the twisted reactor; on
    any fatal startup error it prints a traceback and stops the reactor.
    """
    try:
        net = p2pool.Testnet if args.testnet else p2pool.Main
        
        print 'p2pool (version %s)' % (__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        
        work, height = yield getwork(bitcoind)
        
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (work.previous_block, height)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.testnet)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        
        # retry checkorder until bitcoind hands us a payout script
        while True:
            try:
                res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
                if res['reply'] != 'success':
                    print
                    print 'Error getting payout script:'
                    print res
                    print
                    continue
                my_script = res['script']
            except:
                print
                print 'Error getting payout script:'
                traceback.print_exc()
                print
            else:
                break
            yield util.sleep(1)
        
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        # cached block fetcher over the bitcoind p2p connection (1 hour TTL)
        @defer.inlineCallbacks
        def real_get_block(block_hash):
            block = yield (yield factory.getProtocol()).get_block(block_hash)
            print 'Got block %x' % (block_hash,)
            defer.returnValue(block)
        get_block = util.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
        
        get_raw_transaction = util.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        # one Chain object per chain id, created lazily; expires after 300s idle
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        # information affecting work that should trigger a long-polling update
        current_work = util.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = util.Variable(None)
        
        share_dbs = [db.SQLiteDict(sqlite3.connect(filename, isolation_level=None), 'shares') for filename in args.store_shares]
        
        # refresh current_work/current_work2 from bitcoind's view of the chain
        @defer.inlineCallbacks
        def set_real_work():
            work, height = yield getwork(bitcoind)
            last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, net))
            chain = get_chain(p2pool.chain_id_type.pack(dict(last_p2pool_block_hash=last_p2pool_block_hash, bits=work.bits)))
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                bits=work.bits,
                height=height + 1,
                current_chain=chain,
                highest_p2pool_share2=chain.get_highest_share2(),
                last_p2pool_block_hash=last_p2pool_block_hash,
            ))
            current_work2.set(dict(
                timestamp=work.timestamp,
            ))
        
        print 'Searching for last p2pool-generated block...'
        yield set_real_work()
        print '    ...success!'
        print '    Matched block %x' % (current_work.value['last_p2pool_block_hash'],)
        print
        
        # setup p2p logic and join p2pool network
        
        # broadcast a share to every peer except the one it came from
        def share_share2(share2, ignore_peer=None):
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                peer.send_share(share2.share)
            share2.flag_shared()
        
        # handle a share received from a peer (or from our own workers when
        # peer is None); validates, stores, relays, and updates current_work
        def p2p_share(share, peer=None):
            # NOTE(review): 'conv' is not imported anywhere in this file -
            # looks like a stale reference to an older module (bitcoin.getwork
            # appears to be its successor); verify before relying on this path
            if share.hash <= conv.bits_to_target(share.header['bits']):
                print
                print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
                #print share.__dict__
                print
                if factory.conn is not None:
                    factory.conn.send_block(block=share.as_block())
                else:
                    print 'No bitcoind connection! Erp!'
            
            chain = get_chain(share.chain_id_data)
            res = chain.accept(share, net)
            if res == 'good':
                share2 = chain.share2s[share.hash]
                
                # persist the share to any --store-shares databases, off the
                # hot path via callLater
                def save():
                    hash_data = bitcoin.p2p.HashType().pack(share.hash)
                    share1_data = p2pool.share1.pack(share.as_share1())
                    for share_db in share_dbs:
                        share_db[hash_data] = share1_data
                reactor.callLater(1, save)
                
                if chain is current_work.value['current_chain']:
                    if share.hash == chain.highest.value:
                        print 'Accepted share, passing to peers. Height: %i Hash: %x Script: %s' % (share2.height, share.hash, share2.shares[-1].encode('hex'))
                        share_share2(share2, peer)
                    else:
                        print 'Accepted share, not highest. Height: %i Hash: %x' % (share2.height, share.hash,)
                else:
                    print 'Accepted share to non-current chain. Height: %i Hash: %x' % (share2.height, share.hash,)
            elif res == 'dup':
                print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
            elif res == 'orphan':
                print 'Got share referencing unknown share, requesting past shares from peer. Hash: %x' % (share.hash,)
                if peer is None:
                    raise ValueError()
                # NOTE(review): the conditional binds to the whole expression,
                # so 'have' is [] whenever highest.value is None (an empty
                # chain), which happens to be harmless here
                peer.send_gettobest(
                    chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
                    have=random.sample(chain.share2s.keys(), min(8, len(chain.share2s))) + [chain.share2s[chain.highest.value].share.hash] if chain.highest.value is not None else [],
                )
            else:
                raise ValueError('unknown result from chain.accept - %r' % (res,))
            
            # refresh the advertised chain tip so long-pollers see the share
            w = dict(current_work.value)
            w['highest_p2pool_share2'] = w['current_chain'].get_highest_share2()
            current_work.set(w)
        
        # a peer advertised a share hash: fetch it if it is news on our chain,
        # otherwise remember who has it in case we switch chains later
        def p2p_share_hash(chain_id_data, hash, peer):
            chain = get_chain(chain_id_data)
            if chain is current_work.value['current_chain']:
                if hash not in chain.share2s:
                    print "Got share hash, requesting! Hash: %x" % (hash,)
                    peer.send_getshares(chain_id=p2pool.chain_id_type.unpack(chain_id_data), hashes=[hash])
                else:
                    print "Got share hash, already have, ignoring. Hash: %x" % (hash,)
            else:
                print "Got share hash to non-current chain, storing. Hash: %x" % (hash,)
                chain.request_map.setdefault(hash, []).append(peer)
        
        # a peer asked for our best chain: send every share they do not claim
        # to have, oldest first
        def p2p_get_to_best(chain_id_data, have, peer):
            chain = get_chain(chain_id_data)
            if chain.highest.value is None:
                return
            
            chain_hashes = chain.get_down(chain.highest.value)
            
            have2 = set()
            for hash_ in have:
                have2 |= set(chain.get_down(hash_))
            
            for share_hash in reversed(chain_hashes):
                if share_hash in have2:
                    continue
                peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
        
        # a peer asked for specific shares by hash
        def p2p_get_shares(chain_id_data, hashes, peer):
            chain = get_chain(chain_id_data)
            for hash_ in hashes:
                if hash_ in chain.share2s:
                    peer.send_share(chain.share2s[hash_].share, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        # parse an ADDR[:PORT] bootstrap-node string, defaulting the port
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, {False: 9333, True: 19333}[args.testnet]
        
        # builtin bootstrap addresses, plus a DNS-resolved one (best effort)
        if args.testnet:
            nodes = [('72.14.191.28', 19333)]
        else:
            nodes = [('72.14.191.28', 9333)]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), {False: 9333, True: 19333}[args.testnet]))
        except:
            traceback.print_exc()
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        p2p_node.handle_share = p2p_share
        p2p_node.handle_share_hash = p2p_share_hash
        p2p_node.handle_get_to_best = p2p_get_to_best
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            chain = new_work['current_chain']
            # flush any of our shares the new chain has not seen
            if chain.highest.value is not None:
                for share_hash in chain.get_down(chain.highest.value):
                    share2 = chain.share2s[share_hash]
                    if not share2.shared:
                        print 'Sharing share of switched to chain. Hash:', share2.share.hash
                        share_share2(share2)
            # request shares peers advertised while this chain was non-current
            for hash, peers in chain.request_map.iteritems():
                if hash not in chain.share2s:
                    random.choice(peers).send_getshares(hashes=[hash])
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle root -> transaction list for work we handed out (300s TTL)
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        
        # build a getwork job for a miner from the current chain state
        def compute(state):
            extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            generate_tx, shares = p2pool.generate_transaction(
                last_p2pool_block_hash=state['last_p2pool_block_hash'],
                previous_share2=state['highest_p2pool_share2'],
                new_script=my_script,
                # halving schedule (50 BTC >> height//210000) plus collected fees
                # NOTE(review): 'struct' is not imported at the top of this
                # file - this line raises NameError unless that is fixed
                subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=struct.pack("<Q", random.randrange(2**64)),
                net=net,
            )
            print 'Generating, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.p2p.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # stays for 300 seconds (merkle_root_to_transactions TTL above)
            # NOTE(review): 'conv' is undefined here as well - see p2p_share
            ba = conv.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['bits'])
            return ba.getwork(net.TARGET_MULTIPLIER)
        
        # handle a solved job submitted by a miner; True on acceptance
        def got_response(data):
            # match up with transactions
            header = conv.decode_data(data)
            transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
            if transactions is None:
                print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
                return False
            share = p2pool.Share(header=header, txs=transactions)
            print 'GOT SHARE! %x' % (share.hash,)
            try:
                p2p_share(share)
            except:
                print
                print 'Error processing data received from worker:'
                traceback.print_exc()
                print
                return False
            else:
                return True
        
        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
        
        print '    ...success!'
        print
        
        # done!
        
        # yield (hash, block) pairs walking backwards from start_hash, using
        # only blocks already in the get_block cache (never blocks on I/O)
        def get_blocks(start_hash):
            while True:
                try:
                    block = get_block.call_now(start_hash)
                except util.NotNowError:
                    break
                yield start_hash, block
                start_hash = block['header']['previous_block']
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        
        # memory-pool transaction wrapper: tracks input/output value and
        # whether it is safe to include in our generated blocks
        class Tx(object):
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_hash(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                # this tx's hash plus every input's previous-output hash,
                # used for conflict detection in is_good2
                self.mentions = set([bitcoin.data.tx_hash(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print "%x %r" % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print "%x" % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                # fire-and-forget deferred; updates value_in and
                # parents_all_in_blocks asynchronously
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        # unknown parent: leave parents_all_in_blocks False
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                x = self.is_good2()
                #print "is_good:", x
                return x
            
            def is_good2(self):
                # scan up to 10 cached blocks back from the current tip: good
                # if we reach the block this tx was first seen at without any
                # conflicting mention; False when unseen or on conflict
                for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
                    if block_hash == self.seen_at_block:
                        return True
                    for tx in block['txs']:
                        mentions = set([bitcoin.data.tx_hash(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                        if mentions & self.mentions:
                            return False
                return False
        
        # track new memory-pool transactions announced by bitcoind
        def new_tx(tx):
            tx_pool[bitcoin.data.tx_hash(tx)] = Tx(tx, current_work.value['previous_block'])
        factory.new_tx.watch(new_tx)
        
        # refresh our work whenever bitcoind announces a new block
        def new_block(block):
            set_real_work()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
    except:
        print
        print 'Fatal error:'
        traceback.print_exc()
        print
        reactor.stop()
508
def run():
    """Parse command-line options, apply testnet-dependent port defaults, and
    hand control to the twisted reactor running main()."""
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % __version__)
    parser.add_argument('--version', action='version', version=__version__)
    parser.add_argument('--testnet',
        help='use the testnet; make sure you change the ports too',
        action='store_true', default=False, dest='testnet')
    parser.add_argument('--store-shares', metavar='FILENAME',
        help='write shares to a database (not needed for normal usage)',
        type=str, action='append', default=[], dest='store_shares')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # required positional arguments: RPC credentials
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    # ports left unset follow the testnet/mainnet convention
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = 18333 if args.testnet else 8333
    
    if args.p2pool_port is None:
        args.p2pool_port = 19333 if args.testnet else 9333
    
    reactor.callWhenRunning(main, args)
    reactor.run()