#!/usr/bin/python
# coding=utf-8

from __future__ import division

import argparse
import codecs
import datetime
import os
import random
import struct
import sys
import time
import json
import signal
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks
import p2pool, p2pool.data as p2pool_data

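# fetch work using bitcoind's getmemorypool RPC and normalize it. 'bits' may be
# returned as a big-endian hex string or, by some daemons, as a bare integer;
# both forms are converted to a FloatingInteger here.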
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))

@deferral.retry('Error creating payout script:', 10)
@defer.inlineCallbacks
def get_payout_script2(bitcoind, net2):
    address = yield bitcoind.rpc_getaccountaddress('p2pool')
    validate_response = yield bitcoind.rpc_validateaddress(address)
    if 'pubkey' not in validate_response:
        print '    Pubkey request failed. Falling back to paying out to the address.'
        defer.returnValue(bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash(address, net2)))
    pubkey = validate_response['pubkey'].decode('hex')
    assert bitcoin_data.pubkey_to_address(pubkey, net2) == address
    defer.returnValue(bitcoin_data.pubkey_to_script2(pubkey))

@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        bitcoin_data.Type.enable_caching()
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield get_payout_script2(bitcoind, net.PARENT)
        else:
            print 'Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print
        
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
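        # raw work from bitcoind (and from the merged-mining daemon, if any),
        # before the best-share decision is folded in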
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
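        # share_hash -> (time of last request, how many times it has been requested)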
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 is set first because watchers of the other work variables expect it to be up to date
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        factory.new_block.watch(lambda block_hash: set_real_work1())
        
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # was received in the time tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
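                # exponential backoff: skip if this hash was requested within the
                # last 10*1.5**count seconds (the lower bound guards against clock jumps)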
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        
        @defer.inlineCallbacks
        def set_merged_work():
            if not args.merged_url:
                return
            merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        set_merged_work()
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
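        # handlers for the p2pool P2P protocol: shares pushed by peers, share
        # hashes advertised by peers, and peers' requests for shares we hold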
        def p2p_shares(shares, peer=None):
            if len(shares) > 5:
                print 'Processing %i shares...' % (len(shares),)
            
            new_count = 0
            for share in shares:
                if share.hash in tracker.shares:
                    #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                    continue
                
                new_count += 1
                
                #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                
                tracker.add(share)
            
            if shares and peer is not None:
                peer_heads.setdefault(shares[0].hash, set()).add(peer)
            
            if new_count:
                set_real_work2()
            
            if len(shares) > 5:
                print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print
                print 'GOT BLOCK! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash,)
                print
        
        def p2p_share_hashes(share_hashes, peer):
            t = time.time()
            get_hashes = []
            for share_hash in share_hashes:
                if share_hash in tracker.shares:
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                get_hashes.append(share_hash)
                requested[share_hash] = t, count + 1
            
            if share_hashes and peer is not None:
                peer_heads.setdefault(share_hashes[0], set()).add(peer)
            if get_hashes:
                peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
        
        def p2p_get_shares(share_hashes, parents, stops, peer):
            if not share_hashes:
                return # empty request; also avoids dividing by zero below
            parents = min(parents, 1000//len(share_hashes))
            stops = set(stops)
            shares = []
            for share_hash in share_hashes:
                for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                    if share.hash in stops:
                        break
                    shares.append(share)
            print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
            peer.sendShares(shares)
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        nodes = set([
            ('72.14.191.28', net.P2P_PORT),
            ('62.204.197.159', net.P2P_PORT),
            ('142.58.248.28', net.P2P_PORT),
            ('94.23.34.145', net.P2P_PORT),
        ])
        for host in [
            'p2pool.forre.st',
            'dabuttonfactory.com',
            ] + (['liteco.in'] if net.NAME == 'litecoin' else []) + [
        ]:
            try:
                nodes.add(((yield reactor.resolve(host)), net.P2P_PORT))
            except:
                log.err(None, 'Error resolving bootstrap node IP:')
        
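        # peer addresses persist across restarts in addrs.txt, one repr()'d
        # (address, info) tuple per line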
        addrs = {}
        try:
            addrs = dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt')))
        except:
            print >>sys.stderr, "error reading addrs"
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in addrs.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        p2p_node = p2p.Node(
            current_work=current_work,
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            preferred_addrs=set(map(parse, args.p2pool_nodes)) | nodes,
        )
        p2p_node.handle_shares = p2p_shares
        p2p_node.handle_share_hashes = p2p_share_hashes
        p2p_node.handle_get_shares = p2p_get_shares
        
        p2p_node.start()
        
        # when the best share chain changes, forward any shares on it that haven't been sent to peers yet
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP') # XXX try to forward external correct port?
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
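        # merkle_root -> (share_info, transactions, time the work was handed out),
        # used to match submitted proofs-of-work back to their transactions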
        merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
        
        removed_unstales_var = variable.Variable((0, 0, 0))
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
        
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)
        
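        # sums, over a chain of shares, of (our shares, our DOA shares, our
        # shares recorded as orphaned, our shares recorded as dead)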
        stale_counter = skiplists.SumSkipList(tracker, lambda share: (
            1 if share.hash in my_share_hashes else 0,
            1 if share.hash in my_doa_share_hashes else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 253 else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 254 else 0,
        ), (0, 0, 0, 0), math.add_tuples)
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            my_shares_in_chain, my_doa_shares_in_chain, orphans_recorded_in_chain, doas_recorded_in_chain = stale_counter(
                current_work.value['best_share_hash'],
                tracker.verified.get_height(current_work.value['best_share_hash']),
            )
            my_shares_in_chain += removed_unstales_var.value[0]
            my_doa_shares_in_chain += removed_doa_unstales_var.value
            orphans_recorded_in_chain += removed_unstales_var.value[1]
            doas_recorded_in_chain += removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        
        def get_payout_script_from_username(user):
            if user is None:
                return None
            try:
                pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
            except: # XXX blah
                return None
            return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
        
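        # precompute runs once per worker request: it resolves the payout script
        # from the miner's username, falling back to our own script when the
        # username isn't an address or with probability worker_fee percent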
        def precompute(request):
            payout_script = get_payout_script_from_username(request.getUser())
            if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                payout_script = my_script
            return payout_script,

        def compute(payout_script):
            if len(p2p_node.peers) == 0 and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
            if current_work.value['best_share_hash'] is None and net.PERSIST:
                raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
            if time.time() > current_work2.value['last_update'] + 60:
                raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
            
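            # build this share's generate (coinbase) transaction. when merged
            # mining is active, the coinbase embeds the aux chain's block hash
            # behind the merged-mining magic bytes 0xfa 0xbe 'm' 'm'. stale_info
            # marks the share as 253/254 if we have more orphan/dead shares than
            # the chain has recorded for us so far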
            share_info, generate_tx = p2pool_data.generate_transaction(
                tracker=tracker,
                share_data=dict(
                    previous_share_hash=current_work.value['best_share_hash'],
                    coinbase='' if current_work.value['aux_work'] is None else
                        '\xfa\xbemm' + bitcoin_data.HashType().pack(current_work.value['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                    nonce=struct.pack('<Q', random.randrange(2**64)),
                    new_script=payout_script,
                    subsidy=current_work2.value['subsidy'],
                    donation=math.perfect_round(65535*args.donation_percentage/100),
                    stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                        253 if orphans > orphans_recorded_in_chain else
                        254 if doas > doas_recorded_in_chain else
                        0
                    )(*get_stale_counts()),
                ),
                block_target=current_work.value['bits'].target,
                desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                net=net,
            )
            
            print 'New work for worker! Difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                bitcoin_data.target_to_difficulty(share_info['bits'].target),
                (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
                current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                len(current_work2.value['transactions']),
            )
            
            transactions = [generate_tx] + list(current_work2.value['transactions'])
            merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
            merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time()
            
            return bitcoin_getwork.BlockAttempt(
                version=current_work.value['version'],
                previous_block=current_work.value['previous_block'],
                merkle_root=merkle_root,
                timestamp=current_work2.value['time'],
                bits=current_work.value['bits'],
                share_target=share_info['bits'].target,
            )
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
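        # called with each proof-of-work a miner submits: check it against the
        # block target, the merged-mining target, and finally the share target,
        # then record the share locally (p2p_shares handles chain insertion)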
        def got_response(header, request):
            try:
                # match up with transactions
                xxx = merkle_root_to_transactions.get(header['merkle_root'], None)
                if xxx is None:
                    print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time = xxx
                
                hash_ = bitcoin_data.block_header_type.hash256(header)
                
                pow_hash = net.PARENT.POW_FUNC(header)
                
                if pow_hash <= header['bits'].target or p2pool.DEBUG:
                    if factory.conn.value is not None:
                        factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                    else:
                        print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                    if pow_hash <= header['bits'].target:
                        print
                        print 'GOT BLOCK! Passing to bitcoind! bitcoin: %x' % (hash_,)
                        print
                
                try:
                    if current_work.value['aux_work'] is not None and (pow_hash <= current_work.value['aux_work']['target'] or p2pool.DEBUG):
                        aux_pow = dict(
                            merkle_tx=dict(
                                tx=transactions[0],
                                block_hash=hash_,
                                merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                                index=0,
                            ),
                            merkle_branch=[],
                            index=0,
                            parent_block_header=header,
                        )
                        
                        a, b = transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex'), bitcoin_data.aux_pow_type.pack(aux_pow).encode('hex')
                        #print a, b
                        merged = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,))
                        def _(res):
                            print "MERGED RESULT:", res
                        merged.rpc_getauxblock(a, b).addBoth(_)
                except:
                    log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash > share_info['bits'].target:
                    print >>sys.stderr, 'Worker submitted share with hash > target:\nhash  : %x\ntarget: %x' % (pow_hash, share_info['bits'].target)
                    return False
                share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                my_share_hashes.add(share.hash)
                if share.previous_hash != current_work.value['best_share_hash']:
                    my_doa_share_hashes.add(share.hash)
                print 'GOT SHARE! %s %s prev %s age %.2fs' % (request.getUser(), p2pool_data.format_hash(share.hash), p2pool_data.format_hash(share.previous_hash), time.time() - getwork_time) + (' DEAD ON ARRIVAL' if share.previous_hash != current_work.value['best_share_hash'] else '')
                good = share.previous_hash == current_work.value['best_share_hash']
                # maybe revert back to tracker being non-blocking so 'good' can be more accurate?
                p2p_shares([share])
                # eg. good = share.hash == current_work.value['best_share_hash'] here
                return good
            except:
                log.err(None, 'Error processing data received from worker:')
                return False
        
        web_root = resource.Resource()
        worker_interface.WorkerInterface(compute, got_response, current_work.changed, precompute).attach_to(web_root)
        
        def get_rate():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return json.dumps(None)
            return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        
        def get_global_stats():
            # averaged over last hour
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            return json.dumps(dict(
                pool_nonstale_hash_rate=nonstale_hash_rate,
                pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
                pool_stale_prop=stale_prop,
            ))
        
        def get_local_stats():
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            
            my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
            my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
            my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
            my_share_count = my_unstale_count + my_orphan_count + my_doa_count
            my_stale_count = my_orphan_count + my_doa_count
            
            my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
            
            my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
                for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
                if share.hash in my_share_hashes)
            actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
                tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
            share_att_s = my_work / actual_time
            
            return json.dumps(dict(
                my_hash_rates_in_last_hour=dict(
                    nonstale=share_att_s,
                    rewarded=share_att_s/(1 - global_stale_prop),
                    actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
                ),
                my_share_counts_in_last_hour=dict(
                    shares=my_share_count,
                    unstale_shares=my_unstale_count,
                    stale_shares=my_stale_count,
                    orphan_stale_shares=my_orphan_count,
                    doa_stale_shares=my_doa_count,
                ),
                my_stale_proportions_in_last_hour=dict(
                    stale=my_stale_prop,
                    orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                    dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
                ),
            ))
        
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type):
                self.func, self.mime_type = func, mime_type
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                return self.func()
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
        web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        
        # call getmemorypool every 15 seconds to check that bitcoind is alive
        task.LoopingCall(set_real_work1).start(15)
        
        
        # done!
        print 'Started successfully!'
        print
        
        
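        # watchdog: re-arm a 30-second SIGALRM once per second, so the alarm
        # only fires (dumping a stack trace) if the reactor stalls for 30 seconds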
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                lambda: sys.stdout.write('Watchdog timer went off at:\n' + ''.join(traceback.format_stack()))
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                            real_att_s = att_s / (1 - stale_prop)
                            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i' % (
                                math.format(int(real_att_s)),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(my_att_s)),
                                shares,
                                stale_orphan_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            this_str += '\nAverage time between blocks: %.2f days' % (
                                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                            )
                            this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
                            conf = 0.95
                            if shares:
                                stale_shares = stale_orphan_shares + stale_doa_shares
                                this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if stale_prop < .99:
                                    this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - stale_prop) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

def run():
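    # argparse subclass that expands @file arguments recursively and accepts
    # several whitespace-separated arguments per line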
    class FixedArgumentParser(argparse.ArgumentParser):
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers mining to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee to mine on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: <empty>)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
    class EncodeReplacerPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            if isinstance(data, unicode):
                try:
                    data = data.encode(self.inner_file.encoding, 'replace')
                except:
                    data = data.encode('ascii', 'replace')
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class LogFile(object):
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
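        # reopen for appending; if the file has grown past 100 MB, first keep
        # only roughly the final megabyte, trimmed to a line boundary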
        def reopen(self):
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now(), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    class AbortPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            try:
                self.inner_file.write(data)
            except:
                sys.stdout = sys.__stdout__
                log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
                raise
        def flush(self):
            self.inner_file.flush()
    class PrefixPipe(object):
        def __init__(self, inner_file, prefix):
            self.inner_file = inner_file
            self.prefix = prefix
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write(self.prefix + line + '\n')
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    logfile = LogFile(args.logfile)
    pipe = TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = AbortPipe(PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify both --merged-url and --merged-userpass, or neither')
    
    reactor.callWhenRunning(main, args, net, datadir_path)
    reactor.run()