changed Tracker.get_chain_known to the new, more explicit get_chain
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2 # coding=utf-8
3
4 from __future__ import division
5
6 import argparse
7 import codecs
8 import datetime
9 import itertools
10 import os
11 import random
12 import struct
13 import sys
14 import time
15 import json
16 import signal
17 import traceback
18
19 from twisted.internet import defer, reactor, task
20 from twisted.web import server, resource
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
23
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface
26 from util import expiring_dict, jsonrpc, variable, deferral, math
27 from . import p2p, skiplists, networks
28 import p2pool, p2pool.data as p2pool_data
29
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind via getmemorypool and normalize it.

    Returns (via returnValue) a dict with version, previous_block_hash (int),
    decoded transactions, subsidy, time, and the compact target.
    """
    work = yield bitcoind.rpc_getmemorypool()
    # 'bits' may arrive as a hex string (byte-reversed on the wire) or already
    # as a number, depending on the bitcoind version.
    bits = work['bits']
    if isinstance(bits, (str, unicode)):
        target = bitcoin_data.FloatingIntegerType().unpack(bits.decode('hex')[::-1])
    else:
        target = bitcoin_data.FloatingInteger(bits)
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(raw_tx.decode('hex')) for raw_tx in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        target=target,
    ))
42
43 @deferral.retry('Error creating payout script:', 10)
44 @defer.inlineCallbacks
45 def get_payout_script2(bitcoind, net):
46     address = yield bitcoind.rpc_getaccountaddress('p2pool')
47     validate_response = yield bitcoind.rpc_validateaddress(address)
48     if 'pubkey' not in validate_response:
49         print '    Pubkey request failed. Falling back to payout to address.'
50         defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net)))
51     pubkey = validate_response['pubkey'].decode('hex')
52     defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))
53
54 @defer.inlineCallbacks
55 def main(args, net, datadir_path):
56     try:
57         print 'p2pool (version %s)' % (p2pool.__version__,)
58         print
59         try:
60             from . import draw
61         except ImportError:
62             draw = None
63             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
64             print
65         
66         # connect to bitcoind over JSON-RPC and do initial getmemorypool
67         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
68         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
69         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
70         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.BITCOIN_RPC_CHECK)(bitcoind)
71         if not good:
72             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
73             return
74         temp_work = yield getwork(bitcoind)
75         print '    ...success!'
76         print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
77         print
78         
79         # connect to bitcoind over bitcoin-p2p
80         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
81         factory = bitcoin_p2p.ClientFactory(net)
82         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
83         yield factory.getProtocol() # waits until handshake is successful
84         print '    ...success!'
85         print
86         
87         if args.pubkey_hash is None:
88             print 'Getting payout address from bitcoind...'
89             my_script = yield get_payout_script2(bitcoind, net)
90         else:
91             print 'Computing payout script from provided address....'
92             my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
93         print '    ...success!'
94         print '    Payout script:', bitcoin_data.script2_to_human(my_script, net)
95         print
96         
97         ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
98         
99         tracker = p2pool_data.OkayTracker(net)
100         shared_share_hashes = set()
101         ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
102         known_verified = set()
103         print "Loading shares..."
104         for i, (mode, contents) in enumerate(ss.get_shares()):
105             if mode == 'share':
106                 if contents.hash in tracker.shares:
107                     continue
108                 shared_share_hashes.add(contents.hash)
109                 contents.time_seen = 0
110                 tracker.add(contents)
111                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
112                     print "    %i" % (len(tracker.shares),)
113             elif mode == 'verified_hash':
114                 known_verified.add(contents)
115             else:
116                 raise AssertionError()
117         print "    ...inserting %i verified shares..." % (len(known_verified),)
118         for h in known_verified:
119             if h not in tracker.shares:
120                 ss.forget_verified_share(h)
121                 continue
122             tracker.verified.add(tracker.shares[h])
123         print "    ...done loading %i shares!" % (len(tracker.shares),)
124         print
125         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
126         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
127         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
128         
129         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
130         
131         pre_current_work = variable.Variable(None)
132         pre_current_work2 = variable.Variable(None)
133         pre_merged_work = variable.Variable(None)
134         # information affecting work that should trigger a long-polling update
135         current_work = variable.Variable(None)
136         # information affecting work that should not trigger a long-polling update
137         current_work2 = variable.Variable(None)
138         
139         work_updated = variable.Event()
140         
141         requested = expiring_dict.ExpiringDict(300)
142         
        @defer.inlineCallbacks
        def set_real_work1():
            """Poll bitcoind for fresh work and publish it to the pre_current_work variables."""
            work = yield getwork(bitcoind)
            pre_current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
            ))
158         
        def set_real_work2():
            """Recompute the best share chain and request missing parent shares.

            Combines the latest bitcoind work (pre_current_work/pre_current_work2)
            with the tracker's opinion of the best share, publishes the result to
            current_work/current_work2, then asks peers for shares the tracker
            reported as desired but not yet held.
            """
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'], time.time() - pre_current_work2.value['clock_offset'])
            
            # set current_work2 before current_work (same ordering as in
            # set_real_work1: everything hooks on the first variable)
            current_work2.set(pre_current_work2.value)
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                # exponential backoff: skip hashes requested too recently
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # prefer the peer that told us about the head; otherwise pick a
                # random connected peer known to have it
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop at heads we already have (or just below them)
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
195         pre_current_work.changed.watch(lambda _: set_real_work2())
196         
197         print 'Initializing work...'
198         yield set_real_work1()
199         print '    ...success!'
200         print
201         
202         pre_merged_work.changed.watch(lambda _: set_real_work2())
203         ht.updated.watch(set_real_work2)
204         
        @defer.inlineCallbacks
        def set_merged_work():
            """Poll the merged-mining daemon for aux work forever (no-op when unconfigured)."""
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
218         set_merged_work()
219         
220         start_time = time.time() - current_work2.value['clock_offset']
221         
222         # setup p2p logic and join p2pool network
223         
        def p2p_shares(shares, peer=None):
            """Insert shares received over p2p into the tracker.

            Records the first share's hash against the sending peer (so parents
            can later be requested from it) and recomputes work if anything new
            was added.
            """
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
248         
        @tracker.verified.added.watch
        def _(share):
            # When a newly-verified share's proof-of-work also meets the bitcoin
            # block target, it IS a valid block: submit it to bitcoind.
            if share.pow_hash <= share.header['target']:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
259         
        def p2p_share_hashes(share_hashes, peer):
            """Handle a peer's advertisement of share hashes: request the unknown ones.

            Skips hashes already in the tracker and hashes requested too recently
            (exponential backoff via the `requested` expiring dict).
            """
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                # NOTE(review): peer is None-checked just above but not here; if
                # peer can actually be None this would raise - confirm callers.
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
277         
278         def p2p_get_shares(share_hashes, parents, stops, peer):
279             parents = min(parents, 1000//len(share_hashes))
280             stops = set(stops)
281             shares = []
282             for share_hash in share_hashes:
283                 for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
284                     if share.hash in stops:
285                         break
286                     shares.append(share)
287             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
288             peer.sendShares(shares)
289         
290         print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
291         
292         def parse(x):
293             if ':' in x:
294                 ip, port = x.split(':')
295                 return ip, int(port)
296             else:
297                 return x, net.P2P_PORT
298         
299         nodes = set([
300             ('72.14.191.28', net.P2P_PORT),
301             ('62.204.197.159', net.P2P_PORT),
302             ('142.58.248.28', net.P2P_PORT),
303             ('94.23.34.145', net.P2P_PORT),
304         ])
305         for host in [
306             'p2pool.forre.st',
307             'dabuttonfactory.com',
308         ]:
309             try:
310                 nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
311             except:
312                 log.err(None, 'Error resolving bootstrap node IP:')
313         
314         if net.NAME == 'litecoin':
315             nodes.add(((yield reactor.resolve('liteco.in')), net.P2P_PORT))
316         
317         addrs = {}
318         try:
319             addrs = dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt')))
320         except:
321             print "error reading addrs"
322         
323         def save_addrs():
324             open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in addrs.iteritems())
325         task.LoopingCall(save_addrs).start(60)
326         
327         p2p_node = p2p.Node(
328             current_work=current_work,
329             port=args.p2pool_port,
330             net=net,
331             addr_store=addrs,
332             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
333         )
334         p2p_node.handle_shares = p2p_shares
335         p2p_node.handle_share_hashes = p2p_share_hashes
336         p2p_node.handle_get_shares = p2p_get_shares
337         
338         p2p_node.start()
339         
340         # send share when the chain changes to their chain
        def work_changed(new_work):
            """Broadcast shares new to the best chain to every peer (skipping each share's source peer)."""
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break # everything older than this has already been shared
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
352         
353         current_work.changed.watch(work_changed)
354         
        def save_shares():
            """Write up to 2*CHAIN_LENGTH shares of the best chain (and their verified flags) to the share store."""
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
360         task.LoopingCall(save_shares).start(60)
361         
362         print '    ...success!'
363         print
364         
        @defer.inlineCallbacks
        def upnp_thread():
            """Periodically try to map the p2pool port on the gateway via UPnP (best-effort)."""
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                # retry after a randomized interval (mean 120 seconds)
                yield deferral.sleep(random.expovariate(1/120))
379         
380         if args.upnp:
381             upnp_thread()
382         
383         # start listening for workers with a JSON-RPC server
384         
385         print 'Listening for workers on port %i...' % (args.worker_port,)
386         
387         # setup worker logic
388         
389         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
390         run_identifier = struct.pack('<I', random.randrange(2**32))
391         
392         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
393         removed_unstales = set()
        def get_share_counts(doa=False):
            """Count our shares relative to the current best chain.

            Returns (total, stale), or (total, stale_doa, stale_not_doa) when
            doa=True. Reads the my_shares and doa_shares sets defined later in
            main (closure names resolved at call time).
            """
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # Remember our own shares that get pruned from the verified set while
            # still related to the current best share (per tracker.is_child_of),
            # so get_share_counts doesn't misclassify them as stale.
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
408         
409         
        def get_payout_script_from_username(request):
            """Derive a payout script from the worker's username, treated as a bitcoin address.

            Returns None when no username was supplied or it doesn't parse as a
            valid address for this network (compute then falls back to my_script).
            """
            user = worker_interface.get_username(request)
            if user is None:
                return None
            try:
                return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, net))
            except: # XXX blah
                return None
