don't enable serialization caching by default
[p2pool.git] / p2pool / main.py
#!/usr/bin/python
# coding=utf-8

from __future__ import division

import argparse
import codecs
import datetime
import os
import random
import struct
import sys
import time
import json
import signal
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks
import p2pool, p2pool.data as p2pool_data

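# Fetches a block template from bitcoind via getmemorypool and normalizes it
# into p2pool's internal work dict (previous hash as an int, transactions
# unpacked, 'bits' as a FloatingInteger). The second argument to
# deferral.retry is presumably the delay in seconds between attempts.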
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))

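# Derives the payout script from bitcoind's 'p2pool' account address. If
# validateaddress exposes the full pubkey, a pay-to-pubkey script is used;
# otherwise it falls back to a pay-to-pubkey-hash script for the address.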
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net2):
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    validate_response = yield bitcoind.rpc_validateaddress(address)
    if 'pubkey' not in validate_response:
        print '    Pubkey request failed. Falling back to payout to address.'
        defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net2)))
    pubkey = validate_response['pubkey'].decode('hex')
    defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))

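# Entry point for the node proper: verifies both bitcoind interfaces (JSON-RPC
# and P2P), loads the stored share chain, joins the p2pool network, and starts
# the worker-facing JSON-RPC server.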
@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        bitcoin_data.Type.enable_caching()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net.PARENT)
        else:
            print 'Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print
        
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
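        # Replay shares stored on disk into the tracker, then mark the subset
        # whose hashes were recorded as verified; verified hashes without a
        # matching share are dropped from the store.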
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
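        # Work state is staged in two steps: the pre_* variables hold raw
        # values from bitcoind (and the merged-mining daemon), and
        # set_real_work2() below folds in share-chain state before publishing
        # to current_work/current_work2, which the rest of the node watches.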
        pre_current_work = variable.Variable(None)
        pre_current_work2 = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            pre_current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        
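        # Picks the best share via tracker.think() and publishes the combined
        # work state. Parent shares listed in 'desired' but not yet known are
        # requested from the associated peer or a random peer that knows a
        # relevant head, throttled per hash with exponential backoff
        # (10 * 1.5**count seconds between requests).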
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'], time.time() - pre_current_work2.value['clock_offset'])
            
            current_work2.set(pre_current_work2.value)
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        
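        # If a merged-mining daemon was configured, poll its getauxblock once
        # a second and publish the aux work (hash, target, chain id) so it can
        # be committed into generated coinbases.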
        @defer.inlineCallbacks
        def set_merged_work():
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        set_merged_work()
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
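        # Handler for shares arriving from the p2pool network: skip
        # duplicates, add the rest to the tracker, remember which peer knew
        # the head, and recompute work if anything new came in.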
        def p2p_shares(shares, peer=None):
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
        
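        # Inventory-style handlers: p2p_share_hashes requests unknown hashes
        # a peer advertises (with the same backoff as above); p2p_get_shares
        # answers a peer's request by walking each requested chain, capping
        # the total at roughly 1000 parents and stopping at any 'stops' hash.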
        def p2p_share_hashes(share_hashes, peer):
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        nodes = set([
            ('72.14.191.28', net.P2P_PORT),
            ('62.204.197.159', net.P2P_PORT),
            ('142.58.248.28', net.P2P_PORT),
            ('94.23.34.145', net.P2P_PORT),
        ])
        for host in ['p2pool.forre.st', 'dabuttonfactory.com'] + (['liteco.in'] if net.NAME == 'litecoin' else []):
            try:
                nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        addrs = {}
        try:
            addrs = dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt')))
        except:
            print "error reading addrs"
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in addrs.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # whenever the best chain changes, broadcast any of its shares that haven't been sent to peers yet
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
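        # Best-effort UPnP: periodically try to map the p2pool port on the
        # local gateway; errors are swallowed (logged only in debug mode).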
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        run_identifier = struct.pack('<I', random.randrange(2**32))
        
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
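        # run_identifier is mixed into each share's coinbase nonce, which
        # presumably lets share_counter pick out shares mined by this session;
        # get_share_counts() compares my_shares against the chain to derive
        # stale (and dead-on-arrival) counts for stale_frac and the status line.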
        def get_share_counts(doa=False):
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        def get_payout_script_from_username(user):
            if user is None:
                return None
            try:
                return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, net.PARENT))
            except: # XXX blah
                return None
        
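        # Builds a getwork job: chooses the payout script (the miner's
        # address, or this node's script for the fee fraction of requests),
        # assembles the share's generate transaction via
        # p2pool_data.generate_transaction, remembers the transaction set by
        # merkle root, and returns a BlockAttempt with the share target.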
        def compute(request):
            state = current_work.value
            user = worker_interface.get_username(request)
            
            payout_script = get_payout_script_from_username(user)
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if state['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            previous_share = None if state['best_share_hash'] is None else tracker.shares[state['best_share_hash']]
            subsidy = current_work2.value['subsidy']
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=state['best_share_hash'],
                    coinbase='' if state['aux_work'] is None else '\xfa\xbemm' + bitcoin_data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=subsidy,
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_frac=(lambda shares, stales:
                        255 if shares == 0 else math.perfect_round(254*stales/shares)
                    )(*get_share_counts()),
                ),
                block_target=state['bits'].target,
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker %s! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                user,
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - subsidy//200)*1e-8, net.PARENT.SYMBOL,
                subsidy*1e-8, net.PARENT.SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['time'], state['bits'], share_info['bits'].target), state['best_share_hash']
        
        my_shares = set()
        doa_shares = set()
        
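        # Validates work returned by a miner: look up the transactions by
        # merkle root, submit a full block to bitcoind if the hash meets the
        # block target, hand the proof-of-work to the merged chain if it meets
        # the aux target, and finally turn it into a p2pool share if it meets
        # the share target. Returns whether the share extended the best chain.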
        def got_response(header, request):
            try:
                user = worker_interface.get_username(request)
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.PARENT.POW_FUNC(header)
                
                if pow_hash <= header['bits'].target or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['bits'].target:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                try:
                    if current_work.value['aux_work'] is not None and (pow_hash <= current_work.value['aux_work']['target'] or p2pool.DEBUG):
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                except:
                    log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash > share_info['bits'].target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, share_info['bits'].target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_shares.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(compute, got_response, current_work.changed)
        
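        # Read-only HTTP endpoints served alongside the worker interface:
        # /rate estimates pool hashrate corrected for the median stale
        # fraction, /users maps payout scripts to their share of the payout
        # weight, /fee reports this node's fee percentage.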
        def get_rate():
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
                return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
            return json.dumps(None)
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
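        # Background refresh loops: work1_thread re-polls bitcoind whenever
        # work_updated fires (a new block was heard) or after a random 1-10s
        # sleep; work2_thread recomputes share-chain state on average every
        # 20s as a safety net.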
        @defer.inlineCallbacks
        def work1_thread():
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
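        # Watchdog: re-arm a 30-second SIGALRM every second; if the reactor
        # stalls long enough for the alarm to fire, dump the current stack.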
        if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
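        # Periodic status line: pool and local hashrate, share counts with
        # orphan/dead breakdown, peer count, stale rates with binomial
        # confidence intervals, and expected time between blocks.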
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                            stale_shares = stale_doa_shares + stale_not_doa_shares
                            fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
                            real_att_s = att_s / (1. - (math.median(fracs) if fracs else 0))
                            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                                math.format(int(real_att_s)),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(my_att_s)),
                                shares,
                                stale_not_doa_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            this_str += '\nAverage time between blocks: %.2f days' % (
                                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                            )
                            if fracs:
                                med = math.median(fracs)
                                this_str += '\nPool stales: %i%%' % (int(100*med+.5),)
                                conf = 0.95
                                if shares:
                                    this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                    if med < .99:
                                        this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - med) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

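# Command-line entry point: parses arguments (with @file expansion via
# FixedArgumentParser), redirects stdout/stderr through the logging pipes
# below, fills in per-network port defaults, and starts the reactor.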
def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: <empty>)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
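    # Logging is a stack of small file-like pipes: stdout/stderr go through
    # AbortPipe (restores the real streams if a write fails), TimestampingPipe
    # (prefixes each line with a timestamp), and TeePipe (console plus
    # LogFile). LogFile rotates itself down to roughly the last 1MB once it
    # exceeds 100MB, and is reopened every 5 seconds and on SIGUSR1.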
    class EncodeReplacerPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            if isinstance(data, unicode):
                try:
                    data = data.encode(self.inner_file.encoding, 'replace')
                except:
                    data = data.encode('ascii', 'replace')
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class LogFile(object):
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    class AbortPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            try:
                self.inner_file.write(data)
            except:
                sys.stdout = sys.__stdout__
                log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
                raise
        def flush(self):
            self.inner_file.flush()
    logfile = LogFile(args.logfile)
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = AbortPipe(TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile])))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('--merged-url and --merged-userpass must be specified together (or both omitted)')
    
    reactor.callWhenRunning(main, args, net, datadir_path)
    reactor.run()