[p2pool.git] / p2pool / main.py
#!/usr/bin/python

from __future__ import division

import argparse
import itertools
import os
import random
import sqlite3
import struct
import subprocess
import sys
import time

from twisted.internet import defer, reactor, task
from twisted.web import server
from twisted.python import log

import bitcoin.p2p, bitcoin.getwork, bitcoin.data
from util import db, expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, worker_interface
import p2pool.data as p2pool

prev = os.getcwd()
os.chdir(os.path.abspath(os.path.dirname(sys.argv[0])))
try:
    __version__ = subprocess.Popen(['git', 'describe', '--always'], stdout=subprocess.PIPE).stdout.read().strip()
except:
    __version__ = 'unknown'
os.chdir(prev)

@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    finally:
        # get rid of residual errors
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))
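# Note on getwork() above: both RPC calls are started before either result is
# awaited, so the getwork and getblocknumber requests run concurrently. The
# errbacks added in the finally block swallow whichever deferred is left
# unconsumed if the other one fails, so Twisted does not log it as an
# unhandled error while the retry decorator tries again.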

@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(my_script) # return the script so the caller can distinguish success from denial

@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(bitcoin.data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net)))
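# Note on the two helpers above: get_payout_script() asks bitcoind over the
# bitcoin p2p interface (checkorder, as used for IP transactions) for a script
# to pay out to; if that is denied, main() falls back to get_payout_script2(),
# which asks the RPC interface for an address in the 'p2pool' account and
# converts it into a scriptPubKey.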

@defer.inlineCallbacks
def main(args):
    try:
        print 'p2pool (version %s)' % (__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        temp_work, temp_height = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if args.pubkey_hash is None:
            if my_script is None:
                print 'IP transaction denied ... falling back to sending to address.'
                my_script = yield get_payout_script2(bitcoind, args.net)
        else:
            my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        @defer.inlineCallbacks
        def real_get_block(block_hash):
            block = yield (yield factory.getProtocol()).get_block(block_hash)
            print 'Got block %x' % (block_hash,)
            defer.returnValue(block)
        get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
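        # Note: get_block wraps real_get_block in a DeferredCacher backed by a
        # one-hour ExpiringDict, so repeated lookups of the same block hash
        # (e.g. from the tx-pool checks further down) reuse a single p2p request.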
        
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        ht = bitcoin.p2p.HeightTracker(factory)
        
        tracker = p2pool.OkayTracker(args.net)
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            # note: appears to be unused leftover code - Chain is not defined in this module
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = set()
        task.LoopingCall(requested.clear).start(60)
        
        @defer.inlineCallbacks
        def set_real_work1():
            work, height = yield getwork(bitcoind)
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                height=height,
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                time=work.timestamp,
            ))
        
        def set_real_work2():
            best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            for peer2, share_hash in desired:
                if peer2 is None:
                    continue
                if (peer2.nonce, share_hash) in requested:
                    continue
                print 'Requesting parent share %x' % (share_hash,)
                peer2.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    )),
                )
                requested.add((peer2.nonce, share_hash))
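        # Note on set_real_work1/set_real_work2 above: work1 refreshes the
        # bitcoind side of the work (block template fields and height), while
        # work2 asks the share tracker for the current best share and requests
        # any missing ancestors from peers. The `requested` set, cleared every
        # 60 seconds by the LoopingCall, keeps the same (peer, share) pair from
        # being asked for repeatedly in quick succession.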
        
        print 'Initializing work...'
        yield set_real_work1()
        yield set_real_work2()
        print '    ...success!'
        
        # setup p2p logic and join p2pool network
        
        def share_share(share, ignore_peer=None):
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                peer.send_shares([share])
            share.flag_shared()
        
        def p2p_shares(shares, peer=None):
            for share in shares:
                if share.hash in tracker.shares:
                    print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
                    continue
                
                #print 'Received share %x from %r' % (share.hash, share.peer.transport.getPeer() if share.peer is not None else None)
                
                tracker.add(share)
                #for peer2, share_hash in desired:
                #    print 'Requesting parent share %x' % (share_hash,)
                #    peer2.send_getshares(hashes=[share_hash], parents=2000)
                
                if share.gentx is not None:
                    if share.bitcoin_hash <= share.header['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash, share.bitcoin_hash,)
                        print
                        if factory.conn.value is not None:
                            factory.conn.value.send_block(block=share.as_block())
                        else:
                            print 'No bitcoind connection! Erp!'
            
            best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
            
            if best == share.hash:
                print 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash,)
            else:
                print 'Accepted share, not best. Hash: %x' % (share.hash,)
            
            w = dict(current_work.value)
            w['best_share_hash'] = best
            current_work.set(w)
        
        def p2p_share_hashes(share_hashes, peer):
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
                else:
                    print 'Got share hash, requesting! Hash: %x' % (share_hash,)
                    get_hashes.append(share_hash)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            peer.send_shares(shares, full=True)
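        # Note on p2p_get_shares above: the parents count requested by the peer
        # is capped at 1000 // len(share_hashes), so a single getshares request
        # can never make this node send more than roughly a thousand shares,
        # and the peer-supplied stop hashes cut each chain walk short.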
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        nodes = [
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
        ]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
        except:
            print
            print 'Error resolving bootstrap node IP:'
            log.err()
            print
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # when the best share chain changes, push its not-yet-shared shares to peers
        def work_changed(new_work):
            #print 'Work changed:', new_work
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
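        # Note on work_changed above: whenever best_share_hash changes, the
        # chain is walked backwards from the new best share and every share not
        # yet flagged as shared is pushed to all peers (except the peer it was
        # originally received from), then marked as shared.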
        
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        
        def compute(state, all_targets):
            pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
            extra_txs = []
            size = 0
            for tx in pre_extra_txs:
                this_size = len(bitcoin.data.tx_type.pack(tx.tx))
                if size + this_size > 500000:
                    break
                extra_txs.append(tx)
                size += this_size
            # XXX check sigops!
            # XXX assuming generate_tx is smallish here..
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=my_script,
                subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=struct.pack('<Q', random.randrange(2**64)),
                block_target=state['target'],
                net=args.net,
            )
            print 'Generating!'
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 300 seconds
            
            timestamp = current_work2.value['time']
            if state['best_share_hash'] is not None:
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            target = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            if not all_targets:
                target = min(2**256//2**32 - 1, target)
            return ba.getwork(target)
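        # Note on compute() above: each getwork response is built from the
        # p2pool generate transaction plus as many "good" mempool transactions
        # as fit under 500000 bytes. The resulting transaction list is stashed
        # in merkle_root_to_transactions keyed by merkle root, which is how
        # got_response() below reassembles the full block from a returned
        # header. Unless the miner asked for all targets, the share target is
        # capped at roughly the difficulty-1 target (2**256//2**32 - 1).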
        
        def got_response(data):
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                if hash_ <= block['header']['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                share = p2pool.Share.from_block(block)
                print 'GOT SHARE! %x' % (share.hash,)
                p2p_shares([share])
            except:
                print
                print 'Error processing data received from worker:'
                log.err()
                print
                return False
            else:
                return True
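        # Note on got_response above: any solution returned by a miner that
        # meets the full bitcoin target is submitted to bitcoind as a block
        # over the p2p connection, and every returned solution is also handed
        # to p2pool.Share.from_block and fed through p2p_shares so it joins the
        # share chain and gets relayed to peers.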
        
        def get_rate():
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
                return att_s
        
        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
        
        print '    ...success!'
        print
        
        # done!
        
        def get_blocks(start_hash):
            while True:
                try:
                    block = get_block.call_now(start_hash)
                except deferral.NotNowError:
                    break
                yield start_hash, block
                start_hash = block['header']['previous_block']
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        
        class Tx(object):
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                x = self.is_good2()
                #print 'is_good:', x
                return x
            
            def is_good2(self):
                for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
                    if block_hash == self.seen_at_block:
                        return True
                    for tx in block['txs']:
                        mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
                        if mentions & self.mentions:
                            return False
                return False
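        # Note on the Tx helper above: a mempool transaction is only considered
        # for inclusion (is_good) if every input it spends is already confirmed
        # in a block and, walking back through up to ten recent blocks, nothing
        # conflicting with it is found. Reaching the block the transaction was
        # first seen at without a conflict counts as good, while falling off
        # the ten-block window is conservatively treated as not good.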
        
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            try:
                assert isinstance(tx_hash, (int, long))
                #print "REQUESTING", tx_hash
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                #print "GOT", tx
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                print
                print 'Error handling tx:'
                log.err()
                print
        factory.new_tx.watch(new_tx)
        
        @defer.inlineCallbacks
        def new_block(block):
            yield set_real_work1()
            set_real_work2()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        @defer.inlineCallbacks
        def work1_thread():
            while True:
                yield deferral.sleep(random.expovariate(1/1))
                try:
                    yield set_real_work1()
                except:
                    log.err()
        
        
        @defer.inlineCallbacks
        def work2_thread():
            while True:
                yield deferral.sleep(random.expovariate(1/1))
                try:
                    yield set_real_work2()
                except:
                    log.err()
        
        work1_thread()
        work2_thread()
        
        while True:
            yield deferral.sleep(random.expovariate(1/1))
            try:
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
                    if height > 5:
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
                        print 'Pool rate: %i mhash/s %i shares Contribution: %.02f%% >%i mhash/s' % (
                            att_s//1000000,
                            height,
                            weights.get(my_script, 0)/total_weight*100,
                            weights.get(my_script, 0)/total_weight*att_s//1000000,
                        )
            except:
                log.err()
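        # Note on the status loop above: the pool rate is the share chain's
        # attempts per second scaled to MH/s, and the contribution estimate is
        # this node's payout script's fraction of the cumulative weights over
        # the last (up to 1000) shares, so it only becomes meaningful once the
        # chain is more than a handful of shares deep (height > 5).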
    except:
        print
        print 'Fatal error:'
        log.err()
        print
        reactor.stop()

def run():
    if __debug__:
        defer.setDebugging(True)
    
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
    parser.add_argument('--version', action='version', version=__version__)
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally, 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            raise ValueError("error parsing address: " + repr(e))
    else:
        args.pubkey_hash = None
    
    reactor.callWhenRunning(main, args)
    reactor.run()
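
# Example invocation (a hedged sketch with hypothetical values; this module
# only defines run(), so it is assumed here to be started from some wrapper
# script that imports the package and calls run()):
#   python <wrapper>.py --bitcoind-rpc-port 8332 -w 9332 RPCUSER RPCPASSWORD
#   python <wrapper>.py --testnet -a <payout address> RPCUSER RPCPASSWORD
# The two positional arguments are the bitcoind RPC username and password; the
# remaining options fall back to the defaults given in the help strings above.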