use mean instead of median for stale proportion calculation
[p2pool.git] / p2pool / main.py
#!/usr/bin/python
# coding=utf-8

from __future__ import division

import argparse
import codecs
import datetime
import os
import random
import struct
import sys
import time
import json
import signal
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks
import p2pool, p2pool.data as p2pool_data

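# Fetch a block template from bitcoind. This uses the old getmemorypool RPC
# (the pre-BIP 22 predecessor of getblocktemplate) and normalizes the result:
# hex fields are decoded, and 'bits' is handled whether bitcoind returns it as
# a hex string or an integer. deferral.retry keeps retrying transient failures.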
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))

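# Build the script this node's payouts are sent to. Asks bitcoind for an
# address in the 'p2pool' account and prefers paying directly to its pubkey;
# if validateaddress doesn't reveal the pubkey, falls back to a standard
# pay-to-pubkey-hash script for that address.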
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net2):
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    validate_response = yield bitcoind.rpc_validateaddress(address)
    if 'pubkey' not in validate_response:
        print '    Pubkey request failed. Falling back to payout to address.'
        defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net2)))
    pubkey = validate_response['pubkey'].decode('hex')
    defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))

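# Main entry point, run inside the Twisted reactor. Roughly: check the bitcoind
# RPC and P2P connections, pick a payout script, load the share store, join the
# p2pool network, start the worker web server, then keep work fresh and print
# status forever.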
@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        bitcoin_data.Type.enable_caching()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net.PARENT)
        else:
            print 'Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print
        
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_current_work2 = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)

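        # Work flows in two stages: set_real_work1 polls bitcoind and fills the
        # pre_* variables; set_real_work2 merges in the share tracker's view
        # and publishes current_work/current_work2, which long-polling workers
        # watch.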
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            pre_current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        factory.new_block.watch(lambda block_hash: set_real_work1())
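        
        # Combine bitcoind's template with the best share chain, publish the
        # result, and request any referenced shares we don't have yet, backing
        # off exponentially on repeated requests for the same hash.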
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
            
            current_work2.set(pre_current_work2.value)
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            now = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < now < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = now, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
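        
        # Merged-mining support: poll the auxiliary chain's getauxblock RPC
        # about once a second and publish its block hash/target so it can be
        # embedded in generated coinbases.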
        @defer.inlineCallbacks
        def set_merged_work():
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        set_merged_work()
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network

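        # Peer callback: new shares arrived (from the network or our own
        # workers). Add the unknown ones to the tracker and recompute work if
        # anything new actually landed.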
        def p2p_shares(shares, peer=None):
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
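        
        # Peer callback: a peer announced share hashes; request any we don't
        # have, with the same exponential backoff used in set_real_work2.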
        def p2p_share_hashes(share_hashes, peer):
            now = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < now < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = now, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
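        
        # Peer callback: a peer asked for shares. Walk each requested chain up
        # to 'parents' ancestors (capped so one request can't return much more
        # than 1000 shares), stopping at any hash the peer said it already has.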
        def p2p_get_shares(share_hashes, parents, stops, peer):
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        nodes = set([
            ('72.14.191.28', net.P2P_PORT),
            ('62.204.197.159', net.P2P_PORT),
            ('142.58.248.28', net.P2P_PORT),
            ('94.23.34.145', net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
            ] + (['liteco.in'] if net.NAME == 'litecoin' else []) + [
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        addrs = {}
        try:
            addrs = dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt')))
        except:
            print "error reading addrs"
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in addrs.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send shares to peers when the best chain changes
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)

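        # Stale-share bookkeeping. my_share_hashes/my_doa_share_hashes
        # (defined below, before any worker can submit) record everything this
        # node mined; the counters below remember our shares that were in the
        # chain but later pruned from the verified tracker, so
        # get_stale_counts stays accurate after pruning.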
        removed_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales_var.set(removed_unstales_var.value + 1)
        
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
        stale_counter = skiplists.SumSkipList(tracker, lambda share: (
            1 if share.hash in my_share_hashes else 0,
            1 if share.hash in my_doa_share_hashes else 0,
        ), (0, 0), math.add_tuples)
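        # stale_counter sums, over a chain, how many shares are ours and how
        # many of ours were dead-on-arrival; get_stale_counts compares that
        # with the totals we mined to estimate orphan and DOA counts.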
        def get_stale_counts():
            '''Returns (orphans, doas), total'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            my_shares_in_chain, my_doa_shares_in_chain = stale_counter(
                current_work.value['best_share_hash'],
                tracker.verified.get_height(current_work.value['best_share_hash']),
            )
            my_shares_in_chain += removed_unstales_var.value
            my_doa_shares_in_chain += removed_doa_unstales_var.value
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares
        
        def get_payout_script_from_username(user):
            if user is None:
                return None
            try:
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
            except: # not a valid address; caller falls back to the default payout script
                return None
            return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)

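        # Build a work unit for a miner. If the username parses as a bitcoin
        # address, pay the share there (minus the configured fee percentage,
        # applied probabilistically); otherwise pay the node's own script.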
        def compute(user):
            state = current_work.value
            
            payout_script = get_payout_script_from_username(user)
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if state['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            previous_share = None if state['best_share_hash'] is None else tracker.shares[state['best_share_hash']]
            subsidy = current_work2.value['subsidy']
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=state['best_share_hash'],
                    coinbase='' if state['aux_work'] is None else '\xfa\xbemm' + bitcoin_data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=subsidy,
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_frac=(lambda (orphans, doas), total:
                        255 if total == 0 else math.perfect_round(254*(orphans + doas)/total)
                    )(*get_stale_counts()),
                ),
                block_target=state['bits'].target,
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker %s! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                user,
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - subsidy//200)*1e-8, net.PARENT.SYMBOL,
                subsidy*1e-8, net.PARENT.SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['time'], state['bits'], share_info['bits'].target)
        
        my_share_hashes = set()
        my_doa_share_hashes = set()

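        # Handle a miner's submitted work: look up the transactions by merkle
        # root, submit to bitcoind if it's a valid block (and to the merged
        # chain if it meets the aux target), then wrap it as a p2pool share if
        # it meets the share target.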
        def got_response(header, request):
            try:
                # match up with transactions
                entry = merkle_root_to_transactions.get(header['merkle_root'], None)
                if entry is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = entry
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.PARENT.POW_FUNC(header)
                
                if pow_hash <= header['bits'].target or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['bits'].target:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                try:
                    if current_work.value['aux_work'] is not None and (pow_hash <= current_work.value['aux_work']['target'] or p2pool.DEBUG):
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                except:
                    log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash > share_info['bits'].target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, share_info['bits'].target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_share_hashes.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    my_doa_share_hashes.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (request.getUser(), p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = resource.Resource()
        worker_interface.WorkerInterface(compute, got_response, current_work.changed).attach_to(web_root)

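        # Web endpoint helpers. /rate reports pool hashrate, inflated by the
        # mean of the stale proportions recorded in recent shares to account
        # for work lost to stales; /users reports each payout script's
        # expected fraction of the next block.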
        def get_rate():
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
                return json.dumps(int(att_s / (1. - (math.mean(fracs) if fracs else 0))))
            return json.dumps(None)
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # call getmemorypool every 15 seconds to check that bitcoind is alive
        task.LoopingCall(set_real_work1).start(15)
        
        # done!
        print 'Started successfully!'
        print
        
        if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
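        
        # Status loop: every 3 seconds, compute pool and local hashrate and
        # stale rates (pool-wide from share-recorded fractions, own from
        # counted orphans/DOAs with a 95% binomial confidence interval), and
        # print whenever the line changes or 15 seconds pass.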
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            (stale_orphan_shares, stale_doa_shares), shares = get_stale_counts()
                            fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
                            real_att_s = att_s / (1. - (math.mean(fracs) if fracs else 0))
                            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                                math.format(int(real_att_s)),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(my_att_s)),
                                shares,
                                stale_orphan_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            this_str += '\nAverage time between blocks: %.2f days' % (
                                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                            )
                            if fracs:
                                stale_prop = math.mean(fracs)
                                this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
                                conf = 0.95
                                if shares:
                                    stale_shares = stale_orphan_shares + stale_doa_shares
                                    this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                    if stale_prop < .99:
                                        this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - stale_prop) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

def run():
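    # argparse's stock @file handling treats each line as a single argument;
    # this subclass splits lines on whitespace and expands nested @file
    # references so a config file can hold several options per line.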
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: <empty>)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')

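    # Logging plumbing: stdout/stderr are wrapped so every line is timestamped
    # (TimestampingPipe), copied to both the console and the logfile
    # (TeePipe), encoded safely (EncodeReplacerPipe), and restored to the real
    # stderr if writing ever fails (AbortPipe).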
    class EncodeReplacerPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            if isinstance(data, unicode):
                try:
                    data = data.encode(self.inner_file.encoding, 'replace')
                except:
                    data = data.encode('ascii', 'replace')
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class LogFile(object):
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    class AbortPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            try:
                self.inner_file.write(data)
            except:
                sys.stdout = sys.__stdout__
                log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
                raise
        def flush(self):
            self.inner_file.flush()
    logfile = LogFile(args.logfile)
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = AbortPipe(TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile])))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify both --merged-url and --merged-userpass, or neither')
    
    reactor.callWhenRunning(main, args, net, datadir_path)
    reactor.run()