allocate a connection slot for each node added with -n, instead of merely adding...
p2pool/main.py
#!/usr/bin/python
# coding=utf-8

from __future__ import division

import argparse
import codecs
import datetime
import os
import random
import struct
import sys
import time
import json
import signal
import traceback

from twisted.internet import defer, reactor, task
from twisted.web import server, resource
from twisted.python import log
from nattraverso import portmapper, ipdiscover

import bitcoin.p2p as bitcoin_p2p, bitcoin.getwork as bitcoin_getwork, bitcoin.data as bitcoin_data
from bitcoin import worker_interface
from util import expiring_dict, jsonrpc, variable, deferral, math
from . import p2p, skiplists, networks, graphs
import p2pool, p2pool.data as p2pool_data

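# getwork() wraps bitcoind's getmemorypool RPC and normalizes the result into a
# plain dict. 'bits' may come back either as a hex string or as an integer
# (presumably depending on the bitcoind version), hence the isinstance check below.
# A rough sketch of the normalized result, with purely illustrative values:
#     dict(version=1, previous_block_hash=0x000000...L, transactions=[...],
#          subsidy=5000000000, time=1316891749, bits=FloatingInteger(...))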
@deferral.retry('Error getting work from bitcoind:', 3)
@defer.inlineCallbacks
def getwork(bitcoind):
    work = yield bitcoind.rpc_getmemorypool()
    defer.returnValue(dict(
        version=work['version'],
        previous_block_hash=int(work['previousblockhash'], 16),
        transactions=[bitcoin_data.tx_type.unpack(x.decode('hex')) for x in work['transactions']],
        subsidy=work['coinbasevalue'],
        time=work['time'],
        bits=bitcoin_data.FloatingIntegerType().unpack(work['bits'].decode('hex')[::-1]) if isinstance(work['bits'], (str, unicode)) else bitcoin_data.FloatingInteger(work['bits']),
    ))

@defer.inlineCallbacks
def main(args, net, datadir_path):
    try:
        print 'p2pool (version %s)' % (p2pool.__version__,)
        print
        try:
            from . import draw
        except ImportError:
            draw = None
            print "Install Pygame and PIL to enable visualizations! Visualizations disabled."
            print
        
        # connect to bitcoind over JSON-RPC and do initial getmemorypool
        url = 'http://%s:%i/' % (args.bitcoind_address, args.bitcoind_rpc_port)
        print '''Testing bitcoind RPC connection to '%s' with username '%s'...''' % (url, args.bitcoind_rpc_username)
        bitcoind = jsonrpc.Proxy(url, (args.bitcoind_rpc_username, args.bitcoind_rpc_password))
        good = yield deferral.retry('Error while checking bitcoind identity:', 1)(net.PARENT.RPC_CHECK)(bitcoind)
        if not good:
            print >>sys.stderr, "    Check failed! Make sure that you're connected to the right bitcoind with --bitcoind-rpc-port!"
            return
        temp_work = yield getwork(bitcoind)
        print '    ...success!'
        print '    Current block hash: %x' % (temp_work['previous_block_hash'],)
        print
        
        # connect to bitcoind over bitcoin-p2p
        print '''Testing bitcoind P2P connection to '%s:%s'...''' % (args.bitcoind_address, args.bitcoind_p2p_port)
        factory = bitcoin_p2p.ClientFactory(net.PARENT)
        reactor.connectTCP(args.bitcoind_address, args.bitcoind_p2p_port, factory)
        yield factory.getProtocol() # waits until handshake is successful
        print '    ...success!'
        print
        
        if args.pubkey_hash is None:
            print 'Getting payout address from bitcoind...'
            my_script = yield deferral.retry('Error getting payout address from bitcoind:', 5)(defer.inlineCallbacks(lambda: defer.returnValue(
                bitcoin_data.pubkey_hash_to_script2(bitcoin_data.address_to_pubkey_hash((yield bitcoind.rpc_getaccountaddress('p2pool')), net.PARENT)))
            ))()
        else:
            print 'Computing payout script from provided address...'
            my_script = bitcoin_data.pubkey_hash_to_script2(args.pubkey_hash)
        print '    ...success!'
        print '    Payout script:', bitcoin_data.script2_to_human(my_script, net.PARENT)
        print
        
        ht = bitcoin_p2p.HeightTracker(bitcoind, factory)
        
        tracker = p2pool_data.OkayTracker(net)
        shared_share_hashes = set()
        ss = p2pool_data.ShareStore(os.path.join(datadir_path, 'shares.'), net)
        known_verified = set()
        recent_blocks = []
        print "Loading shares..."
        for i, (mode, contents) in enumerate(ss.get_shares()):
            if mode == 'share':
                if contents.hash in tracker.shares:
                    continue
                shared_share_hashes.add(contents.hash)
                contents.time_seen = 0
                tracker.add(contents)
                if len(tracker.shares) % 1000 == 0 and tracker.shares:
                    print "    %i" % (len(tracker.shares),)
            elif mode == 'verified_hash':
                known_verified.add(contents)
            else:
                raise AssertionError()
        print "    ...inserting %i verified shares..." % (len(known_verified),)
        for h in known_verified:
            if h not in tracker.shares:
                ss.forget_verified_share(h)
                continue
            tracker.verified.add(tracker.shares[h])
        print "    ...done loading %i shares!" % (len(tracker.shares),)
        print
        tracker.removed.watch(lambda share: ss.forget_share(share.hash))
        tracker.verified.removed.watch(lambda share: ss.forget_verified_share(share.hash))
        tracker.removed.watch(lambda share: shared_share_hashes.discard(share.hash))
        
        peer_heads = expiring_dict.ExpiringDict(300) # hash -> peers that know of it
        
        pre_current_work = variable.Variable(None)
        pre_merged_work = variable.Variable(None)
        # information affecting work that should trigger a long-polling update
        current_work = variable.Variable(None)
        # information affecting work that should not trigger a long-polling update
        current_work2 = variable.Variable(None)
        
        requested = expiring_dict.ExpiringDict(300)
        
        @defer.inlineCallbacks
        def set_real_work1():
            work = yield getwork(bitcoind)
            current_work2.set(dict(
                time=work['time'],
                transactions=work['transactions'],
                subsidy=work['subsidy'],
                clock_offset=time.time() - work['time'],
                last_update=time.time(),
            )) # current_work2 is set first because watchers of pre_current_work read from it
            pre_current_work.set(dict(
                version=work['version'],
                previous_block=work['previous_block_hash'],
                bits=work['bits'],
            ))
        
        def set_real_work2():
            best, desired = tracker.think(ht, pre_current_work.value['previous_block'])
            
            t = dict(pre_current_work.value)
            t['best_share_hash'] = best
            t['aux_work'] = pre_merged_work.value
            current_work.set(t)
            
            t = time.time()
            for peer2, share_hash in desired:
                if share_hash not in tracker.tails: # share arrived while tracker.think was running
                    continue
                last_request_time, count = requested.get(share_hash, (None, 0))
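                # exponential back-off on re-requests: skip this hash if it was already
                # requested within roughly the last 10*1.5**count seconds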
                if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                    continue
                potential_peers = set()
                for head in tracker.tails[share_hash]:
                    potential_peers.update(peer_heads.get(head, set()))
                potential_peers = [peer for peer in potential_peers if peer.connected2]
                if count == 0 and peer2 is not None and peer2.connected2:
                    peer = peer2
                else:
                    peer = random.choice(potential_peers) if potential_peers and random.random() > .2 else peer2
                    if peer is None:
                        continue
                
                print 'Requesting parent share %s from %s' % (p2pool_data.format_hash(share_hash), '%s:%i' % peer.addr)
                peer.send_getshares(
                    hashes=[share_hash],
                    parents=2000,
                    stops=list(set(tracker.heads) | set(
                        tracker.get_nth_parent_hash(head, min(max(0, tracker.get_height_and_last(head)[0] - 1), 10)) for head in tracker.heads
                    ))[:100],
                )
                requested[share_hash] = t, count + 1
        pre_current_work.changed.watch(lambda _: set_real_work2())
        
        print 'Initializing work...'
        yield set_real_work1()
        print '    ...success!'
        print
        
        pre_merged_work.changed.watch(lambda _: set_real_work2())
        ht.updated.watch(set_real_work2)
        
        merged_proxy = jsonrpc.Proxy(args.merged_url, (args.merged_userpass,)) if args.merged_url else None
        
        @defer.inlineCallbacks
        def set_merged_work():
            while True:
                auxblock = yield deferral.retry('Error while calling merged getauxblock:', 1)(merged_proxy.rpc_getauxblock)()
                pre_merged_work.set(dict(
                    hash=int(auxblock['hash'], 16),
                    target=bitcoin_data.HashType().unpack(auxblock['target'].decode('hex')),
                    chain_id=auxblock['chainid'],
                ))
                yield deferral.sleep(1)
        if merged_proxy is not None:
            set_merged_work()
        
        @pre_merged_work.changed.watch
        def _(new_merged_work):
            print "Got new merged mining work! Difficulty: %f" % (bitcoin_data.target_to_difficulty(new_merged_work['target']),)
        
        start_time = time.time() - current_work2.value['clock_offset']
        
        # setup p2p logic and join p2pool network
        
        class Node(p2p.Node):
            def handle_shares(self, shares, peer):
                if len(shares) > 5:
                    print 'Processing %i shares...' % (len(shares),)
                
                new_count = 0
                for share in shares:
                    if share.hash in tracker.shares:
                        #print 'Got duplicate share, ignoring. Hash: %s' % (p2pool_data.format_hash(share.hash),)
                        continue
                    
                    new_count += 1
                    
                    #print 'Received share %s from %r' % (p2pool_data.format_hash(share.hash), share.peer.addr if share.peer is not None else None)
                    
                    tracker.add(share)
                
                if shares and peer is not None:
                    peer_heads.setdefault(shares[0].hash, set()).add(peer)
                
                if new_count:
                    set_real_work2()
                
                if len(shares) > 5:
                    print '... done processing %i shares. New: %i Have: %i/~%i' % (len(shares), new_count, len(tracker.shares), 2*net.CHAIN_LENGTH)
            
            def handle_share_hashes(self, hashes, peer):
                t = time.time()
                get_hashes = []
                for share_hash in hashes:
                    if share_hash in tracker.shares:
                        continue
                    last_request_time, count = requested.get(share_hash, (None, 0))
                    if last_request_time is not None and last_request_time - 5 < t < last_request_time + 10 * 1.5**count:
                        continue
                    print 'Got share hash, requesting! Hash: %s' % (p2pool_data.format_hash(share_hash),)
                    get_hashes.append(share_hash)
                    requested[share_hash] = t, count + 1
                
                if hashes and peer is not None:
                    peer_heads.setdefault(hashes[0], set()).add(peer)
                if get_hashes:
                    peer.send_getshares(hashes=get_hashes, parents=0, stops=[])
            
            def handle_get_shares(self, hashes, parents, stops, peer):
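                # cap the reply at roughly 1000 shares total, split evenly across the requested hashes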
                parents = min(parents, 1000//len(hashes))
                stops = set(stops)
                shares = []
                for share_hash in hashes:
                    for share in tracker.get_chain(share_hash, min(parents + 1, tracker.get_height(share_hash))):
                        if share.hash in stops:
                            break
                        shares.append(share)
                print 'Sending %i shares to %s:%i' % (len(shares), peer.addr[0], peer.addr[1])
                peer.sendShares(shares)
        
        @tracker.verified.added.watch
        def _(share):
            if share.pow_hash <= share.header['bits'].target:
                if factory.conn.value is not None:
                    factory.conn.value.send_block(block=share.as_block(tracker))
                else:
                    print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                print
                print 'GOT BLOCK FROM PEER! Passing to bitcoind! %s bitcoin: %x' % (p2pool_data.format_hash(share.hash), share.header_hash)
                print
                recent_blocks.append({ 'ts': share.timestamp, 'hash': '%x' % (share.header_hash) })
        
        print 'Joining p2pool network using port %i...' % (args.p2pool_port,)
        
        def parse(x):
            if ':' in x:
                ip, port = x.split(':')
                return ip, int(port)
            else:
                return x, net.P2P_PORT
        
        addrs = {}
        if os.path.exists(os.path.join(datadir_path, 'addrs.txt')):
            try:
                addrs.update(dict(eval(x) for x in open(os.path.join(datadir_path, 'addrs.txt'))))
            except:
                print >>sys.stderr, "error reading addrs"
        for addr in map(parse, net.BOOTSTRAP_ADDRS):
            if addr not in addrs:
                addrs[addr] = (0, time.time(), time.time())
        
        p2p_node = Node(
            best_share_hash_func=lambda: current_work.value['best_share_hash'],
            port=args.p2pool_port,
            net=net,
            addr_store=addrs,
            connect_addrs=set(map(parse, args.p2pool_nodes)),
        )
        p2p_node.start()
        
        def save_addrs():
            open(os.path.join(datadir_path, 'addrs.txt'), 'w').writelines(repr(x) + '\n' for x in p2p_node.addr_store.iteritems())
        task.LoopingCall(save_addrs).start(60)
        
        # when the best chain changes, send peers any shares along it that haven't been broadcast yet
        def work_changed(new_work):
            #print 'Work changed:', new_work
            shares = []
            for share in tracker.get_chain(new_work['best_share_hash'], tracker.get_height(new_work['best_share_hash'])):
                if share.hash in shared_share_hashes:
                    break
                shared_share_hashes.add(share.hash)
                shares.append(share)
            
            for peer in p2p_node.peers.itervalues():
                peer.sendShares([share for share in shares if share.peer is not peer])
        
        current_work.changed.watch(work_changed)
        
        def save_shares():
            for share in tracker.get_chain(current_work.value['best_share_hash'], min(tracker.get_height(current_work.value['best_share_hash']), 2*net.CHAIN_LENGTH)):
                ss.add_share(share)
                if share.hash in tracker.verified.shares:
                    ss.add_verified_hash(share.hash)
        task.LoopingCall(save_shares).start(60)
        
        print '    ...success!'
        print
        
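        # keep a UPnP mapping of the p2pool port alive; after each attempt, sleep for
        # an exponentially-distributed delay averaging two minutes before retrying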
        @defer.inlineCallbacks
        def upnp_thread():
            while True:
                try:
                    is_lan, lan_ip = yield ipdiscover.get_local_ip()
                    if is_lan:
                        pm = yield portmapper.get_port_mapper()
                        yield pm._upnp.add_port_mapping(lan_ip, args.p2pool_port, args.p2pool_port, 'p2pool', 'TCP')
                except defer.TimeoutError:
                    pass
                except:
                    if p2pool.DEBUG:
                        log.err(None, "UPnP error:")
                yield deferral.sleep(random.expovariate(1/120))
        
        if args.upnp:
            upnp_thread()
        
        # start listening for workers with a JSON-RPC server
        
        print 'Listening for workers on port %i...' % (args.worker_port,)
        
        # setup worker logic
        
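        # stale_info encoding used in share_data (see the assert and the 253/254
        # checks below): 0 = presumed good, 253 = orphan, 254 = dead on arrival.
        # removed_unstales_var tracks (total, orphans, doas) counts of our shares
        # that expired out of the tracker while still part of the best chain.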
        removed_unstales_var = variable.Variable((0, 0, 0))
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                assert share.share_data['stale_info'] in [0, 253, 254] # we made these shares in this instance
                removed_unstales_var.set((
                    removed_unstales_var.value[0] + 1,
                    removed_unstales_var.value[1] + (1 if share.share_data['stale_info'] == 253 else 0),
                    removed_unstales_var.value[2] + (1 if share.share_data['stale_info'] == 254 else 0),
                ))
        
        removed_doa_unstales_var = variable.Variable(0)
        @tracker.verified.removed.watch
        def _(share):
            if share.hash in my_doa_share_hashes and tracker.is_child_of(share.hash, current_work.value['best_share_hash']):
                removed_doa_unstales_var.set(removed_doa_unstales_var.value + 1)

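        # sums, over a span of the share chain, the tuple (my shares, my DOA shares,
        # my shares marked orphan-stale, my shares marked DOA-stale)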
        stale_counter = skiplists.SumSkipList(tracker, lambda share: (
            1 if share.hash in my_share_hashes else 0,
            1 if share.hash in my_doa_share_hashes else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 253 else 0,
            1 if share.hash in my_share_hashes and share.share_data['stale_info'] == 254 else 0,
        ), (0, 0, 0, 0), math.add_tuples)
        def get_stale_counts():
            '''Returns (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain)'''
            my_shares = len(my_share_hashes)
            my_doa_shares = len(my_doa_share_hashes)
            my_shares_in_chain, my_doa_shares_in_chain, orphans_recorded_in_chain, doas_recorded_in_chain = stale_counter(
                current_work.value['best_share_hash'],
                tracker.verified.get_height(current_work.value['best_share_hash']),
            )
            my_shares_in_chain += removed_unstales_var.value[0]
            my_doa_shares_in_chain += removed_doa_unstales_var.value
            orphans_recorded_in_chain += removed_unstales_var.value[1]
            doas_recorded_in_chain += removed_unstales_var.value[2]
            
            my_shares_not_in_chain = my_shares - my_shares_in_chain
            my_doa_shares_not_in_chain = my_doa_shares - my_doa_shares_in_chain
            
            return (my_shares_not_in_chain - my_doa_shares_not_in_chain, my_doa_shares_not_in_chain), my_shares, (orphans_recorded_in_chain, doas_recorded_in_chain)
        
        my_share_hashes = set()
        my_doa_share_hashes = set()
        
        class WorkerBridge(worker_interface.WorkerBridge):
            def __init__(self):
                worker_interface.WorkerBridge.__init__(self)
                self.new_work_event = current_work.changed
                
                self.merkle_root_to_transactions = expiring_dict.ExpiringDict(300)
            
            def _get_payout_script_from_username(self, user):
                if user is None:
                    return None
                try:
                    pubkey_hash = bitcoin_data.address_to_pubkey_hash(user, net.PARENT)
                except: # XXX: username isn't a valid address; fall back to the default payout script
                    return None
                return bitcoin_data.pubkey_hash_to_script2(pubkey_hash)
            
            def preprocess_request(self, request):
                payout_script = self._get_payout_script_from_username(request.getUser())
                if payout_script is None or random.uniform(0, 100) < args.worker_fee:
                    payout_script = my_script
                return payout_script,
            
            def get_work(self, payout_script):
                if len(p2p_node.peers) == 0 and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is not connected to any peers')
                if current_work.value['best_share_hash'] is None and net.PERSIST:
                    raise jsonrpc.Error(-12345, u'p2pool is downloading shares')
                if time.time() > current_work2.value['last_update'] + 60:
                    raise jsonrpc.Error(-12345, u'lost contact with bitcoind')
                
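                # build the generate (coinbase) transaction for this share. When merged
                # mining, the coinbase carries what appears to be the standard
                # merged-mining tag: the magic bytes '\xfa\xbemm', the aux chain's block
                # hash (byte-reversed), then struct.pack('<ii', 1, 0), presumably a
                # merkle size of 1 and a nonce of 0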
                share_info, generate_tx = p2pool_data.generate_transaction(
                    tracker=tracker,
                    share_data=dict(
                        previous_share_hash=current_work.value['best_share_hash'],
                        coinbase='' if current_work.value['aux_work'] is None else
                            '\xfa\xbemm' + bitcoin_data.HashType().pack(current_work.value['aux_work']['hash'])[::-1] + struct.pack('<ii', 1, 0),
                        nonce=struct.pack('<Q', random.randrange(2**64)),
                        new_script=payout_script,
                        subsidy=current_work2.value['subsidy'],
                        donation=math.perfect_round(65535*args.donation_percentage/100),
                        stale_info=(lambda (orphans, doas), total, (orphans_recorded_in_chain, doas_recorded_in_chain):
                            253 if orphans > orphans_recorded_in_chain else
                            254 if doas > doas_recorded_in_chain else
                            0
                        )(*get_stale_counts()),
                    ),
                    block_target=current_work.value['bits'].target,
                    desired_timestamp=int(time.time() - current_work2.value['clock_offset']),
                    net=net,
                )
                
                print 'New work for worker! Share difficulty: %.06f Payout if block: %.6f %s Total block value: %.6f %s including %i transactions' % (
                    bitcoin_data.target_to_difficulty(share_info['bits'].target),
                    (sum(t['value'] for t in generate_tx['tx_outs'] if t['script'] == payout_script) - current_work2.value['subsidy']//200)*1e-8, net.PARENT.SYMBOL,
                    current_work2.value['subsidy']*1e-8, net.PARENT.SYMBOL,
                    len(current_work2.value['transactions']),
                )
                
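                # hand the miner the easiest (numerically largest) of three targets: a
                # fixed floor, the share target, and, when merged mining, the aux chain
                # target; got_response() below sorts out which thresholds a result met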
                target = 2**256//2**32//8 - 1
                target = max(target, share_info['bits'].target)
                if current_work.value['aux_work']:
                    target = max(target, current_work.value['aux_work']['target'])
                
                transactions = [generate_tx] + list(current_work2.value['transactions'])
                merkle_root = bitcoin_data.merkle_hash(map(bitcoin_data.tx_type.hash256, transactions))
                self.merkle_root_to_transactions[merkle_root] = share_info, transactions, time.time(), current_work.value['aux_work'], target
                
                return bitcoin_getwork.BlockAttempt(
                    version=current_work.value['version'],
                    previous_block=current_work.value['previous_block'],
                    merkle_root=merkle_root,
                    timestamp=current_work2.value['time'],
                    bits=current_work.value['bits'],
                    share_target=target,
                )
            
            def got_response(self, header, request):
                # match up with transactions
                if header['merkle_root'] not in self.merkle_root_to_transactions:
                    print >>sys.stderr, '''Couldn't link returned work's merkle root with its transactions - should only happen if you recently restarted p2pool'''
                    return False
                share_info, transactions, getwork_time, aux_work, target = self.merkle_root_to_transactions[header['merkle_root']]
                
                pow_hash = net.PARENT.POW_FUNC(header)
                on_time = current_work.value['best_share_hash'] == share_info['share_data']['previous_share_hash']
                
                try:
                    if pow_hash <= header['bits'].target or p2pool.DEBUG:
                        if factory.conn.value is not None:
                            factory.conn.value.send_block(block=dict(header=header, txs=transactions))
                        else:
                            print >>sys.stderr, 'No bitcoind connection when block submittal attempted! Erp!'
                        if pow_hash <= header['bits'].target:
                            print
                            print 'GOT BLOCK FROM MINER! Passing to bitcoind! bitcoin: %x' % (bitcoin_data.block_header_type.hash256(header),)
                            print
                            recent_blocks.append({ 'ts': time.time(), 'hash': '%x' % (bitcoin_data.block_header_type.hash256(header),) })
                except:
                    log.err(None, 'Error while processing potential block:')
                
                try:
                    if aux_work is not None and (pow_hash <= aux_work['target'] or p2pool.DEBUG):
                        assert bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex') == transactions[0]['tx_ins'][0]['script'][-32-8:-8].encode('hex')
                        df = deferral.retry('Error submitting merged block: (will retry)', 10, 10)(merged_proxy.rpc_getauxblock)(
                            bitcoin_data.HashType().pack(aux_work['hash'])[::-1].encode('hex'),
                            bitcoin_data.aux_pow_type.pack(dict(
                                merkle_tx=dict(
                                    tx=transactions[0],
                                    block_hash=bitcoin_data.block_header_type.hash256(header),
                                    merkle_branch=bitcoin_data.calculate_merkle_branch(map(bitcoin_data.tx_type.hash256, transactions), 0),
                                    index=0,
                                ),
                                merkle_branch=[],
                                index=0,
                                parent_block_header=header,
                            )).encode('hex'),
                        )
                        @df.addCallback
                        def _(result):
                            if result != (pow_hash <= aux_work['target']):
                                print >>sys.stderr, 'Merged block submittal result: %s Expected: %s' % (result, pow_hash <= aux_work['target'])
                            else:
                                print 'Merged block submittal result: %s' % (result,)
                        @df.addErrback
                        def _(err):
                            log.err(err, 'Error submitting merged block:')
                except:
                    log.err(None, 'Error while processing merged mining POW:')
                
                if pow_hash <= share_info['bits'].target:
                    share = p2pool_data.Share(net, header, share_info, other_txs=transactions[1:])
                    print 'GOT SHARE! %s %s prev %s age %.2fs%s' % (
                        request.getUser(),
                        p2pool_data.format_hash(share.hash),
                        p2pool_data.format_hash(share.previous_hash),
                        time.time() - getwork_time,
                        ' DEAD ON ARRIVAL' if not on_time else '',
                    )
                    my_share_hashes.add(share.hash)
                    if not on_time:
                        my_doa_share_hashes.add(share.hash)
                    p2p_node.handle_shares([share], None)
                
                if pow_hash <= target:
                    reactor.callLater(1, grapher.add_localrate_point, bitcoin_data.target_to_average_attempts(target), not on_time)
                
                if pow_hash > target:
                    print 'Worker submitted share with hash > target:'
                    print '    Hash:   %56x' % (pow_hash,)
                    print '    Target: %56x' % (target,)
                
                return on_time
        
        web_root = resource.Resource()
        worker_interface.WorkerInterface(WorkerBridge()).attach_to(web_root)
        
        def get_rate():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return json.dumps(None)
            return json.dumps(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        
        def get_users():
            height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
            res = {}
            for script in sorted(weights, key=lambda s: weights[s]):
                res[bitcoin_data.script2_to_human(script, net.PARENT)] = weights[script]/total_weight
            return json.dumps(res)
        
        def get_current_txouts():
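            # builds throwaway work paid to a random tag script, drops the tag's own
            # output, and redistributes its value over the remaining outputs in
            # proportion to their size, yielding the current would-be payouts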
            wb = WorkerBridge()
            tmp_tag = str(random.randrange(2**64))
            outputs = wb.merkle_root_to_transactions[wb.get_work(tmp_tag).merkle_root][1][0]['tx_outs']
            total = sum(out['value'] for out in outputs)
            total_without_tag = sum(out['value'] for out in outputs if out['script'] != tmp_tag)
            total_diff = total - total_without_tag
            return dict((out['script'], out['value'] + math.perfect_round(out['value']*total_diff/total)) for out in outputs if out['script'] != tmp_tag and out['value'])
        
        def get_current_scaled_txouts(scale, trunc=0):
            txouts = get_current_txouts()
            total = sum(txouts.itervalues())
            results = dict((script, value*scale//total) for script, value in txouts.iteritems())
            if trunc > 0:
                total_random = 0
                random_set = set()
                for s in sorted(results, key=results.__getitem__):
                    total_random += results[s]
                    random_set.add(s)
                    if total_random >= trunc and results[s] >= trunc:
                        break
                winner = math.weighted_choice((script, results[script]) for script in random_set)
                for script in random_set:
                    del results[script]
                results[winner] = total_random
            if sum(results.itervalues()) < int(scale):
                results[math.weighted_choice(results.iteritems())] += int(scale) - sum(results.itervalues())
            return results
        
        def get_current_payouts():
            return json.dumps(dict((bitcoin_data.script2_to_human(script, net.PARENT), value/1e8) for script, value in get_current_txouts().iteritems()))
        
        def get_patron_sendmany(this):
            try:
                if '/' in this:
                    this, trunc = this.split('/', 1)
                else:
                    trunc = '0.01'
                return json.dumps(dict(
                    (bitcoin_data.script2_to_address(script, net.PARENT), value/1e8)
                    for script, value in get_current_scaled_txouts(scale=int(float(this)*1e8), trunc=int(float(trunc)*1e8)).iteritems()
                    if bitcoin_data.script2_to_address(script, net.PARENT) is not None
                ))
            except:
                return json.dumps(None)
        
        def get_global_stats():
            # averaged over last hour
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            nonstale_hash_rate = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], lookbehind)
            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            return json.dumps(dict(
                pool_nonstale_hash_rate=nonstale_hash_rate,
                pool_hash_rate=nonstale_hash_rate/(1 - stale_prop),
                pool_stale_prop=stale_prop,
            ))
        
        def get_local_stats():
            lookbehind = 3600//net.SHARE_PERIOD
            if tracker.get_height(current_work.value['best_share_hash']) < lookbehind:
                return json.dumps(None)
            
            global_stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], lookbehind)
            
            my_unstale_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes)
            my_orphan_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 253)
            my_doa_count = sum(1 for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind) if share.hash in my_share_hashes and share.share_data['stale_info'] == 254)
            my_share_count = my_unstale_count + my_orphan_count + my_doa_count
            my_stale_count = my_orphan_count + my_doa_count
            
            my_stale_prop = my_stale_count/my_share_count if my_share_count != 0 else None
            
            my_work = sum(bitcoin_data.target_to_average_attempts(share.target)
                for share in tracker.get_chain(current_work.value['best_share_hash'], lookbehind - 1)
                if share.hash in my_share_hashes)
            actual_time = (tracker.shares[current_work.value['best_share_hash']].timestamp -
                tracker.shares[tracker.get_nth_parent_hash(current_work.value['best_share_hash'], lookbehind - 1)].timestamp)
            share_att_s = my_work / actual_time
            
            return json.dumps(dict(
                my_hash_rates_in_last_hour=dict(
                    nonstale=share_att_s,
                    rewarded=share_att_s/(1 - global_stale_prop),
                    actual=share_att_s/(1 - my_stale_prop) if my_stale_prop is not None else 0, # 0 because we don't have any shares anyway
                ),
                my_share_counts_in_last_hour=dict(
                    shares=my_share_count,
                    unstale_shares=my_unstale_count,
                    stale_shares=my_stale_count,
                    orphan_stale_shares=my_orphan_count,
                    doa_stale_shares=my_doa_count,
                ),
                my_stale_proportions_in_last_hour=dict(
                    stale=my_stale_prop,
                    orphan_stale=my_orphan_count/my_share_count if my_share_count != 0 else None,
                    dead_stale=my_doa_count/my_share_count if my_share_count != 0 else None,
                ),
            ))
        
        def get_peer_addresses():
            return ' '.join(peer.transport.getPeer().host + (':' + str(peer.transport.getPeer().port) if peer.transport.getPeer().port != net.P2P_PORT else '') for peer in p2p_node.peers.itervalues())
        
        class WebInterface(resource.Resource):
            def __init__(self, func, mime_type, *fields):
                self.func, self.mime_type, self.fields = func, mime_type, fields
            
            def render_GET(self, request):
                request.setHeader('Content-Type', self.mime_type)
                request.setHeader('Access-Control-Allow-Origin', '*')
                return self.func(*(request.args[field][0] for field in self.fields))
        
        web_root.putChild('rate', WebInterface(get_rate, 'application/json'))
        web_root.putChild('users', WebInterface(get_users, 'application/json'))
        web_root.putChild('fee', WebInterface(lambda: json.dumps(args.worker_fee), 'application/json'))
        web_root.putChild('current_payouts', WebInterface(get_current_payouts, 'application/json'))
        web_root.putChild('patron_sendmany', WebInterface(get_patron_sendmany, 'text/plain', 'total'))
        web_root.putChild('global_stats', WebInterface(get_global_stats, 'application/json'))
        web_root.putChild('local_stats', WebInterface(get_local_stats, 'application/json'))
        web_root.putChild('peer_addresses', WebInterface(get_peer_addresses, 'text/plain'))
        web_root.putChild('payout_addr', WebInterface(lambda: json.dumps(bitcoin_data.script2_to_human(my_script, net.PARENT)), 'application/json'))
        web_root.putChild('recent_blocks', WebInterface(lambda: json.dumps(recent_blocks), 'application/json'))
        if draw is not None:
            web_root.putChild('chain_img', WebInterface(lambda: draw.get(tracker, current_work.value['best_share_hash']), 'image/png'))
        
        grapher = graphs.Grapher(os.path.join(datadir_path, 'rrd'))
        web_root.putChild('graphs', grapher.get_resource())
        def add_point():
            if tracker.get_height(current_work.value['best_share_hash']) < 720:
                return
            grapher.add_poolrate_point(p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], 720)
                / (1 - p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], 720)))
        task.LoopingCall(add_point).start(100)
        
        reactor.listenTCP(args.worker_port, server.Site(web_root))
        
        print '    ...success!'
        print
        
        
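        # poll bitcoind for new work every 15 seconds, or immediately once the
        # p2p factory signals that a new block has arrived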
        @defer.inlineCallbacks
        def work_poller():
            while True:
                flag = factory.new_block.get_deferred()
                try:
                    yield set_real_work1()
                except:
                    log.err()
                yield defer.DeferredList([flag, deferral.sleep(15)], fireOnOneCallback=True)
        work_poller()
        
        
        # done!
        print 'Started successfully!'
        print
        
        
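        # watchdog: re-arm a 30-second SIGALRM every second; if the reactor stalls
        # for 30 seconds, the alarm fires and the handler dumps a stack trace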
        if hasattr(signal, 'SIGALRM'):
            signal.signal(signal.SIGALRM, lambda signum, frame: reactor.callFromThread(
                sys.stderr.write, 'Watchdog timer went off at:\n' + ''.join(traceback.format_stack())
            ))
            signal.siginterrupt(signal.SIGALRM, False)
            task.LoopingCall(signal.alarm, 30).start(1)
        
        @defer.inlineCallbacks
        def status_thread():
            last_str = None
            last_time = 0
            while True:
                yield deferral.sleep(3)
                try:
                    if time.time() > current_work2.value['last_update'] + 60:
                        print >>sys.stderr, '''---> LOST CONTACT WITH BITCOIND for 60 seconds, check that it isn't frozen or dead <---'''
                    if current_work.value['best_share_hash'] is not None:
                        height, last = tracker.get_height_and_last(current_work.value['best_share_hash'])
                        if height > 2:
                            att_s = p2pool_data.get_pool_attempts_per_second(tracker, current_work.value['best_share_hash'], min(height - 1, 720))
                            weights, total_weight, donation_weight = tracker.get_cumulative_weights(current_work.value['best_share_hash'], min(height, 720), 65535*2**256)
                            (stale_orphan_shares, stale_doa_shares), shares, _ = get_stale_counts()
                            stale_prop = p2pool_data.get_average_stale_prop(tracker, current_work.value['best_share_hash'], min(720, height))
                            real_att_s = att_s / (1 - stale_prop)
                            my_att_s = real_att_s*weights.get(my_script, 0)/total_weight
                            this_str = 'Pool: %sH/s in %i shares (%i/%i verified) Recent: %.02f%% >%sH/s Shares: %i (%i orphan, %i dead) Peers: %i (%i incoming)' % (
                                math.format(int(real_att_s)),
                                height,
                                len(tracker.verified.shares),
                                len(tracker.shares),
                                weights.get(my_script, 0)/total_weight*100,
                                math.format(int(my_att_s)),
                                shares,
                                stale_orphan_shares,
                                stale_doa_shares,
                                len(p2p_node.peers),
                                sum(1 for peer in p2p_node.peers.itervalues() if peer.incoming),
                            ) + (' FDs: %i R/%i W' % (len(reactor.getReaders()), len(reactor.getWriters())) if p2pool.DEBUG else '')
                            this_str += '\nAverage time between blocks: %.2f days' % (
                                2**256 / current_work.value['bits'].target / real_att_s / (60 * 60 * 24),
                            )
                            this_str += '\nPool stales: %i%%' % (int(100*stale_prop+.5),)
                            conf = 0.95
                            if shares:
                                stale_shares = stale_orphan_shares + stale_doa_shares
                                this_str += u' Own: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius(math.binomial_conf_interval(stale_shares, shares, conf)))
                                if stale_prop < .99:
                                    this_str += u' Own efficiency: %i±%i%%' % tuple(int(100*x+.5) for x in math.interval_to_center_radius((1 - y)/(1 - stale_prop) for y in math.binomial_conf_interval(stale_shares, shares, conf)[::-1]))
                            if this_str != last_str or time.time() > last_time + 15:
                                print this_str
                                last_str = this_str
                                last_time = time.time()
                except:
                    log.err()
        status_thread()
    except:
        log.err(None, 'Fatal error:')

def run():
    class FixedArgumentParser(argparse.ArgumentParser):
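        # argparse's stock fromfile support reads one argument per line and does not
        # recurse; this subclass splits each line on whitespace and also expands
        # @file references encountered inside files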
        def _read_args_from_files(self, arg_strings):
            # expand arguments referencing files
            new_arg_strings = []
            for arg_string in arg_strings:
                
                # for regular arguments, just add them back into the list
                if not arg_string or arg_string[0] not in self.fromfile_prefix_chars:
                    new_arg_strings.append(arg_string)
                
                # replace arguments referencing files with the file content
                else:
                    try:
                        args_file = open(arg_string[1:])
                        try:
                            arg_strings = []
                            for arg_line in args_file.read().splitlines():
                                for arg in self.convert_arg_line_to_args(arg_line):
                                    arg_strings.append(arg)
                            arg_strings = self._read_args_from_files(arg_strings)
                            new_arg_strings.extend(arg_strings)
                        finally:
                            args_file.close()
                    except IOError:
                        err = sys.exc_info()[1]
                        self.error(str(err))
            
            # return the modified argument list
            return new_arg_strings
        
        def convert_arg_line_to_args(self, arg_line):
            return [arg for arg in arg_line.split() if arg.strip()]
    
    parser = FixedArgumentParser(description='p2pool (version %s)' % (p2pool.__version__,), fromfile_prefix_chars='@')
    parser.add_argument('--version', action='version', version=p2pool.__version__)
    parser.add_argument('--net',
        help='use specified network (default: bitcoin)',
        action='store', choices=sorted(networks.realnets), default='bitcoin', dest='net_name')
    parser.add_argument('--testnet',
        help='''use the network's testnet''',
        action='store_const', const=True, default=False, dest='testnet')
    parser.add_argument('--debug',
        help='enable debugging mode',
        action='store_const', const=True, default=False, dest='debug')
    parser.add_argument('-a', '--address',
        help='generate payouts to this address (default: <address requested from bitcoind>)',
        type=str, action='store', default=None, dest='address')
    parser.add_argument('--logfile',
        help='''log to this file (default: data/<NET>/log)''',
        type=str, action='store', default=None, dest='logfile')
    parser.add_argument('--merged-url',
        help='call getauxblock on this url to get work for merged mining (example: http://127.0.0.1:10332/)',
        type=str, action='store', default=None, dest='merged_url')
    parser.add_argument('--merged-userpass',
        help='use this user and password when requesting merged mining work (example: ncuser:ncpass)',
        type=str, action='store', default=None, dest='merged_userpass')
    parser.add_argument('--give-author', metavar='DONATION_PERCENTAGE',
        help='donate this percentage of work to the author of p2pool (default: 0.5)',
        type=float, action='store', default=0.5, dest='donation_percentage')
    
    p2pool_group = parser.add_argument_group('p2pool interface')
    p2pool_group.add_argument('--p2pool-port', metavar='PORT',
        help='use port PORT to listen for connections (forward this port from your router!) (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='p2pool_port')
    p2pool_group.add_argument('-n', '--p2pool-node', metavar='ADDR[:PORT]',
        help='connect to existing p2pool node at ADDR listening on port PORT (defaults to default p2pool P2P port) in addition to builtin addresses',
        type=str, action='append', default=[], dest='p2pool_nodes')
    parser.add_argument('--disable-upnp',
        help='''don't attempt to use UPnP to forward p2pool's P2P port from the Internet to this computer''',
        action='store_false', default=True, dest='upnp')
    
    worker_group = parser.add_argument_group('worker interface')
    worker_group.add_argument('-w', '--worker-port', metavar='PORT',
        help='listen on PORT for RPC connections from miners (default: %s)' % ', '.join('%s:%i' % (n.NAME, n.WORKER_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='worker_port')
    worker_group.add_argument('-f', '--fee', metavar='FEE_PERCENTAGE',
        help='''charge workers who mine to their own bitcoin address (by setting their miner's username to a bitcoin address) this percentage fee for mining on your p2pool instance. Amount displayed at http://127.0.0.1:WORKER_PORT/fee (default: 0)''',
        type=float, action='store', default=0, dest='worker_fee')
    
    bitcoind_group = parser.add_argument_group('bitcoind interface')
    bitcoind_group.add_argument('--bitcoind-address', metavar='BITCOIND_ADDRESS',
        help='connect to this address (default: 127.0.0.1)',
        type=str, action='store', default='127.0.0.1', dest='bitcoind_address')
    bitcoind_group.add_argument('--bitcoind-rpc-port', metavar='BITCOIND_RPC_PORT',
        help='''connect to JSON-RPC interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.RPC_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_rpc_port')
    bitcoind_group.add_argument('--bitcoind-p2p-port', metavar='BITCOIND_P2P_PORT',
        help='''connect to P2P interface at this port (default: %s)''' % ', '.join('%s:%i' % (n.NAME, n.PARENT.P2P_PORT) for _, n in sorted(networks.realnets.items())),
        type=int, action='store', default=None, dest='bitcoind_p2p_port')
    
    bitcoind_group.add_argument(metavar='BITCOIND_RPCUSER',
        help='bitcoind RPC interface username (default: <empty>)',
        type=str, action='store', default='', nargs='?', dest='bitcoind_rpc_username')
    bitcoind_group.add_argument(metavar='BITCOIND_RPCPASSWORD',
        help='bitcoind RPC interface password',
        type=str, action='store', dest='bitcoind_rpc_password')
    
    args = parser.parse_args()
    
    if args.debug:
        p2pool.DEBUG = True
    
    net = networks.nets[args.net_name + ('_testnet' if args.testnet else '')]
    
    datadir_path = os.path.join(os.path.dirname(sys.argv[0]), 'data', net.NAME)
    if not os.path.exists(datadir_path):
        os.makedirs(datadir_path)
    
    if args.logfile is None:
        args.logfile = os.path.join(datadir_path, 'log')
    
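    # stdout/stderr plumbing, assembled below the class definitions: every line is
    # timestamped, teed to both the real stderr and the logfile, and stderr output
    # is additionally prefixed with '> '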
    class EncodeReplacerPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            if isinstance(data, unicode):
                try:
                    data = data.encode(self.inner_file.encoding, 'replace')
                except:
                    data = data.encode('ascii', 'replace')
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class LogFile(object):
        def __init__(self, filename):
            self.filename = filename
            self.inner_file = None
            self.reopen()
        def reopen(self):
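            # if the log has grown past 100 MB, keep only roughly the last 1 MB,
            # starting at the first complete line, before reopening for appending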
            if self.inner_file is not None:
                self.inner_file.close()
            open(self.filename, 'a').close()
            f = open(self.filename, 'rb')
            f.seek(0, os.SEEK_END)
            length = f.tell()
            if length > 100*1000*1000:
                f.seek(-1000*1000, os.SEEK_END)
                while True:
                    if f.read(1) in ('', '\n'):
                        break
                data = f.read()
                f.close()
                f = open(self.filename, 'wb')
                f.write(data)
            f.close()
            self.inner_file = codecs.open(self.filename, 'a', 'utf-8')
        def write(self, data):
            self.inner_file.write(data)
        def flush(self):
            self.inner_file.flush()
    class TeePipe(object):
        def __init__(self, outputs):
            self.outputs = outputs
        def write(self, data):
            for output in self.outputs:
                output.write(data)
        def flush(self):
            for output in self.outputs:
                output.flush()
    class TimestampingPipe(object):
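        # prefixes each completed line with the current datetime; a trailing
        # partial line is buffered until its newline arrives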
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write('%s %s\n' % (datetime.datetime.now(), line))
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    class AbortPipe(object):
        def __init__(self, inner_file):
            self.inner_file = inner_file
            self.softspace = 0
        def write(self, data):
            try:
                self.inner_file.write(data)
            except:
                sys.stdout = sys.__stdout__
                log.DefaultObserver.stderr = sys.stderr = sys.__stderr__
                raise
        def flush(self):
            self.inner_file.flush()
    class PrefixPipe(object):
        def __init__(self, inner_file, prefix):
            self.inner_file = inner_file
            self.prefix = prefix
            self.buf = ''
            self.softspace = 0
        def write(self, data):
            buf = self.buf + data
            lines = buf.split('\n')
            for line in lines[:-1]:
                self.inner_file.write(self.prefix + line + '\n')
                self.inner_file.flush()
            self.buf = lines[-1]
        def flush(self):
            pass
    logfile = LogFile(args.logfile)
    pipe = TimestampingPipe(TeePipe([EncodeReplacerPipe(sys.stderr), logfile]))
    sys.stdout = AbortPipe(pipe)
    sys.stderr = log.DefaultObserver.stderr = AbortPipe(PrefixPipe(pipe, '> '))
    if hasattr(signal, "SIGUSR1"):
        def sigusr1(signum, frame):
            print 'Caught SIGUSR1, closing %r...' % (args.logfile,)
            logfile.reopen()
            print '...and reopened %r after catching SIGUSR1.' % (args.logfile,)
        signal.signal(signal.SIGUSR1, sigusr1)
    task.LoopingCall(logfile.reopen).start(5)
    
    if args.bitcoind_rpc_port is None:
        args.bitcoind_rpc_port = net.PARENT.RPC_PORT
    
    if args.bitcoind_p2p_port is None:
        args.bitcoind_p2p_port = net.PARENT.P2P_PORT
    
    if args.p2pool_port is None:
        args.p2pool_port = net.P2P_PORT
    
    if args.worker_port is None:
        args.worker_port = net.WORKER_PORT
    
    if args.address is not None:
        try:
            args.pubkey_hash = bitcoin_data.address_to_pubkey_hash(args.address, net.PARENT)
        except Exception, e:
            parser.error('error parsing address: ' + repr(e))
    else:
        args.pubkey_hash = None
    
    if (args.merged_url is None) ^ (args.merged_userpass is None):
        parser.error('must specify both --merged-url and --merged-userpass, or neither')
    
    reactor.callWhenRunning(main, args, net, datadir_path)
    reactor.run()