added pool rate display
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13
14 from twisted.internet import defer, reactor
15 from twisted.web import server
16 from twisted.python import log
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22
23 try:
24     __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
25 except:
26     __version__ = 'unknown'
27
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch the current work and block number from bitcoind.

    Starts rpc_getwork() and rpc_getblocknumber() on *bitcoind* before waiting
    on either, then fires with ``(BlockAttempt, height)``. The two answers are
    not taken atomically: a block could arrive in between the two queries.
    """
    pending_work = bitcoind.rpc_getwork()
    pending_height = bitcoind.rpc_getblocknumber()
    try:
        raw_work = yield pending_work
        attempt = bitcoin.getwork.BlockAttempt.from_getwork(raw_work)
        block_height = yield pending_height
    finally:
        # get rid of residual errors on whichever deferred was not consumed
        for pending in (pending_work, pending_height):
            pending.addErrback(lambda fail: None)
    defer.returnValue((attempt, block_height))
40
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind, through a null-order check_order, for a payout script.

    Fires with the script from the reply when the reply is 'success', and with
    None when it is 'denied' (the caller then falls back to
    get_payout_script2). Any other reply raises ValueError.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    # An inlineCallbacks generator that simply ends fires with None, so the
    # script has to be handed back explicitly or 'success' is never seen.
    defer.returnValue(my_script)
51
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from a fresh address obtained over RPC.

    Calls rpc_getnewaddress() on *bitcoind*, converts the address to a pubkey
    hash for *net* and fires with the script built from that hash.
    """
    address = yield bitcoind.rpc_getnewaddress()
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
56
57 @defer.inlineCallbacks
58 def main(args):
59     try:
60         print 'p2pool (version %s)' % (__version__,)
61         print
62         
63         # connect to bitcoind over JSON-RPC and do initial getwork
64         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
65         print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
66         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
67         work, height = yield getwork(bitcoind)
68         print '    ...success!'
69         print '    Current block hash: %x height: %i' % (work.previous_block, height)
70         print
71         
72         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
73         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
74         factory = bitcoin.p2p.ClientFactory(args.net)
75         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
76         my_script = yield get_payout_script(factory)
77         if my_script is None:
78             print 'IP transaction denied ... falling back to sending to address. Enable IP transactions on your bitcoind!'
79             my_script = yield get_payout_script2(bitcoind, args.net)
80         print '    ...success!'
81         print '    Payout script:', my_script.encode('hex')
82         print
83         
84         @defer.inlineCallbacks
85         def real_get_block(block_hash):
86             block = yield (yield factory.getProtocol()).get_block(block_hash)
87             print 'Got block %x' % (block_hash,)
88             defer.returnValue(block)
89         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
90         
91         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
92         
93         ht = bitcoin.p2p.HeightTracker(factory)
94         
95         tracker = p2pool.OkayTracker(args.net)
96         chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            # Return the entry of `chains` stored under chain_id_data, creating it on first use.
            # NOTE(review): `Chain` is not defined or imported anywhere in the
            # visible source, so calling this would raise NameError; nothing
            # visible calls get_chain - looks like a leftover, confirm and remove.
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
99         
100         # information affecting work that should trigger a long-polling update
101         current_work = variable.Variable(None)
102         # information affecting work that should not trigger a long-polling update
103         current_work2 = variable.Variable(None)
104         
105         requested = set()
106         
107         @defer.inlineCallbacks
108         def set_real_work():
109             work, height = yield getwork(bitcoind)
110             current_work2.set(dict(
111                 time=work.timestamp,
112             ))
113             best, desired = tracker.think(ht, current_work2.value['time'])
114             for peer2, share_hash in desired:
115                 if peer2 is None:
116                     continue
117                 if (peer2.nonce, share_hash) in requested:
118                     continue
119                 print 'Requesting parent share %x' % (share_hash,)
120                 peer2.send_getshares(
121                     hashes=[share_hash],
122                     parents=2000,
123                     stops=list(set(tracker.heads) | set(
124                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
125                     )),
126                 )
127                 requested.add((peer2.nonce, share_hash))
128             current_work.set(dict(
129                 version=work.version,
130                 previous_block=work.previous_block,
131                 target=work.target,
132                 height=height,
133                 best_share_hash=best,
134             ))
135         
136         print 'Initializing work...'
137         yield set_real_work()
138         print '    ...success!'
139         
140         # setup p2p logic and join p2pool network
141         
142         def share_share(share, ignore_peer=None):
143             for peer in p2p_node.peers.itervalues():
144                 if peer is ignore_peer:
145                     continue
146                 peer.send_shares([share])
147             share.flag_shared()
148         
149         def p2p_share(share, peer=None):
150             if share.hash in tracker.shares:
151                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
152                 return
153             
154             #print 'Received share %x' % (share.hash,)
155             
156             tracker.add(share)
157             best, desired = tracker.think(ht, current_work2.value['time'])
158             #for peer2, share_hash in desired:
159             #    print 'Requesting parent share %x' % (share_hash,)
160             #    peer2.send_getshares(hashes=[share_hash], parents=2000)
161             
162             if share.gentx is not None:
163                 if share.hash <= share.header['target']:
164                     print
165                     print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
166                     print
167                     if factory.conn.value is not None:
168                         factory.conn.value.send_block(block=share.as_block())
169                     else:
170                         print 'No bitcoind connection! Erp!'
171             
172             w = dict(current_work.value)
173             w['best_share_hash'] = best
174             current_work.set(w)
175             
176             if best == share.hash:
177                 print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
178             else:
179                 print 'Accepted share, not highest. Hash: %x' % (share.hash,)
180         
181         def p2p_share_hash(share_hash, peer):
182             if share_hash in tracker.shares:
183                 print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
184             else:
185                 print 'Got share hash, requesting! Hash: %x' % (share_hash,)
186                 peer.send_getshares(hashes=[share_hash], parents=0, stops=[])
187         
188         def p2p_get_shares(share_hashes, parents, stops, peer):
189             parents = min(parents, 1000//len(share_hashes))
190             stops = set(stops)
191             shares = []
192             for share_hash in share_hashes:
193                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
194                     if share.hash in stops:
195                         break
196                     shares.append(share)
197             peer.send_shares(shares, full=True)
198         
199         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
200         
201         def parse(x):
202             if ':' in x:
203                 ip, port = x.split(':')
204                 return ip, int(port)
205             else:
206                 return x, args.net.P2P_PORT
207         
208         nodes = [
209             ('72.14.191.28', args.net.P2P_PORT),
210             ('62.204.197.159', args.net.P2P_PORT),
211         ]
212         try:
213             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
214         except:
215             print
216             print 'Error resolving bootstrap node IP:'
217             log.err()
218             print
219         
220         p2p_node = p2p.Node(
221             current_work=current_work,
222             port=args.p2pool_port,
223             net=args.net,
224             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
225             mode=0 if args.low_bandwidth else 1,
226             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
227         )
228         p2p_node.handle_share = p2p_share
229         p2p_node.handle_share_hash = p2p_share_hash
230         p2p_node.handle_get_shares = p2p_get_shares
231         
232         p2p_node.start()
233         
234         # send share when the chain changes to their chain
235         def work_changed(new_work):
236             #print 'Work changed:', new_work
237             for share in tracker.get_chain_known(new_work['best_share_hash']):
238                 if share.shared:
239                     break
240                 share_share(share, share.peer)
241         current_work.changed.watch(work_changed)
242         
243         print '    ...success!'
244         print
245         
246         # start listening for workers with a JSON-RPC server
247         
248         print 'Listening for workers on port %i...' % (args.worker_port,)
249         
250         # setup worker logic
251         
252         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
253         
254         def compute(state, all_targets):
255             pre_extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
256             pre_extra_txs = pre_extra_txs[:2**16 - 1] # merkle_branch limit
257             extra_txs = []
258             size = 0
259             for tx in pre_extra_txs:
260                 this_size = bitcoin_data.tx_type.pack(tx)
261                 if size + this_size > 500000:
262                     break
263                 extra_txs.append(tx)
264                 size += this_size
265             # XXX check sigops!
266             # XXX assuming generate_tx is smallish here..
267             generate_tx = p2pool.generate_transaction(
268                 tracker=tracker,
269                 previous_share_hash=state['best_share_hash'],
270                 new_script=my_script,
271                 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
272                 nonce=struct.pack('<Q', random.randrange(2**64)),
273                 block_target=state['target'],
274                 net=args.net,
275             )
276             print 'Generating!'
277             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
278             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
279             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
280             merkle_root = bitcoin.data.merkle_hash(transactions)
281             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
282             
283             timestamp = current_work2.value['time']
284             if state['best_share_hash'] is not None:
285                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
286                 if timestamp2 > timestamp:
287                     print 'Toff', timestamp2 - timestamp
288                     timestamp = timestamp2
289             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
290             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
291             target = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
292             if not all_targets:
293                 target = min(2**256//2**32 - 1, target)
294             return ba.getwork(target)
295         
296         def got_response(data):
297             try:
298                 # match up with transactions
299                 header = bitcoin.getwork.decode_data(data)
300                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
301                 if transactions is None:
302                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
303                     return False
304                 block = dict(header=header, txs=transactions)
305                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
306                 if hash_ <= block['header']['target']:
307                     print
308                     print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
309                     print
310                     if factory.conn.value is not None:
311                         factory.conn.value.send_block(block=block)
312                     else:
313                         print 'No bitcoind connection! Erp!'
314                 share = p2pool.Share.from_block(block)
315                 print 'GOT SHARE! %x' % (share.hash,)
316                 p2p_share(share)
317             except:
318                 print
319                 print 'Error processing data received from worker:'
320                 log.err()
321                 print
322                 return False
323             else:
324                 return True
325         
326         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
327         
328         print '    ...success!'
329         print
330         
331         # done!
332         
333         def get_blocks(start_hash):
334             while True:
335                 try:
336                     block = get_block.call_now(start_hash)
337                 except deferral.NotNowError:
338                     break
339                 yield start_hash, block
340                 start_hash = block['header']['previous_block']
341         
342         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
343         
344         class Tx(object):
345             def __init__(self, tx, seen_at_block):
346                 self.hash = bitcoin.data.tx_type.hash256(tx)
347                 self.tx = tx
348                 self.seen_at_block = seen_at_block
349                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
350                 #print
351                 #print '%x %r' % (seen_at_block, tx)
352                 #for mention in self.mentions:
353                 #    print '%x' % mention
354                 #print
355                 self.parents_all_in_blocks = False
356                 self.value_in = 0
357                 #print self.tx
358                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
359                 self._find_parents_in_blocks()
360             
361             @defer.inlineCallbacks
362             def _find_parents_in_blocks(self):
363                 for tx_in in self.tx['tx_ins']:
364                     try:
365                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
366                     except Exception:
367                         return
368                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
369                     #print raw_transaction
370                     if not raw_transaction['parent_blocks']:
371                         return
372                 self.parents_all_in_blocks = True
373             
374             def is_good(self):
375                 if not self.parents_all_in_blocks:
376                     return False
377                 x = self.is_good2()
378                 #print 'is_good:', x
379                 return x
380             
381             def is_good2(self):
382                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
383                     if block_hash == self.seen_at_block:
384                         return True
385                     for tx in block['txs']:
386                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
387                         if mentions & self.mentions:
388                             return False
389                 return False
390         
391         @defer.inlineCallbacks
392         def new_tx(tx_hash):
393             try:
394                 assert isinstance(tx_hash, (int, long))
395                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
396                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
397             except:
398                 print
399                 print 'Error handling tx:'
400                 log.err()
401                 print
402         factory.new_tx.watch(new_tx)
403         
        def new_block(block):
            # Registered on factory.new_block: refresh the work right away. The
            # block argument is unused and the deferred returned by
            # set_real_work() is not waited on.
            set_real_work()
406         factory.new_block.watch(new_block)
407         
408         print 'Started successfully!'
409         print
410         
411         while True:
412             yield deferral.sleep(1)
413             try:
414                 yield set_real_work()
415             except:
416                 log.err()
417             try:
418                 if current_work.value['best_share_hash'] is not None:
419                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
420                     if height > 5:
421                         weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 1000), 2**100)
422                         print 'Pool rate: %i mhash/s Contribution: %.02f%%' % (
423                             p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net)//1000000,
424                             weights.get(my_script, 0)/total_weight,
425                         )
426             except:
427                 log.err()
428     except:
429         print
430         print 'Fatal error:'
431         log.err()
432         print
433         reactor.stop()
434
def run():
    """Parse the command line and run main() under the Twisted reactor.

    The p2pool and bitcoind p2p ports default to the selected network's
    P2P_PORT and BITCOIN_P2P_PORT when they are not given.
    """
    if __debug__:
        defer.setDebugging(True)

    banner = 'p2pool (version %s)' % (__version__,)
    parser = argparse.ArgumentParser(description=banner)
    parser.add_argument('--version', action='version', version=__version__)
    parser.add_argument(
        '--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net',
    )

    pool_opts = parser.add_argument_group('p2pool interface')
    pool_opts.add_argument(
        '--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port',
    )
    pool_opts.add_argument(
        '-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes',
    )
    # registered on the parser itself, not on the p2pool group
    parser.add_argument(
        '-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth',
    )

    worker_opts = parser.add_argument_group('worker interface')
    worker_opts.add_argument(
        '-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port',
    )

    bitcoind_opts = parser.add_argument_group('bitcoind interface')
    bitcoind_opts.add_argument(
        '--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address',
    )
    bitcoind_opts.add_argument(
        '--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port',
    )
    bitcoind_opts.add_argument(
        '--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port',
    )

    # the two positionals: RPC username, then RPC password
    bitcoind_opts.add_argument(
        metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username',
    )
    bitcoind_opts.add_argument(
        metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password',
    )

    args = parser.parse_args()

    # ports left unset follow the chosen network
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT

    reactor.callWhenRunning(main, args)
    reactor.run()