# [p2pool.git] / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import os
7 import random
8 import sqlite3
9 import subprocess
10 import sys
11 import traceback
12
13 from twisted.internet import defer, reactor
14 from twisted.web import server
15
16 import bitcoin_p2p
17 import conv
18 import db
19 import expiring_dict
20 import jsonrpc
21 import p2p
22 import p2pool
23 import util
24 import worker_interface
25
class Chain(object):
    """A single p2pool share chain, identified by its packed chain-id data.

    Keeps every accepted share2 keyed by share hash, tracks the hash of the
    tallest known share in ``highest``, and holds bookkeeping for shares
    that have been requested from peers but not yet received.
    """
    
    def __init__(self, chain_id_data):
        self.chain_id_data = chain_id_data
        self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
        
        self.share2s = {} # share hash -> share2
        self.highest = util.Variable(None) # hash of tallest share, or None
        
        self.requesting = set() # hashes currently being fetched from peers
        self.request_map = {} # hash -> peer to ask later
    
    def accept(self, share, net):
        """Try to attach *share* to this chain.
        
        Returns 'good', 'dup' or 'orphan'; any exception raised by
        share.check() propagates to the caller.
        """
        if share.chain_id_data != self.chain_id_data:
            raise ValueError('share does not belong to this chain')
        
        if share.hash in self.share2s:
            return 'dup'
        
        prev_hash = share.previous_share_hash
        if prev_hash is None:
            # chain genesis: no parent, height becomes 0
            parent, parent_height = None, -1
        else:
            if prev_hash not in self.share2s:
                return 'orphan'
            parent = self.share2s[prev_hash]
            parent_height = parent.height
        
        new_height = parent_height + 1
        
        checked = share.check(self, new_height, parent, net) # raises exceptions
        if checked.share is not share:
            raise ValueError()
        
        self.share2s[share.hash] = checked
        
        # advance the tip if this share is taller than the current one
        tip = self.highest.value
        if tip is None or new_height > self.share2s[tip].height:
            self.highest.set(share.hash)
        
        return 'good'
    
    def get_highest_share2(self):
        """Return the share2 at the chain tip, or None if the chain is empty."""
        if self.highest.value is None:
            return None
        return self.share2s[self.highest.value]
68
@defer.inlineCallbacks
def get_last_p2pool_block_hash(current_block_hash, get_block, net):
    """Walk the real blockchain backwards from current_block_hash and fire
    (via defer.returnValue) with the hash of the first block found that
    p2pool generated, or with net.ROOT_BLOCK if the walk reaches it first.
    
    current_block_hash: hash of the block to start walking back from
    get_block: callable returning a deferred that fires with the block data
        for a given hash
    net: network parameters (uses ROOT_BLOCK, IDENTIFIER, SCRIPT)
    """
    block_hash = current_block_hash
    while True:
        if block_hash == net.ROOT_BLOCK:
            defer.returnValue(block_hash)
        block = yield get_block(block_hash)
        # p2pool embeds its identifier in the coinbase input script
        coinbase_data = block['txns'][0]['tx_ins'][0]['script']
        try:
            coinbase = p2pool.coinbase_type.unpack(coinbase_data, ignore_extra=True)
        except bitcoin_p2p.EarlyEnd:
            pass # script ended before a full coinbase could be unpacked - not p2pool's
        else:
            try:
                if coinbase['identifier'] == net.IDENTIFIER:
                    # total the generation payouts by output script
                    payouts = {}
                    for tx_out in block['txns'][0]['tx_outs']:
                        payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
                    subsidy = sum(payouts.itervalues())
                    # the coinbase's claimed subsidy must match the actual outputs,
                    # and net.SCRIPT must receive at least 1/64 of it
                    # (presumably the network's designated script - confirm in p2pool module)
                    if coinbase['subsidy'] == subsidy:
                        if payouts.get(net.SCRIPT, 0) >= subsidy//64:
                            defer.returnValue(block_hash)
            except Exception:
                # matching is best-effort: log the failure and keep walking
                print
                print 'Error matching block:'
                print 'block:', block
                traceback.print_exc()
                print
        block_hash = block['header']['previous_block']
