don't try to submit merged mining solutions unless merged mining is enabled
[p2pool.git] / p2pool / main.py
1 #!/usr/bin/python
2
3 from __future__ import division
4
5 import argparse
6 import datetime
7 import itertools
8 import os
9 import random
10 import sqlite3
11 import struct
12 import sys
13 import time
14 import json
15 import signal
16 import traceback
17
18 from twisted.internet import defer, reactor, task
19 from twisted.web import server, resource
20 from twisted.python import log
21 from nattraverso import portmapper, ipdiscover
22
23 import bitcoin.p2p, bitcoin.getwork, bitcoin.data
24 from util import db, expiring_dict, jsonrpc, variable, deferral, math
25 from . import p2p, worker_interface, skiplists
26 import p2pool.data as p2pool
27 import p2pool as p2pool_init
28
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind, ht, net):
    """Fetch a block template from bitcoind over JSON-RPC.

    Prefers the 'getmemorypool' call (which includes the transactions to
    mine); if bitcoind is too old to support it, falls back to plain
    'getwork' and generates transaction-less blocks.

    Returns (via returnValue) a dict with keys: version,
    previous_block_hash, transactions, subsidy, time, target.
    """
    try:
        work = yield bitcoind.rpc_getmemorypool()
        defer.returnValue(dict(
            version=work['version'],
            previous_block_hash=int(work['previousblockhash'], 16),
            transactions=[bitcoin.data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
            subsidy=work['coinbasevalue'],
            time=work['time'],
            # 'bits' may arrive hex-encoded (string) or as a plain integer
            # depending on bitcoind version; handle both.
            target=bitcoin.data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin.data.FloatingInteger(work['bits']),
        ))
    except jsonrpc.Error, e:
        # -32601 is JSON-RPC "method not found"; anything else is a real error.
        if e.code != -32601:
            raise
        
        print "---> Update your bitcoind to support the 'getmemorypool' RPC call. Not including transactions in generated blocks! <---"
        work = bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork()))
        try:
            subsidy = net.BITCOIN_SUBSIDY_FUNC(ht.getHeight(work.previous_block))
        except ValueError:
            # height unknown for this block - use a far-future height so the
            # subsidy estimate errs low rather than high
            subsidy = net.BITCOIN_SUBSIDY_FUNC(1000)
        
        defer.returnValue(dict(
            version=work.version,
            previous_block_hash=work.previous_block,
            transactions=[],
            subsidy=subsidy,
            time=work.timestamp,
            target=work.block_target,
        ))
61
@deferral.retry('Error getting payout script from bitcoind:', 1)
@defer.inlineCallbacks
def get_payout_script(factory):
    """Request a payout script from bitcoind over its P2P connection.

    Sends a null checkorder request and interprets the reply:
    'success' yields the script, 'denied' yields None (caller should fall
    back to an address-based script), anything else raises ValueError.
    """
    protocol = yield factory.getProtocol()
    res = yield protocol.check_order(order=bitcoin.p2p.Protocol.null_order)
    reply = res['reply']
    if reply == 'denied':
        defer.returnValue(None)
    if reply != 'success':
        raise ValueError('Unexpected reply: %r' % (res,))
    defer.returnValue(res['script'])
72
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    """Build a payout script from the address of bitcoind's 'p2pool' account.

    Fallback used when the checkorder-based payout script is denied.
    """
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    pubkey_hash = bitcoin.data.address_to_pubkey_hash(address, net)
    defer.returnValue(bitcoin.data.pubkey_hash_to_script2(pubkey_hash))
