messages
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13 import time
14
15 from twisted.internet import defer, reactor, task
16 from twisted.web import server
17 from twisted.python import log
18
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
24
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work unit and block number from bitcoind.

    Both RPC requests are issued before either is waited on. Fires with a
    (BlockAttempt, height) tuple.
    """
    # a block could arrive in between these two queries
    work_d = bitcoind.rpc_getwork()
    height_d = bitcoind.rpc_getblocknumber()
    try:
        raw_work = yield work_d
        attempt = bitcoin.getwork.BlockAttempt.from_getwork(raw_work)
        height = yield height_d
    finally:
        # get rid of residual errors: if one request failed, the other
        # deferred may still hold a failure nobody will ever consume
        for pending in (work_d, height_d):
            pending.addErrback(lambda fail: None)
    defer.returnValue((attempt, height))
37
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (via a checkorder request over its p2p connection) for a payout script.

    Fires with the script from the reply when bitcoind answers 'success',
    or with None when it answers 'denied' so the caller can fall back to
    another way of obtaining a script.

    Raises ValueError for any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'success':
        my_script = res['script']
    elif reply == 'denied':
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    # an inlineCallbacks generator must hand its result back explicitly;
    # without this the deferred always fired with None and the script
    # obtained above was silently discarded
    defer.returnValue(my_script)
48
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from the address bitcoind returns for the 'p2pool' account.

    Fires with the script produced by pubkey_hash_to_script2.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    script = bitcoin.data.pubkey_hash_to_script2(pubkey_hash)
    defer.returnValue(script)
53
54 @defer.inlineCallbacks
55 def main(args):
56     try:
57         print 'p2pool (version %s)' % (p2pool_init.__version__,)
58         print
59         
60         # connect to bitcoind over JSON-RPC and do initial getwork
61         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
62         print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
63         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
64         temp_work, temp_height = yield getwork(bitcoind)
65         print '    ...success!'
66         print '    Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
67         print
68         
69         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
70         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
71         factory = bitcoin.p2p.ClientFactory(args.net)
72         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
73         my_script = yield get_payout_script(factory)
74         if args.pubkey_hash is None:
75             if my_script is None:
76                 print 'IP transaction denied ... falling back to sending to address.'
77                 my_script = yield get_payout_script2(bitcoind, args.net)
78         else:
79             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
80         print '    ...success!'
81         print '    Payout script:', my_script.encode('hex')
82         print
83         
84         @defer.inlineCallbacks
85         def real_get_block(block_hash):
86             block = yield (yield factory.getProtocol()).get_block(block_hash)
87             print 'Got block %x' % (block_hash,)
88             defer.returnValue(block)
89         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
90         
91         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
92         
93         ht = bitcoin.p2p.HeightTracker(factory)
94         
95         tracker = p2pool.OkayTracker(args.net)
96         chains = expiring_dict.ExpiringDict(300)
97         def get_chain(chain_id_data):
98             return chains.setdefault(chain_id_data, Chain(chain_id_data))
99         
100         # information affecting work that should trigger a long-polling update
101         current_work = variable.Variable(None)
102         # information affecting work that should not trigger a long-polling update
103         current_work2 = variable.Variable(None)
104         
105         requested = set()
106         task.LoopingCall(requested.clear).start(60)
107         
108         @defer.inlineCallbacks
109         def set_real_work1():
110             work, height = yield getwork(bitcoind)
111             current_work.set(dict(
112                 version=work.version,
113                 previous_block=work.previous_block,
114                 target=work.target,
115                 height=height,
116                 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
117             ))
118             current_work2.set(dict(
119                 time=work.timestamp,
120             ))
121         
122         def set_real_work2():
123             best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
124             
125             t = dict(current_work.value)
126             t['best_share_hash'] = best
127             current_work.set(t)
128             
129             for peer2, share_hash in desired:
130                 if peer2 is None:
131                     continue
132                 if (peer2.nonce, share_hash) in requested:
133                     continue
134                 print 'Requesting parent share %x' % (share_hash,)
135                 peer2.send_getshares(
136                     hashes=[share_hash],
137                     parents=2000,
138                     stops=list(set(tracker.heads) | set(
139                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
140                     )),
141                 )
142                 requested.add((peer2.nonce, share_hash))
143         
144         print 'Initializing work...'
145         yield set_real_work1()
146         yield set_real_work2()
147         print '    ...success!'
148         
149         # setup p2p logic and join p2pool network
150         
151         def share_share(share, ignore_peer=None):
152             for peer in p2p_node.peers.itervalues():
153                 if peer is ignore_peer:
154                     continue
155                 peer.send_shares([share])
156             share.flag_shared()
157         
158         def p2p_shares(shares, peer=None):
159             for share in shares:
160                 if share.hash in tracker.shares:
161                     print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
162                     continue
163                 
164                 #print 'Received share %x from %r' % (share.hash, share.peer.transport.getPeer() if share.peer is not None else None)
165                 
166                 tracker.add(share)
167                 #for peer2, share_hash in desired:
168                 #    print 'Requesting parent share %x' % (share_hash,)
169                 #    peer2.send_getshares(hashes=[share_hash], parents=2000)
170                 
171                 if share.gentx is not None:
172                     if share.bitcoin_hash <= share.header['target']:
173                         print
174                         print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash, share.bitcoin_hash,)
175                         print
176                         if factory.conn.value is not None:
177                             factory.conn.value.send_block(block=share.as_block())
178                         else:
179                             print 'No bitcoind connection! Erp!'
180             
181             if shares:
182                 share = shares[0]
183                 
184                 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
185                 
186                 if best == share.hash:
187                     print ('MINE> ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash,)
188                 else:
189                     print ('MINE> ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash,)
190                 
191                 w = dict(current_work.value)
192                 w['best_share_hash'] = best
193                 current_work.set(w)
194         
195         def p2p_share_hashes(share_hashes, peer):
196             get_hashes = []
197             for share_hash in share_hashes:
198                 if share_hash in tracker.shares:
199                     pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
200                 else:
201                     print 'Got share hash, requesting! Hash: %x' % (share_hash,)
202                     get_hashes.append(share_hash)
203             if get_hashes:
204                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
205         
206         def p2p_get_shares(share_hashes, parents, stops, peer):
207             parents = min(parents, 1000//len(share_hashes))
208             stops = set(stops)
209             shares = []
210             for share_hash in share_hashes:
211                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
212                     if share.hash in stops:
213                         break
214                     shares.append(share)
215             peer.send_shares(shares, full=True)
216         
217         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
218         
219         def parse(x):
220             if ':' in x:
221                 ip, port = x.split(':')
222                 return ip, int(port)
223             else:
224                 return x, args.net.P2P_PORT
225         
226         nodes = [
227             ('72.14.191.28', args.net.P2P_PORT),
228             ('62.204.197.159', args.net.P2P_PORT),
229         ]
230         try:
231             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
232         except:
233             print
234             print 'Error resolving bootstrap node IP:'
235             log.err()
236             print
237         
238         p2p_node = p2p.Node(
239             current_work=current_work,
240             port=args.p2pool_port,
241             net=args.net,
242             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
243             mode=0 if args.low_bandwidth else 1,
244             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
245         )
246         p2p_node.handle_shares = p2p_shares
247         p2p_node.handle_share_hashes = p2p_share_hashes
248         p2p_node.handle_get_shares = p2p_get_shares
249         
250         p2p_node.start()
251         
252         # send share when the chain changes to their chain
253         def work_changed(new_work):
254             #print 'Work changed:', new_work
255             for share in tracker.get_chain_known(new_work['best_share_hash']):
256                 if share.shared:
257                     break
258                 share_share(share, share.peer)
259         current_work.changed.watch(work_changed)
260         
261         print '    ...success!'
262         print
263         
264         # start listening for workers with a JSON-RPC server
265         
266         print 'Listening for workers on port %i...' % (args.worker_port,)
267         
268         # setup worker logic
269         
270         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
271         run_identifier = struct.pack('<Q', random.randrange(2**64))
272         
273         def compute(state, all_targets):
274             pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
275             pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
276             extra_txs = []
277             size = 0
278             for tx in pre_extra_txs:
279                 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
280                 if size + this_size > 500000:
281                     break
282                 extra_txs.append(tx)
283                 size += this_size
284             # XXX check sigops!
285             # XXX assuming generate_tx is smallish here..
286             generate_tx = p2pool.generate_transaction(
287                 tracker=tracker,
288                 previous_share_hash=state['best_share_hash'],
289                 new_script=my_script,
290                 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
291                 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
292                 block_target=state['target'],
293                 net=args.net,
294             )
295             print 'Generating!'
296             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
297             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
298             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
299             merkle_root = bitcoin.data.merkle_hash(transactions)
300             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
301             
302             timestamp = current_work2.value['time']
303             if state['best_share_hash'] is not None:
304                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
305                 if timestamp2 > timestamp:
306                     print 'Toff', timestamp2 - timestamp
307                     timestamp = timestamp2
308             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
309             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
310             target = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
311             if not all_targets:
312                 target = min(2**256//2**32 - 1, target)
313             return ba.getwork(target)
314         
315         my_shares = set()
316         
317         def got_response(data):
318             try:
319                 # match up with transactions
320                 header = bitcoin.getwork.decode_data(data)
321                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
322                 if transactions is None:
323                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
324                     return False
325                 block = dict(header=header, txs=transactions)
326                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
327                 if hash_ <= block['header']['target']:
328                     print
329                     print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
330                     print
331                     if factory.conn.value is not None:
332                         factory.conn.value.send_block(block=block)
333                     else:
334                         print 'No bitcoind connection! Erp!'
335                 share = p2pool.Share.from_block(block)
336                 my_shares.add(share.hash)
337                 #print 'GOT SHARE! %x' % (share.hash,), "DEAD ON ARRIVAL" if share.previous_hash != current_work['best_share_hash'] else ""
338                 p2p_shares([share])
339             except:
340                 print
341                 print 'Error processing data received from worker:'
342                 log.err()
343                 print
344                 return False
345             else:
346                 return True
347         
348         def get_rate():
349             if current_work.value['best_share_hash'] is not None:
350                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
351                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
352                 return att_s
353         
354         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
355         
356         print '    ...success!'
357         print
358         
359         # done!
360         
361         def get_blocks(start_hash):
362             while True:
363                 try:
364                     block = get_block.call_now(start_hash)
365                 except deferral.NotNowError:
366                     break
367                 yield start_hash, block
368                 start_hash = block['header']['previous_block']
369         
370         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
371         
372         class Tx(object):
373             def __init__(self, tx, seen_at_block):
374                 self.hash = bitcoin.data.tx_type.hash256(tx)
375                 self.tx = tx
376                 self.seen_at_block = seen_at_block
377                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
378                 #print
379                 #print '%x %r' % (seen_at_block, tx)
380                 #for mention in self.mentions:
381                 #    print '%x' % mention
382                 #print
383                 self.parents_all_in_blocks = False
384                 self.value_in = 0
385                 #print self.tx
386                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
387                 self._find_parents_in_blocks()
388             
389             @defer.inlineCallbacks
390             def _find_parents_in_blocks(self):
391                 for tx_in in self.tx['tx_ins']:
392                     try:
393                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
394                     except Exception:
395                         return
396                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
397                     #print raw_transaction
398                     if not raw_transaction['parent_blocks']:
399                         return
400                 self.parents_all_in_blocks = True
401             
402             def is_good(self):
403                 if not self.parents_all_in_blocks:
404                     return False
405                 x = self.is_good2()
406                 #print 'is_good:', x
407                 return x
408             
409             def is_good2(self):
410                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
411                     if block_hash == self.seen_at_block:
412                         return True
413                     for tx in block['txs']:
414                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
415                         if mentions & self.mentions:
416                             return False
417                 return False
418         
419         @defer.inlineCallbacks
420         def new_tx(tx_hash):
421             try:
422                 assert isinstance(tx_hash, (int, long))
423                 #print "REQUESTING", tx_hash
424                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
425                 #print "GOT", tx
426                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
427             except:
428                 print
429                 print 'Error handling tx:'
430                 log.err()
431                 print
432         factory.new_tx.watch(new_tx)
433         
434         @defer.inlineCallbacks
435         def new_block(block):
436             yield set_real_work1()
437             set_real_work2()
438         factory.new_block.watch(new_block)
439         
440         print 'Started successfully!'
441         print
442         
443         @defer.inlineCallbacks
444         def work1_thread():
445             while True:
446                 yield deferral.sleep(random.expovariate(1/1))
447                 try:
448                     yield set_real_work1()
449                 except:
450                     log.err()
451         
452         
453         @defer.inlineCallbacks
454         def work2_thread():
455             while True:
456                 yield deferral.sleep(random.expovariate(1/1))
457                 try:
458                     yield set_real_work2()
459                 except:
460                     log.err()
461         
462         work1_thread()
463         work2_thread()
464         
465         counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
466         
467         while True:
468             yield deferral.sleep(random.expovariate(1/1))
469             try:
470                 if current_work.value['best_share_hash'] is not None:
471                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
472                     if height > 5:
473                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
474                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
475                         count = counter(current_work.value['best_share_hash'], height, 2**100)
476                         print 'Pool: %i mhash/s in %i shares Recent: %.02f%% >%i mhash/s Known: %i shares (so %i stales)' % (
477                             att_s//1000000,
478                             height,
479                             weights.get(my_script, 0)/total_weight*100,
480                             weights.get(my_script, 0)/total_weight*att_s//1000000,
481                             count,
482                             len(my_shares) - count,
483                         )
484                         #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
485                         #for k, v in weights.iteritems():
486                         #    print k.encode('hex'), v/total_weight
487             except:
488                 log.err()
489     except:
490         print
491         print 'Fatal error:'
492         log.err()
493         print
494         reactor.stop()
495
496 def run():
497     if __debug__:
498         defer.setDebugging(True)
499     
500     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,))
501     parser.add_argument('--version', action='version', version=p2pool_init.__version__)
502     parser.add_argument('--testnet',
503         help='use the testnet',
504         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
505     parser.add_argument('-a', '--address',
506         help='generate to this address (defaults to requesting one from bitcoind)',
507         type=str, action='store', default=None, dest='address')
508     
509     p2pool_group = parser.add_argument_group('p2pool interface')
510     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
511         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
512         type=int, action='store', default=None, dest='p2pool_port')
513     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
514         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
515         type=str, action='append', default=[], dest='p2pool_nodes')
516     parser.add_argument('-l', '--low-bandwidth',
517         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
518         action='store_true', default=False, dest='low_bandwidth')
519     
520     worker_group = parser.add_argument_group('worker interface')
521     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
522         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
523         type=int, action='store', default=9332, dest='worker_port')
524     
525     bitcoind_group = parser.add_argument_group('bitcoind interface')
526     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
527         help='connect to a bitcoind at this address (default: 127.0.0.1)',
528         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
529     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
530         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
531         type=int, action='store', default=8332, dest='bitcoind_rpc_port')
532     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
533         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
534         type=int, action='store', default=None, dest='bitcoind_p2p_port')
535     
536     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
537         help='bitcoind RPC interface username',
538         type=str, action='store', dest='bitcoind_rpc_username')
539     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
540         help='bitcoind RPC interface password',
541         type=str, action='store', dest='bitcoind_rpc_password')
542     
543     args = parser.parse_args()
544     
545     if args.bitcoind_p2p_port is None:
546         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
547     
548     if args.p2pool_port is None:
549         args.p2pool_port = args.net.P2P_PORT
550     
551     if args.address is not None:
552         try:
553             args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
554         except Exception, e:
555             raise ValueError("error parsing address: " + repr(e))
556     else:
557         args.pubkey_hash = None
558     
559     reactor.callWhenRunning(main, args)
560     reactor.run()