418         
        def compute(request):
            """Build a getwork BlockAttempt for a worker request.

            Chooses the payout script (the worker's address, or the pool's
            my_script when invalid or args.worker_fee percent of the time),
            refuses work when disconnected or stale, generates the share's
            coinbase transaction, and records the merkle root so the returned
            work can be matched back up in got_response.

            Raises jsonrpc.Error when p2pool has no peers, is still downloading
            shares, or has lost contact with bitcoind for over a minute.
            """
            state = current_work.value
            user = worker_interface.get_username(request)
            
            payout_script = get_payout_script_from_username(request)
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if state['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            previous_share = None if state['best_share_hash'] is None else tracker.shares[state['best_share_hash']]
            subsidy = current_work2.value['subsidy']
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=state['best_share_hash'],
                    # merged-mining commitment: magic '\xfa\xbemm' (fabe6d6d) + aux block hash
                    coinbase='' if state['aux_work'] is None else '\xfa\xbemm' + bitcoin_data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=subsidy,
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    # stale fraction encoded in one byte; 255 is the sentinel for "no shares yet"
                    stale_frac=(lambda shares, stales:
                        255 if shares == 0 else math.perfect_round(254*stales/shares)
                    )(*get_share_counts()),
                ),
                block_target=state['target'],
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker %s! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                user,
                bitcoin_data.target_to_difficulty(share_info['target']),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - subsidy//200)*1e-8, net.BITCOIN_SYMBOL,
                subsidy*1e-8, net.BITCOIN_SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(transactions)
            # remember the handed-out transaction set so got_response can reassemble the block
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['time'], state['target'], share_info['target']), state['best_share_hash']
467         
468         my_shares = set()
469         doa_shares = set()
470         
        def got_response(header, request):
            """Process a solved header returned by a worker.

            Matches the header's merkle root back to the transactions handed out
            in compute, submits full blocks / merged-mining proofs when the hash
            is good enough, and turns qualifying solutions into p2pool shares.
            Returns True when the share builds on the current best share, False
            otherwise (including on any error).
            """
            try:
                user = worker_interface.get_username(request)
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.BITCOIN_POW_FUNC(header)
                
                # meets the bitcoin block target (or DEBUG): submit upstream
                if pow_hash <= header['target'] or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                # meets the merged-mining chain's target: build and submit aux POW
                if current_work.value['aux_work'] is not None and pow_hash <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool_data.calculate_merkle_branch(transactions, 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        # aux hash is embedded in the coinbase script (see compute)
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                target = share_info['target']
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_shares.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    doa_shares.add(share.hash) # dead on arrival: built on an outdated share
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
535         
536         web_root = worker_interface.WorkerInterface(compute, got_response, current_work.changed)
537         
538         def get_rate():
539             if current_work.value['best_share_hash'] is not None:
540                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
541                 att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
542                 fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
543                 return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
544             return json.dumps(None)
545         
546         def get_users():
547             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
548             weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
549             res = {}
550             for script in sorted(weights, key=lambda s: weights[s]):
551                 res[bitcoin_data.script2_to_human(script, net)] = weights[script]/total_weight
552             return json.dumps(res)
553         
554         class WebInterface(resource.Resource):
555             def __init__(self, func, mime_type):
556                 self.func, self.mime_type = func, mime_type
557             
558             def render_GET(self, request):
559                 request.setHeader('Content-Type', self.mime_type)
560                 return self.func()
561         
562         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
563         web_root.putChild('users', WebInterface(get_users, 'application/json'))
564         web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
565         if draw is not None:
566             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
567         
568         reactor.listenTCP(args.worker_port, server.Site(web_root))
569         
570         print '    ...success!'
571         print
572         
573         # done!
574         
575         # do new getwork when a block is heard on the p2p interface
576         
        def new_block(block_hash):
            # A block was announced on the bitcoin p2p network: nudge
            # work1_thread to refresh work immediately (block_hash unused).
            work_updated.happened()
579         factory.new_block.watch(new_block)
580         
581         print 'Started successfully!'
582         print
583         
        @defer.inlineCallbacks
        def work1_thread():
            """Refresh work from bitcoind whenever work_updated fires, or every 1-10 seconds."""
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                # wake on either the event or the randomized timeout, whichever fires first
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
593         
        @defer.inlineCallbacks
        def work2_thread():
            """Recompute share chain state periodically (randomized interval, mean 20 seconds)."""
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
602         
603         work1_thread()
604         work2_thread()
605         
606         
607         if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                # SIGALRM fires if the reactor stalls for 30s (see the
                # signal.alarm LoopingCall below); dump a stack trace to show
                # where it hung.
                print 'Watchdog timer went off at:'
                traceback.print_stack()
611             
612             signal.signal(signal.SIGALRM, watchdog_handler)
613             task.LoopingCall(signal.alarm, 30).start(1)
614         
        last_str = None
        last_time = 0
        # Status loop: every 3 seconds build a one-line pool summary and print
        # it whenever it changes (or at least every 15 seconds).
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        # Pool rate estimated over up to the last 720 shares.
                        att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                        shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                        stale_shares = stale_doa_shares + stale_not_doa_shares
                        # Stale fractions reported by up to the last 120 shares
                        # (entries without a stale_frac are skipped).
                        fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
                        # Dividing by (1 - median stale fraction) scales the
                        # observed share rate up to an estimated real hash rate.
                        this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                            math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
                            shares,
                            stale_not_doa_shares,
                            stale_doa_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                        if fracs:
                            med = math.median(fracs)
                            this_str += '\nPool stales: %i%%' % (int(100*med+.5),)
                            conf = 0.95
                            if shares:
                                # 95% binomial confidence interval on our own
                                # stale rate, shown as center±radius percent.
                                this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if med < .99:
                                    this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - med) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                        # Print only when the line changed or 15s elapsed, to
                        # keep the log readable.
                        if this_str != last_str or time.time() > last_time + 15:
                            print this_str
                            last_str = this_str
                            last_time = time.time()
            
            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        # Any exit path (fatal error included) stops the reactor so the
        # process actually terminates.
        reactor.stop()
