Made p2pool work not pass through worker_interface
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from bitcoin import worker_interface
25 from util import db, expiring_dict, jsonrpc, variable, deferral, math
26 from . import p2p, skiplists
27 import p2pool.data as p2pool
28 import p2pool as p2pool_init
29
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht, net):
    """Fetch a block template from bitcoind via the getmemorypool RPC.

    Returns (through the Deferred) a dict normalized to the field names the
    rest of p2pool expects (previous_block_hash as an int, transactions
    unpacked, target as a FloatingInteger).
    """
    mp = yield bitcoind.rpc_getmemorypool()
    bits = mp['bits']
    if isinstance(bits, (str, unicode)):
        # hex-string form: bytes are little-endian on the wire, so reverse
        # before unpacking the compact target
        block_target = bitcoin.data.FloatingIntegerType().unpack(bits.decode('hex')[::-1])
    else:
        block_target = bitcoin.data.FloatingInteger(bits)
    unpacked_txs = []
    for raw_tx in mp['transactions']:
        unpacked_txs.append(bitcoin.data.tx_type.unpack(raw_tx.decode('hex')))
    defer.returnValue(dict(
        version=mp['version'],
        previous_block_hash=int(mp['previousblockhash'], 16),
        transactions=unpacked_txs,
        subsidy=mp['coinbasevalue'],
        time=mp['time'],
        target=block_target,
    ))
