#!/usr/bin/python

from __future__ import division

import argparse
import itertools
import os
import random
import sqlite3
import struct
import subprocess
import sys
import traceback

from twisted.internet import defer, reactor
from twisted.web import server

import bitcoin.p2p, bitcoin.getwork, bitcoin.data
from util import db, expiring_dict, jsonrpc, variable, deferral
from . import p2p, worker_interface
import p2pool.data as p2pool

try:
    __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
except:
    __version__ = 'unknown'

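# Chain collects the shares belonging to one p2pool chain, identified by
# chain_id_data (which encodes the last p2pool block hash). Shares live in
# share2s (hash -> share2) and the hash of the highest-height share is exposed
# as a watchable Variable. Note: __init__ asserts False in this revision, so
# Chain can't actually be instantiated here; share handling appears to be
# migrating to p2pool.Tracker (created in main() below).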
class Chain(object):
    def __init__(self, chain_id_data):
        assert False
        self.chain_id_data = chain_id_data
        self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
        
        self.share2s = {} # hash -> share2
        self.highest = variable.Variable(None) # hash
        
        self.requesting = set()
        self.request_map = {}
    
    def accept(self, share, net):
        if share.chain_id_data != self.chain_id_data:
            raise ValueError('share does not belong to this chain')
        
        if share.hash in self.share2s:
            return 'dup'
        
        if share.previous_share_hash is None:
            previous_height, previous_share2 = -1, None
        elif share.previous_share_hash not in self.share2s:
            return 'orphan'
        else:
            previous_share2 = self.share2s[share.previous_share_hash]
            previous_height = previous_share2.height
        
        height = previous_height + 1
        
        share2 = share.check(self, height, previous_share2, net) # raises exceptions
        
        if share2.share is not share:
            raise ValueError()
        
        self.share2s[share.hash] = share2
        
        if self.highest.value is None or height > self.share2s[self.highest.value].height:
            self.highest.set(share.hash)
        
        return 'good'
    
    def get_highest_share2(self):
        return self.share2s[self.highest.value] if self.highest.value is not None else None
    
    def get_down(self, share_hash):
        blocks = []
        
        while True:
            blocks.append(share_hash)
            if share_hash not in self.share2s:
                break
            share2 = self.share2s[share_hash]
            if share2.share.previous_share_hash is None:
                break
            share_hash = share2.share.previous_share_hash
        
        return blocks

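# Walk backwards from current_block_hash until reaching either the network's
# hard-coded ROOT_BLOCK or a block whose generation transaction looks like a
# p2pool block: its coinbase carries our IDENTIFIER, the subsidy claimed in the
# coinbase matches the sum of the generation outputs, and at least subsidy/64
# is paid to net.SCRIPT. That block's hash identifies the current p2pool chain.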
@defer.inlineCallbacks
def get_last_p2pool_block_hash(current_block_hash, get_block, net):
    block_hash = current_block_hash
    while True:
        if block_hash == net.ROOT_BLOCK:
            defer.returnValue(block_hash)
        try:
            block = yield get_block(block_hash)
        except:
            traceback.print_exc()
            continue
        coinbase_data = block['txs'][0]['tx_ins'][0]['script']
        try:
            coinbase = p2pool.coinbase_type.unpack(coinbase_data)
        except bitcoin.data.EarlyEnd:
            pass
        else:
            try:
                if coinbase['identifier'] == net.IDENTIFIER:
                    payouts = {}
                    for tx_out in block['txs'][0]['tx_outs']:
                        payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
                    subsidy = sum(payouts.itervalues())
                    if coinbase['subsidy'] == subsidy:
                        if payouts.get(net.SCRIPT, 0) >= subsidy//64:
                            defer.returnValue(block_hash)
            except Exception:
                print
                print 'Error matching block:'
                print 'block:', block
                traceback.print_exc()
                print
        block_hash = block['header']['previous_block']

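# Fetch fresh work from bitcoind: getwork and getblocknumber are issued in
# parallel and both results are awaited. deferral.retry keeps retrying the
# whole request (printing the given message) on failure.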
@deferral.retry('Error getting work from bitcoind:', 1)
@defer.inlineCallbacks
def getwork(bitcoind):
    # a block could arrive in between these two queries
    getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
    try:
        getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
    finally:
        # get rid of residual errors
        getwork_df.addErrback(lambda fail: None)
        height_df.addErrback(lambda fail: None)
    defer.returnValue((getwork, height))


@defer.inlineCallbacks
def main(args):
    try:
        print 'p2pool (version %s)' % (__version__,)
        print
        
        # connect to bitcoind over JSON-RPC and do initial getwork
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        
        work, height = yield getwork(bitcoind)
        
        print '    ...success!'
        print '    Current block hash: %x height: %i' % (work.previous_block, height)
        print
        
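        # The payout script comes from the local bitcoind over its p2p
        # interface: a checkorder message with a null order makes bitcoind
        # answer with a script paying to one of its own keys (the mechanism
        # behind pay-to-IP transactions), and generated work pays out to it.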
        # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
        print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin.p2p.ClientFactory(args.net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        
        while True:
            try:
                res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
                if res['reply'] != 'success':
                    print
                    print 'Error getting payout script:'
                    print res
                    print
                    continue
                my_script = res['script']
            except:
                print
                print 'Error getting payout script:'
                traceback.print_exc()
                print
            else:
                break
            yield deferral.sleep(1)
        
        print '    ...success!'
        print '    Payout script:', my_script.encode('hex')
        print
        
        @defer.inlineCallbacks
        def real_get_block(block_hash):
            block = yield (yield factory.getProtocol()).get_block(block_hash)
            print 'Got block %x' % (block_hash,)
            defer.returnValue(block)
        get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
        
        get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
        
        tracker = p2pool.Tracker()
        chains = expiring_dict.ExpiringDict(300)
        def get_chain(chain_id_data):
            return chains.setdefault(chain_id_data, Chain(chain_id_data))
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        share_dbs = [db.SQLiteDict(sqlite3.connect(filename, isolation_level=None), 'shares') for filename in args.store_shares]
        
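        # set_real_work re-polls bitcoind and refreshes both variables:
        # current_work gets the fields whose changes should push a long-polling
        # update to miners, while the timestamp goes into current_work2 so it
        # never triggers one by itself.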
        @defer.inlineCallbacks
        def set_real_work():
            work, height = yield getwork(bitcoind)
            current_work.set(dict(
                version=work.version,
                previous_block=work.previous_block,
                target=work.target,
                
                height=height + 1,
                
                highest_p2pool_share_hash=tracker.get_best_share_hash(),
            ))
            current_work2.set(dict(
                timestamp=work.timestamp,
            ))
        
        print 'Initializing work...'
        yield set_real_work()
        print '    ...success!'
        
        # setup p2p logic and join p2pool network
        
        def share_share2(share2, ignore_peer=None):
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                peer.send_share(share2.share)
            share2.flag_shared()
        
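        # Handle a share received from a peer (or from our own worker, with
        # peer=None): if its hash also meets the real block target, hand it to
        # bitcoind as a solved block; then add it to the tracker, save it to
        # any --store-shares databases, relay it to other peers when it extends
        # the current chain, or ask the sending peer for its missing ancestors.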
        def p2p_share(share, peer=None):
            if share.hash <= share.header['target']:
                print
                print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
                #print share.__dict__
                print
                if factory.conn is not None:
                    factory.conn.send_block(block=share.as_block())
                else:
                    print 'No bitcoind connection! Erp!'
            
            res = tracker.add_share(share)
            if res == 'good':
                share2 = chain.share2s[share.hash]
                
                def save():
                    hash_data = bitcoin.p2p.HashType().pack(share.hash)
                    share1_data = p2pool.share1.pack(share.as_share1())
                    for share_db in share_dbs:
                        share_db[hash_data] = share1_data
                reactor.callLater(1, save)
                
                if chain is current_work.value['current_chain']:
                    if share.hash == chain.highest.value:
                        print 'Accepted share, passing to peers. Height: %i Hash: %x Script: %s' % (share2.height, share.hash, share2.shares[-1].encode('hex'))
                        share_share2(share2, peer)
                    else:
                        print 'Accepted share, not highest. Height: %i Hash: %x' % (share2.height, share.hash,)
                else:
                    print 'Accepted share to non-current chain. Height: %i Hash: %x' % (share2.height, share.hash,)
            elif res == 'dup':
                print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
            elif res == 'orphan':
                print 'Got share referencing unknown share, requesting past shares from peer. Hash: %x' % (share.hash,)
                if peer is None:
                    raise ValueError()
                peer.send_gettobest(
                    chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
                    have=random.sample(chain.share2s.keys(), min(8, len(chain.share2s))) + [chain.share2s[chain.highest.value].share.hash] if chain.highest.value is not None else [],
                )
            else:
                raise ValueError('unknown result from chain.accept - %r' % (res,))
            
            w = dict(current_work.value)
            w['highest_p2pool_share_hash'] = w['current_chain'].get_highest_share_hash()
            current_work.set(w)
        
        def p2p_share_hash(chain_id_data, hash, peer):
            chain = get_chain(chain_id_data)
            if chain is current_work.value['current_chain']:
                if hash not in chain.share2s:
                    print "Got share hash, requesting! Hash: %x" % (hash,)
                    peer.send_getshares(chain_id=p2pool.chain_id_type.unpack(chain_id_data), hashes=[hash])
                else:
                    print "Got share hash, already have, ignoring. Hash: %x" % (hash,)
            else:
                print "Got share hash to non-current chain, storing. Hash: %x" % (hash,)
                chain.request_map.setdefault(hash, []).append(peer)
        
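        # A peer asked for everything from our best share down to shares it
        # already has: take our best chain, drop everything reachable from the
        # peer's 'have' hashes, and send the remainder oldest-first.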
        def p2p_get_to_best(chain_id_data, have, peer):
            chain = get_chain(chain_id_data)
            if chain.highest.value is None:
                return
            
            chain_hashes = chain.get_down(chain.highest.value)
            
            have2 = set()
            for hash_ in have:
                have2 |= set(chain.get_down(hash_))
            
            for share_hash in reversed(chain_hashes):
                if share_hash in have2:
                    continue
                peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
        
        def p2p_get_shares(chain_id_data, hashes, peer):
            chain = get_chain(chain_id_data)
            for hash_ in hashes:
                if hash_ in chain.share2s:
                    peer.send_share(chain.share2s[hash_].share, full=True)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
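        # parse() turns an ADDR[:PORT] argument into an (ip, port) tuple, e.g.
        # '1.2.3.4:19333' -> ('1.2.3.4', 19333); without a port it falls back
        # to the network's default P2P_PORT.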
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
        
        nodes = [('72.14.191.28', args.net.P2P_PORT)]
        try:
            nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
        except:
            traceback.print_exc()
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=args.net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
            mode=0 if args.low_bandwidth else 1,
            preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
        )
        p2p_node.handle_share = p2p_share
        p2p_node.handle_share_hash = p2p_share_hash
        p2p_node.handle_get_to_best = p2p_get_to_best
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # when work switches to a different share chain, send out any of its shares that haven't been shared yet and request the shares that peers advertised but we still lack
        def work_changed(new_work):
            #print 'Work changed:', new_work
            chain = new_work['current_chain']
            if chain.highest.value is not None:
                for share_hash in chain.get_down(chain.highest.value):
                    share2 = chain.share2s[share_hash]
                    if not share2.shared:
                        print 'Sharing share of newly current chain. Hash: %x' % (share2.share.hash,)
                        share_share2(share2)
            for hash, peers in chain.request_map.iteritems():
                if hash not in chain.share2s:
                    random.choice(peers).send_getshares(hashes=[hash])
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        
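        # compute() builds the work handed out to miners: a generation
        # transaction paying out along the share chain plus any acceptable
        # memory-pool transactions, hashed into a merkle root. The transaction
        # list is remembered under that merkle root so a returned solution can
        # be reassembled into a full block, and the getwork is issued against
        # the share target ('target2') carried in the generation transaction
        # rather than the real block target.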
        def compute(state):
            extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['highest_p2pool_share_hash'],
                new_script=my_script,
                subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
                nonce=struct.pack("<Q", random.randrange(2**64)),
                block_target=state['target'],
                net=args.net,
            )
            print 'Generating!' #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + [tx.tx for tx in extra_txs]
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 300 seconds (the ExpiringDict timeout above)
            ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['timestamp'], state['target'])
            return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
        
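        # got_response() handles a solution submitted by a miner: decode the
        # block header, look up the matching transaction list by merkle root,
        # rebuild the share, and feed it through the same p2p_share() path used
        # for shares arriving from the network. The boolean return value
        # presumably tells the worker interface whether to report acceptance.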
        def got_response(data):
            # match up with transactions
            header = bitcoin.getwork.decode_data(data)
            transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
            if transactions is None:
                print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
                return False
            share = p2pool.Share.from_block(dict(header=header, txs=transactions))
            print 'GOT SHARE! %x' % (share.hash,)
            try:
                p2p_share(share)
            except:
                print
                print 'Error processing data received from worker:'
                traceback.print_exc()
                print
                return False
            else:
                return True
        
        reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
        
        print '    ...success!'
        print
        
        # done!
        
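        # get_blocks() walks backwards from start_hash yielding (hash, block)
        # pairs, but only for blocks already in the get_block cache; call_now
        # raises deferral.NotNowError instead of fetching, which ends the walk.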
        def get_blocks(start_hash):
            while True:
                try:
                    block = get_block.call_now(start_hash)
                except deferral.NotNowError:
                    break
                yield start_hash, block
                start_hash = block['header']['previous_block']
        
        tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
        
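        # Tx wraps a memory-pool transaction together with the block that was
        # current when it was first seen. is_good() only admits it into
        # generated work once all of its inputs are found in blocks and a scan
        # of up to 10 recent cached blocks reaches seen_at_block without any
        # block mentioning this transaction or its inputs.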
        class Tx(object):
            def __init__(self, tx, seen_at_block):
                self.hash = bitcoin.data.tx_type.hash256(tx)
                self.tx = tx
                self.seen_at_block = seen_at_block
                self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                #print
                #print "%x %r" % (seen_at_block, tx)
                #for mention in self.mentions:
                #    print "%x" % mention
                #print
                self.parents_all_in_blocks = False
                self.value_in = 0
                #print self.tx
                self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
                self._find_parents_in_blocks()
            
            @defer.inlineCallbacks
            def _find_parents_in_blocks(self):
                for tx_in in self.tx['tx_ins']:
                    try:
                        raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
                    except Exception:
                        return
                    self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
                    #print raw_transaction
                    if not raw_transaction['parent_blocks']:
                        return
                self.parents_all_in_blocks = True
            
            def is_good(self):
                if not self.parents_all_in_blocks:
                    return False
                x = self.is_good2()
                #print "is_good:", x
                return x
            
            def is_good2(self):
                for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
                    if block_hash == self.seen_at_block:
                        return True
                    for tx in block['txs']:
                        mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
                        if mentions & self.mentions:
                            return False
                return False
        
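        # Every transaction hash announced by bitcoind is fetched over the p2p
        # connection and added to tx_pool, tagged with the block that was
        # current when it arrived.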
        @defer.inlineCallbacks
        def new_tx(tx_hash):
            assert isinstance(tx_hash, (int, long))
            tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
            tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
        factory.new_tx.watch(new_tx)
        
        def new_block(block):
            set_real_work()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
    except:
        print
        print 'Fatal error:'
        traceback.print_exc()
        print
        reactor.stop()

def run():
    parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
    parser.add_argument('--version', action='version', version=__version__)
    parser.add_argument('--testnet',
        help='use the testnet; make sure you change the ports too',
        action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
    parser.add_argument('--store-shares', metavar='FILENAME',
        help='write shares to a database (not needed for normal usage)',
        type=str, action='append', default=[], dest='store_shares')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
        type=int, action='store', default=9332, dest='worker_port')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
        type=int, action='store', default=8332, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally, 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
        help='bitcoind RPC interface username',
        type=str, action='store', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    reactor.callWhenRunning(main, args)
    reactor.run()