#!/usr/bin/python
# coding=utf-8

from __future__ import division

import argparse
import codecs
import datetime
import os
import random
import struct
import sys
import time
import json
import signal
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks
import p2pool, p2pool.data as p2pool_data

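# Note: bitcoind's getmemorypool may report 'bits' either as a little-endian
# hex string or as a plain integer; getwork() normalizes both forms into a
# FloatingInteger so callers can use .target uniformly.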
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))

@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    validate_response = yield bitcoind.rpc_validateaddress(address)
    if 'pubkey' not in validate_response:
        print '    Pubkey request failed. Falling back to payout to address.'
        defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net)))
    pubkey = validate_response['pubkey'].decode('hex')
    defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))

@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.BITCOIN_RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net)
        else:
            print 'Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net)
        print
        
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
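        # shared_share_hashes tracks share hashes this node believes the
        # network already has (loaded from disk or already broadcast), so
        # work_changed() below doesn't rebroadcast them; ShareStore persists
        # shares on disk across restarts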
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_current_work2 = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
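        # 'requested' maps share_hash -> (last_request_time, request_count);
        # re-requests for the same share back off exponentially (allowed again
        # only after 10 * 1.5**count seconds)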
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            pre_current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # set the second variable first, because watchers hook on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        
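        # recompute the best share, publish updated work through
        # current_work/current_work2, and ask peers for any missing parent
        # shares that tracker.think() says we need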
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'], time.time() - pre_current_work2.value['clock_offset'])
            
            current_work2.set(pre_current_work2.value)
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # share was received while tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        
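        # poll the merged-mining daemon's getauxblock once a second; publishing
        # the aux work through pre_merged_work triggers set_real_work2 via the
        # watcher registered above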
        @defer.inlineCallbacks
        def set_merged_work():
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        set_merged_work()
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def p2p_shares(shares, peer=None):
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
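        # whenever a newly verified share's proof-of-work also meets the full
        # bitcoin block target, submit it to bitcoind as a complete block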
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
        
        def p2p_share_hashes(share_hashes, peer):
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        nodes = set([
            ('72.14.191.28', net.P2P_PORT),
            ('62.204.197.159', net.P2P_PORT),
            ('142.58.248.28', net.P2P_PORT),
            ('94.23.34.145', net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
            ] + (['liteco.in'] if net.NAME == 'litecoin' else []) + [
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        addrs = {}
        try:
            addrs = dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt')))
        except:
            print "error reading addrs"
        
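        # persist the peer address store to addrs.txt once a minute; entries
        # are round-tripped through repr() on write and eval() on read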
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in addrs.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # broadcast any not-yet-shared shares on the best chain whenever the work changes
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
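        # checkpoint the most recent shares (up to 2*CHAIN_LENGTH) and their
        # verified status to disk once a minute, so a restart can reload the
        # chain instead of redownloading it from peers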
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        run_identifier = struct.pack('<I', random.randrange(2**32))
        
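        # run_identifier is a random per-process tag embedded in each generated
        # share's coinbase nonce (see compute() below); share_counter uses it
        # to pick out this instance's shares within the chain, and
        # removed_unstales remembers our shares that were pruned from the
        # tracker so the stale-rate statistics below stay consistent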
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts(doa=False):
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        
        def get_payout_script_from_username(user):
            if user is None:
                return None
            try:
                return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, net))
            except: # XXX blah
                return None
        
        def compute(request):
            state = current_work.value
            user = worker_interface.get_username(request)
            
            payout_script = get_payout_script_from_username(user)
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if state['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            previous_share = None if state['best_share_hash'] is None else tracker.shares[state['best_share_hash']]
            subsidy = current_work2.value['subsidy']
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=state['best_share_hash'],
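                    # '\xfa\xbemm' is the merged-mining magic prefix: when aux
                    # work is present, the coinbase embeds the aux chain's block
                    # hash (byte-reversed) followed by merkle_size=1, merkle_nonce=0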
                    coinbase='' if state['aux_work'] is None else '\xfa\xbemm' + bitcoin_data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=subsidy,
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_frac=(lambda shares, stales:
                        255 if shares == 0 else math.perfect_round(254*stales/shares)
                    )(*get_share_counts()),
                ),
                block_target=state['bits'].target,
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker %s! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                user,
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - subsidy//200)*1e-8, net.BITCOIN_SYMBOL,
                subsidy*1e-8, net.BITCOIN_SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['time'], state['bits'], share_info['bits'].target), state['best_share_hash']
        
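        # my_shares holds the hash of every share this instance has produced;
        # doa_shares is the subset that were dead on arrival (built on a
        # previous share that was no longer the best when they were submitted)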
        my_shares = set()
        doa_shares = set()
        
        def got_response(header, request):
            try:
                user = worker_interface.get_username(request)
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.BITCOIN_POW_FUNC(header)
                
                if pow_hash <= header['bits'].target or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['bits'].target:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                if current_work.value['aux_work'] is not None and pow_hash <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
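                        # recover the 32-byte aux block hash embedded in the
                        # coinbase script (it sits just before the trailing
                        # 8 bytes) and submit it with the serialized aux POW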
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash > share_info['bits'].target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, share_info['bits'].target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_shares.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(compute, got_response, current_work.changed)
        
        def get_rate():
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
                return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
            return json.dumps(None)
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        @defer.inlineCallbacks
        def work1_thread():
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
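        # watchdog: re-arm a 30-second SIGALRM from the reactor every second;
        # if the event loop stalls for 30 seconds the alarm fires and the
        # handler prints a stack trace showing where it got stuck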
        if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                            stale_shares = stale_doa_shares + stale_not_doa_shares
                            fracs = [share.stale_frac for share in tracker.get_chain(current_work.value['best_share_hash'], min(120, height)) if share.stale_frac is not None]
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                                math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
                                shares,
                                stale_not_doa_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            if fracs:
                                med = math.median(fracs)
                                this_str += '\nPool stales: %i%%' % (int(100*med+.5),)
                                conf = 0.95
                                if shares:
                                    this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                    if med < .99:
                                        this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - med) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

def run():
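    # variant of argparse.ArgumentParser whose @file support splits each line
    # into multiple whitespace-separated arguments (the stock parser treats a
    # whole line as a single argument) and expands nested @file references;
    # e.g. "python main.py @p2pool.conf USER PASS" could read the remaining
    # options from a config file (file name here is hypothetical)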
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge this percentage fee to workers mining to their own bitcoin address (set by using the address as the miner's username). The fee is displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: <empty>)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
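    # logging plumbing: stdout/stderr are replaced with a stack of pipe
    # objects. TeePipe fans each write out to the console and the logfile,
    # TimestampingPipe prefixes every completed line with a time of day,
    # EncodeReplacerPipe degrades unicode the console can't encode, AbortPipe
    # restores the real streams if logging itself ever fails, and LogFile
    # truncates the log (keeping the last ~1MB) once it grows past 100MB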
    class EncodeReplacerPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            if isinstance(data, unicode):
                data = data.encode(self.inner_file.encoding, 'replace')
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class LogFile(object):
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    class AbortPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            try:
                self.inner_file.write(data)
            except:
                sys.stdout = sys.__stdout__
                log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
                raise
        def flush(self):
            self.inner_file.flush()
    logfile = LogFile(args.logfile)
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = AbortPipe(TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile])))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify both --merged-url and --merged-userpass, or neither')
    
    reactor.callWhenRunning(main, args, net, datadir_path)
    reactor.run()