98
99 @defer.inlineCallbacks
100 def getwork(bitcoind):
101     while True:
102         try:
103             getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
104             getwork, height = conv.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
105         except:
106             print
107             print 'Error getting work from bitcoind:'
108             traceback.print_exc()
109             print
110             yield util.sleep(1)
111             continue
112         defer.returnValue((getwork, height))
113
114 @defer.inlineCallbacks
115 def main(args):
116     try:
117         net = p2pool.Testnet if args.testnet else p2pool.Main
118         
119         print name
120         print
121         
122         # connect to bitcoind over JSON-RPC and do initial getwork
123         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
124         print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
125         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
126         
127         work, height = yield getwork(bitcoind)
128         
129         print '    ...success!'
130         print '    Current block hash: %x height: %i' % (work.previous_block, height)
131         print
132         
133         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
134         print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
135         factory = bitcoin_p2p.ClientFactory(args.testnet)
136         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
137         
138         while True:
139             try:
140                 res = yield (yield factory.getProtocol()).check_order(order=bitcoin_p2p.Protocol.null_order)
141                 if res['reply'] != 'success':
142                     print
143                     print 'Error getting payout script:'
144                     print res
145                     print
146                     continue
147                 my_script = res['script']
148             except:
149                 print
150                 print 'Error getting payout script:'
151                 traceback.print_exc()
152                 print
153             else:
154                 break
155             yield util.sleep(1)
156         
157         print '    ...success!'
158         print '    Payout script:', my_script.encode('hex')
159         print
160         
161         @defer.inlineCallbacks
162         def real_get_block(block_hash):
163             block = yield (yield factory.getProtocol()).get_block(block_hash)
164             print 'Got block %x' % (block_hash,)
165             defer.returnValue(block)
166         get_block = util.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
167         
168         chains = expiring_dict.ExpiringDict(300)
169         def get_chain(chain_id_data):
170             return chains.setdefault(chain_id_data, Chain(chain_id_data))
171         # information affecting work that should trigger a long-polling update
172         current_work = util.Variable(None)
173         # information affecting work that should not trigger a long-polling update
174         current_work2 = util.Variable(None)
175         
176         share_dbs = [db.SQLiteDict(sqlite3.connect(filename, isolation_level=None), 'shares') for filename in args.store_shares]
177         
178         @defer.inlineCallbacks
179         def set_real_work():
180             work, height = yield getwork(bitcoind)
181             last_p2pool_block_hash = (yield get_last_p2pool_block_hash(work.previous_block, get_block, net))
182             chain = get_chain(p2pool.chain_id_type.pack(dict(last_p2pool_block_hash=last_p2pool_block_hash, bits=work.bits)))
183             current_work.set(dict(
184                 version=work.version,
185                 previous_block=work.previous_block,
186                 bits=work.bits,
187                 height=height + 1,
188                 current_chain=chain,
189                 highest_p2pool_share2=chain.get_highest_share2(),
190                 last_p2pool_block_hash=last_p2pool_block_hash,
191             ))
192             current_work2.set(dict(
193                 timestamp=work.timestamp,
194             ))
195         
196         print 'Searching for last p2pool-generated block...'
197         yield set_real_work()
198         print '    ...success!'
199         print '    Matched block %x' % (current_work.value['last_p2pool_block_hash'],)
200         print
201         
202         # setup p2p logic and join p2pool network
203         
204         def share_share2(share2, ignore_peer=None):
205             for peer in p2p_node.peers.itervalues():
206                 if peer is ignore_peer:
207                     continue
208                 peer.send_share(share2.share)
209             share2.flag_shared()
210         
211         def p2p_share(share, peer=None):
212             if share.hash <= conv.bits_to_target(share.header['bits']):
213                 print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
214                 if factory.conn is not None:
215                     factory.conn.send_block(block=share.as_block())
216                 else:
217                     print 'No bitcoind connection! Erp!'
218             
219             chain = get_chain(share.chain_id_data)
220             res = chain.accept(share, net)
221             if res == 'good':
222                 share2 = chain.share2s[share.hash]
223                 
224                 hash_data = bitcoin_p2p.HashType().pack(share.hash)
225                 share1_data = p2pool.share1.pack(share.as_share1())
226                 for share_db in share_dbs:
227                     share_db[hash_data] = share1_data
228                 
229                 if chain is current_work.value['current_chain']:
230                     print 'Accepted share, passing to peers. Hash: %x' % (share.hash,)
231                     share_share2(share2, peer)
232                 else:
233                     print 'Accepted share to non-current chain. Hash: %x' % (share.hash,)
234             elif res == 'dup':
235                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
236             elif res == 'orphan':
237                 print 'Got share referencing unknown share, requesting past shares from peer. Hash: %x' % (share.hash,)
238                 if peer is None:
239                     raise ValueError()
240                 peer.send_gettobest(
241                     chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
242                     have=random.sample(chain.share2s.keys(), min(8, len(chain.share2s))) + [chain.share2s[chain.highest.value].share.hash] if chain.highest.value is not None else [],
243                 )
244             else:
245                 raise ValueError('unknown result from chain.accept - %r' % (res,))
246             
247             w = dict(current_work.value)
248             w['highest_p2pool_share2'] = w['current_chain'].get_highest_share2()
249             current_work.set(w)
250         
251         def p2p_share_hash(chain_id_data, hash, peer):
252             chain = get_chain(chain_id_data)
253             if chain is current_work.value['current_chain']:
254                 if hash not in chain.share2s:
255                     if hash not in chain.requesting:
256                         print "Got share hash, requesting! Hash: %x" % (hash,)
257                         peer.send_getshares(chain_id=p2pool.chain_id_type.unpack(chain_id_data), hashes=[hash])
258                         chain.requesting.add(hash)
259                         reactor.callLater(5, chain.requesting.remove, hash)
260                     else:
261                         print "Got share hash, already requested, ignoring. Hash: %x" % (hash,)
262                 else:
263                     print "Got share hash, already have, ignoring. Hash: %x" % (hash,)
264             else:
265                 print "Got share hash to non-current chain, storing. Hash: %x" % (hash,)
266                 if hash not in chain.request_map:
267                     chain.request_map[hash] = peer
268         
269         def p2p_get_to_best(chain_id_data, have, peer):
270             chain = get_chain(chain_id_data)
271             if chain.highest.value is None:
272                 return
273             
274             def get_down(share_hash):
275                 blocks = []
276                 
277                 while True:
278                     blocks.append(share_hash)
279                     if share_hash not in chain.share2s:
280                         break
281                     share2 = chain.share2s[share_hash]
282                     if share2.share.previous_share_hash is None:
283                         break
284                     share_hash = share2.share.previous_share_hash
285                 
286                 return blocks
287             
288             chain_hashes = get_down(chain.highest.value)
289             
290             have2 = set()
291             for hash_ in have:
292                 have2 |= set(get_down(hash_))
293             
294             for share_hash in reversed(chain_hashes):
295                 if share_hash in have2:
296                     continue
297                 peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
298         
299         def p2p_get_shares(chain_id_data, hashes, peer):
300             chain = get_chain(chain_id_data)
301             for hash_ in hashes:
302                 if hash_ in chain.share2s:
303                     peer.send_share(chain.share2s[hash_].share, full=True)
304         
305         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
306         
307         def parse(x):
308             if ':' in x:
309                 ip, port = x.split(':')
310                 return ip, int(port)
311             else:
312                 return ip, {False: 9333, True: 19333}[args.testnet]
313         
314         if args.testnet:
315             nodes = [('72.14.191.28', 19333)]
316         else:
317             nodes = [('72.14.191.28', 9333)]
318         
319         p2p_node = p2p.Node(
320             current_work=current_work,
321             port=args.p2pool_port,
322             testnet=args.testnet,
323             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(__file__), 'addrs.dat'), isolation_level=None), 'addrs'),
324             mode=0 if args.low_bandwidth else 1,
325             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
326         )
327         p2p_node.handle_share = p2p_share
328         p2p_node.handle_share_hash = p2p_share_hash
329         p2p_node.handle_get_to_best = p2p_get_to_best
330         p2p_node.handle_get_shares = p2p_get_shares
331         
332         p2p_node.start()
333         
334         # send share when the chain changes to their chain
335         def work_changed(new_work):
336             #print 'Work changed:', new_work
337             chain = new_work['current_chain']
338             for share2 in chain.share2s.itervalues():
339                 if not share2.shared:
340                     print 'Sharing share of switched to chain. Hash:', share2.share.hash
341                     share_share2(share2)
342             for hash, peer in chain.request_map.iteritems():
343                 if hash not in chain.share2s:
344                     peer.send_getshares(hashes=[hash])
345         current_work.changed.watch(work_changed)
346         
347         print '    ...success!'
348         print
349         
350         # start listening for workers with a JSON-RPC server
351         
352         print 'Listening for workers on port %i...' % (args.worker_port,)
353         
354         # setup worker logic
355         
356         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
357         
358         def compute(state):
359             generate_txn, shares = p2pool.generate_transaction(
360                 last_p2pool_block_hash=state['last_p2pool_block_hash'],
361                 previous_share2=state['highest_p2pool_share2'],
362                 add_script=my_script,
363                 subsidy=50*100000000 >> state['height']//210000,
364                 nonce=random.randrange(2**64),
365                 net=net,
366             )
367             print 'Generating, have', shares.count(my_script) - 2, 'share(s) in the current chain.'
368             transactions = [generate_txn] # XXX
369             merkle_root = bitcoin_p2p.merkle_hash(transactions)
370             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
371             ba = conv.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['bits'])
372             return ba.getwork(net.TARGET_MULTIPLIER)
373         
374         def got_response(data):
375             # match up with transactions
376             header = conv.decode_data(data)
377             transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
378             if transactions is None:
379                 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
380                 return False
381             share = p2pool.Share(header=header, txns=transactions)
382             print 'GOT SHARE! %x' % (share.hash,)
383             try:
384                 p2p_share(share)
385             except:
386                 print
387                 print 'Error processing data received from worker:'
388                 traceback.print_exc()
389                 print
390                 return False
391             else:
392                 return True
393         
394         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
395         
396         print '    ...success!'
397         print
398         
399         # done!
400         
401         print 'Started successfully!'
402         print
403         
404         while True:
405             yield set_real_work()
406             yield util.sleep(1)
407     except:
408         print
409         print 'Fatal error:'
410         traceback.print_exc()
411         print
412     reactor.stop()
413
if __name__ == '__main__':
    # best-effort version string from subversion; 'unknown' if svnversion fails
    try:
        # communicate() waits for the child and closes its pipe
        # (the original .stdout.read() never reaped the process)
        __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).communicate()[0].strip()
    except Exception: # was a bare except:
        __version__ = 'unknown'
    
    name = 'p2pool (version %s)' % (__version__,)
    
    parser = argparse.ArgumentParser(description=name)
    parser.add_argument('--version', action='version', version=__version__)
    parser.add_argument('-t', '--testnet',
        help='use the testnet; make sure you change the ports too',
        action='store_true', default=False, dest='testnet')
    parser.add_argument('-s', '--store-shares', metavar='FILENAME',
        help='write shares to a database (not needed for normal usage)',
        type=str, action='append', default=[], dest='store_shares')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('-p', '--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    # fill in network-dependent default ports when none were given
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = {False: 8333, True: 18333}[args.testnet]
    
    if args.p2pool_port is None:
        args.p2pool_port = {False: 9333, True: 19333}[args.testnet]
    
    reactor.callWhenRunning(main, args)
    reactor.run()