42
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Ask bitcoind over its P2P interface for a payout script via checkorder.

    Returns the script on success, None when the IP transaction is denied,
    and raises ValueError for any other reply.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply != 'success':
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(res['script'])
53
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script paying to the address of bitcoind's 'p2pool' account."""
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
58
59 @defer.inlineCallbacks
60 def main(args):
61     try:
62         print 'p2pool (version %s)' % (p2pool_init.__version__,)
63         print
64         try:
65             from . import draw
66         except ImportError:
67             draw = None
68             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
69             print
70         
71         # connect to bitcoind over JSON-RPC and do initial getwork
72         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
73         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
74         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
75         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
76         if not good:
77             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
78             return
79         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
80         print '    ...success!'
81         print '    Current block hash: %x' % (temp_work.previous_block,)
82         print
83         
84         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
85         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
86         factory = bitcoin.p2p.ClientFactory(args.net)
87         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
88         my_script = yield get_payout_script(factory)
89         if args.pubkey_hash is None:
90             if my_script is None:
91                 print '    IP transaction denied ... falling back to sending to address.'
92                 my_script = yield get_payout_script2(bitcoind, args.net)
93         else:
94             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
95         print '    ...success!'
96         print '    Payout script:', bitcoin.data.script2_to_human(my_script, args.net)
97         print
98         
99         print 'Loading cached block headers...'
100         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
101         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
102         print
103         
104         tracker = p2pool.OkayTracker(args.net)
105         shared_share_hashes = set()
106         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
107         known_verified = set()
108         print "Loading shares..."
109         for i, (mode, contents) in enumerate(ss.get_shares()):
110             if mode == 'share':
111                 if contents.hash in tracker.shares:
112                     continue
113                 shared_share_hashes.add(contents.hash)
114                 contents.time_seen = 0
115                 tracker.add(contents)
116                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
117                     print "    %i" % (len(tracker.shares),)
118             elif mode == 'verified_hash':
119                 known_verified.add(contents)
120             else:
121                 raise AssertionError()
122         print "    ...inserting %i verified shares..." % (len(known_verified),)
123         for h in known_verified:
124             if h not in tracker.shares:
125                 ss.forget_verified_share(h)
126                 continue
127             tracker.verified.add(tracker.shares[h])
128         print "    ...done loading %i shares!" % (len(tracker.shares),)
129         print
130         tracker.added.watch(lambda share: ss.add_share(share))
131         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
132         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
133         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
134         tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
135         
136         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
137         
138         # information affecting work that should trigger a long-polling update
139         current_work = variable.Variable(None)
140         # information affecting work that should not trigger a long-polling update
141         current_work2 = variable.Variable(None)
142         
143         work_updated = variable.Event()
144         
145         requested = expiring_dict.ExpiringDict(300)
146         
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for fresh work and publish it into the two shared
            # Variables: current_work holds the fields whose change should wake
            # long-polling workers; current_work2 holds fields that should not.
            work = yield getwork(bitcoind, ht, args.net)
            # A changed previous block means the bitcoin chain advanced; only
            # then re-run the (heavier) share-chain evaluation below.
            changed = work['previous_block_hash'] != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
                # carry over state that is managed elsewhere (set_real_work2 /
                # set_merged_work), or None on the very first call
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
                aux_work=current_work.value['aux_work'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                # offset between our clock and bitcoind's reported block time
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
167         
        def set_real_work2():
            # Re-evaluate the best share-chain head, publish it into
            # current_work, and request any shares we are missing from peers
            # likely to have them.
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # exponential backoff: skip if we asked for this share within
                # the last 10 * 1.5**count seconds
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # candidate peers: any connected peer known to hold a head that
                # descends from this missing share
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    # first attempt: prefer the peer that announced the share
                    peer = peer2
                else:
                    # retries: usually pick a random candidate, sometimes fall
                    # back to the announcing peer; give up if neither exists
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # bound the response by stopping at our known heads (and a
                    # few shares past each), capped at 100 stop hashes
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
202         
203         print 'Initializing work...'
204         yield set_real_work1()
205         set_real_work2()
206         print '    ...success!'
207         print
208         
209         @defer.inlineCallbacks
210         def set_merged_work():
211             if not args.merged_url:
212                 return
213             while True:
214                 merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
215                 auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
216                 x = dict(current_work.value)
217                 x['aux_work'] = dict(
218                     hash=int(auxblock['hash'], 16),
219                     target=bitcoin.data.HashType().unpack(auxblock['target'].decode('hex')),
220                     chain_id=auxblock['chainid'],
221                 )
222                 #print x['aux_work']
223                 current_work.set(x)
224                 yield deferral.sleep(1)
225         set_merged_work()
226         
227         start_time = time.time() - current_work2.value['clock_offset']
228         
229         # setup p2p logic and join p2pool network
230         
231         def share_share(share, ignore_peer=None):
232             for peer in p2p_node.peers.itervalues():
233                 if peer is ignore_peer:
234                     continue
235                 #if p2pool_init.DEBUG:
236                 #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
237                 peer.sendShares([share])
238             shared_share_hashes.add(share.hash)
239         
240         def p2p_shares(shares, peer=None):
241             if len(shares) > 5:
242                 print 'Processing %i shares...' % (len(shares),)
243             
244             new_count = 0
245             for share in shares:
246                 if share.hash in tracker.shares:
247                     #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
248                     continue
249                 
250                 new_count += 1
251                 
252                 #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
253                 
254                 tracker.add(share)
255             
256             if shares and peer is not None:
257                 peer_heads.setdefault(shares[0].hash, set()).add(peer)
258             
259             if new_count:
260                 set_real_work2()
261             
262             if len(shares) > 5:
263                 print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*args.net.CHAIN_LENGTH)
264         
265         @tracker.verified.added.watch
266         def _(share):
267             if share.pow_hash <= share.header['target']:
268                 if factory.conn.value is not None:
269                     factory.conn.value.send_block(block=share.as_block(tracker, args.net))
270                 else:
271                     print 'No bitcoind connection! Erp!'
272                 print
273                 print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.header_hash,)
274                 print
275         
276         def p2p_share_hashes(share_hashes, peer):
277             t = time.time()
278             get_hashes = []
279             for share_hash in share_hashes:
280                 if share_hash in tracker.shares:
281                     continue
282                 last_request_time, count = requested.get(share_hash, (None, 0))
283                 if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
284                     continue
285                 print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
286                 get_hashes.append(share_hash)
287                 requested[share_hash] = t, count + 1
288             
289             if share_hashes and peer is not None:
290                 peer_heads.setdefault(share_hashes[0], set()).add(peer)
291             if get_hashes:
292                 peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
293         
294         def p2p_get_shares(share_hashes, parents, stops, peer):
295             parents = min(parents, 1000//len(share_hashes))
296             stops = set(stops)
297             shares = []
298             for share_hash in share_hashes:
299                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
300                     if share.hash in stops:
301                         break
302                     shares.append(share)
303             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
304             peer.sendShares(shares, full=True)
305         
306         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
307         
308         def parse(x):
309             if ':' in x:
310                 ip, port = x.split(':')
311                 return ip, int(port)
312             else:
313                 return x, args.net.P2P_PORT
314         
315         nodes = set([
316             ('72.14.191.28', args.net.P2P_PORT),
317             ('62.204.197.159', args.net.P2P_PORT),
318             ('142.58.248.28', args.net.P2P_PORT),
319             ('94.23.34.145', args.net.P2P_PORT),
320         ])
321         for host in [
322             'p2pool.forre.st',
323             'dabuttonfactory.com',
324         ]:
325             try:
326                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
327             except:
328                 log.err(None, 'Error resolving bootstrap node IP:')
329
330         if args.net_name == 'litecoin':
331             nodes.add(((yield reactor.resolve('liteco.in')), args.net.P2P_PORT))
332         
333         p2p_node = p2p.Node(
334             current_work=current_work,
335             port=args.p2pool_port,
336             net=args.net,
337             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
338             mode=0 if args.low_bandwidth else 1,
339             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
340         )
341         p2p_node.handle_shares = p2p_shares
342         p2p_node.handle_share_hashes = p2p_share_hashes
343         p2p_node.handle_get_shares = p2p_get_shares
344         
345         p2p_node.start()
346         
347         # send share when the chain changes to their chain
348         def work_changed(new_work):
349             #print 'Work changed:', new_work
350             for share in tracker.get_chain_known(new_work['best_share_hash']):
351                 if share.hash in shared_share_hashes:
352                     break
353                 share_share(share, share.peer)
354         current_work.changed.watch(work_changed)
355         
356         print '    ...success!'
357         print
358         
359         @defer.inlineCallbacks
360         def upnp_thread():
361             while True:
362                 try:
363                     is_lan, lan_ip = yield ipdiscover.get_local_ip()
364                     if is_lan:
365                         pm = yield portmapper.get_port_mapper()
366                         yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
367                 except defer.TimeoutError:
368                     pass
369                 except:
370                     if p2pool_init.DEBUG:
371                         log.err(None, "UPnP error:")
372                 yield deferral.sleep(random.expovariate(1/120))
373         
374         if args.upnp:
375             upnp_thread()
376         
377         # start listening for workers with a JSON-RPC server
378         
379         print 'Listening for workers on port %i...' % (args.worker_port,)
380         
381         # setup worker logic
382         
383         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
384         run_identifier = struct.pack('<I', random.randrange(2**32))
385         
386         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
387         removed_unstales = set()
388         def get_share_counts(doa=False):
389             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
390             matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
391             shares_in_chain = my_shares & matching_in_chain
392             stale_shares = my_shares - matching_in_chain
393             if doa:
394                 stale_doa_shares = stale_shares & doa_shares
395                 stale_not_doa_shares = stale_shares - stale_doa_shares
396                 return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
397             return len(shares_in_chain) + len(stale_shares), len(stale_shares)
398         @tracker.verified.removed.watch
399         def _(share):
400             if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
401                 removed_unstales.add(share.hash)
402         
403         
404         def get_payout_script_from_username(request):
405             user = worker_interface.get_username(request)
406             if user is None:
407                 return None
408             try:
409                 return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, args.net))
410             except: # XXX blah
411                 return None
412         
413         def compute(request):
414             state = current_work.value
415             payout_script = get_payout_script_from_username(request)
416             if payout_script is None or random.uniform(0, 100) < args.worker_fee:
417                 payout_script = my_script
418             if state['best_share_hash'] is None and args.net.PERSIST:
419                 raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
420             if len(p2p_node.peers) == 0 and args.net.PERSIST:
421                 raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
422             if time.time() > current_work2.value['last_update'] + 60:
423                 raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
424             
425             if state['aux_work'] is not None:
426                 aux_str = '\xfa\xbemm' + bitcoin.data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0)
427             else:
428                 aux_str = ''
429             
430             # XXX assuming generate_tx is smallish here..
431             def get_stale_frac():
432                 shares, stale_shares = get_share_counts()
433                 if shares == 0:
434                     return ""
435                 frac = stale_shares/shares
436                 return 2*struct.pack('<H', int(65535*frac + .5))
437             subsidy = current_work2.value['subsidy']
438             
439             
440             if int(time.time() - current_work2.value['clock_offset']) >= p2pool.TRANSITION_TIME:
441                 timestamp = current_work2.value['time']
442                 is_new = True
443                 previous_share = tracker.shares[state['best_share_hash']] if state['best_share_hash'] is not None else None
444                 new_share_info, generate_tx = p2pool.new_generate_transaction(
445                     tracker=tracker,
446                     new_share_data=dict(
447                         previous_share_hash=state['best_share_hash'],
448                         coinbase=aux_str,
449                         nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
450                         new_script=payout_script,
451                         subsidy=subsidy,
452                         donation=math.perfect_round(65535*args.donation_percentage/100),
453                         stale_frac=(lambda shares, stales:
454                             255 if shares == 0 else math.perfect_round(254*stales/shares)
455                         )(*get_share_counts()),
456                     ),
457                     block_target=state['target'],
458                     desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
459                     net=args.net,
460                 )
461             else:
462                 timestamp = int(time.time() - current_work2.value['clock_offset'])
463                 if state['best_share_hash'] is not None:
464                     timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
465                     if timestamp2 > timestamp:
466                         print 'Toff', timestamp2 - timestamp
467                         timestamp = timestamp2
468                 is_new = False
469                 share_info, generate_tx = p2pool.generate_transaction(
470                     tracker=tracker,
471                     previous_share_hash=state['best_share_hash'],
472                     new_script=payout_script,
473                     subsidy=subsidy,
474                     nonce=run_identifier + struct.pack('<H', random.randrange(2**16)) + aux_str + get_stale_frac(),
475                     block_target=state['target'],
476                     net=args.net,
477                 )
478             
479             print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (bitcoin.data.target_to_difficulty((new_share_info if is_new else share_info['share_data'])['target']), (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) -subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL, subsidy*1e-8, args.net.BITCOIN_SYMBOL, len(current_work2.value['transactions']))
480             #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
481             #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
482             transactions = [generate_tx] + list(current_work2.value['transactions'])
483             merkle_root = bitcoin.data.merkle_hash(transactions)
484             merkle_root_to_transactions[merkle_root] = is_new, new_share_info if is_new else share_info, transactions
485             
486             target2 = (new_share_info if is_new else share_info['share_data'])['target']
487             times[merkle_root] = time.time()
488             #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
489             return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2), state['best_share_hash']
490         
491         my_shares = set()
492         doa_shares = set()
493         times = {}
494         
495         def got_response(header, request):
496             try:
497                 user = worker_interface.get_username(request)
498                 # match up with transactions
499                 xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
500                 if xxx is None:
501                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
502                     return False
503                 is_new, share_info, transactions = xxx
504                 new_share_info = share_info
505                 
506                 hash_ = bitcoin.data.block_header_type.hash256(header)
507                 
508                 pow_hash = args.net.BITCOIN_POW_FUNC(header)
509                 
510                 if pow_hash <= header['target'] or p2pool_init.DEBUG:
511                     if factory.conn.value is not None:
512                         factory.conn.value.send_block(block=dict(header=header, txs=transactions))
513                     else:
514                         print 'No bitcoind connection! Erp!'
515                     if pow_hash <= header['target']:
516                         print
517                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
518                         print
519                 
520                 if current_work.value['aux_work'] is not None and pow_hash <= current_work.value['aux_work']['target']:
521                     try:
522                         aux_pow = dict(
523                             merkle_tx=dict(
524                                 tx=transactions[0],
525                                 block_hash=hash_,
526                                 merkle_branch=[x['hash'] for x in p2pool.calculate_merkle_branch(transactions, 0)],
527                                 index=0,
528                             ),
529                             merkle_branch=[],
530                             index=0,
531                             parent_block_header=header,
532                         )
533                         
534                         a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin.data.aux_pow_type.pack(aux_pow).encode('hex')
535                         #print a, b
536                         merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
537                         def _(res):
538                             print "MERGED RESULT:", res
539                         merged.rpc_getauxblock(a, b).addBoth(_)
540                     except:
541                         log.err(None, 'Error while processing merged mining POW:')
542                 
543                 target = (new_share_info if is_new else share_info['share_data'])['target']
544                 if pow_hash > target:
545                     print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, target)
546                     return False
547                 if is_new:
548                     share = p2pool.NewShare(args.net, header, new_share_info, other_txs=transactions[1:])
549                 else:
550                     share = p2pool.Share(args.net, header, share_info, other_txs=transactions[1:])
551                 my_shares.add(share.hash)
552                 if share.previous_hash != current_work.value['best_share_hash']:
553                     doa_shares.add(share.hash)
554                 print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[header['merkle_root']]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
555                 good = share.previous_hash == current_work.value['best_share_hash']
556                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
557                 p2p_shares([share])
558                 # eg. good = share.hash == current_work.value['best_share_hash'] here
559                 return good
560             except:
561                 log.err(None, 'Error processing data received from worker:')
562                 return False
563         
564         web_root = worker_interface.WorkerInterface(compute, got_response, current_work.changed)
565         
566         def get_rate():
567             if current_work.value['best_share_hash'] is not None:
568                 height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
569                 att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
570                 fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
571                 return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
572             return json.dumps(None)
573         
574         def get_users():
575             height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
576             weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
577             res = {}
578             for script in sorted(weights, key=lambda s: weights[s]):
579                 res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
580             return json.dumps(res)
581         
        class WebInterface(resource.Resource):
            # Minimal twisted.web resource: serves func()'s return value with
            # a fixed Content-Type on every GET request.
            def __init__(self, func, mime_type):
                # func: zero-argument callable producing the response body
                # mime_type: value for the Content-Type header
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
589         
        # HTTP endpoints on the worker port exposing pool statistics as JSON
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        # draw is None when chart rendering is unavailable - presumably an
        # optional dependency; confirm where draw is imported above
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        # serve the worker interface and the stats pages on the same port
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            # a new block invalidates current work; trigger an immediate refresh
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        # recompute derived work state whenever the height tracker updates
        ht.updated.watch(set_real_work2)
