d286c8c2bac6161d8e6fd25aaefe4e65c4bada8b
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13 import time
14
15 from twisted.internet import defer, reactor, task
16 from twisted.web import server
17 from twisted.python import log
18
19 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
20 from util import db, expiring_dict, jsonrpc, variable, deferral, math, skiplist
21 from . import p2p, worker_interface
22 import p2pool.data as p2pool
23 import p2pool as p2pool_init
24
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work unit and block number from bitcoind.

    Both RPC requests are started before either is waited on. The Deferred
    fires with a ``(BlockAttempt, height)`` tuple.
    """
    # a block could arrive in between these two queries
    work_d = bitcoind.rpc_getwork()
    number_d = bitcoind.rpc_getblocknumber()
    try:
        attempt = bitcoin.getwork.BlockAttempt.from_getwork((yield work_d))
        block_number = yield number_d
    finally:
        # swallow any error left on either request so that a failure of one
        # does not leave an unhandled error on the other
        for pending in (work_d, number_d):
            pending.addErrback(lambda fail: None)
    defer.returnValue((attempt, block_number))
37
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind (over its P2P connection) for a payout script.

    Sends a ``check_order`` request with the null order through the protocol
    obtained from ``factory``.

    The Deferred fires with the script from the reply when the reply is
    ``'success'``, or with ``None`` when the reply is ``'denied'``.

    Raises ValueError for any other reply value.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        # The result has to be handed back through returnValue; merely
        # assigning it to a local makes the Deferred fire with None.
        defer.returnValue(res['script'])
    elif res['reply'] == 'denied':
        defer.returnValue(None)
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
48
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from the address of bitcoind's 'p2pool' account.

    The Deferred fires with the script produced by
    ``bitcoin.data.pubkey_hash_to_script2``.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    script = bitcoin.data.pubkey_hash_to_script2(pubkey_hash)
    defer.returnValue(script)
53
54 @defer.inlineCallbacks
55 def main(args):
56     try:
57         print 'p2pool (version %s)' % (p2pool_init.__version__,)
58         print
59         
60         # connect to bitcoind over JSON-RPC and do initial getwork
61         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
62         print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
63         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
64         temp_work, temp_height = yield getwork(bitcoind)
65         print '    ...success!'
66         print '    Current block hash: %x height: %i' % (temp_work.previous_block, temp_height)
67         print
68         
69         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
70         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
71         factory = bitcoin.p2p.ClientFactory(args.net)
72         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
73         my_script = yield get_payout_script(factory)
74         if args.pubkey_hash is None:
75             if my_script is None:
76                 print 'IP transaction denied ... falling back to sending to address.'
77                 my_script = yield get_payout_script2(bitcoind, args.net)
78         else:
79             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
80         print '    ...success!'
81         print '    Payout script:', my_script.encode('hex')
82         print
83         
84         @defer.inlineCallbacks
85         def real_get_block(block_hash):
86             block = yield (yield factory.getProtocol()).get_block(block_hash)
87             print 'Got block %x' % (block_hash,)
88             defer.returnValue(block)
89         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
90         
91         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
92         
93         ht = bitcoin.p2p.HeightTracker(factory)
94         
95         tracker = p2pool.OkayTracker(args.net)
96         chains = expiring_dict.ExpiringDict(300)
97         def get_chain(chain_id_data):
98             return chains.setdefault(chain_id_data, Chain(chain_id_data))
99         
100         # information affecting work that should trigger a long-polling update
101         current_work = variable.Variable(None)
102         # information affecting work that should not trigger a long-polling update
103         current_work2 = variable.Variable(None)
104         
105         requested = set()
106         task.LoopingCall(requested.clear).start(60)
107         
108         @defer.inlineCallbacks
109         def set_real_work1():
110             work, height = yield getwork(bitcoind)
111             current_work.set(dict(
112                 version=work.version,
113                 previous_block=work.previous_block,
114                 target=work.target,
115                 height=height,
116                 best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
117             ))
118             current_work2.set(dict(
119                 time=work.timestamp,
120             ))
121         
122         def set_real_work2():
123             best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
124             
125             t = dict(current_work.value)
126             t['best_share_hash'] = best
127             current_work.set(t)
128             
129             for peer2, share_hash in desired:
130                 if peer2 is None:
131                     continue
132                 if (peer2.nonce, share_hash) in requested:
133                     continue
134                 print 'Requesting parent share %x' % (share_hash % 2**32,)
135                 peer2.send_getshares(
136                     hashes=[share_hash],
137                     parents=2000,
138                     stops=list(set(tracker.heads) | set(
139                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
140                     )),
141                 )
142                 requested.add((peer2.nonce, share_hash))
143         
144         print 'Initializing work...'
145         yield set_real_work1()
146         yield set_real_work2()
147         print '    ...success!'
148         
149         # setup p2p logic and join p2pool network
150         
151         def share_share(share, ignore_peer=None):
152             for peer in p2p_node.peers.itervalues():
153                 if peer is ignore_peer:
154                     continue
155                 peer.send_shares([share])
156             share.flag_shared()
157         
158         def p2p_shares(shares, peer=None):
159             for share in shares:
160                 if share.hash in tracker.shares:
161                     #print 'Got duplicate share, ignoring. Hash: %x' % (share.hash % 2**32,)
162                     continue
163                 
164                 #print 'Received share %x from %r' % (share.hash % 2**32, share.peer.transport.getPeer() if share.peer is not None else None)
165                 
166                 tracker.add(share)
167                 #for peer2, share_hash in desired:
168                 #    print 'Requesting parent share %x' % (share_hash,)
169                 #    peer2.send_getshares(hashes=[share_hash], parents=2000)
170                 
171                 if share.bitcoin_hash <= share.header['target']:
172                         print
173                         print 'GOT BLOCK! Passing to bitcoind! %x bitcoin: %x' % (share.hash % 2**32, share.bitcoin_hash,)
174                         print
175                         if factory.conn.value is not None:
176                             factory.conn.value.send_block(block=share.as_block(tracker, net))
177                         else:
178                             print 'No bitcoind connection! Erp!'
179             
180             if shares:
181                 share = shares[0]
182                 
183                 best, desired = tracker.think(ht, current_work.value['previous_block'], current_work2.value['time'])
184                 
185                 if best == share.hash:
186                     print ('MINE: ' if peer is None else '') + 'Accepted share, new best, will pass to peers! Hash: %x' % (share.hash % 2**32,)
187                 else:
188                     print ('MINE: ' if peer is None else '') + 'Accepted share, not best. Hash: %x' % (share.hash % 2**32,)
189                 
190                 w = dict(current_work.value)
191                 w['best_share_hash'] = best
192                 current_work.set(w)
193         
194         def p2p_share_hashes(share_hashes, peer):
195             get_hashes = []
196             for share_hash in share_hashes:
197                 if share_hash in tracker.shares:
198                     pass # print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash % 2**32,)
199                 else:
200                     print 'Got share hash, requesting! Hash: %x' % (share_hash % 2**32,)
201                     get_hashes.append(share_hash)
202             if get_hashes:
203                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
204         
205         def p2p_get_shares(share_hashes, parents, stops, peer):
206             parents = min(parents, 1000//len(share_hashes))
207             stops = set(stops)
208             shares = []
209             for share_hash in share_hashes:
210                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
211                     if share.hash in stops:
212                         break
213                     shares.append(share)
214             peer.send_shares(shares, full=True)
215         
216         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
217         
218         def parse(x):
219             if ':' in x:
220                 ip, port = x.split(':')
221                 return ip, int(port)
222             else:
223                 return x, args.net.P2P_PORT
224         
225         nodes = [
226             ('72.14.191.28', args.net.P2P_PORT),
227             ('62.204.197.159', args.net.P2P_PORT),
228         ]
229         try:
230             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
231         except:
232             print
233             print 'Error resolving bootstrap node IP:'
234             log.err()
235             print
236         
237         p2p_node = p2p.Node(
238             current_work=current_work,
239             port=args.p2pool_port,
240             net=args.net,
241             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
242             mode=0 if args.low_bandwidth else 1,
243             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
244         )
245         p2p_node.handle_shares = p2p_shares
246         p2p_node.handle_share_hashes = p2p_share_hashes
247         p2p_node.handle_get_shares = p2p_get_shares
248         
249         p2p_node.start()
250         
251         # send share when the chain changes to their chain
252         def work_changed(new_work):
253             #print 'Work changed:', new_work
254             for share in tracker.get_chain_known(new_work['best_share_hash']):
255                 if share.shared:
256                     break
257                 share_share(share, share.peer)
258         current_work.changed.watch(work_changed)
259         
260         print '    ...success!'
261         print
262         
263         # start listening for workers with a JSON-RPC server
264         
265         print 'Listening for workers on port %i...' % (args.worker_port,)
266         
267         # setup worker logic
268         
269         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
270         run_identifier = struct.pack('<Q', random.randrange(2**64))
271         
272         def compute(state, all_targets):
273             start = time.time()
274             start = time.time()
275             pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
276             pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
277             extra_txs = []
278             size = 0
279             for tx in pre_extra_txs:
280                 this_size = len(bitcoin.data.tx_type.pack(tx.tx))
281                 if size + this_size > 500000:
282                     break
283                 extra_txs.append(tx)
284                 size += this_size
285             # XXX check sigops!
286             # XXX assuming generate_tx is smallish here..
287             generate_tx = p2pool.generate_transaction(
288                 tracker=tracker,
289                 previous_share_hash=state['best_share_hash'],
290                 new_script=my_script,
291                 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
292                 nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
293                 block_target=state['target'],
294                 net=args.net,
295             )
296             print 'Generating! Difficulty: %.06f Payout if block: %.6f BTC' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], generate_tx['tx_outs'][-1]['value']*1e-8)
297             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
298             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
299             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
300             merkle_root = bitcoin.data.merkle_hash(transactions)
301             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
302             
303             timestamp = current_work2.value['time']
304             if state['best_share_hash'] is not None:
305                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
306                 if timestamp2 > timestamp:
307                     print 'Toff', timestamp2 - timestamp
308                     timestamp = timestamp2
309             target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
310             if not all_targets:
311                 target2 = min(2**256//2**32 - 1, target2)
312             print "TOOK", time.time() - start
313             times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
314             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
315             return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
316         
317         my_shares = set()
318         times = {}
319         
320         def got_response(data):
321             try:
322                 # match up with transactions
323                 header = bitcoin.getwork.decode_data(data)
324                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
325                 if transactions is None:
326                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
327                     return False
328                 block = dict(header=header, txs=transactions)
329                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
330                 if hash_ <= block['header']['target']:
331                     print
332                     print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
333                     print
334                     if factory.conn.value is not None:
335                         factory.conn.value.send_block(block=block)
336                     else:
337                         print 'No bitcoind connection! Erp!'
338                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
339                 if hash_ > target:
340                     print 'Received invalid share from worker - %x/%x' % (hash_, target)
341                     return False
342                 share = p2pool.Share.from_block(block)
343                 my_shares.add(share.hash)
344                 print 'GOT SHARE! %x %x' % (share.hash, 0 if share.previous_hash is None else share.previous_hash), "DEAD ON ARRIVAL" if share.previous_hash != current_work.value['best_share_hash'] else "", time.time() - times[share.nonce]
345                 p2p_shares([share])
346             except:
347                 print
348                 print 'Error processing data received from worker:'
349                 log.err()
350                 print
351                 return False
352             else:
353                 return True
354         
355         def get_rate():
356             if current_work.value['best_share_hash'] is not None:
357                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
358                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
359                 return att_s
360         
361         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response, get_rate)))
362         
363         print '    ...success!'
364         print
365         
366         # done!
367         
368         def get_blocks(start_hash):
369             while True:
370                 try:
371                     block = get_block.call_now(start_hash)
372                 except deferral.NotNowError:
373                     break
374                 yield start_hash, block
375                 start_hash = block['header']['previous_block']
376         
377         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
378         
379         class Tx(object):
380             def __init__(self, tx, seen_at_block):
381                 self.hash = bitcoin.data.tx_type.hash256(tx)
382                 self.tx = tx
383                 self.seen_at_block = seen_at_block
384                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
385                 #print
386                 #print '%x %r' % (seen_at_block, tx)
387                 #for mention in self.mentions:
388                 #    print '%x' % mention
389                 #print
390                 self.parents_all_in_blocks = False
391                 self.value_in = 0
392                 #print self.tx
393                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
394                 self._find_parents_in_blocks()
395             
396             @defer.inlineCallbacks
397             def _find_parents_in_blocks(self):
398                 for tx_in in self.tx['tx_ins']:
399                     try:
400                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
401                     except Exception:
402                         return
403                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
404                     #print raw_transaction
405                     if not raw_transaction['parent_blocks']:
406                         return
407                 self.parents_all_in_blocks = True
408             
409             def is_good(self):
410                 if not self.parents_all_in_blocks:
411                     return False
412                 x = self.is_good2()
413                 #print 'is_good:', x
414                 return x
415             
416             def is_good2(self):
417                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
418                     if block_hash == self.seen_at_block:
419                         return True
420                     for tx in block['txs']:
421                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins'] if tx_in['previous_output'] is not None])
422                         if mentions & self.mentions:
423                             return False
424                 return False
425         
426         @defer.inlineCallbacks
427         def new_tx(tx_hash):
428             try:
429                 assert isinstance(tx_hash, (int, long))
430                 #print "REQUESTING", tx_hash
431                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
432                 #print "GOT", tx
433                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
434             except:
435                 print
436                 print 'Error handling tx:'
437                 log.err()
438                 print
439         factory.new_tx.watch(new_tx)
440         
441         @defer.inlineCallbacks
442         def new_block(block):
443             yield set_real_work1()
444             set_real_work2()
445         factory.new_block.watch(new_block)
446         
447         print 'Started successfully!'
448         print
449         
450         @defer.inlineCallbacks
451         def work1_thread():
452             while True:
453                 yield deferral.sleep(random.expovariate(1/1))
454                 try:
455                     yield set_real_work1()
456                 except:
457                     log.err()
458         
459         
460         @defer.inlineCallbacks
461         def work2_thread():
462             while True:
463                 yield deferral.sleep(random.expovariate(1/1))
464                 try:
465                     yield set_real_work2()
466                 except:
467                     log.err()
468         
469         work1_thread()
470         work2_thread()
471         
472         counter = skiplist.CountsSkipList(tracker, my_script, run_identifier)
473         
474         while True:
475             yield deferral.sleep(random.expovariate(1/1))
476             try:
477                 if current_work.value['best_share_hash'] is not None:
478                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
479                     if height > 5:
480                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)
481                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
482                         count = counter(current_work.value['best_share_hash'], height, 2**100)
483                         print 'Pool: %i mhash/s in %i shares Recent: %.02f%% >%i mhash/s Known: %i shares (so %i stales)' % (
484                             att_s//1000000,
485                             height,
486                             weights.get(my_script, 0)/total_weight*100,
487                             weights.get(my_script, 0)/total_weight*att_s//1000000,
488                             count,
489                             len(my_shares) - count,
490                         )
491                         #weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 100), 2**100)
492                         #for k, v in weights.iteritems():
493                         #    print k.encode('hex'), v/total_weight
494             except:
495                 log.err()
496     except:
497         print
498         print 'Fatal error:'
499         log.err()
500         print
501         reactor.stop()
502
def run():
    """Command-line entry point.

    Builds the argument parser, parses the command line, fills in the
    network-dependent port defaults, converts ``--address`` into
    ``args.pubkey_hash`` (None when no address was given) and then runs
    ``main(args)`` inside the Twisted reactor.

    Raises ValueError if the given address cannot be parsed.
    """
    if __debug__:
        defer.setDebugging(True)

    version = p2pool_init.__version__
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (version,))
    parser.add_argument('--version', action='version', version=version)
    parser.add_argument(
        '--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net',
    )
    parser.add_argument(
        '-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address',
    )

    pool_opts = parser.add_argument_group('p2pool interface')
    pool_opts.add_argument(
        '--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port',
    )
    pool_opts.add_argument(
        '-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes',
    )
    # registered on the parser itself, not on the p2pool group
    parser.add_argument(
        '-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth',
    )

    worker_opts = parser.add_argument_group('worker interface')
    worker_opts.add_argument(
        '-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port',
    )

    bitcoind_opts = parser.add_argument_group('bitcoind interface')
    bitcoind_opts.add_argument(
        '--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address',
    )
    bitcoind_opts.add_argument(
        '--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port',
    )
    bitcoind_opts.add_argument(
        '--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port',
    )

    # the two positional arguments: RPC username, then RPC password
    bitcoind_opts.add_argument(
        metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username',
    )
    bitcoind_opts.add_argument(
        metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password',
    )

    args = parser.parse_args()

    # ports left unset default to the values of the selected network
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT

    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT

    if args.address is None:
        args.pubkey_hash = None
    else:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception as e:
            raise ValueError("error parsing address: " + repr(e))

    reactor.callWhenRunning(main, args)
    reactor.run()