275c8f593177fc6a86326db5bfbfdb481d1bcf15
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2 # coding=utf-8
3
4 from __future__ import division
5
6 import argparse
7 import datetime
8 import itertools
9 import os
10 import random
11 import sqlite3
12 import struct
13 import sys
14 import time
15 import json
16 import signal
17 import traceback
18
19 from twisted.internet import defer, reactor, task
20 from twisted.web import server, resource
21 from twisted.python import log
22 from nattraverso import portmapper, ipdiscover
23
24 import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
25 from bitcoin import worker_interface
26 from util import db, expiring_dict, jsonrpc, variable, deferral, math
27 from . import p2p, skiplists, networks
28 import p2pool, p2pool.data as p2pool_data
29
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    """Fetch a block template from bitcoind via getmemorypool and
    normalize it into a plain dict (hashes as ints, txs unpacked)."""
    work = yield bitcoind.rpc_getmemorypool()
    # 'bits' may arrive either as a hex string (little-endian on the wire)
    # or already as an integer, depending on the bitcoind version.
    raw_bits = work['bits']
    if isinstance(raw_bits, (str, unicode)):
        target = bitcoin_data.FloatingIntegerType().unpack(raw_bits.decode('hex')[::-1])
    else:
        target = bitcoin_data.FloatingInteger(raw_bits)
    transactions = []
    for tx_hex in work['transactions']:
        transactions.append(bitcoin_data.tx_type.unpack(tx_hex.decode('hex')))
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=transactions,
        subsidy=work['coinbasevalue'],
        time=work['time'],
        target=target,
    ))