77
78 @defer.inlineCallbacks
79 def main(args):
80     try:
81         print 'p2pool (version %s)' % (p2pool_init.__version__,)
82         print
83         try:
84             from . import draw
85         except ImportError:
86             draw = None
87             print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
88             print
89         
90         # connect to bitcoind over JSON-RPC and do initial getwork
91         url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
92         print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
93         bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
94         good = yield deferral.retry('Error while checking bitcoind identity:', 1)(args.net.BITCOIN_RPC_CHECK)(bitcoind)
95         if not good:
96             print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
97             return
98         temp_work = yield deferral.retry('Error while testing getwork:', 1)(defer.inlineCallbacks(lambda: defer.returnValue(bitcoin.getwork.BlockAttempt.from_getwork((yield bitcoind.rpc_getwork())))))()
99         print '    ...success!'
100         print '    Current block hash: %x' % (temp_work.previous_block,)
101         print
102         
103         # connect to bitcoind over bitcoin-p2p and do checkorder to get pubkey to send payouts to
104         print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
105         factory = bitcoin.p2p.ClientFactory(args.net)
106         reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
107         my_script = yield get_payout_script(factory)
108         if args.pubkey_hash is None:
109             if my_script is None:
110                 print '    IP transaction denied ... falling back to sending to address.'
111                 my_script = yield get_payout_script2(bitcoind, args.net)
112         else:
113             my_script = bitcoin.data.pubkey_hash_to_script2(args.pubkey_hash)
114         print '    ...success!'
115         print '    Payout script:', bitcoin.data.script2_to_human(my_script, args.net)
116         print
117         
118         print 'Loading cached block headers...'
119         ht = bitcoin.p2p.HeightTracker(factory, args.net.NAME + '_headers.dat')
120         print '   ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
121         print
122         
123         tracker = p2pool.OkayTracker(args.net)
124         ss = p2pool.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), args.net.NAME + '_shares.'), args.net)
125         known_verified = set()
126         print "Loading shares..."
127         for i, (mode, contents) in enumerate(ss.get_shares()):
128             if mode == 'share':
129                 if contents.hash in tracker.shares:
130                     continue
131                 contents.shared = True
132                 contents.stored = True
133                 contents.time_seen = 0
134                 tracker.add(contents)
135                 if len(tracker.shares) % 1000 == 0 and tracker.shares:
136                     print "    %i" % (len(tracker.shares),)
137             elif mode == 'verified_hash':
138                 known_verified.add(contents)
139             else:
140                 raise AssertionError()
141         print "    ...inserting %i verified shares..." % (len(known_verified),)
142         for h in known_verified:
143             if h not in tracker.shares:
144                 ss.forget_verified_share(h)
145                 continue
146             tracker.verified.add(tracker.shares[h])
147         print "    ...done loading %i shares!" % (len(tracker.shares),)
148         print
149         tracker.added.watch(lambda share: ss.add_share(share))
150         tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
151         tracker.removed.watch(lambda share: ss.forget_share(share.hash))
152         tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
153         
154         peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
155         
156         # information affecting work that should trigger a long-polling update
157         current_work = variable.Variable(None)
158         # information affecting work that should not trigger a long-polling update
159         current_work2 = variable.Variable(None)
160         
161         work_updated = variable.Event()
162         
163         requested = expiring_dict.ExpiringDict(300)
164         
        @defer.inlineCallbacks
        def set_real_work1():
            # Poll bitcoind for fresh work and publish it into the two work
            # variables: current_work holds the fields whose change should
            # trigger a worker long-poll, current_work2 the ones that shouldn't.
            work = yield getwork(bitcoind, ht, args.net)
            # Only a changed previous-block hash means the bitcoin chain moved.
            changed = work['previous_block_hash'] != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
                # carry over p2pool-side state; bitcoind knows nothing about it
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
                aux_work=current_work.value['aux_work'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                # clock_offset lets later code convert local time to bitcoind time
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            ))
            if changed:
                # new bitcoin block: re-evaluate the best p2pool share chain
                set_real_work2()
184         
        def set_real_work2():
            # Re-evaluate the share tracker: pick the best share chain head and
            # request any desired-but-missing parent shares from peers.
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                # exponential backoff: re-request only after 10 * 1.5**count
                # seconds have passed since the last attempt
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                # prefer peers known to have a head descending from this share
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    # stop just short of our known heads so the peer doesn't
                    # resend shares we already have (capped at 100 stops)
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
219         
220         print 'Initializing work...'
221         yield set_real_work1()
222         set_real_work2()
223         print '    ...success!'
224         print
225         
        @defer.inlineCallbacks
        def set_merged_work():
            # Poll the merged-mining daemon for aux work once a second and fold
            # it into current_work. Returns immediately (disabling merged
            # mining) when no --merged-url was given.
            if not args.merged_url:
                return
            while True:
                merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                x = dict(current_work.value)
                x['aux_work'] = dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin.data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                )
                #print x['aux_work']
                current_work.set(x)
                yield deferral.sleep(1)