613         
614         @defer.inlineCallbacks
615         def work1_thread():
616             while True:
617                 flag = work_updated.get_deferred()
618                 try:
619                     yield set_real_work1()
620                 except:
621                     log.err()
622                 yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
623         
624         @defer.inlineCallbacks
625         def work2_thread():
626             while True:
627                 try:
628                     set_real_work2()
629                 except:
630                     log.err()
631                 yield deferral.sleep(random.expovariate(1/20))
632         
        # kick off both background pollers (they loop forever via inlineCallbacks)
        work1_thread()
        work2_thread()
        
        
        if hasattr(signal, 'SIGALRM'):
            # watchdog: the reactor re-arms a 30-second alarm every second; if
            # the process stalls for more than 30s the alarm fires and the
            # handler dumps a stack trace showing where it is stuck
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
644         
645         
646         pool_str = None;
647         while True:
648             yield deferral.sleep(3)
649             try:
650                 if time.time() > current_work2.value['last_update'] + 60:
651                     print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
652                 if current_work.value['best_share_hash'] is not None:
653                     height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
654                     if height > 2:
655                         att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
656                         weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
657                         shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
658                         stale_shares = stale_doa_shares + stale_not_doa_shares
659                         fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
660                         str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
661                             math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
662                             height,
663                             len(tracker.verified.shares),
664                             len(tracker.shares),
665                             weights.get(my_script, 0)/total_weight*100,
666                             math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
667                             shares,
668                             stale_not_doa_shares,
669                             stale_doa_shares,
670                             len(p2p_node.peers),
671                         ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
672                         if (str != pool_str):
673                             print str;
674                             pool_str = str;
675                         if fracs:
676                             med = math.median(fracs)
677                             print 'Median stale proportion:', med
678                             if shares:
679                                 print '    Own:', stale_shares/shares
680                                 if med < .99:
681                                     print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
682             
683             
684             except:
685                 log.err()
686     except:
687         log.err(None, 'Fatal error:')
688     finally:
689         reactor.stop()
690
691 def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        """ArgumentParser whose @file expansion splits each line into
        whitespace-separated arguments (stock argparse treats one line as one
        argument) and expands @file references recursively.

        NOTE(review): _read_args_from_files appears to be based on argparse's
        private implementation with the recursive call added - keep it in sync
        with the stdlib if argparse internals change.
        """
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so an @file may itself contain @file references
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # split on whitespace so several options can share one line in the @file
            return [arg for arg in arg_line.split() if arg.strip()]
