share combining
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13
14 from twisted.internet import defer, reactor
15 from twisted.web import server
16 from twisted.python import log
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22
23 try:
24     __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
25 except:
26     __version__ = 'unknown'
27
28 @deferral.retry('Error getting work from bitcoind:', 1)
29 @defer.inlineCallbacks
30 def getwork(bitcoind):
31     # a block could arrive in between these two queries
32     getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
33     try:
34         getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
35     finally:
36         # get rid of residual errors
37         getwork_df.addErrback(lambda fail: None)
38         height_df.addErrback(lambda fail: None)
39     defer.returnValue((getwork, height))
40
41 @deferral.retry('Error getting payout script from bitcoind:', 1)
42 @defer.inlineCallbacks
43 def get_payout_script(factory):
44     res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
45     if res['reply'] == 'success':
46         my_script = res['script']
47     elif res['reply'] == 'denied':
48         my_script = None
49     else:
50         raise ValueError('Unexpected reply: %r' % (res,))
51
52 @deferral.retry('Error creating payout script:', 10)
53 @defer.inlineCallbacks
54 def get_payout_script2(bitcoind, net):
55     defer.returnValue(bitcoin.data.pubkey_hash_to_script2(bitcoin.data.address_to_pubkey_hash((yield bitcoind.rpc_getnewaddress()), net)))
56
57 @defer.inlineCallbacks
58 def main(args):
59     try:
60         print 'p2pool (version %s)' % (__version__,)
61         print
62         
63         # connect to bitcoind over JSON-RPC and do initial getwork
64         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
65         print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
66         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
67         work, height = yield getwork(bitcoind)
68         print '    ...success!'
69         print '    Current block hash: %x height: %i' % (work.previous_block, height)
70         print
71         
72         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
73         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
74         factory = bitcoin.p2p.ClientFactory(args.net)
75         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
76         my_script = yield get_payout_script(factory)
77         if my_script is None:
78             print 'IP transaction denied ... falling back to sending to address. Enable IP transactions on your bitcoind!'
79             my_script = yield get_payout_script2(bitcoind, args.net)
80         print '    ...success!'
81         print '    Payout script:', my_script.encode('hex')
82         print
83         
84         @defer.inlineCallbacks
85         def real_get_block(block_hash):
86             block = yield (yield factory.getProtocol()).get_block(block_hash)
87             print 'Got block %x' % (block_hash,)
88             defer.returnValue(block)
89         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
90         
91         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
92         
93         ht = bitcoin.p2p.HeightTracker(factory)
94         
95         tracker = p2pool.OkayTracker(args.net)
96         chains = expiring_dict.ExpiringDict(300)
97         def get_chain(chain_id_data):
98             return chains.setdefault(chain_id_data, Chain(chain_id_data))
99         
100         # information affecting work that should trigger a long-polling update
101         current_work = variable.Variable(None)
102         # information affecting work that should not trigger a long-polling update
103         current_work2 = variable.Variable(None)
104         
105         @defer.inlineCallbacks
106         def set_real_work():
107             work, height = yield getwork(bitcoind)
108             best, desired = tracker.think(ht)
109             for peer2, share_hash in desired:
110                 print 'Requesting parent share %x' % (share_hash,)
111                 peer2.send_getshares(
112                     hashes=[share_hash],
113                     parents=2000,
114                     stops=list(set(tracker.heads) | set(
115                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
116                     )),
117                 )
118             current_work.set(dict(
119                 version=work.version,
120                 previous_block=work.previous_block,
121                 target=work.target,
122                 
123                 height=height + 1,
124                 
125                 best_share_hash=best,
126             ))
127             current_work2.set(dict(
128                 timestamp=work.timestamp,
129             ))
130         
131         print 'Initializing work...'
132         yield set_real_work()
133         print '    ...success!'
134         
135         # setup p2p logic and join p2pool network
136         
137         def share_share(share, ignore_peer=None):
138             for peer in p2p_node.peers.itervalues():
139                 if peer is ignore_peer:
140                     continue
141                 peer.send_shares([share])
142             share.flag_shared()
143         
144         def p2p_share(share, peer=None):
145             if share.hash in tracker.shares:
146                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
147                 return
148             
149             #print 'Received share %x' % (share.hash,)
150             
151             tracker.add(share)
152             best, desired = tracker.think(ht)
153             #for peer2, share_hash in desired:
154             #    print 'Requesting parent share %x' % (share_hash,)
155             #    peer2.send_getshares(hashes=[share_hash], parents=2000)
156             
157             if share.gentx is not None:
158                 if share.hash <= share.header['target']:
159                     print
160                     print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
161                     print
162                     if factory.conn.value is not None:
163                         factory.conn.value.send_block(block=share.as_block())
164                     else:
165                         print 'No bitcoind connection! Erp!'
166             
167             w = dict(current_work.value)
168             w['best_share_hash'] = best
169             current_work.set(w)
170             
171             if best == share.hash:
172                 print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
173             else:
174                 print 'Accepted share, not highest. Hash: %x' % (share.hash,)
175         
176         def p2p_share_hash(share_hash, peer):
177             if share_hash in tracker.shares:
178                 print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
179             else:
180                 print 'Got share hash, requesting! Hash: %x' % (share_hash,)
181                 peer.send_getshares(hashes=[share_hash], parents=0, stops=[])
182         
183         def p2p_get_shares(share_hashes, parents, stops, peer):
184             parents = min(parents, 1000//len(share_hashes))
185             stops = set(stops)
186             shares = []
187             for share_hash in share_hashes:
188                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
189                     if share.hash in stops:
190                         break
191                     shares.append(share)
192             peer.send_shares(shares, full=True)
193         
194         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
195         
196         def parse(x):
197             if ':' in x:
198                 ip, port = x.split(':')
199                 return ip, int(port)
200             else:
201                 return x, args.net.P2P_PORT
202         
203         nodes = [
204             ('72.14.191.28', args.net.P2P_PORT),
205             ('62.204.197.159', args.net.P2P_PORT),
206         ]
207         try:
208             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
209         except:
210             print
211             print 'Error resolving bootstrap node IP:'
212             log.err()
213             print
214         
215         p2p_node = p2p.Node(
216             current_work=current_work,
217             port=args.p2pool_port,
218             net=args.net,
219             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
220             mode=0 if args.low_bandwidth else 1,
221             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
222         )
223         p2p_node.handle_share = p2p_share
224         p2p_node.handle_share_hash = p2p_share_hash
225         p2p_node.handle_get_shares = p2p_get_shares
226         
227         p2p_node.start()
228         
229         # send share when the chain changes to their chain
230         def work_changed(new_work):
231             #print 'Work changed:', new_work
232             for share in tracker.get_chain_known(new_work['best_share_hash']):
233                 if share.shared:
234                     break
235                 share_share(share, share.peer)
236         current_work.changed.watch(work_changed)
237         
238         print '    ...success!'
239         print
240         
241         # start listening for workers with a JSON-RPC server
242         
243         print 'Listening for workers on port %i...' % (args.worker_port,)
244         
245         # setup worker logic
246         
247         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
248         
249         def compute(state):
250             extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
251             # XXX limit to merkle_branch and block max size - 1000000 byte
252             # and sigops
253             generate_tx = p2pool.generate_transaction(
254                 tracker=tracker,
255                 previous_share_hash=state['best_share_hash'],
256                 new_script=my_script,
257                 subsidy=(50*100000000 >> state['height']//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
258                 nonce=struct.pack('<Q', random.randrange(2**64)),
259                 block_target=state['target'],
260                 net=args.net,
261             )
262             print 'Generating!', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']//1000000
263             print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
264             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
265             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
266             merkle_root = bitcoin.data.merkle_hash(transactions)
267             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
268             
269             timestamp = current_work2.value['timestamp']
270             if state['best_share_hash'] is not None:
271                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
272                 if timestamp2 > timestamp:
273                     print 'Toff', timestamp2 - timestamp
274                     timestamp = timestamp2
275             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
276             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
277             return ba.getwork(p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'])
278         
279         def got_response(data):
280             try:
281                 # match up with transactions
282                 header = bitcoin.getwork.decode_data(data)
283                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
284                 if transactions is None:
285                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
286                     return False
287                 block = dict(header=header, txs=transactions)
288                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
289                 if hash_ <= block['header']['target']:
290                     print
291                     print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
292                     print
293                     if factory.conn.value is not None:
294                         factory.conn.value.send_block(block=block)
295                     else:
296                         print 'No bitcoind connection! Erp!'
297                 share = p2pool.Share.from_block(block)
298                 print 'GOT SHARE! %x' % (share.hash,)
299                 p2p_share(share)
300             except:
301                 print
302                 print 'Error processing data received from worker:'
303                 log.err()
304                 print
305                 return False
306             else:
307                 return True
308         
309         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
310         
311         print '    ...success!'
312         print
313         
314         # done!
315         
316         def get_blocks(start_hash):
317             while True:
318                 try:
319                     block = get_block.call_now(start_hash)
320                 except deferral.NotNowError:
321                     break
322                 yield start_hash, block
323                 start_hash = block['header']['previous_block']
324         
325         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
326         
327         class Tx(object):
328             def __init__(self, tx, seen_at_block):
329                 self.hash = bitcoin.data.tx_type.hash256(tx)
330                 self.tx = tx
331                 self.seen_at_block = seen_at_block
332                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
333                 #print
334                 #print '%x %r' % (seen_at_block, tx)
335                 #for mention in self.mentions:
336                 #    print '%x' % mention
337                 #print
338                 self.parents_all_in_blocks = False
339                 self.value_in = 0
340                 #print self.tx
341                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
342                 self._find_parents_in_blocks()
343             
344             @defer.inlineCallbacks
345             def _find_parents_in_blocks(self):
346                 for tx_in in self.tx['tx_ins']:
347                     try:
348                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
349                     except Exception:
350                         return
351                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
352                     #print raw_transaction
353                     if not raw_transaction['parent_blocks']:
354                         return
355                 self.parents_all_in_blocks = True
356             
357             def is_good(self):
358                 if not self.parents_all_in_blocks:
359                     return False
360                 x = self.is_good2()
361                 #print 'is_good:', x
362                 return x
363             
364             def is_good2(self):
365                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
366                     if block_hash == self.seen_at_block:
367                         return True
368                     for tx in block['txs']:
369                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
370                         if mentions & self.mentions:
371                             return False
372                 return False
373         
374         @defer.inlineCallbacks
375         def new_tx(tx_hash):
376             try:
377                 assert isinstance(tx_hash, (int, long))
378                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
379                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
380             except:
381                 print
382                 print 'Error handling tx:'
383                 log.err()
384                 print
385         factory.new_tx.watch(new_tx)
386         
387         def new_block(block):
388             set_real_work()
389         factory.new_block.watch(new_block)
390         
391         print 'Started successfully!'
392         print
393         
394         while True:
395             yield deferral.sleep(1)
396             set_real_work()
397     except:
398         print
399         print 'Fatal error:'
400         log.err()
401         print
402         reactor.stop()
403
404 def run():
405     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
406     parser.add_argument('--version', action='version', version=__version__)
407     parser.add_argument('--testnet',
408         help='use the testnet',
409         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
410     
411     p2pool_group = parser.add_argument_group('p2pool interface')
412     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
413         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
414         type=int, action='store', default=None, dest='p2pool_port')
415     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
416         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
417         type=str, action='append', default=[], dest='p2pool_nodes')
418     parser.add_argument('-l', '--low-bandwidth',
419         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
420         action='store_true', default=False, dest='low_bandwidth')
421     
422     worker_group = parser.add_argument_group('worker interface')
423     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
424         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
425         type=int, action='store', default=9332, dest='worker_port')
426     
427     bitcoind_group = parser.add_argument_group('bitcoind interface')
428     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
429         help='connect to a bitcoind at this address (default: 127.0.0.1)',
430         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
431     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
432         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
433         type=int, action='store', default=8332, dest='bitcoind_rpc_port')
434     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
435         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
436         type=int, action='store', default=None, dest='bitcoind_p2p_port')
437     
438     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
439         help='bitcoind RPC interface username',
440         type=str, action='store', dest='bitcoind_rpc_username')
441     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
442         help='bitcoind RPC interface password',
443         type=str, action='store', dest='bitcoind_rpc_password')
444     
445     args = parser.parse_args()
446     
447     if args.bitcoind_p2p_port is None:
448         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
449     
450     if args.p2pool_port is None:
451         args.p2pool_port = args.net.P2P_PORT
452     
453     reactor.callWhenRunning(main, args)
454     reactor.run()