242         set_merged_work()
243         
244         start_time = time.time() - current_work2.value['clock_offset']
245         
246         # setup p2p logic and join p2pool network
247         
        def share_share(share, ignore_peer=None):
            # Broadcast a share to every connected peer (except the one it came
            # from), then mark it as shared so it isn't rebroadcast.
            for peer in p2p_node.peers.itervalues():
                if peer is ignore_peer:
                    continue
                #if p2pool_init.DEBUG:
                #    print "Sending share %s to %r" % (p2pool.format_hash(share.hash), peer.addr)
                peer.send_shares([share])
            share.flag_shared()
256         
        def p2p_shares(shares, peer=None):
            # Handle a batch of shares received from the network (or from our
            # own worker, with peer=None): add new ones to the tracker,
            # remember which peer knows this chain head, and re-evaluate the
            # best chain if anything new arrived.
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                # the first share of a batch is treated as the peer's head
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*args.net.CHAIN_LENGTH)
281         
        @tracker.verified.added.watch
        def _(share):
            # A newly-verified share whose proof-of-work also meets the bitcoin
            # block target is a full block - reconstruct it and hand it to
            # bitcoind over the P2P connection.
            if share.bitcoin_hash <= share.header['target']:
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool.format_hash(share.hash), share.bitcoin_hash,)
                print
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, args.net))
                else:
                    print 'No bitcoind connection! Erp!'
292         
        def p2p_share_hashes(share_hashes, peer):
            # Handle an inv-style announcement of share hashes from a peer:
            # request any we don't have, with the same exponential backoff
            # throttle used by set_real_work2.
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
310         
311         def p2p_get_shares(share_hashes, parents, stops, peer):
312             parents = min(parents, 1000//len(share_hashes))
313             stops = set(stops)
314             shares = []
315             for share_hash in share_hashes:
316                 for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
317                     if share.hash in stops:
318                         break
319                     shares.append(share)
320             print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
321             peer.send_shares(shares, full=True)
322         
323         print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
324         
        def parse(x):
            # Parse a node spec given as 'host:port' or bare 'host'; a bare
            # host defaults to the network's standard P2P port.
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, args.net.P2P_PORT
331         
332         nodes = set([
333             ('72.14.191.28', args.net.P2P_PORT),
334             ('62.204.197.159', args.net.P2P_PORT),
335             ('142.58.248.28', args.net.P2P_PORT),
336             ('94.23.34.145', args.net.P2P_PORT),
337         ])
338         for host in [
339             'p2pool.forre.st',
340             'dabuttonfactory.com',
341         ]:
342             try:
343                 nodes.add(((yield reactor.resolve(host)), args.net.P2P_PORT))
344             except:
345                 log.err(None, 'Error resolving bootstrap node IP:')
346         
347         p2p_node = p2p.Node(
348             current_work=current_work,
349             port=args.p2pool_port,
350             net=args.net,
351             addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), args.net.NAME),
352             mode=0 if args.low_bandwidth else 1,
353             preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
354         )
355         p2p_node.handle_shares = p2p_shares
356         p2p_node.handle_share_hashes = p2p_share_hashes
357         p2p_node.handle_get_shares = p2p_get_shares
358         
359         p2p_node.start()
360         
361         # send share when the chain changes to their chain
362         def work_changed(new_work):
363             #print 'Work changed:', new_work
364             for share in tracker.get_chain_known(new_work['best_share_hash']):
365                 if share.shared:
366                     break
367                 share_share(share, share.peer)
368         current_work.changed.watch(work_changed)
369         
370         print '    ...success!'
371         print
372         
        @defer.inlineCallbacks
        def upnp_thread():
            # Periodically (re-)establish a UPnP port mapping for the p2pool
            # port, so peers behind NAT routers stay reachable. Errors are
            # best-effort: logged only in DEBUG mode, never fatal.
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool_init.DEBUG:
                        log.err(None, "UPnP error:")
                # re-map on average every two minutes
                yield deferral.sleep(random.expovariate(1/120))