42
43 @deferral.retry('Error creating payout script:', 10)
44 @defer.inlineCallbacks
45 def get_payout_script2(bitcoind, net):
46     address = yield bitcoind.rpc_getaccountaddress('p2pool')
47     validate_response = yield bitcoind.rpc_validateaddress(address)
48     if 'pubkey' not in validate_response:
49         print '    Pubkey request failed. Falling back to payout to address.'
50         defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net)))
51     pubkey = validate_response['pubkey'].decode('hex')
52     defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))
53
@defer.inlineCallbacks
def main(args, net):
    """Run the p2pool node.

    Connects to bitcoind (JSON-RPC and bitcoin-p2p), loads the cached
    share chain, joins the p2pool P2P network, serves work to miners over
    a local JSON-RPC/HTTP interface, and loops printing pool statistics.

    args -- parsed command-line namespace (built in run(), below this chunk)
    net -- network definition object (ports, POW function, persistence flags)

    Runs until a fatal error; the reactor is always stopped on exit.
    """
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            # visualizations are optional; draw stays None and the
            # 'chain_img' web endpoint is simply not registered below
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.BITCOIN_RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        # determine where mined coins are paid: either an address supplied
        # on the command line, or one requested from the bitcoind wallet
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net)
        else:
            print 'Computing payout script from provided address....'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net)
        print
        
        # block-header height tracker, persisted per-network on disk
        print 'Loading cached block headers...'
        ht = bitcoin_p2p.HeightTracker(factory, net.NAME + '_headers.dat')
        print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
        print
        
        # share chain state: tracker holds all known shares; ss persists
        # them to disk; shared_share_hashes records what peers already have
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), net.NAME + '_shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                # verified hash with no matching share on disk; drop it
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        # keep the on-disk store and the shared-set in sync with evictions
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # pre_* hold raw data from bitcoind/merged daemon; set_real_work2
        # combines them into the current_work variables miners consume
        pre_current_work = variable.Variable(None)
        pre_current_work2 = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        # share_hash -> (last request time, request count); expires, so
        # re-requests are retried with exponential backoff
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            # Pull fresh work from bitcoind into the pre_* variables.
            work = yield getwork(bitcoind)
            pre_current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # second set first because everything hooks on the first
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
            ))
        
        def set_real_work2():
            # Recompute the best share to mine on and request any share
            # chains (desired) we are missing from peers likely to have them.
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'], time.time() - pre_current_work2.value['clock_offset'])
            
            current_work2.set(pre_current_work2.value)
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # back off: wait ~10 * 1.5^count seconds between re-requests
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # usually ask a random peer that advertised this chain;
                    # sometimes fall back to the peer that told us about it
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def set_merged_work():
            # Poll the merged-mining daemon (if configured) once a second
            # and publish its aux work; loops forever.
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        set_merged_work()
        
        # NOTE(review): start_time is not referenced anywhere in this chunk;
        # possibly used by code outside this view, or vestigial
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def p2p_shares(shares, peer=None):
            # Ingest shares received from the network (or from our own
            # worker, see got_response); updates work if anything was new.
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                # remember that this peer knows about this chain head
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
        @tracker.verified.added.watch
        def _(share):
            # A verified share that also meets the block target is a full
            # bitcoin block: submit it to bitcoind immediately.
            if share.pow_hash <= share.header['target']:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, net))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
        
        def p2p_share_hashes(share_hashes, peer):
            # Peer advertised share hashes; request the ones we don't have,
            # subject to the same backoff as set_real_work2.
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            # Serve a peer's getshares request; cap total shares sent to
            # roughly 1000 across all requested chains.
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            # "host:port" or bare "host" (default port) -> (host, port)
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        # hard-coded bootstrap IPs plus DNS-resolved bootstrap hosts
        nodes = set([
            ('72.14.191.28', net.P2P_PORT),
            ('62.204.197.159', net.P2P_PORT),
            ('142.58.248.28', net.P2P_PORT),
            ('94.23.34.145', net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        if net.NAME == 'litecoin':
            nodes.add(((yield reactor.resolve('liteco.in')), net.P2P_PORT))
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), net.NAME),
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # send share when the chain changes to their chain
        def work_changed(new_work):
            #print 'Work changed:', new_work
            # walk the new best chain until we hit a share peers already
            # have, then broadcast the newer ones to everyone
            shares = []
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                # don't echo a share back to the peer it came from
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            # Periodically persist the recent best chain (and which of its
            # shares are verified) to the on-disk share store.
            for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            # Best-effort: keep a UPnP port mapping alive for our p2p port;
            # errors are ignored (logged only in DEBUG mode).
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        # merkle_root -> (share_info, transactions, time handed out);
        # lets got_response link a returned header back to its work
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        # random per-run tag embedded in our share nonces so our own shares
        # can be recognized in the chain (see CountsSkipList below)
        run_identifier = struct.pack('<I', random.randrange(2**32))
        
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts(doa=False):
            # Count our shares on the current best chain vs. stale ones.
            # Returns (total, stale) or, with doa=True,
            # (total, stale-dead-on-arrival, stale-not-DOA).
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # Remember our own shares that fall off the verified chain so
            # get_share_counts keeps counting them as non-stale.
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        
        def get_payout_script_from_username(request):
            # Miners may authenticate with a coin address as username to
            # direct their payout there; None means "use the pool default".
            user = worker_interface.get_username(request)
            if user is None:
                return None
            try:
                return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, net))
            except: # XXX blah
                return None
        
        def compute(request):
            # Build a getwork job for a miner from the current work state.
            # Raises jsonrpc.Error when the node isn't ready to hand out work.
            state = current_work.value
            
            payout_script = get_payout_script_from_username(request)
            # worker_fee percent of requests are paid to the node operator
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if state['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            # NOTE(review): previous_share is computed but unused here
            previous_share = None if state['best_share_hash'] is None else tracker.shares[state['best_share_hash']]
            subsidy = current_work2.value['subsidy']
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=state['best_share_hash'],
                    # merged-mining commitment: magic '\xfa\xbemm' + aux hash
                    coinbase='' if state['aux_work'] is None else '\xfa\xbemm' + bitcoin_data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=subsidy,
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_frac=(lambda shares, stales:
                        255 if shares == 0 else math.perfect_round(254*stales/shares)
                    )(*get_share_counts()),
                ),
                block_target=state['target'],
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                bitcoin_data.target_to_difficulty(share_info['target']),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - subsidy//200)*1e-8, net.BITCOIN_SYMBOL,
                subsidy*1e-8, net.BITCOIN_SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, current_work2.value['time'], state['target'], share_info['target']), state['best_share_hash']
        
        my_shares = set()
        doa_shares = set()
        
        def got_response(header, request):
            # Handle a solved header submitted by a miner. Returns True when
            # the share was accepted and built on the current best share.
            try:
                user = worker_interface.get_username(request)
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.BITCOIN_POW_FUNC(header)
                
                # full block found (or DEBUG forces a submit attempt)
                if pow_hash <= header['target'] or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                # merged-mining: if the POW also meets the aux chain's
                # target, build an aux_pow proof and submit it there
                if current_work.value['aux_work'] is not None and pow_hash <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool_data.calculate_merkle_branch(transactions, 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        # aux hash is embedded in the coinbase script at a
                        # fixed offset (see the coinbase built in compute())
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                target = share_info['target']
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_shares.add(share.hash)
                # DOA = dead on arrival: chain head moved since work was issued
                if share.previous_hash != current_work.value['best_share_hash']:
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(compute, got_response, current_work.changed)
        
        def get_rate():
            # JSON: estimated pool hash rate, corrected for stale fraction.
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
            return json.dumps(None)
        
        def get_users():
            # JSON: payout-script -> fraction of current payout weight.
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            # Minimal twisted.web resource: serve func() with a fixed MIME type.
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        @defer.inlineCallbacks
        def work1_thread():
            # Refresh bitcoind work every 1-10s, or sooner when a new block
            # is heard (work_updated fires).
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            # Periodically re-evaluate the best share chain (avg every 20s).
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
        
        # watchdog: if the reactor stalls for 30s, SIGALRM fires and dumps
        # a stack trace; the LoopingCall re-arms the alarm every second
        if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        # main status loop: print pool statistics every 3s (deduplicated,
        # but at least every 15s)
        last_str = None
        last_time = 0
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                        shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                        stale_shares = stale_doa_shares + stale_not_doa_shares
                        fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                        this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                            math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
                            shares,
                            stale_not_doa_shares,
                            stale_doa_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                        if fracs:
                            med = math.median(fracs)
                            this_str += '\nPool stales: %i%%' % (int(100*med+.5),)
                            conf = 0.9
                            if shares:
                                this_str += ' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if med < .99:
                                    this_str += ' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - med) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                                this_str += ' (%i%% confidence)' % (int(100*conf+.5),)
                        if this_str != last_str or time.time() > last_time + 15:
                            print this_str
                            last_str = this_str
                            last_time = time.time()
            
            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
653
def run():
    """Entry point for p2pool: parse arguments, set up logging, start the reactor.

    Builds the command-line parser (with @file argument expansion), installs
    tee'd and timestamped logging to both stderr and a logfile, fills in
    network-dependent port defaults, validates the optional payout address,
    then schedules main() and runs the Twisted reactor until shutdown.
    """
    class FixedArgumentParser(argparse.ArgumentParser):
        """ArgumentParser whose @file expansion splits each line into words.

        Stock argparse treats every line of an arguments file as one single
        argument; convert_arg_line_to_args is overridden below so a line like
        "--net bitcoin" expands into two separate arguments.
        """
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so an arguments file may itself reference further @files
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # split on whitespace so one line can carry several arguments
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merge daemon user and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='percentage amount to donate to author of p2pool. Default: 0.5',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to default p2pool P2P port), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    # NOTE: added to the top-level parser rather than p2pool_group, so it shows
    # under the general options in --help
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward p2pool P2P port from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default:%s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getmemorypool (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional arguments (no option string): username is optional (nargs='?'),
    # password is required
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    # select the network object; testnet variants are registered under
    # '<name>_testnet' in networks.nets
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    if args.logfile is None:
        # default logfile lives next to the launch script, named after the network
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), net.NAME + '.log')
    
    class LogFile(object):
        """Append-only log file that supports reopening (for external rotation)
        and self-truncates to roughly the last 1 MB once it exceeds 100 MB."""
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            # ensure the file exists before opening it for reading
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                # keep only ~the last 1 MB; skip forward to the next newline
                # (or EOF) so the retained data starts on a line boundary
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = open(self.filename, 'a')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        """File-like object that fans every write/flush out to several outputs."""
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        """File-like wrapper that prefixes each completed line with a
        HH:MM:SS.microseconds timestamp; partial lines are buffered until
        their terminating newline arrives."""
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            # Python 2's print statement reads/writes 'softspace' on its target file
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            # every element except the last is a complete line; timestamp and flush each
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            # the last element is the (possibly empty) unterminated remainder
            self.buf = lines[-1]
        def flush(self):
            # no-op: write() already flushes after each complete line
            pass
    logfile = LogFile(args.logfile)
    # route stdout, stderr, and Twisted's log observer through the
    # timestamping tee: everything goes to both the console and the logfile
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 reopens the logfile so external logrotate-style tools can work
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # periodic reopen also enforces the 100 MB size cap in LogFile.reopen
    task.LoopingCall(logfile.reopen).start(5)
    
    # fill in network-specific defaults for any ports not given on the command line
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    if args.address is not None:
        # validate the payout address up front; main() uses the decoded hash
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        # None means main() will request an address from bitcoind instead
        args.pubkey_hash = None
    
    # merged mining needs both the url and the credentials, or neither
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    reactor.callWhenRunning(main, args, net)
    reactor.run()