# moved worker_interface.py to bitcoin/
# [p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from bitcoin import worker_interface
25 from util import db, expiring_dict, jsonrpc, variable, deferral, math
26 from . import p2p, skiplists
27 import p2pool.data as p2pool
28 import p2pool as p2pool_init
29
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht, net):
    """Fetch a block template from bitcoind via getmemorypool and return it
    as a dict of normalized fields (hashes as ints, txs unpacked, target as
    a FloatingInteger)."""
    work = yield bitcoind.rpc_getmemorypool()
    version = work['version']
    previous_block_hash = int(work['previousblockhash'], 16)
    transactions = [bitcoin.data.tx_type.unpack(raw_tx.decode('hex')) for raw_tx in work['transactions']]
    subsidy = work['coinbasevalue']
    block_time = work['time']
    # 'bits' may arrive as a hex string or as a plain integer depending on the
    # bitcoind version; normalize both forms to a FloatingInteger.
    bits = work['bits']
    if isinstance(bits, (str, unicode)):
        target = bitcoin.data.FloatingIntegerType().unpack(bits.decode('hex')[::-1])
    else:
        target = bitcoin.data.FloatingInteger(bits)
    defer.returnValue(dict(
        version=version,
        previous_block_hash=previous_block_hash,
        transactions=transactions,
        subsidy=subsidy,
        time=block_time,
        target=target,
    ))
42
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Request a payout script from bitcoind over bitcoin-p2p (checkorder).

    Returns the script on 'success', None when the IP transaction is
    'denied', and raises ValueError on any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply == 'success':
        defer.returnValue(res['script'])
    raise ValueError('Unexpected reply: %r' % (res,))
53
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Fallback payout script: pay to the address of bitcoind's 'p2pool'
    account, obtained over JSON-RPC."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
58
59 @defer.inlineCallbacks
60 def main(args):
61     try:
62         print 'p2pool (version %s)' % (p2pool_init.__version__,)
63         print
64         try:
65             from . import draw
66         except ImportError:
67             draw = None
68             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
69             print
70         
71         # connect to bitcoind over JSON-RPC and do initial getwork
72         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
73         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
74         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
75         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
76         if not good:
77             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
78             return
79         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
80         print '    ...success!'
81         print '    Current block hash: %x' % (temp_work.previous_block,)
82         print
83         
84         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
85         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
86         factory = bitcoin.p2p.ClientFactory(args.net)
87         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
88         my_script = yield get_payout_script(factory)
89         if args.pubkey_hash is None:
90             if my_script is None:
91                 print '    IP transaction denied ... falling back to sending to address.'
92                 my_script = yield get_payout_script2(bitcoind, args.net)
93         else:
94             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
95         print '    ...success!'
96         print '    Payout script:', bitcoin.data.script2_to_human(my_script, args.net)
97         print
98         
99         print 'Loading cached block headers...'
100         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
101         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
102         print
103         
104         tracker = p2pool.OkayTracker(args.net)
105         shared_share_hashes = set()
106         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
107         known_verified = set()
108         print "Loading shares..."
109         for i, (mode, contents) in enumerate(ss.get_shares()):
110             if mode == 'share':
111                 if contents.hash in tracker.shares:
112                     continue
113                 shared_share_hashes.add(contents.hash)
114                 contents.time_seen = 0
115                 tracker.add(contents)
116                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
117                     print "    %i" % (len(tracker.shares),)
118             elif mode == 'verified_hash':
119                 known_verified.add(contents)
120             else:
121                 raise AssertionError()
122         print "    ...inserting %i verified shares..." % (len(known_verified),)
123         for h in known_verified:
124             if h not in tracker.shares:
125                 ss.forget_verified_share(h)
126                 continue
127             tracker.verified.add(tracker.shares[h])
128         print "    ...done loading %i shares!" % (len(tracker.shares),)
129         print
130         tracker.added.watch(lambda share: ss.add_share(share))
131         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
132         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
133         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
134         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
135         
136         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
137         
138         # information affecting work that should trigger a long-polling update
139         current_work = variable.Variable(None)
140         # information affecting work that should not trigger a long-polling update
141         current_work2 = variable.Variable(None)
142         
143         work_updated = variable.Event()
144         
145         requested = expiring_dict.ExpiringDict(300)
146         
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for fresh work and publish it into the two work
            # variables. Only current_work triggers long-polling updates;
            # current_work2 carries data that shouldn't wake up workers.
            work = yield getwork(bitcoind, ht, args.net)
            # a new previous-block hash means a new bitcoin block was found
            changed = work['previous_block_hash'] != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
                # carry over share-chain / merged-mining state; these are
                # maintained by set_real_work2 and set_merged_work
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
                aux_work=current_work.value['aux_work'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
167         
        def set_real_work2():
            # Recompute the best share to mine on and ask peers for any
            # missing ancestor shares ("desired") needed to verify chains.
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                # exponential backoff: don't re-request a hash until
                # 10 * 1.5**count seconds after the last request
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # prefer a connected peer known to hold a head that descends
                # from this share (tracked in peer_heads by p2p_shares)
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    # 80% of the time pick a random knowledgeable peer,
                    # otherwise fall back to the peer that told us about it
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop hashes: our heads plus points slightly below each
                    # head, capped at 100 entries for the wire message
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
202         
203         print 'Initializing work...'
204         yield set_real_work1()
205         set_real_work2()
206         print '    ...success!'
207         print
208         
        @defer.inlineCallbacks
        def set_merged_work():
            # Poll the merged-mining (aux chain) daemon about once a second
            # and publish its current block into current_work['aux_work'].
            if not args.merged_url:
                return
            while True:
                # NOTE(review): other Proxy calls in this file pass a
                # (username, password) 2-tuple; here a 1-tuple of
                # merged_userpass is passed -- confirm the expected format.
                merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                x = dict(current_work.value)
                x['aux_work'] = dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin.data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                )
                #print x['aux_work']
                current_work.set(x)
                yield deferral.sleep(1)
