use bitcoin timestamp to judge shares
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import itertools
7 import os
8 import random
9 import sqlite3
10 import struct
11 import subprocess
12 import sys
13
14 from twisted.internet import defer, reactor
15 from twisted.web import server
16 from twisted.python import log
17
18 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
19 from util import db, expiring_dict, jsonrpc, variable, deferral, math
20 from . import p2p, worker_interface
21 import p2pool.data as p2pool
22
23 try:
24     __version__ = subprocess.Popen(['svnversion', os.path.dirname(sys.argv[0])], stdout=subprocess.PIPE).stdout.read().strip()
25 except:
26     __version__ = 'unknown'
27
28 @deferral.retry('Error getting work from bitcoind:', 3)
29 @defer.inlineCallbacks
30 def getwork(bitcoind):
31     # a block could arrive in between these two queries
32     getwork_df, height_df = bitcoind.rpc_getwork(), bitcoind.rpc_getblocknumber()
33     try:
34         getwork, height = bitcoin.getwork.BlockAttempt.from_getwork((yield getwork_df)), (yield height_df)
35     finally:
36         # get rid of residual errors
37         getwork_df.addErrback(lambda fail: None)
38         height_df.addErrback(lambda fail: None)
39     defer.returnValue((getwork, height))
40
41 @deferral.retry('Error getting payout script from bitcoind:', 1)
42 @defer.inlineCallbacks
43 def get_payout_script(factory):
44     res = yield (yield factory.getProtocol()).check_order(order=bitcoin.p2p.Protocol.null_order)
45     if res['reply'] == 'success':
46         my_script = res['script']
47     elif res['reply'] == 'denied':
48         my_script = None
49     else:
50         raise ValueError('Unexpected reply: %r' % (res,))
51
52 @deferral.retry('Error creating payout script:', 10)
53 @defer.inlineCallbacks
54 def get_payout_script2(bitcoind, net):
55     defer.returnValue(bitcoin.data.pubkey_hash_to_script2(bitcoin.data.address_to_pubkey_hash((yield bitcoind.rpc_getnewaddress()), net)))
56
57 @defer.inlineCallbacks
58 def main(args):
59     try:
60         print 'p2pool (version %s)' % (__version__,)
61         print
62         
63         # connect to bitcoind over JSON-RPC and do initial getwork
64         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
65         print '''Testing bitcoind RPC connection to '%s' with authorization '%s:%s'...''' % (url, args.bitcoind_rpc_username, args.bitcoind_rpc_password)
66         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
67         work, height = yield getwork(bitcoind)
68         print '    ...success!'
69         print '    Current block hash: %x height: %i' % (work.previous_block, height)
70         print
71         
72         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
73         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
74         factory = bitcoin.p2p.ClientFactory(args.net)
75         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
76         my_script = yield get_payout_script(factory)
77         if my_script is None:
78             print 'IP transaction denied ... falling back to sending to address. Enable IP transactions on your bitcoind!'
79             my_script = yield get_payout_script2(bitcoind, args.net)
80         print '    ...success!'
81         print '    Payout script:', my_script.encode('hex')
82         print
83         
84         @defer.inlineCallbacks
85         def real_get_block(block_hash):
86             block = yield (yield factory.getProtocol()).get_block(block_hash)
87             print 'Got block %x' % (block_hash,)
88             defer.returnValue(block)
89         get_block = deferral.DeferredCacher(real_get_block, expiring_dict.ExpiringDict(3600))
90         
91         get_raw_transaction = deferral.DeferredCacher(lambda tx_hash: bitcoind.rpc_getrawtransaction('%x' % tx_hash), expiring_dict.ExpiringDict(100))
92         
93         ht = bitcoin.p2p.HeightTracker(factory)
94         
95         tracker = p2pool.OkayTracker(args.net)
96         chains = expiring_dict.ExpiringDict(300)
97         def get_chain(chain_id_data):
98             return chains.setdefault(chain_id_data, Chain(chain_id_data))
99         
100         # information affecting work that should trigger a long-polling update
101         current_work = variable.Variable(None)
102         # information affecting work that should not trigger a long-polling update
103         current_work2 = variable.Variable(None)
104         
105         requested = set()
106         
107         @defer.inlineCallbacks
108         def set_real_work():
109             work, height = yield getwork(bitcoind)
110             current_work2.set(dict(
111                 time=work.timestamp,
112             ))
113             best, desired = tracker.think(ht, current_work2.value['time'])
114             for peer2, share_hash in desired:
115                 if peer2 is None:
116                     continue
117                 if (peer2.nonce, share_hash) in requested:
118                     continue
119                 print 'Requesting parent share %x' % (share_hash,)
120                 peer2.send_getshares(
121                     hashes=[share_hash],
122                     parents=2000,
123                     stops=list(set(tracker.heads) | set(
124                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
125                     )),
126                 )
127                 requested.add((peer2.nonce, share_hash))
128             current_work.set(dict(
129                 version=work.version,
130                 previous_block=work.previous_block,
131                 target=work.target,
132                 height=height,
133                 best_share_hash=best,
134             ))
135         
136         print 'Initializing work...'
137         yield set_real_work()
138         print '    ...success!'
139         
140         # setup p2p logic and join p2pool network
141         
142         def share_share(share, ignore_peer=None):
143             for peer in p2p_node.peers.itervalues():
144                 if peer is ignore_peer:
145                     continue
146                 peer.send_shares([share])
147             share.flag_shared()
148         
149         def p2p_share(share, peer=None):
150             if share.hash in tracker.shares:
151                 print 'Got duplicate share, ignoring. Hash: %x' % (share.hash,)
152                 return
153             
154             #print 'Received share %x' % (share.hash,)
155             
156             tracker.add(share)
157             best, desired = tracker.think(ht, current_work2.value['time'])
158             #for peer2, share_hash in desired:
159             #    print 'Requesting parent share %x' % (share_hash,)
160             #    peer2.send_getshares(hashes=[share_hash], parents=2000)
161             
162             if share.gentx is not None:
163                 if share.hash <= share.header['target']:
164                     print
165                     print 'GOT BLOCK! Passing to bitcoind! %x' % (share.hash,)
166                     print
167                     if factory.conn.value is not None:
168                         factory.conn.value.send_block(block=share.as_block())
169                     else:
170                         print 'No bitcoind connection! Erp!'
171             
172             w = dict(current_work.value)
173             w['best_share_hash'] = best
174             current_work.set(w)
175             
176             if best == share.hash:
177                 print 'Accepted share, new highest, will pass to peers! Hash: %x' % (share.hash,)
178             else:
179                 print 'Accepted share, not highest. Hash: %x' % (share.hash,)
180         
181         def p2p_share_hash(share_hash, peer):
182             if share_hash in tracker.shares:
183                 print 'Got share hash, already have, ignoring. Hash: %x' % (share_hash,)
184             else:
185                 print 'Got share hash, requesting! Hash: %x' % (share_hash,)
186                 peer.send_getshares(hashes=[share_hash], parents=0, stops=[])
187         
188         def p2p_get_shares(share_hashes, parents, stops, peer):
189             parents = min(parents, 1000//len(share_hashes))
190             stops = set(stops)
191             shares = []
192             for share_hash in share_hashes:
193                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
194                     if share.hash in stops:
195                         break
196                     shares.append(share)
197             peer.send_shares(shares, full=True)
198         
199         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
200         
201         def parse(x):
202             if ':' in x:
203                 ip, port = x.split(':')
204                 return ip, int(port)
205             else:
206                 return x, args.net.P2P_PORT
207         
208         nodes = [
209             ('72.14.191.28', args.net.P2P_PORT),
210             ('62.204.197.159', args.net.P2P_PORT),
211         ]
212         try:
213             nodes.append(((yield reactor.resolve('p2pool.forre.st')), args.net.P2P_PORT))
214         except:
215             print
216             print 'Error resolving bootstrap node IP:'
217             log.err()
218             print
219         
220         p2p_node = p2p.Node(
221             current_work=current_work,
222             port=args.p2pool_port,
223             net=args.net,
224             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.ADDRS_TABLE),
225             mode=0 if args.low_bandwidth else 1,
226             preferred_addrs=map(parse, args.p2pool_nodes) + nodes,
227         )
228         p2p_node.handle_share = p2p_share
229         p2p_node.handle_share_hash = p2p_share_hash
230         p2p_node.handle_get_shares = p2p_get_shares
231         
232         p2p_node.start()
233         
234         # send share when the chain changes to their chain
235         def work_changed(new_work):
236             #print 'Work changed:', new_work
237             for share in tracker.get_chain_known(new_work['best_share_hash']):
238                 if share.shared:
239                     break
240                 share_share(share, share.peer)
241         current_work.changed.watch(work_changed)
242         
243         print '    ...success!'
244         print
245         
246         # start listening for workers with a JSON-RPC server
247         
248         print 'Listening for workers on port %i...' % (args.worker_port,)
249         
250         # setup worker logic
251         
252         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
253         
254         def compute(state, all_targets):
255             extra_txs = [tx for tx in tx_pool.itervalues() if tx.is_good()]
256             # XXX limit to merkle_branch and block max size - 1000000 byte
257             # and sigops
258             generate_tx = p2pool.generate_transaction(
259                 tracker=tracker,
260                 previous_share_hash=state['best_share_hash'],
261                 new_script=my_script,
262                 subsidy=(50*100000000 >> (state['height'] + 1)//210000) + sum(tx.value_in - tx.value_out for tx in extra_txs),
263                 nonce=struct.pack('<Q', random.randrange(2**64)),
264                 block_target=state['target'],
265                 net=args.net,
266             )
267             print 'Generating!', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']//1000000
268             print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2'],)
269             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
270             transactions = [generate_tx] + [tx.tx for tx in extra_txs]
271             merkle_root = bitcoin.data.merkle_hash(transactions)
272             merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
273             
274             timestamp = current_work2.value['time']
275             if state['best_share_hash'] is not None:
276                 timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
277                 if timestamp2 > timestamp:
278                     print 'Toff', timestamp2 - timestamp
279                     timestamp = timestamp2
280             ba = bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'])
281             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
282             target = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target2']
283             if not all_targets:
284                 target = min(2**256//2**32 - 1, target)
285             return ba.getwork(target)
286         
287         def got_response(data):
288             try:
289                 # match up with transactions
290                 header = bitcoin.getwork.decode_data(data)
291                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
292                 if transactions is None:
293                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
294                     return False
295                 block = dict(header=header, txs=transactions)
296                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
297                 if hash_ <= block['header']['target']:
298                     print
299                     print 'GOT BLOCK! Passing to bitcoind! %x' % (hash_,)
300                     print
301                     if factory.conn.value is not None:
302                         factory.conn.value.send_block(block=block)
303                     else:
304                         print 'No bitcoind connection! Erp!'
305                 share = p2pool.Share.from_block(block)
306                 print 'GOT SHARE! %x' % (share.hash,)
307                 p2p_share(share)
308             except:
309                 print
310                 print 'Error processing data received from worker:'
311                 log.err()
312                 print
313                 return False
314             else:
315                 return True
316         
317         reactor.listenTCP(args.worker_port, server.Site(worker_interface.WorkerInterface(current_work, compute, got_response)))
318         
319         print '    ...success!'
320         print
321         
322         # done!
323         
324         def get_blocks(start_hash):
325             while True:
326                 try:
327                     block = get_block.call_now(start_hash)
328                 except deferral.NotNowError:
329                     break
330                 yield start_hash, block
331                 start_hash = block['header']['previous_block']
332         
333         tx_pool = expiring_dict.ExpiringDict(600, get_touches=False) # hash -> tx
334         
335         class Tx(object):
336             def __init__(self, tx, seen_at_block):
337                 self.hash = bitcoin.data.tx_type.hash256(tx)
338                 self.tx = tx
339                 self.seen_at_block = seen_at_block
340                 self.mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
341                 #print
342                 #print '%x %r' % (seen_at_block, tx)
343                 #for mention in self.mentions:
344                 #    print '%x' % mention
345                 #print
346                 self.parents_all_in_blocks = False
347                 self.value_in = 0
348                 #print self.tx
349                 self.value_out = sum(txout['value'] for txout in self.tx['tx_outs'])
350                 self._find_parents_in_blocks()
351             
352             @defer.inlineCallbacks
353             def _find_parents_in_blocks(self):
354                 for tx_in in self.tx['tx_ins']:
355                     try:
356                         raw_transaction = yield get_raw_transaction(tx_in['previous_output']['hash'])
357                     except Exception:
358                         return
359                     self.value_in += raw_transaction['tx']['txouts'][tx_in['previous_output']['index']]['value']
360                     #print raw_transaction
361                     if not raw_transaction['parent_blocks']:
362                         return
363                 self.parents_all_in_blocks = True
364             
365             def is_good(self):
366                 if not self.parents_all_in_blocks:
367                     return False
368                 x = self.is_good2()
369                 #print 'is_good:', x
370                 return x
371             
372             def is_good2(self):
373                 for block_hash, block in itertools.islice(get_blocks(current_work.value['previous_block']), 10):
374                     if block_hash == self.seen_at_block:
375                         return True
376                     for tx in block['txs']:
377                         mentions = set([bitcoin.data.tx_type.hash256(tx)] + [tx_in['previous_output']['hash'] for tx_in in tx['tx_ins']])
378                         if mentions & self.mentions:
379                             return False
380                 return False
381         
382         @defer.inlineCallbacks
383         def new_tx(tx_hash):
384             try:
385                 assert isinstance(tx_hash, (int, long))
386                 tx = yield (yield factory.getProtocol()).get_tx(tx_hash)
387                 tx_pool[bitcoin.data.tx_type.hash256(tx)] = Tx(tx, current_work.value['previous_block'])
388             except:
389                 print
390                 print 'Error handling tx:'
391                 log.err()
392                 print
393         factory.new_tx.watch(new_tx)
394         
395         def new_block(block):
396             set_real_work()
397         factory.new_block.watch(new_block)
398         
399         print 'Started successfully!'
400         print
401         
402         while True:
403             yield deferral.sleep(1)
404             yield set_real_work()
405     except:
406         print
407         print 'Fatal error:'
408         log.err()
409         print
410         reactor.stop()
411
412 def run():
413     parser = argparse.ArgumentParser(description='p2pool (version %s)' % (__version__,))
414     parser.add_argument('--version', action='version', version=__version__)
415     parser.add_argument('--testnet',
416         help='use the testnet',
417         action='store_const', const=p2pool.Testnet, default=p2pool.Mainnet, dest='net')
418     
419     p2pool_group = parser.add_argument_group('p2pool interface')
420     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
421         help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
422         type=int, action='store', default=None, dest='p2pool_port')
423     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
424         help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
425         type=str, action='append', default=[], dest='p2pool_nodes')
426     parser.add_argument('-l', '--low-bandwidth',
427         help='trade lower bandwidth usage for higher latency (reduced efficiency)',
428         action='store_true', default=False, dest='low_bandwidth')
429     
430     worker_group = parser.add_argument_group('worker interface')
431     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
432         help='listen on PORT for RPC connections from miners asking for work and providing responses (default: 9332)',
433         type=int, action='store', default=9332, dest='worker_port')
434     
435     bitcoind_group = parser.add_argument_group('bitcoind interface')
436     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
437         help='connect to a bitcoind at this address (default: 127.0.0.1)',
438         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
439     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
440         help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332)',
441         type=int, action='store', default=8332, dest='bitcoind_rpc_port')
442     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
443         help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
444         type=int, action='store', default=None, dest='bitcoind_p2p_port')
445     
446     bitcoind_group.add_argument(metavar='BITCOIND_RPC_USERNAME',
447         help='bitcoind RPC interface username',
448         type=str, action='store', dest='bitcoind_rpc_username')
449     bitcoind_group.add_argument(metavar='BITCOIND_RPC_PASSWORD',
450         help='bitcoind RPC interface password',
451         type=str, action='store', dest='bitcoind_rpc_password')
452     
453     args = parser.parse_args()
454     
455     if args.bitcoind_p2p_port is None:
456         args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
457     
458     if args.p2pool_port is None:
459         args.p2pool_port = args.net.P2P_PORT
460     
461     reactor.callWhenRunning(main, args)
462     reactor.run()