387         
388         if args.upnp:
389             upnp_thread()
390         
391         # start listening for workers with a JSON-RPC server
392         
393         print 'Listening for workers on port %i...' % (args.worker_port,)
394         
395         # setup worker logic
396         
397         merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
398         run_identifier = struct.pack('<I', random.randrange(2**32))
399         
400         share_counter = skiplists.CountsSkipList(tracker, run_identifier)
401         removed_unstales = set()
        def get_share_counts(doa=False):
            # Count our own shares: how many total we've produced and how many
            # went stale (didn't make it into the current best chain).
            # removed_unstales compensates for shares that were in the chain
            # but have since been pruned from the tracker.
            # Returns (total, stale) or, with doa=True,
            # (total, stale_dead_on_arrival, stale_other).
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], max(0, height - 1)) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
412         @tracker.verified.removed.watch
413         def _(share):
414             if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
415                 removed_unstales.add(share.hash)
416         
        def compute(state, payout_script):
            # Build a new piece of getwork for a miner: construct the p2pool
            # generate (coinbase) transaction, assemble the transaction list,
            # and return a BlockAttempt with the share target as its
            # share_target. Raises jsonrpc.Error when work can't safely be
            # handed out (still syncing, no peers, or bitcoind unreachable).
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                # worker_fee percent of work is mined to the node operator
                payout_script = my_script
            if state['best_share_hash'] is None and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and args.net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
            if state['aux_work'] is not None:
                # merged-mining marker: magic bytes + aux chain hash + size/nonce
                aux_str = '\xfa\xbemm' + bitcoin.data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0)
            else:
                aux_str = ''
            
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # encode our observed stale fraction into the coinbase nonce as
                # a doubled little-endian uint16 (doubling acts as a checksum;
                # see read_stale_frac)
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            subsidy = current_work2.value['subsidy']
            generate_tx = p2pool.generate_transaction(
                tracker=tracker,
                previous_share_hash=state['best_share_hash'],
                new_script=payout_script,
                subsidy=subsidy,
                nonce=run_identifier + struct.pack('<H', random.randrange(2**16)) + get_stale_frac() + aux_str,
                block_target=state['target'],
                net=args.net,
            )
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (0xffff*2**208/p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'], (generate_tx['tx_outs'][-1]['value']-subsidy//200)*1e-8, args.net.BITCOIN_SYMBOL, subsidy*1e-8, args.net.BITCOIN_SYMBOL, len(current_work2.value['transactions']))
            #print 'Target: %x' % (p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin.data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = transactions # will stay for 1000 seconds
            
            timestamp = int(time.time() - current_work2.value['clock_offset'])
            if state['best_share_hash'] is not None:
                # enforce the median-time-past rule over the last 11 shares
                timestamp2 = math.median((s.timestamp for s in itertools.islice(tracker.get_chain_to_root(state['best_share_hash']), 11)), use_float=False) + 1
                if timestamp2 > timestamp:
                    print 'Toff', timestamp2 - timestamp
                    timestamp = timestamp2
            target2 = p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            # NOTE(review): `times` is never pruned, so this grows without
            # bound over a long run - consider an expiring dict
            times[p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['nonce']] = time.time()
            #print 'SENT', 2**256//p2pool.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin.getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2)
466         
467         my_shares = set()
468         doa_shares = set()
469         times = {}
470         
471         def got_response(data, user):
472             try:
473                 # match up with transactions
474                 header = bitcoin.getwork.decode_data(data)
475                 transactions = merkle_root_to_transactions.get(header['merkle_root'], None)
476                 if transactions is None:
477                     print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
478                     return False
479                 block = dict(header=header, txs=transactions)
480                 hash_ = bitcoin.data.block_header_type.hash256(block['header'])
481                 if hash_ <= block['header']['target'] or p2pool_init.DEBUG:
482                     if factory.conn.value is not None:
483                         factory.conn.value.send_block(block=block)
484                     else:
485                         print 'No bitcoind connection! Erp!'
486                     if hash_ <= block['header']['target']:
487                         print
488                         print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
489                         print
490                 
491                 if args.merged_url is not None:
492                     try:
493                         aux_pow = dict(
494                             merkle_tx=dict(
495                                 tx=transactions[0],
496                                 block_hash=hash_,
497                                 merkle_branch=[x['hash'] for x in p2pool.calculate_merkle_branch(transactions, 0)],
498                                 index=0,
499                             ),
500                             merkle_branch=[],
501                             index=0,
502                             parent_block_header=header,
503                         )
504                         
505                         a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin.data.aux_pow_type.pack(aux_pow).encode('hex')
506                         #print a, b
507                         merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
508                         def _(res):
509                             print "MERGED RESULT:", res
510                         merged.rpc_getauxblock(a, b).addBoth(_)
511                     except:
512                         log.err(None, 'Error while processing merged mining POW:')
513                 
514                 target = p2pool.coinbase_type.unpack(transactions[0]['tx_ins'][0]['script'])['share_data']['target']
515                 if hash_ > target:
516                     print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (hash_, target)
517                     return False
518                 share = p2pool.Share.from_block(block)
519                 my_shares.add(share.hash)
520                 if share.previous_hash != current_work.value['best_share_hash']:
521                     doa_shares.add(share.hash)
522                 print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool.format_hash(share.hash), p2pool.format_hash(share.previous_hash), time.time() - times[share.nonce]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
523                 good = share.previous_hash == current_work.value['best_share_hash']
524                 # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
525                 p2p_shares([share])
526                 # eg. good = share.hash == current_work.value['best_share_hash'] here
527                 return good
528             except:
529                 log.err(None, 'Error processing data received from worker:')
530                 return False
531         
532         web_root = worker_interface.WorkerInterface(current_work, compute, got_response, args.net)
533         
        def get_rate():
            # JSON endpoint: estimated pool hash rate, corrected upward by the
            # median stale fraction reported in recent shares. None until a
            # best share is known.
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
                fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
                return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
            return json.dumps(None)
541         
        def get_users():
            # JSON endpoint: map of payout address/script (human-readable) to
            # its fraction of the pool's current payout weight.
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin.data.script2_to_human(script, args.net)] = weights[script]/total_weight
            return json.dumps(res)