724     
    # command-line definition; @file arguments are expanded by FixedArgumentParser
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merge daemon user and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='percentage amount to donate to author of p2pool. Default: 0.5',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    # options for the p2pool share-chain p2p network
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    # options for the interface miners connect to
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329 solidcoin: 9328 litecoin: 9327, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:9332/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    # options for talking to the local bitcoind (RPC and raw p2p)
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332 ixcoin: 8338 i0coin: 7332 litecoin: 9332)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 namecoin: 8334 ixcoin: 8337 i0coin: 7333 solidcoin: 7555 litecoin: 9333, +10000 for testnets)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional RPC credentials (username is optional, password is required)
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
791     
    args = parser.parse_args()
    
    if args.debug:
        # --debug flips the module-level flag read throughout p2pool
        p2pool_init.DEBUG = True
    
    if args.logfile is None:
        # default logfile: <net>[_testnet].log next to the startup script
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
799     
800     class LogFile(object):
801         def __init__(self, filename):
802             self.filename = filename
803             self.inner_file = None
804             self.reopen()
805         def reopen(self):
806             if self.inner_file is not None:
807                 self.inner_file.close()
808             open(self.filename, 'a').close()
809             f = open(self.filename, 'rb')
810             f.seek(0, os.SEEK_END)
811             length = f.tell()
812             if length > 100*1000*1000:
813                 f.seek(-1000*1000, os.SEEK_END)
814                 while True:
815                     if f.read(1) in ('', '\n'):
816                         break
817                 data = f.read()
818                 f.close()
819                 f = open(self.filename, 'wb')
820                 f.write(data)
821             f.close()
822             self.inner_file = open(self.filename, 'a')
823         def write(self, data):
824             self.inner_file.write(data)
825         def flush(self):
826             self.inner_file.flush()
827     class TeePipe(object):
828         def __init__(self, outputs):
829             self.outputs = outputs
830         def write(self, data):
831             for output in self.outputs:
832                 output.write(data)
833         def flush(self):
834             for output in self.outputs:
835                 output.flush()
836     class TimestampingPipe(object):
837         def __init__(self, inner_file):
838             self.inner_file = inner_file
839             self.buf = ''
840             self.softspace = 0
841         def write(self, data):
842             buf = self.buf + data
843             lines = buf.split('\n')
844             for line in lines[:-1]:
845                 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
846                 self.inner_file.flush()
847             self.buf = lines[-1]
848         def flush(self):
849             pass
    logfile = LogFile(args.logfile)
    # tee all output (stdout, stderr and the twisted log) through a
    # timestamping pipe into both the original stderr and the logfile
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # support external log rotation: SIGUSR1 makes us reopen the logfile
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # also reopen every 5 seconds, which enforces LogFile.reopen's size cap
    task.LoopingCall(logfile.reopen).start(5)
859     
    # resolve the network definition (appending _testnet when requested)
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # fill in per-network port defaults for anything not given on the command line
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    if args.address is not None:
        # validate the payout address up front so a typo fails at startup
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        # None means "request an address from bitcoind" later on
        args.pubkey_hash = None
    
    # merged mining requires both the url and the credentials, or neither
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    reactor.callWhenRunning(main, args)
    reactor.run()