225         set_merged_work()
226         
227         start_time = time.time() - current_work2.value['clock_offset']
228         
229         # setup p2p logic and join p2pool network
230         
231         def share_share(share, ignore_peer=None):
232             for peer in p2p_node.peers.itervalues():
233                 if peer is ignore_peer:
234                     continue
235                 #if p2pool_init.DEBUG:
236                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
237                 peer.sendShares([share])
238             shared_share_hashes.add(share.hash)
239         
        def p2p_shares(shares, peer=None):
            # Ingest a batch of shares from the p2p network (peer=None means
            # they originated locally, e.g. from got_response).
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            # remember which peer knows this chain head so set_real_work2 can
            # later request missing parents from it
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            # only re-think the best chain if anything actually changed
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*args.net.CHAIN_LENGTH)
264         
        @tracker.verified.added.watch
        def _(share):
            # A verified share whose proof-of-work also meets the bitcoin
            # block target IS a bitcoin block: submit it to bitcoind.
            if share.pow_hash <= share.header['target']:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.header_hash,)
                print
275         
        def p2p_share_hashes(share_hashes, peer):
            # A peer announced share hashes; request any we don't already
            # have, subject to the same exponential request backoff used in
            # set_real_work2.
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            # record the announcing peer as a holder of this head
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
293         
294         def p2p_get_shares(share_hashes, parents, stops, peer):
295             parents = min(parents, 1000//len(share_hashes))
296             stops = set(stops)
297             shares = []
298             for share_hash in share_hashes:
299                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
300                     if share.hash in stops:
301                         break
302                     shares.append(share)
303             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
304             peer.sendShares(shares, full=True)
305         
306         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
307         
308         def parse(x):
309             if ':' in x:
310                 ip, port = x.split(':')
311                 return ip, int(port)
312             else:
313                 return x, args.net.P2P_PORT
314         
315         nodes = set([
316             ('72.14.191.28', args.net.P2P_PORT),
317             ('62.204.197.159', args.net.P2P_PORT),
318             ('142.58.248.28', args.net.P2P_PORT),
319             ('94.23.34.145', args.net.P2P_PORT),
320         ])
321         for host in [
322             'p2pool.forre.st',
323             'dabuttonfactory.com',
324         ]:
325             try:
326                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
327             except:
328                 log.err(None, 'Error resolving bootstrap node IP:')
329
330         if args.net_name == 'litecoin':
331             nodes.add(((yield reactor.resolve('liteco.in')), args.net.P2P_PORT))
332         
333         p2p_node = p2p.Node(
334             current_work=current_work,
335             port=args.p2pool_port,
336             net=args.net,
337             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
338             mode=0 if args.low_bandwidth else 1,
339             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
340         )
341         p2p_node.handle_shares = p2p_shares
342         p2p_node.handle_share_hashes = p2p_share_hashes
343         p2p_node.handle_get_shares = p2p_get_shares
344         
345         p2p_node.start()
346         
347         # send share when the chain changes to their chain
        def work_changed(new_work):
            # Broadcast shares along the new best chain that peers haven't
            # seen yet, stopping at the first already-shared share (everything
            # below it was broadcast earlier).
            #print 'Work changed:', new_work
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.hash in shared_share_hashes:
                    break
                share_share(share, share.peer)
354         current_work.changed.watch(work_changed)
355         
356         print '    ...success!'
357         print
358         
        @defer.inlineCallbacks
        def upnp_thread():
            # Periodically (re-)register a UPnP mapping for the p2p port,
            # on average every ~2 minutes (expovariate with mean 120s).
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    # UPnP is best-effort; only surface failures in DEBUG mode
                    if p2pool_init.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
373         
374         if args.upnp:
375             upnp_thread()
376         
377         # start listening for workers with a JSON-RPC server
378         
379         print 'Listening for workers on port %i...' % (args.worker_port,)
380         
381         # setup worker logic
382         
383         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
384         run_identifier = struct.pack('<I', random.randrange(2**32))
385         
386         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
387         removed_unstales = set()
        def get_share_counts(doa=False):
            # Count our shares: total (in-chain + stale) and stale, for the
            # current best chain. removed_unstales compensates for our shares
            # that were in the chain but have since been pruned from the
            # tracker (see the verified.removed watcher below).
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                # split stales into dead-on-arrival vs. merely orphaned
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            # When one of our in-chain shares gets pruned from the tracker,
            # remember it so get_share_counts keeps counting it as non-stale.
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
402         
403         
404         def get_payout_script_from_username(request):
405             user = worker_interface.get_username(request)
406             if user is None:
407                 return None
408             try:
409                 return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, args.net))
410             except: # XXX blah
411                 return None
412         
        def compute(state, request):
            """Build a getwork BlockAttempt for a worker request.

            Picks the payout script from the worker's username (falling back
            to our own script args.worker_fee percent of the time), builds a
            generate transaction + share info, caches the transaction set by
            merkle root for got_response, and returns the header to mine.
            Raises jsonrpc.Error when p2pool is not ready to hand out work.
            """
            payout_script = get_payout_script_from_username(request)
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            # merged-mining commitment: magic '\xfa\xbemm' + aux chain hash,
            # embedded in the coinbase script
            if state['aux_work'] is not None:
                aux_str = '\xfa\xbemm' + bitcoin.data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0)
            else:
                aux_str = ''
            
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # stale fraction as 16-bit fixed point; the 2* repeats the
                # 2-byte field twice -- NOTE(review): presumably a deliberate
                # redundant copy, confirm against the share format parser
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            subsidy = current_work2.value['subsidy']
            
            
            # after TRANSITION_TIME, switch to the new share format
            if int(time.time() - current_work2.value['clock_offset']) >= p2pool.TRANSITION_TIME:
                timestamp = current_work2.value['time']
                is_new = True
                previous_share = tracker.shares[state['best_share_hash']] if state['best_share_hash'] is not None else None
                new_share_info, generate_tx = p2pool.new_generate_transaction(
                    tracker=tracker,
                    new_share_data=dict(
                        previous_share_hash=state['best_share_hash'],
                        coinbase=aux_str,
                        nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                        new_script=payout_script,
                        subsidy=subsidy,
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        # stale_frac encoded 0..254, with 255 meaning unknown
                        stale_frac=(lambda shares, stales:
                            255 if shares == 0 else math.perfect_round(254*stales/shares)
                        )(*get_share_counts()),
                    ),
                    block_target=state['target'],
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    net=args.net,
                )
            else:
                timestamp = int(time.time() - current_work2.value['clock_offset'])
                if state['best_share_hash'] is not None:
                    # keep timestamps above the median of the last 11 shares
                    # (same monotonicity rule bitcoin uses for block times)
                    timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                    if timestamp2 > timestamp:
                        print 'Toff', timestamp2 - timestamp
                        timestamp = timestamp2
                is_new = False
                share_info, generate_tx = p2pool.generate_transaction(
                    tracker=tracker,
                    previous_share_hash=state['best_share_hash'],
                    new_script=payout_script,
                    subsidy=subsidy,
                    nonce=run_identifier + struct.pack('<H', random.randrange(2**16)) + aux_str + get_stale_frac(),
                    block_target=state['target'],
                    net=args.net,
                )
            
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (bitcoin.data.target_to_difficulty((new_share_info if is_new else share_info['share_data'])['target']), (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) -subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL, subsidy*1e-8, args.net.BITCOIN_SYMBOL, len(current_work2.value['transactions']))
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            # cache the transaction set so got_response can reassemble the
            # block when a worker returns a solved header
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = is_new, new_share_info if is_new else share_info, transactions
            
            # target2 is the (easier) share target the worker actually mines
            target2 = (new_share_info if is_new else share_info['share_data'])['target']
            times[merkle_root] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