662
def run():
    # Entry point: parse the command line, set up logging plumbing, then
    # schedule main() on the twisted reactor and run it.
    class FixedArgumentParser(argparse.ArgumentParser):
        # Like argparse.ArgumentParser with fromfile_prefix_chars, but @file
        # arguments are expanded recursively (an args file may reference other
        # args files) and each line may hold several whitespace-separated
        # arguments instead of argparse's default one-argument-per-line.
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so files referenced from inside an args
                            # file are expanded as well (the "fix" vs stdlib)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # Split on whitespace; drop empty/whitespace-only tokens.
            return [arg for arg in arg_line.split() if arg.strip()]
696     
    # General options.  @file arguments are expanded by FixedArgumentParser.
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
723     
    # p2pool P2P interface options.
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    # NOTE(review): added to `parser`, not `p2pool_group`, so it appears under
    # the general options in --help — confirm that grouping is intended.
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
734     
    # Miner-facing worker interface options.
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
742     
    # bitcoind connection options (RPC and P2P ports default per network).
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
753     
    # Positional arguments: RPC username (optional, defaults to empty) and
    # RPC password (required).
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: <empty>)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
760     
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    # Resolve the network object; --testnet selects e.g. 'bitcoin_testnet'.
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # Per-network data directory next to the executable, created on demand.
    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
