removed current_work dependency from p2p
p2pool.git: p2pool/main.py
#!/usr/bin/python
# coding=utf-8

from __future__ import division

import argparse
import codecs
import datetime
import os
import random
import struct
import sys
import time
import json
import signal
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks
import p2pool, p2pool.data as p2pool_data

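# fetch a block template from bitcoind via the getmemorypool RPC and normalize
# it into the work dict the rest of p2pool consumes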
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))

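# build a payout scriptPubKey from bitcoind's 'p2pool' account address,
# preferring pay-to-pubkey and falling back to pay-to-pubkey-hash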
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net2):
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    validate_response = yield bitcoind.rpc_validateaddress(address)
    if 'pubkey' not in validate_response:
        print '    Pubkey request failed. Falling back to payout to address.'
        defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net2)))
    pubkey = validate_response['pubkey'].decode('hex')
    assert bitcoin_data.pubkey_to_address(pubkey, net2) == address
    defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))

@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        bitcoin_data.Type.enable_caching()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net.PARENT)
        else:
            print 'Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print
        
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
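        # poll bitcoind for fresh work, splitting it between current_work2
        # (fields that should not trigger a long-poll) and pre_current_work
        # (fields that should, via set_real_work2)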
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 is set first because watchers of pre_current_work read it
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        factory.new_block.watch(lambda block_hash: set_real_work1())
        
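        # recompute the best share and desired work from the tracker, then
        # request any missing parent shares from peers believed to have them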
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received while tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        
        @defer.inlineCallbacks
        def set_merged_work():
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        set_merged_work()
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
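        # handle a batch of shares received from the p2pool network, adding new
        # ones to the tracker and re-evaluating work if anything changed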
        def p2p_shares(shares, peer=None):
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
        
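        # handlers for peers announcing share hashes and for peers requesting
        # shares from us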
        def p2p_share_hashes(share_hashes, peer):
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        addrs = dict((parse(addr), (0, 0, 0)) for addr in net.BOOTSTRAP_ADDRS)
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in addrs.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        p2p_node = p2p.Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            preferred_addrs=set(map(parse, args.p2pool_nodes)),
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # when the best chain changes, send peers any shares along it that they haven't been sent yet
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
        
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
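        # skiplist summing (own shares, own DOA shares, recorded orphans,
        # recorded DOAs) along the chain so get_stale_counts() stays cheap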
        stale_counter = skiplists.SumSkipList(tracker, lambda share: (
            1 if share.hash in my_share_hashes else 0,
            1 if share.hash in my_doa_share_hashes else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 253 else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 254 else 0,
        ), (0, 0, 0, 0), math.add_tuples)
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            my_shares_in_chain, my_doa_shares_in_chain, orphans_recorded_in_chain, doas_recorded_in_chain = stale_counter(
                current_work.value['best_share_hash'],
                tracker.verified.get_height(current_work.value['best_share_hash']),
            )
            my_shares_in_chain += removed_unstales_var.value[0]
            my_doa_shares_in_chain += removed_doa_unstales_var.value
            orphans_recorded_in_chain += removed_unstales_var.value[1]
            doas_recorded_in_chain += removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        def get_payout_script_from_username(user):
            if user is None:
                return None
            try:
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
            except: # XXX blah
                return None
            return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
        
        def precompute(request):
            payout_script = get_payout_script_from_username(request.getUser())
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            return payout_script,
        
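        # assemble a block attempt for a worker: build the generate transaction,
        # compute the merkle root, and remember the transactions so a returned
        # solution can be matched back to them in got_response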
        def compute(payout_script):
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if current_work.value['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=current_work.value['best_share_hash'],
                    coinbase='' if current_work.value['aux_work'] is None else
                        '\xfa\xbemm' + bitcoin_data.HashType().pack(current_work.value['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=current_work2.value['subsidy'],
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                        253 if orphans > orphans_recorded_in_chain else
                        254 if doas > doas_recorded_in_chain else
                        0
                    )(*get_stale_counts()),
                ),
                block_target=current_work.value['bits'].target,
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
                current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(
                version=current_work.value['version'],
                previous_block=current_work.value['previous_block'],
                merkle_root=merkle_root,
                timestamp=current_work2.value['time'],
                bits=current_work.value['bits'],
                share_target=share_info['bits'].target,
            )
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
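        # called when a worker submits a solved header: match it back to its
        # transactions, submit any full block or merged-mining proof, and
        # record the share locally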
        def got_response(header, request):
            try:
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.PARENT.POW_FUNC(header)
                
                if pow_hash <= header['bits'].target or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                    if pow_hash <= header['bits'].target:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                try:
                    if current_work.value['aux_work'] is not None and (pow_hash <= current_work.value['aux_work']['target'] or p2pool.DEBUG):
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                except:
                    log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash > share_info['bits'].target:
                    print >>sys.stderr, 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, share_info['bits'].target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_share_hashes.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    my_doa_share_hashes.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (request.getUser(), p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
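        # one twisted.web site serves both the worker getwork interface and the
        # JSON/plain-text status endpoints registered below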
        web_root = resource.Resource()
        worker_interface.WorkerInterface(compute, got_response, current_work.changed, precompute).attach_to(web_root)
        
        def get_rate():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return json.dumps(None)
            return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        
        def get_global_stats():
            # averaged over last hour
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            return json.dumps(dict(
                pool_nonstale_hash_rate=nonstale_hash_rate,
                pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
                pool_stale_prop=stale_prop,
            ))
        
        def get_local_stats():
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            
            my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
            my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
            my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
            my_share_count = my_unstale_count + my_orphan_count + my_doa_count
            my_stale_count = my_orphan_count + my_doa_count
            
            my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
            
            my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
                for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
                if share.hash in my_share_hashes)
            actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
                tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
            share_att_s = my_work / actual_time
            
            return json.dumps(dict(
                my_hash_rates_in_last_hour=dict(
                    nonstale=share_att_s,
                    rewarded=share_att_s/(1 - global_stale_prop),
                    actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
                ),
                my_share_counts_in_last_hour=dict(
                    shares=my_share_count,
                    unstale_shares=my_unstale_count,
                    stale_shares=my_stale_count,
                    orphan_stale_shares=my_orphan_count,
                    doa_stale_shares=my_doa_count,
                ),
                my_stale_proportions_in_last_hour=dict(
                    stale=my_stale_prop,
                    orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                    dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
                ),
            ))
        
        def get_peer_addresses():
            return ' '.join(peer.transport.getPeer().host + ':' + str(peer.transport.getPeer().port) for peer in p2p_node.peers.itervalues())
        
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
        web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
        web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        
        # call getmemorypool every 15 seconds to check that bitcoind is alive
        task.LoopingCall(set_real_work1).start(15)
        
        
        # done!
        print 'Started successfully!'
        print
        
        
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                lambda: sys.stdout.write('Watchdog timer went off at:\n' + ''.join(traceback.format_stack()))
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
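        # periodically print a one-line status summary (pool rate, own rate,
        # share counts, peers) to the console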
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                            real_att_s = att_s / (1 - stale_prop)
                            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                                math.format(int(real_att_s)),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(my_att_s)),
                                shares,
                                stale_orphan_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            this_str += '\nAverage time between blocks: %.2f days' % (
                                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                            )
                            this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
                            conf = 0.95
                            if shares:
                                stale_shares = stale_orphan_shares + stale_doa_shares
                                this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if stale_prop < .99:
                                    this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - stale_prop) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

def run():
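    # argparse subclass whose @file arguments may contain multiple
    # whitespace-separated arguments per line (stock argparse allows only one)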
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: <empty>)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
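    # stdout/stderr are wrapped in a stack of file-like pipes that re-encode
    # unicode safely, tee output to the logfile, and prepend timestamps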
    class EncodeReplacerPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            if isinstance(data, unicode):
                try:
                    data = data.encode(self.inner_file.encoding, 'replace')
                except:
                    data = data.encode('ascii', 'replace')
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class LogFile(object):
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now(), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    class AbortPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            try:
                self.inner_file.write(data)
            except:
                sys.stdout = sys.__stdout__
                log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
                raise
        def flush(self):
            self.inner_file.flush()
    class PrefixPipe(object):
        def __init__(self, inner_file, prefix):
            self.inner_file = inner_file
            self.prefix = prefix
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write(self.prefix + line + '\n')
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    logfile = LogFile(args.logfile)
    pipe = TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = AbortPipe(PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('--merged-url and --merged-userpass must be specified together')
    
    reactor.callWhenRunning(main, args, net, datadir_path)
    reactor.run()