made get_payout_script2 more graceful when bitcoind's validateaddress does not return...
p2pool/main.py
#!/usr/bin/python

from __future__ import division

import argparse
import datetime
import itertools
import json
import os
import random
import signal
import sqlite3
import struct
import sys
import time
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import db, expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks
import p2pool, p2pool.data as p2pool_data

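# getmemorypool returns the fields needed to assemble a block template. Note
# that 'bits' (the compact encoding of the block target) may arrive either as
# a hex string or as an integer depending on the bitcoind version, so getwork()
# accepts both encodings.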
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        target=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))

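# Ask bitcoind for a payout target. validateaddress only reports 'pubkey' for
# addresses whose key the local wallet holds (and not on all versions), so when
# it is absent we fall back from a pay-to-pubkey script to a pay-to-pubkey-hash
# script derived from the address itself.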
@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net):
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    validate_response = yield bitcoind.rpc_validateaddress(address)
    if 'pubkey' not in validate_response:
        print '    Pubkey request failed. Falling back to payout to address.'
        defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net)))
    pubkey = validate_response['pubkey'].decode('hex')
    defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))

@defer.inlineCallbacks
def main(args, net):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.BITCOIN_RPC_CHECK)(bitcoind)
        if not good:
            print "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net)
        else:
            print 'Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net)
        print
        
        print 'Loading cached block headers...'
        ht = bitcoin_p2p.HeightTracker(factory, net.NAME + '_headers.dat')
        print '    ...done loading %i cached block headers.' % (len(ht.tracker.shares),)
        print
        
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(os.path.dirname(sys.argv[0]), net.NAME + '_shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
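        # keep the on-disk ShareStore in sync with the in-memory tracker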
        tracker.added.watch(lambda share: ss.add_share(share))
        tracker.verified.added.watch(lambda share: ss.add_verified_hash(share.hash))
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        work_updated = variable.Event()
        
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            changed = work['previous_block_hash'] != current_work.value['previous_block'] if current_work.value is not None else True
            current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                target=work['target'],
                best_share_hash=current_work.value['best_share_hash'] if current_work.value is not None else None,
                aux_work=current_work.value['aux_work'] if current_work.value is not None else None,
            ))
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            ))
            if changed:
                set_real_work2()
        
        def set_real_work2():
            best, desired = tracker.think(ht, current_work.value['previous_block'], time.time() - current_work2.value['clock_offset'])
            
            t = dict(current_work.value)
            t['best_share_hash'] = best
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
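                # exponential backoff: skip this share if it was requested
                # recently; the retry window grows by ~1.5x per failed attempt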
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        
        print 'Initializing work...'
        yield set_real_work1()
        set_real_work2()
        print '    ...success!'
        print
        
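        # if merged mining is configured, poll the merged daemon's getauxblock
        # about once a second and attach the aux chain's block hash, target, and
        # chain ID to current_work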
        @defer.inlineCallbacks
        def set_merged_work():
            if not args.merged_url:
                return
            while True:
                merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                x = dict(current_work.value)
                x['aux_work'] = dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                )
                #print x['aux_work']
                current_work.set(x)
                yield deferral.sleep(1)
        set_merged_work()
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        def p2p_shares(shares, peer=None):
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
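        # a share whose proof-of-work also satisfies the bitcoin block target is
        # a valid block: pass it to bitcoind over the p2p connection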
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['target']:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker, net))
                else:
                    print 'No bitcoind connection! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
        
        def p2p_share_hashes(share_hashes, peer):
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                # same exponential backoff rule as in set_real_work2 above
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            if not share_hashes:
                return # nothing requested; avoids ZeroDivisionError below
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in itertools.islice(tracker.get_chain_known(share_hash), parents + 1):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using TCP port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        nodes = set([
            ('72.14.191.28', net.P2P_PORT),
            ('62.204.197.159', net.P2P_PORT),
            ('142.58.248.28', net.P2P_PORT),
            ('94.23.34.145', net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
        if net.NAME == 'litecoin':
            nodes.add(((yield reactor.resolve('liteco.in')), net.P2P_PORT))
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=db.SQLiteDict(sqlite3.connect(os.path.join(os.path.dirname(sys.argv[0]), 'addrs.dat'), isolation_level=None), net.NAME),
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # when the best chain changes, broadcast any shares on it that haven't
        # been sent to peers yet
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain_known(new_work['best_share_hash']):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        run_identifier = struct.pack('<I', random.randrange(2**32))
        
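        # run_identifier is mixed into the nonce of every share this session
        # generates, so our own shares can be recognized later; share_counter
        # counts how many of them made it into the current chain, and the
        # remainder of my_shares are counted as stale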
        share_counter = skiplists.CountsSkipList(tracker, run_identifier)
        removed_unstales = set()
        def get_share_counts(doa=False):
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            matching_in_chain = share_counter(current_work.value['best_share_hash'], height) | removed_unstales
            shares_in_chain = my_shares & matching_in_chain
            stale_shares = my_shares - matching_in_chain
            if doa:
                stale_doa_shares = stale_shares & doa_shares
                stale_not_doa_shares = stale_shares - stale_doa_shares
                return len(shares_in_chain) + len(stale_shares), len(stale_doa_shares), len(stale_not_doa_shares)
            return len(shares_in_chain) + len(stale_shares), len(stale_shares)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_shares and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_unstales.add(share.hash)
        
        def get_payout_script_from_username(request):
            user = worker_interface.get_username(request)
            if user is None:
                return None
            try:
                return bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(user, net))
            except: # XXX not a valid address; caller falls back to our own payout script
                return None
        
        def compute(request):
            state = current_work.value
            payout_script = get_payout_script_from_username(request)
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            if state['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
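            # merged mining commitment: magic bytes '\xfa\xbemm' followed by the
            # aux chain's block hash (byte-reversed) and merkle size/nonce
            # fields, placed in the coinbase so the aux chain can locate it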
            if state['aux_work'] is not None:
                aux_str = '\xfa\xbemm' + bitcoin_data.HashType().pack(state['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0)
            else:
                aux_str = ''
            
            # XXX assuming generate_tx is smallish here..
            def get_stale_frac():
                # note: this helper appears to be unused; stale_frac is computed
                # inline in the generate_transaction call below
                shares, stale_shares = get_share_counts()
                if shares == 0:
                    return ""
                frac = stale_shares/shares
                return 2*struct.pack('<H', int(65535*frac + .5))
            subsidy = current_work2.value['subsidy']
            
            timestamp = current_work2.value['time']
            previous_share = tracker.shares[state['best_share_hash']] if state['best_share_hash'] is not None else None
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=state['best_share_hash'],
                    coinbase=aux_str,
                    nonce=run_identifier + struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=subsidy,
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    # stale_frac fits in one byte: 255 means no data, otherwise
                    # round(254 * stales/shares)
                    stale_frac=(lambda shares, stales:
                        255 if shares == 0 else math.perfect_round(254*stales/shares)
                    )(*get_share_counts()),
                ),
                block_target=state['target'],
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (bitcoin_data.target_to_difficulty(share_info['target']), (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - subsidy//200)*1e-8, net.BITCOIN_SYMBOL, subsidy*1e-8, net.BITCOIN_SYMBOL, len(current_work2.value['transactions']))
            #print 'Target: %x' % (p2pool_data.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target'],)
            #, have', shares.count(my_script) - 2, 'share(s) in the current chain. Fee:', sum(tx.value_in - tx.value_out for tx in extra_txs)/100000000
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(transactions)
            merkle_root_to_transactions[merkle_root] = share_info, transactions
            
            target2 = share_info['target']
            times[merkle_root] = time.time()
            #print 'SENT', 2**256//p2pool_data.coinbase_type.unpack(generate_tx['tx_ins'][0]['script'])['share_data']['target']
            return bitcoin_getwork.BlockAttempt(state['version'], state['previous_block'], merkle_root, timestamp, state['target'], target2), state['best_share_hash']
        
        my_shares = set() # hashes of shares submitted by our workers this session
        doa_shares = set() # subset of my_shares that were dead on arrival
        times = {} # merkle_root -> time the work was handed out
        
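        # called for every solution a miner returns: look the job up by merkle
        # root, then check the hash against the block target (submit a block),
        # the aux chain target (submit merged-mining PoW), and finally the share
        # target (add the share to the chain)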
        def got_response(header, request):
            try:
                user = worker_interface.get_username(request)
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.BITCOIN_POW_FUNC(header)
                
                if pow_hash <= header['target'] or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print 'No bitcoind connection! Erp!'
                    if pow_hash <= header['target']:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
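                # if this solution also satisfies the aux chain's target, build
                # an aux_pow proof (coinbase tx, its merkle branch, and the
                # parent block header) and submit it via getauxblock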
                if current_work.value['aux_work'] is not None and pow_hash <= current_work.value['aux_work']['target']:
                    try:
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=[x['hash'] for x in p2pool_data.calculate_merkle_branch(transactions, 0)],
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                    except:
                        log.err(None, 'Error while processing merged mining POW:')
                
                target = share_info['target']
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_shares.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    doa_shares.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (user, p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - times[header['merkle_root']]) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = worker_interface.WorkerInterface(compute, got_response, current_work.changed)
        
        def get_rate():
            if current_work.value['best_share_hash'] is not None:
                height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], net, min(height - 1, 720))
                fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                return json.dumps(int(att_s / (1. - (math.median(fracs) if fracs else 0))))
            return json.dumps(None)
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net)] = weights[script]/total_weight
            return json.dumps(res)
        
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        # done!
        
        # do new getwork when a block is heard on the p2p interface
        
        def new_block(block_hash):
            work_updated.happened()
        factory.new_block.watch(new_block)
        
        print 'Started successfully!'
        print
        
        ht.updated.watch(set_real_work2)
        
        @defer.inlineCallbacks
        def work1_thread():
            while True:
                flag = work_updated.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(random.uniform(1, 10))], fireOnOneCallback=True)
        
        @defer.inlineCallbacks
        def work2_thread():
            while True:
                try:
                    set_real_work2()
                except:
                    log.err()
                yield deferral.sleep(random.expovariate(1/20))
        
        work1_thread()
        work2_thread()
        
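        # watchdog: arm a 30-second SIGALRM and re-arm it every second from the
        # event loop; if the reactor stalls for 30 seconds the alarm fires and
        # the handler prints a stack trace showing where it was stuck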
        if hasattr(signal, 'SIGALRM'):
            def watchdog_handler(signum, frame):
                print 'Watchdog timer went off at:'
                traceback.print_stack()
            
            signal.signal(signal.SIGALRM, watchdog_handler)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        pool_str = None
        while True:
            yield deferral.sleep(3)
            try:
                if time.time() > current_work2.value['last_update'] + 60:
                    print '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                if current_work.value['best_share_hash'] is not None:
                    height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                    if height > 2:
                        att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], net, min(height - 1, 720))
                        weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                        shares, stale_doa_shares, stale_not_doa_shares = get_share_counts(True)
                        stale_shares = stale_doa_shares + stale_not_doa_shares
                        fracs = [share.stale_frac for share in itertools.islice(tracker.get_chain_known(current_work.value['best_share_hash']), 120) if share.stale_frac is not None]
                        this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                            math.format(int(att_s / (1. - (math.median(fracs) if fracs else 0)))),
                            height,
                            len(tracker.verified.shares),
                            len(tracker.shares),
                            weights.get(my_script, 0)/total_weight*100,
                            math.format(int(weights.get(my_script, 0)*att_s//total_weight / (1. - (math.median(fracs) if fracs else 0)))),
                            shares,
                            stale_not_doa_shares,
                            stale_doa_shares,
                            len(p2p_node.peers),
                        ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                        if this_str != pool_str:
                            print this_str
                            pool_str = this_str
                        if fracs:
                            med = math.median(fracs)
                            print 'Median stale proportion:', med
                            if shares:
                                print '    Own:', stale_shares/shares
                                if med < .99:
                                    print '    Own efficiency: %.02f%%' % (100*(1 - stale_shares/shares)/(1 - med),)
            except:
                log.err()
    except:
        log.err(None, 'Fatal error:')
    finally:
        reactor.stop()

def run():
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate to this address (defaults to requesting one from bitcoind)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to specific file (defaults to <network_name>.log in run_p2pool.py's directory)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='merged mining daemon username and password, separated by a colon. Example: ncuser:ncpass',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='percentage amount to donate to author of p2pool. Default: 0.5',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use TCP port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on TCP port PORT (defaults to default p2pool P2P port), in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to forward p2pool P2P port from the WAN to this computer using UPnP''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners asking for work and providing responses (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee . default: 0''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to a bitcoind at this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='connect to a bitcoind at this port over the RPC interface - used to get the current highest block via getmemorypool (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='connect to a bitcoind at this port over the p2p interface - used to submit blocks and get the pubkey to generate to via an IP transaction (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.BITCOIN_P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: empty)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    if args.logfile is None:
        args.logfile = os.path.join(os.path.dirname(sys.argv[0]), net.NAME + '.log')
    
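    # append-only log file; reopen() trims it to roughly the last megabyte once
    # it grows past 100 MB, and is triggered periodically and by SIGUSR1 below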
    class LogFile(object):
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = open(self.filename, 'a')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now().strftime("%H:%M:%S.%f"), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    logfile = LogFile(args.logfile)
    sys.stdout = sys.stderr = log.DefaultObserver.stderr = TimestampingPipe(TeePipe([sys.stderr, logfile]))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
828     
829     if args.bitcoind_rpc_port is None:
830         args.bitcoind_rpc_port = net.BITCOIN_RPC_PORT
831     
832     if args.bitcoind_p2p_port is None:
833         args.bitcoind_p2p_port = net.BITCOIN_P2P_PORT
834     
835     if args.p2pool_port is None:
836         args.p2pool_port = net.P2P_PORT
837     
838     if args.worker_port is None:
839         args.worker_port = net.WORKER_PORT
840     
841     if args.address is not None:
842         try:
843             args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net)
844         except Exception, e:
845             parser.error('error parsing address: ' + repr(e))
846     else:
847         args.pubkey_hash = None
848     
849     if (args.merged_url is None) ^ (args.merged_userpass is None):
850         parser.error('must specify --merged-url and --merged-userpass')
851     
852     reactor.callWhenRunning(main, args, net)
853     reactor.run()