489         
490         my_shares = set()
491         doa_shares = set()
492         times = {}
493         
        def got_response(header, request):
            """Handle a solved header submitted by a worker.

            Returns True when the resulting share extends the current best
            chain; False on stale work, hash above target, or any error.
            """
            try:
                user = worker_interface.get_username(request)
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                is_new, share_info, transactions = xxx
                new_share_info = share_info
                
                hash_ = bitcoin.data.block_header_type.hash256(header)
                
                pow_hash = args.net.BITCOIN_POW_FUNC(header)
                
                # solved a full bitcoin block (DEBUG submits everything)
                if pow_hash <= header['target'] or p2pool_init.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                # solved the aux (merged-mining) chain: submit its proof
                if current_work.value['aux_work'] is not None and pow_hash <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool.calculate_merkle_branch(transactions, 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        # a = aux hash pulled back out of the coinbase script,
                        # b = serialized aux proof-of-work, both hex-encoded
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin.data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                target = (new_share_info if is_new else share_info['share_data'])['target']
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, target)
                    return False
                if is_new:
                    share = p2pool.NewShare(args.net, header, new_share_info, other_txs=transactions[1:])
                else:
                    share = p2pool.Share(args.net, header, share_info, other_txs=transactions[1:])
                my_shares.add(share.hash)
                # dead-on-arrival: solved against a chain tip that has
                # already been superseded
                if share.previous_hash != current_work.value['best_share_hash']:
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[header['merkle_root']]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
562         
563         web_root = worker_interface.WorkerInterface(current_work, compute, got_response)
564         
        def get_rate():
            # JSON endpoint: estimated pool hash rate, corrected upward by
            # the median stale fraction over the last 120 shares.
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
                fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
            return json.dumps(None)