549         
        class WebInterface(resource.Resource):
            # Minimal twisted.web resource: serves the result of func() on GET
            # with a fixed Content-Type.
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
557         
558         web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
559         web_root.putChild('users', WebInterface(get_users, 'application/json'))
560         web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
561         if draw is not None:
562             web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
563         
564         reactor.listenTCP(args.worker_port, server.Site(web_root))
565         
566         print '    ...success!'
567         print
568         
569         # done!
570         
571         # do new getwork when a block is heard on the p2p interface
572         
573         def new_block(block_hash):
574             work_updated.happened()
575         factory.new_block.watch(new_block)
576         
577         print 'Started successfully!'
578         print
579         
580         ht.updated.watch(set_real_work2)
581         
        @defer.inlineCallbacks
        def work1_thread():
            # Loop forever refreshing work from bitcoind, waking early whenever
            # the work_updated event fires (e.g. a new block was heard on P2P).
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                # wait for either the event or a randomized 1-10s timeout
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
591         
        @defer.inlineCallbacks
        def work2_thread():
            # Loop forever re-evaluating the best share chain, on average every
            # 20 seconds.
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
600         
601         work1_thread()
602         work2_thread()
603         
604         
605         if hasattr(signal, 'SIGALRM'):
606             def watchdog_handler(signum, frame):
607                 print 'Watchdog timer went off at:'
608                 traceback.print_stack()
609             
610             signal.signal(signal.SIGALRM, watchdog_handler)
611             task.LoopingCall(signal.alarm, 30).start(1)
612         
613         
        def read_stale_frac(share):
            # Extract the stale-share fraction a miner embedded in its share
            # nonce (written as a doubled little-endian uint16 by compute()'s
            # get_stale_frac; the two equal halves act as a validity check).
            # Returns a float in [0, 1], or None if the nonce doesn't carry
            # the marker.
            # NOTE(review): the 20-byte length check presumably matches the
            # full nonce layout including the aux_str suffix - confirm against
            # compute()'s nonce construction.
            if len(share.nonce) != 20:
                return None
            a, b = struct.unpack("<HH", share.nonce[-4:])
            if a != b:
                return None
            return a/65535
