somewhat working
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13 import traceback
14
15 from twisted.internet import defer, reactor
16 from twisted.web import server
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22
23 try:
24     __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
25 except:
26     __version__ = 'unknown'
27
28 class Chain(object):
29     def __init__(self, chain_id_data):
30         assert False
31         self.chain_id_data = chain_id_data
32         self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
33         
34         self.share2s = {} # hash -> share2
35         self.highest = variable.Variable(None) # hash
36         
37         self.requesting = set()
38         self.request_map = {}
39     
40     def accept(self, share, net):
41         if share.chain_id_data != self.chain_id_data:
42             raise ValueError('share does not belong to this chain')
43         
44         if share.hash in self.share2s:
45             return 'dup'
46         
47         if share.previous_share_hash is None:
48             previous_height, previous_share2 = -1, None
49         elif share.previous_share_hash not in self.share2s:
50             return 'orphan'
51         else:
52             previous_share2 = self.share2s[share.previous_share_hash]
53             previous_height = previous_share2.height
54         
55         height = previous_height + 1
56         
57         share2 = share.check(self, height, previous_share2, net) # raises exceptions
58         
59         if share2.share is not share:
60             raise ValueError()
61         
62         self.share2s[share.hash] = share2
63         
64         if self.highest.value is None or height > self.share2s[self.highest.value].height:
65             self.highest.set(share.hash)
66         
67         return 'good'
68     
69     def get_highest_share2(self):
70         return self.share2s[self.highest.value] if self.highest.value is not None else None
71     
72     def get_down(self, share_hash):
73         blocks = []
74         
75         while True:
76             blocks.append(share_hash)
77             if share_hash not in self.share2s:
78                 break
79             share2 = self.share2s[share_hash]
80             if share2.share.previous_share_hash is None:
81                 break
82             share_hash = share2.share.previous_share_hash
83         
84         return blocks
85
86 @defer.inlineCallbacks
87 def get_last_p2pool_block_hash(current_block_hash, get_block, net):
88     block_hash = current_block_hash
89     while True:
90         if block_hash == net.ROOT_BLOCK:
91             defer.returnValue(block_hash)
92         try:
93             block = yield get_block(block_hash)
94         except:
95             traceback.print_exc()
96             continue
97         coinbase_data = block['txs'][0]['tx_ins'][0]['script']
98         try:
99             coinbase = p2pool.coinbase_type.unpack(coinbase_data)
100         except bitcoin.data.EarlyEnd:
101             pass
102         else:
103             try:
104                 if coinbase['identifier'] == net.IDENTIFIER:
105                     payouts = {}
106                     for tx_out in block['txs'][0]['tx_outs']:
107                         payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
108                     subsidy = sum(payouts.itervalues())
109                     if coinbase['subsidy'] == subsidy:
110                         if payouts.get(net.SCRIPT, 0) >= subsidy//64:
111                             defer.returnValue(block_hash)
112             except Exception:
113                 print
114                 print 'Error matching block:'
115                 print 'block:', block
116                 traceback.print_exc()
117                 print
118         block_hash = block['header']['previous_block']
119
120 @deferral.retry('Error getting work from bitcoind:', 1)
121 @defer.inlineCallbacks
122 def getwork(bitcoind):
123     # a block could arrive in between these two queries
124     getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
125     try:
126         getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
127     finally:
128         # get rid of residual errors
129         getwork_df.addErrback(lambda fail: None)
130         height_df.addErrback(lambda fail: None)
131     defer.returnValue((getwork, height))
132
133 @defer.inlineCallbacks
134 def get_payout_script(factory):
135         while True:
136             try:
137                 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
138                 if res['reply'] != 'success':
139                     print
140                     print 'Error getting payout script:'
141                     print res
142                     print
143                     continue
144                 my_script = res['script']
145             except Exception:
146                 print
147                 print 'Error getting payout script:'
148                 traceback.print_exc()
149                 print
150             else:
151                 defer.returnValue(my_script)
152             yield deferral.sleep(1)
153
154 @defer.inlineCallbacks
155 def main(args):
156     try:
157         print 'p2pool (version %s)' % (__version__,)
158         print
159         
160         # connect to bitcoind over JSON-RPC and do initial getwork
161         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
162         print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
163         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
164         work, height = yield getwork(bitcoind)
165         print '    ...success!'
166         print '    Current block hash: %x height: %i' % (work.previous_block, height)
167         print
168         
169         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
170         print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
171         factory = bitcoin.p2p.ClientFactory(args.net)
172         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
173         my_script = yield get_payout_script(factory)
174         print '    ...success!'
175         print '    Payout script:', my_script.encode('hex')
176         print
177         
178         @defer.inlineCallbacks
179         def real_get_block(block_hash):
180             block = yield (yield factory.getProtocol()).get_block(block_hash)
181             print 'Got block %x' % (block_hash,)
182             defer.returnValue(block)
183         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
184         
185         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
186         
187         ht = bitcoin.p2p.HeightTracker(factory)
188         
189         tracker = p2pool.OkayTracker(args.net)
190         chains = expiring_dict.ExpiringDict(300)
191         def get_chain(chain_id_data):
192             return chains.setdefault(chain_id_data, Chain(chain_id_data))
193         
194         # information affecting work that should trigger a long-polling update
195         current_work = variable.Variable(None)
196         # information affecting work that should not trigger a long-polling update
197         current_work2 = variable.Variable(None)
198         
199         @defer.inlineCallbacks
200         def set_real_work():
201             work, height = yield getwork(bitcoind)
202             best, desired = tracker.think(ht)
203             # XXX desired?
204             current_work.set(dict(
205                 version=work.version,
206                 previous_block=work.previous_block,
207                 target=work.target,
208                 
209                 height=height + 1,
210                 
211                 best_share_hash=best,
212             ))
213             current_work2.set(dict(
214                 timestamp=work.timestamp,
215             ))
216         
217         print 'Initializing work...'
218         yield set_real_work()
219         print '    ...success!'
220         
221         # setup p2p logic and join p2pool network
222         
223         def share_share(share, ignore_peer=None):
224             for peer in p2p_node.peers.itervalues():
225                 if peer is ignore_peer:
226                     continue
227                 peer.send_share(share)
228             share.flag_shared()
229         
230         def p2p_share(share, peer=None):
231             if share.hash in tracker.shares:
232                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
233                 return
234             
235             #print "Received share %x" % (share.hash,)
236             
237             tracker.add(share)
238             best, desired = tracker.think(ht)
239             for peer2, share_hash in desired:
240                 print "Requesting parent share %x" % (share_hash,)
241                 peer2.send_getshares(hashes=[share_hash])
242             
243             if share.gentx is not None:
244                 if share.hash <= share.header['target']:
245                     print
246                     print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
247                     print
248                     if factory.conn.value is not None:
249                         factory.conn.value.send_block(block=share.as_block())
250                     else:
251                         print 'No bitcoind connection! Erp!'
252             
253             w = dict(current_work.value)
254             w['best_share_hash'] = best
255             current_work.set(w)
256             
257             if best == share.hash:
258                 print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
259             else:
260                 print 'Accepted share, not highest. Hash: %x' % (share.hash,)
261         
262         def p2p_share_hash(share_hash, peer):
263             if share_hash in tracker.shares:
264                 print "Got share hash, already have, ignoring. Hash: %x" % (share_hash,)
265             else:
266                 print "Got share hash, requesting! Hash: %x" % (share_hash,)
267                 peer.send_getshares(hashes=[share_hash])
268         
269         def p2p_get_to_best(chain_id_data, have, peer):
270             chain = get_chain(chain_id_data)
271             if chain.highest.value is None:
272                 return
273             
274             chain_hashes = chain.get_down(chain.highest.value)
275             
276             have2 = set()
277             for hash_ in have:
278                 have2 |= set(chain.get_down(hash_))
279             
280             for share_hash in reversed(chain_hashes):
281                 if share_hash in have2:
282                     continue
283                 peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
284         
285         def p2p_get_shares(share_hashes, peer):
286             for share_hash in share_hashes:
287                 if share_hash in tracker.shares:
288                     peer.send_share(tracker.shares[share_hash], full=True)
289         
290         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
291         
292         def parse(x):
293             if ':' in x:
294                 ip, port = x.split(':')
295                 return ip, int(port)
296             else:
297                 return x, args.net.P2P_PORT
298         
299         nodes = [('72.14.191.28', args.net.P2P_PORT)]
300         try:
301             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
302         except:
303             traceback.print_exc()
304         
305         p2p_node = p2p.Node(
306             current_work=current_work,
307             port=args.p2pool_port,
308             net=args.net,
309             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
310             mode=0 if args.low_bandwidth else 1,
311             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
312         )
313         p2p_node.handle_share = p2p_share
314         p2p_node.handle_share_hash = p2p_share_hash
315         p2p_node.handle_get_to_best = p2p_get_to_best
316         p2p_node.handle_get_shares = p2p_get_shares
317         
318         p2p_node.start()
319         
320         # send share when the chain changes to their chain
321         def work_changed(new_work):
322             #print 'Work changed:', new_work
323             for share in tracker.get_chain_known(new_work['best_share_hash']):
324                 if share.shared:
325                     break
326                 share_share(share, share.peer)
327         current_work.changed.watch(work_changed)
328         
329         print '    ...success!'
330         print
331         
332         # start listening for workers with a JSON-RPC server
333         
334         print 'Listening for workers on port %i...' % (args.worker_port,)
335         
336         # setup worker logic
337         
338         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
339         
340         def compute(state):
341             extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
342             generate_tx = p2pool.generate_transaction(
343                 tracker=tracker,
344                 previous_share_hash=state['best_share_hash'],
345                 new_script=my_script,
346                 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
347                 nonce=struct.pack("<Q", random.randrange(2**64)),
348                 block_target=state['target'],
349                 net=args.net,
350             )
351             print 'Generating!', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']//1000000
352             print "Target: %x" % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
353             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
354             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
355             merkle_root = bitcoin.data.merkle_hash(transactions)
356             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
357             
358             timestamp = current_work2.value['timestamp']
359             if state['best_share_hash'] is not None:
360                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
361                 if timestamp2 > timestamp:
362                     print "Toff", timestamp2 - timestamp
363                     timestamp = timestamp2
364             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
365             #print "SENT", 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
366             return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
367         
368         def got_response(data):
369             # match up with transactions
370             header = bitcoin.getwork.decode_data(data)
371             transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
372             if transactions is None:
373                 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
374                 return False
375             block = dict(header=header, txs=transactions)
376             hash_ = bitcoin.data.block_header_type.hash256(block['header'])
377             if hash_ <= block['header']['target']:
378                 print
379                 print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
380                 print
381                 if factory.conn.value is not None:
382                     factory.conn.value.send_block(block=block)
383                 else:
384                     print 'No bitcoind connection! Erp!'
385             share = p2pool.Share.from_block(block)
386             print 'GOT SHARE! %x' % (share.hash,)
387             try:
388                 p2p_share(share)
389             except:
390                 print
391                 print 'Error processing data received from worker:'
392                 traceback.print_exc()
393                 print
394                 return False
395             else:
396                 return True
397         
398         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
399         
400         print '    ...success!'
401         print
402         
403         # done!
404         
405         def get_blocks(start_hash):
406             while True:
407                 try:
408                     block = get_block.call_now(start_hash)
409                 except deferral.NotNowError:
410                     break
411                 yield start_hash, block
412                 start_hash = block['header']['previous_block']
413         
414         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
415         
416         class Tx(object):
417             def __init__(self, tx, seen_at_block):
418                 self.hash = bitcoin.data.tx_type.hash256(tx)
419                 self.tx = tx
420                 self.seen_at_block = seen_at_block
421                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
422                 #print
423                 #print "%x %r" % (seen_at_block, tx)
424                 #for mention in self.mentions:
425                 #    print "%x" % mention
426                 #print
427                 self.parents_all_in_blocks = False
428                 self.value_in = 0
429                 #print self.tx
430                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
431                 self._find_parents_in_blocks()
432             
433             @defer.inlineCallbacks
434             def _find_parents_in_blocks(self):
435                 for tx_in in self.tx['tx_ins']:
436                     try:
437                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
438                     except Exception:
439                         return
440                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
441                     #print raw_transaction
442                     if not raw_transaction['parent_blocks']:
443                         return
444                 self.parents_all_in_blocks = True
445             
446             def is_good(self):
447                 if not self.parents_all_in_blocks:
448                     return False
449                 x = self.is_good2()
450                 #print "is_good:", x
451                 return x
452             
453             def is_good2(self):
454                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
455                     if block_hash == self.seen_at_block:
456                         return True
457                     for tx in block['txs']:
458                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
459                         if mentions & self.mentions:
460                             return False
461                 return False
462         
463         @defer.inlineCallbacks
464         def new_tx(tx_hash):
465             try:
466                 assert isinstance(tx_hash, (int, long))
467                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
468                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
469             except:
470                 traceback.print_exc()
471         factory.new_tx.watch(new_tx)
472         
473         def new_block(block):
474             set_real_work()
475         factory.new_block.watch(new_block)
476         
477         print 'Started successfully!'
478         print
479         
480         while True:
481             yield deferral.sleep(1)
482             set_real_work()
483     except:
484         print
485         print 'Fatal error:'
486         traceback.print_exc()
487         print
488         reactor.stop()
489
490 def run():
491     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
492     parser.add_argument('--version', action='version', version=__version__)
493     parser.add_argument('--testnet',
494         help='use the testnet; make sure you change the ports too',
495         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
496     
497     p2pool_group = parser.add_argument_group('p2pool interface')
498     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
499         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
500         type=int, action='store', default=None, dest='p2pool_port')
501     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
502         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
503         type=str, action='append', default=[], dest='p2pool_nodes')
504     parser.add_argument('-l', '--low-bandwidth',
505         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
506         action='store_true', default=False, dest='low_bandwidth')
507     
508     worker_group = parser.add_argument_group('worker interface')
509     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
510         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
511         type=int, action='store', default=9332, dest='worker_port')
512     
513     bitcoind_group = parser.add_argument_group('bitcoind interface')
514     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
515         help='connect to a bitcoind at this address (default: 127.0.0.1)',
516         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
517     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
518         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
519         type=int, action='store', default=8332, dest='bitcoind_rpc_port')
520     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
521         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
522         type=int, action='store', default=None, dest='bitcoind_p2p_port')
523     
524     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
525         help='bitcoind RPC interface username',
526         type=str, action='store', dest='bitcoind_rpc_username')
527     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
528         help='bitcoind RPC interface password',
529         type=str, action='store', dest='bitcoind_rpc_password')
530     
531     args = parser.parse_args()
532     
533     if args.bitcoind_p2p_port is None:
534         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
535     
536     if args.p2pool_port is None:
537         args.p2pool_port = args.net.P2P_PORT
538     
539     reactor.callWhenRunning(main, args)
540     reactor.run()