share sharing seems to be working
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13 import traceback
14
15 from twisted.internet import defer, reactor
16 from twisted.web import server
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22
23 try:
24     __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
25 except:
26     __version__ = 'unknown'
27
28 class Chain(object):
29     def __init__(self, chain_id_data):
30         assert False
31         self.chain_id_data = chain_id_data
32         self.last_p2pool_block_hash = p2pool.chain_id_type.unpack(chain_id_data)['last_p2pool_block_hash']
33         
34         self.share2s = {} # hash -> share2
35         self.highest = variable.Variable(None) # hash
36         
37         self.requesting = set()
38         self.request_map = {}
39     
40     def accept(self, share, net):
41         if share.chain_id_data != self.chain_id_data:
42             raise ValueError('share does not belong to this chain')
43         
44         if share.hash in self.share2s:
45             return 'dup'
46         
47         if share.previous_share_hash is None:
48             previous_height, previous_share2 = -1, None
49         elif share.previous_share_hash not in self.share2s:
50             return 'orphan'
51         else:
52             previous_share2 = self.share2s[share.previous_share_hash]
53             previous_height = previous_share2.height
54         
55         height = previous_height + 1
56         
57         share2 = share.check(self, height, previous_share2, net) # raises exceptions
58         
59         if share2.share is not share:
60             raise ValueError()
61         
62         self.share2s[share.hash] = share2
63         
64         if self.highest.value is None or height > self.share2s[self.highest.value].height:
65             self.highest.set(share.hash)
66         
67         return 'good'
68     
69     def get_highest_share2(self):
70         return self.share2s[self.highest.value] if self.highest.value is not None else None
71     
72     def get_down(self, share_hash):
73         blocks = []
74         
75         while True:
76             blocks.append(share_hash)
77             if share_hash not in self.share2s:
78                 break
79             share2 = self.share2s[share_hash]
80             if share2.share.previous_share_hash is None:
81                 break
82             share_hash = share2.share.previous_share_hash
83         
84         return blocks
85
86 @defer.inlineCallbacks
87 def get_last_p2pool_block_hash(current_block_hash, get_block, net):
88     block_hash = current_block_hash
89     while True:
90         if block_hash == net.ROOT_BLOCK:
91             defer.returnValue(block_hash)
92         try:
93             block = yield get_block(block_hash)
94         except:
95             traceback.print_exc()
96             continue
97         coinbase_data = block['txs'][0]['tx_ins'][0]['script']
98         try:
99             coinbase = p2pool.coinbase_type.unpack(coinbase_data)
100         except bitcoin.data.EarlyEnd:
101             pass
102         else:
103             try:
104                 if coinbase['identifier'] == net.IDENTIFIER:
105                     payouts = {}
106                     for tx_out in block['txs'][0]['tx_outs']:
107                         payouts[tx_out['script']] = payouts.get(tx_out['script'], 0) + tx_out['value']
108                     subsidy = sum(payouts.itervalues())
109                     if coinbase['subsidy'] == subsidy:
110                         if payouts.get(net.SCRIPT, 0) >= subsidy//64:
111                             defer.returnValue(block_hash)
112             except Exception:
113                 print
114                 print 'Error matching block:'
115                 print 'block:', block
116                 traceback.print_exc()
117                 print
118         block_hash = block['header']['previous_block']
119
120 @deferral.retry('Error getting work from bitcoind:', 1)
121 @defer.inlineCallbacks
122 def getwork(bitcoind):
123     # a block could arrive in between these two queries
124     getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
125     try:
126         getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
127     finally:
128         # get rid of residual errors
129         getwork_df.addErrback(lambda fail: None)
130         height_df.addErrback(lambda fail: None)
131     defer.returnValue((getwork, height))
132
133
134 @defer.inlineCallbacks
135 def main(args):
136     try:
137         print 'p2pool (version %s)' % (__version__,)
138         print
139         
140         # connect to bitcoind over JSON-RPC and do initial getwork
141         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
142         print "Testing bitcoind RPC connection to '%s' with authorization '%s:%s'..." % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
143         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
144         
145         work, height = yield getwork(bitcoind)
146         
147         print '    ...success!'
148         print '    Current block hash: %x height: %i' % (work.previous_block, height)
149         print
150         
151         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
152         print "Testing bitcoind P2P connection to '%s:%s'..." % (args.bitcoind_address, args.bitcoind_p2p_port)
153         factory = bitcoin.p2p.ClientFactory(args.net)
154         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
155         
156         while True:
157             try:
158                 res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
159                 if res['reply'] != 'success':
160                     print
161                     print 'Error getting payout script:'
162                     print res
163                     print
164                     continue
165                 my_script = res['script']
166             except:
167                 print
168                 print 'Error getting payout script:'
169                 traceback.print_exc()
170                 print
171             else:
172                 break
173             yield deferral.sleep(1)
174         
175         print '    ...success!'
176         print '    Payout script:', my_script.encode('hex')
177         print
178         
179         @defer.inlineCallbacks
180         def real_get_block(block_hash):
181             block = yield (yield factory.getProtocol()).get_block(block_hash)
182             print 'Got block %x' % (block_hash,)
183             defer.returnValue(block)
184         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
185         
186         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
187         
188         tracker = p2pool.OkayTracker(args.net)
189         chains = expiring_dict.ExpiringDict(300)
190         def get_chain(chain_id_data):
191             return chains.setdefault(chain_id_data, Chain(chain_id_data))
192         
193         # information affecting work that should trigger a long-polling update
194         current_work = variable.Variable(None)
195         # information affecting work that should not trigger a long-polling update
196         current_work2 = variable.Variable(None)
197         
198         @defer.inlineCallbacks
199         def set_real_work():
200             work, height = yield getwork(bitcoind)
201             best, desired = tracker.think()
202             # XXX desired?
203             current_work.set(dict(
204                 version=work.version,
205                 previous_block=work.previous_block,
206                 target=work.target,
207                 
208                 height=height + 1,
209                 
210                 best_share_hash=best,
211             ))
212             current_work2.set(dict(
213                 timestamp=work.timestamp,
214             ))
215         
216         print 'Initializing work...'
217         yield set_real_work()
218         print '    ...success!'
219         
220         # setup p2p logic and join p2pool network
221         
222         def share_share(share, ignore_peer=None):
223             for peer in p2p_node.peers.itervalues():
224                 if peer is ignore_peer:
225                     continue
226                 peer.send_share(share)
227             share.flag_shared()
228         
229         def p2p_share(share, peer=None):
230             if share.hash <= share.header['target']:
231                 print
232                 print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
233                 print
234                 if factory.conn is not None:
235                     factory.conn.send_block(block=share.as_block())
236                 else:
237                     print 'No bitcoind connection! Erp!'
238             
239             print "Received share %x" % (share.hash,)
240             
241             tracker.add(share)
242             best, desired = tracker.think()
243             for peer2, share_hash in desired:
244                 print "Requesting parent share %x" % (share_hash,)
245                 peer2.send_getshares(hashes=[share_hash])
246             
247             w = dict(current_work.value)
248             w['best_share_hash'] = best
249             current_work.set(w)
250             '''
251             if res == 'good':
252                 share2 = chain.share2s[share.hash]
253                 
254                 if chain is current_work.value['current_chain']:
255                     if share.hash == chain.highest.value:
256                         print 'Accepted share, passing to peers. Height: %i Hash: %x Script: %s' % (share2.height, share.hash, share2.shares[-1].encode('hex'))
257                         share_share2(share2, peer)
258                     else:
259                         print 'Accepted share, not highest. Height: %i Hash: %x' % (share2.height, share.hash,)
260                 else:
261                     print 'Accepted share to non-current chain. Height: %i Hash: %x' % (share2.height, share.hash,)
262             elif res == 'dup':
263                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
264             elif res == 'orphan':
265                 print 'Got share referencing unknown share, requesting past shares from peer. Hash: %x' % (share.hash,)
266                 if peer is None:
267                     raise ValueError()
268                 peer.send_gettobest(
269                     chain_id=p2pool.chain_id_type.unpack(share.chain_id_data),
270                     have=random.sample(chain.share2s.keys(), min(8, len(chain.share2s))) + [chain.share2s[chain.highest.value].share.hash] if chain.highest.value is not None else [],
271                 )
272             else:
273                 raise ValueError('unknown result from chain.accept - %r' % (res,))
274             
275             w = dict(current_work.value)
276             w['best_share_hash'] = w['current_chain'].get_highest_share_hash()
277             current_work.set(w)
278             '''
279         
280         def p2p_share_hash(share_hash, peer):
281             if share_hash in tracker.shares:
282                 print "Got share hash, already have, ignoring. Hash: %x" % (share_hash,)
283             else:
284                 print "Got share hash, requesting! Hash: %x" % (share_hash,)
285                 peer.send_getshares(hashes=[share_hash])
286         
287         def p2p_get_to_best(chain_id_data, have, peer):
288             chain = get_chain(chain_id_data)
289             if chain.highest.value is None:
290                 return
291             
292             chain_hashes = chain.get_down(chain.highest.value)
293             
294             have2 = set()
295             for hash_ in have:
296                 have2 |= set(chain.get_down(hash_))
297             
298             for share_hash in reversed(chain_hashes):
299                 if share_hash in have2:
300                     continue
301                 peer.send_share(chain.share2s[share_hash].share, full=True) # doesn't have to be full ... but does that still guarantee ordering?
302         
303         def p2p_get_shares(share_hashes, peer):
304             for share_hash in share_hashes:
305                 if share_hash in tracker.shares:
306                     peer.send_share(tracker.shares[share_hash], full=True)
307         
308         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
309         
310         def parse(x):
311             if ':' in x:
312                 ip, port = x.split(':')
313                 return ip, int(port)
314             else:
315                 return x, args.net.P2P_PORT
316         
317         nodes = [('72.14.191.28', args.net.P2P_PORT)]
318         try:
319             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
320         except:
321             traceback.print_exc()
322         
323         p2p_node = p2p.Node(
324             current_work=current_work,
325             port=args.p2pool_port,
326             net=args.net,
327             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
328             mode=0 if args.low_bandwidth else 1,
329             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
330         )
331         p2p_node.handle_share = p2p_share
332         p2p_node.handle_share_hash = p2p_share_hash
333         p2p_node.handle_get_to_best = p2p_get_to_best
334         p2p_node.handle_get_shares = p2p_get_shares
335         
336         p2p_node.start()
337         
338         # send share when the chain changes to their chain
339         def work_changed(new_work):
340             #print 'Work changed:', new_work
341             for share in tracker.get_chain_known(new_work['best_share_hash']):
342                 if share.shared:
343                     break
344                 share_share(share)
345             return
346             chain = new_work['current_chain']
347             if chain.highest.value is not None:
348                 for share_hash in chain.get_down(chain.highest.value):
349                     share2 = chain.share2s[share_hash]
350                     if not share2.shared:
351                         print 'Sharing share of switched to chain. Hash:', share2.share.hash
352                         share_share2(share2)
353             # XXX ???
354             for hash, peers in chain.request_map.iteritems():
355                 if hash not in chain.share2s:
356                     random.choice(peers).send_getshares(hashes=[hash])
357         current_work.changed.watch(work_changed)
358         
359         print '    ...success!'
360         print
361         
362         # start listening for workers with a JSON-RPC server
363         
364         print 'Listening for workers on port %i...' % (args.worker_port,)
365         
366         # setup worker logic
367         
368         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
369         
370         def compute(state):
371             extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
372             generate_tx = p2pool.generate_transaction(
373                 tracker=tracker,
374                 previous_share_hash=state['best_share_hash'],
375                 new_script=my_script,
376                 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
377                 nonce=struct.pack("<Q", random.randrange(2**64)),
378                 block_target=state['target'],
379                 net=args.net,
380             )
381             print 'Generating!' #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
382             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
383             merkle_root = bitcoin.data.merkle_hash(transactions)
384             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
385             
386             timestamp = current_work2.value['timestamp']
387             if state['best_share_hash'] is not None:
388                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
389                 if timestamp2 > timestamp:
390                     print "Toff", timestamp2 - timestamp
391                     timestamp = timestamp2
392             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
393             #print "SENT", 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
394             return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
395         
396         def got_response(data):
397             # match up with transactions
398             header = bitcoin.getwork.decode_data(data)
399             transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
400             if transactions is None:
401                 print "Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool"
402                 return False
403             share = p2pool.Share.from_block(dict(header=header, txs=transactions))
404             print 'GOT SHARE! %x' % (share.hash,)
405             try:
406                 p2p_share(share)
407             except:
408                 print
409                 print 'Error processing data received from worker:'
410                 traceback.print_exc()
411                 print
412                 return False
413             else:
414                 return True
415         
416         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
417         
418         print '    ...success!'
419         print
420         
421         # done!
422         
423         def get_blocks(start_hash):
424             while True:
425                 try:
426                     block = get_block.call_now(start_hash)
427                 except deferral.NotNowError:
428                     break
429                 yield start_hash, block
430                 start_hash = block['header']['previous_block']
431         
432         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
433         
434         class Tx(object):
435             def __init__(self, tx, seen_at_block):
436                 self.hash = bitcoin.data.tx_type.hash256(tx)
437                 self.tx = tx
438                 self.seen_at_block = seen_at_block
439                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
440                 #print
441                 #print "%x %r" % (seen_at_block, tx)
442                 #for mention in self.mentions:
443                 #    print "%x" % mention
444                 #print
445                 self.parents_all_in_blocks = False
446                 self.value_in = 0
447                 #print self.tx
448                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
449                 self._find_parents_in_blocks()
450             
451             @defer.inlineCallbacks
452             def _find_parents_in_blocks(self):
453                 for tx_in in self.tx['tx_ins']:
454                     try:
455                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
456                     except Exception:
457                         return
458                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
459                     #print raw_transaction
460                     if not raw_transaction['parent_blocks']:
461                         return
462                 self.parents_all_in_blocks = True
463             
464             def is_good(self):
465                 if not self.parents_all_in_blocks:
466                     return False
467                 x = self.is_good2()
468                 #print "is_good:", x
469                 return x
470             
471             def is_good2(self):
472                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
473                     if block_hash == self.seen_at_block:
474                         return True
475                     for tx in block['txs']:
476                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
477                         if mentions & self.mentions:
478                             return False
479                 return False
480         
481         @defer.inlineCallbacks
482         def new_tx(tx_hash):
483             try:
484                 assert isinstance(tx_hash, (int, long))
485                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
486                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
487             except:
488                 traceback.print_exc()
489         factory.new_tx.watch(new_tx)
490         
491         def new_block(block):
492             set_real_work()
493         factory.new_block.watch(new_block)
494         
495         print 'Started successfully!'
496         print
497         
498         while True:
499             yield deferral.sleep(1)
500             set_real_work()
501     except:
502         print
503         print 'Fatal error:'
504         traceback.print_exc()
505         print
506         reactor.stop()
507
508 def run():
509     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
510     parser.add_argument('--version', action='version', version=__version__)
511     parser.add_argument('--testnet',
512         help='use the testnet; make sure you change the ports too',
513         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
514     
515     p2pool_group = parser.add_argument_group('p2pool interface')
516     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
517         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
518         type=int, action='store', default=None, dest='p2pool_port')
519     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
520         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
521         type=str, action='append', default=[], dest='p2pool_nodes')
522     parser.add_argument('-l', '--low-bandwidth',
523         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
524         action='store_true', default=False, dest='low_bandwidth')
525     
526     worker_group = parser.add_argument_group('worker interface')
527     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
528         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
529         type=int, action='store', default=9332, dest='worker_port')
530     
531     bitcoind_group = parser.add_argument_group('bitcoind interface')
532     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
533         help='connect to a bitcoind at this address (default: 127.0.0.1)',
534         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
535     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
536         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
537         type=int, action='store', default=8332, dest='bitcoind_rpc_port')
538     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
539         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
540         type=int, action='store', default=None, dest='bitcoind_p2p_port')
541     
542     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
543         help='bitcoind RPC interface username',
544         type=str, action='store', dest='bitcoind_rpc_username')
545     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
546         help='bitcoind RPC interface password',
547         type=str, action='store', dest='bitcoind_rpc_password')
548     
549     args = parser.parse_args()
550     
551     if args.bitcoind_p2p_port is None:
552         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
553     
554     if args.p2pool_port is None:
555         args.p2pool_port = args.net.P2P_PORT
556     
557     reactor.callWhenRunning(main, args)
558     reactor.run()