621         
        # Status loop: every 3 seconds, warn if bitcoind has gone silent and
        # print pool/local statistics derived from the share chain.
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        # pool attempt rate estimated over up to the last 720 shares
                        att_s = p2pool.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], args.net, min(height - 1, 720))
                        weights, total_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 2**100)
                        shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                        stale_shares = stale_doa_shares + stale_not_doa_shares
                        # stale proportions encoded in the nonces of the last 120 shares
                        # NOTE(review): read_stale_frac is evaluated twice per share here
                        fracs = [read_stale_frac(share) for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if read_stale_frac(share) is not None]
                        print 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                            # observed rate scaled up by the median reported stale fraction
                            math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
                            shares,
                            stale_not_doa_shares,
                            stale_doa_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool_init.DEBUG else '')
                        if fracs:
                            med = math.median(fracs)
                            print 'Median stale proportion:', med
                            if shares:
                                print '    Own:', stale_shares/shares
                                if med < .99:
                                    # own stale rate relative to the pool's median
                                    print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
            
            
            except:
                # keep the status loop alive no matter what goes wrong above
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()
662
def run():
    """Entry point: parse command-line options, set up logging, and hand
    off to main() under the twisted reactor."""
    class FixedArgumentParser(argparse.ArgumentParser):
        # ArgumentParser subclass whose @file expansion recurses (so files
        # may reference other files) and tolerates empty argument strings.
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            # recurse so @file references inside the file also expand
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            # one line may hold several whitespace-separated arguments
            return [arg for arg in arg_line.split() if arg.strip()]
696     
    # Command-line option definitions; @file references are expanded by
    # FixedArgumentParser above.
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool_init.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool_init.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(x for x in p2pool.nets if 'testnet' not in x), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merge daemon user and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (default: 9333 normally, 19333 for testnet) (forward this port from your router!)',
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to 9333 normally, 19333 for testnet), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('-l', '--low-bandwidth',
        help='trade lower bandwidth usage for higher latency (reduced efficiency)',
        action='store_true', default=False, dest='low_bandwidth')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward port 9333 (19333 for testnet) from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: bitcoin: 9332 namecoin: 9331 ixcoin: 9330 i0coin: 9329, +10000 for testnets)',
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:9332/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getwork (default: 8332, 8338 for ixcoin)',
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: 8333 normally. 18333 for testnet)',
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    # positional: RPC username is optional, password is required
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool_init.DEBUG = True
    
    # default the log file to <network>[_testnet].log beside run_p2pool.py
    if args.logfile is None:
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), args.net_name + ('_testnet' if args.testnet else '') + '.log')
    
769     class LogFile(object):
770         def __init__(self, filename):
771             self.filename = filename
772             self.inner_file = None
773             self.reopen()
774         def reopen(self):
775             if self.inner_file is not None:
776                 self.inner_file.close()
777             open(self.filename, 'a').close()
778             f = open(self.filename, 'rb')
779             f.seek(0, os.SEEK_END)
780             length = f.tell()
781             if length > 100*1000*1000:
782                 f.seek(-1000*1000, os.SEEK_END)
783                 while True:
784                     if f.read(1) in ('', '\n'):
785                         break
786                 data = f.read()
787                 f.close()
788                 f = open(self.filename, 'wb')
789                 f.write(data)
790             f.close()
791             self.inner_file = open(self.filename, 'a')
792         def write(self, data):
793             self.inner_file.write(data)
794         def flush(self):
795             self.inner_file.flush()
796     class TeePipe(object):
797         def __init__(self, outputs):
798             self.outputs = outputs
799         def write(self, data):
800             for output in self.outputs:
801                 output.write(data)
802         def flush(self):
803             for output in self.outputs:
804                 output.flush()
805     class TimestampingPipe(object):
806         def __init__(self, inner_file):
807             self.inner_file = inner_file
808             self.buf = ''
809             self.softspace = 0
810         def write(self, data):
811             buf = self.buf + data
812             lines = buf.split('\n')
813             for line in lines[:-1]:
814                 self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
815                 self.inner_file.flush()
816             self.buf = lines[-1]
817         def flush(self):
818             pass
    logfile = LogFile(args.logfile)
    # Mirror everything printed (stdout, stderr, twisted's log) to both the
    # original stderr and the log file, timestamping each line.
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        # support external log rotation: SIGUSR1 makes us reopen the log file
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    # also reopen periodically, which applies the size-based trimming
    task.LoopingCall(logfile.reopen).start(5)
    
    # resolve the selected network (possibly its testnet variant)
    args.net = p2pool.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    # fill in per-network port defaults for anything not given explicitly
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = args.net.BITCOIN_RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = args.net.BITCOIN_P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = args.net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = args.net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin.data.address_to_pubkey_hash(args.address, args.net)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        # no payout address given; one will be requested from bitcoind
        args.pubkey_hash = None
    
    # merged mining requires both the url and the credentials, or neither
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify --merged-url and --merged-userpass')
    
    reactor.callWhenRunning(main, args)
    reactor.run()