cleanup, compressed p2p, bugs, finished HeightTracker
[p2pool.git] / p2pool / main.py
#!/usr/bin/python

from __future__ import division

import argparse
import itertools
import os
import random
import sqlite3
import struct
import subprocess
import sys

from twisted.internet import defer, reactor
from twisted.web import server
from twisted.python import log

import bitcoin.p2p, bitcoin.getwork, bitcoin.data
from util import db, expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, worker_interface
import p2pool.data as p2pool

try:
    __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
except:
    __version__ = 'unknown'

@deferral.retry('Error getting work from bitcoind:', 1)
@defer.inlineCallbacks
def getwork(bitcoind):
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    finally:
        # get rid of residual errors
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))

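# ask the running bitcoind, over its P2P connection, for a payout script via a
# checkorder (IP transaction) request; returns None if the request is denied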
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
    if res['reply'] == 'success':
        my_script = res['script']
    elif res['reply'] == 'denied':
        my_script = None
    else:
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(my_script)

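# fallback: ask bitcoind for a new address over JSON-RPC and turn it into an
# output script paying that address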
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(bitcoin.data.address_to_pubkey_hash((yield bitcoind.rpc_getnewaddress()), net)))

@defer.inlineCallbacks
def main(args):
    try:
        print 'p2pool (version %s)' % (__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        work, height = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (work.previous_block, height)
        print
        
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        my_script = yield get_payout_script(factory)
        if my_script is None:
            print 'IP transaction denied ... falling back to sending to address. Enable IP transactions on your bitcoind!'
            my_script = yield get_payout_script2(bitcoind, args.net)
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
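        # cache blocks and raw transactions fetched from bitcoind so repeated
        # lookups while checking transactions don't hit the network every time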
        @defer.inlineCallbacks
        def real_get_block(block_hash):
            block = yield (yield factory.getProtocol()).get_block(block_hash)
            print 'Got block %x' % (block_hash,)
            defer.returnValue(block)
        get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
        
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
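        # follows bitcoind's block chain over the P2P connection so that block
        # heights are available to tracker.think() below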
        ht = bitcoin.p2p.HeightTracker(factory)
        
        tracker = p2pool.OkayTracker(args.net)
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
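        # poll bitcoind for fresh work, let the tracker choose the best known
        # share (requesting any missing parents from peers), and publish the
        # result through current_work/current_work2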
        @defer.inlineCallbacks
        def set_real_work():
            work, height = yield getwork(bitcoind)
            best, desired = tracker.think(ht)
            for peer2, share_hash in desired:
                print 'Requesting parent share %x' % (share_hash,)
                peer2.send_getshares(hashes=[share_hash], parents=2000, stops=list(set(tracker.heads) | set()))
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                
                height=height + 1,
                
                best_share_hash=best,
            ))
            current_work2.set(dict(
                timestamp=work.timestamp,
            ))
        
        print 'Initializing work...'
        yield set_real_work()
        print '    ...success!'
        
        # setup p2p logic and join p2pool network
        
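        # forward a share to every connected peer except the one it came from,
        # then mark it as shared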
        def share_share(share, ignore_peer=None):
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                peer.send_share(share)
            share.flag_shared()
        
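        # handle a share received from a peer (or found locally): add it to the
        # tracker, submit it to bitcoind if it also solves a block, and update
        # the current best share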
        def p2p_share(share, peer=None):
            if share.hash in tracker.shares:
                print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
                return
            
            #print 'Received share %x' % (share.hash,)
            
            tracker.add(share)
            best, desired = tracker.think(ht)
            #for peer2, share_hash in desired:
            #    print 'Requesting parent share %x' % (share_hash,)
            #    peer2.send_getshares(hashes=[share_hash], parents=2000)
            
            if share.gentx is not None:
                if share.hash <= share.header['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=share.as_block())
                    else:
                        print 'No bitcoind connection! Erp!'
            
            w = dict(current_work.value)
            w['best_share_hash'] = best
            current_work.set(w)
            
            if best == share.hash:
                print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
            else:
                print 'Accepted share, not highest. Hash: %x' % (share.hash,)
        
        def p2p_share_hash(share_hash, peer):
            if share_hash in tracker.shares:
                print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
            else:
                print 'Got share hash, requesting! Hash: %x' % (share_hash,)
                peer.send_getshares(hashes=[share_hash], parents=0, stops=[])
        
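        # handle a peer asking to be brought up to our best share, given the
        # share hashes it already has (uses the chain-id based structures; see
        # the XXX below)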
        def p2p_get_to_best(chain_id_data, have, peer):
            # XXX
            chain = get_chain(chain_id_data)
            if chain.highest.value is None:
                return
            
            chain_hashes = chain.get_down(chain.highest.value)
            
            have2 = set()
            for hash_ in have:
                have2 |= set(chain.get_down(hash_))
            
            for share_hash in reversed(chain_hashes):
                if share_hash in have2:
                    continue
                peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
        
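        # handle a peer requesting specific shares plus up to `parents`
        # ancestors of each, capped at roughly 100 shares per request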
        def p2p_get_shares(share_hashes, parents, peer):
            parents = min(parents, 100//len(share_hashes))
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    peer.send_share(share, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
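        # parse an ADDR[:PORT] argument, defaulting to the network's standard
        # p2pool port when no port is given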
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        nodes = [
            ('72.14.191.28', args.net.P2P_PORT),
            ('62.204.197.159', args.net.P2P_PORT),
        ]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
        except:
            print
            print 'Error resolving bootstrap node IP:'
            log.err()
            print
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        p2p_node.handle_share = p2p_share
        p2p_node.handle_share_hash = p2p_share_hash
        p2p_node.handle_get_to_best = p2p_get_to_best
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # when the best share chain changes, send peers any shares on it that have not been shared yet
        def work_changed(new_work):
            #print 'Work changed:', new_work
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.shared:
                    break
                share_share(share, share.peer)
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        
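        # build a getwork job for a miner: generate the payout transaction for
        # the current share chain, assemble the transaction list, and remember
        # it by merkle root so the solved work can be matched back up later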
        def compute(state):
            extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            # XXX limit to merkle_branch and block max size (1000000 bytes)
            # and sigops
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=my_script,
                subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=struct.pack('<Q', random.randrange(2**64)),
                block_target=state['target'],
                net=args.net,
            )
            print 'Generating!', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']//1000000
            print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 300 seconds
            
            timestamp = current_work2.value['timestamp']
            if state['best_share_hash'] is not None:
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
            return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
        
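        # handle work submitted by a miner: decode the block header, look up
        # its transactions by merkle root, pass the block to bitcoind if it
        # meets the block target, and feed the resulting share into p2p_share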
        def got_response(data):
            try:
                # match up with transactions
                header = bitcoin.getwork.decode_data(data)
                transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
                if transactions is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                block = dict(header=header, txs=transactions)
                hash_ = bitcoin.data.block_header_type.hash256(block['header'])
                if hash_ <= block['header']['target']:
                    print
                    print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
                    print
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=block)
                    else:
                        print 'No bitcoind connection! Erp!'
                share = p2pool.Share.from_block(block)
                print 'GOT SHARE! %x' % (share.hash,)
                p2p_share(share)
            except:
                print
                print 'Error processing data received from worker:'
                log.err()
                print
                return False
            else:
                return True
        
        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
        
        print '    ...success!'
        print
        
        # done!
        
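        # walk backwards through already-cached blocks starting at start_hash,
        # stopping at the first block that isn't cached yet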
        def get_blocks(start_hash):
            while True:
                try:
                    block = get_block.call_now(start_hash)
                except deferral.NotNowError:
                    break
                yield start_hash, block
                start_hash = block['header']['previous_block']
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        
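        # wrapper around a transaction seen on the network, tracking its value
        # in/out and whether all of its inputs are already in blocks, so that
        # compute() can decide which transactions are safe to include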
        class Tx(object):
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print '%x %r' % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print '%x' % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                x = self.is_good2()
                #print 'is_good:', x
                return x
            
            def is_good2(self):
                for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
                    if block_hash == self.seen_at_block:
                        return True
                    for tx in block['txs']:
                        mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                        if mentions & self.mentions:
                            return False
                return False
        
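        # fetch each transaction announced by bitcoind over P2P and add it to
        # the local pool of candidate transactions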
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            try:
                assert isinstance(tx_hash, (int, long))
                tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
                tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
            except:
                print
                print 'Error handling tx:'
                log.err()
                print
        factory.new_tx.watch(new_tx)
        
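        # refresh work whenever bitcoind announces a new block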
        def new_block(block):
            set_real_work()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        while True:
            yield deferral.sleep(1)
            set_real_work()
    except:
        print
        print 'Fatal error:'
        log.err()
        print
        reactor.stop()

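# command-line entry point: parse arguments, fill in network-dependent port
# defaults, and start main() under the Twisted reactor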
def run():
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
    parser.add_argument('--version', action='version', version=__version__)
    parser.add_argument('--testnet',
        help='use the testnet',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally, 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    reactor.callWhenRunning(main, args)
    reactor.run()