774     
    class EncodeReplacerPipe(object):
        # File-like wrapper that encodes unicode data to the inner file's
        # encoding (replacing unencodable characters) before writing, so
        # printing u'…±…' strings to a narrow console can't raise.
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0  # file-protocol attribute used by Python 2 `print`
        def write(self, data):
            if isinstance(data, unicode):
                data = data.encode(self.inner_file.encoding, 'replace')
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class LogFile(object):
        # Append-only UTF-8 log file that can be reopened at any time (used
        # for log rotation below) and that, on reopen, truncates itself to
        # roughly the last 1MB once it has grown past 100MB.
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()  # make sure the file exists
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                # Seek to 1MB before EOF, then skip forward to the next
                # newline (or EOF) so the kept tail starts on a line boundary.
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                # Rewrite the file with just the retained tail.
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
812     class TeePipe(object):
813         def __init__(self, outputs):
814             self.outputs = outputs
815         def write(self, data):
816             for output in self.outputs:
817                 output.write(data)
818         def flush(self):
819             for output in self.outputs:
820                 output.flush()
    class TimestampingPipe(object):
        # File-like wrapper that prefixes every complete line with a
        # HH:MM:SS.microseconds timestamp.  Partial lines are buffered until
        # their terminating newline arrives.
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''  # tail of the last write that had no newline yet
            self.softspace = 0  # file-protocol attribute used by Python 2 `print`
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            # All but the last element of split('\n') are complete lines.
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            # Deliberately a no-op: complete lines are flushed in write(),
            # and flushing a partial line here would emit it unstamped.
            pass
    class AbortPipe(object):
        # File-like wrapper that, if writing ever raises, first restores the
        # real stdout/stderr (and twisted's log observer stream) so the
        # re-raised error can still be reported somewhere visible.
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0  # file-protocol attribute used by Python 2 `print`
        def write(self, data):
            try:
                self.inner_file.write(data)
            except:
                sys.stdout = sys.__stdout__
                log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
                raise
        def flush(self):
            self.inner_file.flush()
    logfile = LogFile(args.logfile)
    # Route ALL program output (stdout, stderr, twisted log) through:
    # AbortPipe -> TimestampingPipe -> TeePipe -> [console, logfile].
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = AbortPipe(TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile])))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 (POSIX only) forces a log reopen, for external log rotation.
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # Reopen every 5 seconds as well, so LogFile's size cap is enforced.
    task.LoopingCall(logfile.reopen).start(5)
    
    # Fill in network-specific defaults for any ports not given explicitly.
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    # Validate --address up front so a typo fails fast instead of at payout.
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    # XOR: exactly one of the two merged-mining options was given; they are
    # only valid together.
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    # Schedule main() and enter the reactor loop (runs until reactor.stop()).
    reactor.callWhenRunning(main, args, net, datadir_path)
    reactor.run()