added /peer_addresses
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2 # coding=utf-8
3
4 from __future__ import division
5
6 import argparse
7 import codecs
8 import datetime
9 import os
10 import random
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
24 from bitcoin import worker_interface
25 from util import expiring_dict, jsonrpc, variable, deferral, math
26 from . import p2p, skiplists, networks
27 import p2pool, p2pool.data as p2pool_data
28
29 @deferral.retry('Error getting work from bitcoind:', 3)
30 @defer.inlineCallbacks
31 def getwork(bitcoind):
32     work = yield bitcoind.rpc_getmemorypool()
33     defer.returnValue(dict(
34         version=work['version'],
35         previous_block_hash=int(work['previousblockhash'], 16),
36         transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
37         subsidy=work['coinbasevalue'],
38         time=work['time'],
39         bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
40     ))
41
42 @deferral.retry('Error creating payout script:', 10)
43 @defer.inlineCallbacks
44 def get_payout_script2(bitcoind, net2):
45     address = yield bitcoind.rpc_getaccountaddress('p2pool')
46     validate_response = yield bitcoind.rpc_validateaddress(address)
47     if 'pubkey' not in validate_response:
48         print '    Pubkey request failed. Falling back to payout to address.'
49         defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net2)))
50     pubkey = validate_response['pubkey'].decode('hex')
51     assert bitcoin_data.pubkey_to_address(pubkey, net2) == address
52     defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))
53
54 @defer.inlineCallbacks
55 def main(args, net, datadir_path):
56     try:
57         print 'p2pool (version %s)' % (p2pool.__version__,)
58         print
59         try:
60             from . import draw
61         except ImportError:
62             draw = None
63             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
64             print
65         
66         bitcoin_data.Type.enable_caching()
67         
68         # connect to bitcoind over JSON-RPC and do initial getmemorypool
69         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
70         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
71         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
72         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
73         if not good:
74             print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
75             return
76         temp_work = yield getwork(bitcoind)
77         print '    ...success!'
78         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
79         print
80         
81         # connect to bitcoind over bitcoin-p2p
82         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
83         factory = bitcoin_p2p.ClientFactory(net.PARENT)
84         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
85         yield factory.getProtocol() # waits until handshake is successful
86         print '    ...success!'
87         print
88         
89         if args.pubkey_hash is None:
90             print 'Getting payout address from bitcoind...'
91             my_script = yield get_payout_script2(bitcoind, net.PARENT)
92         else:
93             print 'Computing payout script from provided address....'
94             my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
95         print '    ...success!'
96         print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
97         print
98         
99         ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
100         
101         tracker = p2pool_data.OkayTracker(net)
102         shared_share_hashes = set()
103         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
104         known_verified = set()
105         print "Loading shares..."
106         for i, (mode, contents) in enumerate(ss.get_shares()):
107             if mode == 'share':
108                 if contents.hash in tracker.shares:
109                     continue
110                 shared_share_hashes.add(contents.hash)
111                 contents.time_seen = 0
112                 tracker.add(contents)
113                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
114                     print "    %i" % (len(tracker.shares),)
115             elif mode == 'verified_hash':
116                 known_verified.add(contents)
117             else:
118                 raise AssertionError()
119         print "    ...inserting %i verified shares..." % (len(known_verified),)
120         for h in known_verified:
121             if h not in tracker.shares:
122                 ss.forget_verified_share(h)
123                 continue
124             tracker.verified.add(tracker.shares[h])
125         print "    ...done loading %i shares!" % (len(tracker.shares),)
126         print
127         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
128         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
129         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
130         
131         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
132         
133         pre_current_work = variable.Variable(None)
134         pre_merged_work = variable.Variable(None)
135         # information affecting work that should trigger a long-polling update
136         current_work = variable.Variable(None)
137         # information affecting work that should not trigger a long-polling update
138         current_work2 = variable.Variable(None)
139         
140         requested = expiring_dict.ExpiringDict(300)
141         
142         @defer.inlineCallbacks
143         def set_real_work1():
144             work = yield getwork(bitcoind)
145             current_work2.set(dict(
146                 time=work['time'],
147                 transactions=work['transactions'],
148                 subsidy=work['subsidy'],
149                 clock_offset=time.time() - work['time'],
150                 last_update=time.time(),
151             )) # second set first because everything hooks on the first
152             pre_current_work.set(dict(
153                 version=work['version'],
154                 previous_block=work['previous_block_hash'],
155                 bits=work['bits'],
156             ))
157         factory.new_block.watch(lambda block_hash: set_real_work1())
158         
159         def set_real_work2():
160             best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
161             
162             t = dict(pre_current_work.value)
163             t['best_share_hash'] = best
164             t['aux_work'] = pre_merged_work.value
165             current_work.set(t)
166             
167             t = time.time()
168             for peer2, share_hash in desired:
169                 if share_hash not in tracker.tails: # was received in the time tracker.think was running
170                     continue
171                 last_request_time, count = requested.get(share_hash, (None, 0))
172                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
173                     continue
174                 potential_peers = set()
175                 for head in tracker.tails[share_hash]:
176                     potential_peers.update(peer_heads.get(head, set()))
177                 potential_peers = [peer for peer in potential_peers if peer.connected2]
178                 if count == 0 and peer2 is not None and peer2.connected2:
179                     peer = peer2
180                 else:
181                     peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
182                     if peer is None:
183                         continue
184                 
185                 print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
186                 peer.send_getshares(
187                     hashes=[share_hash],
188                     parents=2000,
189                     stops=list(set(tracker.heads) | set(
190                         tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
191                     ))[:100],
192                 )
193                 requested[share_hash] = t, count + 1
194         pre_current_work.changed.watch(lambda _: set_real_work2())
195         
196         print 'Initializing work...'
197         yield set_real_work1()
198         print '    ...success!'
199         print
200         
201         pre_merged_work.changed.watch(lambda _: set_real_work2())
202         ht.updated.watch(set_real_work2)
203         
204         @defer.inlineCallbacks
205         def set_merged_work():
206             if not args.merged_url:
207                 return
208             merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
209             while True:
210                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
211                 pre_merged_work.set(dict(
212                     hash=int(auxblock['hash'], 16),
213                     target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
214                     chain_id=auxblock['chainid'],
215                 ))
216                 yield deferral.sleep(1)
217         set_merged_work()
218         
219         @pre_merged_work.changed.watch
220         def _(new_merged_work):
221             print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
222         
223         start_time = time.time() - current_work2.value['clock_offset']
224         
225         # setup p2p logic and join p2pool network
226         
227         def p2p_shares(shares, peer=None):
228             if len(shares) > 5:
229                 print 'Processing %i shares...' % (len(shares),)
230             
231             new_count = 0
232             for share in shares:
233                 if share.hash in tracker.shares:
234                     #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
235                     continue
236                 
237                 new_count += 1
238                 
239                 #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
240                 
241                 tracker.add(share)
242             
243             if shares and peer is not None:
244                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
245             
246             if new_count:
247                 set_real_work2()
248             
249             if len(shares) > 5:
250                 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
251         
252         @tracker.verified.added.watch
253         def _(share):
254             if share.pow_hash <= share.header['bits'].target:
255                 if factory.conn.value is not None:
256                     factory.conn.value.send_block(block=share.as_block(tracker))
257                 else:
258                     print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
259                 print
260                 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
261                 print
262         
263         def p2p_share_hashes(share_hashes, peer):
264             t = time.time()
265             get_hashes = []
266             for share_hash in share_hashes:
267                 if share_hash in tracker.shares:
268                     continue
269                 last_request_time, count = requested.get(share_hash, (None, 0))
270                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
271                     continue
272                 print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
273                 get_hashes.append(share_hash)
274                 requested[share_hash] = t, count + 1
275             
276             if share_hashes and peer is not None:
277                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
278             if get_hashes:
279                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
280         
281         def p2p_get_shares(share_hashes, parents, stops, peer):
282             parents = min(parents, 1000//len(share_hashes))
283             stops = set(stops)
284             shares = []
285             for share_hash in share_hashes:
286                 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
287                     if share.hash in stops:
288                         break
289                     shares.append(share)
290             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
291             peer.sendShares(shares)
292         
293         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
294         
295         def parse(x):
296             if ':' in x:
297                 ip, port = x.split(':')
298                 return ip, int(port)
299             else:
300                 return x, net.P2P_PORT
301         
302         nodes = set([
303             ('72.14.191.28', net.P2P_PORT),
304             ('62.204.197.159', net.P2P_PORT),
305             ('142.58.248.28', net.P2P_PORT),
306             ('94.23.34.145', net.P2P_PORT),
307         ])
308         for host in [
309             'p2pool.forre.st',
310             'dabuttonfactory.com',
311             ] + (['liteco.in'] if net.NAME == 'litecoin' else []) + [
312         ]:
313             try:
314                 nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
315             except:
316                 log.err(None, 'Error resolving bootstrap node IP:')
317         
318         addrs = {}
319         try:
320             addrs = dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt')))
321         except:
322             print >>sys.stderr, "error reading addrs"
323         
324         def save_addrs():
325             open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in addrs.iteritems())
326         task.LoopingCall(save_addrs).start(60)
327         
328         p2p_node = p2p.Node(
329             current_work=current_work,
330             port=args.p2pool_port,
331             net=net,
332             addr_store=addrs,
333             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
334         )
335         p2p_node.handle_shares = p2p_shares
336         p2p_node.handle_share_hashes = p2p_share_hashes
337         p2p_node.handle_get_shares = p2p_get_shares
338         
339         p2p_node.start()
340         
341         # send share when the chain changes to their chain
342         def work_changed(new_work):
343             #print 'Work changed:', new_work
344             shares = []
345             for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
346                 if share.hash in shared_share_hashes:
347                     break
348                 shared_share_hashes.add(share.hash)
349                 shares.append(share)
350             
351             for peer in p2p_node.peers.itervalues():
352                 peer.sendShares([share for share in shares if share.peer is not peer])
353         
354         current_work.changed.watch(work_changed)
355         
356         def save_shares():
357             for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
358                 ss.add_share(share)
359                 if share.hash in tracker.verified.shares:
360                     ss.add_verified_hash(share.hash)
361         task.LoopingCall(save_shares).start(60)
362         
363         print '    ...success!'
364         print
365         
366         @defer.inlineCallbacks
367         def upnp_thread():
368             while True:
369                 try:
370                     is_lan, lan_ip = yield ipdiscover.get_local_ip()
371                     if is_lan:
372                         pm = yield portmapper.get_port_mapper()
373                         yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
374                 except defer.TimeoutError:
375                     pass
376                 except:
377                     if p2pool.DEBUG:
378                         log.err(None, "UPnP error:")
379                 yield deferral.sleep(random.expovariate(1/120))
380         
381         if args.upnp:
382             upnp_thread()
383         
384         # start listening for workers with a JSON-RPC server
385         
386         print 'Listening for workers on port %i...' % (args.worker_port,)
387         
388         # setup worker logic
389         
390         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
391         
392         removed_unstales_var = variable.Variable((0, 0, 0))
393         @tracker.verified.removed.watch
394         def _(share):
395             if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
396                 assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
397                 removed_unstales_var.set((
398                     removed_unstales_var.value[0] + 1,
399                     removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
400                     removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
401                 ))
402         
403         removed_doa_unstales_var = variable.Variable(0)
404         @tracker.verified.removed.watch
405         def _(share):
406             if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
407                 removed_doa_unstales.set(removed_doa_unstales.value + 1)
408         
409         stale_counter = skiplists.SumSkipList(tracker, lambda share: (
410             1 if share.hash in my_share_hashes else 0,
411             1 if share.hash in my_doa_share_hashes else 0,
412             1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 253 else 0,
413             1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 254 else 0,
414         ), (0, 0, 0, 0), math.add_tuples)
415         def get_stale_counts():
416             '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
417             my_shares = len(my_share_hashes)
418             my_doa_shares = len(my_doa_share_hashes)
419             my_shares_in_chain, my_doa_shares_in_chain, orphans_recorded_in_chain, doas_recorded_in_chain = stale_counter(
420                 current_work.value['best_share_hash'],
421                 tracker.verified.get_height(current_work.value['best_share_hash']),
422             )
423             my_shares_in_chain += removed_unstales_var.value[0]
424             my_doa_shares_in_chain += removed_doa_unstales_var.value
425             orphans_recorded_in_chain += removed_unstales_var.value[1]
426             doas_recorded_in_chain += removed_unstales_var.value[2]
427             
428             my_shares_not_in_chain = my_shares - my_shares_in_chain
429             my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
430             
431             return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
432         
433         
434         def get_payout_script_from_username(user):
435             if user is None:
436                 return None
437             try:
438                 pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
439             except: # XXX blah
440                 return None
441             return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
442         
443         def precompute(request):
444             payout_script = get_payout_script_from_username(request.getUser())
445             if payout_script is None or random.uniform(0, 100) < args.worker_fee:
446                 payout_script = my_script
447             return payout_script,
448
449         def compute(payout_script):
450             if len(p2p_node.peers) == 0 and net.PERSIST:
451                 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
452             if current_work.value['best_share_hash'] is None and net.PERSIST:
453                 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
454             if time.time() > current_work2.value['last_update'] + 60:
455                 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
456             
457             share_info, generate_tx = p2pool_data.generate_transaction(
458                 tracker=tracker,
459                 share_data=dict(
460                     previous_share_hash=current_work.value['best_share_hash'],
461                     coinbase='' if current_work.value['aux_work'] is None else
462                         '\xfa\xbemm' + bitcoin_data.HashType().pack(current_work.value['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
463                     nonce=struct.pack('<Q', random.randrange(2**64)),
464                     new_script=payout_script,
465                     subsidy=current_work2.value['subsidy'],
466                     donation=math.perfect_round(65535*args.donation_percentage/100),
467                     stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
468                         253 if orphans > orphans_recorded_in_chain else
469                         254 if doas > doas_recorded_in_chain else
470                         0
471                     )(*get_stale_counts()),
472                 ),
473                 block_target=current_work.value['bits'].target,
474                 desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
475                 net=net,
476             )
477             
478             print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
479                 bitcoin_data.target_to_difficulty(share_info['bits'].target),
480                 (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
481                 current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
482                 len(current_work2.value['transactions']),
483             )
484             
485             transactions = [generate_tx] + list(current_work2.value['transactions'])
486             merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
487             merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
488             
489             return bitcoin_getwork.BlockAttempt(
490                 version=current_work.value['version'],
491                 previous_block=current_work.value['previous_block'],
492                 merkle_root=merkle_root,
493                 timestamp=current_work2.value['time'],
494                 bits=current_work.value['bits'],
495                 share_target=share_info['bits'].target,
496             )
497         
498         my_share_hashes = set()
499         my_doa_share_hashes = set()
500         
501         def got_response(header, request):
502             try:
503                 # match up with transactions
504                 xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
505                 if xxx is None:
506                     print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
507                     return False
508                 share_info, transactions, getwork_time = xxx
509                 
510                 hash_ = bitcoin_data.block_header_type.hash256(header)
511                 
512                 pow_hash = net.PARENT.POW_FUNC(header)
513                 
514                 if pow_hash <= header['bits'].target or p2pool.DEBUG:
515                     if factory.conn.value is not None:
516                         factory.conn.value.send_block(block=dict(header=header, txs=transactions))
517                     else:
518                         print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
519                     if pow_hash <= header['bits'].target:
520                         print
521                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
522                         print
523                 
524                 try:
525                     if current_work.value['aux_work'] is not None and (pow_hash <= current_work.value['aux_work']['target'] or p2pool.DEBUG):
526                         aux_pow = dict(
527                             merkle_tx=dict(
528                                 tx=transactions[0],
529                                 block_hash=hash_,
530                                 merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
531                                 index=0,
532                             ),
533                             merkle_branch=[],
534                             index=0,
535                             parent_block_header=header,
536                         )
537                         
538                         a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
539                         #print a, b
540                         merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
541                         def _(res):
542                             print "MERGED RESULT:", res
543                         merged.rpc_getauxblock(a, b).addBoth(_)
544                 except:
545                     log.err(None, 'Error while processing merged mining POW:')
546                 
547                 if pow_hash > share_info['bits'].target:
548                     print >>sys.stderr, 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, share_info['bits'].target)
549                     return False
550                 share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
551                 my_share_hashes.add(share.hash)
552                 if share.previous_hash != current_work.value['best_share_hash']:
553                     my_doa_share_hashes.add(share.hash)
554                 print 'GOT SHARE! %s %s prev %s age %.2fs' % (request.getUser(), p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
555                 good = share.previous_hash == current_work.value['best_share_hash']
556                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
557                 p2p_shares([share])
558                 # eg. good = share.hash == current_work.value['best_share_hash'] here
559                 return good
560             except:
561                 log.err(None, 'Error processing data received from worker:')
562                 return False
563         
564         web_root = resource.Resource()
565         worker_interface.WorkerInterface(compute, got_response, current_work.changed, precompute).attach_to(web_root)
566         
567         def get_rate():
568             if tracker.get_height(current_work.value['best_share_hash']) < 720:
569                 return json.dumps(None)
570             return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
571                 / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
572         
573         def get_users():
574             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
575             weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
576             res = {}
577             for script in sorted(weights, key=lambda s: weights[s]):
578                 res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
579             return json.dumps(res)
580         
581         def get_global_stats():
582             # averaged over last hour
583             lookbehind = 3600//net.SHARE_PERIOD
584             if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
585                 return None
586             
587             nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
588             stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
589             return json.dumps(dict(
590                 pool_nonstale_hash_rate=nonstale_hash_rate,
591                 pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
592                 pool_stale_prop=stale_prop,
593             ))
594         
595         def get_local_stats():
596             lookbehind = 3600//net.SHARE_PERIOD
597             if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
598                 return None
599             
600             global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
601             
602             my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
603             my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
604             my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
605             my_share_count = my_unstale_count + my_orphan_count + my_doa_count
606             my_stale_count = my_orphan_count + my_doa_count
607             
608             my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
609             
610             my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
611                 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
612                 if share.hash in my_share_hashes)
613             actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
614                 tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
615             share_att_s = my_work / actual_time
616             
617             return json.dumps(dict(
618                 my_hash_rates_in_last_hour=dict(
619                     nonstale=share_att_s,
620                     rewarded=share_att_s/(1 - global_stale_prop),
621                     actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
622                 ),
623                 my_share_counts_in_last_hour=dict(
624                     shares=my_share_count,
625                     unstale_shares=my_unstale_count,
626                     stale_shares=my_stale_count,
627                     orphan_stale_shares=my_orphan_count,
628                     doa_stale_shares=my_doa_count,
629                 ),
630                 my_stale_proportions_in_last_hour=dict(
631                     stale=my_stale_prop,
632                     orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
633                     dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
634                 ),
635             ))
636         
637         def get_peer_addresses():
638             return ' '.join(peer.transport.getPeer().host + ':' + str(peer.transport.getPeer().port) for peer in p2p_node.peers.itervalues())
639         
640         class WebInterface(resource.Resource):
641             def __init__(self, func, mime_type):
642                 self.func, self.mime_type = func, mime_type
643             
644             def render_GET(self, request):
645                 request.setHeader('Content-Type', self.mime_type)
646                 return self.func()
647         
648         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
649         web_root.putChild('users', WebInterface(get_users, 'application/json'))
650         web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
651         web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
652         web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
653         web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
654         if draw is not None:
655             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
656         
657         reactor.listenTCP(args.worker_port, server.Site(web_root))
658         
659         print '    ...success!'
660         print
661         
662         
663         # call getmemorypool every 15 seconds to check that bitcoind is alive
664         task.LoopingCall(set_real_work1).start(15)
665         
666         
667         # done!
668         print 'Started successfully!'
669         print
670         
671         
672         if hasattr(signal, 'SIGALRM'):
673             signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
674                 lambda: sys.stdout.write('Watchdog timer went off at:\n' + ''.join(traceback.format_stack()))
675             ))
676             signal.siginterrupt(signal.SIGALRM, False)
677             task.LoopingCall(signal.alarm, 30).start(1)
678         
679         @defer.inlineCallbacks
680         def status_thread():
681             last_str = None
682             last_time = 0
683             while True:
684                 yield deferral.sleep(3)
685                 try:
686                     if time.time() > current_work2.value['last_update'] + 60:
687                         print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
688                     if current_work.value['best_share_hash'] is not None:
689                         height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
690                         if height > 2:
691                             att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
692                             weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
693                             (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
694                             stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
695                             real_att_s = att_s / (1 - stale_prop)
696                             my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
697                             this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
698                                 math.format(int(real_att_s)),
699                                 height,
700                                 len(tracker.verified.shares),
701                                 len(tracker.shares),
702                                 weights.get(my_script, 0)/total_weight*100,
703                                 math.format(int(my_att_s)),
704                                 shares,
705                                 stale_orphan_shares,
706                                 stale_doa_shares,
707                                 len(p2p_node.peers),
708                             ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
709                             this_str += '\nAverage time between blocks: %.2f days' % (
710                                 2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
711                             )
712                             this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
713                             conf = 0.95
714                             if shares:
715                                 stale_shares = stale_orphan_shares + stale_doa_shares
716                                 this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
717                                 if stale_prop < .99:
718                                     this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - stale_prop) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
719                             if this_str != last_str or time.time() > last_time + 15:
720                                 print this_str
721                                 last_str = this_str
722                                 last_time = time.time()
723                 except:
724                     log.err()
725         status_thread()
726     except:
727         log.err(None, 'Fatal error:')
728
729 def run():
730     class FixedArgumentParser(argparse.ArgumentParser):
731         def _read_args_from_files(self, arg_strings):
732             # expand arguments referencing files
733             new_arg_strings = []
734             for arg_string in arg_strings:
735                 
736                 # for regular arguments, just add them back into the list
737                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
738                     new_arg_strings.append(arg_string)
739                 
740                 # replace arguments referencing files with the file content
741                 else:
742                     try:
743                         args_file = open(arg_string[1:])
744                         try:
745                             arg_strings = []
746                             for arg_line in args_file.read().splitlines():
747                                 for arg in self.convert_arg_line_to_args(arg_line):
748                                     arg_strings.append(arg)
749                             arg_strings = self._read_args_from_files(arg_strings)
750                             new_arg_strings.extend(arg_strings)
751                         finally:
752                             args_file.close()
753                     except IOError:
754                         err = sys.exc_info()[1]
755                         self.error(str(err))
756             
757             # return the modified argument list
758             return new_arg_strings
759         
760         def convert_arg_line_to_args(self, arg_line):
761             return [arg for arg in arg_line.split() if arg.strip()]
762     
763     parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
764     parser.add_argument('--version', action='version', version=p2pool.__version__)
765     parser.add_argument('--net',
766         help='use specified network (default: bitcoin)',
767         action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
768     parser.add_argument('--testnet',
769         help='''use the network's testnet''',
770         action='store_const', const=True, default=False, dest='testnet')
771     parser.add_argument('--debug',
772         help='enable debugging mode',
773         action='store_const', const=True, default=False, dest='debug')
774     parser.add_argument('-a', '--address',
775         help='generate payouts to this address (default: <address requested from bitcoind>)',
776         type=str, action='store', default=None, dest='address')
777     parser.add_argument('--logfile',
778         help='''log to this file (default: data/<NET>/log)''',
779         type=str, action='store', default=None, dest='logfile')
780     parser.add_argument('--merged-url',
781         help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
782         type=str, action='store', default=None, dest='merged_url')
783     parser.add_argument('--merged-userpass',
784         help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
785         type=str, action='store', default=None, dest='merged_userpass')
786     parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
787         help='donate this percentage of work to author of p2pool (default: 0.5)',
788         type=float, action='store', default=0.5, dest='donation_percentage')
789     
790     p2pool_group = parser.add_argument_group('p2pool interface')
791     p2pool_group.add_argument('--p2pool-port', metavar='PORT',
792         help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
793         type=int, action='store', default=None, dest='p2pool_port')
794     p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
795         help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
796         type=str, action='append', default=[], dest='p2pool_nodes')
797     parser.add_argument('--disable-upnp',
798         help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
799         action='store_false', default=True, dest='upnp')
800     
801     worker_group = parser.add_argument_group('worker interface')
802     worker_group.add_argument('-w', '--worker-port', metavar='PORT',
803         help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
804         type=int, action='store', default=None, dest='worker_port')
805     worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
806         help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
807         type=float, action='store', default=0, dest='worker_fee')
808     
809     bitcoind_group = parser.add_argument_group('bitcoind interface')
810     bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
811         help='connect to this address (default: 127.0.0.1)',
812         type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
813     bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
814         help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
815         type=int, action='store', default=None, dest='bitcoind_rpc_port')
816     bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
817         help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
818         type=int, action='store', default=None, dest='bitcoind_p2p_port')
819     
820     bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
821         help='bitcoind RPC interface username (default: <empty>)',
822         type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
823     bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
824         help='bitcoind RPC interface password',
825         type=str, action='store', dest='bitcoind_rpc_password')
826     
827     args = parser.parse_args()
828     
829     if args.debug:
830         p2pool.DEBUG = True
831     
832     net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
833     
834     datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
835     if not os.path.exists(datadir_path):
836         os.makedirs(datadir_path)
837     
838     if args.logfile is None:
839         args.logfile = os.path.join(datadir_path, 'log')
840     
841     class EncodeReplacerPipe(object):
842         def __init__(self, inner_file):
843             self.inner_file = inner_file
844             self.softspace = 0
845         def write(self, data):
846             if isinstance(data, unicode):
847                 try:
848                     data = data.encode(self.inner_file.encoding, 'replace')
849                 except:
850                     data = data.encode('ascii', 'replace')
851             self.inner_file.write(data)
852         def flush(self):
853             self.inner_file.flush()
854     class LogFile(object):
855         def __init__(self, filename):
856             self.filename = filename
857             self.inner_file = None
858             self.reopen()
859         def reopen(self):
860             if self.inner_file is not None:
861                 self.inner_file.close()
862             open(self.filename, 'a').close()
863             f = open(self.filename, 'rb')
864             f.seek(0, os.SEEK_END)
865             length = f.tell()
866             if length > 100*1000*1000:
867                 f.seek(-1000*1000, os.SEEK_END)
868                 while True:
869                     if f.read(1) in ('', '\n'):
870                         break
871                 data = f.read()
872                 f.close()
873                 f = open(self.filename, 'wb')
874                 f.write(data)
875             f.close()
876             self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
877         def write(self, data):
878             self.inner_file.write(data)
879         def flush(self):
880             self.inner_file.flush()
881     class TeePipe(object):
882         def __init__(self, outputs):
883             self.outputs = outputs
884         def write(self, data):
885             for output in self.outputs:
886                 output.write(data)
887         def flush(self):
888             for output in self.outputs:
889                 output.flush()
890     class TimestampingPipe(object):
891         def __init__(self, inner_file):
892             self.inner_file = inner_file
893             self.buf = ''
894             self.softspace = 0
895         def write(self, data):
896             buf = self.buf + data
897             lines = buf.split('\n')
898             for line in lines[:-1]:
899                 self.inner_file.write('%s %s\n' % (datetime.datetime.now(), line))
900                 self.inner_file.flush()
901             self.buf = lines[-1]
902         def flush(self):
903             pass
904     class AbortPipe(object):
905         def __init__(self, inner_file):
906             self.inner_file = inner_file
907             self.softspace = 0
908         def write(self, data):
909             try:
910                 self.inner_file.write(data)
911             except:
912                 sys.stdout = sys.__stdout__
913                 log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
914                 raise
915         def flush(self):
916             self.inner_file.flush()
917     class PrefixPipe(object):
918         def __init__(self, inner_file, prefix):
919             self.inner_file = inner_file
920             self.prefix = prefix
921             self.buf = ''
922             self.softspace = 0
923         def write(self, data):
924             buf = self.buf + data
925             lines = buf.split('\n')
926             for line in lines[:-1]:
927                 self.inner_file.write(self.prefix + line + '\n')
928                 self.inner_file.flush()
929             self.buf = lines[-1]
930         def flush(self):
931             pass
932     logfile = LogFile(args.logfile)
933     pipe = TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))
934     sys.stdout = AbortPipe(pipe)
935     sys.stderr = log.DefaultObserver.stderr = AbortPipe(PrefixPipe(pipe, '> '))
936     if hasattr(signal, "SIGUSR1"):
937         def sigusr1(signum, frame):
938             print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
939             logfile.reopen()
940             print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
941         signal.signal(signal.SIGUSR1, sigusr1)
942     task.LoopingCall(logfile.reopen).start(5)
943     
944     if args.bitcoind_rpc_port is None:
945         args.bitcoind_rpc_port = net.PARENT.RPC_PORT
946     
947     if args.bitcoind_p2p_port is None:
948         args.bitcoind_p2p_port = net.PARENT.P2P_PORT
949     
950     if args.p2pool_port is None:
951         args.p2pool_port = net.P2P_PORT
952     
953     if args.worker_port is None:
954         args.worker_port = net.WORKER_PORT
955     
956     if args.address is not None:
957         try:
958             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
959         except Exception, e:
960             parser.error('error parsing address: ' + repr(e))
961     else:
962         args.pubkey_hash = None
963     
964     if (args.merged_url is None) ^ (args.merged_userpass is None):
965         parser.error('must specify --merged-url and --merged-userpass')
966     
967     reactor.callWhenRunning(main, args, net, datadir_path)
968     reactor.run()