572         
        def get_users():
            # JSON endpoint: payout-weight fraction per miner over (at most)
            # the last 720 shares of the best chain.
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
            return json.dumps(res)
580         
581         class WebInterface(resource.Resource):
582             def __init__(self, func, mime_type):
583                 self.func, self.mime_type = func, mime_type
584             
585             def render_GET(self, request):
586                 request.setHeader('Content-Type', self.mime_type)
587                 return self.func()
588         
589         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
590         web_root.putChild('users', WebInterface(get_users, 'application/json'))
591         web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
592         if draw is not None:
593             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
594         
595         reactor.listenTCP(args.worker_port, server.Site(web_root))
596         
597         print '    ...success!'
598         print
599         
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            # a newly announced block invalidates current work; poke the
            # work_updated event so work1_thread refreshes immediately
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        # re-run set_real_work2 whenever the block-height tracker changes
        ht.updated.watch(set_real_work2)
612         
613         @defer.inlineCallbacks
614         def work1_thread():
615             while True:
616                 flag = work_updated.get_deferred()
617                 try:
618                     yield set_real_work1()
619                 except:
620                     log.err()
621                 yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
622         
623         @defer.inlineCallbacks
624         def work2_thread():
625             while True:
626                 try:
627                     set_real_work2()
628                 except:
629                     log.err()
630                 yield deferral.sleep(random.expovariate(1/20))
631         
        # kick off both refresh loops; they run for the life of the process
        work1_thread()
        work2_thread()
        
        
        if hasattr(signal, 'SIGALRM'):
            # Watchdog: the LoopingCall re-arms a 30-second alarm once per second
            # while the reactor is responsive; if the event loop stalls long
            # enough for the alarm to fire, a stack trace shows where it hung.
            # (hasattr guard: SIGALRM is not available on every platform)
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
644         
        # Main status loop: every 3 seconds print the pool state if it changed.
        pool_str = None;
        while True:
            yield deferral.sleep(3)
            try:
                # current_work2['last_update'] is refreshed on each successful poll
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        # pool rate over up to 720 shares, payout weights, and the
                        # stale fractions reported by the last 120 shares
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
                        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                        shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                        stale_shares = stale_doa_shares + stale_not_doa_shares
                        fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                        # NOTE(review): 'str' shadows the builtin for the rest of this loop body
                        str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                            # dividing by (1 - median stale fraction) scales the observed
                            # share rate up to an estimate of the real hash rate
                            math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
                            shares,
                            stale_not_doa_shares,
                            stale_doa_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                        # only print when the status line actually changed
                        if (str != pool_str):
                            print str;
                            pool_str = str;
                        if fracs:
                            med = math.median(fracs)
                            print 'Median stale proportion:', med
                            if shares:
                                print '    Own:', stale_shares/shares
                                if med < .99:
                                    print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
            
            
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
689
690 def run():
691     class FixedArgumentParser(argparse.ArgumentParser):
692         def _read_args_from_files(self, arg_strings):
693             # expand arguments referencing files
694             new_arg_strings = []
695             for arg_string in arg_strings:
696                 
697                 # for regular arguments, just add them back into the list
698                 if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
699                     new_arg_strings.append(arg_string)
700                 
701                 # replace arguments referencing files with the file content
702                 else:
703                     try:
704                         args_file = open(arg_string[1:])
705                         try:
706                             arg_strings = []
707                             for arg_line in args_file.read().splitlines():
708                                 for arg in self.convert_arg_line_to_args(arg_line):
709                                     arg_strings.append(arg)
710                             arg_strings = self._read_args_from_files(arg_strings)
711                             new_arg_strings.extend(arg_strings)
712                         finally:
713                             args_file.close()
714                     except IOError:
715                         err = sys.exc_info()[1]
716                         self.error(str(err))
717             
718             # return the modified argument list
719             return new_arg_strings
720         
721         def convert_arg_line_to_args(self, arg_line):
722             return [arg for arg in arg_line.split() if arg.strip()]
723     
    # @file arguments are expanded by FixedArgumentParser above
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merge daemon user and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='percentage amount to donate to author of p2pool. Default: 0.5',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329 solidcoin: 9328 litecoin: 9327, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:9332/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332 ixcoin: 8338 i0coin: 7332 litecoin: 9332)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 namecoin: 8334 ixcoin: 8337 i0coin: 7333 solidcoin: 7555 litecoin: 9333, +10000 for testnets)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # the RPC credentials are positional; the username may be omitted (nargs='?')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # default logfile: <net>[_testnet].log placed next to the launch script
    if args.logfile is None:
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
798     
799     class LogFile(object):
800         def __init__(self, filename):
801             self.filename = filename
802             self.inner_file = None
803             self.reopen()
804         def reopen(self):
805             if self.inner_file is not None:
806                 self.inner_file.close()
807             open(self.filename, 'a').close()
808             f = open(self.filename, 'rb')
809             f.seek(0, os.SEEK_END)
810             length = f.tell()
811             if length > 100*1000*1000:
812                 f.seek(-1000*1000, os.SEEK_END)
813                 while True:
814                     if f.read(1) in ('', '\n'):
815                         break
816                 data = f.read()
817                 f.close()
818                 f = open(self.filename, 'wb')
819                 f.write(data)
820             f.close()
821             self.inner_file = open(self.filename, 'a')
822         def write(self, data):
823             self.inner_file.write(data)
824         def flush(self):
825             self.inner_file.flush()
826     class TeePipe(object):
827         def __init__(self, outputs):
828             self.outputs = outputs
829         def write(self, data):
830             for output in self.outputs:
831                 output.write(data)
832         def flush(self):
833             for output in self.outputs:
834                 output.flush()
835     class TimestampingPipe(object):
836         def __init__(self, inner_file):
837             self.inner_file = inner_file
838             self.buf = ''
839             self.softspace = 0
840         def write(self, data):
841             buf = self.buf + data
842             lines = buf.split('\n')
843             for line in lines[:-1]:
844                 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
845                 self.inner_file.flush()
846             self.buf = lines[-1]
847         def flush(self):
848             pass
    logfile = LogFile(args.logfile)
    # timestamp every line of output and tee it to both the original stderr
    # and the logfile; twisted's default log observer writes through the same pipe
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # SIGUSR1 forces the logfile to be closed and reopened
        # (guarded: the signal does not exist on all platforms)
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # periodic reopen also applies LogFile's >100MB trimming
    task.LoopingCall(logfile.reopen).start(5)
    
    # testnet variants are registered under '<name>_testnet'
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # any port left unspecified falls back to the chosen network's default
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    # resolve the payout address to a pubkey hash up front so a bad
    # address is rejected before anything starts
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    # --merged-url and --merged-userpass must be given together; the XOR is
    # true exactly when only one of them was supplied
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    reactor.callWhenRunning(main, args)
    reactor.run()